Skip to main content

Cross-Namespace

A simple example showcasing usage of the generated XNS helpers for simplifying cross-namespace and even cross-cluster integrations.

example.proto
syntax="proto3";

package example.xns.v1;

import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";

service Xns {
option (temporal.v1.service) = {
task_queue: "xns-v1"
};

rpc ProvisionFoo(ProvisionFooRequest) returns (ProvisionFooResponse) {
option (temporal.v1.workflow) = {
id: 'provision-foo/${! name.slug() }'
};
}
}

service Example {
option (temporal.v1.service) = {
task_queue: "example-v1"
};

// CreateFoo creates a new foo operation
rpc CreateFoo(CreateFooRequest) returns (CreateFooResponse) {
option (temporal.v1.workflow) = {
execution_timeout: { seconds: 3600 }
id_reuse_policy: WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE
id: 'create-foo/${! name.slug() }'
xns: {
heartbeat_interval: { seconds: 10 }
heartbeat_timeout: { seconds: 20 }
start_to_close_timeout: { seconds: 3630 }
}
query: { ref: 'GetFooProgress' }
signal: { ref: 'SetFooProgress', start: true }
update: { ref: 'UpdateFooProgress' }
};
}

// GetFooProgress returns the status of a CreateFoo operation
rpc GetFooProgress(google.protobuf.Empty) returns (GetFooProgressResponse) {
option (temporal.v1.query) = {
xns: {
heartbeat_interval: { seconds: 10 }
heartbeat_timeout: { seconds: 20 }
start_to_close_timeout: { seconds: 60 }
}
};
}

// Notify sends a notification
rpc Notify(NotifyRequest) returns (google.protobuf.Empty) {
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 30 }
retry_policy: {
max_attempts: 3
}
};
}

// SetFooProgress sets the current status of a CreateFoo operation
rpc SetFooProgress(SetFooProgressRequest) returns (google.protobuf.Empty) {
option (temporal.v1.signal) = {
xns: {
heartbeat_interval: { seconds: 10 }
heartbeat_timeout: { seconds: 20 }
start_to_close_timeout: { seconds: 60 }
}
};
}

// UpdateFooProgress sets the current status of a CreateFoo operation
rpc UpdateFooProgress(SetFooProgressRequest) returns (GetFooProgressResponse) {
option (temporal.v1.update) = {
id: 'update-progress/${! progress.string() }',
xns: {
heartbeat_interval: { seconds: 10 }
heartbeat_timeout: { seconds: 20 }
start_to_close_timeout: { seconds: 60 }
}
};
}
}

// CreateFooRequest describes the input to a CreateFoo workflow
message CreateFooRequest {
// unique foo name
string name = 1;
}

// SampleWorkflowWithMutexResponse describes the output from a CreateFoo workflow
message CreateFooResponse {
Foo foo = 1;
}

// Foo describes an illustrative foo resource
message Foo {
string name = 1;
Status status = 2;

enum Status {
FOO_STATUS_UNSPECIFIED = 0;
FOO_STATUS_READY = 1;
FOO_STATUS_CREATING = 2;
}
}

// GetFooProgressResponse describes the output from a GetFooProgress query
message GetFooProgressResponse {
float progress = 1;
Foo.Status status = 2;
}

// NotifyRequest describes the input to a Notify activity
message NotifyRequest {
string message = 1;
}

// ProvisionFooRequest describes the input to a ProvisionFoo workflow
message ProvisionFooRequest {
// unique foo name
string name = 1;
}

// SampleWorkflowWithMutexResponse describes the output from a ProvisionFoo workflow
message ProvisionFooResponse {
Foo foo = 1;
}

// SetFooProgressRequest describes the input to a SetFooProgress signal
message SetFooProgressRequest {
// value of current workflow progress
float progress = 1;
}
main.go
package main

import (
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"

xnsv1 "github.com/cludden/protoc-gen-go-temporal/gen/example/xns/v1"
"github.com/cludden/protoc-gen-go-temporal/gen/example/xns/v1/xnsv1xns"
"github.com/urfave/cli/v2"
"go.temporal.io/sdk/client"
tlog "go.temporal.io/sdk/log"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

type (
Workflows struct{}
)

type ProvisionFooWorkflow struct {
*xnsv1.ProvisionFooWorkflowInput
log tlog.Logger
}

func (wfs *Workflows) ProvisionFoo(ctx workflow.Context, input *xnsv1.ProvisionFooWorkflowInput) (xnsv1.ProvisionFooWorkflow, error) {
return &ProvisionFooWorkflow{input, workflow.GetLogger(ctx)}, nil
}

func (w *ProvisionFooWorkflow) Execute(ctx workflow.Context) (*xnsv1.ProvisionFooResponse, error) {
run, err := xnsv1xns.CreateFooAsync(ctx, &xnsv1.CreateFooRequest{Name: w.Req.GetName()})
if err != nil {
return nil, fmt.Errorf("error initializing CreateFoo workflow: %w", err)
}

if err := run.SetFooProgress(ctx, &xnsv1.SetFooProgressRequest{Progress: 5.7}); err != nil {
return nil, fmt.Errorf("error signaling SetFooProgress: %w", err)
}
w.log.Info("SetFooProgress", "progress", 5.7)

progress, err := run.GetFooProgress(ctx)
if err != nil {
return nil, fmt.Errorf("error querying GetFooProgress: %w", err)
}
w.log.Info("GetFooProgress", "status", progress.GetStatus().String(), "progress", progress.GetProgress())

update, err := run.UpdateFooProgressAsync(ctx, &xnsv1.SetFooProgressRequest{Progress: 100})
if err != nil {
return nil, fmt.Errorf("error initializing UpdateFooProgress: %w", err)
}
progress, err = update.Get(ctx)
if err != nil {
return nil, fmt.Errorf("error updating UpdateFooProgress: %w", err)
}
w.log.Info("UpdateFooProgress", "status", progress.GetStatus().String(), "progress", progress.GetProgress())

resp, err := run.Get(ctx)
if err != nil {
return nil, err
}
return &xnsv1.ProvisionFooResponse{Foo: resp.GetFoo()}, nil
}

func main() {
app := &cli.App{}

exampleCmd, err := xnsv1.NewExampleCliCommand()
if err != nil {
log.Fatal(err)
}
app.Commands = append(app.Commands, exampleCmd)

xnsCmd, err := xnsv1.NewXnsCliCommand()
if err != nil {
log.Fatal(err)
}
app.Commands = append(app.Commands, xnsCmd)

app.Commands = append(app.Commands, &cli.Command{
Name: "worker",
Action: func(cmd *cli.Context) error {
c, err := client.Dial(client.Options{
Namespace: "example",
})
if err != nil {
return err
}
defer c.Close()

xnsc, err := client.NewClientFromExisting(c, client.Options{
Namespace: "default",
})
if err != nil {
return err
}

examplew := worker.New(c, xnsv1.ExampleTaskQueue, worker.Options{})
xnsv1.RegisterExampleWorkflows(examplew, &ExampleWorkflows{})
xnsv1.RegisterExampleActivities(examplew, &ExampleActivities{})

xnsw := worker.New(xnsc, xnsv1.XnsTaskQueue, worker.Options{})
xnsv1.RegisterXnsWorkflows(xnsw, &Workflows{})
xnsv1xns.RegisterExampleActivities(xnsw, xnsv1.NewExampleClient(c))

var g sync.WaitGroup
closeCh := make(chan any)
g.Add(2)
go func() {
defer g.Done()
examplew.Run(closeCh)
}()
go func() {
defer g.Done()
xnsw.Run(closeCh)
}()

interruptCh := make(chan os.Signal, 1)
signal.Notify(interruptCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-interruptCh
close(closeCh)
}()
g.Wait()
return nil
},
})

if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}

Run this example

  1. Clone the examples
    git clone https://github.com/cludden/protoc-gen-go-temporal && cd protoc-gen-go-temporal
  2. Start temporal
    temporal server start-dev \
    --dynamic-config-value "frontend.enableUpdateWorkflowExecution=true" \
    --dynamic-config-value "frontend.enableUpdateWorkflowExecutionAsyncAccepted=true"
  3. In a different terminal, create example namespace and run the worker
    temporal operator namespace create example
    go run ./examples/xns/... worker
  4. In a different terminal, execute an xns workflow
    go run ./examples/xns/... xns provision-foo --name test