Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions pkg/pb/plan/plan.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 110 additions & 0 deletions pkg/sql/colexec/multi_update/affected_rows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2021-2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package multi_update

import (
"testing"

"github.com/stretchr/testify/require"
)

// Validates that REPLACE INTO counts both inserted and deleted rows toward
// AffectedRows, while regular UPDATE / DELETE / INSERT are unchanged.
func TestAddAffectRows_PerAction(t *testing.T) {
cases := []struct {
name string
action actionType
isReplace bool
tableType UpdateTableType
insertRows uint64
deleteRows uint64
wantAfterInsFn uint64 // after only addInsertAffectRows
wantAfterDelFn uint64 // after addInsertAffectRows + addDeleteAffectRows
}{
{
name: "INSERT main table",
action: actionInsert,
tableType: UpdateMainTable,
insertRows: 5,
deleteRows: 0,
wantAfterInsFn: 5,
wantAfterDelFn: 5,
},
{
name: "DELETE main table",
action: actionDelete,
tableType: UpdateMainTable,
insertRows: 0,
deleteRows: 7,
wantAfterInsFn: 0,
wantAfterDelFn: 7,
},
{
name: "UPDATE main table counts insert side only",
action: actionUpdate,
isReplace: false,
tableType: UpdateMainTable,
insertRows: 3,
deleteRows: 3,
wantAfterInsFn: 3,
wantAfterDelFn: 3,
},
{
name: "REPLACE main table counts insert + delete",
action: actionUpdate,
isReplace: true,
tableType: UpdateMainTable,
insertRows: 4,
deleteRows: 2,
wantAfterInsFn: 4,
wantAfterDelFn: 6,
},
{
name: "REPLACE unique index table is skipped",
action: actionUpdate,
isReplace: true,
tableType: UpdateUniqueIndexTable,
insertRows: 10,
deleteRows: 10,
wantAfterInsFn: 0,
wantAfterDelFn: 0,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
op := &MultiUpdate{
MultiUpdateCtx: []*MultiUpdateCtx{{IsReplace: tc.isReplace}},
}
op.addAffectedRowsFunc = op.doAddAffectedRows
op.ctr.action = tc.action

op.addInsertAffectRows(tc.tableType, tc.insertRows)
require.Equal(t, tc.wantAfterInsFn, op.GetAffectedRows(),
"after addInsertAffectRows")

op.addDeleteAffectRows(tc.tableType, tc.deleteRows)
require.Equal(t, tc.wantAfterDelFn, op.GetAffectedRows(),
"after addDeleteAffectRows")
})
}
}

// isReplace must tolerate an empty MultiUpdateCtx slice (defensive — shouldn't
// happen in practice but the operator must not panic on it).
func TestIsReplace_EmptyCtx(t *testing.T) {
op := &MultiUpdate{}
require.False(t, op.isReplace())
}
35 changes: 21 additions & 14 deletions pkg/sql/colexec/multi_update/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ type MultiUpdateCtx struct {
InsertCols []int
DeleteCols []int
PartitionCols []int
// IsReplace marks this ctx as belonging to a REPLACE INTO statement's main
// table. Used by addDeleteAffectRows to count replaced-out rows toward
// affected_rows (MySQL semantics: affected_rows = inserted + deleted).
IsReplace bool
}

func (update MultiUpdate) TypeName() string {
Expand Down Expand Up @@ -190,13 +194,8 @@ func (update *MultiUpdate) addInsertAffectRows(tableType UpdateTableType, rowCou
if tableType != UpdateMainTable {
return
}
// For REPLACE INTO, we always count INSERT rows, regardless of update.ctr.action
// because REPLACE INTO should return at least the number of rows being inserted
switch update.ctr.action {
case actionInsert:
update.addAffectedRowsFunc(rowCount)
case actionUpdate:
// For REPLACE INTO with both DELETE and INSERT, count INSERT rows
case actionInsert, actionUpdate:
update.addAffectedRowsFunc(rowCount)
}
}
Expand All @@ -205,19 +204,27 @@ func (update *MultiUpdate) addDeleteAffectRows(tableType UpdateTableType, rowCou
if tableType != UpdateMainTable {
return
}
// For REPLACE INTO, we don't count DELETE rows in affected rows
// REPLACE INTO should only return the number of INSERT rows
// Only count DELETE rows for regular UPDATE operations
switch update.ctr.action {
case actionDelete:
// Regular DELETE operation, count it
update.addAffectedRowsFunc(rowCount)
case actionUpdate:
// For UPDATE operations (not REPLACE INTO), count DELETE rows
// But for REPLACE INTO, this should not be called or should be ignored
// REPLACE INTO uses actionUpdate but should only count INSERT
// So we don't count DELETE here for actionUpdate
// MySQL REPLACE semantics: affected_rows = inserted + deleted.
// Regular UPDATE still counts only the insert side (== rows_matched),
// so we only add the delete side when this MULTI_UPDATE was produced
// by REPLACE INTO.
if update.isReplace() {
update.addAffectedRowsFunc(rowCount)
}
}
}

// isReplace reports whether this MultiUpdate was produced by REPLACE INTO.
// The flag lives on the main-table UpdateCtx (index 0) — see bind_replace.go.
func (update *MultiUpdate) isReplace() bool {
if len(update.MultiUpdateCtx) == 0 {
return false
}
return update.MultiUpdateCtx[0].IsReplace
}

func (update *MultiUpdate) doAddAffectedRows(affectedRows uint64) {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/compile/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,7 @@ func constructMultiUpdate(
InsertCols: insertCols,
DeleteCols: deleteCols,
PartitionCols: partitionCols,
IsReplace: updateCtx.IsReplace,
}
}
arg.Action = action
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/compile/remoterun.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,8 +774,9 @@ func convertToPipelineInstruction(op vm.Operator, proc *process.Process, ctx *sc
updateCtxList := make([]*plan.UpdateCtx, len(t.MultiUpdateCtx))
for i, muCtx := range t.MultiUpdateCtx {
updateCtxList[i] = &plan.UpdateCtx{
ObjRef: muCtx.ObjRef,
TableDef: muCtx.TableDef,
ObjRef: muCtx.ObjRef,
TableDef: muCtx.TableDef,
IsReplace: muCtx.IsReplace,
}

updateCtxList[i].InsertCols = make([]plan.ColRef, len(muCtx.InsertCols))
Expand Down Expand Up @@ -1230,8 +1231,9 @@ func convertToVmOperator(opr *pipeline.Instruction, ctx *scopeContext, eng engin
for i, muCtx := range t.UpdateCtxList {

arg.MultiUpdateCtx[i] = &multi_update.MultiUpdateCtx{
ObjRef: muCtx.ObjRef,
TableDef: muCtx.TableDef,
ObjRef: muCtx.ObjRef,
TableDef: muCtx.TableDef,
IsReplace: muCtx.IsReplace,
}

arg.MultiUpdateCtx[i].InsertCols = make([]int, len(muCtx.InsertCols))
Expand Down
Loading
Loading