
Changelog - 2026-03-12

Kafka Helpers Enhancement

This release adds health tracking, broker event callbacks, and graceful shutdown to the Kafka helpers while maintaining backward compatibility. All three helpers (producer, consumer, admin) now share a common BaseKafkaHelper base class that provides these features.

New Features

BaseKafkaHelper — Shared Base Class

All Kafka helpers (producer, consumer, admin) now extend BaseKafkaHelper<TClient> which provides:

  • Health tracking: isHealthy(), isReady(), getHealthStatus() — automatically updated via broker events
  • Broker event callbacks: onBrokerConnect, onBrokerDisconnect via options
  • Graceful shutdown: Timeout-based with automatic force fallback
  • Health statuses: 'connected' | 'disconnected' | 'unknown'

```typescript
const helper = KafkaProducerHelper.newInstance({
  bootstrapBrokers: ['localhost:9092'],
  clientId: 'my-producer',
  shutdownTimeout: 30_000,
  onBrokerConnect: ({ broker }) => console.log(`Connected to ${broker.host}:${broker.port}`),
  onBrokerDisconnect: ({ broker }) => console.log(`Disconnected from ${broker.host}`),
});

helper.isHealthy();       // true when broker connected
helper.getHealthStatus(); // 'connected' | 'disconnected' | 'unknown'
await helper.close();     // graceful shutdown with timeout fallback
```
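
To make the link between broker events and health status concrete, here is a minimal, self-contained sketch. It is not the BaseKafkaHelper implementation: `HealthTrackerSketch`, `handleBrokerConnect`, and `handleBrokerDisconnect` are hypothetical names, and the sketch simply assumes that a connect event sets the status to 'connected' and a disconnect event sets it to 'disconnected'.

```typescript
// Hypothetical stand-in for the health-tracking part of a base helper.
type THealthStatusSketch = 'connected' | 'disconnected' | 'unknown';

interface IBrokerSketch {
  host: string;
  port: number;
}

interface IBrokerCallbacksSketch {
  onBrokerConnect?: (opts: { broker: IBrokerSketch }) => void;
  onBrokerDisconnect?: (opts: { broker: IBrokerSketch }) => void;
}

class HealthTrackerSketch {
  // Status is 'unknown' until the first broker event arrives.
  private status: THealthStatusSketch = 'unknown';
  private readonly callbacks: IBrokerCallbacksSketch;

  constructor(callbacks: IBrokerCallbacksSketch = {}) {
    this.callbacks = callbacks;
  }

  // Assumed to be wired to the client's "broker connected" event.
  handleBrokerConnect(broker: IBrokerSketch): void {
    this.status = 'connected';
    this.callbacks.onBrokerConnect?.({ broker });
  }

  // Assumed to be wired to the client's "broker disconnected" event.
  handleBrokerDisconnect(broker: IBrokerSketch): void {
    this.status = 'disconnected';
    this.callbacks.onBrokerDisconnect?.({ broker });
  }

  getHealthStatus(): THealthStatusSketch {
    return this.status;
  }

  isHealthy(): boolean {
    return this.status === 'connected';
  }
}

// Usage: the user callback and the status update happen on the same event.
const brokerLog: string[] = [];
const tracker = new HealthTrackerSketch({
  onBrokerConnect: ({ broker }) => {
    brokerLog.push(`up:${broker.host}:${broker.port}`);
  },
});
tracker.handleBrokerConnect({ host: 'localhost', port: 9092 });
```

Keeping the status update and the user callback in one handler means the reported status cannot drift from the events the callbacks have seen.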

Consumer Message Callbacks

Message handlers are passed as options, matching the callbacks-in-options pattern used by the BullMQ and MQTT helpers:

```typescript
const consumer = KafkaConsumerHelper.newInstance({
  ...,
  onMessage: async ({ message }) => {
    await processMessage(message);
    await message.commit();
  },
  onMessageDone: ({ message }) => console.log('Done:', message.key),
  onMessageError: ({ error, message }) => console.error('Error:', error),
});

await consumer.start({ topics: ['orders'] });
```
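
The changelog does not spell out when each message callback fires. The sketch below shows one plausible ordering, under the assumption that onMessageDone runs after onMessage resolves and onMessageError runs when onMessage throws or rejects. `dispatchMessage` and the `*Sketch` types are hypothetical and are not part of the helper API.

```typescript
// Hypothetical message shape, reduced to what the sketch needs.
interface IMessageSketch {
  key: string;
  value: string;
}

interface IMessageCallbacksSketch {
  onMessage: (opts: { message: IMessageSketch }) => void | Promise<void>;
  onMessageDone?: (opts: { message: IMessageSketch }) => void | Promise<void>;
  onMessageError?: (opts: { error: Error; message: IMessageSketch }) => void | Promise<void>;
}

// Runs the handler, then exactly one of the two follow-up callbacks.
async function dispatchMessage(
  message: IMessageSketch,
  callbacks: IMessageCallbacksSketch,
): Promise<void> {
  try {
    await callbacks.onMessage({ message });
  } catch (caught) {
    // Normalize non-Error throwables so the error callback has a stable type.
    const error = caught instanceof Error ? caught : new Error(String(caught));
    await callbacks.onMessageError?.({ error, message });
    return;
  }
  await callbacks.onMessageDone?.({ message });
}
```

With this ordering, a failure inside onMessageDone is not routed to onMessageError, so the error callback only ever reports handler failures.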

Consumer Group Event Callbacks

```typescript
const consumer = KafkaConsumerHelper.newInstance({
  ...,
  onGroupJoin: ({ groupId, memberId, generationId }) => { ... },
  onGroupLeave: ({ groupId, memberId }) => { ... },
  onGroupRebalance: ({ groupId }) => { ... },
  onHeartbeatError: ({ error, groupId, memberId }) => { ... },
});
```

Consumer Lag Monitoring

```typescript
const consumer = KafkaConsumerHelper.newInstance({
  ...,
  onLag: ({ lag }) => { ... },
  onLagError: ({ error }) => { ... },
});

consumer.startLagMonitoring({ topics: ['orders'], interval: 10_000 });
consumer.stopLagMonitoring();
```
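
Consumer lag is conventionally the gap between the latest offset in a partition and the offset the group has committed. The sketch below illustrates that calculation and an interval-based poller around it. The shape of the `lag` payload, `computeLag`, `LagMonitorSketch`, and the `fetchOffsets` function are all assumptions made for illustration, not the helper's actual internals.

```typescript
// Hypothetical per-partition offsets, as a monitor might fetch them.
interface IPartitionOffsetsSketch {
  topic: string;
  partition: number;
  committedOffset: number;
  latestOffset: number;
}

interface IPartitionLagSketch {
  topic: string;
  partition: number;
  lag: number;
}

// Lag per partition: latest offset minus committed offset, never negative.
function computeLag(offsets: IPartitionOffsetsSketch[]): IPartitionLagSketch[] {
  return offsets.map(({ topic, partition, committedOffset, latestOffset }) => ({
    topic,
    partition,
    lag: Math.max(0, latestOffset - committedOffset),
  }));
}

class LagMonitorSketch {
  private timer: ReturnType<typeof setInterval> | undefined;
  private readonly fetchOffsets: () => Promise<IPartitionOffsetsSketch[]>;
  private readonly onLag: (opts: { lag: IPartitionLagSketch[] }) => void;
  private readonly onLagError: (opts: { error: Error }) => void;

  constructor(opts: {
    fetchOffsets: () => Promise<IPartitionOffsetsSketch[]>;
    onLag: (opts: { lag: IPartitionLagSketch[] }) => void;
    onLagError: (opts: { error: Error }) => void;
  }) {
    this.fetchOffsets = opts.fetchOffsets;
    this.onLag = opts.onLag;
    this.onLagError = opts.onLagError;
  }

  // Starts polling; calling start twice does not create a second timer.
  start(interval: number): void {
    if (this.timer !== undefined) return;
    this.timer = setInterval(() => {
      void this.poll();
    }, interval);
  }

  stop(): void {
    if (this.timer === undefined) return;
    clearInterval(this.timer);
    this.timer = undefined;
  }

  // One polling cycle: any failure is reported through onLagError.
  async poll(): Promise<void> {
    try {
      this.onLag({ lag: computeLag(await this.fetchOffsets()) });
    } catch (caught) {
      this.onLagError({ error: caught instanceof Error ? caught : new Error(String(caught)) });
    }
  }
}
```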

Producer Transaction Helper

```typescript
const result = await producer.runInTransaction(async ({ send, addConsumer, addOffset }) => {
  await addConsumer(consumer.getConsumer());
  await addOffset(incomingMessage);
  return send({ messages: [{ topic: 'output', key: 'k1', value: 'v1' }] });
});
```
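
A transaction wrapper of this kind typically begins a transaction, runs the callback, commits on success, and aborts on failure. The sketch below shows that control flow against an in-memory fake. It assumes commit-on-resolve and abort-on-throw semantics, which the changelog does not state explicitly; `runInTransactionSketch`, `ITransactionSketch`, and `createFakeTransaction` are hypothetical, and addConsumer/addOffset are left out for brevity.

```typescript
interface IOutgoingMessageSketch {
  topic: string;
  key: string;
  value: string;
}

// Hypothetical minimal transaction handle.
interface ITransactionSketch {
  send(opts: { messages: IOutgoingMessageSketch[] }): Promise<number>;
  commit(): Promise<void>;
  abort(): Promise<void>;
}

interface ITransactionContextSketch {
  send: (opts: { messages: IOutgoingMessageSketch[] }) => Promise<number>;
}

// Commits when the callback resolves, aborts and rethrows when it rejects.
async function runInTransactionSketch<T>(
  begin: () => Promise<ITransactionSketch>,
  work: (ctx: ITransactionContextSketch) => Promise<T>,
): Promise<T> {
  const transaction = await begin();
  try {
    const result = await work({ send: (opts) => transaction.send(opts) });
    await transaction.commit();
    return result;
  } catch (error) {
    await transaction.abort();
    throw error;
  }
}

// In-memory fake that records each call, so the flow can be observed.
function createFakeTransaction(log: string[]): ITransactionSketch {
  return {
    send: async ({ messages }) => {
      log.push(`send:${messages.length}`);
      return messages.length;
    },
    commit: async () => {
      log.push('commit');
    },
    abort: async () => {
      log.push('abort');
    },
  };
}
```

Rethrowing after the abort keeps the failure visible to the caller, so the wrapper does not swallow errors raised inside the callback.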

Schema Registry Helper (New)

```typescript
const registry = KafkaSchemaRegistryHelper.newInstance({
  url: 'http://localhost:8081',
});

const producer = KafkaProducerHelper.newInstance({
  ...,
  registry: registry.getRegistry(),
});
```

Constants & Types

New Constants

| Constant | Description |
| --- | --- |
| KafkaHealthStatuses | CONNECTED, DISCONNECTED, UNKNOWN |
| KafkaClientEvents | All broker + consumer event name constants |
| KafkaDefaults.SHUTDOWN_TIMEOUT | 30000 |
| KafkaDefaults.LAG_MONITOR_INTERVAL | 30000 |
| KafkaDefaults.CONSUME_MODE | 'committed' |
| KafkaDefaults.CONSUME_FALLBACK_MODE | 'latest' |
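
As an illustration of how defaults like these usually combine with optional settings, the sketch below mirrors the values from the table in local constants and falls back to them when an option is omitted. The `*Sketch` names and the two resolver functions are hypothetical, and the real constants may be declared differently.

```typescript
// Local mirrors of the documented values; not imported from the helpers package.
const KafkaHealthStatusesSketch = {
  CONNECTED: 'connected',
  DISCONNECTED: 'disconnected',
  UNKNOWN: 'unknown',
} as const;

// Union of the status values: 'connected' | 'disconnected' | 'unknown'.
type TKafkaHealthStatusSketch =
  (typeof KafkaHealthStatusesSketch)[keyof typeof KafkaHealthStatusesSketch];

const KafkaDefaultsSketch = {
  SHUTDOWN_TIMEOUT: 30000,
  LAG_MONITOR_INTERVAL: 30000,
  CONSUME_MODE: 'committed',
  CONSUME_FALLBACK_MODE: 'latest',
} as const;

// Falls back to the default when the caller leaves shutdownTimeout unset.
function resolveShutdownTimeout(opts: { shutdownTimeout?: number }): number {
  return opts.shutdownTimeout ?? KafkaDefaultsSketch.SHUTDOWN_TIMEOUT;
}

// Falls back to the default when the caller leaves interval unset.
function resolveLagInterval(opts: { interval?: number }): number {
  return opts.interval ?? KafkaDefaultsSketch.LAG_MONITOR_INTERVAL;
}

const initialStatus: TKafkaHealthStatusSketch = KafkaHealthStatusesSketch.UNKNOWN;
```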

New Callback Types

All callbacks follow the Ignis opts: { ... } pattern, taking a single options object and returning ValueOrPromise<void>:

  • TKafkaBrokerEventCallback
  • TKafkaMessageCallback, TKafkaMessageDoneCallback, TKafkaMessageErrorCallback
  • TKafkaGroupJoinCallback, TKafkaGroupLeaveCallback, TKafkaGroupRebalanceCallback
  • TKafkaHeartbeatErrorCallback
  • TKafkaLagCallback, TKafkaLagErrorCallback
  • TKafkaTransactionCallback
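
To show what this pattern looks like in practice, here is a small sketch of one callback type and a helper that invokes it. The definitions are stand-ins: `ValueOrPromise` is redeclared locally, the payload fields are taken from the onGroupJoin example above, and `invokeCallback` is a hypothetical utility rather than part of the package.

```typescript
// Local stand-in for the ValueOrPromise utility type.
type ValueOrPromise<T> = T | Promise<T>;

// Payload fields copied from the onGroupJoin example; the real type may differ.
interface IGroupJoinPayloadSketch {
  groupId: string;
  memberId: string;
  generationId: number;
}

type TGroupJoinCallbackSketch = (opts: IGroupJoinPayloadSketch) => ValueOrPromise<void>;

// Awaits sync and async callbacks uniformly; returns false when none is set.
async function invokeCallback<P>(
  callback: ((opts: P) => ValueOrPromise<void>) | undefined,
  opts: P,
): Promise<boolean> {
  if (!callback) return false;
  await callback(opts);
  return true;
}
```

Because the return type admits both `void` and `Promise<void>`, callers can pass plain functions or async functions without wrapping either one.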

Renamed Types

| Before | After |
| --- | --- |
| IKafkaProducerOpts | IKafkaProducerOptions |
| IKafkaConsumerOpts | IKafkaConsumerOptions |
| IKafkaAdminOpts | IKafkaAdminOptions |
| IKafkaSchemaRegistryOpts | IKafkaSchemaRegistryOptions |

Files Changed

Helpers Package (packages/helpers)

| File | Changes |
| --- | --- |
| kafka/common/constants.ts | Added KafkaHealthStatuses, KafkaClientEvents, new defaults |
| kafka/common/types.ts | Added callback types, enhanced options, transaction context, schema registry options |
| kafka/base.ts | New: BaseKafkaHelper<TClient>, with shared health tracking, broker events, graceful shutdown |
| kafka/producer.ts | Extended BaseKafkaHelper, added runInTransaction(), graceful shutdown |
| kafka/consumer.ts | Extended BaseKafkaHelper, added start(), message callbacks, lag monitoring, graceful shutdown |
| kafka/admin.ts | Extended BaseKafkaHelper, added graceful shutdown |
| kafka/schema/registry.ts | New: KafkaSchemaRegistryHelper |
| kafka/index.ts | Added base and schema exports |

Docs Package (packages/docs)

| File | Changes |
| --- | --- |
| wiki/extensions/helpers/kafka/index.md | Rewritten: architecture, BaseKafkaHelper, all constants |
| wiki/extensions/helpers/kafka/producer.md | Rewritten: health, transactions, graceful shutdown |
| wiki/extensions/helpers/kafka/consumer.md | Rewritten: callbacks, start(), lag monitoring, graceful shutdown |
| wiki/extensions/helpers/kafka/admin.md | Updated: health tracking, graceful shutdown |
| wiki/extensions/helpers/kafka/schema-registry.md | New |
| wiki/extensions/helpers/kafka/examples.md | Rewritten: all examples use new callback/health API |
| wiki/changelogs/2026-03-12-kafka-helpers-enhancement.md | This changelog |

Backward Compatibility

  • All new options are optional — existing code works unchanged
  • getProducer(), getConsumer(), getAdmin() still return raw platformatic objects
  • close() gains an optional { isForce? } argument; existing close() calls without arguments are unaffected
  • Generic type defaults remain string for all positions
  • static newInstance() accepts existing options without changes
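
The timeout-based shutdown with force fallback can be pictured as a race between the graceful close and a timer. The sketch below is one way to implement that idea, assuming that `isForce` skips the graceful attempt entirely. `closeWithTimeout`, `createTimeout`, and the `graceful`/`force` functions are hypothetical and do not reflect the helpers' internal code.

```typescript
// Cancellable timer that resolves to 'timeout' after the given delay.
function createTimeout(ms: number): { promise: Promise<'timeout'>; cancel: () => void } {
  let cancel: () => void = () => {};
  const promise = new Promise<'timeout'>((resolve) => {
    const timer = setTimeout(() => resolve('timeout'), ms);
    cancel = () => clearTimeout(timer);
  });
  return { promise, cancel };
}

// Tries a graceful close first; forces the close if the timeout elapses.
async function closeWithTimeout(opts: {
  graceful: () => Promise<void>;
  force: () => Promise<void>;
  shutdownTimeout: number;
  isForce?: boolean;
}): Promise<'graceful' | 'forced'> {
  if (opts.isForce) {
    await opts.force();
    return 'forced';
  }

  const timeout = createTimeout(opts.shutdownTimeout);
  try {
    const outcome = await Promise.race([
      opts.graceful().then(() => 'closed' as const),
      timeout.promise,
    ]);
    if (outcome === 'closed') return 'graceful';
    await opts.force();
    return 'forced';
  } finally {
    // Always clear the timer so it cannot keep the process alive.
    timeout.cancel();
  }
}
```

Existing callers that pass no arguments take the graceful path, which is consistent with the optional `{ isForce? }` argument described above.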