Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/db/gateway_envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func InsertGatewayEnvelopeWithChecksTransactional(
return queries.InsertGatewayEnvelopeV3Row{}, err
}

err = q.EnsureGatewayPartsV3(ctx, queries.EnsureGatewayPartsV3Params{
err = q.EnsureGatewayPartsV4(ctx, queries.EnsureGatewayPartsV4Params{
OriginatorNodeID: row.OriginatorNodeID,
OriginatorSequenceID: row.OriginatorSequenceID,
BandWidth: GatewayEnvelopeBandWidth,
Expand Down Expand Up @@ -161,7 +161,7 @@ func InsertGatewayEnvelopeWithChecksStandalone(
return queries.InsertGatewayEnvelopeV3Row{}, err
}

err = q.EnsureGatewayPartsV3(ctx, queries.EnsureGatewayPartsV3Params{
err = q.EnsureGatewayPartsV4(ctx, queries.EnsureGatewayPartsV4Params{
OriginatorNodeID: row.OriginatorNodeID,
OriginatorSequenceID: row.OriginatorSequenceID,
BandWidth: GatewayEnvelopeBandWidth,
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/gateway_envelope_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func InsertGatewayEnvelopeBatchV2Transactional(
logger.Info("creating partitions for batch insert")

for _, envelope := range input.Envelopes {
err = q.EnsureGatewayPartsV3(ctx, queries.EnsureGatewayPartsV3Params{
err = q.EnsureGatewayPartsV4(ctx, queries.EnsureGatewayPartsV4Params{
OriginatorNodeID: envelope.OriginatorNodeID,
OriginatorSequenceID: envelope.OriginatorSequenceID,
BandWidth: GatewayEnvelopeBandWidth,
Expand Down
182 changes: 182 additions & 0 deletions pkg/db/gateway_envelope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package db_test
import (
"context"
"database/sql"
"fmt"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -281,6 +282,187 @@ func TestInsertGatewayEnvelopeWithChecksStandalone_AutoCreateAndRetry(t *testing
require.NoError(t, err)
}

// TestEnsureGatewayPartsV4_ConcurrentCreate verifies that many concurrent
// callers racing to create partitions for the same (originator, band) are
// all serialized via the advisory locks added in migration 00024, and that
// the end state is exactly one L1 + one L2 pair on both the meta and blob
// sides.
//
// Before migration 00024, ensure_gateway_parts_v3 did not serialize callers
// and relied on a regex match of SQLERRM to swallow "already a partition"
// errors — a race could leave the child unattached while the caller saw
// success, which later surfaced as SQLSTATE 23514 ("no partition of
// relation...") on the insert path. The v4 helpers hold a per-(originator,
// band) `pg_advisory_xact_lock` around the CREATE/ATTACH window and short-
// circuit via `pg_inherits` when the partition is already attached, so
// concurrent callers converge on the same committed state with no errors.
//
// Note: this test races the PARTITION CREATION path specifically (via
// EnsureGatewayPartsV4). It does not overlap the creation with concurrent
// inserts, because PostgreSQL's intrinsic lock ordering between INSERT
// (RowExclusive on parent) and ATTACH PARTITION (ShareRowExclusive on
// parent) can deadlock independently of this code — see the existing
// `TestInsertAndIncrementParallel` which works around the same limitation
// with an "insert one to avoid DDL creation deadlocks" warmup.
func TestEnsureGatewayPartsV4_ConcurrentCreate(t *testing.T) {
t.Parallel()

ctx := context.Background()
db, _ := testutils.NewRawDB(t, ctx)
q := queries.New(db)

const (
nodeID int32 = 4242
seqID int64 = 1
bandWidth = xmtpd_db.GatewayEnvelopeBandWidth
numGoroutine = 32
)

var (
wg sync.WaitGroup
errCh = make(chan error, numGoroutine)
startCh = make(chan struct{})
)

for range numGoroutine {
wg.Go(func() {
<-startCh
errCh <- q.EnsureGatewayPartsV4(ctx, queries.EnsureGatewayPartsV4Params{
OriginatorNodeID: nodeID,
OriginatorSequenceID: seqID,
BandWidth: bandWidth,
})
})
}

// Release all goroutines at once to maximise the chance of racing on
// partition creation.
close(startCh)
wg.Wait()
close(errCh)

for err := range errCh {
require.NoError(t, err, "concurrent ensure_gateway_parts_v4 must not fail")
}

// End state: exactly one L1 (and one L2) child under each of the meta
// and blob parents for this originator.
for _, parent := range []string{"gateway_envelopes_meta", "gateway_envelopes_blob"} {
l1Name := fmt.Sprintf("%s_o%d", parent, nodeID)

var l1Count int
require.NoError(
t,
db.QueryRowContext(
ctx,
`SELECT COUNT(*)
FROM pg_inherits i
JOIN pg_class c ON c.oid = i.inhrelid
JOIN pg_class p ON p.oid = i.inhparent
WHERE p.relname = $1 AND c.relname = $2`,
parent, l1Name,
).Scan(&l1Count),
)
require.Equal(t, 1, l1Count, "expected exactly one L1 child under %s", parent)

var l2Count int
require.NoError(
t,
db.QueryRowContext(
ctx,
`SELECT COUNT(*)
FROM pg_inherits i
JOIN pg_class c ON c.oid = i.inhrelid
JOIN pg_class p ON p.oid = i.inhparent
WHERE p.relname = $1`,
l1Name,
).Scan(&l2Count),
)
require.Equal(t, 1, l2Count, "expected exactly one L2 child under %s", l1Name)
}
}

// TestInsertGatewayEnvelopeWithChecksStandalone_ConcurrentWithWarmup verifies
// that, once partitions exist, concurrent standalone inserts all land
// successfully without deadlocks or "no partition of relation" errors.
//
// This mirrors the pattern in `TestInsertAndIncrementParallel`: a single
// initial insert warms up the partitions so subsequent concurrent inserts
// hit the fast path. It exercises the EnsureGatewayPartsV4 short-circuit
// (pg_inherits says "already attached") when many goroutines retry after
// racing on a subsequent band boundary, without triggering the PG-intrinsic
// INSERT-vs-ATTACH deadlock.
func TestInsertGatewayEnvelopeWithChecksStandalone_ConcurrentWithWarmup(t *testing.T) {
t.Parallel()

ctx := context.Background()
db, _ := testutils.NewRawDB(t, ctx)
q := queries.New(db)

const (
nodeID int32 = 4343
numGoroutine int32 = 16
)

// Warmup insert to create partitions and avoid the PG-intrinsic
// INSERT-vs-ATTACH deadlock in the concurrent phase.
_, err := xmtpd_db.InsertGatewayEnvelopeWithChecksStandalone(
ctx,
q,
queries.InsertGatewayEnvelopeV3Params{
OriginatorNodeID: nodeID,
OriginatorSequenceID: 1,
Topic: testutils.RandomBytes(32),
OriginatorEnvelope: testutils.RandomBytes(64),
},
)
require.NoError(t, err)

var (
wg sync.WaitGroup
errCh = make(chan error, numGoroutine)
startCh = make(chan struct{})
)

for i := range numGoroutine {
wg.Add(1)
go func(seq int64) {
defer wg.Done()
<-startCh
_, err := xmtpd_db.InsertGatewayEnvelopeWithChecksStandalone(
ctx,
q,
queries.InsertGatewayEnvelopeV3Params{
OriginatorNodeID: nodeID,
OriginatorSequenceID: seq,
Topic: testutils.RandomBytes(32),
OriginatorEnvelope: testutils.RandomBytes(64),
},
)
errCh <- err
}(int64(i + 2))
}

close(startCh)
wg.Wait()
close(errCh)

for err := range errCh {
require.NoError(t, err, "concurrent standalone insert must not fail after warmup")
}

var count int64
require.NoError(
t,
db.QueryRowContext(
ctx,
`SELECT COUNT(*) FROM gateway_envelopes_meta WHERE originator_node_id = $1`,
nodeID,
).Scan(&count),
)
require.Equal(t, int64(numGoroutine+1), count, "expected warmup + concurrent rows")
}

func TestInsertGatewayEnvelopeWithChecksStandalone_PreexistingPartitions(t *testing.T) {
t.Parallel()

Expand Down
9 changes: 9 additions & 0 deletions pkg/db/migrations/00024_harden-partition-creation.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Revert migration 00024 by dropping the hardened partition helpers. The
-- pre-existing v2/v3 helpers are untouched by the up migration, so no rename
-- or restore is required here.

DROP FUNCTION IF EXISTS ensure_gateway_parts_v4(int, bigint, bigint);
DROP FUNCTION IF EXISTS make_blob_seq_subpart_v4(int, bigint, bigint);
DROP FUNCTION IF EXISTS make_blob_originator_part_v4(int);
DROP FUNCTION IF EXISTS make_meta_seq_subpart_v3(int, bigint, bigint);
DROP FUNCTION IF EXISTS make_meta_originator_part_v3(int);
Loading
Loading