是否有可能终止一个正在运行的线程而不设置/检查任何标志/信号/等等?


当前回答

Python版本:3.8

使用守护线程来执行我们想要的,如果我们想要终止守护线程,我们只需要让父线程退出,然后系统就会终止父线程创建的守护线程。

还支持协程和协程函数。

def main():
    start_time = time.perf_counter()
    t1 = ExitThread(time.sleep, (10,), debug=False)
    t1.start()
    time.sleep(0.5)
    t1.exit()
    try:
        print(t1.result_future.result())
    except concurrent.futures.CancelledError:
        pass
    end_time = time.perf_counter()
    print(f"time cost {end_time - start_time:0.2f}")

下面是ExitThread源代码

import concurrent.futures
import threading
import typing
import asyncio


class _WorkItem(object):
    """ concurrent\futures\thread.py

    """

    def __init__(self, future, fn, args, kwargs, *, debug=None):
        self._debug = debug
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if self._debug:
            print("ExitThread._WorkItem run")
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            coroutine = None
            if asyncio.iscoroutinefunction(self.fn):
                coroutine = self.fn(*self.args, **self.kwargs)
            elif asyncio.iscoroutine(self.fn):
                coroutine = self.fn
            if coroutine is None:
                result = self.fn(*self.args, **self.kwargs)
            else:
                result = asyncio.run(coroutine)
            if self._debug:
                print("_WorkItem done")
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)


class ExitThread:
    """ Like a stoppable thread

    Using coroutine for target then exit before running may cause RuntimeWarning.

    """

    def __init__(self, target: typing.Union[typing.Coroutine, typing.Callable] = None
                 , args=(), kwargs={}, *, daemon=None, debug=None):
        #
        self._debug = debug
        self._parent_thread = threading.Thread(target=self._parent_thread_run, name="ExitThread_parent_thread"
                                               , daemon=daemon)
        self._child_daemon_thread = None
        self.result_future = concurrent.futures.Future()
        self._workItem = _WorkItem(self.result_future, target, args, kwargs, debug=debug)
        self._parent_thread_exit_lock = threading.Lock()
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock_released = False  # When done it will be True
        self._started = False
        self._exited = False
        self.result_future.add_done_callback(self._release_parent_thread_exit_lock)

    def _parent_thread_run(self):
        self._child_daemon_thread = threading.Thread(target=self._child_daemon_thread_run
                                                     , name="ExitThread_child_daemon_thread"
                                                     , daemon=True)
        self._child_daemon_thread.start()
        # Block manager thread
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock.release()
        if self._debug:
            print("ExitThread._parent_thread_run exit")

    def _release_parent_thread_exit_lock(self, _future):
        if self._debug:
            print(f"ExitThread._release_parent_thread_exit_lock {self._parent_thread_exit_lock_released} {_future}")
        if not self._parent_thread_exit_lock_released:
            self._parent_thread_exit_lock_released = True
            self._parent_thread_exit_lock.release()

    def _child_daemon_thread_run(self):
        self._workItem.run()

    def start(self):
        if self._debug:
            print(f"ExitThread.start {self._started}")
        if not self._started:
            self._started = True
            self._parent_thread.start()

    def exit(self):
        if self._debug:
            print(f"ExitThread.exit exited: {self._exited} lock_released: {self._parent_thread_exit_lock_released}")
        if self._parent_thread_exit_lock_released:
            return
        if not self._exited:
            self._exited = True
            if not self.result_future.cancel():
                if self.result_future.running():
                    self.result_future.set_exception(concurrent.futures.CancelledError())

其他回答

这是基于thread2—可杀死线程ActiveState配方。

您需要调用PyThreadState_SetAsyncExc(),它只能通过ctypes模块使用。

这只在Python 2.7.3上进行了测试,但它很可能与最近的其他2一起工作。x版本。PyThreadState_SetAsyncExc()仍然存在于Python 3中以实现向后兼容性(但我还没有测试它)。

import ctypes

def terminate_thread(thread):
    """Terminates a python thread from another thread.

    :param thread: a threading.Thread instance
    """
    if not thread.isAlive():
        return

    exc = ctypes.py_object(SystemExit)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(thread.ident), exc)
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

没有官方API来做这个,没有。

你需要使用平台API来杀死线程,例如pthread_kill或TerminateThread。你可以访问这样的API,例如通过pythonwin,或通过ctypes。

注意,这本质上是不安全的。这可能会导致不可收集的垃圾(来自堆栈帧的局部变量,成为垃圾),并可能导致死锁,如果被杀死的线程在被杀死时具有GIL。

你不应该在没有与线程合作的情况下强行终止线程。

杀死一个线程消除了try/finally阻塞设置的任何保证,所以你可能会让锁锁定,文件打开等等。

唯一可以认为强制终止线程是一个好主意的情况是快速终止程序,但绝不是单个线程。

只是建立在@SCB的想法(这正是我所需要的),创建一个KillableThread子类与自定义函数:

from threading import Thread, Event

class KillableThread(Thread):
    def __init__(self, sleep_interval=1, target=None, name=None, args=(), kwargs={}):
        super().__init__(None, target, name, args, kwargs)
        self._kill = Event()
        self._interval = sleep_interval
        print(self._target)

    def run(self):
        while True:
            # Call custom function with arguments
            self._target(*self._args)

            # If no kill signal is set, sleep for the interval,
            # If kill signal comes in while sleeping, immediately
            #  wake up and handle
            is_killed = self._kill.wait(self._interval)
            if is_killed:
                break

        print("Killing Thread")

    def kill(self):
        self._kill.set()

if __name__ == '__main__':

    def print_msg(msg):
        print(msg)

    t = KillableThread(10, print_msg, args=("hello world"))
    t.start()
    time.sleep(6)
    print("About to kill thread")
    t.kill()

自然地,就像@SBC一样,线程不会等待运行一个新的循环来停止。在这个例子中,你会看到“kill Thread”消息紧跟在“About to kill Thread”之后,而不是等待4秒钟线程完成(因为我们已经睡了6秒了)。

KillableThread构造函数中的第二个参数是您的自定义函数(print_msg)。Args参数是在调用函数(("hello world"))时使用的参数。

这似乎与windows 7上的pywin32一起工作

my_thread = threading.Thread()
my_thread.start()
my_thread._Thread__stop()