Admin

The KafkaAdminHelper wraps @platformatic/kafka's Admin with health tracking, graceful shutdown, and broker event callbacks. Use getAdmin() to access the full Admin API directly.

typescript
class KafkaAdminHelper extends BaseKafkaHelper<Admin>

NOTE

KafkaAdminHelper has no generic type parameters -- the Admin client does not deal with serialized messages.

Helper API

| Method | Signature | Description |
| --- | --- | --- |
| `newInstance(opts)` | `static newInstance(opts): KafkaAdminHelper` | Factory method |
| `getAdmin()` | `(): Admin` | Access the underlying Admin |
| `isHealthy()` | `(): boolean` | `true` when broker connected |
| `isReady()` | `(): boolean` | Same as `isHealthy()` |
| `getHealthStatus()` | `(): TKafkaHealthStatus` | `'connected' \| 'disconnected' \| 'unknown'` |
| `close(opts?)` | `(opts?: { isForce?: boolean }): Promise<void>` | Close the admin connection (default: graceful) |
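
The health methods map naturally onto a readiness probe. The following is a minimal sketch under stated assumptions: `IHealthSource`, `IProbeResult`, and `toReadinessProbe` are hypothetical names introduced for illustration, and the status union is redeclared locally so the block stands alone.

```typescript
// Local copy of the status union listed in the table above.
type TKafkaHealthStatus = 'connected' | 'disconnected' | 'unknown';

// Minimal structural view of the helper: only the documented health methods.
interface IHealthSource {
  isHealthy(): boolean;
  getHealthStatus(): TKafkaHealthStatus;
}

// Hypothetical probe result shape; not part of the helper API.
interface IProbeResult {
  httpStatus: 200 | 503;
  body: { kafkaAdmin: TKafkaHealthStatus };
}

// Translate the helper's health state into a readiness-probe response.
function toReadinessProbe(source: IHealthSource): IProbeResult {
  return {
    httpStatus: source.isHealthy() ? 200 : 503,
    body: { kafkaAdmin: source.getHealthStatus() },
  };
}
```

Because the function only depends on the two documented methods, a `KafkaAdminHelper` instance should be usable as the `source` argument without an adapter.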

IKafkaAdminOptions

typescript
interface IKafkaAdminOptions extends IKafkaConnectionOptions {
  identifier?: string;       // Default: 'kafka-admin'
  shutdownTimeout?: number;  // Default: 30000ms
  onBrokerConnect?: TKafkaBrokerEventCallback;
  onBrokerDisconnect?: TKafkaBrokerEventCallback;
}

All Connection Options inherited from IKafkaConnectionOptions are accepted as well.
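
To make the documented defaults concrete, here is a small sketch of how they could be applied to a partial options object. `IAdminSpecificOptions`, `ADMIN_DEFAULTS`, and `resolveAdminOptions` are hypothetical names; the helper's real resolution logic is not shown in this page and may differ.

```typescript
// Simplified local copy of the admin-specific fields; connection options are omitted.
interface IAdminSpecificOptions {
  identifier?: string;
  shutdownTimeout?: number; // milliseconds
}

// Defaults as documented in the interface comments above.
const ADMIN_DEFAULTS = {
  identifier: 'kafka-admin',
  shutdownTimeout: 30000,
};

// Fill in any field the caller left undefined with its documented default.
function resolveAdminOptions(opts: IAdminSpecificOptions): Required<IAdminSpecificOptions> {
  return {
    identifier: opts.identifier ?? ADMIN_DEFAULTS.identifier,
    shutdownTimeout: opts.shutdownTimeout ?? ADMIN_DEFAULTS.shutdownTimeout,
  };
}
```

Using `??` rather than `||` keeps an explicit `shutdownTimeout: 0` from being overwritten by the default.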

Basic Example

typescript
import { KafkaAdminHelper } from '@venizia/ignis-helpers/kafka';

const helper = KafkaAdminHelper.newInstance({
  bootstrapBrokers: ['localhost:9092'],
  clientId: 'my-admin',
  onBrokerConnect: ({ broker }) => console.log(`Connected to ${broker.host}:${broker.port}`),
  onBrokerDisconnect: ({ broker }) => console.log(`Disconnected from ${broker.host}`),
});

const admin = helper.getAdmin();

// Create a topic
await admin.createTopics({ topics: ['orders'], partitions: 3, replicas: 2 });

// List all topics
const topics = await admin.listTopics();

// Health check
helper.isHealthy(); // true when connected

// Graceful close
await helper.close();

// Or force close
await helper.close({ isForce: true });

Graceful Shutdown

close() delegates to the base closeClient(), which calls this.client.close() and bounds the graceful attempt by shutdownTimeout (default 30000 ms). If the graceful close does not finish within that window, the helper force-closes the client automatically. Once close() completes, healthStatus is 'disconnected'.

typescript
// Graceful (recommended)
await helper.close();

// Force
await helper.close({ isForce: true });
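
The graceful-then-force behaviour described above follows a common timeout-race pattern. The sketch below illustrates that pattern only; it is not the helper's actual implementation. `IClosable`, `closeGracefully`, `closeForcefully`, and `closeWithTimeout` are hypothetical names.

```typescript
// Hypothetical minimal client with one graceful and one forced close path.
interface IClosable {
  closeGracefully(): Promise<void>;
  closeForcefully(): Promise<void>;
}

// A cancellable timer that resolves to 'timeout' after `ms` milliseconds.
function startTimeout(ms: number): { promise: Promise<'timeout'>; cancel: () => void } {
  let cancel: () => void = () => {};
  const promise = new Promise<'timeout'>((resolve) => {
    const timer = setTimeout(() => resolve('timeout'), ms);
    cancel = () => clearTimeout(timer);
  });
  return { promise, cancel };
}

// Resolves to 'graceful' if the graceful close finishes in time, otherwise 'forced'.
async function closeWithTimeout(
  client: IClosable,
  timeoutMs: number,
): Promise<'graceful' | 'forced'> {
  const timeout = startTimeout(timeoutMs);
  const graceful = client.closeGracefully().then(() => 'done' as const);
  // Avoid an unhandled rejection if the graceful close fails after the timeout won the race.
  graceful.catch(() => undefined);

  try {
    const winner = await Promise.race([graceful, timeout.promise]);
    if (winner === 'done') {
      return 'graceful';
    }
    // The graceful close took too long: fall back to a forced close.
    await client.closeForcefully();
    return 'forced';
  } finally {
    // Always clear the timer so it cannot keep the process alive.
    timeout.cancel();
  }
}
```

Clearing the timer in `finally` matters in practice: a pending 30-second timer would otherwise delay process exit after a fast, successful close.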

API Reference (@platformatic/kafka)

After calling helper.getAdmin(), you have full access to the Admin class.

Topic Management

| Method | Signature | Description |
| --- | --- | --- |
| `createTopics(opts)` | `(opts: { topics: string[], partitions?: number, replicas?: number, configs?: Config[] }): Promise<CreatedTopic[]>` | Create topics |
| `deleteTopics(opts)` | `(opts: { topics: string[] }): Promise<void>` | Delete topics |
| `listTopics(opts?)` | `(opts?: { includeInternals?: boolean }): Promise<string[]>` | List all topics |
| `createPartitions(opts)` | `(opts: { topics: CreatePartitionsRequestTopic[], validateOnly?: boolean }): Promise<void>` | Add partitions to existing topics |
| `deleteRecords(opts)` | `(opts: { topics: { name, partitions: { partition, offset }[] }[] }): Promise<DeletedRecordsTopic[]>` | Delete records up to offset |

typescript
// Create with custom configuration
await admin.createTopics({
  topics: ['orders'],
  partitions: 6,
  replicas: 3,
  configs: [
    { name: 'retention.ms', value: '604800000' },   // 7 days
    { name: 'cleanup.policy', value: 'compact' },
    { name: 'compression.type', value: 'zstd' },
  ],
});

// Raise the partition count to 12 in total (the count can only increase, never decrease)
await admin.createPartitions({
  topics: [{ name: 'orders', count: 12 }],
});

// Delete records before offset 1000 on partition 0
await admin.deleteRecords({
  topics: [{ name: 'orders', partitions: [{ partition: 0, offset: 1000n }] }],
});
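
A common follow-up is creating topics only when they are missing, for example at service startup. The sketch below combines `listTopics` and `createTopics` using the signatures from the table above; `ITopicAdmin` and `ensureTopics` are hypothetical names, and the structural interface is an assumption about the subset of `Admin` needed here.

```typescript
// Structural subset of Admin, following the signatures listed in the table above.
interface ITopicAdmin {
  listTopics(): Promise<string[]>;
  createTopics(opts: {
    topics: string[];
    partitions?: number;
    replicas?: number;
  }): Promise<unknown>;
}

// Create only the topics that do not exist yet; returns the names that were created.
async function ensureTopics(
  admin: ITopicAdmin,
  opts: { topics: string[]; partitions?: number; replicas?: number },
): Promise<string[]> {
  const existing = new Set(await admin.listTopics());
  const missing = opts.topics.filter((name) => !existing.has(name));

  if (missing.length > 0) {
    // Reuse the caller's partition and replica settings for the missing topics.
    await admin.createTopics({ ...opts, topics: missing });
  }
  return missing;
}
```

The list-then-create sequence is not atomic, so two instances starting at the same time could both try to create the same topic; callers that care should be prepared to handle the resulting error.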

Consumer Group Management

| Method | Signature | Description |
| --- | --- | --- |
| `listGroups(opts?)` | `(opts?: { states?: string[], types?: string[] }): Promise<Map<string, GroupBase>>` | List consumer groups |
| `describeGroups(opts)` | `(opts: { groups: string[] }): Promise<Map<string, Group>>` | Describe consumer groups (members, assignments) |
| `deleteGroups(opts)` | `(opts: { groups: string[] }): Promise<void>` | Delete consumer groups |
| `removeMembersFromConsumerGroup(opts)` | `(opts: { groupId, members? }): Promise<void>` | Remove specific members |

typescript
// List groups that are currently in the STABLE state
const groups = await admin.listGroups({ states: ['STABLE'] });

// Describe group members and partition assignments
const details = await admin.describeGroups({ groups: ['order-processing'] });
for (const [groupId, group] of details) {
  console.log(`Group: ${groupId}, State: ${group.state}`);
  for (const [memberId, member] of group.members) {
    console.log(`  Member: ${member.clientId} (${member.clientHost})`);
  }
}
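
For dashboards or logs, the nested `Map` result is often easier to handle once flattened. The sketch below is one way to do that; `IMemberLike`, `IGroupLike`, `IGroupSummary`, and `summarizeGroups` are hypothetical names, and the field set is limited to what the example above reads (`state`, `members`, `clientId`, `clientHost`).

```typescript
// Only the fields used in the example above; the library's Group type has more.
interface IMemberLike {
  clientId: string;
  clientHost: string;
}

interface IGroupLike {
  state: string;
  members: Map<string, IMemberLike>;
}

// Hypothetical flattened shape for logging or JSON output.
interface IGroupSummary {
  groupId: string;
  state: string;
  memberCount: number;
  clients: string[]; // "clientId@clientHost"
}

// Convert the Map returned by describeGroups() into a plain array of summaries.
function summarizeGroups(details: Map<string, IGroupLike>): IGroupSummary[] {
  const summaries: IGroupSummary[] = [];
  details.forEach((group, groupId) => {
    const clients: string[] = [];
    group.members.forEach((member) => {
      clients.push(`${member.clientId}@${member.clientHost}`);
    });
    summaries.push({ groupId, state: group.state, memberCount: clients.length, clients });
  });
  return summaries;
}
```

The result is a plain array, so it can be passed straight to `JSON.stringify`, which would otherwise serialize a `Map` as an empty object.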

Offset Management

| Method | Signature | Description |
| --- | --- | --- |
| `listOffsets(opts)` | `(opts): Promise<ListedOffsetsTopic[]>` | List partition offsets at timestamps |
| `listConsumerGroupOffsets(opts)` | `(opts: { groups }): Promise<ListConsumerGroupOffsetsGroup[]>` | List committed offsets for groups |
| `alterConsumerGroupOffsets(opts)` | `(opts: { groupId, topics }): Promise<void>` | Reset/alter committed offsets |
| `deleteConsumerGroupOffsets(opts)` | `(opts: { groupId, topics }): Promise<...>` | Delete committed offsets |

typescript
// Reset committed offsets to 0 (the earliest offset only if no records were deleted or expired)
await admin.alterConsumerGroupOffsets({
  groupId: 'order-processing',
  topics: [{
    name: 'orders',
    partitionOffsets: [
      { partition: 0, offset: 0n },
      { partition: 1, offset: 0n },
      { partition: 2, offset: 0n },
    ],
  }],
});
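
Writing out one entry per partition gets tedious for topics with many partitions. The sketch below builds the same request shape as the example above from a partition count; `IPartitionOffset`, `IAlterOffsetsRequest`, and `buildOffsetReset` are hypothetical names, and the shape is copied from the example rather than from the library's type definitions.

```typescript
// Request shape as used in the example above.
interface IPartitionOffset {
  partition: number;
  offset: bigint;
}

interface IAlterOffsetsRequest {
  groupId: string;
  topics: { name: string; partitionOffsets: IPartitionOffset[] }[];
}

// Build a request that sets every partition 0..partitionCount-1 to the same offset.
function buildOffsetReset(
  groupId: string,
  topic: string,
  partitionCount: number,
  offset: bigint = 0n,
): IAlterOffsetsRequest {
  if (!Number.isInteger(partitionCount) || partitionCount < 1) {
    throw new RangeError('partitionCount must be a positive integer');
  }
  const partitionOffsets = Array.from({ length: partitionCount }, (_, partition) => ({
    partition,
    offset,
  }));
  return { groupId, topics: [{ name: topic, partitionOffsets }] };
}
```

The returned object could then be passed to `admin.alterConsumerGroupOffsets(...)`, for example `buildOffsetReset('order-processing', 'orders', 3)` for the three-partition case shown above.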

Configuration Management

| Method | Signature | Description |
| --- | --- | --- |
| `describeConfigs(opts)` | `(opts: { resources, includeSynonyms?, includeDocumentation? }): Promise<ConfigDescription[]>` | Describe broker/topic configurations |
| `alterConfigs(opts)` | `(opts: { resources, validateOnly? }): Promise<void>` | Replace topic/broker configs |
| `incrementalAlterConfigs(opts)` | `(opts: { resources, validateOnly? }): Promise<void>` | Incrementally modify configs |

ACL Management

| Method | Signature | Description |
| --- | --- | --- |
| `createAcls(opts)` | `(opts: { creations: Acl[] }): Promise<void>` | Create access control lists |
| `describeAcls(opts)` | `(opts: { filter: AclFilter }): Promise<DescribeAclsResponseResource[]>` | Describe ACLs |
| `deleteAcls(opts)` | `(opts: { filters: AclFilter[] }): Promise<Acl[]>` | Delete ACLs |

Quota Management

| Method | Signature | Description |
| --- | --- | --- |
| `describeClientQuotas(opts)` | `(opts): Promise<DescribeClientQuotasResponseEntry[]>` | Describe client quotas |
| `alterClientQuotas(opts)` | `(opts): Promise<AlterClientQuotasResponseEntries[]>` | Alter client quotas |

Log Management

| Method | Signature | Description |
| --- | --- | --- |
| `describeLogDirs(opts)` | `(opts: { topics }): Promise<BrokerLogDirDescription[]>` | Describe broker log directories |