Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/dropbox/dropbox-sdk-go-unofficial/v6 v6.0.5
github.com/duckdb/duckdb-go/v2 v2.10502.0
github.com/elastic/go-elasticsearch/v8 v8.9.0
github.com/estuary/flow v0.6.4
github.com/estuary/flow v0.6.9
github.com/estuary/vitess v0.15.10
github.com/evanphx/json-patch/v5 v5.9.11
github.com/go-mysql-org/go-mysql v0.0.0-20250907131429-558ed11751bc
Expand Down Expand Up @@ -76,7 +76,7 @@ require (
github.com/tidwall/gjson v1.17.3
github.com/tidwall/sjson v1.2.5
github.com/wk8/go-ordered-map/v2 v2.1.8
go.gazette.dev/core v0.102.0
go.gazette.dev/core v0.103.1-0.20260505142537-ce718ada37ee
go.mongodb.org/mongo-driver v1.17.4
golang.org/x/crypto v0.49.0
golang.org/x/exp v0.0.0-20260312153236-7ab1446f8b90
Expand Down
12 changes: 8 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,12 @@ github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJP
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg5VPuZ0uONDT6eb4=
github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA=
github.com/estuary/flow v0.6.4 h1:MpermXN5h1U85zJspZftGSC7XbdTUWv22/FM/tvofSc=
github.com/estuary/flow v0.6.4/go.mod h1:xihzOEpeEzcLTsgU2LFvjLs0f5UvbTeAQGnwno6ondk=
github.com/estuary/flow v0.6.8 h1:HK9zYKV2HvF6jPI8eSDGdLy4nVUW2FWB+CA37PyuPLI=
github.com/estuary/flow v0.6.8/go.mod h1:SYiOTbPZ0ZlCh8qNVARPMjRF6/aVWY7xQuHFK9BSELk=
github.com/estuary/flow v0.6.9-0.20260520113844-a3d8595811ed h1:+pm+QyY8+bhC2gxJdRa735NX+QawxO0Piuv5c1CUj0w=
github.com/estuary/flow v0.6.9-0.20260520113844-a3d8595811ed/go.mod h1:jdQeVWS9sw520EU2u9UPYryE5iEo1Gu9Ju99LXJ8r7Y=
github.com/estuary/flow v0.6.9 h1:1cGdwmlB4UxNhRnvujTDGwmx1ABvBS9iqWYIIStRaRI=
github.com/estuary/flow v0.6.9/go.mod h1:jdQeVWS9sw520EU2u9UPYryE5iEo1Gu9Ju99LXJ8r7Y=
github.com/estuary/go-mysql v0.0.0-20250918155720-90d473fd1a3e h1:stJOXFppEkEksGtqpJjIIfufID+RUkrkMCFKgu4Vfaw=
github.com/estuary/go-mysql v0.0.0-20250918155720-90d473fd1a3e/go.mod h1:FQxw17uRbFvMZFK+dPtIPufbU46nBdrGaxOw0ac9MFs=
github.com/estuary/vitess v0.15.10 h1:oXJgcG0HGZWuwjdvSp4pIFIAXtKLaR9Fl9VBynSszFA=
Expand Down Expand Up @@ -1109,8 +1113,8 @@ go.etcd.io/etcd/client/pkg/v3 v3.6.5 h1:Duz9fAzIZFhYWgRjp/FgNq2gO1jId9Yae/rLn3Rr
go.etcd.io/etcd/client/pkg/v3 v3.6.5/go.mod h1:8Wx3eGRPiy0qOFMZT/hfvdos+DjEaPxdIDiCDUv/FQk=
go.etcd.io/etcd/client/v3 v3.6.5 h1:yRwZNFBx/35VKHTcLDeO7XVLbCBFbPi+XV4OC3QJf2U=
go.etcd.io/etcd/client/v3 v3.6.5/go.mod h1:ZqwG/7TAFZ0BJ0jXRPoJjKQJtbFo/9NIY8uoFFKcCyo=
go.gazette.dev/core v0.102.0 h1:S1FFQn4Ws4qThrnwQhZ75jghg++Tfev++fPdq6PVlGw=
go.gazette.dev/core v0.102.0/go.mod h1:4qMw9JDecqIx6EijHy6ndbueTYeglm6LW4qKxEUOZ8M=
go.gazette.dev/core v0.103.1-0.20260505142537-ce718ada37ee h1:yceN2m1184/i4YSN4tlfMSdpnV+xnePgnUf4KFa00oE=
go.gazette.dev/core v0.103.1-0.20260505142537-ce718ada37ee/go.mod h1:4qMw9JDecqIx6EijHy6ndbueTYeglm6LW4qKxEUOZ8M=
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
go.mongodb.org/mongo-driver v1.17.4 h1:jUorfmVzljjr0FLzYQsGP8cgN/qzzxlY9Vh0C9KFXVw=
go.mongodb.org/mongo-driver v1.17.4/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ=
Expand Down
16 changes: 16 additions & 0 deletions materialize-postgres/.snapshots/TestSQLGeneration
Original file line number Diff line number Diff line change
Expand Up @@ -344,4 +344,20 @@ BEGIN
END $$;
--- End Fence Update ---

--- Begin createTargetTable [jsonb contentMediaType] ---

CREATE TABLE IF NOT EXISTS "public".jsonb_round_trip (
id BIGINT NOT NULL,
plain_json JSON,
jsonb_col JSONB,

PRIMARY KEY (id)
);

COMMENT ON TABLE "public".jsonb_round_trip IS 'Generated for materialize-postgres jsonb round-trip test';
COMMENT ON COLUMN "public".jsonb_round_trip.id IS '';
COMMENT ON COLUMN "public".jsonb_round_trip.plain_json IS '';
COMMENT ON COLUMN "public".jsonb_round_trip.jsonb_col IS '';
--- End createTargetTable [jsonb contentMediaType] ---


37 changes: 30 additions & 7 deletions materialize-postgres/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,36 @@ func createPgDialect(featureFlags map[string]bool) sql.Dialect {
binaryMapping = sql.MapStatic("BYTEA", sql.UsingConverter(sql.Base64Decoder))
}

// Fields originating from a PostgreSQL JSONB column carry this
// (non-standard, vendor-tree) contentMediaType so they can be recreated
// as JSONB at the destination instead of collapsing onto JSON. Anything
// without the annotation defaults to JSON, preserving the historical
// behavior for existing materializations and non-Postgres sources.
jsonbContentMediaType := "application/vnd.estuary.postgresql.jsonb+json"
jsonOrJsonb := func(jsonMapper, jsonbMapper sql.MapProjectionFn) sql.MapProjectionFn {
return func(p *sql.Projection) (sql.DDLer, sql.CompatibleColumnTypes, sql.ElementConverter) {
if p.Inference.ContentMediaType == jsonbContentMediaType {
return jsonbMapper(p)
}
return jsonMapper(p)
}
}

mapper := sql.NewDDLMapper(
sql.FlatTypeMappings{
sql.INTEGER: sql.MapSignedInt64(
sql.MapStatic("BIGINT", sql.AlsoCompatibleWith("integer")),
sql.MapStatic("NUMERIC"),
),
sql.NUMBER: sql.MapStatic("DOUBLE PRECISION"),
sql.BOOLEAN: sql.MapStatic("BOOLEAN"),
sql.OBJECT: sql.MapStatic("JSON"),
sql.ARRAY: sql.MapStatic("JSON"),
sql.BINARY: binaryMapping,
sql.MULTIPLE: sql.MapStatic("JSON", sql.UsingConverter(sql.ToJsonBytes)),
sql.NUMBER: sql.MapStatic("DOUBLE PRECISION"),
sql.BOOLEAN: sql.MapStatic("BOOLEAN"),
sql.OBJECT: jsonOrJsonb(sql.MapStatic("JSON"), sql.MapStatic("JSONB")),
sql.ARRAY: jsonOrJsonb(sql.MapStatic("JSON"), sql.MapStatic("JSONB")),
sql.BINARY: binaryMapping,
sql.MULTIPLE: jsonOrJsonb(
sql.MapStatic("JSON", sql.UsingConverter(sql.ToJsonBytes)),
sql.MapStatic("JSONB", sql.UsingConverter(sql.ToJsonBytes)),
),
sql.STRING_INTEGER: sql.MapStatic("NUMERIC"),
sql.STRING_NUMBER: sql.MapStatic("DECIMAL", sql.AlsoCompatibleWith("numeric")),
sql.STRING: sql.MapString(sql.StringMappings{
Expand Down Expand Up @@ -80,7 +98,12 @@ func createPgDialect(featureFlags map[string]bool) sql.Dialect {
"text": {sql.NewMigrationSpec([]string{"bytea"}, sql.WithCastSQL(stringToByteaCast))},
"character varying": {sql.NewMigrationSpec([]string{"bytea"}, sql.WithCastSQL(stringToByteaCast))},
"bytea": {sql.NewMigrationSpec([]string{"text"}, sql.WithCastSQL(byteaToStringCast))},
"*": {sql.NewMigrationSpec([]string{"json"}, sql.WithCastSQL(toJsonCast))},
// PostgreSQL accepts implicit casts in both directions between
// json and jsonb, so columns can move freely as the upstream
// source type annotation flips.
"json": {sql.NewMigrationSpec([]string{"jsonb"})},
"jsonb": {sql.NewMigrationSpec([]string{"json"})},
"*": {sql.NewMigrationSpec([]string{"json", "jsonb"}, sql.WithCastSQL(toJsonCast))},
},
TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
if len(path) == 1 {
Expand Down
100 changes: 100 additions & 0 deletions materialize-postgres/sqlgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,80 @@ func TestSQLGeneration(t *testing.T) {
},
)

// Exercise the source-postgres -> materialize-postgres JSONB round-trip:
// a value field carrying the application/vnd.postgresql.jsonb+json
// contentMediaType must render as JSONB, while a sibling field without
// the annotation stays on JSON.
jsonbTable := buildJSONBTestTable(t)
jsonbName := "createTargetTable [jsonb contentMediaType]"
snap.WriteString("--- Begin " + jsonbName + " ---\n")
require.NoError(t, testTemplates.createTargetTable.Execute(snap, &jsonbTable))
snap.WriteString("--- End " + jsonbName + " ---\n\n")

cupaloy.SnapshotT(t, snap.String())
}

// buildJSONBTestTable assembles a synthetic Table with two value projections:
// one carrying the jsonb contentMediaType so it should map to JSONB, and one
// without so it should default to JSON.
func buildJSONBTestTable(t *testing.T) sql.Table {
t.Helper()

const jsonbMediaType = "application/vnd.estuary.postgresql.jsonb+json"
multipleTypes := []string{"object", "string", "array", "number", "boolean", "null"}

mkValue := func(field, contentType string) sql.Column {
p := sql.Projection{
Projection: pf.Projection{
Field: field,
Ptr: "/" + field,
Inference: pf.Inference{
Types: multipleTypes,
ContentMediaType: contentType,
Exists: pf.Inference_MAY,
},
},
}
return sql.Column{
Projection: p,
MappedType: testDialect.MapType(&p, sql.FieldConfig{}),
Identifier: testDialect.Identifier(field),
}
}

keyProj := sql.Projection{
Projection: pf.Projection{
Field: "id",
Ptr: "/id",
IsPrimaryKey: true,
Inference: pf.Inference{
Types: []string{"integer"},
Exists: pf.Inference_MUST,
},
},
}
keyCol := sql.Column{
Projection: keyProj,
MappedType: testDialect.MapType(&keyProj, sql.FieldConfig{}),
Identifier: testDialect.Identifier("id"),
MustExist: true,
}

tableName := "jsonb_round_trip"
return sql.Table{
TableShape: sql.TableShape{
Path: []string{"public", tableName},
Comment: "Generated for materialize-postgres jsonb round-trip test",
},
Identifier: testDialect.Identifier("public", tableName),
Keys: []sql.Column{keyCol},
Values: []sql.Column{
mkValue("plain_json", ""),
mkValue("jsonb_col", jsonbMediaType),
},
}
}

func TestDateTimeColumn(t *testing.T) {

var mapped = testDialect.MapType(&sql.Projection{
Expand All @@ -62,6 +133,35 @@ func TestDateTimeColumn(t *testing.T) {
require.NoError(t, err)
}

func TestJSONBContentMediaType(t *testing.T) {
jsonbMediaType := "application/vnd.estuary.postgresql.jsonb+json"

mapWithMediaType := func(types []string, contentType string) string {
return testDialect.MapType(&sql.Projection{
Projection: pf.Projection{
Inference: pf.Inference{
Types: types,
ContentMediaType: contentType,
Exists: pf.Inference_MUST,
},
},
}, sql.FieldConfig{}).DDL
}

require.Equal(t,
"JSONB NOT NULL",
mapWithMediaType([]string{"object", "string", "array", "number", "boolean"}, jsonbMediaType),
"MULTIPLE-typed field with jsonb contentMediaType should map to JSONB")
require.Equal(t,
"JSON NOT NULL",
mapWithMediaType([]string{"object", "string", "array", "number", "boolean"}, "application/json"),
"MULTIPLE-typed field with application/json contentMediaType should map to JSON")
require.Equal(t,
"JSON NOT NULL",
mapWithMediaType([]string{"object", "string", "array", "number", "boolean"}, ""),
"MULTIPLE-typed field without contentMediaType should default to JSON")
}

func TestTruncatedIdentifier(t *testing.T) {
tests := []struct {
name string
Expand Down
6 changes: 4 additions & 2 deletions source-postgres/.snapshots/TestDiscoveryComplex
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ sql> COMMENT ON COLUMN test.discoverycomplex_934635.k1 IS 'I think this is a key
]
},
"doc": {
"description": "(source type: json)"
"description": "(source type: json)",
"contentMediaType": "application/json"
},
"doc/bin": {
"description": "(source type: non-nullable jsonb)"
"description": "(source type: non-nullable jsonb)",
"contentMediaType": "application/vnd.estuary.postgresql.jsonb+json"
},
"foo": {
"description": "This is a text field! (source type: text)",
Expand Down
21 changes: 15 additions & 6 deletions source-postgres/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,11 @@ func (db *postgresDatabase) TranslateDBToJSONType(column sqlcapture.ColumnInfo,
}

type columnSchema struct {
contentEncoding string
format string
nullable bool
jsonTypes []string
contentEncoding string
contentMediaType string
format string
nullable bool
jsonTypes []string
}

func (s columnSchema) toType() *jsonschema.Schema {
Expand All @@ -283,6 +284,10 @@ func (s columnSchema) toType() *jsonschema.Schema {
out.Extras["contentEncoding"] = s.contentEncoding // New in 2019-09.
}

if s.contentMediaType != "" {
out.Extras["contentMediaType"] = s.contentMediaType // New in 2019-09.
}

if s.jsonTypes != nil {
var types = append([]string(nil), s.jsonTypes...)
if s.nullable {
Expand Down Expand Up @@ -338,8 +343,12 @@ var postgresTypeToJSON = map[string]columnSchema{
"bit": {jsonTypes: []string{"string"}},
"varbit": {jsonTypes: []string{"string"}},

"json": {},
"jsonb": {},
// json and jsonb columns capture arbitrary JSON values, so we don't constrain
// the JSON Schema type. The contentMediaType annotation distinguishes the two
// at the wire so downstream connectors (e.g. materialize-postgres) can
// recreate the original column type instead of collapsing both onto json.
"json": {contentMediaType: "application/json"},
"jsonb": {contentMediaType: "application/vnd.estuary.postgresql.jsonb+json"},
"jsonpath": {jsonTypes: []string{"string"}},

// Domain-Specific Types
Expand Down
Loading