-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathadmin.go
More file actions
130 lines (121 loc) · 3.86 KB
/
admin.go
File metadata and controls
130 lines (121 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package spool
import (
"context"
"fmt"
"cloud.google.com/go/spanner"
admin "cloud.google.com/go/spanner/admin/database/apiv1"
"cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
"github.com/cloudspannerecosystem/spool/internal/db"
"github.com/cloudspannerecosystem/spool/model"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Setup creates a new spool metadata database.
//
// It first looks up the database named by conf.Database():
//   - if the lookup reports codes.NotFound, the database is created with the
//     statements derived from db.SpoolSchema as extra DDL;
//   - if the lookup fails for any other reason, that error is returned as is;
//   - if the database already exists, the same statements are applied to it
//     through UpdateDatabaseDdl.
//
// In both the create and update paths Setup waits for the long-running
// operation to finish and returns its error, if any.
func Setup(ctx context.Context, conf *Config) error {
	adminClient, err := admin.NewDatabaseAdminClient(ctx, conf.ClientOptions()...)
	if err != nil {
		return err
	}
	// Release the admin client's connections when setup is done. A failure to
	// close is not actionable for the caller, so it is deliberately ignored.
	defer adminClient.Close()

	_, err = adminClient.GetDatabase(ctx, &databasepb.GetDatabaseRequest{
		Name: conf.Database(),
	})
	st, ok := status.FromError(err)
	switch {
	case ok && st.Code() == codes.NotFound:
		// Database does not exist. Create a new one.
		op, err := adminClient.CreateDatabase(ctx, &databasepb.CreateDatabaseRequest{
			Parent:          conf.Instance(),
			CreateStatement: fmt.Sprintf("CREATE DATABASE `%s`", conf.DatabaseID()),
			ExtraStatements: ddlToStatements(db.SpoolSchema),
		})
		if err != nil {
			return err
		}
		if _, err := op.Wait(ctx); err != nil {
			return err
		}
	case err != nil:
		return err
	default:
		// Database already exists. Try to update schema.
		// This covers databases that were created outside of spool
		// (for example with terraform) and therefore lack the schema.
		op, err := adminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{
			Database:   conf.Database(),
			Statements: ddlToStatements(db.SpoolSchema),
		})
		if err != nil {
			return err
		}
		if err := op.Wait(ctx); err != nil {
			return err
		}
	}
	return nil
}
// ListAll gets all databases from the pool.
//
// It opens a Spanner client for conf.Database() and returns whatever
// model.FindAllSpoolDatabases loads through a read-only transaction. The
// transaction and the client are both released before ListAll returns.
func ListAll(ctx context.Context, conf *Config) ([]*model.SpoolDatabase, error) {
	client, err := spanner.NewClient(ctx, conf.Database(), conf.ClientOptions()...)
	if err != nil {
		return nil, err
	}
	defer client.Close()

	// A multi-use read-only transaction holds a session until it is closed.
	// Deferred calls run in reverse order, so the transaction is closed
	// before the client.
	txn := client.ReadOnlyTransaction()
	defer txn.Close()

	return model.FindAllSpoolDatabases(ctx, txn)
}
// CleanAll drops pooled databases and deletes their metadata rows.
//
// The candidates are the result of model.FindAllSpoolDatabases passed through
// the package's filter helper together with filters; the helper is defined
// elsewhere in the package, so how the predicates combine is not visible here.
// The actual dropping and row deletion is delegated to clean. The Spanner
// client opened for conf.Database() is closed before CleanAll returns.
func CleanAll(ctx context.Context, conf *Config, filters ...func(sdb *model.SpoolDatabase) bool) error {
	client, err := spanner.NewClient(ctx, conf.Database(), conf.ClientOptions()...)
	if err != nil {
		return err
	}
	defer client.Close()

	findTargets := func(ctx context.Context, txn *spanner.ReadWriteTransaction) ([]*model.SpoolDatabase, error) {
		sdbs, err := model.FindAllSpoolDatabases(ctx, txn)
		if err != nil {
			return nil, err
		}
		return filter(sdbs, filters...), nil
	}
	return clean(ctx, client, conf, findTargets)
}
// clean drops every database returned by find and buffers a delete mutation
// for each corresponding metadata row inside one read-write transaction.
//
// For each database, in order:
//   - a successful drop, or a drop that fails with codes.NotFound (the
//     database is already gone), results in its row being deleted;
//   - any other drop error stops the loop. Rows collected so far are still
//     written, and the drop error is returned after the transaction finishes.
//
// An error from find, from buffering the mutations, or from the transaction
// itself takes precedence over a drop error.
func clean(ctx context.Context, client *spanner.Client, conf *Config, find func(ctx context.Context, txn *spanner.ReadWriteTransaction) ([]*model.SpoolDatabase, error)) error {
	var dropErr error
	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		// The callback can be invoked again when the transaction aborts.
		// Start every attempt clean so an error recorded by an aborted
		// attempt cannot leak into the result of the one that commits.
		dropErr = nil

		sdbs, err := find(ctx, txn)
		if err != nil {
			return err
		}

		ms := make([]*spanner.Mutation, 0, len(sdbs))
		for _, sdb := range sdbs {
			dropErr = dropDatabase(ctx, conf.WithDatabaseID(sdb.DatabaseName))
			if dropErr != nil {
				if st, ok := status.FromError(dropErr); !ok || st.Code() != codes.NotFound {
					// For any other error, stop processing further databases.
					break
				}
				// Database was not found: treat it as already dropped and
				// still remove its metadata row below.
				dropErr = nil
				fmt.Printf("%s was not deleted because it no longer exists.\n", sdb.DatabaseName)
			}
			ms = append(ms, sdb.Delete(ctx))
		}

		if len(ms) == 0 {
			return nil
		}
		return txn.BufferWrite(ms)
	})
	if err != nil {
		return err
	}
	return dropErr
}
// dropDatabase drops the database identified by conf.Database().
//
// It creates a dedicated database admin client for the call and closes it
// before returning. The error from DropDatabase is returned unchanged, so
// callers can inspect its gRPC status code.
func dropDatabase(ctx context.Context, conf *Config) error {
	adminClient, err := admin.NewDatabaseAdminClient(ctx, conf.ClientOptions()...)
	if err != nil {
		return err
	}
	// This function runs once per dropped database; without closing, every
	// call would leave an open admin connection behind. A close failure is
	// not actionable here, so it is deliberately ignored.
	defer adminClient.Close()

	return adminClient.DropDatabase(ctx, &databasepb.DropDatabaseRequest{
		Database: conf.Database(),
	})
}