我知道pydispatcher,但是Python周围一定有其他与事件相关的包。

哪些库是可用的?

我对作为大型框架一部分的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型骨架解决方案。


当前回答

我创建了一个EventManager类(代码在最后)。语法如下:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

下面是一个例子:

def hello(name):
    print "Hello {}".format(name)
    
def greetings(name):
    print "Greetings {}".format(name)

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print "\nInitial salute"
EventManager.salute('Oscar')

print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

输出:

最初的敬礼 问候奥斯卡 你好奥斯卡 现在删除问候语 你好奥斯卡

事件经理代码:

class EventManager:
    
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
            
        def __iadd__(self,func):
            self.functions.append(func)
            return self
            
        def __isub__(self,func):
            self.functions.remove(func)
            return self
            
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
            
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event recieves a list of functions,
        where every function in the list recieves the same parameters.
        
        Example:
        
        def hello(): print "Hello ",
        def world(): print "World"
        
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        
        EventManager.salute()
        
        Output:
        Hello World
        """
        for key in kvargs.keys():
            if type(kvargs[key]) is not list:
                raise ValueError("value has to be a list")
            else:
                kvargs[key] = cls.Event(kvargs[key])
        
        cls.__dict__.update(kvargs)

其他回答

我在《宝贵的课程》上找到了这个小脚本。它似乎具有我所追求的简单性/功率比。Peter Thatcher是以下代码的作者(未提及许可)。

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount

class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()

    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)

def log_file_change(source_path):
    print "%r changed." % (source_path,)

def log_file_change2(source_path):
    print "%r changed!" % (source_path,)

watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()

如果你需要一个跨进程或网络边界的事件总线,你可以尝试PyMQ。它目前支持发布/订阅、消息队列和同步RPC。默认版本工作在一个Redis后端,所以你需要一个运行的Redis服务器。还有一个用于测试的内存后端。您还可以编写自己的后端。

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

初始化系统。

from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

声明:我是这个库的作者

另一个方便的包是事件。它将核心封装到事件订阅和事件触发,感觉像是语言的“自然”部分。它似乎类似于c#语言,后者提供了一种方便的方式来声明、订阅和触发事件。从技术上讲,事件是一个“槽”,可以将回调函数(事件处理程序)附加到其中—称为订阅事件的过程。

# Define a callback function
def something_changed(reason):
    print "something changed because %s" % reason

# Use events module to create an event and register one or more callback functions
from events import Events
events = Events()
events.on_change += something_changed

触发事件时,将按顺序调用所有附加的事件处理程序。要触发该事件,请对槽位执行调用:

events.on_change('it had to happen')

这将输出:

'something changed because it had to happen'

更多的文档可以在github repo或文档中找到。

我创建了一个EventManager类(代码在最后)。语法如下:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

下面是一个例子:

def hello(name):
    print "Hello {}".format(name)
    
def greetings(name):
    print "Greetings {}".format(name)

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print "\nInitial salute"
EventManager.salute('Oscar')

print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

输出:

最初的敬礼 问候奥斯卡 你好奥斯卡 现在删除问候语 你好奥斯卡

事件经理代码:

class EventManager:
    
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
            
        def __iadd__(self,func):
            self.functions.append(func)
            return self
            
        def __isub__(self,func):
            self.functions.remove(func)
            return self
            
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
            
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event recieves a list of functions,
        where every function in the list recieves the same parameters.
        
        Example:
        
        def hello(): print "Hello ",
        def world(): print "World"
        
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        
        EventManager.salute()
        
        Output:
        Hello World
        """
        for key in kvargs.keys():
            if type(kvargs[key]) is not list:
                raise ValueError("value has to be a list")
            else:
                kvargs[key] = cls.Event(kvargs[key])
        
        cls.__dict__.update(kvargs)

你可以试试公交模块。

这个库使基于消息的系统的实现更容易。它支持命令(单个处理程序)和事件(0或多个处理程序)方法。Buslane使用Python类型注释来正确注册处理程序。

简单的例子:

from dataclasses import dataclass

from buslane.commands import Command, CommandHandler, CommandBus


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str


class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):

    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='john@lennon.com',
            password='secret',
        )


command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='john@lennon.com',
    password='secret',
))

安装巴士道,只需使用pip:

$ pip install buslane