Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*.sw[ap]
*.yml
!docs/monitor/*.yml
!tests/integration/**/*.yml
tags
*.bak
*.tar.gz
Expand All @@ -31,3 +32,6 @@ common/123.pid

# Claude
.claude/

# Local-only test env vars (see docs/refactor-test-config-env-vars.md)
.env.test
2 changes: 1 addition & 1 deletion collector/coordinator/extra_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/alibaba/MongoShake/v2/unit_test_common"
)

const (
var (
testMongoAddress = unit_test_common.TestUrl
)

Expand Down
12 changes: 12 additions & 0 deletions collector/coordinator/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ func (coordinator *ReplicationCoordinator) startDocumentReplication() (err error
}
} else {
l.Logger.Infof("source is replica or mongos, no need to fetching chunk map")
// surface a clear warning when full_sync.executor.filter.orphan_document=true is
// configured but cannot take effect: in mongos mode the orphan filter has no
// way to attribute a fetched doc to a specific shard, so silently dropping the
// option (the previous behavior) gives users the impression it was applied.
if fromIsSharding && coordinator.MongoS != nil &&
conf.Options.FullSyncExecutorFilterOrphanDocument {
l.Logger.Warnf("full_sync.executor.filter.orphan_document=true is set but " +
"source is mongos (mongo_s_url configured); the orphan filter will NOT take " +
"effect because docs read via mongos are not tagged with their origin shard. " +
"To enable orphan filtering, drop mongo_s_url and use mongo_urls (mongod direct) " +
"plus mongo_cs_url, or run cleanupOrphaned on the source cluster before full sync.")
}
}

filterList := filter.NewDocFilterList()
Expand Down
9 changes: 6 additions & 3 deletions collector/coordinator/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ import (
"github.com/alibaba/MongoShake/v2/unit_test_common"
)

const (
var (
testUrl = unit_test_common.TestUrl
testUrlServerless = unit_test_common.TestUrlServerlessTenant
testDb = "test_db3"
testCollection = "test_ut"
)

const (
testDb = "test_db3"
testCollection = "test_ut"
)

func TestSelectSyncMode(t *testing.T) {
Expand Down
15 changes: 11 additions & 4 deletions collector/docsyncer/doc_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,17 @@ func (exec *DocExecutor) doSync(docs []*bson.Raw) error {
if conf.Options.FullSyncExecutorFilterOrphanDocument && exec.syncer.orphanFilter != nil {
var docData bson.D
if err := bson.Unmarshal(*doc, &docData); err != nil {
l.Logger.Errorf("doSync do bson unmarshal %v failed. %v", doc, err)
}
// judge whether is orphan document, pass if so
if exec.syncer.orphanFilter.Filter(docData, ns.Database+"."+ns.Collection) {
// previously fell through with an empty docData, which then caused
// OrphanFilter.Filter -> oplog.GetKey to return nil and Panicf the
// process. Skip the orphan check on unparseable docs instead so a
// single bad payload can't crash the syncer; the corrupt bytes
// will surface again at BulkWrite as a structured driver error.
l.Logger.Errorf("doSync skip orphan check, bson unmarshal failed: %v", err)
// intentional fall-through to BulkWrite (no continue):
// the malformed payload should reach the driver and surface
// there as a typed write error, not be silently dropped.
} else if exec.syncer.orphanFilter.Filter(docData, ns.Database+"."+ns.Collection) {
// check whether this is an orphan document; skip it if so
l.Logger.Infof("orphan document [%v] filter", doc)
continue
}
Expand Down
13 changes: 8 additions & 5 deletions collector/docsyncer/doc_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
"github.com/alibaba/MongoShake/v2/unit_test_common"
)

const (
var (
testMongoAddress = unit_test_common.TestUrl
testDb = "test_db"
testCollection = "test_coll"
)

const (
testDb = "test_db"
testCollection = "test_coll"
)

var (
Expand Down Expand Up @@ -189,8 +192,8 @@ func TestDbSync(t *testing.T) {
},
},
},
Keys: []string{"x"},
ShardType: sharding.RangedShard,
Keys: []string{"x"},
ShardTypes: []string{sharding.RangedShard},
},
})
dbSyncer := &DBSyncer{
Expand Down
Loading