The Akka Streams library already comes with quite a wealth of documentation. However, the main problem for me is that it provides too much material - I feel quite overwhelmed by the number of concepts that I have to learn. Lots of examples shown there feel very heavyweight and can't easily be translated to real world use cases; they are therefore quite esoteric. I think it gives way too many details without explaining how to put all the building blocks together and how exactly they help to solve specific problems.

There are Sources, Sinks, Flows, graph stages, partial graphs, materialization, a graph DSL and so much more, and I just don't know where to begin. The quickstart guide is meant to be a starting place, but I don't understand it. It just throws in the concepts mentioned above without explaining them. Besides, the code examples can't be executed - there are missing parts that make it more or less impossible for me to follow the text.

Can someone explain the concepts of Source, Sink, Flow, graph stages, partial graphs, materialization and maybe some other things I missed, in plain words and with simple examples that don't explain every single detail (which probably isn't needed at the beginning anyway)?


Answer

This answer is based on akka-stream version 2.4.2. The API can be slightly different in other versions. The dependency can be consumed through sbt:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"

Okay, let's get started. The API of Akka Streams consists of three main types. Compared to Reactive Streams, these types are more powerful and therefore also more complex. It is assumed that for all of the code examples the following definitions already exist:

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._

implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher

The import statements are needed for the type declarations. system represents the actor system of Akka and materializer represents the evaluation context of the stream. In our case we use an ActorMaterializer, which means that the streams are evaluated on top of actors. Both values are marked as implicit, which gives the Scala compiler the possibility to inject these two dependencies automatically whenever they are needed. We also import system.dispatcher, which is an execution context for Futures.

The new API

Akka Streams have the following key properties:

- They implement the Reactive Streams specification, whose three main goals (back pressure, async and non-blocking boundaries, and interoperability between different implementations) fully apply to Akka Streams as well.
- They provide an abstraction for an evaluation engine for the streams, which is called a Materializer.
- Programs are formulated as reusable building blocks, which are represented as the three main types Source, Sink and Flow.
- The building blocks form a graph whose evaluation is based on the Materializer and needs to be explicitly triggered.

In the following sections, the use of these three main types is introduced in more depth.

Source

A Source is the creator of data; it serves as the input source of a stream. Each Source has a single output channel and no input channel. All the data flows through the output channel to whatever is connected to the Source.

Image taken from boldradius.com.

A Source can be created in multiple ways:

scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...

scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future

In the above cases we fed the Source with finite data, which means the streams will terminate eventually. One should not forget that Reactive Streams are lazy and asynchronous by default. This means one explicitly has to request the evaluation of the stream. In Akka Streams this can be done through the run* methods. runForeach is no different from the well known foreach function; the run prefix makes explicit that we ask for an evaluation of the stream. Since finite data is boring, we continue with infinite data:
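This laziness can be illustrated without Akka at all. The following plain-Scala sketch (only an analogy, not the Akka API; the name LazyPipeline is made up) uses an Iterator, which also describes a pipeline without doing any work until it is explicitly consumed:

```scala
// Analogy only: like a stream blueprint, an Iterator pipeline describes
// work but performs none of it until it is explicitly consumed.
object LazyPipeline {
  // Returns (evaluations before consumption, evaluations after, result).
  def demo(): (Int, Int, List[Int]) = {
    var evaluated = 0
    val pipeline = (1 to 3).iterator.map { x => evaluated += 1; x * 2 }
    val beforeRun = evaluated       // still 0: nothing has run yet
    val result    = pipeline.toList // consuming the iterator triggers the work
    (beforeRun, evaluated, result)
  }
}
```

Consuming the pipeline plays the role of the run* methods: only then does the mapping function execute.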

scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5

With the take method we can create an artificial stop point that prevents us from evaluating indefinitely. Since actor support is built in, we can also easily feed a stream by sending messages to an actor:

def run(actor: ActorRef) = {
  Future { Thread.sleep(300); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
  .actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
  .mapMaterializedValue(run)

scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1

We can see that the Futures are executed asynchronously on different threads, which explains the result. In the above example a buffer for the incoming elements is not necessary and therefore with OverflowStrategy.fail we can configure that the stream should fail on a buffer overflow. Especially through this actor interface, we can feed the stream from any data source. It doesn't matter whether the data is created by the same thread, by a different one, by another process, or whether it comes from a remote system over the Internet.
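The idea of feeding a stream from arbitrary threads can also be sketched without Akka. The following plain-Scala analogy (not the Akka API; the name QueueFeed is made up) uses a blocking queue as the hand-over point that Source.actorRef otherwise provides:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Analogy only: producers on their own threads hand elements to a buffer,
// and the consumer (the "stream") drains them independently.
object QueueFeed {
  def demo(): Set[Int] = {
    val buffer = new LinkedBlockingQueue[Int]()
    // Three producers finishing in reverse order, like the Futures above.
    val producers = (1 to 3).map { n =>
      new Thread(() => { Thread.sleep(100L * (4 - n)); buffer.put(n) })
    }
    producers.foreach(_.start())
    producers.foreach(_.join())
    // Drain the buffer, as the materialized stream would.
    List.fill(3)(buffer.take()).toSet
  }
}
```

The arrival order depends on thread scheduling, which is exactly why the actor example above printed 3, 2, 1.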

Sink

A Sink is basically the opposite of a Source. It is the endpoint of a stream and therefore consumes data. A Sink has a single input channel and no output channel. Sinks are especially needed when we want to specify the behavior of the data collector in a reusable way and without evaluating the stream. The already known run* methods don't give us these properties, therefore it is preferred to use Sink instead.

Image taken from boldradius.com.

A short example of a Sink in action:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3

A Source can be connected to a Sink with the to method. It returns a so-called RunnableGraph, a blueprint of the stream that can be executed by just calling its run() method.

Image taken from boldradius.com.

Of course it is also possible to forward all values that arrive at the sink to an actor:

val actor = system.actorOf(Props(new Actor {
  override def receive = {
    case msg => println(s"actor received: $msg")
  }
}))

scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...

scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed

Flow

Sources and Sinks are fine if you need a connection between Akka Streams and an existing system, but one can't really do anything with them alone. Flows are the last missing piece of the Akka Streams base abstractions. They act as a connector between different streams and can be used to transform the elements of a stream.

Image taken from boldradius.com.

If a Flow is connected to a Source, a new Source is the result. Likewise, a Flow connected to a Sink creates a new Sink. And a Flow connected to both a Source and a Sink results in a RunnableGraph. Therefore, flows sit between the input and the output channels, but by themselves they correspond to neither flavor as long as they are not connected to a Source or a Sink.

Image taken from boldradius.com.
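These composition rules can be mirrored with plain function composition. The following sketch (an analogy only; none of these names belong to the Akka API) models a Source as a producer, a Flow as a transformation and a Sink as a consumer:

```scala
// Analogy only: the composition algebra of Source, Flow and Sink,
// modeled with plain Scala functions.
object ComposeModel {
  type Src[A]    = () => List[A]      // no input, one output
  type Flw[A, B] = List[A] => List[B] // one input, one output
  type Snk[A, R] = List[A] => R       // one input, no output

  // A Flow attached to a Source yields a new Source.
  def via[A, B](src: Src[A], flw: Flw[A, B]): Src[B] = () => flw(src())
  // A Source attached to a Sink yields a runnable graph: a deferred computation.
  def to[A, R](src: Src[A], snk: Snk[A, R]): () => R = () => snk(src())

  val source: Src[Int]            = () => List(1, 2, 3)
  val invert: Flw[Int, Int]       = _.map(_ * -1)
  val sink:   Snk[Int, List[Int]] = identity

  // Nothing happens until the runnable is invoked, mirroring run().
  val runnable: () => List[Int] = to(via(source, invert), sink)
}
```

Invoking ComposeModel.runnable() plays the role of run(): only then do the elements flow through the pipeline.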

To get a better understanding of Flows, we will look at some examples:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6

With the via method we can connect a Source with a Flow. We need to specify the input type, because the compiler can't infer it for us. As we can already see in this simple example, the invert and doubler flows are completely independent of any data producers or consumers. They only transform the data and forward it to the output channel. This means we can reuse a flow among multiple streams:

scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3

scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1

s1 and s2 represent completely new streams; they do not share any data through their building blocks.

Unbounded Streams

Before we move on, we should first revisit some of the key aspects of Reactive Streams. An unbounded number of elements can arrive at any point and can put a stream in different states. Besides the usual runnable state, a stream may be stopped, either through an error or through a signal that denotes that no further data will arrive. A stream can be modeled in a graphical way by marking events on a timeline, as shown here:

Image taken from The introduction to Reactive Programming you've been missing.
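The three kinds of signals on such a timeline can be captured in a small algebraic data type. This is a plain-Scala model for intuition only (these are not Akka types):

```scala
// A timeline event is either a next element, an error, or completion.
sealed trait Event[+A]
case class OnNext[A](elem: A)    extends Event[A]
case class OnError(t: Throwable) extends Event[Nothing]
case object OnComplete           extends Event[Nothing]

object Timeline {
  // A finite stream is a timeline of OnNext events ending in OnComplete.
  def finite[A](elems: A*): List[Event[A]] =
    elems.map(OnNext(_)).toList :+ OnComplete
}
```

An infinite stream, by contrast, is a timeline on which OnComplete may simply never appear.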

We have already seen runnable streams in the examples of the previous section. We get a RunnableGraph whenever a stream can actually be materialized, which means that a Sink is connected to a Source. So far we always materialized to the value NotUsed, which can be seen in the types:

val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)

For Source and Sink the second type parameter, and for Flow the third type parameter, denotes the materialized value. Throughout this answer, the full meaning of materialization shall not be explained; further details about materialization can be found in the official documentation. For now the only thing we need to know is that the materialized value is what we get when we run a stream. Since we were only interested in side effects so far, we got NotUsed as the materialized value. The exception to this was the materialization of a sink, which resulted in a Future. It gave us back a Future, since this value can denote when the stream that is connected to the sink has ended. So far, the previous code examples were nice to explain the concept, but they were also boring, because we only dealt with finite streams or with very simple infinite ones. To make it more interesting, in the following a fully asynchronous and unbounded stream shall be explained.
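The relationship between a sink and its materialized value can be sketched in plain Scala (an analogy only, not the Akka API; all names here are made up): running a stream hands the elements to the sink, and whatever the sink yields is the materialized value:

```scala
// Analogy only: the sink decides what value "running the stream" yields.
object MatValue {
  // Running a stream means feeding the source's elements to the sink.
  def runWith[A, R](source: List[A])(sink: List[A] => R): R = sink(source)

  // A foreach-like sink only has side effects, so it materializes Unit...
  def foreachSink[A](f: A => Unit): List[A] => Unit = _.foreach(f)

  // ...while a fold-like sink materializes the final accumulated value.
  def foldSink[A, R](zero: R)(f: (R, A) => R): List[A] => R =
    _.foldLeft(zero)(f)
}
```

In real Akka Streams the fold sink materializes a Future of the result rather than the result itself, because the stream runs asynchronously.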

The click stream example

As an example, we want to have a stream that captures click events. To make it more challenging, let's say we also want to group click events that happen in a short time after each other. This way we could easily discover double, triple or tenfold clicks. Furthermore, we want to filter out all single clicks. Take a deep breath and imagine how you would solve that problem in an imperative manner. I bet no one would be able to implement a solution that works correctly on the first try. In a reactive fashion this problem is trivial to solve. In fact, the solution is so simple and straightforward to implement that we can even express it in a diagram that directly describes the behavior of the code:

Image taken from The introduction to Reactive Programming you've been missing.

The gray boxes are functions that describe how one stream is transformed into another. With the throttle function we accumulate clicks within 250 milliseconds; the map and filter functions should be self-explanatory. The colored orbs represent an event and the arrows depict how they flow through our functions. In the later processing steps we get fewer and fewer elements flowing through our stream, since we group them together and filter them out. The code for this image would look something like this:

val multiClickStream = clickStream
    .throttle(250.millis)
    .map(clickEvents => clickEvents.length)
    .filter(numberOfClicks => numberOfClicks >= 2)

The whole logic can be expressed in only four lines of code! In Scala we can write it even shorter:

val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)

The definition of clickStream is a little bit more complex, but this is only the case because the example program runs on the JVM, where capturing click events is not easily possible. Another complication is that Akka by default doesn't provide the throttle function. Instead we had to write it ourselves. Since this function is (as is the case for the map or filter functions) reusable across different use cases, I don't count these lines towards the number of lines we needed to implement the logic. In imperative languages, however, it is normal that logic can't be reused that easily and that the different logical steps all happen in one place instead of being applied sequentially, which means that we probably would have cluttered our code with the throttling logic. The full code example is available as a gist and shall not be discussed here any further.
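The grouping step that throttle performs can be sketched in plain Scala (a simplified model of the idea, not the actual implementation from the gist; the names are made up): clicks are timestamps in milliseconds, and a click joins the current group when it arrives within 250 ms of the group's first click:

```scala
// Simplified model of throttle/map/filter for multi-click detection.
object MultiClick {
  // Groups timestamps that fall within windowMs of the group's start.
  def groupWithin(clicks: List[Long], windowMs: Long): List[List[Long]] =
    clicks.foldLeft(List.empty[List[Long]]) {
      case (groups :+ current, t) if t - current.head <= windowMs =>
        groups :+ (current :+ t)   // extend the current group
      case (groups, t) =>
        groups :+ List(t)          // start a new group
    }

  // Count each group and keep only multi-clicks, as in the diagram.
  def multiClicks(clicks: List[Long]): List[Int] =
    groupWithin(clicks, 250L).map(_.length).filter(_ >= 2)
}
```

For clicks at 0, 100, 600, 700, 800 and 2000 ms this yields the groups (0, 100), (600, 700, 800) and (2000), of which only the first two survive the filter.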

The SimpleWebServer example

What should be discussed instead is another example. While the click stream is a nice example to let Akka Streams handle a real world use case, it lacks the power to show parallel execution in action. The next example shall represent a small web server that can handle multiple requests in parallel. The web server shall be able to accept incoming connections and receive byte sequences from them that represent printable ASCII characters. These byte sequences or strings should be split at all newline characters into smaller parts. After that, the server shall respond to the client with each of the split lines. Alternatively, it could do something else with the lines and give a special answer token, but we want to keep it simple in this example and therefore don't introduce any fancy features. Remember, the server needs to be able to handle multiple requests at the same time, which basically means that no request is allowed to block any other request from further execution. Solving all of these requirements can be hard in an imperative way - with Akka Streams however, we shouldn't need more than a few lines to solve any of them. First, let's have an overview of the server itself:

Basically, there are only three main building blocks. The first needs to accept incoming connections, the second needs to handle incoming requests and the third needs to send a response. Implementing these three building blocks is only a little bit more complicated than implementing the click stream:

def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import system.dispatcher
  import scala.util.{Failure, Success}

  val connectionHandler: Sink[Tcp.IncomingConnection, Future[Unit]] =
    Sink.foreach[Tcp.IncomingConnection] { conn =>
      println(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic)
    }

  val incomingConnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
    Tcp().bind(address, port)

  val binding: Future[Tcp.ServerBinding] =
    incomingConnections.to(connectionHandler).run()

  binding onComplete {
    case Success(b) =>
      println(s"Server started, listening on: ${b.localAddress}")
    case Failure(e) =>
      println(s"Server could not be bound to $address:$port: ${e.getMessage}")
  }
}

The function mkServer takes (besides the address and the port of the server) an actor system and a materializer as implicit parameters. The control flow of the server is represented by binding, which takes a source of incoming connections and forwards them to a sink of incoming connections. Inside of connectionHandler, which is our sink, we handle every connection by the flow serverLogic, which will be described later. binding returns a Future, which completes when the server has been started or the start failed, which could be the case when the port is already taken by another process. The code, however, doesn't completely reflect the graphic, as we can't see a building block that handles responses. The reason for this is that the connection already provides this logic by itself. It is a bidirectional flow and not just a unidirectional one like the flows we have seen in the previous examples. As was the case for materialization, such complex flows shall not be explained here. The official documentation has plenty of material to cover more complex flow graphs. For now it is enough to know that Tcp.IncomingConnection represents a connection that knows how to receive requests and how to send responses. The part that is still missing is the serverLogic building block. It can look like this:

Once again, we are able to split the logic into several simple building blocks that all together form the flow of our program. First we want to split our sequence of bytes into lines, which we have to do whenever we find a newline character. After that, the bytes of each line need to be converted to a string, because working with raw bytes is cumbersome. In general we could receive a binary stream of a complicated protocol, which would make working with the incoming raw data extremely challenging. Once we have a readable string, we can create an answer. For simplicity's sake the answer can be anything in our case. In the end, we have to convert our answer back to a sequence of bytes that can be sent over the wire. The code for the entire logic may look like this:

val serverLogic: Flow[ByteString, ByteString, NotUsed] = {
  val delimiter = Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true)

  val receiver = Flow[ByteString].map { bytes =>
    val message = bytes.utf8String
    println(s"Server received: $message")
    message
  }

  val responder = Flow[String].map { message =>
    val answer = s"Server hereby responds to message: $message\n"
    ByteString(answer)
  }

  Flow[ByteString]
    .via(delimiter)
    .via(receiver)
    .via(responder)
}

We already know that serverLogic is a flow that takes a ByteString and has to produce a ByteString. With delimiter we can split a ByteString into smaller parts - in our case this needs to happen whenever a newline character occurs. receiver is the flow that takes all of the split byte sequences and converts them to strings. This is of course a dangerous conversion, since only printable ASCII characters should be converted to a string, but for our needs it is good enough. responder is the last component; it is responsible for creating an answer and converting the answer back to a sequence of bytes. In contrast to the graphic we didn't split this last component in two, since the logic is trivial. At the end, we connect all of the flows through the via function. At this point one may ask whether we took care of the multi-user property that was mentioned at the beginning. And indeed we did, even though it may not be obvious immediately. By looking at this graphic it should become clearer:
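What makes delimiter more than a simple split can be sketched in plain Scala (a simplified model of the idea, not the actual Framing implementation; the names are made up): chunks arriving from the network do not respect line boundaries, so the incomplete tail of each chunk has to be buffered until the next one arrives:

```scala
// Simplified model of delimiter-based framing over a stream of chunks.
object LineFraming {
  // Feeds chunks through the framer; returns completed lines and the
  // leftover bytes that are still waiting for their newline.
  def frame(chunks: List[String]): (List[String], String) =
    chunks.foldLeft((List.empty[String], "")) {
      case ((lines, buffer), chunk) =>
        // -1 keeps a trailing empty segment so the leftover is preserved.
        val parts = (buffer + chunk).split("\n", -1)
        (lines ++ parts.init, parts.last)
    }
}
```

Feeding the chunks "Hello Wo", "rld\nHow are " and "you?\n" yields the two complete lines "Hello World" and "How are you?", even though neither line arrived in a single chunk.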

The serverLogic component is nothing but a flow that contains smaller flows. This component takes an input, which is a request, and produces an output, which is the response. Since flows can be constructed multiple times and they all work independently of each other, we achieve our multi-user property through this nesting. Every request is handled within its own stream, and therefore a short-running request can overtake a previously started long-running request. In case you wondered, the definition of serverLogic that was shown previously can of course be written a lot shorter by inlining most of its inner definitions:

val serverLogic = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(msg => s"Server hereby responds to message: $msg\n")
  .map(ByteString(_))

A test of the web server may look like this:

$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?

In order to make the above code sample run correctly, we first need to start the server, which is depicted by the startServer script:

$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?

The full code example of this simple TCP server can be found here. Not only can we write a server with Akka Streams, but also the client. It may look like this:

val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(println)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_+"\n")
  .map(ByteString(_))

connection.join(flow).run()

The full code of the TCP client can be found here. The code looks quite similar, but in contrast to the server we don't have to manage incoming connections anymore.

Complex Graphs

In the previous sections we have seen how we can construct simple programs out of flows. However, in reality it is often not enough to just rely on already built-in functions to construct more complex streams. If we want to be able to use Akka Streams for arbitrary programs we need to know how to build our own custom control structures and combinable flows that allow us to tackle the complexity of our applications. The good news is that Akka Streams was designed to scale with the needs of the users and in order to give you a short introduction into the more complex parts of Akka Streams, we add some more features to our client/server example.

One thing we are not yet able to do is closing a connection. At this point it starts to get a little bit more complicated, because the stream API we have seen so far doesn't allow us to stop a stream at an arbitrary point. However, there is the GraphStage abstraction, which can be used to create arbitrary graph processing stages with any number of input or output ports. Let's first take a look at the server side, where we introduce a new component called closeConnection:

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

val closeConnection = new GraphStage[FlowShape[String, String]] {
  val in = Inlet[String]("closeConnection.in")
  val out = Outlet[String]("closeConnection.out")

  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case "q" ⇒
          push(out, "BYE")
          completeStage()
        case msg ⇒
          push(out, s"Server hereby responds to message: $msg\n")
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}

This API looks a lot more cumbersome than the flow API. No wonder, we have to do a lot of imperative steps here. In exchange, we have more control over the behavior of our streams. In the above example we only specify one input and one output port and make them available to the system by overriding the shape value. Furthermore, we defined a so-called InHandler and an OutHandler, which in this order are responsible for receiving and emitting elements. If you looked closely at the full click stream example, you should have recognized these components already. In the InHandler we grab an element, and if it is a string with the single character 'q', we want to close the stream. In order to give the client a chance to find out that the stream will get closed soon, we emit the string "BYE" and immediately close the stage afterwards. The closeConnection component can be combined with a stream via the via method, which was introduced in the section about flows.
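The element-level behavior of closeConnection (answer every message, but on a "q" emit "BYE" and complete) can be mirrored in plain Scala, leaving out the push/pull mechanics (the name CloseModel is made up for illustration):

```scala
// Simplified model of the closeConnection stage: process messages until
// a "q" arrives, then emit "BYE" and stop consuming further input.
object CloseModel {
  def process(messages: List[String]): List[String] = messages match {
    case "q" :: _    => List("BYE") // final answer, then the stage completes
    case msg :: rest => s"Server hereby responds to message: $msg\n" :: process(rest)
    case Nil         => Nil
  }
}
```

Anything after the "q" is never consumed, which corresponds to completeStage() shutting the stage down.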

Besides being able to close a connection, it would also be nice if we could show a welcome message to a newly created connection. In order to do this we have to go one step further:

def serverLogic
    (conn: Tcp.IncomingConnection)
    (implicit system: ActorSystem)
    : Flow[ByteString, ByteString, NotUsed]
    = Flow.fromGraph(GraphDSL.create() { implicit b ⇒
  import GraphDSL.Implicits._
  val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
  val logic = b.add(internalLogic)
  val concat = b.add(Concat[ByteString]())
  welcome ~> concat.in(0)
  logic.outlet ~> concat.in(1)

  FlowShape(logic.in, concat.out)
})

The function serverLogic now takes the incoming connection as a parameter. Inside of its body we use a DSL that allows us to describe complex stream behavior. With welcome we create a stream that can only emit one element - the welcome message. logic is what was described as serverLogic in the previous section. The only notable difference is that we added closeConnection to it. Now actually comes the interesting part of the DSL. The GraphDSL.create function makes a builder b available, which is used to express the stream as a graph. With the ~> function it is possible to connect input and output ports with each other. The Concat component that is used in the example can concatenate elements and is here used to prepend the welcome message in front of the other elements that come out of internalLogic. In the last line, we only make the input port of the server logic and the output port of the concatenated stream available because all the other ports shall remain an implementation detail of the serverLogic component. For an in-depth introduction to the graph DSL of Akka Streams, visit the corresponding section in the official documentation. The full code example of the complex TCP server and of a client that can communicate with it can be found here. Whenever you open a new connection from the client you should see a welcoming message and by typing "q" on the client you should see a message that tells you that the connection has been canceled.

There are some more things that were not covered by this answer. In particular, materialization may scare one reader or another, but I'm sure with the material covered here everyone should be able to take the next steps on their own. As already said, the official documentation is a good place to continue learning about Akka Streams.

其他回答

这个答案是基于akka-stream 2.4.2版本的。在其他版本中,API可能略有不同。依赖项可以被sbt使用:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"

好了,我们开始吧。Akka Streams的API由三种主要类型组成。与反应式流相比,这些类型更强大,因此也更复杂。假设对于所有的代码示例,已经存在以下定义:

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._

implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher

The import statements are needed for the type declarations. system represents the actor system of Akka and materializer represents the evaluation context of the stream. In our case we use a ActorMaterializer, which means that the streams are evaluated on top of actors. Both values are marked as implicit, which gives the Scala compiler the possibility to inject these two dependencies automatically whenever they are needed. We also import system.dispatcher, which is a execution context for Futures.

新的API

Akka流具有以下关键属性:

They implement the Reactive Streams specification, whose three main goals backpressure, async and non-blocking boundaries and interoperability between different implementations do fully apply for Akka Streams too. They provide an abstraction for an evaluation engine for the streams, which is called Materializer. Programs are formulated as reusable building blocks, which are represented as the three main types Source, Sink and Flow. The building blocks form a graph whose evaluation is based on the Materializer and needs to be explicitly triggered.

下面将更深入地介绍如何使用这三种主要类型。

源是数据的创建者,它作为流的输入源。每个源都有一个输出通道,没有输入通道。所有数据都通过输出通道流向连接到源的任何设备。

图片来自boldradius.com。

可以通过多种方式创建Source:

scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...

scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future

In the above cases we fed the Source with finite data, which means they will terminate eventually. One should not forget, that Reactive Streams are lazy and asynchronous by default. This means one explicitly has to request the evaluation of the stream. In Akka Streams this can be done through the run* methods. The runForeach would be no different to the well known foreach function - through the run addition it makes explicit that we ask for an evaluation of the stream. Since finite data is boring, we continue with infinite one:

scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5

使用take方法,我们可以创建一个人为的停止点,防止我们无限地求值。由于actor支持是内置的,我们还可以轻松地将消息发送给actor:

def run(actor: ActorRef) = {
  Future { Thread.sleep(300); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
  .actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
  .mapMaterializedValue(run)

scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1

We can see that the Futures are executed asynchronously on different threads, which explains the result. In the above example a buffer for the incoming elements is not necessary and therefore with OverflowStrategy.fail we can configure that the stream should fail on a buffer overflow. Especially through this actor interface, we can feed the stream through any data source. It doesn't matter if the data is created by the same thread, by a different one, by another process or if they come from a remote system over the Internet.

Sink

接收器基本上与源相反。它是流的端点,因此消耗数据。Sink只有一个输入通道,没有输出通道。当我们希望以可重用的方式指定数据收集器的行为而不计算流时,接收器尤其需要。已知的run*方法不允许我们使用这些属性,因此首选使用Sink。

图片来自boldradius.com。

一个简单的Sink实例:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3

可以使用to方法将源连接到接收器。它返回一个所谓的RunnableFlow,我们稍后将看到一种特殊形式的流——可以通过调用它的run()方法来执行的流。

图片来自boldradius.com。

当然,可以将到达接收器的所有值转发给actor:

val actor = system.actorOf(Props(new Actor {
  override def receive = {
    case msg => println(s"actor received: $msg")
  }
}))

scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...

scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed

Flow

如果你需要在Akka流和现有系统之间建立连接,数据源和接收器是很好的选择,但实际上你无法对它们做任何事情。流是Akka Streams基本抽象中最后一个缺失的部分。它们充当不同流之间的连接器,并可用于转换流的元素。

图片来自boldradius.com。

如果一个流连接到一个源,一个新的源就是结果。同样地,连接到接收器的流将创建一个新的接收器。同时连接源和接收器的流会产生RunnableFlow。因此,它们位于输入和输出通道之间,但只要它们没有连接到源或接收器,它们本身就不对应于任何一种风味。

图片来自boldradius.com。

为了更好地理解flow,我们将看一些例子:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6

通过Via方法,我们可以将源与流连接起来。我们需要指定输入类型,因为编译器不能为我们推断它。正如我们在这个简单的例子中已经看到的,倒置和双流完全独立于任何数据生产者和消费者。它们只是转换数据并将其转发到输出通道。这意味着我们可以在多个流之间重用一个流:

scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3

scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1

S1和s2代表了全新的流——它们不通过构建块共享任何数据。

无界数据流

在我们继续之前,我们应该首先回顾一下反应流的一些关键方面。不限数量的元素可以到达任意点,并可以将流置于不同的状态。除了通常的可运行流状态外,流可能会通过错误或指示没有进一步数据到达的信号而停止。流可以通过在时间轴上标记事件以图形化的方式建模,如下所示:

图片取自你错过的《响应式编程导论》。

我们已经在前一节的示例中看到了可运行流。只要流可以实际物化,我们就会得到RunnableGraph,这意味着Sink连接到Source。到目前为止,我们总是物化为值Unit,可以在类型中看到:

val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)

For Source and Sink the second type parameter and for Flow the third type parameter denote the materialized value. Throughout this answer, the full meaning of materialization shall not be explained. However, further details about materialization can be found at the official documentation. For now the only thing we need to know is that the materialized value is what we get when we run a stream. Since we were only interested in side effects so far, we got Unit as the materialized value. The exception to this was a materialization of a sink, which resulted in a Future. It gave us back a Future, since this value can denote when the stream that is connected to the sink has been ended. So far, the previous code examples were nice to explain the concept but they were also boring because we only dealt with finite streams or with very simple infinite ones. To make it more interesting, in the following a full asynchronous and unbounded stream shall be explained.

点击流的例子

As an example, we want to have a stream that captures click events. To make it more challenging, let's say we also want to group click events that happen in a short time after each other. This way we could easily discover double, triple or tenfold clicks. Furthermore, we want to filter out all single clicks. Take a deep breath and imagine how you would solve that problem in an imperative manner. I bet no one would be able to implement a solution that works correctly on the first try. In a reactive fashion this problem is trivial to solve. In fact, the solution is so simple and straightforward to implement that we can even express it in a diagram that directly describes the behavior of the code:

图片取自你错过的《响应式编程导论》。

The gray boxes are functions that describe how one stream is transformed into another. With the throttle function we accumulate clicks within 250 milliseconds, the map and filter functions should be self-explanatory. The color orbs represent an event and the arrows depict how they flow through our functions. Later in the processing steps, we get less and less elements that flow through our stream, since we group them together and filter them out. The code for this image would look something like this:

val multiClickStream = clickStream
    .throttle(250.millis)
    .map(clickEvents => clickEvents.length)
    .filter(numberOfClicks => numberOfClicks >= 2)

The whole logic can be expressed in only four lines of code! In Scala, we can write it even shorter:

val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)

The definition of clickStream is a little bit more complex, but that is only because the example program runs on the JVM, where capturing click events is not easily possible. Another complication is that Akka by default doesn't provide the throttle function. Instead we had to write it ourselves. Since this function is (as is the case for the map or filter functions) reusable across different use cases, I don't count these lines toward the number of lines we needed to implement the logic. In imperative languages, however, it is normal that logic can't be reused that easily and that the different logical steps all happen in one place instead of being applied sequentially, which means that we probably would have messed up our code with the throttling logic. The full code example is available as a gist and shall not be discussed here any further.
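To illustrate what the grouping part of the diagram computes, the logic can also be modeled offline in plain Scala, without Akka and without a real event source. In this sketch multiClicks is a hypothetical helper that takes recorded click timestamps in milliseconds, groups clicks that occur within 250 ms of their predecessor, and keeps only the multi-clicks:

```scala
def multiClicks(timestamps: List[Long]): List[Int] = {
  // accumulate groups of clicks; a click joins the current group if it
  // happened no more than 250 ms after the previous click
  val groups = timestamps.foldLeft(List.empty[List[Long]]) {
    case (Nil, t) => List(List(t))
    case ((current @ (last :: _)) :: rest, t) if t - last <= 250 =>
      (t :: current) :: rest
    case (acc, t) => List(t) :: acc
  }
  // like .map(_.length).filter(_ >= 2) in the stream version
  groups.reverse.map(_.length).filter(_ >= 2)
}
```

For the timestamps 0, 100, 200, 1000, 2000 and 2050 this yields List(3, 2): a triple click, a filtered-out single click and a double click.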

The SimpleWebServer example

What should be discussed instead is another example. While the click stream is a nice example to let Akka Streams handle a real world use case, it lacks the power to show parallel execution in action. The next example shall represent a small web server that can handle multiple requests in parallel. The web server shall be able to accept incoming connections and receive byte sequences from them that represent printable ASCII characters. These byte sequences or strings should be split at all newline characters into smaller parts. After that, the server shall respond to the client with each of the split lines. Alternatively, it could do something else with the lines and give a special answer token, but we want to keep it simple in this example and therefore don't introduce any fancy features. Remember, the server needs to be able to handle multiple requests at the same time, which basically means that no request is allowed to block any other request from further execution. Solving all of these requirements can be hard in an imperative way - with Akka Streams, however, we shouldn't need more than a few lines to solve any of them. First, let's have an overview of the server itself:

Basically, there are only three main building blocks. The first one needs to accept incoming connections. The second one needs to handle incoming requests and the third one needs to send a response. Implementing these three building blocks is only a little bit more complicated than implementing the click stream:

def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import system.dispatcher
  import scala.util.{Failure, Success}

  val connectionHandler: Sink[Tcp.IncomingConnection, Future[Done]] =
    Sink.foreach[Tcp.IncomingConnection] { conn =>
      println(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic)
    }

  val incomingConnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
    Tcp().bind(address, port)

  val binding: Future[Tcp.ServerBinding] =
    incomingConnections.to(connectionHandler).run()

  binding onComplete {
    case Success(b) =>
      println(s"Server started, listening on: ${b.localAddress}")
    case Failure(e) =>
      println(s"Server could not be bound to $address:$port: ${e.getMessage}")
  }
}

The function mkServer takes (besides the address and the port of the server) an actor system and a materializer as implicit parameters. The control flow of the server is represented by binding, which takes a source of incoming connections and forwards them to a sink of incoming connections. Inside of connectionHandler, which is our sink, we handle every connection by the flow serverLogic, which will be described later. binding returns a Future, which completes when the server has been started or the start failed, which could be the case when the port is already taken by another process. The code, however, doesn't completely reflect the graphic, as we can't see a building block that handles responses. The reason for this is that the connection already provides this logic by itself. It is a bidirectional flow and not just a unidirectional one like the flows we have seen in the previous examples. As was the case for materialization, such complex flows shall not be explained here. The official documentation has plenty of material to cover more complex flow graphs. For now it is enough to know that Tcp.IncomingConnection represents a connection that knows how to receive requests and how to send responses. The part that is still missing is the serverLogic building block. It can look like this:

Once again, we are able to split the logic into several simple building blocks that all together form the flow of our program. First we want to split our sequence of bytes into lines, which we have to do whenever we find a newline character. After that, the bytes of each line need to be converted to a string, because working with raw bytes is cumbersome. Overall we could receive a binary stream of a complicated protocol, which would make working with the incoming raw data extremely challenging. Once we have a readable string, we can create an answer. For simplicity reasons the answer can be anything in our case. In the end, we have to convert our answer back to a sequence of bytes that can be sent over the wire. The code for the entire logic may look like this:

val serverLogic: Flow[ByteString, ByteString, NotUsed] = {
  val delimiter = Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true)

  val receiver = Flow[ByteString].map { bytes =>
    val message = bytes.utf8String
    println(s"Server received: $message")
    message
  }

  val responder = Flow[String].map { message =>
    val answer = s"Server hereby responds to message: $message\n"
    ByteString(answer)
  }

  Flow[ByteString]
    .via(delimiter)
    .via(receiver)
    .via(responder)
}

We already know that serverLogic is a flow that takes a ByteString and has to produce a ByteString. With delimiter we can split a ByteString into smaller parts - in our case this needs to happen whenever a newline character occurs. receiver is the flow that takes all of the split byte sequences and converts them to strings. This is of course a dangerous conversion, since only printable ASCII characters should be converted to a string, but for our needs it is good enough. responder is the last component and is responsible for creating an answer and converting the answer back to a sequence of bytes. In contrast to the graphic we didn't split this last component in two, since the logic is trivial. At the end, we connect all of the flows through the via function. At this point one may ask whether we took care of the multi-user property that was mentioned at the beginning. And indeed we did, even though it may not be obvious immediately. By looking at this graphic it should become clearer:

The serverLogic component is nothing but a flow that contains smaller flows. This component takes an input, which is a request, and produces an output, which is the response. Since flows can be constructed multiple times and they all work independently of each other, we achieve our multi-user property through this nesting. Every request is handled within its own flow and therefore a short-running request can overtake a previously started long-running request. In case you wondered, the definition of serverLogic that was shown previously can of course be written a lot shorter by inlining most of its inner definitions:

val serverLogic = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(msg => s"Server hereby responds to message: $msg\n")
  .map(ByteString(_))
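The per-line transformation performed by this flow can also be modeled as a plain function over one received chunk - a sketch without Akka, where respond is a hypothetical helper and the newline framing is approximated by a simple split:

```scala
// Plain-Scala model of the three flow steps (delimiter, receiver,
// responder): split a chunk at newlines and build one response per line.
def respond(chunk: String): List[String] =
  chunk.split("\n").toList
    .filter(_.nonEmpty)
    .map(msg => s"Server hereby responds to message: $msg\n")
```

Feeding it "Hello World\nHow are you?" yields exactly the two response lines we expect from the server.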

A test of the web server may look like this:

$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?

In order to make the above code example work, we first need to start the server, which is depicted by the startServer script:

$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?

The full code example of this simple TCP server can be found here. Not only can we write a server with Akka Streams, we can also write the client. It may look like this:

val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(println)
  .map(_ => StdIn.readLine("> "))
  .map(_ + "\n")
  .map(ByteString(_))

connection.join(flow).run()

The full code of the TCP client can be found here. The code looks quite similar, but in contrast to the server we no longer have to manage incoming connections.

Complex graphs

In the previous sections we have seen how we can construct simple programs out of flows. However, in reality it is often not enough to just rely on already built-in functions to construct more complex streams. If we want to be able to use Akka Streams for arbitrary programs we need to know how to build our own custom control structures and combinable flows that allow us to tackle the complexity of our applications. The good news is that Akka Streams was designed to scale with the needs of the users and in order to give you a short introduction into the more complex parts of Akka Streams, we add some more features to our client/server example.

One thing we are not yet able to do is close a connection. At this point things start to get a little bit more complicated, because the stream API we have seen so far doesn't allow us to stop a stream at an arbitrary point. However, the GraphStage abstraction can be used to create arbitrary graph processing stages with any number of input or output ports. Let's first look at the server side, where we introduce a new component called closeConnection:

val closeConnection = new GraphStage[FlowShape[String, String]] {
  val in = Inlet[String]("closeConnection.in")
  val out = Outlet[String]("closeConnection.out")

  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case "q" =>
          push(out, "BYE")
          completeStage()
        case msg =>
          push(out, s"Server hereby responds to message: $msg\n")
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}

This API looks a lot more cumbersome than the flow API. No wonder - we have to do a lot of imperative steps here. In exchange, we have more control over the behavior of our streams. In the above example, we only specify one input and one output port and make them available to the system by overriding the shape value. Furthermore we defined a so-called InHandler and an OutHandler, which are responsible for receiving and emitting elements, respectively. If you looked closely at the full click stream example you should have recognized these components already. In the InHandler we grab an element, and if it is a string with the single character "q", we want to close the stream. In order to give the client a chance to find out that the stream will get closed soon, we emit the string "BYE" and then immediately close the stage afterwards. The closeConnection component can be combined with a stream via the via method, which was introduced in the section about flows.
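Stripped of all the GraphStage plumbing, the decision logic inside onPush can be modeled as a pure function - a sketch, where handle is a hypothetical helper that returns the response together with a flag saying whether the stage should be completed:

```scala
// Plain-Scala model of the closeConnection handlers: each incoming
// message yields a response plus a "complete the stage now" flag.
def handle(msg: String): (String, Boolean) = msg match {
  case "q" => ("BYE", true)
  case m   => (s"Server hereby responds to message: $m\n", false)
}
```

In the real stage, a true flag corresponds to push(out, "BYE") followed by completeStage().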

Besides being able to close a connection, it would also be nice if we could show a welcome message to a newly created connection. In order to do this, we have to go one step further:

def serverLogic
    (conn: Tcp.IncomingConnection)
    (implicit system: ActorSystem)
    : Flow[ByteString, ByteString, NotUsed]
    = Flow.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
  val logic = b.add(internalLogic)
  val concat = b.add(Concat[ByteString]())
  welcome ~> concat.in(0)
  logic.outlet ~> concat.in(1)

  FlowShape(logic.in, concat.out)
})

The function serverLogic now takes the incoming connection as a parameter. Inside its body we use a DSL that allows us to describe complex stream behavior. With welcome we create a stream that can only emit one element - the welcome message. logic is what was described as serverLogic in the previous section. The only notable difference is that we added closeConnection to it. Now comes the actually interesting part of the DSL. The GraphDSL.create function makes a builder b available, which is used to express the stream as a graph. With the ~> function it is possible to connect input and output ports with each other. The Concat component that is used in the example can concatenate elements and is here used to prepend the welcome message to the other elements that come out of internalLogic. In the last line, we only make the input port of the server logic and the output port of the concatenated stream available, because all the other ports shall remain an implementation detail of the serverLogic component. For an in-depth introduction to the graph DSL of Akka Streams, visit the corresponding section in the official documentation. The full code example of the complex TCP server and of a client that can communicate with it can be found here. Whenever you open a new connection from the client you should see a welcome message, and by typing "q" on the client you should see a message telling you that the connection has been canceled.
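The effect of the Concat wiring can be sketched in plain Scala over lists instead of streams - withWelcome is a hypothetical helper standing in for wiring welcome ~> concat.in(0) and logic.outlet ~> concat.in(1):

```scala
// Concat in miniature: the one-element welcome "source" is emitted
// before everything the inner logic produces.
def withWelcome(remoteAddress: String, logicOutput: List[String]): List[String] =
  s"Welcome port $remoteAddress!\n" :: logicOutput
```
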

There remain some topics that were not covered in this answer. Materialization in particular may scare one reader or another, but I'm sure that with the material covered here everyone should be able to take the next steps on their own. As already mentioned, the official documentation is a good place to continue learning about Akka Streams.