protoc-gen-go-temporal
temporal + go + protobuf = ❤️

About
A protoc plugin for generating typed Temporal clients and workers in Go from protobuf schemas. This plugin allows:
- workflow authors to configure sensible defaults and guardrails
- simplifies the implementation and testing of Temporal workers
- and streamlines integration by providing typed client SDKs and a generated CLI application
Features
Generated Client with:
- methods for executing workflows, queries, signals, and updates
- methods for cancelling or terminating workflows
- default client.StartWorkflowOptionsandclient.UpdateWorkflowWithOptionsRequest
- dynamic workflow ids, update ids, and search attributes via Bloblang expressions
- default timeouts, id reuse policies, retry policies, wait policies
Generated Worker resources with:
- functions for calling activities and local activities from workflows
- functions for executing child workflows and signalling external workflows
- default workflow.ActivityOptions,workflow.ChildWorkflowOptions
- default timeouts, parent cose policies, retry policies
Optional CLI with:
- commands for executing workflows, synchronously or asynchronously
- commands for starting workflows with signals, synchronously or asynchronously
- commands for querying existing workflows
- commands for sending signals to existing workflows
- typed flags for conventiently specifying workflow, query, and signal inputs
Generated Nexus helpers: [Experimental]
- with support for invoking a service's workflows via Nexus operations
Generated Cross-Namespace (XNS) helpers:
- with support for invoking a service's workflows, queries, signals, and updates from workflows in a different temporal namespace or cluster
Generated Remote Codec Server helpers
Generated Markdown Documentation
Inspiration
This project was inspired by Chad Retz's awesome github.com/cretz/temporal-sdk-go-advanced and Jacob LeGrone's excellent Replay talk on Temporal @ Datadog
- Annotate
- Generate
- Implement
- Run
- Client
- CLI
- XNS
Annotate your protobuf services and methods with Temporal options.
syntax = "proto3";
package example.v1;
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";
service Example {
  option (temporal.v1.service) = {task_queue: "example-v1"};
  // CreateFoo creates a new foo operation
  rpc CreateFoo(CreateFooRequest) returns (CreateFooResponse) {
    option (temporal.v1.workflow) = {
      execution_timeout: {seconds: 3600}
      id: 'create-foo/${! name.slug() }'
      id_reuse_policy: WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE
      query: {ref: "GetFooProgress"}
      signal: {
        ref: "SetFooProgress"
        start: true
      }
      update: {ref: "UpdateFooProgress"}
    };
  }
  // GetFooProgress returns the status of a CreateFoo operation
  rpc GetFooProgress(google.protobuf.Empty) returns (GetFooProgressResponse) {
    option (temporal.v1.query) = {};
  }
  // Notify sends a notification
  rpc Notify(NotifyRequest) returns (google.protobuf.Empty) {
    option (temporal.v1.activity) = {
      start_to_close_timeout: {seconds: 30}
      retry_policy: {max_attempts: 3}
    };
  }
  // SetFooProgress sets the current status of a CreateFoo operation
  rpc SetFooProgress(SetFooProgressRequest) returns (google.protobuf.Empty) {
    option (temporal.v1.signal) = {};
  }
  // UpdateFooProgress sets the current status of a CreateFoo operation
  rpc UpdateFooProgress(SetFooProgressRequest) returns (GetFooProgressResponse) {
    option (temporal.v1.update) = {id: 'update-progress/${! progress.string() }'};
  }
}
// CreateFooRequest describes the input to a CreateFoo workflow
message CreateFooRequest {
  // unique foo name
  string name = 1;
}
// SampleWorkflowWithMutexResponse describes the output from a CreateFoo workflow
message CreateFooResponse {
  Foo foo = 1;
}
// Foo describes an illustrative foo resource
message Foo {
  string name = 1;
  Status status = 2;
  enum Status {
    FOO_STATUS_UNSPECIFIED = 0;
    FOO_STATUS_READY = 1;
    FOO_STATUS_CREATING = 2;
  }
}
// GetFooProgressResponse describes the output from a GetFooProgress query
message GetFooProgressResponse {
  float progress = 1;
  Foo.Status status = 2;
}
// NotifyRequest describes the input to a Notify activity
message NotifyRequest {
  string message = 1;
}
// SetFooProgressRequest describes the input to a SetFooProgress signal
message SetFooProgressRequest {
  // value of current workflow progress
  float progress = 1;
}
Generate Go code for implementing Temporal Clients, Workers, and CLI applications.
version: v2
modules:
  - path: proto
deps:
  - buf.build/cludden/protoc-gen-go-temporal
lint:
  use:
    - BASIC
version: v2
managed:
  enabled: true
  override:
    - file_option: go_package_prefix
      value: example/gen
