Skip to main content

Workflows

A workflow is implemented as a Go struct that:

  • satisfies the generated <Workflow>Workflow interface type generated by the plugin
  • embeds the generated <Workflow>WorkflowInput struct that contains the workflow input and any registered signals
example.go
package example

import (
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/workflow"
)

type Workflows struct {}

func (w *Workflows) Hello(ctx workflow.Context, input *examplev1.HelloWorkflowInput) (examplev1.HelloWorkflow, error) {
return &HelloWorkflow{input}, nil
}

type HelloWorkflow struct {
*examplev1.HelloWorkflowInput
}

func (w *HelloWorkflow) Execute(ctx workflow.Context) (*examplev1.HelloOutput, error) {
workflow.GetLogger(ctx).Info("executing hello workflow", "input", w.Req)
return &examplev1.HelloOutput{}, nil
}

Parameters

Every <Workflow>Workflow interface includes an Execute method that defines the workflow entrypoint. The signature of this method varies based on whether or not the workflow specifies a non-empty output message type.

tip

Most workflows should specify both an input and output message type, even if the type is empty. This to support the addition of fields to either the input or output (or both) in the future without needing to introduce a breaking change.

example.proto
syntax="proto3";

package example.v1;

import "temporal/v1/temporal.proto";

service Example {
rpc Hello(HelloInput) returns (HelloOutput) {
option (temporal.v1.workflow) = {};
}
}
main.go
package main

import (
"fmt"

examplev1 "path/to/gen/example/v1"
)

type HelloWorkflow struct {
*examplev1.HelloWorkflowInput
}

func (w *HelloWorkflow) Execute(ctx workflow.Context) (*examplev1.HelloOutput, error) {
return &examplev1.HelloOutput{
Result: fmt.Sprintf("Hello %s!", w.Req.GetName()),
}, nil
}

Registration

The plugin generates helpers for registering your workflows with a Temporal worker. These helpers rely on user-defined constructor functions. There are two flavors of registration helpers, composite and individual.

Composite

tip

The composite registration helper is the recommended approach for registrating workflows.

Each protobuf service with Temporal workflow definitions generates a Register<Service>Workflows composite registration function that registers all service workflows defined on a given protobuf service. This function receives two inputs:

  • a worker.Registry to register the Service workflows with
  • a struct value implementing the <Service>Workflows interface generated by the plugin. The interface describes a struct with methods for each workflow that initialize a new workflow value for an individual execution.
main.go
package main

import (
"log"

examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

// Workflows provides constructor methods for example.v1.Example workflows
type Workflows struct {}

// FooWorkflow implements an example.v1.Example.Foo workflow
type FooWorkflow struct {
*examplev1.FooWorkflowInput
}

// Foo initializes a new examplev1.Workflow value
func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorld{input}, nil
}

// Execute defines the entrypoint to an example.v1.Example.Foo workflow
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
return &examplev1.FooOutput{}, nil
}

// BarWorkflow implements an example.v1.Example.Bar workflow
type BarWorkflow struct {
*examplev1.BarWorkflowInput
}

// Bar initializes a new examplev1.Workflow value
func (w *Workflows) Bar(ctx workflow.Context, input *examplev1.BarWorkflowInput) (examplev1.BarWorkflow, error) {
return &BarWorld{input}, nil
}

// Execute defines the entrypoint to an example.v1.Example.Bar workflow
func (w *BarWorkflow) Execute(ctx workflow.Context) (*examplev1.BarOutput, error) {
return &examplev1.BarOutput{}, nil
}

func main() {
// initialize temporal client and worker
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalf("error initializing client: %v", err)
}
w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{})

// Register all example.v1.Example workflows with the worker
examplev1.RegisterExampleWorkflows(w, &Workflows{})
w.Run(worker.InterruptCh())
}

Individual

