我知道pydispatcher,但是Python周围一定有其他与事件相关的包。
哪些库是可用的?
我对作为大型框架一部分的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型骨架解决方案。
我知道pydispatcher,但是Python周围一定有其他与事件相关的包。
哪些库是可用的?
我对作为大型框架一部分的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型骨架解决方案。
当前回答
我使用zope.event。这是你能想象到的最简单的骨架。: -) 事实上,这里是完整的源代码:
subscribers = []
def notify(event):
for subscriber in subscribers:
subscriber(event)
例如,请注意不能在进程之间发送消息。它不是一个消息系统,只是一个事件系统,仅此而已。
其他回答
我使用zope.event。这是你能想象到的最简单的骨架。: -) 事实上,这里是完整的源代码:
subscribers = []
def notify(event):
for subscriber in subscribers:
subscriber(event)
例如,请注意不能在进程之间发送消息。它不是一个消息系统,只是一个事件系统,仅此而已。
PyPI packages
截至2022年10月,这些是PyPI上可用的与事件相关的软件包, 按最近发布日期订购。
PyDispatcher 2.0.6: Aug 2022 眨眼器1.5:2022年6月 pymitter 0.4.0: 2022年6月 python-dispatch 0.2.0: 2022年4月 插件1.0.0:2021年8月 事件0.4:2020年10月 zope。事件4.5.0:2020年9月 RxPy3 1.0.1: 2020年6月 路易2.0:2019年9月 PyPubSub 4.0.3: 2019年1月 Pyeventdispatcher 0.2.3a0: 2018 公交车道0.0.5:2018 PyPyDispatcher 2.1.2: 2017 Axel 0.0.7: 2016 Dispatcher 1.0: 2012 Py-notify 0.3.1: 2008
有更多的
有很多库可供选择,使用非常不同的术语(事件、信号、处理程序、方法分派、钩子等等)。
我试图对上述软件包以及这里的答案中提到的技术进行概述。
首先,一些术语……
观察者模式
事件系统最基本的风格是“处理程序方法包”,这是一个 观察者模式的简单实现。
基本上,处理程序方法(可调用对象)存储在一个数组中,每个方法在事件“触发”时被调用。
发布-订阅
观察者事件系统的缺点是只能在实际事件上注册处理程序 对象(或处理程序列表)。因此在注册时,事件需要已经存在。
这就是为什么存在第二种类型的事件系统 发布-订阅模式。 这里,处理程序不在事件对象(或处理程序列表)上注册,而是在中央调度程序上注册。 此外,通知器只与调度程序通信。该听什么,该发布什么 由'signal'决定,它只不过是一个名称(字符串)。
调停者模式
您可能也会感兴趣:中介模式。
钩子
钩子系统通常用于应用程序插件的上下文中。的 应用程序包含固定的集成点(钩子),每个插件都可以 连接到该钩子并执行某些操作。
其他“事件”
注意:线程。事件不是一个“事件系统” 在上述意义上。这是一个线程同步系统,其中一个线程等待,直到另一个线程“信号”事件对象。
网络消息传递库也经常使用术语“事件”;有时它们在概念上是相似的;有时不是。 它们当然可以跨越线程、进程和计算机边界。如见。 pyzmq pymq, Twisted, Tornado, gevent, eventlet。
弱引用
在Python中,保留对方法或对象的引用可以确保它不会被删除 被垃圾收集器。这可能是可取的,但也可能导致内存泄漏: 链接的处理程序从来都不是 清理干净。
一些事件系统使用弱引用而不是常规引用来解决这个问题。
一些关于各种图书馆的词汇
观察者风格的事件系统:
zope.event shows the bare bones of how this works (see Lennart's answer). Note: this example does not even support handler arguments. LongPoke's 'callable list' implementation shows that such an event system can be implemented very minimalistically by subclassing list. Felk's variation EventHook also ensures the signatures of callees and callers. spassig's EventHook (Michael Foord's Event Pattern) is a straightforward implementation. Josip's Valued Lessons Event class is basically the same, but uses a set instead of a list to store the bag, and implements __call__ which are both reasonable additions. PyNotify is similar in concept and also provides additional concepts of variables and conditions ('variable changed event'). Homepage is not functional. axel is basically a bag-of-handlers with more features related to threading, error handling, ... python-dispatch requires the even source classes to derive from pydispatch.Dispatcher. buslane is class-based, supports single- or multiple handlers and facilitates extensive type hints. Pithikos' Observer/Event is a lightweight design.
发布-订阅库:
blinker has some nifty features such as automatic disconnection and filtering based on sender. PyPubSub is a stable package, and promises "advanced features that facilitate debugging and maintaining topics and messages". pymitter is a Python port of Node.js EventEmitter2 and offers namespaces, wildcards and TTL. PyDispatcher seems to emphasize flexibility with regards to many-to-many publication etc. Supports weak references. louie is a reworked PyDispatcher and should work "in a wide variety of contexts". pypydispatcher is based on (you guessed it...) PyDispatcher and also works in PyPy. django.dispatch is a rewritten PyDispatcher "with a more limited interface, but higher performance". pyeventdispatcher is based on PHP's Symfony framework's event-dispatcher. dispatcher was extracted from django.dispatch but is getting fairly old. Cristian Garcia's EventManger is a really short implementation.
其他:
Pluggy包含一个pytest插件使用的钩子系统。 RxPy3实现了Observable模式,并允许合并事件,重试等。 Qt的信号和插槽可以从PyQt中获得 或PySide2。当在同一个线程中使用时,它们作为回调工作, 或者作为两个不同线程之间的事件(使用事件循环)。信号和槽有它们的限制 仅适用于从QObject派生的类的对象。
这里是一个最小的设计,应该工作得很好。你要做的就是简单地在一个类中继承Observer,然后使用observe(event_name, callback_fn)来监听特定的事件。当特定事件在代码中的任何地方被触发时(例如。事件('USB connected')),则将触发相应的回调。
class Observer():
_observers = []
def __init__(self):
self._observers.append(self)
self._observed_events = []
def observe(self, event_name, callback_fn):
self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})
class Event():
def __init__(self, event_name, *callback_args):
for observer in Observer._observers:
for observable in observer._observed_events:
if observable['event_name'] == event_name:
observable['callback_fn'](*callback_args)
例子:
class Room(Observer):
def __init__(self):
print("Room is ready.")
Observer.__init__(self) # DON'T FORGET THIS
def someone_arrived(self, who):
print(who + " has arrived!")
# Observe for specific event
room = Room()
room.observe('someone arrived', room.someone_arrived)
# Fire some events
Event('someone left', 'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted', 'Lenard')
如果我在pyQt中做代码,我使用QT套接字/信号范式,django也是如此
如果我正在做异步I/O,我使用本机选择模块
如果我使用SAX python解析器,我使用SAX提供的事件API。所以看起来我是底层API的受害者:-)
也许你应该问问自己,你对事件框架/模块有什么期望。我个人倾向于使用qt中的Socket/Signal范例。更多关于这方面的信息可以在这里找到
我对Longpoke的极简主义方法做了一个变化,也确保了呼叫者和呼叫者的签名:
class EventHook(object):
'''
A simple implementation of the Observer-Pattern.
The user can specify an event signature upon inizializazion,
defined by kwargs in the form of argumentname=class (e.g. id=int).
The arguments' types are not checked in this implementation though.
Callables with a fitting signature can be added with += or removed with -=.
All listeners can be notified by calling the EventHook class with fitting
arguments.
>>> event = EventHook(id=int, data=dict)
>>> event += lambda id, data: print("%d %s" % (id, data))
>>> event(id=5, data={"foo": "bar"})
5 {'foo': 'bar'}
>>> event = EventHook(id=int)
>>> event += lambda wrong_name: None
Traceback (most recent call last):
...
ValueError: Listener must have these arguments: (id=int)
>>> event = EventHook(id=int)
>>> event += lambda id: None
>>> event(wrong_name=0)
Traceback (most recent call last):
...
ValueError: This EventHook must be called with these arguments: (id=int)
'''
def __init__(self, **signature):
self._signature = signature
self._argnames = set(signature.keys())
self._handlers = []
def _kwargs_str(self):
return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())
def __iadd__(self, handler):
params = inspect.signature(handler).parameters
valid = True
argnames = set(n for n in params.keys())
if argnames != self._argnames:
valid = False
for p in params.values():
if p.kind == p.VAR_KEYWORD:
valid = True
break
if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
valid = False
break
if not valid:
raise ValueError("Listener must have these arguments: (%s)"
% self._kwargs_str())
self._handlers.append(handler)
return self
def __isub__(self, handler):
self._handlers.remove(handler)
return self
def __call__(self, *args, **kwargs):
if args or set(kwargs.keys()) != self._argnames:
raise ValueError("This EventHook must be called with these " +
"keyword arguments: (%s)" % self._kwargs_str())
for handler in self._handlers[:]:
handler(**kwargs)
def __repr__(self):
return "EventHook(%s)" % self._kwargs_str()