
Realtime Bus

Status: 🚧 Partial — The core bus (JWT handshake, single-socket eviction, sector/team/personal rooms, the four send primitives, heartbeat sweeper … · ⚠︎ contains code↔spec divergence (impl audit 2026-06-16)

Purpose

The realtime bus is the WebSocket-based event channel that gives the universe its sense of presence: live combat, sector-presence awareness, market updates, chat, system notifications. The bus is the single push channel; the REST API is the request/response channel. State is authoritative on the server; clients render optimistically and reconcile on the bus.

Inputs

The bus reads:

  • A connecting client's JWT (in the Authorization header during the upgrade, or as a ?token= query param for browsers without header support).
  • The user's Player row (or User row for admin connections) for room membership: current_sector_id, team_id, region, role.
  • Server-side events from any service: combat, trading, governance, genesis, ARIA, chat.
  • Cross-process pub/sub from Redis (so multiple gameserver workers see each other's events).

The bus fires:

  • On connection upgrade: auth handshake.
  • On every authoritative state change in any service that broadcasts.
  • On periodic heartbeat: for liveness and presence verification.
  • On disconnect: cleanup and departure broadcasts.

Process

Connection lifecycle

Client                                   Server
  │ WS upgrade /api/v1/ws/connect          │
  │   (admins use /api/v1/ws/admin)        │
  │   ?token=JWT                           │
  ├───────────────────────────────────────►│
  │                                        │ verify JWT (same key as REST)
  │                                        │ load Player row
  │                                        │ if active connection exists for
  │                                        │   user_id → close old socket
  │                                        │ register in:
  │                                        │   active_connections[user_id]
  │                                        │   sector_connections[sector_id]
  │                                        │   team_connections[team_id]
  │ {type: "connected", session_id, ...}   │
  │◄───────────────────────────────────────┤
  │                                        │ broadcast to sector room:
  │                                        │   {type: "player_entered_sector"}
  │                                        │
  │ heartbeat every 30s                    │
  ├──────────►◄────────────────────────────┤
  │                                        │
  │ {type: "subscribe", topics:[...]}      │ optional explicit subscriptions
  ├───────────────────────────────────────►│ (e.g. market topic, admin feed)
  │                                        │
  │ ... events flow ...                    │
  │                                        │
  │ disconnect (close / network drop)      │
  │◄─────►◄────────────────────────────────┤
  │                                        │ scrub from rooms
  │                                        │ broadcast "player_left_sector"

Auth handshake

  1. Client opens a WebSocket against either /api/v1/ws/connect (player) or /api/v1/ws/admin (admin) with the same JWT used for REST. The token can be sent in the Authorization header during the upgrade or as a ?token= query parameter for browsers without header support.
  2. Server decodes and verifies (same key, same expiry).
  3. On failure, close with code 4001 (unauthorized — also reused for "superseded" connections; see Failure modes).
  4. On success, server stores user_data = {username, current_sector, team_id, region_id, ...} in connection_metadata.
  5. Single-connection-per-user: an existing socket for the same user_id is closed first (the new socket wins). Multi-device support via a device_id query param is 📐 Design-only.
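Steps 1 and 5 can be illustrated with a minimal sketch. It assumes the header carries the token as `Bearer <jwt>` (the scheme is not stated above), and `extract_token`, `FakeSocket`, and `Registry` are hypothetical names for illustration only; JWT verification itself is left out.

```python
from typing import Optional
from urllib.parse import parse_qs

CLOSE_UNAUTHORIZED = 4001  # also reused for "superseded" sockets


def extract_token(headers: dict, query_string: str) -> Optional[str]:
    """Return the JWT from the Authorization header, else from ?token=."""
    auth = headers.get("Authorization", "")
    if auth.startswith("Bearer "):  # assumption: Bearer scheme
        return auth[len("Bearer "):].strip() or None
    values = parse_qs(query_string).get("token")
    return values[0] if values else None


class FakeSocket:
    """Stand-in for a WebSocket; records the close code it received."""

    def __init__(self) -> None:
        self.closed_with: Optional[int] = None

    def close(self, code: int) -> None:
        self.closed_with = code


class Registry:
    """Tracks one socket per user_id; a new socket evicts the old one."""

    def __init__(self) -> None:
        self.active_connections: dict = {}

    def register(self, user_id: str, socket: FakeSocket) -> None:
        old = self.active_connections.get(user_id)
        if old is not None and old is not socket:
            old.close(CLOSE_UNAUTHORIZED)  # the new socket wins
        self.active_connections[user_id] = socket
```

Closing the old socket before storing the new one keeps `active_connections` at one entry per `user_id` at all times.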

Channel / topic model

Two layers: rooms (auto-assigned, follow player state) and topics (explicit subscribe).

Auto-assigned rooms:

  • global (all connections).
  • sector:{sector_id} (players currently in that sector).
  • team:{team_id} (team members).
  • region:{region_id} (regional citizens: governance, treaties). 📐 Design-only: no region_connections map exists in ConnectionManager today.
  • personal:{user_id} (single-recipient unicast). Single-recipient invariant (per ADR-0057 A-I1): a personal:{user_id} room has exactly one subscriber, the user themselves. Room-join is gated by the realtime gateway: a connection authenticated as user_X may only subscribe to personal:user_X. Cross-user subscription is rejected with ERR_AUTH_FORBIDDEN and logged as a security event. This invariant is load-bearing for ARIA's per-player privacy (ADR-0016): nothing in any player's ARIA stream may surface in another player's session.
  • admin (admin connections only).
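The room-join gate for personal and admin rooms could look roughly like the sketch below. The function name, the `security_log` list, and reusing ERR_AUTH_FORBIDDEN for the admin room are assumptions; only the personal-room rule and its error code come from the text above.

```python
from typing import Optional

ERR_AUTH_FORBIDDEN = "ERR_AUTH_FORBIDDEN"
security_log: list = []  # hypothetical sink for security events


def authorize_room_join(user_id: str, role: str, room: str) -> Optional[str]:
    """Return None if the join is allowed, else an error code."""
    kind, _, target = room.partition(":")
    if kind == "personal" and target != user_id:
        # Cross-user subscription: reject and record a security event.
        security_log.append({"user_id": user_id, "room": room})
        return ERR_AUTH_FORBIDDEN
    if kind == "admin" and role != "admin":
        # Assumption: the admin room reuses the same error code.
        return ERR_AUTH_FORBIDDEN
    return None
```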

Explicit topics (client must subscribe):

  • market:{commodity} (high-frequency market deltas). ✅ Shipped (market_subscribe / market_unsubscribe in enhanced_websocket_service.py).
  • market:station:{station_id} (per-station deltas). ✅ Shipped via the same path.
  • combat:{combat_id} (round-by-round spectator stream). 📐 Design-only.
  • aria (personal ARIA events for this player). 📐 Design-only: current ARIA events route through personal:{user_id}.

Movement triggers a room hop: leave sector:{old}, join sector:{new}, broadcast departure + arrival events.
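A minimal sketch of the room hop, assuming rooms are plain sets of user IDs. `SectorRooms` and its `events` log are hypothetical; a real bus would send over sockets rather than record tuples.

```python
from collections import defaultdict


class SectorRooms:
    def __init__(self) -> None:
        self.sector_connections: dict = defaultdict(set)  # sector_id -> user_ids
        self.events: list = []  # (sector_id, message) log standing in for sends

    def broadcast_to_sector(self, sector_id, msg, exclude=None) -> None:
        # Record the fanout instead of writing to real sockets.
        self.events.append((sector_id, msg))

    def move(self, user_id: str, old_sector: int, new_sector: int) -> None:
        # Leave the old room first so the departure is announced
        # before any arrival event fires for the new sector.
        self.sector_connections[old_sector].discard(user_id)
        self.broadcast_to_sector(
            old_sector, {"type": "player_left_sector", "user_id": user_id}
        )
        self.sector_connections[new_sector].add(user_id)
        self.broadcast_to_sector(
            new_sector,
            {"type": "player_entered_sector", "user_id": user_id},
            exclude=user_id,
        )
```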

Broadcast vs unicast

| Method | Target | Use | Status |
|---|---|---|---|
| send_personal_message(user_id, msg) | One user | Trade confirms, ARIA replies, alerts. | |
| broadcast_to_sector(sector_id, msg, exclude=) | Sector room | Ship arrivals, combat starts, market updates. | |
| broadcast_to_team(team_id, msg, exclude=) | Team room | Team chat, fleet orders. | |
| broadcast_to_region(region_id, msg, exclude=) | Region room | Election state, policy votes, treaties. | 📐 |
| broadcast_global(msg, exclude=) | Everybody | Server announcements. | |
| publish_topic(topic, msg) | Topic subscribers | Market firehose, combat spectators. | 📐 (a market-specific fanout via redis_pubsub_service exists; the generic publish_topic primitive does not) |

Every send wraps in try/except; failed sends mark the connection for cleanup.
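The send-and-mark-for-cleanup behaviour might be sketched as follows. `ConnectionManagerSketch`, `StubSocket`, the `pending_cleanup` set, and the `send_json` method name are assumptions for illustration, not the shipped ConnectionManager API.

```python
import asyncio


class StubSocket:
    """Hypothetical stand-in for a WebSocket; optionally fails on send."""

    def __init__(self, broken: bool = False) -> None:
        self.broken = broken
        self.sent: list = []

    async def send_json(self, msg: dict) -> None:
        if self.broken:
            raise ConnectionError("socket is gone")
        self.sent.append(msg)


class ConnectionManagerSketch:
    def __init__(self) -> None:
        self.active_connections: dict = {}  # user_id -> socket
        self.sector_connections: dict = {}  # sector_id -> set of user_ids
        self.pending_cleanup: set = set()   # user_ids whose send failed

    async def send_personal_message(self, user_id, msg) -> bool:
        socket = self.active_connections.get(user_id)
        if socket is None:
            return False
        try:
            await socket.send_json(msg)
            return True
        except Exception:
            # Never raise into the calling service; mark for cleanup instead.
            self.pending_cleanup.add(user_id)
            return False

    async def broadcast_to_sector(self, sector_id, msg, exclude=None) -> int:
        delivered = 0
        # Copy the roster so cleanup elsewhere cannot mutate it mid-loop.
        for user_id in list(self.sector_connections.get(sector_id, ())):
            if user_id == exclude:
                continue
            if await self.send_personal_message(user_id, msg):
                delivered += 1
        return delivered
```

Returning a delivered count rather than raising keeps the originating service unaffected by individual socket failures.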

Presence

connection_metadata[user_id] carries last_heartbeat. A periodic sweeper:

  1. Iterates active connections.
  2. If now - last_heartbeat > 300s, closes the socket and runs disconnect.
  3. Emits presence_updated so leaderboard and sector-presence UIs update.
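One pass of the sweeper could be sketched like this. The `disconnect` and `emit` callbacks and the `presence_updated` payload shape are hypothetical; only the 300 s threshold and the event name come from the text above.

```python
HEARTBEAT_TIMEOUT_S = 300


def sweep(connection_metadata: dict, now: float, disconnect, emit) -> list:
    """Close every connection whose last heartbeat is older than the timeout."""
    stale = [
        user_id
        for user_id, meta in connection_metadata.items()
        if now - meta["last_heartbeat"] > HEARTBEAT_TIMEOUT_S
    ]
    for user_id in stale:
        disconnect(user_id)  # closes the socket and scrubs rooms
        # Assumed payload shape; the spec only names the event.
        emit({"type": "presence_updated", "user_id": user_id})
    return stale
```

Collecting the stale IDs before disconnecting avoids mutating the metadata map while iterating over it.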

Reconnection

📐 Design-only. The replay-buffer and state_resync machinery below is the design target; today the gameserver does not maintain a per-user event ring buffer or emit state_resync. Clients reconnect with their JWT and re-fetch state from REST.

Clients reconnect with the same JWT plus an optional last_event_id:

  1. Server registers the new socket (closing any stale one).
  2. If last_event_id is recent (≤ 60 s), the server replays missed events from a per-user ring buffer.
  3. Older gaps trigger a forced state_resync event, so the client re-fetches authoritative state via REST.
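Since this machinery is design-only, the following is just one possible shape for the per-user ring buffer. It assumes "recent" means the event named by last_event_id was emitted within the last 60 s, that a missing or unknown ID falls back to state_resync, and that the buffer lives in process memory rather than Redis; `ReplayBuffer` and the `replay_gap` reason are hypothetical.

```python
from collections import deque
from typing import Optional

REPLAY_WINDOW_S = 60


class ReplayBuffer:
    def __init__(self, maxlen: int = 256) -> None:
        # Each entry is (event_id, emitted_at, payload); old entries fall off.
        self._events: deque = deque(maxlen=maxlen)
        self._next_id = 1

    def record(self, payload: dict, now: float) -> int:
        event_id = self._next_id
        self._next_id += 1
        self._events.append((event_id, now, payload))
        return event_id

    def on_reconnect(self, last_event_id: Optional[int], now: float) -> list:
        """Return the events to send to a reconnecting client."""
        emitted_at = next(
            (ts for eid, ts, _ in self._events if eid == last_event_id), None
        )
        if emitted_at is None or now - emitted_at > REPLAY_WINDOW_S:
            # Gap too old or unknown: force a REST re-fetch.
            return [{"type": "state_resync", "reason": "replay_gap"}]
        return [payload for eid, _, payload in self._events if eid > last_event_id]
```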

Server-pushed event taxonomy

Target taxonomy. The table below is the canonical event vocabulary the bus should converge to: granular phase events (combat_started/round/resolved rather than a single overloaded combat_update), dotted namespaces for room-scoped events (sector.player_joined), and dedicated event names for first-class domain transitions (genesis, governance, bounties, transactions, price alerts). Events marked 📐 are not yet emitted by the gameserver; getting there is tracked as tech debt against websocket_service.py and the services that should publish them. See "Current wire deviation" below for what code emits today.

| Type | Direction | Payload (key fields) | Status |
|---|---|---|---|
| connected | server → one | session_id, server_time, your_state | |
| state_resync | server → one | reason (client should re-fetch) | 📐 |
| sector.player_joined / sector.player_left | sector room | user_id, username, sector_id | 📐 (code emits underscored player_entered_sector / player_left_sector) |
| combat_started / combat_round / combat_resolved | participants + sector | combat_id, attacker_id, defender_id, round, deltas, result | 📐 (code emits a single combat_update event) |
| ship_destroyed | sector + global | ship_id, owner_id, cause | |
| market_update | market topic | station_id, commodity, buy, sell, trend, supply, demand | |
| transaction_completed | personal | tx_id, station_id, commodity, units, total | 📐 |
| price_alert_triggered | personal | alert_id, commodity, condition, current_price | 📐 |
| bounty_placed / bounty_collected | global + participants | target_id, amount | 📐 |
| genesis_progress | personal | planet_id, percent, hours_remaining | 📐 |
| governance_event | region room | kind, election_id?, policy_id?, payload | 📐 |
| citadel.upgrade_cancelled | personal (planet owner) | planet_id, cancelled_upgrade, reason, lost_building, credits_refunded, at | 📐 (per ADR-0059) |
| pirate.holding_evolved | sector + region | holding_id, from_tier, to_tier, formation_id, at | 📐 (per ADR-0060) |
| pirate.region_cleansed | region | region_id, holdings_destroyed_count, cleansed_by_player_ids, at | 📐 (per ADR-0060) |
| pirate.daughter_spawned | sector + region | parent_holding_id, daughter_holding_id, parent_zone, daughter_zone, at | 📐 (per ADR-0060) |
| pirate.holding_captured | sector + region + personal (capturer) | holding_id, tier, captured_by_player_id, captured_by_team_id, at | 📐 (per ADR-0060) |
| pirate.fleet_destroyed | sector + region | holding_id, fleet_size_destroyed, by_player_ids, at | 📐 (per ADR-0060) |
| npc.role_promoted | sector + region | npc_id, sector_id, from_role, to_role, predecessor_npc_id, at | 📐 (per ADR-0063 N-F1) |
| npc.coordinated_genocide_detected | region + ops | region_id, kills_in_window, window_seconds, marshal_npc_ids, at | 📐 (per ADR-0063 N-V4) |
| aria_message | personal | message_id, role, content, attachments | 📐 (code emits aria_response) |
| chat_message | room (sector/team/global) | channel, sender_id, sender_name, body, timestamp | |
| notification | personal | kind, title, body, action_url | |
| medal_awarded | personal + team (if any) + sector (only when tier ≥ GOLD or category = UNIQUE) | medal_id, name, tier, category, icon_key, shape_tier, awarded_at, awarded_via, citation, is_hidden | 📐 (see ./medal-service.md) |
| medal_progress | personal | medal_id, counter_key, current, threshold, percent | 📐 (fired at 25 / 50 / 75 / 90 / 99 % thresholds) |
| medal_revoked | personal | medal_id, reason, revoking_admin_username | 📐 |
| heartbeat / pong | both | server_time | |

Current wire deviation

The shipping services/gameserver/src/services/websocket_service.py emits the events listed above with the alternative names noted in the Status column, plus several events that are not yet first-class in the target taxonomy: economy:alert, system:announcement, subscription_confirmed, broadcast_sent, admin_intervention, heartbeat_ack. These will be folded into the target vocabulary (e.g. economy:alert → price_alert_triggered, system:announcement → broadcast notification) when the publisher refactor lands. Until then, frontends that need to listen to those events should subscribe by their current names.
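A consumer that wants to code against the target names today (a Python test harness, say) could normalise incoming types with a small lookup. This is a hypothetical helper, not part of the gameserver. The table lists only the one-to-one renames stated in this document; combat_update is left untouched because it corresponds to three target events.

```python
# Current wire name -> target taxonomy name (one-to-one renames only).
CURRENT_TO_TARGET = {
    "player_entered_sector": "sector.player_joined",
    "player_left_sector": "sector.player_left",
    "aria_response": "aria_message",
    "economy:alert": "price_alert_triggered",
    "system:announcement": "notification",
}


def normalize_event_type(wire_type: str) -> str:
    """Map a current wire name to its target name; pass unknown names through."""
    return CURRENT_TO_TARGET.get(wire_type, wire_type)
```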

Transactional outbox for multi-table operations

Per ADR-0054 X-V1, multi-table transactions that emit realtime events (cleanup orchestrator, takeover commit, gate destruction, holding capture) use the transactional outbox pattern to avoid ghost-state bugs on rollback.

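def multi_table_operation():
    # Events are buffered here and only leave the server after a successful commit.
    pending_events = []
    try:
        with transaction():
            # ... do work, append (event_type, payload) tuples to pending_events ...
            db.session.commit()
    except Exception:
        # Rollback path: log and drop the buffer so clients never see ghost state.
        log.exception(...)
        return  # pending_events discarded; events never leave server

    # Post-commit flush (best-effort): one failed emit must not block the rest.
    for event_type, payload in pending_events:
        try:
            realtime_bus.emit(event_type, payload)
        except Exception:
            log.warning("Post-commit event emit failed", event_type=event_type)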

When to use: any transaction that touches > 1 logical entity AND emits realtime events. Cleanup orchestrator, takeover, gate cascade, holding capture, planet-cascade-compensation are the canonical examples.

When NOT to use: single-table updates that emit one event (e.g., a player updates their own settings → one row write + one event). The direct emit pattern is fine — there's no rollback window long enough to matter.

Rationale: if the transaction rolls back after events have already broadcast, clients see UI state for changes that didn't commit on the server. Outbox flushes events only after commit succeeds; rollback discards them.

Cross-process fanout

Multiple gameserver workers run; the bus uses Redis pub/sub (redis_pubsub_service) for fanout:

  1. Service emits an event by calling the bus method.
  2. The local connection manager handles directly-connected sockets.
  3. For users connected to a different worker, the message is published to a Redis channel (sw2102:bus).
  4. Every worker subscribes; on receipt, each delivers to its locally connected sockets.
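The fanout steps can be simulated without Redis. In the sketch below `InMemoryPubSub` is a stand-in for the Redis channel, and `Worker` and the JSON envelope shape are assumptions; only the channel name sw2102:bus comes from the text above.

```python
import json

BUS_CHANNEL = "sw2102:bus"


class InMemoryPubSub:
    """Stand-in for Redis pub/sub: every subscriber sees every message."""

    def __init__(self) -> None:
        self._subscribers: dict = {}  # channel -> list of callbacks

    def subscribe(self, channel: str, callback) -> None:
        self._subscribers.setdefault(channel, []).append(callback)

    def publish(self, channel: str, message: str) -> None:
        for callback in list(self._subscribers.get(channel, [])):
            callback(message)


class Worker:
    def __init__(self, name: str, pubsub: InMemoryPubSub) -> None:
        self.name = name
        self.pubsub = pubsub
        self.local_sockets: dict = {}  # user_id -> delivered messages
        pubsub.subscribe(BUS_CHANNEL, self._on_bus_message)

    def connect(self, user_id: str) -> None:
        self.local_sockets[user_id] = []

    def send_personal_message(self, user_id: str, msg: dict) -> None:
        if user_id in self.local_sockets:
            self.local_sockets[user_id].append(msg)  # local delivery
        else:
            envelope = json.dumps({"user_id": user_id, "msg": msg})
            self.pubsub.publish(BUS_CHANNEL, envelope)  # remote delivery

    def _on_bus_message(self, raw: str) -> None:
        envelope = json.loads(raw)
        inbox = self.local_sockets.get(envelope["user_id"])
        if inbox is not None:  # ignore users connected elsewhere
            inbox.append(envelope["msg"])
```

The publishing worker also receives its own message, but it only publishes when the user is not local, so nothing is delivered twice.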

In multi-regional mode, a dedicated redis-nexus Redis instance handles cross-region traffic.

Performance targets

Per ADR-0051 SK28, these are launch SLOs validated by a pre-launch load test at staging on production-class hardware, simulating peak event volume (combat resolutions, sector-presence updates, market-tick fanout, ARIA notifications).

| Target | Value |
|---|---|
| Local (same-region) round-trip latency | <50 ms |
| Cross-region round-trip latency | <200 ms |
| Concurrent connections per gameserver | 5,000 |
| P99 message delivery latency | <200 ms |
| Message throughput per server | 1,000+ msg/sec |
| Per-connection memory footprint | ~2 KB |

If the load test fails to meet the connection or latency SLO, there are two options:

  • (a) Horizontal scale-out — multiple gameserver instances behind a connection-router that shards by user_id hash. Per-instance load drops; total capacity scales linearly with instance count. Strong default.
  • (b) Connection cap with queue — hard cap at the SLO threshold; new connections queue until a slot frees. Acceptable for non-critical players (read-only browsers); not acceptable for actively-playing subscribers.

The choice is deferred to test results.
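For option (a), the routing decision could be as small as the sketch below. The choice of SHA-256 and plain modulo is an assumption; the text above only says the router shards by user_id hash.

```python
import hashlib


def shard_for(user_id: str, instance_count: int) -> int:
    """Pick a gameserver instance index from a stable hash of user_id."""
    if instance_count <= 0:
        raise ValueError("instance_count must be positive")
    # hashlib gives the same value in every process, unlike built-in hash().
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % instance_count
```

Plain modulo remaps most users when the instance count changes; a consistent-hashing ring would reduce that churn at the cost of more bookkeeping.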

Rate limits

Anti-abuse caps enforced inside enhanced_websocket_service.py. Exceeding any limit on an authenticated socket returns an error event with code: "rate_limited" and the offending command is dropped.

| Limit | Value | Role |
|---|---|---|
| Per-connection message rate | 100 msg/s | binding |
| Trade commands | 30 / min | binding |
| AI / ARIA requests | 60 / min | advisory: set above the server-side ARIA cap (10 req/min, see ../OPERATIONS/aria.md#cost-rate-controls) so the rejection-with-canonical-reason-code always comes from the ARIA server, not the bus. The bus value exists to suppress runaway local input loops; player-facing rate-limit feedback originates from the server. |
| Topic subscriptions per socket | 50 | binding |

Sustained violations escalate to a forced disconnect with close code 4002.
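A per-socket limiter along these lines might look like the sketch below. The fixed-window algorithm, the class name, and the `MAX_STRIKES` threshold for "sustained" are all assumptions; the limits, the rate_limited code, and close code 4002 come from the table and text above.

```python
# category -> (max commands, window length in seconds)
LIMITS = {"message": (100, 1.0), "trade": (30, 60.0), "aria": (60, 60.0)}
CLOSE_RATE_LIMITED = 4002
MAX_STRIKES = 5  # hypothetical: violations tolerated before disconnect


class SocketRateLimiter:
    def __init__(self) -> None:
        self._windows: dict = {}  # category -> (window_start, count)
        self.strikes = 0

    def check(self, category: str, now: float) -> dict:
        """Decide whether to accept, reject, or disconnect for one command."""
        limit, period = LIMITS[category]
        start, count = self._windows.get(category, (now, 0))
        if now - start >= period:
            start, count = now, 0  # window elapsed: start a fresh one
        if count >= limit:
            self._windows[category] = (start, count)
            self.strikes += 1
            if self.strikes >= MAX_STRIKES:
                return {"action": "close", "code": CLOSE_RATE_LIMITED}
            return {"action": "error", "code": "rate_limited"}
        self._windows[category] = (start, count + 1)
        return {"action": "accept"}
```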

Outputs / state changes

The bus itself does not own persistent state. It mutates only ephemeral structures:

  • active_connections[user_id]: open sockets.
  • connection_metadata[user_id]: heartbeats, room assignments.
  • sector_connections[sector_id], team_connections[team_id]: room rosters.
  • region_connections[region_id]: region room roster. 📐 Design-only (no such map exists in ConnectionManager today).
  • Per-user ring buffer of recent events (in Redis) for reconnection replay. 📐 Design-only (see Reconnection).

Indirectly, every event delivered triggers UI state changes on connected clients.

Invariants

✅ Shipped invariants

  1. A user has at most one active socket per user_id (the launch rule). Connecting from a second tab/device evicts the prior socket with close code 4001.
  2. Only authenticated sockets are added to any room.
  3. Server never trusts a client message for state mutation — clients send commands (validated and re-routed through REST handlers); state-changing pushes only originate from server services.
  4. Every send is non-blocking with respect to the originating service — failures are logged and queued for cleanup, not raised.
  5. Sector / team / region rooms are derived from authoritative DB state; a player who moves out of a sector is always removed from that room before an arrival event fires for the new sector.
  6. Monotonic event ordering within a single room — events delivered to subscribers of a given room arrive in the order they were emitted server-side.

📐 Design-only invariants

  1. Per-device_id socket cap (one active socket per device, regardless of user_id). Requires the 📐 Design-only device_id field on connection metadata; not enforced at launch — the per-user_id rule is the binding constraint until the device-fingerprinting layer ships.
  2. Replay buffers expire (TTL 60 s); on miss, the client resyncs from REST. Requires the 📐 Design-only replay-buffer service; at launch a missed event simply means the client's next REST poll picks up the state — no replay is offered.

Failure modes

| Mode | Target handling |
|---|---|
| Stale token expires mid-session | On next message attempt, re-verify; on failure, close 4001 and force reconnect. |
| Worker crash | Redis pub/sub keeps surviving workers in sync; affected sockets reconnect to a different worker. |
| Redis outage | Local broadcast still works for same-worker users; cross-worker delivery degrades; state_resync triggered when Redis returns. |
| Slow client (back-pressure) | Per-socket send queue with bounded size; overflow drops cosmetic events first (chat, presence) and forces state_resync. |
| Duplicate connect (same user, same device) | Old socket closed cleanly with code 4001 ("superseded"). |
| Heartbeat miss | After 300 s of silence, server closes the socket; client detects and reconnects with replay. |
| Subscription spam | Server caps subscriptions per socket (e.g. 50 topics); excess returns subscription_rejected. |
| Auth race during region transfer | Connection is closed and re-handshaken when region_id changes. |
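The slow-client handling could be sketched as a bounded queue that sheds cosmetic events first. This is target handling, so everything here is an assumption: the `SendQueue` name, which event types count as cosmetic, and the fallback of dropping the oldest event when nothing cosmetic is queued.

```python
from collections import deque

COSMETIC_TYPES = {"chat_message", "presence_updated"}  # assumed classification


class SendQueue:
    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        self.items: deque = deque()
        self.needs_resync = False  # set once anything has been dropped

    def push(self, msg: dict) -> None:
        if len(self.items) < self.max_size:
            self.items.append(msg)
            return
        self.needs_resync = True  # overflow always forces a state_resync
        # Prefer dropping the oldest queued cosmetic event.
        for index, queued in enumerate(self.items):
            if queued["type"] in COSMETIC_TYPES:
                del self.items[index]
                self.items.append(msg)
                return
        if msg["type"] in COSMETIC_TYPES:
            return  # nothing cosmetic queued: drop the incoming cosmetic event
        self.items.popleft()  # last resort: drop the oldest event
        self.items.append(msg)
```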

Source map

| Concern | Path (target) |
|---|---|
| Connection manager | services/gameserver/src/services/websocket_service.py:ConnectionManager |
| Enhanced features (rooms, topics, replay) | services/gameserver/src/services/enhanced_websocket_service.py |
| Cross-process fanout | services/gameserver/src/services/redis_pubsub_service.py |
| Redis client | services/gameserver/src/services/redis_service.py |
| WS routes | services/gameserver/src/api/routes/websocket.py, enhanced_websocket.py |
| Player-client primary | services/player-client/src/services/realtimeWebSocket.ts |
| Player-client minimal | services/player-client/src/services/websocket.ts |
| Player-client React provider | services/player-client/src/contexts/WebSocketContext.tsx |
| Admin client | services/admin-ui/src/services/websocket.ts |
| Multi-regional Redis | redis-nexus instance (deployed via services/region-manager) |