
Queue

Message queuing and asynchronous task management with BullMQ, Kafka, MQTT, and in-memory solutions.

Quick Reference

| Class | Extends | Peer Dependency | Use Case |
| --- | --- | --- | --- |
| BullMQHelper | BaseHelper | bullmq (^5.70.0) | Redis-backed job queue -- background processing, task scheduling |
| KafkaProducerHelper | BaseKafkaHelper | @platformatic/kafka (^1.30.0) | Kafka message producer with transaction support |
| KafkaConsumerHelper | BaseKafkaHelper | @platformatic/kafka (^1.30.0) | Kafka message consumer with lag monitoring |
| KafkaAdminHelper | BaseKafkaHelper | @platformatic/kafka (^1.30.0) | Kafka admin operations (topic management) |
| KafkaSchemaRegistryHelper | BaseHelper | @platformatic/kafka (^1.30.0) | Confluent Schema Registry integration |
| MQTTClientHelper | BaseHelper | mqtt (^5.15.0) | MQTT broker messaging -- real-time events, IoT |
| QueueHelper | BaseHelper | None | In-memory generator queue -- sequential tasks, single process |

Common Operations

| Helper | Subscribe / Consume | Publish / Produce |
| --- | --- | --- |
| BullMQ | Create with role: 'worker' | queue.add(name, data) via the exposed BullMQ Queue instance |
| Kafka | consumer.start({ topics }) | producer.getProducer().send({ messages }) |
| MQTT | subscribe({ topics }) | publish({ topic, message }) |
| In-Memory | new QueueHelper({ onMessage }) | enqueue(payload) |

Import Paths

typescript
// In-memory queue (from base package)
import { QueueHelper, QueueStatuses } from '@venizia/ignis-helpers';
import type { TQueueStatus, TQueueElement } from '@venizia/ignis-helpers';

// BullMQ (separate export path)
import { BullMQHelper } from '@venizia/ignis-helpers/bullmq';
import type { TBullQueueRole } from '@venizia/ignis-helpers/bullmq';

// Kafka (separate export path)
import {
  KafkaProducerHelper,
  KafkaConsumerHelper,
  KafkaAdminHelper,
  KafkaSchemaRegistryHelper,
  BaseKafkaHelper,
} from '@venizia/ignis-helpers/kafka';
import type {
  IKafkaProducerOptions,
  IKafkaConsumerOptions,
  IKafkaAdminOptions,
  IKafkaSchemaRegistryOptions,
  IKafkaConsumeStartOptions,
  IKafkaTransactionContext,
  TKafkaBrokerEventCallback,
  TKafkaMessageCallback,
  TKafkaMessageDoneCallback,
  TKafkaMessageErrorCallback,
  TKafkaGroupJoinCallback,
  TKafkaGroupLeaveCallback,
  TKafkaGroupRebalanceCallback,
  TKafkaHeartbeatErrorCallback,
  TKafkaLagCallback,
  TKafkaLagErrorCallback,
} from '@venizia/ignis-helpers/kafka';

// MQTT (separate export path)
import { MQTTClientHelper } from '@venizia/ignis-helpers/mqtt';
import type { IMQTTClientOptions } from '@venizia/ignis-helpers/mqtt';

Creating an Instance

All queue helpers extend BaseHelper, which provides scoped logging via this.logger. The Kafka producer, consumer, and admin helpers extend it through BaseKafkaHelper.

BullMQHelper

The BullMQHelper wraps the BullMQ library for Redis-backed job queuing. It operates in one of two roles: 'queue' (producer) or 'worker' (consumer). The role is set at construction time and determines which BullMQ primitives are initialized.

typescript
import { DefaultRedisHelper } from '@venizia/ignis-helpers';
import { BullMQHelper } from '@venizia/ignis-helpers/bullmq';

// redisHelper is an existing DefaultRedisHelper instance, created elsewhere.
const worker = new BullMQHelper({
  queueName: 'email-queue',
  identifier: 'email-worker',
  role: 'worker',
  redisConnection: redisHelper,
  numberOfWorker: 3,
  lockDuration: 90 * 60 * 1000,
  onWorkerData: async (job) => {
    console.log(`Processing job ${job.id}:`, job.data);
    return { status: 'sent' };
  },
  onWorkerDataCompleted: async (job, result) => {
    console.log(`Job ${job.id} completed:`, result);
  },
  onWorkerDataFail: async (job, error) => {
    console.error(`Job ${job?.id} failed:`, error.message);
  },
});

IBullMQOptions

IBullMQOptions<TQueueElement = any, TQueueResult = any>

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| queueName | string | -- | Name of the BullMQ queue. Must be non-empty. |
| identifier | string | -- | Unique identifier used for scoped logging. |
| role | TBullQueueRole | -- | 'queue' (producer) or 'worker' (consumer). |
| redisConnection | DefaultRedisHelper | -- | Redis helper instance. The helper calls getClient().duplicate() internally. |
| numberOfWorker | number | 1 | Worker concurrency (number of jobs processed in parallel). |
| lockDuration | number | 5400000 | Job lock duration in milliseconds (default: 90 minutes). |
| onWorkerData | (job: Job<TQueueElement, TQueueResult>) => Promise<any> | undefined | Job processing callback. If omitted, the worker logs job details. |
| onWorkerDataCompleted | (job: Job<TQueueElement, TQueueResult>, result: any) => Promise<void> | undefined | Callback fired when a job completes successfully. |
| onWorkerDataFail | (job: Job<TQueueElement, TQueueResult> \| undefined, error: Error) => Promise<void> | undefined | Callback fired when a job fails. |

IMPORTANT

Pass the DefaultRedisHelper instance to redisConnection, not the raw ioredis client. The helper internally calls redisConnection.getClient().duplicate() to create dedicated connections for the queue and worker.

MQTTClientHelper

The MQTTClientHelper provides a pub/sub interface to an MQTT broker. The client connects automatically during construction.

typescript
import { MQTTClientHelper } from '@venizia/ignis-helpers/mqtt';

const mqttClient = new MQTTClientHelper({
  identifier: 'sensor-client',
  url: 'mqtt://localhost:1883',
  options: {
    username: 'user',
    password: 'password',
  },
  onMessage: ({ topic, message }) => {
    console.log(`Received on ${topic}:`, message.toString());
  },
  onConnect: () => {
    console.log('Connected to MQTT broker');
  },
  onDisconnect: () => {
    console.log('Disconnected from MQTT broker');
  },
  onError: (error) => {
    console.error('MQTT error:', error);
  },
  onClose: (error) => {
    if (error) console.error('Connection closed with error:', error);
  },
});

IMQTTClientOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| identifier | string | -- | Unique identifier for scoped logging. |
| url | string | -- | MQTT broker URL (e.g., mqtt://localhost:1883). Must be non-empty. |
| options | mqtt.IClientOptions | -- | MQTT.js client options (username, password, keepalive, etc.). |
| onMessage | (opts: { topic: string; message: Buffer }) => void | -- | Message handler. Required. |
| onConnect | () => void | undefined | Callback fired when the client connects to the broker. |
| onDisconnect | () => void | undefined | Callback fired on disconnection. |
| onError | (error: Error) => void | undefined | Callback fired on client errors. |
| onClose | (error?: Error) => void | undefined | Callback fired when the connection is closed. |

QueueHelper

The QueueHelper is a generator-based, in-memory queue with a built-in state machine. It processes enqueued items one at a time, making it suitable for sequential task processing within a single process.

typescript
import { QueueHelper } from '@venizia/ignis-helpers';

const queue = new QueueHelper<string>({
  identifier: 'task-queue',
  autoDispatch: true,
  onMessage: async ({ identifier, queueElement }) => {
    console.log(`[${identifier}] Processing:`, queueElement.payload);
  },
  onDataEnqueue: async ({ identifier, queueElement }) => {
    console.log(`[${identifier}] Enqueued:`, queueElement.payload);
  },
  onDataDequeue: async ({ identifier, queueElement }) => {
    console.log(`[${identifier}] Dequeued:`, queueElement.payload);
  },
  onStateChange: async ({ identifier, from, to }) => {
    console.log(`[${identifier}] State: ${from} -> ${to}`);
  },
});

IQueueCallback

IQueueCallback<TElementPayload>

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| identifier | string | -- | Unique identifier for scoped logging. |
| autoDispatch | boolean | true | If true, automatically triggers processing when an element is enqueued. |
| onMessage | (opts: { identifier: string; queueElement: TQueueElement<T> }) => ValueOrPromise<void> | undefined | Message processing callback. If omitted, the generator exits immediately. |
| onDataEnqueue | (opts: { identifier: string; queueElement: TQueueElement<T> }) => ValueOrPromise<void> | undefined | Callback fired after an element is added to the queue. |
| onDataDequeue | (opts: { identifier: string; queueElement: TQueueElement<T> }) => ValueOrPromise<void> | undefined | Callback fired after an element is removed from the queue. |
| onStateChange | (opts: { identifier: string; from: TQueueStatus; to: TQueueStatus }) => ValueOrPromise<void> | undefined | Callback fired on every state transition. |

TQueueElement

Each element in the queue is wrapped in a TQueueElement:

typescript
type TQueueElement<T> = { isLocked: boolean; payload: T };

Kafka Producer

The KafkaProducerHelper wraps @platformatic/kafka Producer with lifecycle management, health tracking, and transaction support.

typescript
import { KafkaProducerHelper } from '@venizia/ignis-helpers/kafka';

const producer = KafkaProducerHelper.newInstance({
  bootstrapBrokers: ['127.0.0.1:29092'],
  clientId: 'my-producer',
  acks: -1,
  idempotent: true,
  onBrokerConnect: ({ broker }) => {
    console.log(`Connected to ${broker.host}:${broker.port}`);
  },
  onBrokerDisconnect: ({ broker }) => {
    console.log(`Disconnected from ${broker.host}:${broker.port}`);
  },
});

IKafkaProducerOptions

IKafkaProducerOptions<KeyType, ValueType, HeaderKeyType, HeaderValueType> extends IKafkaConnectionOptions.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| bootstrapBrokers | string[] | -- | Kafka broker addresses. |
| clientId | string | -- | Client identifier. |
| identifier | string | 'kafka-producer' | Scoped logging identifier. |
| acks | 0 \| 1 \| -1 | -- | Acknowledgment mode. 0 = none, 1 = leader, -1 = all ISR. |
| idempotent | boolean | -- | Enable idempotent producer. |
| transactionalId | string | -- | Transactional ID (required for transactions with idempotent: true). |
| compression | CompressionAlgorithmValue | -- | Message compression algorithm. |
| strict | boolean | true | Strict mode for topic validation. |
| autocreateTopics | boolean | false | Auto-create topics on send. |
| retries | number | 3 | Number of retries on failure. |
| retryDelay | number | 1000 | Delay between retries in milliseconds. |
| shutdownTimeout | number | 30000 | Graceful shutdown timeout in milliseconds. |
| serializers | Partial<Serializers<...>> | -- | Custom key/value/header serializers. |
| registry | SchemaRegistry<...> | -- | Schema registry for serialization. |
| sasl | SASLOptions | -- | SASL authentication options. |
| tls | TLSOptions | -- | TLS connection options. |
| ssl | SSLOptions | -- | SSL connection options. |
| connectTimeout | number | -- | Connection timeout in milliseconds. |
| requestTimeout | number | -- | Request timeout in milliseconds. |
| onBrokerConnect | TKafkaBrokerEventCallback | -- | Callback when a broker connects. |
| onBrokerDisconnect | TKafkaBrokerEventCallback | -- | Callback when a broker disconnects. |

Kafka Consumer

The KafkaConsumerHelper wraps @platformatic/kafka Consumer with message processing callbacks, consumer group lifecycle events, and lag monitoring.

typescript
import { KafkaConsumerHelper } from '@venizia/ignis-helpers/kafka';

const consumer = KafkaConsumerHelper.newInstance({
  bootstrapBrokers: ['127.0.0.1:29092'],
  clientId: 'my-consumer',
  groupId: 'my-consumer-group',
  onMessage: async ({ message }) => {
    console.log('Received:', message.value);
    await message.commit();
  },
  onMessageError: ({ error }) => {
    console.error('Error:', error);
  },
  onGroupJoin: ({ groupId, memberId }) => {
    console.log(`Joined ${groupId} as ${memberId}`);
  },
});

IKafkaConsumerOptions

IKafkaConsumerOptions<KeyType, ValueType, HeaderKeyType, HeaderValueType> extends IKafkaConnectionOptions.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| bootstrapBrokers | string[] | -- | Kafka broker addresses. |
| clientId | string | -- | Client identifier. |
| groupId | string | -- | Consumer group ID. Required. |
| identifier | string | 'kafka-consumer' | Scoped logging identifier. |
| groupProtocol | 'classic' \| 'consumer' | 'classic' | Consumer group protocol. |
| groupInstanceId | string | -- | Static group membership instance ID. |
| autocommit | boolean \| number | false | Auto-commit offsets. false = manual, true or number = interval. |
| sessionTimeout | number | 30000 | Session timeout in milliseconds. |
| heartbeatInterval | number | 3000 | Heartbeat interval in milliseconds. |
| rebalanceTimeout | number | sessionTimeout | Rebalance timeout in milliseconds. |
| highWaterMark | number | 1024 | Stream high water mark. |
| minBytes | number | 1 | Minimum bytes to fetch per request. |
| maxBytes | number | -- | Maximum bytes to fetch per request. |
| maxWaitTime | number | -- | Maximum wait time for fetch in milliseconds. |
| metadataMaxAge | number | 300000 | Metadata cache max age in milliseconds. |
| retries | number | 3 | Number of retries on failure. |
| retryDelay | number | 1000 | Delay between retries in milliseconds. |
| shutdownTimeout | number | 30000 | Graceful shutdown timeout in milliseconds. |
| deserializers | Partial<Deserializers<...>> | -- | Custom key/value/header deserializers. |
| registry | SchemaRegistry<...> | -- | Schema registry for deserialization. |
| onBrokerConnect | TKafkaBrokerEventCallback | -- | Callback when a broker connects. |
| onBrokerDisconnect | TKafkaBrokerEventCallback | -- | Callback when a broker disconnects. |
| onMessage | TKafkaMessageCallback | -- | Message processing callback. |
| onMessageDone | TKafkaMessageDoneCallback | -- | Callback after message processing completes. |
| onMessageError | TKafkaMessageErrorCallback | -- | Callback on message processing error. |
| onGroupJoin | TKafkaGroupJoinCallback | -- | Callback when consumer joins the group. |
| onGroupLeave | TKafkaGroupLeaveCallback | -- | Callback when consumer leaves the group. |
| onGroupRebalance | TKafkaGroupRebalanceCallback | -- | Callback on group rebalance. |
| onHeartbeatError | TKafkaHeartbeatErrorCallback | -- | Callback on heartbeat error. |
| onLag | TKafkaLagCallback | -- | Callback with lag offset data. |
| onLagError | TKafkaLagErrorCallback | -- | Callback on lag monitoring error. |

Kafka Admin

The KafkaAdminHelper wraps @platformatic/kafka Admin for topic and cluster management.

typescript
import { KafkaAdminHelper } from '@venizia/ignis-helpers/kafka';

const admin = KafkaAdminHelper.newInstance({
  bootstrapBrokers: ['127.0.0.1:29092'],
  clientId: 'my-admin',
  onBrokerConnect: ({ broker }) => {
    console.log(`Connected to ${broker.host}:${broker.port}`);
  },
});

// Access the full Admin API directly
const adminClient = admin.getAdmin();
await adminClient.createTopics({ topics: ['my-topic'], partitions: 3, replicas: 1 });

IKafkaAdminOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| bootstrapBrokers | string[] | -- | Kafka broker addresses. |
| clientId | string | -- | Client identifier. |
| identifier | string | 'kafka-admin' | Scoped logging identifier. |
| retries | number | 3 | Number of retries on failure. |
| retryDelay | number | 1000 | Delay between retries in milliseconds. |
| shutdownTimeout | number | 30000 | Graceful shutdown timeout in milliseconds. |
| onBrokerConnect | TKafkaBrokerEventCallback | -- | Callback when a broker connects. |
| onBrokerDisconnect | TKafkaBrokerEventCallback | -- | Callback when a broker disconnects. |

Kafka Schema Registry

The KafkaSchemaRegistryHelper wraps @platformatic/kafka ConfluentSchemaRegistry for Avro/Protobuf/JSON schema integration with producer and consumer helpers.

typescript
import { KafkaSchemaRegistryHelper, KafkaProducerHelper } from '@venizia/ignis-helpers/kafka';

const schemaRegistry = KafkaSchemaRegistryHelper.newInstance({
  url: 'http://localhost:8081',
});

const producer = KafkaProducerHelper.newInstance({
  bootstrapBrokers: ['127.0.0.1:29092'],
  clientId: 'my-producer',
  registry: schemaRegistry.getRegistry(),
});

IKafkaSchemaRegistryOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| url | string | -- | Schema Registry URL. |
| identifier | string | 'kafka-schema-registry' | Scoped logging identifier. |
| auth | object | -- | Authentication credentials. |
| protobufTypeMapper | function | -- | Protobuf type mapping function. |
| jsonValidateSend | boolean | -- | Validate JSON schemas on send. |

Usage

BullMQ -- Adding Jobs

When created with role: 'queue', the helper exposes a queue property (a BullMQ Queue instance) for adding jobs.

typescript
const producer = new BullMQHelper({
  queueName: 'email-queue',
  identifier: 'email-producer',
  role: 'queue',
  redisConnection: redisHelper,
});

// Add a job via the BullMQ Queue API
await producer.queue.add('send-welcome', { email: 'user@example.com', template: 'welcome' });
await producer.queue.add('send-reset', { email: 'user@example.com', token: 'abc123' });

TIP

You can also use the static factory method BullMQHelper.newInstance({ ... }), which is equivalent to new BullMQHelper({ ... }).

Default Job Options

Jobs are created with these defaults:

typescript
defaultJobOptions: {
  removeOnComplete: true,
  removeOnFail: true,
}

BullMQ -- Processing Jobs

When created with role: 'worker', the helper initializes a BullMQ Worker that listens for jobs on the specified queue.

typescript
const consumer = new BullMQHelper<{ email: string }, { status: string }>({
  queueName: 'email-queue',
  identifier: 'email-consumer',
  role: 'worker',
  redisConnection: redisHelper,
  numberOfWorker: 3,
  lockDuration: 10 * 60 * 1000, // 10 minutes
  onWorkerData: async (job) => {
    await sendEmail(job.data.email);
    return { status: 'sent' };
  },
  onWorkerDataCompleted: async (job, result) => {
    console.log(`Job ${job.id} done:`, result);
  },
  onWorkerDataFail: async (job, error) => {
    console.error(`Job ${job?.id} failed:`, error.message);
  },
});

If onWorkerData is not provided, the worker logs the job's id, name, and data at the info level.

BullMQ -- Redis Cluster

When using Redis Cluster with BullMQ, you must set maxRetriesPerRequest: null on the cluster config -- this is required by BullMQ.

typescript
import { Cluster } from 'ioredis';
import { DefaultRedisHelper } from '@venizia/ignis-helpers';
import { BullMQHelper } from '@venizia/ignis-helpers/bullmq';

const cluster = new Cluster(
  [
    { host: 'node1.redis.example.com', port: 6379 },
    { host: 'node2.redis.example.com', port: 6379 },
    { host: 'node3.redis.example.com', port: 6379 },
  ],
  {
    maxRetriesPerRequest: null,  // Required by BullMQ
    enableReadyCheck: true,
    scaleReads: 'slave',
    redisOptions: {
      password: 'your-password',
      tls: {},
    },
  }
);

const redisHelper = new DefaultRedisHelper({
  scope: 'BullMQ',
  identifier: 'cluster-redis',
  client: cluster,
});

const worker = BullMQHelper.newInstance({
  queueName: 'my-queue',
  identifier: 'cluster-worker',
  role: 'worker',
  redisConnection: redisHelper,
  onWorkerData: async (job) => {
    // process job
  },
});

BullMQ -- Graceful Shutdown

Call close() to gracefully shut down both the worker and queue connections.

typescript
await producer.close();
await consumer.close();

close() calls worker.close() and queue.close() in sequence. If closing fails, it logs the error and re-throws.

MQTT -- Subscribe and Publish

After the client connects to the broker, use subscribe() and publish() for topic-based messaging.

typescript
// Subscribe to multiple topics
await mqttClient.subscribe({ topics: ['sensors/temperature', 'sensors/humidity'] });

// Publish a string message
await mqttClient.publish({ topic: 'sensors/temperature', message: '23.5' });

// Publish a Buffer message
await mqttClient.publish({ topic: 'sensors/raw', message: Buffer.from([0x01, 0x02]) });

NOTE

Both subscribe() and publish() reject with an ApplicationError (status 400) if the MQTT client is not connected. Ensure the connection is established before calling these methods.

MQTT -- Event Handling

The MQTTClientHelper calls configure() automatically during construction. Once connected, the onMessage callback receives messages for all subscribed topics.

typescript
const client = new MQTTClientHelper({
  identifier: 'iot-gateway',
  url: 'mqtt://broker.example.com:1883',
  options: { keepalive: 60 },
  onConnect: () => {
    // Subscribe once connected
    client
      .subscribe({ topics: ['devices/+/status'] })
      .catch((error) => console.error('Subscribe failed:', error));
  },
  onMessage: ({ topic, message }) => {
    const deviceId = topic.split('/')[1];
    console.log(`Device ${deviceId}:`, message.toString());
  },
  onError: (error) => {
    console.error('Connection error:', error.message);
  },
  onClose: () => {
    console.log('Connection closed');
  },
});
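
The devices/+/status subscription above relies on MQTT's single-level wildcard. As a rough illustration of which topics such a filter covers, the sketch below applies the standard MQTT matching rules: + matches exactly one level, and # matches the rest of the topic and must come last. matchesTopicFilter is a hypothetical helper, not part of MQTTClientHelper; in practice the broker does this matching, and the sketch ignores the special handling of topics that start with $.

```typescript
// Hypothetical helper (not part of @venizia/ignis-helpers): checks whether a
// concrete topic matches an MQTT topic filter, following the standard rules.
function matchesTopicFilter(filter: string, topic: string): boolean {
  const filterLevels = filter.split('/');
  const topicLevels = topic.split('/');

  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i];

    // '#' matches this level and everything below it; it must be the last level.
    if (level === '#') {
      return i === filterLevels.length - 1;
    }

    // The topic ran out of levels before the filter did.
    if (i >= topicLevels.length) {
      return false;
    }

    // '+' matches exactly one level; anything else must match literally.
    if (level !== '+' && level !== topicLevels[i]) {
      return false;
    }
  }

  // Every filter level matched; the topic must not have extra levels.
  return filterLevels.length === topicLevels.length;
}
```

A helper like this can be handy inside onMessage when one client subscribes to several filters and needs to route each message to the right handler.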

Kafka -- Producing Messages

Use getProducer() to access the underlying @platformatic/kafka Producer and send messages.

typescript
const producer = KafkaProducerHelper.newInstance({
  bootstrapBrokers: ['127.0.0.1:29092'],
  clientId: 'my-producer',
  acks: -1,
});

// Send messages via the underlying producer
await producer.getProducer().send({
  messages: [
    { topic: 'orders', key: 'order-1', value: JSON.stringify({ item: 'widget' }) },
  ],
});

// Health check
producer.isHealthy(); // true when connected to a broker

Kafka -- Consuming Messages

Call start() to begin consuming from topics. Messages are delivered via the onMessage callback.

typescript
const consumer = KafkaConsumerHelper.newInstance({
  bootstrapBrokers: ['127.0.0.1:29092'],
  clientId: 'my-consumer',
  groupId: 'my-group',
  onMessage: async ({ message }) => {
    console.log('Key:', message.key, 'Value:', message.value);
    await message.commit();
  },
  onMessageDone: async ({ message }) => {
    console.log('Processing complete for offset:', message.offset);
  },
  onMessageError: ({ error, message }) => {
    console.error('Processing failed:', error.message);
  },
});

await consumer.start({
  topics: ['orders'],
  mode: 'committed',          // Default: 'committed'
  fallbackMode: 'latest',     // Default: 'latest'
});

IKafkaConsumeStartOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| topics | string[] | -- | Topics to consume from. |
| mode | MessagesStreamModeValue | 'committed' | Consume mode. |
| fallbackMode | MessagesStreamFallbackModeValue | 'latest' | Fallback mode when no committed offset exists. |

Kafka -- Lag Monitoring

Monitor consumer lag to detect processing delays.

typescript
consumer.startLagMonitoring({
  topics: ['orders'],
  interval: 30_000, // Default: 30 seconds
});

// Stop monitoring
consumer.stopLagMonitoring();

Use the onLag and onLagError callbacks to react to lag data:

typescript
const consumer = KafkaConsumerHelper.newInstance({
  // ...
  onLag: ({ lag }) => {
    console.log('Current lag offsets:', lag);
  },
  onLagError: ({ error }) => {
    console.error('Lag monitoring error:', error);
  },
});
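
The exact shape of the lag data passed to onLag is not shown here, so the following is only a sketch of the underlying arithmetic under an assumed structure. Consumer lag for a partition is the distance between the latest offset on the broker and the offset the group has committed. PartitionOffsets, partitionLag, totalLag, and isLagging are all hypothetical names used for illustration.

```typescript
// Hypothetical shape for illustration only; the payload passed to onLag by
// KafkaConsumerHelper may be structured differently.
interface PartitionOffsets {
  partition: number;
  latestOffset: number;    // next offset to be written on the broker
  committedOffset: number; // next offset the consumer group will read
}

// Lag for one partition: how many messages are still waiting to be consumed.
function partitionLag(p: PartitionOffsets): number {
  return Math.max(0, p.latestOffset - p.committedOffset);
}

// Total lag across all partitions of a topic.
function totalLag(partitions: PartitionOffsets[]): number {
  return partitions.reduce((sum, p) => sum + partitionLag(p), 0);
}

// True when the total lag exceeds an alerting threshold.
function isLagging(partitions: PartitionOffsets[], threshold: number): boolean {
  return totalLag(partitions) > threshold;
}
```

A threshold check of this kind is one way to turn raw lag numbers into an alert or a scaling signal inside the onLag callback.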

Kafka -- Transactions

The KafkaProducerHelper supports exactly-once semantics via transactions. Transactions require both transactionalId and idempotent: true to be set on the producer.

typescript
const producer = KafkaProducerHelper.newInstance({
  bootstrapBrokers: ['127.0.0.1:29092'],
  clientId: 'transactional-producer',
  transactionalId: 'my-tx-id',
  idempotent: true,
  acks: -1,
});

const result = await producer.runInTransaction(async ({ send, addConsumer, addOffset }) => {
  return send({
    messages: [
      { topic: 'orders', key: 'o1', value: JSON.stringify({ status: 'created' }) },
    ],
  });
});

The transaction context provides:

| Member | Description |
| --- | --- |
| send(opts) | Send messages within the transaction. Returns ProduceResult. |
| addConsumer(consumer) | Add a consumer to the transaction for read-process-write patterns. |
| addOffset(message) | Commit consumer offsets as part of the transaction. |
| transaction | The underlying transaction object for advanced operations. |

If the callback throws, the transaction is automatically aborted.
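
The commit-or-abort behaviour described above follows a common pattern, sketched below with a stand-in transaction object. This is not the helper's actual implementation: SketchTransaction and runWithTransaction are hypothetical names, and the real runInTransaction may order or handle these steps differently.

```typescript
// Hypothetical minimal transaction interface, for illustration only.
interface SketchTransaction {
  commit(): Promise<void>;
  abort(): Promise<void>;
}

// Runs the callback, commits when it resolves, and aborts and rethrows when
// it throws. In this sketch a failed commit also triggers an abort.
async function runWithTransaction<T>(
  tx: SketchTransaction,
  callback: (tx: SketchTransaction) => Promise<T>,
): Promise<T> {
  try {
    const result = await callback(tx);
    await tx.commit();
    return result;
  } catch (error) {
    await tx.abort();
    throw error;
  }
}
```

Because the error is rethrown after the abort, callers still need their own try/catch (or .catch) around the call to decide whether to retry.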

Kafka -- Health Checks

The producer, consumer, and admin helpers inherit health checks from BaseKafkaHelper:

typescript
producer.isHealthy();      // true when connected
consumer.isReady();        // true when connected AND active (consuming)
admin.getHealthStatus();   // 'connected' | 'disconnected' | 'unknown'

Kafka -- Graceful Shutdown

The producer, consumer, and admin helpers support graceful shutdown with an optional isForce flag and a configurable shutdownTimeout (default: 30 seconds).

typescript
// Graceful shutdown (waits up to shutdownTimeout, then forces)
await producer.close();
await consumer.close();
await admin.close();

// Force immediate shutdown
await producer.close({ isForce: true });
await consumer.close({ isForce: true });

For consumers, close() also stops lag monitoring and closes the message stream before closing the client.
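
The "wait up to shutdownTimeout, then force" behaviour can be pictured as a race between the graceful close and a timer. The sketch below shows that general pattern only. closeWithTimeout and its option names are hypothetical, and the helpers' real close() logic is internal to the library.

```typescript
// Illustrative only: a generic "graceful, then forced" close.
async function closeWithTimeout(opts: {
  closeGracefully: () => Promise<void>;
  forceClose: () => Promise<void>;
  timeoutMs: number;
  isForce?: boolean;
}): Promise<'graceful' | 'forced'> {
  const { closeGracefully, forceClose, timeoutMs, isForce = false } = opts;

  // Forced shutdown skips the graceful attempt entirely.
  if (isForce) {
    await forceClose();
    return 'forced';
  }

  let timer!: ReturnType<typeof setTimeout>;
  const timedOut = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs);
  });

  try {
    // Whichever settles first wins: the graceful close or the timer.
    const outcome = await Promise.race([
      closeGracefully().then(() => 'closed' as const),
      timedOut,
    ]);
    if (outcome === 'closed') {
      return 'graceful';
    }
    await forceClose();
    return 'forced';
  } finally {
    // Always clear the timer so it cannot keep the process alive.
    clearTimeout(timer);
  }
}
```

Clearing the timer in the finally block matters in short-lived processes, where a pending 30-second timer would otherwise delay exit.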

In-Memory Queue -- Enqueueing and Processing

With autoDispatch: true (default), elements are processed automatically as they are enqueued.

typescript
import { QueueHelper } from '@venizia/ignis-helpers';

const queue = new QueueHelper<{ task: string; priority: number }>({
  identifier: 'task-processor',
  onMessage: async ({ queueElement }) => {
    console.log('Processing:', queueElement.payload.task);
    await performTask(queueElement.payload);
  },
});

// Elements are processed one at a time, in order
await queue.enqueue({ task: 'resize-image', priority: 1 });
await queue.enqueue({ task: 'send-notification', priority: 2 });

Manual Dispatch

Set autoDispatch: false to control when processing begins. Call nextMessage() to trigger processing of the next element.

typescript
const queue = new QueueHelper<string>({
  identifier: 'manual-queue',
  autoDispatch: false,
  onMessage: async ({ queueElement }) => {
    console.log('Processing:', queueElement.payload);
  },
});

await queue.enqueue('item-1');
await queue.enqueue('item-2');

// Nothing processed yet -- trigger manually
queue.nextMessage(); // processes 'item-1'

NOTE

nextMessage() only triggers processing when the queue state is WAITING. It logs a warning and returns if the queue is in any other state.

In-Memory Queue -- State Machine

The QueueHelper uses a state machine to manage its lifecycle:

WAITING ──enqueue──> PROCESSING ──done──> WAITING
   |                     |
   └──lock()──> LOCKED <─┘
                  |
              unlock()──> WAITING
                  |
              settle()──> SETTLED (terminal)

| State | Value | Description |
| --- | --- | --- |
| QueueStatuses.WAITING | '000_WAITING' | Idle, ready to process the next element. |
| QueueStatuses.PROCESSING | '100_PROCESSING' | Currently handling a message via onMessage. |
| QueueStatuses.LOCKED | '200_LOCKED' | Paused. No new processing until unlock() is called. |
| QueueStatuses.SETTLED | '300_SETTLED' | Terminal state. No more elements accepted. |

You can validate a state string with QueueStatuses.isValid(state).
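
To make the diagram concrete, the transitions can be written as a lookup table. The sketch below is a standalone model built from the diagram and the table above, not the QueueHelper source. SketchStatuses, transitions, nextStatus, and isValidStatus are hypothetical names, and settle() is modelled only for the non-processing states, where it takes effect immediately.

```typescript
// Status values as listed in the table above.
const SketchStatuses = {
  WAITING: '000_WAITING',
  PROCESSING: '100_PROCESSING',
  LOCKED: '200_LOCKED',
  SETTLED: '300_SETTLED',
} as const;

type SketchStatus = (typeof SketchStatuses)[keyof typeof SketchStatuses];
type SketchEvent = 'enqueue' | 'done' | 'lock' | 'unlock' | 'settle';

// Transition table read off the diagram; a missing entry means "not allowed".
const transitions: Record<SketchStatus, Partial<Record<SketchEvent, SketchStatus>>> = {
  '000_WAITING': { enqueue: '100_PROCESSING', lock: '200_LOCKED', settle: '300_SETTLED' },
  '100_PROCESSING': { done: '000_WAITING', lock: '200_LOCKED' },
  '200_LOCKED': { unlock: '000_WAITING', settle: '300_SETTLED' },
  '300_SETTLED': {}, // terminal: no outgoing transitions
};

// Returns the next status, or undefined when the event is not allowed.
function nextStatus(current: SketchStatus, event: SketchEvent): SketchStatus | undefined {
  return transitions[current][event];
}

// Type guard in the spirit of QueueStatuses.isValid(state).
function isValidStatus(value: string): value is SketchStatus {
  return Object.prototype.hasOwnProperty.call(transitions, value);
}
```

The numeric prefixes in the status values keep the states sortable in lifecycle order, which is convenient when logging transitions from onStateChange.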

In-Memory Queue -- Lock and Unlock

Use lock() / unlock() to pause and resume processing without losing queued elements.

typescript
// Pause the queue (e.g., during maintenance)
queue.lock();

// Elements can still be enqueued while locked,
// but they won't be processed until unlocked
await queue.enqueue('queued-while-locked');

// Resume processing
queue.unlock({ shouldProcessNextElement: true });

// Resume without processing the next element
queue.unlock({ shouldProcessNextElement: false });

lock() logs an error and returns if the queue is already LOCKED or SETTLED.

unlock() logs an error and returns if the queue is already SETTLED, because SETTLED is a terminal state past LOCKED.

In-Memory Queue -- Settling and Closing

After settle() is called, the queue rejects new elements and transitions to SETTLED once all in-flight work completes.

typescript
// Signal that no more elements will be added
queue.settle();

// Check if the queue is settled and empty
if (queue.isSettled()) {
  console.log('All work done, total events:', queue.getTotalEvent());
}

// Or close entirely (settle + terminate generator)
queue.close();

settle() sets isSettleRequested to true. If the queue is not currently processing, it immediately transitions to SETTLED. If processing, it transitions to SETTLED after the current message completes and the storage is empty.

close() calls settle() and then terminates the internal generator via generator.return().
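
The two settle paths (immediate when idle, deferred while a message is in flight) can be modelled in a few lines. The class below is a simplified, synchronous stand-in written for illustration, not the QueueHelper implementation. SettleModel and its methods are hypothetical, and locking is left out.

```typescript
// Simplified model of the settle rules described above; not the library's code.
class SettleModel {
  private state: 'WAITING' | 'PROCESSING' | 'SETTLED' = 'WAITING';
  private isSettleRequested = false;
  private storage: string[] = [];

  getState(): 'WAITING' | 'PROCESSING' | 'SETTLED' {
    return this.state;
  }

  // Returns false when the element is rejected because settle was requested.
  enqueue(item: string): boolean {
    if (this.isSettleRequested) return false;
    this.storage.push(item);
    return true;
  }

  // Takes the next element and marks the queue as processing.
  startNext(): string | undefined {
    if (this.state !== 'WAITING') return undefined;
    const item = this.storage.shift();
    if (item !== undefined) this.state = 'PROCESSING';
    return item;
  }

  // After the in-flight message completes, settle only if nothing is left.
  finishCurrent(): void {
    if (this.state !== 'PROCESSING') return;
    this.state = this.isSettleRequested && this.storage.length === 0 ? 'SETTLED' : 'WAITING';
  }

  // Not processing: settle immediately. Otherwise finishCurrent() decides.
  settle(): void {
    this.isSettleRequested = true;
    if (this.state !== 'PROCESSING') this.state = 'SETTLED';
  }
}
```

The model shows why isSettled() is worth checking after settle(): a queue that was busy when settle() was called reaches SETTLED only after the current message finishes.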

API Summary

BullMQHelper

| Method | Returns | Description |
| --- | --- | --- |
| static newInstance(opts) | BullMQHelper | Factory method, equivalent to new BullMQHelper(opts). |
| configureQueue() | void | Sets up the BullMQ Queue instance. Called automatically for role: 'queue'. |
| configureWorker() | void | Sets up the BullMQ Worker instance. Called automatically for role: 'worker'. |
| configure() | void | Delegates to configureQueue() or configureWorker() based on the role. |
| close() | Promise<void> | Gracefully closes the worker and queue connections. |

Properties

| Property | Type | Description |
| --- | --- | --- |
| queue | Queue<TQueueElement, TQueueResult> | BullMQ Queue instance (available when role: 'queue'). |
| worker | Worker<TQueueElement, TQueueResult> | BullMQ Worker instance (available when role: 'worker'). |

KafkaProducerHelper

| Method | Returns | Description |
| --- | --- | --- |
| static newInstance(opts) | KafkaProducerHelper | Factory method. |
| getProducer() | Producer | Access the underlying @platformatic/kafka Producer. |
| runInTransaction(callback) | Promise<ResultType> | Execute a callback within a Kafka transaction. Auto-commits or aborts. |
| isHealthy() | boolean | Returns true when connected to a broker. |
| getHealthStatus() | TKafkaHealthStatus | Returns 'connected', 'disconnected', or 'unknown'. |
| close(opts?) | Promise<void> | Graceful shutdown. opts: { isForce?: boolean } |

KafkaConsumerHelper

| Method | Returns | Description |
| --- | --- | --- |
| static newInstance(opts) | KafkaConsumerHelper | Factory method. |
| getConsumer() | Consumer | Access the underlying @platformatic/kafka Consumer. |
| getStream() | MessagesStream \| null | Access the current message stream (null if not started). |
| start(opts) | Promise<void> | Start consuming. opts: { topics, mode?, fallbackMode? } |
| isReady() | boolean | Returns true when connected AND actively consuming. |
| isHealthy() | boolean | Returns true when connected to a broker. |
| getHealthStatus() | TKafkaHealthStatus | Returns 'connected', 'disconnected', or 'unknown'. |
| startLagMonitoring(opts) | void | Start lag monitoring. opts: { topics, interval? } |
| stopLagMonitoring() | void | Stop lag monitoring. |
| close(opts?) | Promise<void> | Graceful shutdown. Stops monitoring, closes stream, then client. |

KafkaAdminHelper

| Method | Returns | Description |
| --- | --- | --- |
| static newInstance(opts) | KafkaAdminHelper | Factory method. |
| getAdmin() | Admin | Access the underlying @platformatic/kafka Admin. |
| isHealthy() | boolean | Returns true when connected to a broker. |
| getHealthStatus() | TKafkaHealthStatus | Returns 'connected', 'disconnected', or 'unknown'. |
| close(opts?) | Promise<void> | Graceful shutdown. opts: { isForce?: boolean } |

KafkaSchemaRegistryHelper

| Method | Returns | Description |
| --- | --- | --- |
| static newInstance(opts) | KafkaSchemaRegistryHelper | Factory method. |
| getRegistry() | ConfluentSchemaRegistry | Access the underlying schema registry. |
| getSerializers() | Serializers | Get serializers for use with a producer. |
| getDeserializers() | Deserializers | Get deserializers for use with a consumer. |

MQTTClientHelper

| Method | Returns | Description |
| --- | --- | --- |
| configure() | void | Connects to the MQTT broker. Called automatically by the constructor. |
| subscribe(opts) | Promise<string[]> | Subscribe to one or more topics. opts: { topics: string[] } |
| publish(opts) | Promise<{ topic, message }> | Publish a message to a topic. opts: { topic: string; message: string \| Buffer } |

QueueHelper

| Method | Returns | Description |
| --- | --- | --- |
| enqueue(payload) | Promise<void> | Add an element to the queue. Rejected if settled. |
| dequeue() | TQueueElement<T> \| undefined | Remove and return the first element. |
| nextMessage() | void | Manually trigger processing of the next element. Only works in WAITING state. |
| lock() | void | Pause processing. State becomes LOCKED. |
| unlock(opts) | void | Resume processing. opts: { shouldProcessNextElement?: boolean } (default: true). |
| settle() | void | Mark queue as settled. No new elements accepted after this. |
| isSettled() | boolean | Returns true if state is SETTLED and storage is empty. |
| close() | void | Settle the queue and terminate the internal generator. |
| getElementAt(position) | TQueueElement<T> | Peek at an element by index. |
| getState() | TQueueStatus | Returns the current queue state. |
| getTotalEvent() | number | Returns the total number of elements ever enqueued. |
| getProcessingEvents() | Set<TQueueElement<T>> | Returns the set of currently processing elements. |

Troubleshooting

"Invalid queue name"

Cause: The queueName option is empty or falsy when creating a BullMQ queue or worker.

Fix: Provide a non-empty queueName:

typescript
// Wrong
new BullMQHelper({ queueName: '', role: 'queue', ... });

// Correct
new BullMQHelper({ queueName: 'my-email-queue', role: 'queue', ... });

"Invalid client role to configure"

Cause: The role option is missing or not one of 'queue' / 'worker'.

Fix: Set role to either 'queue' or 'worker':

typescript
// Wrong
new BullMQHelper({ role: undefined as any, ... });

// Correct
new BullMQHelper({ role: 'worker', ... });

"Invalid url to configure mqtt client!"

Cause: The url option is empty when constructing an MQTTClientHelper. Throws an ApplicationError with status 500.

Fix: Pass a valid MQTT broker URL:

typescript
// Wrong
new MQTTClientHelper({ url: '', ... });

// Correct
new MQTTClientHelper({ url: 'mqtt://localhost:1883', ... });

"MQTT Client is not available to subscribe topic!" / "MQTT Client is not available to publish message!"

Cause: subscribe() or publish() was called before the MQTT client finished connecting, or after the client disconnected. Throws an ApplicationError with status 400.

Fix: Wait for the onConnect callback before subscribing or publishing, or verify the client is connected:

typescript
const client = new MQTTClientHelper({
  identifier: 'my-client',
  url: 'mqtt://localhost:1883',
  options: {},
  onConnect: () => {
    // Safe to subscribe/publish here
    client
      .subscribe({ topics: ['my/topic'] })
      .catch((error) => console.error('Subscribe failed:', error));
  },
  onMessage: ({ topic, message }) => { /* ... */ },
});

Elements not processing in In-Memory Queue

Cause: Several conditions can prevent onMessage from being called.

Checklist:

  • Verify onMessage callback is provided -- the generator logs a warning and exits if missing
  • Check if the queue is locked -- call unlock({ shouldProcessNextElement: true }) to resume
  • Check if autoDispatch is false -- call nextMessage() manually after each enqueue()
  • Check if the queue is settled -- a settled queue rejects new elements; create a new QueueHelper instance
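
The checklist above can be turned into a small diagnostic function. The sketch below is one possible way to do that. QueueSnapshot and diagnoseIdleQueue are hypothetical names, and the snapshot has to be assembled by the caller, for example from queue.getState() and the options that were passed to the constructor.

```typescript
// Hypothetical snapshot of the facts the checklist asks about.
interface QueueSnapshot {
  hasOnMessage: boolean;
  autoDispatch: boolean;
  state: '000_WAITING' | '100_PROCESSING' | '200_LOCKED' | '300_SETTLED';
}

// Returns one finding per checklist item that applies; empty means no match.
function diagnoseIdleQueue(snapshot: QueueSnapshot): string[] {
  const findings: string[] = [];
  if (!snapshot.hasOnMessage) {
    findings.push('onMessage is missing');
  }
  if (snapshot.state === '200_LOCKED') {
    findings.push('queue is locked; call unlock()');
  }
  if (!snapshot.autoDispatch) {
    findings.push('autoDispatch is false; call nextMessage()');
  }
  if (snapshot.state === '300_SETTLED') {
    findings.push('queue is settled; create a new QueueHelper');
  }
  return findings;
}
```

An empty result means none of the listed causes applies, in which case the onStateChange callback is a reasonable next place to look.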

"Queue was SETTLED | No more element acceptable"

Cause: enqueue() was called after settle() or close().

Fix: Create a new QueueHelper instance if you need to continue processing:

typescript
queue.close();

// Start a new queue for further work
const newQueue = new QueueHelper<string>({
  identifier: 'task-queue-v2',
  onMessage: async ({ queueElement }) => { /* ... */ },
});
