WebSocket -- Usage & Examples
Server-side usage patterns, WebSocket Emitter, wire protocol, client tracking, Redis channel architecture, authentication flow, and delivery strategy.
Using in Services/Controllers
Inject WebSocketServerHelper to interact with WebSocket:
import {
BaseService,
inject,
CoreBindings,
BaseApplication,
} from '@venizia/ignis';
import { WebSocketBindingKeys } from '@venizia/ignis/websocket';
import { WebSocketServerHelper } from '@venizia/ignis-helpers';
export class NotificationService extends BaseService {
// Lazy getter pattern -- helper is bound AFTER server starts
private _ws: WebSocketServerHelper | null = null;
constructor(
@inject({ key: CoreBindings.APPLICATION_INSTANCE })
private application: BaseApplication,
) {
super({ scope: NotificationService.name });
}
private get ws(): WebSocketServerHelper {
if (!this._ws) {
this._ws = this.application.get<WebSocketServerHelper>({
key: WebSocketBindingKeys.WEBSOCKET_INSTANCE,
isOptional: true,
}) ?? null;
}
if (!this._ws) {
throw new Error('WebSocket not initialized');
}
return this._ws;
}
// Send to a specific client
notifyClient(opts: { clientId: string; message: string }) {
this.ws.send({
destination: opts.clientId,
payload: {
topic: 'notification',
data: { message: opts.message, time: new Date().toISOString() },
},
});
}
// Send to all sessions of a user (local instance only)
notifyUser(opts: { userId: string; message: string }) {
this.ws.sendToUser({
userId: opts.userId,
event: 'notification',
data: { message: opts.message },
});
}
// Send to a room
notifyRoom(opts: { room: string; message: string }) {
this.ws.send({
destination: opts.room,
payload: {
topic: 'room:update',
data: { message: opts.message },
},
});
}
// Broadcast to all clients
broadcastAnnouncement(opts: { message: string }) {
this.ws.send({
payload: {
topic: 'system:announcement',
data: { message: opts.message },
},
});
}
}IMPORTANT
Lazy getter pattern: Since WebSocketServerHelper is bound via a post-start hook, it is not available during DI construction. Use a lazy getter that resolves from the application container on first access.
WARNING
send() does not support cross-instance user targeting. The send() method resolves destination by checking local clients map then local rooms map. There is no USER type in send(). To reach all sessions of a user across instances, use sendToUser() for local delivery or WebSocketEmitter.toUser() for Redis-based cross-instance delivery.
WebSocket Emitter
WebSocketEmitter is a standalone, lightweight Redis-only publisher for sending WebSocket messages from processes that do not run a WebSocket server -- such as background workers, cron jobs, microservices, or CLI scripts.
It connects to Redis and publishes messages using the same IRedisSocketMessage envelope that WebSocketServerHelper listens for, so all connected server instances will receive and deliver the messages to their local clients.
When to Use WebSocketEmitter
| Scenario | Use |
|---|---|
| Send from a controller or service in the main app | WebSocketServerHelper (injected via DI) |
| Send from a background worker or cron job | WebSocketEmitter |
| Send from a separate microservice | WebSocketEmitter |
| Broadcast from a CLI script | WebSocketEmitter |
Emitter Setup
import { WebSocketEmitter, RedisHelper } from '@venizia/ignis-helpers';
// 1. Create a Redis connection (same Redis instance as the WebSocket server)
const redisHelper = new RedisHelper({
name: 'emitter-redis',
host: process.env.REDIS_HOST ?? 'localhost',
port: +(process.env.REDIS_PORT ?? 6379),
password: process.env.REDIS_PASSWORD,
autoConnect: false,
});
// 2. Create the emitter
const emitter = new WebSocketEmitter({
identifier: 'my-worker-emitter', // Optional, defaults to 'WebSocketEmitter'
redisConnection: redisHelper,
});
// 3. Configure (connects Redis pub client)
await emitter.configure();Sending Messages
// Send to a specific client by ID
await emitter.toClient({
clientId: 'uuid-of-client',
event: 'job:progress',
data: { jobId: '123', progress: 75 },
});
// Send to all sessions of a user (cross-instance)
await emitter.toUser({
userId: 'user-456',
event: 'notification',
data: { message: 'Your report is ready' },
});
// Send to a room
await emitter.toRoom({
room: 'dashboard-viewers',
event: 'data:update',
data: { metric: 'cpu', value: 42.5 },
exclude: ['client-id-to-skip'], // Optional: exclude specific clients
});
// Broadcast to all connected, authenticated clients
await emitter.broadcast({
event: 'system:maintenance',
data: { message: 'Scheduled maintenance in 10 minutes' },
});Shutdown
// Always shut down when done to release the Redis connection
await emitter.shutdown();NOTE
The emitter uses a fixed serverId of 'emitter' instead of a random UUID. This means all server instances will process emitter messages (none will self-dedup). The emitter only needs a single Redis client (pub), not two (pub + sub) like the server helper.
TIP
WebSocketEmitter.toUser() publishes to the ws:user:{userId} Redis channel. All server instances subscribed via psubscribe('ws:user:*') will receive it and call sendToUser() locally, reaching every session of that user across all instances. This is the recommended way to send to a user from outside the main application process.
Wire Protocol
Client-Server Message Format
All messages exchanged between client and server follow the IWebSocketMessage envelope:
interface IWebSocketMessage<DataType = unknown> {
event: string; // Event name (system or custom)
data?: DataType; // Payload data
id?: string; // Optional message ID (application-defined)
}Messages are serialized as JSON strings over the WebSocket connection. The event field is required -- messages without it are logged and dropped.
System Events
| Event | Direction | Payload | Description |
|---|---|---|---|
authenticate | Client --> Server | Auth credentials ({ type, token, publicKey? }) | Client sends credentials after connection opens |
connected | Server --> Client | { id, userId, time, serverPublicKey?, salt? } | Sent after successful authentication |
disconnect | Both | -- | Connection closing |
join | Client --> Server | { rooms: string[] } | Request to join rooms |
leave | Client --> Server | { rooms: string[] } | Request to leave rooms |
error | Server --> Client | { message: string } | Error notification |
heartbeat | Client --> Server | -- | Keep-alive ping (client sends, server updates lastActivity) |
encrypted | Both | Varies | Encryption handshake data |
NOTE
The heartbeat event is handled specially -- it updates the client's lastActivity timestamp and returns immediately without triggering any callbacks. Clients must send heartbeats within the heartbeatTimeout interval to avoid being disconnected with code 4002.
Close Codes
| Code | Reason | Trigger |
|---|---|---|
1001 | Server shutting down | wsHelper.shutdown() |
4001 | Authentication timeout | Client did not send authenticate within authTimeout, or authenticateFn did not complete within authTimeout * 3 |
4002 | Heartbeat timeout | No messages received within heartbeatTimeout |
4003 | Authentication failed | authenticateFn returned null/false or threw an exception |
4004 | Encryption required | requireEncryption: true and either no handshakeFn configured or handshakeFn returned null/false |
Redis Message Envelope
Cross-instance messages are published via Redis Pub/Sub using the IRedisSocketMessage envelope:
interface IRedisSocketMessage<DataType = unknown> {
serverId: string; // Source server instance ID (UUID or 'emitter')
type: TWebSocketMessageType; // 'client' | 'user' | 'room' | 'broadcast'
target?: string; // Target clientId / userId / room name
event: string; // Event to deliver
data: DataType; // Payload
exclude?: string[]; // Client IDs to exclude from delivery
}Messages from the same serverId are ignored (self-dedup) -- the sending server already delivered locally before publishing to Redis. Messages from the WebSocketEmitter use serverId = 'emitter', which never matches any server's UUID, so all servers process them.
Message Types
| Type | Channel Pattern | Description |
|---|---|---|
client | ws:client:{clientId} | Direct to specific client |
user | ws:user:{userId} | To all clients of a user |
room | ws:room:{roomName} | To all clients in a room |
broadcast | ws:broadcast | To all connected, authenticated clients |
Client Tracking
IWebSocketClient Interface
Each connected client is tracked in an in-memory Map<string, IWebSocketClient>:
interface IWebSocketClient<
MetadataType extends Record<string, unknown> = Record<string, unknown>,
> {
id: string; // Unique client ID (UUID, assigned during upgrade)
userId?: string; // Set after authentication
socket: IWebSocket; // Bun native WebSocket reference
state: TWebSocketClientState; // 'unauthorized' | 'authenticating' | 'authenticated' | 'disconnected'
rooms: Set<string>; // Joined rooms (including default rooms and own clientId room)
backpressured: boolean; // True when socket.send() returns -1 (Bun backpressure)
encrypted: boolean; // Whether client has completed encryption handshake
connectedAt: number; // Connection timestamp (Date.now())
lastActivity: number; // Last heartbeat/message timestamp (Date.now())
metadata?: MetadataType; // Custom metadata from authenticateFn return value
serverPublicKey?: string; // ECDH public key (set if encrypted)
salt?: string; // Encryption salt (set if encrypted)
authTimer?: ReturnType<typeof setTimeout>; // Auth timeout timer (cleared after auth)
}Client State Transitions
UNAUTHORIZED --(authenticate event)--> AUTHENTICATING
|
+-------------+-------------+
| | |
auth fails auth succeeds timeout
| | |
v v v
DISCONNECTED AUTHENTICATED DISCONNECTED
|
(close / heartbeat timeout)
|
v
DISCONNECTEDStates are defined in the WebSocketClientStates constant class:
class WebSocketClientStates {
static readonly UNAUTHORIZED = 'unauthorized';
static readonly AUTHENTICATING = 'authenticating';
static readonly AUTHENTICATED = 'authenticated';
static readonly DISCONNECTED = 'disconnected';
}Tracking Maps
The server maintains three index maps for efficient lookups:
| Map | Key | Value | Purpose |
|---|---|---|---|
clients | clientId | IWebSocketClient | All connected clients |
users | userId | Set<clientId> | Multi-session user index |
rooms | room | Set<clientId> | Room membership index |
TIP
A single user can have multiple client connections (e.g., browser tab + mobile). Use getClientsByUser({ userId }) to reach all sessions. The users map entry is automatically cleaned up when the last client for a user disconnects.
Redis Channel Architecture
WebSocketChannels Class
class WebSocketChannels {
// --- Static channel names ---
static readonly BROADCAST = 'ws:broadcast';
static readonly ROOM_PREFIX = 'ws:room:';
static readonly CLIENT_PREFIX = 'ws:client:';
static readonly USER_PREFIX = 'ws:user:';
// --- Channel builders ---
static forRoom(opts: { room: string }): string; // 'ws:room:{room}'
static forClient(opts: { clientId: string }): string; // 'ws:client:{clientId}'
static forUser(opts: { userId: string }): string; // 'ws:user:{userId}'
// --- Pattern builders (for Redis PSUBSCRIBE) ---
static forRoomPattern(): string; // 'ws:room:*'
static forClientPattern(): string; // 'ws:client:*'
static forUserPattern(): string; // 'ws:user:*'
}Redis Client Type
Both WebSocketServerHelper and WebSocketEmitter support Redis single instance and Redis Cluster:
type TRedisClient = Redis | Cluster;The Redis client is obtained via redisConnection.getClient().duplicate(). The duplicate() call creates a fresh connection that inherits the parent's configuration (including cluster mode). This ensures WebSocket pub/sub traffic does not interfere with application Redis usage.
Subscription Setup
During configure(), the server subscribes to all channels:
// Direct subscribe (exact match)
redisSub.subscribe(WebSocketChannels.BROADCAST); // 'ws:broadcast'
// Pattern subscribe (wildcard match)
redisSub.psubscribe(WebSocketChannels.forRoomPattern()); // 'ws:room:*'
redisSub.psubscribe(WebSocketChannels.forClientPattern()); // 'ws:client:*'
redisSub.psubscribe(WebSocketChannels.forUserPattern()); // 'ws:user:*'NOTE
Redis PSUBSCRIBE uses pattern matching -- a message published to ws:room:chat-general is received by all servers subscribed to ws:room:*. This allows the server to receive messages for any room without knowing room names in advance.
Message Flow (Cross-Instance)
Server A Redis Server B
| | |
|-- send({ destination: room }) -| |
| 1. sendToRoom() locally | |
| 2. publishToRedis() -------->|-- ws:room:chat ------> |
| | onRedisMessage()
| | |-- skip if serverId === own
| | +-- sendToRoom() locallyMessage Flow (Emitter to Servers)
WebSocketEmitter Redis Server A + Server B
| | |
|-- toUser({ userId }) -------->|-- ws:user:u1 --------> |
| serverId = 'emitter' | onRedisMessage()
| | |-- serverId !== own -> process
| | +-- sendToUser() locallyAuthentication Flow
Client Server
| |
|-- WS upgrade request -------->|
|<-- 101 Switching Protocols ---| (Bun handles upgrade)
| |-- onClientConnect()
| | state = UNAUTHORIZED
| | subscribe(clientId) <-- Bun topic for direct messaging
| | start authTimer (5s default)
| |
|-- { event: 'authenticate', |
| data: { token: '...' } } >|-- handleAuthenticate()
| | state = AUTHENTICATING
| | replace timer with authTimeout * 3
| | await authenticateFn(data)
| | |
| | (if requireEncryption)
| | await handshakeFn(data)
| | enableClientEncryption()
| | state = AUTHENTICATED
| | index by userId
| | subscribe(BROADCAST_TOPIC) <-- unless encrypted
| | joinRoom(clientId) <-- auto-join own ID as room
| | joinRoom(default rooms)
| |
|<-- { event: 'connected', |
| data: { id, userId, |
| time, serverPublicKey?, |
| salt? } } -------------|
| |-- clientConnectedFn()Authentication Timeout Details
There are two timeout phases:
Initial timeout (
authTimeout, default 5 s): Starts when the client connects. If the client does not send anauthenticateevent within this window, the socket is closed with code4001.In-progress timeout (
authTimeout * 3, default 15 s): Replaces the initial timer when theauthenticateevent is received. This provides a longer window for the asyncauthenticateFn(and optionallyhandshakeFn) to complete. If authentication does not finish within this window, the socket is closed with code4001.
Client ID Auto-Join
After successful authentication, the server calls joinRoom({ clientId, room: clientId }). This means the client's own ID is registered as both a Bun native topic subscription (set during onClientConnect) and an application-level room. This enables targeting a specific client via send({ destination: clientId }) or sendToRoom({ room: clientId }).
Bun Topic Subscription Timing
| Topic | Subscribed At | Condition |
|---|---|---|
Client's own clientId | onClientConnect() (before auth) | Always |
BROADCAST_TOPIC | handleAuthenticate() (after auth) | Only if !client.encrypted |
| Default rooms | handleAuthenticate() (after auth, via joinRoom()) | Only if !client.encrypted |
| Custom rooms | handleJoin() (on client request) | Only if !client.encrypted |
Encrypted clients are never subscribed to Bun native topics (except clientId which is set before encryption status is known). All delivery to encrypted clients goes through the per-client outboundTransformer path.
Delivery Strategy
The helper uses a dual delivery strategy depending on whether encryption is active:
Without encryption (fast path):
- Room/broadcast messages use Bun's native
server.publish(topic, payload)-- O(1) C++ fan-out - Client-direct messages use
socket.send()directly - Zero JavaScript iteration for room fan-out
With encryption (per-client path):
- Encrypted clients are unsubscribed from all Bun native topics (
enableClientEncryption()) - Room/broadcast messages iterate clients individually, running each through
outboundTransformer - Uses
executePromiseWithLimit({ tasks, limit: encryptedBatchLimit })for concurrency control - Non-encrypted clients in the same room still use the Bun fast path
With exclude parameter:
- When
excludeis provided insendToRoom()orbroadcast(), the fast path is bypassed even without encryption - The server iterates all clients, skipping those in the
excludeset
IMPORTANT
When an outboundTransformer is bound, all room/broadcast messages fall back to the per-client iteration path (even for non-encrypted clients in the same room). This is because Bun native pub/sub cannot selectively apply transformations. Only bind outboundTransformer when you actually need per-client message transformation.
See Also
- Setup & Configuration - Quick reference, imports, setup steps, configuration, and binding keys
- API Reference - Architecture, WebSocketEmitter API, and internals
- Error Reference - Error conditions table and troubleshooting
- WebSocketServerHelper - Helper API documentation
- Socket.IO Component - Node.js-compatible alternative with Socket.IO
- Bun WebSocket Documentation - Official Bun WebSocket API reference