diff --git a/go.mod b/go.mod index 06d4db1..543f005 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.26 require ( github.com/go-redis/redismock/v9 v9.2.0 github.com/redis/go-redis/v9 v9.18.0 + github.com/spf13/cobra v1.10.2 + github.com/spf13/viper v1.21.0 github.com/streamingfast/dgrpc v0.0.0-20260420180129-8b81f2664993 github.com/streamingfast/logging v0.0.0-20260108192805-38f96de0a641 github.com/stretchr/testify v1.11.1 @@ -75,9 +77,7 @@ require ( github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect github.com/spf13/afero v1.15.0 // indirect github.com/spf13/cast v1.10.0 // indirect - github.com/spf13/cobra v1.10.2 // indirect github.com/spf13/pflag v1.0.10 // indirect - github.com/spf13/viper v1.21.0 // indirect github.com/streamingfast/dmetrics v0.0.0-20250711072030-f023e918a175 // indirect github.com/streamingfast/sf-tracing v0.0.0-20251218140752-bafd5572499f // indirect github.com/streamingfast/shutter v1.5.0 // indirect diff --git a/go.sum b/go.sum index 6642338..7242f19 100644 --- a/go.sum +++ b/go.sum @@ -67,8 +67,8 @@ github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfU github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM= @@ -212,7 +212,6 @@ github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY= github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo= github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= -github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= diff --git a/grpc/services/hosted.go b/grpc/services/hosted.go index 4e4c0aa..0c3e654 100644 --- a/grpc/services/hosted.go +++ b/grpc/services/hosted.go @@ -39,7 +39,7 @@ func NewHosted(k8sClient kubernetes.Interface, namespace string, redisClient *re } func (s *Hosted) Deploy(ctx context.Context, req *pbservice.DeploymentRequest) (*pbservice.DeploymentResult, error) { - s.logger.Info("deploy request received") + s.logger.Info("deploy request received", zap.String("user_id", req.GetUserId())) deploymentRequest := req.GetDeploymentRequest() if deploymentRequest == nil { @@ -51,7 +51,7 @@ func (s *Hosted) Deploy(ctx context.Context, req *pbservice.DeploymentRequest) ( return nil, fmt.Errorf("creating deployer: %w", err) } - if err := d.Deploy(ctx, req.GetDeploymentId(), req.GetOrganizationId(), req.GetApiKey()); err != nil { + if err := d.Deploy(ctx, req.GetDeploymentId(), req.GetOrganizationId(), req.GetUserId(), req.GetApiKey()); err != nil { s.logger.Error("failed to deploy", zap.Error(err)) return nil, fmt.Errorf("deploying: %w", err) } @@ -60,7 +60,7 @@ func (s *Hosted) Deploy(ctx context.Context, req *pbservice.DeploymentRequest) ( s.logger.Error("failed to start tracker", zap.String("deployment_id", req.GetDeploymentId()), zap.Error(err)) } - s.logger.Info("deploy request completed successfully", zap.String("deployment_id", req.GetDeploymentId())) + s.logger.Info("deploy request completed successfully", zap.String("deployment_id", req.GetDeploymentId()), zap.String("user_id", req.GetUserId())) return &pbservice.DeploymentResult{}, nil } diff --git a/k8s/deployer/deployer.go b/k8s/deployer/deployer.go index 921ae87..d73f29f 100644 --- a/k8s/deployer/deployer.go +++ b/k8s/deployer/deployer.go @@ -10,7 +10,7 @@ import ( ) type Deployer interface { - Deploy(ctx context.Context, deploymentID, organizationID, apiKey string) error + Deploy(ctx context.Context, deploymentID, organizationID, userID, apiKey string) error } func New(k8sClient kubernetes.Interface, namespace string, logger *zap.Logger, deploymentRequest *pbcommon.DeploymentRequest) (Deployer, error) { diff --git a/k8s/deployer/sink_sql_from_proto.go b/k8s/deployer/sink_sql_from_proto.go index a92287d..005a7aa 100644 --- a/k8s/deployer/sink_sql_from_proto.go +++ b/k8s/deployer/sink_sql_from_proto.go @@ -32,7 +32,7 @@ func NewSinkSqlFromProto(k8sClient kubernetes.Interface, namespace string, logge } } -func (d *SinkSqlFromProto) Deploy(ctx context.Context, deploymentID, organizationID, apiKey string) error { +func (d *SinkSqlFromProto) Deploy(ctx context.Context, deploymentID, organizationID, userID, apiKey string) error { deployment := d.deployment podName := podName(organizationID, deployment.GetExecutionConfig()) @@ -76,6 +76,7 @@ func (d *SinkSqlFromProto) Deploy(ctx context.Context, deploymentID, organizatio "managed-by": "services-control-plane", "deployment-id": deploymentID, "organization-id": organizationID, + "user-id": userID, "output-module": strings.ReplaceAll(execCfg.GetOutputModule(), ":", "-"), } diff --git a/pb/sf/hosted/common/v1/deployment.pb.go b/pb/sf/hosted/common/v1/deployment.pb.go index 7d428b6..e18b396 100644 --- a/pb/sf/hosted/common/v1/deployment.pb.go +++ b/pb/sf/hosted/common/v1/deployment.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc (unknown) +// protoc v3.21.12 // source: sf/hosted/common/v1/deployment.proto package pbcommon diff --git a/pb/sf/hosted/common/v1/output_config.pb.go b/pb/sf/hosted/common/v1/output_config.pb.go index 7edb616..729aff8 100644 --- a/pb/sf/hosted/common/v1/output_config.pb.go +++ b/pb/sf/hosted/common/v1/output_config.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc (unknown) +// protoc v3.21.12 // source: sf/hosted/common/v1/output_config.proto package pbcommon diff --git a/pb/sf/hosted/service/v1/pod_state.pb.go b/pb/sf/hosted/service/v1/pod_state.pb.go index 7dd092d..4fe9d1b 100644 --- a/pb/sf/hosted/service/v1/pod_state.pb.go +++ b/pb/sf/hosted/service/v1/pod_state.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc (unknown) +// protoc v3.21.12 // source: sf/hosted/service/v1/pod_state.proto package pbservice diff --git a/pb/sf/hosted/service/v1/service.pb.go b/pb/sf/hosted/service/v1/service.pb.go index 34ed582..15a8321 100644 --- a/pb/sf/hosted/service/v1/service.pb.go +++ b/pb/sf/hosted/service/v1/service.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc (unknown) +// protoc v3.21.12 // source: sf/hosted/service/v1/service.proto package pbservice @@ -26,9 +26,10 @@ type DeploymentRequest struct { state protoimpl.MessageState `protogen:"open.v1"` DeploymentId string `protobuf:"bytes,1,opt,name=deployment_id,json=deploymentId,proto3" json:"deployment_id,omitempty"` OrganizationId string `protobuf:"bytes,2,opt,name=organization_id,json=organizationId,proto3" json:"organization_id,omitempty"` - Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` - DeploymentRequest *v1.DeploymentRequest `protobuf:"bytes,4,opt,name=deployment_request,json=deploymentRequest,proto3" json:"deployment_request,omitempty"` - ApiKey string `protobuf:"bytes,5,opt,name=api_key,json=apiKey,proto3" json:"api_key,omitempty"` + UserId string `protobuf:"bytes,3,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + Name string `protobuf:"bytes,4,opt,name=name,proto3" json:"name,omitempty"` + DeploymentRequest *v1.DeploymentRequest `protobuf:"bytes,5,opt,name=deployment_request,json=deploymentRequest,proto3" json:"deployment_request,omitempty"` + ApiKey string `protobuf:"bytes,6,opt,name=api_key,json=apiKey,proto3" json:"api_key,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -77,6 +78,13 @@ func (x *DeploymentRequest) GetOrganizationId() string { return "" } +func (x *DeploymentRequest) GetUserId() string { + if x != nil { + return x.UserId + } + return "" +} + func (x *DeploymentRequest) GetName() string { if x != nil { return x.Name @@ -682,13 +690,14 @@ var File_sf_hosted_service_v1_service_proto protoreflect.FileDescriptor const file_sf_hosted_service_v1_service_proto_rawDesc = "" + "\n" + - "\"sf/hosted/service/v1/service.proto\x12\x1dsf.hosted.service.internal.v1\x1a'sf/hosted/common/v1/output_config.proto\x1a$sf/hosted/common/v1/deployment.proto\x1a$sf/hosted/service/v1/pod_state.proto\"\xe5\x01\n" + + "\"sf/hosted/service/v1/service.proto\x12\x1dsf.hosted.service.internal.v1\x1a'sf/hosted/common/v1/output_config.proto\x1a$sf/hosted/common/v1/deployment.proto\x1a$sf/hosted/service/v1/pod_state.proto\"\xfe\x01\n" + "\x11DeploymentRequest\x12#\n" + "\rdeployment_id\x18\x01 \x01(\tR\fdeploymentId\x12'\n" + - "\x0forganization_id\x18\x02 \x01(\tR\x0eorganizationId\x12\x12\n" + - "\x04name\x18\x03 \x01(\tR\x04name\x12U\n" + - "\x12deployment_request\x18\x04 \x01(\v2&.sf.hosted.common.v1.DeploymentRequestR\x11deploymentRequest\x12\x17\n" + - "\aapi_key\x18\x05 \x01(\tR\x06apiKey\"\x12\n" + + "\x0forganization_id\x18\x02 \x01(\tR\x0eorganizationId\x12\x17\n" + + "\auser_id\x18\x03 \x01(\tR\x06userId\x12\x12\n" + + "\x04name\x18\x04 \x01(\tR\x04name\x12U\n" + + "\x12deployment_request\x18\x05 \x01(\v2&.sf.hosted.common.v1.DeploymentRequestR\x11deploymentRequest\x12\x17\n" + + "\aapi_key\x18\x06 \x01(\tR\x06apiKey\"\x12\n" + "\x10DeploymentResult\"E\n" + "\x0fReplicaResponse\x12\x18\n" + "\asuccess\x18\x01 \x01(\bR\asuccess\x12\x18\n" + diff --git a/pb/sf/hosted/service/v1/service_grpc.pb.go b/pb/sf/hosted/service/v1/service_grpc.pb.go index a286d70..404797a 100644 --- a/pb/sf/hosted/service/v1/service_grpc.pb.go +++ b/pb/sf/hosted/service/v1/service_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.6.1 -// - protoc (unknown) +// - protoc v3.21.12 // source: sf/hosted/service/v1/service.proto package pbservice diff --git a/proto-internal/sf/hosted/service/v1/service.proto b/proto-internal/sf/hosted/service/v1/service.proto index 9ec6c3b..9a09b33 100644 --- a/proto-internal/sf/hosted/service/v1/service.proto +++ b/proto-internal/sf/hosted/service/v1/service.proto @@ -19,9 +19,10 @@ service HostedService { message DeploymentRequest { string deployment_id = 1; string organization_id = 2; - string name = 3; - sf.hosted.common.v1.DeploymentRequest deployment_request = 4; - string api_key = 5; + string user_id = 3; + string name = 4; + sf.hosted.common.v1.DeploymentRequest deployment_request = 5; + string api_key = 6; } message DeploymentResult {