Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 122 additions & 1 deletion pkg/sql/plan/apply_indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/vm/message"
)

const (
Expand All @@ -40,6 +41,12 @@ type specialIndexGuard struct {
scanNodeIDs []int32
}

// regularIndexTopSortContext groups the plan nodes that
// buildRegularIndexTopSortContext matched and that applyRegularIndexTopSort
// subsequently mutates.
type regularIndexTopSortContext struct {
	// sortNode is the node carrying the single ORDER BY key and the LIMIT,
	// sortProjectNode is the PROJECT node that is sortNode's only child, and
	// scanNode is the non-unique index scan whose column 1 feeds the ORDER BY.
	sortNode, sortProjectNode, scanNode *plan.Node
}

// calculatePostFilterOverFetchFactor returns the over-fetch multiplier based on limit size
// for vector index queries with post-filtering (filters applied after index search).
// Smaller limits need more over-fetching due to higher variance in filtering results.
Expand Down Expand Up @@ -440,12 +447,126 @@ func (builder *QueryBuilder) applyIndicesForProject(nodeID int32, projNode *plan
END0:
// 2. Regular Index Check
{

if ctx := builder.buildRegularIndexTopSortContext(projNode); ctx != nil {
builder.applyRegularIndexTopSort(ctx)
}
}

return nodeID, nil
}

// buildRegularIndexTopSortContext checks whether the plan under projNode has
// the shape "sort (one key + LIMIT) -> project -> non-unique index scan" with
// the sort key being the index table's primary-key column. It returns the
// matched nodes, or nil as soon as any requirement is not met.
//
// Requirements, checked in this order:
//  1. a sort node with exactly one ORDER BY key, a LIMIT, no OFFSET and no
//     rank option;
//  2. an index scan that is not unique, has at least one binding tag and no
//     ORDER BY of its own yet;
//  3. an index table whose columns 0 and 1 are the hidden serialized key and
//     the primary-key column;
//  4. at least two index parts and at least one filter on the scan;
//  5. a PROJECT node as the sort's single child, through which the ORDER BY
//     key resolves to column 1 of the scan;
//  6. a first scan filter of the form prefix_eq(_, serial(...)) whose serial
//     call has len(Parts)-1 arguments.
func (builder *QueryBuilder) buildRegularIndexTopSortContext(projNode *plan.Node) *regularIndexTopSortContext {
	sortNode := builder.resolveSortNode(projNode, 1)
	if sortNode == nil {
		return nil
	}
	if len(sortNode.OrderBy) != 1 || sortNode.Limit == nil {
		return nil
	}
	if sortNode.Offset != nil || sortNode.RankOption != nil {
		return nil
	}

	scanNode := builder.resolveScanNodeWithIndex(sortNode, 1)
	if scanNode == nil {
		return nil
	}
	if !scanNode.IndexScanInfo.IsIndexScan || scanNode.IndexScanInfo.IsUnique {
		return nil
	}
	if len(scanNode.BindingTags) == 0 || len(scanNode.OrderBy) != 0 {
		return nil
	}

	// A non-unique regular secondary index table stores:
	//   col0 = hidden serialized key (index parts + base-table PK)
	//   col1 = base-table PK
	// Rewriting ORDER BY PK to ORDER BY hidden key is only safe for this layout.
	indexCols := scanNode.TableDef.Cols
	hasHiddenKeyLayout := len(indexCols) >= 2 &&
		indexCols[0].Name == catalog.IndexTableIndexColName &&
		indexCols[1].Name == catalog.IndexTablePrimaryColName
	if !hasHiddenKeyLayout {
		return nil
	}

	if len(scanNode.IndexScanInfo.Parts) < 2 {
		return nil
	}
	if len(scanNode.FilterList) == 0 {
		return nil
	}

	if len(sortNode.Children) != 1 {
		return nil
	}
	sortProjectNode := builder.qry.Nodes[sortNode.Children[0]]
	if sortProjectNode.NodeType != plan.Node_PROJECT {
		return nil
	}
	if len(sortProjectNode.BindingTags) == 0 {
		return nil
	}

	// The ORDER BY key must be a plain column reference into the PROJECT node.
	sortKeyCol := sortNode.OrderBy[0].Expr.GetCol()
	if sortKeyCol == nil {
		return nil
	}
	if sortKeyCol.RelPos != sortProjectNode.BindingTags[0] {
		return nil
	}
	if int(sortKeyCol.ColPos) >= len(sortProjectNode.ProjectList) {
		return nil
	}

	// ...and the projected expression must itself be column 1 (the PK) of the scan.
	projectedCol := sortProjectNode.ProjectList[sortKeyCol.ColPos].GetCol()
	if projectedCol == nil {
		return nil
	}
	if projectedCol.RelPos != scanNode.BindingTags[0] || projectedCol.ColPos != 1 {
		return nil
	}

	// Every index part except the last must be pinned by the prefix equality.
	// NOTE(review): this presumes the last entry of Parts is the PK part — confirm.
	if !isRegularIndexFullPrefixEquality(scanNode.FilterList[0], len(scanNode.IndexScanInfo.Parts)-1) {
		return nil
	}

	return &regularIndexTopSortContext{
		sortNode:        sortNode,
		sortProjectNode: sortProjectNode,
		scanNode:        scanNode,
	}
}

// isRegularIndexFullPrefixEquality reports whether expr is a two-argument
// "prefix_eq" call whose second argument is a "serial" call with exactly
// numKeyParts arguments. It returns false for a nil expr or a non-positive
// numKeyParts. The first prefix_eq argument is not inspected.
func isRegularIndexFullPrefixEquality(expr *plan.Expr, numKeyParts int) bool {
	if expr == nil || numKeyParts <= 0 {
		return false
	}

	prefixEq := expr.GetF()
	if prefixEq == nil {
		return false
	}
	if prefixEq.Func.ObjName != "prefix_eq" || len(prefixEq.Args) != 2 {
		return false
	}

	serialCall := prefixEq.Args[1].GetF()
	if serialCall == nil || serialCall.Func.ObjName != "serial" {
		return false
	}
	return len(serialCall.Args) == numKeyParts
}

// hasTopValueMessage reports whether node's SendMsgList already contains a
// header whose MsgType is message.MsgTopValue.
func hasTopValueMessage(node *plan.Node) bool {
	topValueType := int32(message.MsgTopValue)
	for _, header := range node.SendMsgList {
		if header.MsgType == topValueType {
			return true
		}
	}
	return false
}

// applyRegularIndexTopSort rewrites the nodes in ctx so that the sort orders
// by the index table's hidden key column (scan column 0) instead of the
// column it originally referenced:
//
//   - the hidden key is appended to the PROJECT node's project list and
//     registered in builder.nameByColRef;
//   - the sort node's single ORDER BY expression is replaced by a reference
//     to that newly projected column (the order flag is left untouched);
//   - the scan node gets an ORDER BY on the hidden key with the same flag;
//   - unless the sort node already sends a MsgTopValue message, a new
//     MsgTopValue header is put at the front of the sort node's SendMsgList
//     and appended to the scan node's RecvMsgList.
//
// ctx must come from buildRegularIndexTopSortContext; no validation is
// repeated here.
func (builder *QueryBuilder) applyRegularIndexTopSort(ctx *regularIndexTopSortContext) {
	sortNode, projNode, scanNode := ctx.sortNode, ctx.sortProjectNode, ctx.scanNode
	hiddenKeyCol := scanNode.TableDef.Cols[0]
	scanTag := scanNode.BindingTags[0]

	// The projected hidden key takes the name of the column the sort
	// referenced before the rewrite, falling back to the index column name
	// when the builder has no name for it.
	projectedName := builder.getColName(sortNode.OrderBy[0].Expr.GetCol())
	if projectedName == "" {
		projectedName = catalog.IndexTableIndexColName
	}

	// hiddenKeyRef builds a named column reference typed like the hidden key.
	hiddenKeyRef := func(relPos, colPos int32, name string) *plan.Expr {
		ref := GetColExpr(hiddenKeyCol.Typ, relPos, colPos)
		ref.GetCol().Name = name
		return ref
	}

	// Expose the hidden key through the PROJECT node as a new trailing column.
	projTag := projNode.BindingTags[0]
	projPos := int32(len(projNode.ProjectList))
	projNode.ProjectList = append(projNode.ProjectList, hiddenKeyRef(scanTag, 0, projectedName))
	builder.nameByColRef[[2]int32{projTag, projPos}] = projectedName

	// Point the sort at the new projected column.
	sortNode.OrderBy[0].Expr = hiddenKeyRef(projTag, projPos, projectedName)

	// Give the scan the same ordering, expressed on its own column 0.
	scanNode.OrderBy = append(scanNode.OrderBy, &plan.OrderBySpec{
		Expr: hiddenKeyRef(scanTag, 0, hiddenKeyCol.Name),
		Flag: sortNode.OrderBy[0].Flag,
	})

	if hasTopValueMessage(sortNode) {
		return
	}
	topValueHeader := plan.MsgHeader{
		MsgTag:  builder.genNewMsgTag(),
		MsgType: int32(message.MsgTopValue),
	}
	sortNode.SendMsgList = append([]plan.MsgHeader{topValueHeader}, sortNode.SendMsgList...)
	scanNode.RecvMsgList = append(scanNode.RecvMsgList, topValueHeader)
}

func (builder *QueryBuilder) detectFullTextGuard(projNode *plan.Node) []int32 {
var sortNode, aggNode *plan.Node
scanNode := builder.resolveScanNodeFromProject(projNode, 1)
Expand Down
209 changes: 209 additions & 0 deletions pkg/sql/plan/apply_indices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
package plan

import (
"context"
"reflect"
"testing"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/container/types"
planpb "github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/vm/message"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSuspendScanProtection_RestoresExactCount(t *testing.T) {
Expand Down Expand Up @@ -281,6 +286,210 @@ func TestCalculatePostFilterOverFetchFactor_ActualValues(t *testing.T) {
}
}

// makeTestRegularIndexPrefixEq builds prefix_eq(col(100, 0), serial(1, 2, ..., numArgs))
// where the serial arguments are int32 literals and the column is a varchar
// reference with RelPos 100 and ColPos 0. Binding failures abort the test.
func makeTestRegularIndexPrefixEq(t *testing.T, numArgs int) *planpb.Expr {
	t.Helper()

	int32Typ := planpb.Type{Id: int32(types.T_int32)}
	serialArgs := make([]*planpb.Expr, numArgs)
	for idx := range serialArgs {
		literal := &planpb.Literal{
			Value: &planpb.Literal_I32Val{I32Val: int32(idx + 1)},
		}
		serialArgs[idx] = &planpb.Expr{
			Typ:  int32Typ,
			Expr: &planpb.Expr_Lit{Lit: literal},
		}
	}

	serialCall, err := BindFuncExprImplByPlanExpr(context.Background(), "serial", serialArgs)
	require.NoError(t, err)

	hiddenKeyCol := GetColExpr(planpb.Type{Id: int32(types.T_varchar), Width: types.MaxVarcharLen}, 100, 0)
	prefixEq, err := BindFuncExprImplByPlanExpr(context.Background(), "prefix_eq", []*planpb.Expr{hiddenKeyCol, serialCall})
	require.NoError(t, err)
	return prefixEq
}

// makeTestRegularIndexProjectBuilder assembles a four-node plan
//
//	project(3) -> sort(2) -> project(1, tag 200) -> index scan(0, tag 100)
//
// and returns the builder together with the root node ID (3).
//
//   - prefixArgCount is the number of arguments given to serial(...) in the
//     scan's single prefix_eq filter;
//   - projectExpr is the only expression projected by node 1;
//   - sortFlag is the flag of the sort node's single ORDER BY key, which
//     references column (200, 0). The sort node has LIMIT 20.
//
// The scan is a non-unique index scan with parts user_id, is_active, id, and
// the builder knows column (200, 0) under the name "id".
func makeTestRegularIndexProjectBuilder(
	t *testing.T,
	prefixArgCount int,
	projectExpr *planpb.Expr,
	sortFlag planpb.OrderBySpec_OrderByFlag,
) (*QueryBuilder, int32) {
	t.Helper()

	const (
		scanTag    int32 = 100
		projectTag int32 = 200
	)

	builder := NewQueryBuilder(planpb.Query_SELECT, NewMockCompilerContext(true), false, true)
	builder.nameByColRef[[2]int32{projectTag, 0}] = "id"

	// Index table layout: hidden serialized key first, primary key second.
	indexTableDef := &planpb.TableDef{
		Cols: []*planpb.ColDef{
			{
				Name: catalog.IndexTableIndexColName,
				Typ:  planpb.Type{Id: int32(types.T_varchar), Width: types.MaxVarcharLen},
			},
			{
				Name: catalog.IndexTablePrimaryColName,
				Typ:  planpb.Type{Id: int32(types.T_int64)},
			},
		},
		Indexes: []*planpb.IndexDef{{IndexName: "idx_user_active"}},
	}

	scanNode := &planpb.Node{
		NodeType:    planpb.Node_TABLE_SCAN,
		NodeId:      0,
		TableDef:    indexTableDef,
		BindingTags: []int32{scanTag},
		FilterList:  []*planpb.Expr{makeTestRegularIndexPrefixEq(t, prefixArgCount)},
		IndexScanInfo: planpb.IndexScanInfo{
			IsIndexScan:    true,
			IndexName:      "idx_user_active",
			BelongToTable:  "events",
			Parts:          []string{"user_id", "is_active", "id"},
			IsUnique:       false,
			IndexTableName: "__mo_index_secondary_idx_user_active",
		},
	}

	sortProjectNode := &planpb.Node{
		NodeType:    planpb.Node_PROJECT,
		NodeId:      1,
		BindingTags: []int32{projectTag},
		Children:    []int32{0},
		ProjectList: []*planpb.Expr{projectExpr},
	}

	limitExpr := &planpb.Expr{
		Typ: planpb.Type{Id: int32(types.T_uint64)},
		Expr: &planpb.Expr_Lit{
			Lit: &planpb.Literal{
				Value: &planpb.Literal_U64Val{U64Val: 20},
			},
		},
	}
	sortKey := &planpb.OrderBySpec{
		Expr: GetColExpr(planpb.Type{Id: int32(types.T_int64)}, projectTag, 0),
		Flag: sortFlag,
	}
	sortNode := &planpb.Node{
		NodeType: planpb.Node_SORT,
		NodeId:   2,
		Children: []int32{1},
		OrderBy:  []*planpb.OrderBySpec{sortKey},
		Limit:    limitExpr,
	}

	rootNode := &planpb.Node{
		NodeType: planpb.Node_PROJECT,
		NodeId:   3,
		Children: []int32{2},
	}

	// Slice index must equal NodeId because the planner looks nodes up by ID.
	builder.qry.Nodes = []*planpb.Node{scanNode, sortProjectNode, sortNode, rootNode}
	return builder, 3
}

func TestApplyIndicesForProjectPushesTopValueThroughRegularIndexPKOrder(t *testing.T) {
builder, rootNodeID := makeTestRegularIndexProjectBuilder(
t,
2,
GetColExpr(planpb.Type{Id: int32(types.T_int64)}, 100, 1),
planpb.OrderBySpec_DESC,
)

_, err := builder.applyIndicesForProject(rootNodeID, builder.qry.Nodes[rootNodeID], map[[2]int32]int{}, map[[2]int32]*planpb.Expr{})
require.NoError(t, err)

scanNode := builder.qry.Nodes[0]
sortProjectNode := builder.qry.Nodes[1]
sortNode := builder.qry.Nodes[2]

require.Len(t, sortNode.SendMsgList, 1)
assert.Equal(t, int32(message.MsgTopValue), sortNode.SendMsgList[0].MsgType)
require.Len(t, scanNode.RecvMsgList, 1)
assert.Equal(t, sortNode.SendMsgList[0], scanNode.RecvMsgList[0])

require.Len(t, scanNode.OrderBy, 1)
scanOrderCol := scanNode.OrderBy[0].Expr.GetCol()
require.NotNil(t, scanOrderCol)
assert.Equal(t, int32(100), scanOrderCol.RelPos)
assert.Equal(t, int32(0), scanOrderCol.ColPos)
assert.Equal(t, catalog.IndexTableIndexColName, scanOrderCol.Name)
assert.Equal(t, planpb.OrderBySpec_DESC, scanNode.OrderBy[0].Flag)

sortOrderCol := sortNode.OrderBy[0].Expr.GetCol()
require.NotNil(t, sortOrderCol)
assert.Equal(t, int32(200), sortOrderCol.RelPos)
assert.Equal(t, int32(1), sortOrderCol.ColPos)

require.Len(t, sortProjectNode.ProjectList, 2)
hiddenKeyProjectCol := sortProjectNode.ProjectList[1].GetCol()
require.NotNil(t, hiddenKeyProjectCol)
assert.Equal(t, int32(100), hiddenKeyProjectCol.RelPos)
assert.Equal(t, int32(0), hiddenKeyProjectCol.ColPos)
assert.Equal(t, "id", builder.nameByColRef[[2]int32{200, 1}])
}

func TestApplyIndicesForProjectPushesTopValueThroughRegularIndexPKOrderAsc(t *testing.T) {
builder, rootNodeID := makeTestRegularIndexProjectBuilder(
t,
2,
GetColExpr(planpb.Type{Id: int32(types.T_int64)}, 100, 1),
0,
)

_, err := builder.applyIndicesForProject(rootNodeID, builder.qry.Nodes[rootNodeID], map[[2]int32]int{}, map[[2]int32]*planpb.Expr{})
require.NoError(t, err)

scanNode := builder.qry.Nodes[0]
sortNode := builder.qry.Nodes[2]

require.Len(t, sortNode.SendMsgList, 1)
require.Len(t, scanNode.OrderBy, 1)
assert.Equal(t, planpb.OrderBySpec_OrderByFlag(0), sortNode.OrderBy[0].Flag)
assert.Equal(t, planpb.OrderBySpec_OrderByFlag(0), scanNode.OrderBy[0].Flag)
assert.Equal(t, catalog.IndexTableIndexColName, scanNode.OrderBy[0].Expr.GetCol().Name)
}

func TestApplyIndicesForProjectSkipsRegularIndexPKOrderWithoutFullPrefixEquality(t *testing.T) {
builder, rootNodeID := makeTestRegularIndexProjectBuilder(
t,
1,
GetColExpr(planpb.Type{Id: int32(types.T_int64)}, 100, 1),
planpb.OrderBySpec_DESC,
)

_, err := builder.applyIndicesForProject(rootNodeID, builder.qry.Nodes[rootNodeID], map[[2]int32]int{}, map[[2]int32]*planpb.Expr{})
require.NoError(t, err)

scanNode := builder.qry.Nodes[0]
sortProjectNode := builder.qry.Nodes[1]
sortNode := builder.qry.Nodes[2]

assert.Empty(t, sortNode.SendMsgList)
assert.Empty(t, scanNode.RecvMsgList)
assert.Empty(t, scanNode.OrderBy)
require.Len(t, sortProjectNode.ProjectList, 1)
}

func TestApplyIndicesForProjectSkipsRegularIndexPKOrderForNonPKSortColumn(t *testing.T) {
builder, rootNodeID := makeTestRegularIndexProjectBuilder(
t,
2,
GetColExpr(planpb.Type{Id: int32(types.T_varchar), Width: types.MaxVarcharLen}, 100, 0),
planpb.OrderBySpec_DESC,
)

_, err := builder.applyIndicesForProject(rootNodeID, builder.qry.Nodes[rootNodeID], map[[2]int32]int{}, map[[2]int32]*planpb.Expr{})
require.NoError(t, err)

scanNode := builder.qry.Nodes[0]
sortProjectNode := builder.qry.Nodes[1]
sortNode := builder.qry.Nodes[2]

assert.Empty(t, sortNode.SendMsgList)
assert.Empty(t, scanNode.RecvMsgList)
assert.Empty(t, scanNode.OrderBy)
require.Len(t, sortProjectNode.ProjectList, 1)
}

// Benchmark the function to ensure it's fast
func BenchmarkCalculatePostFilterOverFetchFactor(b *testing.B) {
limits := []uint64{1, 5, 10, 20, 50, 100, 200, 500, 1000, 10000}
Expand Down
Loading