Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,74 @@ func (m *GroupManager) GetGroupByKeyspaceID(id uint32) (uint32, error) {
return 0, errs.ErrKeyspaceNotInAnyKeyspaceGroup
}

// RemoveKeyspacesFromGroup removes the specified keyspaces from the given keyspace group.
// If a keyspace is not in the group, it will be skipped (no error).
// It returns the updated keyspace group and any error encountered.
func (m *GroupManager) RemoveKeyspacesFromGroup(groupID uint32, keyspaceIDs []uint32) (*endpoint.KeyspaceGroup, error) {
m.Lock()
defer m.Unlock()

var (
kg *endpoint.KeyspaceGroup
err error
)

if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
// Load the keyspace group
kg, err = m.store.LoadKeyspaceGroup(txn, groupID)
if err != nil {
return err
}
if kg == nil {
return errs.ErrKeyspaceGroupNotExists.FastGenByArgs(groupID)
}
if kg.IsSplitting() {
return errs.ErrKeyspaceGroupInSplit.FastGenByArgs(groupID)
}
if kg.IsMerging() {
return errs.ErrKeyspaceGroupInMerging.FastGenByArgs(groupID)
}

// Build a set of keyspaces to remove (excluding protected bootstrap/system keyspace)
toRemove := make(map[uint32]struct{})
for _, ksID := range keyspaceIDs {
// Keep the protected bootstrap/system keyspace in the default group.
if isProtectedKeyspaceID(ksID) {
continue
}
// Only add if it exists in the group (skip if not present)
if slice.Contains(kg.Keyspaces, ksID) {
toRemove[ksID] = struct{}{}
}
}

// If nothing to remove, return nil to skip update
if len(toRemove) == 0 {
return nil
}

// Filter out keyspaces to remove
newKeyspaces := make([]uint32, 0, len(kg.Keyspaces)-len(toRemove))
for _, ks := range kg.Keyspaces {
if _, shouldRemove := toRemove[ks]; !shouldRemove {
newKeyspaces = append(newKeyspaces, ks)
}
}
kg.Keyspaces = newKeyspaces

// Save the updated keyspace group
return m.store.SaveKeyspaceGroup(txn, kg)
}); err != nil {
return nil, err
}

// Update the cache
userKind := endpoint.StringUserKind(kg.UserKind)
m.groups[userKind].Put(kg)

return kg, nil
}

var failpointOnce sync.Once

// UpdateKeyspaceForGroup updates the keyspace field for the keyspace group.
Expand Down
88 changes: 88 additions & 0 deletions server/apiv2/handlers/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/gin-gonic/gin"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/keyspace/constant"
Expand Down Expand Up @@ -51,6 +52,7 @@ func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) {
router.DELETE("/:id/split", FinishSplitKeyspaceByID)
router.POST("/:id/merge", MergeKeyspaceGroups)
router.DELETE("/:id/merge", FinishMergeKeyspaceByID)
router.DELETE("/:id/keyspaces", RemoveKeyspacesFromGroup)
}

// CreateKeyspaceGroupParams defines the params for creating keyspace groups.
Expand Down Expand Up @@ -558,3 +560,89 @@ func parseNodeAddress(c *gin.Context) (string, error) {
func isValid(id uint32) bool {
return id >= constant.DefaultKeyspaceGroupID && id <= mcs.MaxKeyspaceGroupCountInUse
}

// RemoveKeyspacesFromGroupParams defines the params for removing keyspaces from a keyspace group.
type RemoveKeyspacesFromGroupParams struct {
Keyspaces []uint32 `json:"keyspaces"`
}

// RemoveKeyspacesFromGroup removes the specified keyspaces from the given keyspace group.
// Keyspaces in archived or tombstone state will be removed. Keyspaces not in the group will be skipped.
func RemoveKeyspacesFromGroup(c *gin.Context) {
// Parse and validate group ID
groupID, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return
}

// Parse request body
var params RemoveKeyspacesFromGroupParams
if err := c.BindJSON(&params); err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause().Error())
return
}

if len(params.Keyspaces) == 0 {
c.AbortWithStatusJSON(http.StatusBadRequest, "keyspaces list cannot be empty")
return
}

svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
keyspaceManager := svr.GetKeyspaceManager()
if keyspaceManager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, managerUninitializedErr)
return
}

groupManager := svr.GetKeyspaceGroupManager()
if groupManager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
return
}

// Filter keyspaces: only keep those in ARCHIVED or TOMBSTONE state
var validKeyspaces []uint32
for _, keyspaceID := range params.Keyspaces {
// Load the keyspace meta to check its state
keyspaceMeta, err := keyspaceManager.LoadKeyspaceByID(keyspaceID)
if err != nil {
// Skip if keyspace doesn't exist
if errors.ErrorEqual(err, errs.ErrKeyspaceNotFound) {
continue
}
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}

// Check if the keyspace is in archived or tombstone state
state := keyspaceMeta.GetState()
if state == keyspacepb.KeyspaceState_ARCHIVED || state == keyspacepb.KeyspaceState_TOMBSTONE {
validKeyspaces = append(validKeyspaces, keyspaceID)
}
}

