我刚刚开始使用RabbitMQ和AMQP。
我有一个消息队列
我有多个消费者,我希望用相同的消息做不同的事情。
大多数RabbitMQ文档似乎都关注于循环,即单个消息由单个消费者消费,负载分布在每个消费者之间。这确实是我亲眼所见的行为。
举个例子:生产者只有一个队列,每2秒发送一次消息:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
var sendMessage = function(connection, queue_name, payload) {
var encoded_payload = JSON.stringify(payload);
connection.publish(queue_name, encoded_payload);
}
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(connection, "my_queue_name", test_message)
count += 1;
}, 2000)
})
这是一位消费者:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
connection.queue("my_queue_name", function(queue){
queue.bind('#');
queue.subscribe(function (message) {
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
})
如果我启动消费者两次,我可以看到每个消费者都在循环行为中使用备用消息。例如,我将在一个终端上看到消息1,3,5,在另一个终端上看到消息2,4,6。
我的问题是:
我能让每个消费者收到相同的消息吗?也就是说,两个消费者都得到消息1,2,3,4,5,6 ?这在AMQP/RabbitMQ中被称为什么?它通常是如何配置的?
这种做法普遍吗?我是否应该让交换器将消息路由到两个单独的队列中,只有一个消费者?
显然你想要的是扇形散开。扇出
阅读rabbitMQ教程:
https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html
以下是我的例子:
Publisher.js:
amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
if (error0) {
throw error0;
}
console.log('RabbitMQ connected')
try {
// Create exchange for queues
channel = await connection.createChannel()
await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
await channel.publish(process.env.EXCHANGE_NAME, '', Buffer.from('msg'))
} catch(error) {
console.error(error)
}
})
Subscriber.js:
amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
if (error0) {
throw error0;
}
console.log('RabbitMQ connected')
try {
// Create/Bind a consumer queue for an exchange broker
channel = await connection.createChannel()
await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
const queue = await channel.assertQueue('', {exclusive: true})
channel.bindQueue(queue.queue, process.env.EXCHANGE_NAME, '')
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C");
channel.consume('', consumeMessage, {noAck: true});
} catch(error) {
console.error(error)
}
});
这是我在网上找到的一个例子。也许还能帮上忙。
https://www.codota.com/code/javascript/functions/amqplib/Channel/assertExchange
据我评估,你的情况是
我有一个消息队列(您接收消息的源,让它命名为q111)
我有多个消费者,我希望用相同的消息做不同的事情。
这里的问题是,当这个队列接收到3个消息时,消息1被消费者a消费,其他消费者B和C消费消息2和3。当你需要一个设置,让rabbitmq同时将这三个消息(1,2,3)的相同副本传递给所有连接的消费者(a,B,C)。
虽然可以通过许多配置来实现这一点,但简单的方法是使用以下两步概念:
使用动态rabbitmq-shovel从所需队列(q111)提取消息并发布到扇出交换机(专门为此目的创建和专用的交换机)。
现在重新配置你的消费者A,B和C(他们正在监听队列(q111)),直接使用每个消费者的排他和匿名队列从这个Fanout交换机监听。
注意:当使用这个概念时,不要直接从源队列(q111)消费,因为已经消费的消息不会被移到你的Fanout交换机。
如果您认为这不能满足您的具体要求……欢迎发表你的建议:-)