Kafka — Practical patterns
Producer (Node, kafkajs)

```ts
import { Kafka, CompressionTypes } from 'kafkajs';

const kafka = new Kafka({ clientId: 'app', brokers: ['k1:9092', 'k2:9092'] });
const producer = kafka.producer({
  idempotent: true, // enable.idempotence
  maxInFlightRequests: 5,
  allowAutoTopicCreation: false,
});
await producer.connect();

await producer.send({
  topic: 'orders',
  acks: -1, // -1 = all
  // KafkaJS bundles only GZIP; Snappy needs a codec registered in
  // CompressionCodecs (for example from the kafkajs-snappy package).
  compression: CompressionTypes.Snappy,
  messages: [{
    key: order.userId, // partition by user to preserve per-user order
    value: JSON.stringify(order),
    headers: { 'event-id': order.id, 'event-type': 'OrderCreated' },
  }],
});
```

Consumer with manual commit (at-least-once)
```ts
const consumer = kafka.consumer({
  groupId: 'orders-processor',
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
  maxWaitTimeInMs: 500,
});
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: false });

await consumer.run({
  // Offsets are resolved by hand below. KafkaJS still commits resolved offsets
  // automatically; set autoCommit: false and call consumer.commitOffsets()
  // if you need fully manual commits.
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const msg of batch.messages) {
      if (!isRunning() || isStale()) break;
      await processMessage(msg); // must be idempotent
      resolveOffset(msg.offset); // mark as processed only after success
      await heartbeat();
    }
  },
});
```

Idempotent processing (DB-level dedupe)
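At-least-once delivery means a consumer that crashes after processing a message but before its offset is committed will receive that message again on restart, which is why the handler in this section deduplicates by event ID. The following is a minimal in-memory simulation of that replay. `runConsumer` is a hypothetical helper, the `Set` stands in for the `processed_events` table, and offsets are assumed to equal array indexes.

```typescript
type Msg = { offset: number; eventId: string };

// Hypothetical stand-in for one consumer session. It starts at the committed
// offset, applies each message, and commits only after a message is applied.
// `crashAfter` simulates dying after applying that offset but before committing it.
function runConsumer(
  log: Msg[],
  committed: number,
  seen: Set<string>,
  applied: string[],
  crashAfter?: number,
): number {
  for (const msg of log.slice(committed)) {
    if (!seen.has(msg.eventId)) {
      // Dedupe step, playing the role of INSERT ... ON CONFLICT DO NOTHING
      seen.add(msg.eventId);
      applied.push(msg.eventId);
    }
    if (msg.offset === crashAfter) return committed; // crash: this commit is lost
    committed = msg.offset + 1; // committed offset = next offset to read
  }
  return committed;
}

const log: Msg[] = [0, 1, 2, 3].map((o) => ({ offset: o, eventId: `evt-${o}` }));
const seen = new Set<string>();
const applied: string[] = [];

// First session crashes after applying offset 2, so only offsets 0 and 1 are committed.
const afterCrash = runConsumer(log, 0, seen, applied, 2);
// Second session re-reads offset 2, skips it as a duplicate, then applies offset 3.
const afterRestart = runConsumer(log, afterCrash, seen, applied);
```

Without the `seen` check, `evt-2` would be applied twice; with it, each event is applied once even though it is delivered twice.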
```ts
import type { KafkaMessage } from 'kafkajs';

async function processMessage(msg: KafkaMessage) {
  const eventId = msg.headers!['event-id']!.toString();
  await db.tx(async t => {
    // Insert the event ID first; a conflict means the event was already handled
    const ok = await t.query(
      `INSERT INTO processed_events (id) VALUES ($1)
       ON CONFLICT DO NOTHING RETURNING id`,
      [eventId]);
    if (!ok.rowCount) return; // duplicate, skip
    await applyOrder(t, JSON.parse(msg.value!.toString()));
  });
}
```

Retry + DLQ topology
```text
orders → orders-retry (TTL 30s, then back to orders)   [up to N attempts]
       → orders-dlq   (manual inspection / replay)
```

```ts
// MAX is the maximum number of attempts, defined elsewhere.
async function handle(msg: KafkaMessage) {
  try {
    await processMessage(msg);
  } catch (e) {
    // Header values arrive as Buffers, so convert before parsing
    const attempts = Number(msg.headers?.['retry-count']?.toString() ?? 0) + 1;
    const target = attempts > MAX ? 'orders-dlq' : 'orders-retry';
    await producer.send({
      topic: target,
      messages: [{
        key: msg.key,
        value: msg.value,
        headers: {
          ...msg.headers,
          'retry-count': String(attempts),
          'last-error': (e as Error).message,
        },
      }],
    });
  }
}
```

Topic creation with sensible defaults
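The settings in this section encode two pieces of arithmetic. With `acks=all`, a partition keeps accepting writes as long as at least `min.insync.replicas` replicas are in sync, so the number of replicas that can be lost is the replication factor minus that value. `retention.ms` is simply a duration in milliseconds. A small sketch that makes both explicit; the helper names are illustrative and not part of any Kafka tooling.

```typescript
// Replicas that can be unavailable while acks=all writes still succeed.
function tolerableReplicaLoss(replicationFactor: number, minInsyncReplicas: number): number {
  if (minInsyncReplicas > replicationFactor) {
    throw new Error('min.insync.replicas cannot exceed the replication factor');
  }
  return replicationFactor - minInsyncReplicas;
}

// Convert a retention.ms value to days.
function retentionDays(retentionMs: number): number {
  const msPerDay = 24 * 60 * 60 * 1000;
  return retentionMs / msPerDay;
}

// Values used for the orders topic in this section
const failures = tolerableReplicaLoss(3, 2);
const days = retentionDays(604800000);
```

For the `orders` topic this gives one tolerable replica loss and seven days of retention.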
```bash
kafka-topics --create \
  --bootstrap-server k1:9092 \
  --topic orders \
  --partitions 12 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config retention.ms=604800000 \
  --config compression.type=producer
```

Compacted topic (changelog)
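With `cleanup.policy=compact`, Kafka eventually retains at least the latest record for each key, and a record with a null value (a tombstone) marks the key for deletion. The sketch below models the state a consumer would rebuild from such a topic. It illustrates the semantics only and says nothing about how the log cleaner works internally.

```typescript
type LogRecord = { key: string; value: string | null };

// Replay a changelog into its latest-value-per-key state.
function compact(log: LogRecord[]): Map<string, string> {
  const state = new Map<string, string>();
  for (const { key, value } of log) {
    if (value === null) {
      state.delete(key); // tombstone: the key is removed
    } else {
      state.set(key, value); // later records overwrite earlier ones
    }
  }
  return state;
}

const userState = compact([
  { key: 'user-1', value: '{"plan":"free"}' },
  { key: 'user-2', value: '{"plan":"pro"}' },
  { key: 'user-1', value: '{"plan":"pro"}' }, // supersedes the first record
  { key: 'user-2', value: null },             // deletes user-2
]);
```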
```bash
kafka-topics --create --topic user-state \
  --bootstrap-server k1:9092 \
  --partitions 6 --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.1
```

Schema-aware producer (Avro + Schema Registry)
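Messages encoded through Confluent Schema Registry serializers carry a 5-byte prefix: a magic byte `0`, then the schema ID as a 4-byte big-endian integer, followed by the Avro payload. A minimal sketch of reading that prefix, which can help when inspecting raw bytes; `readSchemaId` is a hypothetical helper, not part of the registry client.

```typescript
// Split a Confluent-framed message into its schema ID and Avro payload.
function readSchemaId(message: Uint8Array): { schemaId: number; payload: Uint8Array } {
  if (message.length < 5 || message[0] !== 0) {
    throw new Error('not in Confluent wire format');
  }
  const view = new DataView(message.buffer, message.byteOffset, message.byteLength);
  return {
    schemaId: view.getInt32(1, false), // bytes 1..4, big-endian
    payload: message.subarray(5),      // remaining bytes are the Avro body
  };
}

// Magic byte 0, schema ID 42, then two payload bytes
const sample = new Uint8Array([0, 0, 0, 0, 42, 0x02, 0x61]);
const decoded = readSchemaId(sample);
```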
```ts
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';

const sr = new SchemaRegistry({ host: 'http://sr:8081' });
const id = await sr.getLatestSchemaId('orders-value'); // subject name
const encoded = await sr.encode(id, payload);
await producer.send({ topic: 'orders', messages: [{ value: encoded }] });
```

Monitoring queries
```bash
# group lag
kafka-consumer-groups --bootstrap-server k:9092 --describe --group g

# under-replicated partitions
kafka-topics --describe --bootstrap-server k:9092 --under-replicated-partitions

# topic config
kafka-configs --describe --bootstrap-server k:9092 --entity-type topics --entity-name t
```

Lag monitoring (Prometheus + Kafka Exporter)
Critical metrics:

- `kafka_consumergroup_lag` per group/topic/partition
- `kafka_topic_partition_under_replicated_partition`
- `kafka_topic_partitions`
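Consumer lag for a partition is the log end offset minus the group's committed offset, which is the quantity `kafka_consumergroup_lag` reports per group, topic, and partition. A minimal sketch of the calculation, assuming the offsets have already been fetched into plain objects. The shapes are illustrative, and plain numbers are used for brevity even though real offsets are 64-bit.

```typescript
type PartitionOffsets = { partition: number; endOffset: number; committedOffset: number };

// Lag per partition: messages written but not yet committed by the group.
function lagByPartition(offsets: PartitionOffsets[]): Map<number, number> {
  const lag = new Map<number, number>();
  for (const o of offsets) {
    lag.set(o.partition, Math.max(0, o.endOffset - o.committedOffset));
  }
  return lag;
}

const groupLag = lagByPartition([
  { partition: 0, endOffset: 1500, committedOffset: 1500 }, // caught up
  { partition: 1, endOffset: 2300, committedOffset: 2000 }, // 300 behind
]);
const totalLag = Array.from(groupLag.values()).reduce((sum, n) => sum + n, 0);
```

Alerting on the per-partition values rather than the total makes a single stuck partition visible.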
Reset consumer group offsets
```bash
# The group must have no active members while offsets are reset.

# move to start
kafka-consumer-groups --bootstrap-server k:9092 \
  --group g --topic t --reset-offsets --to-earliest --execute

# move to specific timestamp
kafka-consumer-groups --bootstrap-server k:9092 \
  --group g --topic t --reset-offsets \
  --to-datetime 2026-05-01T00:00:00.000 --execute
```

Producer/consumer config cheatsheet
| Setting | Effect |
|---|---|
| `acks=all` | wait for all in-sync replicas (durable) |
| `enable.idempotence=true` | broker dedupes producer retries |
| `linger.ms=10` | small wait → better batching |
| `batch.size=65536` | maximum batch size per partition, in bytes |
| `compression.type=snappy` | producer-side compression |
| `max.in.flight.requests.per.connection=5` | OK with idempotence |
| `auto.offset.reset=earliest/latest/none` | where a consumer starts when it has no committed offset |
| `enable.auto.commit=false` | manual commits |
| `max.poll.records=500` | maximum records returned per poll |
| `max.poll.interval.ms=300000` | max time between polls before the consumer is removed from the group |
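Some of the settings above interact. In the Java client, `enable.idempotence=true` requires `acks=all`, `retries` greater than 0, and `max.in.flight.requests.per.connection` of at most 5. A hedged sketch of a pre-flight check over a plain config object follows. `checkIdempotentConfig` is a hypothetical helper rather than a client API, and the fallback values for missing keys are assumptions based on recent Java client defaults.

```typescript
type ProducerConfig = { [key: string]: string | number | boolean | undefined };

// Return a list of settings that conflict with enable.idempotence=true.
function checkIdempotentConfig(cfg: ProducerConfig): string[] {
  const problems: string[] = [];
  if (cfg['enable.idempotence'] !== true) return problems; // nothing to check

  const acks = String(cfg['acks'] ?? 'all'); // assumed default
  if (acks !== 'all' && acks !== '-1') problems.push('acks must be all (-1)');

  const inFlight = Number(cfg['max.in.flight.requests.per.connection'] ?? 5); // assumed default
  if (inFlight > 5) problems.push('max.in.flight.requests.per.connection must be <= 5');

  const retries = Number(cfg['retries'] ?? 2147483647); // assumed default
  if (retries < 1) problems.push('retries must be > 0');

  return problems;
}

const okConfig = checkIdempotentConfig({
  'enable.idempotence': true,
  acks: 'all',
  'max.in.flight.requests.per.connection': 5,
});
const badConfig = checkIdempotentConfig({
  'enable.idempotence': true,
  acks: 1,
  'max.in.flight.requests.per.connection': 10,
});
```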
Useful tools
- kcat (formerly kafkacat) — Swiss army knife.
- Conduktor / Redpanda Console / Kafdrop — UI.
- Cruise Control — automated rebalancing.
- Burrow — lag monitor.
- Debezium — CDC connector.
- Strimzi — Kafka on Kubernetes operator.
Anti-patterns
- Sending huge messages (>1MB) — split or store payload externally + send reference.
- Single-key hot partition — all traffic to one partition.
- Skipping schema registry — producers/consumers diverge silently.
- Auto-creating topics in prod.
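For the oversized-message case in the list above, the usual fix is a claim check: store the payload externally and publish only a reference. A minimal sketch with an in-memory `Map` standing in for an object store. `toKafkaValue` and `fromKafkaValue` are hypothetical helpers, the threshold mirrors the 1 MB figure above, and size is approximated by string length rather than encoded bytes.

```typescript
const MAX_INLINE_CHARS = 1000000; // rough stand-in for a 1 MB limit
const blobStore = new Map<string, string>(); // placeholder for S3, GCS, etc.
let nextBlobId = 0;

type Envelope = { inline: string } | { ref: string };

// Producer side: inline small payloads, park large ones and send a reference.
function toKafkaValue(payload: string): string {
  if (payload.length <= MAX_INLINE_CHARS) {
    return JSON.stringify({ inline: payload });
  }
  const ref = `blob-${nextBlobId++}`;
  blobStore.set(ref, payload);
  return JSON.stringify({ ref });
}

// Consumer side: resolve the reference back to the payload if there is one.
function fromKafkaValue(value: string): string {
  const envelope = JSON.parse(value) as Envelope;
  if ('inline' in envelope) return envelope.inline;
  const payload = blobStore.get(envelope.ref);
  if (payload === undefined) throw new Error(`missing blob ${envelope.ref}`);
  return payload;
}

const smallValue = toKafkaValue('hello');
const bigPayload = 'x'.repeat(MAX_INLINE_CHARS + 1);
const bigValue = toKafkaValue(bigPayload);
```

The Kafka record for the large payload stays a few dozen characters long, while the consumer still recovers the full content.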
Deep — KafkaJS producer + consumer
```ts
import { Kafka } from 'kafkajs'

const kafka = new Kafka({ clientId: 'orders-svc', brokers: ['b1:9092', 'b2:9092', 'b3:9092'] })

const producer = kafka.producer({ idempotent: true })
const consumer = kafka.consumer({ groupId: 'orders-billing-v1' })

await producer.connect()
await producer.send({
  topic: 'orders',
  messages: [{ key: 'user-42', value: JSON.stringify({ id: 1, total: 99 }) }],
})

await consumer.connect()
await consumer.subscribe({ topic: 'orders', fromBeginning: false })
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({ p: partition, off: message.offset, key: message.key?.toString() })
  },
})
```

Deep — custom partitioner
```ts
const tenantPartitioner = () => ({ topic, partitionMetadata, message }) => {
  const tenantId = message.headers?.['tenant-id']?.toString() ?? 'default'
  const numPartitions = partitionMetadata.length
  // simple FNV-1a; production: use murmur2 to match Java clients
  let h = 2166136261
  for (const c of tenantId) {
    h ^= c.charCodeAt(0)
    h = Math.imul(h, 16777619)
  }
  // >>> 0 reinterprets the signed 32-bit hash as unsigned before the modulo
  return (h >>> 0) % numPartitions
}

const producer = kafka.producer({ createPartitioner: tenantPartitioner })
```

Deep — static membership + cooperative rebalance
```ts
const consumer = kafka.consumer({
  groupId: 'invoices-v2',
  // Static membership (KIP-345; group.instance.id in the Java client).
  // groupInstanceId does not appear among the documented KafkaJS consumer
  // options, so verify that your client version supports it.
  groupInstanceId: `invoices-${process.env.POD_NAME}`,
  sessionTimeout: 45000,
  heartbeatInterval: 3000,
  rebalanceTimeout: 60000,
})

await consumer.run({
  partitionsConsumedConcurrently: 3,
  eachMessage: async ({ message, heartbeat }) => {
    await handle(message)
    await heartbeat() // long handlers must heartbeat or get evicted
  },
})
```

Deep — exactly-once consume-process-produce
```ts
const producer = kafka.producer({
  idempotent: true, // requires acks=-1; retries default to effectively unlimited
  maxInFlightRequests: 1, // required for KafkaJS EoS ordering
  transactionalId: `enricher-${process.env.POD_NAME}`,
})
await producer.connect()

const consumer = kafka.consumer({ groupId: 'enricher' })
await consumer.connect()
await consumer.subscribe({ topic: 'orders.raw' })

await consumer.run({
  autoCommit: false, // offsets are committed only through the transaction
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    const tx = await producer.transaction()
    try {
      for (const m of batch.messages) {
        const enriched = await enrich(m.value)
        await tx.send({ topic: 'orders.enriched', messages: [{ key: m.key, value: enriched }] })
        resolveOffset(m.offset)
        await heartbeat()
      }
      // Commit the next offset to read (last processed + 1) atomically with the output
      await tx.sendOffsets({
        consumerGroupId: 'enricher',
        topics: [{
          topic: batch.topic,
          partitions: [{
            partition: batch.partition,
            offset: (BigInt(batch.lastOffset()) + 1n).toString(),
          }],
        }],
      })
      await tx.commit()
    } catch (e) {
      await tx.abort()
      throw e
    }
  },
})
```

Deep — DLQ + retry chain
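The routing rule in this section can be isolated from the Kafka calls: a failure on the main topic moves the message to the first retry tier, a failure on tier `i` moves it to tier `i + 1`, and a failure on the last tier moves it to the DLQ. A sketch of that decision as a pure function, reusing the topic names from this section; `nextHop` is a hypothetical helper.

```typescript
type Tier = { topic: string; delayMs: number };

const TIERS: Tier[] = [
  { topic: 'orders.retry.5s', delayMs: 5000 },
  { topic: 'orders.retry.30s', delayMs: 30000 },
  { topic: 'orders.retry.5m', delayMs: 300000 },
];
const DLQ_TOPIC = 'orders.DLQ';

// retryIndex is -1 for the main topic, otherwise the index of the tier that failed.
function nextHop(retryIndex: number, now: number): { topic: string; retryAfter: number } {
  const next = TIERS[retryIndex + 1];
  if (next === undefined) {
    return { topic: DLQ_TOPIC, retryAfter: now }; // tiers exhausted
  }
  return { topic: next.topic, retryAfter: now + next.delayMs };
}

const firstFailure = nextHop(-1, 1000); // failed on the main topic
const lastFailure = nextHop(2, 1000);   // failed on the final retry tier
```

Keeping the decision pure makes it easy to unit test separately from the producer and consumer wiring.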
Section titled “Deep — DLQ + retry chain”const RETRIES = [{ topic: 'orders.retry.5s', delayMs: 5_000 }, { topic: 'orders.retry.30s', delayMs: 30_000 }, { topic: 'orders.retry.5m', delayMs: 300_000 }]const DLQ = 'orders.DLQ'
async function processWithRetry(message, retryIndex = -1) { const eventId = message.headers['event-id'].toString() if (await dedupe.exists(eventId)) return // idempotent consumer try { await businessLogic(JSON.parse(message.value)) await dedupe.set(eventId, Date.now(), { ttl: '7d' }) } catch (err) { const next = RETRIES[retryIndex + 1] const target = next?.topic ?? DLQ await producer.send({ topic: target, messages: [{ key: message.key, value: message.value, headers: { ...message.headers, 'x-original-topic': 'orders', 'x-error': err.message, 'x-attempt': String(retryIndex + 2), 'x-retry-after': String(Date.now() + (next?.delayMs ?? 0)) }, }]}) }}// Retry consumer waits for delay before processingasync function retryHandler({ message }) { const ready = Number(message.headers['x-retry-after']) const wait = ready - Date.now() if (wait > 0) await new Promise(r => setTimeout(r, wait)) await processWithRetry(message, currentRetryIndex)}Deep — Debezium Postgres connector config
```json
{
  "name": "orders-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "pg-primary",
    "database.port": "5432",
    "database.user": "debezium",
    "database.dbname": "orders",
    "plugin.name": "pgoutput",
    "publication.name": "dbz_orders",
    "slot.name": "debezium_orders",
    "table.include.list": "public.orders,public.outbox",
    "topic.prefix": "orders",
    "snapshot.mode": "initial",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.topic.replacement": "${routedByValue}"
  }
}
```

Deep — Avro schema evolution example
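Backward compatibility in this section relies on one rule of Avro schema resolution: when the reader's schema has a field that the written data lacks, the reader fills it from that field's default. A simplified sketch of that rule over plain objects follows. It is not the Avro library's decoder, it ignores types, unions, and aliases, and `resolveWithDefaults` is a hypothetical name.

```typescript
type Field = { name: string; type: string; default?: unknown };
type Row = { [field: string]: unknown };

// Project written data onto the reader's fields, filling gaps from defaults.
function resolveWithDefaults(written: Row, readerFields: Field[]): Row {
  const out: Row = {};
  for (const f of readerFields) {
    if (f.name in written) {
      out[f.name] = written[f.name];
    } else if ('default' in f) {
      out[f.name] = f.default; // field added after the data was written
    } else {
      throw new Error(`field ${f.name} is missing and has no default`);
    }
  }
  return out;
}

// Reader fields matching the v2 Order schema in this section
const v2Fields: Field[] = [
  { name: 'id', type: 'string' },
  { name: 'total', type: 'double' },
  { name: 'currency', type: 'string', default: 'USD' },
];
const v1Data: Row = { id: 'o-1', total: 99 };
const asV2 = resolveWithDefaults(v1Data, v2Fields);
```

Dropping the default from `currency` would make the same call throw, which is the incompatibility a registry's BACKWARD check is meant to catch before deployment.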
v1:

```json
{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "total", "type": "double" }
  ]
}
```

v2, BACKWARD compatible (a consumer using v2 reads v1 data by applying the default):

```json
{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "total", "type": "double" },
    { "name": "currency", "type": "string", "default": "USD" }
  ]
}
```

Deep — topic config via admin API
```ts
const admin = kafka.admin()
await admin.connect()

await admin.createTopics({
  topics: [{
    topic: 'orders.state',
    numPartitions: 12,
    replicationFactor: 3,
    configEntries: [
      { name: 'min.insync.replicas', value: '2' },
      { name: 'cleanup.policy', value: 'compact' },
      { name: 'unclean.leader.election.enable', value: 'false' },
      { name: 'segment.ms', value: '604800000' },
    ],
  }],
})
await admin.disconnect()

// Producer matched to durability target
const producer = kafka.producer({ idempotent: true /* requires acks=-1 */ })
```

Deep — Kafka Streams DSL (Java)
```java
StreamsBuilder b = new StreamsBuilder();
KStream<String, Order> orders = b.stream("orders");

// Re-key by customer, then count orders per customer into a named state store
KTable<String, Long> countsPerCustomer = orders
    .groupBy((k, o) -> o.customerId())
    .count(Materialized.as("orders-by-customer"));

countsPerCustomer.toStream()
    .to("customer-order-counts", Produced.with(Serdes.String(), Serdes.Long()));
```

- Treating Kafka as a request/response system.