// If no valid keyspaces to remove, load and return the group without modification
if len(validKeyspaces) == 0 {
kg, err := groupManager.GetKeyspaceGroupByID(groupID)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
if kg == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrKeyspaceGroupNotExists.FastGenByArgs(groupID).Error())
return
}
c.IndentedJSON(http.StatusOK, kg)
return
}

// Remove the keyspaces from the keyspace group
kg, err := groupManager.RemoveKeyspacesFromGroup(groupID, validKeyspaces)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}

c.IndentedJSON(http.StatusOK, kg)
}
37 changes: 37 additions & 0 deletions tests/server/apiv2/handlers/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,40 @@ func MustMergeKeyspaceGroup(re *require.Assertions, server *tests.TestServer, id
re.NoError(err)
re.Equal(http.StatusOK, resp.StatusCode, string(data))
}

// MustRemoveKeyspacesFromGroup removes keyspaces from a keyspace group with HTTP API.
func MustRemoveKeyspacesFromGroup(re *require.Assertions, server *tests.TestServer, groupID uint32, keyspaceIDs []uint32) *endpoint.KeyspaceGroup {
params := &handlers.RemoveKeyspacesFromGroupParams{
Keyspaces: keyspaceIDs,
}
data, err := json.Marshal(params)
re.NoError(err)
httpReq, err := http.NewRequest(http.MethodDelete, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d/keyspaces", groupID), bytes.NewBuffer(data))
re.NoError(err)
resp, err := tests.TestDialClient.Do(httpReq)
re.NoError(err)
defer resp.Body.Close()
respData, err := io.ReadAll(resp.Body)
re.NoError(err)
re.Equal(http.StatusOK, resp.StatusCode, string(respData))
var kg endpoint.KeyspaceGroup
re.NoError(json.Unmarshal(respData, &kg))
return &kg
}

// FailRemoveKeyspacesFromGroupWithCode fails to remove keyspaces from a keyspace group with HTTP API.
func FailRemoveKeyspacesFromGroupWithCode(re *require.Assertions, server *tests.TestServer, groupID uint32, keyspaceIDs []uint32, expectCode int) {
params := &handlers.RemoveKeyspacesFromGroupParams{
Keyspaces: keyspaceIDs,
}
data, err := json.Marshal(params)
re.NoError(err)
httpReq, err := http.NewRequest(http.MethodDelete, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d/keyspaces", groupID), bytes.NewBuffer(data))
re.NoError(err)
resp, err := tests.TestDialClient.Do(httpReq)
re.NoError(err)
defer resp.Body.Close()
respData, err := io.ReadAll(resp.Body)
re.NoError(err)
re.Equal(expectCode, resp.StatusCode, string(respData))
}
123 changes: 119 additions & 4 deletions tests/server/apiv2/handlers/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ import (

"github.com/stretchr/testify/suite"

"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/keyspacepb"

"github.com/tikv/pd/pkg/keyspace"
kgconstant "github.com/tikv/pd/pkg/keyspace/constant"
mcsconstant "github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/tests"
Expand Down Expand Up @@ -100,7 +105,7 @@ func (suite *keyspaceGroupTestSuite) TestCreateKeyspaceGroups() {
// invalid ID.
kgs = &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: constant.MaxKeyspaceGroupCount + 1,
ID: mcsconstant.MaxKeyspaceGroupCount + 1,
UserKind: endpoint.Standard.String(),
},
}}
Expand Down Expand Up @@ -141,7 +146,7 @@ func (suite *keyspaceGroupTestSuite) TestSplitKeyspaceGroup() {
ID: uint32(1),
UserKind: endpoint.Standard.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, constant.DefaultKeyspaceGroupReplicaCount),
Members: make([]endpoint.KeyspaceGroupMember, mcsconstant.DefaultKeyspaceGroupReplicaCount),
},
}}

Expand Down Expand Up @@ -248,7 +253,6 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupErrorMessage() {
re.NoError(json.NewDecoder(resp.Body).Decode(&errorMsg))
re.NotEmpty(errorMsg, "Error message should not be empty")
re.Contains(errorMsg, "invalid", "Error message should indicate invalid input")

// Test SetPriorityForKeyspaceGroup with invalid JSON
httpReq, err = http.NewRequest(
http.MethodPatch,
Expand All @@ -266,3 +270,114 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupErrorMessage() {
re.NotEmpty(errorMsg, "Error message should not be empty")
re.Contains(errorMsg, "invalid", "Error message should indicate invalid input")
}

func (suite *keyspaceGroupTestSuite) TestRemoveKeyspacesFromGroup() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion"))
}()

keyspaceManager := suite.server.GetKeyspaceManager()
re.NotNil(keyspaceManager)

// Create test keyspaces (automatically added to default keyspace group 0)
keyspaceMeta1, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
Name: "test_keyspace_1",
CreateTime: 0,
})
re.NoError(err)
keyspaceID1 := keyspaceMeta1.GetId()