plugins:
  - local: protoc-gen-go
    out: gen
    opt:
      - paths=source_relative
  - local: protoc-gen-go_temporal
    out: gen
    strategy: all
    opt:
      - cli-categories=true
      - cli-enabled=true
      - cli-v3=true
      - docs-out=./proto/README.md
buf generate
Implement the required Workflow and Activity interfaces.
package example
import (
	"context"
	"fmt"
	examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
	"go.temporal.io/sdk/activity"
	"go.temporal.io/sdk/log"
	"go.temporal.io/sdk/workflow"
)
type (
	// Workflows manages shared state for workflow constructors and is used to
	// register workflows with a worker
	Workflows struct{}
	// Activities manages shared state for activities and is used to register
	// activities with a worker
	Activities struct{}
	// CreateFooWorkflow manages workflow state for a CreateFoo workflow
	CreateFooWorkflow struct {
		// it embeds the generated workflow Input type that contains the workflow
		// input and signal helpers
		*examplev1.CreateFooWorkflowInput
		log      log.Logger
		progress float32
		status   examplev1.Foo_Status
	}
)
// CreateFoo initializes a new examplev1.CreateFooWorkflow value
func (w *Workflows) CreateFoo(ctx workflow.Context, input *examplev1.CreateFooWorkflowInput) (examplev1.CreateFooWorkflow, error) {
	return &CreateFooWorkflow{
		CreateFooWorkflowInput: input,
		log:                    workflow.GetLogger(ctx),
		status:                 examplev1.Foo_FOO_STATUS_CREATING,
	}, nil
}
// Execute defines the entrypoint to a example.v1.Example.CreateFoo workflow
func (wf *CreateFooWorkflow) Execute(ctx workflow.Context) (*examplev1.CreateFooResponse, error) {
	// listen for signals using generated signal provided by workflow input
	workflow.Go(ctx, func(ctx workflow.Context) {
		for {
			signal, _ := wf.SetFooProgress.Receive(ctx)
			wf.UpdateFooProgress(ctx, signal)
		}
	})
	// execute Notify activity using generated helper
	if err := examplev1.Notify(ctx, &examplev1.NotifyRequest{
		Message: fmt.Sprintf("creating foo resource (%s)", wf.Req.GetName()),
	}); err != nil {
		return nil, fmt.Errorf("error sending notification: %w", err)
	}
	// block until progress has reached 100 via signals and/or updates
	if err := workflow.Await(ctx, func() bool {
		return wf.status == examplev1.Foo_FOO_STATUS_READY
	}); err != nil {
		return nil, fmt.Errorf("error awaiting ready status: %w", err)
	}
	return &examplev1.CreateFooResponse{
		Foo: &examplev1.Foo{
			Name:   wf.Req.GetName(),
			Status: wf.status,
		},
	}, nil
}
// GetFooProgress defines the handler for a GetFooProgress query
func (wf *CreateFooWorkflow) GetFooProgress() (*examplev1.GetFooProgressResponse, error) {
	return &examplev1.GetFooProgressResponse{Progress: wf.progress, Status: wf.status}, nil
}
// UpdateFooProgress defines the handler for an UpdateFooProgress update
func (wf *CreateFooWorkflow) UpdateFooProgress(ctx workflow.Context, req *examplev1.SetFooProgressRequest) (*examplev1.GetFooProgressResponse, error) {
	wf.progress = req.GetProgress()
	switch {
	case wf.progress < 0:
		wf.progress, wf.status = 0, examplev1.Foo_FOO_STATUS_CREATING
	case wf.progress < 100:
		wf.status = examplev1.Foo_FOO_STATUS_CREATING
	case wf.progress >= 100:
		wf.progress, wf.status = 100, examplev1.Foo_FOO_STATUS_READY
	}
	return &examplev1.GetFooProgressResponse{Progress: wf.progress, Status: wf.status}, nil
}
// Notify defines the implementation for a Notify activity
func (a *Activities) Notify(ctx context.Context, req *examplev1.NotifyRequest) error {
	activity.GetLogger(ctx).Info("notification", "message", req.GetMessage())
	return nil
}
Run your Temporal Worker using the generated helpers.
package main
import (
	"context"
	"log"
	"log/slog"
	"os"
	"github.com/cludden/protoc-gen-go-temporal/examples/example"
	examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
	"github.com/urfave/cli/v3"
	"go.temporal.io/sdk/client"
	logsdk "go.temporal.io/sdk/log"
	"go.temporal.io/sdk/worker"
)
func main() {
	// initialize the generated cli application
	app, err := examplev1.NewExampleCli(
		examplev1.NewExampleCliOptions().
			WithClient(func(ctx context.Context, cmd *cli.Command) (client.Client, error) {
				return client.DialContext(ctx, client.Options{
					Logger: logsdk.NewStructuredLogger(slog.New(slog.NewTextHandler(os.Stdout, nil))),
				})
			}).
			WithWorker(func(ctx context.Context, cmd *cli.Command, c client.Client) (worker.Worker, error) {
				// register activities and workflows using generated helpers
				w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{})
				examplev1.RegisterExampleActivities(w, &example.Activities{})
				examplev1.RegisterExampleWorkflows(w, &example.Workflows{})
				return w, nil
			}),
	)
	if err != nil {
		log.Fatalf("error initializing example cli: %v", err)
	}
	// run cli
	if err := app.Run(context.Background(), os.Args); err != nil {
		log.Fatal(err)
	}
}
go run cmd/example/main.go worker
Interact with your workers from any Go application using the generated Client.
package main
import (
	"context"
	"log"
	examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
	"go.temporal.io/sdk/client"
)
func main() {
	// initialize service client with sdk client
	c, _ := client.Dial(client.Options{})
	client, ctx := examplev1.NewExampleClient(c), context.Background()
	// execute a workflow asynchronously
	run, _ := client.CreateFooAsync(ctx, &examplev1.CreateFooRequest{Name: "test"})
	log.Printf("started workflow: workflow_id=%s, run_id=%s\n", run.ID(), run.RunID())
	// send a signal to the workflow
	log.Println("signalling progress")
	_ = run.SetFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 5.7})
	// query the workflow
	progress, _ := run.GetFooProgress(ctx)
	log.Printf("queried progress: %s\n", progress.String())
	// update the workflow
	update, _ := run.UpdateFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 100})
	log.Printf("updated progress: %s\n", update.String())
	// block on workflow completion
	resp, _ := run.Get(ctx)
	log.Printf("workflow completed: %s\n", resp.String())
}
go run cmd/client/main.go worker
Or from your local machine using the generated Command Line Interface.
go build -o example cmd/example/main.go
example -h

