diff --git a/internal/api/ws/hub.go b/internal/api/ws/hub.go index c1aec98..3ea8cfc 100644 --- a/internal/api/ws/hub.go +++ b/internal/api/ws/hub.go @@ -77,11 +77,11 @@ type PortfolioData struct { // BotStatusData represents the state of a trading bot. type BotStatusData struct { - BotID string `json:"bot_id"` - Status string `json:"status"` - Message string `json:"message"` + BotID string `json:"bot_id"` + Status string `json:"status"` + Message string `json:"message"` PNL float64 `json:"pnl"` - Timestamp int64 `json:"timestamp"` + Timestamp int64 `json:"timestamp"` } // Client represents a single WebSocket connection. diff --git a/internal/db/watchlist_repo.go b/internal/db/watchlist_repo.go index b49dc6e..94e8ff6 100644 --- a/internal/db/watchlist_repo.go +++ b/internal/db/watchlist_repo.go @@ -8,8 +8,8 @@ import ( // WatchlistItem is one row returned from a user's watchlist query. type WatchlistItem struct { - Symbol string `json:"symbol"` - AddedAt time.Time `json:"added_at"` + Symbol string `json:"symbol"` + AddedAt time.Time `json:"added_at"` } // WatchlistRepo performs direct PostgreSQL reads/writes for user watchlists. diff --git a/internal/dbwriter/INTEGRATION_EXAMPLE.go b/internal/dbwriter/INTEGRATION_EXAMPLE.go index a10c16e..96c8480 100644 --- a/internal/dbwriter/INTEGRATION_EXAMPLE.go +++ b/internal/dbwriter/INTEGRATION_EXAMPLE.go @@ -56,10 +56,10 @@ func exampleMainIntegration() { // Single call to start all database writers dbWriterSvc = dbwriter.StartDBWriters( ctx, - bus.Sub, // Redis subscriber - db.Pool, // PostgreSQL pool - symbolMap, // symbol → instrument_id map - allSymbols, // ["RELIANCE", "TCS", ...] + bus.Sub, // Redis subscriber + db.Pool, // PostgreSQL pool + symbolMap, // symbol → instrument_id map + allSymbols, // ["RELIANCE", "TCS", ...] ) log.Println("Database writers initialized") @@ -151,9 +151,9 @@ func advancedDBWriterSetup( // Access repos for custom operations repos := svc.GetRepositories() - _ = repos.Trade // Use for backfilled trade ingestion - _ = repos.Order // Use for order reconciliation - _ = repos.Error // Use for error replay + _ = repos.Trade // Use for backfilled trade ingestion + _ = repos.Order // Use for order reconciliation + _ = repos.Error // Use for error replay // etc. return svc diff --git a/internal/dbwriter/repos.go b/internal/dbwriter/repos.go index 6b1a1d8..41006b9 100644 --- a/internal/dbwriter/repos.go +++ b/internal/dbwriter/repos.go @@ -158,7 +158,7 @@ func (er *ErrorRepository) InsertError(ctx context.Context, ev eventbus.ErrorEve INSERT INTO system_event_log (event_type, severity, payload, created_at) VALUES ($1, $2, $3, $4) `, - "ERROR_" + ev.ErrorCode, + "ERROR_"+ev.ErrorCode, ev.Severity, payload, ev.OccurredAt, @@ -291,8 +291,8 @@ func (pr *PortfolioSnapshotRepository) InsertPortfolioSnapshot(ctx context.Conte `, ev.UserID, decimal.New(ev.Cash, -2), - decimal.New(0, 0), // realized PnL (not in event) - decimal.New(ev.PnL, -2), // unrealized PnL + decimal.New(0, 0), // realized PnL (not in event) + decimal.New(ev.PnL, -2), // unrealized PnL positions, payload, ev.UpdatedAt, @@ -332,7 +332,7 @@ func (hr *HealthEventRepository) InsertHealthEvent(ctx context.Context, ev event INSERT INTO system_event_log (event_type, severity, payload, created_at) VALUES ($1, $2, $3, $4) `, - "HEALTH_" + ev.ServiceName, + "HEALTH_"+ev.ServiceName, "INFO", payload, ev.Timestamp, diff --git a/internal/dbwriter/service.go b/internal/dbwriter/service.go index 742655b..97c6e3a 100644 --- a/internal/dbwriter/service.go +++ b/internal/dbwriter/service.go @@ -20,12 +20,12 @@ type Service struct { portfolioWriter *PortfolioWriter // Repositories handle actual database operations - tradeRepo *TradeRepository - orderRepo *OrderRepository - errorRepo *ErrorRepository - alertRepo *AlertRepository - botStatusRepo *BotStatusRepository - portfolioRepo *PortfolioSnapshotRepository + tradeRepo *TradeRepository + orderRepo *OrderRepository + errorRepo *ErrorRepository + alertRepo *AlertRepository + botStatusRepo *BotStatusRepository + portfolioRepo *PortfolioSnapshotRepository subscriber *eventbus.Subscriber } @@ -50,12 +50,12 @@ func NewService(sub *eventbus.Subscriber, pool *pgxpool.Pool, symbolMap map[stri botStatusWriter: NewBotStatusWriter(sub), portfolioWriter: NewPortfolioWriter(sub), - tradeRepo: NewTradeRepository(pool, symbolMap), - orderRepo: NewOrderRepository(pool), - errorRepo: NewErrorRepository(pool), - alertRepo: NewAlertRepository(pool), - botStatusRepo: NewBotStatusRepository(pool), - portfolioRepo: NewPortfolioSnapshotRepository(pool), + tradeRepo: NewTradeRepository(pool, symbolMap), + orderRepo: NewOrderRepository(pool), + errorRepo: NewErrorRepository(pool), + alertRepo: NewAlertRepository(pool), + botStatusRepo: NewBotStatusRepository(pool), + portfolioRepo: NewPortfolioSnapshotRepository(pool), subscriber: sub, } @@ -182,26 +182,26 @@ func (s *Service) StartErrorWriter(ctx context.Context) { // GetRepositories returns all repository instances for direct use if needed. // Useful for running manual queries or batch operations outside the event stream. func (s *Service) GetRepositories() struct { - Trade *TradeRepository - Order *OrderRepository - Error *ErrorRepository - Alert *AlertRepository - BotStatus *BotStatusRepository - Portfolio *PortfolioSnapshotRepository + Trade *TradeRepository + Order *OrderRepository + Error *ErrorRepository + Alert *AlertRepository + BotStatus *BotStatusRepository + Portfolio *PortfolioSnapshotRepository } { return struct { - Trade *TradeRepository - Order *OrderRepository - Error *ErrorRepository - Alert *AlertRepository - BotStatus *BotStatusRepository - Portfolio *PortfolioSnapshotRepository + Trade *TradeRepository + Order *OrderRepository + Error *ErrorRepository + Alert *AlertRepository + BotStatus *BotStatusRepository + Portfolio *PortfolioSnapshotRepository }{ - Trade: s.tradeRepo, - Order: s.orderRepo, - Error: s.errorRepo, - Alert: s.alertRepo, - BotStatus: s.botStatusRepo, - Portfolio: s.portfolioRepo, + Trade: s.tradeRepo, + Order: s.orderRepo, + Error: s.errorRepo, + Alert: s.alertRepo, + BotStatus: s.botStatusRepo, + Portfolio: s.portfolioRepo, } } diff --git a/internal/dbwriter/writer.go b/internal/dbwriter/writer.go index a17ce61..5089e29 100644 --- a/internal/dbwriter/writer.go +++ b/internal/dbwriter/writer.go @@ -266,9 +266,9 @@ func NewBotStatusWriter(sub *eventbus.Subscriber) *BotStatusWriter { // or a global stream as bots are created. // // For a production system, recommend: -// 1. Maintaining a registry of active bot IDs -// 2. Dynamically subscribing to each bot's status channel -// 3. Persisting status changes for audit/replay purposes +// 1. Maintaining a registry of active bot IDs +// 2. Dynamically subscribing to each bot's status channel +// 3. Persisting status changes for audit/replay purposes func (w *BotStatusWriter) StartGlobal(ctx context.Context, onBotStatus func(eventbus.BotStatusEvent) error) { w.sub.ConsumeBotStatusGroup(ctx, "db-writer", "dbw-node-0", func(ctx context.Context, ev eventbus.BotStatusEvent) error { if err := onBotStatus(ev); err != nil { @@ -300,9 +300,9 @@ func NewPortfolioWriter(sub *eventbus.Subscriber) *PortfolioWriter { // (internal/portfolio/manager.go) on a 2-second timer, not through the event bus. // // If you want to: -// 1. Audit every portfolio change → consume portfolio events from pub/sub -// 2. Maintain a complete history → subscribe to per-user portfolio topics -// 3. Trigger downstream analytics → use this writer +// 1. Audit every portfolio change → consume portfolio events from pub/sub +// 2. Maintain a complete history → subscribe to per-user portfolio topics +// 3. Trigger downstream analytics → use this writer func (w *PortfolioWriter) StartGlobal(ctx context.Context, onPortfolio func(eventbus.PortfolioEvent) error) { // Portfolio persistence is handled by internal/portfolio/manager.go (2-second flush timer). // This writer is available for additional snapshot auditing if needed. diff --git a/internal/engine/types.go b/internal/engine/types.go index a68290d..83dbe11 100644 --- a/internal/engine/types.go +++ b/internal/engine/types.go @@ -49,10 +49,10 @@ type LevelChange struct { } type Trade struct { - Price int `json:"price"` - Qty int `json:"qty"` - MakerOrderID int `json:"maker_order_id"` - TakerOrderID int `json:"taker_order_id"` + Price int `json:"price"` + Qty int `json:"qty"` + MakerOrderID int `json:"maker_order_id"` + TakerOrderID int `json:"taker_order_id"` TimestampUnixNano int64 `json:"timestamp_unix_nano"` } diff --git a/internal/eventbus/bus_test.go b/internal/eventbus/bus_test.go index 79f12a8..f450623 100644 --- a/internal/eventbus/bus_test.go +++ b/internal/eventbus/bus_test.go @@ -224,8 +224,8 @@ func TestStreamCandleClosed(t *testing.T) { Symbol: "SOL", Interval: "1s", Open: 14500, High: 14600, Low: 14490, Close: 14550, - Volume: 120, - IsClosed: true, + Volume: 120, + IsClosed: true, Timestamp: time.Now().UTC().Truncate(time.Second), } if err := pub.PublishCandle(ctx, want); err != nil { diff --git a/internal/eventbus/publisher.go b/internal/eventbus/publisher.go index 1cef02d..dbfb47c 100644 --- a/internal/eventbus/publisher.go +++ b/internal/eventbus/publisher.go @@ -14,23 +14,23 @@ import ( const ( // --- Redis Streams (durable) --- - StreamTrades = "events:trades:stream:%s" // %s = symbol - StreamOrders = "events:orders:stream" // global - StreamCandles = "events:candle:stream:%s:%s" // %s = symbol, %s = interval - StreamErrors = "events:error:stream" // global durable error log + StreamTrades = "events:trades:stream:%s" // %s = symbol + StreamOrders = "events:orders:stream" // global + StreamCandles = "events:candle:stream:%s:%s" // %s = symbol, %s = interval + StreamErrors = "events:error:stream" // global durable error log StreamAlerts = "events:alerts:stream" StreamBotStatus = "events:bot:stream" // --- Redis Pub/Sub (ephemeral) --- - PubSubOrderUser = "events:orders:pubsub:%s" // %s = userID - PubSubDepth = "events:depth:pubsub:%s" // %s = symbol - PubSubTicker = "events:ticker:pubsub:%s" // %s = symbol - PubSubCandle = "events:candle:pubsub:%s:%s" // %s = symbol, %s = interval - PubSubGBM = "events:gbm:pubsub:%s" // %s = symbol - PubSubPortfolio = "events:portfolio:pubsub:%s" // %s = userID - PubSubAlert = "events:alert:pubsub:%s" // %s = userID - PubSubBotStatus = "events:bot:pubsub:%s" // %s = botID - PubSubHealth = "events:health:pubsub:system" // singleton system channel + PubSubOrderUser = "events:orders:pubsub:%s" // %s = userID + PubSubDepth = "events:depth:pubsub:%s" // %s = symbol + PubSubTicker = "events:ticker:pubsub:%s" // %s = symbol + PubSubCandle = "events:candle:pubsub:%s:%s" // %s = symbol, %s = interval + PubSubGBM = "events:gbm:pubsub:%s" // %s = symbol + PubSubPortfolio = "events:portfolio:pubsub:%s" // %s = userID + PubSubAlert = "events:alert:pubsub:%s" // %s = userID + PubSubBotStatus = "events:bot:pubsub:%s" // %s = botID + PubSubHealth = "events:health:pubsub:system" // singleton system channel ) // --------------------------------------------------------------------------- @@ -106,7 +106,7 @@ func (p *Publisher) PublishPortfolio(ctx context.Context, ev PortfolioEvent) err // Publish alert func (p *Publisher) PublishAlert(ctx context.Context, ev AlertEvent) error { - _ = p.xadd(ctx, StreamAlerts, ev) // durable + _ = p.xadd(ctx, StreamAlerts, ev) // durable return p.pubsub(ctx, fmt.Sprintf(PubSubAlert, ev.UserID), ev) // real-time } @@ -114,7 +114,7 @@ func (p *Publisher) PublishAlert(ctx context.Context, ev AlertEvent) error { // Publish status func (p *Publisher) PublishBotStatus(ctx context.Context, ev BotStatusEvent) error { - _ = p.xadd(ctx, StreamBotStatus, ev) // durable + _ = p.xadd(ctx, StreamBotStatus, ev) // durable return p.pubsub(ctx, fmt.Sprintf(PubSubBotStatus, ev.BotID), ev) // real-time } diff --git a/internal/eventbus/subscriber.go b/internal/eventbus/subscriber.go index 444a11a..305dc8e 100644 --- a/internal/eventbus/subscriber.go +++ b/internal/eventbus/subscriber.go @@ -218,7 +218,7 @@ func (s *Subscriber) ConsumeCandlesBatchGroup(ctx context.Context, symbol, inter // Phase 2: continuous batched polling for new messages. var ( - batch []CandleEvent + batch []CandleEvent batchIDs []string // stream message IDs corresponding to batch entries ) @@ -423,7 +423,7 @@ func (s *Subscriber) consumeStream(ctx context.Context, streamKey, group, consum Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, - Count: 50, // Batch size + Count: 50, // Batch size Block: 2 * time.Second, }).Result() diff --git a/internal/market/generator.go b/internal/market/generator.go index cc545b0..863283c 100644 --- a/internal/market/generator.go +++ b/internal/market/generator.go @@ -29,6 +29,8 @@ type Config struct { MaxQty int // maximum order quantity MaxOrdersPerSide int // max orders to place per side per tick AggressiveRate float64 // probability (0-1) of submitting a market order per tick + MaxActiveOrders int // max number of limit orders to keep alive + MaxDeviationPct float64 // cancel orders further than this percentage from the mid price (e.g. 5.0 for 5%) } // DefaultConfig returns production defaults matching the competition spec. @@ -45,9 +47,17 @@ func DefaultConfig() Config { MaxQty: 10, MaxOrdersPerSide: 5, AggressiveRate: 0.15, // 15% of ticks also submit a market order + MaxActiveOrders: 1000, + MaxDeviationPct: 10.0, } } +// TrackedOrder holds the order ID and its assigned price to allow realistic cancellations +type TrackedOrder struct { + ID int + Price float64 +} + // Generator produces synthetic market data using GBM and feeds orders // directly into the matching engine. It runs as a background goroutine. type Generator struct { @@ -59,8 +69,9 @@ type Generator struct { runMu sync.Mutex active bool - nextOrderID atomic.Int64 - rng *rand.Rand + nextOrderID atomic.Int64 + rng *rand.Rand + activeOrders []TrackedOrder // tracks active limit orders and their prices symbol string // set via SetSymbol for event payloads @@ -76,13 +87,21 @@ type Generator struct { // New creates a generator that will submit orders to the given engine handle. // The mutex must be the same one used by all other code touching this book. func New(cfg Config, book *engine.Handle, bookMu *sync.Mutex) *Generator { + if cfg.MaxActiveOrders <= 0 { + cfg.MaxActiveOrders = 1000 // safe fallback + } + if cfg.MaxDeviationPct <= 0 { + cfg.MaxDeviationPct = 10.0 // safe fallback + } + return &Generator{ cfg: cfg, book: book, mu: bookMu, stopCh: make(chan struct{}), // Each symbol's Generator has its own RNG → independent Z shocks vs other books. - rng: rand.New(rand.NewSource(time.Now().UnixNano())), + rng: rand.New(rand.NewSource(time.Now().UnixNano())), + activeOrders: make([]TrackedOrder, 0, cfg.MaxActiveOrders+20), } } @@ -252,7 +271,9 @@ func (g *Generator) submitOrders(basePrice float64) { } status, result := g.book.AddLimit(orderID, engine.SideBuy, qty, bidPriceCents) - if status != engine.StatusOK { + if status == engine.StatusOK { + g.activeOrders = append(g.activeOrders, TrackedOrder{ID: orderID, Price: bidPrice}) + } else { log.Printf("[market] bid failed: order=%d price=%d qty=%d err=%s", orderID, bidPriceCents, qty, status.Error()) continue @@ -276,7 +297,9 @@ func (g *Generator) submitOrders(basePrice float64) { } status, result := g.book.AddLimit(orderID, engine.SideSell, qty, askPriceCents) - if status != engine.StatusOK { + if status == engine.StatusOK { + g.activeOrders = append(g.activeOrders, TrackedOrder{ID: orderID, Price: askPrice}) + } else { log.Printf("[market] ask failed: order=%d price=%d qty=%d err=%s", orderID, askPriceCents, qty, status.Error()) continue @@ -287,6 +310,37 @@ func (g *Generator) submitOrders(basePrice float64) { } } + // 1. Price-anchored cancellation (using percentage slabs based on volatility) + if g.cfg.MaxDeviationPct > 0 { + var remainingOrders []TrackedOrder + maxDevAbs := basePrice * (g.cfg.MaxDeviationPct / 100.0) + for _, o := range g.activeOrders { + if math.Abs(o.Price-basePrice) > maxDevAbs { + status, _ := g.book.CancelLimit(o.ID) + if status != engine.StatusOK && status != engine.StatusNotFound { + log.Printf("[market] batch cancel failed: order=%d err=%s", o.ID, status.Error()) + } + } else { + remainingOrders = append(remainingOrders, o) + } + } + g.activeOrders = remainingOrders + } + + // 2. Secondary fallback limit to ensure memory doesn't explode in extreme scenarios + if len(g.activeOrders) > g.cfg.MaxActiveOrders { + excess := len(g.activeOrders) - g.cfg.MaxActiveOrders + for i := 0; i < excess; i++ { + orderID := g.activeOrders[i].ID + status, _ := g.book.CancelLimit(orderID) + if status != engine.StatusOK && status != engine.StatusNotFound { + log.Printf("[market] fallback cancel failed: order=%d err=%s", orderID, status.Error()) + } + } + // Shift remaining active orders left + g.activeOrders = append(g.activeOrders[:0], g.activeOrders[excess:]...) + } + // Occasionally submit a market order to simulate aggressive traders // taking liquidity. This creates actual trades in the book. // if g.cfg.AggressiveRate > 0 && g.rng.Float64() < g.cfg.AggressiveRate { diff --git a/internal/market/presets.go b/internal/market/presets.go index 07406b8..9d2d797 100644 --- a/internal/market/presets.go +++ b/internal/market/presets.go @@ -29,12 +29,22 @@ func IndianStockPresets() []SymbolPreset { MaxQty: 200, MaxOrdersPerSide: 5, AggressiveRate: 0.08, // ~1 market order/sec — enough for trades, not too many + MaxActiveOrders: 1000, } withPrice := func(symbol string, price float64, sigma float64) SymbolPreset { cfg := base cfg.InitialPrice = price cfg.Sigma = sigma + + // Set deviation slab based on volatility (sigma) + if sigma <= 0.18 { + cfg.MaxDeviationPct = 5.0 + } else if sigma <= 0.20 { + cfg.MaxDeviationPct = 10.0 + } else { + cfg.MaxDeviationPct = 20.0 + } return SymbolPreset{ Symbol: symbol, Class: Stock, diff --git a/internal/portfolio/types.go b/internal/portfolio/types.go index 0390365..a645339 100644 --- a/internal/portfolio/types.go +++ b/internal/portfolio/types.go @@ -4,45 +4,45 @@ import "time" // Position is an in-memory holding for one symbol inside a user's portfolio. type Position struct { - Symbol string // e.g. "BTCUSDT" - Quantity float64 // positive = long, negative = short - AvgEntry float64 // average cost basis - MarkPrice float64 // last traded price from engine - PnL float64 // (MarkPrice - AvgEntry) * Quantity - UpdatedAt time.Time + Symbol string // e.g. "BTCUSDT" + Quantity float64 // positive = long, negative = short + AvgEntry float64 // average cost basis + MarkPrice float64 // last traded price from engine + PnL float64 // (MarkPrice - AvgEntry) * Quantity + UpdatedAt time.Time } // Portfolio is the full in-memory state for one user. type Portfolio struct { - UserID string - PortfolioID int64 - TotalCash float64 // total cash (realised) - AvailableCash float64 // cash not locked by open orders - BlockedCash float64 // cash locked by pending buy orders (escrow) - Positions map[string]*Position // symbol -> position - UpdatedAt time.Time + UserID string + PortfolioID int64 + TotalCash float64 // total cash (realised) + AvailableCash float64 // cash not locked by open orders + BlockedCash float64 // cash locked by pending buy orders (escrow) + Positions map[string]*Position // symbol -> position + UpdatedAt time.Time } // Fill is a record of a completed trade, emitted to the WS. type Fill struct { Symbol string `json:"symbol"` - Side string `json:"side"` // "buy" | "sell" + Side string `json:"side"` // "buy" | "sell" Price float64 `json:"price"` Qty float64 `json:"qty"` - PnL float64 `json:"pnl"` // realised PnL if closing position + PnL float64 `json:"pnl"` // realised PnL if closing position Timestamp time.Time `json:"timestamp"` } // Snapshot is the full portfolio state sent over HTTP / WS. type Snapshot struct { - UserID string `json:"user_id"` - TotalCash float64 `json:"total_cash"` - AvailableCash float64 `json:"available_cash"` - BlockedCash float64 `json:"blocked_cash"` - Positions []PositionView `json:"positions"` - TotalPnL float64 `json:"total_pnl"` - Equity float64 `json:"equity"` // available_cash + blocked_cash + holdings_value - UpdatedAt time.Time `json:"updated_at"` + UserID string `json:"user_id"` + TotalCash float64 `json:"total_cash"` + AvailableCash float64 `json:"available_cash"` + BlockedCash float64 `json:"blocked_cash"` + Positions []PositionView `json:"positions"` + TotalPnL float64 `json:"total_pnl"` + Equity float64 `json:"equity"` // available_cash + blocked_cash + holdings_value + UpdatedAt time.Time `json:"updated_at"` } // PositionView is the JSON-safe view of a Position. diff --git a/internal/simbot/indicators.go b/internal/simbot/indicators.go index 5e15e70..b22f97d 100644 --- a/internal/simbot/indicators.go +++ b/internal/simbot/indicators.go @@ -118,9 +118,9 @@ func ComputeMACD(buf []float64, fastPeriod, slowPeriod, signalPeriod int) *MACDR // BollingerResult holds the three Bollinger Band values. type BollingerResult struct { - Upper float64 - Mid float64 - Lower float64 + Upper float64 + Mid float64 + Lower float64 } // ComputeBollinger returns Bollinger Bands (upper, mid, lower). diff --git a/internal/simbot/strategies.go b/internal/simbot/strategies.go index cf5d07a..392cf2e 100644 --- a/internal/simbot/strategies.go +++ b/internal/simbot/strategies.go @@ -22,11 +22,11 @@ package simbot type StrategyName string const ( - StrategyFlagshipV2 StrategyName = "flagship_v2" - StrategyBollingerMeanRevert StrategyName = "bollinger_mean_reversion" - StrategyMACDMomentum StrategyName = "macd_momentum" - StrategyRSIReversal StrategyName = "rsi_reversal" - StrategyFastEMATrend StrategyName = "fast_ema_trend" + StrategyFlagshipV2 StrategyName = "flagship_v2" + StrategyBollingerMeanRevert StrategyName = "bollinger_mean_reversion" + StrategyMACDMomentum StrategyName = "macd_momentum" + StrategyRSIReversal StrategyName = "rsi_reversal" + StrategyFastEMATrend StrategyName = "fast_ema_trend" StrategyMACDBollingerBreakout StrategyName = "macd_bollinger_breakout" ) @@ -68,9 +68,10 @@ func GetStrategy(name StrategyName) (StrategyGraph, string) { // sell when it touches the upper band (overbought). // // Signal wiring: -// priceFeed ─────────────────────────► threshBuyVal.value -// bollinger(20,2) ─ lower ──────────► threshBuy.value (price <= lower → buy) -// bollinger(20,2) ─ upper ──────────► threshSell.value (price >= upper → sell) +// +// priceFeed ─────────────────────────► threshBuyVal.value +// bollinger(20,2) ─ lower ──────────► threshBuy.value (price <= lower → buy) +// bollinger(20,2) ─ upper ──────────► threshSell.value (price >= upper → sell) // // NOTE: The NodeThreshold node takes `value` as the input to compare against the // fixed `value` param. Here we swap usage: we feed the dynamic band value into @@ -82,18 +83,23 @@ func GetStrategy(name StrategyName) (StrategyGraph, string) { // Practical approach: We wire it so BOTH the price AND the band flow through // the graph using two threshold nodes with a fixed reference price as the // secondary anchor. The key insight is that for the LOWER band we want: -// price <= lower_band +// +// price <= lower_band +// // We achieve this by feeding the lower band value as the threshold value and // the current price as the input, with operator ">=": -// inputs["value"] (price) >= params["value"] (lower_band_static)? — not dynamic. +// +// inputs["value"] (price) >= params["value"] (lower_band_static)? — not dynamic. // // Because NodeThreshold only supports a STATIC numeric param["value"], we use the // simpler and more robust approach of wiring a CROSSOVER node: -// * fastPort = current price -// * slowPort = lowerBand +// - fastPort = current price +// - slowPort = lowerBand +// // when price crosses DOWN through lower band → buy (crossDown) -// * fastPort = current price -// * slowPort = upperBand +// - fastPort = current price +// - slowPort = upperBand +// // when price crosses UP through upper band → sell (crossUp) // ────────────────────────────────────────────────────────────────────────────── func BollingerMeanReversionStrategy() StrategyGraph { @@ -148,13 +154,15 @@ func BollingerMeanReversionStrategy() StrategyGraph { // oversold). // // Signal wiring: -// MACD(12,26,9) → macdLine, signalLine -// crossover(macdLine, signalLine): -// crossUp → momentum turning bullish -// crossDown → momentum turning bearish -// RSI(14) >= 45 → momFilter (confirms upward bias for buy) -// AND(crossUp, momFilter) → buy -// AND(crossDown, rsiSell) → sell +// +// MACD(12,26,9) → macdLine, signalLine +// crossover(macdLine, signalLine): +// crossUp → momentum turning bullish +// crossDown → momentum turning bearish +// RSI(14) >= 45 → momFilter (confirms upward bias for buy) +// AND(crossUp, momFilter) → buy +// AND(crossDown, rsiSell) → sell +// // ────────────────────────────────────────────────────────────────────────────── func MACDMomentumStrategy() StrategyGraph { return StrategyGraph{ @@ -298,11 +306,14 @@ func FastEMATrendStrategy() StrategyGraph { // (4 units) to exploit the conviction. // // Signal wiring: -// MACD crossUp + price > bb.mid → buy -// MACD crossDown + price < bb.mid → sell +// +// MACD crossUp + price > bb.mid → buy +// MACD crossDown + price < bb.mid → sell // // For "price > bb.mid" we use the CrossOver node trick again: -// crossover(price, mid): crossUp = price just crossed above mid band +// +// crossover(price, mid): crossUp = price just crossed above mid band +// // But since we want a persistent signal (not just the crossing moment), we use // a Threshold node wired as: inputs["value"] (price) op params["value"] (static). // Because params["value"] can't be dynamic, we use the crossover trick to detect diff --git a/pkg/syslog/logger.go b/pkg/syslog/logger.go index 6a58cf2..4996207 100644 --- a/pkg/syslog/logger.go +++ b/pkg/syslog/logger.go @@ -73,13 +73,13 @@ const ( // Event is a single structured system log entry. type Event struct { - ServiceName string `json:"service_name"` - EventType EventType `json:"event_type"` - Severity Severity `json:"severity"` - Message string `json:"message"` - Details map[string]any `json:"details,omitempty"` - DurationMs int `json:"duration_ms,omitempty"` - Timestamp time.Time `json:"timestamp"` + ServiceName string `json:"service_name"` + EventType EventType `json:"event_type"` + Severity Severity `json:"severity"` + Message string `json:"message"` + Details map[string]any `json:"details,omitempty"` + DurationMs int `json:"duration_ms,omitempty"` + Timestamp time.Time `json:"timestamp"` } // ---------------------------------------------------------------------------