Message Queues — Practical
Message Queues — Practical patterns
Section titled “Message Queues — Practical patterns”RabbitMQ producer/consumer (Node, amqplib)
// RabbitMQ producer/consumer (Node, amqplib)
import amqp from 'amqplib';

const conn = await amqp.connect('amqp://user:pass@rabbit:5672');
// Confirm channel (publisher confirms): publish() takes a callback that
// reports whether the broker accepted the message.
const ch = await conn.createConfirmChannel();

// Topology: durable topic exchange -> durable work queue. Messages the
// consumer rejects without requeue are routed to the 'orders.dlx' exchange.
await ch.assertExchange('orders', 'topic', { durable: true });
await ch.assertQueue('orders.processor', {
  durable: true,
  arguments: {
    'x-dead-letter-exchange': 'orders.dlx',
    'x-message-ttl': 600000, // milliseconds (10 minutes)
  },
});
await ch.bindQueue('orders.processor', 'orders', 'order.*.created');

// publish
ch.publish(
  'orders',
  'order.us.created',
  Buffer.from(JSON.stringify(payload)),
  { persistent: true, messageId: id, contentType: 'application/json' },
  (err) => (err ? log.error(err) : log.debug('confirmed')),
);

// consume
// Both calls return promises; await them so a failed setup surfaces here
// instead of becoming an unhandled rejection.
await ch.prefetch(20); // at most 20 unacked messages in flight
await ch.consume(
  'orders.processor',
  async (msg) => {
    // amqplib delivers null when the broker cancels the consumer.
    if (!msg) return;
    try {
      const data = JSON.parse(msg.content.toString());
      await handle(data);
      ch.ack(msg);
    } catch (e) {
      // Record why the message is being dead-lettered before rejecting it.
      log.error(e);
      ch.nack(msg, false, false); // don't requeue → goes to DLX
    }
  },
  { noAck: false },
);
// RabbitMQ DLQ wiring
// RabbitMQ DLQ wiring
// Declares the dead-letter exchange and a durable queue bound to it.
const deadLetterExchange = 'orders.dlx';
const deadLetterQueue = 'orders.dlq';

await ch.assertExchange(deadLetterExchange, 'topic', { durable: true });
await ch.assertQueue(deadLetterQueue, { durable: true });
// Binding key '#': on a topic exchange this matches every routing key.
await ch.bindQueue(deadLetterQueue, deadLetterExchange, '#');
// SQS (AWS SDK v3, Node)
// SQS (AWS SDK v3, Node)
import {
  SQSClient,
  SendMessageCommand,
  ReceiveMessageCommand,
  DeleteMessageCommand,
} from '@aws-sdk/client-sqs';

const sqs = new SQSClient({});

// produce
// NOTE(review): `URL` is expected to be a queue-URL binding defined elsewhere;
// if it is missing, the identifier resolves to the global URL constructor.
await sqs.send(new SendMessageCommand({
  QueueUrl: URL,
  MessageBody: JSON.stringify(payload),
  MessageGroupId: order.id, // FIFO only
  MessageDeduplicationId: idempotencyKey, // FIFO only
  MessageAttributes: {
    type: { StringValue: 'OrderCreated', DataType: 'String' },
  },
}));

// consume
while (running) {
  const res = await sqs.send(new ReceiveMessageCommand({
    QueueUrl: URL,
    WaitTimeSeconds: 20, // long poll
    MaxNumberOfMessages: 10,
    VisibilityTimeout: 60,
  }));

  // Messages is absent when the poll returns nothing.
  for (const m of res.Messages ?? []) {
    try {
      // A missing Body makes JSON.parse throw, which lands in the catch below.
      await handle(JSON.parse(m.Body));
      // Delete only after the handler succeeded.
      await sqs.send(new DeleteMessageCommand({
        QueueUrl: URL,
        ReceiptHandle: m.ReceiptHandle,
      }));
    } catch (e) {
      // do nothing — message reappears after VisibilityTimeout
      log.error(e);
    }
  }
}
// SQS DLQ via redrive
# SQS DLQ via redrive

# Dead-letter queue referenced by the main queue's redrive policy.
resource "aws_sqs_queue" "dlq" {
  name = "orders-dlq"
}

# Main queue; the redrive policy targets the DLQ with maxReceiveCount = 5.
resource "aws_sqs_queue" "main" {
  name = "orders"

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dlq.arn
    maxReceiveCount     = 5
  })
}
# Redis Streams (Node, ioredis)
// Redis Streams (Node, ioredis)
// produce
await redis.xadd('orders', '*', 'id', order.id, 'payload', JSON.stringify(order));

// create group once
try {
  await redis.xgroup('CREATE', 'orders', 'workers', '$', 'MKSTREAM');
} catch (err) {
  // Redis answers BUSYGROUP when the group already exists; that is the only
  // error safe to ignore. Anything else (connection, auth, wrong type) is real.
  if (!String(err?.message).includes('BUSYGROUP')) throw err;
}

