Skip to content

feat: pubsub#5435

Draft
nugaon wants to merge 49 commits intomasterfrom
feat/pubsub
Draft

feat: pubsub#5435
nugaon wants to merge 49 commits intomasterfrom
feat/pubsub

Conversation

@nugaon
Copy link
Copy Markdown
Member

@nugaon nugaon commented Apr 14, 2026

pubsub

A brokered publish/subscribe protocol over Bee's p2p layer, exposed to end-users via WebSocket.

Overview

One node acts as a broker: it accepts p2p streams, validates incoming messages, and fans them out to all connected receivers. Other nodes connect as either a publisher (publish + receive) or a subscriber (receive only).

Connection flow

WebSocket client
      │  HTTP GET /pubsub/{topic}  (broker peer multiaddr in HTTP request headers)
      ▼
api/pubsub.go  ──►  pubsub.Service.Connect  ──►  mode.Connect (opens p2p stream)
                                                         │
                                               Broker node — brokerHandler
                                                     ├─ readwrite=1  ──►  handlePublisher
                                                     └─ readwrite=0  ──►  handleSubscriber

The connecting node receives a SubscriberConn which is bridged to the WebSocket: inbound WS frames are forwarded to the p2p stream (publishers only); p2p frames from the broker are decoded and forwarded to the WS client (all connections).

Mode system

Protocol-specific behaviour is isolated behind a Mode interface. The Service is mode-agnostic: it delegates header construction, message reading/validation, and broadcast formatting to the active mode. New protocol variants can be added by implementing Mode and registering a mode ID in newMode.

GSOC Ephemeral mode (mode 1)

Messages are Single Owner Chunks (SOC) signed by the holder of the topic's private key. The topic address is derived from the SOC owner public key and an arbitrary topic ID, so only the key holder(s) can publish and the messages carry full SOC wrapping.

Publisher → Broker (p2p)

[ sig: 65 B ][ span: 8 B LE ][ payload: up to 4 KB ]

The broker verifies the ECDSA signature on every message before broadcasting.

Broker → Subscriber (p2p)

Every broker frame begins with a 1-byte message type. 0x01 is reserved at the service level and is valid across all modes; all other type bytes are mode-specific and decoded by the active mode implementation.

First byte Level Meaning Payload
0x01 service Ping (no fields — keepalive sent every 30 s)
0x02 GSOC mode Handshake + data [SOC ID: 32 B][owner: 20 B][sig: 65 B][span: 8 B][payload: N B]
0x03 GSOC mode Data only [sig: 65 B][span: 8 B][payload: N B]

The handshake frame (SOC identity metadata) is sent once per topic on the first broadcast. Subsequent messages are data-only. Ping frames are consumed by readServiceMessage before mode dispatch and never surfaced to the WebSocket client.

WebSocket (both directions)

Both the publisher (inbound) and subscriber (outbound) see the raw SOC payload:

[ sig: 65 B ][ span: 8 B ][ payload: N B ]

The node handles all p2p framing and signature verification transparently.

Multi-WebSocket multiplexer

Multiple WebSocket sessions to the same topic on the same node share a single p2p stream to the broker via SubscriberConn:

  • CreateSubscriberConn increments a ref count instead of opening a new stream when one already exists
  • A single runMux goroutine fans out incoming broker messages to all per-session channels; on stream error it immediately clears the shared conn so new sessions get a fresh stream
  • RemoveSubscriberConn decrements refs; FullClose is called exactly once when refs reach zero

API

Method Path Description
GET (WS upgrade) /pubsub/{topic} Connect to a topic as subscriber or publisher
GET /pubsub/ List active topics with role and connection count

Headers for /pubsub/{topic}:

  • Swarm-Pubsub-Peer (required): multiaddr of the broker peer
  • Swarm-Pubsub-Gsoc-Eth-Address + Swarm-Pubsub-Gsoc-Topic (optional): upgrade to publisher role

Configuration

Flag Default Description
--pubsub-broker-mode false Enable broker role on this node
--pubsub-max-connections 0 (unlimited) Max simultaneous subscriber streams the broker accepts

The broker reserves inbound stream slots from the global libp2p budget rather than stacking on top of it. Subscriber nodes use ConnectAllowLight when dialing the broker, so broker nodes can run in light-node mode.

Extensibility

  • New modes — implement Mode for a different message format and register a mode ID.
  • Access controlvalidatePublisher can enforce allow-lists, or stake checks.
  • ReplayhandleSubscriber can replay on connect.
  • WASM / event-driven clientsbroadcast can forward to an event emitter, making the broker embeddable in a WASM-compiled client without a separate p2p layer.
  • Per-subscriber transformsformatBroadcast receives the individual brokerSubscriber, enabling selective filtering or transformation.

Checklist

  • I have read the coding guide.
  • My change requires a documentation update, and I have done it.
  • I have added tests to cover my changes.
  • I have filled out the description and linked the related issues.

Description

Open API Spec Version Changes (if applicable)

Motivation and Context (Optional)

Related Issue (Optional)

Screenshots (if appropriate):

AI Disclosure

  • This PR contains code that has been generated by an LLM.
  • I have reviewed the AI generated code thoroughly.
  • I possess the technical expertise to responsibly review the code generated in this PR.

Comment thread pkg/pubsub/mode.go Outdated
Comment thread pkg/pubsub/mode.go Outdated
Comment thread pkg/pubsub/pubsub.go Outdated
defer bc.mu.Unlock()

for _, sub := range bc.subscribers {
msg := bc.mode.FormatBroadcast(bc, sub, rawMsg)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think this is needed inside the fanout loop. Why would you reserialise for every subscriber?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the message serialization varies between subscribers e.g. one of them did not get the SOC ID and address for message verification, another subscriber already got it. Then, these metadata will be either attached or not in the message payload.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why would it vary between subscribers? In fact it must be the same, otherwise same msg can be repeated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants