protoc-gen-go-temporal
temporal + go + protobuf = ❤️
About
A protoc plugin for generating typed Temporal clients and workers in Go from protobuf schemas. This plugin:
- allows workflow authors to configure sensible defaults and guardrails
- simplifies the implementation and testing of Temporal workers
- streamlines integration by providing typed client SDKs and a generated CLI application
Features
Generated Client with:
- methods for executing workflows, queries, signals, and updates
- methods for cancelling or terminating workflows
- default `client.StartWorkflowOptions` and `client.UpdateWorkflowWithOptionsRequest`
- dynamic workflow ids, update ids, and search attributes via Bloblang expressions
- default timeouts, id reuse policies, retry policies, wait policies
Generated Worker resources with:
- functions for calling activities and local activities from workflows
- functions for executing child workflows and signalling external workflows
- default `workflow.ActivityOptions`, `workflow.ChildWorkflowOptions`
- default timeouts, parent close policies, retry policies
Optional CLI with:
- commands for executing workflows, synchronously or asynchronously
- commands for starting workflows with signals, synchronously or asynchronously
- commands for querying existing workflows
- commands for sending signals to existing workflows
- typed flags for conveniently specifying workflow, query, and signal inputs
Generated Cross-Namespace (XNS) helpers: [Experimental]
- with support for invoking a service's workflows, queries, signals, and updates from workflows in a different temporal namespace
Generated Remote Codec Server helpers
Generated Markdown Documentation
Inspiration
This project was inspired by Chad Retz's awesome github.com/cretz/temporal-sdk-go-advanced and Jacob LeGrone's excellent Replay talk on Temporal @ Datadog
- Annotate
- Generate
- Implement
- Run
- Client
- CLI
- XNS
Annotate your protobuf services and methods with Temporal options.
syntax="proto3";
package example.v1;
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";
// Example is an illustrative service whose rpcs are annotated as a Temporal
// workflow (CreateFoo), query (GetFooProgress), activity (Notify), signal
// (SetFooProgress), and update (UpdateFooProgress).
service Example {
// service-level options: names the task queue for this service (presumably
// the default queue used by the generated code; confirm against the plugin
// documentation)
option (temporal.v1.service) = {
task_queue: "example-v1"
};
// CreateFoo creates a new foo operation
//
// Workflow options: a 3600 second (1 hour) execution timeout, a workflow id
// templated from the request's name field, an id reuse policy that allows
// duplicates, and references to the query, signal, and update rpcs declared
// in this service. The signal reference also sets start: true (presumably
// enabling signal-with-start for this workflow; confirm against the plugin
// documentation).
rpc CreateFoo(CreateFooRequest) returns (CreateFooResponse) {
option (temporal.v1.workflow) = {
execution_timeout: { seconds: 3600 }
id: 'create-foo/${! name.slug() }'
id_reuse_policy: WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE
query: { ref: "GetFooProgress" }
signal: { ref: "SetFooProgress", start: true }
update: { ref: "UpdateFooProgress" }
};
}
// GetFooProgress returns the status of a CreateFoo operation
//
// Declared as a query with no additional options; it takes no input.
rpc GetFooProgress(google.protobuf.Empty) returns (GetFooProgressResponse) {
option (temporal.v1.query) = {};
}
// Notify sends a notification
//
// Activity options: a 30 second start-to-close timeout and a retry policy
// capped at 3 attempts. The activity produces no output.
rpc Notify(NotifyRequest) returns (google.protobuf.Empty) {
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 30 }
retry_policy: {
max_attempts: 3
}
};
}
// SetFooProgress sets the current status of a CreateFoo operation
//
// Declared as a signal with no additional options; it produces no output.
rpc SetFooProgress(SetFooProgressRequest) returns (google.protobuf.Empty) {
option (temporal.v1.signal) = {};
}
// UpdateFooProgress sets the current status of a CreateFoo operation
//
// Declared as an update that shares its input type with the SetFooProgress
// signal and its output type with the GetFooProgress query. The update id is
// templated from the request's progress field.
rpc UpdateFooProgress(SetFooProgressRequest) returns (GetFooProgressResponse) {
option (temporal.v1.update) = {
id: 'update-progress/${! progress.string() }'
};
}
}
// CreateFooRequest describes the input to a CreateFoo workflow
message CreateFooRequest {
// unique foo name
string name = 1;
}
// CreateFooResponse describes the output from a CreateFoo workflow
message CreateFooResponse {
// the foo resource returned by the workflow
Foo foo = 1;
}
// Foo describes an illustrative foo resource
message Foo {
// foo name
string name = 1;
// current status of the foo resource
Status status = 2;
// Status enumerates the states a foo resource can be in
enum Status {
// zero value, no status specified
FOO_STATUS_UNSPECIFIED = 0;
// the foo resource is ready
FOO_STATUS_READY = 1;
// the foo resource is still being created
FOO_STATUS_CREATING = 2;
}
}
// GetFooProgressResponse describes the output from a GetFooProgress query
message GetFooProgressResponse {
float progress = 1;
Foo.Status status = 2;
}
// NotifyRequest describes the input to a Notify activity
message NotifyRequest {
string message = 1;
}
// SetFooProgressRequest describes the input to a SetFooProgress signal
message SetFooProgressRequest {
// value of current workflow progress
float progress = 1;
}
Generate Go code for implementing Temporal Clients, Workers, and CLI applications.
# buf.yaml: Buf module configuration (module path, dependency on the plugin's
# published schema, and lint rules)
version: v2
modules:
- path: proto
deps:
- buf.build/cludden/protoc-gen-go-temporal
lint:
use:
- BASIC
# buf.gen.yaml: Buf code generation configuration used by `buf generate`;
# runs protoc-gen-go and protoc-gen-go_temporal, both writing into gen
version: v2
managed:
enabled: true
override:
- file_option: go_package_prefix
value: example/gen
plugins:
- local: protoc-gen-go
out: gen
opt:
- paths=source_relative
- local: protoc-gen-go_temporal
out: gen
strategy: all
opt:
- cli-categories=true
- cli-enabled=true
- docs-out=./proto/README.md
- workflow-update-enabled=true
buf generate
Implement the required Workflow and Activity interfaces.
package example
import (
"context"
"fmt"
examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"
)
// Workflows carries state shared by workflow constructors; a value of this
// type is what gets registered with a worker.
type Workflows struct{}

