我在试着了解颠覆者的模式。我看了InfoQ的视频,并试着读了他们的论文。我知道有一个环缓冲区涉及,它被初始化为一个非常大的数组,以利用缓存局部性,消除分配的新内存。

听起来好像有一个或多个原子整数来记录位置。每个“事件”似乎都有一个唯一的id,它在环中的位置是通过计算它的模量相对于环的大小,等等。

不幸的是,我对它的工作原理没有直观的感觉。我做了很多交易应用程序,研究了参与者模型,看了SEDA等。

在他们的演讲中,他们提到这个模式基本上就是路由器的工作方式;然而,我也没有找到任何关于路由器如何工作的很好的描述。

有没有更好的解释?


The Google Code project does reference a technical paper on the implementation of the ring buffer, however it is a bit dry, academic and tough going for someone wanting to learn how it works. However there are some blog posts that have started to explain the internals in a more readable way. There is an explanation of ring buffer that is the core of the disruptor pattern, a description of the consumer barriers (the part related to reading from the disruptor) and some information on handling multiple producers available.

对中断器最简单的描述是:它是一种在线程之间以最有效的方式发送消息的方法。它可以用作队列的替代品,但它还与SEDA和actor共享许多特性。

与队列相比:

Disruptor提供了将消息传递到另一个线程的能力,并在需要时将其唤醒(类似于BlockingQueue)。然而,有3个明显的区别。

The user of the Disruptor defines how messages are stored by extending Entry class and providing a factory to do the preallocation. This allows for either memory reuse (copying) or the Entry could contain a reference to another object. Putting messages into the Disruptor is a 2-phase process, first a slot is claimed in the ring buffer, which provides the user with the Entry that can be filled with the appropriate data. Then the entry must be committed, this 2-phase approach is necessary to allow for the flexible use of memory mentioned above. It is the commit that makes the message visible to the consumer threads. It is the responsibility of the consumer to keep track of the messages that have been consumed from the ring buffer. Moving this responsibility away from the ring buffer itself helped reduce the amount of write contention as each thread maintains its own counter.

与演员相比

Actor模型比大多数其他编程模型更接近于Disruptor,特别是如果您使用所提供的BatchConsumer/BatchHandler类时。这些类隐藏了维护所使用的序列号的所有复杂性,并在重要事件发生时提供了一组简单的回调。然而,有一些细微的区别。

The Disruptor uses a 1 thread - 1 consumer model, where Actors use an N:M model i.e. you can have as many actors as you like and they will be distributed across a fixed numbers of threads (generally 1 per core). The BatchHandler interface provides an additional (and very important) callback onEndOfBatch(). This allows for slow consumers, e.g. those doing I/O to batch events together to improve throughput. It is possible to do batching in other Actor frameworks, however as nearly all other frameworks don't provide a callback at the end of the batch you need to use a timeout to determine the end of the batch, resulting in poor latency.

与SEDA相比

LMAX构建了破坏者模式来取代基于SEDA的方法。

The main improvement that it provided over SEDA was the ability to do work in parallel. To do this the Disruptor supports multi-casting the same messages (in the same order) to multiple consumers. This avoids the need for fork stages in the pipeline. We also allow consumers to wait on the results of other consumers without having to put another queuing stage between them. A consumer can simply watch the sequence number of a consumer that it is dependent on. This avoids the need for join stages in pipeline.

与记忆障碍相比

另一种思考方式是,它是一个结构化的,有序的记忆屏障。其中生产者屏障形成写屏障,消费者屏障是读屏障。


首先,我们想了解它提供的编程模型。

有一个或多个作者。有一个或多个阅读器。有一行条目,完全按照从旧到新的顺序排列(如图从左到右)。作者可以在右端添加新条目。每个阅读器从左到右依次读取条目。显然,读者无法读懂过去的作家。

没有删除条目的概念。我使用“读者”而不是“消费者”来避免条目被消费的形象。然而,我们知道,最后一个读卡器左边的条目将变得无用。

一般来说,读者可以同时阅读和独立阅读。但是,我们可以在阅读器之间声明依赖关系。读卡器的依赖关系可以是任意的无循环图。如果读者B依赖于读者A,读者B就无法读过读者A。

产生阅读器依赖是因为阅读器A可以注释条目,而阅读器B依赖于该注释。例如,A对一个条目进行一些计算,并将结果存储在条目的字段A中。然后继续,现在B可以读取条目,A的值被存储。如果读者C不依赖于A,则C不应试图读取A。

这确实是一个有趣的编程模型。撇开性能不谈,模型本身就可以使许多应用受益。

当然,LMAX的主要目标是性能。它使用预先分配的条目环。这个环足够大,但它是有界限的,这样系统的负载就不会超过设计容量。如果圈满了,作者会等到最慢的读者前进并腾出空间。

条目对象是预先分配的,并且永远存在,以减少垃圾收集成本。我们不插入新的条目对象或删除旧的条目对象,相反,写入器请求一个预先存在的条目,填充它的字段,并通知读取器。这种明显的两相作用实际上只是原子作用

setNewEntry(EntryPopulator);

interface EntryPopulator{ void populate(Entry existingEntry); }

预分配条目还意味着相邻条目(很可能)位于相邻的内存单元中,并且由于读取器顺序读取条目,因此这对于利用CPU缓存非常重要。

并努力避免锁,CAS,甚至内存障碍(例如,如果只有一个写入器,则使用非易失性序列变量)

