我刚刚开始使用RabbitMQ和AMQP。

我有一个消息队列 我有多个消费者,我希望用相同的消息做不同的事情。

大多数RabbitMQ文档似乎都关注于循环,即单个消息由单个消费者消费,负载分布在每个消费者之间。这确实是我亲眼所见的行为。

举个例子:生产者只有一个队列,每2秒发送一次消息:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

这是一位消费者:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

如果我启动消费者两次,我可以看到每个消费者都在循环行为中使用备用消息。例如,我将在一个终端上看到消息1,3,5,在另一个终端上看到消息2,4,6。

我的问题是:

我能让每个消费者收到相同的消息吗?也就是说,两个消费者都得到消息1,2,3,4,5,6 ?这在AMQP/RabbitMQ中被称为什么?它通常是如何配置的? 这种做法普遍吗?我是否应该让交换器将消息路由到两个单独的队列中,只有一个消费者?


当前回答

如果你碰巧像我一样使用amqplib库,他们有一个发布/订阅RabbitMQ教程的实现示例,你可能会觉得很方便。

其他回答

我认为你应该检查一下用扇出交换器发送信息。这样你就会收到不同消费者的相同消息,在表下面,RabbitMQ正在为每个新的消费者/订阅者创建不同的队列。

这是javascript教程示例的链接 https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html

RabbitMQ / AMQP:单队列,同一消息和页面刷新的多个消费者。

rabbit.on('ready', function () {    });
    sockjs_chat.on('connection', function (conn) {

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));

                if (obj.header == "register") {

                    // Connect to RabbitMQ
                    try {
                        conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                            autoDelete: false,
                            durable: false,
                            exclusive: false,
                            confirm: true
                        });

                        conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                            durable: false,
                            autoDelete: false,
                            exclusive: false
                        }, function () {
                            conn.channel = 'my-queue-'+obj.agentID;
                            conn.q.bind(conn.exchange, conn.channel);

                            conn.q.subscribe(function (message) {
                                console.log("[MSG] ---> " + JSON.stringify(message));
                                conn.write(JSON.stringify(message) + "\n");
                            }).addCallback(function(ok) {
                                ctag[conn.channel] = ok.consumerTag; });
                        });
                    } catch (err) {
                        console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                    }

                } else if (obj.header == "typing") {

                    var reply = {
                        type: 'chatMsg',
                        msg: utils.escp(obj.msga),
                        visitorNick: obj.channel,
                        customField1: '',
                        time: utils.getDateTime(),
                        channel: obj.channel
                    };

                    conn.exchange.publish('my-queue-'+obj.agentID, reply);
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
        conn.on('close', function () {
            try {

                // Close the socket
                conn.close();

                // Close RabbitMQ           
               conn.q.unsubscribe(ctag[conn.channel]);

            } catch (er) {
                console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
            }
        });
    });

据我评估,你的情况是

我有一个消息队列(您接收消息的源,让它命名为q111) 我有多个消费者,我希望用相同的消息做不同的事情。

这里的问题是,当这个队列接收到3个消息时,消息1被消费者a消费,其他消费者B和C消费消息2和3。当你需要一个设置,让rabbitmq同时将这三个消息(1,2,3)的相同副本传递给所有连接的消费者(a,B,C)。

虽然可以通过许多配置来实现这一点,但简单的方法是使用以下两步概念:

使用动态rabbitmq-shovel从所需队列(q111)提取消息并发布到扇出交换机(专门为此目的创建和专用的交换机)。 现在重新配置你的消费者A,B和C(他们正在监听队列(q111)),直接使用每个消费者的排他和匿名队列从这个Fanout交换机监听。

注意:当使用这个概念时,不要直接从源队列(q111)消费,因为已经消费的消息不会被移到你的Fanout交换机。

如果您认为这不能满足您的具体要求……欢迎发表你的建议:-)

显然你想要的是扇形散开。扇出

阅读rabbitMQ教程: https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html

以下是我的例子:

Publisher.js:

amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
    if (error0) {
      throw error0;
    }
    console.log('RabbitMQ connected')
    try {
      // Create exchange for queues
      channel = await connection.createChannel()
      await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
      await channel.publish(process.env.EXCHANGE_NAME, '', Buffer.from('msg'))
    } catch(error) {
      console.error(error)
    }
})

Subscriber.js:

amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
    if (error0) {
      throw error0;
    }
    console.log('RabbitMQ connected')
    try {
      // Create/Bind a consumer queue for an exchange broker
      channel = await connection.createChannel()
      await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
      const queue = await channel.assertQueue('', {exclusive: true})
      channel.bindQueue(queue.queue, process.env.EXCHANGE_NAME, '')

      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C");
      channel.consume('', consumeMessage, {noAck: true});
    } catch(error) {
      console.error(error)
    }
});

这是我在网上找到的一个例子。也许还能帮上忙。 https://www.codota.com/code/javascript/functions/amqplib/Channel/assertExchange

请阅读rabbitmq教程。你发布消息是为了交换,而不是为了排队;然后将其路由到适当的队列。在本例中,您应该为每个消费者绑定单独的队列。这样,它们就可以完全独立地使用消息。