Each workflow definitions generates a Register<Workflow>Workflow individual registration function. This function receives two inputs:

  • a worker.Worker to register the workflow with
  • a constructor function that receives as input the workflow execution context and generated workflow input and initializes a new workflow value for an individual execution
main.go
package main

import (
"log"

examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

// FooWorkflow implements an example.v1.Example.Foo workflow
type FooWorkflow struct {
*examplev1.FooWorkflowInput
}

// NewFooWorkflow initializes a new examplev1.Workflow value
func NewFooWorkflow(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorld{input}, nil
}

// Execute defines the entrypoint to an example.v1.Example.Foo workflow
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
return &examplev1.FooOutput{}, nil
}

// BarWorkflow implements an example.v1.Example.Bar workflow
type BarWorkflow struct {
*examplev1.BarWorkflowInput
}

// NewBarWorkflow initializes a new examplev1.Workflow value
func NewBarWorkflow(ctx workflow.Context, input *examplev1.BarWorkflowInput) (examplev1.BarWorkflow, error) {
return &BarWorld{input}, nil
}

// Execute defines the entrypoint to an example.v1.Example.Bar workflow
func (w *BarWorkflow) Execute(ctx workflow.Context) (*examplev1.BarOutput, error) {
return &examplev1.BarOutput{}, nil
}

func main() {
// initialize temporal client and worker
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalf("error initializing client: %v", err)
}
w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{})

// Register all example.v1.Example workflows individually
examplev1.RegisterFooWorkflow(w, NewFooWorkflow)
examplev1.RegisterBarWorkflow(w, NewBarWorkflow)
w.Run(worker.InterruptCh())
}

Aliases

Workflows can be annotated with 0 or more aliases which results in the worker registering the workflow definition multiple times with different names. This can be used to evolve workflow naming conventions in a non-breaking fashion:

  1. Initial workflow definition
syntax="proto3";

package example.v1;

service Example {
rpc Hello(HelloInput) returns (HelloOutput) {
option (temporal.v1.workflow) = {
name: "hello-workflow"
};
}
}
  1. Add new workflow name as alias.

Old clients and new clients generated from this schema will continue to use the old name when executing workflows. New worker deployments will register the workflow under both names. This step is necessary to prevent new clients from attempting to execute the workflow using the new name before workers have been deployed. If this risk is deemed acceptable, this step can be skipped.

syntax="proto3";

package example.v1;

service Example {
rpc Hello(HelloInput) returns (HelloOutput) {
option (temporal.v1.workflow) = {
name: "hello-workflow"
aliases: ["example.v1.Hello"]
};
}
}
  1. Update name and aliases.

All new clients will now use the new workflow name. Workers will continue to handle both names.

syntax="proto3";

package example.v1;

service Example {
rpc Hello(HelloInput) returns (HelloOutput) {
option (temporal.v1.workflow) = {
name: "example.v1.Hello"
aliases: ["hello-workflow"]
};
}
}
  1. Remove old aliases.

Once all old clients have been phased out and old workflow histories have expired, the old alias can be safely removed.

syntax="proto3";

package example.v1;

service Example {
rpc Hello(HelloInput) returns (HelloOutput) {
option (temporal.v1.workflow) = {
name: "example.v1.Hello"
};
}
}

Initializers

Workflow structs can implement an optional Initialize method which will be invoked prior to signal channel initialization and query or update handler registrations. This can be useful if a workflow requires the use of an activity to initialize local workflow state.

example.go
package example

import (
"context"
"errors"
"log"

examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/workflow"
)

type (
Workflows struct {}

FooWorkflow struct {
*examplev1.FooWorkflowInput
data map[string]any
}
)

func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input, nil}, nil
}

func (w *FooWorkflow) Initialize(ctx workflow.Context) error {
return workflow.SideEffect(ctx, func(workflow.Context) any {
return map[string]any{
"foo": "bar",
}
}).Get(&w.data)
}

