我知道pydispatcher,但是Python周围一定有其他与事件相关的包。
哪些库是可用的?
我对作为大型框架一部分的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型骨架解决方案。
我知道pydispatcher,但是Python周围一定有其他与事件相关的包。
哪些库是可用的?
我对作为大型框架一部分的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型骨架解决方案。
当前回答
如果你需要一个跨进程或网络边界的事件总线,你可以尝试PyMQ。它目前支持发布/订阅、消息队列和同步RPC。默认版本工作在一个Redis后端,所以你需要一个运行的Redis服务器。还有一个用于测试的内存后端。您还可以编写自己的后端。
import pymq
# common code
class MyEvent:
pass
# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
print('event received')
# publisher code
pymq.publish(MyEvent())
# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')
初始化系统。
from pymq.provider.redis import RedisConfig
# starts a new thread with a Redis event loop
pymq.init(RedisConfig())
# main application control loop
pymq.shutdown()
声明:我是这个库的作者
其他回答
我对Longpoke的极简主义方法做了一个变化,也确保了呼叫者和呼叫者的签名:
class EventHook(object):
'''
A simple implementation of the Observer-Pattern.
The user can specify an event signature upon inizializazion,
defined by kwargs in the form of argumentname=class (e.g. id=int).
The arguments' types are not checked in this implementation though.
Callables with a fitting signature can be added with += or removed with -=.
All listeners can be notified by calling the EventHook class with fitting
arguments.
>>> event = EventHook(id=int, data=dict)
>>> event += lambda id, data: print("%d %s" % (id, data))
>>> event(id=5, data={"foo": "bar"})
5 {'foo': 'bar'}
>>> event = EventHook(id=int)
>>> event += lambda wrong_name: None
Traceback (most recent call last):
...
ValueError: Listener must have these arguments: (id=int)
>>> event = EventHook(id=int)
>>> event += lambda id: None
>>> event(wrong_name=0)
Traceback (most recent call last):
...
ValueError: This EventHook must be called with these arguments: (id=int)
'''
def __init__(self, **signature):
self._signature = signature
self._argnames = set(signature.keys())
self._handlers = []
def _kwargs_str(self):
return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())
def __iadd__(self, handler):
params = inspect.signature(handler).parameters
valid = True
argnames = set(n for n in params.keys())
if argnames != self._argnames:
valid = False
for p in params.values():
if p.kind == p.VAR_KEYWORD:
valid = True
break
if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
valid = False
break
if not valid:
raise ValueError("Listener must have these arguments: (%s)"
% self._kwargs_str())
self._handlers.append(handler)
return self
def __isub__(self, handler):
self._handlers.remove(handler)
return self
def __call__(self, *args, **kwargs):
if args or set(kwargs.keys()) != self._argnames:
raise ValueError("This EventHook must be called with these " +
"keyword arguments: (%s)" % self._kwargs_str())
for handler in self._handlers[:]:
handler(**kwargs)
def __repr__(self):
return "EventHook(%s)" % self._kwargs_str()
我使用zope.event。这是你能想象到的最简单的骨架。: -) 事实上,这里是完整的源代码:
subscribers = []
def notify(event):
for subscriber in subscribers:
subscriber(event)
例如,请注意不能在进程之间发送消息。它不是一个消息系统,只是一个事件系统,仅此而已。
我在《宝贵的课程》上找到了这个小脚本。它似乎具有我所追求的简单性/功率比。Peter Thatcher是以下代码的作者(未提及许可)。
class Event:
def __init__(self):
self.handlers = set()
def handle(self, handler):
self.handlers.add(handler)
return self
def unhandle(self, handler):
try:
self.handlers.remove(handler)
except:
raise ValueError("Handler is not handling this event, so cannot unhandle it.")
return self
def fire(self, *args, **kargs):
for handler in self.handlers:
handler(*args, **kargs)
def getHandlerCount(self):
return len(self.handlers)
__iadd__ = handle
__isub__ = unhandle
__call__ = fire
__len__ = getHandlerCount
class MockFileWatcher:
def __init__(self):
self.fileChanged = Event()
def watchFiles(self):
source_path = "foo"
self.fileChanged(source_path)
def log_file_change(source_path):
print "%r changed." % (source_path,)
def log_file_change2(source_path):
print "%r changed!" % (source_path,)
watcher = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()
你可以看看pymitter (pypi)。它是一个小的单文件(~250 loc)方法 提供名称空间、通配符和TTL。
这里有一个基本的例子:
from pymitter import EventEmitter
ee = EventEmitter()
# decorator usage
@ee.on("myevent")
def handler1(arg):
print "handler1 called with", arg
# callback usage
def handler2(arg):
print "handler2 called with", arg
ee.on("myotherevent", handler2)
# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"
ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"
如果你想做更复杂的事情,比如合并事件或重试,你可以使用Observable模式和一个成熟的库来实现它。https://github.com/ReactiveX/RxPY。可观察对象在Javascript和Java中非常常见,在一些异步任务中使用非常方便。
from rx import Observable, Observer
def push_five_strings(observer):
observer.on_next("Alpha")
observer.on_next("Beta")
observer.on_next("Gamma")
observer.on_next("Delta")
observer.on_next("Epsilon")
observer.on_completed()
class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))
def on_completed(self):
print("Done!")
def on_error(self, error):
print("Error Occurred: {0}".format(error))
source = Observable.create(push_five_strings)
source.subscribe(PrintObserver())
输出:
Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!