example create-foo --name test -d

example set-foo-progress -w create-foo/test --progress 5.7
example get-foo-progress -w create-foo/test

example update-foo-progress -w create-foo/test --progress 100

Or from other Temporal workflows in a different Namespace or Cluster.
package main
import (
	"fmt"
	"log"
	examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
	"github.com/cludden/protoc-gen-go-temporal/gen/example/v1/examplev1xns"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)
func main() {
	// initialize temporal client for current namespace
	c, _ := client.Dial(client.Options{
		Namespace: "default",
	})
	defer c.Close()
	// initialize temporal worker in current namespace
	w := worker.New(c, "my-task-queue", worker.Options{})
	w.RegisterWorkflow(SomeWorkflow)
	// initialize temporal client for proto service namespace
	xnsc, _ := client.NewClientFromExisting(c, client.Options{
		Namespace: "example",
	})
	// register generated cross-namespace activities using the appropriate
	// temporal client
	examplev1xns.RegisterExampleActivities(w, examplev1.NewExampleClient(xnsc))
	
	// start worker
	_ = w.Run(w.InterruptCh())
}
func SomeWorkflow(ctx workflow.Context) error {
	log := workflow.GetLogger(ctx)
	// start workflow in target namespace
	run, err := examplev1xns.CreateFooAsync(ctx, &examplev1.CreateFooRequest{Name: w.Req.GetName()})
	if err != nil {
		return fmt.Errorf("error initializing CreateFoo workflow: %w", err)
	}
	// send signal
	if err := run.SetFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 5.7}); err != nil {
		return fmt.Errorf("error signaling SetFooProgress: %w", err)
	}
	log.Info("SetFooProgress", "progress", 5.7)
	// execute query
	progress, err := run.GetFooProgress(ctx)
	if err != nil {
		return fmt.Errorf("error querying GetFooProgress: %w", err)
	}
	log.Info("GetFooProgress", "status", progress.GetStatus().String(), "progress", progress.GetProgress())
	// execute update
	update, err := run.UpdateFooProgressAsync(ctx, &examplev1.SetFooProgressRequest{Progress: 100})
	if err != nil {
		return fmt.Errorf("error initializing UpdateFooProgress: %w", err)
	}
	progress, err = update.Get(ctx)
	if err != nil {
		return fmt.Errorf("error updating UpdateFooProgress: %w", err)
	}
	log.Info("UpdateFooProgress", "status", progress.GetStatus().String(), "progress", progress.GetProgress())
	// await workflow completion
	resp, err := run.Get(ctx)
	if err != nil {
		return err
	}
	return nil
}