Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions pkg/bootstrap/versions/v4_0_0/cluster_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

var clusterUpgEntries = []versions.UpgradeEntry{
upg_mo_indexes_add_included_columns_for_cluster,
upg_mo_iscp_log_new,
upg_mo_iscp_task,
upg_mo_publication_task,
Expand All @@ -45,6 +46,34 @@ var clusterUpgEntries = []versions.UpgradeEntry{
upg_mo_columns_add_attr_generated,
}

var checkMoIndexesIncludedColumns = func(txn executor.TxnExecutor, accountId uint32) (versions.ColumnInfo, error) {
return versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_INDEXES, catalog.IndexIncludedColumns)
}

func newMoIndexesAddIncludedColumnsEntry() versions.UpgradeEntry {
return versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_INDEXES,
UpgType: versions.ADD_COLUMN,
UpgSql: fmt.Sprintf(
"alter table %s.%s add column %s text after %s",
catalog.MO_CATALOG,
catalog.MO_INDEXES,
catalog.IndexIncludedColumns,
"index_table_name",
),
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
info, err := checkMoIndexesIncludedColumns(txn, accountId)
if err != nil {
return false, err
}
return info.IsExits, nil
},
}
}

var upg_mo_indexes_add_included_columns_for_cluster = newMoIndexesAddIncludedColumnsEntry()

var upg_mo_iscp_log_new = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_ISCP_LOG,
Expand Down
3 changes: 3 additions & 0 deletions pkg/bootstrap/versions/v4_0_0/tenant_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
)

