realtime Package

Overview

Package realtime provides real-time event broadcasting and client connection management.

This package implements a high-performance real-time communication system that supports WebSocket and Server-Sent Events (SSE) for live updates in the incident management platform. It provides event broadcasting, channel-based subscriptions, client management, and integration with the incident timeline system.

Key Features:

  • Multi-protocol support (WebSocket, SSE) for real-time connections
  • Channel-based subscription system with flexible filtering
  • High-performance event broadcasting with buffering and concurrency
  • Automatic client connection management and cleanup
  • Integration with incident timeline for real-time incident updates
  • System notification broadcasting for platform-wide announcements
  • Comprehensive monitoring and statistics collection
  • Thread-safe operations for high-concurrency scenarios

Architecture:

The realtime system follows a publisher-subscriber pattern with channel-based routing:

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Event Sources   │───►│ Broadcaster      │───►│ Client Manager  │
│ (Timeline, SLA) │    │ (Pub/Sub Engine) │    │ (WS/SSE Servers)│
└─────────────────┘    └──────────────────┘    └─────────────────┘
                                                      
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Event Filters   │    │ Channel Router   │    │ Connected       │
│ (Topic/Content) │    │ (Subscription)   │    │ Clients         │
└─────────────────┘    └──────────────────┘    └─────────────────┘

Channel System:

  • “incidents”: All incident-related events
  • “incidents.<id>”: Events for a specific incident (e.g., “incidents.INC-123”)
  • “timeline.<id>”: Timeline events for a specific incident
  • “system”: System-wide notifications and announcements
  • “all”: Global event stream (admin/monitoring use)

Event Types:

  • incident.created: New incident declared
  • incident.updated: Incident status or metadata changed
  • timeline.updated: New timeline event added
  • connector.sync: External system synchronization event
  • sla.breach: SLA threshold exceeded
  • system.notification: Platform announcement or alert

Example usage:

// Create broadcaster
broadcaster := realtime.NewBroadcaster()

// Create WebSocket server
wsServer := realtime.NewWebSocketServer(broadcaster)

// Subscribe client to incident updates
client := &MyWebSocketClient{id: "user123"}
subscription := broadcaster.Subscribe(client,
	[]string{"incidents", "system"},
	map[string]string{"severity": "SEV-1"})

// Broadcast incident event
event := &realtime.Event{
	ID:        "evt-123",
	Type:      realtime.EventIncidentCreated,
	Timestamp: time.Now(),
	Data:      incident,
	Channels:  []string{"incidents", "incidents.INC-123"},
}
broadcaster.Broadcast(event)

Integration Patterns:

  • Timeline Service: Automatically broadcasts timeline events
  • SLA Service: Broadcasts breach notifications
  • Connector Service: Broadcasts sync status updates
  • WebSocket/SSE Servers: Handle client connections and message delivery

Import Path: github.com/systmms/incidents/internal/realtime

Types

Broadcaster

Broadcaster manages real-time event distribution to connected clients with high-performance routing.

The Broadcaster serves as the central hub for real-time communication, coordinating between event producers (timeline service, SLA monitors, connectors) and connected clients (WebSocket, SSE connections). It provides efficient event routing, subscription management, and automatic connection cleanup.

Architecture Features:

  • Channel-based subscription routing with O(1) lookup of the subscribers for each channel
  • Buffered event processing to handle high-throughput scenarios
  • Concurrent-safe subscription management with read-write locks
  • Automatic cleanup of dead connections and failed deliveries
  • Content filtering for fine-grained event selection
  • Comprehensive statistics and monitoring capabilities

Performance Characteristics:

  • Event routing: O(channels × subscriptions_per_channel)
  • Subscription lookup: O(1) for direct access, O(n) for channel enumeration
  • Memory usage: Linear with number of subscriptions and buffered events
  • Concurrency: High read concurrency, synchronized writes for safety

The Broadcaster is designed to handle thousands of concurrent connections with minimal latency impact through efficient data structures and asynchronous processing.
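
The routing model described above can be sketched as a channel-indexed map guarded by a read-write lock. This is a minimal illustration, not the package's implementation: the router type and its subscribe and recipients methods are hypothetical names, and subscribers are reduced to plain string IDs.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// router is a hypothetical stand-in for the Broadcaster's subscription index.
type router struct {
	mu        sync.RWMutex
	byChannel map[string]map[string]struct{} // channel -> set of subscriber IDs
}

func newRouter() *router {
	return &router{byChannel: make(map[string]map[string]struct{})}
}

// subscribe takes the write lock because it mutates the index.
func (r *router) subscribe(subscriberID string, channels ...string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, ch := range channels {
		if r.byChannel[ch] == nil {
			r.byChannel[ch] = make(map[string]struct{})
		}
		r.byChannel[ch][subscriberID] = struct{}{}
	}
}

// recipients takes only the read lock, so many events can be routed at once.
// Each channel is a single map lookup; the total cost is proportional to the
// number of subscriptions on the event's channels. Each subscriber appears at
// most once in the sorted result, even when it matches several channels.
func (r *router) recipients(channels ...string) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	seen := make(map[string]struct{})
	for _, ch := range channels {
		for id := range r.byChannel[ch] {
			seen[id] = struct{}{}
		}
	}
	out := make([]string, 0, len(seen))
	for id := range seen {
		out = append(out, id)
	}
	sort.Strings(out)
	return out
}

func main() {
	r := newRouter()
	r.subscribe("alice", "incidents", "system")
	r.subscribe("bob", "incidents.INC-123")
	fmt.Println(r.recipients("incidents", "incidents.INC-123"))
}
```

Deduplicating recipients is a design choice of this sketch; whether the real Broadcaster delivers an event once per client or once per matching channel is not stated here.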

Methods

NewBroadcaster

NewBroadcaster creates a new event broadcaster with default configuration for high-throughput scenarios.

This constructor initializes all internal data structures and starts the background event processing goroutine. The broadcaster is immediately ready to accept subscriptions and process events after creation.

Default Configuration:

  • Event buffer size: 1000 events (configurable for high-throughput deployments)
  • Concurrent processing: Single background goroutine for event delivery
  • Subscription management: Thread-safe with read-write locking
  • Connection cleanup: Automatic cleanup of failed connections during delivery

The broadcaster starts immediately upon creation and continues processing until the Close() method is called or the context is cancelled. This ensures minimal startup latency and immediate availability for real-time event distribution.

Memory Usage: The initial memory footprint is minimal and grows linearly with the number of active subscriptions and buffered events. Because the event buffer is bounded, a temporary throughput spike cannot grow memory without limit.

Returns a fully initialized Broadcaster ready for production use.
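
The startup and shutdown behavior described above (a bounded buffer drained by a single background goroutine) can be sketched as follows. The queue type and its methods are hypothetical, events are reduced to strings, and dropping an event when the buffer is full is an assumption of this sketch; the package may block or handle overflow differently.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical model of a broadcaster's buffered event pipeline.
type queue struct {
	events    chan string   // bounded buffer of pending events
	done      chan struct{} // closed when the worker goroutine exits
	closeOnce sync.Once
	mu        sync.Mutex
	delivered []string
}

// newQueue starts the background worker immediately, as the constructor
// described above is said to do.
func newQueue(size int) *queue {
	q := &queue{events: make(chan string, size), done: make(chan struct{})}
	go q.run()
	return q
}

// run is the single delivery goroutine; it preserves publish order.
func (q *queue) run() {
	defer close(q.done)
	for ev := range q.events {
		q.mu.Lock()
		q.delivered = append(q.delivered, ev)
		q.mu.Unlock()
	}
}

// publish never blocks: it reports false when the buffer is full
// (assumed overflow policy). It must not be called after close.
func (q *queue) publish(ev string) bool {
	select {
	case q.events <- ev:
		return true
	default:
		return false
	}
}

// close stops accepting events, drains the buffer, and waits for the worker.
func (q *queue) close() {
	q.closeOnce.Do(func() { close(q.events) })
	<-q.done
}

func main() {
	q := newQueue(1000) // matches the default buffer size listed above
	q.publish("incident.created")
	q.publish("timeline.updated")
	q.close()
	fmt.Println(q.delivered)
}
```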

Client

Client represents a connected client that can receive real-time events.

The Client interface abstracts the underlying connection protocol (WebSocket, SSE, etc.) and provides a uniform API for event delivery and connection management. This allows the broadcaster to work with different transport mechanisms without protocol-specific code.

Implementations must handle:

  • Protocol-specific message formatting and delivery
  • Connection state management and error handling
  • Graceful disconnection and resource cleanup
  • Concurrent access safety for Send operations

Example implementations:

  • WebSocketClient: Handles WebSocket protocol specifics
  • SSEClient: Manages Server-Sent Events connections
  • MockClient: Testing implementation for unit tests

Thread Safety: All Client methods must be safe for concurrent use, as the broadcaster may call Send from multiple goroutines simultaneously during fan-out operations.
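
As an illustration of the thread-safety requirement, here is a sketch of a test-style client whose Send is safe to call from many goroutines. The method set shown (ID, Send, Close) is an assumption: only Send is named in the text above, and the real interface may differ. The mockClient type is a hypothetical example, not the package's MockClient.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Client is an assumed method set for illustration only.
type Client interface {
	ID() string
	Send(payload []byte) error
	Close() error
}

var errClosed = errors.New("client closed")

// mockClient records payloads in memory and guards its state with a mutex.
type mockClient struct {
	id       string
	mu       sync.Mutex
	closed   bool
	received [][]byte
}

// Compile-time check that mockClient satisfies the assumed interface.
var _ Client = (*mockClient)(nil)

func (m *mockClient) ID() string { return m.id }

// Send copies the payload so callers may reuse their buffer afterwards.
func (m *mockClient) Send(payload []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return errClosed
	}
	m.received = append(m.received, append([]byte(nil), payload...))
	return nil
}

// Close is idempotent; later Send calls return errClosed.
func (m *mockClient) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.closed = true
	return nil
}

func (m *mockClient) count() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.received)
}

func main() {
	c := &mockClient{id: "user123"}
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = c.Send([]byte(`{"type":"incident.created"}`))
		}()
	}
	wg.Wait()
	fmt.Println(c.ID(), "received", c.count(), "messages")
	_ = c.Close()
	fmt.Println("send after close:", c.Send([]byte("late")))
}
```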

Event

Event represents a real-time event that can be broadcast to connected clients.

Events are the core data structure for the real-time communication system. Each event contains metadata about what happened, when it occurred, and which channels should receive the event. The flexible Data field allows any JSON-serializable payload.

Events support multi-channel broadcasting, allowing a single event to be sent to multiple channels simultaneously. This enables efficient fan-out scenarios where incident updates need to reach both general incident streams and specific incident timelines.

Example event for incident creation:

event := &Event{
	ID:        "evt-123-456",
	Type:      EventIncidentCreated,
	Timestamp: time.Now(),
	Data:      &models.Incident{...},
	Channels:  []string{"incidents", "incidents.INC-123", "all"},
}

Channel routing patterns:

  • “incidents”: All incident-related events
  • “incidents.<id>”: Events for a specific incident ID (e.g., “incidents.INC-123”)
  • “timeline.<id>”: Timeline events for a specific incident ID
  • “system”: System-wide notifications and announcements
  • “all”: Global event stream (typically for admin/monitoring)
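
Since the Data field is described as any JSON-serializable payload, a short sketch of how such an event might look on the wire may help. The struct below mirrors the field names used in the example above, but the JSON tags and the string value of the event type constant are assumptions, as is the map used for Data.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// EventType and its value are assumed to follow the domain.action naming.
type EventType string

const EventIncidentCreated EventType = "incident.created"

// Event mirrors the fields shown above; the JSON tags are hypothetical.
type Event struct {
	ID        string      `json:"id"`
	Type      EventType   `json:"type"`
	Timestamp time.Time   `json:"timestamp"`
	Data      interface{} `json:"data"`
	Channels  []string    `json:"channels"`
}

// sampleEvent uses a fixed timestamp so the encoded form is reproducible.
func sampleEvent() Event {
	return Event{
		ID:        "evt-123-456",
		Type:      EventIncidentCreated,
		Timestamp: time.Date(2025, 8, 25, 7, 51, 5, 0, time.UTC),
		Data:      map[string]string{"severity": "SEV-1"},
		Channels:  []string{"incidents", "incidents.INC-123", "all"},
	}
}

// encode returns the JSON form a client would receive under these assumptions.
func encode(e Event) (string, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encode(sampleEvent())
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
}
```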

EventType

EventType defines the type of real-time event for client-side processing and filtering.

Event types provide semantic meaning to events, allowing clients to handle different categories of events appropriately. The type system follows a hierarchical naming convention using dot notation to indicate the domain and specific action.

Naming Convention:

  • domain.action: Primary format (e.g., “incident.created”)
  • Domains: incident, timeline, connector, sla, system
  • Actions: created, updated, breach, notification, etc.

This consistent naming enables:

  • Client-side filtering and routing
  • Subscription management by event category
  • Metrics and monitoring by event type
  • Integration with external event systems
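
The domain.action convention makes it straightforward to route by category. The following sketch splits an event type into its two parts; splitEventType is a hypothetical helper, and the string values assigned to the constants are assumed from the event type list rather than confirmed by the source.

```go
package main

import (
	"fmt"
	"strings"
)

type EventType string

// Constant values are assumptions based on the documented naming convention.
const (
	EventIncidentCreated EventType = "incident.created"
	EventSLABreach       EventType = "sla.breach"
)

// splitEventType separates "domain.action" at the first dot.
// ok is false when the dot is missing or either part is empty.
func splitEventType(t EventType) (domain, action string, ok bool) {
	domain, action, found := strings.Cut(string(t), ".")
	return domain, action, found && domain != "" && action != ""
}

func main() {
	for _, t := range []EventType{EventIncidentCreated, EventSLABreach, "malformed"} {
		domain, action, ok := splitEventType(t)
		fmt.Printf("%-20s domain=%q action=%q ok=%v\n", t, domain, action, ok)
	}
}
```

A client could switch on the returned domain to send incident events and SLA events to different handlers, or use it as a metrics label.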

Constants

const EventIncidentCreated
const EventIncidentUpdated
const EventTimelineUpdated
const EventConnectorSync
const EventSLABreach
const EventSystemNotification

LegacyConfig

LegacyConfig holds configuration for the legacy WebSocket/SSE system.

Mode

Mode defines the real-time service mode.

Constants

const ModeWebSocket
const ModeCentrifuge
const ModeHybrid

SSEClient

SSEClient represents a Server-Sent Events client connection.

SSEServer

SSEServer manages Server-Sent Events connections.
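
For readers unfamiliar with the transport, the sketch below shows the standard Server-Sent Events wire format that any SSE server writes to the response stream: optional id and event fields, one data line per payload line, and a blank line to end the message. formatSSE is a hypothetical helper written for this illustration; how SSEServer actually frames its messages is not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// formatSSE renders one message in the text/event-stream format.
// Empty id or eventType fields are omitted. A payload containing newlines
// is emitted as several data lines, which the browser joins back together.
func formatSSE(id, eventType, data string) string {
	var b strings.Builder
	if id != "" {
		fmt.Fprintf(&b, "id: %s\n", id)
	}
	if eventType != "" {
		fmt.Fprintf(&b, "event: %s\n", eventType)
	}
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(&b, "data: %s\n", line)
	}
	b.WriteString("\n") // blank line terminates the message
	return b.String()
}

func main() {
	fmt.Print(formatSSE("evt-123", "incident.created", `{"id":"INC-123"}`))
}
```

In an HTTP handler, the response would also need the Content-Type header set to text/event-stream and a flush after each message so that clients receive events without buffering delays.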

Methods

NewSSEServer

NewSSEServer creates a new SSE server.

SubscribeRequest

SubscribeRequest represents a subscription request from a client.

Subscription

Subscription represents a client’s subscription to real-time events with optional filtering.

Subscriptions define which events a client should receive based on channel membership and optional content filters. The subscription system supports fine-grained control over event delivery, enabling clients to receive only relevant updates.

Channel Subscription Patterns:

  • Single channel: [“incidents”] - receive all incident events
  • Multiple channels: [“incidents”, “system”] - receive incident and system events
  • Specific resources: [“incidents.INC-123”] - receive events for specific incident
  • Wildcard patterns: [“incidents.*”] - planned future enhancement

Filter Examples:

  • Severity filtering: {“severity”: “SEV-1”} - only critical incidents
  • Event type filtering: {“type”: “incident.created”} - only new incidents
  • Status filtering: {“status”: “open”} - only active incidents

The subscription maintains a reference to the client for message delivery but excludes it from JSON serialization to prevent circular references and security issues.
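
The filter examples and the serialization note above can be sketched together. The Subscription struct below is a hypothetical shape, not the package's definition: the field names, the JSON tags, and the rule that every filter must match exactly are all assumptions made for illustration. The `json:"-"` tag is the standard way to exclude a field from encoding/json output.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Subscription is an assumed shape; Client is excluded from JSON output.
type Subscription struct {
	ID       string            `json:"id"`
	Channels []string          `json:"channels"`
	Filters  map[string]string `json:"filters,omitempty"`
	Client   interface{}       `json:"-"`
}

// matches reports whether every filter key is present in attrs with the
// same value (assumed semantics). A subscription without filters matches all.
func (s Subscription) matches(attrs map[string]string) bool {
	for key, want := range s.Filters {
		got, ok := attrs[key]
		if !ok || got != want {
			return false
		}
	}
	return true
}

// JSON returns the serialized subscription, or "" if encoding fails.
func (s Subscription) JSON() string {
	b, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	sub := Subscription{
		ID:       "sub-1",
		Channels: []string{"incidents", "system"},
		Filters:  map[string]string{"severity": "SEV-1"},
		Client:   "placeholder for a live connection",
	}
	fmt.Println(sub.matches(map[string]string{"severity": "SEV-1", "status": "open"}))
	fmt.Println(sub.matches(map[string]string{"severity": "SEV-3"}))
	fmt.Println(sub.JSON()) // the Client field does not appear
}
```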

UnifiedBroadcaster

UnifiedBroadcaster implements the Timeline Service Broadcaster interface and publishes to both the legacy and Centrifuge systems.

UnifiedConfig

UnifiedConfig holds configuration for the unified real-time service.

Methods

DefaultConfig

DefaultConfig returns a default unified configuration.

UnifiedService

UnifiedService manages both the legacy and Centrifuge real-time services.

Methods

NewUnifiedService

NewUnifiedService creates a new unified real-time service.

WebSocketClient

WebSocketClient represents a WebSocket client connection.

WebSocketMessage

WebSocketMessage represents a message sent over a WebSocket connection.

WebSocketServer

WebSocketServer manages WebSocket connections.

Methods

NewWebSocketServer

NewWebSocketServer creates a new WebSocket server.

Functions

convertRealtimeEventType

convertRealtimeEventType converts a realtime.EventType to the corresponding centrifuge.EventType.

Generated automatically from Go source code. Last updated: 2025-08-25T07:51:05-04:00