我知道pydispatcher,但是Python周围一定有其他与事件相关的包。

哪些库是可用的?

我对作为大型框架一部分的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型骨架解决方案。


当前回答

如果我在pyQt中做代码,我使用QT套接字/信号范式,django也是如此

如果我正在做异步I/O,我使用本机选择模块

如果我使用SAX python解析器,我使用SAX提供的事件API。所以看起来我是底层API的受害者:-)

也许你应该问问自己,你对事件框架/模块有什么期望。我个人倾向于使用qt中的Socket/Signal范例。更多关于这方面的信息可以在这里找到

其他回答

我使用zope.event。这是你能想象到的最简单的骨架。: -) 事实上,这里是完整的源代码:

subscribers = []

def notify(event):
    for subscriber in subscribers:
        subscriber(event)

例如,请注意不能在进程之间发送消息。它不是一个消息系统,只是一个事件系统,仅此而已。

你可以看看pymitter (pypi)。它是一个小的单文件(~250 loc)方法 提供名称空间、通配符和TTL。

这里有一个基本的例子:

from pymitter import EventEmitter

ee = EventEmitter()

# decorator usage
@ee.on("myevent")
def handler1(arg):
   print "handler1 called with", arg

# callback usage
def handler2(arg):
    print "handler2 called with", arg
ee.on("myotherevent", handler2)

# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"

ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"

另一个方便的包是事件。它将核心封装到事件订阅和事件触发,感觉像是语言的“自然”部分。它似乎类似于c#语言,后者提供了一种方便的方式来声明、订阅和触发事件。从技术上讲,事件是一个“槽”,可以将回调函数(事件处理程序)附加到其中—称为订阅事件的过程。

# Define a callback function
def something_changed(reason):
    print "something changed because %s" % reason

# Use events module to create an event and register one or more callback functions
from events import Events
events = Events()
events.on_change += something_changed

触发事件时,将按顺序调用所有附加的事件处理程序。要触发该事件,请对槽位执行调用:

events.on_change('it had to happen')

这将输出:

'something changed because it had to happen'

更多的文档可以在github repo或文档中找到。

我对Longpoke的极简主义方法做了一个变化,也确保了呼叫者和呼叫者的签名:

class EventHook(object):
    '''
    A simple implementation of the Observer-Pattern.
    The user can specify an event signature upon inizializazion,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook class with fitting
    arguments.

    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}

    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)

    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []

    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())

    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)

    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()

如果你想做更复杂的事情,比如合并事件或重试,你可以使用Observable模式和一个成熟的库来实现它。https://github.com/ReactiveX/RxPY。可观察对象在Javascript和Java中非常常见,在一些异步任务中使用非常方便。

from rx import Observable, Observer


def push_five_strings(observer):
        observer.on_next("Alpha")
        observer.on_next("Beta")
        observer.on_next("Gamma")
        observer.on_next("Delta")
        observer.on_next("Epsilon")
        observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

输出:

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!