我知道pydispatcher,但是Python周围一定有其他与事件相关的包。
哪些库是可用的?
我对作为大型框架一部分的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型骨架解决方案。
我知道pydispatcher,但是Python周围一定有其他与事件相关的包。
哪些库是可用的?
我对作为大型框架一部分的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型骨架解决方案。
当前回答
这里是一个最小的设计,应该工作得很好。你要做的就是简单地在一个类中继承Observer,然后使用observe(event_name, callback_fn)来监听特定的事件。当特定事件在代码中的任何地方被触发时(例如。事件('USB connected')),则将触发相应的回调。
class Observer():
_observers = []
def __init__(self):
self._observers.append(self)
self._observed_events = []
def observe(self, event_name, callback_fn):
self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})
class Event():
def __init__(self, event_name, *callback_args):
for observer in Observer._observers:
for observable in observer._observed_events:
if observable['event_name'] == event_name:
observable['callback_fn'](*callback_args)
例子:
class Room(Observer):
def __init__(self):
print("Room is ready.")
Observer.__init__(self) # DON'T FORGET THIS
def someone_arrived(self, who):
print(who + " has arrived!")
# Observe for specific event
room = Room()
room.observe('someone arrived', room.someone_arrived)
# Fire some events
Event('someone left', 'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted', 'Lenard')
其他回答
另一个方便的包是事件。它将核心封装到事件订阅和事件触发,感觉像是语言的“自然”部分。它似乎类似于c#语言,后者提供了一种方便的方式来声明、订阅和触发事件。从技术上讲,事件是一个“槽”,可以将回调函数(事件处理程序)附加到其中—称为订阅事件的过程。
# Define a callback function
def something_changed(reason):
print "something changed because %s" % reason
# Use events module to create an event and register one or more callback functions
from events import Events
events = Events()
events.on_change += something_changed
触发事件时,将按顺序调用所有附加的事件处理程序。要触发该事件,请对槽位执行调用:
events.on_change('it had to happen')
这将输出:
'something changed because it had to happen'
更多的文档可以在github repo或文档中找到。
这里是一个最小的设计,应该工作得很好。你要做的就是简单地在一个类中继承Observer,然后使用observe(event_name, callback_fn)来监听特定的事件。当特定事件在代码中的任何地方被触发时(例如。事件('USB connected')),则将触发相应的回调。
class Observer():
_observers = []
def __init__(self):
self._observers.append(self)
self._observed_events = []
def observe(self, event_name, callback_fn):
self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})
class Event():
def __init__(self, event_name, *callback_args):
for observer in Observer._observers:
for observable in observer._observed_events:
if observable['event_name'] == event_name:
observable['callback_fn'](*callback_args)
例子:
class Room(Observer):
def __init__(self):
print("Room is ready.")
Observer.__init__(self) # DON'T FORGET THIS
def someone_arrived(self, who):
print(who + " has arrived!")
# Observe for specific event
room = Room()
room.observe('someone arrived', room.someone_arrived)
# Fire some events
Event('someone left', 'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted', 'Lenard')
你可以看看pymitter (pypi)。它是一个小的单文件(~250 loc)方法 提供名称空间、通配符和TTL。
这里有一个基本的例子:
from pymitter import EventEmitter
ee = EventEmitter()
# decorator usage
@ee.on("myevent")
def handler1(arg):
print "handler1 called with", arg
# callback usage
def handler2(arg):
print "handler2 called with", arg
ee.on("myotherevent", handler2)
# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"
ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"
下面是另一个需要考虑的模块。对于要求更高的应用程序,这似乎是一个可行的选择。
Py-notify is a Python package providing tools for implementing Observer programming pattern. These tools include signals, conditions and variables. Signals are lists of handlers that are called when signal is emitted. Conditions are basically boolean variables coupled with a signal that is emitted when condition state changes. They can be combined using standard logical operators (not, and, etc.) into compound conditions. Variables, unlike conditions, can hold any Python object, not just booleans, but they cannot be combined.
如果你想做更复杂的事情,比如合并事件或重试,你可以使用Observable模式和一个成熟的库来实现它。https://github.com/ReactiveX/RxPY。可观察对象在Javascript和Java中非常常见,在一些异步任务中使用非常方便。
from rx import Observable, Observer
def push_five_strings(observer):
observer.on_next("Alpha")
observer.on_next("Beta")
observer.on_next("Gamma")
observer.on_next("Delta")
observer.on_next("Epsilon")
observer.on_completed()
class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))
def on_completed(self):
print("Done!")
def on_error(self, error):
print("Error Occurred: {0}".format(error))
source = Observable.create(push_five_strings)
source.subscribe(PrintObserver())
输出:
Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!