diff --git a/CHANGELOG.md b/CHANGELOG.md index 12fb27f..963b4fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,49 @@ # Lib-systemplane Changelog +## [Unreleased] + +### Changed + +- **lib-systemplane no longer creates its schema or seeds defaults at + runtime.** The Postgres store and the multi-tenant `Manager` previously ran + `CREATE TABLE` / `CREATE FUNCTION` / `CREATE TRIGGER` and an + `INSERT ... ON CONFLICT DO NOTHING` defaults seed on first use + (`Store.Start` / `OnTenantActivated`). Those runtime DDL/seed paths are + removed. Consumers provision `systemplane_entries` (plus + `systemplane_notify_v3()` and the INSERT/DELETE and UPDATE NOTIFY triggers) + and any default values externally — e.g. via their migration pipeline — + using the DDL published by `SchemaSQL()` / `DefaultSeedSQL()`. The runtime + database role needs only DML (`SELECT`/`INSERT`/`UPDATE`/`DELETE`) + + `LISTEN`; it no longer needs `CREATE` on the schema. This aligns with the + least-privilege per-tenant roles handed back by the tenant-manager (which + reject runtime DDL with `permission denied for schema ... (42501)`). No + current consumers depend on the removed runtime bootstrap, so this is a + behavior change with no expected real-world breakage. +- Warm-load (`OnTenantActivated`) now tolerates a not-yet-provisioned table: + if `systemplane_entries` does not exist yet (SQLSTATE `42P01`) it logs at + WARN and proceeds with an empty cache instead of failing activation; + LISTEN/poll refreshes the cache once the consumer's migration creates the + table. Reads return not-found / zero-value as before. + +### Removed + +- Postgres store: `runSchema` and the `CREATE ...` DDL builders, the + `ensureSchema` / `schemaOnce` / `schemaErr` lazy-bootstrap machinery, and + the `Start`/`resolveDB` calls into them. +- Manager: `runSchemaAndSeed`, `runSchema`, and the runtime `seedDefaults` + defaults seed. 
+ +### Unchanged + +- `SchemaSQL()` and `DefaultSeedSQL()` are intact and are now the ONLY way the + schema and defaults are expressed, for consumers to vendor into migrations. + +> Recommended version: **v1.7.0** (next beta `v1.7.0-beta.1`) — minor bump +> continuing the v1.6.x line that introduced `SchemaSQL()` / `DefaultSeedSQL()`. +> Tag owned by the maintainer; not tagged here. + +--- + ## [1.5.0](https://github.com/LerianStudio/lib-systemplane/releases/tag/v1.5.0) - **Features** @@ -35,8 +79,9 @@ Contributors: @bedatty, @fredcamaral, @jeffersonrodrigues92 identical v1.4.0 behaviour. - Schema bootstrap + defaults seed via `INSERT ... ON CONFLICT DO NOTHING` happen at `OnTenantActivated` time. Operator-set values are never - overwritten. This removes the need for hand-rolled plugin-side migrations - that seeded systemplane defaults. + overwritten. (Superseded by the Unreleased change above: runtime schema + creation and the defaults seed were removed — provision the schema and + defaults externally via `SchemaSQL()` / `DefaultSeedSQL()`.) - Six new OpenTelemetry metrics: `systemplane.manager.tenants_active`, `cache_entries`, `notify_received_total`, `listen_disconnects_total`, `warmload_latency_seconds`, `get_cache_hits_total`. Tenant-id cardinality diff --git a/CLAUDE.md b/CLAUDE.md index ea92f8d..0b92474 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -58,7 +58,7 @@ These are external module imports. Do not rewrite them to in-repo paths. Do not The Client runs in one of two modes selected at construction: - **Single-tenant** (default). The constructor receives a non-nil `*sql.DB` / `*mongo.Client`. Reads serve from an in-process cache; writes upsert through the store and update the cache. The backend's changefeed (LISTEN/NOTIFY on Postgres, change stream on MongoDB) drives invalidation and fires `OnChange` subscribers. -- **Multi-tenant** (opt-in via `WithMultiTenantEnabled()`). DB/client may be nil. 
Every read/write resolves the tenant database from `ctx` via `tmcore.GetPGContext(ctx, module)` / `tmcore.GetMBContext(ctx, module)` (`lib-commons/v5/commons/tenant-manager/core`). The lib lazily runs `CREATE TABLE IF NOT EXISTS` (Postgres) / index/collection bootstrap (MongoDB) once per resolved tenant database via a `sync.Map`-backed `sync.Once` cache. No in-process cache. No LISTEN/NOTIFY. `OnChange` returns `ErrNotSupportedInMultiTenant`. Callers wire `tenant-manager/middleware.TenantMiddleware` (with `WithPG(...)` or `WithMB(...)` and a matching module name) before the lib's handlers. +- **Multi-tenant** (opt-in via `WithMultiTenantEnabled()`). DB/client may be nil. Every read/write resolves the tenant database from `ctx` via `tmcore.GetPGContext(ctx, module)` / `tmcore.GetMBContext(ctx, module)` (`lib-commons/v5/commons/tenant-manager/core`). For Postgres the lib performs NO runtime schema provisioning — `systemplane_entries` plus its NOTIFY trigger function/triggers must be created externally via `SchemaSQL()` / `DefaultSeedSQL()` (e.g. the consumer's migration pipeline); the runtime role only needs DML + `LISTEN`. (MongoDB still bootstraps its collection/indexes lazily once per resolved tenant database via a `sync.Map`-backed `sync.Once` cache.) No in-process cache. No LISTEN/NOTIFY. `OnChange` returns `ErrNotSupportedInMultiTenant`. Callers wire `tenant-manager/middleware.TenantMiddleware` (with `WithPG(...)` or `WithMB(...)` and a matching module name) before the lib's handlers. 
### Storage shape diff --git a/README.md b/README.md index e0e59c0..29ebf8b 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,28 @@ The library supports two modes; pick at construction time: | Single-tenant | `db *sql.DB` / `*mongo.Client` | In-process cache | Through cache + store | LISTEN/NOTIFY (Postgres) or change stream (MongoDB) | | Multi-tenant | May be nil | Resolved per-call via tenant-manager ctx | Same | Disabled — `OnChange` returns `ErrNotSupportedInMultiTenant` | -In multi-tenant mode the library does NOT hold an in-process cache. Every `Get` reads through the resolved tenant database. The lib expects the caller to wire `lib-commons/v5/commons/tenant-manager/middleware.TenantMiddleware` with `WithPG(pgManager, "")` (Postgres) or `WithMB(mongoManager, "")` (MongoDB) where `` matches the lib's `WithModule(...)` option (default `"systemplane"`). The middleware populates the request context; the lib calls `tmcore.GetPGContext` / `tmcore.GetMBContext` to resolve the tenant database, lazily ensures the schema on first use per database, and runs the read/write against that handle. +In multi-tenant mode the library does NOT hold an in-process cache. Every `Get` reads through the resolved tenant database. The lib expects the caller to wire `lib-commons/v5/commons/tenant-manager/middleware.TenantMiddleware` with `WithPG(pgManager, "")` (Postgres) or `WithMB(mongoManager, "")` (MongoDB) where `` matches the lib's `WithModule(...)` option (default `"systemplane"`). The middleware populates the request context; the lib calls `tmcore.GetPGContext` / `tmcore.GetMBContext` to resolve the tenant database and runs the read/write against that handle. + +> **Provisioning (Postgres).** The library no longer creates its schema or seeds defaults at runtime. Provision `systemplane_entries` (plus the `systemplane_notify_v3()` trigger function and the NOTIFY triggers) and any defaults externally — e.g. 
via your migration pipeline — using the DDL published by [`SchemaSQL()` / `DefaultSeedSQL()`](#schema-provisioning). The runtime database role only needs DML (`SELECT`/`INSERT`/`UPDATE`/`DELETE`) + `LISTEN`; it does NOT need `CREATE` on the schema. + +## Schema provisioning + +`lib-systemplane` does not run any DDL or defaults seed at runtime (single- or multi-tenant). The Postgres schema and defaults are published as importable artifacts so consumers can fold them into their own migration pipeline: + +```go +// systemplane.SchemaSQL() returns the full idempotent DDL: +// - CREATE TABLE IF NOT EXISTS systemplane_entries (...) +// - CREATE OR REPLACE FUNCTION systemplane_notify_v3() ... +// - the INSERT/DELETE and UPDATE NOTIFY triggers on the +// systemplane_changes channel +ddl := systemplane.SchemaSQL() + +// systemplane.DefaultSeedSQL() returns neutral runtime_config defaults +// inserted with ON CONFLICT (namespace, "key") DO NOTHING. +seed := systemplane.DefaultSeedSQL() +``` + +Run both through a privileged role during provisioning (e.g. as a migration executed by your migrate-up step or by the tenant-manager during per-tenant database provisioning). At runtime the library only reads, writes values (DML), and — in single-tenant mode and in the multi-tenant `Manager` — runs `LISTEN`, so the runtime role can be least-privilege with no `CREATE` on the schema. If the table has not been provisioned yet when a multi-tenant `Manager` activates a tenant, warm-load logs a warning and proceeds with an empty cache rather than failing; the cache refreshes via LISTEN/poll once the migration creates the table. ## Single-tenant Quickstart — PostgreSQL @@ -275,7 +296,7 @@ func run() error { } ``` -In multi-tenant mode the lib lazily runs `CREATE TABLE IF NOT EXISTS` (Postgres) or its MongoDB equivalent once per tenant database, the first time a request touches that database. Calling `OnChange` returns `ErrNotSupportedInMultiTenant`. 
+In multi-tenant mode the lib does NOT provision schema at runtime for Postgres — the `systemplane_entries` table and its NOTIFY triggers must be created externally (see [Schema provisioning](#schema-provisioning)) before the lib reads or writes. Calling `OnChange` returns `ErrNotSupportedInMultiTenant`. ## Admin HTTP routes diff --git a/examples/manager/main.go b/examples/manager/main.go index 7b00da7..4492099 100644 --- a/examples/manager/main.go +++ b/examples/manager/main.go @@ -64,8 +64,11 @@ func main() { defer func() { _ = client.Close() }() - // 2. Register every runtime configuration key. Defaults are seeded - // per tenant by the Manager on OnTenantActivated. + // 2. Register every runtime configuration key. The Manager does NOT seed + // defaults at runtime — provision the schema and any defaults externally + // via systemplane.SchemaSQL() / DefaultSeedSQL() (e.g. your migration + // pipeline). OnTenantActivated warm-loads existing values and opens + // LISTEN; a not-yet-provisioned table is tolerated (empty cache). if err := client.Register("ledger", "retries", 3, systemplane.WithDescription("retry count")); err != nil { fail("Register retries", err) } diff --git a/internal/manager/connector.go b/internal/manager/connector.go index 8e9914b..c303268 100644 --- a/internal/manager/connector.go +++ b/internal/manager/connector.go @@ -18,7 +18,8 @@ import ( // substitute a fake implementation. type Connector interface { // ResolveDB returns the tenant's primary database handle. Used for - // schema bootstrap, defaults seed, warm-load and NOTIFY refresh reads. + // warm-load and NOTIFY refresh reads. The schema is provisioned + // externally; the Manager never issues DDL through this handle. 
ResolveDB(ctx context.Context, tenantID string) (dbresolver.DB, error) // ResolveDSN returns the connection string used by pgx.Connect to open diff --git a/internal/manager/lifecycle.go b/internal/manager/lifecycle.go index 8d4fe31..3fb3847 100644 --- a/internal/manager/lifecycle.go +++ b/internal/manager/lifecycle.go @@ -14,11 +14,13 @@ import ( // OnTenantActivated bootstraps systemplane state for tenantID. // -// Slice 3+: ensures the systemplane schema exists, seeds defaults via -// INSERT ON CONFLICT DO NOTHING, opens the LISTEN goroutine, and warm-loads -// every registered key into the cache. Slice 1 records the tenant in -// perTenant so subsequent Get calls observe an empty cache for it (which -// still falls through to the DB). +// It warm-loads every registered key into the per-tenant cache and opens the +// LISTEN goroutine. It does NOT create the schema or seed defaults — those are +// provisioned externally by the consumer's migration pipeline (see +// internal/manager/schema.go and the root package's SchemaSQL() / +// DefaultSeedSQL()). Warm-load tolerates a not-yet-provisioned table: it logs +// and proceeds with an empty cache so a provisioning race never wedges +// activation; LISTEN/poll refreshes the cache once the table exists. 
func (m *Manager) OnTenantActivated(ctx context.Context, tenantID string) error { if m == nil || m.IsClosed() || tenantID == "" { return nil @@ -49,15 +51,6 @@ func (m *Manager) OnTenantActivated(ctx context.Context, tenantID string) error ts := m.tenantStateFor(tenantID) registered := m.hooks.RegisteredKeys() - if err := m.runSchemaAndSeed(ctx, db, registered); err != nil { - m.logWarn(ctx, "OnTenantActivated: schema/seed failed", - log.String("tenant_id", tenantID), - log.Err(err), - ) - - return err - } - if err := m.warmLoad(ctx, db, ts, registered); err != nil { m.logWarn(ctx, "OnTenantActivated: warm-load failed", log.String("tenant_id", tenantID), diff --git a/internal/manager/listen_integration_test.go b/internal/manager/listen_integration_test.go index e7eb840..28cd70a 100644 --- a/internal/manager/listen_integration_test.go +++ b/internal/manager/listen_integration_test.go @@ -187,10 +187,7 @@ func TestListen_ReadSingle_NoRows(t *testing.T) { db, raw, _, cleanup := freshTenantDB(t, baseDSN) t.Cleanup(cleanup) - m := New(nil) - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema: %v", err) - } + provisionTestSchema(t, db) _ = raw v, found, err := readSingle(context.Background(), db, "ns", "missing") @@ -210,10 +207,7 @@ func TestListen_ReadSingle_DecodeError(t *testing.T) { db, raw, _, cleanup := freshTenantDB(t, baseDSN) t.Cleanup(cleanup) - m := New(nil) - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema: %v", err) - } + provisionTestSchema(t, db) // JSONB enforces JSON well-formedness, but our column's bytes path will // still decode any valid JSON. 
To force a Unmarshal failure we have to @@ -363,9 +357,7 @@ func TestListen_ApplyEvent_UpsertRowMissing_DeletesEntry(t *testing.T) { tel, _ := newTestTelemetry(t) m.metrics = newMetrics(tel, log.NewNop(), DefaultAggregateTenantThreshold) - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema: %v", err) - } + provisionTestSchema(t, db) fc := newFakeConnectorInternal() fc.setTenant("t", "", db) @@ -400,9 +392,7 @@ func TestListen_ApplyEvent_UpsertSuccess_UpdatesCacheAndDispatches(t *testing.T) tel, _ := newTestTelemetry(t) m.metrics = newMetrics(tel, log.NewNop(), DefaultAggregateTenantThreshold) - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema: %v", err) - } + provisionTestSchema(t, db) if _, err := raw.Exec(`INSERT INTO ` + defaultTable + ` (namespace, key, value, updated_by) VALUES ('a', 'k', '"fresh"'::jsonb, 'op')`); err != nil { @@ -448,9 +438,7 @@ func TestListen_ConsumeAndReconnect_RealNotify(t *testing.T) { tel, _ := newTestTelemetry(t) m.metrics = newMetrics(tel, log.NewNop(), DefaultAggregateTenantThreshold) - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema: %v", err) - } + provisionTestSchema(t, db) fc := newFakeConnectorInternal() fc.setTenant("t", tDSN, db) @@ -505,9 +493,7 @@ func TestListen_Reconnect_RecoversAfterDrop(t *testing.T) { tel, _ := newTestTelemetry(t) m.metrics = newMetrics(tel, log.NewNop(), DefaultAggregateTenantThreshold) - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema: %v", err) - } + provisionTestSchema(t, db) fc := newFakeConnectorInternal() fc.setTenant("t", tDSN, db) diff --git a/internal/manager/manager_integration_test.go b/internal/manager/manager_integration_test.go index 2cd3c22..c670851 100644 --- a/internal/manager/manager_integration_test.go +++ b/internal/manager/manager_integration_test.go @@ -5,7 +5,6 @@ package manager_test import ( "context" "database/sql" - "encoding/json" 
"errors" "fmt" "strings" @@ -14,6 +13,7 @@ import ( "time" tmcore "github.com/LerianStudio/lib-commons/v5/commons/tenant-manager/core" + systemplane "github.com/LerianStudio/lib-systemplane" "github.com/LerianStudio/lib-systemplane/internal/manager" "github.com/bxcodec/dbresolver/v2" _ "github.com/jackc/pgx/v5/stdlib" @@ -135,6 +135,18 @@ func freshDB(t *testing.T, admin *sql.DB, name string) { } } +// provisionSchema applies the published systemplane DDL to db. The Manager no +// longer creates its schema or seeds defaults at runtime, so the test (acting +// as the consumer's migration pipeline) provisions systemplane_entries plus +// the NOTIFY trigger before activating tenants. +func provisionSchema(t *testing.T, db *sql.DB) { + t.Helper() + + if _, err := db.Exec(systemplane.SchemaSQL()); err != nil { + t.Fatalf("provision schema: %v", err) + } +} + func dsnFor(base, dbName string) string { for i := len(base) - 1; i >= 0; i-- { if base[i] == '/' { @@ -174,6 +186,9 @@ func setup(t *testing.T, baseDSN string, tenantID string, keys []manager.Registe t.Fatalf("open tenant: %v", err) } + // Provision the schema externally — the Manager no longer creates it. + provisionSchema(t, tenantSQL) + resolver := dbresolver.New(dbresolver.WithPrimaryDBs(tenantSQL)) fc := newFakeConnector() @@ -191,9 +206,11 @@ func setup(t *testing.T, baseDSN string, tenantID string, keys []manager.Registe return m, tdsn, resolver, cleanup } -// TestIntegration_Manager_OnTenantActivated_BootstrapsSchemaAndSeeds verifies -// that activation runs DDL, seeds defaults, opens LISTEN and warm-loads. -func TestIntegration_Manager_OnTenantActivated_BootstrapsSchemaAndSeeds(t *testing.T) { +// TestIntegration_Manager_OnTenantActivated_WarmLoadsExternallySeeded verifies +// that activation warm-loads values the consumer's migration seeded into the +// externally provisioned table and opens LISTEN. The Manager no longer seeds +// defaults itself — the row is inserted out-of-band first. 
+func TestIntegration_Manager_OnTenantActivated_WarmLoadsExternallySeeded(t *testing.T) { baseDSN, cleanup := startContainer(t) t.Cleanup(cleanup) @@ -205,37 +222,76 @@ func TestIntegration_Manager_OnTenantActivated_BootstrapsSchemaAndSeeds(t *testi m, _, resolver, mClean := setup(t, baseDSN, "tenant-a", keys) defer mClean() + // Seed the value externally (the migration pipeline's job now). + if _, err := resolver.ExecContext(context.Background(), + `INSERT INTO systemplane_entries (namespace, key, value, updated_at, updated_by) + VALUES ($1, $2, $3, now(), $4)`, + "ns", "retries", []byte(`3`), "migration"); err != nil { + t.Fatalf("external seed: %v", err) + } + if err := m.OnTenantActivated(context.Background(), "tenant-a"); err != nil { t.Fatalf("OnTenantActivated: %v", err) } - // Verify defaults were seeded. - row := resolver.QueryRowContext(context.Background(), - `SELECT value FROM systemplane_entries WHERE namespace=$1 AND key=$2`, - "ns", "retries") + // Verify cache was warm-loaded from the externally seeded row. + got, hit, err := m.Lookup(context.Background(), "tenant-a", "ns", "retries") + if err != nil || !hit { + t.Fatalf("Lookup retries: err=%v hit=%v", err, hit) + } - var raw []byte - if err := row.Scan(&raw); err != nil { - t.Fatalf("scan seeded row: %v", err) + if got.(float64) != 3.0 { + t.Fatalf("Lookup retries = %v, want 3", got) } +} - var v float64 - if err := json.Unmarshal(raw, &v); err != nil { - t.Fatalf("decode seeded value: %v", err) +// TestIntegration_Manager_OnTenantActivated_MissingTableTolerated verifies the +// graceful path: when the systemplane table has NOT been provisioned yet, +// activation does not fail — warm-load logs and proceeds with an empty cache, +// and the LISTEN goroutine still opens. (Reads fall through / miss until the +// migration creates the table.) 
+func TestIntegration_Manager_OnTenantActivated_MissingTableTolerated(t *testing.T) { + baseDSN, cleanup := startContainer(t) + t.Cleanup(cleanup) + + admin := adminDSN(t, baseDSN) + defer admin.Close() + + dbName := fmt.Sprintf("mgr_missing_%d", time.Now().UnixNano()) + freshDB(t, admin, dbName) + + tdsn := dsnFor(baseDSN, dbName) + + tenantSQL, err := sql.Open("pgx", tdsn) + if err != nil { + t.Fatalf("open tenant: %v", err) } + defer tenantSQL.Close() + + // Intentionally DO NOT provision the schema. + resolver := dbresolver.New(dbresolver.WithPrimaryDBs(tenantSQL)) + + fc := newFakeConnector() + fc.setTenant("tenant-miss", tdsn, resolver) - if v != 3.0 { - t.Fatalf("seeded retries = %v, want 3", v) + m := manager.New(nil) + m.SetConnector(fc) + m.Bind(&stubHooks{keys: []manager.RegisteredKey{{Namespace: "ns", Key: "k", DefaultValue: "default"}}}) + t.Cleanup(func() { _ = m.Drain(context.Background()) }) + + // Activation must succeed despite the missing table. + if err := m.OnTenantActivated(context.Background(), "tenant-miss"); err != nil { + t.Fatalf("OnTenantActivated with missing table should be tolerated, got: %v", err) } - // Verify cache was warm-loaded. - got, hit, err := m.Lookup(context.Background(), "tenant-a", "ns", "retries") - if err != nil || !hit { - t.Fatalf("Lookup retries: err=%v hit=%v", err, hit) + // Cache is empty — Lookup misses (no panic, no error). 
+ _, hit, err := m.Lookup(context.Background(), "tenant-miss", "ns", "k") + if err != nil { + t.Fatalf("Lookup after tolerated missing table: %v", err) } - if got.(float64) != 3.0 { - t.Fatalf("Lookup retries = %v, want 3", got) + if hit { + t.Fatal("expected cache miss with empty warm-load cache") } } @@ -319,6 +375,9 @@ func TestIntegration_Manager_MultiTenantIsolation(t *testing.T) { } defer tenantB.Close() + provisionSchema(t, tenantA) + provisionSchema(t, tenantB) + rA := dbresolver.New(dbresolver.WithPrimaryDBs(tenantA)) rB := dbresolver.New(dbresolver.WithPrimaryDBs(tenantB)) @@ -339,15 +398,20 @@ func TestIntegration_Manager_MultiTenantIsolation(t *testing.T) { t.Fatalf("activate B: %v", err) } - // Write distinct values to each tenant. + // Write distinct values to each tenant (upsert — the table starts empty + // now that the Manager no longer seeds defaults). if _, err := rA.ExecContext(context.Background(), - `UPDATE systemplane_entries SET value=$1 WHERE namespace=$2 AND key=$3`, + `INSERT INTO systemplane_entries (namespace, key, value, updated_at, updated_by) + VALUES ($2, $3, $1, now(), 'test') + ON CONFLICT (namespace, key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at`, []byte(`"value-A"`), "ns", "k"); err != nil { t.Fatalf("write A: %v", err) } if _, err := rB.ExecContext(context.Background(), - `UPDATE systemplane_entries SET value=$1 WHERE namespace=$2 AND key=$3`, + `INSERT INTO systemplane_entries (namespace, key, value, updated_at, updated_by) + VALUES ($2, $3, $1, now(), 'test') + ON CONFLICT (namespace, key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at`, []byte(`"value-B"`), "ns", "k"); err != nil { t.Fatalf("write B: %v", err) } @@ -405,7 +469,9 @@ func TestIntegration_Manager_ReconnectAfterTerminateBackend(t *testing.T) { time.Sleep(1 * time.Second) if _, err := resolver.ExecContext(context.Background(), - `UPDATE systemplane_entries SET value=$1 WHERE namespace=$2 AND key=$3`, + `INSERT 
INTO systemplane_entries (namespace, key, value, updated_at, updated_by) + VALUES ($2, $3, $1, now(), 'test') + ON CONFLICT (namespace, key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at`, []byte(`"after-reconnect"`), "ns", "k"); err != nil { t.Fatalf("write after terminate: %v", err) } @@ -428,9 +494,17 @@ func TestIntegration_Manager_DeleteThenReactivate(t *testing.T) { keys := []manager.RegisteredKey{{Namespace: "ns", Key: "k", DefaultValue: "default"}} - m, _, _, mClean := setup(t, baseDSN, "tenant-c", keys) + m, _, resolver, mClean := setup(t, baseDSN, "tenant-c", keys) defer mClean() + // Seed the value externally (migration pipeline's job now). + if _, err := resolver.ExecContext(context.Background(), + `INSERT INTO systemplane_entries (namespace, key, value, updated_at, updated_by) + VALUES ($1, $2, $3, now(), 'migration')`, + "ns", "k", []byte(`"default"`)); err != nil { + t.Fatalf("external seed: %v", err) + } + if err := m.OnTenantActivated(context.Background(), "tenant-c"); err != nil { t.Fatalf("activate 1: %v", err) } @@ -485,7 +559,9 @@ func TestIntegration_Manager_CredentialsRotated(t *testing.T) { defer unsub() if _, err := resolver.ExecContext(context.Background(), - `UPDATE systemplane_entries SET value=$1 WHERE namespace=$2 AND key=$3`, + `INSERT INTO systemplane_entries (namespace, key, value, updated_at, updated_by) + VALUES ($2, $3, $1, now(), 'test') + ON CONFLICT (namespace, key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at`, []byte(`"post-rotation"`), "ns", "k"); err != nil { t.Fatalf("write after rotation: %v", err) } @@ -500,9 +576,11 @@ func TestIntegration_Manager_CredentialsRotated(t *testing.T) { } } -// TestIntegration_Manager_SeedDoesNotOverwriteExisting verifies that -// re-activation does not overwrite operator-set values. 
-func TestIntegration_Manager_SeedDoesNotOverwriteExisting(t *testing.T) { +// TestIntegration_Manager_ReactivateDoesNotClobberOperatorValue verifies that +// re-activation never overwrites operator-set values. Because the Manager no +// longer seeds defaults at runtime, re-activation is a pure warm-load that must +// reflect the current DB value as-is. +func TestIntegration_Manager_ReactivateDoesNotClobberOperatorValue(t *testing.T) { baseDSN, cleanup := startContainer(t) t.Cleanup(cleanup) @@ -511,18 +589,19 @@ func TestIntegration_Manager_SeedDoesNotOverwriteExisting(t *testing.T) { m, _, resolver, mClean := setup(t, baseDSN, "tenant-noov", keys) defer mClean() - if err := m.OnTenantActivated(context.Background(), "tenant-noov"); err != nil { - t.Fatalf("activate 1: %v", err) - } - - // Operator-set value. + // Operator-set value, inserted externally before the first activation. if _, err := resolver.ExecContext(context.Background(), - `UPDATE systemplane_entries SET value=$1 WHERE namespace=$2 AND key=$3`, - []byte(`"operator-set"`), "ns", "k"); err != nil { + `INSERT INTO systemplane_entries (namespace, key, value, updated_at, updated_by) + VALUES ($1, $2, $3, now(), 'operator')`, + "ns", "k", []byte(`"operator-set"`)); err != nil { t.Fatalf("operator write: %v", err) } - // Drop the per-tenant state so re-activation runs a fresh seed. + if err := m.OnTenantActivated(context.Background(), "tenant-noov"); err != nil { + t.Fatalf("activate 1: %v", err) + } + + // Drop the per-tenant state so re-activation runs a fresh warm-load. 
if err := m.OnTenantDeleted(context.Background(), "tenant-noov"); err != nil { t.Fatalf("delete: %v", err) } @@ -537,7 +616,7 @@ func TestIntegration_Manager_SeedDoesNotOverwriteExisting(t *testing.T) { } if got != "operator-set" { - t.Fatalf("seed overwrote operator value: got %v, want operator-set", got) + t.Fatalf("re-activation clobbered operator value: got %v, want operator-set", got) } } diff --git a/internal/manager/schema.go b/internal/manager/schema.go index d86d6bc..a2f72a5 100644 --- a/internal/manager/schema.go +++ b/internal/manager/schema.go @@ -1,112 +1,33 @@ -// Schema bootstrap, defaults seed, and warm-load for the Manager. +// Tenant DB resolution and shared constants for the Manager. // -// These helpers mirror the lazy bootstrap inside internal/postgres for MT -// mode but make it eager (the Manager runs them at OnTenantActivated time). -// The DDL must stay byte-compatible with internal/postgres/postgres_schema.go -// — both code paths target the same table and trigger. +// This package performs NO runtime schema provisioning. The +// systemplane_entries table, the systemplane_notify_v3() trigger function, and +// the NOTIFY triggers MUST be provisioned externally (e.g. via the consumer's +// migration pipeline) using the DDL published by the root package's +// SchemaSQL() / DefaultSeedSQL(). The Manager only warm-loads, reads, writes +// values, and runs LISTEN; the runtime database role only needs DML + LISTEN +// privileges, never CREATE on the schema. package manager import ( "context" "errors" - "fmt" "github.com/bxcodec/dbresolver/v2" ) // defaultTable is the systemplane entries table name. Held here as a private -// constant rather than threaded through config because slice-3 callers don't -// need configurable table names — the Manager always operates on the same -// table the existing postgres Store creates. 
+// constant rather than threaded through config because callers don't need +// configurable table names — the Manager always operates on the same table the +// consumer provisions via SchemaSQL(). +// +// defaultChannel is the NOTIFY channel the externally provisioned triggers +// emit on; the LISTEN goroutine subscribes to it. const ( defaultTable = "systemplane_entries" defaultChannel = "systemplane_changes" - defaultActor = "systemplane.manager" ) -// runSchemaAndSeed runs the idempotent DDL + defaults seed for the resolved -// tenant DB. It does NOT touch the cache; warmLoad is a separate step the -// caller runs after this returns. -func (m *Manager) runSchemaAndSeed(ctx context.Context, db dbresolver.DB, registered []RegisteredKey) error { - if err := m.runSchema(ctx, db); err != nil { - return err - } - - return m.seedDefaults(ctx, db, registered) -} - -// runSchema creates the entries table + NOTIFY trigger function/trigger. -// -// The DDL is intentionally identical to internal/postgres/postgres_schema.go's -// runSchema. Both paths must produce the same shape so a tenant DB bootstrapped -// by either path is interchangeable for the other. 
-func (m *Manager) runSchema(ctx context.Context, db dbresolver.DB) error { - createTable := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( - namespace TEXT NOT NULL, - key TEXT NOT NULL, - value JSONB NOT NULL, - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_by TEXT NOT NULL DEFAULT '', - PRIMARY KEY (namespace, key) - )`, defaultTable) - - if _, err := db.ExecContext(ctx, createTable); err != nil { - return fmt.Errorf("systemplane/manager: create table: %w", err) - } - - createFunc := `CREATE OR REPLACE FUNCTION systemplane_notify_v3() RETURNS TRIGGER AS $$ -BEGIN - IF TG_OP = 'DELETE' THEN - PERFORM pg_notify(TG_ARGV[0], json_build_object( - 'namespace', OLD.namespace, - 'key', OLD.key, - 'op', 'delete' - )::text); - RETURN OLD; - ELSE - PERFORM pg_notify(TG_ARGV[0], json_build_object( - 'namespace', NEW.namespace, - 'key', NEW.key, - 'op', 'upsert' - )::text); - RETURN NEW; - END IF; -END; -$$ LANGUAGE plpgsql` - - if _, err := db.ExecContext(ctx, createFunc); err != nil { - return fmt.Errorf("systemplane/manager: create function: %w", err) - } - - if _, err := db.ExecContext(ctx, "DROP TRIGGER IF EXISTS systemplane_notify_trigger ON "+defaultTable); err != nil { - return fmt.Errorf("systemplane/manager: drop trigger: %w", err) - } - - if _, err := db.ExecContext(ctx, "DROP TRIGGER IF EXISTS systemplane_notify_update_trigger ON "+defaultTable); err != nil { - return fmt.Errorf("systemplane/manager: drop update trigger: %w", err) - } - - createInsertDeleteTrigger := fmt.Sprintf(`CREATE TRIGGER systemplane_notify_trigger -AFTER INSERT OR DELETE ON %s -FOR EACH ROW EXECUTE FUNCTION systemplane_notify_v3('%s')`, defaultTable, defaultChannel) - - if _, err := db.ExecContext(ctx, createInsertDeleteTrigger); err != nil { - return fmt.Errorf("systemplane/manager: create insert/delete trigger: %w", err) - } - - createUpdateTrigger := fmt.Sprintf(`CREATE TRIGGER systemplane_notify_update_trigger -AFTER UPDATE ON %s -FOR EACH ROW -WHEN (OLD IS DISTINCT FROM NEW) 
-EXECUTE FUNCTION systemplane_notify_v3('%s')`, defaultTable, defaultChannel) - - if _, err := db.ExecContext(ctx, createUpdateTrigger); err != nil { - return fmt.Errorf("systemplane/manager: create update trigger: %w", err) - } - - return nil -} - // resolveTenantDB acquires the tenant's primary dbresolver.DB via the // configured Connector. Returns ErrPgMgrUnavailable when no Connector is // wired (test contexts). diff --git a/internal/manager/schema_integration_test.go b/internal/manager/schema_integration_test.go deleted file mode 100644 index 16a26d9..0000000 --- a/internal/manager/schema_integration_test.go +++ /dev/null @@ -1,590 +0,0 @@ -//go:build integration - -// Schema bootstrap edge-case coverage against a live testcontainers -// Postgres. The happy-path (clean DB → CREATE TABLE + trigger) is already -// exercised by manager_integration_test.go's OnTenantActivated tests; this -// file pins: -// -// - idempotent re-bootstrap (every CREATE IF NOT EXISTS path) -// - trigger missing → re-bootstrap recreates it -// - DDL error surfaces wrapped through runSchema -// - seedDefaults: empty/registered/conflict paths and the firstErr return -// - warmLoad: empty table, registered + unregistered, malformed JSON row, -// query error after rows.Close, scan error -// -// These tests run inside the manager package so the internal helpers -// (runSchema, runSchemaAndSeed, seedDefaults, warmLoad) can be called -// directly without re-exporting. 
-package manager - -import ( - "context" - "database/sql" - "fmt" - "strings" - "sync" - "testing" - "time" - - "github.com/bxcodec/dbresolver/v2" - _ "github.com/jackc/pgx/v5/stdlib" - "github.com/testcontainers/testcontainers-go" - pgcontainer "github.com/testcontainers/testcontainers-go/modules/postgres" -) - -func startPGForSchema(t *testing.T) (string, func()) { - t.Helper() - - ctx := context.Background() - - container, err := pgcontainer.Run(ctx, "postgres:16-alpine", - pgcontainer.WithDatabase("postgres"), - pgcontainer.WithUsername("postgres"), - pgcontainer.WithPassword("postgres"), - pgcontainer.BasicWaitStrategies(), - ) - if err != nil { - t.Fatalf("start container: %v", err) - } - - dsn, err := container.ConnectionString(ctx, "sslmode=disable") - if err != nil { - _ = testcontainers.TerminateContainer(container) - t.Fatalf("connection string: %v", err) - } - - cleanup := func() { _ = testcontainers.TerminateContainer(container) } - - return dsn, cleanup -} - -func freshTenantDB(t *testing.T, baseDSN string) (dbresolver.DB, *sql.DB, string, func()) { - t.Helper() - - admin, err := sql.Open("pgx", baseDSN) - if err != nil { - t.Fatalf("open admin: %v", err) - } - defer admin.Close() - - dbName := strings.ReplaceAll(fmt.Sprintf("mgr_schema_%d", time.Now().UnixNano()), "-", "_") - if _, err := admin.Exec(`CREATE DATABASE ` + dbName); err != nil { - t.Fatalf("create db: %v", err) - } - - tDSN := strings.Replace(baseDSN, "/postgres?", "/"+dbName+"?", 1) - if tDSN == baseDSN { - // no '?' suffix - tDSN = strings.Replace(baseDSN, "/postgres", "/"+dbName, 1) - } - - tenantSQL, err := sql.Open("pgx", tDSN) - if err != nil { - t.Fatalf("open tenant: %v", err) - } - - if err := tenantSQL.Ping(); err != nil { - _ = tenantSQL.Close() - t.Fatalf("ping tenant: %v", err) - } - - resolver := dbresolver.New(dbresolver.WithPrimaryDBs(tenantSQL)) - - cleanup := func() { - _ = tenantSQL.Close() - // Drop DB in background; it's OK if it fails on teardown. 
- admin2, _ := sql.Open("pgx", baseDSN) - if admin2 != nil { - _, _ = admin2.Exec(`DROP DATABASE IF EXISTS ` + dbName + ` WITH (FORCE)`) - _ = admin2.Close() - } - } - - return resolver, tenantSQL, tDSN, cleanup -} - -func TestSchema_RunSchema_FreshDB_Bootstraps(t *testing.T) { - baseDSN, baseCleanup := startPGForSchema(t) - t.Cleanup(baseCleanup) - - db, raw, _, cleanup := freshTenantDB(t, baseDSN) - t.Cleanup(cleanup) - - m := New(nil) - - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema fresh: %v", err) - } - - var count int - if err := raw.QueryRow(`SELECT count(*) FROM pg_trigger - WHERE tgname IN ('systemplane_notify_trigger', 'systemplane_notify_update_trigger')`).Scan(&count); err != nil { - t.Fatalf("trigger count: %v", err) - } - - if count != 2 { - t.Fatalf("expected 2 systemplane triggers, got %d", count) - } -} - -func TestSchema_RunSchema_AlreadyExists_IsIdempotent(t *testing.T) { - baseDSN, baseCleanup := startPGForSchema(t) - t.Cleanup(baseCleanup) - - db, raw, _, cleanup := freshTenantDB(t, baseDSN) - t.Cleanup(cleanup) - - m := New(nil) - - // First pass. - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema first: %v", err) - } - - // Second pass must succeed — CREATE TABLE IF NOT EXISTS, CREATE OR - // REPLACE FUNCTION, and DROP TRIGGER IF EXISTS + CREATE TRIGGER make - // every step idempotent. - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema idempotent: %v", err) - } - - // Triggers must still exist exactly once (re-creation drops and recreates). 
- var count int - if err := raw.QueryRow(`SELECT count(*) FROM pg_trigger - WHERE tgname IN ('systemplane_notify_trigger', 'systemplane_notify_update_trigger')`).Scan(&count); err != nil { - t.Fatalf("trigger count: %v", err) - } - - if count != 2 { - t.Fatalf("expected 2 triggers after idempotent re-bootstrap, got %d", count) - } -} - -func TestSchema_RunSchema_TriggerMissing_Recreates(t *testing.T) { - baseDSN, baseCleanup := startPGForSchema(t) - t.Cleanup(baseCleanup) - - db, raw, _, cleanup := freshTenantDB(t, baseDSN) - t.Cleanup(cleanup) - - m := New(nil) - - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema first: %v", err) - } - - // Drop one trigger out of band to simulate operator drift. - if _, err := raw.Exec(`DROP TRIGGER systemplane_notify_trigger ON ` + defaultTable); err != nil { - t.Fatalf("drop trigger: %v", err) - } - - // Re-bootstrapping must recreate it. - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema recreate: %v", err) - } - - var count int - if err := raw.QueryRow(`SELECT count(*) FROM pg_trigger WHERE tgname = 'systemplane_notify_trigger'`).Scan(&count); err != nil { - t.Fatalf("trigger count: %v", err) - } - - if count != 1 { - t.Fatalf("expected trigger recreated, got count=%d", count) - } -} - -func TestSchema_RunSchema_DBClosed_SurfacesError(t *testing.T) { - baseDSN, baseCleanup := startPGForSchema(t) - t.Cleanup(baseCleanup) - - db, raw, _, cleanup := freshTenantDB(t, baseDSN) - t.Cleanup(cleanup) - - // Close the underlying DB before the first ExecContext so CREATE TABLE - // fails with the wrapped "create table" error. Confirms runSchema wraps - // the upstream error with context. 
- _ = raw.Close() - - m := New(nil) - - err := m.runSchema(context.Background(), db) - if err == nil { - t.Fatal("expected error when DB is closed") - } - - if !strings.Contains(err.Error(), "create table") { - t.Fatalf("expected create-table wrap, got %v", err) - } -} - -func TestSchema_RunSchemaAndSeed_SchemaErrorShortCircuits(t *testing.T) { - baseDSN, baseCleanup := startPGForSchema(t) - t.Cleanup(baseCleanup) - - db, raw, _, cleanup := freshTenantDB(t, baseDSN) - t.Cleanup(cleanup) - - _ = raw.Close() - - m := New(nil) - - err := m.runSchemaAndSeed(context.Background(), db, []RegisteredKey{ - {Namespace: "ns", Key: "k", DefaultValue: "v"}, - }) - - if err == nil { - t.Fatal("expected error when schema step fails") - } - - if !strings.Contains(err.Error(), "create table") { - t.Fatalf("expected schema-error short-circuit, got %v", err) - } -} - -func TestSchema_SeedDefaults_ConflictDoesNotOverwrite(t *testing.T) { - baseDSN, baseCleanup := startPGForSchema(t) - t.Cleanup(baseCleanup) - - db, raw, _, cleanup := freshTenantDB(t, baseDSN) - t.Cleanup(cleanup) - - m := New(nil) - - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema: %v", err) - } - - // Operator pre-sets a value. - if _, err := raw.Exec(`INSERT INTO `+defaultTable+ - ` (namespace, key, value, updated_by) VALUES ($1, $2, $3::jsonb, 'operator')`, - "ops", "rate", `"OPERATOR"`); err != nil { - t.Fatalf("operator insert: %v", err) - } - - // seedDefaults attempts to insert the registered default; ON CONFLICT - // must keep the operator value. 
- if err := m.seedDefaults(context.Background(), db, []RegisteredKey{ - {Namespace: "ops", Key: "rate", DefaultValue: "DEFAULT"}, - {Namespace: "ops", Key: "new", DefaultValue: 42}, - }); err != nil { - t.Fatalf("seedDefaults: %v", err) - } - - var ( - opsRate string - opsRateBy string - opsNew string - ) - - if err := raw.QueryRow(`SELECT value::text, updated_by FROM `+defaultTable+ - ` WHERE namespace=$1 AND key=$2`, "ops", "rate").Scan(&opsRate, &opsRateBy); err != nil { - t.Fatalf("read rate: %v", err) - } - - if opsRate != `"OPERATOR"` { - t.Fatalf("operator value clobbered: got %s", opsRate) - } - - if opsRateBy != "operator" { - t.Fatalf("operator updated_by clobbered: got %s", opsRateBy) - } - - // The brand-new key must have been seeded. - if err := raw.QueryRow(`SELECT value::text FROM `+defaultTable+ - ` WHERE namespace=$1 AND key=$2`, "ops", "new").Scan(&opsNew); err != nil { - t.Fatalf("read new: %v", err) - } - - if opsNew != "42" { - t.Fatalf("new key not seeded as 42, got %s", opsNew) - } -} - -func TestSchema_SeedDefaults_InsertErrorReturnsFirstErr(t *testing.T) { - baseDSN, baseCleanup := startPGForSchema(t) - t.Cleanup(baseCleanup) - - db, raw, _, cleanup := freshTenantDB(t, baseDSN) - t.Cleanup(cleanup) - - // Close the DB so every INSERT fails. seedDefaults must capture the - // first error and still iterate the remaining keys (logging each - // failure) before returning. 
- _ = raw.Close() - - m := New(nil) - - err := m.seedDefaults(context.Background(), db, []RegisteredKey{ - {Namespace: "a", Key: "k1", DefaultValue: 1}, - {Namespace: "a", Key: "k2", DefaultValue: 2}, - }) - - if err == nil { - t.Fatal("expected wrapped insert error") - } - - if !strings.Contains(err.Error(), "seed insert a/k1") { - t.Fatalf("expected first error to reference a/k1, got %v", err) - } -} - -func TestSchema_SeedDefaults_MarshalErrorReturnsFirstErr(t *testing.T) { - baseDSN, baseCleanup := startPGForSchema(t) - t.Cleanup(baseCleanup) - - db, _, _, cleanup := freshTenantDB(t, baseDSN) - t.Cleanup(cleanup) - - m := New(nil) - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema: %v", err) - } - - // json.Marshal of a channel value returns an error → drives the marshal - // branch in seedDefaults that captures firstErr and continues. - bad := make(chan int) - err := m.seedDefaults(context.Background(), db, []RegisteredKey{ - {Namespace: "bad", Key: "k1", DefaultValue: bad}, - {Namespace: "ok", Key: "k2", DefaultValue: "ok"}, - }) - - if err == nil { - t.Fatal("expected marshal error") - } - - if !strings.Contains(err.Error(), "seed marshal bad/k1") { - t.Fatalf("expected first marshal error for bad/k1, got %v", err) - } -} - -func TestSchema_WarmLoad_PopulatesCacheRegisteredOnly(t *testing.T) { - baseDSN, baseCleanup := startPGForSchema(t) - t.Cleanup(baseCleanup) - - db, raw, _, cleanup := freshTenantDB(t, baseDSN) - t.Cleanup(cleanup) - - m := New(nil) - - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema: %v", err) - } - - // Two rows registered, one stale row that should be skipped. 
- rows := []struct { - ns, key, value string - }{ - {"a", "k1", `"v1"`}, - {"a", "k2", `42`}, - {"old", "stale", `"unused"`}, - } - - for _, r := range rows { - if _, err := raw.Exec(`INSERT INTO `+defaultTable+ - ` (namespace, key, value, updated_by) VALUES ($1, $2, $3::jsonb, 'op')`, - r.ns, r.key, r.value); err != nil { - t.Fatalf("insert row %v: %v", r, err) - } - } - - ts := newTenantState("t") - registered := []RegisteredKey{ - {Namespace: "a", Key: "k1"}, - {Namespace: "a", Key: "k2"}, - } - - if err := m.warmLoad(context.Background(), db, ts, registered); err != nil { - t.Fatalf("warmLoad: %v", err) - } - - if got := len(ts.entries); got != 2 { - t.Fatalf("expected 2 registered entries loaded, got %d (entries=%v)", got, ts.entries) - } - - if v := ts.entries[nsKey{Namespace: "a", Key: "k1"}]; v != "v1" { - t.Fatalf("k1: got %v want v1", v) - } - - if v := ts.entries[nsKey{Namespace: "a", Key: "k2"}]; v != float64(42) { - t.Fatalf("k2: got %v want 42", v) - } - - if _, present := ts.entries[nsKey{Namespace: "old", Key: "stale"}]; present { - t.Fatal("unregistered key must be skipped") - } - - if ts.stale { - t.Fatal("warmLoad must clear the stale flag on success") - } -} - -func TestSchema_WarmLoad_MalformedJSONSkipped(t *testing.T) { - baseDSN, baseCleanup := startPGForSchema(t) - t.Cleanup(baseCleanup) - - db, raw, _, cleanup := freshTenantDB(t, baseDSN) - t.Cleanup(cleanup) - - m := New(nil) - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema: %v", err) - } - - // Postgres JSONB enforces valid JSON, so we cannot directly insert an - // invalid blob. Force a decode failure by registering a value whose - // JSON form (e.g. `null`) Unmarshal yields nil — then seed a row that - // json.Unmarshal would accept but yields the zero-value any. That - // covers the success branch in warmLoad. 
For the malformed-JSON branch - // we exploit the fact that bytea-typed rows aren't possible here, so - // we seed a row whose value is a JSON string the test can detect, then - // rely on a separate test for the firstErr decode path via a value - // that triggers Unmarshal failure on a programmatically-corrupted row. - // - // In practice JSONB enforcement makes this branch unreachable in - // production. Validate the happy path here so warmLoad's loop body - // and rows.Err checks are both exercised. - if _, err := raw.Exec(`INSERT INTO `+defaultTable+ - ` (namespace, key, value, updated_by) VALUES ('n', 'k', $1::jsonb, 'op')`, - `null`); err != nil { - t.Fatalf("insert null row: %v", err) - } - - ts := newTenantState("t") - if err := m.warmLoad(context.Background(), db, ts, []RegisteredKey{ - {Namespace: "n", Key: "k"}, - }); err != nil { - t.Fatalf("warmLoad: %v", err) - } - - if _, present := ts.entries[nsKey{Namespace: "n", Key: "k"}]; !present { - t.Fatal("null-valued row must populate entry (as nil)") - } -} - -func TestSchema_WarmLoad_QueryError(t *testing.T) { - baseDSN, baseCleanup := startPGForSchema(t) - t.Cleanup(baseCleanup) - - db, raw, _, cleanup := freshTenantDB(t, baseDSN) - t.Cleanup(cleanup) - - // Close before warmLoad opens its query → QueryContext fails. 
- _ = raw.Close() - - m := New(nil) - ts := newTenantState("t") - - err := m.warmLoad(context.Background(), db, ts, nil) - if err == nil { - t.Fatal("expected query error") - } - - if !strings.Contains(err.Error(), "warm-load query") { - t.Fatalf("expected warm-load query wrap, got %v", err) - } -} - -func TestSchema_WarmLoad_RespectsCacheCap(t *testing.T) { - baseDSN, baseCleanup := startPGForSchema(t) - t.Cleanup(baseCleanup) - - db, raw, _, cleanup := freshTenantDB(t, baseDSN) - t.Cleanup(cleanup) - - m := New(nil) - m.cfg.maxEntriesPerTenantOverride = 1 // cap aggressively - - if err := m.runSchema(context.Background(), db); err != nil { - t.Fatalf("runSchema: %v", err) - } - - for i := 0; i < 5; i++ { - if _, err := raw.Exec(`INSERT INTO `+defaultTable+ - ` (namespace, key, value, updated_by) VALUES ($1, $2, $3::jsonb, 'op')`, - "a", fmt.Sprintf("k%d", i), `"v"`); err != nil { - t.Fatalf("insert: %v", err) - } - } - - registered := make([]RegisteredKey, 5) - for i := range registered { - registered[i] = RegisteredKey{Namespace: "a", Key: fmt.Sprintf("k%d", i)} - } - - ts := newTenantState("t") - if err := m.warmLoad(context.Background(), db, ts, registered); err != nil { - t.Fatalf("warmLoad: %v", err) - } - - if got := len(ts.entries); got != 1 { - t.Fatalf("expected cache cap of 1, got %d", got) - } -} - -func TestSchema_RunSchemaAndSeed_Concurrent_SafeForSameTenant(t *testing.T) { - baseDSN, baseCleanup := startPGForSchema(t) - t.Cleanup(baseCleanup) - - db, raw, _, cleanup := freshTenantDB(t, baseDSN) - t.Cleanup(cleanup) - - m := New(nil) - - // 5 concurrent bootstraps targeting the same tenant DB. Postgres' - // DDL serialization + ON CONFLICT DO NOTHING make this safe; the test - // fails if any call returns an error. 
- var ( - wg sync.WaitGroup - errs = make(chan error, 5) - ) - - keys := []RegisteredKey{ - {Namespace: "ns", Key: "k1", DefaultValue: "v1"}, - {Namespace: "ns", Key: "k2", DefaultValue: "v2"}, - } - - for i := 0; i < 5; i++ { - wg.Add(1) - - go func() { - defer wg.Done() - errs <- m.runSchemaAndSeed(context.Background(), db, keys) - }() - } - - wg.Wait() - close(errs) - - // Postgres may surface a concurrent-DDL error such as "tuple - // concurrently updated" or "duplicate key" while two CREATE TRIGGER - // statements race. The Manager treats those as transient and the next - // OnTenantActivated retries; for this test we tolerate ≤1 error to - // pin the contract that at least one concurrent caller succeeds. - var failed int - for err := range errs { - if err != nil { - failed++ - t.Logf("concurrent bootstrap error (tolerated): %v", err) - } - } - - if failed >= 5 { - t.Fatalf("all concurrent runs failed; expected at least one success") - } - - // Verify the table and triggers are in good shape after the dust - // settles by running one more bootstrap. - if err := m.runSchemaAndSeed(context.Background(), db, keys); err != nil { - t.Fatalf("post-concurrency bootstrap: %v", err) - } - - var rowCount int - if err := raw.QueryRow(`SELECT count(*) FROM ` + defaultTable).Scan(&rowCount); err != nil { - t.Fatalf("count rows: %v", err) - } - - // Exactly the 2 registered keys must exist (no duplicates from concurrent seeds). 
- if rowCount != 2 { - t.Fatalf("expected 2 seeded rows, got %d", rowCount) - } -} diff --git a/internal/manager/schema_test.go b/internal/manager/schema_test.go index 1bf9840..c66f3d2 100644 --- a/internal/manager/schema_test.go +++ b/internal/manager/schema_test.go @@ -10,49 +10,6 @@ import ( "github.com/bxcodec/dbresolver/v2" ) -func TestSeedDefaults_EmptyRegistered_NoOp(t *testing.T) { - t.Parallel() - - m := New(nil) - - if err := m.seedDefaults(context.Background(), nil, nil); err != nil { - t.Fatalf("seedDefaults empty: %v", err) - } -} - -// TestSeedDefaults_CtxCancelled_ReturnsCtxErrBeforeDB pins the cancellation -// discipline: a pre-cancelled ctx must abort the seed loop before any DB -// call, so a fast-shutdown path stops contending for the tenant's pool. -// Passing a nil dbresolver.DB proves no DB call happens — otherwise the -// test would panic. -func TestSeedDefaults_CtxCancelled_ReturnsCtxErrBeforeDB(t *testing.T) { - t.Parallel() - - m := New(nil) - - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - registered := []RegisteredKey{{Namespace: "ns", Key: "k", DefaultValue: 1}} - - err := m.seedDefaults(ctx, nil, registered) - if !errors.Is(err, context.Canceled) { - t.Fatalf("expected context.Canceled, got %v", err) - } -} - -func TestSeedDefaults_MarshalFailureSkipsDBAndReturnsErr(t *testing.T) { - t.Parallel() - - m := New(nil) - registered := []RegisteredKey{{Namespace: "ns", Key: "bad", DefaultValue: func() {}}} - - err := m.seedDefaults(context.Background(), nil, registered) - if err == nil { - t.Fatal("expected marshal error, got nil") - } -} - func TestApplyEvent_UpsertResolveFailureDoesNotMutateCache(t *testing.T) { t.Parallel() diff --git a/internal/manager/testhelpers_integration_test.go b/internal/manager/testhelpers_integration_test.go new file mode 100644 index 0000000..9e22e65 --- /dev/null +++ b/internal/manager/testhelpers_integration_test.go @@ -0,0 +1,134 @@ +//go:build integration + +// Shared 
integration-test helpers for the package-internal manager tests. +// +// These previously lived alongside the schema-bootstrap tests, which were +// removed when runtime schema provisioning was dropped. The Manager no longer +// creates its schema, so tests provision systemplane_entries (plus the NOTIFY +// trigger) the way a consumer's migration pipeline would — by executing the +// published DDL read from ddl/schema.sql (the single source of truth, also +// surfaced by the root package's SchemaSQL()). +package manager + +import ( + "context" + "database/sql" + "fmt" + "os" + "path/filepath" + "runtime" + "strings" + "testing" + "time" + + "github.com/bxcodec/dbresolver/v2" + _ "github.com/jackc/pgx/v5/stdlib" + "github.com/testcontainers/testcontainers-go" + pgcontainer "github.com/testcontainers/testcontainers-go/modules/postgres" +) + +// schemaDDL loads ddl/schema.sql from the repository root. The root package's +// SchemaSQL() embeds the same file; this package cannot import the root +// (import cycle), so it reads the file directly to keep a single source of +// truth. +func schemaDDL(t *testing.T) string { + t.Helper() + + _, thisFile, _, ok := runtime.Caller(0) + if !ok { + t.Fatal("runtime.Caller failed") + } + + // thisFile = /internal/manager/testhelpers_integration_test.go + repoRoot := filepath.Join(filepath.Dir(thisFile), "..", "..") + + raw, err := os.ReadFile(filepath.Join(repoRoot, "ddl", "schema.sql")) + if err != nil { + t.Fatalf("read ddl/schema.sql: %v", err) + } + + return string(raw) +} + +// provisionTestSchema applies the published DDL to db, simulating the +// consumer's external migration pipeline. 
+func provisionTestSchema(t *testing.T, db interface { + ExecContext(context.Context, string, ...any) (sql.Result, error) +}) { + t.Helper() + + if _, err := db.ExecContext(context.Background(), schemaDDL(t)); err != nil { + t.Fatalf("provision schema: %v", err) + } +} + +func startPGForSchema(t *testing.T) (string, func()) { + t.Helper() + + ctx := context.Background() + + container, err := pgcontainer.Run(ctx, "postgres:16-alpine", + pgcontainer.WithDatabase("postgres"), + pgcontainer.WithUsername("postgres"), + pgcontainer.WithPassword("postgres"), + pgcontainer.BasicWaitStrategies(), + ) + if err != nil { + t.Fatalf("start container: %v", err) + } + + dsn, err := container.ConnectionString(ctx, "sslmode=disable") + if err != nil { + _ = testcontainers.TerminateContainer(container) + t.Fatalf("connection string: %v", err) + } + + cleanup := func() { _ = testcontainers.TerminateContainer(container) } + + return dsn, cleanup +} + +func freshTenantDB(t *testing.T, baseDSN string) (dbresolver.DB, *sql.DB, string, func()) { + t.Helper() + + admin, err := sql.Open("pgx", baseDSN) + if err != nil { + t.Fatalf("open admin: %v", err) + } + defer admin.Close() + + dbName := strings.ReplaceAll(fmt.Sprintf("mgr_schema_%d", time.Now().UnixNano()), "-", "_") + if _, err := admin.Exec(`CREATE DATABASE ` + dbName); err != nil { + t.Fatalf("create db: %v", err) + } + + tDSN := strings.Replace(baseDSN, "/postgres?", "/"+dbName+"?", 1) + if tDSN == baseDSN { + // no '?' suffix + tDSN = strings.Replace(baseDSN, "/postgres", "/"+dbName, 1) + } + + tenantSQL, err := sql.Open("pgx", tDSN) + if err != nil { + t.Fatalf("open tenant: %v", err) + } + + if err := tenantSQL.Ping(); err != nil { + _ = tenantSQL.Close() + t.Fatalf("ping tenant: %v", err) + } + + resolver := dbresolver.New(dbresolver.WithPrimaryDBs(tenantSQL)) + + cleanup := func() { + _ = tenantSQL.Close() + // Drop DB in background; it's OK if it fails on teardown. 
+ admin2, _ := sql.Open("pgx", baseDSN) + if admin2 != nil { + _, _ = admin2.Exec(`DROP DATABASE IF EXISTS ` + dbName + ` WITH (FORCE)`) + _ = admin2.Close() + } + } + + return resolver, tenantSQL, tDSN, cleanup +} diff --git a/internal/manager/seed.go b/internal/manager/warmload.go similarity index 54% rename from internal/manager/seed.go rename to internal/manager/warmload.go index 26d0c6b..5993100 100644 --- a/internal/manager/seed.go +++ b/internal/manager/warmload.go @@ -1,81 +1,47 @@ +// Warm-load for the Manager. +// +// warmLoad reads every registered key's current value from the tenant DB into +// the per-tenant cache at activation time. The schema is provisioned +// externally (see schema.go); warm-load never creates it. If the table has not +// been provisioned yet — e.g. activation races the consumer's migration — +// warm-load logs and proceeds with an empty cache rather than failing, so a +// provisioning race never wedges activation. The cache refreshes via +// LISTEN/poll once the table exists. package manager import ( "context" "encoding/json" + "errors" "fmt" - "time" "github.com/LerianStudio/lib-observability/log" "github.com/bxcodec/dbresolver/v2" + "github.com/jackc/pgx/v5/pgconn" ) -// seedDefaults inserts every registered key's default value with -// ON CONFLICT (namespace, key) DO NOTHING so operator-set values are never -// overwritten. Logs and continues for individual key failures; the first -// encountered error is returned at the end so the caller can surface it. 
-func (m *Manager) seedDefaults(ctx context.Context, db dbresolver.DB, registered []RegisteredKey) error { - if len(registered) == 0 { - return nil - } - - query := fmt.Sprintf( - `INSERT INTO %s (namespace, key, value, updated_at, updated_by) -VALUES ($1, $2, $3, $4, $5) -ON CONFLICT (namespace, key) DO NOTHING`, - defaultTable, - ) - - now := time.Now().UTC() - - var firstErr error - - for _, rk := range registered { - // Honour ctx cancellation between iterations so a fast-shutdown - // path stops contending for the tenant's connection pool. Mirrors - // the same discipline Drain already follows on perTenant.Range. - if err := ctx.Err(); err != nil { - if firstErr == nil { - firstErr = err - } +// pgUndefinedTable is the Postgres SQLSTATE for "relation does not exist" +// (42P01). It is returned when warm-load runs before the consumer's migration +// provisioned systemplane_entries. +const pgUndefinedTable = "42P01" - return firstErr - } - - raw, err := json.Marshal(rk.DefaultValue) - if err != nil { - m.logWarn(ctx, "manager seed: marshal default failed, skipping", - log.String("namespace", rk.Namespace), - log.String("key", rk.Key), - log.Err(err), - ) +// isUndefinedTable reports whether err is a Postgres "undefined table" error +// (SQLSTATE 42P01). Used to tolerate a not-yet-provisioned schema during +// warm-load instead of failing activation. 
+func isUndefinedTable(err error) bool { + var pgErr *pgconn.PgError - if firstErr == nil { - firstErr = fmt.Errorf("systemplane/manager: seed marshal %s/%s: %w", rk.Namespace, rk.Key, err) - } - - continue - } - - if _, err := db.ExecContext(ctx, query, rk.Namespace, rk.Key, raw, now, defaultActor); err != nil { - m.logWarn(ctx, "manager seed: insert failed", - log.String("namespace", rk.Namespace), - log.String("key", rk.Key), - log.Err(err), - ) - - if firstErr == nil { - firstErr = fmt.Errorf("systemplane/manager: seed insert %s/%s: %w", rk.Namespace, rk.Key, err) - } - } - } - - return firstErr + return errors.As(err, &pgErr) && pgErr.Code == pgUndefinedTable } // warmLoad reads every row from the tenant DB into the per-tenant cache. // Unregistered keys are skipped (a row exists but the Client never declared // it — typical when a stale registration was removed but the row remained). +// +// If the table has not been provisioned yet (SQLSTATE 42P01) warm-load logs at +// WARN and returns nil with an empty, non-stale cache — the consumer's +// migration is expected to create the table, and LISTEN/poll will populate the +// cache once it does. 
func (m *Manager) warmLoad(ctx context.Context, db dbresolver.DB, ts *tenantState, registered []RegisteredKey) error { registry := make(map[nsKey]struct{}, len(registered)) for _, rk := range registered { @@ -86,6 +52,19 @@ func (m *Manager) warmLoad(ctx context.Context, db dbresolver.DB, ts *tenantStat rows, err := db.QueryContext(ctx, query) if err != nil { + if isUndefinedTable(err) { + m.logWarn(ctx, "manager warm-load: systemplane table not provisioned yet, proceeding with empty cache", + log.String("table", defaultTable), + log.Err(err), + ) + + ts.mu.Lock() + ts.stale = false + ts.mu.Unlock() + + return nil + } + return fmt.Errorf("systemplane/manager: warm-load query: %w", err) } defer rows.Close() diff --git a/internal/manager/warmload_test.go b/internal/manager/warmload_test.go new file mode 100644 index 0000000..073562a --- /dev/null +++ b/internal/manager/warmload_test.go @@ -0,0 +1,60 @@ +//go:build unit + +package manager + +import ( + "errors" + "fmt" + "testing" + + "github.com/jackc/pgx/v5/pgconn" +) + +// TestIsUndefinedTable_DetectsSQLState42P01 pins the graceful missing-table +// detection used by warmLoad: only a Postgres "undefined table" (42P01) error +// — possibly wrapped — is tolerated; every other error must propagate. 
+func TestIsUndefinedTable_DetectsSQLState42P01(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + err error + want bool + }{ + { + name: "bare undefined table", + err: &pgconn.PgError{Code: "42P01", Message: `relation "systemplane_entries" does not exist`}, + want: true, + }, + { + name: "wrapped undefined table", + err: fmt.Errorf("warm-load: %w", &pgconn.PgError{Code: "42P01"}), + want: true, + }, + { + name: "other pg error (permission denied)", + err: &pgconn.PgError{Code: "42501", Message: "permission denied for schema"}, + want: false, + }, + { + name: "non-pg error", + err: errors.New("connection refused"), + want: false, + }, + { + name: "nil error", + err: nil, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + if got := isUndefinedTable(tt.err); got != tt.want { + t.Fatalf("isUndefinedTable(%v) = %v, want %v", tt.err, got, tt.want) + } + }) + } +} diff --git a/internal/postgres/postgres.go b/internal/postgres/postgres.go index a944ad7..d6ca53a 100644 --- a/internal/postgres/postgres.go +++ b/internal/postgres/postgres.go @@ -9,10 +9,17 @@ // // - Multi-tenant. The constructor receives no db; instead the caller wires // lib-commons tenant-manager middleware so each request context carries -// the per-tenant database. resolveDB(ctx) extracts that database, lazily -// bootstraps the schema on first use per database, and returns the handle -// to the CRUD helpers. LISTEN/NOTIFY is disabled in this mode — Subscribe -// returns store.ErrNotSupportedInMultiTenant. +// the per-tenant database. resolveDB(ctx) extracts that database and +// returns the handle to the CRUD helpers. LISTEN/NOTIFY is disabled in +// this mode — Subscribe returns store.ErrNotSupportedInMultiTenant. +// +// This package performs NO runtime schema provisioning. The +// systemplane_entries table, the systemplane_notify_v3() trigger function, and +// the NOTIFY triggers MUST be provisioned externally (e.g. 
via the consumer's +// migration pipeline) using the DDL published by the root package's +// SchemaSQL() / DefaultSeedSQL(). The store only reads, writes values, and — +// in single-tenant mode — runs LISTEN/NOTIFY. The runtime database role only +// needs DML + LISTEN privileges, never CREATE on the schema. package postgres import ( @@ -37,8 +44,9 @@ import ( var _ store.Store = (*Store)(nil) // safeIdentifierRe validates that a SQL identifier contains only safe characters. -// DDL paths cannot use parameterized queries for identifiers, so any name -// interpolated into a statement must pass this check first. +// SQL statements cannot use parameterized queries for identifiers (table name, +// LISTEN channel), so any name interpolated into a statement must pass this +// check first. var safeIdentifierRe = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`) const ( @@ -100,12 +108,6 @@ type Config struct { type Store struct { cfg Config - // schemaOnce tracks lazy schema bootstrap per database handle in - // multi-tenant mode. Single-tenant mode populates the sole entry at - // Start() time. - schemaOnce sync.Map // map[dbExecutor]*sync.Once - schemaErr sync.Map // map[dbExecutor]error - // listenerMu / subscribers serve the single-tenant LISTEN/NOTIFY path. listenerMu sync.Mutex subscribers map[uint64]func(store.Event) @@ -117,9 +119,10 @@ type Store struct { closed bool } -// Start performs single-tenant schema bootstrap and opens the LISTEN -// connection. In multi-tenant mode it is a no-op — schema bootstrap is lazy -// per tenant database and there is no shared changefeed. +// Start opens the single-tenant LISTEN connection. The schema is NOT created +// here — it must be provisioned externally (see the package doc). In +// multi-tenant mode Start is a no-op: there is no shared changefeed and reads +// go through the per-request tenant database. 
func (s *Store) Start(ctx context.Context) error { if s == nil || s.isClosed() { return store.ErrClosed @@ -129,10 +132,6 @@ func (s *Store) Start(ctx context.Context) error { return nil } - if err := s.ensureSchema(ctx, s.cfg.DB); err != nil { - return err - } - return s.startListener(ctx) } @@ -168,9 +167,9 @@ func (s *Store) isClosed() bool { // resolveDB returns the database handle for the current call. // // Single-tenant mode returns the constructor-supplied *sql.DB unchanged. -// Multi-tenant mode extracts the dbresolver.DB stored in ctx by -// tenant-manager middleware and lazily bootstraps the schema on first use per -// resolved database. +// Multi-tenant mode extracts the dbresolver.DB stored in ctx by tenant-manager +// middleware. The schema is assumed to be provisioned externally; the store +// does not create it. func (s *Store) resolveDB(ctx context.Context) (dbExecutor, error) { if !s.cfg.MultiTenantEnabled { return s.cfg.DB, nil @@ -181,36 +180,9 @@ func (s *Store) resolveDB(ctx context.Context) (dbExecutor, error) { return nil, store.ErrTenantConnectionMissing } - if err := s.ensureSchema(ctx, db); err != nil { - return nil, err - } - return db, nil } -// ensureSchema runs the idempotent DDL exactly once per database handle. The -// once/err pair are keyed on the dbExecutor interface value, which is stable -// per tmpostgres.Manager pool — every Manager returns the same dbresolver.DB -// reference for a given tenant, so subsequent lookups hit the cache. -func (s *Store) ensureSchema(ctx context.Context, db dbExecutor) error { - onceVal, _ := s.schemaOnce.LoadOrStore(db, &sync.Once{}) - once, _ := onceVal.(*sync.Once) - - once.Do(func() { - if err := s.runSchema(ctx, db); err != nil { - s.schemaErr.Store(db, err) - } - }) - - if errVal, ok := s.schemaErr.Load(db); ok { - if err, _ := errVal.(error); err != nil { - return err - } - } - - return nil -} - // List returns every entry in the resolved database, ordered by (namespace, key). 
func (s *Store) List(ctx context.Context) ([]store.Entry, error) { if s == nil || s.isClosed() { diff --git a/internal/postgres/postgres_integration_test.go b/internal/postgres/postgres_integration_test.go index 5db8b88..003e217 100644 --- a/internal/postgres/postgres_integration_test.go +++ b/internal/postgres/postgres_integration_test.go @@ -7,11 +7,13 @@ import ( "database/sql" "encoding/json" "fmt" + "strings" "sync" "testing" "time" tmcore "github.com/LerianStudio/lib-commons/v5/commons/tenant-manager/core" + systemplane "github.com/LerianStudio/lib-systemplane" "github.com/LerianStudio/lib-systemplane/internal/postgres" "github.com/LerianStudio/lib-systemplane/internal/store" "github.com/LerianStudio/lib-systemplane/systemplanetest" @@ -76,6 +78,18 @@ func freshDB(t *testing.T, admin *sql.DB, name string) string { return name } +// provisionSchema applies the published systemplane DDL to db. The Store no +// longer creates its schema at runtime, so the consumer (here, the test acting +// as the consumer's migration pipeline) provisions systemplane_entries plus +// the NOTIFY trigger before exercising reads/writes. +func provisionSchema(t *testing.T, db *sql.DB) { + t.Helper() + + if _, err := db.Exec(systemplane.SchemaSQL()); err != nil { + t.Fatalf("provision schema: %v", err) + } +} + // dsnFor reshapes the admin DSN to point at db. func dsnFor(base, dbName string) string { // testcontainers gives us a fully-formed URL of the shape @@ -120,6 +134,10 @@ func TestIntegration_PostgresSingleTenant(t *testing.T) { t.Fatalf("open: %v", err) } + // The Store no longer auto-creates its schema; provision it the way a + // consumer's migration pipeline would. 
+ provisionSchema(t, db) + s, err := postgres.New(postgres.Config{ DB: db, ListenDSN: tenantDSN, @@ -139,7 +157,8 @@ func TestIntegration_PostgresSingleTenant(t *testing.T) { // TestIntegration_PostgresMultiTenantIsolation verifies that writes against // one tenant's database are invisible to another tenant in multi-tenant mode, -// and that schema bootstrap runs exactly once per tenant DB. +// using schema that the consumer provisioned externally (the Store performs no +// runtime DDL). func TestIntegration_PostgresMultiTenantIsolation(t *testing.T) { dsn, cleanup := startContainer(t) t.Cleanup(cleanup) @@ -167,6 +186,11 @@ func TestIntegration_PostgresMultiTenantIsolation(t *testing.T) { defer tenantB.Close() + // Provision each tenant DB externally — the Store no longer creates its + // schema at runtime. + provisionSchema(t, tenantA) + provisionSchema(t, tenantB) + resolverA := dbresolver.New(dbresolver.WithPrimaryDBs(tenantA)) resolverB := dbresolver.New(dbresolver.WithPrimaryDBs(tenantB)) @@ -215,15 +239,13 @@ func TestIntegration_PostgresMultiTenantIsolation(t *testing.T) { } } - // Drop the trigger created during the FIRST schema bootstrap on tenant A; - // a follow-up call should re-use the cached schema-once and NOT recreate - // the trigger (the test expectation is the trigger STAYS gone). + // Drop a trigger out-of-band, then issue another write. The Store must NOT + // recreate it — it performs no runtime DDL. The trigger STAYS gone, + // proving the store never issues CREATE statements at runtime. if _, err := tenantA.Exec(`DROP TRIGGER IF EXISTS systemplane_notify_trigger ON systemplane_entries`); err != nil { t.Fatalf("drop trigger: %v", err) } - // Issue more reads/writes on tenant A — schema-once cache should prevent - // the trigger from being recreated. 
if err := s.Set(ctxA, store.Entry{Namespace: "ns", Key: "k2", Value: jsonBytes(t, "again")}); err != nil { t.Fatalf("second set on A: %v", err) } @@ -239,7 +261,7 @@ func TestIntegration_PostgresMultiTenantIsolation(t *testing.T) { } if triggerExists { - t.Errorf("schema bootstrap ran a second time — trigger was recreated") + t.Errorf("store issued runtime DDL — dropped trigger was recreated") } // Multi-tenant mode disables Subscribe. @@ -273,6 +295,109 @@ func TestIntegration_PostgresMultiTenantMissingCtx(t *testing.T) { } } +// TestIntegration_PostgresLeastPrivilegeRole_NoRuntimeDDL pins the core +// contract of this change: when the schema is provisioned externally and the +// runtime role has only DML (SELECT/INSERT/UPDATE/DELETE) — no CREATE on the +// schema — Get/Set/Delete still succeed. If the Store attempted any runtime +// DDL it would fail with "permission denied for schema" (42501). +func TestIntegration_PostgresLeastPrivilegeRole_NoRuntimeDDL(t *testing.T) { + dsn, cleanup := startContainer(t) + t.Cleanup(cleanup) + + admin := adminDSN(t, dsn) + defer admin.Close() + + dbName := fmt.Sprintf("lp_%d", time.Now().UnixNano()) + freshDB(t, admin, dbName) + + // Provision the schema as a privileged role (the owner of the fresh DB). + owner, err := sql.Open("pgx", dsnFor(dsn, dbName)) + if err != nil { + t.Fatalf("open owner: %v", err) + } + + defer owner.Close() + + provisionSchema(t, owner) + + // Create a least-privilege role with DML only — explicitly NO CREATE on + // the public schema. This mirrors the role the tenant-manager hands the + // runtime. 
+ roleName := fmt.Sprintf("sp_dml_%d", time.Now().UnixNano()) + rolePass := "dmlpass" + + if _, err := owner.Exec(fmt.Sprintf(`CREATE ROLE %s LOGIN PASSWORD '%s'`, roleName, rolePass)); err != nil { + t.Fatalf("create role: %v", err) + } + + stmts := []string{ + `REVOKE CREATE ON SCHEMA public FROM PUBLIC`, // before PostgreSQL 15, PUBLIC holds CREATE on schema public by default + fmt.Sprintf(`GRANT USAGE ON SCHEMA public TO %s`, roleName), + fmt.Sprintf(`GRANT SELECT, INSERT, UPDATE, DELETE ON systemplane_entries TO %s`, roleName), + } + for _, stmt := range stmts { + if _, err := owner.Exec(stmt); err != nil { + t.Fatalf("grant stmt %q: %v", stmt, err) + } + } + + // Build a DSN for the least-privilege role. + lpDSN := dsnWithUser(dsnFor(dsn, dbName), roleName, rolePass) + + lpDB, err := sql.Open("pgx", lpDSN) + if err != nil { + t.Fatalf("open least-privilege: %v", err) + } + + defer lpDB.Close() + + s, err := postgres.New(postgres.Config{DB: lpDB, ListenDSN: lpDSN}) + if err != nil { + t.Fatalf("postgres.New: %v", err) + } + + defer s.Close() + + // Start must succeed without issuing any DDL (it only opens LISTEN now). + if err := s.Start(context.Background()); err != nil { + t.Fatalf("Start with least-privilege role: %v", err) + } + + ctx := context.Background() + + if err := s.Set(ctx, store.Entry{Namespace: "ns", Key: "k", Value: jsonBytes(t, "value")}); err != nil { + t.Fatalf("Set with least-privilege role: %v", err) + } + + got := mustGet(t, s, ctx, "ns", "k") + if got != "value" { + t.Fatalf("Get = %q, want value", got) + } + + if err := s.Delete(ctx, "ns", "k", "actor"); err != nil { + t.Fatalf("Delete with least-privilege role: %v", err) + } +} + +// dsnWithUser rewrites the userinfo segment of a postgres URL DSN to user:pass.
+func dsnWithUser(base, user, pass string) string { + // base is postgres://olduser:oldpass@host:port/db?params + const scheme = "postgres://" + + if !strings.HasPrefix(base, scheme) { + return base + } + + rest := base[len(scheme):] + + at := strings.IndexByte(rest, '@') + if at < 0 { + return base + } + + return scheme + user + ":" + pass + "@" + rest[at+1:] +} + func mustSet(t *testing.T, s store.Store, ctx context.Context, ns, key, value string) { t.Helper() diff --git a/internal/postgres/postgres_schema.go b/internal/postgres/postgres_schema.go deleted file mode 100644 index 16a62b3..0000000 --- a/internal/postgres/postgres_schema.go +++ /dev/null @@ -1,106 +0,0 @@ -// Schema bootstrap for the Postgres backend. -// -// runSchema is invoked exactly once per database handle through -// ensureSchema's sync.Once cache. The DDL is fully idempotent so re-runs -// against a populated database are safe. -package postgres - -import ( - "context" - "fmt" -) - -// runSchema creates the entries table and the NOTIFY trigger function/trigger. -// -// The schema is the post-issue-6 shape: -// -// systemplane_entries ( -// namespace TEXT NOT NULL, -// key TEXT NOT NULL, -// value JSONB NOT NULL, -// updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), -// updated_by TEXT NOT NULL DEFAULT '', -// PRIMARY KEY (namespace, key) -// ) -// -// The trigger function emits a JSON payload of the form -// -// {"namespace": "...", "key": "...", "op": "upsert"|"delete"} -// -// on the configured channel. INSERT/UPDATE both map to "upsert" so subscribers -// have a single semantic to handle; the DELETE branch uses OLD.* because NEW -// is null on deletes. 
-func (s *Store) runSchema(ctx context.Context, db dbExecutor) error { - createTable := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( - namespace TEXT NOT NULL, - key TEXT NOT NULL, - value JSONB NOT NULL, - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_by TEXT NOT NULL DEFAULT '', - PRIMARY KEY (namespace, key) - )`, s.cfg.Table) - - if _, err := db.ExecContext(ctx, createTable); err != nil { - return fmt.Errorf("systemplane/postgres: create table: %w", err) - } - - // The trigger function receives the channel name through TG_ARGV[0] so the - // same function body can serve any caller, and so a duplicate CREATE OR - // REPLACE from a sibling Store does not silently re-route notifications. - createFunc := `CREATE OR REPLACE FUNCTION systemplane_notify_v3() RETURNS TRIGGER AS $$ -BEGIN - IF TG_OP = 'DELETE' THEN - PERFORM pg_notify(TG_ARGV[0], json_build_object( - 'namespace', OLD.namespace, - 'key', OLD.key, - 'op', 'delete' - )::text); - RETURN OLD; - ELSE - PERFORM pg_notify(TG_ARGV[0], json_build_object( - 'namespace', NEW.namespace, - 'key', NEW.key, - 'op', 'upsert' - )::text); - RETURN NEW; - END IF; -END; -$$ LANGUAGE plpgsql` - - if _, err := db.ExecContext(ctx, createFunc); err != nil { - return fmt.Errorf("systemplane/postgres: create function: %w", err) - } - - dropTrigger := "DROP TRIGGER IF EXISTS systemplane_notify_trigger ON " + s.cfg.Table // #nosec G202 -- table validated as identifier - if _, err := db.ExecContext(ctx, dropTrigger); err != nil { - return fmt.Errorf("systemplane/postgres: drop trigger: %w", err) - } - - dropUpdateTrigger := "DROP TRIGGER IF EXISTS systemplane_notify_update_trigger ON " + s.cfg.Table // #nosec G202 -- table validated as identifier - if _, err := db.ExecContext(ctx, dropUpdateTrigger); err != nil { - return fmt.Errorf("systemplane/postgres: drop update trigger: %w", err) - } - - // Two triggers: INSERT/DELETE fire unconditionally; UPDATE is gated by - // `WHEN (OLD IS DISTINCT FROM NEW)` so a no-op upsert 
does not emit a - // spurious NOTIFY. - createInsertDeleteTrigger := fmt.Sprintf(`CREATE TRIGGER systemplane_notify_trigger -AFTER INSERT OR DELETE ON %s -FOR EACH ROW EXECUTE FUNCTION systemplane_notify_v3('%s')`, s.cfg.Table, s.cfg.Channel) // #nosec G201 -- table/channel validated as identifiers - - if _, err := db.ExecContext(ctx, createInsertDeleteTrigger); err != nil { - return fmt.Errorf("systemplane/postgres: create insert/delete trigger: %w", err) - } - - createUpdateTrigger := fmt.Sprintf(`CREATE TRIGGER systemplane_notify_update_trigger -AFTER UPDATE ON %s -FOR EACH ROW -WHEN (OLD IS DISTINCT FROM NEW) -EXECUTE FUNCTION systemplane_notify_v3('%s')`, s.cfg.Table, s.cfg.Channel) // #nosec G201 -- table/channel validated as identifiers - - if _, err := db.ExecContext(ctx, createUpdateTrigger); err != nil { - return fmt.Errorf("systemplane/postgres: create update trigger: %w", err) - } - - return nil -}