From 913f4d83f5b94a9b5effc29e435088c9f4dda581 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Tue, 20 Jan 2026 20:04:48 -0800 Subject: [PATCH 1/6] include optional task queue stats in describe version --- internal/internal_versioning_client.go | 11 +++++ internal/internal_worker_deployment_client.go | 40 ++++++++++++++++++- internal/worker_deployment_client.go | 15 ++++++- 3 files changed, 64 insertions(+), 2 deletions(-) diff --git a/internal/internal_versioning_client.go b/internal/internal_versioning_client.go index 912517e06..373bad512 100644 --- a/internal/internal_versioning_client.go +++ b/internal/internal_versioning_client.go @@ -393,6 +393,17 @@ func statsFromResponse(stats *taskqueuepb.TaskQueueStats) *TaskQueueStats { } } +func statsByPriorityKeyFromResponse(statsByPriorityKey map[int32]*taskqueuepb.TaskQueueStats) map[int32]TaskQueueStats { + if statsByPriorityKey == nil { + return nil + } + result := make(map[int32]TaskQueueStats, len(statsByPriorityKey)) + for priority, stats := range statsByPriorityKey { + result[priority] = *statsFromResponse(stats) + } + return result +} + func taskQueueVersionInfoFromResponse(response *taskqueuepb.TaskQueueVersionInfo) TaskQueueVersionInfo { if response == nil { return TaskQueueVersionInfo{} diff --git a/internal/internal_worker_deployment_client.go b/internal/internal_worker_deployment_client.go index 1193fe638..469b7518b 100644 --- a/internal/internal_worker_deployment_client.go +++ b/internal/internal_worker_deployment_client.go @@ -328,6 +328,19 @@ func workerDeploymentTaskQueuesInfosFromProto(tqInfos []*deployment.WorkerDeploy return result } +func workerDeploymentTaskQueueInfosFromDescribeVersionProto(tqInfos []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue) []WorkerDeploymentTaskQueueInfo { + result := []WorkerDeploymentTaskQueueInfo{} + for _, info := range tqInfos { + result = append(result, WorkerDeploymentTaskQueueInfo{ + Name: info.GetName(), + Type: TaskQueueType(info.GetType()), + Stats: statsFromResponse(info.GetStats()), + StatsByPriorityKey: statsByPriorityKeyFromResponse(info.GetStatsByPriorityKey()), + }) + } + return result +} + func workerDeploymentDrainageInfoFromProto(drainageInfo *deployment.VersionDrainageInfo) *WorkerDeploymentVersionDrainageInfo { if drainageInfo == nil { return nil @@ -362,6 +375,29 @@ func workerDeploymentVersionInfoFromProto(info *deployment.WorkerDeploymentVersi } } +func workerDeploymentVersionTaskQueuesFromProto(info *deployment.WorkerDeploymentVersionInfo) WorkerDeploymentVersionInfo { + if info == nil { + return WorkerDeploymentVersionInfo{} + } + //lint:ignore SA1019 ignore deprecated versioning APIs + version := workerDeploymentVersionFromProtoOrString(info.DeploymentVersion, info.Version) + if version == nil { + // Should never happen unless server is sending junk data + version = &WorkerDeploymentVersion{} + } + return WorkerDeploymentVersionInfo{ + Version: *version, + CreateTime: safeAsTime(info.CreateTime), + RoutingChangedTime: safeAsTime(info.RoutingChangedTime), + CurrentSinceTime: safeAsTime(info.CurrentSinceTime), + RampingSinceTime: safeAsTime(info.RampingSinceTime), + RampPercentage: info.RampPercentage, + TaskQueuesInfos: workerDeploymentTaskQueuesInfosFromProto(info.TaskQueueInfos), + DrainageInfo: workerDeploymentDrainageInfoFromProto(info.DrainageInfo), + Metadata: info.Metadata.GetEntries(), + } +} + func (h *workerDeploymentHandleImpl) DescribeVersion(ctx context.Context, options WorkerDeploymentDescribeVersionOptions) (WorkerDeploymentVersionDescription, error) { if err := h.validate(); err != nil { @@ -381,6 +417,7 @@ func (h *workerDeploymentHandleImpl) DescribeVersion(ctx context.Context, option BuildId: options.BuildID, DeploymentName: h.Name, }, + ReportTaskQueueStats: options.ReportTaskQueueStats, } grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) defer cancel() @@ -391,7 +428,8 @@ func (h *workerDeploymentHandleImpl) DescribeVersion(ctx context.Context, option } return WorkerDeploymentVersionDescription{ - Info: workerDeploymentVersionInfoFromProto(resp.GetWorkerDeploymentVersionInfo()), + Info: workerDeploymentVersionInfoFromProto(resp.GetWorkerDeploymentVersionInfo()), + TaskQueueInfos: workerDeploymentTaskQueueInfosFromDescribeVersionProto(resp.GetVersionTaskQueues()), }, nil } diff --git a/internal/worker_deployment_client.go b/internal/worker_deployment_client.go index f89885dee..46be0fab4 100644 --- a/internal/worker_deployment_client.go +++ b/internal/worker_deployment_client.go @@ -5,6 +5,7 @@ import ( "time" commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/client" ) // WorkerDeploymentVersionDrainageStatus specifies the drainage status for a Worker @@ -180,7 +181,7 @@ type ( ConflictToken []byte // PreviousVersion - The Version that was current before executing this operation, if any. - // + // // Deprecated: in favor of API idempotency. Use `Describe` before this API to get the previous // state. Pass the `ConflictToken` returned by `Describe` to this API to avoid race conditions. PreviousVersion *WorkerDeploymentVersion @@ -322,6 +323,8 @@ type ( WorkerDeploymentDescribeVersionOptions struct { // BuildID - A Build ID within this deployment to describe. BuildID string + // ReportTaskQueueStats - Whether to report stats for task queues which have been polled by this version. + ReportTaskQueueStats bool } // WorkerDeploymentTaskQueueInfo describes properties of the Task Queues involved @@ -336,6 +339,13 @@ type ( // Type - The type of this task queue. Type TaskQueueType + + // Stats - Only set if ReportTaskQueueStats is set on the request. + Stats *TaskQueueStats + + // StatsByPriorityKey - Task queue stats breakdown by priority key. Only contains actively used priority keys. + // Only set if ReportTaskQueueStats is set on the request. + StatsByPriorityKey map[int32]TaskQueueStats } // WorkerDeploymentVersionDrainageInfo describes drainage properties of a Deployment Version. @@ -403,6 +413,9 @@ type ( WorkerDeploymentVersionDescription struct { // Info - Information about this Version. Info WorkerDeploymentVersionInfo + + // All the Task Queues that have ever polled from this Deployment version. + TaskQueueInfos []WorkerDeploymentTaskQueueInfo } // WorkerDeploymentDeleteVersionOptions provides options for From e8d585d6333695ccc739254a0bd1eae3f1c5cf23 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Thu, 29 Jan 2026 21:59:06 -0700 Subject: [PATCH 2/6] remove unused import --- internal/worker_deployment_client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/worker_deployment_client.go b/internal/worker_deployment_client.go index 46be0fab4..b0aeca9c0 100644 --- a/internal/worker_deployment_client.go +++ b/internal/worker_deployment_client.go @@ -5,7 +5,6 @@ import ( "time" commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/client" ) // WorkerDeploymentVersionDrainageStatus specifies the drainage status for a Worker From 6f58e5fa216781c30c275ea5ff46617750f519d1 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Fri, 30 Jan 2026 17:10:34 -0800 Subject: [PATCH 3/6] test describe version with backlog stats --- test/worker_deployment_test.go | 173 ++++++++++++++++++++++++++++++++- 1 file changed, 172 insertions(+), 1 deletion(-) diff --git a/test/worker_deployment_test.go b/test/worker_deployment_test.go index 7e449afe3..31fab21ba 100644 --- a/test/worker_deployment_test.go +++ b/test/worker_deployment_test.go @@ -15,9 +15,9 @@ import ( "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/workflowservice/v1" - "go.temporal.io/sdk/activity" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" ) @@ -1208,6 +1208,177 @@ func (ts *WorkerDeploymentTestSuite) TestCurrentVersion_AllowNoPollers() { ts.Nil(response2.Info.RoutingConfig.RampingVersion) } +func (ts *WorkerDeploymentTestSuite) TestDescribeVersionWithBacklogStats_NoPriority() { + ts.testDescribeVersionWithBacklogStats(false) +} + +func (ts *WorkerDeploymentTestSuite) TestDescribeVersionWithBacklogStats_YesPriority() { + ts.testDescribeVersionWithBacklogStats(true) +} + +func (ts *WorkerDeploymentTestSuite) testDescribeVersionWithBacklogStats(withPriority bool) { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + deploymentName := "deploy-test-" + uuid.NewString() + v1 := worker.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildID: "1.0", + } + + // Start a worker briefly to register the deployment version + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: v1, + }, + }) + worker1.RegisterWorkflowWithOptions(ts.workflows.WaitSignalToStartVersionedOne, workflow.RegisterOptions{ + Name: "WaitSignalToStartVersioned", + VersioningBehavior: workflow.VersioningBehaviorPinned, + }) + + ts.NoError(worker1.Start()) + + // Wait for the worker deployment and version to exist + dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName) + ts.waitForWorkerDeployment(ctx, dHandle) + response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{}) + ts.NoError(err) + ts.waitForWorkerDeploymentVersion(ctx, dHandle, v1) + + // SetCurrent to v1 so that workflows start on that version + _, err = dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: v1.BuildID, + ConflictToken: response1.ConflictToken, + }) + ts.NoError(err) + + // Stop the worker so workflows create backlog + worker1.Stop() + + // Start workflows with different priority keys to create task queue backlog + // Priority keys: 1 (high), 3 (medium/default), 5 (low) + priorityKeys := []int{1, 3, 5} + workflowsPerPriority := 2 + wfHandles := make([]client.WorkflowRun, 0, len(priorityKeys)*workflowsPerPriority) + for _, priorityKey := range priorityKeys { + for j := 0; j < workflowsPerPriority; j++ { + wfID := "backlog-test-" + uuid.NewString() + opts := ts.startWorkflowOptions(wfID) + // Only use priority keys if we are testing with priority keys so that we can verify + // that per-priority stats are not returned when priority keys are not set. + if withPriority { + opts.Priority = temporal.Priority{PriorityKey: priorityKey} + } + handle, err := ts.client.ExecuteWorkflow(ctx, opts, "WaitSignalToStartVersioned") + ts.NoError(err) + wfHandles = append(wfHandles, handle) + } + } + + // Wait for backlog stats to be reflected - stats may take time to propagate to version info + ts.Eventuallyf(func() bool { + desc, err := dHandle.DescribeVersion(ctx, client.WorkerDeploymentDescribeVersionOptions{ + BuildID: v1.BuildID, + ReportTaskQueueStats: true, + }) + if err != nil { + return false + } + if desc.Info.Version != v1 { + return false + } + // Find the workflow task queue and check backlog stats + for _, tqInfo := range desc.TaskQueueInfos { + if tqInfo.Name == ts.taskQueueName && tqInfo.Type == client.TaskQueueTypeWorkflow && tqInfo.Stats != nil { + if !(tqInfo.Stats.ApproximateBacklogCount > 0 && + tqInfo.Stats.ApproximateBacklogAge.Nanoseconds() > 0 && + tqInfo.Stats.TasksAddRate > 0 && + tqInfo.Stats.TasksDispatchRate == 0 && // zero task dispatch due to no pollers + tqInfo.Stats.BacklogIncreaseRate > 0) { + ts.T().Logf("Unexpected backlog stats for version: %+v", tqInfo.Stats) + return false + } + // If the backlog stats have propagated to the version info, check that per-priority stats are as expected. + for priorityKey, priorityKeyStats := range tqInfo.StatsByPriorityKey { + // If PriorityKey is set on the workflow, StatsByPriorityKey will be non-nil + // even if the Priority feature is not enabled in the test namespace. + // Until Priority is enabled by default in all SDK test namespaces, this + // test will be relaxed so that it can pass against a non-Priority-enabled namespace. + // Instead of checking that each StatsByPriorityKey entry has a non-empty ApproximateBacklogCount, + // We just check that for each expected priority key the entry exists and has a non-zero + // BacklogIncreaseRate and TasksAddRate. For the default priority key (3), we check that it has an + // ApproximateBacklogCount of either 2 (if Priority is enabled) or 6 (if Priority is not enabled). + if priorityKey != 3 { + if withPriority { + if !(priorityKeyStats.TasksAddRate > 0 && + priorityKeyStats.TasksDispatchRate == 0 && // zero task dispatch due to no pollers + priorityKeyStats.BacklogIncreaseRate > 0) { + ts.T().Logf("Unexpected backlog stats for priority key %v: %+v", priorityKey, priorityKeyStats) + return false + } + } else { + ts.T().Logf("No priority keys set, so only the default priority key should be present, but found key %v", priorityKey) + return false + } + } else { + if !((tqInfo.Stats.ApproximateBacklogCount == 2 || tqInfo.Stats.ApproximateBacklogCount == 6) && + tqInfo.Stats.ApproximateBacklogAge.Nanoseconds() > 0 && + tqInfo.Stats.TasksAddRate > 0 && + tqInfo.Stats.TasksDispatchRate == 0 && // zero task dispatch due to no pollers + tqInfo.Stats.BacklogIncreaseRate > 0) { + ts.T().Logf("Unexpected backlog stats for default priority key %v: %+v", priorityKey, priorityKeyStats) + return false + } + } + } + if withPriority { + return len(tqInfo.StatsByPriorityKey) == len(priorityKeys) + } else { + return len(tqInfo.StatsByPriorityKey) == 1 + } + } + } + return false + }, 10*time.Second, 500*time.Millisecond, "timeout waiting for expected backlog stats to be reflected in version info") + + // Also test that stats are NOT returned when ReportTaskQueueStats is false + descNoStats, err := dHandle.DescribeVersion(ctx, client.WorkerDeploymentDescribeVersionOptions{ + BuildID: v1.BuildID, + ReportTaskQueueStats: false, + }) + ts.NoError(err) + for _, tqInfo := range descNoStats.TaskQueueInfos { + ts.Nil(tqInfo.Stats, "Stats should be nil when ReportTaskQueueStats is false") + } + + // Cleanup: restart worker and complete workflows + worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: v1, + }, + }) + worker2.RegisterWorkflowWithOptions(ts.workflows.WaitSignalToStartVersionedOne, workflow.RegisterOptions{ + Name: "WaitSignalToStartVersioned", + VersioningBehavior: workflow.VersioningBehaviorPinned, + }) + + ts.NoError(worker2.Start()) + defer worker2.Stop() + + // Signal all workflows to complete + for _, handle := range wfHandles { + ts.NoError(ts.client.SignalWorkflow(ctx, handle.GetID(), handle.GetRunID(), "start-signal", "prefix")) + } + // Wait for workflows to complete + for _, handle := range wfHandles { + var result string + ts.NoError(handle.Get(ctx, &result)) + } +} + func (ts *WorkerDeploymentTestSuite) TestDeleteDeployment() { if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" { ts.T().Skip("temporal server 1.27+ required") From 7bf1c5573c126c6a12357fe3a5689aadc8b9abc6 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Fri, 30 Jan 2026 17:16:25 -0800 Subject: [PATCH 4/6] mark WorkerDeploymentVersionInfo.TaskQueuesInfos as deprecated to match proto deprecation --- internal/worker_deployment_client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/worker_deployment_client.go b/internal/worker_deployment_client.go index b0aeca9c0..549b3323d 100644 --- a/internal/worker_deployment_client.go +++ b/internal/worker_deployment_client.go @@ -391,6 +391,8 @@ type ( RampPercentage float32 // TaskQueuesInfos - List of task queues polled by workers in this Deployment Version. + // + // Deprecated: Use WorkerDeploymentVersionDescription.TaskQueueInfos instead. TaskQueuesInfos []WorkerDeploymentTaskQueueInfo // DrainageInfo - Drainage information for a Worker Deployment Version, enabling users to From 743d8a8271ed428b2830a32b81a848f8401d59f5 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Fri, 30 Jan 2026 17:18:53 -0800 Subject: [PATCH 5/6] more docstring --- internal/worker_deployment_client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/worker_deployment_client.go b/internal/worker_deployment_client.go index 549b3323d..302f3d9a4 100644 --- a/internal/worker_deployment_client.go +++ b/internal/worker_deployment_client.go @@ -416,6 +416,7 @@ type ( Info WorkerDeploymentVersionInfo // All the Task Queues that have ever polled from this Deployment version. + // Stats are only reported if explicitly requested. TaskQueueInfos []WorkerDeploymentTaskQueueInfo } From d6f3f6890ab1c87bf937e3e1ac4d6505799c3f4a Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Fri, 30 Jan 2026 17:21:14 -0800 Subject: [PATCH 6/6] remove unused function --- internal/internal_worker_deployment_client.go | 45 +++++-------------- 1 file changed, 11 insertions(+), 34 deletions(-) diff --git a/internal/internal_worker_deployment_client.go b/internal/internal_worker_deployment_client.go index 469b7518b..912d3b500 100644 --- a/internal/internal_worker_deployment_client.go +++ b/internal/internal_worker_deployment_client.go @@ -328,19 +328,6 @@ func workerDeploymentTaskQueuesInfosFromProto(tqInfos []*deployment.WorkerDeploy return result } -func workerDeploymentTaskQueueInfosFromDescribeVersionProto(tqInfos []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue) []WorkerDeploymentTaskQueueInfo { - result := []WorkerDeploymentTaskQueueInfo{} - for _, info := range tqInfos { - result = append(result, WorkerDeploymentTaskQueueInfo{ - Name: info.GetName(), - Type: TaskQueueType(info.GetType()), - Stats: statsFromResponse(info.GetStats()), - StatsByPriorityKey: statsByPriorityKeyFromResponse(info.GetStatsByPriorityKey()), - }) - } - return result -} - func workerDeploymentDrainageInfoFromProto(drainageInfo *deployment.VersionDrainageInfo) *WorkerDeploymentVersionDrainageInfo { if drainageInfo == nil { return nil @@ -375,27 +362,17 @@ func workerDeploymentVersionInfoFromProto(info *deployment.WorkerDeploymentVersi } } -func workerDeploymentVersionTaskQueuesFromProto(info *deployment.WorkerDeploymentVersionInfo) WorkerDeploymentVersionInfo { - if info == nil { - return WorkerDeploymentVersionInfo{} - } - //lint:ignore SA1019 ignore deprecated versioning APIs - version := workerDeploymentVersionFromProtoOrString(info.DeploymentVersion, info.Version) - if version == nil { - // Should never happen unless server is sending junk data - version = &WorkerDeploymentVersion{} - } - return WorkerDeploymentVersionInfo{ - Version: *version, - CreateTime: safeAsTime(info.CreateTime), - RoutingChangedTime: safeAsTime(info.RoutingChangedTime), - CurrentSinceTime: safeAsTime(info.CurrentSinceTime), - RampingSinceTime: safeAsTime(info.RampingSinceTime), - RampPercentage: info.RampPercentage, - TaskQueuesInfos: workerDeploymentTaskQueuesInfosFromProto(info.TaskQueueInfos), - DrainageInfo: workerDeploymentDrainageInfoFromProto(info.DrainageInfo), - Metadata: info.Metadata.GetEntries(), +func workerDeploymentVersionTaskQueuesFromProto(tqInfos []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue) []WorkerDeploymentTaskQueueInfo { + result := []WorkerDeploymentTaskQueueInfo{} + for _, info := range tqInfos { + result = append(result, WorkerDeploymentTaskQueueInfo{ + Name: info.GetName(), + Type: TaskQueueType(info.GetType()), + Stats: statsFromResponse(info.GetStats()), + StatsByPriorityKey: statsByPriorityKeyFromResponse(info.GetStatsByPriorityKey()), + }) } + return result } func (h *workerDeploymentHandleImpl) DescribeVersion(ctx context.Context, options WorkerDeploymentDescribeVersionOptions) (WorkerDeploymentVersionDescription, error) { @@ -429,7 +406,7 @@ func (h *workerDeploymentHandleImpl) DescribeVersion(ctx context.Context, option return WorkerDeploymentVersionDescription{ Info: workerDeploymentVersionInfoFromProto(resp.GetWorkerDeploymentVersionInfo()), - TaskQueueInfos: workerDeploymentTaskQueueInfosFromDescribeVersionProto(resp.GetVersionTaskQueues()), + TaskQueueInfos: workerDeploymentVersionTaskQueuesFromProto(resp.GetVersionTaskQueues()), }, nil }