Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
eece8ea
Automatic request header support
cretz Jan 29, 2026
3004fce
Populate resource_id fields for request header routing
tconley1428 Mar 11, 2026
9d199d8
Add resource_id population for Nexus task requests
tconley1428 Mar 11, 2026
89129e9
Add comprehensive activity task resource ID tests
tconley1428 Mar 12, 2026
2dfe1a4
Fix partial validation of RecordActivityTaskHeartbeatByIdRequest
tconley1428 Mar 12, 2026
636af76
Remove resource_id implementations for messages without proto fields
tconley1428 Mar 12, 2026
746f006
Complete systematic verification of all resource ID field tests
tconley1428 Mar 12, 2026
a146692
Rename test file and remove working state documentation
tconley1428 Mar 12, 2026
613448b
Update resource ids with prefixes
tconley1428 Mar 18, 2026
c79cc72
Standardize resource_id prefix responsibility in SDK
tconley1428 Mar 20, 2026
c213166
Update go.temporal.io/api to v1.62.6, remove local replace directive
tconley1428 Mar 25, 2026
16e0b56
Merge branch 'master' into populate-resource-id-fields
tconley1428 Mar 25, 2026
f8bdc0f
Remove unused test helper and add empty resource ID test
tconley1428 Mar 25, 2026
26f71c0
Run go mod tidy in internal/cmd/build
tconley1428 Mar 25, 2026
7a7413d
Run go mod tidy across all modules
tconley1428 Mar 25, 2026
68f7164
Merge remote-tracking branch 'origin/master' into populate-resource-i…
tconley1428 Mar 25, 2026
3b10ab8
Merge branch 'master' into populate-resource-id-fields
tconley1428 Mar 26, 2026
7df41ba
Address review feedback: fix CompleteActivity resource ID and nil-safety
tconley1428 Mar 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.23.0

toolchain go1.23.6

replace go.temporal.io/api => ../api-go

