diff --git a/go.mod b/go.mod index 68ef509aed..e5ec959865 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/dropbox/dropbox-sdk-go-unofficial/v6 v6.0.5 github.com/duckdb/duckdb-go/v2 v2.10502.0 github.com/elastic/go-elasticsearch/v8 v8.9.0 - github.com/estuary/flow v0.6.4 + github.com/estuary/flow v0.6.9 github.com/estuary/vitess v0.15.10 github.com/evanphx/json-patch/v5 v5.9.11 github.com/go-mysql-org/go-mysql v0.0.0-20250907131429-558ed11751bc @@ -76,7 +76,7 @@ require ( github.com/tidwall/gjson v1.17.3 github.com/tidwall/sjson v1.2.5 github.com/wk8/go-ordered-map/v2 v2.1.8 - go.gazette.dev/core v0.102.0 + go.gazette.dev/core v0.103.1-0.20260505142537-ce718ada37ee go.mongodb.org/mongo-driver v1.17.4 golang.org/x/crypto v0.49.0 golang.org/x/exp v0.0.0-20260312153236-7ab1446f8b90 diff --git a/go.sum b/go.sum index 55c24ee636..a5dc9f3ab9 100644 --- a/go.sum +++ b/go.sum @@ -409,8 +409,12 @@ github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJP github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg5VPuZ0uONDT6eb4= github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA= -github.com/estuary/flow v0.6.4 h1:MpermXN5h1U85zJspZftGSC7XbdTUWv22/FM/tvofSc= -github.com/estuary/flow v0.6.4/go.mod h1:xihzOEpeEzcLTsgU2LFvjLs0f5UvbTeAQGnwno6ondk= +github.com/estuary/flow v0.6.8 h1:HK9zYKV2HvF6jPI8eSDGdLy4nVUW2FWB+CA37PyuPLI= +github.com/estuary/flow v0.6.8/go.mod h1:SYiOTbPZ0ZlCh8qNVARPMjRF6/aVWY7xQuHFK9BSELk= +github.com/estuary/flow v0.6.9-0.20260520113844-a3d8595811ed h1:+pm+QyY8+bhC2gxJdRa735NX+QawxO0Piuv5c1CUj0w= +github.com/estuary/flow v0.6.9-0.20260520113844-a3d8595811ed/go.mod h1:jdQeVWS9sw520EU2u9UPYryE5iEo1Gu9Ju99LXJ8r7Y= +github.com/estuary/flow v0.6.9 h1:1cGdwmlB4UxNhRnvujTDGwmx1ABvBS9iqWYIIStRaRI= +github.com/estuary/flow v0.6.9/go.mod 
h1:jdQeVWS9sw520EU2u9UPYryE5iEo1Gu9Ju99LXJ8r7Y= github.com/estuary/go-mysql v0.0.0-20250918155720-90d473fd1a3e h1:stJOXFppEkEksGtqpJjIIfufID+RUkrkMCFKgu4Vfaw= github.com/estuary/go-mysql v0.0.0-20250918155720-90d473fd1a3e/go.mod h1:FQxw17uRbFvMZFK+dPtIPufbU46nBdrGaxOw0ac9MFs= github.com/estuary/vitess v0.15.10 h1:oXJgcG0HGZWuwjdvSp4pIFIAXtKLaR9Fl9VBynSszFA= @@ -1109,8 +1113,8 @@ go.etcd.io/etcd/client/pkg/v3 v3.6.5 h1:Duz9fAzIZFhYWgRjp/FgNq2gO1jId9Yae/rLn3Rr go.etcd.io/etcd/client/pkg/v3 v3.6.5/go.mod h1:8Wx3eGRPiy0qOFMZT/hfvdos+DjEaPxdIDiCDUv/FQk= go.etcd.io/etcd/client/v3 v3.6.5 h1:yRwZNFBx/35VKHTcLDeO7XVLbCBFbPi+XV4OC3QJf2U= go.etcd.io/etcd/client/v3 v3.6.5/go.mod h1:ZqwG/7TAFZ0BJ0jXRPoJjKQJtbFo/9NIY8uoFFKcCyo= -go.gazette.dev/core v0.102.0 h1:S1FFQn4Ws4qThrnwQhZ75jghg++Tfev++fPdq6PVlGw= -go.gazette.dev/core v0.102.0/go.mod h1:4qMw9JDecqIx6EijHy6ndbueTYeglm6LW4qKxEUOZ8M= +go.gazette.dev/core v0.103.1-0.20260505142537-ce718ada37ee h1:yceN2m1184/i4YSN4tlfMSdpnV+xnePgnUf4KFa00oE= +go.gazette.dev/core v0.103.1-0.20260505142537-ce718ada37ee/go.mod h1:4qMw9JDecqIx6EijHy6ndbueTYeglm6LW4qKxEUOZ8M= go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.mongodb.org/mongo-driver v1.17.4 h1:jUorfmVzljjr0FLzYQsGP8cgN/qzzxlY9Vh0C9KFXVw= go.mongodb.org/mongo-driver v1.17.4/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ= diff --git a/materialize-postgres/.snapshots/TestSQLGeneration b/materialize-postgres/.snapshots/TestSQLGeneration index 6506a44470..6f63e0821a 100644 --- a/materialize-postgres/.snapshots/TestSQLGeneration +++ b/materialize-postgres/.snapshots/TestSQLGeneration @@ -344,4 +344,20 @@ BEGIN END $$; --- End Fence Update --- +--- Begin createTargetTable [jsonb contentMediaType] --- + +CREATE TABLE IF NOT EXISTS "public".jsonb_round_trip ( + id BIGINT NOT NULL, + plain_json JSON, + jsonb_col JSONB, + + PRIMARY KEY (id) +); + +COMMENT ON TABLE "public".jsonb_round_trip IS 'Generated for materialize-postgres jsonb 
round-trip test'; +COMMENT ON COLUMN "public".jsonb_round_trip.id IS ''; +COMMENT ON COLUMN "public".jsonb_round_trip.plain_json IS ''; +COMMENT ON COLUMN "public".jsonb_round_trip.jsonb_col IS ''; +--- End createTargetTable [jsonb contentMediaType] --- + diff --git a/materialize-postgres/sqlgen.go b/materialize-postgres/sqlgen.go index 00159bc3de..f56e554e5d 100644 --- a/materialize-postgres/sqlgen.go +++ b/materialize-postgres/sqlgen.go @@ -34,18 +34,36 @@ func createPgDialect(featureFlags map[string]bool) sql.Dialect { binaryMapping = sql.MapStatic("BYTEA", sql.UsingConverter(sql.Base64Decoder)) } + // Fields originating from a PostgreSQL JSONB column carry this + // (non-standard, vendor-tree) contentMediaType so they can be recreated + // as JSONB at the destination instead of collapsing onto JSON. Anything + // without the annotation defaults to JSON, preserving the historical + // behavior for existing materializations and non-Postgres sources. + jsonbContentMediaType := "application/vnd.estuary.postgresql.jsonb+json" + jsonOrJsonb := func(jsonMapper, jsonbMapper sql.MapProjectionFn) sql.MapProjectionFn { + return func(p *sql.Projection) (sql.DDLer, sql.CompatibleColumnTypes, sql.ElementConverter) { + if p.Inference.ContentMediaType == jsonbContentMediaType { + return jsonbMapper(p) + } + return jsonMapper(p) + } + } + mapper := sql.NewDDLMapper( sql.FlatTypeMappings{ sql.INTEGER: sql.MapSignedInt64( sql.MapStatic("BIGINT", sql.AlsoCompatibleWith("integer")), sql.MapStatic("NUMERIC"), ), - sql.NUMBER: sql.MapStatic("DOUBLE PRECISION"), - sql.BOOLEAN: sql.MapStatic("BOOLEAN"), - sql.OBJECT: sql.MapStatic("JSON"), - sql.ARRAY: sql.MapStatic("JSON"), - sql.BINARY: binaryMapping, - sql.MULTIPLE: sql.MapStatic("JSON", sql.UsingConverter(sql.ToJsonBytes)), + sql.NUMBER: sql.MapStatic("DOUBLE PRECISION"), + sql.BOOLEAN: sql.MapStatic("BOOLEAN"), + sql.OBJECT: jsonOrJsonb(sql.MapStatic("JSON"), sql.MapStatic("JSONB")), + sql.ARRAY: jsonOrJsonb(sql.MapStatic("JSON"), 
sql.MapStatic("JSONB")), + sql.BINARY: binaryMapping, + sql.MULTIPLE: jsonOrJsonb( + sql.MapStatic("JSON", sql.UsingConverter(sql.ToJsonBytes)), + sql.MapStatic("JSONB", sql.UsingConverter(sql.ToJsonBytes)), + ), sql.STRING_INTEGER: sql.MapStatic("NUMERIC"), sql.STRING_NUMBER: sql.MapStatic("DECIMAL", sql.AlsoCompatibleWith("numeric")), sql.STRING: sql.MapString(sql.StringMappings{ @@ -80,7 +98,12 @@ func createPgDialect(featureFlags map[string]bool) sql.Dialect { "text": {sql.NewMigrationSpec([]string{"bytea"}, sql.WithCastSQL(stringToByteaCast))}, "character varying": {sql.NewMigrationSpec([]string{"bytea"}, sql.WithCastSQL(stringToByteaCast))}, "bytea": {sql.NewMigrationSpec([]string{"text"}, sql.WithCastSQL(byteaToStringCast))}, - "*": {sql.NewMigrationSpec([]string{"json"}, sql.WithCastSQL(toJsonCast))}, + // PostgreSQL accepts implicit casts in both directions between + // json and jsonb, so columns can move freely as the upstream + // source type annotation flips. + "json": {sql.NewMigrationSpec([]string{"jsonb"})}, + "jsonb": {sql.NewMigrationSpec([]string{"json"})}, + "*": {sql.NewMigrationSpec([]string{"json", "jsonb"}, sql.WithCastSQL(toJsonCast))}, }, TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation { if len(path) == 1 { diff --git a/materialize-postgres/sqlgen_test.go b/materialize-postgres/sqlgen_test.go index 7b15374df6..42755276fb 100644 --- a/materialize-postgres/sqlgen_test.go +++ b/materialize-postgres/sqlgen_test.go @@ -41,9 +41,80 @@ func TestSQLGeneration(t *testing.T) { }, ) + // Exercise the source-postgres -> materialize-postgres JSONB round-trip: + // a value field carrying the application/vnd.estuary.postgresql.jsonb+json + // contentMediaType must render as JSONB, while a sibling field without + // the annotation stays on JSON. 
+ jsonbTable := buildJSONBTestTable(t) + jsonbName := "createTargetTable [jsonb contentMediaType]" + snap.WriteString("--- Begin " + jsonbName + " ---\n") + require.NoError(t, testTemplates.createTargetTable.Execute(snap, &jsonbTable)) + snap.WriteString("--- End " + jsonbName + " ---\n\n") + cupaloy.SnapshotT(t, snap.String()) } +// buildJSONBTestTable assembles a synthetic Table with two value projections: +// one carrying the jsonb contentMediaType so it should map to JSONB, and one +// without so it should default to JSON. +func buildJSONBTestTable(t *testing.T) sql.Table { + t.Helper() + + const jsonbMediaType = "application/vnd.estuary.postgresql.jsonb+json" + multipleTypes := []string{"object", "string", "array", "number", "boolean", "null"} + + mkValue := func(field, contentType string) sql.Column { + p := sql.Projection{ + Projection: pf.Projection{ + Field: field, + Ptr: "/" + field, + Inference: pf.Inference{ + Types: multipleTypes, + ContentMediaType: contentType, + Exists: pf.Inference_MAY, + }, + }, + } + return sql.Column{ + Projection: p, + MappedType: testDialect.MapType(&p, sql.FieldConfig{}), + Identifier: testDialect.Identifier(field), + } + } + + keyProj := sql.Projection{ + Projection: pf.Projection{ + Field: "id", + Ptr: "/id", + IsPrimaryKey: true, + Inference: pf.Inference{ + Types: []string{"integer"}, + Exists: pf.Inference_MUST, + }, + }, + } + keyCol := sql.Column{ + Projection: keyProj, + MappedType: testDialect.MapType(&keyProj, sql.FieldConfig{}), + Identifier: testDialect.Identifier("id"), + MustExist: true, + } + + tableName := "jsonb_round_trip" + return sql.Table{ + TableShape: sql.TableShape{ + Path: []string{"public", tableName}, + Comment: "Generated for materialize-postgres jsonb round-trip test", + }, + Identifier: testDialect.Identifier("public", tableName), + Keys: []sql.Column{keyCol}, + Values: []sql.Column{ + mkValue("plain_json", ""), + mkValue("jsonb_col", jsonbMediaType), + }, + } +} + func TestDateTimeColumn(t 
*testing.T) { var mapped = testDialect.MapType(&sql.Projection{ @@ -62,6 +133,35 @@ func TestDateTimeColumn(t *testing.T) { require.NoError(t, err) } +func TestJSONBContentMediaType(t *testing.T) { + jsonbMediaType := "application/vnd.estuary.postgresql.jsonb+json" + + mapWithMediaType := func(types []string, contentType string) string { + return testDialect.MapType(&sql.Projection{ + Projection: pf.Projection{ + Inference: pf.Inference{ + Types: types, + ContentMediaType: contentType, + Exists: pf.Inference_MUST, + }, + }, + }, sql.FieldConfig{}).DDL + } + + require.Equal(t, + "JSONB NOT NULL", + mapWithMediaType([]string{"object", "string", "array", "number", "boolean"}, jsonbMediaType), + "MULTIPLE-typed field with jsonb contentMediaType should map to JSONB") + require.Equal(t, + "JSON NOT NULL", + mapWithMediaType([]string{"object", "string", "array", "number", "boolean"}, "application/json"), + "MULTIPLE-typed field with application/json contentMediaType should map to JSON") + require.Equal(t, + "JSON NOT NULL", + mapWithMediaType([]string{"object", "string", "array", "number", "boolean"}, ""), + "MULTIPLE-typed field without contentMediaType should default to JSON") +} + func TestTruncatedIdentifier(t *testing.T) { tests := []struct { name string diff --git a/source-postgres/.snapshots/TestDiscoveryComplex b/source-postgres/.snapshots/TestDiscoveryComplex index 3f83263f0c..b6c6695a8d 100644 --- a/source-postgres/.snapshots/TestDiscoveryComplex +++ b/source-postgres/.snapshots/TestDiscoveryComplex @@ -36,10 +36,12 @@ sql> COMMENT ON COLUMN test.discoverycomplex_934635.k1 IS 'I think this is a key ] }, "doc": { - "description": "(source type: json)" + "description": "(source type: json)", + "contentMediaType": "application/json" }, "doc/bin": { - "description": "(source type: non-nullable jsonb)" + "description": "(source type: non-nullable jsonb)", + "contentMediaType": "application/vnd.estuary.postgresql.jsonb+json" }, "foo": { "description": "This is a text 
field! (source type: text)", diff --git a/source-postgres/discovery.go b/source-postgres/discovery.go index c2dcf7c82a..f9b0bcbfc5 100644 --- a/source-postgres/discovery.go +++ b/source-postgres/discovery.go @@ -267,10 +267,11 @@ func (db *postgresDatabase) TranslateDBToJSONType(column sqlcapture.ColumnInfo, } type columnSchema struct { - contentEncoding string - format string - nullable bool - jsonTypes []string + contentEncoding string + contentMediaType string + format string + nullable bool + jsonTypes []string } func (s columnSchema) toType() *jsonschema.Schema { @@ -283,6 +284,10 @@ func (s columnSchema) toType() *jsonschema.Schema { out.Extras["contentEncoding"] = s.contentEncoding // New in 2019-09. } + if s.contentMediaType != "" { + out.Extras["contentMediaType"] = s.contentMediaType // New in 2019-09. + } + if s.jsonTypes != nil { var types = append([]string(nil), s.jsonTypes...) if s.nullable { @@ -338,8 +343,12 @@ var postgresTypeToJSON = map[string]columnSchema{ "bit": {jsonTypes: []string{"string"}}, "varbit": {jsonTypes: []string{"string"}}, - "json": {}, - "jsonb": {}, + // json and jsonb columns capture arbitrary JSON values, so we don't constrain + // the JSON Schema type. The contentMediaType annotation distinguishes the two + // on the wire so downstream connectors (e.g. materialize-postgres) can + // recreate the original column type instead of collapsing both onto json. + "json": {contentMediaType: "application/json"}, + "jsonb": {contentMediaType: "application/vnd.estuary.postgresql.jsonb+json"}, "jsonpath": {jsonTypes: []string{"string"}}, // Domain-Specific Types