我知道pydispatcher,但是Python周围一定有其他与事件相关的包。

哪些库是可用的?

我对作为大型框架一部分的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型骨架解决方案。


当前回答

前段时间我写了一个库,可能对你有用。 它允许您拥有本地和全局侦听器、多种不同的注册方式、执行优先级等等。

from pyeventdispatcher import register

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

看看调度员

其他回答

如果你需要一个跨进程或网络边界的事件总线,你可以尝试PyMQ。它目前支持发布/订阅、消息队列和同步RPC。默认版本工作在一个Redis后端,所以你需要一个运行的Redis服务器。还有一个用于测试的内存后端。您还可以编写自己的后端。

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

初始化系统。

from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

声明:我是这个库的作者

我们使用Michael ford在他的事件模式中建议的EventHook:

只需添加EventHooks到你的类:

class MyBroadcaster()
    def __init__():
        self.onChange = EventHook()

theBroadcaster = MyBroadcaster()

# add a listener to the event
theBroadcaster.onChange += myFunction

# remove listener from the event
theBroadcaster.onChange -= myFunction

# fire event
theBroadcaster.onChange.fire()

我们添加了从对象中移除所有侦听器到Michaels类的功能,最终得到了这样的结果:

class EventHook(object):

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)

    def clearObjectHandlers(self, inObject):
        for theHandler in self.__handlers:
            if theHandler.im_self == inObject:
                self -= theHandler

下面是另一个需要考虑的模块。对于要求更高的应用程序,这似乎是一个可行的选择。

Py-notify is a Python package providing tools for implementing Observer programming pattern. These tools include signals, conditions and variables. Signals are lists of handlers that are called when signal is emitted. Conditions are basically boolean variables coupled with a signal that is emitted when condition state changes. They can be combined using standard logical operators (not, and, etc.) into compound conditions. Variables, unlike conditions, can hold any Python object, not just booleans, but they cannot be combined.

如果你想做更复杂的事情,比如合并事件或重试,你可以使用Observable模式和一个成熟的库来实现它。https://github.com/ReactiveX/RxPY。可观察对象在Javascript和Java中非常常见,在一些异步任务中使用非常方便。

from rx import Observable, Observer


def push_five_strings(observer):
        observer.on_next("Alpha")
        observer.on_next("Beta")
        observer.on_next("Gamma")
        observer.on_next("Delta")
        observer.on_next("Epsilon")
        observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

输出:

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!

前段时间我写了一个库,可能对你有用。 它允许您拥有本地和全局侦听器、多种不同的注册方式、执行优先级等等。

from pyeventdispatcher import register

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

看看调度员