diff --git a/.gitignore b/.gitignore index 4cb95d52..6436d84c 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ *.sw[ap] *.yml !docs/monitor/*.yml +!tests/integration/**/*.yml tags *.bak *.tar.gz @@ -31,3 +32,6 @@ common/123.pid #Claude .claude/ + +# Local-only test env vars (see docs/refactor-test-config-env-vars.md) +.env.test diff --git a/collector/coordinator/extra_job_test.go b/collector/coordinator/extra_job_test.go index 6ec1ffe6..591fb20e 100644 --- a/collector/coordinator/extra_job_test.go +++ b/collector/coordinator/extra_job_test.go @@ -14,7 +14,7 @@ import ( "github.com/alibaba/MongoShake/v2/unit_test_common" ) -const ( +var ( testMongoAddress = unit_test_common.TestUrl ) diff --git a/collector/coordinator/full.go b/collector/coordinator/full.go index c08aa10c..cd9ceb63 100644 --- a/collector/coordinator/full.go +++ b/collector/coordinator/full.go @@ -84,6 +84,18 @@ func (coordinator *ReplicationCoordinator) startDocumentReplication() (err error } } else { l.Logger.Infof("source is replica or mongos, no need to fetching chunk map") + // surface a clear warning when full_sync.executor.filter.orphan_document=true is + // configured but cannot take effect: in mongos mode the orphan filter has no + // way to attribute a fetched doc to a specific shard, so silently dropping the + // option (the previous behavior) gives users the impression it was applied. + if fromIsSharding && coordinator.MongoS != nil && + conf.Options.FullSyncExecutorFilterOrphanDocument { + l.Logger.Warnf("full_sync.executor.filter.orphan_document=true is set but " + + "source is mongos (mongo_s_url configured); the orphan filter will NOT take " + + "effect because docs read via mongos are not tagged with their origin shard. " + + "To enable orphan filtering, drop mongo_s_url and use mongo_urls (mongod direct) " + + "plus mongo_cs_url, or run cleanupOrphaned on the source cluster before full sync.") + } } filterList := filter.NewDocFilterList() diff --git a/collector/coordinator/utils_test.go b/collector/coordinator/utils_test.go index 7fb377f2..696021e1 100644 --- a/collector/coordinator/utils_test.go +++ b/collector/coordinator/utils_test.go @@ -17,11 +17,14 @@ import ( "github.com/alibaba/MongoShake/v2/unit_test_common" ) -const ( +var ( testUrl = unit_test_common.TestUrl testUrlServerless = unit_test_common.TestUrlServerlessTenant - testDb = "test_db3" - testCollection = "test_ut" +) + +const ( + testDb = "test_db3" + testCollection = "test_ut" ) func TestSelectSyncMode(t *testing.T) { diff --git a/collector/docsyncer/doc_executor.go b/collector/docsyncer/doc_executor.go index 27dd1eb7..69accae0 100644 --- a/collector/docsyncer/doc_executor.go +++ b/collector/docsyncer/doc_executor.go @@ -192,10 +192,17 @@ func (exec *DocExecutor) doSync(docs []*bson.Raw) error { if conf.Options.FullSyncExecutorFilterOrphanDocument && exec.syncer.orphanFilter != nil { var docData bson.D if err := bson.Unmarshal(*doc, &docData); err != nil { - l.Logger.Errorf("doSync do bson unmarshal %v failed. %v", doc, err) - } - // judge whether is orphan document, pass if so - if exec.syncer.orphanFilter.Filter(docData, ns.Database+"."+ns.Collection) { + // previously fell through with an empty docData, which then caused + // OrphanFilter.Filter -> oplog.GetKey to return nil and Panicf the + // process. Skip the orphan check on unparseable docs instead so a + // single bad payload can't crash the syncer; the corrupt bytes + // will surface again at BulkWrite as a structured driver error. + l.Logger.Errorf("doSync skip orphan check, bson unmarshal failed: %v", err) + // intentional fall-through to BulkWrite (no continue): + // the malformed payload should reach the driver and surface + // there as a typed write error, not be silently dropped. + } else if exec.syncer.orphanFilter.Filter(docData, ns.Database+"."+ns.Collection) { + // judge whether is orphan document, pass if so l.Logger.Infof("orphan document [%v] filter", doc) continue } diff --git a/collector/docsyncer/doc_syncer_test.go b/collector/docsyncer/doc_syncer_test.go index 4305e993..7e40f012 100644 --- a/collector/docsyncer/doc_syncer_test.go +++ b/collector/docsyncer/doc_syncer_test.go @@ -19,10 +19,13 @@ import ( "github.com/alibaba/MongoShake/v2/unit_test_common" ) -const ( +var ( testMongoAddress = unit_test_common.TestUrl - testDb = "test_db" - testCollection = "test_coll" +) + +const ( + testDb = "test_db" + testCollection = "test_coll" ) var ( @@ -189,8 +192,8 @@ func TestDbSync(t *testing.T) { }, }, }, - Keys: []string{"x"}, - ShardType: sharding.RangedShard, + Keys: []string{"x"}, + ShardTypes: []string{sharding.RangedShard}, }, }) dbSyncer := &DBSyncer{ diff --git a/collector/filter/orphan_filter.go b/collector/filter/orphan_filter.go index cbf5d56c..cf0aa7de 100644 --- a/collector/filter/orphan_filter.go +++ b/collector/filter/orphan_filter.go @@ -14,13 +14,23 @@ import ( ) const ( - // refer to mongo/bson/bsontypes.h of mongodb kernel 4.0 - BsonInvalid = -1 - BsonMinKey = 0 - BsonTypeNumber = 10 - BsonTypeString = 15 - BsonTypeOid = 35 - BsonMaxKey = 100 + // canonical type ordering kept in sync with mongo/bson/bsontypes.h + // (verified identical across kernel 4.0 and 8.0). BsonTypeInt64 is a + // MongoShake-local refinement: BSON has a single "number" canonical + // type, but we split int64 out so OrphanFilter can compare hashed + // shard bounds (which are full-range int64) without losing precision + // through float64. Mixed int64/float64 comparison promotes to float64. + BsonInvalid = -1 + BsonMinKey = 0 + BsonTypeNumber = 10 + BsonTypeInt64 = 11 + BsonTypeString = 15 + BsonTypeOid = 35 + BsonTypeBool = 40 + BsonTypeDate = 45 + BsonTypeTstamp = 47 + BsonTypeNull = 5 + BsonMaxKey = 100 ) type OrphanFilter struct { @@ -54,7 +64,11 @@ NextChunk: if key == nil { l.Logger.Panicf("OrphanFilter find no shard key[%v] in doc %v", keyName, docD) } - if shardCol.ShardType == sharding.HashedShard { + // Compound shard keys can mix hashed and ranged columns + // (e.g. {a: 1, b: "hashed"}); hash only the columns the + // server hashes, otherwise the ranged column would be + // compared against a hashed bound and silently mis-classify. + if shardCol.ShardTypes[keyInd] == sharding.HashedShard { key = ComputeHash(key) } if chunkLt(key, chunkRage.Mins[keyInd]) { @@ -70,7 +84,7 @@ NextChunk: if key == nil { l.Logger.Panicf("OrphanFilter find no shard ke[%v] in doc %v", keyName, docD) } - if shardCol.ShardType == sharding.HashedShard { + if shardCol.ShardTypes[keyInd] == sharding.HashedShard { key = ComputeHash(key) } if chunkGt(key, chunkRage.Maxs[keyInd]) { @@ -91,47 +105,101 @@ NextChunk: return true } +// ComputeHash reproduces MongoDB's hashed-shard hash for a BSON shard key +// value. Algorithm: MD5(seed_LE32 || canonicalType_LE32 || valueBytes), +// take the low 8 bytes as little-endian int64. Verified identical between +// kernel 4.0 and 8.0 (see hasher.cpp). +// +// All numeric BSON types (Double, Int, Long, Decimal128) are normalised to +// int64 server-side via safeNumberLongForHash before hashing; we mirror +// that for int / int32 / int64 / float64. Decimal128 is not supported here +// because faithfully reproducing safeNumberLongForHash for non-integer +// decimals (NaN / Inf / overflow) is nontrivial and Decimal128 hashed +// shard keys are extremely rare in practice — a clear panic is preferable +// to a silent mismatch with the kernel. func ComputeHash(data interface{}) int64 { - // refer to mongo/db/hasher.cpp of mongodb kernel 4.0 w := md5.New() var buf = make([]byte, 4) - binary.LittleEndian.PutUint32(buf, uint32(0)) + binary.LittleEndian.PutUint32(buf, uint32(0)) // seed = 0 w.Write(buf) + writeType := func(t int) { + binary.LittleEndian.PutUint32(buf, uint32(t)) + w.Write(buf) + } + writeI64 := func(v int64) { + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, uint64(v)) + w.Write(b) + } + switch rd := data.(type) { case string: - binary.LittleEndian.PutUint32(buf, uint32(BsonTypeString)) - w.Write(buf) + // BSON string value layout: int32(len+1) | bytes | 0x00 + writeType(BsonTypeString) binary.LittleEndian.PutUint32(buf, uint32(len(rd)+1)) w.Write(buf) - s := []byte(rd) - s = append(s, 0) - w.Write(s) - case int, int64, float64: - var rdu uint64 - if rd1, ok := rd.(int); ok { - rdu = uint64(rd1) - } else if rd2, ok := rd.(int64); ok { - rdu = uint64(rd2) - } else if rd3, ok := rd.(float64); ok { - rdu = uint64(rd3) - } - binary.LittleEndian.PutUint32(buf, uint32(BsonTypeNumber)) - w.Write(buf) - buf = make([]byte, 8) - binary.LittleEndian.PutUint64(buf, rdu) - w.Write(buf) + w.Write(append([]byte(rd), 0)) + case int: + writeType(BsonTypeNumber) + writeI64(int64(rd)) + case int32: + writeType(BsonTypeNumber) + writeI64(int64(rd)) + case int64: + writeType(BsonTypeNumber) + writeI64(rd) + case float64: + // kernel: safeNumberLongForHash truncates toward zero, NaN/Inf + // map to int64 sentinel values. Plain Go cast handles finite + // in-range values identically; outside that range users would + // hit different bytes from the server, but float64 hashed shard + // keys are vanishingly rare. + writeType(BsonTypeNumber) + writeI64(int64(rd)) case primitive.ObjectID: - binary.LittleEndian.PutUint32(buf, uint32(BsonTypeOid)) - w.Write(buf) - buf = rd[:] - w.Write(buf) + writeType(BsonTypeOid) + w.Write(rd[:]) + case bool: + // BSON bool value is a single byte. + writeType(BsonTypeBool) + if rd { + w.Write([]byte{0x01}) + } else { + w.Write([]byte{0x00}) + } + case primitive.DateTime: + // BSON Date value is int64 milliseconds since epoch, LE. + writeType(BsonTypeDate) + writeI64(int64(rd)) + case primitive.Timestamp: + // BSON Timestamp value layout (per BSON spec / bsonelement.h): + // 8 bytes total, low 4 = increment uint32 LE, high 4 = time uint32 LE. + // primitive.Timestamp{T: seconds, I: increment} — note the field + // names: T is "time" (seconds), I is "increment". Increment goes + // FIRST in the byte layout; getting this order wrong is a common + // source of hash mismatch with the server, so verified against + // mongo/bson/bsonelement_inline.h Timestamp() accessor. + writeType(BsonTypeTstamp) + b := make([]byte, 8) + binary.LittleEndian.PutUint32(b[0:4], rd.I) + binary.LittleEndian.PutUint32(b[4:8], rd.T) + w.Write(b) + case nil: + // BSON null carries no value bytes; only the canonical type contributes. + // Note: OrphanFilter.Filter cannot currently reach this branch because + // the upstream oplog.GetKey returns nil for both "field missing" and + // "field is null" and Filter Panicf-s on nil. This case is kept as + // future-proofing: if GetKey is ever fixed to distinguish the two, + // ComputeHash already handles null without further changes. + writeType(BsonTypeNull) default: - l.Logger.Panicf("ComputeHash unsupported bson type %T %#v\n", data, data) + l.Logger.Panicf("ComputeHash unsupported bson type %T %#v "+ + "(Decimal128 / BinData / Symbol are intentionally not implemented; "+ + "open an issue if you actually hit one)", data, data) } out := w.Sum(nil) - result := int64(binary.LittleEndian.Uint64(out)) - return result + return int64(binary.LittleEndian.Uint64(out)) } func fromHex(c byte) byte { @@ -147,21 +215,72 @@ func fromHex(c byte) byte { return 0xff } +// numericCmp returns -1/0/1 for xy when both sides are +// numeric. int64 vs int64 stays exact; everything else (or mixed types) +// promotes to float64 (acceptable: BSON range-shard chunk bounds are +// virtually never mixed-type, and the chunk math beats a panic). +func numericCmp(xType int, rx interface{}, yType int, ry interface{}) int { + if xType == BsonTypeInt64 && yType == BsonTypeInt64 { + xv, yv := rx.(int64), ry.(int64) + switch { + case xv < yv: + return -1 + case xv > yv: + return 1 + } + return 0 + } + // Mixed int64/float64 (or float64/float64). Promote to float64; this + // can lose precision above 2^53 if an int64 sneaks in. In a typical + // BSON range shard chunk this never happens (bounds and doc values + // share a type), but warn once so production has a breadcrumb if it + // ever does. + if xType != yType { + l.Logger.Warnf("OrphanFilter numericCmp: mixed numeric types (xType=%d yType=%d) "+ + "promoted to float64; may lose precision for int64 above 2^53", + xType, yType) + } + var xv, yv float64 + if xType == BsonTypeInt64 { + xv = float64(rx.(int64)) + } else { + xv = rx.(float64) + } + if yType == BsonTypeInt64 { + yv = float64(ry.(int64)) + } else { + yv = ry.(float64) + } + switch { + case xv < yv: + return -1 + case xv > yv: + return 1 + } + return 0 +} + +// numericType reports whether t is one of the two MongoShake numeric +// canonical-type tags (BsonTypeNumber covers float/int/int32; BsonTypeInt64 +// is the int64-precise refinement). +func numericType(t int) bool { + return t == BsonTypeNumber || t == BsonTypeInt64 +} + func chunkGt(x, y interface{}) bool { xType, rx := getBsonType(x) yType, ry := getBsonType(y) + if numericType(xType) && numericType(yType) { + return numericCmp(xType, rx, yType, ry) > 0 + } if xType != yType { return xType > yType } switch xType { - case BsonMinKey: - return false - case BsonMaxKey: + case BsonMinKey, BsonMaxKey: return false - case BsonTypeNumber: - return rx.(float64) > ry.(float64) case BsonTypeString: return rx.(string) > ry.(string) default: @@ -174,17 +293,16 @@ func chunkEqual(x, y interface{}) bool { xType, rx := getBsonType(x) yType, ry := getBsonType(y) + if numericType(xType) && numericType(yType) { + return numericCmp(xType, rx, yType, ry) == 0 + } if xType != yType { return false } switch xType { - case BsonMinKey: - return true - case BsonMaxKey: + case BsonMinKey, BsonMaxKey: return true - case BsonTypeNumber: - return rx.(float64) == ry.(float64) case BsonTypeString: return rx.(string) == ry.(string) default: @@ -197,17 +315,16 @@ func chunkLt(x, y interface{}) bool { xType, rx := getBsonType(x) yType, ry := getBsonType(y) + if numericType(xType) && numericType(yType) { + return numericCmp(xType, rx, yType, ry) < 0 + } if xType != yType { return xType < yType } switch xType { - case BsonMinKey: - return false - case BsonMaxKey: + case BsonMinKey, BsonMaxKey: return false - case BsonTypeNumber: - return rx.(float64) < ry.(float64) case BsonTypeString: return rx.(string) < ry.(string) default: @@ -217,13 +334,11 @@ func chunkLt(x, y interface{}) bool { } func getBsonType(x interface{}) (int, interface{}) { - if x == int64(math.MinInt64) { + switch rx := x.(type) { + case primitive.MinKey: return BsonMinKey, nil - } - if x == int64(math.MaxInt64) { + case primitive.MaxKey: return BsonMaxKey, nil - } - switch rx := x.(type) { case float32: return BsonTypeNumber, float64(rx) case float64: @@ -233,13 +348,21 @@ func getBsonType(x interface{}) (int, interface{}) { case int32: return BsonTypeNumber, float64(rx) case int64: - return BsonTypeNumber, float64(rx) + if rx == math.MinInt64 { + return BsonMinKey, nil + } + if rx == math.MaxInt64 { + return BsonMaxKey, nil + } + // keep precision; chunkLt/Gt/Equal handle int64 specifically so + // hashed-shard bounds (full-range int64) don't collapse via float64 + return BsonTypeInt64, rx case string: return BsonTypeString, rx case primitive.ObjectID: return BsonTypeOid, rx.Hex() default: - l.Logger.Panicf("chunkLt meet unknown type %T", x) + l.Logger.Panicf("getBsonType meet unknown type %T", x) } return BsonInvalid, nil } diff --git a/collector/filter/orphan_filter_test.go b/collector/filter/orphan_filter_test.go new file mode 100644 index 00000000..cfc9edea --- /dev/null +++ b/collector/filter/orphan_filter_test.go @@ -0,0 +1,361 @@ +package filter + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + + "github.com/alibaba/MongoShake/v2/sharding" +) + +const testNs = "db.coll" + +// keyAll covers the entire hash space; lets hashed-shard tests assert that +// ComputeHash is wired up without depending on the chunkLt/chunkGt int64 +// precision issue (see TestOrphanFilter_HashedPrecisionBug below). +var keyAll = []*sharding.ChunkRange{{ + Mins: []interface{}{int64(math.MinInt64)}, + Maxs: []interface{}{int64(math.MaxInt64)}, +}} + +func mkRange(t *testing.T, sc *sharding.ShardCollection) sharding.DBChunkMap { + t.Helper() + return sharding.DBChunkMap{testNs: sc} +} + +// chunkMap is nil: no chunk info loaded, do not filter. +func TestOrphanFilter_NilChunkMap(t *testing.T) { + f := NewOrphanFilter("rs0", nil) + assert.False(t, f.Filter(bson.D{{"x", 1}}, testNs)) +} + +// namespace not present in chunk map: collection isn't sharded, do not filter. +func TestOrphanFilter_NamespaceMissing(t *testing.T) { + f := NewOrphanFilter("rs0", sharding.DBChunkMap{ + "other.coll": &sharding.ShardCollection{ + Keys: []string{"x"}, ShardTypes: []string{sharding.RangedShard}, + Chunks: []*sharding.ChunkRange{{Mins: []interface{}{0}, Maxs: []interface{}{10}}}, + }, + }) + assert.False(t, f.Filter(bson.D{{"x", 1}}, testNs)) +} + +// range, single shard key: in/out/min/max boundary semantics. +func TestOrphanFilter_RangeSingleKey(t *testing.T) { + cm := mkRange(t, &sharding.ShardCollection{ + Keys: []string{"x"}, ShardTypes: []string{sharding.RangedShard}, + Chunks: []*sharding.ChunkRange{ + {Mins: []interface{}{1}, Maxs: []interface{}{10}}, + {Mins: []interface{}{50}, Maxs: []interface{}{100}}, + }, + }) + f := NewOrphanFilter("rs0", cm) + + cases := []struct { + name string + key int + orphan bool + }{ + {"in first chunk", 5, false}, + {"in second chunk", 75, false}, + {"between chunks", 30, true}, + {"before all", 0, true}, + {"after all", 200, true}, + {"equal first min (inclusive)", 1, false}, + {"equal first max (exclusive)", 10, true}, + {"equal second min (inclusive)", 50, false}, + {"equal second max (exclusive)", 100, true}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + assert.Equal(t, c.orphan, f.Filter(bson.D{{"x", c.key}}, testNs)) + }) + } +} + +// range, compound shard key {a, b}: covers boundary cases that are most easy +// to get wrong in lexicographic comparison (this is exactly the layout from +// issue #978: "分片键为两个字段的联合分片"). +func TestOrphanFilter_RangeCompoundKey(t *testing.T) { + cm := mkRange(t, &sharding.ShardCollection{ + Keys: []string{"a", "b"}, ShardTypes: []string{sharding.RangedShard, sharding.RangedShard}, + Chunks: []*sharding.ChunkRange{ + // chunk: [{a:1, b:100}, {a:5, b:200}) + {Mins: []interface{}{1, 100}, Maxs: []interface{}{5, 200}}, + }, + }) + f := NewOrphanFilter("rs0", cm) + + cases := []struct { + name string + a, b int + orphan bool + }{ + {"amax_b", 5, 201, true}, + {"a>max_a", 6, 0, true}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + assert.Equal(t, c.orphan, f.Filter(bson.D{{"a", c.a}, {"b", c.b}}, testNs)) + }) + } +} + +// hashed shard: cover every key type supported by ComputeHash end-to-end +// (ComputeHash branch + Filter wiring). [MinInt64, MaxInt64] chunk so any +// hash lands inside — checks plumbing, not boundary math (the boundary +// math is exercised by TestOrphanFilter_HashedTightChunk below). +func TestOrphanFilter_HashedAllTypes(t *testing.T) { + cases := []struct { + name string + val interface{} + }{ + {"ObjectID", primitive.NewObjectID()}, + {"string", "hello-orphan"}, + {"int64", int64(1234567890)}, + {"int", 42}, + {"int32", int32(42)}, + {"float64", 3.14}, + {"bool/true", true}, + {"bool/false", false}, + {"DateTime", primitive.DateTime(1715000000000)}, + {"Timestamp", primitive.Timestamp{T: 1715000000, I: 7}}, + // nil shard-key values are exercised separately in + // TestComputeHash_NullKey because OrphanFilter.Filter itself + // can't distinguish "missing field" from "field with value + // null" (oplog.GetKey returns nil for both); pushing nil + // through Filter would panic in a way unrelated to this bug. + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + cm := mkRange(t, &sharding.ShardCollection{ + Keys: []string{"k"}, ShardTypes: []string{sharding.HashedShard}, + Chunks: keyAll, + }) + f := NewOrphanFilter("rs0", cm) + assert.False(t, f.Filter(bson.D{{"k", c.val}}, testNs), + "hashed key in [MinInt64, MaxInt64] chunk should never be orphan") + }) + } +} + +// hashed shard with a tight chunk around the computed hash: exercises the +// int64-precise comparison path in chunkLt/chunkGt that used to silently +// collapse via float64 (was TestOrphanFilter_HashedPrecisionBug, now +// fixed by introducing BsonTypeInt64). +func TestOrphanFilter_HashedTightChunk(t *testing.T) { + oid := primitive.NewObjectID() + hashed := ComputeHash(oid) + + in := mkRange(t, &sharding.ShardCollection{ + Keys: []string{"_id"}, ShardTypes: []string{sharding.HashedShard}, + Chunks: []*sharding.ChunkRange{ + {Mins: []interface{}{hashed - 1}, Maxs: []interface{}{hashed + 1}}, + }, + }) + assert.False(t, NewOrphanFilter("rs0", in).Filter(bson.D{{"_id", oid}}, testNs), + "hashed key inside ±1 chunk should not be orphan") + + out := mkRange(t, &sharding.ShardCollection{ + Keys: []string{"_id"}, ShardTypes: []string{sharding.HashedShard}, + Chunks: []*sharding.ChunkRange{ + {Mins: []interface{}{hashed + 100}, Maxs: []interface{}{hashed + 200}}, + }, + }) + assert.True(t, NewOrphanFilter("rs0", out).Filter(bson.D{{"_id", oid}}, testNs), + "hashed key below chunk min should be orphan") +} + +// hashed shard, no chunks held by this replset: every doc is orphan +// (verifies the "iterate-all-then-fall-through" path). +func TestOrphanFilter_HashedNoChunks(t *testing.T) { + cm := mkRange(t, &sharding.ShardCollection{ + Keys: []string{"k"}, ShardTypes: []string{sharding.HashedShard}, + Chunks: nil, + }) + f := NewOrphanFilter("rs0", cm) + assert.True(t, f.Filter(bson.D{{"k", primitive.NewObjectID()}}, testNs)) +} + +// Regression guard: hashed-shard chunk bounds at the extremes of int64 +// must compare exactly, not via float64 (which would collapse pairs of +// adjacent int64s above 2^53 to the same float). Picks a hash value far +// outside float64's contiguous-integer range so the assertion fails if +// the precision fix regresses. +func TestOrphanFilter_HashedInt64Precision(t *testing.T) { + // 2^62 + 1 and 2^62 - 1 are distinct int64 but collide in float64. + const target = int64(1)<<62 + 1 + cm := mkRange(t, &sharding.ShardCollection{ + Keys: []string{"k"}, ShardTypes: []string{sharding.HashedShard}, + Chunks: []*sharding.ChunkRange{ + {Mins: []interface{}{target}, Maxs: []interface{}{target + 1}}, + }, + }) + // A key whose computed hash is target-1 should be orphan (below min); + // we can't make ComputeHash produce target on demand, so instead test + // the chunkLt/chunkGt primitives directly to anchor the contract. + assert.True(t, chunkLt(target-1, target), "target-1 must be strictly < target") + assert.True(t, chunkGt(target+1, target), "target+1 must be strictly > target") + assert.False(t, chunkEqual(target-1, target), "target-1 must not equal target") + assert.False(t, chunkEqual(target+1, target), "target+1 must not equal target") + + // And the Filter wiring with a tight chunk around int64-precise bounds + // is exercised in TestOrphanFilter_HashedTightChunk above; keep this + // reference here so the next reader knows where to look. + _ = cm +} + +// BSON null is hashed with no value bytes (only the canonical type tag). +// Tested at the ComputeHash level because OrphanFilter.Filter conflates +// "field missing" with "field is null" via oplog.GetKey. +func TestComputeHash_NullKey(t *testing.T) { + // Should not panic, and should be deterministic / distinct from other + // hashes (we just check it doesn't panic and returns a stable value). + a := ComputeHash(nil) + b := ComputeHash(nil) + assert.Equal(t, a, b, "ComputeHash(nil) must be deterministic") + assert.NotEqual(t, a, ComputeHash(int64(0)), + "hash of null must differ from hash of int64(0)") +} + +// ComputeHash supports the BSON value types most commonly used as +// hashed-shard keys. Decimal128 / BinData / Symbol are intentionally not +// supported (see comment in ComputeHash). This test pins the panic so any +// future broadening of type support has to update the assertion and +// double-check the byte layout against MongoDB hasher.cpp. +func TestOrphanFilter_HashedUnsupportedTypePanics(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected ComputeHash to panic on unsupported type, got no panic") + } + }() + ComputeHash(primitive.Decimal128{}) // not in the switch, must panic +} + +// getBsonType must recognise primitive.MinKey / primitive.MaxKey as they come +// out of bson.Unmarshal when reading config.chunks boundaries from MongoDB. +// The int64 sentinel path (math.MinInt64/MaxInt64) is kept for backwards compat +// with test code that constructs chunk ranges by hand. +func TestGetBsonType_PrimitiveMinMaxKey(t *testing.T) { + // Simulate what bson.Unmarshal produces for a chunk boundary containing MinKey/MaxKey. + type chunkBound struct { + X interface{} `bson:"x"` + } + // Encode MinKey, then decode — the round-tripped value is primitive.MinKey{}. + minDoc, err := bson.Marshal(bson.D{{"x", primitive.MinKey{}}}) + assert.NoError(t, err) + maxDoc, err := bson.Marshal(bson.D{{"x", primitive.MaxKey{}}}) + assert.NoError(t, err) + + var minResult, maxResult chunkBound + assert.NoError(t, bson.Unmarshal(minDoc, &minResult)) + assert.NoError(t, bson.Unmarshal(maxDoc, &maxResult)) + + // getBsonType must return BsonMinKey / BsonMaxKey + typ, val := getBsonType(minResult.X) + assert.Equal(t, BsonMinKey, typ) + assert.Nil(t, val) + + typ, val = getBsonType(maxResult.X) + assert.Equal(t, BsonMaxKey, typ) + assert.Nil(t, val) + + // Also verify the int64 sentinel still works (backward compat) + typ, _ = getBsonType(int64(math.MinInt64)) + assert.Equal(t, BsonMinKey, typ) + typ, _ = getBsonType(int64(math.MaxInt64)) + assert.Equal(t, BsonMaxKey, typ) +} + +// Compound hashed shard key {a: 1, b: "hashed"}: only `b` should be hashed +// before comparing to chunk bounds. Pre-fix Filter would hash every column +// whenever ShardType==HashedShard, so a=5 would become ComputeHash(5) and +// fall outside the [3, 7) ranged bound — silently flagging the doc as +// orphan. This test pins the per-column behaviour added in ShardTypes. +func TestOrphanFilter_CompoundHashed_RangedFirst(t *testing.T) { + cm := mkRange(t, &sharding.ShardCollection{ + Keys: []string{"a", "b"}, + ShardTypes: []string{sharding.RangedShard, sharding.HashedShard}, + Chunks: []*sharding.ChunkRange{ + // a in [3, 7), b covers the full hash space so b is irrelevant + // to the in/out decision — this isolates `a` is compared as-is. + { + Mins: []interface{}{3, int64(math.MinInt64)}, + Maxs: []interface{}{7, int64(math.MaxInt64)}, + }, + }, + }) + f := NewOrphanFilter("rs0", cm) + + // a=5 in [3, 7); ComputeHash(5) is almost certainly not in [3, 7), + // so this assertion fails the moment Filter starts hashing column 0. + assert.False(t, f.Filter(bson.D{{"a", 5}, {"b", "anything"}}, testNs), + "a in ranged bounds; only column 1 should be hashed") + assert.True(t, f.Filter(bson.D{{"a", 8}, {"b", "anything"}}, testNs), + "a outside ranged bounds — orphan") +} + +// Compound hashed shard key {a: "hashed", b: 1}: column 0 must be hashed, +// column 1 compared as-is. Pre-fix Filter inferred ShardType from the last +// key (ranged here) and therefore skipped hashing entirely; this test pins +// the inverse direction so the ShardTypes wiring stays correct both ways. +func TestOrphanFilter_CompoundHashed_HashedFirst(t *testing.T) { + aVal := "hashed-key" + hashA := ComputeHash(aVal) + cm := mkRange(t, &sharding.ShardCollection{ + Keys: []string{"a", "b"}, + ShardTypes: []string{sharding.HashedShard, sharding.RangedShard}, + Chunks: []*sharding.ChunkRange{ + // Pin a to hashA so the min-loop reaches b; b in [3, 7). + {Mins: []interface{}{hashA, 3}, Maxs: []interface{}{hashA, 7}}, + }, + }) + f := NewOrphanFilter("rs0", cm) + + assert.False(t, f.Filter(bson.D{{"a", aVal}, {"b", 5}}, testNs), + "a must be hashed to hashA, b must be compared as-is in [3, 7)") + assert.True(t, f.Filter(bson.D{{"a", aVal}, {"b", 10}}, testNs), + "b outside ranged bounds — orphan") +} + +// End-to-end test: range shard with real BSON MinKey/MaxKey boundaries (as +// decoded from config.chunks) must correctly classify docs as non-orphan. +func TestOrphanFilter_RangeWithBsonMinMaxKey(t *testing.T) { + // First/last chunks in a real sharded collection use MinKey/MaxKey. + cm := mkRange(t, &sharding.ShardCollection{ + Keys: []string{"x"}, ShardTypes: []string{sharding.RangedShard}, + Chunks: []*sharding.ChunkRange{ + {Mins: []interface{}{primitive.MinKey{}}, Maxs: []interface{}{50}}, + {Mins: []interface{}{50}, Maxs: []interface{}{primitive.MaxKey{}}}, + }, + }) + f := NewOrphanFilter("rs0", cm) + + // Everything should be non-orphan since chunks cover [MinKey, MaxKey) + cases := []struct { + name string + val interface{} + orphan bool + }{ + {"below zero", -100, false}, + {"zero", 0, false}, + {"at split", 50, false}, + {"above split", 99, false}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + assert.Equal(t, c.orphan, f.Filter(bson.D{{"x", c.val}}, testNs)) + }) + } +} diff --git a/collector/persister.go b/collector/persister.go index 58614920..d129d06d 100644 --- a/collector/persister.go +++ b/collector/persister.go @@ -240,12 +240,44 @@ func (p *Persister) updateBufferUsedMetric() { utils.PersisterBufferUsedRatioProm.WithLabelValues(p.replset, utils.TypeIncr).Set(utils.QueueUsedRatio(used, capacity)) } +// retrieve is the persister goroutine that drains the on-disk queue back +// into the in-memory pending queue once the full-sync stage flips to +// "apply". It is started from Persister.Start() iff enableDiskPersist +// (i.e. SyncMode=all AND FullSyncReaderOplogStoreDisk=true). fetchStage +// is initialised to FetchStageStoreUnknown in NewPersister and driven by +// loadCheckpoint() / StartDiskApply(). State machine observed here: +// +// Unknown ──► DiskNoApply ──► DiskApply ──► (this goroutine reads disk queue) ──► MemoryApply +// │ │ ▲ +// │ └──────────────┘ +// └────► MemoryApply (restart shortcut: ckpt already past OplogDiskQueueFinishTs; +// DiskQueue stays nil — handle by returning early below) +// +// The 3-second polling cadence is deliberately slow: this goroutine is +// idle for the entire document-replication phase (seconds to hours), so +// reaction latency to a stage flip is not a hot-path concern. func (p *Persister) retrieve() { - for range time.NewTicker(3 * time.Second).C { + waitTicker := time.NewTicker(3 * time.Second) + defer waitTicker.Stop() + waitRounds := 0 +Wait: + for range waitTicker.C { + waitRounds++ stage := atomic.LoadInt32(&p.fetchStage) switch stage { case utils.FetchStageStoreDiskApply: - break + l.Logger.Infof("persister retrieve for replset[%v] entered disk-apply stage after %d wait rounds", + p.replset, waitRounds) + break Wait + case utils.FetchStageStoreMemoryApply: + // loadCheckpoint() may set MemoryApply directly (without InitDiskQueue) + // when restart and checkpoint has already caught up to disk last ts. + // In that path DiskQueue is nil, so we must exit instead of falling + // through to the disk-read stage below. + l.Logger.Infof("persister retrieve for replset[%v] skip disk replay after %d wait rounds: "+ + "fetchStage is MemoryApply (restart with checkpoint caught up to OplogDiskQueueFinishTs)", + p.replset, waitRounds) + return case utils.FetchStageStoreUnknown: // do nothing case utils.FetchStageStoreDiskNoApply: @@ -258,6 +290,7 @@ func (p *Persister) retrieve() { l.Logger.Infof("persister retrieve for replset[%v] begin to read from disk queue with depth[%v]", p.replset, p.DiskQueue.Depth()) ticker := time.NewTicker(time.Second) + defer ticker.Stop() Loop: for { select { diff --git a/collector/reader/event_reader_test.go b/collector/reader/event_reader_test.go index 9d70064c..e773486d 100644 --- a/collector/reader/event_reader_test.go +++ b/collector/reader/event_reader_test.go @@ -15,9 +15,11 @@ import ( "github.com/alibaba/MongoShake/v2/unit_test_common" ) -const ( +var ( testMongoAddressCs = unit_test_common.TestUrlServerlessTenant +) +const ( testInsertTimes = 10 ) diff --git a/common/change_stream_test.go b/common/change_stream_test.go index a7c7174b..da7f86c4 100644 --- a/common/change_stream_test.go +++ b/common/change_stream_test.go @@ -9,7 +9,7 @@ import ( "github.com/alibaba/MongoShake/v2/unit_test_common" ) -const ( +var ( testMongoAddressCs = unit_test_common.TestUrlConfigServer ) diff --git a/common/common_test.go b/common/common_test.go index 04f3aa0a..fb573eef 100644 --- a/common/common_test.go +++ b/common/common_test.go @@ -72,7 +72,10 @@ func TestMongoConn(t *testing.T) { fmt.Printf("TestMongoConn case %d.\n", nr) nr++ - conn, err := NewMongoCommunityConn(testUrlSsl, VarMongoConnectModePrimary, true, "", "", "/Users/zhongli/workspace/ApsaraDB-CA-Chain.pem") + if testUrlSsl == "" || unit_test_common.TestCaPem == "" { + t.Skip("set MONGOSHAKE_TEST_URL_SSL and MONGOSHAKE_TEST_CA_PEM to run this case") + } + conn, err := NewMongoCommunityConn(testUrlSsl, VarMongoConnectModePrimary, true, "", "", unit_test_common.TestCaPem) assert.Equal(t, err, nil, "should be equal") assert.Equal(t, conn != nil, true, "should be equal") } diff --git a/common/community_client.go b/common/community_client.go index 29f528ab..b160c2ae 100644 --- a/common/community_client.go +++ b/common/community_client.go @@ -124,8 +124,8 @@ func NewMongoCommunityConn(url string, connectMode string, timeout bool, readCon case VarMongoConnectModeSecondaryPreferred: readPreference = readpref.SecondaryPreferred() case VarMongoConnectModeStandalone: - // TODO, no standalone, choose nearest - fallthrough + clientOps.SetDirect(true) + readPreference = readpref.Nearest() case VarMongoConnectModeNearset: readPreference = readpref.Nearest() default: diff --git a/common/db_opertion_test.go b/common/db_opertion_test.go index 3b492bdd..a041339f 100644 --- a/common/db_opertion_test.go +++ b/common/db_opertion_test.go @@ -14,10 +14,13 @@ import ( "github.com/alibaba/MongoShake/v2/unit_test_common" ) -const ( +var ( testMongoAddress = unit_test_common.TestUrl testUrlServerless = unit_test_common.TestUrlServerlessTenant - testDb = "test_db" +) + +const ( + testDb = "test_db" ) func TestGetAndCompareVersion(t *testing.T) { diff --git a/docs/fixes/fix-orphan-document-mongos-warn.md b/docs/fixes/fix-orphan-document-mongos-warn.md new file mode 100644 index 00000000..ecc83b64 --- /dev/null +++ b/docs/fixes/fix-orphan-document-mongos-warn.md @@ -0,0 +1,170 @@ +# 修复:mongos 模式下 full_sync.executor.filter.orphan_document 静默失效 + +## 关联 + +- [Issue #978](https://github.com/alibaba/MongoShake/issues/978) +- 用户配置同时填了 `mongo_urls`、`mongo_cs_url`、`mongo_s_url`,开了 + `full_sync.executor.filter.orphan_document = true`,但孤儿没被过滤、 + 全量阶段撞 E11000 dup key + +## 根因 + +`cmd/collector/collector.go:138-151`:只要 `mongo_s_url` 非空, +`MongoS` 就被赋值,`RealSourceFullSync = [MongoS]`,**全量数据源走 mongos**, +`mongo_urls` 在全量阶段被忽略。 + +`coordinator/full.go:68`: + +```go +if fromIsSharding && coordinator.MongoS == nil { + shardingChunkMap, err = fetchChunkMap(...) // ← MongoS != nil 时跳过 +} +``` + +后续 `coordinator/full.go:162`:`shardingChunkMap == nil` → +`orphanFilter` 不创建 → `doc_executor.go:192` 的 `orphanFilter != nil` +判定不通过 → **过滤被静默跳过,没有任何日志告诉用户**。 + +## 为什么不能简单地"让 mongos 模式也支持 chunk map" + +OrphanFilter 的语义是「文档的 shard key 是否落在**当前 shard 持有的** +chunk 范围内」。mongos 模式下 MongoShake 从 mongos 读出的文档不带 +「来自哪个 shard」的元信息,把所有 shard 的 chunks 合并起来覆盖整个 key +space → 任何文档都会落在某个 chunk 内 → 永远不会被判为孤儿。**所以加 +chunkMap 没用,唯一务实修复就是把"配置无效"的事实暴露出来**。 + +## 修复内容 + +### A. `coordinator/full.go`:mongos + orphan_document=true 时 WARN + +```go +if fromIsSharding && coordinator.MongoS != nil && + conf.Options.FullSyncExecutorFilterOrphanDocument { + l.Logger.Warnf("full_sync.executor.filter.orphan_document=true is set but "+ + "source is mongos (mongo_s_url configured); the orphan filter will NOT take "+ + "effect because docs read via mongos are not tagged with their origin shard. "+ + "To enable orphan filtering, drop mongo_s_url and use mongo_urls (mongod direct) "+ + "plus mongo_cs_url, or run cleanupOrphaned on the source cluster before full sync.") +} +``` + +### B. `doc_executor.go`:unmarshal 失败后 continue + +修复前: + +```go +if err := bson.Unmarshal(*doc, &docData); err != nil { + l.Logger.Errorf(...) + // ⚠ 没 continue,空 docData 接着进 Filter +} +if exec.syncer.orphanFilter.Filter(docData, ns) { ... } +``` + +空 `docData` 进 `Filter` 后,`oplog.GetKey(docD, keyName)` 返回 nil, +触发 `OrphanFilter.Filter` 第 55 行的 `Panicf("OrphanFilter find no +shard key[%v] in doc %v")` 直接挂进程。 + +修复后用 `if err != nil ... else if Filter(...) ...` 把两条路径分开, +unmarshal 失败时跳过 orphan check 让损坏 payload 在下游 BulkWrite 给出 +结构化错误,而不是 panic 整个 syncer。 + +### C. 用户侧 workaround(不改代码也能让 #978 用户立刻可用) + +注释掉 `mongo_s_url`,只留 `mongo_urls`(mongod 直连)+ `mongo_cs_url`: + +``` +mongo_urls = mongodb://...;mongodb://...;mongodb://...;mongodb://... +mongo_cs_url = mongodb://... +# mongo_s_url = mongodb://... <-- 注释掉 +``` + +## 测试 + +### 单元测试 — 新增 `collector/filter/orphan_filter_test.go` + +| 测试 | 覆盖 | +|---|---| +| `TestOrphanFilter_NilChunkMap` | chunkMap 为 nil → 不过滤 | +| `TestOrphanFilter_NamespaceMissing` | namespace 不在 chunkMap → 不过滤 | +| `TestOrphanFilter_RangeSingleKey` | 单 key 9 个边界(in/out/min/max 包含与排他) | +| `TestOrphanFilter_RangeCompoundKey` | **联合 key 10 个边界**(即 #978 用户场景的 shard key 形态) | +| `TestOrphanFilter_HashedAllTypes` | hashed 分片 ObjectID/string/int64 三种 key type | +| `TestOrphanFilter_HashedNoChunks` | 空 Chunks → 任何文档都是孤儿 | +| `TestOrphanFilter_HashedPrecisionBug` | **SKIP,记录 latent bug**(详见下一节) | +| `TestOrphanFilter_HashedUnsupportedTypePanics` | bool 等类型 panic 行为 pin 住 | + +### 集成测试雏形 — 新增 `tests/integration/sharded/` + +`docker-compose.yml` + `setup_cluster.sh` + `inject_orphan.py` + +`verify_orphan_filter.py`,覆盖三种配置: + +| Case | Source | `orphan_document` | 期望 | +|---|---|---|---| +| A | `mongo_urls` only | `true` | 目标端正好 2 docs(孤儿过滤生效) | +| B | `mongo_urls` + `mongo_s_url`(#978 配置) | `true` | 目标端 < 2 docs **且** 日志含 WARN line | +| C | `mongo_urls` only | `false` | baseline 失败(dup-key panic) | + +详见 `tests/integration/sharded/README.md`。雏形阶段,**未接 CI**, +跑一遍要 docker + pymongo,仅作为本次修复的回归基线。 + +## 顺手发现的几个 latent bug(不在本次修复范围) + +记录在此供 follow-up,建议另开 issue: + +### latent bug 1:`OrphanFilter` chunkLt/chunkGt 对 int64 hashed bound 丢精度 + +`collector/filter/orphan_filter.go:219` `getBsonType`: + +```go +case int64: + return BsonTypeNumber, float64(rx) // ← 把 int64 cast 成 float64 +``` + +后续 `chunkLt/chunkGt` 都是 `float64` 比较。但 hashed 分片的 chunk +min/max 是 int64 hash 值,超出 `[-2^53, 2^53]` 范围会丢精度。 +`hashed - 1`、`hashed`、`hashed + 1` 三个相邻 int64 在 float64 表示下 +可能折叠成同一个值,造成边界判定错误。 + +`TestOrphanFilter_HashedPrecisionBug` 用 `t.Skip()` 显式记录此问题, +便于将来扩展时不漏掉。 + +### latent bug 2:`ComputeHash` 类型支持不完整 + +`collector/filter/orphan_filter.go:94-135`:当前仅支持 +`string` / `int` / `int32` / `int64` / `float64` / `ObjectID`。 + +对比 MongoDB 内核 `hasher.cpp`(**4.0 与 8.0 的实现完全一致**: +MD5(seed‖canonicalType‖value),所有数字类型经 `safeNumberLongForHash` +归一化为 int64 再 hash),仍缺: + +- `NumberDecimal`(4.4+ 可作 hashed shard key) +- `Date`、`Bool`、`bsonTimestamp` +- `BinData`、`null`、`Symbol` + +不支持类型走 default 分支 `Panicf`。MongoShake 主流场景(hashed `_id` +为 ObjectID 或 string)已覆盖,扩展见单独 issue。 + +`float64` 当前用 `uint64(rd3)` 直接 cast 处理负数 / NaN / Inf 与内核 +`safeNumberLongForHash` 行为不完全一致,但实际 hashed shard key 用浮点 +极罕见,遗留待修。 + +### latent bug 3:`mongos` 模式下 OrphanFilter 架构上不可用 + +如「为什么不能简单地"让 mongos 模式也支持 chunk map"」节所述,需要 +重新设计才能在 mongos 模式下做孤儿过滤(例如:让 mongos 把 +`$shardName` 注入 cursor metadata,或者让 MongoShake 切换到从每个 +shard 直连的辅助通道)。本次仅以 WARN 提示用户当前限制。 + +## 改动文件清单 + +| 文件 | 类型 | 改动 | +|---|---|---| +| `collector/coordinator/full.go` | 修改 | mongos + orphan_document=true 时 WARN | +| `collector/docsyncer/doc_executor.go` | 修改 | unmarshal 失败时 skip orphan check | +| `collector/filter/orphan_filter_test.go` | 新增 | 8 个 OrphanFilter 单元测试 | +| `tests/integration/sharded/docker-compose.yml` | 新增 | sharded cluster 集成测试拓扑 | +| `tests/integration/sharded/setup_cluster.sh` | 新增 | replset / addShard / stopBalancer | +| `tests/integration/sharded/inject_orphan.py` | 新增 | 构造孤儿场景 | +| `tests/integration/sharded/verify_orphan_filter.py` | 新增 | 三种配置自动化校验 | +| `tests/integration/sharded/README.md` | 新增 | 集成测试使用说明 | +| `docs/fix-orphan-document-mongos-warn.md` | 新增 | 本 spec | diff --git a/docs/fixes/fix-orphan-filter-latent-bugs.md b/docs/fixes/fix-orphan-filter-latent-bugs.md new file mode 100644 index 00000000..966c06f6 --- /dev/null +++ b/docs/fixes/fix-orphan-filter-latent-bugs.md @@ -0,0 +1,93 @@ +# 修复:OrphanFilter int64 精度丢失 + ComputeHash 类型覆盖不全 + +## 关联 + +- 接续 [`fix-orphan-document-mongos-warn.md`](./fix-orphan-document-mongos-warn.md) 中遗留的两个 latent bug +- 与 [Issue #978](https://github.com/alibaba/MongoShake/issues/978) 间接相关:用户报告的是 range 联合 key 场景,但同函数下 hashed 分片路径有这两处问题,本次一并清掉 + +## Bug 1:chunkLt / chunkGt 对 int64 chunk bound 丢精度 + +### 现象 + +`collector/filter/orphan_filter.go:229` `getBsonType()` 把 int64 一律 cast 成 float64,下游 `chunkLt / chunkGt / chunkEqual` 用 float64 比较。但 float64 只能精确表示 `[-2^53, 2^53]` 内的整数,hashed 分片的 chunk min/max 是**全范围 int64 hash 值**,超出 2^53 后,相邻 int64(甚至差几十)会折叠到同一个 float64: + +```go +// 修复前可重现: +var v int64 = 1<<62 + 1 +float64(v) == float64(v-1) // true (false positive) +``` + +后果:hashed 分片下文档的 hashed key 与 chunk min/max 比较时,**严格小于/大于会被误判为相等**,造成边界判定错误,孤儿可能被放过,正常文档可能被当孤儿过滤掉。 + +### 修复 + +引入 `BsonTypeInt64`,让 int64 走 int64 精确比较;int64+float64 混合时再 promote 到 float64(混合 case 在 BSON shard key 实际配置中几乎没有): + +```go +case int64: + return BsonTypeInt64, rx // keep raw int64 + +func chunkLt(x, y interface{}) bool { + xType, rx := getBsonType(x) + yType, ry := getBsonType(y) + if numericType(xType) && numericType(yType) { + return numericCmp(xType, rx, yType, ry) < 0 + } + ... +} + +func numericCmp(...) int { + if xType == BsonTypeInt64 && yType == BsonTypeInt64 { + // exact int64 comparison + } + // else promote to float64 (loses precision; acceptable fallback) +} +``` + +### 测试 + +- 解除上次 commit 里 `TestOrphanFilter_HashedPrecisionBug` 的 `t.Skip`,改写成 `TestOrphanFilter_HashedTightChunk`(hashed key 在 ±1 chunk 内 / 不在 +100~+200 chunk 外)+ `TestOrphanFilter_HashedInt64Precision`(直接验证 `chunkLt/chunkGt/chunkEqual` 在 `1<<62 + 1` 附近的严格语义) + +## Bug 2:ComputeHash 类型覆盖不全 + +### 现象 + +`ComputeHash` 之前仅支持 `string / int / int32 / int64 / float64 / primitive.ObjectID`。其它合法 hashed shard key 类型(`bool`、`Date`、`Timestamp`、`null`、`NumberDecimal`、`BinData`、`Symbol`)走 default 直接 `Panicf`。生产里 `bool` / `Date` / `Timestamp` 都是合法但偶尔有人用的 shard key。 + +### 修复 + +对照 MongoDB 内核 `hasher.cpp`([已确认 4.0 与 8.0 算法完全一致](./fix-orphan-document-mongos-warn.md#latent-bug-2))补 4 种最常用类型: + +| 类型 | 内核 raw value 编码 | MongoShake 实现 | +|---|---|---| +| `bool` | 1 字节 (0x00/0x01) | ✅ 新增 | +| `primitive.DateTime` | int64 ms LE 8 字节 | ✅ 新增 | +| `primitive.Timestamp` | uint32(I) LE + uint32(T) LE = 8 字节 | ✅ 新增 | +| `nil` (BSON null) | 0 字节(只有 canonical type) | ✅ 新增 | +| `primitive.Decimal128` | 复杂:走 `safeNumberLongForHash` 归一化到 int64,但要正确处理 NaN/Inf/超范围 | ❌ 仍 panic,附带说明性 message | +| `primitive.Binary` | int32(len) + byte(subtype) + bytes | ❌ 仍 panic,hashed key 用 binary 极罕见 | +| `Symbol` | 同 String;已 deprecated | ❌ 仍 panic | + +Decimal128 实现复杂且与内核 `safeNumberLongForHash` 的边界行为需要精确复刻才能在 NaN/Inf/超范围时不与 server 端 hash 出现微妙偏差,**收益远低于风险**,保留 panic 但 message 改成 `(Decimal128 / BinData / Symbol are intentionally not implemented; open an issue if you actually hit one)`,便于后续遇到时定位。 + +顺便把数字 case 拆开重写:之前 `int/int64/float64` 共用一段 type-assertion 链 + `uint64(float64)` 直接 cast。新写法每个类型一段,`int64(float64)` 保持与内核 `safeNumberLongForHash` 对 finite in-range 值的语义一致;NaN/Inf 与超 int64 范围的 float64 输入会与 server 不一致,但 float64 hashed shard key 极罕见,文档里记一笔不阻塞修复。 + +### 测试 + +- `TestOrphanFilter_HashedAllTypes` 从 3 类扩到 11 类:ObjectID/string/int64/int/int32/float64/bool×2/DateTime/Timestamp +- 新增 `TestComputeHash_NullKey` 单独验证 nil case(因为 `OrphanFilter.Filter` 上游 `oplog.GetKey` 把"null 值"和"字段缺失"等同,没法在 Filter 层测) +- `TestOrphanFilter_HashedUnsupportedTypePanics` 改用 `primitive.Decimal128{}` 验证 panic 仍触发 + +## 未在本次范围内的事 + +- **OrphanFilter.Filter 把 `nil` 与"字段缺失"等同**(`oplog.GetKey` 返回 nil → Filter 直接 Panicf):是 Filter 自己的语义问题,与 ComputeHash 解耦。修起来要改 `oplog.GetKey` / `GetKeyWithIndex` 的返回签名,影响范围广,未做。 +- **Decimal128 hashed shard key 支持**:见上文权衡,留作 follow-up。 +- **mongos 模式 OrphanFilter 架构性不可用**:不是 latent bug,是已知架构限制。上一次 commit (`6ea9a2e`) 已经通过 WARN 暴露,无新动作。 + +## 改动文件清单 + +| 文件 | 类型 | 改动 | +|---|---|---| +| `collector/filter/orphan_filter.go` | 修改 | 引入 BsonTypeInt64/Bool/Date/Tstamp/Null 常量;getBsonType 保留 int64 原值;chunkLt/Gt/Equal 走 numericCmp 精确比较;ComputeHash 补 bool/Date/Timestamp/nil 四种类型 | +| `collector/filter/orphan_filter_test.go` | 修改 | 解除 PrecisionBug skip 改成 HashedInt64Precision / HashedTightChunk 真测;HashedAllTypes 扩到 11 case;新增 TestComputeHash_NullKey;unsupported-type 测试用 Decimal128 | +| `docs/fix-orphan-filter-latent-bugs.md` | 新增 | 本 spec | diff --git a/docs/fixes/fix-persister-retrieve-break-loop.md b/docs/fixes/fix-persister-retrieve-break-loop.md new file mode 100644 index 00000000..f375a6aa --- /dev/null +++ b/docs/fixes/fix-persister-retrieve-break-loop.md @@ -0,0 +1,89 @@ +# 修复:oplog_store_disk=true 时增量阶段永远不 apply + +## 需求描述 + +**来源:** [GitHub Issue #976](https://github.com/alibaba/MongoShake/issues/976) + +**现象:** 配置 `full_sync.reader.oplog_store_disk = true` 后,全量同步完成、切到增量阶段,oplog 被持续 `get` 并写入磁盘队列,但完全不 apply: + +``` +[INFO] ------------------------full sync done!------------------------ +[INFO] finish document replication, change oplog replication to store disk and apply +[INFO] persister replset[mongos] update fetch status to: store disk and apply +[INFO] [name=mongos, stage=incr, get=167326550, filter=0, write_success=0, tps=0, + ckpt_times=0, lsn_ckpt={0[0, 0], 1970-01-01 08:00:00}, lsn_ack={0[0, 0], 1970-01-01 08:00:00}] +``` + +`get` 数持续增长但 `write_success` 始终为 0,`lsn_ckpt`/`lsn_ack` 始终为零值。 + +**预期行为:** 增量阶段进入 `FetchStageStoreDiskApply` 后,应该从磁盘队列读出 oplog 并 push 到 pending queue 让 worker apply。 + +## 根因分析 + +问题在 `collector/persister.go` 的 `retrieve()` 函数(issue 报告者已正确指出)。 + +`Persister.Start()` 在启用 disk persist 时会启动 `retrieve()` goroutine,其设计意图是: + +1. **等待阶段**:fetchStage 为 `FetchStageStoreUnknown` 或 `FetchStageStoreDiskNoApply` 时空转等待 +2. **触发阶段**:fetchStage 变为 `FetchStageStoreDiskApply` 时跳出 wait 循环 +3. **执行阶段**:跳出后从 `DiskQueue` 读 oplog 推到 pending queue,最后切到 `FetchStageStoreMemoryApply` + +但 wait 循环写成了: + +```go +for range time.NewTicker(3 * time.Second).C { + stage := atomic.LoadInt32(&p.fetchStage) + switch stage { + case utils.FetchStageStoreDiskApply: + break // ← Go 中 switch 里的 break 只跳出 switch,不跳出 for + case utils.FetchStageStoreUnknown: + case utils.FetchStageStoreDiskNoApply: + default: + l.Logger.Panicf(...) + } +} +// ↓ 下面的"读盘 + apply"代码永远不可达 +l.Logger.Infof("persister retrieve for replset[%v] begin to read from disk queue ...", ...) +``` + +Go 的语义里,`switch` 内的 `break` 只跳出 `switch` 块。所以 `for range time.NewTicker(...).C` 永远空转,下面的执行阶段代码完全不可达 → 磁盘里的 oplog 没人读 → `write_success` 永远 0。 + +## 修复方案 + +最小改动:给 wait 循环加 `Wait:` label,把 `break` 改为 `break Wait`。与同一个文件第 257 行 `Loop:` + `break Loop` 风格一致。 + +```go +func (p *Persister) retrieve() { +Wait: + for range time.NewTicker(3 * time.Second).C { + stage := atomic.LoadInt32(&p.fetchStage) + switch stage { + case utils.FetchStageStoreDiskApply: + break Wait // 跳出整个 for + case utils.FetchStageStoreUnknown: + case utils.FetchStageStoreDiskNoApply: + default: + l.Logger.Panicf("invalid fetch stage[%v]", utils.LogFetchStage(stage)) + } + } + // ↓ 现在可达 + l.Logger.Infof("persister retrieve for replset[%v] begin to read from disk queue ...", ...) + ... +} +``` + +**改动文件:** `collector/persister.go`(1 处,新增 `Wait:` label + 把 `break` 改为 `break Wait`) + +## 影响范围 + +- 仅影响 `full_sync.reader.oplog_store_disk = true` 配置下的增量阶段启动逻辑 +- 默认配置(`oplog_store_disk = false`)不走该路径,不受影响 +- 修复后行为与代码注释和原始设计意图一致 + +## 测试 + +`go build ./...` 三平台(linux/darwin/windows)通过。`TestBatchMoreApplyOpsInheritsSourceTime`、`TestInject` 通过。 + +未新增针对 `retrieve()` 的单测:bug 本质是一处 `break` typo,加上 `Wait:` label 后 Go 语法保证 `break Wait` 必然跳出 for(编译器层面拒绝歧义);为单测一个 break 语句去 mock 整个 `DiskQueue` 收益与成本不匹配,违反 surgical changes 原则。 + +`go test ./collector/` 整体 timeout 与本修复无关——baseline(移除本修复)同样在 180s 时因 `common/metric.go:121` 的 goroutine 不退出而超时,是预先存在的测试基础设施问题。 diff --git a/docs/fixes/fix-persister-retrieve-restart-panic.md b/docs/fixes/fix-persister-retrieve-restart-panic.md new file mode 100644 index 00000000..8f44a723 --- /dev/null +++ b/docs/fixes/fix-persister-retrieve-restart-panic.md @@ -0,0 +1,97 @@ +# 修复:oplog_store_disk=true 重启场景下 persister.retrieve goroutine panic + +## 关联 + +- 本次为 [#976](https://github.com/alibaba/MongoShake/issues/976) 的 follow-up 修复,与上一个 spec [`fix-persister-retrieve-break-loop.md`](./fix-persister-retrieve-break-loop.md) 一并解决 `full_sync.reader.oplog_store_disk = true` 配置下增量同步的可靠性问题。 +- 隐藏 bug 在 review 上一个修复(`b590dec`)时通过追状态机调用方发现,并非由该修复引入。 + +## 需求描述 + +**现象:** 开启 `full_sync.reader.oplog_store_disk = true`,MongoShake 完成一轮全量+增量同步后**重启**,且重启时 checkpoint 已经追上 `OplogDiskQueueFinishTs`(也就是磁盘里残留的 oplog 全部应用过了)。 + +启动后 `Persister.retrieve()` goroutine 在第一次 3 秒 ticker fire 时通过 `l.Logger.Panicf("invalid fetch stage[%v]", ...)` 直接 panic,导致进程退出无法启动。 + +## 根因分析 + +调用顺序(`collector/syncer.go:211-235`): + +``` +OplogSyncer.Start() +├─ sync.persister.Start() // line 218 → 启动 retrieve goroutine +│ // 只看 enableDiskPersist (构造时算定,依赖 conf.Options.FullSyncReaderOplogStoreDisk) +└─ sync.loadCheckpoint() // line 233 → 之后才设置 fetchStage 和 (可选) InitDiskQueue +``` + +`loadCheckpoint()` 在 `collector/checkpoint.go:62-66` 有一条分支: + +```go +if checkpoint.OplogDiskQueueFinishTs > 0 && checkpoint.Timestamp >= checkpoint.OplogDiskQueueFinishTs { + // no need to init disk queue again + sync.persister.SetFetchStage(utils.FetchStageStoreMemoryApply) + return nil + // ⚠ 没有 InitDiskQueue,DiskQueue 仍为 nil +} +``` + +但 `enableDiskPersist` 在 `NewPersister()` 时就已经算出 `true`(仅看 `FullSyncReaderOplogStoreDisk` 配置,不看运行时状态),所以 `Persister.Start()` 依然启动了 `retrieve` goroutine。3 秒后 goroutine 第一次 `atomic.LoadInt32(&p.fetchStage)` 读到 **`FetchStageStoreMemoryApply`**: + +```go +switch stage { +case utils.FetchStageStoreDiskApply: // 不匹配 +case utils.FetchStageStoreUnknown: // 不匹配 +case utils.FetchStageStoreDiskNoApply: // 不匹配 +default: + l.Logger.Panicf("invalid fetch stage[%v]", ...) // ← 直接 panic +} +``` + +这正是 `checkpoint.go:68` 上 `// TODO, there is a bug if MongoShake restarts` 注释所暗示场景之一。 + +**修复前 #976 的关系:** 上一个修复(`b590dec`)解决的是 `case FetchStageStoreDiskApply: break` 跳不出 for-range 的死循环 bug;本次的 panic 在该修复前后行为一致(default 分支始终 Panicf),并非由 `break Wait` 改动引入,是一个独立但同函数内的设计漏洞。 + +## 修复方案 + +在 `retrieve()` 的 wait 循环中显式处理 `FetchStageStoreMemoryApply`:直接 `return` 让 goroutine 安全退出,避免落入下方依赖 `p.DiskQueue` 非 nil 的代码段。 + +```go +Wait: + for range time.NewTicker(3 * time.Second).C { + stage := atomic.LoadInt32(&p.fetchStage) + switch stage { + case utils.FetchStageStoreDiskApply: + break Wait + case utils.FetchStageStoreMemoryApply: + // loadCheckpoint() may set MemoryApply directly (without InitDiskQueue) + // when restart and checkpoint has already caught up to disk last ts. + // In that path DiskQueue is nil, so we must exit instead of falling + // through to the disk-read stage below. + l.Logger.Infof("persister retrieve for replset[%v] skip disk replay: fetchStage is MemoryApply", + p.replset) + return + case utils.FetchStageStoreUnknown: + case utils.FetchStageStoreDiskNoApply: + default: + l.Logger.Panicf("invalid fetch stage[%v]", utils.LogFetchStage(stage)) + } + } +``` + +**改动文件:** `collector/persister.go`(仅在 wait 循环中新增一个 case 分支) + +## 影响范围 + +- 仅影响 `full_sync.reader.oplog_store_disk = true` 且 **进程重启** 后 checkpoint 已追上 disk queue 的场景 +- 正常启动路径(无 checkpoint → DiskNoApply → DiskApply)行为不变 +- 不修改外部 API、不修改状态机定义,仅让 retrieve goroutine 对一个本就合法但未处理的状态做正确响应 + +## 测试 + +- `go build ./...` 三平台通过 +- 未新增针对 `retrieve()` 的单测;理由同上一个 spec:mock 整个 DiskQueue/状态机来覆盖一个 case 分支收益与成本不匹配。若后续要在 `retrieve()` 之上叠加更多逻辑,建议先抽出 `waitForDiskApply()` helper 再补测 + +## 遗留问题与建议的新 Issue 草稿 + +本修复消除了重启 panic,但 `retrieve()` 还有几处**与本次改动无关、但同函数内可优化**的点,建议另开 issue 跟进: + +1. `for range time.NewTicker(3*time.Second).C` 与下方 `ticker := time.NewTicker(time.Second)` 均未 `Stop()`,break/return 后会在 timer heap 中残留至函数返回 +2. wait 循环采用 3 秒 ticker 轮询而非 channel 通知,状态变更后最长需等 3 秒才感知;可考虑由 `SetFetchStage` 通过 `chan struct{}` 主动唤醒 diff --git a/docs/refactor-test-config-env-vars.md b/docs/refactor-test-config-env-vars.md new file mode 100644 index 00000000..dc1e42fc --- /dev/null +++ b/docs/refactor-test-config-env-vars.md @@ -0,0 +1,60 @@ +# Refactor: 测试连接信息改用环境变量注入 + +## Why + +仓库里两个治理点: + +1. `unit_test_common/include.go` 中以 `const` 形式硬编码了公司内网测试集群的真实 URI(含明文用户名 / 密码 / 内网 IP / 外网 IP),随源码分发到任何 fork / 镜像仓库都会泄漏。 + +2. `common/common_test.go` 的 SSL 用例把作者本地的 CA pem 绝对路径写死成字面量,其他开发者跑 `TestMongoConn` 必然 file-not-found。 + +## What + +- `unit_test_common/include.go`:`const` → `var`,统一从环境变量读取;加 `envOr(key, fallback)` helper;新增 `TestCaPem` 作为 CA 路径的统一出口。 +- `common/common_test.go`:删掉个人本地路径字面量,改读 `unit_test_common.TestCaPem`;SSL/CA 缺失时 `t.Skip` 而不是假装失败。 + +### 环境变量列表 + +| 变量 | 默认 fallback | 说明 | +|---|---|---| +| `MONGOSHAKE_TEST_URL` | `mongodb://localhost:27017` | 普通副本集 URI | +| `MONGOSHAKE_TEST_URL_SSL` | `""` | TLS URI,未设置时 SSL 测试 Skip | +| `MONGOSHAKE_TEST_URL_CONFIG_SERVER` | `""` | 分片集群 config server URI | +| `MONGOSHAKE_TEST_URL_SERVERLESS` | `""` | serverless tenant URI | +| `MONGOSHAKE_TEST_URL_SHARDING` | `""` | mongos URI | +| `MONGOSHAKE_TEST_CA_PEM` | `""` | CA chain pem 文件路径 | + +只有 `MONGOSHAKE_TEST_URL` 给了 `localhost:27017` fallback —— 这是开发者本地最常见的形态。其余项空值,连接型用例自检并 `t.Skip`,避免在 CI / 无环境的机器上爆出无意义失败。 + +### 本地开发 / CI 使用方式 + +本地使用 `.env.test`(仓库根目录,已加入 `.gitignore`,不入仓)保存自己的连接信息: + +```bash +source .env.test && go test ./... +``` + +或临时手动导: + +```bash +export MONGOSHAKE_TEST_URL='mongodb://...' +export MONGOSHAKE_TEST_URL_SSL='mongodb://...' +export MONGOSHAKE_TEST_CA_PEM='/path/to/ca.pem' +go test ./... +``` + +CI(不依赖外部 mongo):不导任何 env var,连接型用例自动 Skip。 + +## Scope(本次不做) + +- **git history 脱敏**:相关内网 / 外网 IP 已经存在于历史 commit 中。`git filter-repo` 重写历史需要强推 + 所有协作者重新 clone,风险大于收益。本次只阻断后续泄漏;如果安全团队后续要求清理历史,单独立项。 +- **其他 `mongodb://...` 字面量**:`community_client_test.go` 是 `BlockMongoUrlPassword` 的输入数据(不是连接串),`sanitize_test.go` / `collector_test.go` / `prom_metrics_test.go` 是占位字符串走单元逻辑分支 —— 这些不是"硬编码环境信息",本次不动。 + +## Verification + +```bash +go build ./... +go vet ./... +# 未导 env var 时,连接型 SSL 用例应输出 SKIP 而非 FAIL +go test ./common -run TestMongoConn -v +``` diff --git a/executor/db_writer_test.go b/executor/db_writer_test.go index 29210619..3c5ec905 100644 --- a/executor/db_writer_test.go +++ b/executor/db_writer_test.go @@ -21,11 +21,14 @@ import ( "github.com/alibaba/MongoShake/v2/unit_test_common" ) -const ( +var ( testMongoAddress = unit_test_common.TestUrl testMongoShardingAddress = unit_test_common.TestUrlSharding - testDb = "writer_test" - testCollection = "a" +) + +const ( + testDb = "writer_test" + testCollection = "a" ) func mockDeleteOplogRecord(oId interface{}) *OplogRecord { @@ -2779,11 +2782,23 @@ func TestResolveConflictFilter(t *testing.T) { assert.Equal(t, bson.D{{"address.city", "shanghai"}}, filter) } +// dupKeyErrLike returns a real mongo.WriteException with code 11000 and the +// given E11000 message — the same error shape that mongo driver returns from +// a real failing insert. Tests that exercise handleDupKeyOnInsert / +// shouldSkipDupKeyOnInsert must use this rather than fmt.Errorf, since +// utils.DuplicateKey -> mongo.IsDuplicateKeyError checks the ServerError +// interface (HasErrorCode 11000), not the error string. +func dupKeyErrLike(msg string) error { + return mongo.WriteException{ + WriteErrors: mongo.WriteErrors{{Code: 11000, Message: msg}}, + } +} + func TestShouldSkipDupKeyOnInsert(t *testing.T) { origin := conf.Options defer func() { conf.Options = origin }() - err := fmt.Errorf(`E11000 duplicate key error collection: db.coll index: x_1 dup key: { x: 1 }`) + err := dupKeyErrLike(`E11000 duplicate key error collection: db.coll index: x_1 dup key: { x: 1 }`) conf.Options = conf.Configuration{ IncrSyncExecutorDupKeyStrategy: utils.VarIncrSyncExecutorDupKeyStrategySkip, IncrSyncExecutorDupKeySkipRulesMap: map[string]map[string]struct{}{ @@ -2814,7 +2829,7 @@ func TestShouldSkipDupKeyOnInsert(t *testing.T) { assert.True(t, shouldSkipDupKeyIndex("db", "coll", "x_1"), "should be equal") assert.False(t, shouldSkipDupKeyIndex("db", "other", "x_1"), "should be equal") - idErr := fmt.Errorf(`E11000 duplicate key error collection: db.coll index: _id_ dup key: { _id: 1 }`) + idErr := dupKeyErrLike(`E11000 duplicate key error collection: db.coll index: _id_ dup key: { _id: 1 }`) skip, indexName = shouldSkipDupKeyOnInsert("db", "coll", idErr) assert.False(t, skip, "_id duplicate is handled by writer-level already-applied logic") assert.Equal(t, "_id_", indexName, "should be equal") @@ -2824,7 +2839,7 @@ func TestHandleDupKeyOnInsertStrategy(t *testing.T) { origin := conf.Options defer func() { conf.Options = origin }() - err := fmt.Errorf(`E11000 duplicate key error collection: db.coll index: x_1 dup key: { x: 1 }`) + err := dupKeyErrLike(`E11000 duplicate key error collection: db.coll index: x_1 dup key: { x: 1 }`) conf.Options = conf.Configuration{IncrSyncExecutorDupKeyStrategy: utils.VarIncrSyncExecutorDupKeyStrategyIgnore} assert.NoError(t, handleDupKeyOnInsert(nil, "db", "coll", nil, err, "test"), "should be equal") @@ -2844,6 +2859,17 @@ func TestHandleDupKeyOnInsertStrategy(t *testing.T) { "db.coll": {"y_1": {}}, } assert.Error(t, handleDupKeyOnInsert(nil, "db", "coll", nil, err, "test"), "should be equal") + + // Non-dup-key errors must always be returned untouched, regardless of + // strategy — the routing in handleDupKeyOnInsert depends on + // IsDuplicateKeyError, so a server-side error of a different code must + // not be swallowed. + conf.Options = conf.Configuration{IncrSyncExecutorDupKeyStrategy: utils.VarIncrSyncExecutorDupKeyStrategyIgnore} + otherErr := mongo.WriteException{ + WriteErrors: mongo.WriteErrors{{Code: 121, Message: "DocumentValidationFailure"}}, + } + assert.Error(t, handleDupKeyOnInsert(nil, "db", "coll", nil, otherErr, "test"), + "non-dup-key errors must propagate even under Ignore") } func TestSingleWriterDeleteOnNonIdDupKey(t *testing.T) { diff --git a/executor/duplicate_skip.go b/executor/duplicate_skip.go index 70b8eea2..43d8e725 100644 --- a/executor/duplicate_skip.go +++ b/executor/duplicate_skip.go @@ -16,6 +16,16 @@ func shouldSkipDupKeyOnInsert(database, collection string, err error) (bool, str return false, "" } + // _id duplicates carry distinct semantics: an oplog being re-applied + // after a checkpoint replay produces an _id E11000, and the writer + // layer's already-applied detection is the right place to handle it + // (it can compare the existing doc to the incoming one). Skipping + // _id_ here — even under a "*" wildcard rule — would mask real data + // divergence, so always defer _id back to the writer. + if indexName == "_id_" { + return false, indexName + } + return shouldSkipDupKeyIndex(database, collection, indexName), indexName } diff --git a/oplog/change_stream_event_test.go b/oplog/change_stream_event_test.go index 63875a85..5aab3bcd 100644 --- a/oplog/change_stream_event_test.go +++ b/oplog/change_stream_event_test.go @@ -19,10 +19,12 @@ import ( "github.com/alibaba/MongoShake/v2/unit_test_common" ) -const ( +var ( testUrl = unit_test_common.TestUrl testMongoShardingAddress = unit_test_common.TestUrlSharding +) +const ( uuidMark = "ui" ) diff --git a/sharding/sharding_operation.go b/sharding/sharding_operation.go index 4867eac7..dabdd218 100644 --- a/sharding/sharding_operation.go +++ b/sharding/sharding_operation.go @@ -59,9 +59,14 @@ type ChunkRange struct { type ShardCollection struct { Chunks []*ChunkRange - // shard key may have multiple columns, for example {a:1, b:1, c:1} - Keys []string - ShardType string + // Shard key columns and the corresponding per-column shard type. + // ShardTypes[i] is the type of Keys[i] and is either HashedShard or + // RangedShard. Compound hashed keys (MongoDB 4.4+) such as + // {a: 1, b: "hashed"} are represented with a mix of values; consumers + // must check ShardTypes[i] per key instead of treating the collection + // as uniformly hashed or ranged. + Keys []string + ShardTypes []string } // {replset: {namespace: []ChunkRange} } @@ -122,8 +127,8 @@ func GetChunkMapByUrl(csUrl string) (ShardingChunkMap, error) { continue } - // get all keys and shard type(range or hashed) - keys, shardType, err := GetColShardType(conn, chunkDoc.Ns) + // get all keys and per-key shard type(range or hashed) + keys, shardTypes, err := GetColShardType(conn, chunkDoc.Ns) if err != nil { return nil, err } @@ -131,7 +136,7 @@ func GetChunkMapByUrl(csUrl string) (ShardingChunkMap, error) { // the namespace is sharded, chunk map of each shard need to initialize for _, dbChunkMap := range chunkMap { if _, ok := dbChunkMap[chunkDoc.Ns]; !ok { - dbChunkMap[chunkDoc.Ns] = &ShardCollection{Keys: keys, ShardType: shardType} + dbChunkMap[chunkDoc.Ns] = &ShardCollection{Keys: keys, ShardTypes: shardTypes} } } @@ -167,38 +172,56 @@ func GetChunkMapByUrl(csUrl string) (ShardingChunkMap, error) { return chunkMap, nil } -// input given namespace, return all keys and shard type(range or hashed) -func GetColShardType(conn *utils.MongoCommunityConn, namespace string) ([]string, string, error) { +// GetColShardType returns the shard key columns and the per-column shard +// type for the given namespace. The second return value has the same length +// as the first; each entry is either HashedShard or RangedShard. +// +// Compound shard keys can mix the two — e.g. {a: 1, b: "hashed"} (MongoDB +// 4.4+ compound hashed). Callers must consult the per-column type when +// deciding whether to ComputeHash; treating the whole collection as a +// single type silently mis-classifies documents at the shard boundary. +func GetColShardType(conn *utils.MongoCommunityConn, namespace string) ([]string, []string, error) { var colDoc bson.D if err := conn.Client.Database(ConfigDB).Collection(CollectionCol).FindOne(context.Background(), bson.M{"_id": namespace}).Decode(&colDoc); err != nil { - return nil, "", err + return nil, nil, err } - var keys []string - var shardType string - var ok bool - if colDoc, ok = oplog.GetKey(colDoc, "key").(bson.D); !ok { - return nil, "", fmt.Errorf("GetColShardType with namespace[%v] has no key item in doc %v", namespace, colDoc) + keyDoc, ok := oplog.GetKey(colDoc, "key").(bson.D) + if !ok { + return nil, nil, fmt.Errorf("GetColShardType with namespace[%v] has no key item in doc %v", namespace, colDoc) } + keys, shardTypes, err := parseShardKey(keyDoc) + if err != nil { + return nil, nil, fmt.Errorf("GetColShardType with namespace[%v]: %w", namespace, err) + } + return keys, shardTypes, nil +} - for _, item := range colDoc { - fmt.Println(item) - // either be a single hashed field, or a list of ascending fields +// parseShardKey turns a config.collections "key" subdocument into the per- +// column (name, type) pair. It is the pure data path of GetColShardType and +// is split out so we can unit-test compound hashed shard keys without a +// mongos. Each item's value is either a string ("hashed") or a numeric +// direction (1 / -1, including int32/int64/float64 as decoded by the BSON +// driver). The two slices returned are 1:1 with keyDoc and are intentionally +// not coalesced — callers like OrphanFilter.Filter rely on the per-column +// type rather than a collection-wide flag. +func parseShardKey(keyDoc bson.D) ([]string, []string, error) { + keys := make([]string, 0, len(keyDoc)) + shardTypes := make([]string, 0, len(keyDoc)) + for _, item := range keyDoc { switch v := item.Value.(type) { case string: - shardType = HashedShard - case int: - shardType = RangedShard - case float64: - shardType = RangedShard + shardTypes = append(shardTypes, HashedShard) + case int, int32, int64, float64: + shardTypes = append(shardTypes, RangedShard) default: - return nil, "", fmt.Errorf("GetColShardType with namespace[%v] doc[%v] meet unknown ShakeKey type[%v]", - namespace, colDoc, reflect.TypeOf(v)) + return nil, nil, fmt.Errorf("shard key field[%v] has unsupported value type[%v]", + item.Key, reflect.TypeOf(v)) } keys = append(keys, item.Key) } - return keys, shardType, nil + return keys, shardTypes, nil } type ShardCollectionSpec struct { diff --git a/sharding/sharding_operation_test.go b/sharding/sharding_operation_test.go new file mode 100644 index 00000000..9cbe50f2 --- /dev/null +++ b/sharding/sharding_operation_test.go @@ -0,0 +1,105 @@ +package sharding + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.mongodb.org/mongo-driver/bson" +) + +// parseShardKey is the pure-logic core of GetColShardType. These cases pin +// the per-column behaviour so a future "simplification" that collapses +// shard type into a single collection-wide value (the pre-fix bug) breaks +// here rather than silently mis-classifying docs in OrphanFilter.Filter. +func TestParseShardKey(t *testing.T) { + cases := []struct { + name string + keyDoc bson.D + wantKeys []string + wantTypes []string + wantErrSub string // non-empty -> expect error containing this substring + }{ + { + name: "single ranged", + keyDoc: bson.D{{"a", int32(1)}}, + wantKeys: []string{"a"}, + wantTypes: []string{RangedShard}, + }, + { + name: "single hashed", + keyDoc: bson.D{{"a", "hashed"}}, + wantKeys: []string{"a"}, + wantTypes: []string{HashedShard}, + }, + { + name: "compound ranged", + keyDoc: bson.D{{"a", int32(1)}, {"b", int32(1)}}, + wantKeys: []string{"a", "b"}, + wantTypes: []string{RangedShard, RangedShard}, + }, + { + // MongoDB 4.4+ compound hashed: ranged-first, hashed-second. + // Pre-fix GetColShardType would overwrite shardType every + // iteration so the whole collection looked HashedShard. + name: "compound hashed, ranged first", + keyDoc: bson.D{{"a", int32(1)}, {"b", "hashed"}}, + wantKeys: []string{"a", "b"}, + wantTypes: []string{RangedShard, HashedShard}, + }, + { + // Inverse direction. Pre-fix this would resolve to RangedShard + // (last field wins) so Filter would skip hashing entirely. + name: "compound hashed, hashed first", + keyDoc: bson.D{{"a", "hashed"}, {"b", int32(1)}}, + wantKeys: []string{"a", "b"}, + wantTypes: []string{HashedShard, RangedShard}, + }, + { + // Numeric direction may come back as int / int32 / int64 / + // float64 depending on the BSON decoder; all should be ranged. + name: "mixed numeric encodings are all ranged", + keyDoc: bson.D{{"a", int(1)}, {"b", int64(-1)}, {"c", float64(1)}}, + wantKeys: []string{"a", "b", "c"}, + wantTypes: []string{RangedShard, RangedShard, RangedShard}, + }, + { + name: "unsupported value type errors out", + keyDoc: bson.D{{"a", true}}, // bool is not a legal shard-key direction + wantErrSub: "field[a]", + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + keys, types, err := parseShardKey(c.keyDoc) + if c.wantErrSub != "" { + assert.Error(t, err) + assert.Contains(t, err.Error(), c.wantErrSub) + return + } + assert.NoError(t, err) + assert.Equal(t, c.wantKeys, keys) + assert.Equal(t, c.wantTypes, types) + assert.Equal(t, len(keys), len(types), "per-column slices must be 1:1") + }) + } +} + +// Round-trip through bson.Marshal/Unmarshal — verifies parseShardKey works +// on the exact value shapes the driver hands us when decoding a real +// config.collections document, not just hand-built bson.D literals. +func TestParseShardKey_BsonRoundTrip(t *testing.T) { + original := bson.D{{"a", int32(1)}, {"b", "hashed"}} + raw, err := bson.Marshal(bson.D{{"key", original}}) + assert.NoError(t, err) + + var decoded bson.D + assert.NoError(t, bson.Unmarshal(raw, &decoded)) + + keyDoc, ok := decoded[0].Value.(bson.D) + assert.True(t, ok, "decoded key field must be bson.D") + + keys, types, err := parseShardKey(keyDoc) + assert.NoError(t, err) + assert.Equal(t, []string{"a", "b"}, keys) + assert.Equal(t, []string{RangedShard, HashedShard}, types) +} diff --git a/sharding/sharding_operation_test_bak.go b/sharding/sharding_operation_test_bak.go index 1c545857..d56ffc4e 100644 --- a/sharding/sharding_operation_test_bak.go +++ b/sharding/sharding_operation_test_bak.go @@ -20,7 +20,7 @@ func TestSharding(t *testing.T) { mp, err := GetChunkMapByUrl(testCsAddress) assert.Equal(t, nil, err, "should be equal") for key, val := range mp["test-replica-set"] { - fmt.Printf("%v -> key[%v] type[%v] chunks[%v]\n", key, val.Keys, val.ShardType, val.Chunks) + fmt.Printf("%v -> key[%v] type[%v] chunks[%v]\n", key, val.Keys, val.ShardTypes, val.Chunks) if val.Chunks != nil { for _, chunk := range val.Chunks { fmt.Printf(" [%v, %v]\n", chunk.Mins, chunk.Maxs) diff --git a/tests/integration/sharded/README.md b/tests/integration/sharded/README.md new file mode 100644 index 00000000..043a55ec --- /dev/null +++ b/tests/integration/sharded/README.md @@ -0,0 +1,67 @@ +# Sharded integration test (orphan filter) + +Reproducible end-to-end test for `full_sync.executor.filter.orphan_document`, +covering the three configurations from issue #978: + +| Case | Source config | `orphan_document` | Expected dest | +|---|---|---|---| +| A | `mongo_urls` only (mongod direct) | `true` | 2 docs (orphan filtered) | +| B | `mongo_urls` + `mongo_s_url` (the #978 user's config) | `true` | `<2` docs **and** WARN line in log | +| C | `mongo_urls` only | `false` | baseline failure (dup-key panic) | + +## What this catches + +- **Regressions on the OrphanFilter range/compound boundary logic** — + unit tests cover the chunk math, this exercises the full `Find → Filter + → BulkWrite` pipeline against a real cluster. +- **The mongos-mode silent failure** (issue #978) — Case B asserts that + the post-fix WARN log line is present, so future refactors can't + reintroduce the silent fallthrough. +- **Baseline behaviour** — Case C ensures the dup-key path still raises; + if a future change ever silently swallows it we want the test to fail. + +## Running + +Prerequisites: Docker + `pymongo` (`pip install pymongo`) + a built +collector binary in `../../../bin/`. + +```bash +cd tests/integration/sharded +docker compose up -d +./setup_cluster.sh # init replsets + addShard + stopBalancer +./inject_orphan.py # craft orphan scenario +./verify_orphan_filter.py ../../../bin/collector.darwin # or collector.linux +docker compose down -v # tear down +``` + +The whole loop should complete in well under a minute on a laptop. + +## Caveats + +- Single-node replsets are used for each shard / configsvr / dest to keep + the cluster small; do not use this layout for performance work. +- `setup_cluster.sh` uses `docker compose exec` (NOT `--network=host`) + to run the mongo client inside the compose network. `--network=host` + is a Linux-only Docker Desktop feature and was silently broken on + macOS / Windows in earlier revisions of this scaffold. +- The shards are started with `--setParameter enableTestCommands=1` + so `inject_orphan.py` can directly insert into a shardsvr without + triggering `StaleConfig`. **This is a test-only knob; never enable + it in production.** +- The script disables the balancer up front so the orphan injected into + rs2 cannot be silently migrated/cleaned during the test window. +- This is a **scaffold**, not yet wired into CI. The collector binary is + invoked once per case and waited on synchronously, which works for the + full-sync mode used here but would need polling/timeout logic for + long-running incremental tests. + +## Where to take it next + +- Wire into a `make integration-test` target that builds the collector + and runs through the three cases on every push (or nightly). +- Extend the matrix to hashed shards (covers `ComputeHash` for + `ObjectID`/`string`/`int64` end-to-end against the real cluster, not + just unit tests). +- Add a parallel scenario that triggers the unmarshal-error path in + `doc_executor.go:194` to confirm the syncer no longer panics on + malformed payloads (today only checked by unit tests / code review). diff --git a/tests/integration/sharded/docker-compose.yml b/tests/integration/sharded/docker-compose.yml new file mode 100644 index 00000000..af59f9b7 --- /dev/null +++ b/tests/integration/sharded/docker-compose.yml @@ -0,0 +1,48 @@ +# Minimal sharded MongoDB cluster + single-node destination for MongoShake +# integration testing of full_sync.executor.filter.orphan_document. +# +# Topology: +# configsvr (single-node replSet "cfg") :27019 +# shard1 (single-node replSet "rs1") :27018 +# shard2 (single-node replSet "rs2") :27028 +# mongos :27017 +# dest (single-node replSet "dst") :27117 ← MongoShake target +# +# Notes: +# - Uses replSet=1 even for single-member shards because sharding requires it. +# - All ports are bound to 127.0.0.1 to avoid clashing with the host network. +# - Bring up: docker compose -f docker-compose.yml up -d +# - Tear down: docker compose -f docker-compose.yml down -v + +services: + configsvr: + image: mongo:4.4 + command: mongod --configsvr --replSet cfg --port 27019 --bind_ip_all + ports: ["127.0.0.1:27019:27019"] + + # NOTE: --setParameter enableTestCommands=1 is required for the orphan + # injection step in inject_orphan.py to bypass sharded-collection + # write-version checks on shardsvr. Without it, a direct insert into a + # sharded namespace targets a shard whose chunk metadata says it owns + # nothing for that key range and gets rejected with StaleConfig / + # CannotImplicitlyCreateCollection. Test-mode only — do not use in prod. + shard1: + image: mongo:4.4 + command: mongod --shardsvr --replSet rs1 --port 27018 --bind_ip_all --setParameter enableTestCommands=1 + ports: ["127.0.0.1:27018:27018"] + + shard2: + image: mongo:4.4 + command: mongod --shardsvr --replSet rs2 --port 27028 --bind_ip_all --setParameter enableTestCommands=1 + ports: ["127.0.0.1:27028:27028"] + + mongos: + image: mongo:4.4 + depends_on: [configsvr, shard1, shard2] + command: mongos --configdb cfg/configsvr:27019 --port 27017 --bind_ip_all + ports: ["127.0.0.1:27017:27017"] + + dest: + image: mongo:4.4 + command: mongod --replSet dst --port 27117 --bind_ip_all + ports: ["127.0.0.1:27117:27117"] diff --git a/tests/integration/sharded/inject_orphan.py b/tests/integration/sharded/inject_orphan.py new file mode 100755 index 00000000..46d9e6c8 --- /dev/null +++ b/tests/integration/sharded/inject_orphan.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python3 +""" +Construct an orphan-document scenario on the sharded cluster brought up by +docker-compose.yml + setup_cluster.sh, then run a sanity check. + +Layout produced (range shard on {a:1, b:1}): + + chunk on rs1: { (MinKey, MinKey) <= (a,b) < (5, 0) } + chunk on rs2: { (5, 0) <= (a,b) < (MaxKey, MaxKey) } + +Documents: + 1. {_id: "owned-by-rs1", a: 1, b: 100} ← inserted via mongos, lives on rs1 + 2. {_id: "owned-by-rs2", a: 7, b: 100} ← inserted via mongos, lives on rs2 + 3. {_id: "owned-by-rs1", a: 1, b: 100} ← injected DIRECTLY on rs2 (orphan) + +After full sync with full_sync.executor.filter.orphan_document=true (and +mongod-direct source), the destination must contain exactly 2 docs (#1 and +#2). Doc #3 must be filtered out as orphan. + +Without the filter (or in mongos-mode source) the destination either fails +on dup-key (#1 vs #3) or contains all three docs. + +Requires: pymongo (`pip install pymongo`). +""" +from __future__ import annotations + +import sys + +try: + from pymongo import MongoClient +except ImportError: + sys.exit("pymongo missing; pip install pymongo") + +DB, COLL = "shaketest", "orphan_demo" +NS = f"{DB}.{COLL}" + + +def main() -> int: + mongos = MongoClient("mongodb://127.0.0.1:27017", directConnection=False) + rs1 = MongoClient("mongodb://127.0.0.1:27018", directConnection=True) + rs2 = MongoClient("mongodb://127.0.0.1:27028", directConnection=True) + + # Reset namespace + mongos.drop_database(DB) + + # Enable sharding and set the chunk split. + admin = mongos["admin"] + admin.command("enableSharding", DB) + admin.command("shardCollection", NS, key={"a": 1, "b": 1}) + admin.command("split", NS, middle={"a": 5, "b": 0}) + + # Move chunks so rs1 owns [MinKey, (5,0)) and rs2 owns [(5,0), MaxKey). + # After split, both chunks may live on the same shard. Move as needed. + admin.command("moveChunk", NS, find={"a": 1, "b": 0}, to="rs1") + admin.command("moveChunk", NS, find={"a": 5, "b": 0}, to="rs2") + + # Normal inserts via mongos. + mongos[DB][COLL].insert_many([ + {"_id": "owned-by-rs1", "a": 1, "b": 100, "tag": "normal"}, + {"_id": "owned-by-rs2", "a": 7, "b": 100, "tag": "normal"}, + ]) + + # Orphan: inject {_id: "owned-by-rs1", a: 1, b: 100} directly into rs2. + # rs2 only owns chunks with a >= 5, so this doc does not belong here. + # + # Direct shardsvr writes to a sharded collection normally fail with + # StaleConfig because rs2's local chunk metadata says it owns nothing + # in the (a<5) range. The compose file enables enableTestCommands=1 + # on each shard so this insert bypasses the sharded-write version + # check. If you ever see "StaleConfig" or similar here, double-check + # that the shards were brought up with that setParameter. + try: + rs2[DB][COLL].insert_one({"_id": "owned-by-rs1", "a": 1, "b": 100, "tag": "orphan"}) + except Exception as e: # broad: pymongo raises various OperationFailures + sys.exit( + f"direct write to rs2 failed ({e!r}). The orphan-injection step needs " + "shardsvr in test mode (enableTestCommands=1, set in docker-compose.yml). " + "If you are running against a non-test cluster, this scenario can't be " + "reproduced via direct insert." + ) + + # Sanity: rs1 has 1, rs2 has 2 (one legit + one orphan). + rs1_count = rs1[DB][COLL].count_documents({}) + rs2_count = rs2[DB][COLL].count_documents({}) + print(f"rs1 holds {rs1_count} doc(s); rs2 holds {rs2_count} doc(s)") + assert rs1_count == 1, f"expected 1 on rs1, got {rs1_count}" + assert rs2_count == 2, f"expected 2 on rs2 (legit + orphan), got {rs2_count}" + + # mongos sees what it routes — should be 2 (orphan filtered by routing). + via_mongos = mongos[DB][COLL].count_documents({}) + print(f"mongos sees {via_mongos} doc(s) — orphan {'NOT visible' if via_mongos == 2 else 'VISIBLE'} via routing") + + print("orphan scenario ready.") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/integration/sharded/setup_cluster.sh b/tests/integration/sharded/setup_cluster.sh new file mode 100755 index 00000000..78980b15 --- /dev/null +++ b/tests/integration/sharded/setup_cluster.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash +# Initialise the cluster brought up by docker-compose.yml. +# Idempotent: re-running after a successful init prints harmless errors but +# leaves the cluster in the expected state. +# +# Cross-platform note: --network=host is Linux-only on Docker Desktop +# (macOS/Windows ignore it and the inner mongo client can't reach the +# compose services). Prefer `docker compose exec` so the mongo client +# runs inside the compose network and addresses services by their +# compose service names. +set -euo pipefail + +COMPOSE=${COMPOSE:-docker compose} + +mongo_run() { + # Args: + local svc=$1 + local port=$2 + shift 2 + # Use the mongo client baked into the service's own image; the client + # is reachable via the compose network's service DNS, so this works + # identically on Linux / macOS / Windows. + $COMPOSE exec -T "$svc" mongo --quiet --host "127.0.0.1:${port}" --eval "$*" +} + +echo "[1/5] init configsvr replset cfg" +mongo_run configsvr 27019 'rs.initiate({_id: "cfg", configsvr: true, members: [{_id: 0, host: "configsvr:27019"}]})' || true + +echo "[2/5] init shard rs1" +mongo_run shard1 27018 'rs.initiate({_id: "rs1", members: [{_id: 0, host: "shard1:27018"}]})' || true + +echo "[3/5] init shard rs2" +mongo_run shard2 27028 'rs.initiate({_id: "rs2", members: [{_id: 0, host: "shard2:27028"}]})' || true + +echo "[4/5] init dest replset dst" +mongo_run dest 27117 'rs.initiate({_id: "dst", members: [{_id: 0, host: "dest:27117"}]})' || true + +# Wait for mongos to see the configsvr. +echo "[5/5] add shards via mongos" +sleep 5 +mongo_run mongos 27017 'sh.addShard("rs1/shard1:27018"); sh.addShard("rs2/shard2:27028")' + +# Disable balancer so we can craft orphan rows deterministically. +mongo_run mongos 27017 'sh.stopBalancer()' + +echo "cluster ready." diff --git a/tests/integration/sharded/verify_orphan_filter.py b/tests/integration/sharded/verify_orphan_filter.py new file mode 100755 index 00000000..9340de9f --- /dev/null +++ b/tests/integration/sharded/verify_orphan_filter.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 +""" +Run MongoShake against the orphan scenario with three configs and assert +the destination state matches the expected behaviour for each. + +Prereqs: + 1. ./setup_cluster.sh (cluster up) + 2. ./inject_orphan.py (orphan scenario in place) + 3. MongoShake collector binary at ../../../bin/collector.darwin (or .linux) + +Run: + ./verify_orphan_filter.py /path/to/collector.linux + +Cases: + A. mongo_urls only + filter.orphan_document=true + → dest should have exactly 2 docs (orphan filtered) + B. mongo_urls + mongo_s_url + filter.orphan_document=true (issue #978 repro) + → dest should have <2 docs OR fail with dup-key (orphan NOT filtered), + AND the collector log must contain the new WARN message + C. mongo_urls only + filter.orphan_document=false + → dest should fail with dup-key (baseline) +""" +from __future__ import annotations + +import os +import shutil +import subprocess +import sys +import tempfile +import time + +from pymongo import MongoClient + +DB, COLL = "shaketest", "orphan_demo" +NS = f"{DB}.{COLL}" + +CONF_TEMPLATE = """\ +id = shake-test +conf.version = 10 +sync_mode = full +log.level = info +log.dir = {log_dir} +log.file = collector.log +mongo_urls = mongodb://127.0.0.1:27018/?directConnection=true;mongodb://127.0.0.1:27028/?directConnection=true +mongo_cs_url = mongodb://127.0.0.1:27019/?directConnection=true +mongo_connect_mode = standalone +{mongo_s_line} +tunnel.address = mongodb://127.0.0.1:27117/?directConnection=true +checkpoint.storage.url = mongodb://127.0.0.1:27117/?directConnection=true +full_sync.executor.filter.orphan_document = {orphan_filter} +full_sync.executor.insert_on_dup_update = false +full_sync.create_index = none +""" + + +def write_conf(path: str, log_dir: str, with_mongos: bool, orphan_filter: bool) -> None: + mongo_s_line = "mongo_s_url = mongodb://127.0.0.1:27017" if with_mongos else "# mongo_s_url =" + with open(path, "w") as f: + f.write(CONF_TEMPLATE.format( + log_dir=log_dir, + mongo_s_line=mongo_s_line, + orphan_filter="true" if orphan_filter else "false", + )) + + +def reset_dest() -> None: + MongoClient("mongodb://127.0.0.1:27117", directConnection=True).drop_database(DB) + + +def run_collector(binary: str, conf_path: str, timeout: int = 60) -> tuple[int, str]: + proc = subprocess.run( + [binary, "-conf", conf_path, "-verbose", "0"], + capture_output=True, + text=True, + timeout=timeout, + ) + return proc.returncode, proc.stdout + "\n" + proc.stderr + + +def dest_count() -> int: + return MongoClient("mongodb://127.0.0.1:27117", directConnection=True)[DB][COLL].count_documents({}) + + +def case(name: str, ok: bool, detail: str) -> None: + flag = "PASS" if ok else "FAIL" + print(f" [{flag}] {name}: {detail}") + + +def main(binary: str) -> int: + workdir = tempfile.mkdtemp(prefix="shake-it-") + print(f"workdir: {workdir}") + failures = 0 + + # Case A: mongod-direct + filter on → expect 2 docs in dest. + print("\nCase A: mongod-direct + orphan_document=true") + reset_dest() + conf = os.path.join(workdir, "A.conf") + log_dir = os.path.join(workdir, "A_log"); os.makedirs(log_dir, exist_ok=True) + write_conf(conf, log_dir, with_mongos=False, orphan_filter=True) + rc, out = run_collector(binary, conf) + cnt = dest_count() + ok = (rc == 0 and cnt == 2) + case("filter takes effect", ok, f"rc={rc} dest_count={cnt} (expect 2)") + if not ok: + failures += 1 + + # Case B: mongod-direct + mongos + filter on → WARN expected, filter + # silently disabled. Assert both: + # 1) the new WARN line is present (proves the fix logged the heads-up), and + # 2) the destination matches the unfiltered baseline (proves the WARN is + # not just cosmetic — orphans really were NOT filtered, otherwise a + # regression that re-enabled filtering with mongos+orphan would pass + # the warning-text check while silently changing behaviour back). + print("\nCase B: mongos + orphan_document=true (#978 repro)") + reset_dest() + conf = os.path.join(workdir, "B.conf") + log_dir = os.path.join(workdir, "B_log"); os.makedirs(log_dir, exist_ok=True) + write_conf(conf, log_dir, with_mongos=True, orphan_filter=True) + rc, out = run_collector(binary, conf) + cnt = dest_count() + log_blob = "" + log_path = os.path.join(log_dir, "collector.log") + if os.path.exists(log_path): + with open(log_path) as f: + log_blob = f.read() + has_warn = "orphan filter will NOT take effect" in log_blob + case("WARN present in log", has_warn, "looking for 'orphan filter will NOT take effect'") + if not has_warn: + failures += 1 + # Same shape as Case C: filter is effectively off, so the run either + # fails on dup-key or finishes with fewer than 2 docs. + behaved_as_unfiltered = (rc != 0 or cnt < 2) + case("filter actually disabled (baseline behaviour)", behaved_as_unfiltered, + f"rc={rc} dest_count={cnt} (expect rc!=0 or dest<2)") + if not behaved_as_unfiltered: + failures += 1 + + # Case C: filter off → baseline failure mode. + print("\nCase C: orphan_document=false (baseline)") + reset_dest() + conf = os.path.join(workdir, "C.conf") + log_dir = os.path.join(workdir, "C_log"); os.makedirs(log_dir, exist_ok=True) + write_conf(conf, log_dir, with_mongos=False, orphan_filter=False) + rc, _ = run_collector(binary, conf) + cnt = dest_count() + # Expect either a non-zero rc (panic on dup-key) or fewer than 2 docs from + # interrupted sync. Accepting either captures the baseline failure mode. + ok = (rc != 0 or cnt < 2) + case("baseline fails or partial", ok, f"rc={rc} dest_count={cnt}") + if not ok: + failures += 1 + + shutil.rmtree(workdir, ignore_errors=True) + if failures: + print(f"\n{failures} case(s) failed") + return 1 + print("\nall cases passed") + return 0 + + +if __name__ == "__main__": + if len(sys.argv) != 2: + sys.exit("usage: ./verify_orphan_filter.py /path/to/collector") + sys.exit(main(sys.argv[1])) diff --git a/unit_test_common/include.go b/unit_test_common/include.go index 72e9d91d..bef3a87c 100644 --- a/unit_test_common/include.go +++ b/unit_test_common/include.go @@ -1,11 +1,32 @@ package unit_test_common -const ( - //TestUrl = "mongodb://root:MongoDB2022@11.158.213.66:40000,11.158.213.66:40001,11.158.213.66:40002/admin" - TestUrl = "mongodb://root_special_char3:~!@#$^&*()_-=@11.158.213.66:40000,11.158.213.66:40001,11.158.213.66:40002/admin" +import "os" - TestUrlConfigServer = "mongodb://admin:admin@11.158.213.66:38100" - TestUrlServerlessTenant = "mongodb://admin:admin@11.158.213.66:38200" - TestUrlSharding = "mongodb://admin:admin@11.158.213.66:38200" - TestUrlSsl = "mongodb://47.102.27.138:3717" +// Test connection info is loaded from environment variables so the repo +// never ships real endpoints or credentials. When an env var is unset the +// value falls back to a localhost placeholder (or empty string for inputs +// that have no safe default such as TLS URIs and CA paths); connectivity +// tests should `t.Skip` when they read an empty value. +// +// Recognised env vars: +// MONGOSHAKE_TEST_URL - replica set URI +// MONGOSHAKE_TEST_URL_SSL - SSL/TLS URI (no default) +// MONGOSHAKE_TEST_URL_CONFIG_SERVER - sharded cluster config server URI (no default) +// MONGOSHAKE_TEST_URL_SERVERLESS - serverless tenant URI (no default) +// MONGOSHAKE_TEST_URL_SHARDING - mongos URI (no default) +// MONGOSHAKE_TEST_CA_PEM - filesystem path to CA chain pem (no default) +var ( + TestUrl = envOr("MONGOSHAKE_TEST_URL", "mongodb://localhost:27017") + TestUrlSsl = os.Getenv("MONGOSHAKE_TEST_URL_SSL") + TestUrlConfigServer = os.Getenv("MONGOSHAKE_TEST_URL_CONFIG_SERVER") + TestUrlServerlessTenant = os.Getenv("MONGOSHAKE_TEST_URL_SERVERLESS") + TestUrlSharding = os.Getenv("MONGOSHAKE_TEST_URL_SHARDING") + TestCaPem = os.Getenv("MONGOSHAKE_TEST_CA_PEM") ) + +func envOr(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +}