Examples & Troubleshooting

Complete examples for the Kafka helpers, followed by fixes for common issues.

Producer: Send Messages with Health Monitoring

typescript
import { KafkaProducerHelper, KafkaAcks } from '@venizia/ignis-helpers/kafka';
import { stringSerializers } from '@platformatic/kafka';

const helper = KafkaProducerHelper.newInstance({
  bootstrapBrokers: ['broker1:9092', 'broker2:9092'],
  clientId: 'interval-producer',
  serializers: stringSerializers,
  acks: KafkaAcks.ALL,
  onBrokerConnect: ({ broker }) => console.log(`Connected to ${broker.host}:${broker.port}`),
  onBrokerDisconnect: ({ broker }) => console.warn(`Disconnected from ${broker.host}`),
});

const producer = helper.getProducer();
let count = 0;

const interval = setInterval(async () => {
  if (!helper.isHealthy()) {
    console.warn('Producer not healthy, skipping...');
    return;
  }

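  try {
    await producer.send({
      messages: [{
        topic: 'events',
        key: `key-${count % 3}`,
        value: JSON.stringify({ index: count, timestamp: new Date().toISOString() }),
      }],
    });
    count++;
  } catch (error) {
    // A rejected send inside setInterval would otherwise surface as an unhandled rejection
    console.error('Send failed:', error);
  }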
}, 100);

process.on('SIGINT', async () => {
  clearInterval(interval);
  console.log(`Shutting down... (sent ${count} messages)`);
  await helper.close();
  process.exit(0);
});

Consumer: Callback-Based with Lag Monitoring

typescript
import { KafkaConsumerHelper } from '@venizia/ignis-helpers/kafka';
import { stringDeserializers } from '@platformatic/kafka';

const helper = KafkaConsumerHelper.newInstance({
  bootstrapBrokers: ['broker1:9092', 'broker2:9092'],
  clientId: 'event-consumer',
  groupId: 'processing-group',
  deserializers: stringDeserializers,

  onMessage: async ({ message }) => {
    const data = JSON.parse(message.value!);
    console.log(`Processing: ${message.key} -> ${JSON.stringify(data)}`);
    await message.commit();
  },
  onMessageDone: ({ message }) => {
    console.log(`Done: ${message.key}`);
  },
  onMessageError: ({ error, message }) => {
    console.error(`Error processing ${message?.key}:`, error.message);
  },

  onGroupJoin: ({ groupId, memberId }) => {
    console.log(`Joined group ${groupId} as ${memberId}`);
  },
  onGroupRebalance: ({ groupId }) => {
    console.log(`Rebalance in ${groupId}`);
  },

  onLag: ({ lag }) => {
    for (const [topic, partitionLags] of lag) {
      partitionLags.forEach((lagValue, partition) => {
        if (lagValue > 1000n) {
          console.warn(`High lag on ${topic}[${partition}]: ${lagValue}`);
        }
      });
    }
  },
  onLagError: ({ error }) => console.error('Lag error:', error),
});

await helper.start({ topics: ['events'] });
helper.startLagMonitoring({ topics: ['events'], interval: 10_000 });

process.on('SIGINT', async () => {
  await helper.close();
  process.exit(0);
});

Consumer: Direct Stream Access (for-await)

typescript
import { KafkaConsumerHelper } from '@venizia/ignis-helpers/kafka';
import { stringDeserializers } from '@platformatic/kafka';

const helper = KafkaConsumerHelper.newInstance({
  bootstrapBrokers: ['localhost:9092'],
  clientId: 'stream-consumer',
  groupId: 'stream-group',
  deserializers: stringDeserializers,
  onBrokerConnect: ({ broker }) => console.log(`Connected to ${broker.host}`),
});

// Use the underlying consumer directly for the async-iterator pattern
const consumer = helper.getConsumer();
const stream = await consumer.consume({
  topics: ['orders'],
  mode: 'committed',
  fallbackMode: 'latest',
});

for await (const message of stream) {
  console.log(`${message.topic}[${message.partition}] @${message.offset}: ${message.key} -> ${message.value}`);
  await message.commit();
}

// Reached only after the for-await loop exits (for example, once the stream ends)
await stream.close();
await helper.close();

Admin: Topic Setup Script

typescript
import { KafkaAdminHelper } from '@venizia/ignis-helpers/kafka';

async function setupTopics() {
  const helper = KafkaAdminHelper.newInstance({
    bootstrapBrokers: ['localhost:9092'],
    clientId: 'topic-setup',
    onBrokerConnect: ({ broker }) => console.log(`Connected to ${broker.host}`),
  });

  const admin = helper.getAdmin();

  // Create topics
  await admin.createTopics({
    topics: ['orders', 'inventory', 'notifications'],
    partitions: 6,
    replicas: 3,
    configs: [
      { name: 'retention.ms', value: '604800000' },
      { name: 'compression.type', value: 'zstd' },
    ],
  });

  // Verify
  const topics = await admin.listTopics({ includeInternals: false });
  console.log('Topics:', topics);

  // Health check
  console.log('Healthy:', helper.isHealthy());

  await helper.close();
}

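// Report failures instead of leaving a floating promise
setupTopics().catch(error => {
  console.error('Topic setup failed:', error);
  process.exit(1);
});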

Exactly-Once: Consume-Transform-Produce

typescript
import { KafkaProducerHelper, KafkaConsumerHelper } from '@venizia/ignis-helpers/kafka';
import { stringSerializers, stringDeserializers } from '@platformatic/kafka';

const producer = KafkaProducerHelper.newInstance({
  bootstrapBrokers: ['localhost:9092'],
  clientId: 'eos-producer',
  serializers: stringSerializers,
  transactionalId: 'eos-tx',
  idempotent: true,
});

const consumer = KafkaConsumerHelper.newInstance({
  bootstrapBrokers: ['localhost:9092'],
  clientId: 'eos-consumer',
  groupId: 'eos-group',
  deserializers: stringDeserializers,
  autocommit: false,

  onMessage: async ({ message }) => {
    // Consume-transform-produce within a single transaction
    const transformed = JSON.stringify({
      ...JSON.parse(message.value!),
      processedAt: new Date().toISOString(),
    });

    await producer.runInTransaction(async ({ send, addConsumer, addOffset }) => {
      await addConsumer(consumer.getConsumer());
      await addOffset(message);
      await send({
        messages: [{ topic: 'processed-events', key: message.key, value: transformed }],
      });
    });
  },
  onMessageError: ({ error }) => console.error('Processing error:', error),
});

await consumer.start({ topics: ['raw-events'] });

Schema Registry: Validated Messages

typescript
import {
  KafkaSchemaRegistryHelper,
  KafkaProducerHelper,
  KafkaConsumerHelper,
} from '@venizia/ignis-helpers/kafka';

const registry = KafkaSchemaRegistryHelper.newInstance({
  url: 'http://localhost:8081',
});

const producer = KafkaProducerHelper.newInstance({
  bootstrapBrokers: ['localhost:9092'],
  clientId: 'schema-producer',
  registry: registry.getRegistry(),
  onBrokerConnect: ({ broker }) => console.log(`Producer connected to ${broker.host}`),
});

// Sends schema-validated objects
await producer.getProducer().send({
  messages: [{
    topic: 'orders',
    key: 'order-1',
    value: { id: 1, status: 'created', total: 99.99 },
  }],
});

const consumer = KafkaConsumerHelper.newInstance({
  bootstrapBrokers: ['localhost:9092'],
  clientId: 'schema-consumer',
  groupId: 'schema-group',
  registry: registry.getRegistry(),
  onMessage: async ({ message }) => {
    // message.value is auto-deserialized to the schema type
    console.log(message.value.id, message.value.status);
    await message.commit();
  },
});

await consumer.start({ topics: ['orders'] });

Using Helpers with Ignis IoC

typescript
import {
  KafkaProducerHelper,
  KafkaConsumerHelper,
  KafkaAdminHelper,
} from '@venizia/ignis-helpers/kafka';
import { stringSerializers, stringDeserializers } from '@platformatic/kafka';
import { inject, injectable } from '@venizia/ignis-inversion';

// Register helpers in the IoC container
app.bind('kafka.producer').to(
  KafkaProducerHelper.newInstance({
    bootstrapBrokers: ['localhost:9092'],
    clientId: 'order-service-producer',
    serializers: stringSerializers,
    onBrokerConnect: ({ broker }) => console.log(`Producer -> ${broker.host}`),
  }),
);

app.bind('kafka.consumer').to(
  KafkaConsumerHelper.newInstance({
    bootstrapBrokers: ['localhost:9092'],
    clientId: 'order-service-consumer',
    groupId: 'order-service',
    deserializers: stringDeserializers,
    onMessage: async ({ message }) => {
      // Handled by the service below
    },
    onBrokerConnect: ({ broker }) => console.log(`Consumer -> ${broker.host}`),
  }),
);

// Inject into services
@injectable()
export class OrderEventService {
  constructor(
    @inject({ key: 'kafka.producer' }) private producer: KafkaProducerHelper,
    @inject({ key: 'kafka.consumer' }) private consumer: KafkaConsumerHelper,
  ) {}

  async publishOrderCreated(orderId: string, data: Record<string, unknown>) {
    await this.producer.getProducer().send({
      messages: [{ topic: 'order-events', key: orderId, value: JSON.stringify(data) }],
    });
  }

  async startConsuming() {
    await this.consumer.start({ topics: ['order-events'] });
  }
}

Troubleshooting

Common Issues

| Error | Cause | Fix |
| --- | --- | --- |
| ECONNREFUSED localhost:9092 | Broker advertised.listeners set to localhost but connecting remotely | Set KAFKA_ADVERTISED_LISTENERS with the correct external host IP |
| Request timed out | SASL handshake or broker unreachable | Add connectTimeout: 30_000, requestTimeout: 30_000 |
| Connection closed | Connecting without SASL to a SASL-required listener | Check KAFKA_LISTENER_SECURITY_PROTOCOL_MAP -- use SASL_PLAINTEXT |
| Cannot find a suitable SASL mechanism | Wrong mechanism (e.g., PLAIN when broker only supports SCRAM-SHA-512) | Check error message for supported mechanisms, match mechanism |
| Failed to deserialize a message | Mismatch between serializer and deserializer | Ensure matching serde. For old data, use a new consumer group or recreate topic |
| JSON.stringify cannot serialize BigInt | message.offset and message.timestamp are bigint | Use custom replacer: (_k, v) => typeof v === 'bigint' ? v.toString() : v |
| Consumer idle (no messages) | More consumers than partitions | Ensure numPartitions >= numConsumers |
| isHealthy() returns false | All brokers disconnected (a single idle disconnect won't trigger this) | Check broker addresses, SASL config, network connectivity. Use getConnectedBrokerCount() for details |
| isReady() returns false (consumer) | Consumer not active -- start() not called or stream closed | Call await helper.start({ topics }) before checking readiness |
| Graceful shutdown timeout | In-flight requests taking too long | Increase shutdownTimeout or use close({ isForce: true }) |
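
The JSON.stringify entry above is the easiest one to hit when logging whole messages, because offset and timestamp are bigint values. The following is a minimal sketch of the replacer in use. The LoggedMessage interface and the toLogLine function are hypothetical names introduced for illustration and are not part of the helpers; real message objects carry more fields.

```typescript
// Replacer that converts bigint values to decimal strings so JSON.stringify does not throw.
const bigintReplacer = (_key: string, value: unknown): unknown =>
  typeof value === 'bigint' ? value.toString() : value;

// Hypothetical, trimmed-down message shape used only for this example.
interface LoggedMessage {
  topic: string;
  partition: number;
  offset: bigint;
  timestamp: bigint;
  key: string;
}

// Serialize a message for logging; bigint fields come out as strings.
function toLogLine(message: LoggedMessage): string {
  return JSON.stringify(message, bigintReplacer);
}

const sample: LoggedMessage = {
  topic: 'events',
  partition: 0,
  offset: 42n,
  timestamp: 1700000000000n,
  key: 'key-1',
};

console.log(toLogLine(sample));
// {"topic":"events","partition":0,"offset":"42","timestamp":"1700000000000","key":"key-1"}
```

Converting to strings rather than numbers avoids silent precision loss for offsets above Number.MAX_SAFE_INTEGER.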

Docker Kafka Configuration

When running Kafka in Docker and connecting from outside the container:

yaml
environment:
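  # With Docker Compose, ${DOCKER_HOST_IP} below is substituted from the shell or a .env file;
  # this entry only exposes the same value inside the container.
  DOCKER_HOST_IP: '192.168.1.100'  # Your host machine's IP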
  KAFKA_ADVERTISED_LISTENERS: >
    INTERNAL://kafka-1:29092,
    EXTERNAL://${DOCKER_HOST_IP}:19092
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: >
    INTERNAL:PLAINTEXT,
    EXTERNAL:SASL_PLAINTEXT,
    CONTROLLER:PLAINTEXT

• INTERNAL -- used for inter-broker communication
• EXTERNAL -- used for client connections from outside Docker
• CONTROLLER -- used for KRaft controller communication
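
After the initial bootstrap, a Kafka client reconnects to whatever addresses the brokers advertise, so the address the client uses should match one of the advertised listeners. The sketch below is one way to check that by hand when debugging ECONNREFUSED errors. Both functions and the AdvertisedListener type are hypothetical helpers written for this example; they are not part of the Kafka helpers or of @platformatic/kafka, and the parser does not handle bracketed IPv6 hosts.

```typescript
// One parsed entry of a KAFKA_ADVERTISED_LISTENERS value.
interface AdvertisedListener {
  name: string;
  host: string;
  port: number;
}

// Parse a value such as "INTERNAL://kafka-1:29092, EXTERNAL://192.168.1.100:19092".
// Whitespace around commas is ignored; malformed entries raise an error.
function parseAdvertisedListeners(value: string): AdvertisedListener[] {
  return value
    .split(',')
    .map(entry => entry.trim())
    .filter(entry => entry.length > 0)
    .map(entry => {
      const match = /^([A-Za-z0-9_]+):\/\/([^:]*):(\d+)$/.exec(entry);
      if (!match) {
        throw new Error(`Malformed listener entry: ${entry}`);
      }
      return { name: match[1], host: match[2], port: Number(match[3]) };
    });
}

// Return the listener whose host:port equals the given broker address, if any.
function findListenerFor(
  listeners: AdvertisedListener[],
  broker: string,
): AdvertisedListener | undefined {
  return listeners.find(listener => `${listener.host}:${listener.port}` === broker);
}

const listeners = parseAdvertisedListeners(
  'INTERNAL://kafka-1:29092, EXTERNAL://192.168.1.100:19092',
);

console.log(findListenerFor(listeners, '192.168.1.100:19092')?.name); // EXTERNAL
console.log(findListenerFor(listeners, 'localhost:9092')?.name); // undefined
```

If the lookup returns undefined for the address in bootstrapBrokers, the client is probably pointed at a listener the broker does not advertise to it.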

See Also