require (
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a
github.com/gogo/protobuf v1.3.2
Expand Down
20 changes: 13 additions & 7 deletions internal/grpc_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
"go.temporal.io/api/proxy"
"go.temporal.io/api/serviceerror"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/internal/common/retry"
Expand All @@ -17,6 +18,7 @@ import (
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

type (
Expand Down Expand Up @@ -157,17 +159,21 @@ func requiredInterceptors(
interceptors = append(interceptors, interceptor)
}
}
// Add namespace provider interceptor
interceptors = append(interceptors, namespaceProviderInterceptor())
// Add temporal header interceptor (namespace + resource ID)
interceptors = append(interceptors, temporalHeaderInterceptor())
return interceptors
}

func namespaceProviderInterceptor() grpc.UnaryClientInterceptor {
func temporalHeaderInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if nsReq, ok := req.(interface{ GetNamespace() string }); ok {
// Only add namespace if it doesn't already exist
if md, _ := metadata.FromOutgoingContext(ctx); len(md.Get(temporalNamespaceHeaderKey)) == 0 {
ctx = metadata.AppendToOutgoingContext(ctx, temporalNamespaceHeaderKey, nsReq.GetNamespace())
var extractOpts proxy.ExtractHeadersOptions
extractOpts.Request, _ = req.(proto.Message)
extractOpts.ExistingMetadata, _ = metadata.FromOutgoingContext(ctx)
if extractOpts.Request != nil {
if headers, err := proxy.ExtractTemporalRequestHeaders(ctx, extractOpts); err != nil {
return err
} else if len(headers) > 0 {
ctx = metadata.AppendToOutgoingContext(ctx, headers...)
}
}
return invoker(ctx, method, req, reply, cc, opts...)
Expand Down
10 changes: 8 additions & 2 deletions internal/grpc_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func TestExistingContextMetadataJoinedWithSDKHeaders(t *testing.T) {
)
}

func TestNamespaceInterceptor(t *testing.T) {
func TestTemporalHeaderInterceptor(t *testing.T) {
srv, err := startTestGRPCServer()
require.NoError(t, err)
defer srv.Stop()
Expand All @@ -566,12 +566,18 @@ func TestNamespaceInterceptor(t *testing.T) {
metadata.ValueFromIncomingContext(srv.getSystemInfoRequestContext, temporalNamespaceHeaderKey),
)
// Verify namespace header is set on a request that does have namespace on the request
require.NoError(t, client.SignalWorkflow(context.Background(), "workflowid", "runid", "signalname", nil))
require.NoError(t, client.SignalWorkflow(context.Background(), "test-workflow-id", "runid", "signalname", nil))
require.Equal(
t,
[]string{"test-namespace"},
metadata.ValueFromIncomingContext(srv.lastSignalWorkflowExecutionContext, temporalNamespaceHeaderKey),
)
// Verify resource-id header is also set
require.Equal(
t,
[]string{"workflow:test-workflow-id"},
metadata.ValueFromIncomingContext(srv.lastSignalWorkflowExecutionContext, "temporal-resource-id"),
)
}

func TestCredentialsMTLS(t *testing.T) {
Expand Down
30 changes: 16 additions & 14 deletions internal/internal_nexus_task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (h *nexusTaskHandler) Execute(task *workflowservice.PollNexusTaskQueueRespo
failureReasonSupport := getEffectiveTemporalFailureResponses(task.GetRequest().GetCapabilities().GetTemporalFailureResponses())
nctx, handlerErr := h.newNexusOperationContext(task)
if handlerErr != nil {
failureRequest, err := h.fillInFailure(task.TaskToken, handlerErr, failureReasonSupport)
failureRequest, err := h.fillInFailure(task.TaskToken, task.ResourceId, handlerErr, failureReasonSupport)
if err != nil {
return nil, nil, err
}
Expand All @@ -107,13 +107,13 @@ func (h *nexusTaskHandler) Execute(task *workflowservice.PollNexusTaskQueueRespo
return nil, nil, err
}
if handlerErr != nil {
failureRequest, err := h.fillInFailure(task.TaskToken, handlerErr, failureReasonSupport)
failureRequest, err := h.fillInFailure(task.TaskToken, task.ResourceId, handlerErr, failureReasonSupport)
if err != nil {
return nil, nil, err
}
return nil, failureRequest, nil
}
completedRequest, err := h.fillInCompletion(task.TaskToken, res, failureReasonSupport)
completedRequest, err := h.fillInCompletion(task.TaskToken, task.ResourceId, res, failureReasonSupport)
if err != nil {
return nil, nil, err
}
Expand All @@ -127,13 +127,13 @@ func (h *nexusTaskHandler) ExecuteContext(nctx *NexusOperationContext, task *wor
return nil, nil, err
}
if handlerErr != nil {
failureRequest, err := h.fillInFailure(task.TaskToken, handlerErr, failureReasonSupport)
failureRequest, err := h.fillInFailure(task.TaskToken, task.ResourceId, handlerErr, failureReasonSupport)
if err != nil {
return nil, nil, err
}
return nil, failureRequest, nil
}
completedRequest, err := h.fillInCompletion(task.TaskToken, res, failureReasonSupport)
completedRequest, err := h.fillInCompletion(task.TaskToken, task.ResourceId, res, failureReasonSupport)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -458,7 +458,7 @@ func (h *nexusTaskHandler) newNexusOperationContext(response *workflowservice.Po
}, nil
}

func (h *nexusTaskHandler) fillInCompletion(taskToken []byte, res *nexuspb.Response, failureReasonSupport bool) (*workflowservice.RespondNexusTaskCompletedRequest, error) {
func (h *nexusTaskHandler) fillInCompletion(taskToken []byte, resourceId string, res *nexuspb.Response, failureReasonSupport bool) (*workflowservice.RespondNexusTaskCompletedRequest, error) {
// Handle conversion of Failure to OperationError for backwards compatibility with old servers.
if res.GetStartOperation().GetFailure() != nil && !failureReasonSupport {
// Convert to operation error for backwards compatibility.
Expand Down Expand Up @@ -487,18 +487,20 @@ func (h *nexusTaskHandler) fillInCompletion(taskToken []byte, res *nexuspb.Respo
}
}
return &workflowservice.RespondNexusTaskCompletedRequest{
Identity: h.identity,
Namespace: h.namespace,
TaskToken: taskToken,
Response: res,
Identity: h.identity,
Namespace: h.namespace,
TaskToken: taskToken,
Response: res,
ResourceId: resourceId,
}, nil
}

func (h *nexusTaskHandler) fillInFailure(taskToken []byte, handlerError *nexus.HandlerError, failureReasonSupport bool) (*workflowservice.RespondNexusTaskFailedRequest, error) {
func (h *nexusTaskHandler) fillInFailure(taskToken []byte, resourceId string, handlerError *nexus.HandlerError, failureReasonSupport bool) (*workflowservice.RespondNexusTaskFailedRequest, error) {
r := &workflowservice.RespondNexusTaskFailedRequest{
Identity: h.identity,
Namespace: h.namespace,
TaskToken: taskToken,
Identity: h.identity,
Namespace: h.namespace,
TaskToken: taskToken,
ResourceId: resourceId,
}
if failureReasonSupport {
r.Failure = h.failureConverter.ErrorToFailure(handlerError)
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_nexus_task_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (ntp *nexusTaskPoller) ProcessTask(task interface{}) error {
nctx, handlerErr := ntp.taskHandler.newNexusOperationContext(response)
if handlerErr != nil {
// context wasn't propagated to us, use a background context.
failedRequest, err := ntp.taskHandler.fillInFailure(response.TaskToken, handlerErr, getEffectiveTemporalFailureResponses(response.GetRequest().GetCapabilities().GetTemporalFailureResponses()))
failedRequest, err := ntp.taskHandler.fillInFailure(response.TaskToken, response.ResourceId, handlerErr, getEffectiveTemporalFailureResponses(response.GetRequest().GetCapabilities().GetTemporalFailureResponses()))
if err != nil {
return err
}
Expand Down
37 changes: 28 additions & 9 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1834,8 +1834,9 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
// for query task
if task.Query != nil {
queryCompletedRequest := &workflowservice.RespondQueryTaskCompletedRequest{
TaskToken: task.TaskToken,
Namespace: wth.namespace,
TaskToken: task.TaskToken,
Namespace: wth.namespace,
ResourceId: task.WorkflowExecution.WorkflowId,
}
var panicErr *PanicError
if errors.As(workflowContext.err, &panicErr) {
Expand Down Expand Up @@ -1961,6 +1962,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
BinaryChecksum: wth.workerBuildID,
QueryResults: queryResults,
Namespace: wth.namespace,
ResourceId: task.WorkflowExecution.WorkflowId,
MeteringMetadata: &commonpb.MeteringMetadata{NonfirstLocalActivityExecutionAttempts: nonfirstLAAttempts},
SdkMetadata: &sdk.WorkflowTaskCompletedMetadata{
LangUsedFlags: langUsedFlags,
Expand Down Expand Up @@ -2300,7 +2302,8 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
metricsHandler.Counter(metrics.UnregisteredActivityInvocationCounter).Inc(1)
return convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil,
NewActivityNotRegisteredError(activityType, ath.getRegisteredActivityNames()),
ath.dataConverter, ath.failureConverter, ath.namespace, false, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions), nil
ath.dataConverter, ath.failureConverter, ath.namespace, false, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions,
t.WorkflowExecution.GetWorkflowId(), t.ActivityId), nil
}

// panic handler
Expand All @@ -2318,7 +2321,8 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
metricsHandler.Counter(metrics.ActivityTaskErrorCounter).Inc(1)
panicErr := newPanicError(p, st)
result = convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil, panicErr,
ath.dataConverter, ath.failureConverter, ath.namespace, false, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions)
ath.dataConverter, ath.failureConverter, ath.namespace, false, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions,
t.WorkflowExecution.GetWorkflowId(), t.ActivityId)
}
}()

Expand Down Expand Up @@ -2365,7 +2369,8 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
)
}
return convertActivityResultToRespondRequest(ath.identity, t.TaskToken, output, err,
ath.dataConverter, ath.failureConverter, ath.namespace, isActivityCanceled, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions), nil
ath.dataConverter, ath.failureConverter, ath.namespace, isActivityCanceled, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions,
t.WorkflowExecution.GetWorkflowId(), t.ActivityId), nil
}

func (ath *activityTaskHandlerImpl) getActivity(name string) activity {
Expand Down Expand Up @@ -2425,15 +2430,28 @@ func createNewCommandWithMetadata(commandType enumspb.CommandType, metadata *sdk
}
}

func getActivityResourceIdFromCtx(ctx context.Context) string {
env := getActivityEnvironmentFromCtx(ctx)
if env == nil {
return ""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want test coverage for when env == nil?

}
// Check if this is a workflow activity or standalone activity
if env.workflowExecution.ID != "" {
return env.workflowExecution.ID
}
return env.activityID
}

func recordActivityHeartbeat(ctx context.Context, service workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler,
identity string, taskToken []byte, details *commonpb.Payloads,
) error {
namespace := getNamespaceFromActivityCtx(ctx)
request := &workflowservice.RecordActivityTaskHeartbeatRequest{
TaskToken: taskToken,
Details: details,
Identity: identity,
Namespace: namespace,
TaskToken: taskToken,
Details: details,
Identity: identity,
Namespace: namespace,
ResourceId: getActivityResourceIdFromCtx(ctx),
}
Comment on lines 2461 to 2471
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 RecordActivityHeartbeatWithOptions in internal/internal_workflow_client.go never passes opts.WorkflowID to recordActivityHeartbeat, so RecordActivityTaskHeartbeatRequest.ResourceId is always empty when heartbeating from outside an activity — the exact primary use case for this API. This means proxy-based resource-id routing introduced by this PR will silently fail for all external/async heartbeat calls even when a WorkflowID is provided. The fix is to thread opts.WorkflowID into recordActivityHeartbeat (adding a workflowID parameter) and fall back to getActivityResourceId(workflowID, "") when the context has no activity environment, mirroring the pattern already used in recordActivityHeartbeatByID.

Extended reasoning...

Bug: RecordActivityHeartbeatWithOptions always produces empty ResourceId

What the bug is and how it manifests

RecordActivityHeartbeatWithOptions is the public API for workers (or application code) that hold an activity task token and want to report heartbeats outside the normal in-activity context — for example, in async or forked-process activity patterns. The caller provides opts.WorkflowID precisely so the system can associate the heartbeat with the correct workflow. PR #2226 adds ResourceId population to RecordActivityTaskHeartbeatRequest for proxy routing, but the RecordActivityHeartbeatWithOptions code path never sets ResourceId to anything other than empty string.

The specific code path that triggers it

RecordActivityHeartbeatWithOptions calls recordActivityHeartbeat(ctx, svc, metrics, identity, opts.TaskToken, data)opts.WorkflowID is referenced only for ActivitySerializationContext and is not forwarded. Inside recordActivityHeartbeat, the request is built with ResourceId: getActivityResourceIdFromCtx(ctx). getActivityResourceIdFromCtx calls getActivityEnvironmentFromCtx(ctx), which returns nil for any context that did not go through the normal in-activity dispatch path (i.e., every external/async caller). When the env is nil, the helper immediately returns "", so ResourceId is always empty for this entire code path.

Why existing code does not prevent it

The same PR correctly fixes recordActivityHeartbeatByID by passing workflowID and activityID as explicit parameters and computing ResourceId: getActivityResourceId(workflowID, activityID). It also fixes CompleteActivityWithOptions to pass opts.WorkflowID. The heartbeat-with-options path was simply missed. There is no guard or fallback that would use opts.WorkflowID as a substitute when the context lookup returns nil.

Impact

Any deployment relying on the temporal-resource-id gRPC header (set by temporalHeaderInterceptor from ResourceId) for proxy routing will receive no header on RecordActivityTaskHeartbeat requests originating from RecordActivityHeartbeatWithOptions. Since this is the primary API for async activities, proxy routing silently fails for a significant class of requests. The omission is invisible at the Go API level — callers supply opts.WorkflowID and have no way to know it is being ignored.

How to fix it

Add a workflowID string parameter to recordActivityHeartbeat and compute ResourceId with a two-stage fallback: first try getActivityResourceIdFromCtx(ctx), then fall back to getActivityResourceId(workflowID, "") if empty. Then update RecordActivityHeartbeatWithOptions to pass opts.WorkflowID and all other internal callers to pass "". This mirrors the pattern already used in recordActivityHeartbeatByID.

Step-by-step proof

  1. A worker receives an activity task token, spawns a goroutine, and calls client.RecordActivityHeartbeatWithOptions(ctx, opts) with opts.WorkflowID = "my-workflow-123" and opts.TaskToken = token.
  2. RecordActivityHeartbeatWithOptions invokes recordActivityHeartbeat(ctx, svc, metrics, identity, opts.TaskToken, data)opts.WorkflowID is not forwarded.
  3. recordActivityHeartbeat builds the request with ResourceId: getActivityResourceIdFromCtx(ctx).
  4. getActivityResourceIdFromCtx calls getActivityEnvironmentFromCtx(ctx) which returns nil (external ctx has no activity env), so it returns "".
  5. The outgoing RecordActivityTaskHeartbeatRequest has ResourceId = "".
  6. temporalHeaderInterceptor calls proxy.ExtractTemporalRequestHeaders, finds no resource id, and sets no temporal-resource-id header.
  7. The proxy cannot route the request to the correct cell, defeating the purpose of this PR for async heartbeats.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds legit


var heartbeatResponse *workflowservice.RecordActivityTaskHeartbeatResponse
Expand Down Expand Up @@ -2465,6 +2483,7 @@ func recordActivityHeartbeatByID(ctx context.Context, service workflowservice.Wo
ActivityId: activityID,
Details: details,
Identity: identity,
ResourceId: getActivityResourceId(workflowID, activityID),
}

var heartbeatResponse *workflowservice.RecordActivityTaskHeartbeatByIdResponse
Expand Down
Loading
Loading