Getting Started
Setup
1. Create example project
Create a new project and initialize go and buf modules
mkdir -p example/proto
cd example
go mod init example
go get -u "github.com/cludden/protoc-gen-go-temporal@$(curl --silent https://api.github.com/repos/cludden/protoc-gen-go-temporal/releases/latest|jq -r .tag_name)"
go mod tidy
cd proto
buf mod init
2. Create buf.yaml
Configure buf module and add buf.build/cludden/protoc-gen-go-temporal
as a proto dependency to proto/buf.yaml
proto/buf.yaml
version: v1
deps:
- buf.build/cludden/protoc-gen-go-temporal
breaking:
use:
- FILE
lint:
allow_comment_ignores: true
use:
- BASIC
3. Create buf.gen.yaml
Configure buf code generation
buf.gen.yaml
version: v1
managed:
enabled: true
go_package_prefix:
default: example/gen
except:
- buf.build/cludden/protoc-gen-go-temporal
plugins:
- plugin: go
out: gen
opt: paths=source_relative
- plugin: go_temporal
out: gen
opt: paths=source_relative,cli-enabled=true,cli-categories=true,workflow-update-enabled=true,docs-out=./proto/README.md
strategy: all
4. Create buf.work.yaml
Configure buf workspace
buf.work.yaml
version: v1
directories:
- proto
Generate
5. Define schema
proto/example/v1/example.proto
syntax="proto3";
package example.v1;
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";
service Example {
option (temporal.v1.service) = {
task_queue: "example-v1"
};
// CreateFoo creates a new foo operation
rpc CreateFoo(CreateFooRequest) returns (CreateFooResponse) {
option (temporal.v1.workflow) = {
execution_timeout: { seconds: 3600 } // foos can take awhile to create
id_reuse_policy: WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE
id: 'create-foo/${! name.slug() }'
query: { ref: "GetFooProgress" }
signal: { ref: "SetFooProgress", start: true }
update: { ref: "UpdateFooProgress" }
};
}
// GetFooProgress returns the status of a CreateFoo operation
rpc GetFooProgress(google.protobuf.Empty) returns (GetFooProgressResponse) {
option (temporal.v1.query) = {};
}
// Notify sends a notification
rpc Notify(NotifyRequest) returns (google.protobuf.Empty) {
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 30 }
retry_policy: {
max_attempts: 3
}
};
}
// SetFooProgress sets the current status of a CreateFoo operation
rpc SetFooProgress(SetFooProgressRequest) returns (google.protobuf.Empty) {
option (temporal.v1.signal) = {};
}
// UpdateFooProgress sets the current status of a CreateFoo operation
rpc UpdateFooProgress(SetFooProgressRequest) returns (GetFooProgressResponse) {
option (temporal.v1.update) = {
id: 'update-progress/${! progress.string() }',
};
}
}
// CreateFooRequest describes the input to a CreateFoo workflow
message CreateFooRequest {
// unique foo name
string name = 1;
}
// SampleWorkflowWithMutexResponse describes the output from a CreateFoo workflow
message CreateFooResponse {
Foo foo = 1;
}
// Foo describes an illustrative foo resource
message Foo {
string name = 1;
Status status = 2;
enum Status {
FOO_STATUS_UNSPECIFIED = 0;
FOO_STATUS_READY = 1;
FOO_STATUS_CREATING = 2;
}
}
// GetFooProgressResponse describes the output from a GetFooProgress query
message GetFooProgressResponse {
float progress = 1;
Foo.Status status = 2;
}
// NotifyRequest describes the input to a Notify activity
message NotifyRequest {
string message = 1;
}
// SetFooProgressRequest describes the input to a SetFooProgress signal
message SetFooProgressRequest {
// value of current workflow progress
float progress = 1;
}
6. Generate go code
Generate temporal worker, client, and cli types, methods, interfaces, and functions
buf mod update
cd ..
buf generate
Implement
7. Implement worker
Implement the required Workflow and Activity interfaces and create a new Temporal worker using the generated helpers
main.go
package main
import (
"context"
"fmt"
"log"
"os"
examplev1 "example/gen/example/v1"
"github.com/urfave/cli/v2"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
sdklog "go.temporal.io/sdk/log"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
type (
// Workflows manages shared state for workflow constructors and is used to
// register workflows with a worker
Workflows struct{}
// Activities manages shared state for activities and is used to register
// activities with a worker
Activities struct{}
// CreateFooWorkflow manages workflow state for a CreateFoo workflow
CreateFooWorkflow struct {
// it embeds the generated workflow Input type that contains the workflow
// input and signal helpers
*examplev1.CreateFooWorkflowInput
log sdklog.Logger
progress float32
status examplev1.Foo_Status
}
)
// CreateFoo implements a CreateFoo workflow constructor on the shared Workflows struct
// that initializes a new CreateFooWorkflow for each execution
func (w *Workflows) CreateFoo(ctx workflow.Context, input *examplev1.CreateFooWorkflowInput) (examplev1.CreateFooWorkflow, error) {
return &CreateFooWorkflow{
CreateFooWorkflowInput: input,
log: workflow.GetLogger(ctx),
status: examplev1.Foo_FOO_STATUS_CREATING,
}, nil
}
// Execute defines the entrypoint to a CreateFooWorkflow
func (wf *CreateFooWorkflow) Execute(ctx workflow.Context) (*examplev1.CreateFooResponse, error) {
// listen for signals using generated signal provided by workflow input
workflow.Go(ctx, func(ctx workflow.Context) {
for {
signal, _ := wf.SetFooProgress.Receive(ctx)
wf.UpdateFooProgress(ctx, signal)
}
})
// execute Notify activity using generated helper
if err := examplev1.Notify(ctx, &examplev1.NotifyRequest{
Message: fmt.Sprintf("creating foo resource (%s)", wf.Req.GetName()),
}); err != nil {
return nil, fmt.Errorf("error sending notification: %w", err)
}
// block until progress has reached 100 via signals and/or updates
if err := workflow.Await(ctx, func() bool {
return wf.status == examplev1.Foo_FOO_STATUS_READY
}); err != nil {
return nil, fmt.Errorf("error awaiting ready status: %w", err)
}
return &examplev1.CreateFooResponse{
Foo: &examplev1.Foo{
Name: wf.Req.GetName(),
Status: wf.status,
},
}, nil
}
// GetFooProgress defines the handler for a GetFooProgress query
func (wf *CreateFooWorkflow) GetFooProgress() (*examplev1.GetFooProgressResponse, error) {
return &examplev1.GetFooProgressResponse{Progress: wf.progress, Status: wf.status}, nil
}
// UpdateFooProgress defines the handler for an UpdateFooProgress update
func (wf *CreateFooWorkflow) UpdateFooProgress(ctx workflow.Context, req *examplev1.SetFooProgressRequest) (*examplev1.GetFooProgressResponse, error) {
wf.progress = req.GetProgress()
switch {
case wf.progress < 0:
wf.progress, wf.status = 0, examplev1.Foo_FOO_STATUS_CREATING
case wf.progress < 100:
wf.status = examplev1.Foo_FOO_STATUS_CREATING
case wf.progress >= 100:
wf.progress, wf.status = 100, examplev1.Foo_FOO_STATUS_READY
}
return &examplev1.GetFooProgressResponse{Progress: wf.progress, Status: wf.status}, nil
}
// Notify defines the implementation for a Notify activity
func (a *Activities) Notify(ctx context.Context, req *examplev1.NotifyRequest) error {
activity.GetLogger(ctx).Info("notification", "message", req.GetMessage())
return nil
}
func main() {
// initialize the generated cli application
app, err := examplev1.NewExampleCli(
examplev1.NewExampleCliOptions().WithWorker(func(cmd *cli.Context, c client.Client) (worker.Worker, error) {
// register activities and workflows using generated helpers
w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{})
examplev1.RegisterExampleActivities(w, &Activities{})
examplev1.RegisterExampleWorkflows(w, &Workflows{})
return w, nil
}),
)
if err != nil {
log.Fatalf("error initializing example cli: %v", err)
}
// run cli
if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}
Run
8. Start Temporal
Start temporal and visit the UI at http://localhost:8233.
temporal server start-dev \
--dynamic-config-value "frontend.enableUpdateWorkflowExecution=true" \
--dynamic-config-value "frontend.enableUpdateWorkflowExecutionAsyncAccepted=true"
9. Start worker
In a different terminal, start the worker.
go mod tidy
go run main.go worker
10. Start a workflow
In a different terminal, start a workflow.
- CLI
- Client
View CLI options
go run main.go -h
Execute a workflow
go run main.go create-foo -d --name test
Send a signal
go run main.go set-foo-progress -w create-foo/test --progress 5.7
Query the workflow
go run main.go get-foo-progress -w create-foo/test
Update the workflow
go run main.go update-foo-progress -w create-foo/test --progress 100
Query the completed workflow
go run main.go get-foo-progress -w create-foo/test
cmd/client/main.go
package main
import (
"context"
"log"
examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
"go.temporal.io/sdk/client"
)
func main() {
c, _ := client.Dial(client.Options{})
client, ctx := examplev1.NewClient(c), context.Background()
run, _ := client.CreateFooAsync(ctx, &examplev1.CreateFooRequest{Name: "test"})
log.Printf("started workflow: workflow_id=%s, run_id=%s\n", run.ID(), run.RunID())
log.Println("signalling progress")
_ = run.SetFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 5.7})
progress, _ := run.GetFooProgress(ctx)
log.Printf("queried progress: %s\n", progress.String())
update, _ := run.UpdateFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 100})
log.Printf("updated progress: %s\n", update.String())
resp, _ := run.Get(ctx)
log.Printf("workflow completed: %s\n", resp.String())
}