我刚刚开始使用RabbitMQ和AMQP。

我有一个消息队列 我有多个消费者,我希望用相同的消息做不同的事情。

大多数RabbitMQ文档似乎都关注于循环,即单个消息由单个消费者消费,负载分布在每个消费者之间。这确实是我亲眼所见的行为。

举个例子:生产者只有一个队列,每2秒发送一次消息:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

这是一位消费者:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

如果我启动消费者两次,我可以看到每个消费者都在循环行为中使用备用消息。例如,我将在一个终端上看到消息1,3,5,在另一个终端上看到消息2,4,6。

我的问题是:

我能让每个消费者收到相同的消息吗?也就是说,两个消费者都得到消息1,2,3,4,5,6 ?这在AMQP/RabbitMQ中被称为什么?它通常是如何配置的? 这种做法普遍吗?我是否应该让交换器将消息路由到两个单独的队列中,只有一个消费者?


当前回答

显然你想要的是扇形散开。扇出

阅读rabbitMQ教程: https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html

以下是我的例子:

Publisher.js:

amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
    if (error0) {
      throw error0;
    }
    console.log('RabbitMQ connected')
    try {
      // Create exchange for queues
      channel = await connection.createChannel()
      await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
      await channel.publish(process.env.EXCHANGE_NAME, '', Buffer.from('msg'))
    } catch(error) {
      console.error(error)
    }
})

Subscriber.js:

amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
    if (error0) {
      throw error0;
    }
    console.log('RabbitMQ connected')
    try {
      // Create/Bind a consumer queue for an exchange broker
      channel = await connection.createChannel()
      await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
      const queue = await channel.assertQueue('', {exclusive: true})
      channel.bindQueue(queue.queue, process.env.EXCHANGE_NAME, '')

      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C");
      channel.consume('', consumeMessage, {noAck: true});
    } catch(error) {
      console.error(error)
    }
});

这是我在网上找到的一个例子。也许还能帮上忙。 https://www.codota.com/code/javascript/functions/amqplib/Channel/assertExchange

其他回答

是的,每个消费者都可以收到相同的消息。看一看 http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq.com/tutorials/tutorial-five-python.html

用于路由消息的不同方法。我知道他们是python和java,但它很好地理解原则,决定你在做什么,然后找到如何在JS中做到这一点。听起来好像您想要做一个简单的扇出(教程3),它将消息发送到连接到交换机的所有队列。

你正在做的和你想做的之间的区别基本上是你要设置和交换或输入扇出。扇出交换机将所有消息发送到所有连接的队列。每个队列都有一个消费者,该消费者可以单独访问所有消息。

是的,这是很常见的,这是AMPQ的特点之一。

发送模式是一对一的关系。如果你想“发送”给多个接收者,你应该使用“发布/订阅”模式。详情见http://www.rabbitmq.com/tutorials/tutorial-three-python.html。

请阅读rabbitmq教程。你发布消息是为了交换,而不是为了排队;然后将其路由到适当的队列。在本例中,您应该为每个消费者绑定单独的队列。这样,它们就可以完全独立地使用消息。

要获得您想要的行为,只需让每个消费者从自己的队列中消费。您必须使用非直接交换类型(主题、报头、扇出),以便将消息一次性发送到所有队列。

我能让每个消费者收到相同的消息吗?也就是说,两个消费者都得到消息1,2,3,4,5,6 ?这在AMQP/RabbitMQ中被称为什么?它通常是如何配置的?

不,如果消费者在同一个队列上,就不会。摘自RabbitMQ的AMQP概念指南:

重要的是要理解,在AMQP 0-9-1中,消息在消费者之间是负载均衡的。

这似乎意味着队列中的循环行为是给定的,不可配置。也就是说,为了让多个使用者处理相同的消息ID,需要单独的队列。

这种做法普遍吗?我是否应该让交换器将消息路由到两个单独的队列中,只有一个消费者?

不,它不是,单个队列/多个消费者,每个消费者处理相同的消息ID是不可能的。让交换器将消息路由到两个单独的队列确实更好。

由于我不需要太复杂的路由,扇出交换机将很好地处理这个问题。我之前没有过多关注exchange,因为node-amqp具有“默认交换”的概念,允许您直接将消息发布到连接,但是大多数AMQP消息都发布到特定的交换。

以下是我的扇出交换,包括发送和接收:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   
 
    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })
  
    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})