func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
if err := workflow.Await(ctx, func() bool {
foo, ok := w.data["foo"].(string)
return ok && foo != "bar"
}); err != nil {
return nil, err
}
return &examplev1.FooOutput{}, nil
}

func (w *FooWorkflow) Bar(ctx workflow.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
if foo, _ := w.data["foo"]; foo != "bar" {
return nil, errors.New("unable to update foo")
}
w.data["foo"] = input.GetBar()
return &examplev1.BarOutput{}, nil
}

Invocation

The plugin supports several methods for executing protobuf workflows, each of which is outlined in more detail below.

Client

Consumers can utilize the generated Client to execute workflows from any Go application. See the Clients guide for more usage details.

main.go
package main

import (
"context"
"log"

examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/client"
)

func main() {
// initialize temporal client
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalf("error initializing client: %v", err)
}

// initialize temporal protobuf client
client := examplev1.NewExampleClient(c)

// execute an example.v1.Example.Hello workflow and block until completion or non-retryable error
out, err := client.Hello(context.Background(), &examplev1.HelloInput{})
if err != nil {
log.Fatalf("error executing example.v1.Example.Hello workflow: %v", err)
}
}

Command Line Interface

Consumers can utilize the generated Command Line Interface as a standalone application for executing workflows. See the CLI guide for more usage details.

example -h
NAME:
example - an example temporal cli

USAGE:
example [global options] command [command options] [arguments...]

COMMANDS:
help, h Shows a list of commands or help for one command
WORKFLOWS:
hello Hello returns a friendly greeting
example hello -h
NAME:
example hello - Hello returns a friendly greeting

USAGE:
example hello [command options] [arguments...]

CATEGORY:
WORKFLOWS

OPTIONS:
--detach, -d run workflow in the background and print workflow and execution id (default: false)
--help, -h show help
--input-file value, -f value path to json-formatted input file
--task-queue value, -t value task queue name (default: "example-v1") [$TEMPORAL_TASK_QUEUE_NAME, $TEMPORAL_TASK_QUEUE, $TASK_QUEUE_NAME, $TASK_QUEUE]

INPUT

--name value Name specifies the subject to greet
example hello --name Temporal
{
"result": "Hello Temporal!"
}

Child Workflows

Workflows can be executed as child workflows from other workflows in the same Temporal namespace. See the Child Workflows guide for more usage details.

example.go
package main

import (
"fmt"

examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/workflow"
)

func MyWorkflow(ctx workflow.Context) error {
out, err := examplev1.HelloChild(ctx, &examplev1.HelloInput{})
if err != nil {
return fmt.Errorf("error executing example.v1.Example.Hello child workflow: %w", err)
}
return nil
}

Cross-Namespace (XNS)

Workflows can be executed from other workflows in a different Temporal namespace or even an entirely separate Temporal cluster (e.g. on-prem to cloud). See the Cross-Namespace guide for more usage details.

example.go
package example

import (
"fmt"

examplev1 "path/to/gen/example/v1"
"path/to/gen/example/v1/examplev1xns"
"go.temporal.io/sdk/workflow"
)

func MyWorkflow(ctx workflow.Context) error {
out, err := examplev1xns.Hello(ctx, &examplev1.HelloInput{})
if err != nil {
return fmt.Errorf("error executing example.v1.Example.Hello xns workflow: %w", err)
}
return nil
}

Workflow Functions

Workflow definitions can be executed inline by another workflow definition using the generated <Workflow>Function variables or using the generated <Service>WorkflowFunctions helpers that enable mocking.

main.go
package main

import (
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

type (
Workflows struct {}

FooWorkflow struct {
*examplev1.FooWorkflowInput
}

BarWorkflow struct {
*examplev1.BarWorkflowInput
}
)

func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}

func (w *FooWorkflow) Execute(ctx workflow.Context) error {
workflow.GetLogger(ctx).Info("hello from foo!", "name", w.Req.GetName())
return nil
}

