# Examples & Troubleshooting
Complete examples and common issue resolution for the Kafka helpers.
## Producer: Send Messages with Health Monitoring

```typescript
import { KafkaProducerHelper, KafkaAcks } from '@venizia/ignis-helpers/kafka';
import { stringSerializers } from '@platformatic/kafka';
const helper = KafkaProducerHelper.newInstance({
bootstrapBrokers: ['broker1:9092', 'broker2:9092'],
clientId: 'interval-producer',
serializers: stringSerializers,
acks: KafkaAcks.ALL,
onBrokerConnect: ({ broker }) => console.log(`Connected to ${broker.host}:${broker.port}`),
onBrokerDisconnect: ({ broker }) => console.warn(`Disconnected from ${broker.host}`),
});
const producer = helper.getProducer();
let count = 0;
const interval = setInterval(async () => {
if (!helper.isHealthy()) {
console.warn('Producer not healthy, skipping...');
return;
}
await producer.send({
messages: [{
topic: 'events',
key: `key-${count % 3}`,
value: JSON.stringify({ index: count, timestamp: new Date().toISOString() }),
}],
});
count++;
}, 100);
process.on('SIGINT', async () => {
clearInterval(interval);
console.log(`Shutting down... (sent ${count} messages)`);
await helper.close();
process.exit(0);
});
```

## Consumer: Callback-Based with Lag Monitoring

```typescript
import { KafkaConsumerHelper } from '@venizia/ignis-helpers/kafka';
import { stringDeserializers } from '@platformatic/kafka';
const helper = KafkaConsumerHelper.newInstance({
bootstrapBrokers: ['broker1:9092', 'broker2:9092'],
clientId: 'event-consumer',
groupId: 'processing-group',
deserializers: stringDeserializers,
onMessage: async ({ message }) => {
const data = JSON.parse(message.value!);
console.log(`Processing: ${message.key} -> ${JSON.stringify(data)}`);
await message.commit();
},
onMessageDone: ({ message }) => {
console.log(`Done: ${message.key}`);
},
onMessageError: ({ error, message }) => {
console.error(`Error processing ${message?.key}:`, error.message);
},
onGroupJoin: ({ groupId, memberId }) => {
console.log(`Joined group ${groupId} as ${memberId}`);
},
onGroupRebalance: ({ groupId }) => {
console.log(`Rebalance in ${groupId}`);
},
onLag: ({ lag }) => {
for (const [topic, partitionLags] of lag) {
partitionLags.forEach((lagValue, partition) => {
if (lagValue > 1000n) {
console.warn(`High lag on ${topic}[${partition}]: ${lagValue}`);
}
});
}
},
onLagError: ({ error }) => console.error('Lag error:', error),
});
await helper.start({ topics: ['events'] });
helper.startLagMonitoring({ topics: ['events'], interval: 10_000 });
process.on('SIGINT', async () => {
await helper.close();
process.exit(0);
});
```

## Consumer: Direct Stream Access (for-await)

```typescript
import { KafkaConsumerHelper } from '@venizia/ignis-helpers/kafka';
import { stringDeserializers } from '@platformatic/kafka';
const helper = KafkaConsumerHelper.newInstance({
bootstrapBrokers: ['localhost:9092'],
clientId: 'stream-consumer',
groupId: 'stream-group',
deserializers: stringDeserializers,
onBrokerConnect: ({ broker }) => console.log(`Connected to ${broker.host}`),
});
// Use the consumer directly for async iterator pattern
const consumer = helper.getConsumer();
const stream = await consumer.consume({
topics: ['orders'],
mode: 'committed',
fallbackMode: 'latest',
});
for await (const message of stream) {
console.log(`${message.topic}[${message.partition}] @${message.offset}: ${message.key} -> ${message.value}`);
await message.commit();
}
await stream.close();
await helper.close();
```

## Admin: Topic Setup Script

```typescript
import { KafkaAdminHelper } from '@venizia/ignis-helpers/kafka';
async function setupTopics() {
const helper = KafkaAdminHelper.newInstance({
bootstrapBrokers: ['localhost:9092'],
clientId: 'topic-setup',
onBrokerConnect: ({ broker }) => console.log(`Connected to ${broker.host}`),
});
const admin = helper.getAdmin();
// Create topics
await admin.createTopics({
topics: ['orders', 'inventory', 'notifications'],
partitions: 6,
replicas: 3,
configs: [
{ name: 'retention.ms', value: '604800000' },
{ name: 'compression.type', value: 'zstd' },
],
});
// Verify
const topics = await admin.listTopics({ includeInternals: false });
console.log('Topics:', topics);
// Health check
console.log('Healthy:', helper.isHealthy());
await helper.close();
}
setupTopics().catch(console.error);
```

## Exactly-Once: Consume-Transform-Produce

```typescript
import { KafkaProducerHelper, KafkaConsumerHelper } from '@venizia/ignis-helpers/kafka';
import { stringSerializers, stringDeserializers } from '@platformatic/kafka';
const producer = KafkaProducerHelper.newInstance({
bootstrapBrokers: ['localhost:9092'],
clientId: 'eos-producer',
serializers: stringSerializers,
transactionalId: 'eos-tx',
idempotent: true,
});
const consumer = KafkaConsumerHelper.newInstance({
bootstrapBrokers: ['localhost:9092'],
clientId: 'eos-consumer',
groupId: 'eos-group',
deserializers: stringDeserializers,
autocommit: false,
onMessage: async ({ message }) => {
// Consume-transform-produce within a single transaction
const transformed = JSON.stringify({
...JSON.parse(message.value!),
processedAt: new Date().toISOString(),
});
await producer.runInTransaction(async ({ send, addConsumer, addOffset }) => {
await addConsumer(consumer.getConsumer());
await addOffset(message);
await send({
messages: [{ topic: 'processed-events', key: message.key, value: transformed }],
});
});
},
onMessageError: ({ error }) => console.error('Processing error:', error),
});
await consumer.start({ topics: ['raw-events'] });
```

## Schema Registry: Validated Messages

```typescript
import {
KafkaSchemaRegistryHelper,
KafkaProducerHelper,
KafkaConsumerHelper,
} from '@venizia/ignis-helpers/kafka';
const registry = KafkaSchemaRegistryHelper.newInstance({
url: 'http://localhost:8081',
});
const producer = KafkaProducerHelper.newInstance({
bootstrapBrokers: ['localhost:9092'],
clientId: 'schema-producer',
registry: registry.getRegistry(),
onBrokerConnect: ({ broker }) => console.log(`Producer connected to ${broker.host}`),
});
// Sends schema-validated objects
await producer.getProducer().send({
messages: [{
topic: 'orders',
key: 'order-1',
value: { id: 1, status: 'created', total: 99.99 },
}],
});
const consumer = KafkaConsumerHelper.newInstance({
bootstrapBrokers: ['localhost:9092'],
clientId: 'schema-consumer',
groupId: 'schema-group',
registry: registry.getRegistry(),
onMessage: async ({ message }) => {
// message.value is auto-deserialized to the schema type
console.log(message.value.id, message.value.status);
await message.commit();
},
});
await consumer.start({ topics: ['orders'] });
```

## Using Helpers with Ignis IoC

```typescript
import {
KafkaProducerHelper,
KafkaConsumerHelper,
KafkaAdminHelper,
} from '@venizia/ignis-helpers/kafka';
import { stringSerializers, stringDeserializers } from '@platformatic/kafka';
import { inject, injectable } from '@venizia/ignis-inversion';
// Register helpers in the IoC container (assumes an existing Ignis `app` instance)
app.bind('kafka.producer').to(
KafkaProducerHelper.newInstance({
bootstrapBrokers: ['localhost:9092'],
clientId: 'order-service-producer',
serializers: stringSerializers,
onBrokerConnect: ({ broker }) => console.log(`Producer -> ${broker.host}`),
}),
);
app.bind('kafka.consumer').to(
KafkaConsumerHelper.newInstance({
bootstrapBrokers: ['localhost:9092'],
clientId: 'order-service-consumer',
groupId: 'order-service',
deserializers: stringDeserializers,
onMessage: async ({ message }) => {
// Handled by the service below
},
onBrokerConnect: ({ broker }) => console.log(`Consumer -> ${broker.host}`),
}),
);
// Inject into services
@injectable()
export class OrderEventService {
constructor(
@inject({ key: 'kafka.producer' }) private producer: KafkaProducerHelper,
@inject({ key: 'kafka.consumer' }) private consumer: KafkaConsumerHelper,
) {}
async publishOrderCreated(orderId: string, data: Record<string, unknown>) {
await this.producer.getProducer().send({
messages: [{ topic: 'order-events', key: orderId, value: JSON.stringify(data) }],
});
}
async startConsuming() {
await this.consumer.start({ topics: ['order-events'] });
}
}
```

## Troubleshooting

### Common Issues
| Error | Cause | Fix |
|---|---|---|
| `ECONNREFUSED localhost:9092` | Broker `advertised.listeners` set to `localhost` but connecting remotely | Set `KAFKA_ADVERTISED_LISTENERS` with the correct external host IP |
| `Request timed out` | SASL handshake or broker unreachable | Add `connectTimeout: 30_000, requestTimeout: 30_000` |
| `Connection closed` | Connecting without SASL to a SASL-required listener | Check `KAFKA_LISTENER_SECURITY_PROTOCOL_MAP` -- use `SASL_PLAINTEXT` |
| `Cannot find a suitable SASL mechanism` | Wrong mechanism (e.g., `PLAIN` when the broker only supports `SCRAM-SHA-512`) | Check the error message for supported mechanisms and configure a matching one |
| `Failed to deserialize a message` | Mismatch between serializer and deserializer | Ensure a matching serde; for old data, use a new consumer group or recreate the topic |
| `JSON.stringify` cannot serialize `BigInt` | `message.offset` and `message.timestamp` are `bigint` | Use a custom replacer: `(_k, v) => typeof v === 'bigint' ? v.toString() : v` |
| Consumer idle (no messages) | More consumers than partitions | Ensure `numPartitions >= numConsumers` |
| `isHealthy()` returns `false` | All brokers disconnected (a single idle disconnect won't trigger this) | Check broker addresses, SASL config, and network connectivity; use `getConnectedBrokerCount()` for details |
| `isReady()` returns `false` (consumer) | Consumer not active -- `start()` not called or stream closed | Call `await helper.start({ topics })` before checking readiness |
| Graceful shutdown timeout | In-flight requests taking too long | Increase `shutdownTimeout` or use `close({ isForce: true })` |
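The `BigInt` replacer from the table above can be seen in isolation with plain `JSON.stringify`. The message shape below is a hypothetical example; only the replacer itself matters:

```typescript
// Hypothetical message: offset and timestamp are bigint, as delivered by the consumer.
const message = {
  topic: 'events',
  partition: 0,
  offset: 42n, // plain JSON.stringify(message) would throw here
  timestamp: 1_700_000_000_000n,
  key: 'key-0',
};

// Convert bigint values to strings so JSON.stringify can handle them.
const bigintReplacer = (_key: string, value: unknown): unknown =>
  typeof value === 'bigint' ? value.toString() : value;

console.log(JSON.stringify(message, bigintReplacer));
// {"topic":"events","partition":0,"offset":"42","timestamp":"1700000000000","key":"key-0"}
```

Note that the converted fields come back as strings, so parse them with `BigInt(...)` if you need numeric comparisons later.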
### Docker Kafka Configuration

When running Kafka in Docker and connecting from outside the container:

```yaml
environment:
DOCKER_HOST_IP: '192.168.1.100' # Your host machine's IP
KAFKA_ADVERTISED_LISTENERS: >
INTERNAL://kafka-1:29092,
EXTERNAL://${DOCKER_HOST_IP}:19092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: >
INTERNAL:PLAINTEXT,
EXTERNAL:SASL_PLAINTEXT,
    CONTROLLER:PLAINTEXT
```

- `INTERNAL` -- used for inter-broker communication
- `EXTERNAL` -- used for client connections from outside Docker
- `CONTROLLER` -- used for KRaft controller communication
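For context, a sketch of how the environment above typically fits into a compose service. The image tag, ports, host IP, and SASL details are assumptions to adapt to your setup, not prescribed values:

```yaml
services:
  kafka-1:
    image: confluentinc/cp-kafka:7.6.0   # illustrative image/tag
    ports:
      - '19092:19092'                    # expose only the EXTERNAL listener to the host
    environment:
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:19092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://192.168.1.100:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      # SASL credential/JAAS settings for the EXTERNAL listener omitted for brevity
```

Clients outside Docker should bootstrap against the advertised `EXTERNAL` address (`192.168.1.100:19092` here), never the `INTERNAL` one.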
## See Also

**Kafka Pages:**
- Overview & Fundamentals -- Connection, serialization, constants, compression
- Producer -- Producer helper, transactions, API reference
- Consumer -- Consumer helper, callbacks, lag monitoring, API reference
- Admin -- Admin helper & API reference
- Schema Registry -- Schema registry helper
**Other Helpers:**
- Queue Helper -- BullMQ, MQTT, and in-memory queues
- Redis Helper -- Redis connection management
**External Resources:**
- @platformatic/kafka -- Underlying Kafka client library
- Apache Kafka Documentation -- Official Kafka docs
- KIP-848 -- New consumer group protocol