diff --git a/diode-server/cmd/reconciler/main.go b/diode-server/cmd/reconciler/main.go index aa643e6a..c0a8740d 100644 --- a/diode-server/cmd/reconciler/main.go +++ b/diode-server/cmd/reconciler/main.go @@ -140,12 +140,14 @@ func main() { repository := postgres.NewRepository(dbPool) - // Initialize graph Service if graph DB feature is enabled - var graphServiceOpt reconciler.ProcessorOption + // Build a fresh graph.Service per call when the graph DB feature is + // enabled. The processor invokes this once per worker goroutine because + // graph.Service holds per-call mutable state on its receiver and is + // documented as not safe for concurrent use. + var graphServiceFactory reconciler.GraphServiceFactory if cfg.EnableGraphDB { - s.Logger().Info("graph DB feature enabled, initializing graph Service") + s.Logger().Info("graph DB feature enabled, initializing graph Service factory") - // Load matching configuration if provided var matchingConfig *matching.Config if cfg.EntityMatchingConfigPath != "" { var err error @@ -160,28 +162,24 @@ func main() { } } - // Create graph repository adapter graphRepo := postgres.NewGraphRepository(dbPool) - - // Create graph Service with repository - var opts []graph.Option - if matchingConfig != nil { - opts = append(opts, graph.WithMatchingConfig(matchingConfig)) - - // Create entity matcher for confidence-based matching - matcherConfig := &matching.EntityMatchingConfig{ - Rules: matchingConfig.GetFinalRules(), - GlobalMinConf: matching.MatchConfidence(matchingConfig.GlobalSettings.DefaultMinConfidence), - EnableFallback: true, - CacheResults: true, - MaxCacheSize: 1000, + graphServiceFactory = func() reconciler.GraphEntityUpserter { + var opts []graph.Option + if matchingConfig != nil { + opts = append(opts, graph.WithMatchingConfig(matchingConfig)) + + matcherConfig := &matching.EntityMatchingConfig{ + Rules: matchingConfig.GetFinalRules(), + GlobalMinConf: 
matching.MatchConfidence(matchingConfig.GlobalSettings.DefaultMinConfidence), + EnableFallback: true, + CacheResults: true, + MaxCacheSize: 1000, + } + entityMatcher := entitymatcher.NewMatcher(graphRepo, matcherConfig, s.Logger()) + opts = append(opts, graph.WithEntityMatcher(entityMatcher)) } - entityMatcher := entitymatcher.NewMatcher(graphRepo, matcherConfig, s.Logger()) - opts = append(opts, graph.WithEntityMatcher(entityMatcher)) + return graph.NewService(graphRepo, s.Logger(), opts...) } - graphService := graph.NewService(graphRepo, s.Logger(), opts...) - - graphServiceOpt = reconciler.WithGraphService(graphService) } diodeToNetBoxMaxRetries := 3 @@ -206,13 +204,7 @@ func main() { ops := reconciler.NewOps(repository, nbClient, s.Logger(), nil) ops.Start(ctx) - // Build processor options - var processorOpts []reconciler.ProcessorOption - if graphServiceOpt != nil { - processorOpts = append(processorOpts, graphServiceOpt) - } - - ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), cfg, redisClient, redisStreamClient, reconciler.DefaultRedisStreamID, reconciler.DefaultRedisConsumerGroup, ops, metricRecorder, processorOpts...) + ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), cfg, redisClient, redisStreamClient, reconciler.DefaultRedisStreamID, reconciler.DefaultRedisConsumerGroup, ops, metricRecorder) if err != nil { s.Logger().Error("failed to instantiate ingestion processor", "error", err) metricRecorder.RecordServiceStartupAttempt(ctx, false) @@ -260,6 +252,22 @@ func main() { } } + // Graph-upsert runs as an independent processor when the feature is + // enabled — it polls ingestion_logs for rows that haven't been mirrored + // into the graph DB yet and drains them at its own pace, so a graph-DB + // outage cannot block plan/apply or the consume loop. 
+ if graphServiceFactory != nil { + if err := repository.ResetClaimedGraphUpserts(ctx); err != nil { + s.Logger().Error("failed to reset stale graph-upsert claims", "error", err) + } + graphUpsertProcessor := reconciler.NewGraphUpsertProcessor(s.Logger(), cfg, repository, metricRecorder, graphServiceFactory, backpressure) + if err := s.RegisterComponent(graphUpsertProcessor); err != nil { + s.Logger().Error("failed to register graph upsert processor", "error", err) + metricRecorder.RecordServiceStartupAttempt(ctx, false) + os.Exit(1) + } + } + authorizer := authutil.NewContextAuthorizer(s.Logger()) gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), repository, serverInterceptors(authorizer, s.Logger())...) if err != nil { diff --git a/diode-server/dbstore/postgres/migrations/20260518000001_add_graph_upsert_tracking.sql b/diode-server/dbstore/postgres/migrations/20260518000001_add_graph_upsert_tracking.sql new file mode 100644 index 00000000..9a38b83d --- /dev/null +++ b/diode-server/dbstore/postgres/migrations/20260518000001_add_graph_upsert_tracking.sql @@ -0,0 +1,34 @@ +-- +goose Up + +-- Track graph-upsert progress per ingestion log. Graph upsert runs on a +-- separate processor (GraphUpsertProcessor) that is independent of the +-- ingestion state machine; using its own columns keeps the two concerns +-- decoupled and lets graph upsert run on rows in any state. 
+-- +-- graph_upserted_at — set when graph upsert completes successfully +-- graph_upsert_attempts — number of attempts so far (terminal at the +-- processor-configured max) +-- graph_upsert_claimed_at — soft lease for the worker that currently owns +-- the row; cleared on startup so a crashed worker +-- does not leak its claim +ALTER TABLE ingestion_logs + ADD COLUMN graph_upserted_at TIMESTAMP WITH TIME ZONE, + ADD COLUMN graph_upsert_attempts INTEGER NOT NULL DEFAULT 0, + ADD COLUMN graph_upsert_claimed_at TIMESTAMP WITH TIME ZONE; + +-- Partial index for the claim query: only un-upserted, un-claimed rows are +-- indexed. Terminal-failed rows stay in the index because its predicate does +-- not reference graph_upsert_attempts; the WHERE clause in the claim query +-- filters them out by attempt count. +CREATE INDEX IF NOT EXISTS idx_ingestion_logs_graph_upsert_pending + ON ingestion_logs (id) + WHERE graph_upserted_at IS NULL AND graph_upsert_claimed_at IS NULL; + +-- +goose Down + +DROP INDEX IF EXISTS idx_ingestion_logs_graph_upsert_pending; + +ALTER TABLE ingestion_logs + DROP COLUMN IF EXISTS graph_upsert_claimed_at, + DROP COLUMN IF EXISTS graph_upsert_attempts, + DROP COLUMN IF EXISTS graph_upserted_at; diff --git a/diode-server/dbstore/postgres/queries/ingestion_logs.sql b/diode-server/dbstore/postgres/queries/ingestion_logs.sql index 3aa7588e..a16e8d8d 100644 --- a/diode-server/dbstore/postgres/queries/ingestion_logs.sql +++ b/diode-server/dbstore/postgres/queries/ingestion_logs.sql @@ -141,3 +141,45 @@ RETURNING *; UPDATE ingestion_logs SET state = 1 WHERE state = 8; + +-- name: ClaimGraphUpsertCandidates :many +-- Claim a batch of ingestion logs that have not yet been upserted into the +-- graph DB, marking them as claimed by setting graph_upsert_claimed_at and +-- incrementing graph_upsert_attempts. Rows past max_attempts are skipped and +-- left in their terminal-failed state. 
+UPDATE ingestion_logs +SET graph_upsert_claimed_at = CURRENT_TIMESTAMP, + graph_upsert_attempts = graph_upsert_attempts + 1 +WHERE id IN ( + SELECT id FROM ingestion_logs + WHERE graph_upserted_at IS NULL + AND graph_upsert_claimed_at IS NULL + AND graph_upsert_attempts < sqlc.arg('max_attempts')::int4 + ORDER BY id + LIMIT sqlc.arg('batch_size') + FOR UPDATE SKIP LOCKED +) +RETURNING *; + +-- name: MarkGraphUpserted :exec +-- Mark a batch of ingestion logs as successfully upserted into the graph DB. +UPDATE ingestion_logs +SET graph_upserted_at = CURRENT_TIMESTAMP, + graph_upsert_claimed_at = NULL +WHERE id = ANY(@ids::int4[]); + +-- name: ReleaseGraphUpsertClaims :exec +-- Release graph-upsert claims for a batch of ingestion logs without marking +-- them as upserted. Used when an attempt fails but more retries are still +-- allowed by max_attempts. +UPDATE ingestion_logs +SET graph_upsert_claimed_at = NULL +WHERE id = ANY(@ids::int4[]); + +-- name: ResetClaimedGraphUpserts :exec +-- Clear stale graph-upsert claims on startup so rows held by a crashed +-- worker are re-claimable. Idempotent — safe to run on every startup. +UPDATE ingestion_logs +SET graph_upsert_claimed_at = NULL +WHERE graph_upsert_claimed_at IS NOT NULL + AND graph_upserted_at IS NULL; diff --git a/diode-server/dbstore/postgres/repository.go b/diode-server/dbstore/postgres/repository.go index 22fabb4b..0e5dab92 100644 --- a/diode-server/dbstore/postgres/repository.go +++ b/diode-server/dbstore/postgres/repository.go @@ -785,3 +785,62 @@ func (r *Repository) ClaimQueuedForAutoApply(ctx context.Context, batchSize int3 func (r *Repository) ResetApplyingIngestionLogs(ctx context.Context) error { return r.queries.ResetApplyingIngestionLogs(ctx) } + +// ClaimGraphUpsertCandidates claims a batch of ingestion logs that have not +// yet been upserted into the graph DB. 
Each claimed row has its claim +// timestamp set and its attempt counter incremented; the caller is then +// responsible for finalizing each claim via MarkGraphUpserted or +// ReleaseGraphUpsertClaims. +// +// SourceMetadata is populated on each returned QueuedIngestionLog so the +// graph upsert path can recover the original IngestRequest.metadata blob +// that was stashed at ingest time. +func (r *Repository) ClaimGraphUpsertCandidates(ctx context.Context, batchSize, maxAttempts int32) ([]ops.QueuedIngestionLog, error) { + dbLogs, err := r.queries.ClaimGraphUpsertCandidates(ctx, postgres.ClaimGraphUpsertCandidatesParams{ + MaxAttempts: maxAttempts, + BatchSize: batchSize, + }) + if err != nil { + return nil, fmt.Errorf("failed to claim graph-upsert candidates: %w", err) + } + + result := make([]ops.QueuedIngestionLog, 0, len(dbLogs)) + for _, dbLog := range dbLogs { + log, err := dbLog.ToProto() + if err != nil { + return nil, fmt.Errorf("failed to convert ingestion log %d to proto: %w", dbLog.ID, err) + } + result = append(result, ops.QueuedIngestionLog{ + ID: dbLog.ID, + IngestionLog: log, + SourceMetadata: dbLog.SourceMetadata, + }) + } + return result, nil +} + +// MarkGraphUpserted marks the given ingestion logs as successfully upserted +// into the graph DB and releases their claims. +func (r *Repository) MarkGraphUpserted(ctx context.Context, ids []int32) error { + if len(ids) == 0 { + return nil + } + return r.queries.MarkGraphUpserted(ctx, ids) +} + +// ReleaseGraphUpsertClaims clears the claim on a batch of ingestion logs +// without marking them as upserted, so they can be re-claimed on the next +// polling cycle (subject to max-attempts). +func (r *Repository) ReleaseGraphUpsertClaims(ctx context.Context, ids []int32) error { + if len(ids) == 0 { + return nil + } + return r.queries.ReleaseGraphUpsertClaims(ctx, ids) +} + +// ResetClaimedGraphUpserts clears stale graph-upsert claims held by workers +// that died between claiming and finalizing. 
Idempotent — safe to run on +// every startup. +func (r *Repository) ResetClaimedGraphUpserts(ctx context.Context) error { + return r.queries.ResetClaimedGraphUpserts(ctx) +} diff --git a/diode-server/docker/sample.env b/diode-server/docker/sample.env index 64c46d73..70a3ed20 100644 --- a/diode-server/docker/sample.env +++ b/diode-server/docker/sample.env @@ -48,3 +48,8 @@ INGESTION_LOG_PROCESSOR_BATCH_SIZE=50 INGESTION_LOG_PROCESSOR_CONCURRENCY=1 AUTO_APPLY_PROCESSOR_BATCH_SIZE=50 AUTO_APPLY_PROCESSOR_CONCURRENCY=1 + +# Graph upsert processor (only runs when ENABLE_GRAPH_DB=true) +# GRAPH_UPSERT_PROCESSOR_BATCH_SIZE=100 +# GRAPH_UPSERT_PROCESSOR_CONCURRENCY=1 +# GRAPH_UPSERT_PROCESSOR_MAX_ATTEMPTS=5 diff --git a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go index 3130d9da..1cce5a1b 100644 --- a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go +++ b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go @@ -62,6 +62,74 @@ func (q *Queries) BulkUpdateIngestionLogStates(ctx context.Context, arg BulkUpda return err } +const claimGraphUpsertCandidates = `-- name: ClaimGraphUpsertCandidates :many +UPDATE ingestion_logs +SET graph_upsert_claimed_at = CURRENT_TIMESTAMP, + graph_upsert_attempts = graph_upsert_attempts + 1 +WHERE id IN ( + SELECT id FROM ingestion_logs + WHERE graph_upserted_at IS NULL + AND graph_upsert_claimed_at IS NULL + AND graph_upsert_attempts < $1::int4 + ORDER BY id + LIMIT $2 + FOR UPDATE SKIP LOCKED +) +RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count, graph_upserted_at, graph_upsert_attempts, graph_upsert_claimed_at +` + +type ClaimGraphUpsertCandidatesParams struct { + MaxAttempts int32 `json:"max_attempts"` + BatchSize int32 `json:"batch_size"` +} + +// Claim a batch of 
ingestion logs that have not yet been upserted into the +// graph DB, marking them as claimed by setting graph_upsert_claimed_at and +// incrementing graph_upsert_attempts. Rows past max_attempts are skipped and +// left in their terminal-failed state. +func (q *Queries) ClaimGraphUpsertCandidates(ctx context.Context, arg ClaimGraphUpsertCandidatesParams) ([]IngestionLog, error) { + rows, err := q.db.Query(ctx, claimGraphUpsertCandidates, arg.MaxAttempts, arg.BatchSize) + if err != nil { + return nil, err + } + defer rows.Close() + var items []IngestionLog + for rows.Next() { + var i IngestionLog + if err := rows.Scan( + &i.ID, + &i.ExternalID, + &i.ObjectType, + &i.State, + &i.RequestID, + &i.IngestionTs, + &i.SourceTs, + &i.ProducerAppName, + &i.ProducerAppVersion, + &i.SdkName, + &i.SdkVersion, + &i.Entity, + &i.Error, + &i.SourceMetadata, + &i.CreatedAt, + &i.UpdatedAt, + &i.EntityHash, + &i.LastSeen, + &i.DuplicateCount, + &i.GraphUpsertedAt, + &i.GraphUpsertAttempts, + &i.GraphUpsertClaimedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const claimQueuedForAutoApply = `-- name: ClaimQueuedForAutoApply :many UPDATE ingestion_logs SET state = 8 @@ -72,7 +140,7 @@ WHERE id IN ( LIMIT $1 FOR UPDATE SKIP LOCKED ) -RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count +RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count, graph_upserted_at, graph_upsert_attempts, graph_upsert_claimed_at ` // Claim a batch of QUEUED ingestion logs for the AutoApplyProcessor (combined @@ -108,6 +176,9 
@@ func (q *Queries) ClaimQueuedForAutoApply(ctx context.Context, batchSize int32) &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.GraphUpsertedAt, + &i.GraphUpsertAttempts, + &i.GraphUpsertClaimedAt, ); err != nil { return nil, err } @@ -129,7 +200,7 @@ WHERE id IN ( LIMIT $1 FOR UPDATE SKIP LOCKED ) -RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count +RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count, graph_upserted_at, graph_upsert_attempts, graph_upsert_claimed_at ` func (q *Queries) ClaimQueuedIngestionLogs(ctx context.Context, batchSize int32) ([]IngestionLog, error) { @@ -161,6 +232,9 @@ func (q *Queries) ClaimQueuedIngestionLogs(ctx context.Context, batchSize int32) &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.GraphUpsertedAt, + &i.GraphUpsertAttempts, + &i.GraphUpsertClaimedAt, ); err != nil { return nil, err } @@ -207,7 +281,7 @@ const createIngestionLog = `-- name: CreateIngestionLog :one INSERT INTO ingestion_logs (external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, source_metadata, entity_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) -RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count +RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, 
source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count, graph_upserted_at, graph_upsert_attempts, graph_upsert_claimed_at ` type CreateIngestionLogParams struct { @@ -263,6 +337,9 @@ func (q *Queries) CreateIngestionLog(ctx context.Context, arg CreateIngestionLog &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.GraphUpsertedAt, + &i.GraphUpsertAttempts, + &i.GraphUpsertClaimedAt, ) return i, err } @@ -297,7 +374,7 @@ func (q *Queries) FindIngestionLogIDsByExternalIDs(ctx context.Context, external } const findPriorIngestionLogByEntityHash = `-- name: FindPriorIngestionLogByEntityHash :one -SELECT il.id, il.external_id, il.object_type, il.state, il.request_id, il.ingestion_ts, il.source_ts, il.producer_app_name, il.producer_app_version, il.sdk_name, il.sdk_version, il.entity, il.error, il.source_metadata, il.created_at, il.updated_at, il.entity_hash, il.last_seen, il.duplicate_count +SELECT il.id, il.external_id, il.object_type, il.state, il.request_id, il.ingestion_ts, il.source_ts, il.producer_app_name, il.producer_app_version, il.sdk_name, il.sdk_version, il.entity, il.error, il.source_metadata, il.created_at, il.updated_at, il.entity_hash, il.last_seen, il.duplicate_count, il.graph_upserted_at, il.graph_upsert_attempts, il.graph_upsert_claimed_at FROM ingestion_logs il LEFT JOIN LATERAL ( SELECT branch_id @@ -340,15 +417,18 @@ func (q *Queries) FindPriorIngestionLogByEntityHash(ctx context.Context, arg Fin &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.GraphUpsertedAt, + &i.GraphUpsertAttempts, + &i.GraphUpsertClaimedAt, ) return i, err } const findPriorIngestionLogsByEntityHashes = `-- name: FindPriorIngestionLogsByEntityHashes :many -SELECT il.id, il.external_id, il.object_type, il.state, il.request_id, il.ingestion_ts, il.source_ts, il.producer_app_name, il.producer_app_version, il.sdk_name, il.sdk_version, il.entity, il.error, il.source_metadata, il.created_at, il.updated_at, il.entity_hash, il.last_seen, il.duplicate_count 
+SELECT il.id, il.external_id, il.object_type, il.state, il.request_id, il.ingestion_ts, il.source_ts, il.producer_app_name, il.producer_app_version, il.sdk_name, il.sdk_version, il.entity, il.error, il.source_metadata, il.created_at, il.updated_at, il.entity_hash, il.last_seen, il.duplicate_count, il.graph_upserted_at, il.graph_upsert_attempts, il.graph_upsert_claimed_at FROM unnest($1::text[]) AS h(entity_hash) CROSS JOIN LATERAL ( - SELECT il2.id, il2.external_id, il2.object_type, il2.state, il2.request_id, il2.ingestion_ts, il2.source_ts, il2.producer_app_name, il2.producer_app_version, il2.sdk_name, il2.sdk_version, il2.entity, il2.error, il2.source_metadata, il2.created_at, il2.updated_at, il2.entity_hash, il2.last_seen, il2.duplicate_count + SELECT il2.id, il2.external_id, il2.object_type, il2.state, il2.request_id, il2.ingestion_ts, il2.source_ts, il2.producer_app_name, il2.producer_app_version, il2.sdk_name, il2.sdk_version, il2.entity, il2.error, il2.source_metadata, il2.created_at, il2.updated_at, il2.entity_hash, il2.last_seen, il2.duplicate_count, il2.graph_upserted_at, il2.graph_upsert_attempts, il2.graph_upsert_claimed_at FROM ingestion_logs il2 WHERE il2.entity_hash = h.entity_hash AND ( @@ -397,6 +477,9 @@ func (q *Queries) FindPriorIngestionLogsByEntityHashes(ctx context.Context, arg &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.GraphUpsertedAt, + &i.GraphUpsertAttempts, + &i.GraphUpsertClaimedAt, ); err != nil { return nil, err } @@ -420,6 +503,33 @@ func (q *Queries) IncrementDuplicateCount(ctx context.Context, id int32) error { return err } +const markGraphUpserted = `-- name: MarkGraphUpserted :exec +UPDATE ingestion_logs +SET graph_upserted_at = CURRENT_TIMESTAMP, + graph_upsert_claimed_at = NULL +WHERE id = ANY($1::int4[]) +` + +// Mark a batch of ingestion logs as successfully upserted into the graph DB. 
+func (q *Queries) MarkGraphUpserted(ctx context.Context, ids []int32) error { + _, err := q.db.Exec(ctx, markGraphUpserted, ids) + return err +} + +const releaseGraphUpsertClaims = `-- name: ReleaseGraphUpsertClaims :exec +UPDATE ingestion_logs +SET graph_upsert_claimed_at = NULL +WHERE id = ANY($1::int4[]) +` + +// Release graph-upsert claims for a batch of ingestion logs without marking +// them as upserted. Used when an attempt fails but more retries are still +// allowed by max_attempts. +func (q *Queries) ReleaseGraphUpsertClaims(ctx context.Context, ids []int32) error { + _, err := q.db.Exec(ctx, releaseGraphUpsertClaims, ids) + return err +} + const resetApplyingIngestionLogs = `-- name: ResetApplyingIngestionLogs :exec UPDATE ingestion_logs SET state = 1 @@ -433,8 +543,22 @@ func (q *Queries) ResetApplyingIngestionLogs(ctx context.Context) error { return err } +const resetClaimedGraphUpserts = `-- name: ResetClaimedGraphUpserts :exec +UPDATE ingestion_logs +SET graph_upsert_claimed_at = NULL +WHERE graph_upsert_claimed_at IS NOT NULL + AND graph_upserted_at IS NULL +` + +// Clear stale graph-upsert claims on startup so rows held by a crashed +// worker are re-claimable. Idempotent — safe to run on every startup. 
+func (q *Queries) ResetClaimedGraphUpserts(ctx context.Context) error { + _, err := q.db.Exec(ctx, resetClaimedGraphUpserts) + return err +} + const retrieveIngestionLogByExternalID = `-- name: RetrieveIngestionLogByExternalID :one -SELECT id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count +SELECT id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count, graph_upserted_at, graph_upsert_attempts, graph_upsert_claimed_at FROM ingestion_logs WHERE external_id = $1 ` @@ -462,12 +586,15 @@ func (q *Queries) RetrieveIngestionLogByExternalID(ctx context.Context, external &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.GraphUpsertedAt, + &i.GraphUpsertAttempts, + &i.GraphUpsertClaimedAt, ) return i, err } const retrieveIngestionLogs = `-- name: RetrieveIngestionLogs :many -SELECT id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count +SELECT id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count, graph_upserted_at, graph_upsert_attempts, graph_upsert_claimed_at FROM ingestion_logs WHERE (state = $1 OR $1 IS NULL) AND (object_type = $2 OR $2 IS NULL) @@ -522,6 +649,9 @@ func (q *Queries) RetrieveIngestionLogs(ctx context.Context, arg RetrieveIngesti &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.GraphUpsertedAt, + &i.GraphUpsertAttempts, + 
&i.GraphUpsertClaimedAt, ); err != nil { return nil, err } @@ -606,7 +736,7 @@ UPDATE ingestion_logs SET state = $2, error = $3 WHERE id = $1 -RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count +RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count, graph_upserted_at, graph_upsert_attempts, graph_upsert_claimed_at ` type UpdateIngestionLogStateWithErrorParams struct { diff --git a/diode-server/gen/dbstore/postgres/types.go b/diode-server/gen/dbstore/postgres/types.go index bc02935d..320d0c54 100644 --- a/diode-server/gen/dbstore/postgres/types.go +++ b/diode-server/gen/dbstore/postgres/types.go @@ -90,25 +90,28 @@ type GraphNodeSnapshotMetadatum struct { } type IngestionLog struct { - ID int32 `json:"id"` - ExternalID string `json:"external_id"` - ObjectType pgtype.Text `json:"object_type"` - State pgtype.Int4 `json:"state"` - RequestID pgtype.Text `json:"request_id"` - IngestionTs pgtype.Int8 `json:"ingestion_ts"` - SourceTs pgtype.Int8 `json:"source_ts"` - ProducerAppName pgtype.Text `json:"producer_app_name"` - ProducerAppVersion pgtype.Text `json:"producer_app_version"` - SdkName pgtype.Text `json:"sdk_name"` - SdkVersion pgtype.Text `json:"sdk_version"` - Entity []byte `json:"entity"` - Error json.RawMessage `json:"error"` - SourceMetadata []byte `json:"source_metadata"` - CreatedAt pgtype.Timestamptz `json:"created_at"` - UpdatedAt pgtype.Timestamptz `json:"updated_at"` - EntityHash pgtype.Text `json:"entity_hash"` - LastSeen pgtype.Timestamptz `json:"last_seen"` - DuplicateCount int32 `json:"duplicate_count"` + ID int32 `json:"id"` + ExternalID string `json:"external_id"` + 
ObjectType pgtype.Text `json:"object_type"` + State pgtype.Int4 `json:"state"` + RequestID pgtype.Text `json:"request_id"` + IngestionTs pgtype.Int8 `json:"ingestion_ts"` + SourceTs pgtype.Int8 `json:"source_ts"` + ProducerAppName pgtype.Text `json:"producer_app_name"` + ProducerAppVersion pgtype.Text `json:"producer_app_version"` + SdkName pgtype.Text `json:"sdk_name"` + SdkVersion pgtype.Text `json:"sdk_version"` + Entity []byte `json:"entity"` + Error json.RawMessage `json:"error"` + SourceMetadata []byte `json:"source_metadata"` + CreatedAt pgtype.Timestamptz `json:"created_at"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` + EntityHash pgtype.Text `json:"entity_hash"` + LastSeen pgtype.Timestamptz `json:"last_seen"` + DuplicateCount int32 `json:"duplicate_count"` + GraphUpsertedAt pgtype.Timestamptz `json:"graph_upserted_at"` + GraphUpsertAttempts int32 `json:"graph_upsert_attempts"` + GraphUpsertClaimedAt pgtype.Timestamptz `json:"graph_upsert_claimed_at"` } type VDeviation struct { diff --git a/diode-server/reconciler/config.go b/diode-server/reconciler/config.go index 87fd8056..0fa76abc 100644 --- a/diode-server/reconciler/config.go +++ b/diode-server/reconciler/config.go @@ -42,6 +42,10 @@ type Config struct { AutoApplyProcessorBatchSize int32 `envconfig:"AUTO_APPLY_PROCESSOR_BATCH_SIZE" default:"100"` AutoApplyProcessorConcurrency int `envconfig:"AUTO_APPLY_PROCESSOR_CONCURRENCY" default:"1"` + GraphUpsertProcessorBatchSize int32 `envconfig:"GRAPH_UPSERT_PROCESSOR_BATCH_SIZE" default:"100"` + GraphUpsertProcessorConcurrency int `envconfig:"GRAPH_UPSERT_PROCESSOR_CONCURRENCY" default:"1"` + GraphUpsertProcessorMaxAttempts int32 `envconfig:"GRAPH_UPSERT_PROCESSOR_MAX_ATTEMPTS" default:"5"` + // Experimental EnableGraphDB bool `envconfig:"ENABLE_GRAPH_DB" default:"false"` diff --git a/diode-server/reconciler/graph_upsert_processor.go b/diode-server/reconciler/graph_upsert_processor.go new file mode 100644 index 00000000..82d8cc1a --- /dev/null +++ 
b/diode-server/reconciler/graph_upsert_processor.go @@ -0,0 +1,246 @@ +package reconciler + +import ( + "context" + "encoding/json" + "log/slog" + "sync" + "time" + + "go.opentelemetry.io/otel/attribute" + + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" + "github.com/netboxlabs/diode/diode-server/graph" + "github.com/netboxlabs/diode/diode-server/reconciler/ops" + "github.com/netboxlabs/diode/diode-server/telemetry" +) + +const ( + defaultGraphUpsertPollInterval = 100 * time.Millisecond + defaultGraphUpsertIdleInterval = time.Second + defaultGraphUpsertBatchSize = 100 + defaultGraphUpsertMaxAttempts = 5 +) + +// GraphEntityUpserter is the subset of *graph.Service that the +// GraphUpsertProcessor depends on. It exists so the processor can be tested +// with a small mock rather than wiring up a real graph repository. +// +// Implementations are not required to be safe for concurrent use; the +// processor builds a fresh instance per worker via GraphServiceFactory to +// match graph.Service's documented serialization requirement. +type GraphEntityUpserter interface { + UpsertEntity(ctx context.Context, entity *diodepb.Entity, requestMetadata ...map[string]any) (*graph.Node, error) +} + +// GraphServiceFactory returns a GraphEntityUpserter for a single worker +// goroutine. graph.Service holds per-call mutable state on the receiver and +// is documented as not safe for concurrent use, so the processor calls the +// factory once per worker on startup. +type GraphServiceFactory func() GraphEntityUpserter + +// GraphUpsertProcessor moves entities from ingestion_logs into the graph +// database. It runs independently of the ingestion state machine — graph +// rows are tracked via the graph_upserted_at/graph_upsert_attempts columns +// rather than the ingestion state column, so plan and apply continue to +// drain the inbox even if the graph DB is unavailable. 
+// +// On crash mid-batch, rows with a non-null graph_upsert_claimed_at are +// returned to the pool at startup by ResetClaimedGraphUpserts so the next +// worker iteration picks them up. +type GraphUpsertProcessor struct { + config Config + logger *slog.Logger + repo Repository + metrics Metrics + serviceFactory GraphServiceFactory + backpressure BackpressureFunc + cancel context.CancelFunc + mx sync.Mutex + batchSize int32 + maxAttempts int32 +} + +// NewGraphUpsertProcessor creates a new graph-upsert processor. +// +// serviceFactory must return a fresh GraphEntityUpserter per call — each +// worker goroutine builds its own to avoid contending on the per-call +// mutable state held inside graph.Service. +// +// backpressure may be nil; when supplied, the poll loop yields one idle +// interval while the condition holds, matching the existing processors. +func NewGraphUpsertProcessor(logger *slog.Logger, cfg Config, repo Repository, metrics Metrics, serviceFactory GraphServiceFactory, backpressure BackpressureFunc) *GraphUpsertProcessor { + batchSize := cfg.GraphUpsertProcessorBatchSize + if batchSize <= 0 { + batchSize = defaultGraphUpsertBatchSize + } + maxAttempts := cfg.GraphUpsertProcessorMaxAttempts + if maxAttempts <= 0 { + maxAttempts = defaultGraphUpsertMaxAttempts + } + return &GraphUpsertProcessor{ + config: cfg, + logger: logger, + repo: repo, + metrics: metrics, + serviceFactory: serviceFactory, + backpressure: backpressure, + batchSize: batchSize, + maxAttempts: maxAttempts, + } +} + +// Name returns the name of the component. +func (p *GraphUpsertProcessor) Name() string { + return "reconciler-graph-upsert-processor" +} + +// Start begins polling for un-upserted ingestion logs and processing them. 
+func (p *GraphUpsertProcessor) Start(ctx context.Context) error {
+	p.logger.Info("starting component", "name", p.Name())
+
+	ctx, cancel := context.WithCancel(ctx)
+	// Make sure the derived context is released however the loop exits;
+	// calling cancel a second time after Stop is a harmless no-op.
+	defer cancel()
+
+	// Publish cancel under the mutex so Stop can read it safely.
+	p.mx.Lock()
+	p.cancel = cancel
+	p.mx.Unlock()
+
+	return p.pollLoop(ctx)
+}
+
+// Stop stops the graph-upsert processor by cancelling the context created in
+// Start. It is safe to call before Start or more than once; it always
+// returns nil.
+func (p *GraphUpsertProcessor) Stop() error {
+	p.logger.Info("stopping component", "name", p.Name())
+
+	p.mx.Lock()
+	defer p.mx.Unlock()
+	if p.cancel != nil {
+		p.cancel()
+		p.cancel = nil
+	}
+	return nil
+}
+
+// pollLoop runs max(GraphUpsertProcessorConcurrency, 1) poll workers and
+// returns nil once all of them have exited. p.serviceFactory is called once
+// per worker, so each worker gets whatever upserter the factory hands out for
+// that call.
+func (p *GraphUpsertProcessor) pollLoop(ctx context.Context) error {
+	concurrency := max(p.config.GraphUpsertProcessorConcurrency, 1)
+
+	if concurrency == 1 {
+		// Single worker: run it on the calling goroutine.
+		p.pollWorker(ctx, p.serviceFactory())
+		return nil
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(concurrency)
+	for range concurrency {
+		go func() {
+			defer wg.Done()
+			p.pollWorker(ctx, p.serviceFactory())
+		}()
+	}
+	wg.Wait()
+	return nil
+}
+
+// graphUpsertWait blocks for d or until ctx is done, whichever comes first.
+// It reports true when the full delay elapsed and false when ctx ended the
+// wait, in which case the caller should stop polling.
+func graphUpsertWait(ctx context.Context, d time.Duration) bool {
+	timer := time.NewTimer(d)
+	defer timer.Stop()
+
+	select {
+	case <-ctx.Done():
+		return false
+	case <-timer.C:
+		return true
+	}
+}
+
+// pollWorker claims batches of ingestion logs and hands them to processBatch
+// until ctx is done. It idles for defaultGraphUpsertIdleInterval while the
+// backpressure callback reports true, after a failed claim, and when nothing
+// was claimed; after a batch smaller than p.batchSize it pauses for
+// defaultGraphUpsertPollInterval. A full batch is followed immediately by the
+// next claim.
+func (p *GraphUpsertProcessor) pollWorker(ctx context.Context, svc GraphEntityUpserter) {
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+
+		if p.backpressure != nil && p.backpressure(ctx) {
+			if !graphUpsertWait(ctx, defaultGraphUpsertIdleInterval) {
+				return
+			}
+			continue
+		}
+
+		batch, err := p.repo.ClaimGraphUpsertCandidates(ctx, p.batchSize, p.maxAttempts)
+		if err != nil {
+			if ctx.Err() != nil {
+				// The claim lost a race with shutdown; that is not an
+				// operational error worth an Error-level log line.
+				return
+			}
+			p.logger.Error("failed to claim graph-upsert candidates", "error", err)
+			if !graphUpsertWait(ctx, defaultGraphUpsertIdleInterval) {
+				return
+			}
+			continue
+		}
+
+		if len(batch) == 0 {
+			if !graphUpsertWait(ctx, defaultGraphUpsertIdleInterval) {
+				return
+			}
+			continue
+		}
+
+		p.processBatch(ctx, svc, batch)
+
+		// A short batch means the backlog is drained for now; back off briefly.
+		if len(batch) < int(p.batchSize) {
+			if !graphUpsertWait(ctx, defaultGraphUpsertPollInterval) {
+				return
+			}
+		}
+	}
+}
+
+// processBatch upserts every claimed row through svc and then records the
+// outcome: rows that succeeded (or carry no entity) are passed to
+// MarkGraphUpserted, rows whose upsert failed are passed to
+// ReleaseGraphUpsertClaims. Each upsert attempt is timed and reported through
+// p.metrics.RecordGraphUpsert. Errors are logged, never returned.
+func (p *GraphUpsertProcessor) processBatch(ctx context.Context, svc GraphEntityUpserter, batch []ops.QueuedIngestionLog) {
+	// Upper bound for the bookkeeping writes at the end, which deliberately
+	// ignore cancellation of ctx.
+	const bookkeepingTimeout = 10 * time.Second
+
+	successIDs := make([]int32, 0, len(batch))
+	var failureIDs []int32
+
+	for _, item := range batch {
+		entity := item.IngestionLog.GetEntity()
+		if entity == nil {
+			// Nothing to mirror; count the row as done so it is not claimed again.
+			p.logger.Warn("graph upsert skipped: ingestion log has nil entity", "ingestion_log_id", item.ID)
+			successIDs = append(successIDs, item.ID)
+			continue
+		}
+
+		// Request metadata is optional: a blob that does not decode is
+		// logged and dropped instead of failing the row.
+		var reqMeta map[string]any
+		if len(item.SourceMetadata) > 0 {
+			if err := json.Unmarshal(item.SourceMetadata, &reqMeta); err != nil {
+				p.logger.Warn("failed to unmarshal request metadata; continuing without it",
+					"error", err,
+					"ingestion_log_id", item.ID)
+				reqMeta = nil
+			}
+		}
+
+		nodeType := item.IngestionLog.GetObjectType()
+		start := time.Now()
+
+		var err error
+		if reqMeta != nil {
+			_, err = svc.UpsertEntity(ctx, entity, reqMeta)
+		} else {
+			_, err = svc.UpsertEntity(ctx, entity)
+		}
+		duration := time.Since(start).Seconds()
+
+		attrs := []attribute.KeyValue{
+			attribute.String(telemetry.AttributeSDKName, item.IngestionLog.GetSdkName()),
+			attribute.String(telemetry.AttributeProducerAppName, item.IngestionLog.GetProducerAppName()),
+		}
+		metricsCtx := telemetry.ContextWithMetricAttributes(ctx, attrs...)
+
+		if err != nil {
+			p.logger.Warn("graph upsert failed",
+				"error", err,
+				"ingestion_log_id", item.ID,
+				"object_type", nodeType)
+			p.metrics.RecordGraphUpsert(metricsCtx, false, nodeType, duration)
+			failureIDs = append(failureIDs, item.ID)
+			continue
+		}
+
+		p.metrics.RecordGraphUpsert(metricsCtx, true, nodeType, duration)
+		successIDs = append(successIDs, item.ID)
+	}
+
+	// Record outcomes on a context that survives cancellation of ctx. If the
+	// processor is stopped mid-batch, the work already done must still be
+	// written down; otherwise finished rows are upserted a second time and
+	// failed rows stay claimed until the reset cycle picks them up. The
+	// timeout keeps shutdown from blocking indefinitely on the database.
+	bookCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), bookkeepingTimeout)
+	defer cancel()
+
+	if len(successIDs) > 0 {
+		if err := p.repo.MarkGraphUpserted(bookCtx, successIDs); err != nil {
+			p.logger.Error("failed to mark graph-upserted rows; they will be re-claimed", "error", err, "ids", len(successIDs))
+		}
+	}
+	if len(failureIDs) > 0 {
+		if err := p.repo.ReleaseGraphUpsertClaims(bookCtx, failureIDs); err != nil {
+			p.logger.Error("failed to release graph-upsert claims; rows will recover on the next ResetClaimedGraphUpserts cycle", "error", err, "ids", len(failureIDs))
+		}
+	}
+}
diff --git a/diode-server/reconciler/graph_upsert_processor_test.go b/diode-server/reconciler/graph_upsert_processor_test.go
new file mode 100644
index 00000000..3daa7363
--- /dev/null
+++ b/diode-server/reconciler/graph_upsert_processor_test.go
@@ -0,0 +1,452 @@
+package reconciler_test
+
+import (
+	"context"
+	"errors"
+	"log/slog"
+	"os"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+
+	"github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb"
+	"github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb"
+	"github.com/netboxlabs/diode/diode-server/graph"
+	"github.com/netboxlabs/diode/diode-server/reconciler"
+	"github.com/netboxlabs/diode/diode-server/reconciler/mocks"
+	"github.com/netboxlabs/diode/diode-server/reconciler/ops"
+)
+
+// fakeUpserter is a minimal GraphEntityUpserter that returns canned responses
+// per call. It tracks how many times it was invoked so tests can assert the
+// processor walked the full batch.
+type fakeUpserter struct {
+	// mu guards every field below; UpsertEntity may be called from several workers.
+	mu sync.Mutex
+	// results holds the canned error for each call index; calls past the end succeed.
+	results []error
+	// calls counts UpsertEntity invocations.
+	calls int
+	// entity records the entity argument of each call, in call order.
+	entity []*diodepb.Entity
+	// reqMeta records the first metadata argument of each call, nil when none was passed.
+	reqMeta []map[string]any
+}
+
+// UpsertEntity records its arguments and returns the canned result for this
+// call index: a nil node plus the configured error, or an empty node.
+func (f *fakeUpserter) UpsertEntity(_ context.Context, entity *diodepb.Entity, requestMetadata ...map[string]any) (*graph.Node, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.entity = append(f.entity, entity)
+	if len(requestMetadata) > 0 {
+		f.reqMeta = append(f.reqMeta, requestMetadata[0])
+	} else {
+		f.reqMeta = append(f.reqMeta, nil)
+	}
+	idx := f.calls
+	f.calls++
+	if idx < len(f.results) {
+		if err := f.results[idx]; err != nil {
+			return nil, err
+		}
+	}
+	return &graph.Node{}, nil
+}
+
+// callCount returns how many times UpsertEntity has been called.
+func (f *fakeUpserter) callCount() int {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	return f.calls
+}
+
+// recordedMetadata returns a copy of the per-call metadata arguments.
+func (f *fakeUpserter) recordedMetadata() []map[string]any {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	out := make([]map[string]any, len(f.reqMeta))
+	copy(out, f.reqMeta)
+	return out
+}
+
+// graphUpsertTestTimeout bounds every wait in this file so a regression shows
+// up as a targeted failure instead of a hung test binary.
+const graphUpsertTestTimeout = 5 * time.Second
+
+// notifyGraphUpsert returns a mock Run hook that performs a non-blocking send
+// on ch, so repeated invocations never stall the processor under test.
+func notifyGraphUpsert(ch chan<- struct{}) func(mock.Arguments) {
+	return func(mock.Arguments) {
+		select {
+		case ch <- struct{}{}:
+		default:
+		}
+	}
+}
+
+// awaitGraphUpsertSignal fails the test unless ch delivers a value within
+// graphUpsertTestTimeout. what names the awaited event in the failure message.
+func awaitGraphUpsertSignal(t *testing.T, ch <-chan struct{}, what string) {
+	t.Helper()
+	select {
+	case <-ch:
+	case <-time.After(graphUpsertTestTimeout):
+		t.Fatalf("timed out waiting for %s", what)
+	}
+}
+
+// awaitGraphUpsertExit fails the test unless Start returns nil within
+// graphUpsertTestTimeout.
+func awaitGraphUpsertExit(t *testing.T, done <-chan error) {
+	t.Helper()
+	select {
+	case err := <-done:
+		require.NoError(t, err)
+	case <-time.After(graphUpsertTestTimeout):
+		t.Fatal("processor did not exit")
+	}
+}
+
+// runGraphUpsertProcessor invokes start on its own goroutine with a
+// cancellable context and returns the cancel function together with a channel
+// that receives start's result.
+func runGraphUpsertProcessor(start func(context.Context) error) (context.CancelFunc, <-chan error) {
+	ctx, cancel := context.WithCancel(context.Background())
+	done := make(chan error, 1)
+	go func() { done <- start(ctx) }()
+	return cancel, done
+}
+
+func newGraphUpsertTestLogger() *slog.Logger {
+	return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
+}
+
+// newGraphUpsertCandidate builds a claimed row wrapping a Site entity named
+// after objectType, with the given raw source metadata.
+func newGraphUpsertCandidate(id int32, objectType string, sourceMetadata []byte) ops.QueuedIngestionLog {
+	return ops.QueuedIngestionLog{
+		ID: id,
+		IngestionLog: &reconcilerpb.IngestionLog{
+			ObjectType:      objectType,
+			Entity:          &diodepb.Entity{Entity: &diodepb.Entity_Site{Site: &diodepb.Site{Name: "site-" + objectType}}},
+			SdkName:         "test-sdk",
+			ProducerAppName: "test-app",
+		},
+		SourceMetadata: sourceMetadata,
+	}
+}
+
+func TestGraphUpsertProcessor_Name(t *testing.T) {
+	factory := func() reconciler.GraphEntityUpserter { return &fakeUpserter{} }
+	p := reconciler.NewGraphUpsertProcessor(newGraphUpsertTestLogger(), reconciler.Config{}, nil, nil, factory, nil)
+	assert.Equal(t, "reconciler-graph-upsert-processor", p.Name())
+}
+
+func TestGraphUpsertProcessor_StartStop(t *testing.T) {
+	repo := mocks.NewRepository(t)
+	mockMetrics := mocks.NewMetrics(t)
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, mock.Anything, mock.Anything).
+		Return([]ops.QueuedIngestionLog{}, nil).Maybe()
+
+	factory := func() reconciler.GraphEntityUpserter { return &fakeUpserter{} }
+	p := reconciler.NewGraphUpsertProcessor(newGraphUpsertTestLogger(), reconciler.Config{}, repo, mockMetrics, factory, nil)
+
+	cancel, done := runGraphUpsertProcessor(p.Start)
+	defer cancel()
+
+	time.Sleep(50 * time.Millisecond)
+	cancel()
+
+	awaitGraphUpsertExit(t, done)
+}
+
+func TestGraphUpsertProcessor_StopViaMethod(t *testing.T) {
+	repo := mocks.NewRepository(t)
+	mockMetrics := mocks.NewMetrics(t)
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, mock.Anything, mock.Anything).
+		Return([]ops.QueuedIngestionLog{}, nil).Maybe()
+
+	factory := func() reconciler.GraphEntityUpserter { return &fakeUpserter{} }
+	p := reconciler.NewGraphUpsertProcessor(newGraphUpsertTestLogger(), reconciler.Config{}, repo, mockMetrics, factory, nil)
+
+	// The parent context is only cancelled during cleanup: the exit asserted
+	// below has to come from Stop.
+	cancel, done := runGraphUpsertProcessor(p.Start)
+	defer cancel()
+
+	time.Sleep(50 * time.Millisecond)
+	require.NoError(t, p.Stop())
+
+	awaitGraphUpsertExit(t, done)
+}
+
+func TestGraphUpsertProcessor_SuccessfulBatchMarksAllRows(t *testing.T) {
+	repo := mocks.NewRepository(t)
+	mockMetrics := mocks.NewMetrics(t)
+
+	batch := []ops.QueuedIngestionLog{
+		newGraphUpsertCandidate(1, "dcim.site", []byte(`{"run_id":"abc"}`)),
+		newGraphUpsertCandidate(2, "dcim.site", []byte(`{"run_id":"abc"}`)),
+	}
+
+	claimed := make(chan struct{}, 1)
+	marked := make(chan struct{}, 1)
+
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, int32(100), int32(5)).
+		Return(batch, nil).Once().Run(notifyGraphUpsert(claimed))
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, int32(100), int32(5)).
+		Return([]ops.QueuedIngestionLog{}, nil).Maybe()
+
+	repo.On("MarkGraphUpserted", mock.Anything, []int32{1, 2}).Return(nil).Once().Run(notifyGraphUpsert(marked))
+	mockMetrics.On("RecordGraphUpsert", mock.Anything, true, "dcim.site", mock.AnythingOfType("float64")).Times(2)
+
+	upserter := &fakeUpserter{}
+	factory := func() reconciler.GraphEntityUpserter { return upserter }
+	p := reconciler.NewGraphUpsertProcessor(newGraphUpsertTestLogger(), reconciler.Config{}, repo, mockMetrics, factory, nil)
+
+	cancel, done := runGraphUpsertProcessor(p.Start)
+	defer cancel()
+
+	awaitGraphUpsertSignal(t, claimed, "the batch to be claimed")
+	awaitGraphUpsertSignal(t, marked, "the batch to be marked upserted")
+
+	cancel()
+	awaitGraphUpsertExit(t, done)
+
+	assert.Equal(t, 2, upserter.callCount(), "upserter should have been called for each row")
+	// Verify request metadata was unmarshaled and passed through.
+	for _, meta := range upserter.recordedMetadata() {
+		require.NotNil(t, meta)
+		assert.Equal(t, "abc", meta["run_id"])
+	}
+
+	repo.AssertExpectations(t)
+	repo.AssertNotCalled(t, "ReleaseGraphUpsertClaims", mock.Anything, mock.Anything)
+}
+
+func TestGraphUpsertProcessor_FailedRowsReleaseClaims(t *testing.T) {
+	repo := mocks.NewRepository(t)
+	mockMetrics := mocks.NewMetrics(t)
+
+	batch := []ops.QueuedIngestionLog{
+		newGraphUpsertCandidate(10, "dcim.site", nil),
+		newGraphUpsertCandidate(11, "dcim.site", nil),
+		newGraphUpsertCandidate(12, "dcim.site", nil),
+	}
+
+	claimed := make(chan struct{}, 1)
+	mixed := make(chan struct{}, 1)
+
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, int32(100), int32(5)).
+		Return(batch, nil).Once().Run(notifyGraphUpsert(claimed))
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, int32(100), int32(5)).
+		Return([]ops.QueuedIngestionLog{}, nil).Maybe()
+
+	// Row 11 fails; rows 10 and 12 succeed.
+	repo.On("MarkGraphUpserted", mock.Anything, []int32{10, 12}).Return(nil).Once()
+	repo.On("ReleaseGraphUpsertClaims", mock.Anything, []int32{11}).Return(nil).Once().Run(notifyGraphUpsert(mixed))
+
+	mockMetrics.On("RecordGraphUpsert", mock.Anything, true, "dcim.site", mock.AnythingOfType("float64")).Times(2)
+	mockMetrics.On("RecordGraphUpsert", mock.Anything, false, "dcim.site", mock.AnythingOfType("float64")).Once()
+
+	upserter := &fakeUpserter{results: []error{nil, errors.New("graph DB unreachable"), nil}}
+	factory := func() reconciler.GraphEntityUpserter { return upserter }
+	p := reconciler.NewGraphUpsertProcessor(newGraphUpsertTestLogger(), reconciler.Config{}, repo, mockMetrics, factory, nil)
+
+	cancel, done := runGraphUpsertProcessor(p.Start)
+	defer cancel()
+
+	awaitGraphUpsertSignal(t, claimed, "the batch to be claimed")
+	awaitGraphUpsertSignal(t, mixed, "the failed row's claim to be released")
+
+	cancel()
+	awaitGraphUpsertExit(t, done)
+
+	repo.AssertExpectations(t)
+	mockMetrics.AssertExpectations(t)
+}
+
+func TestGraphUpsertProcessor_NilEntityIsTreatedAsSuccess(t *testing.T) {
+	repo := mocks.NewRepository(t)
+	mockMetrics := mocks.NewMetrics(t)
+
+	batch := []ops.QueuedIngestionLog{{
+		ID:           42,
+		IngestionLog: &reconcilerpb.IngestionLog{ObjectType: "dcim.site"},
+	}}
+
+	claimed := make(chan struct{}, 1)
+	marked := make(chan struct{}, 1)
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, int32(100), int32(5)).
+		Return(batch, nil).Once().Run(notifyGraphUpsert(claimed))
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, int32(100), int32(5)).
+		Return([]ops.QueuedIngestionLog{}, nil).Maybe()
+	repo.On("MarkGraphUpserted", mock.Anything, []int32{42}).Return(nil).Once().Run(notifyGraphUpsert(marked))
+
+	upserter := &fakeUpserter{}
+	factory := func() reconciler.GraphEntityUpserter { return upserter }
+	p := reconciler.NewGraphUpsertProcessor(newGraphUpsertTestLogger(), reconciler.Config{}, repo, mockMetrics, factory, nil)
+
+	cancel, done := runGraphUpsertProcessor(p.Start)
+	defer cancel()
+
+	awaitGraphUpsertSignal(t, claimed, "the batch to be claimed")
+	// Wait for the bookkeeping call itself instead of sleeping: it is the
+	// last step of batch processing, so the assertions below see the final
+	// state.
+	awaitGraphUpsertSignal(t, marked, "the nil-entity row to be marked upserted")
+
+	cancel()
+	awaitGraphUpsertExit(t, done)
+
+	assert.Equal(t, 0, upserter.callCount(), "nil entity must skip the graph service entirely")
+	repo.AssertNotCalled(t, "ReleaseGraphUpsertClaims", mock.Anything, mock.Anything)
+}
+
+func TestGraphUpsertProcessor_ClaimErrorIsRetried(t *testing.T) {
+	repo := mocks.NewRepository(t)
+	mockMetrics := mocks.NewMetrics(t)
+
+	errCh := make(chan struct{}, 1)
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, int32(100), int32(5)).
+		Return(nil, errors.New("db down")).Once().Run(notifyGraphUpsert(errCh))
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, int32(100), int32(5)).
+		Return([]ops.QueuedIngestionLog{}, nil).Maybe()
+
+	factory := func() reconciler.GraphEntityUpserter { return &fakeUpserter{} }
+	p := reconciler.NewGraphUpsertProcessor(newGraphUpsertTestLogger(), reconciler.Config{}, repo, mockMetrics, factory, nil)
+
+	cancel, done := runGraphUpsertProcessor(p.Start)
+	defer cancel()
+
+	awaitGraphUpsertSignal(t, errCh, "the failing claim")
+	time.Sleep(50 * time.Millisecond)
+	cancel()
+
+	awaitGraphUpsertExit(t, done)
+}
+
+func TestGraphUpsertProcessor_BackpressureSkipsProcessing(t *testing.T) {
+	repo := mocks.NewRepository(t)
+	mockMetrics := mocks.NewMetrics(t)
+
+	var backpressureActive atomic.Bool
+	backpressureActive.Store(true)
+	backpressure := func(_ context.Context) bool { return backpressureActive.Load() }
+
+	factory := func() reconciler.GraphEntityUpserter { return &fakeUpserter{} }
+	p := reconciler.NewGraphUpsertProcessor(newGraphUpsertTestLogger(), reconciler.Config{}, repo, mockMetrics, factory, backpressure)
+
+	cancel, done := runGraphUpsertProcessor(p.Start)
+	defer cancel()
+
+	// Negative assertion: give the worker a window in which it could have claimed.
+	time.Sleep(200 * time.Millisecond)
+	cancel()
+	awaitGraphUpsertExit(t, done)
+
+	repo.AssertNotCalled(t, "ClaimGraphUpsertCandidates", mock.Anything, mock.Anything, mock.Anything)
+}
+
+func TestGraphUpsertProcessor_BackpressureReleasedResumesProcessing(t *testing.T) {
+	repo := mocks.NewRepository(t)
+	mockMetrics := mocks.NewMetrics(t)
+
+	batch := []ops.QueuedIngestionLog{newGraphUpsertCandidate(1, "dcim.site", nil)}
+
+	var backpressureActive atomic.Bool
+	backpressureActive.Store(true)
+	backpressure := func(_ context.Context) bool { return backpressureActive.Load() }
+
+	claimed := make(chan struct{}, 1)
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, int32(100), int32(5)).
+		Return(batch, nil).Once().Run(notifyGraphUpsert(claimed))
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, int32(100), int32(5)).
+		Return([]ops.QueuedIngestionLog{}, nil).Maybe()
+	repo.On("MarkGraphUpserted", mock.Anything, []int32{1}).Return(nil).Maybe()
+	mockMetrics.On("RecordGraphUpsert", mock.Anything, true, "dcim.site", mock.AnythingOfType("float64")).Maybe()
+
+	factory := func() reconciler.GraphEntityUpserter { return &fakeUpserter{} }
+	p := reconciler.NewGraphUpsertProcessor(newGraphUpsertTestLogger(), reconciler.Config{}, repo, mockMetrics, factory, backpressure)
+
+	cancel, done := runGraphUpsertProcessor(p.Start)
+	defer cancel()
+
+	time.Sleep(100 * time.Millisecond)
+	repo.AssertNotCalled(t, "ClaimGraphUpsertCandidates", mock.Anything, mock.Anything, mock.Anything)
+
+	backpressureActive.Store(false)
+
+	awaitGraphUpsertSignal(t, claimed, "processing to resume after backpressure released")
+
+	cancel()
+	awaitGraphUpsertExit(t, done)
+}
+
+func TestGraphUpsertProcessor_PerWorkerFactoryIsInvokedOncePerWorker(t *testing.T) {
+	repo := mocks.NewRepository(t)
+	mockMetrics := mocks.NewMetrics(t)
+
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, int32(100), int32(5)).
+		Return([]ops.QueuedIngestionLog{}, nil).Maybe()
+
+	var factoryCalls atomic.Int32
+	factory := func() reconciler.GraphEntityUpserter {
+		factoryCalls.Add(1)
+		return &fakeUpserter{}
+	}
+
+	cfg := reconciler.Config{GraphUpsertProcessorConcurrency: 3}
+	p := reconciler.NewGraphUpsertProcessor(newGraphUpsertTestLogger(), cfg, repo, mockMetrics, factory, nil)
+
+	cancel, done := runGraphUpsertProcessor(p.Start)
+	defer cancel()
+
+	// Give all three workers a chance to spin up before cancelling.
+	assert.Eventually(t, func() bool { return factoryCalls.Load() == 3 }, 2*time.Second, 20*time.Millisecond,
+		"factory should be invoked exactly once per worker — graph.Service is not safe for concurrent use")
+
+	cancel()
+	awaitGraphUpsertExit(t, done)
+}
+
+func TestGraphUpsertProcessor_DefaultsFillUnsetConfigValues(t *testing.T) {
+	repo := mocks.NewRepository(t)
+	mockMetrics := mocks.NewMetrics(t)
+
+	// With BatchSize/MaxAttempts unset, the processor must fall back to the
+	// package defaults — 100/5 — when issuing claim queries.
+	claimed := make(chan struct{}, 1)
+	repo.On("ClaimGraphUpsertCandidates", mock.Anything, int32(100), int32(5)).
+		Return([]ops.QueuedIngestionLog{}, nil).Maybe().Run(notifyGraphUpsert(claimed))
+
+	factory := func() reconciler.GraphEntityUpserter { return &fakeUpserter{} }
+	p := reconciler.NewGraphUpsertProcessor(newGraphUpsertTestLogger(), reconciler.Config{}, repo, mockMetrics, factory, nil)
+
+	cancel, done := runGraphUpsertProcessor(p.Start)
+	defer cancel()
+
+	awaitGraphUpsertSignal(t, claimed, "a claim issued with the default arguments")
+
+	cancel()
+	awaitGraphUpsertExit(t, done)
+}
diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go
index d3df36a2..81892131 100644
--- a/diode-server/reconciler/ingestion_processor.go
+++ b/diode-server/reconciler/ingestion_processor.go
@@ -3,6 +3,7 @@ package reconciler
 import (
 	"bytes"
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -24,7 +25,6 @@ import (
 	"github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb"
 	"github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb"
 	"github.com/netboxlabs/diode/diode-server/gen/netbox"
-	"github.com/netboxlabs/diode/diode-server/graph"
 	"github.com/netboxlabs/diode/diode-server/netboxdiodeplugin"
 	"github.com/netboxlabs/diode/diode-server/reconciler/ops"
"github.com/netboxlabs/diode/diode-server/sentry" @@ -72,7 +72,6 @@ type IngestionProcessor struct { metrics Metrics cancel context.CancelFunc mx sync.Mutex - graphService *graph.Service // nil when ENABLE_GRAPH_DB is false } // IngestionProcessorOps represents the basic operations that the ingestion processor performs @@ -85,26 +84,14 @@ type IngestionProcessorOps interface { RefreshDefaultBranch(ctx context.Context) (*netboxdiodeplugin.Branch, error) } -// ProcessorOption is a functional option for configuring IngestionProcessor -type ProcessorOption func(*IngestionProcessor) - -// WithGraphService sets the graph.Service for graph-based entity extraction. -// When set, entities are also stored in the graph database for relationship tracking. -// Pass nil to disable graph extraction. -func WithGraphService(svc *graph.Service) ProcessorOption { - return func(p *IngestionProcessor) { - p.graphService = svc - } -} - // NewIngestionProcessor creates a new ingestion processor -func NewIngestionProcessor(_ context.Context, logger *slog.Logger, cfg Config, redisClient, redisStreamClient RedisClient, redisStreamID string, redisConsumerGroup string, ops IngestionProcessorOps, metrics Metrics, opts ...ProcessorOption) (*IngestionProcessor, error) { +func NewIngestionProcessor(_ context.Context, logger *slog.Logger, cfg Config, redisClient, redisStreamClient RedisClient, redisStreamID string, redisConsumerGroup string, ops IngestionProcessorOps, metrics Metrics) (*IngestionProcessor, error) { hostname, err := os.Hostname() if err != nil { return nil, fmt.Errorf("failed to get hostname: %v", err) } - component := &IngestionProcessor{ + return &IngestionProcessor{ Config: cfg, logger: logger, hostname: hostname, @@ -114,14 +101,7 @@ func NewIngestionProcessor(_ context.Context, logger *slog.Logger, cfg Config, r redisConsumerGroup: redisConsumerGroup, ops: ops, metrics: metrics, - } - - // Apply functional options - for _, opt := range opts { - opt(component) - } - - return 
component, nil + }, nil } // Name returns the name of the component @@ -286,6 +266,21 @@ func (p *IngestionProcessor) CreateIngestionLogs(ctx context.Context, ingestReq // per-batch NetBox hit that thrashes plugin workers under burst ingest. _, _ = p.ops.DefaultBranch(ctx) + // Stash request-level metadata once per request — every row produced by + // this request shares the same blob. The GraphUpsertProcessor reads it + // back to merge run_id (and friends) into graph snapshots; if marshaling + // fails we degrade to no request metadata rather than failing ingest. + var requestMetadata []byte + if md := ingestReq.GetMetadata(); md != nil { + if asMap := md.AsMap(); len(asMap) > 0 { + if b, err := json.Marshal(asMap); err == nil { + requestMetadata = b + } else { + p.logger.Warn("failed to marshal IngestRequest metadata; ingesting without it", "error", err) + } + } + } + // Phase 1: Pre-validate entities, build ingestion log protos, and generate entity hashes fingerprinter := entityhash.NewEntityFingerprinter() @@ -293,8 +288,6 @@ func (p *IngestionProcessor) CreateIngestionLogs(ctx context.Context, ingestReq index int ingestionLog *reconcilerpb.IngestionLog entityHash string - entity *diodepb.Entity - objectType string } var valid []validEntity @@ -336,8 +329,6 @@ func (p *IngestionProcessor) CreateIngestionLogs(ctx context.Context, ingestReq index: i, ingestionLog: ingestionLog, entityHash: hash, - entity: v, - objectType: objectType, }) } @@ -351,7 +342,7 @@ func (p *IngestionProcessor) CreateIngestionLogs(ctx context.Context, ingestReq entityHashes := make([]string, len(valid)) for i, v := range valid { logs[i] = v.ingestionLog - sourceMetadata[i] = nil + sourceMetadata[i] = requestMetadata entityHashes[i] = v.entityHash } @@ -362,13 +353,14 @@ func (p *IngestionProcessor) CreateIngestionLogs(ctx context.Context, ingestReq return errs } - // Phase 5: Post-processing — metrics, graph upserts, send to channel - for i, result := range results { + // Phase 5: 
Post-processing — metrics only. Graph upsert runs on a + // separate processor fed from ingestion_logs so the consume loop stays + // at COPY speed regardless of graph-DB latency. + for _, result := range results { if result == nil { continue } - v := valid[i] ingestionLog := result.IngestionLog id := result.ID @@ -384,27 +376,6 @@ func (p *IngestionProcessor) CreateIngestionLogs(ctx context.Context, ingestReq metricsCtx := telemetry.ContextWithMetricAttributes(ctx, attrs...) p.metrics.RecordIngestionLogCreate(metricsCtx, true) - // Upsert entity into graph if graph DB is enabled (non-blocking, errors logged but not fatal) - if p.graphService != nil { - start := time.Now() - // Pass request-level metadata (e.g. run_id) for graph storage - var reqMeta map[string]any - if md := ingestReq.GetMetadata(); md != nil { - reqMeta = md.AsMap() - } - _, graphErr := p.graphService.UpsertEntity(ctx, v.entity, reqMeta) - duration := time.Since(start).Seconds() - if graphErr != nil { - p.logger.Warn("graph upsert entity failed", - "error", graphErr, - "ingestion_log_id", id, - "entity_type", v.objectType) - p.metrics.RecordGraphUpsert(ctx, false, v.objectType, duration) - } else { - p.metrics.RecordGraphUpsert(ctx, true, v.objectType, duration) - } - } - if result.WasDuplicate && result.IngestionLog.State == reconcilerpb.State_IGNORED { p.logger.Debug("skipping ingestion log because it is a duplicate of an ignored ingestion log", "id", id, "externalID", ingestionLog.GetId()) } diff --git a/diode-server/reconciler/ingestion_processor_internal_test.go b/diode-server/reconciler/ingestion_processor_internal_test.go index 6e331822..5d3e16a2 100644 --- a/diode-server/reconciler/ingestion_processor_internal_test.go +++ b/diode-server/reconciler/ingestion_processor_internal_test.go @@ -12,9 +12,11 @@ import ( "github.com/andybalholm/brotli" "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" 
"google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" @@ -296,6 +298,120 @@ func TestHandleStreamMessageLegacyUncompressed(t *testing.T) { mockRepository.AssertExpectations(t) } +// TestCreateIngestionLogs_StashesRequestMetadata verifies that +// IngestRequest.metadata is marshaled into the source_metadata column for +// every row in the batch, so the GraphUpsertProcessor can read it back when +// merging run_id (and friends) into graph snapshots. +func TestCreateIngestionLogs_StashesRequestMetadata(t *testing.T) { + ctx := context.Background() + mockNbClient := new(mnp.NetBoxAPI) + mockRepository := new(mr.Repository) + mockMetrics := mr.NewMetrics(t) + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) + + p := &IngestionProcessor{ + logger: logger, + Config: Config{}, + ops: NewOps(mockRepository, mockNbClient, logger, nil), + metrics: mockMetrics, + } + + mdStruct, err := structpb.NewStruct(map[string]any{ + "run_id": "run-7", + "source": "observability-pipeline", + "version": 42, + }) + require.NoError(t, err) + + ingestReq := &diodepb.IngestRequest{ + Id: "req-meta", + Entities: []*diodepb.Entity{ + {Entity: &diodepb.Entity_Site{Site: &diodepb.Site{Name: "alpha"}}}, + {Entity: &diodepb.Entity_Site{Site: &diodepb.Site{Name: "beta"}}}, + }, + Metadata: mdStruct, + } + + mockNbClient.On("GetDefaultBranch", mock.Anything).Return((*netboxdiodeplugin.Branch)(nil), nil) + mockRepository.On("FindPriorIngestionLogsByEntityHashes", mock.Anything, mock.Anything, mock.Anything). 
+ Return(map[string]*ops.PriorIngestionLog{}, nil) + + var capturedSourceMetadata [][]byte + mockRepository.On("BulkCreateIngestionLogs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + func(_ context.Context, logs []*reconcilerpb.IngestionLog, sourceMetadata [][]byte, _ []string) map[string]int32 { + capturedSourceMetadata = sourceMetadata + result := make(map[string]int32, len(logs)) + for i, log := range logs { + result[log.Id] = int32(i + 1) + } + return result + }, nil) + mockMetrics.On("RecordIngestionLogCreate", mock.Anything, mock.Anything).Return() + + errs := p.CreateIngestionLogs(ctx, ingestReq, 1720425600) + require.Empty(t, errs) + + require.Len(t, capturedSourceMetadata, 2, "every row in the batch should carry the request metadata") + for i, raw := range capturedSourceMetadata { + require.NotEmpty(t, raw, "row %d source_metadata is empty", i) + var got map[string]any + require.NoError(t, json.Unmarshal(raw, &got)) + assert.Equal(t, "run-7", got["run_id"]) + assert.Equal(t, "observability-pipeline", got["source"]) + // structpb encodes ints as float64 in google.protobuf.Struct. + assert.Equal(t, float64(42), got["version"]) + } + + mockRepository.AssertExpectations(t) +} + +// TestCreateIngestionLogs_NilMetadataLeavesSourceMetadataEmpty confirms the +// non-graph path is unaffected: when no IngestRequest.metadata is supplied, +// each row's source_metadata stays nil so the COPY stream stays compact. 
+func TestCreateIngestionLogs_NilMetadataLeavesSourceMetadataEmpty(t *testing.T) { + ctx := context.Background() + mockNbClient := new(mnp.NetBoxAPI) + mockRepository := new(mr.Repository) + mockMetrics := mr.NewMetrics(t) + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) + + p := &IngestionProcessor{ + logger: logger, + Config: Config{}, + ops: NewOps(mockRepository, mockNbClient, logger, nil), + metrics: mockMetrics, + } + + ingestReq := &diodepb.IngestRequest{ + Id: "req-nometa", + Entities: []*diodepb.Entity{ + {Entity: &diodepb.Entity_Site{Site: &diodepb.Site{Name: "lone"}}}, + }, + } + + mockNbClient.On("GetDefaultBranch", mock.Anything).Return((*netboxdiodeplugin.Branch)(nil), nil) + mockRepository.On("FindPriorIngestionLogsByEntityHashes", mock.Anything, mock.Anything, mock.Anything). + Return(map[string]*ops.PriorIngestionLog{}, nil) + + var captured [][]byte + mockRepository.On("BulkCreateIngestionLogs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + func(_ context.Context, logs []*reconcilerpb.IngestionLog, sourceMetadata [][]byte, _ []string) map[string]int32 { + captured = sourceMetadata + out := make(map[string]int32, len(logs)) + for _, log := range logs { + out[log.Id] = 1 + } + return out + }, nil) + mockMetrics.On("RecordIngestionLogCreate", mock.Anything, mock.Anything).Return() + + errs := p.CreateIngestionLogs(ctx, ingestReq, 1720425600) + require.Empty(t, errs) + + require.Len(t, captured, 1) + assert.Nil(t, captured[0], "no IngestRequest.metadata means no source_metadata blob") +} + func TestCompressChangeSet(t *testing.T) { cs := changeset.ChangeSet{ ID: "5663a77e-9bad-4981-afe9-77d8a9f2b8b5", diff --git a/diode-server/reconciler/mocks/repository.go b/diode-server/reconciler/mocks/repository.go index 7ac0a26c..78c6348b 100644 --- a/diode-server/reconciler/mocks/repository.go +++ b/diode-server/reconciler/mocks/repository.go @@ -195,6 +195,66 @@ func (_c 
*Repository_BulkPersistChangeSets_Call) RunAndReturn(run func(context.C return _c } +// ClaimGraphUpsertCandidates provides a mock function with given fields: ctx, batchSize, maxAttempts. It records the call via Called, panics if no return value was configured, and resolves each result from the configured values: a function value is invoked with the call arguments, otherwise the configured value itself is used. +func (_m *Repository) ClaimGraphUpsertCandidates(ctx context.Context, batchSize int32, maxAttempts int32) ([]ops.QueuedIngestionLog, error) { + ret := _m.Called(ctx, batchSize, maxAttempts) + + if len(ret) == 0 { + panic("no return value specified for ClaimGraphUpsertCandidates") + } + + var r0 []ops.QueuedIngestionLog + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int32, int32) ([]ops.QueuedIngestionLog, error)); ok { + return rf(ctx, batchSize, maxAttempts) + } + if rf, ok := ret.Get(0).(func(context.Context, int32, int32) []ops.QueuedIngestionLog); ok { + r0 = rf(ctx, batchSize, maxAttempts) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]ops.QueuedIngestionLog) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int32, int32) error); ok { + r1 = rf(ctx, batchSize, maxAttempts) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Repository_ClaimGraphUpsertCandidates_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClaimGraphUpsertCandidates'. Run registers a callback that receives the type-asserted call arguments, Return sets the configured return values, and RunAndReturn stores a function as the return value so that the mocked method invokes it to produce its results. +type Repository_ClaimGraphUpsertCandidates_Call struct { + *mock.Call +} + +// ClaimGraphUpsertCandidates is a helper method to define mock.On call. It forwards its arguments to mock.On unchanged and wraps the resulting call in Repository_ClaimGraphUpsertCandidates_Call. The mocked method's parameters are: +// - ctx context.Context +// - batchSize int32 +// - maxAttempts int32 +func (_e *Repository_Expecter) ClaimGraphUpsertCandidates(ctx interface{}, batchSize interface{}, maxAttempts interface{}) *Repository_ClaimGraphUpsertCandidates_Call { + return &Repository_ClaimGraphUpsertCandidates_Call{Call: _e.mock.On("ClaimGraphUpsertCandidates", ctx, batchSize, maxAttempts)} +} + +func (_c *Repository_ClaimGraphUpsertCandidates_Call) Run(run func(ctx context.Context, batchSize int32, maxAttempts int32)) *Repository_ClaimGraphUpsertCandidates_Call { + _c.Call.Run(func(args mock.Arguments) { + 
run(args[0].(context.Context), args[1].(int32), args[2].(int32)) + }) + return _c +} + +func (_c *Repository_ClaimGraphUpsertCandidates_Call) Return(_a0 []ops.QueuedIngestionLog, _a1 error) *Repository_ClaimGraphUpsertCandidates_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Repository_ClaimGraphUpsertCandidates_Call) RunAndReturn(run func(context.Context, int32, int32) ([]ops.QueuedIngestionLog, error)) *Repository_ClaimGraphUpsertCandidates_Call { + _c.Call.Return(run) + return _c +} + // ClaimQueuedForAutoApply provides a mock function with given fields: ctx, batchSize func (_m *Repository) ClaimQueuedForAutoApply(ctx context.Context, batchSize int32) ([]ops.QueuedIngestionLog, error) { ret := _m.Called(ctx, batchSize) @@ -668,6 +728,100 @@ func (_c *Repository_IncrementDuplicateCount_Call) RunAndReturn(run func(context return _c } +// MarkGraphUpserted provides a mock function with given fields: ctx, ids. It records the call via Called, panics if no return value was configured, and returns the result of a configured func(context.Context, []int32) error when one was supplied, otherwise the configured error. +func (_m *Repository) MarkGraphUpserted(ctx context.Context, ids []int32) error { + ret := _m.Called(ctx, ids) + + if len(ret) == 0 { + panic("no return value specified for MarkGraphUpserted") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []int32) error); ok { + r0 = rf(ctx, ids) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Repository_MarkGraphUpserted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarkGraphUpserted'. Run registers a callback that receives the type-asserted call arguments, Return sets the configured return value, and RunAndReturn stores a function as the return value so that the mocked method invokes it to produce its result. +type Repository_MarkGraphUpserted_Call struct { + *mock.Call +} + +// MarkGraphUpserted is a helper method to define mock.On call. It forwards its arguments to mock.On unchanged and wraps the resulting call in Repository_MarkGraphUpserted_Call. The mocked method's parameters are: +// - ctx context.Context +// - ids []int32 +func (_e *Repository_Expecter) MarkGraphUpserted(ctx interface{}, ids interface{}) *Repository_MarkGraphUpserted_Call { + return &Repository_MarkGraphUpserted_Call{Call: _e.mock.On("MarkGraphUpserted", ctx, ids)} +} + +func (_c *Repository_MarkGraphUpserted_Call) Run(run func(ctx context.Context, ids []int32)) *Repository_MarkGraphUpserted_Call { + _c.Call.Run(func(args 
mock.Arguments) { + run(args[0].(context.Context), args[1].([]int32)) + }) + return _c +} + +func (_c *Repository_MarkGraphUpserted_Call) Return(_a0 error) *Repository_MarkGraphUpserted_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Repository_MarkGraphUpserted_Call) RunAndReturn(run func(context.Context, []int32) error) *Repository_MarkGraphUpserted_Call { + _c.Call.Return(run) + return _c +} + +// ReleaseGraphUpsertClaims provides a mock function with given fields: ctx, ids. It records the call via Called, panics if no return value was configured, and returns the result of a configured func(context.Context, []int32) error when one was supplied, otherwise the configured error. +func (_m *Repository) ReleaseGraphUpsertClaims(ctx context.Context, ids []int32) error { + ret := _m.Called(ctx, ids) + + if len(ret) == 0 { + panic("no return value specified for ReleaseGraphUpsertClaims") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []int32) error); ok { + r0 = rf(ctx, ids) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Repository_ReleaseGraphUpsertClaims_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReleaseGraphUpsertClaims'. Run registers a callback that receives the type-asserted call arguments, Return sets the configured return value, and RunAndReturn stores a function as the return value so that the mocked method invokes it to produce its result. +type Repository_ReleaseGraphUpsertClaims_Call struct { + *mock.Call +} + +// ReleaseGraphUpsertClaims is a helper method to define mock.On call. It forwards its arguments to mock.On unchanged and wraps the resulting call in Repository_ReleaseGraphUpsertClaims_Call. The mocked method's parameters are: +// - ctx context.Context +// - ids []int32 +func (_e *Repository_Expecter) ReleaseGraphUpsertClaims(ctx interface{}, ids interface{}) *Repository_ReleaseGraphUpsertClaims_Call { + return &Repository_ReleaseGraphUpsertClaims_Call{Call: _e.mock.On("ReleaseGraphUpsertClaims", ctx, ids)} +} + +func (_c *Repository_ReleaseGraphUpsertClaims_Call) Run(run func(ctx context.Context, ids []int32)) *Repository_ReleaseGraphUpsertClaims_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]int32)) + }) + return _c +} + +func (_c *Repository_ReleaseGraphUpsertClaims_Call) Return(_a0 error) *Repository_ReleaseGraphUpsertClaims_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Repository_ReleaseGraphUpsertClaims_Call) RunAndReturn(run func(context.Context, []int32) error) 
*Repository_ReleaseGraphUpsertClaims_Call { + _c.Call.Return(run) + return _c +} + // ResetApplyingIngestionLogs provides a mock function with given fields: ctx func (_m *Repository) ResetApplyingIngestionLogs(ctx context.Context) error { ret := _m.Called(ctx) @@ -714,6 +868,52 @@ func (_c *Repository_ResetApplyingIngestionLogs_Call) RunAndReturn(run func(cont return _c } +// ResetClaimedGraphUpserts provides a mock function with given fields: ctx. It records the call via Called, panics if no return value was configured, and returns the result of a configured func(context.Context) error when one was supplied, otherwise the configured error. +func (_m *Repository) ResetClaimedGraphUpserts(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for ResetClaimedGraphUpserts") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Repository_ResetClaimedGraphUpserts_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ResetClaimedGraphUpserts'. Run registers a callback that receives the type-asserted call argument, Return sets the configured return value, and RunAndReturn stores a function as the return value so that the mocked method invokes it to produce its result. +type Repository_ResetClaimedGraphUpserts_Call struct { + *mock.Call +} + +// ResetClaimedGraphUpserts is a helper method to define mock.On call. It forwards its argument to mock.On unchanged and wraps the resulting call in Repository_ResetClaimedGraphUpserts_Call. The mocked method's parameters are: +// - ctx context.Context +func (_e *Repository_Expecter) ResetClaimedGraphUpserts(ctx interface{}) *Repository_ResetClaimedGraphUpserts_Call { + return &Repository_ResetClaimedGraphUpserts_Call{Call: _e.mock.On("ResetClaimedGraphUpserts", ctx)} +} + +func (_c *Repository_ResetClaimedGraphUpserts_Call) Run(run func(ctx context.Context)) *Repository_ResetClaimedGraphUpserts_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *Repository_ResetClaimedGraphUpserts_Call) Return(_a0 error) *Repository_ResetClaimedGraphUpserts_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Repository_ResetClaimedGraphUpserts_Call) RunAndReturn(run func(context.Context) error) *Repository_ResetClaimedGraphUpserts_Call { + _c.Call.Return(run) + return _c +} + // RetrieveDeviationByID provides a mock function with given fields: ctx, externalID 
func (_m *Repository) RetrieveDeviationByID(ctx context.Context, externalID string) (*reconcilerpb.Deviation, error) { ret := _m.Called(ctx, externalID) diff --git a/diode-server/reconciler/ops/types.go b/diode-server/reconciler/ops/types.go index 6a76eeb8..3da82552 100644 --- a/diode-server/reconciler/ops/types.go +++ b/diode-server/reconciler/ops/types.go @@ -19,10 +19,16 @@ type PriorIngestionLog struct { IngestionLog *reconcilerpb.IngestionLog } -// QueuedIngestionLog represents an ingestion log in QUEUED state ready for processing +// QueuedIngestionLog represents an ingestion log claimed for processing. +// SourceMetadata holds the raw JSONB blob stashed at ingest time (currently +// the IngestRequest.metadata struct). Only claim paths that need it populate +// the field (e.g. the GraphUpsertProcessor reads it back to merge +// request-level metadata into graph snapshots); every other claim path leaves +// it nil, so nil alone does not mean the request carried no metadata. type QueuedIngestionLog struct { - ID int32 - IngestionLog *reconcilerpb.IngestionLog + ID int32 // presumably the ingestion log row ID; confirm against the claim queries + IngestionLog *reconcilerpb.IngestionLog + SourceMetadata []byte // raw JSONB bytes; nil unless the claim path loads it } // BulkGenerateChangeSetResult holds the result of generating a change set for a single item in a bulk operation. 
diff --git a/diode-server/reconciler/repository.go b/diode-server/reconciler/repository.go index 3b04067b..288fd9a3 100644 --- a/diode-server/reconciler/repository.go +++ b/diode-server/reconciler/repository.go @@ -35,4 +35,10 @@ type Repository interface { ClaimQueuedIngestionLogs(ctx context.Context, batchSize int32) ([]ops.QueuedIngestionLog, error) ClaimQueuedForAutoApply(ctx context.Context, batchSize int32) ([]ops.QueuedIngestionLog, error) ResetApplyingIngestionLogs(ctx context.Context) error + + // Graph-upsert processing (independent of the ingestion state machine). NOTE(review): the per-method notes below are inferred from the method names; confirm against the postgres implementation. + ClaimGraphUpsertCandidates(ctx context.Context, batchSize, maxAttempts int32) ([]ops.QueuedIngestionLog, error) // presumably claims up to batchSize logs not yet mirrored to the graph DB, skipping rows that reached maxAttempts; confirm + MarkGraphUpserted(ctx context.Context, ids []int32) error // presumably records the given log IDs as mirrored; confirm + ReleaseGraphUpsertClaims(ctx context.Context, ids []int32) error // presumably hands claimed IDs back so they can be claimed again; confirm + ResetClaimedGraphUpserts(ctx context.Context) error // presumably clears claims left behind by an earlier run; confirm }