RabbitMQ Java客户端包含以下几个概念:

连接-到RabbitMQ服务器实例的连接 通道- ?? 消费者线程池——一个线程池,用于从RabbitMQ服务器队列中消费消息 队列-以FIFO顺序保存消息的结构

我试图理解它们之间的关系,更重要的是,它们之间的联系。

I'm still not quite sure what a Channel is, other than the fact that this is the structure that you publish and consume from, and that it is created from an open connection. If someone could explain to me what the "Channel" represents, it might help clear a few things up. What is the relationship between Channel and Queue? Can the same Channel be used to communicate to multiples Queues, or does it have to be 1:1? What is the relationship between Queue and the Consumer Pool? Can multiple Consumers be subscribed to the same Queue? Can multiple Queues be consumed by the same Consumer? Or is the relationship 1:1?


A Connection represents a real TCP connection to the message broker, whereas a Channel is a virtual connection (AMQP connection) inside it. This way you can use as many (virtual) connections as you want inside your application without overloading the broker with TCP connections. You can use one Channel for everything. However, if you have multiple threads, it's suggested to use a different Channel for each thread. Channel thread-safety in Java Client API Guide: Channel instances are safe for use by multiple threads. Requests into a Channel are serialized, with only one thread being able to run a command on the Channel at a time. Even so, applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads. There is no direct relation between Channel and Queue. A Channel is used to send AMQP commands to the broker. This can be the creation of a queue or similar, but these concepts are not tied together. Each Consumer runs in its own thread allocated from the consumer thread pool. If multiple Consumers are subscribed to the same Queue, the broker uses round-robin to distribute the messages between them equally. See Tutorial two: "Work Queues". It is also possible to attach the same Consumer to multiple Queues. You can understand Consumers as callbacks. These are called everytime a message arrives on a Queue the Consumer is bound to. For the case of the Java Client, each Consumers has a method handleDelivery(...), which represents the callback method. What you typically do is, subclass DefaultConsumer and override handleDelivery(...). Note: If you attach the same Consumer instance to multiple queues, this method will be called by different threads. So take care of synchronization if necessary.


我找到了这篇文章,它解释了AMQP模型的各个方面,通道是其中之一。我发现这对完善我的理解很有帮助

https://www.rabbitmq.com/tutorials/amqp-concepts.html

Some applications need multiple connections to an AMQP broker. However, it is undesirable to keep many TCP connections open at the same time because doing so consumes system resources and makes it more difficult to configure firewalls. AMQP 0-9-1 connections are multiplexed with channels that can be thought of as "lightweight connections that share a single TCP connection". For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them. Communication on a particular channel is completely separate from communication on another channel, therefore every AMQP method also carries a channel number that clients use to figure out which channel the method is for (and thus, which event handler needs to be invoked, for example).


在这里,对AMQP协议“底层”功能的良好概念理解是有用的。我想说的是,AMQP 0.9.1选择部署的文档和API使这个问题特别令人困惑,所以这个问题本身是许多人不得不努力解决的问题。

博士TL;

连接是与AMQP服务器协商的物理TCP套接字。正确实现的客户端每个应用程序都有一个这样的线程安全的、可在线程间共享的客户机。

通道是连接上的单个应用程序会话。一个线程将有一个或多个这样的会话。AMQP架构0.9.1的特点是,这些线程不能在线程之间共享,并且应该在创建它的线程结束时关闭/销毁。当发生各种违反协议的情况时,服务器也会关闭它们。

使用者是一种虚拟构造,表示特定通道上存在“邮箱”。使用者的使用告诉代理将消息从特定队列推送到该通道端点。

连接的事实

首先,正如其他人正确指出的那样,连接是表示到服务器的实际TCP连接的对象。在AMQP中,连接是在协议级别指定的,与代理的所有通信都是通过一个或多个连接进行的。

Since it's an actual TCP connection, it has an IP Address and Port #. Protocol parameters are negotiated on a per-client basis as part of setting up the connection (a process known as the handshake. It is designed to be long-lived; there are few cases where connection closure is part of the protocol design. From an OSI perspective, it probably resides somewhere around Layer 6 Heartbeats can be set up to monitor the connection status, as TCP does not contain anything in and of itself to do this. It is best to have a dedicated thread manage reads and writes to the underlying TCP socket. Most, if not all, RabbitMQ clients do this. In that regard, they are generally thread-safe. Relatively speaking, connections are "expensive" to create (due to the handshake), but practically speaking, this really doesn't matter. Most processes really will only need one connection object. But, you can maintain connections in a pool, if you find you need more throughput than a single thread/socket can provide (unlikely with current computing technology).

通道的事实

Channel是应用程序会话,为应用程序的每个部分打开,用于与RabbitMQ代理通信。它在单个连接上操作,并表示与代理的会话。

As it represents a logical part of application logic, each channel usually exists on its own thread. Typically, all channels opened by your app will share a single connection (they are lightweight sessions that operate on top of the connection). Connections are thread-safe, so this is OK. Most AMQP operations take place over channels. From an OSI Layer perspective, channels are probably around Layer 7. Channels are designed to be transient; part of the design of AMQP is that the channel is typically closed in response to an error (e.g. re-declaring a queue with different parameters before deleting the existing queue). Since they are transient, channels should not be pooled by your app. The server uses an integer to identify a channel. When the thread managing the connection receives a packet for a particular channel, it uses this number to tell the broker which channel/session the packet belongs to. Channels are not generally thread-safe as it would make no sense to share them among threads. If you have another thread that needs to use the broker, a new channel is needed.

消费者的事实

消费者是AMQP协议定义的对象。它既不是通道也不是连接,而是您的特定应用程序用作发送消息的某种“邮箱”。

"Creating a consumer" means that you tell the broker (using a channel via a connection) that you would like messages pushed to you over that channel. In response, the broker will register that you have a consumer on the channel and begin pushing messages to you. Each message pushed over the connection will reference both a channel number and a consumer number. In that way, the connection-managing thread (in this case, within the Java API) knows what to do with the message; then, the channel-handling thread also knows what to do with the message. Consumer implementation has the widest amount of variation, because it's literally application-specific. In my implementation, I chose to spin off a task each time a message arrived via the consumer; thus, I had a thread managing the connection, a thread managing the channel (and by extension, the consumer), and one or more task threads for each message delivered via the consumer. Closing a connection closes all channels on the connection. Closing a channel closes all consumers on the channel. It is also possible to cancel a consumer (without closing the channel). There are various cases when it makes sense to do any of the three things. Typically, the implementation of a consumer in an AMQP client will allocate one dedicated channel to the consumer to avoid conflicts with the activities of other threads or code (including publishing).

就你所说的消费者线程池而言,我怀疑Java客户端所做的事情与我编写的客户端所做的事情类似(我的客户端基于. net客户端,但经过了大量修改)。


有一种关系,就像TCP连接可以有多个通道。

通道:它是连接中的虚拟连接。当从队列中发布或使用消息时,这都是通过通道完成的。然而连接:它是应用程序和RabbitMQ代理之间的TCP连接。

In multi-threading architecture, you may need a separate connection per thread. That may lead to underutilization of TCP connection, also it adds overhead to the operating system to establish as many TCP connections it requires during the peak time of the network. The performance of the system could be drastically reduced. This is where the channel comes handy, it creates virtual connections inside a TCP connection. It straightaway reduces the overhead of the OS, also it allows us to perform asynchronous operations in a more faster, reliable and simultaneously way.