Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 38 additions & 30 deletions diode-server/cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,14 @@ func main() {

repository := postgres.NewRepository(dbPool)

// Initialize graph Service if graph DB feature is enabled
var graphServiceOpt reconciler.ProcessorOption
// Build a fresh graph.Service per call when the graph DB feature is
// enabled. The processor invokes this once per worker goroutine because
// graph.Service holds per-call mutable state on its receiver and is
// documented as not safe for concurrent use.
var graphServiceFactory reconciler.GraphServiceFactory
if cfg.EnableGraphDB {
s.Logger().Info("graph DB feature enabled, initializing graph Service")
s.Logger().Info("graph DB feature enabled, initializing graph Service factory")

// Load matching configuration if provided
var matchingConfig *matching.Config
if cfg.EntityMatchingConfigPath != "" {
var err error
Expand All @@ -160,28 +162,24 @@ func main() {
}
}

// Create graph repository adapter
graphRepo := postgres.NewGraphRepository(dbPool)

// Create graph Service with repository
var opts []graph.Option
if matchingConfig != nil {
opts = append(opts, graph.WithMatchingConfig(matchingConfig))

// Create entity matcher for confidence-based matching
matcherConfig := &matching.EntityMatchingConfig{
Rules: matchingConfig.GetFinalRules(),
GlobalMinConf: matching.MatchConfidence(matchingConfig.GlobalSettings.DefaultMinConfidence),
EnableFallback: true,
CacheResults: true,
MaxCacheSize: 1000,
graphServiceFactory = func() reconciler.GraphEntityUpserter {
var opts []graph.Option
if matchingConfig != nil {
opts = append(opts, graph.WithMatchingConfig(matchingConfig))

matcherConfig := &matching.EntityMatchingConfig{
Rules: matchingConfig.GetFinalRules(),
GlobalMinConf: matching.MatchConfidence(matchingConfig.GlobalSettings.DefaultMinConfidence),
EnableFallback: true,
CacheResults: true,
MaxCacheSize: 1000,
}
entityMatcher := entitymatcher.NewMatcher(graphRepo, matcherConfig, s.Logger())
opts = append(opts, graph.WithEntityMatcher(entityMatcher))
}
entityMatcher := entitymatcher.NewMatcher(graphRepo, matcherConfig, s.Logger())
opts = append(opts, graph.WithEntityMatcher(entityMatcher))
return graph.NewService(graphRepo, s.Logger(), opts...)
}
graphService := graph.NewService(graphRepo, s.Logger(), opts...)

graphServiceOpt = reconciler.WithGraphService(graphService)
}

diodeToNetBoxMaxRetries := 3
Expand All @@ -206,13 +204,7 @@ func main() {
ops := reconciler.NewOps(repository, nbClient, s.Logger(), nil)
ops.Start(ctx)

// Build processor options
var processorOpts []reconciler.ProcessorOption
if graphServiceOpt != nil {
processorOpts = append(processorOpts, graphServiceOpt)
}

ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), cfg, redisClient, redisStreamClient, reconciler.DefaultRedisStreamID, reconciler.DefaultRedisConsumerGroup, ops, metricRecorder, processorOpts...)
ingestionProcessor, err := reconciler.NewIngestionProcessor(ctx, s.Logger(), cfg, redisClient, redisStreamClient, reconciler.DefaultRedisStreamID, reconciler.DefaultRedisConsumerGroup, ops, metricRecorder)
if err != nil {
s.Logger().Error("failed to instantiate ingestion processor", "error", err)
metricRecorder.RecordServiceStartupAttempt(ctx, false)
Expand Down Expand Up @@ -260,6 +252,22 @@ func main() {
}
}

// Graph-upsert runs as an independent processor when the feature is
// enabled — it polls ingestion_logs for rows that haven't been mirrored
// into the graph DB yet and drains them at its own pace, so a graph-DB
// outage cannot block plan/apply or the consume loop.
if graphServiceFactory != nil {
if err := repository.ResetClaimedGraphUpserts(ctx); err != nil {
s.Logger().Error("failed to reset stale graph-upsert claims", "error", err)
}
graphUpsertProcessor := reconciler.NewGraphUpsertProcessor(s.Logger(), cfg, repository, metricRecorder, graphServiceFactory, backpressure)
if err := s.RegisterComponent(graphUpsertProcessor); err != nil {
s.Logger().Error("failed to register graph upsert processor", "error", err)
metricRecorder.RecordServiceStartupAttempt(ctx, false)
os.Exit(1)
}
}

authorizer := authutil.NewContextAuthorizer(s.Logger())
gRPCServer, err := reconciler.NewServer(ctx, s.Logger(), repository, serverInterceptors(authorizer, s.Logger())...)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- +goose Up

-- Track graph-upsert progress per ingestion log. Graph upsert runs on a
-- separate processor (GraphUpsertProcessor) that is independent of the
-- ingestion state machine; using its own columns keeps the two concerns
-- decoupled and lets graph upsert run on rows in any state.
--
-- graph_upserted_at — set when graph upsert completes successfully
-- graph_upsert_attempts — number of attempts so far (terminal at the
-- processor-configured max)
-- graph_upsert_claimed_at — soft lease for the worker that currently owns
-- the row; cleared on startup so a crashed worker
-- does not leak its claim
ALTER TABLE ingestion_logs
ADD COLUMN graph_upserted_at TIMESTAMP WITH TIME ZONE,
ADD COLUMN graph_upsert_attempts INTEGER NOT NULL DEFAULT 0,
ADD COLUMN graph_upsert_claimed_at TIMESTAMP WITH TIME ZONE;

-- Partial index for the claim query: only un-upserted, un-claimed rows are
-- ever scanned. Terminal-failed rows fall out of the index once attempts
-- exceeds the processor's max-attempts setting; the WHERE clause in the
-- claim query handles that filter.
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_graph_upsert_pending
ON ingestion_logs (id)
WHERE graph_upserted_at IS NULL AND graph_upsert_claimed_at IS NULL;

-- +goose Down

DROP INDEX IF EXISTS idx_ingestion_logs_graph_upsert_pending;

ALTER TABLE ingestion_logs
DROP COLUMN IF EXISTS graph_upsert_claimed_at,
DROP COLUMN IF EXISTS graph_upsert_attempts,
DROP COLUMN IF EXISTS graph_upserted_at;
42 changes: 42 additions & 0 deletions diode-server/dbstore/postgres/queries/ingestion_logs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,45 @@ RETURNING *;
UPDATE ingestion_logs
SET state = 1
WHERE state = 8;

-- name: ClaimGraphUpsertCandidates :many
-- Claim a batch of ingestion logs that have not yet been upserted into the
-- graph DB, marking them as claimed by setting graph_upsert_claimed_at and
-- incrementing graph_upsert_attempts. Rows past max_attempts are skipped and
-- left in their terminal-failed state.
UPDATE ingestion_logs
SET graph_upsert_claimed_at = CURRENT_TIMESTAMP,
graph_upsert_attempts = graph_upsert_attempts + 1
WHERE id IN (
SELECT id FROM ingestion_logs
WHERE graph_upserted_at IS NULL
AND graph_upsert_claimed_at IS NULL
AND graph_upsert_attempts < sqlc.arg('max_attempts')::int4
ORDER BY id
LIMIT sqlc.arg('batch_size')
FOR UPDATE SKIP LOCKED
)
RETURNING *;

-- name: MarkGraphUpserted :exec
-- Mark a batch of ingestion logs as successfully upserted into the graph DB.
UPDATE ingestion_logs
SET graph_upserted_at = CURRENT_TIMESTAMP,
graph_upsert_claimed_at = NULL
WHERE id = ANY(@ids::int4[]);

-- name: ReleaseGraphUpsertClaims :exec
-- Release graph-upsert claims for a batch of ingestion logs without marking
-- them as upserted. Used when an attempt fails but more retries are still
-- allowed by max_attempts.
UPDATE ingestion_logs
SET graph_upsert_claimed_at = NULL
WHERE id = ANY(@ids::int4[]);

-- name: ResetClaimedGraphUpserts :exec
-- Clear stale graph-upsert claims on startup so rows held by a crashed
-- worker are re-claimable. Idempotent — safe to run on every startup.
UPDATE ingestion_logs
SET graph_upsert_claimed_at = NULL
WHERE graph_upsert_claimed_at IS NOT NULL
AND graph_upserted_at IS NULL;
59 changes: 59 additions & 0 deletions diode-server/dbstore/postgres/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,3 +785,62 @@ func (r *Repository) ClaimQueuedForAutoApply(ctx context.Context, batchSize int3
func (r *Repository) ResetApplyingIngestionLogs(ctx context.Context) error {
return r.queries.ResetApplyingIngestionLogs(ctx)
}

// ClaimGraphUpsertCandidates claims a batch of ingestion logs that have not
// yet been upserted into the graph DB. Each claimed row has its claim
// timestamp set and its attempt counter incremented; the caller is then
// responsible for finalizing each claim via MarkGraphUpserted or
// ReleaseGraphUpsertClaims.
//
// SourceMetadata is populated on each returned QueuedIngestionLog so the
// graph upsert path can recover the original IngestRequest.metadata blob
// that was stashed at ingest time.
func (r *Repository) ClaimGraphUpsertCandidates(ctx context.Context, batchSize, maxAttempts int32) ([]ops.QueuedIngestionLog, error) {
dbLogs, err := r.queries.ClaimGraphUpsertCandidates(ctx, postgres.ClaimGraphUpsertCandidatesParams{
MaxAttempts: maxAttempts,
BatchSize: batchSize,
})
if err != nil {
return nil, fmt.Errorf("failed to claim graph-upsert candidates: %w", err)
}

result := make([]ops.QueuedIngestionLog, 0, len(dbLogs))
for _, dbLog := range dbLogs {
log, err := dbLog.ToProto()
if err != nil {
return nil, fmt.Errorf("failed to convert ingestion log %d to proto: %w", dbLog.ID, err)
}
result = append(result, ops.QueuedIngestionLog{
ID: dbLog.ID,
IngestionLog: log,
SourceMetadata: dbLog.SourceMetadata,
})
}
return result, nil
}

// MarkGraphUpserted marks the given ingestion logs as successfully upserted
// into the graph DB and releases their claims.
func (r *Repository) MarkGraphUpserted(ctx context.Context, ids []int32) error {
if len(ids) == 0 {
return nil
}
return r.queries.MarkGraphUpserted(ctx, ids)
}

// ReleaseGraphUpsertClaims clears the claim on a batch of ingestion logs
// without marking them as upserted, so they can be re-claimed on the next
// polling cycle (subject to max-attempts).
func (r *Repository) ReleaseGraphUpsertClaims(ctx context.Context, ids []int32) error {
if len(ids) == 0 {
return nil
}
return r.queries.ReleaseGraphUpsertClaims(ctx, ids)
}

// ResetClaimedGraphUpserts clears stale graph-upsert claims held by workers
// that died between claiming and finalizing. Idempotent — safe to run on
// every startup.
func (r *Repository) ResetClaimedGraphUpserts(ctx context.Context) error {
return r.queries.ResetClaimedGraphUpserts(ctx)
}
5 changes: 5 additions & 0 deletions diode-server/docker/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,8 @@ INGESTION_LOG_PROCESSOR_BATCH_SIZE=50
INGESTION_LOG_PROCESSOR_CONCURRENCY=1
AUTO_APPLY_PROCESSOR_BATCH_SIZE=50
AUTO_APPLY_PROCESSOR_CONCURRENCY=1

# Graph upsert processor (only runs when ENABLE_GRAPH_DB=true)
# GRAPH_UPSERT_PROCESSOR_BATCH_SIZE=100
# GRAPH_UPSERT_PROCESSOR_CONCURRENCY=1
# GRAPH_UPSERT_PROCESSOR_MAX_ATTEMPTS=5
Loading
Loading