Skip to content

Kafka — Practical

import { Kafka, CompressionTypes } from 'kafkajs';
// Client shared by the producer below; two bootstrap brokers.
const kafka = new Kafka({
  clientId: 'app',
  brokers: ['k1:9092', 'k2:9092'],
});

// Idempotent producer (enable.idempotence); topics must already exist.
// NOTE(review): KafkaJS may limit maxInFlightRequests to 1 when idempotent is on — confirm for your version.
const producer = kafka.producer({
  allowAutoTopicCreation: false,
  idempotent: true, // enable.idempotence
  maxInFlightRequests: 5,
});
await producer.connect();

// Keyed by userId: every event of one user goes to the same partition, preserving per-user order.
// NOTE(review): KafkaJS keys/header values must be string or Buffer — assumes userId and id are strings.
const orderCreated = {
  key: order.userId,
  value: JSON.stringify(order),
  headers: { 'event-id': order.id, 'event-type': 'OrderCreated' },
};
await producer.send({
  topic: 'orders',
  acks: -1, // -1 = all in-sync replicas must acknowledge
  compression: CompressionTypes.Snappy,
  messages: [orderCreated],
});

Consumer with manual commit (at-least-once)

Section titled “Consumer with manual commit (at-least-once)”
// Consumer for group 'orders-processor'. Offsets are resolved one message at a time
// (eachBatchAutoResolve: false), after processMessage finishes: at-least-once delivery.
const consumer = kafka.consumer({
  groupId: 'orders-processor',
  heartbeatInterval: 3000,
  sessionTimeout: 30000,
  maxWaitTimeInMs: 500,
});
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: false });
await consumer.run({
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    // Keep going only while the consumer runs and the batch is not stale.
    const keepGoing = () => isRunning() && !isStale();
    for (const current of batch.messages) {
      if (!keepGoing()) break;
      await processMessage(current); // must be idempotent: a message may be delivered again
      resolveOffset(current.offset);
      await heartbeat();
    }
  },
});
/**
 * Applies one order event at most once per `event-id`.
 *
 * The event id is inserted into `processed_events` in the same DB transaction as the
 * business write. A redelivered message therefore hits `ON CONFLICT DO NOTHING`, returns no
 * row, and is skipped.
 *
 * @throws Error if the message has no `event-id` header or no value (e.g. a tombstone).
 *   These are raised explicitly instead of an opaque TypeError from a non-null assertion.
 */
async function processMessage(msg: KafkaMessage) {
  const eventIdHeader = msg.headers?.['event-id'];
  if (eventIdHeader == null) {
    throw new Error(`message at offset ${msg.offset} has no 'event-id' header`);
  }
  // Captured in a const so the null check below stays in effect inside the tx callback.
  const value = msg.value;
  if (value == null) {
    throw new Error(`message at offset ${msg.offset} has no value`);
  }
  const eventId = eventIdHeader.toString();
  await db.tx(async t => {
    const ok = await t.query(
      `INSERT INTO processed_events (id) VALUES ($1) ON CONFLICT DO NOTHING RETURNING id`,
      [eventId]);
    if (!ok.rowCount) return; // duplicate, skip
    await applyOrder(t, JSON.parse(value.toString()));
  });
}
orders
→ orders-retry (a retry consumer waits ~30 s, then re-publishes to orders) [up to N attempts]
→ orders-dlq (manual inspection / replay)
/**
 * Processes one message and, on failure, re-publishes it for another attempt.
 *
 * The attempt counter travels in the `retry-count` header. Once it exceeds `MAX`, the
 * message goes to `orders-dlq`; otherwise it goes to `orders-retry`. The failure text is
 * kept in the `last-error` header. Key, value and all existing headers are carried over.
 */
async function handle(msg: KafkaMessage) {
  try {
    // Not `process(msg)`: in Node that name is the global process object, which is not callable.
    await processMessage(msg);
  } catch (e: unknown) {
    const previousAttempts = Number(msg.headers?.['retry-count']?.toString() ?? 0);
    // A corrupt header would give NaN, and NaN > MAX is always false, so the message
    // would retry forever. Treat a corrupt header as 0.
    const attempts = (Number.isFinite(previousAttempts) ? previousAttempts : 0) + 1;
    const target = attempts > MAX ? 'orders-dlq' : 'orders-retry';
    // Catch variables are `unknown` under strict mode; non-Error throws are stringified.
    const lastError = e instanceof Error ? e.message : String(e);
    await producer.send({
      topic: target,
      messages: [{
        key: msg.key, value: msg.value,
        headers: { ...msg.headers, 'retry-count': String(attempts), 'last-error': lastError },
      }],
    });
  }
}
Terminal window
# Create the main 'orders' topic: 12 partitions, 3 replicas, acks=all writes need
# 2 in-sync replicas, 7-day retention (604800000 ms), broker keeps the producer's codec.
kafka-topics --create \
  --topic orders \
  --bootstrap-server k1:9092 \
  --replication-factor 3 \
  --partitions 12 \
  --config compression.type=producer \
  --config retention.ms=604800000 \
  --config min.insync.replicas=2
Terminal window
# Compacted topic: the log keeps at least the latest record per key.
# kafka-topics needs --bootstrap-server (the --zookeeper option was removed in Kafka 3.0).
kafka-topics --create --topic user-state \
  --bootstrap-server k1:9092 \
  --partitions 6 --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.1

Schema-aware producer (Avro + Schema Registry)

Section titled “Schema-aware producer (Avro + Schema Registry)”
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
// Encode `payload` with the newest schema registered under subject 'orders-value'
// and publish the encoded bytes to 'orders'.
const sr = new SchemaRegistry({ host: 'http://sr:8081' });
const id = await sr.getLatestSchemaId('orders-value');
const encoded = await sr.encode(id, payload);
await producer.send({
  topic: 'orders',
  messages: [{ value: encoded }],
});
Terminal window
# Lag per partition for consumer group g
kafka-consumer-groups --describe --group g --bootstrap-server k:9092
# Partitions whose in-sync replica set is smaller than the full replica set
kafka-topics --bootstrap-server k:9092 --describe --under-replicated-partitions
# Configuration of topic t
kafka-configs --bootstrap-server k:9092 --describe --entity-type topics --entity-name t

Lag monitoring (Prometheus + Kafka Exporter)

Section titled “Lag monitoring (Prometheus + Kafka Exporter)”

Critical metrics:

  • kafka_consumergroup_lag per group/topic/partition
  • kafka_topic_partition_under_replicated_partition
  • kafka_topic_partitions
Terminal window
# Rewind group g on topic t to the earliest retained offset.
# NOTE: resets are only applied while the group has no active members.
kafka-consumer-groups --bootstrap-server k:9092 --group g --topic t \
  --reset-offsets --to-earliest --execute
# Move group g on topic t to the offsets at a given timestamp
kafka-consumer-groups --bootstrap-server k:9092 --group g --topic t \
  --reset-offsets --to-datetime 2026-05-01T00:00:00.000 --execute
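| Setting | Effect |
| --- | --- |
| `acks=all` | wait for all in-sync replicas (ISR) to acknowledge — durable |
| `enable.idempotence=true` | broker de-duplicates producer retries |
| `linger.ms=10` | small wait before sending → better batching |
| `batch.size=65536` | maximum batch size per partition, in bytes |
| `compression.type=snappy` | producer-side compression |
| `max.in.flight.requests.per.connection=5` | the maximum allowed with idempotence; ordering is still preserved |
| `auto.offset.reset=earliest/latest/none` | where a consumer starts reading when its group has no committed offset |
| `enable.auto.commit=false` | offsets are committed manually |
| `max.poll.records=500` | maximum records returned by one poll (tunes batch size) |
| `max.poll.interval.ms=300000` | maximum time between polls before the consumer is removed from the group |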
  • kcat (formerly kafkacat) — Swiss army knife.
  • Conduktor / Redpanda Console / Kafdrop — UI.
  • Cruise Control — automated rebalancing.
  • Burrow — lag monitor.
  • Debezium — CDC connector.
  • Strimzi — Kafka on Kubernetes operator.
  • Sending huge messages (>1MB) — split or store payload externally + send reference.
  • Single-key hot partition — all traffic to one partition.
  • Skipping schema registry — producers/consumers diverge silently.
  • Auto-creating topics in prod.

import { Kafka } from 'kafkajs'
// Minimal round trip: one idempotent producer and one consumer in group 'orders-billing-v1'.
const kafka = new Kafka({
  clientId: 'orders-svc',
  brokers: ['b1:9092', 'b2:9092', 'b3:9092'],
})
const producer = kafka.producer({ idempotent: true })
const consumer = kafka.consumer({ groupId: 'orders-billing-v1' })

await producer.connect()
// The key 'user-42' pins this record (and all later ones for that user) to one partition.
const sample = { id: 1, total: 99 }
await producer.send({
  topic: 'orders',
  messages: [{ key: 'user-42', value: JSON.stringify(sample) }],
})

await consumer.connect()
await consumer.subscribe({ topic: 'orders', fromBeginning: false })
await consumer.run({
  eachMessage: async ({ partition, message }) => {
    const key = message.key?.toString()
    console.log({ p: partition, off: message.offset, key })
  },
})
/**
 * KafkaJS partitioner factory that places records by their `tenant-id` header rather than
 * by message key, so every record of one tenant lands on the same partition. Records
 * without the header are treated as tenant 'default'.
 *
 * Hash: 32-bit FNV-1a over the tenant id's UTF-8 bytes, read as an unsigned integer and
 * reduced modulo the partition count. Production: use murmur2 to match Java clients.
 *
 * @throws Error if the topic metadata lists no partitions (the modulo would yield NaN).
 */
const tenantPartitioner = () => {
  // One encoder per partitioner instance, reused for every record.
  const encoder = new TextEncoder()
  return ({ partitionMetadata, message }: {
    partitionMetadata: readonly unknown[]
    message: { headers?: Record<string, { toString(): string } | undefined> }
  }): number => {
    const numPartitions = partitionMetadata.length
    if (numPartitions === 0) throw new Error('tenantPartitioner: topic has no partitions')
    const tenantId = message.headers?.['tenant-id']?.toString() ?? 'default'
    let h = 0x811c9dc5 // FNV-1a 32-bit offset basis (2166136261)
    // FNV-1a is defined over bytes. Hashing only the first UTF-16 unit of each character
    // would drop low surrogates and make distinct emoji collide.
    for (const byte of encoder.encode(tenantId)) {
      h ^= byte
      h = Math.imul(h, 0x01000193) // FNV prime 16777619, wrapping 32-bit multiply
    }
    // `>>> 0` reads the signed 32-bit result as unsigned. Math.abs would map h and -h to the
    // same value, so the partition would not follow the real FNV-1a hash.
    return (h >>> 0) % numPartitions
  }
}
// Producer whose partition choice is delegated to tenantPartitioner ('tenant-id' header).
const producer = kafka.producer({
  createPartitioner: tenantPartitioner,
})

Deep — static membership + cooperative rebalance

Section titled “Deep — static membership + cooperative rebalance”
// Static membership: each pod uses a stable groupInstanceId derived from POD_NAME.
// NOTE(review): confirm the client library version honours groupInstanceId (and configure a
// cooperative assignor if one is wanted); otherwise membership/rebalancing stays dynamic/eager.
const invoicesPodName = process.env.POD_NAME
if (!invoicesPodName) {
  // Without it every pod would share the id 'invoices-undefined' and fence one another.
  throw new Error('POD_NAME must be set to derive a unique groupInstanceId')
}
const consumer = kafka.consumer({
  groupId: 'invoices-v2',
  groupInstanceId: `invoices-${invoicesPodName}`, // static membership
  sessionTimeout: 45000,
  heartbeatInterval: 3000,
  rebalanceTimeout: 60000,
})
await consumer.run({
  partitionsConsumedConcurrently: 3,
  eachMessage: async ({ message, heartbeat }) => {
    await handle(message)
    // Heartbeats between messages. A single handle() call that runs longer than
    // sessionTimeout must also heartbeat from inside, or the member gets evicted.
    await heartbeat()
  },
})

Deep — exactly-once consume-process-produce

Section titled “Deep — exactly-once consume-process-produce”
// Exactly-once consume → process → produce. The enriched records and the consumed offsets
// are committed in one Kafka transaction, so either both become visible or neither does.
const enricherPodName = process.env.POD_NAME
if (!enricherPodName) {
  // A shared 'enricher-undefined' transactionalId would make pods fence each other.
  throw new Error('POD_NAME must be set to derive a unique transactionalId')
}
const producer = kafka.producer({
  idempotent: true, // KafkaJS: requires acks=-1; retries default to MAX_SAFE_INTEGER
  maxInFlightRequests: 1, // required for KafkaJS EoS ordering
  transactionalId: `enricher-${enricherPodName}`,
})
await producer.connect()
const consumer = kafka.consumer({ groupId: 'enricher' })
await consumer.connect()
await consumer.subscribe({ topic: 'orders.raw' })
await consumer.run({
  // Offsets must be committed only via tx.sendOffsets. With auto-commit on, KafkaJS would also
  // commit resolved offsets outside the transaction (even after an error), skipping aborted work.
  autoCommit: false,
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    const tx = await producer.transaction()
    try {
      for (const m of batch.messages) {
        const enriched = await enrich(m.value)
        await tx.send({ topic: 'orders.enriched', messages: [{ key: m.key, value: enriched }] })
        await heartbeat()
      }
      await tx.sendOffsets({
        consumerGroupId: 'enricher',
        // Committed offset = next offset to read, hence lastOffset + 1.
        topics: [{ topic: batch.topic, partitions: [{ partition: batch.partition, offset: (BigInt(batch.lastOffset()) + 1n).toString() }] }],
      })
      await tx.commit()
    } catch (e) {
      await tx.abort()
      throw e
    }
    // Advance the consumer's position only once the transaction has committed. On abort
    // the batch stays unresolved, so it is processed again.
    resolveOffset(batch.lastOffset())
  },
})
/** Retry tiers, tried in order; a message that fails after the last tier goes to DLQ. */
const RETRIES = [
  { topic: 'orders.retry.5s', delayMs: 5 * 1_000 },
  { topic: 'orders.retry.30s', delayMs: 30 * 1_000 },
  { topic: 'orders.retry.5m', delayMs: 5 * 60 * 1_000 },
]
/** Dead-letter topic for messages that exhausted every retry tier. */
const DLQ = 'orders.DLQ'
/**
 * Runs the business logic for one message, skipping event ids already recorded in `dedupe`.
 * On failure it forwards the message to the next retry tier, or to the DLQ once every tier
 * in RETRIES has been used.
 *
 * @param message    consumed message; must carry an `event-id` header
 * @param retryIndex index into RETRIES of the tier this message came from (-1 = main topic)
 * @throws Error if the message has no headers / no `event-id` header
 */
async function processWithRetry(message, retryIndex = -1) {
  const eventIdHeader = message.headers?.['event-id']
  if (eventIdHeader == null) throw new Error("message has no 'event-id' header")
  const eventId = eventIdHeader.toString()
  if (await dedupe.exists(eventId)) return // idempotent consumer
  try {
    await businessLogic(JSON.parse(message.value))
    await dedupe.set(eventId, Date.now(), { ttl: '7d' })
  } catch (err) {
    const next = RETRIES[retryIndex + 1]
    const target = next?.topic ?? DLQ
    await producer.send({ topic: target, messages: [{
      key: message.key, value: message.value,
      headers: { ...message.headers,
        'x-original-topic': 'orders',
        // Catch variables are `unknown`; a non-Error throw would otherwise give `undefined`.
        'x-error': err instanceof Error ? err.message : String(err),
        'x-attempt': String(retryIndex + 2),
        // Earliest reprocessing time; DLQ entries get "now" (delay 0).
        'x-retry-after': String(Date.now() + (next?.delayMs ?? 0)) },
    }]})
  }
}
/**
 * eachMessage handler for the retry-tier topics. It waits until the message's
 * `x-retry-after` time, then reprocesses it via processWithRetry.
 *
 * The tier index comes from the `x-attempt` header written by processWithRetry: attempt N
 * was forwarded to RETRIES[N - 1]. A further failure therefore escalates to the next tier,
 * or to the DLQ. A missing or corrupt header yields NaN, which routes the next failure to
 * the DLQ.
 *
 * While waiting, it sleeps in short slices and heartbeats. A single silent 30 s / 5 min sleep
 * outlives typical session timeouts (KafkaJS default 30 s), causing eviction and redelivery.
 * NOTE(review): for long delays, pausing the partition may be lighter than blocking here.
 */
async function retryHandler({ message, heartbeat }) {
  const ready = Number(message.headers['x-retry-after']?.toString())
  const currentRetryIndex = Number(message.headers['x-attempt']?.toString()) - 1
  let wait = ready - Date.now()
  while (wait > 0) {
    await new Promise(r => setTimeout(r, Math.min(wait, 3_000)))
    if (heartbeat) await heartbeat()
    wait = ready - Date.now()
  }
  await processWithRetry(message, currentRetryIndex)
}

Deep — Debezium Postgres connector config

Section titled “Deep — Debezium Postgres connector config”
{
  "name": "orders-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "topic.prefix": "orders",
    "database.hostname": "pg-primary",
    "database.port": "5432",
    "database.dbname": "orders",
    "database.user": "debezium",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_orders",
    "publication.name": "dbz_orders",
    "table.include.list": "public.orders,public.outbox",
    "snapshot.mode": "initial",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.topic.replacement": "${routedByValue}"
  }
}
// v1
{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "total", "type": "double" }
  ]
}
// v2 — BACKWARD compatible: a v2 reader fills "currency" with its default ("USD") when reading v1 data
{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "total", "type": "double" },
    { "name": "currency", "type": "string", "default": "USD" }
  ]
}
// Compacted state topic: 12 partitions, 3 replicas. acks=all writes need 2 in-sync replicas,
// and an out-of-sync replica may not be elected leader (unclean election disabled).
await admin.createTopics({
  topics: [
    {
      topic: 'orders.state',
      numPartitions: 12,
      replicationFactor: 3,
      configEntries: [
        { name: 'cleanup.policy', value: 'compact' },
        // Roll a new segment at least weekly; compaction never touches the active segment.
        { name: 'segment.ms', value: '604800000' },
        { name: 'min.insync.replicas', value: '2' },
        { name: 'unclean.leader.election.enable', value: 'false' },
      ],
    },
  ],
})
// Producer matched to durability target. KafkaJS idempotence requires acks=-1, which is
// producer.send()'s default `acks`; `idempotent` itself does not set it.
const producer = kafka.producer({ idempotent: true })
// Kafka Streams topology: count orders per customer and publish every updated count.
StreamsBuilder b = new StreamsBuilder();
// NOTE(review): no Consumed.with(...), so the configured default serdes must handle
// String keys and Order values — confirm.
KStream<String, Order> orders = b.stream("orders");
// Re-key each order by customerId (changing the key makes Streams repartition), then keep a
// running count per customer in the state store "orders-by-customer".
// NOTE(review): no Grouped.with(...) either — the repartition topic also uses default serdes.
KTable<String, Long> countsPerCustomer = orders
.groupBy((k, o) -> o.customerId())
.count(Materialized.as("orders-by-customer"));
// Emit each count update to "customer-order-counts" with explicit String/Long serdes.
countsPerCustomer.toStream().to("customer-order-counts", Produced.with(Serdes.String(), Serdes.Long()));
  • Treating Kafka as a request/response system.