Skip to content

Message Queues — Practical

RabbitMQ producer/consumer (Node, amqplib)

Section titled “RabbitMQ producer/consumer (Node, amqplib)”
import amqp from 'amqplib';
// RabbitMQ topology, publisher and consumer for the `orders` topic exchange.
// `payload`, `id`, `log` and `handle` are expected to come from surrounding code.
const conn = await amqp.connect('amqp://user:pass@rabbit:5672');
const ch = await conn.createConfirmChannel(); // for publisher confirms

// Declare the dead-letter topology FIRST. RabbitMQ does not require the DLX to
// exist when a queue naming it is declared, but a message dead-lettered to a
// missing exchange (or one with no bound queue) is dropped, so it must be in
// place before the consumer below can nack anything.
await ch.assertExchange('orders.dlx', 'topic', { durable: true });
await ch.assertQueue('orders.dlq', { durable: true });
await ch.bindQueue('orders.dlq', 'orders.dlx', '#'); // '#' matches every routing key

// Main exchange and work queue.
await ch.assertExchange('orders', 'topic', { durable: true });
await ch.assertQueue('orders.processor', {
  durable: true,
  arguments: {
    'x-dead-letter-exchange': 'orders.dlx',
    'x-message-ttl': 600000, // milliseconds
  },
});
await ch.bindQueue('orders.processor', 'orders', 'order.*.created');

// publish
ch.publish('orders', 'order.us.created', Buffer.from(JSON.stringify(payload)),
  { persistent: true, messageId: id, contentType: 'application/json' },
  // Confirm callback: invoked with an error when the broker nacks the publish.
  (err) => err ? log.error(err) : log.debug('confirmed'));

// consume
// Both calls return promises; await them so a failure surfaces here instead of
// becoming an unhandled rejection.
await ch.prefetch(20);
await ch.consume('orders.processor', async (msg) => {
  if (!msg) return; // no delivery to process
  try {
    const data = JSON.parse(msg.content.toString());
    await handle(data);
    ch.ack(msg);
  } catch (e) {
    // Record why the message failed before it leaves the work queue.
    log.error(e);
    ch.nack(msg, false, false); // don't requeue → goes to DLX
  }
}, { noAck: false });
import { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';
// SQS producer: builds the send parameters once, then issues the command.
// `URL`, `payload`, `order` and `idempotencyKey` come from surrounding code.
const sqs = new SQSClient({});
const sendParams = {
  QueueUrl: URL,
  MessageBody: JSON.stringify(payload),
  MessageGroupId: order.id, // FIFO only
  MessageDeduplicationId: idempotencyKey, // FIFO only
  MessageAttributes: {
    type: { StringValue: 'OrderCreated', DataType: 'String' },
  },
};
await sqs.send(new SendMessageCommand(sendParams));
// SQS consumer: long-polls while `running` is truthy, handles each message and
// deletes it only after `handle` succeeded.
while (running) {
  const receiveCommand = new ReceiveMessageCommand({
    QueueUrl: URL,
    WaitTimeSeconds: 20, // long poll
    MaxNumberOfMessages: 10,
    VisibilityTimeout: 60,
  });
  const { Messages: messages } = await sqs.send(receiveCommand);

  // A response without messages yields an empty batch.
  for (const message of messages ?? []) {
    try {
      const body = JSON.parse(message.Body!);
      await handle(body);
      const deleteCommand = new DeleteMessageCommand({
        QueueUrl: URL,
        ReceiptHandle: message.ReceiptHandle!,
      });
      await sqs.send(deleteCommand);
    } catch (err) {
      // do nothing — message reappears after VisibilityTimeout
      log.error(err);
    }
  }
}
# Main work queue. Its redrive policy targets the ARN of the `dlq` queue
# resource with a maxReceiveCount of 5.
resource "aws_sqs_queue" "main" {
  name = "orders"

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dlq.arn
    maxReceiveCount     = 5
  })
}
# Dead-letter queue referenced by the redrive policy of the main queue.
resource "aws_sqs_queue" "dlq" {
  name = "orders-dlq"
}
// produce
// Appends one entry to the `orders` stream; '*' lets Redis assign the entry ID.
await redis.xadd('orders', '*', 'id', order.id, 'payload', JSON.stringify(order));
// create group once
// Creation is idempotent by design: the only error that is safe to ignore is
// the BUSYGROUP reply Redis sends when the group already exists. Anything else
// (connection failure, wrong key type, ...) is rethrown instead of swallowed.
await redis.xgroup('CREATE', 'orders', 'workers', '$', 'MKSTREAM').catch((err) => {
  if (!String(err?.message).includes('BUSYGROUP')) throw err;
});
// consume in a loop
// Reads up to 10 new entries for this worker (blocking up to 5000 ms per call),
// handles each one and acknowledges it only when `handle` succeeded.
while (running) {
  const streams = await redis.xreadgroup(
    'GROUP', 'workers', `worker-${id}`,
    'COUNT', 10, 'BLOCK', 5000,
    'STREAMS', 'orders', '>',
  );

  // A falsy reply means nothing arrived; go around again.
  if (streams) {
    for (const [, entries] of streams) {
      for (const [streamId, fields] of entries) {
        // `fields` is a flat [key, value, key, value, ...] list; pair it up.
        const pairs = fields.reduce((collected, key, index) => {
          if (index % 2 === 0) collected.push([key, fields[index + 1]]);
          return collected;
        }, []);
        const entry = Object.fromEntries(pairs);

        try {
          const order = JSON.parse(entry.payload);
          await handle(order);
          await redis.xack('orders', 'workers', streamId);
        } catch (err) {
          // Not acknowledged on failure, so the entry stays pending.
          log.error(err);
        }
      }
    }
  }
}
// claim stuck messages periodically
// Lists up to 100 pending entries of the `workers` group and claims, for THIS
// worker, those that have been idle for more than 60 seconds.
const idle = await redis.xpending('orders', 'workers', '-', '+', 100);
// The loop variable must not be called `id`: that would shadow the worker's own
// `id` and claim entries for a consumer named after the entry ID.
for (const [entryId, , idleMs] of idle) {
  if (idleMs > 60_000) await redis.xclaim('orders', 'workers', `worker-${id}`, 60_000, entryId);
}
// NOTE(review): the entries returned by xclaim are discarded here, and a read
// with the '>' ID only delivers never-delivered entries — confirm that claimed
// entries are processed (and acked) somewhere.
import { Queue, Worker } from 'bullmq';
// BullMQ producer: enqueue a `welcome` job on the `emails` queue.
const q = new Queue('emails', { connection: redis });

// Retry and retention settings passed along with the job.
const welcomeJobOptions = {
  attempts: 5,
  backoff: { type: 'exponential', delay: 1000 },
  removeOnComplete: 1000,
  removeOnFail: 5000,
};

await q.add('welcome', { userId: 1 }, welcomeJobOptions);
// BullMQ consumer for the `emails` queue: each job's data is passed to
// `sendEmail`, with a worker concurrency setting of 10.
const processEmailJob = async (job) => {
  await sendEmail(job.data);
};

new Worker('emails', processEmailJob, { connection: redis, concurrency: 10 });
/**
 * Transactional-outbox relay. Loops forever: selects up to 100 unpublished
 * `outbox` rows, publishes each one to the broker with the row id as
 * `messageId`, then stamps `published_at`. Sleeps 500 ms when nothing is due.
 *
 * Every batch runs inside one `db.tx` transaction. `FOR UPDATE SKIP LOCKED`
 * only keeps other relays away while the locking transaction is open; issued
 * as a stand-alone statement the locks are gone before the first publish, so
 * concurrent relays would pick up and publish the same rows.
 *
 * Delivery remains at-least-once: a row can be published and the transaction
 * still fail before it is marked, so consumers must de-duplicate on messageId.
 *
 * Errors from the queries or from `broker.publish` are not caught here.
 *
 * @returns {Promise<void>} does not resolve under normal operation.
 */
async function relay() {
  while (true) {
    // Captured outside the callback so no assumption is made about what
    // `db.tx` itself resolves to.
    let batchSize = 0;

    await db.tx(async (t) => {
      const rows = await t.query(`
SELECT id, type, payload FROM outbox
WHERE published_at IS NULL
ORDER BY created_at LIMIT 100
FOR UPDATE SKIP LOCKED`);
      batchSize = rows.length;
      if (!batchSize) return;

      for (const r of rows) {
        await broker.publish(r.type, r.payload, { messageId: r.id });
        await t.query(`UPDATE outbox SET published_at = now() WHERE id = $1`, [r.id]);
      }
    });

    // Back off only when the outbox was empty; otherwise poll again at once.
    if (!batchSize) await sleep(500);
  }
}
/**
 * Idempotent consumer (inbox pattern). Inside one transaction it records
 * `msg.id` in the `inbox` table and applies the message's effect only when
 * that insert actually created a row; a conflicting insert means the message
 * was already handled and it is skipped.
 *
 * @param {{ id: *, payload: * }} msg - message carrying an id and a payload.
 * @returns {Promise<void>}
 */
async function consume(msg) {
  await db.tx(async (t) => {
    const result = await t.query(
      `INSERT INTO inbox(message_id) VALUES ($1) ON CONFLICT DO NOTHING RETURNING 1`,
      [msg.id]);

    // Other code in this file treats a query result as an array of rows, while
    // this check originally read `.rowCount`. Accept both shapes: with an array
    // result `.rowCount` is undefined and every message would look like a
    // duplicate.
    const insertedCount = Array.isArray(result) ? result.length : result.rowCount;
    if (!insertedCount) return; // duplicate

    await applyEffect(t, msg.payload);
  });
}
| Metric | Why |
| --- | --- |
| backlog / queue depth | growing = need more consumers |
| oldest message age | correlates with user-visible lag |
| consumer concurrency | matches workload |
| DLQ depth | always alert on > 0 |
| processing latency p95/p99 | per-message cost |
| ack/nack rates | consumer health |
| publish error rate | producer health |
  • CloudAMQP / RabbitMQ Console — RMQ ops.
  • AWS Console / SQS metrics — SQS.
  • RedisInsight — Redis Streams.
  • MQ Explorer / k6 — load testing queues.
  • Conduktor / Redpanda Console — Kafka.
  • Pact / AsyncAPI — define event contracts.