Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pkg/frontend/data_branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,19 @@ func (retBatchPool *retBatchList) acquireRetBatch(tblStuff tableStuff, forTombst

if retBatchPool.dataVecCnt == 0 {
retBatchPool.dataVecCnt = len(tblStuff.def.colNames)
retBatchPool.tombVecCnt = 1
retBatchPool.tombVecCnt = 3
retBatchPool.dataTypes = tblStuff.def.colTypes
retBatchPool.tombstoneType = tblStuff.def.colTypes[tblStuff.def.pkColIdx]
retBatchPool.tombRowIDType = types.T_Rowid.ToType()
retBatchPool.tombKeyType = types.T_varbinary.ToType()
}

if forTombstone {
if len(retBatchPool.tList) == 0 {
bat = batch.NewWithSize(retBatchPool.tombVecCnt)
bat.Vecs[0] = vector.NewVec(retBatchPool.tombstoneType)
bat.Vecs[1] = vector.NewVec(retBatchPool.tombRowIDType)
bat.Vecs[2] = vector.NewVec(retBatchPool.tombKeyType)
goto done
}

Expand All @@ -161,6 +165,12 @@ func (retBatchPool *retBatchList) acquireRetBatch(tblStuff tableStuff, forTombst
if !typeMatched(bat.Vecs[0], retBatchPool.tombstoneType) {
panic(moerr.NewInternalErrorNoCtxf("retBatchPool: tombstone vec type mismatch, got %v expect %v", bat.Vecs[0].GetType(), retBatchPool.tombstoneType))
}
if !typeMatched(bat.Vecs[1], retBatchPool.tombRowIDType) {
panic(moerr.NewInternalErrorNoCtxf("retBatchPool: tombstone rowid vec type mismatch, got %v expect %v", bat.Vecs[1].GetType(), retBatchPool.tombRowIDType))
}
if !typeMatched(bat.Vecs[2], retBatchPool.tombKeyType) {
panic(moerr.NewInternalErrorNoCtxf("retBatchPool: tombstone key vec type mismatch, got %v expect %v", bat.Vecs[2].GetType(), retBatchPool.tombKeyType))
}

bat.CleanOnlyData()
goto done
Expand Down
253 changes: 224 additions & 29 deletions pkg/frontend/data_branch_hashdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,14 @@ func handleDelsOnLCA(
)

if len(sels) == 0 {
tBat.Vecs[0].CleanOnlyData()
for i := range tBat.Vecs {
tBat.Vecs[i].CleanOnlyData()
}
tBat.SetRowCount(0)
} else {
tBat.Vecs[0].Shrink(sels, false)
for i := range tBat.Vecs {
tBat.Vecs[i].Shrink(sels, false)
}
tBat.SetRowCount(tBat.Vecs[0].Length())
}
logutil.Debug(
Expand Down Expand Up @@ -1210,12 +1214,48 @@ func findDeleteAndUpdateBat(
// merge inserts and deletes on the tar
// this deletes is not on the lca
if tombBat.RowCount() > 0 {
if _, err2 = dataHashmap.PopByVectorsStream(
[]*vector.Vector{tombBat.Vecs[0]}, false, nil,
); err2 != nil {
tblStuff.retPool.releaseRetBatch(tombBat, true)
tblStuff.retPool.releaseRetBatch(dBat, false)
return err2
removedLiveRows := 0
tombRowIDs := vector.MustFixedColNoTypeCheck[types.Rowid](tombBat.Vecs[1])
for i := 0; i < tombBat.RowCount(); i++ {
keyBytes := tombBat.Vecs[2].GetBytesAt(i)
var dataRet databranchutils.GetResult
if dataRet, err2 = dataHashmap.GetByEncodedKey(keyBytes); err2 != nil {
tblStuff.retPool.releaseRetBatch(tombBat, true)
tblStuff.retPool.releaseRetBatch(dBat, false)
return err2
}
if !dataRet.Exists {
continue
}
var matchedRow []byte
for _, row := range dataRet.Rows {
var liveTuple types.Tuple
if liveTuple, _, err2 = dataHashmap.DecodeRow(row); err2 != nil {
tblStuff.retPool.releaseRetBatch(tombBat, true)
tblStuff.retPool.releaseRetBatch(dBat, false)
return err2
}
liveRowIDBytes, ok := liveTuple[0].([]uint8)
if !ok {
continue
}
liveRowID := types.DecodeFixed[types.Rowid](liveRowIDBytes)
if !liveRowID.EQ(&tombRowIDs[i]) {
continue
}
matchedRow = row
break
}
if matchedRow == nil {
continue
}
var removed int
if removed, err2 = dataHashmap.PopByEncodedKeyValue(keyBytes, matchedRow, false); err2 != nil {
tblStuff.retPool.releaseRetBatch(tombBat, true)
tblStuff.retPool.releaseRetBatch(dBat, false)
return err2
}
removedLiveRows += removed
}
tblStuff.retPool.releaseRetBatch(tombBat, true)
}
Expand Down Expand Up @@ -1347,7 +1387,7 @@ func findDeleteAndUpdateBat(
return nil
}

if err2 = cursor.ForEach(func(key []byte, _ []byte) error {
if err2 = cursor.ForEach(func(key []byte, row []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -1360,6 +1400,21 @@ func findDeleteAndUpdateBat(
if err2 = vector.AppendAny(tBat1.Vecs[0], tuple[0], false, ses.proc.Mp()); err2 != nil {
return err2
}
var tombTuple types.Tuple
if tombTuple, _, err2 = tombstoneHashmap.DecodeRow(row); err2 != nil {
return err2
}
rowIDBytes, ok := tombTuple[0].([]uint8)
if !ok {
return moerr.NewInternalErrorNoCtx("tombstone row missing rowid in hashmap payload")
}
rowID := types.DecodeFixed[types.Rowid](rowIDBytes)
if err2 = vector.AppendFixed(tBat1.Vecs[1], rowID, false, ses.proc.Mp()); err2 != nil {
return err2
}
if err2 = vector.AppendBytes(tBat1.Vecs[2], append([]byte(nil), key...), false, ses.proc.Mp()); err2 != nil {
return err2
}

tBat1.SetRowCount(tBat1.Vecs[0].Length())
if tBat1.RowCount() >= maxTombstoneBatchCnt {
Expand Down Expand Up @@ -1391,21 +1446,18 @@ func findDeleteAndUpdateBat(
}

func appendTupleToBat(ses *Session, bat *batch.Batch, tuple types.Tuple, tblStuff tableStuff) error {
limit := len(tuple)
if limit == bat.VectorCount()+1 {
// Some change-collection paths keep commit ts in the encoded payload while
// others only materialize visible columns. Trim the trailing commit ts only
// when it is actually present.
limit--
}
if limit != bat.VectorCount() {
if bat.VectorCount() != len(tblStuff.def.colNames) {
return moerr.NewInternalErrorNoCtxf(
"unexpected tuple width %d for batch with %d vectors on table %s",
len(tuple), bat.VectorCount(), tblStuff.tarRel.GetTableName(),
"unexpected batch width %d for table %s with %d physical columns",
bat.VectorCount(), tblStuff.tarRel.GetTableName(), len(tblStuff.def.colNames),
)
}
for j, val := range tuple[:limit] {
vec := bat.Vecs[j]
for colIdx := range tblStuff.def.colNames {
val, err := getTupleColumnValue(tuple, tblStuff, colIdx)
if err != nil {
return err
}
vec := bat.Vecs[colIdx]
if err := appendTupleValueToVector(vec, val, ses.proc.Mp()); err != nil {
return err
}
Expand All @@ -1416,6 +1468,113 @@ func appendTupleToBat(ses *Session, bat *batch.Batch, tuple types.Tuple, tblStuf
return nil
}

func getTupleColumnValue(tuple types.Tuple, tblStuff tableStuff, colIdx int) (any, error) {
totalColCnt := len(tblStuff.def.colTypes)
if colIdx < 0 || colIdx >= totalColCnt {
return nil, moerr.NewInternalErrorNoCtxf(
"column index %d out of range for table %s with %d columns",
colIdx, tblStuff.tarRel.GetTableName(), totalColCnt,
)
}
switch len(tuple) {
case totalColCnt, totalColCnt + 1:
return tuple[colIdx], nil
case totalColCnt + 2:
return tuple[colIdx+1], nil
default:
return nil, moerr.NewInternalErrorNoCtxf(
"unexpected tuple width %d for table %s with %d visible columns",
len(tuple), tblStuff.tarRel.GetTableName(), len(tblStuff.def.visibleIdxes),
)
}
}

func visibleTupleKeyIdxes(tblStuff tableStuff) []int {
idxes := make([]int, len(tblStuff.def.visibleIdxes))
for i, colIdx := range tblStuff.def.visibleIdxes {
idxes[i] = colIdx + 1
}
return idxes
}

func batchSampleRowsForLog(bat *batch.Batch, limit int) []string {
if bat == nil || bat.RowCount() == 0 || limit <= 0 {
return nil
}
if limit > bat.RowCount() {
limit = bat.RowCount()
}
rows := make([]string, 0, limit)
for rowIdx := 0; rowIdx < limit; rowIdx++ {
cols := make([]string, 0, bat.VectorCount())
for _, vec := range bat.Vecs {
if vec == nil {
cols = append(cols, "<nil>")
continue
}
if rowIdx >= vec.Length() {
cols = append(cols, "<oob>")
continue
}
if vec.GetNulls().Contains(uint64(rowIdx)) {
cols = append(cols, "NULL")
continue
}
cols = append(cols, fmt.Sprintf("%v", types.DecodeValue(vec.GetRawBytesAt(rowIdx), vec.GetType().Oid)))
}
rows = append(rows, strings.Join(cols, ", "))
}
return rows
}

func validateLeadingRowID(side, tableName string, isTombstone bool, bat *batch.Batch) error {
if bat == nil || bat.RowCount() == 0 {
return nil
}
buildFields := func() []zap.Field {
return []zap.Field{
zap.String("side", side),
zap.String("table-name", tableName),
zap.Bool("tombstone", isTombstone),
zap.Int("row-cnt", bat.RowCount()),
zap.Int("vec-cnt", bat.VectorCount()),
zap.Strings("attrs", append([]string(nil), bat.Attrs...)),
zap.Strings("samples", batchSampleRowsForLog(bat, 4)),
}
}
fail := func(msg string, extra ...zap.Field) error {
logutil.Error(msg, append(buildFields(), extra...)...)
return moerr.NewInternalErrorNoCtx(msg)
}
if bat.VectorCount() == 0 || bat.Vecs[0] == nil {
return fail("DataBranch-CollectChanges-MissingRowID")
}
rowIDVec := bat.Vecs[0]
if rowIDVec.GetType().Oid != types.T_Rowid {
return fail("DataBranch-CollectChanges-InvalidRowIDVector",
zap.String("rowid-vec-type", rowIDVec.GetType().String()),
)
}
if rowIDVec.Length() != bat.RowCount() {
return fail("DataBranch-CollectChanges-RowIDLenMismatch",
zap.Int("rowid-vec-len", rowIDVec.Length()),
)
}
rowIDs := vector.MustFixedColNoTypeCheck[types.Rowid](rowIDVec)
for i := range rowIDs {
if rowIDVec.GetNulls().Contains(uint64(i)) {
return fail("DataBranch-CollectChanges-NullRowID", zap.Int("row-idx", i))
}
if rowIDs[i].EQ(&types.EmptyRowid) || rowIDs[i].BorrowBlockID().IsEmpty() {
return fail("DataBranch-CollectChanges-InvalidRowID",
zap.Int("row-idx", i),
zap.String("rowid", rowIDs[i].String()),
)
}
}
return nil
}

func appendTupleValueToVector(vec *vector.Vector, val any, mp *mpool.MPool) error {
if val == nil {
return vector.AppendNull(vec, mp)
Expand All @@ -1441,7 +1600,12 @@ func checkConflictAndAppendToBat(
case tree.CONFLICT_FAIL:
buf := acquireBuffer(tblStuff.bufPool)
for i, idx := range tblStuff.def.pkColIdxes {
if err2 = formatValIntoString(ses, tarTuple[idx], tblStuff.def.colTypes[idx], buf); err2 != nil {
var val any
if val, err2 = getTupleColumnValue(tarTuple, tblStuff, idx); err2 != nil {
releaseBuffer(tblStuff.bufPool, buf)
return err2
}
if err2 = formatValIntoString(ses, val, tblStuff.def.colTypes[idx], buf); err2 != nil {
releaseBuffer(tblStuff.bufPool, buf)
return err2
}
Expand Down Expand Up @@ -1491,7 +1655,7 @@ func diffDataHelper(

if tblStuff.def.pkKind == fakeKind {
var (
keyIdxes = tblStuff.def.visibleIdxes
keyIdxes = visibleTupleKeyIdxes(tblStuff)
newHashmap databranchutils.BranchHashmap
)

Expand All @@ -1502,6 +1666,14 @@ func diffDataHelper(
return err
}
baseDataHashmap = newHashmap

if newHashmap, err = tarDataHashmap.Migrate(keyIdxes, -1); err != nil {
return err
}
if err = tarDataHashmap.Close(); err != nil {
return err
}
tarDataHashmap = newHashmap
}

if err = tarDataHashmap.ForEachShardParallel(func(cursor databranchutils.ShardCursor) error {
Expand All @@ -1526,7 +1698,7 @@ func diffDataHelper(
}

if tblStuff.def.pkKind == fakeKind {
if checkRet, err2 = baseDataHashmap.PopByEncodedFullValue(row, false); err2 != nil {
if checkRet, err2 = baseDataHashmap.PopByEncodedKey(key, false); err2 != nil {
return err2
}
} else {
Expand Down Expand Up @@ -1565,9 +1737,16 @@ func diffDataHelper(
// pk columns already compared
continue
}
var tarVal, baseVal any
if tarVal, err2 = getTupleColumnValue(tarTuple, tblStuff, idx); err2 != nil {
return err2
}
if baseVal, err2 = getTupleColumnValue(baseTuple, tblStuff, idx); err2 != nil {
return err2
}

if types.CompareValue(
tarTuple[idx], baseTuple[idx],
tarVal, baseVal,
) != 0 {
notSame = true
break
Expand Down Expand Up @@ -1813,8 +1992,11 @@ func buildHashmapForTable(
if pickKeyHashmap != nil {
pkIdx := tblStuff.def.pkColIdx
if isTombstone {
// After updateTombstoneBatch, tombstone PK is at Vec[0].
pkIdx = 0
// Data branch keeps rowid in Vec[0] while building the
// hashmaps, so tombstone PK lives at Vec[1].
pkIdx = 1
} else {
pkIdx++
}
var results []databranchutils.GetResult
results, taskErr = pickKeyHashmap.GetByVectors(
Expand All @@ -1840,9 +2022,9 @@ func buildHashmapForTable(
}

if isTombstone {
taskErr = tombstoneHashmap.PutByVectors(bat.Vecs[:ll], []int{0})
taskErr = tombstoneHashmap.PutByVectors(bat.Vecs[:ll], []int{1})
} else {
taskErr = dataHashmap.PutByVectors(bat.Vecs[:ll], []int{tblStuff.def.pkColIdx})
taskErr = dataHashmap.PutByVectors(bat.Vecs[:ll], []int{tblStuff.def.pkColIdx + 1})
}
}
if taskErr != nil {
Expand Down Expand Up @@ -1873,6 +2055,19 @@ func buildHashmapForTable(
break
}

tableName := ""
if side == "base" && tblStuff.baseRel != nil {
tableName = tblStuff.baseRel.GetTableName()
} else if side == "target" && tblStuff.tarRel != nil {
tableName = tblStuff.tarRel.GetTableName()
}
if err = validateLeadingRowID(side, tableName, false, dataBat); err != nil {
return
}
if err = validateLeadingRowID(side, tableName, true, tombstoneBat); err != nil {
return
}

if dataBat != nil && dataBat.RowCount() > 0 {
totalRows += int64(dataBat.RowCount())
totalBytes += int64(dataBat.Size())
Expand Down
Loading
Loading