是否有可能终止一个正在运行的线程而不设置/检查任何标志/信号/等等?


当前回答

正如其他人所提到的,规范是设置一个停止标志。对于一些轻量级的东西(没有Thread的子类化,没有全局变量),lambda回调是一个选项。(注意if stop()中的括号。)

import threading
import time

def do_work(id, stop):
    print("I am thread", id)
    while True:
        print("I am thread {} doing something".format(id))
        if stop():
            print("  Exiting loop.")
            break
    print("Thread {}, signing off".format(id))


def main():
    stop_threads = False
    workers = []
    for id in range(0,3):
        tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads))
        workers.append(tmp)
        tmp.start()
    time.sleep(3)
    print('main: done sleeping; time to stop the threads.')
    stop_threads = True
    for worker in workers:
        worker.join()
    print('Finis.')

if __name__ == '__main__':
    main()

将print()替换为始终刷新的pr()函数(sys.stdout.flush())可以提高shell输出的精度。

(仅在Windows/Eclipse/Python3.3上测试)

其他回答

没有官方API来做这个,没有。

你需要使用平台API来杀死线程,例如pthread_kill或TerminateThread。你可以访问这样的API,例如通过pythonwin,或通过ctypes。

注意,这本质上是不安全的。这可能会导致不可收集的垃圾(来自堆栈帧的局部变量,成为垃圾),并可能导致死锁,如果被杀死的线程在被杀死时具有GIL。

这是基于thread2—可杀死线程ActiveState配方。

您需要调用PyThreadState_SetAsyncExc(),它只能通过ctypes模块使用。

这只在Python 2.7.3上进行了测试,但它很可能与最近的其他2一起工作。x版本。PyThreadState_SetAsyncExc()仍然存在于Python 3中以实现向后兼容性(但我还没有测试它)。

import ctypes

def terminate_thread(thread):
    """Terminates a python thread from another thread.

    :param thread: a threading.Thread instance
    """
    if not thread.isAlive():
        return

    exc = ctypes.py_object(SystemExit)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(thread.ident), exc)
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

Python版本:3.8

使用守护线程来执行我们想要的,如果我们想要终止守护线程,我们只需要让父线程退出,然后系统就会终止父线程创建的守护线程。

还支持协程和协程函数。

def main():
    start_time = time.perf_counter()
    t1 = ExitThread(time.sleep, (10,), debug=False)
    t1.start()
    time.sleep(0.5)
    t1.exit()
    try:
        print(t1.result_future.result())
    except concurrent.futures.CancelledError:
        pass
    end_time = time.perf_counter()
    print(f"time cost {end_time - start_time:0.2f}")

下面是ExitThread源代码

import concurrent.futures
import threading
import typing
import asyncio


class _WorkItem(object):
    """ concurrent\futures\thread.py

    """

    def __init__(self, future, fn, args, kwargs, *, debug=None):
        self._debug = debug
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if self._debug:
            print("ExitThread._WorkItem run")
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            coroutine = None
            if asyncio.iscoroutinefunction(self.fn):
                coroutine = self.fn(*self.args, **self.kwargs)
            elif asyncio.iscoroutine(self.fn):
                coroutine = self.fn
            if coroutine is None:
                result = self.fn(*self.args, **self.kwargs)
            else:
                result = asyncio.run(coroutine)
            if self._debug:
                print("_WorkItem done")
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)


class ExitThread:
    """ Like a stoppable thread

    Using coroutine for target then exit before running may cause RuntimeWarning.

    """

    def __init__(self, target: typing.Union[typing.Coroutine, typing.Callable] = None
                 , args=(), kwargs={}, *, daemon=None, debug=None):
        #
        self._debug = debug
        self._parent_thread = threading.Thread(target=self._parent_thread_run, name="ExitThread_parent_thread"
                                               , daemon=daemon)
        self._child_daemon_thread = None
        self.result_future = concurrent.futures.Future()
        self._workItem = _WorkItem(self.result_future, target, args, kwargs, debug=debug)
        self._parent_thread_exit_lock = threading.Lock()
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock_released = False  # When done it will be True
        self._started = False
        self._exited = False
        self.result_future.add_done_callback(self._release_parent_thread_exit_lock)

    def _parent_thread_run(self):
        self._child_daemon_thread = threading.Thread(target=self._child_daemon_thread_run
                                                     , name="ExitThread_child_daemon_thread"
                                                     , daemon=True)
        self._child_daemon_thread.start()
        # Block manager thread
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock.release()
        if self._debug:
            print("ExitThread._parent_thread_run exit")

    def _release_parent_thread_exit_lock(self, _future):
        if self._debug:
            print(f"ExitThread._release_parent_thread_exit_lock {self._parent_thread_exit_lock_released} {_future}")
        if not self._parent_thread_exit_lock_released:
            self._parent_thread_exit_lock_released = True
            self._parent_thread_exit_lock.release()

    def _child_daemon_thread_run(self):
        self._workItem.run()

    def start(self):
        if self._debug:
            print(f"ExitThread.start {self._started}")
        if not self._started:
            self._started = True
            self._parent_thread.start()

    def exit(self):
        if self._debug:
            print(f"ExitThread.exit exited: {self._exited} lock_released: {self._parent_thread_exit_lock_released}")
        if self._parent_thread_exit_lock_released:
            return
        if not self._exited:
            self._exited = True
            if not self.result_future.cancel():
                if self.result_future.running():
                    self.result_future.set_exception(concurrent.futures.CancelledError())

正如@Kozyarchuk的回答中提到的,安装跟踪工作。由于这个答案不包含代码,下面是一个工作就绪的示例:

import sys, threading, time 

class TraceThread(threading.Thread): 
    def __init__(self, *args, **keywords): 
        threading.Thread.__init__(self, *args, **keywords) 
        self.killed = False
    def start(self): 
        self._run = self.run 
        self.run = self.settrace_and_run
        threading.Thread.start(self) 
    def settrace_and_run(self): 
        sys.settrace(self.globaltrace) 
        self._run()
    def globaltrace(self, frame, event, arg): 
        return self.localtrace if event == 'call' else None
    def localtrace(self, frame, event, arg): 
        if self.killed and event == 'line': 
            raise SystemExit() 
        return self.localtrace 

def f(): 
    while True: 
        print('1') 
        time.sleep(2)
        print('2') 
        time.sleep(2)
        print('3') 
        time.sleep(2)

t = TraceThread(target=f) 
t.start() 
time.sleep(2.5) 
t.killed = True

它在输出1和2之后停止。3不打印。

我想补充的一件事是,如果你阅读threading lib Python的官方文档,建议避免使用“恶魔”线程,当你不希望线程突然结束时,带有Paolo Rovelli提到的标志。

来自官方文件:

守护进程线程在关机时突然停止。它们的资源(如打开的文件、数据库事务等)可能无法正确释放。如果您希望线程优雅地停止,请将它们设置为非守护进程,并使用适当的信号机制(如Event)。

我认为创建守护线程取决于您的应用程序,但通常(在我看来)最好避免杀死它们或使它们成为守护线程。在多处理中,您可以使用is_alive()来检查进程状态,并使用“terminate”来完成它们(也可以避免GIL问题)。但有时,当你在Windows中执行代码时,你会发现更多的问题。

并且永远记住,如果你有“活动线程”,Python解释器将运行等待它们。(因为这个守护程序可以帮助你如果不重要的事情突然结束)。