Realtime Bus
Purpose
The realtime bus is the WebSocket-based event channel that gives the universe its sense of presence: live combat, sector-presence awareness, market updates, chat, and system notifications. The bus is the single push channel; the REST API is the request/response channel. State is authoritative on the server; clients render optimistically and reconcile against events received on the bus.
Inputs
The bus reads:
- A connecting client's JWT (in the Authorization header during the upgrade, or as a ?token= query param for browser clients, which cannot set custom headers on a WebSocket upgrade).
- The user's Player row (or User row for admin connections) for room membership: current_sector_id, team_id, region, role.
- Server-side events from any service: combat, trading, governance, genesis, ARIA, chat.
- Cross-process pub/sub from Redis (so multiple gameserver workers see each other's events).
The bus fires:
- On connection upgrade: auth handshake.
- On every authoritative state change in any service that broadcasts.
- On periodic heartbeat: for liveness and presence verification.
- On disconnect: cleanup and departure broadcasts.
Process
Connection lifecycle
Client Server
│ WS upgrade /api/v1/ws?token=JWT │
├───────────────────────────────────────►│
│ │ verify JWT (same key as REST)
│ │ load Player row
│ │ if active connection exists for
│ │ user_id → close old socket
│ │ register in:
│ │ active_connections[user_id]
│ │ sector_connections[sector_id]
│ │ team_connections[team_id]
│ {type: "connected", session_id, ...} │
│◄───────────────────────────────────────┤
│ │ broadcast to sector room:
│ │ {type: "player_entered_sector"}
│ │
│ heartbeat every 30s │
├──────────►◄────────────────────────────┤
│ │
│ {type: "subscribe", topics:[...]} │ optional explicit subscriptions
├───────────────────────────────────────►│ (e.g. market topic, admin feed)
│ │
│ ... events flow ... │
│ │
│ disconnect (close / network drop) │
│◄─────►◄────────────────────────────────┤
│ │ scrub from rooms
│ │ broadcast "player_left_sector"
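The register and scrub steps in the diagram can be sketched in memory as follows. This is an illustrative sketch, not the actual `ConnectionManager`: `FakeSocket`, `ConnectionRegistry`, and the method names are hypothetical, the socket is a synchronous stand-in, and the payload fields beyond `type` are assumptions. The `4001` close code is the "superseded" code from the failure-mode table.

```python
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeSocket:
    """Stand-in for a real WebSocket; records what would be sent or closed."""
    sent: list = field(default_factory=list)
    closed_with: Optional[int] = None

    def send(self, msg: dict) -> None:
        self.sent.append(msg)

    def close(self, code: int) -> None:
        self.closed_with = code


class ConnectionRegistry:
    """Hypothetical in-memory registry mirroring the lifecycle diagram."""

    def __init__(self) -> None:
        self.active_connections = {}                # user_id -> socket
        self.connection_metadata = {}               # user_id -> room info
        self.sector_connections = defaultdict(set)  # sector_id -> user_ids
        self.team_connections = defaultdict(set)    # team_id -> user_ids

    def _scrub(self, user_id):
        """Remove the user from every roster and return their old metadata."""
        meta = self.connection_metadata.pop(user_id, None)
        self.active_connections.pop(user_id, None)
        if meta:
            self.sector_connections[meta["sector_id"]].discard(user_id)
            if meta["team_id"] is not None:
                self.team_connections[meta["team_id"]].discard(user_id)
        return meta

    def _broadcast_sector(self, sector_id, msg, exclude=None):
        for uid in self.sector_connections[sector_id]:
            if uid != exclude:
                self.active_connections[uid].send(msg)

    def connect(self, user_id, sector_id, team_id, socket):
        old = self.active_connections.get(user_id)
        if old is not None:
            old.close(4001)  # superseded: the new socket wins
            self._scrub(user_id)
        self.active_connections[user_id] = socket
        self.connection_metadata[user_id] = {"sector_id": sector_id, "team_id": team_id}
        self.sector_connections[sector_id].add(user_id)
        if team_id is not None:
            self.team_connections[team_id].add(user_id)
        socket.send({"type": "connected", "session_id": str(uuid.uuid4())})
        self._broadcast_sector(
            sector_id,
            {"type": "player_entered_sector", "user_id": user_id, "sector_id": sector_id},
            exclude=user_id,
        )

    def disconnect(self, user_id):
        meta = self._scrub(user_id)
        if meta:
            self._broadcast_sector(
                meta["sector_id"],
                {"type": "player_left_sector", "user_id": user_id, "sector_id": meta["sector_id"]},
            )
```

In this sketch a superseded socket is scrubbed silently, so the sector sees only a fresh arrival event; whether the real service also emits a departure in that case is not specified here.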
Auth handshake
- Client sends WS upgrade with the same JWT used for REST.
- Server decodes and verifies (same key, same expiry).
- On failure, close with code `4401` (custom: unauthorized).
- On success, server stores `user_data = {username, current_sector, team_id, region_id, ...}` in `connection_metadata`.
- Single-connection-per-user: an existing socket for the same `user_id` is closed first (the new socket wins). Multi-device support is opt-in via a `device_id` query param; absent it, only one tab is live.
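A minimal sketch of the token lookup and verification step, using only the standard library. It assumes an HS256-signed JWT, lowercase header keys, and a `sub` claim carrying the user id; none of these are confirmed by this document, and `extract_token`, `verify_hs256`, and `handshake` are hypothetical names. A real service would more likely reuse the REST layer's JWT helper.

```python
import base64
import hashlib
import hmac
import json
import time
from urllib.parse import parse_qs

CLOSE_UNAUTHORIZED = 4401  # custom close code from the handshake rules above


def extract_token(headers: dict, query_string: str):
    """Prefer the Authorization header; fall back to the ?token= query param."""
    auth = headers.get("authorization", "")
    if auth.lower().startswith("bearer "):
        return auth[7:].strip()
    values = parse_qs(query_string).get("token")
    return values[0] if values else None


def _b64url_decode(part: str) -> bytes:
    # JWT segments drop base64 padding; restore it before decoding.
    return base64.urlsafe_b64decode(part + "=" * (-len(part) % 4))


def verify_hs256(token: str, key: bytes, now=None):
    """Return the claims if signature and expiry check out, else None."""
    now = time.time() if now is None else now
    try:
        header_b64, payload_b64, sig_b64 = token.split(".")
        if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
            return None
        signing_input = f"{header_b64}.{payload_b64}".encode()
        expected = hmac.new(key, signing_input, hashlib.sha256).digest()
        if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
            return None
        claims = json.loads(_b64url_decode(payload_b64))
        exp = claims.get("exp")
    except (ValueError, TypeError, AttributeError):
        return None
    if not isinstance(exp, (int, float)) or exp <= now:
        return None
    return claims


def handshake(headers: dict, query_string: str, key: bytes) -> dict:
    """Decide whether to accept the upgrade or close with 4401."""
    token = extract_token(headers, query_string)
    claims = verify_hs256(token, key) if token else None
    if claims is None:
        return {"accept": False, "close_code": CLOSE_UNAUTHORIZED}
    return {"accept": True, "user_id": claims.get("sub")}
```

The signature comparison uses `hmac.compare_digest` so that verification time does not leak how many bytes matched.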
Channel / topic model
Two layers: rooms (auto-assigned, follow player state) and topics (explicit subscribe).
Auto-assigned rooms:
- global — all connections.
- sector:{sector_id} — players currently in that sector.
- team:{team_id} — team members.
- region:{region_id} — regional citizens (governance, treaties).
- personal:{user_id} — single-recipient unicast.
- admin — admin connections only.
Explicit topics (client must subscribe):
- market:{commodity} — high-frequency market deltas.
- market:station:{station_id} — per-station deltas.
- combat:{combat_id} — round-by-round spectator stream.
- aria — personal ARIA events for this player.
Movement triggers a room hop: leave sector:{old}, join sector:{new}, broadcast departure + arrival events.
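The room naming scheme and the ordering of a room hop can be sketched as below. `rooms_for` and `room_hop` are hypothetical helpers, rosters are a plain dict of sets, and the event payloads carry only the fields needed for illustration.

```python
def rooms_for(user_id, sector_id, team_id=None, region_id=None, is_admin=False):
    """Derive the auto-assigned room names from a player's current state."""
    rooms = {"global", f"personal:{user_id}", f"sector:{sector_id}"}
    if team_id is not None:
        rooms.add(f"team:{team_id}")
    if region_id is not None:
        rooms.add(f"region:{region_id}")
    if is_admin:
        rooms.add("admin")
    return rooms


def room_hop(rosters, user_id, old_sector, new_sector):
    """Move a user between sector rooms.

    Returns (room, event) pairs in the order they should be broadcast:
    the departure is produced before the arrival.
    """
    old_room, new_room = f"sector:{old_sector}", f"sector:{new_sector}"

    # Leave the old room first, so no arrival fires while the user is
    # still listed in the sector they left.
    rosters.setdefault(old_room, set()).discard(user_id)
    departure = (old_room, {"type": "player_left_sector",
                            "user_id": user_id, "sector_id": old_sector})

    rosters.setdefault(new_room, set()).add(user_id)
    arrival = (new_room, {"type": "player_entered_sector",
                          "user_id": user_id, "sector_id": new_sector})
    return [departure, arrival]
```

For example, `rooms_for("u1", 12, team_id=3)` yields `global`, `personal:u1`, `sector:12`, and `team:3`.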
Broadcast vs unicast
| Method | Target | Use |
|---|---|---|
| `send_personal_message(user_id, msg)` | One user | Trade confirms, ARIA replies, alerts. |
| `broadcast_to_sector(sector_id, msg, exclude=)` | Sector room | Ship arrivals, combat starts, market updates. |
| `broadcast_to_team(team_id, msg, exclude=)` | Team room | Team chat, fleet orders. |
| `broadcast_to_region(region_id, msg, exclude=)` | Region room | Election state, policy votes, treaties. |
| `broadcast_global(msg, exclude=)` | Everybody | Server announcements. |
| `publish_topic(topic, msg)` | Topic subscribers | Market firehose, combat spectators. |
Every send is wrapped in try/except; a failed send marks the connection for cleanup instead of raising into the calling service.
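A sketch of the guarded-send pattern with `asyncio`. The `Bus` class, the `pending_cleanup` set, and the assumption that each socket exposes an async `send_json` method are illustrative; only the two method names come from the table above.

```python
import asyncio


class Bus:
    """Hypothetical slice of the bus showing guarded sends only."""

    def __init__(self):
        self.active_connections = {}  # user_id -> socket with async send_json()
        self.sector_connections = {}  # sector_id -> set of user_ids
        self.pending_cleanup = set()  # user_ids whose last send failed

    async def send_personal_message(self, user_id, msg) -> bool:
        socket = self.active_connections.get(user_id)
        if socket is None:
            return False
        try:
            await socket.send_json(msg)
            return True
        except Exception:
            # Never raise into the calling service; flag for cleanup instead.
            self.pending_cleanup.add(user_id)
            return False

    async def broadcast_to_sector(self, sector_id, msg, exclude=None) -> int:
        """Send to everyone in the sector room; return the delivery count."""
        targets = [uid for uid in self.sector_connections.get(sector_id, ())
                   if uid != exclude]
        results = await asyncio.gather(
            *(self.send_personal_message(uid, msg) for uid in targets)
        )
        return sum(results)
```

Because each send catches its own exception, one broken socket cannot prevent delivery to the rest of the room.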
Presence
connection_metadata[user_id] carries last_heartbeat. A periodic sweeper:
1. Iterates active connections.
2. If now - last_heartbeat > 90s, close the socket and run disconnect.
3. Emits presence_updated so leaderboard and sector-presence UIs update.
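The sweeper steps can be sketched as a single pass over the metadata. The function name, the `disconnect` callback, and the `departed` payload field are assumptions, as is the choice to emit `presence_updated` only when at least one socket was closed.

```python
HEARTBEAT_TIMEOUT_S = 90  # silence threshold from step 2


def sweep(connection_metadata, now, disconnect):
    """Close connections whose last heartbeat is older than the timeout.

    `disconnect` is a callback that closes the socket and scrubs rooms.
    Returns a presence_updated event, or None if nothing changed.
    """
    # Collect first, then disconnect, so the callback may mutate the dict.
    stale = [
        user_id
        for user_id, meta in connection_metadata.items()
        if now - meta["last_heartbeat"] > HEARTBEAT_TIMEOUT_S
    ]
    for user_id in stale:
        disconnect(user_id)
    if not stale:
        return None
    return {"type": "presence_updated", "departed": stale}
```

With a 30 s heartbeat interval, the 90 s threshold tolerates two consecutive missed heartbeats before the socket is closed.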
Reconnection
Clients reconnect with the same JWT plus an optional last_event_id:
1. Server registers the new socket (closing any stale one).
2. If last_event_id is recent (≤ 60 s), the server replays missed events from a per-user ring buffer.
3. Older gaps trigger a forced state_resync event, so the client re-fetches authoritative state via REST.
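A sketch of the replay decision. It keeps the ring buffer in process memory rather than in Redis, uses sequential integer event ids, and interprets "recent" as "every event after `last_event_id` is still inside the 60 s window"; all three are assumptions, and `ReplayBuffer` and `on_reconnect` are hypothetical names.

```python
from collections import deque

REPLAY_WINDOW_S = 60


class ReplayBuffer:
    """Per-user ring buffer of (event_id, timestamp, event) tuples."""

    def __init__(self, maxlen=256):
        self._events = deque(maxlen=maxlen)
        self._next_id = 1

    def append(self, event, now):
        event_id = self._next_id
        self._next_id += 1
        self._events.append((event_id, now, event))
        return event_id

    def _expire(self, now):
        while self._events and now - self._events[0][1] > REPLAY_WINDOW_S:
            self._events.popleft()

    def missed_since(self, last_event_id, now):
        """Return missed events, or None if the gap cannot be replayed."""
        self._expire(now)
        if last_event_id >= self._next_id - 1:
            return []  # client is already up to date
        if not self._events or last_event_id < self._events[0][0] - 1:
            return None  # some missed events already left the buffer
        return [ev for eid, _, ev in self._events if eid > last_event_id]


def on_reconnect(buffer, last_event_id, now):
    """Messages to send right after the new socket is registered."""
    if last_event_id is None:
        return []  # no replay requested
    missed = buffer.missed_since(last_event_id, now)
    if missed is None:
        return [{"type": "state_resync", "reason": "replay_gap"}]
    return missed
```

The `reason` value is a placeholder; the taxonomy only states that `state_resync` carries a `reason` field.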
Server-pushed event taxonomy
| Type | Direction | Payload (key fields) |
|---|---|---|
| `connected` | server → one | session_id, server_time, your_state |
| `state_resync` | server → one | reason (client should re-fetch) |
| `player_entered_sector` / `player_left_sector` | sector room | user_id, username, sector_id |
| `combat_started` / `combat_round` / `combat_resolved` | participants + sector | combat_id, attacker_id, defender_id, round, deltas, result |
| `ship_destroyed` | sector + global | ship_id, owner_id, cause |
| `market_update` | market topic | station_id, commodity, buy, sell, trend, supply, demand |
| `transaction_completed` | personal | tx_id, station_id, commodity, units, total |
| `price_alert_triggered` | personal | alert_id, commodity, condition, current_price |
| `bounty_placed` / `bounty_collected` | global + participants | target_id, amount |
| `genesis_progress` | personal | planet_id, percent, hours_remaining |
| `governance_event` | region room | kind, election_id?, policy_id?, payload |
| `aria_message` | personal | message_id, role, content, attachments |
| `chat_message` | room (sector/team/global) | channel, sender_id, sender_name, body, timestamp |
| `notification` | personal | kind, title, body, action_url |
| `heartbeat` / `pong` | both | server_time |
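As an illustration of the message shape, the sketch below builds a flat `{type, ...fields}` envelope and rejects events that lack the key fields listed in the table. Treating those key fields as strictly required, the flat layout, and the `make_event` helper itself are assumptions; only three event types are shown.

```python
import json

# Key fields copied from the taxonomy table for a few event types.
REQUIRED_FIELDS = {
    "state_resync": {"reason"},
    "transaction_completed": {"tx_id", "station_id", "commodity", "units", "total"},
    "chat_message": {"channel", "sender_id", "sender_name", "body", "timestamp"},
}


def make_event(event_type, **payload):
    """Build an event dict, failing fast on unknown types or missing fields."""
    required = REQUIRED_FIELDS.get(event_type)
    if required is None:
        raise ValueError(f"unknown event type: {event_type}")
    missing = sorted(required - set(payload))
    if missing:
        raise ValueError(f"{event_type} is missing fields: {missing}")
    return {"type": event_type, **payload}


event = make_event(
    "chat_message",
    channel="team",
    sender_id="u1",
    sender_name="Ada",
    body="regroup at the station",
    timestamp=1700000000,
)
wire = json.dumps(event)  # text frame that would be sent over the socket
```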
Cross-process fanout
Multiple gameserver workers run; the bus uses Redis pub/sub (redis_pubsub_service) for fanout:
1. Service emits an event by calling the bus method.
2. The local connection manager handles directly-connected sockets.
3. For users connected to a different worker, the message is published to a Redis channel (sw2102:bus).
4. Every worker subscribes; on receipt, each delivers to its locally connected sockets.
In multi-regional mode, a dedicated redis-nexus Redis instance handles cross-region traffic.
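The four fanout steps can be simulated without a Redis server. In the sketch below `InMemoryPubSub` is a stand-in for Redis pub/sub, not the `redis_pubsub_service` API, and the `origin` field used to ignore a worker's own publishes is an assumption about how duplicate delivery might be avoided. The channel name is the one given in step 3.

```python
import json
from collections import defaultdict

CHANNEL = "sw2102:bus"


class InMemoryPubSub:
    """Stand-in for Redis pub/sub: every subscriber sees every publish."""

    def __init__(self):
        self._subscribers = defaultdict(list)
        self.published = 0

    def subscribe(self, channel, callback):
        self._subscribers[channel].append(callback)

    def publish(self, channel, raw: str):
        self.published += 1
        for callback in list(self._subscribers[channel]):
            callback(raw)


class Worker:
    """One gameserver worker with its own locally connected sockets."""

    def __init__(self, name, pubsub):
        self.name = name
        self.pubsub = pubsub
        self.local_sockets = {}  # user_id -> list of delivered messages
        pubsub.subscribe(CHANNEL, self._on_message)

    def send_to_user(self, user_id, msg):
        if user_id in self.local_sockets:
            self.local_sockets[user_id].append(msg)  # step 2: local delivery
            return
        envelope = {"origin": self.name, "user_id": user_id, "msg": msg}
        self.pubsub.publish(CHANNEL, json.dumps(envelope))  # step 3

    def _on_message(self, raw):
        envelope = json.loads(raw)
        if envelope["origin"] == self.name:
            return  # our own publish; local sockets were already handled
        inbox = self.local_sockets.get(envelope["user_id"])
        if inbox is not None:
            inbox.append(envelope["msg"])  # step 4: deliver locally
```

Messages cross the channel as JSON strings because pub/sub payloads are opaque bytes or strings, so each worker has to deserialize on receipt.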
Outputs / state changes
The bus itself does not own persistent state. It mutates only ephemeral structures:
- active_connections[user_id] — open sockets.
- connection_metadata[user_id] — heartbeats, room assignments.
- sector_connections[sector_id], team_connections[team_id], region_connections[region_id] — room rosters.
- Per-user ring buffer of recent events (in Redis) for reconnection replay.
Indirectly, every event delivered triggers UI state changes on connected clients.
Invariants
- A user has at most one active socket per `device_id`.
- Only authenticated sockets are added to any room.
- Server never trusts a client message for state mutation — clients send commands (validated and re-routed through REST handlers); state-changing pushes only originate from server services.
- Every send is non-blocking with respect to the originating service — failures are logged and queued for cleanup, not raised.
- Sector / team / region rooms are derived from authoritative DB state; a player who moves out of a sector is always removed from that room before an arrival event fires for the new sector.
- Replay buffers expire (TTL 60 s); on miss, the client resyncs from REST.
Failure modes
| Mode | Target handling |
|---|---|
| Token expires mid-session | On the next message attempt, re-verify; on failure, close with 4401 and force reconnect. |
| Worker crash | Redis pub/sub keeps surviving workers in sync; affected sockets reconnect to a different worker. |
| Redis outage | Local broadcast still works for same-worker users; cross-worker delivery degrades; state_resync triggered when Redis returns. |
| Slow client (back-pressure) | Per-socket send queue with bounded size; overflow drops cosmetic events first (chat, presence) and forces state_resync. |
| Duplicate connect (same user, same device) | Old socket closed cleanly with code 4001 ("superseded"). |
| Heartbeat miss | After 90 s of silence, server closes the socket; client detects and reconnects with replay. |
| Subscription spam | Server caps subscriptions per socket (e.g. 50 topics); excess returns subscription_rejected. |
| Auth race during region transfer | Connection is closed and re-handshaken when region_id changes. |
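The slow-client row can be sketched as a bounded per-socket queue. The set of event types treated as cosmetic, the default queue size, the fallback of dropping the oldest event when nothing cosmetic is queued, and the `reason` string are all assumptions; `SendQueue` is a hypothetical name.

```python
from collections import deque

# Assumed "cosmetic" types: chat and presence events.
COSMETIC_TYPES = {"chat_message", "player_entered_sector", "player_left_sector"}


class SendQueue:
    """Bounded per-socket queue that sheds cosmetic events first."""

    def __init__(self, max_size=100):
        self.max_size = max_size
        self.items = deque()
        self.needs_resync = False

    def push(self, event):
        if len(self.items) >= self.max_size:
            self._make_room()
        self.items.append(event)

    def _make_room(self):
        # Find the oldest cosmetic event; fall back to the oldest event.
        index = next(
            (i for i, queued in enumerate(self.items)
             if queued["type"] in COSMETIC_TYPES),
            0,
        )
        del self.items[index]
        self.needs_resync = True  # something was dropped: client must resync

    def drain(self):
        """Return everything queued, plus a state_resync if events were lost."""
        out = list(self.items)
        self.items.clear()
        if self.needs_resync:
            out.append({"type": "state_resync", "reason": "send_queue_overflow"})
            self.needs_resync = False
        return out
```

Appending `state_resync` after the surviving events lets the client apply what it did receive and then re-fetch authoritative state over REST.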
Source map
| Concern | Path (target) |
|---|---|
| Connection manager | services/gameserver/src/services/websocket_service.py:ConnectionManager |
| Enhanced features (rooms, topics, replay) | services/gameserver/src/services/enhanced_websocket_service.py |
| Cross-process fanout | services/gameserver/src/services/redis_pubsub_service.py |
| Redis client | services/gameserver/src/services/redis_service.py |
| WS routes | services/gameserver/src/api/routes/websocket.py, enhanced_websocket.py |
| Player-client primary | services/player-client/src/services/realtimeWebSocket.ts |
| Player-client minimal | services/player-client/src/services/websocket.ts |
| Player-client React provider | services/player-client/src/contexts/WebSocketContext.tsx |
| Admin client | services/admin-ui/src/services/websocket.ts |
| Multi-regional Redis | redis-nexus instance (deployed via services/region-manager) |
Related
- DATA_MODELS: `../DATA_MODELS/player.md`, `../DATA_MODELS/admin.md`.
- ARCHITECTURE: `../ARCHITECTURE/services.md`, `../ARCHITECTURE/multi-regional.md`, `../ARCHITECTURE/auth.md`.
- OPERATIONS: `../OPERATIONS/realtime.md`.
- SYSTEMS: every other system in this directory emits events through this bus.