WebSocket -- Usage & Examples

Server-side usage patterns, WebSocket Emitter, wire protocol, client tracking, Redis channel architecture, authentication flow, and delivery strategy.

Using in Services/Controllers

To send WebSocket messages from a service or controller, inject the application instance and resolve WebSocketServerHelper from its container on first use:

typescript
import {
  BaseService,
  inject,
  CoreBindings,
  BaseApplication,
} from '@venizia/ignis';
import { WebSocketBindingKeys } from '@venizia/ignis/websocket';
import { WebSocketServerHelper } from '@venizia/ignis-helpers';

export class NotificationService extends BaseService {
  // Lazy getter pattern -- helper is bound AFTER server starts
  private _ws: WebSocketServerHelper | null = null;

  constructor(
    @inject({ key: CoreBindings.APPLICATION_INSTANCE })
    private application: BaseApplication,
  ) {
    super({ scope: NotificationService.name });
  }

  private get ws(): WebSocketServerHelper {
    if (!this._ws) {
      this._ws = this.application.get<WebSocketServerHelper>({
        key: WebSocketBindingKeys.WEBSOCKET_INSTANCE,
        isOptional: true,
      }) ?? null;
    }

    if (!this._ws) {
      throw new Error('WebSocket not initialized');
    }

    return this._ws;
  }

  // Send to a specific client
  notifyClient(opts: { clientId: string; message: string }) {
    this.ws.send({
      destination: opts.clientId,
      payload: {
        topic: 'notification',
        data: { message: opts.message, time: new Date().toISOString() },
      },
    });
  }

  // Send to all sessions of a user (local instance only)
  notifyUser(opts: { userId: string; message: string }) {
    this.ws.sendToUser({
      userId: opts.userId,
      event: 'notification',
      data: { message: opts.message },
    });
  }

  // Send to a room
  notifyRoom(opts: { room: string; message: string }) {
    this.ws.send({
      destination: opts.room,
      payload: {
        topic: 'room:update',
        data: { message: opts.message },
      },
    });
  }

  // Broadcast to all clients
  broadcastAnnouncement(opts: { message: string }) {
    this.ws.send({
      payload: {
        topic: 'system:announcement',
        data: { message: opts.message },
      },
    });
  }
}

IMPORTANT

Lazy getter pattern: Since WebSocketServerHelper is bound via a post-start hook, it is not available during DI construction. Use a lazy getter that resolves from the application container on first access.

WARNING

send() does not support user targeting. It resolves destination by checking the local clients map first, then the local rooms map; there is no USER type in send(). To reach all sessions of a user, use sendToUser() for delivery on the local instance, or WebSocketEmitter.toUser() for Redis-based delivery across all instances.
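
The resolution order described in this warning can be sketched as follows. This is an illustration only, not the helper's actual code: `resolveDestination`, `TResolved`, and the map shapes are hypothetical names chosen for the example, and what the real helper does with a destination that matches neither map is not specified here.

```typescript
// Hypothetical sketch of destination resolution in send().
// Assumption: an omitted destination means "broadcast", as in broadcastAnnouncement().
type TResolved =
  | { kind: 'broadcast' }
  | { kind: 'client'; clientId: string }
  | { kind: 'room'; room: string }
  | { kind: 'unknown'; destination: string };

function resolveDestination(opts: {
  destination?: string;
  clients: Map<string, unknown>;     // clientId -> client (local instance only)
  rooms: Map<string, Set<string>>;   // room -> clientIds (local instance only)
}): TResolved {
  const { destination, clients, rooms } = opts;

  if (!destination) {
    return { kind: 'broadcast' };
  }
  // 1. Local clients map is checked first
  if (clients.has(destination)) {
    return { kind: 'client', clientId: destination };
  }
  // 2. Then the local rooms map
  if (rooms.has(destination)) {
    return { kind: 'room', room: destination };
  }
  // A userId lands here: send() has no user lookup
  return { kind: 'unknown', destination };
}
```

Because a userId appears in neither map, passing one as destination never reaches the user's sessions, which is why sendToUser() or WebSocketEmitter.toUser() is needed.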

WebSocket Emitter

WebSocketEmitter is a standalone, lightweight Redis-only publisher for sending WebSocket messages from processes that do not run a WebSocket server -- such as background workers, cron jobs, microservices, or CLI scripts.

It connects to Redis and publishes messages using the same IRedisSocketMessage envelope that WebSocketServerHelper listens for, so all connected server instances will receive and deliver the messages to their local clients.

When to Use WebSocketEmitter

| Scenario | Use |
| --- | --- |
| Send from a controller or service in the main app | WebSocketServerHelper (injected via DI) |
| Send from a background worker or cron job | WebSocketEmitter |
| Send from a separate microservice | WebSocketEmitter |
| Broadcast from a CLI script | WebSocketEmitter |

Emitter Setup

typescript
import { WebSocketEmitter, RedisHelper } from '@venizia/ignis-helpers';

// 1. Create a Redis connection (same Redis instance as the WebSocket server)
const redisHelper = new RedisHelper({
  name: 'emitter-redis',
  host: process.env.REDIS_HOST ?? 'localhost',
  port: +(process.env.REDIS_PORT ?? 6379),
  password: process.env.REDIS_PASSWORD,
  autoConnect: false,
});

// 2. Create the emitter
const emitter = new WebSocketEmitter({
  identifier: 'my-worker-emitter', // Optional, defaults to 'WebSocketEmitter'
  redisConnection: redisHelper,
});

// 3. Configure (connects Redis pub client)
await emitter.configure();

Sending Messages

typescript
// Send to a specific client by ID
await emitter.toClient({
  clientId: 'uuid-of-client',
  event: 'job:progress',
  data: { jobId: '123', progress: 75 },
});

// Send to all sessions of a user (cross-instance)
await emitter.toUser({
  userId: 'user-456',
  event: 'notification',
  data: { message: 'Your report is ready' },
});

// Send to a room
await emitter.toRoom({
  room: 'dashboard-viewers',
  event: 'data:update',
  data: { metric: 'cpu', value: 42.5 },
  exclude: ['client-id-to-skip'], // Optional: exclude specific clients
});

// Broadcast to all connected, authenticated clients
await emitter.broadcast({
  event: 'system:maintenance',
  data: { message: 'Scheduled maintenance in 10 minutes' },
});

Shutdown

typescript
// Always shut down when done to release the Redis connection
await emitter.shutdown();

NOTE

The emitter uses a fixed serverId of 'emitter' instead of a random UUID. This means all server instances will process emitter messages (none will self-dedup). The emitter only needs a single Redis client (pub), not two (pub + sub) like the server helper.

TIP

WebSocketEmitter.toUser() publishes to the ws:user:{userId} Redis channel. All server instances subscribed via psubscribe('ws:user:*') will receive it and call sendToUser() locally, reaching every session of that user across all instances. This is the recommended way to send to a user from outside the main application process.

Wire Protocol

Client-Server Message Format

All messages exchanged between client and server follow the IWebSocketMessage envelope:

typescript
interface IWebSocketMessage<DataType = unknown> {
  event: string;    // Event name (system or custom)
  data?: DataType;  // Payload data
  id?: string;      // Optional message ID (application-defined)
}

Messages are serialized as JSON strings over the WebSocket connection. The event field is required -- messages without it are logged and dropped.
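
A minimal sketch of the validation this implies on the receiving side. `parseMessage` is a hypothetical helper written for this example, not part of the library; it returns null for any frame that would be dropped.

```typescript
interface IWebSocketMessage<DataType = unknown> {
  event: string;
  data?: DataType;
  id?: string;
}

// Hypothetical helper: parse a raw text frame into the envelope,
// or return null when the frame should be dropped.
function parseMessage(raw: string): IWebSocketMessage | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null; // Not valid JSON
  }

  if (typeof parsed !== 'object' || parsed === null) {
    return null; // JSON primitives are not envelopes
  }

  const candidate = parsed as Record<string, unknown>;
  if (typeof candidate.event !== 'string' || candidate.event.length === 0) {
    return null; // The event field is required
  }

  const message: IWebSocketMessage = { event: candidate.event, data: candidate.data };
  if (typeof candidate.id === 'string') {
    message.id = candidate.id;
  }
  return message;
}
```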

System Events

| Event | Direction | Payload | Description |
| --- | --- | --- | --- |
| authenticate | Client --> Server | Auth credentials ({ type, token, publicKey? }) | Client sends credentials after connection opens |
| connected | Server --> Client | { id, userId, time, serverPublicKey?, salt? } | Sent after successful authentication |
| disconnect | Both | -- | Connection closing |
| join | Client --> Server | { rooms: string[] } | Request to join rooms |
| leave | Client --> Server | { rooms: string[] } | Request to leave rooms |
| error | Server --> Client | { message: string } | Error notification |
| heartbeat | Client --> Server | -- | Keep-alive ping (client sends, server updates lastActivity) |
| encrypted | Both | Varies | Encryption handshake data |

NOTE

The heartbeat event is handled specially -- it updates the client's lastActivity timestamp and returns immediately without triggering any callbacks. Clients must send heartbeats within the heartbeatTimeout interval to avoid being disconnected with code 4002.
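
The timeout check can be pictured as a periodic sweep over tracked clients. The sketch below is an assumption about how such a sweep might look; `findExpiredClients` and `ITrackedClient` are hypothetical names, and the real helper's scheduling is not described here.

```typescript
// Hypothetical, reduced view of a tracked client
interface ITrackedClient {
  id: string;
  lastActivity: number; // Date.now() of the last heartbeat/message
}

const HEARTBEAT_TIMEOUT_CLOSE_CODE = 4002;

// Returns the IDs of clients whose last activity is older than heartbeatTimeout (ms).
function findExpiredClients(opts: {
  clients: ITrackedClient[];
  now: number;
  heartbeatTimeout: number;
}): string[] {
  const expired: string[] = [];
  for (const client of opts.clients) {
    if (opts.now - client.lastActivity > opts.heartbeatTimeout) {
      expired.push(client.id);
    }
  }
  return expired;
}
```

A caller would then close each expired socket with HEARTBEAT_TIMEOUT_CLOSE_CODE. On the client side, sending the heartbeat event at an interval comfortably shorter than heartbeatTimeout avoids this path.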

Close Codes

| Code | Reason | Trigger |
| --- | --- | --- |
| 1001 | Server shutting down | wsHelper.shutdown() |
| 4001 | Authentication timeout | Client did not send authenticate within authTimeout, or authenticateFn did not complete within authTimeout * 3 |
| 4002 | Heartbeat timeout | No messages received within heartbeatTimeout |
| 4003 | Authentication failed | authenticateFn returned null/false or threw an exception |
| 4004 | Encryption required | requireEncryption: true and either no handshakeFn configured or handshakeFn returned null/false |

Redis Message Envelope

Cross-instance messages are published via Redis Pub/Sub using the IRedisSocketMessage envelope:

typescript
interface IRedisSocketMessage<DataType = unknown> {
  serverId: string;                // Source server instance ID (UUID or 'emitter')
  type: TWebSocketMessageType;     // 'client' | 'user' | 'room' | 'broadcast'
  target?: string;                 // Target clientId / userId / room name
  event: string;                   // Event to deliver
  data: DataType;                  // Payload
  exclude?: string[];              // Client IDs to exclude from delivery
}

Messages from the same serverId are ignored (self-dedup) -- the sending server already delivered locally before publishing to Redis. Messages from the WebSocketEmitter use serverId = 'emitter', which never matches any server's UUID, so all servers process them.
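
The self-dedup rule reduces to a single comparison. A minimal sketch, in which `shouldProcess` is a hypothetical name and the types are repeated so the block stands alone:

```typescript
type TWebSocketMessageType = 'client' | 'user' | 'room' | 'broadcast';

interface IRedisSocketMessage<DataType = unknown> {
  serverId: string;
  type: TWebSocketMessageType;
  target?: string;
  event: string;
  data: DataType;
  exclude?: string[];
}

// Hypothetical check: a server skips envelopes it published itself,
// because it already delivered them to its local clients.
function shouldProcess(opts: { message: IRedisSocketMessage; ownServerId: string }): boolean {
  return opts.message.serverId !== opts.ownServerId;
}
```

An envelope with serverId 'emitter' passes this check on every server, since no server uses that value as its own ID.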

Message Types

| Type | Channel Pattern | Description |
| --- | --- | --- |
| client | ws:client:{clientId} | Direct to specific client |
| user | ws:user:{userId} | To all clients of a user |
| room | ws:room:{roomName} | To all clients in a room |
| broadcast | ws:broadcast | To all connected, authenticated clients |
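
The mapping from message type to channel can be written as one small function. This is a sketch under the channel names listed above; `channelFor` is a hypothetical name, and rejecting a missing target is an assumption made for the example.

```typescript
type TWebSocketMessageType = 'client' | 'user' | 'room' | 'broadcast';

// Hypothetical helper: derive the Redis channel for an outgoing envelope.
function channelFor(opts: { type: TWebSocketMessageType; target?: string }): string {
  if (opts.type === 'broadcast') {
    return 'ws:broadcast'; // Broadcast has a single fixed channel and no target
  }
  if (!opts.target) {
    // Assumption: client/user/room messages are meaningless without a target
    throw new Error(`Message type '${opts.type}' requires a target`);
  }
  return `ws:${opts.type}:${opts.target}`;
}
```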

Client Tracking

IWebSocketClient Interface

Each connected client is tracked in an in-memory Map<string, IWebSocketClient>:

typescript
interface IWebSocketClient<
  MetadataType extends Record<string, unknown> = Record<string, unknown>,
> {
  id: string;                           // Unique client ID (UUID, assigned during upgrade)
  userId?: string;                      // Set after authentication
  socket: IWebSocket;                   // Bun native WebSocket reference
  state: TWebSocketClientState;         // 'unauthorized' | 'authenticating' | 'authenticated' | 'disconnected'
  rooms: Set<string>;                   // Joined rooms (including default rooms and own clientId room)
  backpressured: boolean;               // True when socket.send() returns -1 (Bun backpressure)
  encrypted: boolean;                   // Whether client has completed encryption handshake
  connectedAt: number;                  // Connection timestamp (Date.now())
  lastActivity: number;                 // Last heartbeat/message timestamp (Date.now())
  metadata?: MetadataType;              // Custom metadata from authenticateFn return value
  serverPublicKey?: string;             // ECDH public key (set if encrypted)
  salt?: string;                        // Encryption salt (set if encrypted)
  authTimer?: ReturnType<typeof setTimeout>; // Auth timeout timer (cleared after auth)
}

Client State Transitions

UNAUTHORIZED --(authenticate event)--> AUTHENTICATING
                                            |
                              +-------------+-------------+
                              |             |             |
                         auth fails    auth succeeds   timeout
                              |             |             |
                              v             v             v
                        DISCONNECTED   AUTHENTICATED  DISCONNECTED
                                            |
                                    (close / heartbeat timeout)
                                            |
                                            v
                                       DISCONNECTED

States are defined in the WebSocketClientStates constant class:

typescript
class WebSocketClientStates {
  static readonly UNAUTHORIZED = 'unauthorized';
  static readonly AUTHENTICATING = 'authenticating';
  static readonly AUTHENTICATED = 'authenticated';
  static readonly DISCONNECTED = 'disconnected';
}
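
The diagram can also be read as a transition table. The sketch below encodes it that way; `ALLOWED_TRANSITIONS` and `canTransition` are hypothetical names, and the unauthorized-to-disconnected edge is inferred from the initial authentication timeout rather than drawn in the diagram.

```typescript
type TWebSocketClientState = 'unauthorized' | 'authenticating' | 'authenticated' | 'disconnected';

// Hypothetical transition table derived from the state diagram
const ALLOWED_TRANSITIONS: Record<TWebSocketClientState, TWebSocketClientState[]> = {
  // 'disconnected' here covers a client that never sends authenticate (assumption)
  unauthorized: ['authenticating', 'disconnected'],
  // auth succeeds, or auth fails / times out
  authenticating: ['authenticated', 'disconnected'],
  // close or heartbeat timeout
  authenticated: ['disconnected'],
  // terminal state
  disconnected: [],
};

function canTransition(opts: { from: TWebSocketClientState; to: TWebSocketClientState }): boolean {
  return ALLOWED_TRANSITIONS[opts.from].indexOf(opts.to) !== -1;
}
```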

Tracking Maps

The server maintains three index maps for efficient lookups:

| Map | Key | Value | Purpose |
| --- | --- | --- | --- |
| clients | clientId | IWebSocketClient | All connected clients |
| users | userId | Set<clientId> | Multi-session user index |
| rooms | room | Set<clientId> | Room membership index |

TIP

A single user can have multiple client connections (e.g., browser tab + mobile). Use getClientsByUser({ userId }) to reach all sessions. The users map entry is automatically cleaned up when the last client for a user disconnects.
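
The cleanup behavior of the users map can be sketched as follows. `UserIndex` and its methods are hypothetical and only illustrate the Set-per-user bookkeeping; they are not the helper's internal API.

```typescript
// Hypothetical sketch of the users index: userId -> Set<clientId>
class UserIndex {
  private readonly users = new Map<string, Set<string>>();

  add(opts: { userId: string; clientId: string }): void {
    let sessions = this.users.get(opts.userId);
    if (!sessions) {
      sessions = new Set<string>();
      this.users.set(opts.userId, sessions);
    }
    sessions.add(opts.clientId);
  }

  remove(opts: { userId: string; clientId: string }): void {
    const sessions = this.users.get(opts.userId);
    if (!sessions) {
      return;
    }
    sessions.delete(opts.clientId);
    // Drop the entry once the user's last session is gone
    if (sessions.size === 0) {
      this.users.delete(opts.userId);
    }
  }

  getClientIdsByUser(opts: { userId: string }): string[] {
    const sessions = this.users.get(opts.userId);
    return sessions ? Array.from(sessions) : [];
  }

  hasUser(opts: { userId: string }): boolean {
    return this.users.has(opts.userId);
  }
}
```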

Redis Channel Architecture

WebSocketChannels Class

typescript
class WebSocketChannels {
  // --- Static channel names ---
  static readonly BROADCAST = 'ws:broadcast';
  static readonly ROOM_PREFIX = 'ws:room:';
  static readonly CLIENT_PREFIX = 'ws:client:';
  static readonly USER_PREFIX = 'ws:user:';

  // --- Channel builders ---
  static forRoom(opts: { room: string }): string;     // 'ws:room:{room}'
  static forClient(opts: { clientId: string }): string; // 'ws:client:{clientId}'
  static forUser(opts: { userId: string }): string;     // 'ws:user:{userId}'

  // --- Pattern builders (for Redis PSUBSCRIBE) ---
  static forRoomPattern(): string;   // 'ws:room:*'
  static forClientPattern(): string; // 'ws:client:*'
  static forUserPattern(): string;   // 'ws:user:*'
}

Redis Client Type

Both WebSocketServerHelper and WebSocketEmitter support Redis single instance and Redis Cluster:

typescript
type TRedisClient = Redis | Cluster;

The Redis client is obtained via redisConnection.getClient().duplicate(). The duplicate() call creates a fresh connection that inherits the parent's configuration (including cluster mode). This ensures WebSocket pub/sub traffic does not interfere with application Redis usage.

Subscription Setup

During configure(), the server subscribes to all channels:

typescript
// Direct subscribe (exact match)
redisSub.subscribe(WebSocketChannels.BROADCAST);            // 'ws:broadcast'

// Pattern subscribe (wildcard match)
redisSub.psubscribe(WebSocketChannels.forRoomPattern());    // 'ws:room:*'
redisSub.psubscribe(WebSocketChannels.forClientPattern());  // 'ws:client:*'
redisSub.psubscribe(WebSocketChannels.forUserPattern());    // 'ws:user:*'

NOTE

Redis PSUBSCRIBE uses pattern matching -- a message published to ws:room:chat-general is received by all servers subscribed to ws:room:*. This allows the server to receive messages for any room without knowing room names in advance.
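
On the receiving side, the concrete channel name has to be mapped back to a message type and target. A minimal sketch, assuming the channel prefixes listed in WebSocketChannels; `parseChannel` is a hypothetical helper, not a library function.

```typescript
type TWebSocketMessageType = 'client' | 'user' | 'room' | 'broadcast';

// Hypothetical helper: recover the type and target from a concrete channel name.
function parseChannel(channel: string): { type: TWebSocketMessageType; target?: string } | null {
  if (channel === 'ws:broadcast') {
    return { type: 'broadcast' };
  }

  const prefixes: Array<[TWebSocketMessageType, string]> = [
    ['room', 'ws:room:'],
    ['client', 'ws:client:'],
    ['user', 'ws:user:'],
  ];

  for (const [type, prefix] of prefixes) {
    // Everything after the prefix is the target, even if it contains ':'
    if (channel.indexOf(prefix) === 0 && channel.length > prefix.length) {
      return { type, target: channel.slice(prefix.length) };
    }
  }
  return null; // Not a WebSocket channel
}
```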

Message Flow (Cross-Instance)

Server A                          Redis                         Server B
   |                                |                               |
   |-- send({ destination: room }) -|                               |
   |   1. sendToRoom() locally      |                               |
   |   2. publishToRedis() -------->|-- ws:room:chat ------>        |
   |                                |                    onRedisMessage()
   |                                |                      |-- skip if serverId === own
   |                                |                      +-- sendToRoom() locally

Message Flow (Emitter to Servers)

WebSocketEmitter                  Redis                    Server A + Server B
   |                                |                               |
   |-- toUser({ userId }) -------->|-- ws:user:u1 -------->        |
   |   serverId = 'emitter'        |                    onRedisMessage()
   |                                |                      |-- serverId !== own -> process
   |                                |                      +-- sendToUser() locally

Authentication Flow

Client                          Server
  |                                |
  |-- WS upgrade request -------->|
  |<-- 101 Switching Protocols ---|  (Bun handles upgrade)
  |                                |-- onClientConnect()
  |                                |   state = UNAUTHORIZED
  |                                |   subscribe(clientId)  <-- Bun topic for direct messaging
  |                                |   start authTimer (5s default)
  |                                |
  |-- { event: 'authenticate',    |
  |     data: { token: '...' } } >|-- handleAuthenticate()
  |                                |   state = AUTHENTICATING
  |                                |   replace timer with authTimeout * 3
  |                                |   await authenticateFn(data)
  |                                |     |
  |                                |   (if requireEncryption)
  |                                |     await handshakeFn(data)
  |                                |     enableClientEncryption()
  |                                |   state = AUTHENTICATED
  |                                |   index by userId
  |                                |   subscribe(BROADCAST_TOPIC)  <-- unless encrypted
  |                                |   joinRoom(clientId)          <-- auto-join own ID as room
  |                                |   joinRoom(default rooms)
  |                                |
  |<-- { event: 'connected',      |
  |      data: { id, userId,      |
  |        time, serverPublicKey?, |
  |        salt? } } -------------|
  |                                |-- clientConnectedFn()

Authentication Timeout Details

There are two timeout phases:

  1. Initial timeout (authTimeout, default 5 s): Starts when the client connects. If the client does not send an authenticate event within this window, the socket is closed with code 4001.

  2. In-progress timeout (authTimeout * 3, default 15 s): Replaces the initial timer when the authenticate event is received. This provides a longer window for the async authenticateFn (and optionally handshakeFn) to complete. If authentication does not finish within this window, the socket is closed with code 4001.
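
The two phases amount to one timer slot that is re-armed with a longer window. The sketch below is an assumption about how this could be structured; `AuthTimeoutGuard` and its method names are hypothetical.

```typescript
const AUTH_TIMEOUT_CLOSE_CODE = 4001;

// Hypothetical guard holding the single per-client auth timer
class AuthTimeoutGuard {
  private timer: ReturnType<typeof setTimeout> | undefined;
  private readonly authTimeout: number;
  private readonly onTimeout: (code: number) => void;

  // Window currently armed, in ms (0 when no timer is active)
  currentWindowMs = 0;

  constructor(opts: { authTimeout: number; onTimeout: (code: number) => void }) {
    this.authTimeout = opts.authTimeout;
    this.onTimeout = opts.onTimeout;
  }

  // Phase 1: the client has connected and must send 'authenticate'
  onConnect(): void {
    this.arm(this.authTimeout);
  }

  // Phase 2: 'authenticate' received; allow longer for the async authenticateFn
  onAuthenticateReceived(): void {
    this.arm(this.authTimeout * 3);
  }

  // Authentication finished (or the socket closed): stop the timer
  clear(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    this.currentWindowMs = 0;
  }

  private arm(ms: number): void {
    this.clear(); // Replace, never stack, timers
    this.currentWindowMs = ms;
    this.timer = setTimeout(() => this.onTimeout(AUTH_TIMEOUT_CLOSE_CODE), ms);
  }
}
```

With authTimeout set to 5000, the guard arms 5000 ms on connect and 15000 ms once authenticate arrives; in both phases expiry reports close code 4001.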

Client ID Auto-Join

After successful authentication, the server calls joinRoom({ clientId, room: clientId }). This means the client's own ID is registered as both a Bun native topic subscription (set during onClientConnect) and an application-level room. This enables targeting a specific client via send({ destination: clientId }) or sendToRoom({ room: clientId }).

Bun Topic Subscription Timing

| Topic | Subscribed At | Condition |
| --- | --- | --- |
| Client's own clientId | onClientConnect() (before auth) | Always |
| BROADCAST_TOPIC | handleAuthenticate() (after auth) | Only if !client.encrypted |
| Default rooms | handleAuthenticate() (after auth, via joinRoom()) | Only if !client.encrypted |
| Custom rooms | handleJoin() (on client request) | Only if !client.encrypted |

Encrypted clients are never subscribed to Bun native topics, with one exception: the client's own clientId topic, which is subscribed in onClientConnect() before the encryption status is known. All delivery to encrypted clients goes through the per-client outboundTransformer path.

Delivery Strategy

The helper uses a dual delivery strategy depending on whether encryption is active:

Without encryption (fast path):

  • Room/broadcast messages use Bun's native server.publish(topic, payload) -- a single call from JavaScript, with the fan-out to subscribers performed in native (C++) code
  • Client-direct messages use socket.send() directly
  • Zero JavaScript iteration for room fan-out

With encryption (per-client path):

  • Encrypted clients are unsubscribed from all Bun native topics (enableClientEncryption())
  • Room/broadcast messages iterate clients individually, running each through outboundTransformer
  • Uses executePromiseWithLimit({ tasks, limit: encryptedBatchLimit }) for concurrency control
  • Non-encrypted clients in the same room still use the Bun fast path

With exclude parameter:

  • When exclude is provided in sendToRoom() or broadcast(), the fast path is bypassed even without encryption
  • The server iterates all clients, skipping those in the exclude set

IMPORTANT

When an outboundTransformer is bound, all room/broadcast messages fall back to the per-client iteration path (even for non-encrypted clients in the same room). This is because Bun native pub/sub cannot selectively apply transformations. Only bind outboundTransformer when you actually need per-client message transformation.
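
The rules in this section can be condensed into one decision function. This is a sketch of the documented behavior only; `planRoomDelivery` and `IRoomMember` are hypothetical, and the real helper may structure the decision differently.

```typescript
// Hypothetical, reduced view of a room member
interface IRoomMember {
  id: string;
  encrypted: boolean;
}

// Decide, for one room message, whether Bun's native publish can be used
// and which clients must be handled one by one.
function planRoomDelivery(opts: {
  members: IRoomMember[];
  exclude?: string[];
  hasOutboundTransformer: boolean;
}): { useNativePublish: boolean; perClientIds: string[] } {
  const excluded = new Set<string>(opts.exclude ?? []);

  // An exclude list or a bound transformer disables the fast path entirely
  const forcePerClient = excluded.size > 0 || opts.hasOutboundTransformer;

  const perClientIds = opts.members
    .filter(member => !excluded.has(member.id))
    // Encrypted clients are always handled individually
    .filter(member => forcePerClient || member.encrypted)
    .map(member => member.id);

  return { useNativePublish: !forcePerClient, perClientIds };
}
```

For a room with one plain and one encrypted member, the plan is a native publish plus one per-client send; adding an exclude list or binding an outboundTransformer moves every remaining member onto the per-client path.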

See Also