// Activities carries state shared by activity implementations; a value of
// this type is what gets registered with a worker.
type Activities struct{}

// CreateFooWorkflow holds the state of a single CreateFoo workflow.
type CreateFooWorkflow struct {
	// generated workflow input, embedded so that the request and the signal
	// helpers it carries are reachable directly on the workflow value
	*examplev1.CreateFooWorkflowInput

	log      log.Logger
	progress float32
	status   examplev1.Foo_Status
}
// CreateFoo builds the examplev1.CreateFooWorkflow value for a new workflow
// execution. The workflow starts in the CREATING status and keeps a reference
// to the generated input and a workflow-scoped logger.
func (w *Workflows) CreateFoo(ctx workflow.Context, input *examplev1.CreateFooWorkflowInput) (examplev1.CreateFooWorkflow, error) {
	wf := &CreateFooWorkflow{CreateFooWorkflowInput: input}
	wf.log = workflow.GetLogger(ctx)
	wf.status = examplev1.Foo_FOO_STATUS_CREATING
	return wf, nil
}
// Execute is the entrypoint of an example.v1.Example.CreateFoo workflow. It
// starts a background signal listener, sends a notification through the
// Notify activity, waits until the workflow reaches the READY status, and
// then returns the resulting foo.
func (wf *CreateFooWorkflow) Execute(ctx workflow.Context) (*examplev1.CreateFooResponse, error) {
	// consume SetFooProgress signals in the background using the generated
	// signal helper, applying each one through the update handler
	workflow.Go(ctx, func(ctx workflow.Context) {
		for {
			req, _ := wf.SetFooProgress.Receive(ctx)
			wf.UpdateFooProgress(ctx, req)
		}
	})

	// announce the operation via the generated Notify activity helper
	notification := &examplev1.NotifyRequest{
		Message: fmt.Sprintf("creating foo resource (%s)", wf.Req.GetName()),
	}
	if err := examplev1.Notify(ctx, notification); err != nil {
		return nil, fmt.Errorf("error sending notification: %w", err)
	}

	// wait until a signal or update has moved the workflow to READY
	ready := func() bool { return wf.status == examplev1.Foo_FOO_STATUS_READY }
	if err := workflow.Await(ctx, ready); err != nil {
		return nil, fmt.Errorf("error awaiting ready status: %w", err)
	}

	foo := &examplev1.Foo{Name: wf.Req.GetName(), Status: wf.status}
	return &examplev1.CreateFooResponse{Foo: foo}, nil
}
// GetFooProgress handles the GetFooProgress query by reporting the workflow's
// current progress and status.
func (wf *CreateFooWorkflow) GetFooProgress() (*examplev1.GetFooProgressResponse, error) {
	resp := &examplev1.GetFooProgressResponse{
		Progress: wf.progress,
		Status:   wf.status,
	}
	return resp, nil
}
// UpdateFooProgress handles the UpdateFooProgress update. It records the
// requested progress, clamping negative values to 0 and values of 100 or more
// to 100, and sets the status to READY once progress reaches 100 (CREATING
// otherwise). The resulting progress and status are returned.
func (wf *CreateFooWorkflow) UpdateFooProgress(ctx workflow.Context, req *examplev1.SetFooProgressRequest) (*examplev1.GetFooProgressResponse, error) {
	requested := req.GetProgress()
	wf.progress = requested

	if requested < 0 {
		// below range: clamp to zero, still creating
		wf.progress, wf.status = 0, examplev1.Foo_FOO_STATUS_CREATING
	} else if requested < 100 {
		// in progress
		wf.status = examplev1.Foo_FOO_STATUS_CREATING
	} else if requested >= 100 {
		// complete: clamp to 100 and mark ready
		wf.progress, wf.status = 100, examplev1.Foo_FOO_STATUS_READY
	}

	resp := &examplev1.GetFooProgressResponse{
		Progress: wf.progress,
		Status:   wf.status,
	}
	return resp, nil
}
// Notify implements the Notify activity by writing the request's message to
// the activity logger. It always succeeds.
func (a *Activities) Notify(ctx context.Context, req *examplev1.NotifyRequest) error {
	logger := activity.GetLogger(ctx)
	logger.Info("notification", "message", req.GetMessage())
	return nil
}
Run your Temporal Worker using the generated helpers.
package main
import (
"log"
"log/slog"
"os"
"github.com/cludden/protoc-gen-go-temporal/examples/example"
examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
"github.com/urfave/cli/v2"
"go.temporal.io/sdk/client"
logsdk "go.temporal.io/sdk/log"
"go.temporal.io/sdk/worker"
)
func main() {
// initialize the generated cli application
app, err := examplev1.NewExampleCli(
examplev1.NewExampleCliOptions().
WithClient(func(cmd *cli.Context) (client.Client, error) {
return client.Dial(client.Options{
Logger: logsdk.NewStructuredLogger(slog.New(slog.NewTextHandler(os.Stdout, nil))),
})
}).
WithWorker(func(cmd *cli.Context, c client.Client) (worker.Worker, error) {
// register activities and workflows using generated helpers
w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{})
examplev1.RegisterExampleActivities(w, &example.Activities{})
examplev1.RegisterExampleWorkflows(w, &example.Workflows{})
return w, nil
}),
)
if err != nil {
log.Fatalf("error initializing example cli: %v", err)
}
// run cli
if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}
go run cmd/example/main.go worker
Interact with your workers from any Go application using the generated Client.
package main
import (
"context"
"log"
examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
"go.temporal.io/sdk/client"
)
// main demonstrates the generated Example client: it starts a CreateFoo
// workflow asynchronously, signals, queries, and updates it, and then waits
// for the workflow to complete. Any failure is fatal, so later steps never
// operate on a nil client, run, or response.
func main() {
	ctx := context.Background()

	// initialize the Temporal sdk client
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalf("error dialing temporal: %v", err)
	}
	defer c.Close()

	// initialize service client with sdk client; named so that it does not
	// shadow the imported client package
	example := examplev1.NewExampleClient(c)

	// execute a workflow asynchronously
	run, err := example.CreateFooAsync(ctx, &examplev1.CreateFooRequest{Name: "test"})
	if err != nil {
		log.Fatalf("error starting workflow: %v", err)
	}
	log.Printf("started workflow: workflow_id=%s, run_id=%s\n", run.ID(), run.RunID())

	// send a signal to the workflow
	log.Println("signalling progress")
	if err := run.SetFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 5.7}); err != nil {
		log.Fatalf("error signalling progress: %v", err)
	}

	// query the workflow
	progress, err := run.GetFooProgress(ctx)
	if err != nil {
		log.Fatalf("error querying progress: %v", err)
	}
	log.Printf("queried progress: %s\n", progress.String())

	// update the workflow
	update, err := run.UpdateFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 100})
	if err != nil {
		log.Fatalf("error updating progress: %v", err)
	}
	log.Printf("updated progress: %s\n", update.String())

	// block on workflow completion
	resp, err := run.Get(ctx)
	if err != nil {
		log.Fatalf("error awaiting workflow completion: %v", err)
	}
	log.Printf("workflow completed: %s\n", resp.String())
}
go run cmd/client/main.go
Or from your local machine using the generated Command Line Interface.
go build -o example cmd/example/main.go
example -h
example create-foo --name test -d
example set-foo-progress -w create-foo/test --progress 5.7
example get-foo-progress -w create-foo/test
example update-foo-progress -w create-foo/test --progress 100
Or from other Temporal workflows in a different Namespace or Cluster.
package main
import (
"fmt"
"log"
examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
"github.com/cludden/protoc-gen-go-temporal/gen/example/v1/examplev1xns"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
// main runs a worker in the "default" namespace that hosts SomeWorkflow along
// with the generated cross-namespace activities, which talk to the Example
// service through a second client bound to the "example" namespace.
func main() {
	// initialize temporal client for current namespace
	c, err := client.Dial(client.Options{
		Namespace: "default",
	})
	if err != nil {
		log.Fatalf("error dialing temporal: %v", err)
	}
	defer c.Close()

	// initialize temporal worker in current namespace
	w := worker.New(c, "my-task-queue", worker.Options{})
	w.RegisterWorkflow(SomeWorkflow)

	// initialize temporal client for proto service namespace
	xnsc, err := client.NewClientFromExisting(c, client.Options{
		Namespace: "example",
	})
	if err != nil {
		log.Fatalf("error initializing client for example namespace: %v", err)
	}
	defer xnsc.Close()

	// register generated cross-namespace activities using the appropriate
	// temporal client
	examplev1xns.RegisterExampleActivities(w, examplev1.NewExampleClient(xnsc))

	// start worker and block until interrupted; InterruptCh is a function of
	// the worker package, not a method on the worker value
	if err := w.Run(worker.InterruptCh()); err != nil {
		log.Fatalf("error running worker: %v", err)
	}
}
// SomeWorkflow drives a CreateFoo workflow in another namespace using the
// generated cross-namespace helpers: it starts the workflow, signals, queries,
// and updates it, and then waits for it to complete. Every failure is
// returned wrapped with the step that produced it.
func SomeWorkflow(ctx workflow.Context) error {
	// named logger so the imported log package is not shadowed
	logger := workflow.GetLogger(ctx)

	// start workflow in target namespace
	run, err := examplev1xns.CreateFooAsync(ctx, &examplev1.CreateFooRequest{Name: "test"})
	if err != nil {
		return fmt.Errorf("error initializing CreateFoo workflow: %w", err)
	}

	// send signal
	if err := run.SetFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 5.7}); err != nil {
		return fmt.Errorf("error signaling SetFooProgress: %w", err)
	}
	logger.Info("SetFooProgress", "progress", 5.7)

	// execute query
	progress, err := run.GetFooProgress(ctx)
	if err != nil {
		return fmt.Errorf("error querying GetFooProgress: %w", err)
	}
	logger.Info("GetFooProgress", "status", progress.GetStatus().String(), "progress", progress.GetProgress())

	// execute update
	update, err := run.UpdateFooProgressAsync(ctx, &examplev1.SetFooProgressRequest{Progress: 100})
	if err != nil {
		return fmt.Errorf("error initializing UpdateFooProgress: %w", err)
	}
	progress, err = update.Get(ctx)
	if err != nil {
		return fmt.Errorf("error updating UpdateFooProgress: %w", err)
	}
	logger.Info("UpdateFooProgress", "status", progress.GetStatus().String(), "progress", progress.GetProgress())

	// await workflow completion
	resp, err := run.Get(ctx)
	if err != nil {
		return fmt.Errorf("error awaiting CreateFoo workflow: %w", err)
	}
	logger.Info("CreateFoo", "name", resp.GetFoo().GetName(), "status", resp.GetFoo().GetStatus().String())
	return nil
}