Cross Namespace (XNS)
Cross-Namespace (XNS) functionality enables workflows in one Temporal namespace or cluster to invoke workflows, send signals, execute queries, and perform updates on workflows running in different namespaces or clusters. The protoc-gen-go-temporal plugin generates XNS helper functions that wrap these operations as activities, providing type-safe, cross-namespace communication.
XNS Configuration
Enable XNS functionality by adding an `xns` configuration block to the Temporal method options (workflow, query, signal, or update) in your protobuf service definition.
- XNS Configuration
- Generated XNS Package
syntax = "proto3";

package example.v1;

// google.protobuf.Empty is used by GetFooProgress and SetFooProgress below,
// so the well-known type must be imported explicitly.
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";

// Example declares one workflow plus a query, a signal, and an update. Each
// method carries an `xns` block, which enables generation of the
// cross-namespace helpers for that method.
service Example {
  option (temporal.v1.service) = {
    task_queue: "example-v1"
  };

  // CreateFoo is the workflow method.
  rpc CreateFoo(CreateFooRequest) returns (CreateFooResponse) {
    option (temporal.v1.workflow) = {
      execution_timeout: { seconds: 3600 }
      id: 'create-foo/${! name.slug() }'
      xns: {
        heartbeat_interval: { seconds: 10 }
        heartbeat_timeout: { seconds: 20 }
        // 30s longer than execution_timeout above; presumably so the
        // wrapping activity does not time out before the workflow does.
        start_to_close_timeout: { seconds: 3630 }
      }
    };
  }

  // GetFooProgress is the query method.
  rpc GetFooProgress(google.protobuf.Empty) returns (GetFooProgressResponse) {
    option (temporal.v1.query) = {
      xns: {
        heartbeat_interval: { seconds: 5 }
        heartbeat_timeout: { seconds: 15 }
        start_to_close_timeout: { seconds: 60 }
      }
    };
  }

  // SetFooProgress is the signal method.
  rpc SetFooProgress(SetFooProgressRequest) returns (google.protobuf.Empty) {
    option (temporal.v1.signal) = {
      xns: {
        heartbeat_interval: { seconds: 5 }
        heartbeat_timeout: { seconds: 15 }
        start_to_close_timeout: { seconds: 60 }
      }
    };
  }

  // UpdateFooProgress is the update method; it takes the signal's request
  // type and returns the query's response type.
  rpc UpdateFooProgress(SetFooProgressRequest) returns (GetFooProgressResponse) {
    option (temporal.v1.update) = {
      id: 'update-progress/${! progress.string() }'
      xns: {
        heartbeat_interval: { seconds: 5 }
        heartbeat_timeout: { seconds: 15 }
        start_to_close_timeout: { seconds: 60 }
      }
    };
  }
}
// Generated package: path/to/gen/example/v1/examplev1xns
//
// The declarations below are signatures only (bodies omitted). Every helper
// takes a workflow.Context, i.e. it is meant to be called from workflow code.

