Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/api/ws/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ type PortfolioData struct {

// BotStatusData represents the state of a trading bot.
type BotStatusData struct {
BotID string `json:"bot_id"`
Status string `json:"status"`
Message string `json:"message"`
BotID string `json:"bot_id"`
Status string `json:"status"`
Message string `json:"message"`
PNL float64 `json:"pnl"`
Timestamp int64 `json:"timestamp"`
Timestamp int64 `json:"timestamp"`
}

// Client represents a single WebSocket connection.
Expand Down
4 changes: 2 additions & 2 deletions internal/db/watchlist_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

// WatchlistItem is one row returned from a user's watchlist query.
type WatchlistItem struct {
Symbol string `json:"symbol"`
AddedAt time.Time `json:"added_at"`
Symbol string `json:"symbol"`
AddedAt time.Time `json:"added_at"`
}

// WatchlistRepo performs direct PostgreSQL reads/writes for user watchlists.
Expand Down
14 changes: 7 additions & 7 deletions internal/dbwriter/INTEGRATION_EXAMPLE.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ func exampleMainIntegration() {
// Single call to start all database writers
dbWriterSvc = dbwriter.StartDBWriters(
ctx,
bus.Sub, // Redis subscriber
db.Pool, // PostgreSQL pool
symbolMap, // symbol → instrument_id map
allSymbols, // ["RELIANCE", "TCS", ...]
bus.Sub, // Redis subscriber
db.Pool, // PostgreSQL pool
symbolMap, // symbol → instrument_id map
allSymbols, // ["RELIANCE", "TCS", ...]
)
log.Println("Database writers initialized")

Expand Down Expand Up @@ -151,9 +151,9 @@ func advancedDBWriterSetup(

// Access repos for custom operations
repos := svc.GetRepositories()
_ = repos.Trade // Use for backfilled trade ingestion
_ = repos.Order // Use for order reconciliation
_ = repos.Error // Use for error replay
_ = repos.Trade // Use for backfilled trade ingestion
_ = repos.Order // Use for order reconciliation
_ = repos.Error // Use for error replay
// etc.

return svc
Expand Down
8 changes: 4 additions & 4 deletions internal/dbwriter/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (er *ErrorRepository) InsertError(ctx context.Context, ev eventbus.ErrorEve
INSERT INTO system_event_log (event_type, severity, payload, created_at)
VALUES ($1, $2, $3, $4)
`,
"ERROR_" + ev.ErrorCode,
"ERROR_"+ev.ErrorCode,
ev.Severity,
payload,
ev.OccurredAt,
Expand Down Expand Up @@ -291,8 +291,8 @@ func (pr *PortfolioSnapshotRepository) InsertPortfolioSnapshot(ctx context.Conte
`,
ev.UserID,
decimal.New(ev.Cash, -2),
decimal.New(0, 0), // realized PnL (not in event)
decimal.New(ev.PnL, -2), // unrealized PnL
decimal.New(0, 0), // realized PnL (not in event)
decimal.New(ev.PnL, -2), // unrealized PnL
positions,
payload,
ev.UpdatedAt,
Expand Down Expand Up @@ -332,7 +332,7 @@ func (hr *HealthEventRepository) InsertHealthEvent(ctx context.Context, ev event
INSERT INTO system_event_log (event_type, severity, payload, created_at)
VALUES ($1, $2, $3, $4)
`,
"HEALTH_" + ev.ServiceName,
"HEALTH_"+ev.ServiceName,
"INFO",
payload,
ev.Timestamp,
Expand Down
60 changes: 30 additions & 30 deletions internal/dbwriter/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ type Service struct {
portfolioWriter *PortfolioWriter

// Repositories handle actual database operations
tradeRepo *TradeRepository
orderRepo *OrderRepository
errorRepo *ErrorRepository
alertRepo *AlertRepository
botStatusRepo *BotStatusRepository
portfolioRepo *PortfolioSnapshotRepository
tradeRepo *TradeRepository
orderRepo *OrderRepository
errorRepo *ErrorRepository
alertRepo *AlertRepository
botStatusRepo *BotStatusRepository
portfolioRepo *PortfolioSnapshotRepository

subscriber *eventbus.Subscriber
}
Expand All @@ -50,12 +50,12 @@ func NewService(sub *eventbus.Subscriber, pool *pgxpool.Pool, symbolMap map[stri
botStatusWriter: NewBotStatusWriter(sub),
portfolioWriter: NewPortfolioWriter(sub),

tradeRepo: NewTradeRepository(pool, symbolMap),
orderRepo: NewOrderRepository(pool),
errorRepo: NewErrorRepository(pool),
alertRepo: NewAlertRepository(pool),
botStatusRepo: NewBotStatusRepository(pool),
portfolioRepo: NewPortfolioSnapshotRepository(pool),
tradeRepo: NewTradeRepository(pool, symbolMap),
orderRepo: NewOrderRepository(pool),
errorRepo: NewErrorRepository(pool),
alertRepo: NewAlertRepository(pool),
botStatusRepo: NewBotStatusRepository(pool),
portfolioRepo: NewPortfolioSnapshotRepository(pool),

subscriber: sub,
}
Expand Down Expand Up @@ -182,26 +182,26 @@ func (s *Service) StartErrorWriter(ctx context.Context) {
// GetRepositories returns all repository instances for direct use if needed.
// Useful for running manual queries or batch operations outside the event stream.
func (s *Service) GetRepositories() struct {
Trade *TradeRepository
Order *OrderRepository
Error *ErrorRepository
Alert *AlertRepository
BotStatus *BotStatusRepository
Portfolio *PortfolioSnapshotRepository
Trade *TradeRepository
Order *OrderRepository
Error *ErrorRepository
Alert *AlertRepository
BotStatus *BotStatusRepository
Portfolio *PortfolioSnapshotRepository
} {
return struct {
Trade *TradeRepository
Order *OrderRepository
Error *ErrorRepository
Alert *AlertRepository
BotStatus *BotStatusRepository
Portfolio *PortfolioSnapshotRepository
Trade *TradeRepository
Order *OrderRepository
Error *ErrorRepository
Alert *AlertRepository
BotStatus *BotStatusRepository
Portfolio *PortfolioSnapshotRepository
}{
Trade: s.tradeRepo,
Order: s.orderRepo,
Error: s.errorRepo,
Alert: s.alertRepo,
BotStatus: s.botStatusRepo,
Portfolio: s.portfolioRepo,
Trade: s.tradeRepo,
Order: s.orderRepo,
Error: s.errorRepo,
Alert: s.alertRepo,
BotStatus: s.botStatusRepo,
Portfolio: s.portfolioRepo,
}
}
12 changes: 6 additions & 6 deletions internal/dbwriter/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,9 @@ func NewBotStatusWriter(sub *eventbus.Subscriber) *BotStatusWriter {
// or a global stream as bots are created.
//
// For a production system, recommend:
// 1. Maintaining a registry of active bot IDs
// 2. Dynamically subscribing to each bot's status channel
// 3. Persisting status changes for audit/replay purposes
// 1. Maintaining a registry of active bot IDs
// 2. Dynamically subscribing to each bot's status channel
// 3. Persisting status changes for audit/replay purposes
func (w *BotStatusWriter) StartGlobal(ctx context.Context, onBotStatus func(eventbus.BotStatusEvent) error) {
w.sub.ConsumeBotStatusGroup(ctx, "db-writer", "dbw-node-0", func(ctx context.Context, ev eventbus.BotStatusEvent) error {
if err := onBotStatus(ev); err != nil {
Expand Down Expand Up @@ -300,9 +300,9 @@ func NewPortfolioWriter(sub *eventbus.Subscriber) *PortfolioWriter {
// (internal/portfolio/manager.go) on a 2-second timer, not through the event bus.
//
// If you want to:
// 1. Audit every portfolio change → consume portfolio events from pub/sub
// 2. Maintain a complete history → subscribe to per-user portfolio topics
// 3. Trigger downstream analytics → use this writer
// 1. Audit every portfolio change → consume portfolio events from pub/sub
// 2. Maintain a complete history → subscribe to per-user portfolio topics
// 3. Trigger downstream analytics → use this writer
func (w *PortfolioWriter) StartGlobal(ctx context.Context, onPortfolio func(eventbus.PortfolioEvent) error) {
// Portfolio persistence is handled by internal/portfolio/manager.go (2-second flush timer).
// This writer is available for additional snapshot auditing if needed.
Expand Down
8 changes: 4 additions & 4 deletions internal/engine/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ type LevelChange struct {
}

type Trade struct {
Price int `json:"price"`
Qty int `json:"qty"`
MakerOrderID int `json:"maker_order_id"`
TakerOrderID int `json:"taker_order_id"`
Price int `json:"price"`
Qty int `json:"qty"`
MakerOrderID int `json:"maker_order_id"`
TakerOrderID int `json:"taker_order_id"`
TimestampUnixNano int64 `json:"timestamp_unix_nano"`
}

Expand Down
4 changes: 2 additions & 2 deletions internal/eventbus/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ func TestStreamCandleClosed(t *testing.T) {
Symbol: "SOL",
Interval: "1s",
Open: 14500, High: 14600, Low: 14490, Close: 14550,
Volume: 120,
IsClosed: true,
Volume: 120,
IsClosed: true,
Timestamp: time.Now().UTC().Truncate(time.Second),
}
if err := pub.PublishCandle(ctx, want); err != nil {
Expand Down
30 changes: 15 additions & 15 deletions internal/eventbus/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@ import (

const (
// --- Redis Streams (durable) ---
StreamTrades = "events:trades:stream:%s" // %s = symbol
StreamOrders = "events:orders:stream" // global
StreamCandles = "events:candle:stream:%s:%s" // %s = symbol, %s = interval
StreamErrors = "events:error:stream" // global durable error log
StreamTrades = "events:trades:stream:%s" // %s = symbol
StreamOrders = "events:orders:stream" // global
StreamCandles = "events:candle:stream:%s:%s" // %s = symbol, %s = interval
StreamErrors = "events:error:stream" // global durable error log
StreamAlerts = "events:alerts:stream"
StreamBotStatus = "events:bot:stream"

// --- Redis Pub/Sub (ephemeral) ---
PubSubOrderUser = "events:orders:pubsub:%s" // %s = userID
PubSubDepth = "events:depth:pubsub:%s" // %s = symbol
PubSubTicker = "events:ticker:pubsub:%s" // %s = symbol
PubSubCandle = "events:candle:pubsub:%s:%s" // %s = symbol, %s = interval
PubSubGBM = "events:gbm:pubsub:%s" // %s = symbol
PubSubPortfolio = "events:portfolio:pubsub:%s" // %s = userID
PubSubAlert = "events:alert:pubsub:%s" // %s = userID
PubSubBotStatus = "events:bot:pubsub:%s" // %s = botID
PubSubHealth = "events:health:pubsub:system" // singleton system channel
PubSubOrderUser = "events:orders:pubsub:%s" // %s = userID
PubSubDepth = "events:depth:pubsub:%s" // %s = symbol
PubSubTicker = "events:ticker:pubsub:%s" // %s = symbol
PubSubCandle = "events:candle:pubsub:%s:%s" // %s = symbol, %s = interval
PubSubGBM = "events:gbm:pubsub:%s" // %s = symbol
PubSubPortfolio = "events:portfolio:pubsub:%s" // %s = userID
PubSubAlert = "events:alert:pubsub:%s" // %s = userID
PubSubBotStatus = "events:bot:pubsub:%s" // %s = botID
PubSubHealth = "events:health:pubsub:system" // singleton system channel
)

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -106,15 +106,15 @@ func (p *Publisher) PublishPortfolio(ctx context.Context, ev PortfolioEvent) err

// Publish alert
func (p *Publisher) PublishAlert(ctx context.Context, ev AlertEvent) error {
_ = p.xadd(ctx, StreamAlerts, ev) // durable
_ = p.xadd(ctx, StreamAlerts, ev) // durable
return p.pubsub(ctx, fmt.Sprintf(PubSubAlert, ev.UserID), ev) // real-time
}

// --- Bot Lifecycle (Pub/Sub) ---

// Publish status
func (p *Publisher) PublishBotStatus(ctx context.Context, ev BotStatusEvent) error {
_ = p.xadd(ctx, StreamBotStatus, ev) // durable
_ = p.xadd(ctx, StreamBotStatus, ev) // durable
return p.pubsub(ctx, fmt.Sprintf(PubSubBotStatus, ev.BotID), ev) // real-time
}

Expand Down
4 changes: 2 additions & 2 deletions internal/eventbus/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (s *Subscriber) ConsumeCandlesBatchGroup(ctx context.Context, symbol, inter

// Phase 2: continuous batched polling for new messages.
var (
batch []CandleEvent
batch []CandleEvent
batchIDs []string // stream message IDs corresponding to batch entries
)

Expand Down Expand Up @@ -423,7 +423,7 @@ func (s *Subscriber) consumeStream(ctx context.Context, streamKey, group, consum
Group: group,
Consumer: consumer,
Streams: []string{streamKey, ">"},
Count: 50, // Batch size
Count: 50, // Batch size
Block: 2 * time.Second,
}).Result()

Expand Down
Loading
Loading