对于读取器的开发人员:不同注释的读取器应该写入不同的字段,以避免写入争用。(实际上它们应该写入不同的缓存线。)注释读者不应该接触其他非依赖读者可能阅读的任何内容。这就是为什么我说这些阅读器注释条目,而不是修改条目。


Martin Fowler写了一篇关于LMAX和颠覆者模式的文章,《LMAX架构》,这篇文章可能会进一步阐明它。


事实上,出于好奇心,我花时间研究了实际的来源,它背后的想法很简单。在写这篇文章时,最新的版本是3.2.1。

有一个缓冲区存储预先分配的事件,该事件将保存供使用者读取的数据。

缓冲区由其长度的标志数组(整数数组)支持,该数组描述了缓冲区插槽的可用性(详细信息请参阅进一步)。访问数组的方式类似于java的#AtomicIntegerArray,因此为了便于解释,您不妨假设它就是一个数组。

可以有任意数量的生产者。当生产者想要写入缓冲区时,会生成一个长数字(就像调用AtomicLong#getAndIncrement一样,破坏者实际上使用自己的实现,但它以相同的方式工作)。让我们称这个生成的长为producerCallId。以类似的方式,当消费者结束从缓冲区读取插槽时生成consumerCallId。访问最近的consumerCallId。

(如果有很多消费者,则选择id最低的呼叫。)

然后比较这些id,如果两者之间的差异小于缓冲区端,则允许生产者写入。

(如果producerCallId大于最近的consumerCallId + bufferSize,这意味着缓冲区已满,生产者被迫进行总线等待,直到有可用的位置。)

然后生产者根据他的callId(它是producerCallId对bufferSize的模,但由于bufferSize总是2的幂(在缓冲区创建时强制限制),实际使用的操作是producerCallId & (bufferSize - 1))分配缓冲区中的插槽。然后,它可以自由地修改该插槽中的事件。

(实际的算法稍微复杂一些,为了优化,需要将最近的consumerId缓存在一个单独的原子引用中。)

当事件被修改时,更改将被“发布”。发布时,标志数组中的相应槽位将被更新的标志填充。标志值是循环的编号(producerCallId除以bufferSize(因为bufferSize是2的幂,所以实际操作是右移)。

以类似的方式,可以有任意数量的消费者。每次消费者想要访问缓冲区时,都会生成一个consumerCallId(根据消费者是如何添加到中断器的,id生成中使用的原子可能是共享的,也可能是单独的)。然后将这个consumerCallId与最近的producentCallId进行比较,如果它在两者中较低,则允许读取器继续前进。

(类似地,如果producerCallId是even to consumerCallId,这意味着缓冲区是空的,消费者被迫等待。等待的方式由创建中断器期间的WaitStrategy定义。)

对于单个消费者(具有自己的id生成器的消费者),检查的下一件事是批量消费的能力。缓冲区中的槽按照从对应于consumerCallId的槽(索引的确定方式与生产者相同)到对应于最近的producerCallId的槽的顺序进行检查。

通过将写入标记数组中的标记值与为consumerCallId生成的标记值进行比较,在循环中检查它们。如果标志匹配,则意味着填充插槽的生产者已经提交了他们的更改。如果不是,则循环被打破,并返回提交的最高的changeId。从ConsumerCallId到changeId中接收的槽可以批量使用。

如果一组消费者一起读取(使用共享id生成器的消费者),每个消费者只接受一个callId,并且只检查和返回该callId的插槽。


从这篇文章中:

中断器模式是一个由循环备份的批处理队列 数组(即环形缓冲区)填充预分配的传输 对象,该对象使用内存屏障同步生产者和 消费者通过序列。

记忆障碍很难解释,在我看来,Trisha的博客已经做了最好的尝试:http://mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast.html

但是如果您不想深入了解底层的细节,您可以只知道Java中的内存屏障是通过volatile关键字或Java .util.concurrent. atomiclong实现的。干扰器模式序列是atomiclong,通过内存屏障而不是锁在生产者和消费者之间来回通信。

我发现通过代码更容易理解一个概念,所以下面的代码是来自CoralQueue的一个简单helloworld,它是由我所属的CoralBlocks完成的一个干扰模式实现。在下面的代码中,你可以看到干扰器模式如何实现批处理,以及环形缓冲区(即循环数组)如何允许两个线程之间无垃圾通信:

package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;

public class Sample {

    public static void main(String[] args) throws InterruptedException {

        final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);

        Thread consumer = new Thread() {

            @Override
            public void run() {

                boolean running = true;

                while(running) {
                    long avail;
                    while((avail = queue.availableToPoll()) == 0); // busy spin
                    for(int i = 0; i < avail; i++) {
                        MutableLong ml = queue.poll();
                        if (ml.get() == -1) {
                            running = false;
                        } else {
                            System.out.println(ml.get());
                        }
                    }
                    queue.donePolling();
                }
            }

        };

        consumer.start();

        MutableLong ml;

        for(int i = 0; i < 10; i++) {
            while((ml = queue.nextToDispatch()) == null); // busy spin
            ml.set(System.nanoTime());
            queue.flush();
        }

        // send a message to stop consumer...
        while((ml = queue.nextToDispatch()) == null); // busy spin
        ml.set(-1);
        queue.flush();

        consumer.join(); // wait for the consumer thread to die...
    }
}