Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
tagWorkflowType, params.WorkflowType.Name)
}

func (wc *workflowEnvironmentImpl) ExecuteNexusOperation(params executeNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(token string, e error)) int64 {
func (wc *workflowEnvironmentImpl) ExecuteNexusOperation(params ExecuteNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(token string, e error)) int64 {
seq := wc.GenerateSequence()
scheduleTaskAttr := &commandpb.ScheduleNexusOperationCommandAttributes{
Endpoint: params.client.Endpoint(),
Expand Down
28 changes: 26 additions & 2 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,37 @@ type (
Summary string
}

executeNexusOperationParams struct {
ExecuteNexusOperationParams struct {
client NexusClient
operation string
input *commonpb.Payload
options NexusOperationOptions
nexusHeader map[string]string
}
)

// NewExecuteNexusOperationParams builds a parameters struct for
// WorkflowEnvironment.ExecuteNexusOperation. Exposed so that non-Go SDKs
// (e.g. roadrunner-temporal proxying for PHP) can populate the struct from
// outside the `internal` package — the fields themselves stay unexported to
// keep the SDK free to evolve them.
func NewExecuteNexusOperationParams(
client NexusClient,
operation string,
input *commonpb.Payload,
options NexusOperationOptions,
nexusHeader map[string]string,
) ExecuteNexusOperationParams {
return ExecuteNexusOperationParams{
client: client,
operation: operation,
input: input,
options: options,
nexusHeader: nexusHeader,
}
}

type (

// WorkflowEnvironment Represents the environment for workflow.
// Should only be used within the scope of workflow definition.
Expand All @@ -88,7 +112,7 @@ type (
RequestCancelChildWorkflow(namespace, workflowID string)
RequestCancelExternalWorkflow(namespace, workflowID, runID string, callback ResultHandler)
ExecuteChildWorkflow(params ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error))
ExecuteNexusOperation(params executeNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(token string, e error)) int64
ExecuteNexusOperation(params ExecuteNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(token string, e error)) int64
RequestCancelNexusOperation(seq int64)
GetLogger() log.Logger
GetMetricsHandler() metrics.Handler
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type (
testNexusOperationHandle struct {
env *testWorkflowEnvironmentImpl
seq int64
params executeNexusOperationParams
params ExecuteNexusOperationParams
operationToken string
cancelRequested bool
started bool
Expand Down Expand Up @@ -2680,7 +2680,7 @@ func (env *testWorkflowEnvironmentImpl) newTestNexusTaskHandler(
}

func (env *testWorkflowEnvironmentImpl) ExecuteNexusOperation(
params executeNexusOperationParams,
params ExecuteNexusOperationParams,
callback func(*commonpb.Payload, error),
startedHandler func(opID string, e error),
) int64 {
Expand Down
2 changes: 1 addition & 1 deletion internal/nexus_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func protoFailureToNexusFailure(failure *nexuspb.Failure) nexus.Failure {
}

// nexusOperationFailure is a utility in use by the test environment.
func nexusOperationFailure(params executeNexusOperationParams, token string, cause *failurepb.Failure) *failurepb.Failure {
func nexusOperationFailure(params ExecuteNexusOperationParams, token string, cause *failurepb.Failure) *failurepb.Failure {
return &failurepb.Failure{
Message: "nexus operation completed unsuccessfully",
FailureInfo: &failurepb.Failure_NexusOperationExecutionFailureInfo{
Expand Down
10 changes: 5 additions & 5 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3042,7 +3042,7 @@ func (c nexusClient) ExecuteOperation(ctx Context, operation any, input any, opt
})
}

func (wc *workflowEnvironmentInterceptor) prepareNexusOperationParams(ctx Context, input ExecuteNexusOperationInput) (executeNexusOperationParams, error) {
func (wc *workflowEnvironmentInterceptor) prepareNexusOperationParams(ctx Context, input ExecuteNexusOperationInput) (ExecuteNexusOperationParams, error) {
dc := WithWorkflowContext(ctx, wc.env.GetDataConverter())

var ok bool
Expand All @@ -3055,22 +3055,22 @@ func (wc *workflowEnvironmentInterceptor) prepareNexusOperationParams(ctx Contex
operationName = regOp.Name()
inputType := reflect.TypeOf(input.Input)
if inputType != nil && !inputType.AssignableTo(regOp.InputType()) {
return executeNexusOperationParams{}, fmt.Errorf("cannot assign argument of type %q to type %q for operation %q", inputType, regOp.InputType(), operationName)
return ExecuteNexusOperationParams{}, fmt.Errorf("cannot assign argument of type %q to type %q for operation %q", inputType, regOp.InputType(), operationName)
}
} else {
return executeNexusOperationParams{}, fmt.Errorf("invalid 'operation' parameter, must be an OperationReference or a string")
return ExecuteNexusOperationParams{}, fmt.Errorf("invalid 'operation' parameter, must be an OperationReference or a string")
}

payload, err := dc.ToPayload(input.Input)
if err != nil {
return executeNexusOperationParams{}, err
return ExecuteNexusOperationParams{}, err
}

if input.Options.CancellationType == NexusOperationCancellationTypeUnspecified {
input.Options.CancellationType = NexusOperationCancellationTypeWaitCompleted
}

return executeNexusOperationParams{
return ExecuteNexusOperationParams{
client: input.Client,
operation: operationName,
input: payload,
Expand Down
31 changes: 31 additions & 0 deletions internalbindings/internalbindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,40 @@ type (
ContinueAsNewError = internal.ContinueAsNewError
// UpdateCallbacks used to report the result of an update
UpdateCallbacks = internal.UpdateCallbacks
// ExecuteNexusOperationParams parameters for invoking a Nexus operation from
// a workflow via WorkflowEnvironment.ExecuteNexusOperation. Exposed so that
// non-Go SDKs (e.g. roadrunner-temporal proxying for PHP) can build the
// params struct directly when they receive an `ExecuteNexusOperation`
// command from the worker.
ExecuteNexusOperationParams = internal.ExecuteNexusOperationParams
// NexusOperationOptions are workflow-level options for a Nexus operation.
NexusOperationOptions = internal.NexusOperationOptions
// NexusClient is the workflow-level client used to issue Nexus operations.
// Build via internal.NewNexusClient(endpoint, service).
NexusClient = internal.NexusClient
)

// GetLastCompletionResult returns last completion result from workflow.
func GetLastCompletionResult(env WorkflowEnvironment) *commonpb.Payloads {
return internal.GetLastCompletionResultFromWorkflowInfo(env.WorkflowInfo())
}

// NewNexusClient builds a NexusClient targeted at the given endpoint and
// service. Use the result with NewExecuteNexusOperationParams when feeding
// WorkflowEnvironment.ExecuteNexusOperation directly.
func NewNexusClient(endpoint, service string) NexusClient {
return internal.NewNexusClient(endpoint, service)
}

// NewExecuteNexusOperationParams builds an ExecuteNexusOperationParams struct
// from outside the `internal` package — see internal.NewExecuteNexusOperationParams
// for the rationale.
func NewExecuteNexusOperationParams(
client NexusClient,
operation string,
input *commonpb.Payload,
options NexusOperationOptions,
nexusHeader map[string]string,
) ExecuteNexusOperationParams {
return internal.NewExecuteNexusOperationParams(client, operation, input, options, nexusHeader)
}
63 changes: 63 additions & 0 deletions internalbindings/internalbindings_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Tests covering the publicly exposed Nexus caller-side bindings added so
// that non-Go SDKs (e.g. roadrunner-temporal proxying for PHP) can build
// `ExecuteNexusOperationParams` from outside the `internal` package.
//
// The constructor is the only way for an external caller to populate the
// struct — its fields stay unexported on purpose. These tests document and
// guard that contract.
package internalbindings

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
commonpb "go.temporal.io/api/common/v1"
)

func TestNewNexusClient_ExposesEndpointAndService(t *testing.T) {
c := NewNexusClient("ep-1", "svc-1")
require.NotNil(t, c, "client constructor must return a non-nil NexusClient")

// NexusClient is an interface from go.temporal.io/sdk/internal — it
// exposes Endpoint() and Service() readers. We assert through the
// re-exported alias so the test breaks loudly if the interface shape
// changes upstream and the alias drifts.
assert.Equal(t, "ep-1", c.Endpoint())
assert.Equal(t, "svc-1", c.Service())
}

func TestNewExecuteNexusOperationParams_RoundtripsAllFields(t *testing.T) {
// The struct's fields are unexported, so the only contract surface
// is "what you put in is what dispatchers see when they call
// WorkflowEnvironment.ExecuteNexusOperation". We can't read fields
// directly, but we can verify the constructor accepts the documented
// inputs without panicking and returns a value of the public type.
client := NewNexusClient("ep-2", "svc-2")
payload := &commonpb.Payload{Data: []byte("hello")}
options := NexusOperationOptions{}
headers := map[string]string{"Nexus-Operation-Token": "tok-x"}

params := NewExecuteNexusOperationParams(client, "op-1", payload, options, headers)

// The exported type alias must match the type returned by the
// constructor — protects against accidental drift between the
// `type ExecuteNexusOperationParams = internal....` line and the
// `func NewExecuteNexusOperationParams ... ExecuteNexusOperationParams`
// line.
var _ ExecuteNexusOperationParams = params
}

func TestNewExecuteNexusOperationParams_AcceptsNilHeader(t *testing.T) {
// Headers map is optional — Java callers that don't surface a Nexus
// header just pass nil. Ensure nil doesn't panic on construction.
require.NotPanics(t, func() {
NewExecuteNexusOperationParams(
NewNexusClient("ep", "svc"),
"op",
nil, // input
NexusOperationOptions{},
nil, // header
)
})
}