func (w *Workflows) Bar(ctx workflow.Context, input *examplev1.BarInput) (examplev1.BarWorkflow, error) {
return &BarWorkflow{input}, nil
}

func (w *BarWorkflow) Execute(ctx workflow.Context) error {
workflow.GetLogger(ctx).Info("hello from bar!", "name", w.Req.GetName())
out, err := examplev1.FooFunction(ctx, &examplev1.FooInput{})
if err != nil {
return fmt.Errorf("error executing example.v1.Example.Foo inline: %w", err)
}
return nil
}

func main() {
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalf("error initializing client: %v", err)
}
w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{})

examplev1.RegisterExampleWorkflows(w, &Workflows{})
w.Run(worker.InterruptCh())
}

Options

Both synchronous and asynchronous workflow methods accept an optional <Workflow>Options value as the final argument. This argument can be used to override the default client.StartWorkflowOptions created using the defaults defined in the schema.

WithExecutionTimeout

Set the workflow WorkflowExecutionTimeout value

example.go
func example(ctx context.Context, client examplev1.ExampleClient) error {
// override the default WorkflowExecutionTimeout set by the generated client
_, err := client.Foo(ctx, &examplev1.FooInput{}, examplev1.NewFooOptions().
WithExecutionTimeout(time.Hour * 2),
)
return err
}

WithID

Set the workflow ID value

example.go
func example(ctx context.Context, client examplev1.ExampleClient) error {
// override the default ID set by the generated client
_, err := client.Foo(ctx, &examplev1.FooInput{}, examplev1.NewFooOptions().
WithID("foo/bar"),
)
return err
}

WithIDReusePolicy

Set the workflow WorkflowIDReusePolicy value

example.go
func example(ctx context.Context, client examplev1.ExampleClient) error {
// override the default WorkflowIDReusePolicy set by the generated client
_, err := client.Foo(ctx, &examplev1.FooInput{}, examplev1.NewFooOptions().
WithIDReusePolicy(enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE),
)
return err
}

WithRetryPolicy

Set the workflow RetryPolicy value

example.go
func example(ctx context.Context, client examplev1.ExampleClient) error {
// override the default RetryPolicy set by the generated client
_, err := client.Foo(ctx, &examplev1.FooInput{}, examplev1.NewFooOptions().
WithRetryPolicy(&temporal.RetryPolicy{
MaximumAttempts: 3,
}),
)
return err
}

WithRunTimeout

Set the workflow WorkflowRunTimeout value

example.go
func example(ctx context.Context, client examplev1.ExampleClient) error {
// override the default WorkflowRunTimeout set by the generated client
_, err := client.Foo(ctx, &examplev1.FooInput{}, examplev1.NewFooOptions().
WithRunTimeout(time.Minute * 10),
)
return err
}

WithSearchAttributes

Set the workflow SearchAttributes value

example.go
func example(ctx context.Context, client examplev1.ExampleClient) error {
// override the default SearchAttributes set by the generated client
_, err := client.Foo(ctx, &examplev1.FooInput{}, examplev1.NewFooOptions().
WithSearchAttributes(map[string]any{
"foo": "baz"
}),
)
return err
}

WithStartWorkflowOptions

Override the initial client.StartWorkflowOptions value for an individual invocation. Schema defined defaults will be applied over this value.

example.go
func example(ctx context.Context, client examplev1.ExampleClient) error {
_, err := client.Foo(ctx, &examplev1.FooInput{}, examplev1.NewFooOptions().
WithStartWorkflowOptions(client.StartWorkflowOptions{
WorkflowExecutionTimeout: time.Hour * 2
}),
)
return err
}

WithTaskTimeout

Set the workflow WorkflowTaskTimeout value

example.go
func example(ctx context.Context, client examplev1.ExampleClient) error {
// override the default WorkflowTaskTimeout set by the generated client
_, err := client.Foo(ctx, &examplev1.FooInput{}, examplev1.NewFooOptions().
WithTaskTimeout(time.Second * 5),
)
return err
}

