From cad0b04b98546114c4d674b493efb196b7516b02 Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Mon, 11 May 2026 17:45:23 +0200 Subject: [PATCH 1/3] {source-materialize}-postgres: support JSONB --- .../.snapshots/TestSQLGeneration | 16 +++ materialize-postgres/sqlgen.go | 37 ++++-- materialize-postgres/sqlgen_test.go | 111 ++++++++++++++++++ .../.snapshots/TestDiscoveryComplex | 6 +- source-postgres/discovery.go | 21 +++- 5 files changed, 176 insertions(+), 15 deletions(-) diff --git a/materialize-postgres/.snapshots/TestSQLGeneration b/materialize-postgres/.snapshots/TestSQLGeneration index 6506a44470..6f63e0821a 100644 --- a/materialize-postgres/.snapshots/TestSQLGeneration +++ b/materialize-postgres/.snapshots/TestSQLGeneration @@ -344,4 +344,20 @@ BEGIN END $$; --- End Fence Update --- +--- Begin createTargetTable [jsonb contentMediaType] --- + +CREATE TABLE IF NOT EXISTS "public".jsonb_round_trip ( + id BIGINT NOT NULL, + plain_json JSON, + jsonb_col JSONB, + + PRIMARY KEY (id) +); + +COMMENT ON TABLE "public".jsonb_round_trip IS 'Generated for materialize-postgres jsonb round-trip test'; +COMMENT ON COLUMN "public".jsonb_round_trip.id IS ''; +COMMENT ON COLUMN "public".jsonb_round_trip.plain_json IS ''; +COMMENT ON COLUMN "public".jsonb_round_trip.jsonb_col IS ''; +--- End createTargetTable [jsonb contentMediaType] --- + diff --git a/materialize-postgres/sqlgen.go b/materialize-postgres/sqlgen.go index 00159bc3de..6281538022 100644 --- a/materialize-postgres/sqlgen.go +++ b/materialize-postgres/sqlgen.go @@ -34,18 +34,36 @@ func createPgDialect(featureFlags map[string]bool) sql.Dialect { binaryMapping = sql.MapStatic("BYTEA", sql.UsingConverter(sql.Base64Decoder)) } + // Fields originating from a PostgreSQL JSONB column carry this + // (non-standard, vendor-tree) contentMediaType so they can be recreated + // as JSONB at the destination instead of collapsing onto JSON. Anything + // without the annotation defaults to JSON, preserving the historical + // behavior for existing materializations and non-Postgres sources. + jsonbContentMediaType := "application/vnd.postgresql.jsonb+json" + jsonOrJsonb := func(jsonMapper, jsonbMapper sql.MapProjectionFn) sql.MapProjectionFn { + return func(p *sql.Projection) (sql.DDLer, sql.CompatibleColumnTypes, sql.ElementConverter) { + if p.Inference.String_ != nil && p.Inference.String_.ContentType == jsonbContentMediaType { + return jsonbMapper(p) + } + return jsonMapper(p) + } + } + mapper := sql.NewDDLMapper( sql.FlatTypeMappings{ sql.INTEGER: sql.MapSignedInt64( sql.MapStatic("BIGINT", sql.AlsoCompatibleWith("integer")), sql.MapStatic("NUMERIC"), ), - sql.NUMBER: sql.MapStatic("DOUBLE PRECISION"), - sql.BOOLEAN: sql.MapStatic("BOOLEAN"), - sql.OBJECT: sql.MapStatic("JSON"), - sql.ARRAY: sql.MapStatic("JSON"), - sql.BINARY: binaryMapping, - sql.MULTIPLE: sql.MapStatic("JSON", sql.UsingConverter(sql.ToJsonBytes)), + sql.NUMBER: sql.MapStatic("DOUBLE PRECISION"), + sql.BOOLEAN: sql.MapStatic("BOOLEAN"), + sql.OBJECT: jsonOrJsonb(sql.MapStatic("JSON"), sql.MapStatic("JSONB")), + sql.ARRAY: jsonOrJsonb(sql.MapStatic("JSON"), sql.MapStatic("JSONB")), + sql.BINARY: binaryMapping, + sql.MULTIPLE: jsonOrJsonb( + sql.MapStatic("JSON", sql.UsingConverter(sql.ToJsonBytes)), + sql.MapStatic("JSONB", sql.UsingConverter(sql.ToJsonBytes)), + ), sql.STRING_INTEGER: sql.MapStatic("NUMERIC"), sql.STRING_NUMBER: sql.MapStatic("DECIMAL", sql.AlsoCompatibleWith("numeric")), sql.STRING: sql.MapString(sql.StringMappings{ @@ -80,7 +98,12 @@ func createPgDialect(featureFlags map[string]bool) sql.Dialect { "text": {sql.NewMigrationSpec([]string{"bytea"}, sql.WithCastSQL(stringToByteaCast))}, "character varying": {sql.NewMigrationSpec([]string{"bytea"}, sql.WithCastSQL(stringToByteaCast))}, "bytea": {sql.NewMigrationSpec([]string{"text"}, sql.WithCastSQL(byteaToStringCast))}, - "*": {sql.NewMigrationSpec([]string{"json"}, sql.WithCastSQL(toJsonCast))}, + // PostgreSQL accepts implicit casts in both directions between + // json and jsonb, so columns can move freely as the upstream + // source type annotation flips. + "json": {sql.NewMigrationSpec([]string{"jsonb"})}, + "jsonb": {sql.NewMigrationSpec([]string{"json"})}, + "*": {sql.NewMigrationSpec([]string{"json"}, sql.WithCastSQL(toJsonCast))}, }, TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation { if len(path) == 1 { diff --git a/materialize-postgres/sqlgen_test.go b/materialize-postgres/sqlgen_test.go index 7b15374df6..37695c0461 100644 --- a/materialize-postgres/sqlgen_test.go +++ b/materialize-postgres/sqlgen_test.go @@ -41,9 +41,84 @@ func TestSQLGeneration(t *testing.T) { }, ) + // Exercise the source-postgres -> materialize-postgres JSONB round-trip: + // a value field carrying the application/vnd.postgresql.jsonb+json + // contentMediaType must render as JSONB, while a sibling field without + // the annotation stays on JSON. + jsonbTable := buildJSONBTestTable(t) + jsonbName := "createTargetTable [jsonb contentMediaType]" + snap.WriteString("--- Begin " + jsonbName + " ---\n") + require.NoError(t, testTemplates.createTargetTable.Execute(snap, &jsonbTable)) + snap.WriteString("--- End " + jsonbName + " ---\n\n") + cupaloy.SnapshotT(t, snap.String()) } +// buildJSONBTestTable assembles a synthetic Table with two value projections: +// one carrying the jsonb contentMediaType so it should map to JSONB, and one +// without so it should default to JSON. +func buildJSONBTestTable(t *testing.T) sql.Table { + t.Helper() + + const jsonbMediaType = "application/vnd.postgresql.jsonb+json" + multipleTypes := []string{"object", "string", "array", "number", "boolean", "null"} + + mkValue := func(field, contentType string) sql.Column { + var stringInf *pf.Inference_String + if contentType != "" { + stringInf = &pf.Inference_String{ContentType: contentType} + } + p := sql.Projection{ + Projection: pf.Projection{ + Field: field, + Ptr: "/" + field, + Inference: pf.Inference{ + Types: multipleTypes, + String_: stringInf, + Exists: pf.Inference_MAY, + }, + }, + } + return sql.Column{ + Projection: p, + MappedType: testDialect.MapType(&p, sql.FieldConfig{}), + Identifier: testDialect.Identifier(field), + } + } + + keyProj := sql.Projection{ + Projection: pf.Projection{ + Field: "id", + Ptr: "/id", + IsPrimaryKey: true, + Inference: pf.Inference{ + Types: []string{"integer"}, + Exists: pf.Inference_MUST, + }, + }, + } + keyCol := sql.Column{ + Projection: keyProj, + MappedType: testDialect.MapType(&keyProj, sql.FieldConfig{}), + Identifier: testDialect.Identifier("id"), + MustExist: true, + } + + tableName := "jsonb_round_trip" + return sql.Table{ + TableShape: sql.TableShape{ + Path: []string{"public", tableName}, + Comment: "Generated for materialize-postgres jsonb round-trip test", + }, + Identifier: testDialect.Identifier("public", tableName), + Keys: []sql.Column{keyCol}, + Values: []sql.Column{ + mkValue("plain_json", ""), + mkValue("jsonb_col", jsonbMediaType), + }, + } +} + func TestDateTimeColumn(t *testing.T) { var mapped = testDialect.MapType(&sql.Projection{ @@ -62,6 +137,42 @@ func TestDateTimeColumn(t *testing.T) { require.NoError(t, err) } +func TestJSONBContentMediaType(t *testing.T) { + jsonbMediaType := "application/vnd.postgresql.jsonb+json" + + mapWithMediaType := func(types []string, contentType string) string { + var stringInf *pf.Inference_String + for _, ty := range types { + if ty == "string" { + stringInf = &pf.Inference_String{ContentType: contentType} + break + } + } + return testDialect.MapType(&sql.Projection{ + Projection: pf.Projection{ + Inference: pf.Inference{ + Types: types, + String_: stringInf, + Exists: pf.Inference_MUST, + }, + }, + }, sql.FieldConfig{}).DDL + } + + require.Equal(t, + "JSONB NOT NULL", + mapWithMediaType([]string{"object", "string", "array", "number", "boolean"}, jsonbMediaType), + "MULTIPLE-typed field with jsonb contentMediaType should map to JSONB") + require.Equal(t, + "JSON NOT NULL", + mapWithMediaType([]string{"object", "string", "array", "number", "boolean"}, "application/json"), + "MULTIPLE-typed field with application/json contentMediaType should map to JSON") + require.Equal(t, + "JSON NOT NULL", + mapWithMediaType([]string{"object", "string", "array", "number", "boolean"}, ""), + "MULTIPLE-typed field without contentMediaType should default to JSON") +} + func TestTruncatedIdentifier(t *testing.T) { tests := []struct { name string diff --git a/source-postgres/.snapshots/TestDiscoveryComplex b/source-postgres/.snapshots/TestDiscoveryComplex index 3f83263f0c..a8faf20516 100644 --- a/source-postgres/.snapshots/TestDiscoveryComplex +++ b/source-postgres/.snapshots/TestDiscoveryComplex @@ -36,10 +36,12 @@ sql> COMMENT ON COLUMN test.discoverycomplex_934635.k1 IS 'I think this is a key ] }, "doc": { - "description": "(source type: json)" + "description": "(source type: json)", + "contentMediaType": "application/json" }, "doc/bin": { - "description": "(source type: non-nullable jsonb)" + "description": "(source type: non-nullable jsonb)", + "contentMediaType": "application/vnd.postgresql.jsonb+json" }, "foo": { "description": "This is a text field! (source type: text)", diff --git a/source-postgres/discovery.go b/source-postgres/discovery.go index c2dcf7c82a..bbcefd59c3 100644 --- a/source-postgres/discovery.go +++ b/source-postgres/discovery.go @@ -267,10 +267,11 @@ func (db *postgresDatabase) TranslateDBToJSONType(column sqlcapture.ColumnInfo, } type columnSchema struct { - contentEncoding string - format string - nullable bool - jsonTypes []string + contentEncoding string + contentMediaType string + format string + nullable bool + jsonTypes []string } func (s columnSchema) toType() *jsonschema.Schema { @@ -283,6 +284,10 @@ func (s columnSchema) toType() *jsonschema.Schema { out.Extras["contentEncoding"] = s.contentEncoding // New in 2019-09. } + if s.contentMediaType != "" { + out.Extras["contentMediaType"] = s.contentMediaType // New in 2019-09. + } + if s.jsonTypes != nil { var types = append([]string(nil), s.jsonTypes...) if s.nullable { @@ -338,8 +343,12 @@ var postgresTypeToJSON = map[string]columnSchema{ "bit": {jsonTypes: []string{"string"}}, "varbit": {jsonTypes: []string{"string"}}, - "json": {}, - "jsonb": {}, + // json and jsonb columns capture arbitrary JSON values, so we don't constrain + // the JSON Schema type. The contentMediaType annotation distinguishes the two + // at the wire so downstream connectors (e.g. materialize-postgres) can + // recreate the original column type instead of collapsing both onto json. + "json": {contentMediaType: "application/json"}, + "jsonb": {contentMediaType: "application/vnd.postgresql.jsonb+json"}, "jsonpath": {jsonTypes: []string{"string"}}, // Domain-Specific Types From b031b2fb4b880b3917726b63d1c6753deb876446 Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Fri, 15 May 2026 14:56:17 +0100 Subject: [PATCH 2/3] update jsonb contentMediaType: application/vnd.estuary.postgresql.jsonb+json --- materialize-postgres/sqlgen.go | 4 ++-- materialize-postgres/sqlgen_test.go | 2 +- source-postgres/.snapshots/TestDiscoveryComplex | 2 +- source-postgres/discovery.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/materialize-postgres/sqlgen.go b/materialize-postgres/sqlgen.go index 6281538022..bee29236d4 100644 --- a/materialize-postgres/sqlgen.go +++ b/materialize-postgres/sqlgen.go @@ -39,7 +39,7 @@ func createPgDialect(featureFlags map[string]bool) sql.Dialect { // as JSONB at the destination instead of collapsing onto JSON. Anything // without the annotation defaults to JSON, preserving the historical // behavior for existing materializations and non-Postgres sources. - jsonbContentMediaType := "application/vnd.postgresql.jsonb+json" + jsonbContentMediaType := "application/vnd.estuary.postgresql.jsonb+json" jsonOrJsonb := func(jsonMapper, jsonbMapper sql.MapProjectionFn) sql.MapProjectionFn { return func(p *sql.Projection) (sql.DDLer, sql.CompatibleColumnTypes, sql.ElementConverter) { if p.Inference.String_ != nil && p.Inference.String_.ContentType == jsonbContentMediaType { @@ -103,7 +103,7 @@ func createPgDialect(featureFlags map[string]bool) sql.Dialect { // source type annotation flips. "json": {sql.NewMigrationSpec([]string{"jsonb"})}, "jsonb": {sql.NewMigrationSpec([]string{"json"})}, - "*": {sql.NewMigrationSpec([]string{"json"}, sql.WithCastSQL(toJsonCast))}, + "*": {sql.NewMigrationSpec([]string{"json", "jsonb"}, sql.WithCastSQL(toJsonCast))}, }, TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation { if len(path) == 1 { diff --git a/materialize-postgres/sqlgen_test.go b/materialize-postgres/sqlgen_test.go index 37695c0461..04ab560f29 100644 --- a/materialize-postgres/sqlgen_test.go +++ b/materialize-postgres/sqlgen_test.go @@ -60,7 +60,7 @@ func TestSQLGeneration(t *testing.T) { func buildJSONBTestTable(t *testing.T) sql.Table { t.Helper() - const jsonbMediaType = "application/vnd.postgresql.jsonb+json" + const jsonbMediaType = "application/vnd.estuary.postgresql.jsonb+json" multipleTypes := []string{"object", "string", "array", "number", "boolean", "null"} mkValue := func(field, contentType string) sql.Column { diff --git a/source-postgres/.snapshots/TestDiscoveryComplex b/source-postgres/.snapshots/TestDiscoveryComplex index a8faf20516..b6c6695a8d 100644 --- a/source-postgres/.snapshots/TestDiscoveryComplex +++ b/source-postgres/.snapshots/TestDiscoveryComplex @@ -41,7 +41,7 @@ sql> COMMENT ON COLUMN test.discoverycomplex_934635.k1 IS 'I think this is a key }, "doc/bin": { "description": "(source type: non-nullable jsonb)", - "contentMediaType": "application/vnd.postgresql.jsonb+json" + "contentMediaType": "application/vnd.estuary.postgresql.jsonb+json" }, "foo": { "description": "This is a text field! (source type: text)", diff --git a/source-postgres/discovery.go b/source-postgres/discovery.go index bbcefd59c3..f9b0bcbfc5 100644 --- a/source-postgres/discovery.go +++ b/source-postgres/discovery.go @@ -348,7 +348,7 @@ var postgresTypeToJSON = map[string]columnSchema{ // at the wire so downstream connectors (e.g. materialize-postgres) can // recreate the original column type instead of collapsing both onto json. "json": {contentMediaType: "application/json"}, - "jsonb": {contentMediaType: "application/vnd.postgresql.jsonb+json"}, + "jsonb": {contentMediaType: "application/vnd.estuary.postgresql.jsonb+json"}, "jsonpath": {jsonTypes: []string{"string"}}, // Domain-Specific Types From f5ed74a83012f69a97ff67a52d1c5335523d8304 Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Thu, 21 May 2026 12:11:36 +0100 Subject: [PATCH 3/3] use latest flow for ContentMediaType see https://github.com/estuary/flow/pull/2945 --- go.mod | 4 ++-- go.sum | 12 ++++++++---- materialize-postgres/sqlgen.go | 2 +- materialize-postgres/sqlgen_test.go | 25 +++++++------------------ 4 files changed, 18 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index 68ef509aed..e5ec959865 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/dropbox/dropbox-sdk-go-unofficial/v6 v6.0.5 github.com/duckdb/duckdb-go/v2 v2.10502.0 github.com/elastic/go-elasticsearch/v8 v8.9.0 - github.com/estuary/flow v0.6.4 + github.com/estuary/flow v0.6.9 github.com/estuary/vitess v0.15.10 github.com/evanphx/json-patch/v5 v5.9.11 github.com/go-mysql-org/go-mysql v0.0.0-20250907131429-558ed11751bc @@ -76,7 +76,7 @@ require ( github.com/tidwall/gjson v1.17.3 github.com/tidwall/sjson v1.2.5 github.com/wk8/go-ordered-map/v2 v2.1.8 - go.gazette.dev/core v0.102.0 + go.gazette.dev/core v0.103.1-0.20260505142537-ce718ada37ee go.mongodb.org/mongo-driver v1.17.4 golang.org/x/crypto v0.49.0 golang.org/x/exp v0.0.0-20260312153236-7ab1446f8b90 diff --git a/go.sum b/go.sum index 55c24ee636..a5dc9f3ab9 100644 --- a/go.sum +++ b/go.sum @@ -409,8 +409,12 @@ github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJP github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg5VPuZ0uONDT6eb4= github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA= -github.com/estuary/flow v0.6.4 h1:MpermXN5h1U85zJspZftGSC7XbdTUWv22/FM/tvofSc= -github.com/estuary/flow v0.6.4/go.mod h1:xihzOEpeEzcLTsgU2LFvjLs0f5UvbTeAQGnwno6ondk= +github.com/estuary/flow v0.6.8 h1:HK9zYKV2HvF6jPI8eSDGdLy4nVUW2FWB+CA37PyuPLI= +github.com/estuary/flow v0.6.8/go.mod h1:SYiOTbPZ0ZlCh8qNVARPMjRF6/aVWY7xQuHFK9BSELk= +github.com/estuary/flow v0.6.9-0.20260520113844-a3d8595811ed h1:+pm+QyY8+bhC2gxJdRa735NX+QawxO0Piuv5c1CUj0w= +github.com/estuary/flow v0.6.9-0.20260520113844-a3d8595811ed/go.mod h1:jdQeVWS9sw520EU2u9UPYryE5iEo1Gu9Ju99LXJ8r7Y= +github.com/estuary/flow v0.6.9 h1:1cGdwmlB4UxNhRnvujTDGwmx1ABvBS9iqWYIIStRaRI= +github.com/estuary/flow v0.6.9/go.mod h1:jdQeVWS9sw520EU2u9UPYryE5iEo1Gu9Ju99LXJ8r7Y= github.com/estuary/go-mysql v0.0.0-20250918155720-90d473fd1a3e h1:stJOXFppEkEksGtqpJjIIfufID+RUkrkMCFKgu4Vfaw= github.com/estuary/go-mysql v0.0.0-20250918155720-90d473fd1a3e/go.mod h1:FQxw17uRbFvMZFK+dPtIPufbU46nBdrGaxOw0ac9MFs= github.com/estuary/vitess v0.15.10 h1:oXJgcG0HGZWuwjdvSp4pIFIAXtKLaR9Fl9VBynSszFA= @@ -1109,8 +1113,8 @@ go.etcd.io/etcd/client/pkg/v3 v3.6.5 h1:Duz9fAzIZFhYWgRjp/FgNq2gO1jId9Yae/rLn3Rr go.etcd.io/etcd/client/pkg/v3 v3.6.5/go.mod h1:8Wx3eGRPiy0qOFMZT/hfvdos+DjEaPxdIDiCDUv/FQk= go.etcd.io/etcd/client/v3 v3.6.5 h1:yRwZNFBx/35VKHTcLDeO7XVLbCBFbPi+XV4OC3QJf2U= go.etcd.io/etcd/client/v3 v3.6.5/go.mod h1:ZqwG/7TAFZ0BJ0jXRPoJjKQJtbFo/9NIY8uoFFKcCyo= -go.gazette.dev/core v0.102.0 h1:S1FFQn4Ws4qThrnwQhZ75jghg++Tfev++fPdq6PVlGw= -go.gazette.dev/core v0.102.0/go.mod h1:4qMw9JDecqIx6EijHy6ndbueTYeglm6LW4qKxEUOZ8M= +go.gazette.dev/core v0.103.1-0.20260505142537-ce718ada37ee h1:yceN2m1184/i4YSN4tlfMSdpnV+xnePgnUf4KFa00oE= +go.gazette.dev/core v0.103.1-0.20260505142537-ce718ada37ee/go.mod h1:4qMw9JDecqIx6EijHy6ndbueTYeglm6LW4qKxEUOZ8M= go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.mongodb.org/mongo-driver v1.17.4 h1:jUorfmVzljjr0FLzYQsGP8cgN/qzzxlY9Vh0C9KFXVw= go.mongodb.org/mongo-driver v1.17.4/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ= diff --git a/materialize-postgres/sqlgen.go b/materialize-postgres/sqlgen.go index bee29236d4..f56e554e5d 100644 --- a/materialize-postgres/sqlgen.go +++ b/materialize-postgres/sqlgen.go @@ -42,7 +42,7 @@ func createPgDialect(featureFlags map[string]bool) sql.Dialect { jsonbContentMediaType := "application/vnd.estuary.postgresql.jsonb+json" jsonOrJsonb := func(jsonMapper, jsonbMapper sql.MapProjectionFn) sql.MapProjectionFn { return func(p *sql.Projection) (sql.DDLer, sql.CompatibleColumnTypes, sql.ElementConverter) { - if p.Inference.String_ != nil && p.Inference.String_.ContentType == jsonbContentMediaType { + if p.Inference.ContentMediaType == jsonbContentMediaType { return jsonbMapper(p) } return jsonMapper(p) diff --git a/materialize-postgres/sqlgen_test.go b/materialize-postgres/sqlgen_test.go index 04ab560f29..42755276fb 100644 --- a/materialize-postgres/sqlgen_test.go +++ b/materialize-postgres/sqlgen_test.go @@ -64,18 +64,14 @@ func buildJSONBTestTable(t *testing.T) sql.Table { multipleTypes := []string{"object", "string", "array", "number", "boolean", "null"} mkValue := func(field, contentType string) sql.Column { - var stringInf *pf.Inference_String - if contentType != "" { - stringInf = &pf.Inference_String{ContentType: contentType} - } p := sql.Projection{ Projection: pf.Projection{ Field: field, Ptr: "/" + field, Inference: pf.Inference{ - Types: multipleTypes, - String_: stringInf, - Exists: pf.Inference_MAY, + Types: multipleTypes, + ContentMediaType: contentType, + Exists: pf.Inference_MAY, }, }, } @@ -138,22 +134,15 @@ func TestDateTimeColumn(t *testing.T) { } func TestJSONBContentMediaType(t *testing.T) { - jsonbMediaType := "application/vnd.postgresql.jsonb+json" + jsonbMediaType := "application/vnd.estuary.postgresql.jsonb+json" mapWithMediaType := func(types []string, contentType string) string { - var stringInf *pf.Inference_String - for _, ty := range types { - if ty == "string" { - stringInf = &pf.Inference_String{ContentType: contentType} - break - } - } return testDialect.MapType(&sql.Projection{ Projection: pf.Projection{ Inference: pf.Inference{ - Types: types, - String_: stringInf, - Exists: pf.Inference_MUST, + Types: types, + ContentMediaType: contentType, + Exists: pf.Inference_MUST, }, }, }, sql.FieldConfig{}).DDL