diff --git a/pkg/db/gateway_envelope.go b/pkg/db/gateway_envelope.go index 2c068ce83..a431dfca1 100644 --- a/pkg/db/gateway_envelope.go +++ b/pkg/db/gateway_envelope.go @@ -123,7 +123,7 @@ func InsertGatewayEnvelopeWithChecksTransactional( return queries.InsertGatewayEnvelopeV3Row{}, err } - err = q.EnsureGatewayPartsV3(ctx, queries.EnsureGatewayPartsV3Params{ + err = q.EnsureGatewayPartsV4(ctx, queries.EnsureGatewayPartsV4Params{ OriginatorNodeID: row.OriginatorNodeID, OriginatorSequenceID: row.OriginatorSequenceID, BandWidth: GatewayEnvelopeBandWidth, @@ -161,7 +161,7 @@ func InsertGatewayEnvelopeWithChecksStandalone( return queries.InsertGatewayEnvelopeV3Row{}, err } - err = q.EnsureGatewayPartsV3(ctx, queries.EnsureGatewayPartsV3Params{ + err = q.EnsureGatewayPartsV4(ctx, queries.EnsureGatewayPartsV4Params{ OriginatorNodeID: row.OriginatorNodeID, OriginatorSequenceID: row.OriginatorSequenceID, BandWidth: GatewayEnvelopeBandWidth, diff --git a/pkg/db/gateway_envelope_batch.go b/pkg/db/gateway_envelope_batch.go index c708d96b8..90b956723 100644 --- a/pkg/db/gateway_envelope_batch.go +++ b/pkg/db/gateway_envelope_batch.go @@ -66,7 +66,7 @@ func InsertGatewayEnvelopeBatchV2Transactional( logger.Info("creating partitions for batch insert") for _, envelope := range input.Envelopes { - err = q.EnsureGatewayPartsV3(ctx, queries.EnsureGatewayPartsV3Params{ + err = q.EnsureGatewayPartsV4(ctx, queries.EnsureGatewayPartsV4Params{ OriginatorNodeID: envelope.OriginatorNodeID, OriginatorSequenceID: envelope.OriginatorSequenceID, BandWidth: GatewayEnvelopeBandWidth, diff --git a/pkg/db/gateway_envelope_test.go b/pkg/db/gateway_envelope_test.go index 76dd15b50..71611b810 100644 --- a/pkg/db/gateway_envelope_test.go +++ b/pkg/db/gateway_envelope_test.go @@ -3,6 +3,7 @@ package db_test import ( "context" "database/sql" + "fmt" "sync" "sync/atomic" "testing" @@ -281,6 +282,187 @@ func TestInsertGatewayEnvelopeWithChecksStandalone_AutoCreateAndRetry(t *testing require.NoError(t, 
err) } +// TestEnsureGatewayPartsV4_ConcurrentCreate verifies that many concurrent +// callers racing to create partitions for the same (originator, band) are +// all serialized via the advisory locks added in migration 00024, and that +// the end state is exactly one L1 + one L2 pair on both the meta and blob +// sides. +// +// Before migration 00024, ensure_gateway_parts_v3 did not serialize callers +// and relied on a regex match of SQLERRM to swallow "already a partition" +// errors — a race could leave the child unattached while the caller saw +// success, which later surfaced as SQLSTATE 23514 ("no partition of +// relation...") on the insert path. The v4 helpers hold a per-(originator, +// band) `pg_advisory_xact_lock` around the CREATE/ATTACH window and short- +// circuit via `pg_inherits` when the partition is already attached, so +// concurrent callers converge on the same committed state with no errors. +// +// Note: this test races the PARTITION CREATION path specifically (via +// EnsureGatewayPartsV4). It does not overlap the creation with concurrent +// inserts, because PostgreSQL's intrinsic lock ordering between INSERT +// (RowExclusive on parent) and ATTACH PARTITION (ShareRowExclusive on +// parent) can deadlock independently of this code — see the existing +// `TestInsertAndIncrementParallel` which works around the same limitation +// with an "insert one to avoid DDL creation deadlocks" warmup. 
+func TestEnsureGatewayPartsV4_ConcurrentCreate(t *testing.T) { + t.Parallel() + + ctx := context.Background() + db, _ := testutils.NewRawDB(t, ctx) + q := queries.New(db) + + const ( + nodeID int32 = 4242 + seqID int64 = 1 + bandWidth = xmtpd_db.GatewayEnvelopeBandWidth + numGoroutine = 32 + ) + + var ( + wg sync.WaitGroup + errCh = make(chan error, numGoroutine) + startCh = make(chan struct{}) + ) + + for range numGoroutine { + wg.Go(func() { + <-startCh + errCh <- q.EnsureGatewayPartsV4(ctx, queries.EnsureGatewayPartsV4Params{ + OriginatorNodeID: nodeID, + OriginatorSequenceID: seqID, + BandWidth: bandWidth, + }) + }) + } + + // Release all goroutines at once to maximise the chance of racing on + // partition creation. + close(startCh) + wg.Wait() + close(errCh) + + for err := range errCh { + require.NoError(t, err, "concurrent ensure_gateway_parts_v4 must not fail") + } + + // End state: exactly one L1 (and one L2) child under each of the meta + // and blob parents for this originator. 
+ for _, parent := range []string{"gateway_envelopes_meta", "gateway_envelopes_blob"} { + l1Name := fmt.Sprintf("%s_o%d", parent, nodeID) + + var l1Count int + require.NoError( + t, + db.QueryRowContext( + ctx, + `SELECT COUNT(*) + FROM pg_inherits i + JOIN pg_class c ON c.oid = i.inhrelid + JOIN pg_class p ON p.oid = i.inhparent + WHERE p.relname = $1 AND c.relname = $2`, + parent, l1Name, + ).Scan(&l1Count), + ) + require.Equal(t, 1, l1Count, "expected exactly one L1 child under %s", parent) + + var l2Count int + require.NoError( + t, + db.QueryRowContext( + ctx, + `SELECT COUNT(*) + FROM pg_inherits i + JOIN pg_class c ON c.oid = i.inhrelid + JOIN pg_class p ON p.oid = i.inhparent + WHERE p.relname = $1`, + l1Name, + ).Scan(&l2Count), + ) + require.Equal(t, 1, l2Count, "expected exactly one L2 child under %s", l1Name) + } +} + +// TestInsertGatewayEnvelopeWithChecksStandalone_ConcurrentWithWarmup verifies +// that, once partitions exist, concurrent standalone inserts all land +// successfully without deadlocks or "no partition of relation" errors. +// +// This mirrors the pattern in `TestInsertAndIncrementParallel`: a single +// initial insert warms up the partitions so subsequent concurrent inserts +// hit the fast path. It exercises the EnsureGatewayPartsV4 short-circuit +// (pg_inherits says "already attached") when many goroutines retry after +// racing on a subsequent band boundary, without triggering the PG-intrinsic +// INSERT-vs-ATTACH deadlock. +func TestInsertGatewayEnvelopeWithChecksStandalone_ConcurrentWithWarmup(t *testing.T) { + t.Parallel() + + ctx := context.Background() + db, _ := testutils.NewRawDB(t, ctx) + q := queries.New(db) + + const ( + nodeID int32 = 4343 + numGoroutine int32 = 16 + ) + + // Warmup insert to create partitions and avoid the PG-intrinsic + // INSERT-vs-ATTACH deadlock in the concurrent phase. 
+ _, err := xmtpd_db.InsertGatewayEnvelopeWithChecksStandalone( + ctx, + q, + queries.InsertGatewayEnvelopeV3Params{ + OriginatorNodeID: nodeID, + OriginatorSequenceID: 1, + Topic: testutils.RandomBytes(32), + OriginatorEnvelope: testutils.RandomBytes(64), + }, + ) + require.NoError(t, err) + + var ( + wg sync.WaitGroup + errCh = make(chan error, numGoroutine) + startCh = make(chan struct{}) + ) + + for i := range numGoroutine { + wg.Add(1) + go func(seq int64) { + defer wg.Done() + <-startCh + _, err := xmtpd_db.InsertGatewayEnvelopeWithChecksStandalone( + ctx, + q, + queries.InsertGatewayEnvelopeV3Params{ + OriginatorNodeID: nodeID, + OriginatorSequenceID: seq, + Topic: testutils.RandomBytes(32), + OriginatorEnvelope: testutils.RandomBytes(64), + }, + ) + errCh <- err + }(int64(i + 2)) + } + + close(startCh) + wg.Wait() + close(errCh) + + for err := range errCh { + require.NoError(t, err, "concurrent standalone insert must not fail after warmup") + } + + var count int64 + require.NoError( + t, + db.QueryRowContext( + ctx, + `SELECT COUNT(*) FROM gateway_envelopes_meta WHERE originator_node_id = $1`, + nodeID, + ).Scan(&count), + ) + require.Equal(t, int64(numGoroutine+1), count, "expected warmup + concurrent rows") +} + func TestInsertGatewayEnvelopeWithChecksStandalone_PreexistingPartitions(t *testing.T) { t.Parallel() diff --git a/pkg/db/migrations/00024_harden-partition-creation.down.sql b/pkg/db/migrations/00024_harden-partition-creation.down.sql new file mode 100644 index 000000000..c26cbc92a --- /dev/null +++ b/pkg/db/migrations/00024_harden-partition-creation.down.sql @@ -0,0 +1,9 @@ +-- Revert migration 00024 by dropping the hardened partition helpers. The +-- pre-existing v2/v3 helpers are untouched by the up migration, so no rename +-- or restore is required here. 
+ +DROP FUNCTION IF EXISTS ensure_gateway_parts_v4(int, bigint, bigint); +DROP FUNCTION IF EXISTS make_blob_seq_subpart_v4(int, bigint, bigint); +DROP FUNCTION IF EXISTS make_blob_originator_part_v4(int); +DROP FUNCTION IF EXISTS make_meta_seq_subpart_v3(int, bigint, bigint); +DROP FUNCTION IF EXISTS make_meta_originator_part_v3(int); diff --git a/pkg/db/migrations/00024_harden-partition-creation.up.sql b/pkg/db/migrations/00024_harden-partition-creation.up.sql new file mode 100644 index 000000000..720a9f042 --- /dev/null +++ b/pkg/db/migrations/00024_harden-partition-creation.up.sql @@ -0,0 +1,318 @@ +-- Harden gateway-envelopes partition creation against races and a +-- format-string bug in the v2 helpers. +-- +-- Motivation (issue #1967): +-- * `make_meta_seq_subpart_v2` builds a CHECK predicate with `format()`, +-- passes four arguments into a string with three placeholders, and relies +-- on PostgreSQL silently dropping the extra. The resulting CHECK is +-- `originator_sequence_id >= _oid AND originator_sequence_id < _start` +-- (e.g. `>= 100 AND < 0` for the first band), not the intended +-- `>= _start AND < _end`. The CHECK is dropped right after ATTACH so the +-- bug is normally benign, but it is still an objective defect. +-- * Every `make_*_part_v*` helper ends in +-- EXCEPTION WHEN OTHERS THEN +-- IF SQLERRM ~ 'is already a partition' THEN NULL; +-- ELSE RAISE; END IF; +-- This regex-matches the PostgreSQL error text and, because the handler +-- sits inside a PL/pgSQL sub-transaction, rolls back the preceding +-- CREATE TABLE together with the failed ATTACH. Any other error whose +-- message happens to contain that substring — or any future change in +-- PostgreSQL's error text — turns into a silent no-op that leaves the +-- partition unattached while the caller sees success. +-- * `ensure_gateway_parts_v3` does not serialize concurrent callers. 
Two +-- callers racing on the same `(originator_node_id, band_start)` can +-- interleave CREATE / ATTACH / DROP CONSTRAINT, and PostgreSQL's +-- per-statement locks do not guarantee atomicity across the function. +-- * Even with *per-oid* serialization, two callers for DIFFERENT oids can +-- deadlock: PostgreSQL's `ATTACH PARTITION` on a sub-partitioned child +-- propagates locks up to the top-level partitioned parent AND across +-- sibling L1 children (to validate partition-bound non-overlap). A +-- caller at step 1 holding AccessExclusive on its newly-created `oN` +-- child while waiting for ShareRowExclusive on the top parent will +-- deadlock against a concurrent caller that has already acquired that +-- ShareRowExclusive and now wants ShareUpdateExclusive on the first +-- caller's `oN` child (as part of its own step-3 ATTACH propagation). +-- This was observed in CI as `TestCreateServer` SQLSTATE 40P01. +-- +-- Fix strategy (append-only; the v2/v3 helpers remain in `pg_proc`): +-- * New `make_*_part_v3`/`_v4` helpers that short-circuit via `pg_inherits` +-- when the partition is already attached, build a CORRECT CHECK +-- predicate, and let any ATTACH error propagate to the caller. +-- * New `ensure_gateway_parts_v4` that takes a single GLOBAL advisory lock +-- at entry, fully serializing all concurrent partition-creation work +-- regardless of oid or band. The helpers additionally take per-resource +-- advisory locks as defense-in-depth for direct callers. +-- +-- The legacy helpers are left in place so that migration-behavior tests +-- (e.g. `migration_00023_test.go`) continue to populate pre-rename databases +-- through their existing code paths. + +-- META: create LIST child for one originator, idempotently and with no +-- exception swallowing. 
+--
+-- Parameters:
+--   _oid  originator node id; names the child `gateway_envelopes_meta_o<_oid>`
+--         and is the single LIST value the child is attached for.
+-- Returns void. Any CREATE/ATTACH error propagates to the caller.
+CREATE FUNCTION make_meta_originator_part_v3(_oid int)
+    RETURNS void AS $$
+DECLARE
+    subname          text := format('gateway_envelopes_meta_o%s', _oid);
+    already_attached boolean;
+    has_seed_check   boolean;
+BEGIN
+    -- Serialize concurrent callers for this originator on the meta side.
+    -- `pg_advisory_xact_lock(int, int)` is the two-argument form; the first
+    -- int is a namespace discriminator and the second is the resource id.
+    PERFORM pg_advisory_xact_lock(hashtext('xmtpd.gateway_envelopes_meta_l1'), _oid);
+
+    -- Short-circuit if the partition is already attached to the expected
+    -- parent. This is the authoritative check — not a regex on SQLERRM.
+    SELECT EXISTS (
+        SELECT 1
+        FROM pg_inherits i
+        JOIN pg_class c ON c.oid = i.inhrelid
+        JOIN pg_class p ON p.oid = i.inhparent
+        WHERE c.relname = subname
+          AND p.relname = 'gateway_envelopes_meta'
+    ) INTO already_attached;
+
+    IF already_attached THEN
+        -- Defensive cleanup: older callers may have left seed constraints
+        -- behind if ATTACH raised while the constraint was present. A
+        -- successful attach means the constraint is no longer needed.
+        --
+        -- Probe the catalog before touching the table: ALTER TABLE ... DROP
+        -- CONSTRAINT takes an ACCESS EXCLUSIVE lock on the partition even
+        -- when IF EXISTS finds nothing to drop. Issuing it unconditionally
+        -- would make every idempotent call block readers and writers of a
+        -- live partition, and could deadlock against sessions that already
+        -- hold locks on it. The catalog read takes no table lock.
+        SELECT EXISTS (
+            SELECT 1
+            FROM pg_constraint con
+            JOIN pg_class c ON c.oid = con.conrelid
+            WHERE c.relname = subname
+              AND con.conname = 'oid_check'
+        ) INTO has_seed_check;
+
+        IF has_seed_check THEN
+            EXECUTE format(
+                'ALTER TABLE %I DROP CONSTRAINT IF EXISTS oid_check;',
+                subname
+            );
+        END IF;
+        RETURN;
+    END IF;
+
+    -- Create the child with a validating CHECK so PostgreSQL can skip the
+    -- full-scan validation during ATTACH. The CHECK is dropped immediately
+    -- after the successful attach.
+    EXECUTE format(
+        'CREATE TABLE IF NOT EXISTS %I (
+            LIKE gateway_envelopes_meta INCLUDING DEFAULTS INCLUDING CONSTRAINTS,
+            CONSTRAINT oid_check CHECK (originator_node_id = %s)
+        ) PARTITION BY RANGE (originator_sequence_id);',
+        subname,
+        _oid::text
+    );
+
+    EXECUTE format(
+        'ALTER TABLE gateway_envelopes_meta ATTACH PARTITION %I
+            FOR VALUES IN (%s);',
+        subname,
+        _oid::text
+    );
+
+    -- The table was created (or found unattached) in this call, so no other
+    -- session is routing rows into it yet; dropping the seed CHECK here is
+    -- the normal post-ATTACH cleanup.
+    EXECUTE format(
+        'ALTER TABLE %I DROP CONSTRAINT IF EXISTS oid_check;',
+        subname
+    );
+END;
+$$ LANGUAGE plpgsql;
+
+
+-- META: create a RANGE subpartition [start, end), idempotently.
+--
+-- Parameters:
+--   _oid    originator node id; selects the L1 parent
+--           `gateway_envelopes_meta_o<_oid>`.
+--   _start  inclusive lower bound of the sequence-id band.
+--   _end    exclusive upper bound of the sequence-id band.
+-- Returns void. Any CREATE/ATTACH error propagates to the caller.
+CREATE FUNCTION make_meta_seq_subpart_v3(_oid int, _start bigint, _end bigint)
+    RETURNS void AS $$
+DECLARE
+    parent           text := format('gateway_envelopes_meta_o%s', _oid);
+    subname          text := format('gateway_envelopes_meta_o%s_s%s_%s', _oid, _start, _end);
+    already_attached boolean;
+    has_seed_check   boolean;
+BEGIN
+    -- Serialize concurrent callers per (originator, band_start).
+    PERFORM pg_advisory_xact_lock(
+        hashtext('xmtpd.gateway_envelopes_meta_l2'),
+        hashtext(format('%s:%s', _oid, _start))
+    );
+
+    -- Authoritative "is it attached?" check against the catalog.
+    SELECT EXISTS (
+        SELECT 1
+        FROM pg_inherits i
+        JOIN pg_class c ON c.oid = i.inhrelid
+        JOIN pg_class p ON p.oid = i.inhparent
+        WHERE c.relname = subname
+          AND p.relname = parent
+    ) INTO already_attached;
+
+    IF already_attached THEN
+        -- Only issue the ALTER when a leftover seed constraint really
+        -- exists: ALTER TABLE ... DROP CONSTRAINT takes an ACCESS EXCLUSIVE
+        -- lock on the partition even when IF EXISTS finds nothing, which
+        -- would block readers and writers of a live partition on every
+        -- idempotent call. The catalog read takes no table lock.
+        SELECT EXISTS (
+            SELECT 1
+            FROM pg_constraint con
+            JOIN pg_class c ON c.oid = con.conrelid
+            WHERE c.relname = subname
+              AND con.conname = 'seq_id_check'
+        ) INTO has_seed_check;
+
+        IF has_seed_check THEN
+            EXECUTE format(
+                'ALTER TABLE %I DROP CONSTRAINT IF EXISTS seq_id_check;',
+                subname
+            );
+        END IF;
+        RETURN;
+    END IF;
+
+    -- Correct CHECK predicate: bounds come from (_start, _end). The v2 helper
+    -- had a format() arity bug that produced `>= _oid AND < _start` instead.
+    EXECUTE format(
+        'CREATE TABLE IF NOT EXISTS %I (
+            LIKE gateway_envelopes_meta INCLUDING DEFAULTS INCLUDING CONSTRAINTS,
+            CONSTRAINT seq_id_check CHECK ( originator_sequence_id >= %s AND originator_sequence_id < %s )
+        );',
+        subname,
+        _start::text,
+        _end::text
+    );
+
+    EXECUTE format(
+        'ALTER TABLE %I ATTACH PARTITION %I
+            FOR VALUES FROM (%s) TO (%s);',
+        parent,
+        subname,
+        _start::text,
+        _end::text
+    );
+
+    -- Post-ATTACH cleanup of the seed CHECK on the partition handled by
+    -- this call.
+    EXECUTE format(
+        'ALTER TABLE %I DROP CONSTRAINT IF EXISTS seq_id_check;',
+        subname
+    );
+END;
+$$ LANGUAGE plpgsql;
+
+
+-- BLOB: create LIST child for one originator, idempotently.
+--
+-- Parameters:
+--   _oid  originator node id; names the child `gateway_envelopes_blob_o<_oid>`
+--         and is the single LIST value the child is attached for.
+-- Returns void. Any CREATE/ATTACH error propagates to the caller.
+CREATE FUNCTION make_blob_originator_part_v4(_oid int)
+    RETURNS void AS $$
+DECLARE
+    subname          text := format('gateway_envelopes_blob_o%s', _oid);
+    already_attached boolean;
+    has_seed_check   boolean;
+BEGIN
+    -- Serialize concurrent callers for this originator on the blob side.
+    PERFORM pg_advisory_xact_lock(hashtext('xmtpd.gateway_envelopes_blob_l1'), _oid);
+
+    -- Authoritative "is it attached?" check against the catalog.
+    SELECT EXISTS (
+        SELECT 1
+        FROM pg_inherits i
+        JOIN pg_class c ON c.oid = i.inhrelid
+        JOIN pg_class p ON p.oid = i.inhparent
+        WHERE c.relname = subname
+          AND p.relname = 'gateway_envelopes_blob'
+    ) INTO already_attached;
+
+    IF already_attached THEN
+        -- Only issue the ALTER when a leftover seed constraint really
+        -- exists: ALTER TABLE ... DROP CONSTRAINT takes an ACCESS EXCLUSIVE
+        -- lock on the partition even when IF EXISTS finds nothing, which
+        -- would block readers and writers of a live partition on every
+        -- idempotent call. The catalog read takes no table lock.
+        SELECT EXISTS (
+            SELECT 1
+            FROM pg_constraint con
+            JOIN pg_class c ON c.oid = con.conrelid
+            WHERE c.relname = subname
+              AND con.conname = 'oid_check'
+        ) INTO has_seed_check;
+
+        IF has_seed_check THEN
+            EXECUTE format(
+                'ALTER TABLE %I DROP CONSTRAINT IF EXISTS oid_check;',
+                subname
+            );
+        END IF;
+        RETURN;
+    END IF;
+
+    -- Seed CHECK matches the LIST bound used by the ATTACH below; it is
+    -- dropped again once the attach has succeeded.
+    EXECUTE format(
+        'CREATE TABLE IF NOT EXISTS %I (
+            LIKE gateway_envelopes_blob INCLUDING DEFAULTS INCLUDING CONSTRAINTS,
+            CONSTRAINT oid_check CHECK (originator_node_id = %s)
+        ) PARTITION BY RANGE (originator_sequence_id);',
+        subname,
+        _oid::text
+    );
+
+    EXECUTE format(
+        'ALTER TABLE gateway_envelopes_blob ATTACH PARTITION %I
+            FOR VALUES IN (%s);',
+        subname,
+        _oid::text
+    );
+
+    -- Post-ATTACH cleanup of the seed CHECK on the partition handled by
+    -- this call.
+    EXECUTE format(
+        'ALTER TABLE %I DROP CONSTRAINT IF EXISTS oid_check;',
+        subname
+    );
+END;
+$$ LANGUAGE plpgsql;
+
+
+-- BLOB: create a RANGE subpartition [start, end), idempotently.
+--
+-- Parameters:
+--   _oid    originator node id; selects the L1 parent
+--           `gateway_envelopes_blob_o<_oid>`.
+--   _start  inclusive lower bound of the sequence-id band.
+--   _end    exclusive upper bound of the sequence-id band.
+-- Returns void. Any CREATE/ATTACH error propagates to the caller.
+CREATE FUNCTION make_blob_seq_subpart_v4(_oid int, _start bigint, _end bigint)
+    RETURNS void AS $$
+DECLARE
+    parent           text := format('gateway_envelopes_blob_o%s', _oid);
+    subname          text := format('gateway_envelopes_blob_o%s_s%s_%s', _oid, _start, _end);
+    already_attached boolean;
+    has_seed_check   boolean;
+BEGIN
+    -- Serialize concurrent callers per (originator, band_start).
+    PERFORM pg_advisory_xact_lock(
+        hashtext('xmtpd.gateway_envelopes_blob_l2'),
+        hashtext(format('%s:%s', _oid, _start))
+    );
+
+    -- Authoritative "is it attached?" check against the catalog.
+    SELECT EXISTS (
+        SELECT 1
+        FROM pg_inherits i
+        JOIN pg_class c ON c.oid = i.inhrelid
+        JOIN pg_class p ON p.oid = i.inhparent
+        WHERE c.relname = subname
+          AND p.relname = parent
+    ) INTO already_attached;
+
+    IF already_attached THEN
+        -- Only issue the ALTER when a leftover seed constraint really
+        -- exists: ALTER TABLE ... DROP CONSTRAINT takes an ACCESS EXCLUSIVE
+        -- lock on the partition even when IF EXISTS finds nothing, which
+        -- would block readers and writers of a live partition on every
+        -- idempotent call. The catalog read takes no table lock.
+        SELECT EXISTS (
+            SELECT 1
+            FROM pg_constraint con
+            JOIN pg_class c ON c.oid = con.conrelid
+            WHERE c.relname = subname
+              AND con.conname = 'seq_id_check'
+        ) INTO has_seed_check;
+
+        IF has_seed_check THEN
+            EXECUTE format(
+                'ALTER TABLE %I DROP CONSTRAINT IF EXISTS seq_id_check;',
+                subname
+            );
+        END IF;
+        RETURN;
+    END IF;
+
+    -- Seed CHECK matches the RANGE bounds used by the ATTACH below; it is
+    -- dropped again once the attach has succeeded.
+    EXECUTE format(
+        'CREATE TABLE IF NOT EXISTS %I (
+            LIKE gateway_envelopes_blob INCLUDING DEFAULTS INCLUDING CONSTRAINTS,
+            CONSTRAINT seq_id_check CHECK ( originator_sequence_id >= %s AND originator_sequence_id < %s )
+        );',
+        subname,
+        _start::text,
+        _end::text
+    );
+
+    EXECUTE format(
+        'ALTER TABLE %I ATTACH PARTITION %I
+            FOR VALUES FROM (%s) TO (%s);',
+        parent,
+        subname,
+        _start::text,
+        _end::text
+    );
+
+    -- Post-ATTACH cleanup of the seed CHECK on the partition handled by
+    -- this call.
+    EXECUTE format(
+        'ALTER TABLE %I DROP CONSTRAINT IF EXISTS seq_id_check;',
+        subname
+    );
+END;
+$$ LANGUAGE plpgsql;
+
+
+-- Production partition ensure. Calls the hardened helpers above.
+--
+-- Takes a single GLOBAL transaction-scoped advisory lock at entry to
+-- fully serialize all partition-creation work across the cluster.
+--
+-- The per-oid / per-band advisory locks inside the helpers are not
+-- sufficient on their own: `ATTACH PARTITION` on a sub-partitioned child
+-- propagates lock acquisitions across sibling partitions at the L1 level
+-- (to validate partition-bound non-overlap), so two callers for different
+-- oids can still deadlock on each other's freshly-created L1 children.
+-- The global lock is cheap — partition creation is a rare (cold-path) +-- operation taken only when the savepoint-retry path fires. +CREATE FUNCTION ensure_gateway_parts_v4( + p_originator_node_id int, + p_originator_sequence_id bigint, + p_band_width bigint DEFAULT 1000000 +) RETURNS void LANGUAGE plpgsql AS $$ +DECLARE + v_band_start bigint := (p_originator_sequence_id / p_band_width) * p_band_width; +BEGIN + -- Global serialization: one ensure_gateway_parts_v4 call at a time. + -- Uses the single-argument form with a 64-bit namespace key derived + -- from a stable string so this lock cannot collide with per-oid locks + -- (which use the two-int form). + PERFORM pg_advisory_xact_lock(hashtext('xmtpd.ensure_gateway_parts')::bigint); + + PERFORM make_meta_originator_part_v3(p_originator_node_id); + PERFORM make_blob_originator_part_v4(p_originator_node_id); + PERFORM make_meta_seq_subpart_v3( + p_originator_node_id, + v_band_start, + v_band_start + p_band_width + ); + PERFORM make_blob_seq_subpart_v4( + p_originator_node_id, + v_band_start, + v_band_start + p_band_width + ); +END; +$$; diff --git a/pkg/db/migrations/migration_00024_test.go b/pkg/db/migrations/migration_00024_test.go new file mode 100644 index 000000000..bad21ecd7 --- /dev/null +++ b/pkg/db/migrations/migration_00024_test.go @@ -0,0 +1,205 @@ +package migrations_test + +import ( + "database/sql" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/xmtp/xmtpd/pkg/db/migrations" + "github.com/xmtp/xmtpd/pkg/testutils" +) + +// TestMigration00024_EnsureGatewayPartsV4_AttachesL1AndL2 verifies that the +// hardened partition-ensure function attaches both the L1 (per-originator LIST) +// and L2 (per-(originator, seq-band) RANGE) partitions to their parents and +// leaves no seed CHECK constraints behind. +// +// The v2 `make_meta_seq_subpart` helper had a `format()` arity bug that +// produced a bogus `seq_id_check` (see migration 00024). 
The v4 helpers write +// a correct CHECK for the duration of ATTACH and drop it afterwards — the test +// proves no stray `oid_check` or `seq_id_check` constraints survive. +func TestMigration00024_EnsureGatewayPartsV4_AttachesL1AndL2(t *testing.T) { + ctx := t.Context() + database, _ := testutils.NewRawDB(t, ctx) + + const ( + oid = 100 + seqID = 1 + l1Meta = "gateway_envelopes_meta_o100" + l1Blob = "gateway_envelopes_blob_o100" + l2Meta = "gateway_envelopes_meta_o100_s0_1000000" + l2Blob = "gateway_envelopes_blob_o100_s0_1000000" + bandBw = int64(1_000_000) + ) + + _, err := database.ExecContext( + ctx, + `SELECT ensure_gateway_parts_v4($1, $2, $3)`, + oid, seqID, bandBw, + ) + require.NoError(t, err, "ensure_gateway_parts_v4 should succeed on a fresh schema") + + // L1 (meta) attached to gateway_envelopes_meta. + assertInherits(t, database, l1Meta, "gateway_envelopes_meta") + // L1 (blob) attached to gateway_envelopes_blob. + assertInherits(t, database, l1Blob, "gateway_envelopes_blob") + // L2 (meta) attached to the L1 meta. + assertInherits(t, database, l2Meta, l1Meta) + // L2 (blob) attached to the L1 blob. + assertInherits(t, database, l2Blob, l1Blob) + + // No residual CHECK constraints named oid_check / seq_id_check on any + // partition child — the v4 helpers drop them after ATTACH. + for _, child := range []string{l1Meta, l1Blob, l2Meta, l2Blob} { + assertNoCheckConstraint(t, database, child, "oid_check") + assertNoCheckConstraint(t, database, child, "seq_id_check") + } +} + +// TestMigration00024_EnsureGatewayPartsV4_Idempotent verifies that calling +// ensure_gateway_parts_v4 twice for the same (oid, seq) is a no-op on the +// second call — no duplicate L1/L2 children, no errors, and the pg_inherits +// row count is unchanged. 
+func TestMigration00024_EnsureGatewayPartsV4_Idempotent(t *testing.T) { + ctx := t.Context() + database, _ := testutils.NewRawDB(t, ctx) + + const ( + oid = 101 + seqID = 1 + bandBw = int64(1_000_000) + ) + + _, err := database.ExecContext( + ctx, + `SELECT ensure_gateway_parts_v4($1, $2, $3)`, + oid, seqID, bandBw, + ) + require.NoError(t, err) + + before := countInheritsForOriginator(t, database, oid) + + // Call again — must be a schema no-op. + _, err = database.ExecContext( + ctx, + `SELECT ensure_gateway_parts_v4($1, $2, $3)`, + oid, seqID, bandBw, + ) + require.NoError(t, err, "second call must not error") + + after := countInheritsForOriginator(t, database, oid) + assert.Equal(t, before, after, "second call should not create new partitions") +} + +// TestMigration00024_EnsureGatewayPartsV4_CoexistsWithV3 verifies the +// "append-only migration" invariant: the v3 helpers and v4 helpers can both +// be called against the same schema, and calling v4 for (oid, seq) that were +// already seeded by v3 is a no-op (no errors, no duplicate rows). +// +// The test seeds partitions at schema version 23 via ensure_gateway_parts_v3, +// migrates to HEAD (applying 00024), then calls ensure_gateway_parts_v4 for +// the same (oid, seq) and asserts it reports success without re-attaching. +func TestMigration00024_EnsureGatewayPartsV4_CoexistsWithV3(t *testing.T) { + ctx := t.Context() + database, _ := testutils.NewRawDBAtVersion(t, ctx, 23) + + const ( + oid = 102 + seqID = 1 + bandBw = int64(1_000_000) + ) + + // Seed via v3 at schema version 23. + _, err := database.ExecContext( + ctx, + `SELECT ensure_gateway_parts_v3($1, $2, $3)`, + oid, seqID, bandBw, + ) + require.NoError(t, err, "ensure_gateway_parts_v3 should succeed at v23") + + // Apply migration 00024. 
+ require.NoError(t, migrations.Migrate(ctx, database)) + + before := countInheritsForOriginator(t, database, oid) + + _, err = database.ExecContext( + ctx, + `SELECT ensure_gateway_parts_v4($1, $2, $3)`, + oid, seqID, bandBw, + ) + require.NoError(t, err, "ensure_gateway_parts_v4 should be a no-op on v3-seeded partitions") + + after := countInheritsForOriginator(t, database, oid) + assert.Equal(t, before, after, "v4 should not alter partitions already attached by v3") +} + +// assertInherits requires child to be attached as a partition of parent +// (via pg_inherits). Both names are unqualified public-schema relation names. +func assertInherits(t *testing.T, database *sql.DB, child, parent string) { + t.Helper() + var exists bool + err := database.QueryRowContext( + t.Context(), + `SELECT EXISTS ( + SELECT 1 + FROM pg_inherits i + JOIN pg_class c ON c.oid = i.inhrelid + JOIN pg_class p ON p.oid = i.inhparent + WHERE c.relname = $1 + AND p.relname = $2 + )`, + child, parent, + ).Scan(&exists) + require.NoError(t, err) + assert.Truef(t, exists, "%s should be attached as a partition of %s", child, parent) +} + +// assertNoCheckConstraint fails if a CHECK constraint by the given name exists +// on the given relation. The v4 helpers install a seed CHECK for the duration +// of ATTACH, then drop it — so a surviving constraint is evidence that ATTACH +// failed silently (the exact regression this migration prevents). 
+func assertNoCheckConstraint(t *testing.T, database *sql.DB, relName, constraintName string) { + t.Helper() + var exists bool + err := database.QueryRowContext( + t.Context(), + `SELECT EXISTS ( + SELECT 1 + FROM pg_constraint con + JOIN pg_class c ON c.oid = con.conrelid + WHERE c.relname = $1 + AND con.conname = $2 + )`, + relName, constraintName, + ).Scan(&exists) + require.NoError(t, err) + assert.Falsef( + t, + exists, + "%s should not carry residual CHECK constraint %s after ATTACH", + relName, constraintName, + ) +} + +// countInheritsForOriginator returns the number of pg_inherits rows whose +// child relation is named gateway_envelopes_{meta,blob}_o[...]. Used to +// assert schema idempotence. +func countInheritsForOriginator(t *testing.T, database *sql.DB, oid int) int { + t.Helper() + metaPrefix := fmt.Sprintf("gateway_envelopes_meta_o%d%%", oid) + blobPrefix := fmt.Sprintf("gateway_envelopes_blob_o%d%%", oid) + var n int + err := database.QueryRowContext( + t.Context(), + `SELECT COUNT(*) + FROM pg_inherits i + JOIN pg_class c ON c.oid = i.inhrelid + WHERE c.relname LIKE $1 + OR c.relname LIKE $2`, + metaPrefix, blobPrefix, + ).Scan(&n) + require.NoError(t, err) + return n +} diff --git a/pkg/db/migrations/migrations_test.go b/pkg/db/migrations/migrations_test.go index 176468f01..d1297ef56 100644 --- a/pkg/db/migrations/migrations_test.go +++ b/pkg/db/migrations/migrations_test.go @@ -14,7 +14,7 @@ import ( "github.com/xmtp/xmtpd/pkg/topic" ) -const currentMigration int64 = 23 +const currentMigration int64 = 24 var ( originatorIDs = []int32{100, 200, 300} @@ -212,6 +212,10 @@ func TestMigrations(t *testing.T) { checkRenameEnvelopeBlobs(t, database) }) + t.Run("00024_harden-partition-creation", func(t *testing.T) { + checkHardenPartitionCreation(t, database) + }) + t.Run("data_verification", func(t *testing.T) { checkDataVerification(t, database) }) @@ -411,6 +415,19 @@ func checkMetaPartitionSelect(t *testing.T, database *sql.DB) { functionExists(t, 
database, "get_prunable_meta_partitions") } +func checkHardenPartitionCreation(t *testing.T, database *sql.DB) { + functions := []string{ + "make_meta_originator_part_v3", + "make_meta_seq_subpart_v3", + "make_blob_originator_part_v4", + "make_blob_seq_subpart_v4", + "ensure_gateway_parts_v4", + } + for _, fn := range functions { + functionExists(t, database, fn) + } +} + func checkRenameEnvelopeBlobs(t *testing.T, database *sql.DB) { // Renamed parent table is in place; the legacy name is gone. tableExists(t, database, "gateway_envelopes_blob") diff --git a/pkg/db/queries/db.go b/pkg/db/queries/db.go index 53c093238..44ab6adeb 100644 --- a/pkg/db/queries/db.go +++ b/pkg/db/queries/db.go @@ -57,6 +57,9 @@ func Prepare(ctx context.Context, db DBTX) (*Queries, error) { if q.ensureGatewayPartsV3Stmt, err = db.PrepareContext(ctx, ensureGatewayPartsV3); err != nil { return nil, fmt.Errorf("error preparing query EnsureGatewayPartsV3: %w", err) } + if q.ensureGatewayPartsV4Stmt, err = db.PrepareContext(ctx, ensureGatewayPartsV4); err != nil { + return nil, fmt.Errorf("error preparing query EnsureGatewayPartsV4: %w", err) + } if q.fetchPayerReportStmt, err = db.PrepareContext(ctx, fetchPayerReport); err != nil { return nil, fmt.Errorf("error preparing query FetchPayerReport: %w", err) } @@ -315,6 +318,11 @@ func (q *Queries) Close() error { err = fmt.Errorf("error closing ensureGatewayPartsV3Stmt: %w", cerr) } } + if q.ensureGatewayPartsV4Stmt != nil { + if cerr := q.ensureGatewayPartsV4Stmt.Close(); cerr != nil { + err = fmt.Errorf("error closing ensureGatewayPartsV4Stmt: %w", cerr) + } + } if q.fetchPayerReportStmt != nil { if cerr := q.fetchPayerReportStmt.Close(); cerr != nil { err = fmt.Errorf("error closing fetchPayerReportStmt: %w", cerr) @@ -695,6 +703,7 @@ type Queries struct { deleteObsoleteNoncesStmt *sql.Stmt ensureGatewayPartsStmt *sql.Stmt ensureGatewayPartsV3Stmt *sql.Stmt + ensureGatewayPartsV4Stmt *sql.Stmt fetchPayerReportStmt *sql.Stmt 
fetchPayerReportLockedStmt *sql.Stmt fetchPayerReportsStmt *sql.Stmt @@ -778,6 +787,7 @@ func (q *Queries) WithTx(tx *sql.Tx) *Queries { deleteObsoleteNoncesStmt: q.deleteObsoleteNoncesStmt, ensureGatewayPartsStmt: q.ensureGatewayPartsStmt, ensureGatewayPartsV3Stmt: q.ensureGatewayPartsV3Stmt, + ensureGatewayPartsV4Stmt: q.ensureGatewayPartsV4Stmt, fetchPayerReportStmt: q.fetchPayerReportStmt, fetchPayerReportLockedStmt: q.fetchPayerReportLockedStmt, fetchPayerReportsStmt: q.fetchPayerReportsStmt, diff --git a/pkg/db/queries/partitions.sql.go b/pkg/db/queries/partitions.sql.go index 997b969cc..73ec49d1a 100644 --- a/pkg/db/queries/partitions.sql.go +++ b/pkg/db/queries/partitions.sql.go @@ -52,6 +52,28 @@ func (q *Queries) EnsureGatewayPartsV3(ctx context.Context, arg EnsureGatewayPar return err } +const ensureGatewayPartsV4 = `-- name: EnsureGatewayPartsV4 :exec +SELECT ensure_gateway_parts_v4( + $1, + $2, + $3 + ) +` + +type EnsureGatewayPartsV4Params struct { + OriginatorNodeID int32 + OriginatorSequenceID int64 + BandWidth int64 +} + +// Hardened partition ensure. Adds advisory locking, pg_inherits-based +// short-circuit, corrected CHECK constraint, and no silent error swallowing. +// See migration 00024 for context. +func (q *Queries) EnsureGatewayPartsV4(ctx context.Context, arg EnsureGatewayPartsV4Params) error { + _, err := q.exec(ctx, q.ensureGatewayPartsV4Stmt, ensureGatewayPartsV4, arg.OriginatorNodeID, arg.OriginatorSequenceID, arg.BandWidth) + return err +} + const insertSavePoint = `-- name: InsertSavePoint :exec SAVEPOINT sp_part ` diff --git a/pkg/db/sqlc/partitions.sql b/pkg/db/sqlc/partitions.sql index 668f57804..de42a59dd 100644 --- a/pkg/db/sqlc/partitions.sql +++ b/pkg/db/sqlc/partitions.sql @@ -17,6 +17,16 @@ SELECT ensure_gateway_parts_v3( @band_width ); +-- name: EnsureGatewayPartsV4 :exec +-- Hardened partition ensure. Adds advisory locking, pg_inherits-based +-- short-circuit, corrected CHECK constraint, and no silent error swallowing. 
+-- See migration 00024 for context. +SELECT ensure_gateway_parts_v4( + @originator_node_id, + @originator_sequence_id, + @band_width + ); + -- name: MakeMetaOriginatorPart :exec SELECT make_meta_originator_part_v2(@originator_node_id); diff --git a/pkg/db/worker/worker.go b/pkg/db/worker/worker.go index 503251ffe..440ba0a51 100644 --- a/pkg/db/worker/worker.go +++ b/pkg/db/worker/worker.go @@ -167,12 +167,12 @@ func (w *Worker) runPartitionCheck(ctx context.Context, nodeID uint32, seqID int return nil } - params := queries.EnsureGatewayPartsV3Params{ + params := queries.EnsureGatewayPartsV4Params{ OriginatorNodeID: int32(nodeID), OriginatorSequenceID: targetSeqID, BandWidth: partitionSize, } - err := w.db.WriteQuery().EnsureGatewayPartsV3(ctx, params) + err := w.db.WriteQuery().EnsureGatewayPartsV4(ctx, params) if err != nil { return fmt.Errorf("could not create gateway partitions: %w", err) }