var tenantUpgEntries = []versions.UpgradeEntry{
upg_mo_indexes_add_included_columns_for_tenant,
enablePartitionMetadata,
enablePartitionTables,
upg_alter_mo_snapshots,
Expand All @@ -35,6 +36,8 @@ var tenantUpgEntries = []versions.UpgradeEntry{
upg_information_schema_statistics,
}

var upg_mo_indexes_add_included_columns_for_tenant = newMoIndexesAddIncludedColumnsEntry()

var enablePartitionMetadata = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MOPartitionMetadata,
Expand Down
64 changes: 64 additions & 0 deletions pkg/bootstrap/versions/v4_0_0/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package v4_0_0
import (
"context"
"fmt"
"strings"
"testing"

"github.com/golang/mock/gomock"
Expand All @@ -33,6 +34,18 @@ import (
"github.com/matrixorigin/matrixone/pkg/util/sysview"
)

func findUpgradeEntryIndex(entries []versions.UpgradeEntry, target versions.UpgradeEntry) int {
for i, entry := range entries {
if entry.Schema == target.Schema &&
entry.TableName == target.TableName &&
entry.UpgType == target.UpgType &&
entry.UpgSql == target.UpgSql {
return i
}
}
return -1
}

func mockTenantUpgrade(t *testing.T) {
drop_mo_retention := versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
Expand All @@ -48,6 +61,10 @@ func mockTenantUpgrade(t *testing.T) {
}

func Test_Upgrade(t *testing.T) {
originalTenantUpgEntries := append([]versions.UpgradeEntry(nil), tenantUpgEntries...)
defer func() {
tenantUpgEntries = originalTenantUpgEntries
}()
mockTenantUpgrade(t)

sid := ""
Expand Down Expand Up @@ -84,6 +101,10 @@ func Test_Upgrade(t *testing.T) {
}

func Test_versionHandle_HandleClusterUpgrade(t *testing.T) {
originalClusterUpgEntries := append([]versions.UpgradeEntry(nil), clusterUpgEntries...)
defer func() {
clusterUpgEntries = originalClusterUpgEntries
}()
clusterUpgEntries = []versions.UpgradeEntry{}

v := &versionHandle{
Expand Down Expand Up @@ -169,3 +190,46 @@ func Test_upg_statistics_view_check_mismatch(t *testing.T) {
assert.NoError(t, err)
assert.False(t, ok)
}

func Test_upg_mo_indexes_add_included_columns_check(t *testing.T) {
stubs := gostub.Stub(&checkMoIndexesIncludedColumns, func(txn executor.TxnExecutor, accountId uint32) (versions.ColumnInfo, error) {
return versions.ColumnInfo{IsExits: true}, nil
})
defer stubs.Reset()

ok, err := upg_mo_indexes_add_included_columns_for_cluster.CheckFunc(nil, catalog.System_Account)
assert.NoError(t, err)
assert.True(t, ok)

ok, err = upg_mo_indexes_add_included_columns_for_tenant.CheckFunc(nil, 100)
assert.NoError(t, err)
assert.True(t, ok)
}

func Test_upg_mo_indexes_add_included_columns_check_error(t *testing.T) {
stubs := gostub.Stub(&checkMoIndexesIncludedColumns, func(txn executor.TxnExecutor, accountId uint32) (versions.ColumnInfo, error) {
return versions.ColumnInfo{}, moerr.NewInternalErrorNoCtx("return error")
})
defer stubs.Reset()

ok, err := upg_mo_indexes_add_included_columns_for_cluster.CheckFunc(nil, catalog.System_Account)
assert.Error(t, err)
assert.False(t, ok)
}

func Test_upg_mo_indexes_add_included_columns_ordering(t *testing.T) {
clusterEntryIdx := findUpgradeEntryIndex(clusterUpgEntries, upg_mo_indexes_add_included_columns_for_cluster)
clusterCreateIdx := findUpgradeEntryIndex(clusterUpgEntries, upg_mo_iscp_log_new)
if assert.NotEqual(t, -1, clusterEntryIdx) && assert.NotEqual(t, -1, clusterCreateIdx) {
assert.Less(t, clusterEntryIdx, clusterCreateIdx)
}

tenantEntryIdx := findUpgradeEntryIndex(tenantUpgEntries, upg_mo_indexes_add_included_columns_for_tenant)
tenantCreateIdx := findUpgradeEntryIndex(tenantUpgEntries, enablePartitionMetadata)
if assert.NotEqual(t, -1, tenantEntryIdx) && assert.NotEqual(t, -1, tenantCreateIdx) {
assert.Less(t, tenantEntryIdx, tenantCreateIdx)
}

assert.True(t, strings.Contains(upg_mo_indexes_add_included_columns_for_cluster.UpgSql, catalog.IndexIncludedColumns))
assert.True(t, strings.Contains(upg_mo_indexes_add_included_columns_for_cluster.UpgSql, "index_table_name"))
}
63 changes: 52 additions & 11 deletions pkg/catalog/secondary_index_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,58 @@ func IsHnswIndexAlgo(algo string) bool {

// ------------------------[START] IndexAlgoParams------------------------
const (
IndexAlgoParamLists = "lists"
IndexAlgoParamOpType = "op_type"
HnswM = "m"
HnswEfConstruction = "ef_construction"
HnswQuantization = "quantization"
HnswEfSearch = "ef_search"
Async = "async"
AutoUpdate = "auto_update"
Day = "day"
Hour = "hour"
IndexAlgoParamLists = "lists"
IndexAlgoParamOpType = "op_type"
IndexAlgoParamIncludeColumns = "include_columns"
HnswM = "m"
HnswEfConstruction = "ef_construction"
HnswQuantization = "quantization"
HnswEfSearch = "ef_search"
Async = "async"
AutoUpdate = "auto_update"
Day = "day"
Hour = "hour"
)

func MarshalIncludeColumnsValue(cols []string) (string, error) {
if len(cols) == 0 {
return "", nil
}
data, err := json.Marshal(cols)
if err != nil {
return "", err
}
return string(data), nil
}

func ParseIncludeColumnsValue(raw string) ([]string, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
return nil, nil
}

if strings.HasPrefix(raw, "[") {
var cols []string
if err := json.Unmarshal([]byte(raw), &cols); err == nil {
return cols, nil
}
}

parts := strings.Split(raw, ",")
cols := make([]string, 0, len(parts))
for _, part := range parts {
part = strings.TrimSpace(part)
if part == "" {
continue
}
cols = append(cols, part)
}
if len(cols) == 0 {
return nil, nil
}
return cols, nil
}

/* 1. ToString Functions */

// IndexParamsToStringList used by buildShowCreateTable and restoreDDL
Expand Down Expand Up @@ -227,7 +267,7 @@ func indexParamsToMap(def interface{}) (map[string]string, error) {
case tree.INDEX_TYPE_BTREE, tree.INDEX_TYPE_INVALID, tree.INDEX_TYPE_RTREE:
// do nothing
case tree.INDEX_TYPE_MASTER:
// do nothing
// do nothing
case tree.INDEX_TYPE_IVFFLAT:
if idx.IndexOption.AlgoParamList == 0 {
// NOTE:
Expand Down Expand Up @@ -265,6 +305,7 @@ func indexParamsToMap(def interface{}) (map[string]string, error) {
if idx.IndexOption.Hour > 0 {
res[Hour] = strconv.FormatInt(idx.IndexOption.Hour, 10)
}

case tree.INDEX_TYPE_HNSW:
if idx.IndexOption.HnswM < 0 {
return nil, moerr.NewInternalErrorNoCtx("invalid M. hnsw.M must be > 0")
Expand Down
53 changes: 52 additions & 1 deletion pkg/catalog/secondary_index_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (
"testing"

"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
)

func TestIsIndexAsync(t *testing.T) {

var (
json string
err error
Expand All @@ -47,3 +48,53 @@ func TestIsIndexAsync(t *testing.T) {
_, err = IsIndexAsync(json)
require.NotNil(t, err)
}

func unresolvedName(name string) *tree.UnresolvedName {
return tree.NewUnresolvedName(tree.NewCStr(name, 1))
}

func TestIndexParamsToJsonString_DoesNotSerializeIvfFlatIncludeColumns(t *testing.T) {
idx := tree.NewIndex(
false,
[]*tree.KeyPart{tree.NewKeyPart(unresolvedName("embedding"), -1, tree.DefaultDirection, nil)},
"idx1",
tree.INDEX_TYPE_IVFFLAT,
&tree.IndexOption{
AlgoParamList: 10,
AlgoParamVectorOpType: "vector_l2_ops",
IncludeColumns: []*tree.UnresolvedName{
unresolvedName("title"),
unresolvedName("category"),
},
},
)

params, err := IndexParamsToJsonString(idx)
require.NoError(t, err)

paramMap, err := IndexParamsStringToMap(params)
require.NoError(t, err)
require.Equal(t, "10", paramMap[IndexAlgoParamLists])
require.Equal(t, "vector_l2_ops", paramMap[IndexAlgoParamOpType])
_, ok := paramMap[IndexAlgoParamIncludeColumns]
require.False(t, ok)
}

func TestIndexParamsToStringList_DoesNotRenderIncludeColumns(t *testing.T) {
paramList, err := IndexParamsToStringList(`{"lists":"10","op_type":"vector_l2_ops","include_columns":"[\"title\",\"category\"]"}`)
require.NoError(t, err)
require.Contains(t, paramList, "lists = 10")
require.Contains(t, paramList, "op_type 'vector_l2_ops'")
require.NotContains(t, paramList, "INCLUDE")
require.NotContains(t, paramList, "title")
}

func TestParseIncludeColumnsValue_BackwardCompatible(t *testing.T) {
cols, err := ParseIncludeColumnsValue(`["title","category"]`)
require.NoError(t, err)
require.Equal(t, []string{"title", "category"}, cols)

cols, err = ParseIncludeColumnsValue("title,category")
require.NoError(t, err)
require.Equal(t, []string{"title", "category"}, cols)
}
8 changes: 5 additions & 3 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,10 @@ const (
SystemRelAttr_LogicalID = "rel_logical_id"

// 'mo_indexes' table
IndexAlgoName = "algo"
IndexAlgoTableType = "algo_table_type"
IndexAlgoParams = "algo_params"
IndexAlgoName = "algo"
IndexAlgoTableType = "algo_table_type"
IndexAlgoParams = "algo_params"
IndexIncludedColumns = "included_columns"

// 'mo_columns' table
SystemColAttr_UniqName = "att_uniq_name"
Expand Down Expand Up @@ -385,6 +386,7 @@ const (
SystemSI_IVFFLAT_TblCol_Entries_id = "__mo_index_centroid_fk_id"
SystemSI_IVFFLAT_TblCol_Entries_pk = IndexTablePrimaryColName
SystemSI_IVFFLAT_TblCol_Entries_entry = "__mo_index_centroid_fk_entry"
SystemSI_IVFFLAT_IncludeColPrefix = "__mo_index_include_"

/************ 3. FULLTEXT Index **************/

Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/predefined.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ var (
ordinal_position int unsigned not null,
options text,
index_table_name varchar(5000),
included_columns text,
primary key(id, column_name)
)`, catalog.MO_CATALOG, catalog.MO_INDEXES)

Expand Down
Loading
Loading