WithTaskQueue

Set the workflow TaskQueue value

example.go
func example(ctx context.Context, client examplev1.ExampleClient) error {
// override the default TaskQueue set by the generated client
_, err := client.Foo(ctx, &examplev1.FooInput{}, examplev1.NewFooOptions().
WithTaskQueue("example-v3"),
)
return err
}

ChildOptions

Both synchronous and asynchronous child workflow helpers accept an optional <Workflow>ChildOptions value as the final argument. This argument can be used to override the default workflow.ChildWorkflowOptions created using the defaults defined in the schema.

WithChildWorkflowOptions

Override the initial workflow.ChildWorkflowOptions value for an individual invocation. Schema defined defaults will be applied over this value.

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarChild(ctx, &examplev1.BarInput{}, examplev1.NewBarChildOptions().
WithChildWorkflowOptions(workflow.ChildWorkflowOptions{
WorkflowExecutionTimeout: time.Hour * 2,
}),
)
return &examplev1.FooOutput{}, err
}

WithExecutionTimeout

Sets the child workflow WorkflowExecutionTimeout value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarChild(ctx, &examplev1.BarInput{}, examplev1.NewBarChildOptions().
WithExecutionTimeout(time.Hour * 2),
)
return &examplev1.FooOutput{}, err
}

WithID

Sets the child workflow WorkflowID value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarChild(ctx, &examplev1.BarInput{}, examplev1.NewBarChildOptions().
WithID("bar/baz"),
)
return &examplev1.FooOutput{}, err
}

WithIDReusePolicy

Sets the child workflow WorkflowID value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarChild(ctx, &examplev1.BarInput{}, examplev1.NewBarChildOptions().
WithIDReusePolicy(enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE),
)
return &examplev1.FooOutput{}, err
}

WithParentClosePolicy

Sets the child workflow ParentClosePolicy value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarChild(ctx, &examplev1.BarInput{}, examplev1.NewBarChildOptions().
WithParentClosePolicy(enumspb.PARENT_CLOSE_POLICY_ABANDON),
)
return &examplev1.FooOutput{}, err
}

WithRetryPolicy

Sets the child workflow RetryPolicy value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarChild(ctx, &examplev1.BarInput{}, examplev1.NewBarChildOptions().
WithRetryPolicy(&temporal.RetryPolicy{
MaximumAttempts: 3,
}),
)
return &examplev1.FooOutput{}, err
}

WithRunTimeout

Sets the child workflow WorkflowRunTimeout value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarChild(ctx, &examplev1.BarInput{}, examplev1.NewBarChildOptions().
WithRunTimeout(time.Minute * 20),
)
return &examplev1.FooOutput{}, err
}

WithSearchAttributes

Sets the child workflow SearchAttributes value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarChild(ctx, &examplev1.BarInput{}, examplev1.NewBarChildOptions().
WithSearchAttributes(map[string]any{
"foo": "baz"
}),
)
return &examplev1.FooOutput{}, err
}

WithTaskTimeout

Sets the child workflow WorkflowTaskTimeout value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarChild(ctx, &examplev1.BarInput{}, examplev1.NewBarChildOptions().
WithTaskTimeout(time.Second * 5),
)
return &examplev1.FooOutput{}, err
}

WithTaskQueue

Sets the child workflow TaskQueue value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarChild(ctx, &examplev1.BarInput{}, examplev1.NewBarChildOptions().
WithTaskQueue("example-v3"),
)
return &examplev1.FooOutput{}, err
}

WithWaitForCancellation

Sets the child workflow WaitForCancellation value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarChild(ctx, &examplev1.BarInput{}, examplev1.NewBarChildOptions().
WithWaitForCancellation(false),
)
return &examplev1.FooOutput{}, err
}