是否有可能终止一个正在运行的线程而不设置/检查任何标志/信号/等等?


当前回答

有一个图书馆就是为了这个目的而建的,别说了。尽管这里列出的一些注意事项仍然适用,但至少这个库提供了一种常规的、可重复的技术来实现既定的目标。

其他回答

如果您确实需要终止子任务的能力,请使用另一种实现。Multiprocessing和gevent都支持不加选择地杀死一个“线程”。

Python的线程不支持取消。不要尝试。你的代码很可能死锁、损坏或泄漏内存,或者有其他意想不到的“有趣的”难以调试的效果,这种情况很少发生,而且不确定。

多处理。进程可以p.terminate()

如果我想杀死一个线程,但不想使用标志/锁/信号/信号量/事件/任何东西,我就把线程提升到完整的进程。对于只使用几个线程的代码,开销并没有那么糟糕。

例如,这可以方便地终止执行阻塞I/O的助手“线程”

转换很简单:在相关代码中替换所有线程。多线程线程。进程和所有队列。多处理队列。排队并将p.t terminate()所需的调用添加到想要杀死子进程p的父进程中

关于多处理,请参阅Python文档。

例子:

import multiprocessing
proc = multiprocessing.Process(target=your_proc_function, args=())
proc.start()
# Terminate the process
proc.terminate()  # sends a SIGTERM

实现一个线程是绝对可能的。方法,如下例代码所示:

import sys
import threading
import time


class StopThread(StopIteration):
    pass

threading.SystemExit = SystemExit, StopThread


class Thread2(threading.Thread):

    def stop(self):
        self.__stop = True

    def _bootstrap(self):
        if threading._trace_hook is not None:
            raise ValueError('Cannot run thread with tracing!')
        self.__stop = False
        sys.settrace(self.__trace)
        super()._bootstrap()

    def __trace(self, frame, event, arg):
        if self.__stop:
            raise StopThread()
        return self.__trace


class Thread3(threading.Thread):

    def _bootstrap(self, stop_thread=False):
        def stop():
            nonlocal stop_thread
            stop_thread = True
        self.stop = stop

        def tracer(*_):
            if stop_thread:
                raise StopThread()
            return tracer
        sys.settrace(tracer)
        super()._bootstrap()

###############################################################################


def main():
    test1 = Thread2(target=printer)
    test1.start()
    time.sleep(1)
    test1.stop()
    test1.join()
    test2 = Thread2(target=speed_test)
    test2.start()
    time.sleep(1)
    test2.stop()
    test2.join()
    test3 = Thread3(target=speed_test)
    test3.start()
    time.sleep(1)
    test3.stop()
    test3.join()


def printer():
    while True:
        print(time.time() % 1)
        time.sleep(0.1)


def speed_test(count=0):
    try:
        while True:
            count += 1
    except StopThread:
        print('Count =', count)

if __name__ == '__main__':
    main()

Thread3类运行代码的速度似乎比Thread2类快大约33%。

我对这个游戏已经很晚了,但我一直在与一个类似的问题作斗争,下面的内容似乎为我完美地解决了这个问题,并且让我在守护子线程退出时做一些基本的线程状态检查和清理:

import threading
import time
import atexit

def do_work():

  i = 0
  @atexit.register
  def goodbye():
    print ("'CLEANLY' kill sub-thread with value: %s [THREAD: %s]" %
           (i, threading.currentThread().ident))

  while True:
    print i
    i += 1
    time.sleep(1)

t = threading.Thread(target=do_work)
t.daemon = True
t.start()

def after_timeout():
  print "KILL MAIN THREAD: %s" % threading.currentThread().ident
  raise SystemExit

threading.Timer(2, after_timeout).start()

收益率:

0
1
KILL MAIN THREAD: 140013208254208
'CLEANLY' kill sub-thread with value: 2 [THREAD: 140013674317568]

正如@Kozyarchuk的回答中提到的,安装跟踪工作。由于这个答案不包含代码,下面是一个工作就绪的示例:

import sys, threading, time 

class TraceThread(threading.Thread): 
    def __init__(self, *args, **keywords): 
        threading.Thread.__init__(self, *args, **keywords) 
        self.killed = False
    def start(self): 
        self._run = self.run 
        self.run = self.settrace_and_run
        threading.Thread.start(self) 
    def settrace_and_run(self): 
        sys.settrace(self.globaltrace) 
        self._run()
    def globaltrace(self, frame, event, arg): 
        return self.localtrace if event == 'call' else None
    def localtrace(self, frame, event, arg): 
        if self.killed and event == 'line': 
            raise SystemExit() 
        return self.localtrace 

def f(): 
    while True: 
        print('1') 
        time.sleep(2)
        print('2') 
        time.sleep(2)
        print('3') 
        time.sleep(2)

t = TraceThread(target=f) 
t.start() 
time.sleep(2.5) 
t.killed = True

它在输出1和2之后停止。3不打印。