我想在一段时间后在我的一个工人中收到一条消息。在发现了所谓的死信交换之后,我决定使用Node和RabbitMQ。
该消息似乎已发送到DeadExchange中的队列,但是使用方在WorkExchange中的WorkQueue中经过了一段时间之后,使用者再也没有收到该消息。要么bindQueue关闭,要么死信不起作用?
我现在尝试 了很多 不同的值。有人可以指出我所缺少的吗?
var amqp = require('amqplib'); var url = 'amqp://dev.rabbitmq.com'; amqp.connect(url).then(function(conn) { //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to. return conn.createChannel().then(function(ch) { return ch.assertExchange('WorkExchange', 'direct').then(function() { return ch.assertQueue('WorkQueue', { autoDelete: false, durable: true }) }).then(function() { return ch.bindQueue('WorkQueue', 'WorkExchange', ''); }).then(function() { console.log('Waiting for consume.'); return ch.consume('WorkQueue', function(msg) { console.log('Received message.'); console.log(msg.content.toString()); ch.ack(msg); }); }); }) }).then(function() { //Now send a test message to DeadExchange to a random (unique) queue. return amqp.connect(url).then(function(conn) { return conn.createChannel(); }).then(function(ch) { return ch.assertExchange('DeadExchange', 'direct').then(function() { return ch.assertQueue('', { arguments: { 'x-dead-letter-exchange': 'WorkExchange', 'x-message-ttl': 2000, 'x-expires': 10000 } }) }).then(function(ok) { console.log('Sending delayed message'); return ch.sendToQueue(ok.queue, new Buffer(':)')); }); }) }).then(null, function(error) { console.log('error\'ed') console.log(error); console.log(error.stack); });
我正在使用amqp.node(https://github.com/squaremo/amqp.node),它是npm中的amqplib。尽管node- amqp(https://github.com/postwait/node-amqp)似乎更受欢迎,但它并未实现完整的协议,并且在重新连接方面存在许多突出的问题。
dev.rabbitmq.com正在运行RabbitMQ 3.1.3。
AMQP.Node中的Channel#assertQueue中有一个错误已得到修复,请参见https://github.com/squaremo/amqp.node/commit/3749c66b448875d2df374e6a89946c0bdd0cb918。该修补程序位于GitHub上,但尚未在npm中。