realtime Package
Overview
Package realtime provides comprehensive real-time event broadcasting and client connection management.
This package implements a high-performance real-time communication system that supports WebSocket and Server-Sent Events (SSE) for live updates in the incident management platform. It provides event broadcasting, channel-based subscriptions, client management, and integration with the incident timeline system.
Key Features:
- Multi-protocol support (WebSocket, SSE) for real-time connections
- Channel-based subscription system with flexible filtering
- High-performance event broadcasting with buffering and concurrency
- Automatic client connection management and cleanup
- Integration with incident timeline for real-time incident updates
- System notification broadcasting for platform-wide announcements
- Comprehensive monitoring and statistics collection
- Thread-safe operations for high-concurrency scenarios
Architecture:
The realtime system follows a publisher-subscriber pattern with channel-based routing:
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Event Sources │───►│ Broadcaster │───►│ Client Manager │
│ (Timeline, SLA) │ │ (Pub/Sub Engine) │ │ (WS/SSE Servers)│
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Event Filters │ │ Channel Router │ │ Connected │
│ (Topic/Content) │ │ (Subscription) │ │ Clients │
└─────────────────┘ └──────────────────┘ └─────────────────┘
Channel System:
- “incidents”: All incident-related events
- “incidents.
”: Events for specific incident - “timeline.
”: Timeline events for specific incident - “system”: System-wide notifications and announcements
- “all”: Global event stream (admin/monitoring use)
Event Types:
- incident.created: New incident declared
- incident.updated: Incident status or metadata changed
- timeline.updated: New timeline event added
- connector.sync: External system synchronization event
- sla.breach: SLA threshold exceeded
- system.notification: Platform announcement or alert
Example usage:
// Create broadcaster
broadcaster := realtime.NewBroadcaster()
// Create WebSocket server
wsServer := realtime.NewWebSocketServer(broadcaster)
// Subscribe client to incident updates
client := &MyWebSocketClient{id: "user123"}
subscription := broadcaster.Subscribe(client,
[]string{"incidents", "system"},
map[string]string{"severity": "SEV-1"})
// Broadcast incident event
event := &realtime.Event{
ID: "evt-123",
Type: realtime.EventIncidentCreated,
Timestamp: time.Now(),
Data: incident,
Channels: []string{"incidents", "incidents.INC-123"},
}
broadcaster.Broadcast(event)
Integration Patterns:
- Timeline Service: Automatically broadcasts timeline events
- SLA Service: Broadcasts breach notifications
- Connector Service: Broadcasts sync status updates
- WebSocket/SSE Servers: Handle client connections and message delivery
Import Path: github.com/systmms/incidents/internal/realtime
Types
Broadcaster
Broadcaster manages real-time event distribution to connected clients with high-performance routing.
The Broadcaster serves as the central hub for real-time communication, coordinating between event producers (timeline service, SLA monitors, connectors) and connected clients (WebSocket, SSE connections). It provides efficient event routing, subscription management, and automatic connection cleanup.
Architecture Features:
- Channel-based subscription routing for O(1) event delivery
- Buffered event processing to handle high-throughput scenarios
- Concurrent-safe subscription management with read-write locks
- Automatic cleanup of dead connections and failed deliveries
- Content filtering for fine-grained event selection
- Comprehensive statistics and monitoring capabilities
Performance Characteristics:
- Event routing: O(channels × subscriptions_per_channel)
- Subscription lookup: O(1) for direct access, O(n) for channel enumeration
- Memory usage: Linear with number of subscriptions and buffered events
- Concurrency: High read concurrency, synchronized writes for safety
The Broadcaster is designed to handle thousands of concurrent connections with minimal latency impact through efficient data structures and asynchronous processing.
{<nil> 13895 type 0 [0x14000290940] 0}Methods
NewBroadcaster
NewBroadcaster creates a new event broadcaster with default configuration for high-throughput scenarios.
This constructor initializes all internal data structures and starts the background event processing goroutine. The broadcaster is immediately ready to accept subscriptions and process events after creation.
Default Configuration:
- Event buffer size: 1000 events (configurable for high-throughput deployments)
- Concurrent processing: Single background goroutine for event delivery
- Subscription management: Thread-safe with read-write locking
- Connection cleanup: Automatic cleanup of failed connections during delivery
The broadcaster starts immediately upon creation and continues processing until the Close() method is called or the context is cancelled. This ensures minimal startup latency and immediate availability for real-time event distribution.
Memory Usage: The initial memory footprint is minimal, growing linearly with the number of active subscriptions and buffered events. The event buffer prevents memory buildup during temporary throughput spikes.
Returns a fully initialized Broadcaster ready for production use.
{<nil> <nil> NewBroadcaster 0x140002d2040 <nil>}Client
Client represents a connected client that can receive real-time events.
The Client interface abstracts the underlying connection protocol (WebSocket, SSE, etc.) and provides a uniform API for event delivery and connection management. This allows the broadcaster to work with different transport mechanisms without protocol-specific code.
Implementations must handle:
- Protocol-specific message formatting and delivery
- Connection state management and error handling
- Graceful disconnection and resource cleanup
- Concurrent access safety for Send operations
Example implementations:
- WebSocketClient: Handles WebSocket protocol specifics
- SSEClient: Manages Server-Sent Events connections
- MockClient: Testing implementation for unit tests
Thread Safety: All Client methods must be safe for concurrent use, as the broadcaster may call Send from multiple goroutines simultaneously during fan-out operations.
{<nil> 11563 type 0 [0x14000289e80] 0}Event
Event represents a real-time event that can be broadcast to connected clients.
Events are the core data structure for the real-time communication system. Each event contains metadata about what happened, when it occurred, and which channels should receive the event. The flexible Data field allows any JSON-serializable payload.
Events support multi-channel broadcasting, allowing a single event to be sent to multiple channels simultaneously. This enables efficient fan-out scenarios where incident updates need to reach both general incident streams and specific incident timelines.
Example event for incident creation:
event := &Event{
ID: "evt-123-456",
Type: EventIncidentCreated,
Timestamp: time.Now(),
Data: &models.Incident{...},
Channels: []string{"incidents", "incidents.INC-123", "all"},
}
Channel routing patterns:
- “incidents”: All incident-related events
- “incidents.
”: Events for specific incident ID - “timeline.
”: Timeline events for specific incident ID - “system”: System-wide notifications and announcements
- “all”: Global event stream (typically for admin/monitoring)
{<nil> 5227 type 0 [0x14000289140] 0}EventType
EventType defines the type of real-time event for client-side processing and filtering.
Event types provide semantic meaning to events, allowing clients to handle different categories of events appropriately. The type system follows a hierarchical naming convention using dot notation to indicate the domain and specific action.
Naming Convention:
- domain.action: Primary format (e.g., “incident.created”)
- Domains: incident, timeline, connector, sla, system
- Actions: created, updated, breach, notification, etc.
This consistent naming enables:
- Client-side filtering and routing
- Subscription management by event category
- Metrics and monitoring by event type
- Integration with external event systems
{<nil> 7039 type 0 [0x14000289800] 0}Constants
const EventIncidentCreated const EventIncidentUpdated const EventTimelineUpdated const EventConnectorSync const EventSLABreach const EventSystemNotificationLegacyConfig
LegacyConfig holds configuration for the legacy WebSocket/SSE system
{<nil> 52173 type 0 [0x14000207bc0] 0}Mode
Mode defines the real-time service mode
{<nil> 51609 type 0 [0x140002076c0] 0}Constants
const ModeWebSocket const ModeCentrifuge const ModeHybridSSEClient
SSEClient represents a Server-Sent Events client connection
{<nil> 42663 type 0 [0x140003a6940] 0}SSEServer
SSEServer manages Server-Sent Events connections
{<nil> 42951 type 0 [0x140003a75c0] 0}Methods
NewSSEServer
NewSSEServer creates a new SSE server
{<nil> <nil> NewSSEServer 0x140003aa6c0 <nil>}SubscribeRequest
SubscribeRequest represents a subscription request from client
{<nil> 60347 type 0 [0x140003c5140] 0}Subscription
Subscription represents a client’s subscription to real-time events with optional filtering.
Subscriptions define which events a client should receive based on channel membership and optional content filters. The subscription system supports fine-grained control over event delivery, enabling clients to receive only relevant updates.
Channel Subscription Patterns:
- Single channel: [“incidents”] - receive all incident events
- Multiple channels: [“incidents”, “system”] - receive incident and system events
- Specific resources: [“incidents.INC-123”] - receive events for specific incident
- Wildcard patterns: [“incidents.*”] - planned future enhancement
Filter Examples:
- Severity filtering: {“severity”: “SEV-1”} - only critical incidents
- Event type filtering: {“type”: “incident.created”} - only new incidents
- Status filtering: {“status”: “open”} - only active incidents
The subscription maintains a reference to the client for message delivery but excludes it from JSON serialization to prevent circular references and security issues.
{<nil> 9623 type 0 [0x14000289c40] 0}UnifiedBroadcaster
UnifiedBroadcaster implements the Timeline Service Broadcaster interface and publishes to both legacy and Centrifuge systems
{<nil> 57737 type 0 [0x140003c4380] 0}UnifiedConfig
UnifiedConfig holds configuration for the unified real-time service
{<nil> 51889 type 0 [0x14000207980] 0}Methods
DefaultConfig
DefaultConfig returns a default unified configuration
{<nil> <nil> DefaultConfig 0x14000205140 <nil>}UnifiedService
UnifiedService manages both legacy and Centrifuge real-time services
{<nil> 52306 type 0 [0x14000207d40] 0}Methods
NewUnifiedService
NewUnifiedService creates a new unified real-time service
{<nil> <nil> NewUnifiedService 0x140002401c0 <nil>}WebSocketClient
WebSocketClient represents a WebSocket client connection
{<nil> 59875 type 0 [0x140003c4d80] 0}WebSocketMessage
WebSocketMessage represents a message sent over WebSocket
{<nil> 60072 type 0 [0x140003c4f80] 0}WebSocketServer
WebSocketServer manages WebSocket connections
{<nil> 60531 type 0 [0x140003c5240] 0}Methods
NewWebSocketServer
NewWebSocketServer creates a new WebSocket server
{<nil> <nil> NewWebSocketServer 0x140003fc440 <nil>}Functions
convertRealtimeEventType
convertRealtimeEventType converts realtime.EventType to centrifuge.EventType
{<nil> <nil> convertRealtimeEventType 0x14000268040 <nil>}Generated automatically from Go source code. Last updated: 2025-08-25T07:51:05-04:00