// Workflow execution functions
// CreateFoo returns the workflow's *CreateFooResponse; CreateFooAsync returns
// a CreateFooRun handle instead.
func CreateFoo(ctx workflow.Context, req *CreateFooRequest, opts ...*CreateFooWorkflowOptions) (*CreateFooResponse, error)
func CreateFooAsync(ctx workflow.Context, req *CreateFooRequest, opts ...*CreateFooWorkflowOptions) (CreateFooRun, error)
// Query execution functions
// The target execution is addressed by workflowID and runID.
func GetFooProgress(ctx workflow.Context, workflowID, runID string, opts ...*GetFooProgressQueryOptions) (*GetFooProgressResponse, error)
func GetFooProgressAsync(ctx workflow.Context, workflowID, runID string, opts ...*GetFooProgressQueryOptions) (GetFooProgressQueryHandle, error)
// Signal execution functions
// The synchronous form returns only an error; the async form returns a handle.
func SetFooProgress(ctx workflow.Context, workflowID, runID string, req *SetFooProgressRequest, opts ...*SetFooProgressSignalOptions) error
func SetFooProgressAsync(ctx workflow.Context, workflowID, runID string, req *SetFooProgressRequest, opts ...*SetFooProgressSignalOptions) (SetFooProgressSignalHandle, error)
// Update execution functions
// The synchronous form returns the update's *GetFooProgressResponse.
func UpdateFooProgress(ctx workflow.Context, workflowID, runID string, req *SetFooProgressRequest, opts ...*UpdateFooProgressUpdateOptions) (*GetFooProgressResponse, error)
func UpdateFooProgressAsync(ctx workflow.Context, workflowID, runID string, req *SetFooProgressRequest, opts ...*UpdateFooProgressUpdateOptions) (UpdateFooProgressHandle, error)
// XNS activity registration
// Registers the XNS activities on r; client is the ExampleClient they use.
func RegisterExampleActivities(r worker.ActivityRegistry, client ExampleClient, options ...*ExampleOptions)
Worker Setup
Configure workers in both the calling namespace (source) and target namespace to enable XNS communication.
- Target Namespace Worker
- Calling Namespace Worker
package main
import (
"log"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
// main runs the worker that hosts the Example workflow and activity
// implementations in the target namespace.
func main() {
	// Dial the Temporal frontend for the target namespace.
	dialOpts := client.Options{
		HostPort:  "localhost:7233",
		Namespace: "target-namespace",
	}
	targetClient, err := client.Dial(dialOpts)
	if err != nil {
		log.Fatal(err)
	}
	defer targetClient.Close()

	// The worker polls examplev1.ExampleTaskQueue in the target namespace.
	targetWorker := worker.New(targetClient, examplev1.ExampleTaskQueue, worker.Options{})

	// Register the actual workflow and activity implementations.
	examplev1.RegisterExampleWorkflows(targetWorker, &ExampleWorkflows{})
	examplev1.RegisterExampleActivities(targetWorker, &ExampleActivities{})

	log.Println("Target namespace worker starting...")
	if runErr := targetWorker.Run(worker.InterruptCh()); runErr != nil {
		log.Fatal(runErr)
	}
}
package main
import (
"log"
examplev1 "path/to/gen/example/v1"
examplev1xns "path/to/gen/example/v1/examplev1xns"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
// main runs the worker for the calling namespace. It holds two clients: one
// for the calling namespace (used by the worker itself) and one for the
// target namespace (handed to the XNS activities).
func main() {
	const hostPort = "localhost:7233"

	// Client for the namespace this worker polls.
	callingClient, err := client.Dial(client.Options{HostPort: hostPort, Namespace: "calling-namespace"})
	if err != nil {
		log.Fatal(err)
	}
	defer callingClient.Close()

	// Client for the namespace the XNS activities talk to.
	targetClient, err := client.Dial(client.Options{HostPort: hostPort, Namespace: "target-namespace"})
	if err != nil {
		log.Fatal(err)
	}
	defer targetClient.Close()

	callingWorker := worker.New(callingClient, "calling-task-queue", worker.Options{})

	// Workflows that live in the calling namespace.
	RegisterCallingWorkflows(callingWorker, &CallingWorkflows{})

	// XNS activities run on the calling worker but operate through the
	// target-namespace client.
	xnsClient := examplev1.NewExampleClient(targetClient)
	examplev1xns.RegisterExampleActivities(callingWorker, xnsClient)

	log.Println("Calling namespace worker starting...")
	if runErr := callingWorker.Run(worker.InterruptCh()); runErr != nil {
		log.Fatal(runErr)
	}
}
XNS Workflow Execution
Execute workflows across namespaces using the generated XNS helpers.
- Synchronous Execution
- Asynchronous Execution
package main
import (
"fmt"
examplev1 "path/to/gen/example/v1"
examplev1xns "path/to/gen/example/v1/examplev1xns"
"go.temporal.io/sdk/workflow"
)
// CallingWorkflow is the receiver type for the Execute methods shown in this
// guide. It embeds *CallingWorkflowInput, which is declared elsewhere.
type CallingWorkflow struct {
*CallingWorkflowInput
}
// Execute runs CreateFoo in the target namespace and blocks until the remote
// workflow finishes.
//
// It returns the remote response wrapped in a CallingWorkflowResponse, or a
// wrapped error if the cross-namespace call fails.
func (w *CallingWorkflow) Execute(ctx workflow.Context) (*CallingWorkflowResponse, error) {
	// Execute workflow in target namespace synchronously.
	result, err := examplev1xns.CreateFoo(ctx, &examplev1.CreateFooRequest{
		Name: "cross-namespace-foo",
	})
	if err != nil {
		return nil, fmt.Errorf("cross-namespace workflow failed: %w", err)
	}

	// Use the generated getters: they are nil-safe, so a response without a
	// Foo logs an empty name instead of panicking the workflow task.
	workflow.GetLogger(ctx).Info("Cross-namespace workflow completed",
		"result", result.GetFoo().GetName())

	return &CallingWorkflowResponse{
		RemoteResult: result,
	}, nil
}
// Execute starts CreateFoo in the target namespace without waiting, sleeps
// for ten seconds of workflow time, and then collects the remote result.
func (w *CallingWorkflow) Execute(ctx workflow.Context) (*CallingWorkflowResponse, error) {
	// Kick off the remote workflow; only the start is awaited here.
	req := &examplev1.CreateFooRequest{Name: "async-cross-namespace-foo"}
	run, err := examplev1xns.CreateFooAsync(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to start cross-namespace workflow: %w", err)
	}
	workflow.GetLogger(ctx).Info("Cross-namespace workflow started")

	// Stand-in for other work done while the remote workflow executes.
	if sleepErr := workflow.Sleep(ctx, 10*time.Second); sleepErr != nil {
		return nil, sleepErr
	}

	// Block until the remote workflow completes.
	remote, err := run.Get(ctx)
	if err != nil {
		return nil, fmt.Errorf("cross-namespace workflow execution failed: %w", err)
	}

	resp := &CallingWorkflowResponse{RemoteResult: remote}
	return resp, nil
}
XNS Communication
Send signals, execute queries, and perform updates on workflows running in different namespaces.
- Cross-Namespace Signals
- Cross-Namespace Queries
- Cross-Namespace Updates
// Execute starts a remote CreateFoo workflow, sends it five progress signals
// (0.2, 0.4, ... 1.0) five seconds apart, and then waits for it to finish.
//
// A failed signal is logged and skipped (best effort). A failed sleep, e.g.
// because the workflow was cancelled, aborts the workflow with that error.
func (w *CallingWorkflow) Execute(ctx workflow.Context) error {
	// Start remote workflow.
	run, err := examplev1xns.CreateFooAsync(ctx, &examplev1.CreateFooRequest{
		Name: "signal-target",
	})
	if err != nil {
		return err
	}

	logger := workflow.GetLogger(ctx)

	// Drive the loop with an integer counter rather than accumulating a
	// float, so the number of iterations cannot drift with rounding error.
	const steps = 5
	for step := 1; step <= steps; step++ {
		// Sleep returns an error on cancellation; do not swallow it.
		if err := workflow.Sleep(ctx, 5*time.Second); err != nil {
			return err
		}

		progress := float32(step) / steps
		err = run.SetFooProgress(ctx, &examplev1.SetFooProgressRequest{
			Progress: progress,
		})
		if err != nil {
			logger.Error("Failed to send signal", "error", err)
			continue
		}
		logger.Info("Sent progress signal", "progress", progress)
	}

	// Wait for completion.
	result, err := run.Get(ctx)
	if err != nil {
		return err
	}
	logger.Info("Remote workflow completed", "result", result)
	return nil
}
// Execute starts a remote CreateFoo workflow and polls its progress through
// the GetFooProgress query, at most ten times and five seconds apart, before
// waiting for the remote workflow to complete.
//
// A failed query is logged and the poll continues. A failed sleep (e.g. on
// cancellation) or a failed remote workflow is returned to the caller.
func (w *CallingWorkflow) Execute(ctx workflow.Context) error {
	// Start remote workflow.
	run, err := examplev1xns.CreateFooAsync(ctx, &examplev1.CreateFooRequest{
		Name: "query-target",
	})
	if err != nil {
		return err
	}

	logger := workflow.GetLogger(ctx)

	// Monitor remote workflow progress via queries.
	for i := 0; i < 10; i++ {
		// Sleep returns an error on cancellation; do not swallow it.
		if err := workflow.Sleep(ctx, 5*time.Second); err != nil {
			return err
		}

		progress, err := run.GetFooProgress(ctx)
		if err != nil {
			logger.Error("Query failed", "error", err)
			continue
		}
		logger.Info("Remote workflow progress",
			"progress", progress.GetProgress(),
			"status", progress.GetStatus())

		// Stop polling once the remote workflow reports it is ready.
		if progress.GetStatus() == examplev1.Foo_FOO_STATUS_READY {
			break
		}
	}

	// Wait for final completion. The result value is not needed here, and an
	// unused local would not compile, so only the error is kept.
	_, err = run.Get(ctx)
	return err
}
// Execute starts a remote CreateFoo workflow, applies one synchronous update
// (progress 0.75) and one asynchronous update (progress 1.0), and then waits
// for the remote workflow to complete.
//
// Any failure to start, update, or complete the remote workflow is returned.
func (w *CallingWorkflow) Execute(ctx workflow.Context) error {
	// Start remote workflow.
	run, err := examplev1xns.CreateFooAsync(ctx, &examplev1.CreateFooRequest{
		Name: "update-target",
	})
	if err != nil {
		return err
	}

	logger := workflow.GetLogger(ctx)

	// Synchronous update: blocks until the update result is available.
	updateResult, err := run.UpdateFooProgress(ctx, &examplev1.SetFooProgressRequest{
		Progress: 0.75,
	})
	if err != nil {
		logger.Error("Update failed", "error", err)
		return err
	}
	logger.Info("Update completed",
		"progress", updateResult.GetProgress(),
		"status", updateResult.GetStatus())

	// Asynchronous update: obtain a handle first, then wait on it.
	updateHandle, err := run.UpdateFooProgressAsync(ctx, &examplev1.SetFooProgressRequest{
		Progress: 1.0,
	})
	if err != nil {
		return err
	}
	finalUpdate, err := updateHandle.Get(ctx)
	if err != nil {
		return err
	}
	logger.Info("Final update completed", "result", finalUpdate)

	// Wait for workflow completion. The result value is not needed here, and
	// an unused local would not compile, so only the error is kept.
	_, err = run.Get(ctx)
	return err
}
XNS Options and Configuration
Configure XNS operations using the generated options builders.
- Workflow Options
- Detached Mode
// Execute starts CreateFoo in the target namespace with explicitly configured
// XNS options and waits for the remote result.
func (w *CallingWorkflow) Execute(ctx workflow.Context) error {
	// Options for the activity that wraps the remote workflow.
	activityOpts := workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute * 10,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts: 3,
		},
	}

	// Assemble the XNS workflow options up front.
	xnsOpts := examplev1xns.NewCreateFooWorkflowOptions().
		WithDetached(false). // Wait for completion (default)
		WithHeartbeatInterval(time.Second * 15).
		WithParentClosePolicy(enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL).
		WithActivityOptions(activityOpts)

	req := &examplev1.CreateFooRequest{Name: "configured-foo"}
	run, err := examplev1xns.CreateFooAsync(ctx, req, xnsOpts)
	if err != nil {
		return err
	}

	remote, err := run.Get(ctx)
	if err != nil {
		return err
	}

	workflow.GetLogger(ctx).Info("Configured XNS workflow completed", "result", remote)
	return nil
}
// Execute starts CreateFoo in the target namespace in detached mode
// (fire-and-forget) and returns without waiting for the remote result.
func (w *CallingWorkflow) Execute(ctx workflow.Context) error {
	req := &examplev1.CreateFooRequest{Name: "detached-foo"}

	// Detached: don't wait for completion.
	detached := examplev1xns.NewCreateFooWorkflowOptions().WithDetached(true)

	if _, err := examplev1xns.CreateFoo(ctx, req, detached); err != nil {
		return err
	}

	workflow.GetLogger(ctx).Info("Detached workflow started")

	// The calling workflow may complete while the remote workflow continues.
	return nil
}
XNS Error Handling
Handle errors from cross-namespace operations with built-in error conversion and custom error handling.
- Error Handling
- Custom Error Handling
import (
	"errors"
	"fmt"

	"github.com/cludden/protoc-gen-go-temporal/pkg/xns"
	examplev1 "path/to/gen/example/v1"
	examplev1xns "path/to/gen/example/v1/examplev1xns"
	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/workflow"
)
// Execute runs CreateFoo in the target namespace and classifies any failure
// using the xns error helpers before returning a wrapped error.
func (w *CallingWorkflow) Execute(ctx workflow.Context) error {
	req := &examplev1.CreateFooRequest{Name: "error-prone-foo"}
	run, err := examplev1xns.CreateFooAsync(ctx, req)
	if err != nil {
		return fmt.Errorf("failed to start XNS workflow: %w", err)
	}

	remote, err := run.Get(ctx)
	if err == nil {
		workflow.GetLogger(ctx).Info("XNS workflow completed successfully", "result", remote)
		return nil
	}

	// Anything that is not an application error gets the generic wrapper.
	var appErr *temporal.ApplicationError
	if !errors.As(err, &appErr) {
		return fmt.Errorf("XNS workflow execution failed: %w", err)
	}

	// Pull the XNS details out of the error for logging and dispatch.
	code := xns.Code(err)
	workflow.GetLogger(ctx).Error("XNS workflow failed",
		"error_code", code,
		"non_retryable", xns.IsNonRetryable(err),
		"underlying_error", xns.Unwrap(err))

	// Pick the wrapper message from the error code.
	if code == "WORKFLOW_ALREADY_EXISTS" {
		return fmt.Errorf("remote workflow already exists: %w", err)
	}
	if code == "WORKFLOW_NOT_FOUND" {
		return fmt.Errorf("remote workflow not found: %w", err)
	}
	return fmt.Errorf("remote workflow error: %w", err)
}
// setupXNSWorker registers the Example XNS activities with a custom error
// converter.
//
// The converter rewrites any error that contains a *temporal.ApplicationError
// into a new application error with message "CUSTOM_<original type>" and type
// "CUSTOM_ERROR". All other errors pass through unchanged.
//
// NOTE(review): `worker` and `client` below must resolve to a
// worker.ActivityRegistry and an ExampleClient value in the enclosing scope.
// As written they collide with the SDK package names — confirm against the
// surrounding program.
func setupXNSWorker() {
	// Custom error converter function.
	errorConverter := func(err error) error {
		var applicationError *temporal.ApplicationError
		if !errors.As(err, &applicationError) {
			return err
		}
		// Attach the original error as the cause rather than as a details
		// payload, so it stays reachable through errors.Is / errors.As /
		// errors.Unwrap.
		return temporal.NewApplicationErrorWithCause(
			fmt.Sprintf("CUSTOM_%s", applicationError.Type()),
			"CUSTOM_ERROR",
			err,
		)
	}

	// Register with custom error handling.
	examplev1xns.RegisterExampleActivities(worker, client,
		examplev1xns.NewExampleOptions().WithErrorConverter(errorConverter))
}
Advanced XNS Patterns
Signal/Update with Start
Start remote workflows with initial signals or updates for atomic initialization.
- Signal with Start
- Update with Start
// Execute starts CreateFoo in the target namespace together with an initial
// SetFooProgress signal, then waits for the remote workflow to finish.
func (w *CallingWorkflow) Execute(ctx workflow.Context) error {
	startReq := &examplev1.CreateFooRequest{Name: "signal-start-foo"}
	// Delivered with the start, so the remote workflow begins at 0.25.
	signalReq := &examplev1.SetFooProgressRequest{Progress: 0.25}

	run, err := examplev1xns.CreateFooWithSetFooProgressAsync(ctx, startReq, signalReq)
	if err != nil {
		return fmt.Errorf("failed to start workflow with signal: %w", err)
	}

	remote, err := run.Get(ctx)
	if err != nil {
		return err
	}

	workflow.GetLogger(ctx).Info("Workflow started with signal completed", "result", remote)
	return nil
}
// Execute starts CreateFoo in the target namespace together with an initial
// UpdateFooProgress update, logs the update result, and then waits for the
// remote workflow to finish.
func (w *CallingWorkflow) Execute(ctx workflow.Context) error {
	startReq := &examplev1.CreateFooRequest{Name: "update-start-foo"}
	updateReq := &examplev1.SetFooProgressRequest{Progress: 0.5}

	// The call yields both the update result and a run handle.
	updateResult, run, err := examplev1xns.CreateFooWithUpdateFooProgress(ctx, startReq, updateReq)
	if err != nil {
		return fmt.Errorf("failed to start workflow with update: %w", err)
	}

	logger := workflow.GetLogger(ctx)
	logger.Info("Initial update completed", "update_result", updateResult)

	// Block until the remote workflow itself completes.
	workflowResult, err := run.Get(ctx)
	if err != nil {
		return err
	}

	logger.Info("Workflow with update completed", "workflow_result", workflowResult)
	return nil
}
Testing XNS Workflows
Test cross-namespace workflows using Temporal's testing framework with XNS mocking.
- XNS Testing
- Schema
package main
import (
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
examplev1 "path/to/gen/example/v1"
examplev1xns "path/to/gen/example/v1/examplev1xns"
)
// TestXNSWorkflow runs the calling workflow in Temporal's test environment
// with the CreateFoo XNS activity mocked, and checks that the mocked remote
// result is surfaced in the workflow's response.
func TestXNSWorkflow(t *testing.T) {
	suite := new(testsuite.WorkflowTestSuite)
	env := suite.NewTestWorkflowEnvironment()

	// Workflows under test are registered for real.
	RegisterCallingWorkflows(env, &CallingWorkflows{})

	// The XNS activity is mocked so no remote namespace is involved.
	mockedResponse := &examplev1.CreateFooResponse{
		Foo: &examplev1.Foo{
			Name:   "test-xns-foo",
			Status: examplev1.Foo_FOO_STATUS_READY,
		},
	}
	env.OnActivity(examplev1xns.CreateFooActivityName, mock.Anything, mock.Anything).
		Return(mockedResponse, nil)

	// Run the workflow that calls into XNS.
	input := &CallingWorkflowRequest{Name: "test-caller"}
	env.ExecuteWorkflow("CallingWorkflow", input)

	require.True(t, env.IsWorkflowCompleted())
	require.NoError(t, env.GetWorkflowError())

	var result CallingWorkflowResponse
	require.NoError(t, env.GetWorkflowResult(&result))
	require.Equal(t, "test-xns-foo", result.RemoteResult.Foo.Name)
}
syntax="proto3";
package calling.v1;
import "temporal/v1/temporal.proto";
// Calling declares the workflow that the test above executes by the name
// "CallingWorkflow".
service Calling {
// CallingWorkflow is marked as a Temporal workflow with no extra options set.
rpc CallingWorkflow(CallingWorkflowRequest) returns (CallingWorkflowResponse) {
option (temporal.v1.workflow) = {};
}
}