Updates
An update is implemented as a method on the workflow struct.
Workflow Updates are considered an experimental feature. They require the --workflow-update-enabled option to be set.
- Schema
- Go
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
  // Task queue for this service.
  option (temporal.v1.service).task_queue = "example-v1";

  rpc Foo(FooInput) returns (FooOutput) {
    // Marks Foo as a workflow and references the Bar update.
    option (temporal.v1.workflow) = {
      update: { ref: "Bar" }
    };
  }

  rpc Bar(BarInput) returns (BarOutput) {
    // Marks Bar as an update; id is the schema-defined update ID expression.
    option (temporal.v1.update) = {
      id: 'bar/${! uuid_v4() }'
    };
  }
}
package example
import (
"fmt"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/workflow"
)
// FooWorkflow holds the state of a single Foo workflow execution.
type FooWorkflow struct {
	// Embedded workflow input type from the generated examplev1 package.
	*examplev1.FooWorkflowInput

	// bar is the input of the latest Bar update; nil until one is handled.
	bar *examplev1.BarInput
}
// Execute blocks until a Bar update has stored its input on the workflow
// struct, then completes the workflow with an empty FooOutput.
//
// It returns a wrapped error if awaiting the update fails.
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
	// The Bar handler sets w.bar, which unblocks this wait.
	if err := workflow.Await(ctx, func() bool {
		return w.bar != nil
	}); err != nil {
		return nil, fmt.Errorf("error awaiting update: %w", err)
	}
	// The function must return on the success path to compile.
	return &examplev1.FooOutput{}, nil
}
// Bar handles the Bar update by recording its input on the workflow struct
// and replying with an empty BarOutput.
func (w *FooWorkflow) Bar(ctx workflow.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
	w.bar = input
	out := new(examplev1.BarOutput)
	return out, nil
}
Validation
Temporal workflow updates have four phases: Admission, Validation, Execution, and Completion. The Validation phase is optional, and can be enabled by specifying a developer-provided validation function.
This validation code, similar to a Query handler, can observe but not change the Workflow state. This means that the validation of an Update request may depend on the Workflow state at runtime. To indicate that the Update request doesn't pass validation, the validation code must throw/return a language-appropriate error. In this case, the system rejects the request, doesn't record anything in the Workflow Event History to indicate that the Update ever happened, and the Update processing doesn't proceed to later phases.
Validation is enabled by setting the validate update option to true in the schema, and is implemented as a Validate<Update> method (for example, ValidateBar) on the workflow struct.
- Schema
- Go
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
  // Task queue for this service.
  option (temporal.v1.service).task_queue = "example-v1";

  rpc Foo(FooInput) returns (FooOutput) {
    // Marks Foo as a workflow and references the Bar update.
    option (temporal.v1.workflow) = {
      update: { ref: "Bar" }
    };
  }

  rpc Bar(BarInput) returns (BarOutput) {
    // Marks Bar as an update; validate turns on the validation method.
    option (temporal.v1.update) = {
      id: 'bar/${! uuid_v4() }'
      validate: true
    };
  }
}
package example
import (
"fmt"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
// FooWorkflow carries the per-execution state of the Foo workflow.
type FooWorkflow struct {
	// Embedded workflow input type from the generated examplev1 package.
	*examplev1.FooWorkflowInput

	// bar stays nil until the Bar update handler stores its input here;
	// ValidateBar reads it to reject a second update.
	bar *examplev1.BarInput
}
// Execute waits for a Bar update to be applied and then completes the
// workflow with an empty FooOutput.
//
// It returns a wrapped error if awaiting the update fails.
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
	// w.bar is set by the Bar update handler.
	if err := workflow.Await(ctx, func() bool {
		return w.bar != nil
	}); err != nil {
		return nil, fmt.Errorf("error awaiting update: %w", err)
	}
	// The function must return on the success path to compile.
	return &examplev1.FooOutput{}, nil
}
// ValidateBar is the validation method for the Bar update. It only reads
// workflow state and rejects the update when one has already been applied.
//
// It returns a non-retryable application error of type "AlreadyExists" when
// w.bar is already set, and nil otherwise.
func (w *FooWorkflow) ValidateBar(ctx workflow.Context, input *examplev1.BarInput) error {
	if w.bar != nil {
		// Arguments are (message, errType, cause); no context is accepted.
		return temporal.NewNonRetryableApplicationError("bar update already applied", "AlreadyExists", nil)
	}
	return nil
}
// Bar is the Bar update handler: it stores the update input on the workflow
// struct and returns an empty BarOutput.
func (w *FooWorkflow) Bar(ctx workflow.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
	w.bar = input
	result := new(examplev1.BarOutput)
	return result, nil
}
Invocation
Client
Consumers can utilize the generated Client to execute workflow updates from any Go application. See the Clients guide for more usage details.
- Go
- Schema
package example
import (
"context"
"log"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/client"
)
// main connects to Temporal, sends a Bar update to an existing Foo workflow
// through the generated client, and logs the result.
func main() {
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalf("error connecting to temporal: %v", err)
	}
	defer c.Close()

	// Named "example" so the imported client package is not shadowed.
	example, ctx := examplev1.NewExampleClient(c), context.Background()

	// An empty run ID is passed as the third argument.
	bar, err := example.Bar(ctx, "foo-workflow-id", "", &examplev1.BarInput{})
	if err != nil {
		log.Fatalf("error updating workflow: %v", err)
	}
	log.Printf("bar update successful: %s", bar.String())
}
syntax="proto3";
package example.v1;
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";
service Example {
  rpc Foo(FooInput) returns (FooOutput) {
    // Marks Foo as a workflow and references the Bar update.
    option (temporal.v1.workflow) = {
      update: { ref: "Bar" }
    };
  }

  rpc Bar(BarInput) returns (BarOutput) {
    // Marks Bar as an update; no update options are set.
    option (temporal.v1.update) = {};
  }
}
Command Line Interface (CLI)
Consumers can utilize the generated Command Line Interface as a standalone application for executing workflow updates. See the CLI guide for more usage details.
- Shell
- Go
- Schema
NAME:
example - an example temporal cli
USAGE:
example [global options] command [command options] [arguments...]
COMMANDS:
help, h Shows a list of commands or help for one command
UPDATES:
bar Bar updates a foo workflow
WORKFLOWS:
foo Foo does some foo thing
NAME:
example bar - Bar updates a foo workflow
USAGE:
example bar [command options] [arguments...]
CATEGORY:
UPDATES
OPTIONS:
--detach, -d run workflow update in the background and print workflow, execution, and update id (default: false)
--input-file value, -f value path to json-formatted input file
--run-id value, -r value run id
--workflow-id value, -w value workflow id
INPUT
--name value Name specifies the subject to greet
{}
package main
import (
"log"
"os"
examplev1 "path/to/gen/example/v1"
)
// main builds the generated example CLI and runs it with the process
// arguments, exiting with a logged error if either step fails.
func main() {
	cli, initErr := examplev1.NewExampleCLI()
	if initErr != nil {
		log.Fatalf("error initializing cli: %v", initErr)
	}

	runErr := cli.Run(os.Args)
	if runErr != nil {
		log.Fatal(runErr)
	}
}
syntax="proto3";
package example.v1;
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";
service Example {
  // Foo does some foo thing
  rpc Foo(FooInput) returns (FooOutput) {
    // Marks Foo as a workflow and references the Bar update.
    option (temporal.v1.workflow) = {
      update: { ref: "Bar" }
    };
  }

  // Bar updates a foo workflow
  rpc Bar(BarInput) returns (BarOutput) {
    // Marks Bar as an update; no update options are set.
    option (temporal.v1.update) = {};
  }
}
message BarInput {
  // Name specifies the subject to greet
  string name = 1;
}
Workflow Run
The generated client's asynchronous workflow methods return a WorkflowRun that provides methods for updating the workflow.
- Go
- Schema
package main
import (
"context"
"log"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/client"
)
// main starts a Foo workflow asynchronously, sends it a Bar update through
// the returned workflow run, and then waits for the workflow result.
func main() {
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalf("error connecting to temporal: %v", err)
	}
	defer c.Close()

	// Named "example" so the imported client package is not shadowed.
	example, ctx := examplev1.NewExampleClient(c), context.Background()

	run, err := example.FooAsync(ctx, &examplev1.FooInput{})
	if err != nil {
		log.Fatalf("error starting workflow: %v", err)
	}

	// The run already identifies the workflow, so no IDs are passed here.
	bar, err := run.Bar(ctx, &examplev1.BarInput{})
	if err != nil {
		log.Fatalf("error updating workflow: %v", err)
	}
	log.Printf("update successful: %s\n", bar.String())

	foo, err := run.Get(ctx)
	if err != nil {
		log.Fatalf("workflow failed: %v", err)
	}
	log.Printf("workflow successful: %s\n", foo.String())
}
syntax="proto3";
package example.v1;
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";
service Example {
  rpc Foo(FooInput) returns (FooOutput) {
    // Marks Foo as a workflow and references the Bar update.
    option (temporal.v1.workflow) = {
      update: { ref: "Bar" }
    };
  }

  rpc Bar(BarInput) returns (BarOutput) {
    // Marks Bar as an update; no update options are set.
    option (temporal.v1.update) = {};
  }
}
Cross-Namespace (XNS)
Updates can be executed from other workflows in a different Temporal namespace or even an entirely separate Temporal cluster (e.g. on-prem to cloud) using the generated Cross-Namespace helpers. See the Cross-Namespace guide for more usage details.
- Go
- Go (XNS Workflow Run)
- Schema
package example
import (
"fmt"
examplev1 "path/to/gen/example/v1"
"path/to/gen/example/v1/examplev1xns"
"go.temporal.io/sdk/workflow"
)
// MyWorkflow sends a Bar update to the Foo workflow with ID "foo-workflow-id"
// using the generated cross-namespace helper, and returns a wrapped error if
// the update fails.
func MyWorkflow(ctx workflow.Context) error {
	if _, err := examplev1xns.Bar(ctx, "foo-workflow-id", "", &examplev1.BarInput{}); err != nil {
		return fmt.Errorf("error executing Bar update: %w", err)
	}
	return nil
}
package example
import (
"fmt"
examplev1 "path/to/gen/example/v1"
"path/to/gen/example/v1/examplev1xns"
"go.temporal.io/sdk/workflow"
)
// MyWorkflow starts a Foo workflow through the generated cross-namespace
// helper, sends it a Bar update via the returned run, and then returns the
// result of waiting for the Foo workflow.
func MyWorkflow(ctx workflow.Context) error {
	fooRun, startErr := examplev1xns.FooAsync(ctx, &examplev1.FooInput{})
	if startErr != nil {
		return fmt.Errorf("error starting Foo workflow: %w", startErr)
	}

	_, updateErr := fooRun.Bar(ctx, &examplev1.BarInput{})
	if updateErr != nil {
		return fmt.Errorf("error executing Bar update: %w", updateErr)
	}

	return fooRun.Get(ctx)
}
syntax="proto3";
package example.v1;
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";
service Example {
  rpc Foo(FooInput) returns (google.protobuf.Empty) {
    // Marks Foo as a workflow and references the Bar update.
    option (temporal.v1.workflow) = {
      update: { ref: "Bar" }
    };
  }

  rpc Bar(BarInput) returns (BarOutput) {
    // Marks Bar as an update; no update options are set.
    option (temporal.v1.update) = {};
  }
}
SDK
Updates defined in the schema are compatible with the official Temporal SDK update methods.
- Go
- Schema
package example
import (
"context"
"log"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/client"
)
// main sends a Bar update to an existing Foo workflow using the Temporal SDK
// client directly, with the generated update name constant, and logs the
// decoded BarOutput.
func main() {
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalf("error connecting to temporal: %v", err)
	}
	defer c.Close()

	ctx := context.Background()

	// UpdateWorkflow is a method on the dialed client, not a package function.
	// NOTE(review): this is the positional (workflowID, runID, updateName, args...)
	// form; newer SDK releases take an options struct — confirm against the
	// SDK version in use.
	handle, err := c.UpdateWorkflow(ctx, "foo-workflow-id", "", examplev1.BarUpdateName, &examplev1.BarInput{})
	if err != nil {
		log.Fatalf("error starting workflow update: %v", err)
	}

	var bar examplev1.BarOutput
	if err := handle.Get(ctx, &bar); err != nil {
		log.Fatalf("error updating workflow: %v", err)
	}
	log.Printf("bar update successful: %s", bar.String())
}
syntax="proto3";
package example.v1;
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";
service Example {
  rpc Foo(FooInput) returns (google.protobuf.Empty) {
    // Marks Foo as a workflow and references the Bar update.
    option (temporal.v1.workflow) = {
      update: { ref: "Bar" }
    };
  }

  rpc Bar(BarInput) returns (BarOutput) {
    // Marks Bar as an update; no update options are set.
    option (temporal.v1.update) = {};
  }
}
UpdateOptions
Both synchronous and asynchronous update helpers accept an optional <Update>Options value as the final argument. This argument can be used to override the default client.UpdateWorkflowWithOptionsRequest, which is created from the defaults defined in the schema.
WithUpdateID
Sets the update's UpdateID value.
- Go
- Schema
// example sends a Bar update with an explicit update ID of "bar/baz" instead
// of the ID produced by the schema default.
func example(ctx context.Context, client examplev1.ExampleClient) error {
	opts := examplev1.NewBarOptions().WithUpdateID("bar/baz")
	_, err := client.Bar(ctx, "foo-workflow-id", "", &examplev1.BarInput{}, opts)
	return err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
  rpc Foo(FooInput) returns (FooOutput) {
    // Marks Foo as a workflow and references the Bar update.
    option (temporal.v1.workflow) = {
      update: { ref: "Bar" }
    };
  }

  rpc Bar(BarInput) returns (BarOutput) {
    // Marks Bar as an update; id is the schema-defined update ID expression.
    option (temporal.v1.update) = {
      id: 'bar/${! uuid_v4() }'
    };
  }
}
WithUpdateWorkflowOptions
Overrides the initial client.UpdateWorkflowWithOptionsRequest value for an individual invocation. Schema-defined defaults will be applied over this value.
- Go
- Schema
// example sends a Bar update while overriding the initial
// client.UpdateWorkflowWithOptionsRequest for this one invocation, here
// asking to wait only until the update is accepted.
//
// The generated client parameter is named c so that the imported client
// package stays reachable for client.UpdateWorkflowWithOptionsRequest.
func example(ctx context.Context, c examplev1.ExampleClient) error {
	opts := examplev1.NewBarOptions().
		WithUpdateWorkflowOptions(client.UpdateWorkflowWithOptionsRequest{
			WaitPolicy: &updatepb.WaitPolicy{
				LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
			},
		})
	_, err := c.Bar(ctx, "foo-workflow-id", "", &examplev1.BarInput{}, opts)
	return err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
  rpc Foo(FooInput) returns (FooOutput) {
    // Marks Foo as a workflow and references the Bar update.
    option (temporal.v1.workflow) = {
      update: { ref: "Bar" }
    };
  }

  rpc Bar(BarInput) returns (BarOutput) {
    // Marks Bar as an update; wait_policy is the schema-defined wait policy.
    option (temporal.v1.update) = {
      wait_policy: WAIT_POLICY_ADMITTED
    };
  }
}
WithWaitPolicy
Sets the update's WaitPolicy value.
- Go
- Schema
// example sends a Bar update with the wait policy overridden for this one
// invocation, so the call waits only until the update is accepted.
func example(ctx context.Context, client examplev1.ExampleClient) error {
	// WithWaitPolicy takes the lifecycle stage directly;
	// WithUpdateWorkflowOptions expects a full request value instead.
	opts := examplev1.NewBarOptions().
		WithWaitPolicy(enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED)
	_, err := client.Bar(ctx, "foo-workflow-id", "", &examplev1.BarInput{}, opts)
	return err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
  rpc Foo(FooInput) returns (FooOutput) {
    // Marks Foo as a workflow and references the Bar update.
    option (temporal.v1.workflow) = {
      update: { ref: "Bar" }
    };
  }

  rpc Bar(BarInput) returns (BarOutput) {
    // Marks Bar as an update; wait_policy is the schema-defined wait policy.
    option (temporal.v1.update) = {
      wait_policy: WAIT_POLICY_ADMITTED
    };
  }
}