是否有可能终止一个正在运行的线程而不设置/检查任何标志/信号/等等?


当前回答

Pieter Hintjens (ØMQ-project的创始人之一)表示,使用ØMQ并避免使用锁、互斥对象、事件等同步原语,是编写多线程程序最明智、最安全的方法:

http://zguide.zeromq.org/py:all#Multithreading-with-ZeroMQ

这包括告诉子线程,它应该取消它的工作。这可以通过为线程配备ØMQ-socket并轮询该套接字以获得一条表示它应该取消的消息来实现。

该链接还提供了一个使用ØMQ的多线程python代码示例。

其他回答

这里还有另一种方法,但代码非常干净和简单,适用于2021年的Python 3.7:

import ctypes 

def kill_thread(thread):
    """
    thread: a threading.Thread object
    """
    thread_id = thread.ident
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit))
    if res > 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
        print('Exception raise failure')

改编自:https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/

有一个图书馆就是为了这个目的而建的,别说了。尽管这里列出的一些注意事项仍然适用,但至少这个库提供了一种常规的、可重复的技术来实现既定的目标。

正如@Kozyarchuk的回答中提到的,安装跟踪工作。由于这个答案不包含代码,下面是一个工作就绪的示例:

import sys, threading, time 

class TraceThread(threading.Thread): 
    def __init__(self, *args, **keywords): 
        threading.Thread.__init__(self, *args, **keywords) 
        self.killed = False
    def start(self): 
        self._run = self.run 
        self.run = self.settrace_and_run
        threading.Thread.start(self) 
    def settrace_and_run(self): 
        sys.settrace(self.globaltrace) 
        self._run()
    def globaltrace(self, frame, event, arg): 
        return self.localtrace if event == 'call' else None
    def localtrace(self, frame, event, arg): 
        if self.killed and event == 'line': 
            raise SystemExit() 
        return self.localtrace 

def f(): 
    while True: 
        print('1') 
        time.sleep(2)
        print('2') 
        time.sleep(2)
        print('3') 
        time.sleep(2)

t = TraceThread(target=f) 
t.start() 
time.sleep(2.5) 
t.killed = True

它在输出1和2之后停止。3不打印。

可以通过在将退出线程的线程中安装trace来终止线程。请参阅所附的链接,了解一种可能的实现。

在Python中杀死一个线程

实现一个线程是绝对可能的。方法,如下例代码所示:

import sys
import threading
import time


class StopThread(StopIteration):
    pass

threading.SystemExit = SystemExit, StopThread


class Thread2(threading.Thread):

    def stop(self):
        self.__stop = True

    def _bootstrap(self):
        if threading._trace_hook is not None:
            raise ValueError('Cannot run thread with tracing!')
        self.__stop = False
        sys.settrace(self.__trace)
        super()._bootstrap()

    def __trace(self, frame, event, arg):
        if self.__stop:
            raise StopThread()
        return self.__trace


class Thread3(threading.Thread):

    def _bootstrap(self, stop_thread=False):
        def stop():
            nonlocal stop_thread
            stop_thread = True
        self.stop = stop

        def tracer(*_):
            if stop_thread:
                raise StopThread()
            return tracer
        sys.settrace(tracer)
        super()._bootstrap()

###############################################################################


def main():
    test1 = Thread2(target=printer)
    test1.start()
    time.sleep(1)
    test1.stop()
    test1.join()
    test2 = Thread2(target=speed_test)
    test2.start()
    time.sleep(1)
    test2.stop()
    test2.join()
    test3 = Thread3(target=speed_test)
    test3.start()
    time.sleep(1)
    test3.stop()
    test3.join()


def printer():
    while True:
        print(time.time() % 1)
        time.sleep(0.1)


def speed_test(count=0):
    try:
        while True:
            count += 1
    except StopThread:
        print('Count =', count)

if __name__ == '__main__':
    main()

Thread3类运行代码的速度似乎比Thread2类快大约33%。