Realtime Bus
Status: 🚧 Partial — The core bus (JWT handshake, single-socket eviction, sector/team/personal rooms, the four send primitives, heartbeat sweeper … · ⚠︎ contains code↔spec divergence (impl audit 2026-06-16)
Purpose
The realtime bus is the WebSocket-based event channel that gives the universe its sense of presence: live combat, sector-presence awareness, market updates, chat, system notifications. The bus is the single push channel; the REST API is the request/response channel. State is authoritative on the server; clients render optimistically and reconcile on the bus.
Inputs
The bus reads:
- A connecting client's JWT (in the Authorization header during the upgrade or as a ?token= query param for browsers without header support).
- The user's Player row (or User row for admin connections) for room membership: current_sector_id, team_id, region, role.
- Server-side events from any service: combat, trading, governance, genesis, ARIA, chat.
- Cross-process pub/sub from Redis (so multiple gameserver workers see each other's events).
The bus fires:
- On connection upgrade: auth handshake.
- On every authoritative state change in any service that broadcasts.
- On a periodic heartbeat: liveness and presence verification.
- On disconnect: cleanup and departure broadcasts.
Process
Connection lifecycle
```text
Client                                   Server
│ WS upgrade /api/v1/ws/connect          │
│ (admins use /api/v1/ws/admin)          │
│ ?token=JWT                             │
├───────────────────────────────────────►│
│                                        │ verify JWT (same key as REST)
│                                        │ load Player row
│                                        │ if active connection exists for
│                                        │   user_id → close old socket
│                                        │ register in:
│                                        │   active_connections[user_id]
│                                        │   sector_connections[sector_id]
│                                        │   team_connections[team_id]
│ {type: "connected", session_id, ...}   │
│◄───────────────────────────────────────┤
│                                        │ broadcast to sector room:
│                                        │   {type: "player_entered_sector"}
│                                        │
│ heartbeat every 30s                    │
├───────────────────►◄───────────────────┤
│                                        │
│ {type: "subscribe", topics:[...]}      │ optional explicit subscriptions
├───────────────────────────────────────►│ (e.g. market topic, admin feed)
│                                        │
│ ... events flow ...                    │
│                                        │
│ disconnect (close / network drop)      │
│◄─────►◄────────────────────────────────┤
│                                        │ scrub from rooms
│                                        │ broadcast "player_left_sector"
```
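The registration and eviction steps in the diagram can be sketched with an in-memory registry. This is a minimal sketch under stated assumptions, not the ConnectionManager code: Registry, register, unregister and FakeSocket are hypothetical names, and a socket is assumed to expose an async close(code). The optional socket argument to unregister guards against a late disconnect from the evicted socket scrubbing the rooms of the connection that replaced it.

```python
import asyncio
from collections import defaultdict

CLOSE_SUPERSEDED = 4001  # same code as unauthorized, per Failure modes


class FakeSocket:
    """Hypothetical stand-in for a WebSocket; records the close code."""

    def __init__(self):
        self.closed_with = None

    async def close(self, code: int) -> None:
        self.closed_with = code


class Registry:
    """Hypothetical sketch of the maps the diagram registers into."""

    def __init__(self):
        self.active_connections = {}                # user_id -> socket
        self.sector_connections = defaultdict(set)  # sector_id -> {user_id}
        self.team_connections = defaultdict(set)    # team_id -> {user_id}

    async def register(self, user_id, socket, sector_id, team_id=None):
        old = self.active_connections.get(user_id)
        if old is not None and old is not socket:
            await old.close(CLOSE_SUPERSEDED)  # the new socket wins
            self.unregister(user_id)
        self.active_connections[user_id] = socket
        self.sector_connections[sector_id].add(user_id)
        if team_id is not None:
            self.team_connections[team_id].add(user_id)

    def unregister(self, user_id, socket=None):
        # A late disconnect from an already-evicted socket must not scrub
        # the rooms of the connection that replaced it.
        if socket is not None and self.active_connections.get(user_id) is not socket:
            return
        self.active_connections.pop(user_id, None)
        for rooms in (self.sector_connections, self.team_connections):
            for members in rooms.values():
                members.discard(user_id)
```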
Auth handshake
- Client opens a WebSocket against either /api/v1/ws/connect (player) or /api/v1/ws/admin (admin) with the same JWT used for REST. The token can be sent in the Authorization header during the upgrade or as a ?token= query parameter for browsers without header support.
- Server decodes and verifies the token (same key, same expiry as REST).
- On failure, the server closes with code 4001 (unauthorized; the same code is reused for "superseded" connections, see Failure modes).
- On success, the server stores user_data = {username, current_sector, team_id, region_id, ...} in connection_metadata.
- Single connection per user: an existing socket for the same user_id is closed first (the new socket wins). Multi-device support via a device_id query param is 📐 Design-only.
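A minimal sketch of the token lookup order and the 4001 failure path. Assumptions: the header uses the Bearer scheme, and verify is a caller-supplied function (for example a wrapper around the same JWT check REST uses) that returns claims or None. extract_token and authenticate are hypothetical names, not the gameserver's.

```python
from typing import Callable, Mapping, Optional, Tuple
from urllib.parse import parse_qs

CLOSE_UNAUTHORIZED = 4001


def extract_token(headers: Mapping[str, str], query_string: str) -> Optional[str]:
    """Prefer the Authorization header; fall back to ?token= for browsers."""
    normalized = {k.lower(): v for k, v in headers.items()}
    auth = normalized.get("authorization", "")
    if auth.lower().startswith("bearer "):
        token = auth[len("bearer "):].strip()
        if token:
            return token
    values = parse_qs(query_string).get("token")
    return values[0] if values else None


def authenticate(
    headers: Mapping[str, str],
    query_string: str,
    verify: Callable[[str], Optional[dict]],
) -> Tuple[Optional[dict], Optional[int]]:
    """Return (claims, None) on success or (None, close_code) on failure."""
    token = extract_token(headers, query_string)
    claims = verify(token) if token else None
    if claims is None:
        return None, CLOSE_UNAUTHORIZED
    return claims, None
```

Checking the header first means a client that sends both a header and a query token is authenticated by the header.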
Channel / topic model
Two layers: rooms (auto-assigned, follow player state) and topics (explicit subscribe).
Auto-assigned rooms:
- global — all connections.
- sector:{sector_id} — players currently in that sector.
- team:{team_id} — team members.
- region:{region_id} — regional citizens (governance, treaties). 📐 Design-only — no region_connections map exists in ConnectionManager today.
- personal:{user_id} — single-recipient unicast. Single-recipient invariant (per ADR-0057 A-I1): a personal:{user_id} room has exactly one subscriber — the user themselves. Room-join is gated by the realtime gateway: a connection authenticated as user_X may only subscribe to personal:user_X. Cross-user subscription is rejected with ERR_AUTH_FORBIDDEN and logged as a security event. This invariant is load-bearing for ARIA's per-player privacy (ADR-0016) — nothing in any player's ARIA stream may surface in another player's session.
- admin — admin connections only.
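The single-recipient gate on personal rooms can be sketched as a pure check run before any client-requested room join. This is a minimal sketch: check_room_join and the realtime.security logger name are hypothetical. Sector, team and region rooms are auto-assigned from DB state in this model, so the gate only concerns personal and admin rooms. Comparing the full room name, not a prefix, keeps user u1 out of personal:u10.

```python
import logging
from typing import Optional

ERR_AUTH_FORBIDDEN = "ERR_AUTH_FORBIDDEN"
security_log = logging.getLogger("realtime.security")  # hypothetical logger name


def check_room_join(user_id: str, is_admin: bool, room: str) -> Optional[str]:
    """Return None if the join is allowed, else an error code."""
    if room.startswith("personal:"):
        # ADR-0057 A-I1: personal:{user_id} has exactly one subscriber, its owner.
        if room != f"personal:{user_id}":
            security_log.warning("cross-user personal room join: user=%s room=%s",
                                 user_id, room)
            return ERR_AUTH_FORBIDDEN
    elif room == "admin" and not is_admin:
        return ERR_AUTH_FORBIDDEN
    return None
```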
Explicit topics (client must subscribe):
- market:{commodity} — high-frequency market deltas. ✅ Shipped (market_subscribe / market_unsubscribe in enhanced_websocket_service.py).
- market:station:{station_id} — per-station deltas. ✅ Shipped via the same path.
- combat:{combat_id} — round-by-round spectator stream. 📐 Design-only.
- aria — personal ARIA events for this player. 📐 Design-only — current ARIA events route through personal:{user_id}.
Movement triggers a room hop: leave sector:{old}, join sector:{new}, broadcast departure + arrival events.
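A sketch of the room hop, ordered so the player leaves the old sector room before the arrival event fires, which is the ordering the Invariants section requires. Assumptions: rooms are sets of user_ids keyed by sector, the broadcast callback accepts an exclude keyword like the send primitives on this page, and events use the current wire names player_left_sector / player_entered_sector. move_player and record are hypothetical.

```python
from collections import defaultdict


def move_player(sector_connections, user_id, old_sector, new_sector, broadcast):
    """Room hop: leave the old room, announce departure, then join and announce arrival."""
    sector_connections[old_sector].discard(user_id)
    broadcast(old_sector, {"type": "player_left_sector",
                           "user_id": user_id, "sector_id": old_sector})
    sector_connections[new_sector].add(user_id)
    broadcast(new_sector, {"type": "player_entered_sector",
                           "user_id": user_id, "sector_id": new_sector},
              exclude=user_id)


# Usage: u1 and u2 start in sector 1, u3 in sector 2.
rooms = defaultdict(set, {1: {"u1", "u2"}, 2: {"u3"}})
sent = []


def record(sector, msg, exclude=None):
    # Record who would receive each event instead of sending it.
    sent.append((sector, msg["type"], sorted(rooms[sector] - {exclude})))


move_player(rooms, "u1", 1, 2, record)
```

The mover never receives its own departure (it has already left the room) or arrival (it is excluded).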
Broadcast vs unicast
| Method | Target | Use | Status |
|---|---|---|---|
| send_personal_message(user_id, msg) | One user | Trade confirms, ARIA replies, alerts. | ✅ |
| broadcast_to_sector(sector_id, msg, exclude=) | Sector room | Ship arrivals, combat starts, market updates. | ✅ |
| broadcast_to_team(team_id, msg, exclude=) | Team room | Team chat, fleet orders. | ✅ |
| broadcast_to_region(region_id, msg, exclude=) | Region room | Election state, policy votes, treaties. | 📐 |
| broadcast_global(msg, exclude=) | Everybody | Server announcements. | ✅ |
| publish_topic(topic, msg) | Topic subscribers | Market firehose, combat spectators. | 📐 (a market-specific fanout via redis_pubsub_service exists; the generic publish_topic primitive does not) |
Every send is wrapped in try/except; a failed send marks the connection for cleanup instead of raising into the calling service.
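A minimal sketch of two primitives from the table, showing the try/except-and-mark-for-cleanup rule. Assumptions: sockets expose an async send_json (as Starlette/FastAPI WebSockets do); ConnectionManagerSketch, pending_cleanup and the stub socket classes are hypothetical, and the real ConnectionManager may differ.

```python
import asyncio


class ConnectionManagerSketch:
    def __init__(self):
        self.active_connections = {}  # user_id -> socket
        self.sector_connections = {}   # sector_id -> set of user_ids
        self.pending_cleanup = set()   # user_ids whose last send failed

    async def send_personal_message(self, user_id, msg) -> bool:
        socket = self.active_connections.get(user_id)
        if socket is None:
            return False
        try:
            await socket.send_json(msg)
            return True
        except Exception:
            # Never raise into the calling service; mark for cleanup instead.
            self.pending_cleanup.add(user_id)
            return False

    async def broadcast_to_sector(self, sector_id, msg, exclude=None) -> int:
        targets = self.sector_connections.get(sector_id, set()) - {exclude}
        results = await asyncio.gather(
            *(self.send_personal_message(uid, msg) for uid in targets))
        return sum(results)  # number of successful deliveries


# Usage with stub sockets: one healthy, one dead, one excluded.
class OkSocket:
    def __init__(self):
        self.received = []

    async def send_json(self, msg):
        self.received.append(msg)


class DeadSocket:
    async def send_json(self, msg):
        raise ConnectionError("peer gone")


manager = ConnectionManagerSketch()
a, b, c = OkSocket(), DeadSocket(), OkSocket()
manager.active_connections = {"a": a, "b": b, "c": c}
manager.sector_connections = {9: {"a", "b", "c"}}
delivered = asyncio.run(
    manager.broadcast_to_sector(9, {"type": "ship_destroyed"}, exclude="c"))
```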
Presence
connection_metadata[user_id] carries last_heartbeat. A periodic sweeper:
1. Iterates active connections.
2. If now - last_heartbeat > 300s, close the socket and run disconnect.
3. Emits presence_updated so the leaderboard and sector-presence UIs update.
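The sweeper steps can be sketched as a pure function over connection_metadata. Assumptions: last_heartbeat is recorded with the same monotonic clock passed as now, disconnect closes the socket and scrubs rooms, and the offline field on presence_updated is hypothetical, as are find_stale and sweep. Stale ids are collected before any disconnect runs, so the loop never mutates the dict it is reading.

```python
import time

HEARTBEAT_TIMEOUT_S = 300


def find_stale(connection_metadata, now, timeout=HEARTBEAT_TIMEOUT_S):
    """Step 2: user_ids whose last_heartbeat is more than `timeout` seconds old."""
    return sorted(uid for uid, meta in connection_metadata.items()
                  if now - meta["last_heartbeat"] > timeout)


def sweep(connection_metadata, disconnect, emit, now=None):
    now = time.monotonic() if now is None else now
    # Snapshot first: disconnect() may delete entries from connection_metadata.
    stale = find_stale(connection_metadata, now)
    for uid in stale:
        disconnect(uid)  # close socket, scrub rooms, broadcast departure
    if stale:
        emit({"type": "presence_updated", "offline": stale})  # step 3
    return stale
```

With the 30 s client heartbeat, the 300 s threshold tolerates roughly nine consecutive missed beats before eviction.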
Reconnection
📐 Design-only. The replay-buffer and state_resync machinery below is the design target; today the gameserver does not maintain a per-user event ring buffer or emit state_resync. Clients reconnect with their JWT and re-fetch state from REST.
Clients reconnect with the same JWT plus an optional last_event_id:
1. Server registers the new socket (closing any stale one).
2. If last_event_id is recent (≤ 60 s), the server replays missed events from a per-user ring buffer.
3. Older gaps trigger a forced state_resync event, so the client re-fetches authoritative state via REST.
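Since replay is design-only, the decision logic can only be sketched. Assumptions: an in-memory deque per user (the design places the buffer in Redis), sequential integer event ids, and hypothetical names ReplayBuffer, maxlen=500 and replay_since. A None result stands for "send state_resync and let the client re-fetch from REST".

```python
from collections import deque

REPLAY_WINDOW_S = 60


class ReplayBuffer:
    """Per-user ring buffer of (event_id, emitted_at, event); design-only sketch."""

    def __init__(self, maxlen=500):
        self.events = deque(maxlen=maxlen)  # oldest entries fall off first
        self.next_id = 1

    def append(self, event, now):
        self.events.append((self.next_id, now, event))
        self.next_id += 1

    def replay_since(self, last_event_id, now):
        """Missed events after last_event_id, or None if the client must state_resync."""
        if last_event_id == self.next_id - 1:
            return []  # client already saw the newest event
        for event_id, emitted_at, _ in self.events:
            if event_id == last_event_id:
                if now - emitted_at > REPLAY_WINDOW_S:
                    return None  # gap older than the replay window
                # Everything newer than a retained entry is also retained.
                return [ev for eid, _at, ev in self.events if eid > last_event_id]
        return None  # id already evicted (or unknown): force state_resync
```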
Server-pushed event taxonomy
Target taxonomy. The table below is the canonical event vocabulary the bus should converge to: granular phase events (combat_started/round/resolved rather than a single overloaded combat_update), dotted namespaces for room-scoped events (sector.player_joined), and dedicated event names for first-class domain transitions (genesis, governance, bounties, transactions, price alerts). Events marked 📐 are not yet emitted by the gameserver; getting there is tracked as tech debt against websocket_service.py and the services that should publish them. See "Current wire deviation" below for what the code emits today.
| Type | Direction | Payload (key fields) | Status |
|---|---|---|---|
| connected | server → one | session_id, server_time, your_state | ✅ |
| state_resync | server → one | reason (client should re-fetch) | 📐 |
| sector.player_joined / sector.player_left | sector room | user_id, username, sector_id | 📐 (code emits underscored player_entered_sector / player_left_sector) |
| combat_started / combat_round / combat_resolved | participants + sector | combat_id, attacker_id, defender_id, round, deltas, result | 📐 (code emits a single combat_update event) |
| ship_destroyed | sector + global | ship_id, owner_id, cause | ✅ |
| market_update | market topic | station_id, commodity, buy, sell, trend, supply, demand | ✅ |
| transaction_completed | personal | tx_id, station_id, commodity, units, total | 📐 |
| price_alert_triggered | personal | alert_id, commodity, condition, current_price | 📐 |
| bounty_placed / bounty_collected | global + participants | target_id, amount | 📐 |
| genesis_progress | personal | planet_id, percent, hours_remaining | 📐 |
| governance_event | region room | kind, election_id?, policy_id?, payload | 📐 |
| citadel.upgrade_cancelled | personal (planet owner) | planet_id, cancelled_upgrade, reason, lost_building, credits_refunded, at | 📐 (per ADR-0059) |
| pirate.holding_evolved | sector + region | holding_id, from_tier, to_tier, formation_id, at | 📐 (per ADR-0060) |
| pirate.region_cleansed | region | region_id, holdings_destroyed_count, cleansed_by_player_ids, at | 📐 (per ADR-0060) |
| pirate.daughter_spawned | sector + region | parent_holding_id, daughter_holding_id, parent_zone, daughter_zone, at | 📐 (per ADR-0060) |
| pirate.holding_captured | sector + region + personal (capturer) | holding_id, tier, captured_by_player_id, captured_by_team_id, at | 📐 (per ADR-0060) |
| pirate.fleet_destroyed | sector + region | holding_id, fleet_size_destroyed, by_player_ids, at | 📐 (per ADR-0060) |
| npc.role_promoted | sector + region | npc_id, sector_id, from_role, to_role, predecessor_npc_id, at | 📐 (per ADR-0063 N-F1) |
| npc.coordinated_genocide_detected | region + ops | region_id, kills_in_window, window_seconds, marshal_npc_ids, at | 📐 (per ADR-0063 N-V4) |
| aria_message | personal | message_id, role, content, attachments | 📐 (code emits aria_response) |
| chat_message | room (sector/team/global) | channel, sender_id, sender_name, body, timestamp | ✅ |
| notification | personal | kind, title, body, action_url | ✅ |
| medal_awarded | personal + team (if any) + sector (only when tier ≥ GOLD or category = UNIQUE) | medal_id, name, tier, category, icon_key, shape_tier, awarded_at, awarded_via, citation, is_hidden | 📐 (see ./medal-service.md) |
| medal_progress | personal | medal_id, counter_key, current, threshold, percent | 📐 (fired at 25 / 50 / 75 / 90 / 99 % thresholds) |
| medal_revoked | personal | medal_id, reason, revoking_admin_username | 📐 |
| heartbeat / pong | both | server_time | ✅ |
Current wire deviation
The shipping services/gameserver/src/services/websocket_service.py emits the events listed above with the alternative names noted in the Status column, plus several events that are not yet first-class in the target taxonomy: economy:alert, system:announcement, subscription_confirmed, broadcast_sent, admin_intervention, heartbeat_ack. These will be folded into the target vocabulary (e.g. economy:alert → price_alert_triggered, system:announcement → broadcast notification) when the publisher refactor lands. Until then, frontends that need to listen to those events should subscribe by their current names.
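Until the publisher refactor lands, one option is to translate names at a single choke point. The mapping below only restates renames this page already lists; the adapter itself (WIRE_TO_TARGET, normalize_event_type) is hypothetical, not shipped code.

```python
# Hypothetical adapter, not shipped code: current wire name -> target name.
# combat_update is left out because it splits into three target events.
WIRE_TO_TARGET = {
    "player_entered_sector": "sector.player_joined",
    "player_left_sector": "sector.player_left",
    "aria_response": "aria_message",
    "economy:alert": "price_alert_triggered",
    "system:announcement": "notification",
}


def normalize_event_type(wire_type: str) -> str:
    """Map a current wire name onto the target taxonomy; pass others through."""
    return WIRE_TO_TARGET.get(wire_type, wire_type)
```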
Transactional outbox for multi-table operations
Per ADR-0054 X-V1, multi-table transactions that emit realtime events (cleanup orchestrator, takeover commit, gate destruction, holding capture) use the transactional outbox pattern to avoid ghost-state bugs on rollback.
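```python
def multi_table_operation():
    # Assumes module-level project helpers: `transaction`, `db`, a structured
    # logger `log`, and the `realtime_bus` publisher.
    pending_events = []  # (event_type, payload) tuples, buffered until commit
    try:
        with transaction():
            # ... do work, append (event_type, payload) to pending_events ...
            db.session.commit()
    except Exception:  # not a bare except: let KeyboardInterrupt/SystemExit propagate
        log.exception("Multi-table transaction rolled back; pending events discarded")
        return  # pending_events discarded; events never leave the server
    # Post-commit flush: best-effort, because the data is already durable.
    for event_type, payload in pending_events:
        try:
            realtime_bus.emit(event_type, payload)
        except Exception:
            log.warning("Post-commit event emit failed", event_type=event_type)
```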
When to use: any transaction that touches > 1 logical entity AND emits realtime events. Cleanup orchestrator, takeover, gate cascade, holding capture, planet-cascade-compensation are the canonical examples.
When NOT to use: single-table updates that emit one event (e.g., a player updates their own settings → one row write + one event). The direct emit pattern is fine — there's no rollback window long enough to matter.
Rationale: if the transaction rolls back after events have already broadcast, clients see UI state for changes that didn't commit on the server. Outbox flushes events only after commit succeeds; rollback discards them.
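The rollback behaviour can be exercised end to end with a toy store. Everything here is hypothetical scaffolding: InMemoryDB stands in for the real database session, a plain list stands in for realtime_bus, and capture_holding uses a trimmed pirate.holding_captured payload. The point is only that a failure after the first write leaves both the store and the bus untouched.

```python
import logging
from contextlib import contextmanager

log = logging.getLogger("outbox_demo")


class InMemoryDB:
    """Toy store with commit/rollback semantics (stands in for the real DB)."""

    def __init__(self):
        self.committed = {}

    @contextmanager
    def transaction(self):
        staged = dict(self.committed)  # work on a copy
        yield staged                   # an exception here skips the commit below
        self.committed = staged        # commit only if the block finished


def capture_holding(db, bus, holding_id, player_id, fail=False):
    pending_events = []
    try:
        with db.transaction() as tx:
            tx[f"holding:{holding_id}:owner"] = player_id
            pending_events.append(("pirate.holding_captured",
                                   {"holding_id": holding_id,
                                    "captured_by_player_id": player_id}))
            key = f"player:{player_id}:holdings"
            tx[key] = tx.get(key, 0) + 1
            if fail:
                raise RuntimeError("simulated failure on the second write")
    except Exception:
        log.exception("capture rolled back; dropping %d events", len(pending_events))
        return False
    for event_type, payload in pending_events:  # post-commit flush
        try:
            bus.append((event_type, payload))
        except Exception:
            log.warning("post-commit emit failed: %s", event_type)
    return True
```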
Cross-process fanout
Multiple gameserver workers run; the bus uses Redis pub/sub (redis_pubsub_service) for fanout:
1. Service emits an event by calling the bus method.
2. The local connection manager handles directly-connected sockets.
3. For users connected to a different worker, the message is published to a Redis channel (sw2102:bus).
4. Every worker subscribes; on receipt, each delivers to its locally connected sockets.
In multi-regional mode, a dedicated redis-nexus Redis instance handles cross-region traffic.
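The four steps can be simulated in one process to show why each worker must skip its own publications. Assumptions: FakePubSub is an in-memory, synchronous stand-in for Redis PUBLISH/SUBSCRIBE (real delivery is asynchronous and carries serialized bytes), and the origin field on the envelope is hypothetical, as are the Worker class and its attributes. Only the channel name sw2102:bus comes from this page.

```python
from collections import defaultdict

BUS_CHANNEL = "sw2102:bus"


class FakePubSub:
    """In-memory, synchronous stand-in for Redis PUBLISH/SUBSCRIBE."""

    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, channel, callback):
        self.subscribers[channel].append(callback)

    def publish(self, channel, message):
        for callback in self.subscribers[channel]:
            callback(message)


class Worker:
    def __init__(self, worker_id, pubsub):
        self.worker_id = worker_id
        self.pubsub = pubsub
        self.local_users = set()
        self.delivered = []  # (user_id, msg) actually sent on this worker
        pubsub.subscribe(BUS_CHANNEL, self._on_bus_message)

    def send_personal_message(self, user_id, msg):
        if user_id in self.local_users:  # step 2: local socket
            self.delivered.append((user_id, msg))
        else:                            # step 3: hand off via Redis
            self.pubsub.publish(BUS_CHANNEL, {"origin": self.worker_id,
                                              "user_id": user_id, "msg": msg})

    def _on_bus_message(self, envelope):
        if envelope["origin"] == self.worker_id:
            return  # our own publish; local delivery already ran
        if envelope["user_id"] in self.local_users:  # step 4
            self.delivered.append((envelope["user_id"], envelope["msg"]))
```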
Performance targets
Per ADR-0051 SK28, these are launch SLOs validated by a pre-launch load test at staging on production-class hardware, simulating peak event volume (combat resolutions, sector-presence updates, market-tick fanout, ARIA notifications).
| Target | Value |
|---|---|
| Local (same-region) round-trip latency | <50 ms |
| Cross-region round-trip latency | <200 ms |
| Concurrent connections per gameserver | 5,000 |
| P99 message delivery latency | <200 ms |
| Message throughput per server | 1,000+ msg/sec |
| Per-connection memory footprint | ~2 KB |
If the load test fails to meet the connection or latency SLO, the decision splits two ways:
- (a) Horizontal scale-out: multiple gameserver instances behind a connection router that shards by user_id hash. Per-instance load drops; total capacity scales linearly with instance count. Strong default.
- (b) Connection cap with queue: a hard cap at the SLO threshold; new connections queue until a slot frees. Acceptable for non-critical players (read-only browsers); not acceptable for actively playing subscribers.
The choice is deferred to test results.
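For option (a), the router needs a shard function that every router instance computes identically. A sketch under assumptions: SHA-256 of the user_id string modulo instance count (Python's built-in hash() is salted per process, so it cannot be used here), with "~2 KB" read as 2 KiB; shard_for and instances_needed are hypothetical. At the 5,000-connection target the per-connection footprint works out to roughly 10 MB per gameserver. Plain modulo sharding remaps most users when the instance count changes; consistent hashing is the usual alternative if that churn matters.

```python
import hashlib

PER_CONNECTION_BYTES = 2 * 1024  # "~2 KB" from the table, read as 2 KiB
CONNECTIONS_PER_SERVER = 5_000   # concurrency SLO per gameserver


def shard_for(user_id: str, instance_count: int) -> int:
    """Stable instance index for a user; identical on every router process."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % instance_count


def instances_needed(peak_connections: int) -> int:
    """Ceiling division: gameservers required to hold peak_connections."""
    return -(-peak_connections // CONNECTIONS_PER_SERVER)


memory_per_server = CONNECTIONS_PER_SERVER * PER_CONNECTION_BYTES  # 10_240_000 bytes, about 10 MB
```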
Rate limits
Anti-abuse caps enforced inside enhanced_websocket_service.py. Exceeding any limit on an authenticated socket returns an error event with code: "rate_limited" and the offending command is dropped.
| Limit | Value | Role |
|---|---|---|
| Per-connection message rate | 100 msg/s | binding |
| Trade commands | 30 / min | binding |
| AI / ARIA requests | 60 / min | advisory — set above the server-side ARIA cap (10 req/min, see ../OPERATIONS/aria.md#cost-rate-controls) so the rejection-with-canonical-reason-code always comes from the ARIA server, not the bus. The bus value exists to suppress runaway local input loops; player-facing rate-limit feedback originates from the server. |
| Topic subscriptions per socket | 50 | binding |
Sustained violations escalate to a forced disconnect with close code 4002.
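A per-connection sliding-window limiter is one way to enforce the rate rows of the table; this page does not say which algorithm enhanced_websocket_service.py uses, so treat this as a sketch. The class, the bucket names and the bucket field on the error event are hypothetical. The 50-subscription cap is a count rather than a rate and is not modelled, and escalation to close code 4002 is left out because its threshold is not specified. Rejected attempts are not recorded, so a client that backs off regains capacity as soon as the window slides.

```python
import time
from collections import deque

LIMITS = {                   # bucket -> (max events, window in seconds)
    "message": (100, 1.0),
    "trade": (30, 60.0),
    "aria": (60, 60.0),
}


class SlidingWindowLimiter:
    """One instance per connection; tracks timestamps per bucket."""

    def __init__(self, limits=LIMITS, clock=time.monotonic):
        self.limits = limits
        self.clock = clock
        self.history = {}    # bucket -> deque of accepted timestamps

    def check(self, bucket):
        """Return None if allowed, else the error event the bus sends back."""
        max_events, window = self.limits[bucket]
        now = self.clock()
        q = self.history.setdefault(bucket, deque())
        while q and now - q[0] >= window:
            q.popleft()      # drop timestamps that left the window
        if len(q) >= max_events:
            return {"type": "error", "code": "rate_limited", "bucket": bucket}
        q.append(now)
        return None
```

A trade command would be checked against both the message and trade buckets.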
Outputs / state changes
The bus itself does not own persistent state. It mutates only ephemeral structures:
- active_connections[user_id] — open sockets.
- connection_metadata[user_id] — heartbeats, room assignments.
- sector_connections[sector_id] and team_connections[team_id]: room rosters. A region_connections[region_id] roster is 📐 Design-only; no such map exists in ConnectionManager today.
- 📐 Design-only: a per-user ring buffer of recent events (in Redis) for reconnection replay.
Indirectly, every event delivered triggers UI state changes on connected clients.
Invariants
✅ Shipped invariants
- A user has at most one active socket per user_id (the launch rule). Connecting from a second tab or device evicts the prior socket with close code 4001.
- Only authenticated sockets are added to any room.
- Server never trusts a client message for state mutation — clients send commands (validated and re-routed through REST handlers); state-changing pushes only originate from server services.
- Every send is non-blocking with respect to the originating service — failures are logged and queued for cleanup, not raised.
- Sector / team / region rooms are derived from authoritative DB state; a player who moves out of a sector is always removed from that room before an arrival event fires for the new sector.
- Monotonic event ordering within a single room — events delivered to subscribers of a given room arrive in the order they were emitted server-side.
📐 Design-only invariants
- Per-device_id socket cap (one active socket per device, regardless of user_id). Requires the 📐 Design-only device_id field on connection metadata. Not enforced at launch: the per-user_id rule is the binding constraint until the device-fingerprinting layer ships.
- Replay buffers expire (TTL 60 s); on a miss, the client resyncs from REST. Requires the 📐 Design-only replay-buffer service; at launch a missed event simply means the client's next REST poll picks up the state, and no replay is offered.
Failure modes
| Mode | Target handling |
|---|---|
| Stale token expires mid-session | On next message attempt, re-verify; on failure, close 4001 and force reconnect. |
| Worker crash | Redis pub/sub keeps surviving workers in sync; affected sockets reconnect to a different worker. |
| Redis outage | Local broadcast still works for same-worker users; cross-worker delivery degrades; state_resync triggered when Redis returns. |
| Slow client (back-pressure) | Per-socket send queue with bounded size; overflow drops cosmetic events first (chat, presence) and forces state_resync. |
| Duplicate connect (same user, same device) | Old socket closed cleanly with code 4001 ("superseded"). |
| Heartbeat miss | After 300 s of silence, server closes the socket; client detects and reconnects with replay. |
| Subscription spam | Server caps subscriptions per socket (e.g. 50 topics); excess returns subscription_rejected. |
| Auth race during region transfer | Connection is closed and re-handshaken when region_id changes. |
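The slow-client row can be sketched as a bounded per-socket queue. Assumptions: BoundedSendQueue is hypothetical, max_size=256 is a placeholder, COSMETIC_TYPES is a guess at what "chat, presence" covers using current wire names, and the state_resync reason string is hypothetical. Any overflow sets needs_resync, and the resync is sent after the surviving events so the client's REST re-fetch supersedes them.

```python
from collections import deque

COSMETIC_TYPES = {"chat_message", "presence_updated",
                  "player_entered_sector", "player_left_sector"}


class BoundedSendQueue:
    def __init__(self, max_size=256):
        self.max_size = max_size
        self.items = deque()
        self.needs_resync = False

    def push(self, event):
        """Enqueue an event; on overflow shed cosmetic traffic first."""
        if len(self.items) < self.max_size:
            self.items.append(event)
            return
        self.needs_resync = True  # any drop means the client view is stale
        for i, queued in enumerate(self.items):
            if queued["type"] in COSMETIC_TYPES:
                del self.items[i]  # evict the oldest cosmetic event
                self.items.append(event)
                return
        if event["type"] not in COSMETIC_TYPES:
            self.items.popleft()  # no cosmetic slack: drop the oldest event
            self.items.append(event)
        # else: the incoming cosmetic event itself is dropped

    def drain(self):
        out = list(self.items)
        self.items.clear()
        if self.needs_resync:
            out.append({"type": "state_resync", "reason": "backpressure"})
            self.needs_resync = False
        return out
```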
Source map
| Concern | Path (target) |
|---|---|
| Connection manager | services/gameserver/src/services/websocket_service.py:ConnectionManager |
| Enhanced features (rooms, topics, replay) | services/gameserver/src/services/enhanced_websocket_service.py |
| Cross-process fanout | services/gameserver/src/services/redis_pubsub_service.py |
| Redis client | services/gameserver/src/services/redis_service.py |
| WS routes | services/gameserver/src/api/routes/websocket.py, enhanced_websocket.py |
| Player-client primary | services/player-client/src/services/realtimeWebSocket.ts |
| Player-client minimal | services/player-client/src/services/websocket.ts |
| Player-client React provider | services/player-client/src/contexts/WebSocketContext.tsx |
| Admin client | services/admin-ui/src/services/websocket.ts |
| Multi-regional Redis | redis-nexus instance (deployed via services/region-manager) |
Related
- DATA_MODELS: ../DATA_MODELS/player.md, ../DATA_MODELS/admin.md.
- ARCHITECTURE: ../ARCHITECTURE/services.md, ../ARCHITECTURE/multi-regional.md, ../ARCHITECTURE/auth.md.
- OPERATIONS: ../OPERATIONS/realtime.md.
- SYSTEMS: every other system in this directory emits events through this bus.