diff --git a/pkg/frontend/data_branch.go b/pkg/frontend/data_branch.go index 53b0008f818ad..f324c21d04791 100644 --- a/pkg/frontend/data_branch.go +++ b/pkg/frontend/data_branch.go @@ -140,15 +140,19 @@ func (retBatchPool *retBatchList) acquireRetBatch(tblStuff tableStuff, forTombst if retBatchPool.dataVecCnt == 0 { retBatchPool.dataVecCnt = len(tblStuff.def.colNames) - retBatchPool.tombVecCnt = 1 + retBatchPool.tombVecCnt = 3 retBatchPool.dataTypes = tblStuff.def.colTypes retBatchPool.tombstoneType = tblStuff.def.colTypes[tblStuff.def.pkColIdx] + retBatchPool.tombRowIDType = types.T_Rowid.ToType() + retBatchPool.tombKeyType = types.T_varbinary.ToType() } if forTombstone { if len(retBatchPool.tList) == 0 { bat = batch.NewWithSize(retBatchPool.tombVecCnt) bat.Vecs[0] = vector.NewVec(retBatchPool.tombstoneType) + bat.Vecs[1] = vector.NewVec(retBatchPool.tombRowIDType) + bat.Vecs[2] = vector.NewVec(retBatchPool.tombKeyType) goto done } @@ -161,6 +165,12 @@ func (retBatchPool *retBatchList) acquireRetBatch(tblStuff tableStuff, forTombst if !typeMatched(bat.Vecs[0], retBatchPool.tombstoneType) { panic(moerr.NewInternalErrorNoCtxf("retBatchPool: tombstone vec type mismatch, got %v expect %v", bat.Vecs[0].GetType(), retBatchPool.tombstoneType)) } + if !typeMatched(bat.Vecs[1], retBatchPool.tombRowIDType) { + panic(moerr.NewInternalErrorNoCtxf("retBatchPool: tombstone rowid vec type mismatch, got %v expect %v", bat.Vecs[1].GetType(), retBatchPool.tombRowIDType)) + } + if !typeMatched(bat.Vecs[2], retBatchPool.tombKeyType) { + panic(moerr.NewInternalErrorNoCtxf("retBatchPool: tombstone key vec type mismatch, got %v expect %v", bat.Vecs[2].GetType(), retBatchPool.tombKeyType)) + } bat.CleanOnlyData() goto done diff --git a/pkg/frontend/data_branch_hashdiff.go b/pkg/frontend/data_branch_hashdiff.go index 729355fceab0b..af97f845c5b1a 100644 --- a/pkg/frontend/data_branch_hashdiff.go +++ b/pkg/frontend/data_branch_hashdiff.go @@ -305,10 +305,14 @@ func handleDelsOnLCA( ) if 
len(sels) == 0 { - tBat.Vecs[0].CleanOnlyData() + for i := range tBat.Vecs { + tBat.Vecs[i].CleanOnlyData() + } tBat.SetRowCount(0) } else { - tBat.Vecs[0].Shrink(sels, false) + for i := range tBat.Vecs { + tBat.Vecs[i].Shrink(sels, false) + } tBat.SetRowCount(tBat.Vecs[0].Length()) } logutil.Debug( @@ -1210,12 +1214,48 @@ func findDeleteAndUpdateBat( // merge inserts and deletes on the tar // this deletes is not on the lca if tombBat.RowCount() > 0 { - if _, err2 = dataHashmap.PopByVectorsStream( - []*vector.Vector{tombBat.Vecs[0]}, false, nil, - ); err2 != nil { - tblStuff.retPool.releaseRetBatch(tombBat, true) - tblStuff.retPool.releaseRetBatch(dBat, false) - return err2 + removedLiveRows := 0 + tombRowIDs := vector.MustFixedColNoTypeCheck[types.Rowid](tombBat.Vecs[1]) + for i := 0; i < tombBat.RowCount(); i++ { + keyBytes := tombBat.Vecs[2].GetBytesAt(i) + var dataRet databranchutils.GetResult + if dataRet, err2 = dataHashmap.GetByEncodedKey(keyBytes); err2 != nil { + tblStuff.retPool.releaseRetBatch(tombBat, true) + tblStuff.retPool.releaseRetBatch(dBat, false) + return err2 + } + if !dataRet.Exists { + continue + } + var matchedRow []byte + for _, row := range dataRet.Rows { + var liveTuple types.Tuple + if liveTuple, _, err2 = dataHashmap.DecodeRow(row); err2 != nil { + tblStuff.retPool.releaseRetBatch(tombBat, true) + tblStuff.retPool.releaseRetBatch(dBat, false) + return err2 + } + liveRowIDBytes, ok := liveTuple[0].([]uint8) + if !ok { + continue + } + liveRowID := types.DecodeFixed[types.Rowid](liveRowIDBytes) + if !liveRowID.EQ(&tombRowIDs[i]) { + continue + } + matchedRow = row + break + } + if matchedRow == nil { + continue + } + var removed int + if removed, err2 = dataHashmap.PopByEncodedKeyValue(keyBytes, matchedRow, false); err2 != nil { + tblStuff.retPool.releaseRetBatch(tombBat, true) + tblStuff.retPool.releaseRetBatch(dBat, false) + return err2 + } + removedLiveRows += removed } tblStuff.retPool.releaseRetBatch(tombBat, true) } @@ -1347,7 
+1387,7 @@ func findDeleteAndUpdateBat( return nil } - if err2 = cursor.ForEach(func(key []byte, _ []byte) error { + if err2 = cursor.ForEach(func(key []byte, row []byte) error { select { case <-ctx.Done(): return ctx.Err() @@ -1360,6 +1400,21 @@ func findDeleteAndUpdateBat( if err2 = vector.AppendAny(tBat1.Vecs[0], tuple[0], false, ses.proc.Mp()); err2 != nil { return err2 } + var tombTuple types.Tuple + if tombTuple, _, err2 = tombstoneHashmap.DecodeRow(row); err2 != nil { + return err2 + } + rowIDBytes, ok := tombTuple[0].([]uint8) + if !ok { + return moerr.NewInternalErrorNoCtx("tombstone row missing rowid in hashmap payload") + } + rowID := types.DecodeFixed[types.Rowid](rowIDBytes) + if err2 = vector.AppendFixed(tBat1.Vecs[1], rowID, false, ses.proc.Mp()); err2 != nil { + return err2 + } + if err2 = vector.AppendBytes(tBat1.Vecs[2], append([]byte(nil), key...), false, ses.proc.Mp()); err2 != nil { + return err2 + } tBat1.SetRowCount(tBat1.Vecs[0].Length()) if tBat1.RowCount() >= maxTombstoneBatchCnt { @@ -1391,21 +1446,18 @@ func findDeleteAndUpdateBat( } func appendTupleToBat(ses *Session, bat *batch.Batch, tuple types.Tuple, tblStuff tableStuff) error { - limit := len(tuple) - if limit == bat.VectorCount()+1 { - // Some change-collection paths keep commit ts in the encoded payload while - // others only materialize visible columns. Trim the trailing commit ts only - // when it is actually present. 
- limit-- - } - if limit != bat.VectorCount() { + if bat.VectorCount() != len(tblStuff.def.colNames) { return moerr.NewInternalErrorNoCtxf( - "unexpected tuple width %d for batch with %d vectors on table %s", - len(tuple), bat.VectorCount(), tblStuff.tarRel.GetTableName(), + "unexpected batch width %d for table %s with %d physical columns", + bat.VectorCount(), tblStuff.tarRel.GetTableName(), len(tblStuff.def.colNames), ) } - for j, val := range tuple[:limit] { - vec := bat.Vecs[j] + for colIdx := range tblStuff.def.colNames { + val, err := getTupleColumnValue(tuple, tblStuff, colIdx) + if err != nil { + return err + } + vec := bat.Vecs[colIdx] if err := appendTupleValueToVector(vec, val, ses.proc.Mp()); err != nil { return err } @@ -1416,6 +1468,113 @@ func appendTupleToBat(ses *Session, bat *batch.Batch, tuple types.Tuple, tblStuf return nil } +func getTupleColumnValue(tuple types.Tuple, tblStuff tableStuff, colIdx int) (any, error) { + totalColCnt := len(tblStuff.def.colTypes) + if colIdx < 0 || colIdx >= totalColCnt { + return nil, moerr.NewInternalErrorNoCtxf( + "column index %d out of range for table %s with %d columns", + colIdx, tblStuff.tarRel.GetTableName(), totalColCnt, + ) + } + switch len(tuple) { + case totalColCnt, totalColCnt + 1: + return tuple[colIdx], nil + case totalColCnt + 2: + return tuple[colIdx+1], nil + default: + return nil, moerr.NewInternalErrorNoCtxf( + "unexpected tuple width %d for table %s with %d visible columns", + len(tuple), tblStuff.tarRel.GetTableName(), len(tblStuff.def.visibleIdxes), + ) + } +} + +func visibleTupleKeyIdxes(tblStuff tableStuff) []int { + idxes := make([]int, len(tblStuff.def.visibleIdxes)) + for i, colIdx := range tblStuff.def.visibleIdxes { + idxes[i] = colIdx + 1 + } + return idxes +} + +func batchSampleRowsForLog(bat *batch.Batch, limit int) []string { + if bat == nil || bat.RowCount() == 0 || limit <= 0 { + return nil + } + if limit > bat.RowCount() { + limit = bat.RowCount() + } + rows := make([]string, 
0, limit) + for rowIdx := 0; rowIdx < limit; rowIdx++ { + cols := make([]string, 0, bat.VectorCount()) + for _, vec := range bat.Vecs { + if vec == nil { + cols = append(cols, "") + continue + } + if rowIdx >= vec.Length() { + cols = append(cols, "") + continue + } + if vec.GetNulls().Contains(uint64(rowIdx)) { + cols = append(cols, "NULL") + continue + } + cols = append(cols, fmt.Sprintf("%v", types.DecodeValue(vec.GetRawBytesAt(rowIdx), vec.GetType().Oid))) + } + rows = append(rows, strings.Join(cols, ", ")) + } + return rows +} + +func validateLeadingRowID(side, tableName string, isTombstone bool, bat *batch.Batch) error { + if bat == nil || bat.RowCount() == 0 { + return nil + } + buildFields := func() []zap.Field { + return []zap.Field{ + zap.String("side", side), + zap.String("table-name", tableName), + zap.Bool("tombstone", isTombstone), + zap.Int("row-cnt", bat.RowCount()), + zap.Int("vec-cnt", bat.VectorCount()), + zap.Strings("attrs", append([]string(nil), bat.Attrs...)), + zap.Strings("samples", batchSampleRowsForLog(bat, 4)), + } + } + fail := func(msg string, extra ...zap.Field) error { + logutil.Error(msg, append(buildFields(), extra...)...) 
+ return moerr.NewInternalErrorNoCtx(msg) + } + if bat.VectorCount() == 0 || bat.Vecs[0] == nil { + return fail("DataBranch-CollectChanges-MissingRowID") + } + rowIDVec := bat.Vecs[0] + if rowIDVec.GetType().Oid != types.T_Rowid { + return fail("DataBranch-CollectChanges-InvalidRowIDVector", + zap.String("rowid-vec-type", rowIDVec.GetType().String()), + ) + } + if rowIDVec.Length() != bat.RowCount() { + return fail("DataBranch-CollectChanges-RowIDLenMismatch", + zap.Int("rowid-vec-len", rowIDVec.Length()), + ) + } + rowIDs := vector.MustFixedColNoTypeCheck[types.Rowid](rowIDVec) + for i := range rowIDs { + if rowIDVec.GetNulls().Contains(uint64(i)) { + return fail("DataBranch-CollectChanges-NullRowID", zap.Int("row-idx", i)) + } + if rowIDs[i].EQ(&types.EmptyRowid) || rowIDs[i].BorrowBlockID().IsEmpty() { + return fail("DataBranch-CollectChanges-InvalidRowID", + zap.Int("row-idx", i), + zap.String("rowid", rowIDs[i].String()), + ) + } + } + return nil +} + func appendTupleValueToVector(vec *vector.Vector, val any, mp *mpool.MPool) error { if val == nil { return vector.AppendNull(vec, mp) @@ -1441,7 +1600,12 @@ func checkConflictAndAppendToBat( case tree.CONFLICT_FAIL: buf := acquireBuffer(tblStuff.bufPool) for i, idx := range tblStuff.def.pkColIdxes { - if err2 = formatValIntoString(ses, tarTuple[idx], tblStuff.def.colTypes[idx], buf); err2 != nil { + var val any + if val, err2 = getTupleColumnValue(tarTuple, tblStuff, idx); err2 != nil { + releaseBuffer(tblStuff.bufPool, buf) + return err2 + } + if err2 = formatValIntoString(ses, val, tblStuff.def.colTypes[idx], buf); err2 != nil { releaseBuffer(tblStuff.bufPool, buf) return err2 } @@ -1491,7 +1655,7 @@ func diffDataHelper( if tblStuff.def.pkKind == fakeKind { var ( - keyIdxes = tblStuff.def.visibleIdxes + keyIdxes = visibleTupleKeyIdxes(tblStuff) newHashmap databranchutils.BranchHashmap ) @@ -1502,6 +1666,14 @@ func diffDataHelper( return err } baseDataHashmap = newHashmap + + if newHashmap, err = 
tarDataHashmap.Migrate(keyIdxes, -1); err != nil { + return err + } + if err = tarDataHashmap.Close(); err != nil { + return err + } + tarDataHashmap = newHashmap } if err = tarDataHashmap.ForEachShardParallel(func(cursor databranchutils.ShardCursor) error { @@ -1526,7 +1698,7 @@ func diffDataHelper( } if tblStuff.def.pkKind == fakeKind { - if checkRet, err2 = baseDataHashmap.PopByEncodedFullValue(row, false); err2 != nil { + if checkRet, err2 = baseDataHashmap.PopByEncodedKey(key, false); err2 != nil { return err2 } } else { @@ -1565,9 +1737,16 @@ func diffDataHelper( // pk columns already compared continue } + var tarVal, baseVal any + if tarVal, err2 = getTupleColumnValue(tarTuple, tblStuff, idx); err2 != nil { + return err2 + } + if baseVal, err2 = getTupleColumnValue(baseTuple, tblStuff, idx); err2 != nil { + return err2 + } if types.CompareValue( - tarTuple[idx], baseTuple[idx], + tarVal, baseVal, ) != 0 { notSame = true break @@ -1813,8 +1992,11 @@ func buildHashmapForTable( if pickKeyHashmap != nil { pkIdx := tblStuff.def.pkColIdx if isTombstone { - // After updateTombstoneBatch, tombstone PK is at Vec[0]. - pkIdx = 0 + // Data branch keeps rowid in Vec[0] while building the + // hashmaps, so tombstone PK lives at Vec[1]. 
+ pkIdx = 1 + } else { + pkIdx++ } var results []databranchutils.GetResult results, taskErr = pickKeyHashmap.GetByVectors( @@ -1840,9 +2022,9 @@ func buildHashmapForTable( } if isTombstone { - taskErr = tombstoneHashmap.PutByVectors(bat.Vecs[:ll], []int{0}) + taskErr = tombstoneHashmap.PutByVectors(bat.Vecs[:ll], []int{1}) } else { - taskErr = dataHashmap.PutByVectors(bat.Vecs[:ll], []int{tblStuff.def.pkColIdx}) + taskErr = dataHashmap.PutByVectors(bat.Vecs[:ll], []int{tblStuff.def.pkColIdx + 1}) } } if taskErr != nil { @@ -1873,6 +2055,19 @@ func buildHashmapForTable( break } + tableName := "" + if side == "base" && tblStuff.baseRel != nil { + tableName = tblStuff.baseRel.GetTableName() + } else if side == "target" && tblStuff.tarRel != nil { + tableName = tblStuff.tarRel.GetTableName() + } + if err = validateLeadingRowID(side, tableName, false, dataBat); err != nil { + return + } + if err = validateLeadingRowID(side, tableName, true, tombstoneBat); err != nil { + return + } + if dataBat != nil && dataBat.RowCount() > 0 { totalRows += int64(dataBat.RowCount()) totalBytes += int64(dataBat.Size()) diff --git a/pkg/frontend/data_branch_hashdiff_test.go b/pkg/frontend/data_branch_hashdiff_test.go index e1bdc2701b208..ad3ed29623519 100644 --- a/pkg/frontend/data_branch_hashdiff_test.go +++ b/pkg/frontend/data_branch_hashdiff_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/golang/mock/gomock" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" @@ -30,6 +31,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/defines" "github.com/matrixorigin/matrixone/pkg/frontend/databranchutils" mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test" + "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" 
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" @@ -924,19 +926,30 @@ func (s *stubEngineChangesHandle) Close() error { } func buildHashDiffDataBatch(t *testing.T, mp *mpool.MPool, rows [][]any) *batch.Batch { + rowIDs := make([]types.Rowid, len(rows)) + for i := range rows { + rowIDs[i] = buildHashDiffRowID(t, i) + } + return buildHashDiffDataBatchWithRowIDs(t, mp, rowIDs, rows) +} + +func buildHashDiffDataBatchWithRowIDs(t *testing.T, mp *mpool.MPool, rowIDs []types.Rowid, rows [][]any) *batch.Batch { t.Helper() + require.Len(t, rowIDs, len(rows)) - bat := batch.NewWithSize(4) - bat.SetAttributes([]string{"id", "name", "hidden", "__commit_ts"}) - bat.Vecs[0] = vector.NewVec(types.T_int64.ToType()) - bat.Vecs[1] = vector.NewVec(types.T_varchar.ToType()) + bat := batch.NewWithSize(5) + bat.SetAttributes([]string{catalog.Row_ID, "id", "name", "hidden", "__commit_ts"}) + bat.Vecs[0] = vector.NewVec(types.T_Rowid.ToType()) + bat.Vecs[1] = vector.NewVec(types.T_int64.ToType()) bat.Vecs[2] = vector.NewVec(types.T_varchar.ToType()) bat.Vecs[3] = vector.NewVec(types.T_varchar.ToType()) + bat.Vecs[4] = vector.NewVec(types.T_varchar.ToType()) - for _, row := range rows { + for rowIdx, row := range rows { require.Len(t, row, 4) + require.NoError(t, appendTestVectorValue(bat.Vecs[0], rowIDs[rowIdx], mp)) for i, val := range row { - require.NoError(t, appendTestVectorValue(bat.Vecs[i], val, mp)) + require.NoError(t, appendTestVectorValue(bat.Vecs[i+1], val, mp)) } } bat.SetRowCount(len(rows)) @@ -944,23 +957,43 @@ func buildHashDiffDataBatch(t *testing.T, mp *mpool.MPool, rows [][]any) *batch. 
} func buildHashDiffTombstoneBatch(t *testing.T, mp *mpool.MPool, rows [][]any) *batch.Batch { + rowIDs := make([]types.Rowid, len(rows)) + for i := range rows { + rowIDs[i] = buildHashDiffRowID(t, i) + } + return buildHashDiffTombstoneBatchWithRowIDs(t, mp, rowIDs, rows) +} + +func buildHashDiffTombstoneBatchWithRowIDs(t *testing.T, mp *mpool.MPool, rowIDs []types.Rowid, rows [][]any) *batch.Batch { t.Helper() + require.Len(t, rowIDs, len(rows)) - bat := batch.NewWithSize(2) - bat.SetAttributes([]string{"id", "__commit_ts"}) - bat.Vecs[0] = vector.NewVec(types.T_int64.ToType()) - bat.Vecs[1] = vector.NewVec(types.T_varchar.ToType()) + bat := batch.NewWithSize(3) + bat.SetAttributes([]string{catalog.Row_ID, "id", "__commit_ts"}) + bat.Vecs[0] = vector.NewVec(types.T_Rowid.ToType()) + bat.Vecs[1] = vector.NewVec(types.T_int64.ToType()) + bat.Vecs[2] = vector.NewVec(types.T_varchar.ToType()) - for _, row := range rows { + for rowIdx, row := range rows { require.Len(t, row, 2) + require.NoError(t, appendTestVectorValue(bat.Vecs[0], rowIDs[rowIdx], mp)) for i, val := range row { - require.NoError(t, appendTestVectorValue(bat.Vecs[i], val, mp)) + require.NoError(t, appendTestVectorValue(bat.Vecs[i+1], val, mp)) } } bat.SetRowCount(len(rows)) return bat } +func buildHashDiffRowID(t *testing.T, rowIdx int) types.Rowid { + t.Helper() + var uid types.Uuid + uid[0] = byte(rowIdx + 1) + uid[15] = byte(rowIdx + 1) + blkID := objectio.NewBlockid(&uid, 0, 1) + return types.NewRowid(blkID, uint32(rowIdx)) +} + func commitTSBytes(ts types.TS) []byte { buf := make([]byte, len(ts)) copy(buf, ts[:]) @@ -996,3 +1029,83 @@ func TestLCAProbeJoinCastType(t *testing.T) { }) } } + +func TestValidateLeadingRowID(t *testing.T) { + mp := mpool.MustNewZero() + + makeBatchWithLeadingType := func(t *testing.T, oid types.T) *batch.Batch { + t.Helper() + bat := batch.NewWithSize(2) + bat.SetAttributes([]string{catalog.Row_ID, "id"}) + bat.Vecs[0] = vector.NewVec(oid.ToType()) + bat.Vecs[1] = 
vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(bat.Vecs[1], int64(1), false, mp)) + bat.SetRowCount(1) + return bat + } + + t.Run("nil and empty pass", func(t *testing.T) { + require.NoError(t, validateLeadingRowID("base", "t", false, nil)) + bat := batch.NewWithSize(0) + bat.SetRowCount(0) + require.NoError(t, validateLeadingRowID("base", "t", false, bat)) + }) + + t.Run("missing rowid vector", func(t *testing.T) { + bat := batch.NewWithSize(0) + bat.SetRowCount(1) + err := validateLeadingRowID("base", "t", false, bat) + require.Error(t, err) + require.Contains(t, err.Error(), "DataBranch-CollectChanges-MissingRowID") + }) + + t.Run("wrong leading type", func(t *testing.T) { + bat := makeBatchWithLeadingType(t, types.T_int64) + require.NoError(t, vector.AppendFixed(bat.Vecs[0], int64(1), false, mp)) + err := validateLeadingRowID("base", "t", false, bat) + require.Error(t, err) + require.Contains(t, err.Error(), "DataBranch-CollectChanges-InvalidRowIDVector") + }) + + t.Run("length mismatch", func(t *testing.T) { + bat := batch.NewWithSize(2) + bat.SetAttributes([]string{catalog.Row_ID, "id"}) + bat.Vecs[0] = vector.NewVec(types.T_Rowid.ToType()) + bat.Vecs[1] = vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(bat.Vecs[1], int64(1), false, mp)) + require.NoError(t, vector.AppendFixed(bat.Vecs[1], int64(2), false, mp)) + uid, err := types.BuildUuid() + require.NoError(t, err) + blkID := objectio.NewBlockid(&uid, 0, 1) + require.NoError(t, vector.AppendFixed(bat.Vecs[0], types.NewRowid(blkID, 0), false, mp)) + bat.SetRowCount(2) + err = validateLeadingRowID("base", "t", false, bat) + require.Error(t, err) + require.Contains(t, err.Error(), "DataBranch-CollectChanges-RowIDLenMismatch") + }) + + t.Run("null rowid", func(t *testing.T) { + bat := makeBatchWithLeadingType(t, types.T_Rowid) + require.NoError(t, vector.AppendFixed(bat.Vecs[0], types.Rowid{}, true, mp)) + err := validateLeadingRowID("base", "t", 
false, bat) + require.Error(t, err) + require.Contains(t, err.Error(), "DataBranch-CollectChanges-NullRowID") + }) + + t.Run("empty rowid", func(t *testing.T) { + bat := makeBatchWithLeadingType(t, types.T_Rowid) + require.NoError(t, vector.AppendFixed(bat.Vecs[0], types.EmptyRowid, false, mp)) + err := validateLeadingRowID("base", "t", false, bat) + require.Error(t, err) + require.Contains(t, err.Error(), "DataBranch-CollectChanges-InvalidRowID") + }) + + t.Run("valid rowid passes", func(t *testing.T) { + bat := makeBatchWithLeadingType(t, types.T_Rowid) + uid, err := types.BuildUuid() + require.NoError(t, err) + blkID := objectio.NewBlockid(&uid, 0, 1) + require.NoError(t, vector.AppendFixed(bat.Vecs[0], types.NewRowid(blkID, 0), false, mp)) + require.NoError(t, validateLeadingRowID("base", "t", false, bat)) + }) +} diff --git a/pkg/frontend/data_branch_types.go b/pkg/frontend/data_branch_types.go index 5e3e44457e2b0..57fe280fbe4f8 100644 --- a/pkg/frontend/data_branch_types.go +++ b/pkg/frontend/data_branch_types.go @@ -145,6 +145,8 @@ type retBatchList struct { tombVecCnt int dataTypes []types.Type tombstoneType types.Type + tombRowIDType types.Type + tombKeyType types.Type } type compositeOption struct { diff --git a/pkg/frontend/databranchutils/branch_change_handle.go b/pkg/frontend/databranchutils/branch_change_handle.go index 89e63767bad1b..9a4b72dc3fb28 100644 --- a/pkg/frontend/databranchutils/branch_change_handle.go +++ b/pkg/frontend/databranchutils/branch_change_handle.go @@ -26,7 +26,10 @@ import ( var _ engine.ChangesHandle = new(BranchChangeHandle) type BranchChangeHandle struct { - handle engine.ChangesHandle + handle engine.ChangesHandle + snapshotReadPolicy engine.SnapshotReadPolicy + retainRowID bool + pkFilter *engine.PKFilter } func (b *BranchChangeHandle) Next( @@ -44,6 +47,14 @@ func (b *BranchChangeHandle) Next( return } + ctx = engine.WithSnapshotReadPolicy(ctx, b.snapshotReadPolicy) + if b.retainRowID { + ctx = engine.WithRetainRowID(ctx, 
true) + } + if b.pkFilter != nil && len(b.pkFilter.Segments) > 0 { + ctx = engine.WithPKFilter(ctx, b.pkFilter) + } + return b.handle.Next(ctx, mp) } @@ -63,8 +74,12 @@ var CollectChanges = func( ) (engine.ChangesHandle, error) { if end.GE(&from) { - handle := new(BranchChangeHandle) + handle := &BranchChangeHandle{ + snapshotReadPolicy: engine.SnapshotReadPolicyVisibleState, + retainRowID: true, + } ctx = engine.WithSnapshotReadPolicy(ctx, engine.SnapshotReadPolicyVisibleState) + ctx = engine.WithRetainRowID(ctx, true) var err error if handle.handle, err = rel.CollectChanges( ctx, from, end, false, mp, @@ -92,8 +107,13 @@ var CollectChangesWithPKFilter = func( ) (engine.ChangesHandle, error) { if end.GE(&from) { - handle := new(BranchChangeHandle) + handle := &BranchChangeHandle{ + snapshotReadPolicy: engine.SnapshotReadPolicyVisibleState, + retainRowID: true, + pkFilter: pkFilter, + } ctx = engine.WithSnapshotReadPolicy(ctx, engine.SnapshotReadPolicyVisibleState) + ctx = engine.WithRetainRowID(ctx, true) if pkFilter != nil && len(pkFilter.Segments) > 0 { ctx = engine.WithPKFilter(ctx, pkFilter) } diff --git a/pkg/frontend/databranchutils/branch_change_handle_test.go b/pkg/frontend/databranchutils/branch_change_handle_test.go index 4b95d342ba713..d26efa00cef82 100644 --- a/pkg/frontend/databranchutils/branch_change_handle_test.go +++ b/pkg/frontend/databranchutils/branch_change_handle_test.go @@ -35,9 +35,11 @@ type fakeChangesHandle struct { hint engine.ChangesHandle_Hint err error closed bool + lastCtx context.Context } -func (f *fakeChangesHandle) Next(context.Context, *mpool.MPool) (*batch.Batch, *batch.Batch, engine.ChangesHandle_Hint, error) { +func (f *fakeChangesHandle) Next(ctx context.Context, _ *mpool.MPool) (*batch.Batch, *batch.Batch, engine.ChangesHandle_Hint, error) { + f.lastCtx = ctx return f.data, f.tombstone, f.hint, f.err } @@ -74,6 +76,26 @@ func TestBranchChangeHandleNextPassthrough(t *testing.T) { require.Equal(t, 
engine.ChangesHandle_Tail_done, hint) } +func TestBranchChangeHandleNextReappliesPolicies(t *testing.T) { + fake := &fakeChangesHandle{} + pkFilter := &engine.PKFilter{ + Segments: [][]byte{{1, 2, 3}}, + PrimarySeqnum: 7, + } + h := &BranchChangeHandle{ + handle: fake, + snapshotReadPolicy: engine.SnapshotReadPolicyVisibleState, + retainRowID: true, + pkFilter: pkFilter, + } + + _, _, _, err := h.Next(context.Background(), nil) + require.NoError(t, err) + require.Equal(t, engine.SnapshotReadPolicyVisibleState, engine.SnapshotReadPolicyFromContext(fake.lastCtx)) + require.True(t, engine.RetainRowIDFromContext(fake.lastCtx)) + require.Same(t, pkFilter, engine.PKFilterFromContext(fake.lastCtx)) +} + func TestBranchChangeHandleNextUnderlyingError(t *testing.T) { h := &BranchChangeHandle{ handle: &fakeChangesHandle{err: moerr.NewInternalErrorNoCtx("next failed")}, @@ -121,6 +143,8 @@ func TestCollectChangesRange(t *testing.T) { actual := handle.(*BranchChangeHandle) require.Equal(t, fake, actual.handle) + require.Equal(t, engine.SnapshotReadPolicyVisibleState, actual.snapshotReadPolicy) + require.True(t, actual.retainRowID) } func TestCollectChangesSkipsEmptyRange(t *testing.T) { @@ -163,3 +187,42 @@ func TestCollectChangesPropagatesError(t *testing.T) { require.Nil(t, handle) require.ErrorIs(t, err, expectedErr) } + +func TestCollectChangesWithPKFilterPropagatesPoliciesToHandle(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + rel := mock_frontend.NewMockRelation(ctrl) + from := types.BuildTS(1, 0) + end := types.BuildTS(2, 0) + fake := &fakeChangesHandle{} + pkFilter := &engine.PKFilter{ + Segments: [][]byte{{4, 5, 6}}, + PrimarySeqnum: 3, + } + + rel.EXPECT().CollectChanges(gomock.Any(), from, end, false, gomock.Any()).DoAndReturn( + func( + ctx context.Context, + _ types.TS, + _ types.TS, + _ bool, + _ *mpool.MPool, + ) (engine.ChangesHandle, error) { + require.Equal(t, engine.SnapshotReadPolicyVisibleState, 
engine.SnapshotReadPolicyFromContext(ctx)) + require.True(t, engine.RetainRowIDFromContext(ctx)) + require.Same(t, pkFilter, engine.PKFilterFromContext(ctx)) + return fake, nil + }, + ) + + handle, err := CollectChangesWithPKFilter(context.Background(), rel, from, end, nil, pkFilter) + require.NoError(t, err) + require.IsType(t, &BranchChangeHandle{}, handle) + + actual := handle.(*BranchChangeHandle) + require.Equal(t, fake, actual.handle) + require.Equal(t, engine.SnapshotReadPolicyVisibleState, actual.snapshotReadPolicy) + require.True(t, actual.retainRowID) + require.Same(t, pkFilter, actual.pkFilter) +} diff --git a/pkg/vm/engine/change_handle_policy_test.go b/pkg/vm/engine/change_handle_policy_test.go index 06412c2ad16ff..f1e54d2a61ef4 100644 --- a/pkg/vm/engine/change_handle_policy_test.go +++ b/pkg/vm/engine/change_handle_policy_test.go @@ -31,3 +31,14 @@ func TestSnapshotReadPolicyContextRoundTrip(t *testing.T) { ctx := WithSnapshotReadPolicy(base, SnapshotReadPolicyVisibleState) require.Equal(t, SnapshotReadPolicyVisibleState, SnapshotReadPolicyFromContext(ctx)) } + +func TestRetainRowIDContextRoundTrip(t *testing.T) { + base := context.Background() + + require.False(t, RetainRowIDFromContext(nil)) + require.False(t, RetainRowIDFromContext(base)) + require.True(t, WithRetainRowID(base, false) == base) + + ctx := WithRetainRowID(base, true) + require.True(t, RetainRowIDFromContext(ctx)) +} diff --git a/pkg/vm/engine/change_handle_rowid.go b/pkg/vm/engine/change_handle_rowid.go new file mode 100644 index 0000000000000..dc49cbf2849eb --- /dev/null +++ b/pkg/vm/engine/change_handle_rowid.go @@ -0,0 +1,39 @@ +// Copyright 2026 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package engine + +import "context" + +type retainRowIDKey struct{} + +// WithRetainRowID requests that CollectChanges keep rowid in its internal batch +// shape for callers that need row-level provenance. Callers that don't opt in +// keep the existing output contract. +func WithRetainRowID(ctx context.Context, retain bool) context.Context { + if ctx == nil || !retain { + return ctx + } + return context.WithValue(ctx, retainRowIDKey{}, true) +} + +// RetainRowIDFromContext reports whether the caller requested rowid retention. +// The default remains false so existing callers are unchanged. 
+func RetainRowIDFromContext(ctx context.Context) bool { + if ctx == nil { + return false + } + retain, _ := ctx.Value(retainRowIDKey{}).(bool) + return retain +} diff --git a/pkg/vm/engine/disttae/change_handle.go b/pkg/vm/engine/disttae/change_handle.go index 8f1ec35d91e8e..3f90f213adc2b 100644 --- a/pkg/vm/engine/disttae/change_handle.go +++ b/pkg/vm/engine/disttae/change_handle.go @@ -19,6 +19,7 @@ import ( "fmt" "time" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" @@ -549,6 +550,7 @@ type CheckpointChangesHandle struct { duration time.Duration dataLength int lastPrintTime time.Time + retainRowID bool } func NewCheckpointChangesHandle( @@ -558,10 +560,11 @@ func NewCheckpointChangesHandle( mp *mpool.MPool, ) (*CheckpointChangesHandle, error) { handle := &CheckpointChangesHandle{ - end: end, - table: table, - fs: table.getTxn().engine.fs, - sid: table.proc.Load().GetService(), + end: end, + table: table, + fs: table.getTxn().engine.fs, + sid: table.proc.Load().GetService(), + retainRowID: engine.RetainRowIDFromContext(ctx), } err := handle.initReader(ctx) return handle, err @@ -639,10 +642,51 @@ func (h *CheckpointChangesHandle) Next( data.Clean(mp) return } - rowidVec := data.Vecs[len(data.Vecs)-1] - rowidVec.Free(mp) - data.Vecs[len(data.Vecs)-1] = committs - data.Attrs[len(data.Attrs)-1] = objectio.DefaultCommitTS_Attr + rowIDIdx := -1 + for i, vec := range data.Vecs { + if vec != nil && vec.GetType().Oid == types.T_Rowid { + rowIDIdx = i + break + } + } + if h.retainRowID { + if rowIDIdx < 0 { + data.Clean(mp) + committs.Free(mp) + err = moerr.NewInternalErrorNoCtx("checkpoint changes handle missing rowid vector") + return + } + rowIDVec := data.Vecs[rowIDIdx] + rewrittenVecs := make([]*vector.Vector, 0, len(data.Vecs)+1) + rewrittenVecs = append(rewrittenVecs, rowIDVec) + for i, vec := range 
data.Vecs { + if i == rowIDIdx { + continue + } + rewrittenVecs = append(rewrittenVecs, vec) + } + rewrittenVecs = append(rewrittenVecs, committs) + data.Vecs = rewrittenVecs + + rewrittenAttrs := make([]string, 0, len(data.Attrs)+1) + rewrittenAttrs = append(rewrittenAttrs, catalog.Row_ID) + for i, attr := range data.Attrs { + if i == rowIDIdx { + continue + } + rewrittenAttrs = append(rewrittenAttrs, attr) + } + rewrittenAttrs = append(rewrittenAttrs, objectio.DefaultCommitTS_Attr) + data.Attrs = rewrittenAttrs + } else { + if rowIDIdx >= 0 { + data.Vecs[rowIDIdx].Free(mp) + data.Vecs = append(data.Vecs[:rowIDIdx], data.Vecs[rowIDIdx+1:]...) + data.Attrs = append(data.Attrs[:rowIDIdx], data.Attrs[rowIDIdx+1:]...) + } + data.Vecs = append(data.Vecs, committs) + data.Attrs = append(data.Attrs, objectio.DefaultCommitTS_Attr) + } h.duration += time.Since(t0) h.dataLength += data.Vecs[0].Length() return diff --git a/pkg/vm/engine/disttae/logtailreplay/change_handle.go b/pkg/vm/engine/disttae/logtailreplay/change_handle.go index 38273caebe0bf..c7aa461cf458d 100755 --- a/pkg/vm/engine/disttae/logtailreplay/change_handle.go +++ b/pkg/vm/engine/disttae/logtailreplay/change_handle.go @@ -22,6 +22,7 @@ import ( goSort "sort" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" @@ -230,6 +231,7 @@ type CNObjectHandle struct { base *baseHandle cache []*batch.Batch + blks []types.Blockid TSs []types.TS } @@ -241,17 +243,22 @@ func NewCNObjectHandle(isTombstone bool, objects []*objectio.ObjectEntry, fs fil fs: fs, mp: mp, cache: make([]*batch.Batch, 0), + blks: make([]types.Blockid, 0), } } func (h *CNObjectHandle) prefetch(ctx context.Context) (err error) { t0 := time.Now() jobs := make([]*tasks.Job, 0) + blks := make([]types.Blockid, 0) for i := 0; i < LoadParallism; i++ { if h.objectOffsetCursor >= len(h.objects) { break } - stats 
:= h.objects[h.objectOffsetCursor].ObjectStats - h.TSs = append(h.TSs, h.objects[h.objectOffsetCursor].CreateTime) + entry := h.objects[h.objectOffsetCursor] + stats := entry.ObjectStats + blk := uint16(h.blkOffsetCursor) + h.TSs = append(h.TSs, entry.CreateTime) + blks = append(blks, objectio.NewBlockidWithObjectID(stats.ObjectName().ObjectId(), blk)) job := prefetchObjects(ctx, uint32(h.blkOffsetCursor), h.fs, &stats, h.base.changesHandle.scheduler) jobs = append(jobs, job) h.blkOffsetCursor++ @@ -260,7 +267,7 @@ func (h *CNObjectHandle) prefetch(ctx context.Context) (err error) { h.objectOffsetCursor++ } } - for _, job := range jobs { + for i, job := range jobs { res := job.GetResult() if res.Err != nil { err = res.Err @@ -274,6 +281,7 @@ func (h *CNObjectHandle) prefetch(ctx context.Context) (err error) { putJob(job) bat := res.Res.(*batch.Batch) h.cache = append(h.cache, bat) + h.blks = append(h.blks, blks[i]) } h.base.changesHandle.readDuration += time.Since(t0) return @@ -295,20 +303,32 @@ func (h *CNObjectHandle) Next(ctx context.Context, bat **batch.Batch, mp *mpool. } } data := h.cache[0] + var blk *types.Blockid + if len(h.blks) > 0 { + blk = &h.blks[0] + } ts := h.TSs[0] t0 := time.Now() if h.isTombstone { - updateCNTombstoneBatch( + if err = updateCNTombstoneBatch( data, ts, + blk, + h.base.changesHandle.retainRowID, h.mp, - ) + ); err != nil { + return err + } } else { - updateCNDataBatch( + if err = updateCNDataBatch( data, ts, + blk, + h.base.changesHandle.retainRowID, h.mp, - ) + ); err != nil { + return err + } } h.base.changesHandle.updateDuration += time.Since(t0) t0 = time.Now() @@ -326,6 +346,9 @@ func (h *CNObjectHandle) Next(ctx context.Context, bat **batch.Batch, mp *mpool. 
return moerr.GetOkExpectedEOB() } h.cache = h.cache[1:] + if len(h.blks) > 0 { + h.blks = h.blks[1:] + } h.TSs = h.TSs[1:] srcLen := data.Vecs[0].Length() sels := make([]int64, srcLen) @@ -368,6 +391,7 @@ type AObjectHandle struct { fs fileservice.FileService mp *mpool.MPool cache []*batch.Batch + blks []types.Blockid p *baseHandle // blockPlans caches block-level commit-ts overlap decisions for objects. @@ -402,6 +426,7 @@ func NewAObjectHandle(ctx context.Context, p *baseHandle, isTombstone bool, star mp: mp, p: p, cache: make([]*batch.Batch, 0), + blks: make([]types.Blockid, 0), blockPlans: make(map[string]*aobjBlockPlan), } return handle @@ -662,6 +687,7 @@ func calcPruneRate(pruned, total int) float64 { func (h *AObjectHandle) prefetch(ctx context.Context) (err error) { t0 := time.Now() jobs := make([]*tasks.Job, 0) + blks := make([]types.Blockid, 0) for i := 0; i < LoadParallism; i++ { obj, blk, ok, targetErr := h.nextPrefetchTarget(ctx) if targetErr != nil { @@ -675,8 +701,9 @@ func (h *AObjectHandle) prefetch(ctx context.Context) (err error) { stats := obj.ObjectStats job := prefetchObjects(ctx, uint32(blk), h.fs, &stats, h.p.changesHandle.scheduler) jobs = append(jobs, job) + blks = append(blks, objectio.NewBlockidWithObjectID(stats.ObjectName().ObjectId(), blk)) } - for _, job := range jobs { + for i, job := range jobs { res := job.GetResult() if res.Err != nil { err = res.Err @@ -690,6 +717,7 @@ func (h *AObjectHandle) prefetch(ctx context.Context) (err error) { putJob(job) bat := res.Res.(*batch.Batch) h.cache = append(h.cache, bat) + h.blks = append(h.blks, blks[i]) } h.p.changesHandle.readDuration += time.Since(t0) return @@ -728,11 +756,20 @@ func (h *AObjectHandle) getNextAObject(ctx context.Context) (err error) { } h.currentBatch = h.cache[0] h.cache = h.cache[1:] + var blk *types.Blockid + if len(h.blks) > 0 { + blk = &h.blks[0] + h.blks = h.blks[1:] + } t0 := time.Now() if h.isTombstone { - updateTombstoneBatch(h.currentBatch, h.start, h.end, 
h.p.skipTS, !h.quick, h.mp) + if err = updateTombstoneBatch(h.currentBatch, h.start, h.end, h.p.skipTS, !h.quick, blk, h.p.changesHandle.retainRowID, h.mp); err != nil { + return err + } } else { - updateDataBatch(h.currentBatch, h.start, h.end, h.mp) + if err = updateDataBatch(h.currentBatch, h.start, h.end, blk, h.p.changesHandle.retainRowID, h.mp); err != nil { + return err + } } h.p.changesHandle.updateDuration += time.Since(t0) h.batchLength = h.currentBatch.Vecs[0].Length() @@ -1023,7 +1060,7 @@ func (p *baseHandle) getBatchesFromRowIterator(iter btree.IterG[*RowEntry], star entry := iter.Item() if checkTS(start, end, entry.Time) { if !entry.Deleted && !tombstone { - fillInInsertBatch(&bat, entry, mp) + fillInInsertBatch(&bat, entry, p.changesHandle.retainRowID, mp) } if entry.Deleted && tombstone { if p.skipTS != nil { @@ -1032,7 +1069,7 @@ func (p *baseHandle) getBatchesFromRowIterator(iter btree.IterG[*RowEntry], star continue } } - fillInDeleteBatch(&bat, entry, mp) + fillInDeleteBatch(&bat, entry, p.changesHandle.retainRowID, mp) } } } @@ -1391,6 +1428,8 @@ type ChangeHandler struct { // and row level. Only DATA BRANCH PICK sets this; other callers leave it nil. 
pkFilter *engine.PKFilter + retainRowID bool + LogThreshold time.Duration } @@ -1518,6 +1557,7 @@ func NewChangesHandlerWithPartitionStateRange( enableCommitTSBlockPrune: true, strictCommitTSBlockPrune: true, enableDeleteChainResolve: true, + retainRowID: engine.RetainRowIDFromContext(ctx), } defer func() { if err != nil { @@ -1585,6 +1625,7 @@ func newChangesHandlerWithCheckpointEntries( mp: mp, scheduler: tasks.NewParallelJobScheduler(LoadParallism), isRecoveryMode: isRecoveryMode, + retainRowID: engine.RetainRowIDFromContext(ctx), } defer func() { if err == nil { @@ -1815,6 +1856,7 @@ func NewChangesHandler( mp: mp, scheduler: tasks.NewParallelJobScheduler(LoadParallism), pkFilter: engine.PKFilterFromContext(ctx), + retainRowID: engine.RetainRowIDFromContext(ctx), } defer func() { if err != nil { @@ -1967,7 +2009,11 @@ func filterBatch(data, tombstone *batch.Batch, primarySeqnum int, skipDeletes bo rowInfoMap := make(map[any][]rowInfo) // Process data batch - pkVec := data.Vecs[primarySeqnum] + dataPKIdx := primarySeqnum + if len(data.Vecs) > 0 && data.Vecs[0] != nil && data.Vecs[0].GetType().Oid == types.T_Rowid { + dataPKIdx++ + } + pkVec := data.Vecs[dataPKIdx] tsVec := data.Vecs[len(data.Vecs)-1] timestamps := vector.MustFixedColWithTypeCheck[types.TS](tsVec) for i := 0; i < pkVec.Length(); i++ { @@ -1983,8 +2029,14 @@ func filterBatch(data, tombstone *batch.Batch, primarySeqnum int, skipDeletes bo } // Process tombstone batch - pkVec = tombstone.Vecs[0] - tsVec = tombstone.Vecs[1] + tombstonePKIdx := 0 + tombstoneTSIdx := 1 + if len(tombstone.Vecs) > 0 && tombstone.Vecs[0] != nil && tombstone.Vecs[0].GetType().Oid == types.T_Rowid { + tombstonePKIdx = 1 + tombstoneTSIdx = 2 + } + pkVec = tombstone.Vecs[tombstonePKIdx] + tsVec = tombstone.Vecs[tombstoneTSIdx] timestamps = vector.MustFixedColWithTypeCheck[types.TS](tsVec) for i := 0; i < pkVec.Length(); i++ { pkVal := vector.GetAny(pkVec, i, false) @@ -2295,8 +2347,12 @@ func sortBatch(bat *batch.Batch, 
sortIdx int, mp *mpool.MPool) error { // } //} -func newDataBatchWithBatch(src *batch.Batch) (data *batch.Batch) { +func newDataBatchWithBatch(src *batch.Batch, retainRowID bool) (data *batch.Batch) { data = batch.NewWithSize(0) + if retainRowID { + data.Attrs = append(data.Attrs, catalog.Row_ID) + data.Vecs = append(data.Vecs, vector.NewVec(types.T_Rowid.ToType())) + } data.Attrs = append(data.Attrs, src.Attrs[2:]...) for _, vec := range src.Vecs { if vec.GetType().Oid == types.T_Rowid || vec.GetType().Oid == types.T_TS { @@ -2375,32 +2431,53 @@ func appendFromEntry(src, vec *vector.Vector, offset int, mp *mpool.MPool) { } -func fillInInsertBatch(bat **batch.Batch, entry *RowEntry, mp *mpool.MPool) { +func fillInInsertBatch(bat **batch.Batch, entry *RowEntry, retainRowID bool, mp *mpool.MPool) { if *bat == nil { - (*bat) = newDataBatchWithBatch(entry.Batch) + (*bat) = newDataBatchWithBatch(entry.Batch, retainRowID) + } + dstOffset := 0 + if retainRowID { + appendFromEntry(entry.Batch.Vecs[0], (*bat).Vecs[0], int(entry.Offset), mp) + dstOffset = 1 } for i, vec := range entry.Batch.Vecs { if vec.GetType().Oid == types.T_Rowid || vec.GetType().Oid == types.T_TS { continue } - appendFromEntry(vec, (*bat).Vecs[i-2], int(entry.Offset), mp) + appendFromEntry(vec, (*bat).Vecs[i-2+dstOffset], int(entry.Offset), mp) } appendFromEntry(entry.Batch.Vecs[1], (*bat).Vecs[len((*bat).Vecs)-1], int(entry.Offset), mp) } -func fillInDeleteBatch(bat **batch.Batch, entry *RowEntry, mp *mpool.MPool) { +func fillInDeleteBatch(bat **batch.Batch, entry *RowEntry, retainRowID bool, mp *mpool.MPool) { pkVec := entry.Batch.Vecs[2] if *bat == nil { - (*bat) = batch.NewWithSize(2) - (*bat).SetAttributes([]string{ - objectio.TombstoneAttr_PK_Attr, - objectio.DefaultCommitTS_Attr, - }) - (*bat).Vecs[0] = vector.NewVec(*pkVec.GetType()) - (*bat).Vecs[1] = vector.NewVec(types.T_TS.ToType()) + vecCnt := 2 + attrs := []string{objectio.TombstoneAttr_PK_Attr, objectio.DefaultCommitTS_Attr} + if 
retainRowID { + vecCnt = 3 + attrs = []string{catalog.Row_ID, objectio.TombstoneAttr_PK_Attr, objectio.DefaultCommitTS_Attr} + } + (*bat) = batch.NewWithSize(vecCnt) + (*bat).SetAttributes(attrs) + if retainRowID { + (*bat).Vecs[0] = vector.NewVec(types.T_Rowid.ToType()) + (*bat).Vecs[1] = vector.NewVec(*pkVec.GetType()) + (*bat).Vecs[2] = vector.NewVec(types.T_TS.ToType()) + } else { + (*bat).Vecs[0] = vector.NewVec(*pkVec.GetType()) + (*bat).Vecs[1] = vector.NewVec(types.T_TS.ToType()) + } + } + pkIdx := 0 + tsIdx := 1 + if retainRowID { + appendFromEntry(entry.Batch.Vecs[0], (*bat).Vecs[0], int(entry.Offset), mp) + pkIdx = 1 + tsIdx = 2 } - appendFromEntry(pkVec, (*bat).Vecs[0], int(entry.Offset), mp) - vector.AppendFixed((*bat).Vecs[1], entry.Time, false, mp) + appendFromEntry(pkVec, (*bat).Vecs[pkIdx], int(entry.Offset), mp) + vector.AppendFixed((*bat).Vecs[tsIdx], entry.Time, false, mp) } // PXU TODO @@ -2439,19 +2516,132 @@ func prefetchObjects( return } -func updateTombstoneBatch(bat *batch.Batch, start, end types.TS, skipTS map[types.TS]struct{}, sort bool, mp *mpool.MPool) { - bat.Vecs[0].Free(mp) // rowid - //bat.Vecs[2].Free(mp) // phyaddr - bat.Vecs = []*vector.Vector{bat.Vecs[1], bat.Vecs[2]} - bat.Attrs = []string{ - objectio.TombstoneAttr_PK_Attr, - objectio.DefaultCommitTS_Attr} - applyTSFilterForBatch(bat, 1, skipTS, start, end) +func prependRowIDVectorIfNeeded(bat *batch.Batch, blk *types.Blockid, mp *mpool.MPool) error { + if bat == nil || blk == nil || bat.RowCount() == 0 { + return nil + } + firstRowIDIdx := -1 + rowIDCnt := 0 + for i, vec := range bat.Vecs { + if vec != nil && vec.GetType().Oid == types.T_Rowid { + rowIDCnt++ + if firstRowIDIdx == -1 { + firstRowIDIdx = i + } + } + } + if firstRowIDIdx >= 0 { + if rowIDCnt == 1 && firstRowIDIdx == 0 { + return nil + } + origVecs := bat.Vecs + rebuiltVecs := make([]*vector.Vector, 0, len(origVecs)-rowIDCnt+1) + rebuiltVecs = append(rebuiltVecs, origVecs[firstRowIDIdx]) + for i, vec := range 
origVecs { + if i == firstRowIDIdx { + continue + } + if vec != nil && vec.GetType().Oid == types.T_Rowid { + vec.Free(mp) + continue + } + rebuiltVecs = append(rebuiltVecs, vec) + } + bat.Vecs = rebuiltVecs + if len(bat.Attrs) == len(origVecs) { + rebuiltAttrs := make([]string, 0, len(rebuiltVecs)) + rebuiltAttrs = append(rebuiltAttrs, catalog.Row_ID) + for i, attr := range bat.Attrs { + if i == firstRowIDIdx { + continue + } + if origVecs[i] != nil && origVecs[i].GetType().Oid == types.T_Rowid { + continue + } + rebuiltAttrs = append(rebuiltAttrs, attr) + } + bat.Attrs = rebuiltAttrs + } + return nil + } + rowIDVec := vector.NewVec(types.T_Rowid.ToType()) + for i := 0; i < bat.RowCount(); i++ { + if err := vector.AppendFixed(rowIDVec, types.NewRowid(blk, uint32(i)), false, mp); err != nil { + rowIDVec.Free(mp) + return err + } + } + bat.Vecs = append([]*vector.Vector{rowIDVec}, bat.Vecs...) + if len(bat.Attrs) == len(bat.Vecs)-1 { + bat.Attrs = append([]string{catalog.Row_ID}, bat.Attrs...) 
+ } + return nil +} + +func updateTombstoneBatch(bat *batch.Batch, start, end types.TS, skipTS map[types.TS]struct{}, sort bool, blk *types.Blockid, retainRowID bool, mp *mpool.MPool) error { + if retainRowID { + if err := prependRowIDVectorIfNeeded(bat, blk, mp); err != nil { + return err + } + } + var rowIDVec *vector.Vector + var pkVec *vector.Vector + var commitTSVec *vector.Vector + for _, vec := range bat.Vecs { + switch vec.GetType().Oid { + case types.T_Rowid: + if rowIDVec == nil { + rowIDVec = vec + } else { + vec.Free(mp) + } + case types.T_TS: + if commitTSVec == nil { + commitTSVec = vec + } else { + vec.Free(mp) + } + default: + if pkVec == nil { + pkVec = vec + } else { + vec.Free(mp) + } + } + } + if pkVec == nil || commitTSVec == nil || (retainRowID && rowIDVec == nil) { + return moerr.NewInternalErrorNoCtx("invalid tombstone batch layout for collect changes") + } + if retainRowID { + bat.Vecs = []*vector.Vector{rowIDVec, pkVec, commitTSVec} + bat.Attrs = []string{ + catalog.Row_ID, + objectio.TombstoneAttr_PK_Attr, + objectio.DefaultCommitTS_Attr, + } + applyTSFilterForBatch(bat, 2, skipTS, start, end) + } else { + if rowIDVec != nil { + rowIDVec.Free(mp) + } + bat.Vecs = []*vector.Vector{pkVec, commitTSVec} + bat.Attrs = []string{ + objectio.TombstoneAttr_PK_Attr, + objectio.DefaultCommitTS_Attr} + applyTSFilterForBatch(bat, 1, skipTS, start, end) + } if sort { - sortBatch(bat, 1, mp) + sortIdx := len(bat.Vecs) - 1 + return sortBatch(bat, sortIdx, mp) } + return nil } -func updateDataBatch(bat *batch.Batch, start, end types.TS, mp *mpool.MPool) { +func updateDataBatch(bat *batch.Batch, start, end types.TS, blk *types.Blockid, retainRowID bool, mp *mpool.MPool) error { + if retainRowID { + if err := prependRowIDVectorIfNeeded(bat, blk, mp); err != nil { + return err + } + } filteredVecs := make([]*vector.Vector, 0, len(bat.Vecs)) var commitTSVec *vector.Vector rebuildAttrs := len(bat.Attrs) == len(bat.Vecs) @@ -2461,7 +2651,14 @@ func 
updateDataBatch(bat *batch.Batch, start, end types.TS, mp *mpool.MPool) { for i, vec := range bat.Vecs { switch vec.GetType().Oid { case types.T_Rowid: - vec.Free(mp) + if retainRowID { + filteredVecs = append(filteredVecs, vec) + if rebuildAttrs { + filteredAttrs = append(filteredAttrs, bat.Attrs[i]) + } + } else { + vec.Free(mp) + } case types.T_TS: commitTSVec = vec if rebuildAttrs { @@ -2488,35 +2685,83 @@ func updateDataBatch(bat *batch.Batch, start, end types.TS, mp *mpool.MPool) { bat.Attrs = filteredAttrs } applyTSFilterForBatch(bat, len(bat.Vecs)-1, nil, start, end) + return nil } -func updateCNTombstoneBatch(bat *batch.Batch, committs types.TS, mp *mpool.MPool) { +func updateCNTombstoneBatch(bat *batch.Batch, committs types.TS, blk *types.Blockid, retainRowID bool, mp *mpool.MPool) error { + if bat == nil { + return moerr.NewInternalErrorNoCtx("updateCNTombstoneBatch: nil batch") + } + if retainRowID { + if err := prependRowIDVectorIfNeeded(bat, blk, mp); err != nil { + return err + } + } + var rowid *vector.Vector var pk *vector.Vector for _, vec := range bat.Vecs { - if vec.GetType().Oid != types.T_Rowid && vec.GetType().Oid != types.T_TS { - pk = vec - } else { + switch vec.GetType().Oid { + case types.T_Rowid: + if retainRowID { + rowid = vec + } else { + vec.Free(mp) + } + case types.T_TS: vec.Free(mp) + default: + pk = vec } } + if pk == nil { + return moerr.NewInternalErrorNoCtx("updateCNTombstoneBatch: tombstone batch missing pk vector") + } + if retainRowID && rowid == nil { + return moerr.NewInternalErrorNoCtx("updateCNTombstoneBatch: retainRowID set but rowid vector missing") + } commitTS, err := vector.NewConstFixed(types.T_TS.ToType(), committs, pk.Length(), mp) if err != nil { - return + return err } - bat.Vecs = []*vector.Vector{pk, commitTS} - bat.Attrs = []string{objectio.TombstoneAttr_PK_Attr, objectio.DefaultCommitTS_Attr} + if retainRowID { + bat.Vecs = []*vector.Vector{rowid, pk, commitTS} + bat.Attrs = []string{catalog.Row_ID, 
objectio.TombstoneAttr_PK_Attr, objectio.DefaultCommitTS_Attr} + } else { + bat.Vecs = []*vector.Vector{pk, commitTS} + bat.Attrs = []string{objectio.TombstoneAttr_PK_Attr, objectio.DefaultCommitTS_Attr} + } + return nil } -func updateCNDataBatch(bat *batch.Batch, commitTS types.TS, mp *mpool.MPool) { +func updateCNDataBatch(bat *batch.Batch, commitTS types.TS, blk *types.Blockid, retainRowID bool, mp *mpool.MPool) error { + if bat == nil { + return moerr.NewInternalErrorNoCtx("updateCNDataBatch: nil batch") + } for i, vec := range bat.Vecs { if vec.GetType().Oid == types.T_TS { vec.Free(mp) bat.Vecs = append(bat.Vecs[:i], bat.Vecs[i+1:]...) + if len(bat.Attrs) == len(bat.Vecs)+1 { + bat.Attrs = append(bat.Attrs[:i], bat.Attrs[i+1:]...) + } + break + } + } + if retainRowID { + if err := prependRowIDVectorIfNeeded(bat, blk, mp); err != nil { + return err } } + if len(bat.Vecs) == 0 { + return moerr.NewInternalErrorNoCtx("updateCNDataBatch: data batch has no vectors after stripping commit-ts") + } commitTSVec, err := vector.NewConstFixed(types.T_TS.ToType(), commitTS, bat.Vecs[0].Length(), mp) if err != nil { - return + return err } bat.Vecs = append(bat.Vecs, commitTSVec) + if len(bat.Attrs) == len(bat.Vecs)-1 { + bat.Attrs = append(bat.Attrs, objectio.DefaultCommitTS_Attr) + } + return nil } // TestGetObjectsFromCheckpointEntries exposes getObjectsFromCheckpointEntries for tests in other packages. 
diff --git a/pkg/vm/engine/disttae/logtailreplay/change_handle_test.go b/pkg/vm/engine/disttae/logtailreplay/change_handle_test.go index 858a3fc01bf73..bbdb57523f015 100644 --- a/pkg/vm/engine/disttae/logtailreplay/change_handle_test.go +++ b/pkg/vm/engine/disttae/logtailreplay/change_handle_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" @@ -108,8 +109,8 @@ func TestUpdateCNTombstoneBatch_IsIdempotent(t *testing.T) { bat.SetRowCount(1) defer bat.Clean(mp) - updateCNTombstoneBatch(bat, types.BuildTS(20, 0), mp) - updateCNTombstoneBatch(bat, types.BuildTS(30, 0), mp) + require.NoError(t, updateCNTombstoneBatch(bat, types.BuildTS(20, 0), nil, false, mp)) + require.NoError(t, updateCNTombstoneBatch(bat, types.BuildTS(30, 0), nil, false, mp)) require.Equal(t, 2, len(bat.Vecs)) require.Equal(t, types.T_int64, bat.Vecs[0].GetType().Oid) @@ -150,7 +151,7 @@ func TestUpdateCNDataBatch_RemoveTSVector(t *testing.T) { // Call updateCNDataBatch newCommitTS := types.BuildTS(100, 0) - updateCNDataBatch(bat, newCommitTS, mp) + require.NoError(t, updateCNDataBatch(bat, newCommitTS, nil, false, mp)) // Verify T_TS vector is removed and new commitTS vector is added at the end require.Equal(t, 3, len(bat.Vecs)) @@ -189,7 +190,7 @@ func TestUpdateCNDataBatch_NoTSVector(t *testing.T) { // Call updateCNDataBatch newCommitTS := types.BuildTS(100, 0) - updateCNDataBatch(bat, newCommitTS, mp) + require.NoError(t, updateCNDataBatch(bat, newCommitTS, nil, false, mp)) // Verify commitTS vector is added at the end require.Equal(t, 3, len(bat.Vecs)) @@ -229,7 +230,7 @@ func TestUpdateDataBatch_PreservesTrailingColumnsWithoutRowid(t *testing.T) { bat.Vecs[3] = commitTS bat.SetRowCount(1) - updateDataBatch(bat, types.BuildTS(50, 0), types.BuildTS(150, 0), mp) + 
require.NoError(t, updateDataBatch(bat, types.BuildTS(50, 0), types.BuildTS(150, 0), nil, false, mp)) require.Equal(t, 4, len(bat.Vecs)) require.Equal(t, []string{"id", "created_at", "updated_at", objectio.DefaultCommitTS_Attr}, bat.Attrs) @@ -242,6 +243,43 @@ func TestUpdateDataBatch_PreservesTrailingColumnsWithoutRowid(t *testing.T) { bat.Clean(mp) } +func TestUpdateDataBatch_RetainsSynthesizedRowID(t *testing.T) { + mp := mpool.MustNewZero() + defer mpool.DeleteMPool(mp) + + bat := batch.NewWithSize(3) + bat.SetAttributes([]string{"a", "b", objectio.DefaultCommitTS_Attr}) + + aVec := vector.NewVec(types.T_int32.ToType()) + require.NoError(t, vector.AppendFixed(aVec, int32(4), false, mp)) + require.NoError(t, vector.AppendFixed(aVec, int32(5), false, mp)) + bat.Vecs[0] = aVec + + bVec := vector.NewVec(types.T_int32.ToType()) + require.NoError(t, vector.AppendFixed(bVec, int32(40), false, mp)) + require.NoError(t, vector.AppendFixed(bVec, int32(50), false, mp)) + bat.Vecs[1] = bVec + + tsVec := vector.NewVec(types.T_TS.ToType()) + require.NoError(t, vector.AppendFixed(tsVec, types.BuildTS(100, 0), false, mp)) + require.NoError(t, vector.AppendFixed(tsVec, types.BuildTS(200, 0), false, mp)) + bat.Vecs[2] = tsVec + bat.SetRowCount(2) + + blk := types.Blockid{} + require.NoError(t, updateDataBatch(bat, types.BuildTS(50, 0), types.BuildTS(150, 0), &blk, true, mp)) + + require.Equal(t, 4, len(bat.Vecs)) + require.Equal(t, catalog.Row_ID, bat.Attrs[0]) + require.Equal(t, types.T_Rowid, bat.Vecs[0].GetType().Oid) + require.Equal(t, 1, bat.Vecs[0].Length()) + + rowIDs := vector.MustFixedColNoTypeCheck[types.Rowid](bat.Vecs[0]) + require.Equal(t, types.NewRowid(&blk, 0), rowIDs[0]) + + bat.Clean(mp) +} + func TestAObjectHandleShouldReadBlock_UsesCachedPlan(t *testing.T) { obj := makeTestObjectEntry(t, 2, false, false, types.BuildTS(10, 0)) handle := &AObjectHandle{ @@ -1936,3 +1974,176 @@ func TestShouldReadBlock_WithBuildBlockPlan(t *testing.T) { require.NoError(t, err) 
require.False(t, read, "block doesn't overlap range → should NOT read") } + +func TestUpdateCNTombstoneBatch_NilGuard(t *testing.T) { + mp := mpool.MustNewZero() + + t.Run("nil batch", func(t *testing.T) { + err := updateCNTombstoneBatch(nil, types.BuildTS(1, 0), nil, false, mp) + require.Error(t, err) + }) + + t.Run("missing pk vector", func(t *testing.T) { + bat := batch.NewWithSize(1) + bat.Vecs[0] = vector.NewVec(types.T_TS.ToType()) + require.NoError(t, vector.AppendFixed(bat.Vecs[0], types.BuildTS(1, 0), false, mp)) + err := updateCNTombstoneBatch(bat, types.BuildTS(2, 0), nil, false, mp) + require.Error(t, err) + require.Contains(t, err.Error(), "missing pk vector") + }) +} + +func TestUpdateCNDataBatch_NilGuard(t *testing.T) { + mp := mpool.MustNewZero() + + t.Run("nil batch", func(t *testing.T) { + err := updateCNDataBatch(nil, types.BuildTS(1, 0), nil, false, mp) + require.Error(t, err) + }) + + t.Run("only ts column - no vectors after strip", func(t *testing.T) { + bat := batch.NewWithSize(1) + bat.Vecs[0] = vector.NewVec(types.T_TS.ToType()) + require.NoError(t, vector.AppendFixed(bat.Vecs[0], types.BuildTS(1, 0), false, mp)) + err := updateCNDataBatch(bat, types.BuildTS(2, 0), nil, false, mp) + require.Error(t, err) + require.Contains(t, err.Error(), "no vectors after stripping commit-ts") + }) +} + +// TestCDCSchema_NoRowIDWhenRetainRowIDFalse locks the CDC-facing batch shapes: +// the disttae.NewChangesHandler path leaves retainRowID=false, and every batch +// mutator must produce output that contains no T_Rowid column. Regressions here +// would silently break CDC sinks that index columns by position. 
+func TestCDCSchema_NoRowIDWhenRetainRowIDFalse(t *testing.T) { + mp := mpool.MustNewZero() + defer mpool.DeleteMPool(mp) + + assertNoRowID := func(t *testing.T, bat *batch.Batch) { + t.Helper() + for i, vec := range bat.Vecs { + require.NotEqual(t, types.T_Rowid, vec.GetType().Oid, + "CDC batch must not contain T_Rowid vec at idx=%d attrs=%v", i, bat.Attrs) + } + for _, attr := range bat.Attrs { + require.NotEqual(t, catalog.Row_ID, attr, + "CDC batch must not contain Row_ID attr; attrs=%v", bat.Attrs) + } + } + + t.Run("updateTombstoneBatch drops input rowid vec", func(t *testing.T) { + bat := batch.NewWithSize(3) + // Simulate a tombstone batch that happens to carry a rowid column. + ridVec := vector.NewVec(types.T_Rowid.ToType()) + blk := types.Blockid{} + require.NoError(t, vector.AppendFixed(ridVec, types.NewRowid(&blk, 0), false, mp)) + pkVec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(pkVec, int64(7), false, mp)) + tsVec := vector.NewVec(types.T_TS.ToType()) + require.NoError(t, vector.AppendFixed(tsVec, types.BuildTS(100, 0), false, mp)) + bat.Vecs[0] = ridVec + bat.Vecs[1] = pkVec + bat.Vecs[2] = tsVec + bat.SetRowCount(1) + + require.NoError(t, updateTombstoneBatch( + bat, types.BuildTS(50, 0), types.BuildTS(150, 0), nil, false, nil, false, mp)) + + require.Equal(t, []string{ + objectio.TombstoneAttr_PK_Attr, + objectio.DefaultCommitTS_Attr, + }, bat.Attrs) + require.Equal(t, 2, len(bat.Vecs)) + assertNoRowID(t, bat) + bat.Clean(mp) + }) + + t.Run("updateDataBatch drops input rowid vec", func(t *testing.T) { + bat := batch.NewWithSize(4) + bat.SetAttributes([]string{catalog.Row_ID, "a", "b", objectio.DefaultCommitTS_Attr}) + ridVec := vector.NewVec(types.T_Rowid.ToType()) + blk := types.Blockid{} + require.NoError(t, vector.AppendFixed(ridVec, types.NewRowid(&blk, 0), false, mp)) + aVec := vector.NewVec(types.T_int32.ToType()) + require.NoError(t, vector.AppendFixed(aVec, int32(1), false, mp)) + bVec := 
vector.NewVec(types.T_int32.ToType()) + require.NoError(t, vector.AppendFixed(bVec, int32(10), false, mp)) + tsVec := vector.NewVec(types.T_TS.ToType()) + require.NoError(t, vector.AppendFixed(tsVec, types.BuildTS(100, 0), false, mp)) + bat.Vecs[0] = ridVec + bat.Vecs[1] = aVec + bat.Vecs[2] = bVec + bat.Vecs[3] = tsVec + bat.SetRowCount(1) + + require.NoError(t, updateDataBatch( + bat, types.BuildTS(50, 0), types.BuildTS(150, 0), nil, false, mp)) + + assertNoRowID(t, bat) + // Trailing column must remain commit_ts. + require.Equal(t, types.T_TS, bat.Vecs[len(bat.Vecs)-1].GetType().Oid) + require.Equal(t, objectio.DefaultCommitTS_Attr, bat.Attrs[len(bat.Attrs)-1]) + bat.Clean(mp) + }) + + t.Run("fillInDeleteBatch produces 2-col layout", func(t *testing.T) { + // RowEntry.Batch layout expected by fillInDeleteBatch: [Rowid, TS, pk] + src := batch.NewWithSize(3) + ridVec := vector.NewVec(types.T_Rowid.ToType()) + blk := types.Blockid{} + require.NoError(t, vector.AppendFixed(ridVec, types.NewRowid(&blk, 0), false, mp)) + tsSrc := vector.NewVec(types.T_TS.ToType()) + require.NoError(t, vector.AppendFixed(tsSrc, types.BuildTS(100, 0), false, mp)) + pkVec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(pkVec, int64(42), false, mp)) + src.Vecs[0] = ridVec + src.Vecs[1] = tsSrc + src.Vecs[2] = pkVec + src.SetRowCount(1) + + entry := &RowEntry{Batch: src, Offset: 0, Time: types.BuildTS(100, 0)} + var out *batch.Batch + fillInDeleteBatch(&out, entry, false, mp) + require.NotNil(t, out) + require.Equal(t, []string{ + objectio.TombstoneAttr_PK_Attr, + objectio.DefaultCommitTS_Attr, + }, out.Attrs) + require.Equal(t, 2, len(out.Vecs)) + assertNoRowID(t, out) + out.Clean(mp) + src.Clean(mp) + }) + + t.Run("fillInInsertBatch omits rowid column", func(t *testing.T) { + // RowEntry.Batch layout: [Rowid, TS, pk, val]; src.Attrs[2:] = [pk, val] + src := batch.NewWithSize(4) + src.SetAttributes([]string{catalog.Row_ID, objectio.DefaultCommitTS_Attr, 
"pk", "val"}) + ridVec := vector.NewVec(types.T_Rowid.ToType()) + blk := types.Blockid{} + require.NoError(t, vector.AppendFixed(ridVec, types.NewRowid(&blk, 0), false, mp)) + tsSrc := vector.NewVec(types.T_TS.ToType()) + require.NoError(t, vector.AppendFixed(tsSrc, types.BuildTS(100, 0), false, mp)) + pkVec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(pkVec, int64(7), false, mp)) + valVec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(valVec, int64(70), false, mp)) + src.Vecs[0] = ridVec + src.Vecs[1] = tsSrc + src.Vecs[2] = pkVec + src.Vecs[3] = valVec + src.SetRowCount(1) + + entry := &RowEntry{Batch: src, Offset: 0, Time: types.BuildTS(100, 0)} + var out *batch.Batch + fillInInsertBatch(&out, entry, false, mp) + require.NotNil(t, out) + require.Equal(t, []string{"pk", "val", objectio.DefaultCommitTS_Attr}, out.Attrs) + require.Equal(t, 3, len(out.Vecs)) + assertNoRowID(t, out) + // Trailing column must be commit_ts. + require.Equal(t, types.T_TS, out.Vecs[len(out.Vecs)-1].GetType().Oid) + out.Clean(mp) + src.Clean(mp) + }) +} diff --git a/pkg/vm/engine/disttae/snapshot_materialized_datasource.go b/pkg/vm/engine/disttae/snapshot_materialized_datasource.go new file mode 100644 index 0000000000000..1e8d6822c6e52 --- /dev/null +++ b/pkg/vm/engine/disttae/snapshot_materialized_datasource.go @@ -0,0 +1,350 @@ +// Copyright 2026 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package disttae + +import ( + "context" + "fmt" + "slices" + + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/fileservice" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/vm/engine" + "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay" + "github.com/matrixorigin/matrixone/pkg/vm/engine/readutil" +) + +type materializedSnapshotDataSource struct { + ctx context.Context + fs fileservice.FileService + pState *logtailreplay.PartitionState + currentTS types.TS + snapshotTS types.TS + tombstone engine.Tombstoner + remote engine.DataSource + + memPKFilter *readutil.MemPKFilter + orderBy []*plan.OrderBySpec + + built bool + mp *mpool.MPool + inMemBatches []*batch.Batch + inMemCursor int + + deletedRowsCache map[objectio.Blockid]*objectio.Bitmap +} + +func newMaterializedSnapshotDataSource( + ctx context.Context, + fs fileservice.FileService, + pState *logtailreplay.PartitionState, + currentTS types.TS, + snapshotTS types.TS, + relData engine.RelData, + tombstone engine.Tombstoner, +) engine.DataSource { + var remote engine.DataSource + if relData != nil && relData.DataCnt() > 0 { + remote = readutil.NewRemoteDataSource(ctx, fs, snapshotTS.ToTimestamp(), relData) + } + return &materializedSnapshotDataSource{ + ctx: ctx, + fs: fs, + pState: pState, + currentTS: currentTS, + snapshotTS: snapshotTS, + tombstone: tombstone, + remote: remote, + deletedRowsCache: make(map[objectio.Blockid]*objectio.Bitmap), + } +} + +func (ds *materializedSnapshotDataSource) String() string { + return fmt.Sprintf( + "MaterializedSnapshotDataSource{current-ts=%s,snapshot-ts=%s,inmem-batches=%d,inmem-cursor=%d,remote=%v}", + ds.currentTS.ToString(), + 
ds.snapshotTS.ToString(), + len(ds.inMemBatches), + ds.inMemCursor, + ds.remote != nil, + ) +} + +func (ds *materializedSnapshotDataSource) Next( + ctx context.Context, + cols []string, + colTypes []types.Type, + seqNums []uint16, + _ int32, + filter any, + mp *mpool.MPool, + outBatch *batch.Batch, +) (*objectio.BlockInfo, engine.DataState, error) { + if !ds.built { + ds.built = true + ds.mp = mp + if memFilter, ok := filter.(*readutil.MemPKFilter); ok { + ds.memPKFilter = memFilter + } + if err := ds.buildCommittedInMemBatches(ctx, cols, colTypes, seqNums, mp); err != nil { + return nil, engine.End, err + } + } + + if ds.inMemCursor < len(ds.inMemBatches) { + next := ds.inMemBatches[ds.inMemCursor] + ds.inMemCursor++ + outBatch.CleanOnlyData() + if _, err := outBatch.Append(ctx, mp, next); err != nil { + return nil, engine.End, err + } + return nil, engine.InMem, nil + } + + if ds.remote == nil { + return nil, engine.End, nil + } + return ds.remote.Next(ctx, cols, colTypes, seqNums, 0, filter, mp, outBatch) +} + +func (ds *materializedSnapshotDataSource) ApplyTombstones( + ctx context.Context, + bid *objectio.Blockid, + rowsOffset []int64, + _ engine.TombstoneApplyPolicy, +) ([]int64, error) { + if ds.remote != nil { + return ds.remote.ApplyTombstones(ctx, bid, rowsOffset, engine.Policy_CheckAll) + } + if ds.tombstone == nil || len(rowsOffset) == 0 { + return rowsOffset, nil + } + slices.Sort(rowsOffset) + left := ds.tombstone.ApplyInMemTombstones(bid, rowsOffset, nil) + return ds.tombstone.ApplyPersistedTombstones(ctx, ds.fs, &ds.snapshotTS, bid, left, nil) +} + +func (ds *materializedSnapshotDataSource) GetTombstones( + ctx context.Context, + bid *objectio.Blockid, +) (objectio.Bitmap, error) { + if ds.remote != nil { + return ds.remote.GetTombstones(ctx, bid) + } + bm := objectio.GetReusableBitmap() + if ds.tombstone == nil { + return bm, nil + } + ds.tombstone.ApplyInMemTombstones(bid, nil, &bm) + if _, err := ds.tombstone.ApplyPersistedTombstones(ctx, ds.fs, 
&ds.snapshotTS, bid, nil, &bm); err != nil { + bm.Release() + return objectio.Bitmap{}, err + } + return bm, nil +} + +func (ds *materializedSnapshotDataSource) SetOrderBy(orderby []*plan.OrderBySpec) { + ds.orderBy = orderby + if ds.remote != nil { + ds.remote.SetOrderBy(orderby) + } +} + +func (ds *materializedSnapshotDataSource) GetOrderBy() []*plan.OrderBySpec { + if ds.remote != nil { + return ds.remote.GetOrderBy() + } + return ds.orderBy +} + +func (ds *materializedSnapshotDataSource) SetFilterZM(zm objectio.ZoneMap) { + if ds.remote != nil { + ds.remote.SetFilterZM(zm) + } +} + +func (ds *materializedSnapshotDataSource) Close() { + if ds.remote != nil { + ds.remote.Close() + } + for _, bat := range ds.inMemBatches { + if bat != nil && ds.mp != nil { + bat.Clean(ds.mp) + } + } + ds.inMemBatches = nil + ds.inMemCursor = 0 + for bid, bm := range ds.deletedRowsCache { + if bm != nil { + bm.Release() + } + delete(ds.deletedRowsCache, bid) + } +} + +func (ds *materializedSnapshotDataSource) buildCommittedInMemBatches( + ctx context.Context, + attrs []string, + colTypes []types.Type, + seqNums []uint16, + mp *mpool.MPool, +) error { + if ds.pState == nil { + return nil + } + + var iter logtailreplay.RowsIter + if ds.memPKFilter != nil && ds.memPKFilter.Valid() { + iter = ds.pState.NewPrimaryKeyIter( + ds.snapshotTS, + ds.memPKFilter.Op(), + ds.memPKFilter.Keys(), + ) + } else { + iter = ds.pState.NewRowsIter(ds.snapshotTS, nil, false) + } + defer iter.Close() + + out := newMaterializedInMemBatch(attrs, colTypes) + totalRows := 0 + for iter.Next() { + entry := iter.Entry() + if ds.shouldSkipCommittedInMemEntry(entry) { + continue + } + deleted, err := ds.isDeletedByCommittedTombstones(ctx, entry.RowID) + if err != nil { + out.Clean(mp) + return err + } + if deleted { + continue + } + if out.RowCount() >= int(objectio.BlockMaxRows) { + ds.inMemBatches = append(ds.inMemBatches, out) + out = newMaterializedInMemBatch(attrs, colTypes) + } + if err := 
appendCommittedInMemEntry(out, entry, seqNums, mp); err != nil { + out.Clean(mp) + return err + } + totalRows++ + } + + if out.RowCount() > 0 { + ds.inMemBatches = append(ds.inMemBatches, out) + } else { + out.Clean(mp) + } + + if ds.memPKFilter != nil && totalRows == 1 { + ds.memPKFilter.RecordExactHit() + } + return nil +} + +func (ds *materializedSnapshotDataSource) shouldSkipCommittedInMemEntry( + entry *logtailreplay.RowEntry, +) bool { + if ds.memPKFilter == nil || !ds.memPKFilter.HasBF || ds.memPKFilter.BFSeqNum == -1 { + return false + } + bfColVec := entry.Batch.Vecs[2+ds.memPKFilter.BFSeqNum] + if bfColVec.IsNull(uint64(entry.Offset)) { + return true + } + return !ds.memPKFilter.FilterHint.BF.Test( + bfColVec.GetRawBytesAt(int(entry.Offset)), + ) +} + +func (ds *materializedSnapshotDataSource) isDeletedByCommittedTombstones( + ctx context.Context, + rowID objectio.Rowid, +) (bool, error) { + if ds.tombstone == nil { + return false, nil + } + bid := rowID.BorrowBlockID() + mask, ok := ds.deletedRowsCache[*bid] + if !ok { + bm := objectio.GetReusableBitmap() + ds.tombstone.ApplyInMemTombstones(bid, nil, &bm) + if _, err := ds.tombstone.ApplyPersistedTombstones( + ctx, + ds.fs, + &ds.snapshotTS, + bid, + nil, + &bm, + ); err != nil { + bm.Release() + return false, err + } + ds.deletedRowsCache[*bid] = &bm + mask = &bm + } + return mask.Contains(uint64(rowID.GetRowOffset())), nil +} + +func newMaterializedInMemBatch( + attrs []string, + colTypes []types.Type, +) *batch.Batch { + bat := batch.NewWithSize(len(attrs)) + bat.SetAttributes(attrs) + for i := range attrs { + bat.Vecs[i] = vector.NewVec(colTypes[i]) + } + return bat +} + +func appendCommittedInMemEntry( + out *batch.Batch, + entry *logtailreplay.RowEntry, + seqNums []uint16, + mp *mpool.MPool, +) error { + for i := range out.Attrs { + switch seqNums[i] { + case objectio.SEQNUM_ROWID: + if err := vector.AppendFixed(out.Vecs[i], entry.RowID, false, mp); err != nil { + return err + } + case 
objectio.SEQNUM_COMMITTS: + if err := vector.AppendFixed(out.Vecs[i], entry.Time, false, mp); err != nil { + return err + } + default: + idx := 2 + seqNums[i] + if int(idx) >= len(entry.Batch.Vecs) || entry.Batch.Attrs[idx] == "" { + if err := vector.AppendAny(out.Vecs[i], nil, true, mp); err != nil { + return err + } + continue + } + if err := out.Vecs[i].UnionOne(entry.Batch.Vecs[idx], entry.Offset, mp); err != nil { + return err + } + } + } + out.SetRowCount(out.Vecs[0].Length()) + return nil +} diff --git a/pkg/vm/engine/disttae/snapshot_materialized_datasource_test.go b/pkg/vm/engine/disttae/snapshot_materialized_datasource_test.go new file mode 100644 index 0000000000000..1c23353ddc388 --- /dev/null +++ b/pkg/vm/engine/disttae/snapshot_materialized_datasource_test.go @@ -0,0 +1,427 @@ +// Copyright 2026 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package disttae + +import ( + "bytes" + "context" + "testing" + + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/fileservice" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/pb/api" + "github.com/matrixorigin/matrixone/pkg/sql/plan/function" + "github.com/matrixorigin/matrixone/pkg/vm/engine" + "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay" + "github.com/matrixorigin/matrixone/pkg/vm/engine/readutil" + "github.com/stretchr/testify/require" +) + +func TestMaterializedSnapshotDataSourceUsesSnapshotTSForInMemRows(t *testing.T) { + mp := mpool.MustNewZero() + defer mpool.DeleteMPool(mp) + + pState := newTestMaterializedSnapshotPartitionState(t, mp, []testMaterializedSnapshotRow{ + {rowID: buildTestMaterializedSnapshotRowID(t, 1), ts: types.BuildTS(15, 0), id: 1, payload: 100}, + {rowID: buildTestMaterializedSnapshotRowID(t, 2), ts: types.BuildTS(25, 0), id: 2, payload: 200}, + }) + + source := newMaterializedSnapshotDataSource( + context.Background(), + nil, + pState, + types.BuildTS(30, 0), + types.BuildTS(20, 0), + nil, + nil, + ) + + got := collectMaterializedSnapshotInt64Column( + t, + source, + []string{"payload"}, + []types.Type{types.T_int64.ToType()}, + []uint16{1}, + nil, + mp, + ) + require.Equal(t, []int64{100}, got) +} + +func TestMaterializedSnapshotDataSourceUsesSnapshotTSForPrimaryKeyFilter(t *testing.T) { + mp := mpool.MustNewZero() + defer mpool.DeleteMPool(mp) + + pState := newTestMaterializedSnapshotPartitionState(t, mp, []testMaterializedSnapshotRow{ + {rowID: buildTestMaterializedSnapshotRowID(t, 1), ts: types.BuildTS(15, 0), id: 1, payload: 100}, + {rowID: buildTestMaterializedSnapshotRowID(t, 2), ts: types.BuildTS(25, 0), id: 1, payload: 200}, + }) + + var 
filter readutil.MemPKFilter + packer := types.NewPacker() + filter.SetFullData(function.EQUAL, false, readutil.EncodePrimaryKey(int64(1), packer)) + packer.Close() + + source := newMaterializedSnapshotDataSource( + context.Background(), + nil, + pState, + types.BuildTS(30, 0), + types.BuildTS(20, 0), + nil, + nil, + ) + + got := collectMaterializedSnapshotInt64Column( + t, + source, + []string{"payload"}, + []types.Type{types.T_int64.ToType()}, + []uint16{1}, + &filter, + mp, + ) + require.Equal(t, []int64{100}, got) +} + +func TestMaterializedSnapshotDataSourceUsesSnapshotTSForPersistedTombstones(t *testing.T) { + snapshotTS := types.BuildTS(20, 0) + tombstoner := &recordingTombstoner{} + source := newMaterializedSnapshotDataSource( + context.Background(), + nil, + nil, + types.BuildTS(30, 0), + snapshotTS, + nil, + tombstoner, + ) + ds := source.(*materializedSnapshotDataSource) + + rowID := buildTestMaterializedSnapshotRowID(t, 9) + bid := rowID.BorrowBlockID() + + left, err := ds.ApplyTombstones(context.Background(), bid, []int64{1, 2}, engine.Policy_CheckAll) + require.NoError(t, err) + require.Equal(t, []int64{1, 2}, left) + + bm, err := ds.GetTombstones(context.Background(), bid) + require.NoError(t, err) + bm.Release() + + deleted, err := ds.isDeletedByCommittedTombstones(context.Background(), rowID) + require.NoError(t, err) + require.False(t, deleted) + + require.Equal(t, []types.TS{snapshotTS, snapshotTS, snapshotTS}, tombstoner.snapshots) +} + +type testMaterializedSnapshotRow struct { + rowID types.Rowid + ts types.TS + id int64 + payload int64 +} + +func newTestMaterializedSnapshotPartitionState( + t *testing.T, + mp *mpool.MPool, + rows []testMaterializedSnapshotRow, +) *logtailreplay.PartitionState { + t.Helper() + + pState := logtailreplay.NewPartitionState("", false, 42, false) + rowIDVec := vector.NewVec(types.T_Rowid.ToType()) + tsVec := vector.NewVec(types.T_TS.ToType()) + idVec := vector.NewVec(types.T_int64.ToType()) + payloadVec := 
vector.NewVec(types.T_int64.ToType()) + defer rowIDVec.Free(mp) + defer tsVec.Free(mp) + defer idVec.Free(mp) + defer payloadVec.Free(mp) + + for _, row := range rows { + require.NoError(t, vector.AppendFixed(rowIDVec, row.rowID, false, mp)) + require.NoError(t, vector.AppendFixed(tsVec, row.ts, false, mp)) + require.NoError(t, vector.AppendFixed(idVec, row.id, false, mp)) + require.NoError(t, vector.AppendFixed(payloadVec, row.payload, false, mp)) + } + + packer := types.NewPacker() + defer packer.Close() + pState.HandleRowsInsert(context.Background(), &api.Batch{ + Attrs: []string{"rowid", "time", "id", "payload"}, + Vecs: []api.Vector{ + mustVectorToProtoForMaterializedSnapshotTest(t, rowIDVec), + mustVectorToProtoForMaterializedSnapshotTest(t, tsVec), + mustVectorToProtoForMaterializedSnapshotTest(t, idVec), + mustVectorToProtoForMaterializedSnapshotTest(t, payloadVec), + }, + }, 0, packer, mp) + + return pState +} + +func collectMaterializedSnapshotInt64Column( + t *testing.T, + source engine.DataSource, + attrs []string, + colTypes []types.Type, + seqNums []uint16, + filter any, + mp *mpool.MPool, +) []int64 { + t.Helper() + defer source.Close() + + out := batch.NewWithSize(len(attrs)) + out.SetAttributes(attrs) + for i := range attrs { + out.Vecs[i] = vector.NewVec(colTypes[i]) + } + defer out.Clean(mp) + + var ret []int64 + for { + _, state, err := source.Next( + context.Background(), + attrs, + colTypes, + seqNums, + 0, + filter, + mp, + out, + ) + require.NoError(t, err) + if state == engine.End { + return ret + } + ret = append(ret, vector.MustFixedColWithTypeCheck[int64](out.Vecs[0])...) 
+ } +} + +func buildTestMaterializedSnapshotRowID(t *testing.T, rowIdx uint32) types.Rowid { + t.Helper() + uid, err := types.BuildUuid() + require.NoError(t, err) + blkID := objectio.NewBlockid(&uid, 0, 1) + return types.NewRowid(blkID, rowIdx) +} + +func mustVectorToProtoForMaterializedSnapshotTest(t *testing.T, vec *vector.Vector) api.Vector { + t.Helper() + ret, err := vector.VectorToProtoVector(vec) + require.NoError(t, err) + return ret +} + +type recordingTombstoner struct { + snapshots []types.TS +} + +func (r *recordingTombstoner) Type() engine.TombstoneType { + return engine.TombstoneData +} + +func (r *recordingTombstoner) HasAnyInMemoryTombstone() bool { + return false +} + +func (r *recordingTombstoner) HasAnyTombstoneFile() bool { + return true +} + +func (r *recordingTombstoner) String() string { + return "recordingTombstoner" +} + +func (r *recordingTombstoner) StringWithPrefix(prefix string) string { + return prefix + "recordingTombstoner" +} + +func (r *recordingTombstoner) HasBlockTombstone(context.Context, *objectio.Blockid, fileservice.FileService) (bool, error) { + return false, nil +} + +func (r *recordingTombstoner) MarshalBinaryWithBuffer(*bytes.Buffer) error { + return nil +} + +func (r *recordingTombstoner) UnmarshalBinary([]byte) error { + return nil +} + +func (r *recordingTombstoner) PrefetchTombstones(string, fileservice.FileService, []objectio.Blockid) { +} + +func (r *recordingTombstoner) ApplyInMemTombstones(_ *types.Blockid, rowsOffset []int64, _ *objectio.Bitmap) []int64 { + return rowsOffset +} + +func (r *recordingTombstoner) ApplyPersistedTombstones( + _ context.Context, + _ fileservice.FileService, + snapshot *types.TS, + _ *types.Blockid, + rowsOffset []int64, + _ *objectio.Bitmap, +) ([]int64, error) { + r.snapshots = append(r.snapshots, *snapshot) + return rowsOffset, nil +} + +func (r *recordingTombstoner) Merge(engine.Tombstoner) error { + return nil +} + +func (r *recordingTombstoner) SortInMemory() {} + +// 
tsFilteringTombstoner reacts to the readTS argument. It deletes a row +// only when the read timestamp is >= the row's tombstone commit timestamp. +// This makes the snapshotTS vs currentTS distinction observable. +type tsFilteringTombstoner struct { + tombstones map[int64]types.TS // rowOffset -> tombstone commit ts +} + +func (t *tsFilteringTombstoner) Type() engine.TombstoneType { return engine.TombstoneData } +func (t *tsFilteringTombstoner) HasAnyInMemoryTombstone() bool { + return false +} +func (t *tsFilteringTombstoner) HasAnyTombstoneFile() bool { return true } +func (t *tsFilteringTombstoner) String() string { return "tsFilteringTombstoner" } +func (t *tsFilteringTombstoner) StringWithPrefix(p string) string { return p + t.String() } +func (t *tsFilteringTombstoner) HasBlockTombstone(context.Context, *objectio.Blockid, fileservice.FileService) (bool, error) { + return true, nil +} +func (t *tsFilteringTombstoner) MarshalBinaryWithBuffer(*bytes.Buffer) error { return nil } +func (t *tsFilteringTombstoner) UnmarshalBinary([]byte) error { return nil } +func (t *tsFilteringTombstoner) PrefetchTombstones(string, fileservice.FileService, []objectio.Blockid) { +} +func (t *tsFilteringTombstoner) ApplyInMemTombstones(_ *types.Blockid, rowsOffset []int64, _ *objectio.Bitmap) []int64 { + return rowsOffset +} +func (t *tsFilteringTombstoner) ApplyPersistedTombstones( + _ context.Context, + _ fileservice.FileService, + snapshot *types.TS, + _ *types.Blockid, + rowsOffset []int64, + deletedRows *objectio.Bitmap, +) ([]int64, error) { + left := rowsOffset[:0] + for _, off := range rowsOffset { + ts, ok := t.tombstones[off] + if ok && !ts.GT(snapshot) { + if deletedRows != nil { + deletedRows.Add(uint64(off)) + } + continue + } + left = append(left, off) + } + return left, nil +} +func (t *tsFilteringTombstoner) Merge(engine.Tombstoner) error { return nil } +func (t *tsFilteringTombstoner) SortInMemory() {} + +// 
TestMaterializedSnapshotDataSourcePersistedTombstoneFiltersBySnapshotTS proves +// that fallback uses snapshotTS (not currentTS) for persisted tombstone visibility. +// If the production code reverts to currentTS, this test fails. +func TestMaterializedSnapshotDataSourcePersistedTombstoneFiltersBySnapshotTS(t *testing.T) { + tomb := &tsFilteringTombstoner{ + tombstones: map[int64]types.TS{ + 1: types.BuildTS(25, 0), // tombstoned at 25, after snapshotTS=20 + }, + } + + source := newMaterializedSnapshotDataSource( + context.Background(), + nil, + nil, + types.BuildTS(30, 0), // currentTS — would mark row 1 deleted + types.BuildTS(20, 0), // snapshotTS — would NOT mark row 1 deleted + nil, + tomb, + ) + ds := source.(*materializedSnapshotDataSource) + + rowID := buildTestMaterializedSnapshotRowID(t, 9) + bid := rowID.BorrowBlockID() + + left, err := ds.ApplyTombstones(context.Background(), bid, []int64{0, 1}, engine.Policy_CheckAll) + require.NoError(t, err) + // snapshotTS=20 < tombstone(25), so row 1 must NOT be filtered out. + // If this test sees [0] only, the production code is reading at currentTS. + require.Equal(t, []int64{0, 1}, left, "snapshotTS must shield row 1 from a future tombstone at TS=25") + + bm, err := ds.GetTombstones(context.Background(), bid) + require.NoError(t, err) + // Same invariant via GetTombstones path. 
+ require.Equal(t, 0, bm.Count(), "no row should be in tombstone bitmap at snapshotTS=20") + bm.Release() +} + +func TestMaterializedSnapshotDataSourceAccessors_NoRemote(t *testing.T) { + mp := mpool.MustNewZero() + pState := newTestMaterializedSnapshotPartitionState(t, mp, nil) + + src := newMaterializedSnapshotDataSource( + context.Background(), + nil, + pState, + types.BuildTS(30, 0), + types.BuildTS(20, 0), + nil, + nil, + ).(*materializedSnapshotDataSource) + + s := src.String() + require.Contains(t, s, "MaterializedSnapshotDataSource") + require.Contains(t, s, "snapshot-ts=") + require.Contains(t, s, "current-ts=") + + require.Nil(t, src.GetOrderBy()) + src.SetOrderBy(nil) + require.Nil(t, src.GetOrderBy()) + src.SetFilterZM(objectio.ZoneMap{}) +} + +func TestMaterializedSnapshotDataSourceAccessors_WithRemote(t *testing.T) { + mp := mpool.MustNewZero() + pState := newTestMaterializedSnapshotPartitionState(t, mp, nil) + + relData := newTestSnapshotRelData(1) + + src := newMaterializedSnapshotDataSource( + context.Background(), + nil, + pState, + types.BuildTS(30, 0), + types.BuildTS(20, 0), + relData, + nil, + ).(*materializedSnapshotDataSource) + require.NotNil(t, src.remote) + + src.SetOrderBy(nil) + _ = src.GetOrderBy() + src.SetFilterZM(objectio.ZoneMap{}) + _ = src.String() +} diff --git a/pkg/vm/engine/disttae/snapshot_scan.go b/pkg/vm/engine/disttae/snapshot_scan.go index 45a0d57a8883e..2bea0fb72e27a 100644 --- a/pkg/vm/engine/disttae/snapshot_scan.go +++ b/pkg/vm/engine/disttae/snapshot_scan.go @@ -62,13 +62,13 @@ type snapshotScanReaderConfig struct { // ScanSnapshotWithCurrentRanges reads rows at snapshotTS by reusing the current // relation handle and its current-view ranges. // -// Serial path (parallelism ≤ 1): Uses LocalDataSource which reads both -// in-memory partition-state rows and persisted S3 blocks, ensuring newly -// created catalog entries that have not been flushed are still visible. 
+// Snapshot scan reuses current-view disk ranges, but materializes the latest +// committed in-memory rows separately so reader fallback does not need to lazily +// consult historical pState state during scan execution. // -// Parallel path (parallelism > 1): Uses RemoteDataSource for disk blocks -// only. In-memory rows are typically flushed for the large tables that -// trigger parallelism. +// Serial path uses a hybrid source that emits committed in-memory rows first, +// then reads persisted blocks through RemoteDataSource. Parallel path emits the +// same committed in-memory rows once, then scans persisted shards concurrently. func ScanSnapshotWithCurrentRanges( ctx context.Context, caller string, @@ -113,6 +113,7 @@ func ScanSnapshotWithCurrentRanges( if err != nil { return err } + currentTS := types.TimestampToTS(tbl.db.op.SnapshotTS()) // Strip the EmptyBlockInfo marker that Ranges() may prepend. if relData != nil && relData.DataCnt() > 0 { @@ -137,7 +138,7 @@ func ScanSnapshotWithCurrentRanges( zap.Int("disk-block-cnt", diskBlockCnt), ) - tombstones, err := tbl.CollectTombstones(ctx, 0, engine.Policy_CollectAllTombstones) + tombstones, err := tbl.CollectTombstones(ctx, 0, engine.Policy_CollectCommittedTombstones) if err != nil { return err } @@ -157,13 +158,28 @@ func ScanSnapshotWithCurrentRanges( } actualParallelism := normalizeSnapshotScanParallelism(relData, scanParallelism) + // Attach tombstones BEFORE splitting: BlockListRelData.DataSlice copies + // the parent's tombstones pointer at split time, so shards created from a + // relData without tombstones would later read GetTombstones()==nil even if + // we attached to the parent afterwards. That would silently skip tombstone + // filtering in the parallel snapshot scan and surface deleted rows. 
+ if relData != nil && relData.DataCnt() > 0 { + if err = relData.AttachTombstones(tombstones); err != nil { + return err + } + } shards := splitSnapshotScanShards(relData, actualParallelism) if len(shards) > 1 { - // Parallel path: disk-only via RemoteDataSource. - if err = relData.AttachTombstones(tombstones); err != nil { + if err = scanSnapshotCommittedInMem( + ctx, eng.fs, pState, currentTS, snapshotTS, tombstones, + readerCfg, mp, onBatch, + ); err != nil { return err } + if relData == nil || relData.DataCnt() == 0 { + return nil + } logutil.Info( "SnapshotScan-Parallel-Start", zap.String("caller", caller), @@ -196,10 +212,9 @@ func ScanSnapshotWithCurrentRanges( return nil } - // Serial path: LocalDataSource reads in-memory rows + disk blocks. - return scanSnapshotShardLocal( - ctx, eng.fs, tbl, pState, tombstones, - snapshotTS, relData, readerCfg, mp, onBatch, + return scanSnapshotShardWithCommittedInMem( + ctx, eng.fs, pState, currentTS, snapshotTS, relData, tombstones, + readerCfg, mp, onBatch, ) } @@ -351,41 +366,43 @@ func scanSnapshotShard( return readSnapshotWithSource(ctx, fs, source, snapshotTS, cfg, mp, onBatch) } -// scanSnapshotShardLocal uses LocalDataSource to read both in-memory -// partition-state rows and persisted S3 blocks in a single serial pass. -// The snapshotTS overrides the transaction's own snapshot so the reader -// applies the correct visibility window. 
-func scanSnapshotShardLocal( +func scanSnapshotCommittedInMem( ctx context.Context, fs fileservice.FileService, - tbl *txnTable, pState *logtailreplay.PartitionState, + currentTS types.TS, + snapshotTS types.TS, tombstones engine.Tombstoner, + cfg snapshotScanReaderConfig, + mp *mpool.MPool, + onBatch func(*batch.Batch) error, +) error { + source := newMaterializedSnapshotDataSource( + ctx, fs, pState, currentTS, snapshotTS, nil, tombstones, + ) + return readSnapshotWithSource(ctx, fs, source, snapshotTS, cfg, mp, onBatch) +} + +func scanSnapshotShardWithCommittedInMem( + ctx context.Context, + fs fileservice.FileService, + pState *logtailreplay.PartitionState, + currentTS types.TS, snapshotTS types.TS, relData engine.RelData, + tombstones engine.Tombstoner, cfg snapshotScanReaderConfig, mp *mpool.MPool, onBatch func(*batch.Batch) error, ) error { - var rangesSlice objectio.BlockInfoSlice - if relData != nil { - rangesSlice = relData.GetBlockInfoSlice() - } - - source, err := NewLocalDataSource( - ctx, tbl, - 0, // txnOffset: committed data only - pState, - rangesSlice, - tombstones, - false, // skipReadMem: include in-memory rows - 0, // tombstonePolicy: apply all - engine.ShardingRemoteDataSource, - ) - if err != nil { - return err + if relData != nil && relData.DataCnt() > 0 { + if err := relData.AttachTombstones(tombstones); err != nil { + return err + } } - source.snapshotTS = snapshotTS + source := newMaterializedSnapshotDataSource( + ctx, fs, pState, currentTS, snapshotTS, relData, tombstones, + ) return readSnapshotWithSource(ctx, fs, source, snapshotTS, cfg, mp, onBatch) } diff --git a/pkg/vm/engine/disttae/snapshot_scan_test.go b/pkg/vm/engine/disttae/snapshot_scan_test.go index 613c714650092..f61d9b557a5a6 100644 --- a/pkg/vm/engine/disttae/snapshot_scan_test.go +++ b/pkg/vm/engine/disttae/snapshot_scan_test.go @@ -141,6 +141,40 @@ func TestSplitSnapshotScanShards(t *testing.T) { require.Equal(t, relData.DataCnt(), totalBlocks) } +// 
TestSplitSnapshotScanShards_TombstoneInheritance pins the ordering invariant +// that callers must AttachTombstones BEFORE splitting. BlockListRelData.DataSlice +// snapshots the parent's tombstones pointer at split time, so attaching +// afterwards leaves shards with nil tombstones and silently drops tombstone +// filtering in the parallel snapshot scan. +func TestSplitSnapshotScanShards_TombstoneInheritance(t *testing.T) { + tombstones := &recordingTombstoner{} + + t.Run("attach before split: shards inherit tombstones", func(t *testing.T) { + relData := newTestSnapshotRelData(snapshotScanMinBlocksPerReader*4 + 1) + require.NoError(t, relData.AttachTombstones(tombstones)) + shards := splitSnapshotScanShards(relData, 4) + require.Greater(t, len(shards), 1) + for i, shard := range shards { + require.NotNil(t, shard.GetTombstones(), "shard[%d] missing tombstones", i) + require.Same(t, tombstones, shard.GetTombstones(), + "shard[%d] tombstones pointer mismatch", i) + } + }) + + t.Run("attach after split: shards do NOT see tombstones", func(t *testing.T) { + // This pins the hazard itself: shards capture the parent's tombstones at + // split time, so attaching afterwards does not reach them. If DataSlice + // ever shares tombstones with its parent, the assertion below fails.
+ relData := newTestSnapshotRelData(snapshotScanMinBlocksPerReader*4 + 1) + shards := splitSnapshotScanShards(relData, 4) + require.NoError(t, relData.AttachTombstones(tombstones)) + for i, shard := range shards { + require.Nil(t, shard.GetTombstones(), + "shard[%d] should NOT have tombstones when attached after split", i) + } + }) +} + func TestAttrsToSeqnums(t *testing.T) { tableDef := &plan.TableDef{ Cols: []*plan.ColDef{ diff --git a/pkg/vm/engine/tae/blockio/read.go b/pkg/vm/engine/tae/blockio/read.go index 6a4fc4cc3ace7..93f6a67f5a490 100644 --- a/pkg/vm/engine/tae/blockio/read.go +++ b/pkg/vm/engine/tae/blockio/read.go @@ -835,18 +835,38 @@ func readBlockData( idxes, typs := excludePhyAddrColumn(colIndexes, colTypes, phyAddrColumnPos) + blockHasCommitTS := func() (bool, error) { + if info.IsAppendable() { + return true, nil + } + + location := info.MetaLocation() + meta, err2 := objectio.FastLoadObjectMeta(ctx, &location, false, fs) + if err2 != nil { + return false, err2 + } + dataMeta := meta.MustGetMeta(objectio.SchemaData) + blkMeta := dataMeta.GetBlockMeta(uint32(info.MetaLocation().ID())) + metaColCnt := blkMeta.GetMetaColumnCount() + if metaColCnt == 0 { + return false, nil + } + return blkMeta.ColumnMeta(metaColCnt-1).DataType() == uint8(types.T_TS), nil + } + readColumns := func( cols []uint16, + typs2 []types.Type, cacheVectors2 containers.Vectors, - ) (err2 error) { + ) (release2 func(), err2 error) { if len(cols) == 0 && phyAddrColumnPos >= 0 { // only read rowid column on non appendable block, return early - release = func() {} + release2 = func() {} return } - release, err2 = ioutil.LoadColumns( - ctx, cols, typs, fs, info.MetaLocation(), cacheVectors2, m, policy, + release2, err2 = ioutil.LoadColumns( + ctx, cols, typs2, fs, info.MetaLocation(), cacheVectors2, m, policy, ) if err2 != nil { return @@ -854,29 +874,58 @@ func readBlockData( return } - readABlkColumns := func( + readColumnsWithCommitTSFilter := func( cols []uint16, + typs2 
[]types.Type, cacheVectors2 containers.Vectors, ) ( deletes objectio.Bitmap, err2 error, ) { - // appendable block should be filtered by committs - //cols = append(cols, objectio.SEQNUM_COMMITTS, objectio.SEQNUM_ABORT) // committs, aborted - cols = append(cols, objectio.SEQNUM_COMMITTS) // committs, aborted - - // no need to add typs, the two columns won't be generated - if err2 = readColumns( - cols, cacheVectors2, - ); err2 != nil { + dataRelease, err2 := readColumns(cols, typs2, cacheVectors2) + if err2 != nil { return } + hasCommitTS, err2 := blockHasCommitTS() + if err2 != nil { + if dataRelease != nil { + dataRelease() + } + return + } + if !hasCommitTS { + release = dataRelease + deletes = objectio.GetReusableBitmap() + return + } + + commitTSVectors := containers.NewVectors(1) + releaseCommitTS, err2 := readColumns( + []uint16{objectio.SEQNUM_COMMITTS}, + []types.Type{types.T_TS.ToType()}, + commitTSVectors, + ) + if err2 != nil { + if dataRelease != nil { + dataRelease() + } + return + } + + release = func() { + if releaseCommitTS != nil { + releaseCommitTS() + } + if dataRelease != nil { + dataRelease() + } + } + deletes = objectio.GetReusableBitmap() t0 := time.Now() - //aborts := vector.MustFixedColWithTypeCheck[bool](loaded.Vecs[len(loaded.Vecs)-1]) - commits := vector.MustFixedColWithTypeCheck[types.TS](&cacheVectors2[len(cols)-1]) + commits := vector.MustFixedColWithTypeCheck[types.TS](&commitTSVectors[0]) for i := 0; i < len(commits); i++ { if commits[i].GT(&ts) { deletes.Add(uint64(i)) @@ -892,11 +941,7 @@ func readBlockData( return } - if info.IsAppendable() { - deleteMask, err = readABlkColumns(idxes, cacheVectors) - } else { - err = readColumns(idxes, cacheVectors) - } + deleteMask, err = readColumnsWithCommitTSFilter(idxes, typs, cacheVectors) return } diff --git a/pkg/vm/engine/tae/blockio/read_test.go b/pkg/vm/engine/tae/blockio/read_test.go index 3529da6cf4e18..da4c3a7874311 100644 --- a/pkg/vm/engine/tae/blockio/read_test.go +++ 
b/pkg/vm/engine/tae/blockio/read_test.go @@ -23,6 +23,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" + "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vectorindex/metric" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/stretchr/testify/require" @@ -359,6 +361,128 @@ func TestBuildTopInputRows(t *testing.T) { }) } +func TestReadBlockDataFiltersNonAppendableByCommitTS(t *testing.T) { + ctx := context.Background() + fs := testutil.NewSharedFS() + mp := mpool.MustNewZero() + defer mpool.DeleteMPool(mp) + + writer := ioutil.ConstructWriter( + 0, + []uint16{0, objectio.SEQNUM_COMMITTS}, + -1, + false, + false, + fs, + ) + + input := batch.NewWithSize(2) + input.Vecs[0] = vector.NewVec(types.T_int32.ToType()) + input.Vecs[1] = vector.NewVec(types.T_TS.ToType()) + for _, row := range []struct { + val int32 + ts types.TS + }{ + {val: 11, ts: types.BuildTS(20, 0)}, + {val: 22, ts: types.BuildTS(10, 0)}, + {val: 33, ts: types.BuildTS(30, 0)}, + } { + require.NoError(t, vector.AppendFixed(input.Vecs[0], row.val, false, mp)) + require.NoError(t, vector.AppendFixed(input.Vecs[1], row.ts, false, mp)) + } + input.SetRowCount(3) + + _, err := writer.WriteBatch(input) + require.NoError(t, err) + blocks, _, err := writer.Sync(ctx) + require.NoError(t, err) + require.Len(t, blocks, 1) + + info := blocks[0].GenerateBlockInfo(writer.GetName(), false) + require.False(t, info.IsAppendable()) + + cacheVectors := containers.NewVectors(1) + deleteMask, release, err := readBlockData( + ctx, + []uint16{0}, + []types.Type{types.T_int32.ToType()}, + -1, + &info, + nil, + types.BuildTS(15, 0), + 0, + cacheVectors, + mp, + fs, + ) + require.NoError(t, err) + defer deleteMask.Release() + defer release() + + require.Equal(t, 2, deleteMask.Count()) 
+ require.True(t, deleteMask.Contains(0)) + require.False(t, deleteMask.Contains(1)) + require.True(t, deleteMask.Contains(2)) + + vals := vector.MustFixedColWithTypeCheck[int32](&cacheVectors[0]) + require.Equal(t, []int32{11, 22, 33}, vals) +} + +func TestReadBlockDataNonAppendableWithoutCommitTS(t *testing.T) { + ctx := context.Background() + fs := testutil.NewSharedFS() + mp := mpool.MustNewZero() + defer mpool.DeleteMPool(mp) + + writer := ioutil.ConstructWriter( + 0, + []uint16{0}, + -1, + false, + false, + fs, + ) + + input := batch.NewWithSize(1) + input.Vecs[0] = vector.NewVec(types.T_int32.ToType()) + for _, row := range []int32{11, 22, 33} { + require.NoError(t, vector.AppendFixed(input.Vecs[0], row, false, mp)) + } + input.SetRowCount(3) + + _, err := writer.WriteBatch(input) + require.NoError(t, err) + blocks, _, err := writer.Sync(ctx) + require.NoError(t, err) + require.Len(t, blocks, 1) + + info := blocks[0].GenerateBlockInfo(writer.GetName(), false) + require.False(t, info.IsAppendable()) + + cacheVectors := containers.NewVectors(1) + deleteMask, release, err := readBlockData( + ctx, + []uint16{0}, + []types.Type{types.T_int32.ToType()}, + -1, + &info, + nil, + types.BuildTS(15, 0), + 0, + cacheVectors, + mp, + fs, + ) + require.NoError(t, err) + defer deleteMask.Release() + defer release() + + require.Zero(t, deleteMask.Count()) + + vals := vector.MustFixedColWithTypeCheck[int32](&cacheVectors[0]) + require.Equal(t, []int32{11, 22, 33}, vals) +} + // TestHandleOrderByLimitAllNullVectors verifies that HandleOrderByLimitOnIVFFlatIndex // returns empty sels/dists when all vector rows are NULL. 
// This is the root cause of the IVF-Flat entries table panic: when the InMem diff --git a/test/distributed/cases/git4data/branch/diff/diff_9.result b/test/distributed/cases/git4data/branch/diff/diff_9.result index 610f0098fd723..976cae20e6a26 100644 --- a/test/distributed/cases/git4data/branch/diff/diff_9.result +++ b/test/distributed/cases/git4data/branch/diff/diff_9.result @@ -330,4 +330,103 @@ data branch diff c3_tar against c3_src output count; 178 drop table c3_src; drop table c3_tar; +create table t1(a int, b int, primary key(a)); +insert into t1 values(1, 1), (2, 2), (3, 3); +data branch create table t2 from t1; +insert into t2 values(4, 4), (5, 5); +data branch diff t2 against t1; +diff t2 against t1 flag a b +t2 INSERT 4 4 +t2 INSERT 5 5 +data branch merge t2 into t1; +data branch diff t2 against t1; +diff t2 against t1 flag a b +update t1 set b = b + 1 where a = 4; +data branch diff t2 against t1; +diff t2 against t1 flag a b +t2 INSERT 4 4 +t1 INSERT 4 5 +select mo_ctl('dn', 'flush', 'test_gc_diff.t2'); +➤ mo_ctl(dn, flush, test_gc_diff.t2)[12,-1,0] 𝄀 +{ + "method": "Flush", + "result": [ + { + "returnStr": "OK" + } + ] +} + +select mo_ctl('dn', 'flush', 'test_gc_diff.t1'); +➤ mo_ctl(dn, flush, test_gc_diff.t1)[12,-1,0] 𝄀 +{ + "method": "Flush", + "result": [ + { + "returnStr": "OK" + } + ] +} + +select mo_ctl('dn', 'globalcheckpoint', ''); +➤ mo_ctl(dn, globalcheckpoint, )[12,-1,0] 𝄀 +{ + "method": "GlobalCheckpoint", + "result": [ + { + "returnStr": "OK" + } + ] +} + +select mo_ctl('dn', 'globalcheckpoint', ''); +➤ mo_ctl(dn, globalcheckpoint, )[12,-1,0] 𝄀 +{ + "method": "GlobalCheckpoint", + "result": [ + { + "returnStr": "OK" + } + ] +} + +select mo_ctl('dn', 'diskcleaner', 'force_gc'); +➤ mo_ctl(dn, diskcleaner, force_gc)[12,-1,0] 𝄀 +{ + "method": "DiskCleaner", + "result": [ + { + "returnStr": "OK" + } + ] +} + +select mo_ctl('dn', 'globalcheckpoint', ''); +➤ mo_ctl(dn, globalcheckpoint, )[12,-1,0] 𝄀 +{ + "method": "GlobalCheckpoint", + "result": 
[ + { + "returnStr": "OK" + } + ] +} + +select mo_ctl('dn', 'diskcleaner', 'force_gc'); +➤ mo_ctl(dn, diskcleaner, force_gc)[12,-1,0] 𝄀 +{ + "method": "DiskCleaner", + "result": [ + { + "returnStr": "OK" + } + ] +} + +data branch diff t2 against t1; +diff t2 against t1 flag a b +t2 INSERT 4 4 +t1 INSERT 4 5 +drop table t1; +drop table t2; drop database test_gc_diff; diff --git a/test/distributed/cases/git4data/branch/diff/diff_9.sql b/test/distributed/cases/git4data/branch/diff/diff_9.sql index 0f0d77b490a07..649c6424b9612 100644 --- a/test/distributed/cases/git4data/branch/diff/diff_9.sql +++ b/test/distributed/cases/git4data/branch/diff/diff_9.sql @@ -132,4 +132,37 @@ data branch diff c3_tar against c3_src output count; drop table c3_src; drop table c3_tar; +-- Case 4: merged branch inserts must remain INSERT after GC even if base updates same PK +create table t1(a int, b int, primary key(a)); +insert into t1 values(1, 1), (2, 2), (3, 3); +data branch create table t2 from t1; +insert into t2 values(4, 4), (5, 5); + +data branch diff t2 against t1; +data branch merge t2 into t1; +data branch diff t2 against t1; + +update t1 set b = b + 1 where a = 4; +data branch diff t2 against t1; + +-- @ignore:0 +select mo_ctl('dn', 'flush', 'test_gc_diff.t2'); +-- @ignore:0 +select mo_ctl('dn', 'flush', 'test_gc_diff.t1'); +-- @ignore:0 +select mo_ctl('dn', 'globalcheckpoint', ''); +-- @ignore:0 +select mo_ctl('dn', 'globalcheckpoint', ''); +-- @ignore:0 +select mo_ctl('dn', 'diskcleaner', 'force_gc'); +-- @ignore:0 +select mo_ctl('dn', 'globalcheckpoint', ''); +-- @ignore:0 +select mo_ctl('dn', 'diskcleaner', 'force_gc'); + +data branch diff t2 against t1; + +drop table t1; +drop table t2; + drop database test_gc_diff;