// consume in a loop
while (running) {
  const res = await redis.xreadgroup(
    'GROUP', 'workers', `worker-${id}`,
    'COUNT', 10,
    'BLOCK', 5000,
    'STREAMS', 'orders', '>',
  );
  // A falsy reply means the blocking read returned no entries.
  if (!res) continue;

  for (const [, entries] of res) {
    for (const [streamId, fields] of entries) {
      // Entry fields arrive as a flat [key, value, key, value, ...] array.
      const pairs = [];
      for (let i = 0; i < fields.length; i += 2) {
        pairs.push([fields[i], fields[i + 1]]);
      }
      const obj = Object.fromEntries(pairs);

      try {
        await handle(JSON.parse(obj.payload));
        await redis.xack('orders', 'workers', streamId);
      } catch (e) {
        // Not acked on failure: the entry stays pending for the group.
        log.error(e);
      }
    }
  }
}
// claim stuck messages periodically
const idle = await redis.xpending('orders', 'workers', '-', '+', 100);
// Each pending entry is destructured as [entryId, <skipped>, idleMs].
// The entry id gets its own name so it cannot shadow the worker `id` used
// to build this consumer's name.
for (const [entryId, , idleMs] of idle) {
  if (idleMs > 60_000) {
    await redis.xclaim('orders', 'workers', `worker-${id}`, 60_000, entryId);
  }
}
// BullMQ (Redis-backed job queue)
// BullMQ (Redis-backed job queue)
import { Queue, Worker } from 'bullmq';

const emailQueueName = 'emails';

// Producer side: enqueue a 'welcome' job with retry and retention options.
const q = new Queue(emailQueueName, { connection: redis });
const welcomeJobOptions = {
  attempts: 5,
  backoff: { type: 'exponential', delay: 1000 },
  removeOnComplete: 1000,
  removeOnFail: 5000,
};
await q.add('welcome', { userId: 1 }, welcomeJobOptions);

// Consumer side: each job's data is handed to sendEmail.
const processEmailJob = async (job) => {
  await sendEmail(job.data);
};
new Worker(emailQueueName, processEmailJob, { connection: redis, concurrency: 10 });
// Outbox relay (Postgres → broker)
// Outbox relay (Postgres → broker)
/**
 * Polls the `outbox` table forever and publishes unpublished rows to the broker.
 *
 * Each batch runs inside one transaction. `FOR UPDATE SKIP LOCKED` only keeps
 * rows locked until the enclosing transaction ends, so running the SELECT on
 * its own would release the locks immediately and let concurrent relays pick
 * up (and publish) the same rows. If a publish or update throws, the promise
 * rejects; with a transactional `db.tx` the batch rolls back and is retried
 * later, so delivery is at-least-once and consumers must be idempotent.
 *
 * Assumes `db.tx(fn)` runs `fn` in a transaction and resolves with its return
 * value, and that `t.query` resolves to an array of rows — TODO confirm
 * against the actual db client.
 *
 * @returns {Promise<never>} Never resolves; rejects if a batch fails.
 */
async function relay() {
  while (true) {
    const publishedCount = await db.tx(async (t) => {
      const rows = await t.query(`
        SELECT id, type, payload
        FROM outbox
        WHERE published_at IS NULL
        ORDER BY created_at
        LIMIT 100
        FOR UPDATE SKIP LOCKED`);

      for (const r of rows) {
        await broker.publish(r.type, r.payload, { messageId: r.id });
        await t.query(`UPDATE outbox SET published_at = now() WHERE id = $1`, [r.id]);
      }
      return rows.length;
    });

    // Back off outside the transaction so no locks are held while idle.
    if (!publishedCount) {
      await sleep(500);
    }
  }
}
// Idempotent consumer (inbox)
// Idempotent consumer (inbox)
/**
 * Applies a message's effect at most once per message id.
 *
 * Inside a single `db.tx` callback, the message id is inserted into `inbox`;
 * when the insert reports no affected row (the id was already recorded), the
 * message is treated as a duplicate and the effect is skipped.
 *
 * @param {{ id: *, payload: * }} msg - Message with a unique id and a payload.
 * @returns {Promise<void>}
 */
async function consume(msg) {
  const recordDelivery =
    `INSERT INTO inbox(message_id) VALUES ($1) ON CONFLICT DO NOTHING RETURNING 1`;

  await db.tx(async (t) => {
    const inserted = await t.query(recordDelivery, [msg.id]);
    const isFirstDelivery = Boolean(inserted.rowCount);
    if (isFirstDelivery) {
      await applyEffect(t, msg.payload);
    }
  });
}
// Health & monitoring
Health & monitoring

| Metric | Why |
|---|---|
| backlog / queue depth | growing = need more consumers |
| oldest message age | correlates with user-visible lag |
| consumer concurrency | matches workload |
| DLQ depth | always alert on > 0 |
| processing latency p95/p99 | per-message cost |
| ack/nack rates | consumer health |
| publish error rate | producer health |
- CloudAMQP / RabbitMQ Console — RMQ ops.
- AWS Console / SQS metrics — SQS.
- RedisInsight — Redis Streams.
- MQ Explorer / k6 — load testing queues.
- Conduktor / Redpanda Console — Kafka.
- Pact / AsyncAPI — define event contracts.