diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 015733c42..561a5a1fa 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -636,7 +636,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow( tagWorkflowType, params.WorkflowType.Name) } -func (wc *workflowEnvironmentImpl) ExecuteNexusOperation(params executeNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(token string, e error)) int64 { +func (wc *workflowEnvironmentImpl) ExecuteNexusOperation(params ExecuteNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(token string, e error)) int64 { seq := wc.GenerateSequence() scheduleTaskAttr := &commandpb.ScheduleNexusOperationCommandAttributes{ Endpoint: params.client.Endpoint(), diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index b21bc486a..7984cdad8 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -65,13 +65,37 @@ type ( Summary string } - executeNexusOperationParams struct { + ExecuteNexusOperationParams struct { client NexusClient operation string input *commonpb.Payload options NexusOperationOptions nexusHeader map[string]string } +) + +// NewExecuteNexusOperationParams builds a parameters struct for +// WorkflowEnvironment.ExecuteNexusOperation. Exposed so that non-Go SDKs +// (e.g. roadrunner-temporal proxying for PHP) can populate the struct from +// outside the `internal` package — the fields themselves stay unexported to +// keep the SDK free to evolve them. +func NewExecuteNexusOperationParams( + client NexusClient, + operation string, + input *commonpb.Payload, + options NexusOperationOptions, + nexusHeader map[string]string, +) ExecuteNexusOperationParams { + return ExecuteNexusOperationParams{ + client: client, + operation: operation, + input: input, + options: options, + nexusHeader: nexusHeader, + } +} + +type ( // WorkflowEnvironment Represents the environment for workflow. // Should only be used within the scope of workflow definition. @@ -88,7 +112,7 @@ type ( RequestCancelChildWorkflow(namespace, workflowID string) RequestCancelExternalWorkflow(namespace, workflowID, runID string, callback ResultHandler) ExecuteChildWorkflow(params ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error)) - ExecuteNexusOperation(params executeNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(token string, e error)) int64 + ExecuteNexusOperation(params ExecuteNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(token string, e error)) int64 RequestCancelNexusOperation(seq int64) GetLogger() log.Logger GetMetricsHandler() metrics.Handler diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index f8f611dd3..1c6dee686 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -92,7 +92,7 @@ type ( testNexusOperationHandle struct { env *testWorkflowEnvironmentImpl seq int64 - params executeNexusOperationParams + params ExecuteNexusOperationParams operationToken string cancelRequested bool started bool @@ -2680,7 +2680,7 @@ func (env *testWorkflowEnvironmentImpl) newTestNexusTaskHandler( } func (env *testWorkflowEnvironmentImpl) ExecuteNexusOperation( - params executeNexusOperationParams, + params ExecuteNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(opID string, e error), ) int64 { diff --git a/internal/nexus_operations.go b/internal/nexus_operations.go index 793e31c5b..1bbb94e86 100644 --- a/internal/nexus_operations.go +++ b/internal/nexus_operations.go @@ -263,7 +263,7 @@ func protoFailureToNexusFailure(failure *nexuspb.Failure) nexus.Failure { } // nexusOperationFailure is a utility in use by the test environment. -func nexusOperationFailure(params executeNexusOperationParams, token string, cause *failurepb.Failure) *failurepb.Failure { +func nexusOperationFailure(params ExecuteNexusOperationParams, token string, cause *failurepb.Failure) *failurepb.Failure { return &failurepb.Failure{ Message: "nexus operation completed unsuccessfully", FailureInfo: &failurepb.Failure_NexusOperationExecutionFailureInfo{ diff --git a/internal/workflow.go b/internal/workflow.go index 691d2841c..2f11ed8d0 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -3042,7 +3042,7 @@ func (c nexusClient) ExecuteOperation(ctx Context, operation any, input any, opt }) } -func (wc *workflowEnvironmentInterceptor) prepareNexusOperationParams(ctx Context, input ExecuteNexusOperationInput) (executeNexusOperationParams, error) { +func (wc *workflowEnvironmentInterceptor) prepareNexusOperationParams(ctx Context, input ExecuteNexusOperationInput) (ExecuteNexusOperationParams, error) { dc := WithWorkflowContext(ctx, wc.env.GetDataConverter()) var ok bool @@ -3055,22 +3055,22 @@ func (wc *workflowEnvironmentInterceptor) prepareNexusOperationParams(ctx Contex operationName = regOp.Name() inputType := reflect.TypeOf(input.Input) if inputType != nil && !inputType.AssignableTo(regOp.InputType()) { - return executeNexusOperationParams{}, fmt.Errorf("cannot assign argument of type %q to type %q for operation %q", inputType, regOp.InputType(), operationName) + return ExecuteNexusOperationParams{}, fmt.Errorf("cannot assign argument of type %q to type %q for operation %q", inputType, regOp.InputType(), operationName) } } else { - return executeNexusOperationParams{}, fmt.Errorf("invalid 'operation' parameter, must be an OperationReference or a string") + return ExecuteNexusOperationParams{}, fmt.Errorf("invalid 'operation' parameter, must be an OperationReference or a string") } payload, err := dc.ToPayload(input.Input) if err != nil { - return executeNexusOperationParams{}, err + return ExecuteNexusOperationParams{}, err } if input.Options.CancellationType == NexusOperationCancellationTypeUnspecified { input.Options.CancellationType = NexusOperationCancellationTypeWaitCompleted } - return executeNexusOperationParams{ + return ExecuteNexusOperationParams{ client: input.Client, operation: operationName, input: payload, diff --git a/internalbindings/internalbindings.go b/internalbindings/internalbindings.go index abd613419..fba6d4279 100644 --- a/internalbindings/internalbindings.go +++ b/internalbindings/internalbindings.go @@ -53,9 +53,40 @@ type ( ContinueAsNewError = internal.ContinueAsNewError // UpdateCallbacks used to report the result of an update UpdateCallbacks = internal.UpdateCallbacks + // ExecuteNexusOperationParams parameters for invoking a Nexus operation from + // a workflow via WorkflowEnvironment.ExecuteNexusOperation. Exposed so that + // non-Go SDKs (e.g. roadrunner-temporal proxying for PHP) can build the + // params struct directly when they receive an `ExecuteNexusOperation` + // command from the worker. + ExecuteNexusOperationParams = internal.ExecuteNexusOperationParams + // NexusOperationOptions are workflow-level options for a Nexus operation. + NexusOperationOptions = internal.NexusOperationOptions + // NexusClient is the workflow-level client used to issue Nexus operations. + // Build via internal.NewNexusClient(endpoint, service). + NexusClient = internal.NexusClient ) // GetLastCompletionResult returns last completion result from workflow. func GetLastCompletionResult(env WorkflowEnvironment) *commonpb.Payloads { return internal.GetLastCompletionResultFromWorkflowInfo(env.WorkflowInfo()) } + +// NewNexusClient builds a NexusClient targeted at the given endpoint and +// service. Use the result with NewExecuteNexusOperationParams when feeding +// WorkflowEnvironment.ExecuteNexusOperation directly. +func NewNexusClient(endpoint, service string) NexusClient { + return internal.NewNexusClient(endpoint, service) +} + +// NewExecuteNexusOperationParams builds an ExecuteNexusOperationParams struct +// from outside the `internal` package — see internal.NewExecuteNexusOperationParams +// for the rationale. +func NewExecuteNexusOperationParams( + client NexusClient, + operation string, + input *commonpb.Payload, + options NexusOperationOptions, + nexusHeader map[string]string, +) ExecuteNexusOperationParams { + return internal.NewExecuteNexusOperationParams(client, operation, input, options, nexusHeader) +} diff --git a/internalbindings/internalbindings_test.go b/internalbindings/internalbindings_test.go new file mode 100644 index 000000000..dc7b679f6 --- /dev/null +++ b/internalbindings/internalbindings_test.go @@ -0,0 +1,63 @@ +// Tests covering the publicly exposed Nexus caller-side bindings added so +// that non-Go SDKs (e.g. roadrunner-temporal proxying for PHP) can build +// `ExecuteNexusOperationParams` from outside the `internal` package. +// +// The constructor is the only way for an external caller to populate the +// struct — its fields stay unexported on purpose. These tests document and +// guard that contract. +package internalbindings + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" +) + +func TestNewNexusClient_ExposesEndpointAndService(t *testing.T) { + c := NewNexusClient("ep-1", "svc-1") + require.NotNil(t, c, "client constructor must return a non-nil NexusClient") + + // NexusClient is an interface from go.temporal.io/sdk/internal — it + // exposes Endpoint() and Service() readers. We assert through the + // re-exported alias so the test breaks loudly if the interface shape + // changes upstream and the alias drifts. + assert.Equal(t, "ep-1", c.Endpoint()) + assert.Equal(t, "svc-1", c.Service()) +} + +func TestNewExecuteNexusOperationParams_RoundtripsAllFields(t *testing.T) { + // The struct's fields are unexported, so the only contract surface + // is "what you put in is what dispatchers see when they call + // WorkflowEnvironment.ExecuteNexusOperation". We can't read fields + // directly, but we can verify the constructor accepts the documented + // inputs without panicking and returns a value of the public type. + client := NewNexusClient("ep-2", "svc-2") + payload := &commonpb.Payload{Data: []byte("hello")} + options := NexusOperationOptions{} + headers := map[string]string{"Nexus-Operation-Token": "tok-x"} + + params := NewExecuteNexusOperationParams(client, "op-1", payload, options, headers) + + // The exported type alias must match the type returned by the + // constructor — protects against accidental drift between the + // `type ExecuteNexusOperationParams = internal....` line and the + // `func NewExecuteNexusOperationParams ... ExecuteNexusOperationParams` + // line. + var _ ExecuteNexusOperationParams = params +} + +func TestNewExecuteNexusOperationParams_AcceptsNilHeader(t *testing.T) { + // Headers map is optional — Java callers that don't surface a Nexus + // header just pass nil. Ensure nil doesn't panic on construction. + require.NotPanics(t, func() { + NewExecuteNexusOperationParams( + NewNexusClient("ep", "svc"), + "op", + nil, // input + NexusOperationOptions{}, + nil, // header + ) + }) +}