Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/instance/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ func (i *Instance) setupInstanceAPI() {
i.ForRole("instance", i.rootContext).AddEventListener(apitypes.EventTopicAPIMuxSetup, func(ev *roles.Event) {
svc := ev.Payload.Data["svc"].(*web.Service)
svc.Get("/api/v1/cluster", i.APIClusterInfo())
svc.Get("/api/v1/cluster/instance", i.APIInstanceInfo())
svc.Get("/api/v1/cluster/instance", i.APIInstanceGet())
svc.Put("/api/v1/cluster/instance", i.APIInstancePut())
svc.Post("/api/v1/cluster/roles/restart", i.APIClusterRoleRestart())
})
}
31 changes: 30 additions & 1 deletion pkg/instance/api_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type APIInstanceInfo struct {
InstanceIP string `json:"instanceIP" required:"true"`
}

func (i *Instance) APIInstanceInfo() usecase.Interactor {
func (i *Instance) APIInstanceGet() usecase.Interactor {
u := usecase.NewInteractor(func(ctx context.Context, input struct{}, output *APIInstanceInfo) error {
output.Version = extconfig.Version
output.BuildHash = extconfig.BuildHash
Expand All @@ -89,3 +89,32 @@ func (i *Instance) APIInstanceInfo() usecase.Interactor {
u.SetExpectedErrors(status.Internal)
return u
}

type APIInstancesPutInput struct {
Identifier string `query:"identifier" required:"true"`

Roles []string `json:"roles" required:"true"`
}

func (i *Instance) APIInstancePut() usecase.Interactor {
u := usecase.NewInteractor(func(ctx context.Context, input APIInstancesPutInput, output *struct{}) error {
_, err := i.kv.Put(
ctx,
i.kv.Key(
types.KeyInstance,
i.identifier,
types.KeyRoles,
).String(),
string(strings.Join(input.Roles, types.RoleSeparator)),
)
if err != nil {
return status.Wrap(err, status.Internal)
}
return nil
})
u.SetName("cluster.put_instance")
u.SetTitle("Instance")
u.SetTags("cluster/instances")
u.SetExpectedErrors(status.Internal)
return u
}
2 changes: 1 addition & 1 deletion pkg/instance/api_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestAPIInstanceInfo(t *testing.T) {
rootInst := instance.New()

var output instance.APIInstanceInfo
assert.NoError(t, rootInst.APIInstanceInfo().Interact(tests.Context(), struct{}{}, &output))
assert.NoError(t, rootInst.APIInstanceGet().Interact(tests.Context(), struct{}{}, &output))
assert.NotNil(t, output)
assert.Equal(t, output.Version, extconfig.Version)
}
Expand Down
52 changes: 3 additions & 49 deletions pkg/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,57 +154,11 @@ func (i *Instance) getRoles(ctx context.Context) []string {
roles := extconfig.Get().BootstrapRoles
if err == nil && len(rr.Kvs) > 0 {
roles = string(rr.Kvs[0].Value)
i.log.Info("roles configured for instance", zap.Strings("roles", strings.Split(roles, ";")))
i.log.Info("roles configured for instance", zap.Strings("roles", strings.Split(roles, types.RoleSeparator)))
} else {
i.log.Info("defaulting to bootstrap roles", zap.Strings("roles", strings.Split(roles, ";")))
i.log.Info("defaulting to bootstrap roles", zap.Strings("roles", strings.Split(roles, types.RoleSeparator)))
}
return strings.Split(roles, ";")
}

func (i *Instance) bootstrap(ctx context.Context) {
i.log.Debug("bootstrapping instance")
i.keepAliveInstanceInfo(ctx)
i.setupInstanceAPI()
rootInstance := i.ForRole("root", ctx)
for _, roleId := range i.getRoles(ctx) {
instanceRoles.WithLabelValues(roleId).Add(1)
rctx, cancel := context.WithCancelCause(i.rootContext)
rc := RoleContext{
RoleInstance: i.ForRole(roleId, rctx),
ContextCancelFunc: cancel,
}
switch roleId {
case "etcd":
// Special handling
continue
default:
span := sentry.StartSpan(ctx, "gravity.instance.bootstrap.role")
span.SetTag("gravity.role", roleId)
rc.Role = roles.GetRole(roleId)(rc.RoleInstance)
span.Finish()
}
i.rolesM.Lock()
i.roles[roleId] = rc
i.rolesM.Unlock()
}
rootInstance.AddEventListener(types.EventTopicRoleRestart, i.eventRoleRestart)
rootInstance.DispatchEvent(
types.EventTopicInstanceBootstrapped,
roles.NewEvent(i.rootContext, map[string]interface{}{}),
)
i.checkFirstStart(ctx)
wg := sync.WaitGroup{}
for roleId := range i.roles {
wg.Add(1)
go i.startWatchRole(ctx, roleId, func() {
wg.Done()
})
}
go func() {
wg.Wait()
i.DispatchEvent(types.EventTopicRolesStarted, roles.NewEvent(ctx, map[string]interface{}{}))
sentry.TransactionFromContext(ctx).Finish()
}()
return strings.Split(roles, types.RoleSeparator)
}

func (i *Instance) eventRoleRestart(ev *roles.Event) {
Expand Down
56 changes: 56 additions & 0 deletions pkg/instance/instance_bootstrap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package instance

import (
"context"
"sync"

"beryju.io/gravity/pkg/instance/types"
"beryju.io/gravity/pkg/roles"
"github.com/getsentry/sentry-go"
)

func (i *Instance) bootstrap(ctx context.Context) {
i.log.Debug("bootstrapping instance")
i.keepAliveInstanceInfo(ctx)
i.setupInstanceAPI()
rootInstance := i.ForRole("root", ctx)
for _, roleId := range i.getRoles(ctx) {
instanceRoles.WithLabelValues(roleId).Add(1)
rctx, cancel := context.WithCancelCause(i.rootContext)
rc := RoleContext{
RoleInstance: i.ForRole(roleId, rctx),
ContextCancelFunc: cancel,
}
switch roleId {
case "etcd":
// Special handling
continue
default:
span := sentry.StartSpan(ctx, "gravity.instance.bootstrap.role")
span.SetTag("gravity.role", roleId)
rc.Role = roles.GetRole(roleId)(rc.RoleInstance)
span.Finish()
}
i.rolesM.Lock()
i.roles[roleId] = rc
i.rolesM.Unlock()
}
rootInstance.AddEventListener(types.EventTopicRoleRestart, i.eventRoleRestart)
rootInstance.DispatchEvent(
types.EventTopicInstanceBootstrapped,
roles.NewEvent(i.rootContext, map[string]interface{}{}),
)
i.checkFirstStart(ctx)
wg := sync.WaitGroup{}
for roleId := range i.roles {
wg.Add(1)
go i.startWatchRole(ctx, roleId, func() {
wg.Done()
})
}
go func() {
wg.Wait()
i.DispatchEvent(types.EventTopicRolesStarted, roles.NewEvent(ctx, map[string]interface{}{}))
sentry.TransactionFromContext(ctx).Finish()
}()
}
2 changes: 2 additions & 0 deletions pkg/instance/types/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ const (
KeyCluster = "cluster"
KeyMigration = "migration"
)

const RoleSeparator = ";"
15 changes: 7 additions & 8 deletions pkg/roles/etcd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func (r *Role) APIClusterMembers() usecase.Interactor {
}

type APIMemberJoinInput struct {
Peer string `json:"peer" maxLength:"255"`
Roles string `json:"roles"`
Identifier string `json:"identifier"`
Peer string `json:"peer" maxLength:"255"`
Roles []string `json:"roles"`
Identifier string `json:"identifier"`
}
type APIMemberJoinOutput struct {
EtcdInitialCluster string `json:"etcdInitialCluster"`
Expand Down Expand Up @@ -80,11 +80,10 @@ func (r *Role) APIClusterJoin() usecase.Interactor {
))

// Pre-configure roles for new node
roles := strings.Split(input.Roles, ",")
if input.Roles == "" {
roles = strings.Split(extconfig.Get().BootstrapRoles, ",")
if len(input.Roles) == 0 {
input.Roles = strings.Split(extconfig.Get().BootstrapRoles, ",")
// If we're copying our roles, exclude backup
roles = slices.DeleteFunc(roles, func(role string) bool {
input.Roles = slices.DeleteFunc(input.Roles, func(role string) bool {
return role == "backup"
})
}
Expand All @@ -95,7 +94,7 @@ func (r *Role) APIClusterJoin() usecase.Interactor {
input.Identifier,
types.KeyRoles,
).String(),
strings.Join(roles, ","),
strings.Join(input.Roles, ","),
)
if err != nil {
r.log.Warn("failed to put roles for node", zap.Error(err))
Expand Down
Loading
Loading