keyspaceMeta2, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
Name: "test_keyspace_2",
CreateTime: 0,
})
re.NoError(err)
keyspaceID2 := keyspaceMeta2.GetId()

keyspaceMeta3, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
Name: "test_keyspace_3",
CreateTime: 0,
})
re.NoError(err)
keyspaceID3 := keyspaceMeta3.GetId()

// Verify all keyspaces are in the default group
kg := MustLoadKeyspaceGroupByID(re, suite.server, kgconstant.DefaultKeyspaceGroupID)
re.Contains(kg.Keyspaces, keyspaceID1)
re.Contains(kg.Keyspaces, keyspaceID2)
re.Contains(kg.Keyspaces, keyspaceID3)

// Test 1: Try to remove ENABLED keyspaces (should succeed but nothing removed)
kg = MustRemoveKeyspacesFromGroup(re, suite.server, kgconstant.DefaultKeyspaceGroupID,
[]uint32{keyspaceID1})
// Verify nothing is removed (keyspace is still there because it's ENABLED)
re.Contains(kg.Keyspaces, keyspaceID1)

// Test 2: Update keyspaces to ARCHIVED/TOMBSTONE state and batch remove
// Set keyspace1 to ARCHIVED
_, err = keyspaceManager.UpdateKeyspaceStateByID(keyspaceID1, keyspacepb.KeyspaceState_DISABLED, 0)
re.NoError(err)
_, err = keyspaceManager.UpdateKeyspaceStateByID(keyspaceID1, keyspacepb.KeyspaceState_ARCHIVED, 0)
re.NoError(err)

// Set keyspace2 to TOMBSTONE
_, err = keyspaceManager.UpdateKeyspaceStateByID(keyspaceID2, keyspacepb.KeyspaceState_DISABLED, 0)
re.NoError(err)
_, err = keyspaceManager.UpdateKeyspaceStateByID(keyspaceID2, keyspacepb.KeyspaceState_ARCHIVED, 0)
re.NoError(err)
_, err = keyspaceManager.UpdateKeyspaceStateByID(keyspaceID2, keyspacepb.KeyspaceState_TOMBSTONE, 0)
re.NoError(err)

// Batch remove keyspace1 and keyspace2
MustRemoveKeyspacesFromGroup(re, suite.server, kgconstant.DefaultKeyspaceGroupID,
[]uint32{keyspaceID1, keyspaceID2})

// Verify both keyspaces are removed
kg = MustLoadKeyspaceGroupByID(re, suite.server, kgconstant.DefaultKeyspaceGroupID)
re.NotContains(kg.Keyspaces, keyspaceID1)
re.NotContains(kg.Keyspaces, keyspaceID2)
re.Contains(kg.Keyspaces, keyspaceID3) // keyspace3 should still be there

// Test 3: Mix valid and invalid keyspaces
// Set keyspace3 to ARCHIVED
_, err = keyspaceManager.UpdateKeyspaceStateByID(keyspaceID3, keyspacepb.KeyspaceState_DISABLED, 0)
re.NoError(err)
_, err = keyspaceManager.UpdateKeyspaceStateByID(keyspaceID3, keyspacepb.KeyspaceState_ARCHIVED, 0)
re.NoError(err)

// Include: valid (keyspace3), already removed (keyspace1), non-existent (99999)
// Should only remove keyspace3, others are skipped
MustRemoveKeyspacesFromGroup(re, suite.server, kgconstant.DefaultKeyspaceGroupID,
[]uint32{keyspaceID3, keyspaceID1, 99999})

// Verify only keyspace3 is removed
kg = MustLoadKeyspaceGroupByID(re, suite.server, kgconstant.DefaultKeyspaceGroupID)
re.NotContains(kg.Keyspaces, keyspaceID3)

// Test 4: Try to remove from non-existent group
FailRemoveKeyspacesFromGroupWithCode(re, suite.server, 999,
[]uint32{keyspaceID1}, http.StatusInternalServerError)

// Test 5: Try to remove with empty keyspace list (should fail - empty list)
FailRemoveKeyspacesFromGroupWithCode(re, suite.server, kgconstant.DefaultKeyspaceGroupID,
[]uint32{}, http.StatusBadRequest)

// Test 6: All keyspaces in wrong state (should succeed but nothing removed)
keyspaceMeta4, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
Name: "test_keyspace_4",
CreateTime: 0,
})
re.NoError(err)
keyspaceID4 := keyspaceMeta4.GetId()

kg = MustRemoveKeyspacesFromGroup(re, suite.server, kgconstant.DefaultKeyspaceGroupID,
[]uint32{keyspaceID4}) // ENABLED state, will be skipped
// Verify keyspace4 is still there
re.Contains(kg.Keyspaces, keyspaceID4)
}
Loading