RabbitMQ / AMQP: 동일한 메시지에 대해 단일 대기열, 다중 소비자?
저는 이제 막 RabbitMQ와 AMQP를 일반적으로 사용하기 시작했습니다.
- 메시지 대기열이 있습니다.
- 저는 여러 명의 소비자가 있는데, 같은 메시지로 다른 일을 하고 싶습니다.
대부분의 토끼MQ 문서는 라운드 로빈, 즉 단일 소비자가 단일 메시지를 소비하고 각 소비자 간에 부하가 분산되는 방식에 초점을 맞춘 것으로 보입니다.이것이 바로 제가 목격한 행동입니다.
예: 생산자는 단일 대기열을 가지고 있으며 2초마다 메시지를 보냅니다.
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
var sendMessage = function(connection, queue_name, payload) {
var encoded_payload = JSON.stringify(payload);
connection.publish(queue_name, encoded_payload);
}
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(connection, "my_queue_name", test_message)
count += 1;
}, 2000)
})
그리고 여기 소비자가 있습니다.
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
connection.queue("my_queue_name", function(queue){
queue.bind('#');
queue.subscribe(function (message) {
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
})
소비자를 두 번 시작하면 각 소비자가 라운드 로빈 방식으로 대체 메시지를 소비하고 있음을 알 수 있습니다. 예, 하나의 터미널에는 1, 3, 5, 다른 터미널에는 2, 4, 6이라는 메시지가 표시됩니다.
제 질문은:
각 소비자에게 동일한 메시지를 받도록 할 수 있습니까?즉, 두 소비자 모두 메시지 1, 2, 3, 4, 5, 6을 수신합니까?AMQP/RabbitMQ 스피크에서 이것은 무엇이라고 합니까?일반적으로 어떻게 구성됩니까?
이것은 일반적으로 하나요?교환으로 메시지를 한 명의 소비자와 함께 두 개의 개별 대기열로 보낼 수 있을까요?
각 소비자에게 동일한 메시지를 받도록 할 수 있습니까?즉, 두 소비자 모두 메시지 1, 2, 3, 4, 5, 6을 수신합니까?AMQP/RabbitMQ 스피크에서 이것은 무엇이라고 합니까?일반적으로 어떻게 구성됩니까?
아니요, 소비자들이 같은 줄에 있다면 그렇지 않습니다.토끼로부터MQ의 AMQP 개념 가이드:
AMQP 0-9-1에서는 메시지가 소비자 간에 로드 밸런싱된다는 것을 이해하는 것이 중요합니다.
이는 대기열 내에서 라운드 로빈 동작이 주어진 것이며 구성할 수 없음을 의미하는 것으로 보입니다.즉, 여러 소비자가 동일한 메시지 ID를 처리하려면 별도의 대기열이 필요합니다.
이것은 일반적으로 하나요?교환으로 메시지를 한 명의 소비자와 함께 두 개의 개별 대기열로 보낼 수 있을까요?
아닙니다. 각 소비자가 동일한 메시지 ID를 처리하는 단일 대기열/여러 소비자는 사용할 수 없습니다.메시지를 두 개의 개별 대기열로 전송하는 교환 경로가 실제로 더 좋습니다.
저는 너무 복잡한 라우팅이 필요하지 않기 때문에 팬아웃 교환이 이를 잘 처리할 것입니다.node-amqp에는 연결에 메시지를 직접 게시할 수 있는 '기본 교환' 개념이 있지만 대부분의 AMQP 메시지는 특정 교환에 게시됩니다.
다음은 팬아웃 교환입니다. 보내고 받는 것입니다.
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {
var sendMessage = function(exchange, payload) {
console.log('about to publish')
var encoded_payload = JSON.stringify(payload);
exchange.publish('', encoded_payload, {})
}
// Recieve messages
connection.queue("my_queue_name", function(queue){
console.log('Created queue')
queue.bind(exchange, '');
queue.subscribe(function (message) {
console.log('subscribed to queue')
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(exchange, test_message)
count += 1;
}, 2000)
})
})
마지막 두 가지 답변은 거의 정확합니다. 저는 다른 소비자에게 전달해야 하는 메시지를 생성하는 수많은 앱을 가지고 있기 때문에 프로세스가 매우 간단합니다.
여러 소비자가 동일한 메시지를 표시하도록 하려면 다음 절차를 수행합니다.
각 대기열 속성에서 메시지를 수신할 각 앱에 대해 하나씩 여러 대기열을 만들고 amq.direct 교환으로 라우팅 태그를 "바인딩"합니다.amq.direct로 보낼 게시 앱을 변경하고 routing-tag(큐가 아님)를 사용합니다.그런 다음 AMQP는 동일한 바인딩을 사용하여 메시지를 각 대기열에 복사합니다.매력적으로 작동합니다 :)
예: 제가 생성한 JSON 문자열이 있다고 가정해 보겠습니다. 라우팅 태그 "new-sales-order"를 사용하여 "amq.direct" 교환에 게시하고 주문을 인쇄하는 order_printer 앱의 대기열이 있습니다.저는 주문서 사본을 발송하고 고객에게 송장을 발송하는 청구 시스템의 대기열을 가지고 있으며, 과거/준수 이유로 주문을 보관하는 웹 아카이브 시스템과 주문에 대한 다른 정보가 들어오면 주문이 추적되는 클라이언트 웹 인터페이스를 가지고 있습니다.
따라서 제 큐는 다음과 같습니다. order_printer, order_billing, order_archive 및 order_tracking 모두 바인딩 태그 "new-sales-order"가 바인딩되어 있으며, 4개 모두 JSON 데이터를 가져옵니다.
이것은 게시 앱이 수신 앱을 모르거나 신경 쓰지 않고 데이터를 보내는 이상적인 방법입니다.
rabbitmq 튜토리얼을 읽어보세요.메시지를 대기열이 아닌 교환에 게시한 다음 적절한 대기열로 라우트됩니다.이 경우 각 소비자에 대해 별도의 대기열을 바인딩해야 합니다.이렇게 하면 메시지를 완전히 독립적으로 사용할 수 있습니다.
예, 각 소비자는 동일한 메시지를 수신할 수 있습니다.http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq.com/tutorials/tutorial-five-python.html 을 확인해 보십시오.
메시지를 라우트하는 다양한 방법에 대해 설명합니다.나는 그것들이 파이썬과 자바를 위한 것이라는 것을 알지만, 원칙을 이해하고, 당신이 무엇을 하고 있는지 결정하고, JS에서 그것을 하는 방법을 찾는 것이 좋습니다.교환에 연결된 모든 대기열에 메시지를 보내는 간단한 팬아웃(튜토리얼 3)을 수행하려는 것처럼 들립니다.
현재 진행 중인 작업과 원하는 작업의 차이점은 기본적으로 팬아웃을 설정하고 교환하거나 입력한다는 것입니다.팬아웃 교환은 모든 메시지를 연결된 모든 대기열로 보냅니다.각 대기열에는 모든 메시지에 개별적으로 액세스할 수 있는 소비자가 있습니다.
네, 이것은 일반적으로 이루어집니다, 이것은 AMPQ의 특징 중 하나입니다.
전송 패턴은 일대일 관계입니다.둘 이상의 수신기에 "보내기"하려면 펍/서브 패턴을 사용해야 합니다.자세한 내용은 http://www.rabbitmq.com/tutorials/tutorial-three-python.html 을 참조하십시오.
RabbitMQ / AMQP: 단일 큐, 동일한 메시지 및 페이지 새로 고침을 위한 여러 소비자.
rabbit.on('ready', function () { });
sockjs_chat.on('connection', function (conn) {
conn.on('data', function (message) {
try {
var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));
if (obj.header == "register") {
// Connect to RabbitMQ
try {
conn.exchange = rabbit.exchange(exchange, { type: 'topic',
autoDelete: false,
durable: false,
exclusive: false,
confirm: true
});
conn.q = rabbit.queue('my-queue-'+obj.agentID, {
durable: false,
autoDelete: false,
exclusive: false
}, function () {
conn.channel = 'my-queue-'+obj.agentID;
conn.q.bind(conn.exchange, conn.channel);
conn.q.subscribe(function (message) {
console.log("[MSG] ---> " + JSON.stringify(message));
conn.write(JSON.stringify(message) + "\n");
}).addCallback(function(ok) {
ctag[conn.channel] = ok.consumerTag; });
});
} catch (err) {
console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
}
} else if (obj.header == "typing") {
var reply = {
type: 'chatMsg',
msg: utils.escp(obj.msga),
visitorNick: obj.channel,
customField1: '',
time: utils.getDateTime(),
channel: obj.channel
};
conn.exchange.publish('my-queue-'+obj.agentID, reply);
}
} catch (err) {
console.log("ERROR ----> " + err.stack);
}
});
// When the visitor closes or reloads a page we need to unbind from RabbitMQ?
conn.on('close', function () {
try {
// Close the socket
conn.close();
// Close RabbitMQ
conn.q.unsubscribe(ctag[conn.channel]);
} catch (er) {
console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
}
});
});
당신의 경우는 다음과 같습니다.
메시지 대기열이 있습니다(메시지 수신 소스, Q111로 이름 지정).
저는 여러 명의 소비자가 있는데, 같은 메시지로 다른 일을 하고 싶습니다.
여기서 문제는 이 대기열에서 3개의 메시지가 수신되는 동안 A 소비자가 1개의 메시지를 소비하고, B 소비자와 C 소비자가 2개와 3개의 메시지를 소비한다는 것입니다.rabbitmq가 이 세 메시지(1,2,3)의 동일한 복사본을 연결된 세 소비자(A,B,C) 모두에게 동시에 전달하는 설정이 필요한 경우.
이를 위해 많은 구성을 수행할 수 있지만 간단한 방법은 다음 2단계 개념을 사용하는 것입니다.
- 동적 rabbitmq-shovel을 사용하여 원하는 대기열(q111)에서 메시지를 픽업하고 팬아웃 교환(이를 위해 독점적으로 생성 및 전용 교환)에 게시합니다.
- 이제 각 소비자에 대한 전용 익명 대기열을 사용하여 이 팬아웃 교환에서 직접 청취하도록 소비자 A, B 및 C(큐 111)를 재구성합니다.
참고: 이 개념을 사용하는 동안 이미 사용된 메시지는 팬아웃 교환으로 이동되지 않으므로 소스 대기열(q111)에서 직접 소비되지 않습니다.
이게 당신의 정확한 요구사항을 충족시키지 못한다고 생각하신다면...당신의 제안을 자유롭게 게시하세요 :-)
나는 당신이 팬아웃 교환기를 이용해서 메시지를 보내는 것을 확인해야 한다고 생각합니다.그렇게 하면 다른 소비자들에게 동일한 메시지를 받게 될 것입니다. Rabbit 표 아래에 있습니다.MQ는 이 새로운 소비자/가입자 각각에 대해 서로 다른 대기열을 생성하고 있습니다.
이것은 javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html 의 튜토리얼 예제를 참조하기 위한 링크입니다.
원하는 동작을 얻으려면 각 소비자가 자신의 대기열에서 소비하도록 하면 됩니다.메시지를 모든 대기열에 한 번에 전달하려면 직접적이지 않은 교환 유형(항목, 헤더, 팬아웃)을 사용해야 합니다.
만약 당신이 나처럼 amqplib 라이브러리를 사용하고 있다면, 그들은 Rabbit 게시/구독 구현의 유용한 예를 가지고 있습니다.유용한 MQ 튜토리얼입니다.
이 시나리오에서 제가 여기 답변에서 찾지 못한 한 가지 흥미로운 옵션이 있습니다.
한 소비자의 "대기열" 기능이 있는 메시지를 다른 소비자의 메시지로 처리할 수 있습니다.일반적으로 말해서, 그것은 올바른 방법이 아니지만, 아마도 누군가에게는 충분히 좋을 것입니다.
https://www.rabbitmq.com/nack.html
그리고 루프(모든 소비자가 메시지를 수신할 때)에 주의하십시오!
팬아웃은 분명히 당신이 원했던 것입니다. fanout
토끼를 보다MQ 튜토리얼: https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html
여기 제 예가 있습니다.
Publisher.js:
amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
if (error0) {
throw error0;
}
console.log('RabbitMQ connected')
try {
// Create exchange for queues
channel = await connection.createChannel()
await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
await channel.publish(process.env.EXCHANGE_NAME, '', Buffer.from('msg'))
} catch(error) {
console.error(error)
}
})
Subscriber.js:
amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
if (error0) {
throw error0;
}
console.log('RabbitMQ connected')
try {
// Create/Bind a consumer queue for an exchange broker
channel = await connection.createChannel()
await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
const queue = await channel.assertQueue('', {exclusive: true})
channel.bindQueue(queue.queue, process.env.EXCHANGE_NAME, '')
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C");
channel.consume('', consumeMessage, {noAck: true});
} catch(error) {
console.error(error)
}
});
여기 내가 인터넷에서 찾은 예가 있습니다. 아마도 또한 도움이 될 것입니다.https://www.codota.com/code/javascript/functions/amqplib/Channel/assertExchange
당신은 소비자들에게 다른 그룹을 할당하기만 하면 됩니다.
언급URL : https://stackoverflow.com/questions/10620976/rabbitmq-amqp-single-queue-multiple-consumers-for-same-message
'programing' 카테고리의 다른 글
| Node.js와 함께 사용할 웹 소켓 라이브러리는 무엇입니까? (0) | 2023.05.29 |
|---|---|
| 각도 2 - 내부HTML 스타일 (0) | 2023.05.29 |
| 현재 사용 중인 대화형 셸을 확인하는 방법(명령줄) (0) | 2023.05.29 |
| iOS 10에서 Xcode 7 사용 (0) | 2023.05.29 |
| 버튼 클릭 시 비동기 메서드 호출 (0) | 2023.05.29 |