diff --git a/internal/internal_versioning_client.go b/internal/internal_versioning_client.go index 912517e06..373bad512 100644 --- a/internal/internal_versioning_client.go +++ b/internal/internal_versioning_client.go @@ -393,6 +393,17 @@ func statsFromResponse(stats *taskqueuepb.TaskQueueStats) *TaskQueueStats { } } +func statsByPriorityKeyFromResponse(statsByPriorityKey map[int32]*taskqueuepb.TaskQueueStats) map[int32]TaskQueueStats { + if statsByPriorityKey == nil { + return nil + } + result := make(map[int32]TaskQueueStats, len(statsByPriorityKey)) + for priority, stats := range statsByPriorityKey { + result[priority] = *statsFromResponse(stats) + } + return result +} + func taskQueueVersionInfoFromResponse(response *taskqueuepb.TaskQueueVersionInfo) TaskQueueVersionInfo { if response == nil { return TaskQueueVersionInfo{} diff --git a/internal/internal_worker_deployment_client.go b/internal/internal_worker_deployment_client.go index 1193fe638..912d3b500 100644 --- a/internal/internal_worker_deployment_client.go +++ b/internal/internal_worker_deployment_client.go @@ -362,6 +362,19 @@ func workerDeploymentVersionInfoFromProto(info *deployment.WorkerDeploymentVersi } } +func workerDeploymentVersionTaskQueuesFromProto(tqInfos []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue) []WorkerDeploymentTaskQueueInfo { + result := []WorkerDeploymentTaskQueueInfo{} + for _, info := range tqInfos { + result = append(result, WorkerDeploymentTaskQueueInfo{ + Name: info.GetName(), + Type: TaskQueueType(info.GetType()), + Stats: statsFromResponse(info.GetStats()), + StatsByPriorityKey: statsByPriorityKeyFromResponse(info.GetStatsByPriorityKey()), + }) + } + return result +} + func (h *workerDeploymentHandleImpl) DescribeVersion(ctx context.Context, options WorkerDeploymentDescribeVersionOptions) (WorkerDeploymentVersionDescription, error) { if err := h.validate(); err != nil { @@ -381,6 +394,7 @@ func (h *workerDeploymentHandleImpl) DescribeVersion(ctx context.Context, option BuildId: options.BuildID, DeploymentName: h.Name, }, + ReportTaskQueueStats: options.ReportTaskQueueStats, } grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) defer cancel() @@ -391,7 +405,8 @@ func (h *workerDeploymentHandleImpl) DescribeVersion(ctx context.Context, option } return WorkerDeploymentVersionDescription{ - Info: workerDeploymentVersionInfoFromProto(resp.GetWorkerDeploymentVersionInfo()), + Info: workerDeploymentVersionInfoFromProto(resp.GetWorkerDeploymentVersionInfo()), + TaskQueueInfos: workerDeploymentVersionTaskQueuesFromProto(resp.GetVersionTaskQueues()), }, nil } diff --git a/internal/worker_deployment_client.go b/internal/worker_deployment_client.go index f89885dee..302f3d9a4 100644 --- a/internal/worker_deployment_client.go +++ b/internal/worker_deployment_client.go @@ -180,7 +180,7 @@ type ( ConflictToken []byte // PreviousVersion - The Version that was current before executing this operation, if any. - // + // // Deprecated: in favor of API idempotency. Use `Describe` before this API to get the previous // state. Pass the `ConflictToken` returned by `Describe` to this API to avoid race conditions. PreviousVersion *WorkerDeploymentVersion @@ -322,6 +322,8 @@ type ( WorkerDeploymentDescribeVersionOptions struct { // BuildID - A Build ID within this deployment to describe. BuildID string + // ReportTaskQueueStats - Whether to report stats for task queues which have been polled by this version. + ReportTaskQueueStats bool } // WorkerDeploymentTaskQueueInfo describes properties of the Task Queues involved @@ -336,6 +338,13 @@ type ( // Type - The type of this task queue. Type TaskQueueType + + // Stats - Only set if ReportTaskQueueStats is set on the request. + Stats *TaskQueueStats + + // StatsByPriorityKey - Task queue stats breakdown by priority key. Only contains actively used priority keys. + // Only set if ReportTaskQueueStats is set on the request. + StatsByPriorityKey map[int32]TaskQueueStats } // WorkerDeploymentVersionDrainageInfo describes drainage properties of a Deployment Version. @@ -382,6 +391,8 @@ type ( RampPercentage float32 // TaskQueuesInfos - List of task queues polled by workers in this Deployment Version. + // + // Deprecated: Use WorkerDeploymentVersionDescription.TaskQueueInfos instead. TaskQueuesInfos []WorkerDeploymentTaskQueueInfo // DrainageInfo - Drainage information for a Worker Deployment Version, enabling users to @@ -403,6 +414,10 @@ type ( WorkerDeploymentVersionDescription struct { // Info - Information about this Version. Info WorkerDeploymentVersionInfo + + // All the Task Queues that have ever polled from this Deployment version. + // Stats are only reported if explicitly requested. + TaskQueueInfos []WorkerDeploymentTaskQueueInfo } // WorkerDeploymentDeleteVersionOptions provides options for diff --git a/test/worker_deployment_test.go b/test/worker_deployment_test.go index 7e449afe3..31fab21ba 100644 --- a/test/worker_deployment_test.go +++ b/test/worker_deployment_test.go @@ -15,9 +15,9 @@ import ( "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/workflowservice/v1" - "go.temporal.io/sdk/activity" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" ) @@ -1208,6 +1208,177 @@ func (ts *WorkerDeploymentTestSuite) TestCurrentVersion_AllowNoPollers() { ts.Nil(response2.Info.RoutingConfig.RampingVersion) } +func (ts *WorkerDeploymentTestSuite) TestDescribeVersionWithBacklogStats_NoPriority() { + ts.testDescribeVersionWithBacklogStats(false) +} + +func (ts *WorkerDeploymentTestSuite) TestDescribeVersionWithBacklogStats_YesPriority() { + ts.testDescribeVersionWithBacklogStats(true) +} + +func (ts *WorkerDeploymentTestSuite) testDescribeVersionWithBacklogStats(withPriority bool) { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + deploymentName := "deploy-test-" + uuid.NewString() + v1 := worker.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildID: "1.0", + } + + // Start a worker briefly to register the deployment version + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: v1, + }, + }) + worker1.RegisterWorkflowWithOptions(ts.workflows.WaitSignalToStartVersionedOne, workflow.RegisterOptions{ + Name: "WaitSignalToStartVersioned", + VersioningBehavior: workflow.VersioningBehaviorPinned, + }) + + ts.NoError(worker1.Start()) + + // Wait for the worker deployment and version to exist + dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName) + ts.waitForWorkerDeployment(ctx, dHandle) + response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{}) + ts.NoError(err) + ts.waitForWorkerDeploymentVersion(ctx, dHandle, v1) + + // SetCurrent to v1 so that workflows start on that version + _, err = dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: v1.BuildID, + ConflictToken: response1.ConflictToken, + }) + ts.NoError(err) + + // Stop the worker so workflows create backlog + worker1.Stop() + + // Start workflows with different priority keys to create task queue backlog + // Priority keys: 1 (high), 3 (medium/default), 5 (low) + priorityKeys := []int{1, 3, 5} + workflowsPerPriority := 2 + wfHandles := make([]client.WorkflowRun, 0, len(priorityKeys)*workflowsPerPriority) + for _, priorityKey := range priorityKeys { + for j := 0; j < workflowsPerPriority; j++ { + wfID := "backlog-test-" + uuid.NewString() + opts := ts.startWorkflowOptions(wfID) + // Only use priority keys if we are testing with priority keys so that we can verify + // that per-priority stats are not returned when priority keys are not set. + if withPriority { + opts.Priority = temporal.Priority{PriorityKey: priorityKey} + } + handle, err := ts.client.ExecuteWorkflow(ctx, opts, "WaitSignalToStartVersioned") + ts.NoError(err) + wfHandles = append(wfHandles, handle) + } + } + + // Wait for backlog stats to be reflected - stats may take time to propagate to version info + ts.Eventuallyf(func() bool { + desc, err := dHandle.DescribeVersion(ctx, client.WorkerDeploymentDescribeVersionOptions{ + BuildID: v1.BuildID, + ReportTaskQueueStats: true, + }) + if err != nil { + return false + } + if desc.Info.Version != v1 { + return false + } + // Find the workflow task queue and check backlog stats + for _, tqInfo := range desc.TaskQueueInfos { + if tqInfo.Name == ts.taskQueueName && tqInfo.Type == client.TaskQueueTypeWorkflow && tqInfo.Stats != nil { + if !(tqInfo.Stats.ApproximateBacklogCount > 0 && + tqInfo.Stats.ApproximateBacklogAge.Nanoseconds() > 0 && + tqInfo.Stats.TasksAddRate > 0 && + tqInfo.Stats.TasksDispatchRate == 0 && // zero task dispatch due to no pollers + tqInfo.Stats.BacklogIncreaseRate > 0) { + ts.T().Logf("Unexpected backlog stats for version: %+v", tqInfo.Stats) + return false + } + // If the backlog stats have propagated to the version info, check that per-priority stats are as expected. + for priorityKey, priorityKeyStats := range tqInfo.StatsByPriorityKey { + // If PriorityKey is set on the workflow, StatsByPriorityKey will be non-nil + // even if the Priority feature is not enabled in the test namespace. + // Until Priority is enabled by default in all SDK test namespaces, this + // test will be relaxed so that it can pass against a non-Priority-enabled namespace. + // Instead of checking that each StatsByPriorityKey entry has a non-empty ApproximateBacklogCount, + // We just check that for each expected priority key the entry exists and has a non-zero + // BacklogIncreaseRate and TasksAddRate. For the default priority key (3), we check that it has an + // ApproximateBacklogCount of either 2 (if Priority is enabled) or 6 (if Priority is not enabled). + if priorityKey != 3 { + if withPriority { + if !(priorityKeyStats.TasksAddRate > 0 && + priorityKeyStats.TasksDispatchRate == 0 && // zero task dispatch due to no pollers + priorityKeyStats.BacklogIncreaseRate > 0) { + ts.T().Logf("Unexpected backlog stats for priority key %v: %+v", priorityKey, priorityKeyStats) + return false + } + } else { + ts.T().Logf("No priority keys set, so only the default priority key should be present, but found key %v", priorityKey) + return false + } + } else { + if !((tqInfo.Stats.ApproximateBacklogCount == 2 || tqInfo.Stats.ApproximateBacklogCount == 6) && + tqInfo.Stats.ApproximateBacklogAge.Nanoseconds() > 0 && + tqInfo.Stats.TasksAddRate > 0 && + tqInfo.Stats.TasksDispatchRate == 0 && // zero task dispatch due to no pollers + tqInfo.Stats.BacklogIncreaseRate > 0) { + ts.T().Logf("Unexpected backlog stats for default priority key %v: %+v", priorityKey, priorityKeyStats) + return false + } + } + } + if withPriority { + return len(tqInfo.StatsByPriorityKey) == len(priorityKeys) + } else { + return len(tqInfo.StatsByPriorityKey) == 1 + } + } + } + return false + }, 10*time.Second, 500*time.Millisecond, "timeout waiting for expected backlog stats to be reflected in version info") + + // Also test that stats are NOT returned when ReportTaskQueueStats is false + descNoStats, err := dHandle.DescribeVersion(ctx, client.WorkerDeploymentDescribeVersionOptions{ + BuildID: v1.BuildID, + ReportTaskQueueStats: false, + }) + ts.NoError(err) + for _, tqInfo := range descNoStats.TaskQueueInfos { + ts.Nil(tqInfo.Stats, "Stats should be nil when ReportTaskQueueStats is false") + } + + // Cleanup: restart worker and complete workflows + worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: v1, + }, + }) + worker2.RegisterWorkflowWithOptions(ts.workflows.WaitSignalToStartVersionedOne, workflow.RegisterOptions{ + Name: "WaitSignalToStartVersioned", + VersioningBehavior: workflow.VersioningBehaviorPinned, + }) + + ts.NoError(worker2.Start()) + defer worker2.Stop() + + // Signal all workflows to complete + for _, handle := range wfHandles { + ts.NoError(ts.client.SignalWorkflow(ctx, handle.GetID(), handle.GetRunID(), "start-signal", "prefix")) + } + // Wait for workflows to complete + for _, handle := range wfHandles { + var result string + ts.NoError(handle.Get(ctx, &result)) + } +} + func (ts *WorkerDeploymentTestSuite) TestDeleteDeployment() { if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" { ts.T().Skip("temporal server 1.27+ required")