Skip to main content

Activities

Activities are implemented as methods on a Go struct that satisfies the generated <Service>Activities interface type generated by this plugin

main.go
package main

import (
"context"

examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

type Activities struct {}

func (a *Activities) Hello(ctx context.Context, input *examplev1.HelloInput) (*examplev1.HelloOutput, error) {
return &examplev1.HelloOutput{}, nil
}

func main() {
c, _ := client.Dial(client.Options{})
w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{})

// Register all example.v1.Example activities with the worker
examplev1.RegisterExampleActivities(w, &Activities{})
w.Run(worker.InterruptCh())
}

Parameters

The signature of an activity method varies based on whether or not the activity definition specifies a non-empty input and/or output message type.

Most activities should specify both an input and output message type, even if the type is empty. This to support the addition of fields to either the input or output (or both) in the future without needing to introduce a breaking change.

example.proto
syntax="proto3";

package example.v1;

import "temporal/v1/temporal.proto";

service Example {
// Hello returns a friendly greeting
rpc Hello(HelloInput) returns (HelloOutput) {
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 60 }
};
}
}
main.go
package main

import (
"context"

examplev1 "path/to/gen/example/v1"
)

type Activities struct {}

func (a *Activities) Hello(ctx context.Context, input *examplev1.HelloInput) (*examplev1.HelloOutput, error) {
return &examplev1.HelloOutput{}, nil
}

Registration

The plugin generates helpers for registering your activities with a Temporal worker.

main.go
package main

import (
"context"
"log"

examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

type Activities struct {}

func (a *Activities) Hello(ctx context.Context, input *examplev1.HelloInput) (*examplev1.HelloOutput, error) {
return &examplev1.HelloOutput{}, nil
}

func main() {
// initialize temporal client and worker
c, _ := client.Dial(client.Options{})
w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{})

// Register all example.v1.Example activities with the worker
examplev1.RegisterExampleActivities(w, &Activities{})
w.Run(worker.InterruptCh())
}

Single Activity Workflows (Tasks)

For convenience, it's possible to define a workflow and activity using the same RPC method.

example.proto
syntax="proto3";

package example.v1;

import "temporal/v1/temporal.proto";

service Example {
// Hello returns a friendly greeting
rpc Hello(HelloInput) returns (HelloOutput) {
option (temporal.v1.workflow) = {};
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 60 }
};
}
}

Invocation

Synchronous

The plugin generates typed helpers for executing activities synchronously with the appropriate workflow.ActivityOptions derived from the defaults defined in the schema. These options can be overridden on a per-invocation basis by providing an optional ActivityOptions argument as the final argument to the function

main.go
package main

import (
"context"
"fmt"

examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

type (
Workflows struct {}

Activities struct {}

FooWorkflow struct {
*examplev1.FooWorkflowInput
}
)

func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}

func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
bar, err := examplev1.Bar(ctx, &examplev1.BarInput{})
if err != nil {
return nil, fmt.Errorf("Bar activity error: %w", err)
}
workflow.GetLogger(ctx).Info("Bar activity success", "bar", bar)
return &examplev1.FooOutput{}, nil
}

func (a *Activities) Bar(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}

Asynchronous

The plugin generates typed helpers for executing activities asynchronously with the appropriate workflow.ActivityOptions derived from the defaults defined in the schema. These helpers start the activity and return a Future. Like their synchronous counterparts, these options can be overridden on a per-invocation basis by providing an optional ActivityOptions argument as the final argument to the function

main.go
package main

import (
"context"
"fmt"

examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

type (
Workflows struct {}

Activities struct {}

FooWorkflow struct {
*examplev1.FooWorkflowInput
}
)

func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}

func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
f, err := examplev1.BarAsync(ctx, &examplev1.BarInput{})
if err != nil {
return nil, fmt.Errorf("error starting Bar activity: %w", err)
}

bar, err := f.Get(ctx)
if err != nil {
return nil, fmt.Errorf("Bar activity error: %w", err)
}
workflow.GetLogger(ctx).Info("Bar activity success", "bar", bar)
return &examplev1.FooOutput{}, nil
}

func (a *Activities) Bar(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}

Local

The plugin generates typed helpers for executing activities locally (both synchronously and asynchronously) with the appropriate workflow.LocalActivityOptions derived from the defaults defined in the schema. These options can be overridden on a per-invocation basis by providing an optional LocalActivityOptions argument as the final argument to the function.

main.go
package main

import (
"context"
"fmt"

examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

type (
Workflows struct {}

Activities struct {}

FooWorkflow struct {
*examplev1.FooWorkflowInput
}
)

func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}

func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
bar, err := examplev1.BarLocal(ctx, &examplev1.BarInput{})
if err != nil {
return nil, fmt.Errorf("error starting local Bar activity: %w", err)
}
workflow.GetLogger(ctx).Info("Bar local activity success", "bar", bar)
return &examplev1.FooOutput{}, nil
}

func (a *Activities) Bar(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}

Future

For each Activity, a corresponding <Activity>Future struct is generated with methods for conveniently interacting with the activity execution that wraps the underlying workflow.Future returned by the SDK. All asynchronous activity helpers return this value.

Get

Blocks until the activity completes, returning the activity output or error.

main.go
package main

import (
"context"
"fmt"

examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

type (
Workflows struct {}

Activities struct {}

FooWorkflow struct {
*examplev1.FooWorkflowInput
}
)

func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}

func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
f, err := examplev1.BarAsync(ctx, &examplev1.BarInput{})
if err != nil {
return nil, fmt.Errorf("error starting Bar activity: %w", err)
}

bar, err := f.Get(ctx)
if err != nil {
return nil, fmt.Errorf("Bar activity error: %w", err)
}
workflow.GetLogger(ctx).Info("Bar activity success", "bar", bar)
return &examplev1.FooOutput{}, nil
}

func (a *Activities) Bar(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}

Select

Registers activity completion callback on the given selector.

main.go
package main

import (
"context"
"fmt"

examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

type (
Workflows struct {}

Activities struct {}

FooWorkflow struct {
*examplev1.FooWorkflowInput
}
)

func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}

func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
barF, err := examplev1.BarAsync(ctx, &examplev1.BarInput{})
if err != nil {
return nil, fmt.Errorf("error starting Bar activity: %w", err)
}

bazF, err := examplev1.BazAsync(ctx &examplev1.BazInput{})
if err != nil {
return nil, fmt.Errorf("error starting Baz activity: %w", err)
}

sel := workflow.NewSelect(ctx)
barF.Select(sel, func(bar *BarOutput) {
workflow.GetLogger(ctx).Info("Bar activity success", "bar", bar)
})
bazF.Select(sel, func(baz *BazOutput) {
workflow.GetLogger(ctx).Info("Baz activity success", "baz", baz)
})
sel.Select(ctx)
return &examplev1.FooOutput{}, nil
}

func (a *Activities) Bar(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}

func (a *Activities) Baz(ctx context.Context, input *examplev1.BazInput) (*examplev1.BazOutput, error) {
activity.GetLogger(ctx).Info("executing Baz activity", "input", input)
return &examplev1.BazOutput{}, nil
}

Future

workflow.Future

In addition to the methods, the underlying Future is available via the public Future field.

main.go
package main

import (
"context"
"fmt"

examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

type (
Workflows struct {}

Activities struct {}

FooWorkflow struct {
*examplev1.FooWorkflowInput
}
)

func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}

func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
barF, err := examplev1.BarAsync(ctx, &examplev1.BarInput{})
if err != nil {
return nil, fmt.Errorf("error starting Bar activity: %w", err)
}

bazF, err := examplev1.BazAsync(ctx &examplev1.BazInput{})
if err != nil {
return nil, fmt.Errorf("error starting Baz activity: %w", err)
}

sel := workflow.NewSelect(ctx).
AddFuture(barF.Future, func(workflow.Future) {
bar, _ := barF.Get(ctx)
workflow.GetLogger(ctx).Info("Bar activity success", "bar", bar)
}).
AddFuture(bazF.Future, func(workflow.Future) {
baz, _ := barF.Get(ctx)
workflow.GetLogger(ctx).Info("Baz activity success", "baz", baz)
}).
Select(ctx)
return &examplev1.FooOutput{}, nil
}

func (a *Activities) Bar(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}

func (a *Activities) Baz(ctx context.Context, input *examplev1.BazInput) (*examplev1.BazOutput, error) {
activity.GetLogger(ctx).Info("executing Baz activity", "input", input)
return &examplev1.BazOutput{}, nil
}

Options

Both synchronous and asynchronous activity helpers accept an optional <Activity>ActivityOptions value as the final argument. This argument can be used to override the default workflow.ActivityOptions created using the defaults defined in the schema.

WithActivityOptions

Override the initial workflow.ActivityOptions value for an individual invocation. Schema defined defaults will be applied over this value.

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithActivityOptions(workflow.ActivityOptions{
ScheduleToCloseTimeout: time.Minute, // this overrides the 30s default defined in the schema
}),
)
return &examplev1.FooOutput{}, err
}

WithHeartbeatTimeout

Set the activity HeartbeatTimeout value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithHeartbeatTimeout(time.Second * 20),
)
return &examplev1.FooOutput{}, err
}

WithRetryPolicy

Set the activity RetryPolicy value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithRetryPolicy(&temporal.RetryPolicy{
MaximumAttempts: 3,
}),
)
return &examplev1.FooOutput{}, err
}

WithScheduleToCloseTimeout

Set the activity ScheduleToCloseTimeout value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithScheduleToCloseTimeout(time.Second * 240), // 4m
)
return &examplev1.FooOutput{}, err
}

WithScheduleToStartTimeout

Set the activity ScheduleToStartTimeout value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithScheduleToStartTimeout(time.Second * 10),
)
return &examplev1.FooOutput{}, err
}

WithStartToCloseTimeout

Set the activity StartToCloseTimeout value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithStartToCloseTimeout(time.Second * 90),
)
return &examplev1.FooOutput{}, err
}

WithTaskQueue

Set the activity TaskQueue value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithTaskQueue("example-v3"),
)
return &examplev1.FooOutput{}, err
}

WithWaitForCancellation

Set the activity WaitForCancellation value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithWaitForCancellation(false),
)
return &examplev1.FooOutput{}, err
}

Local Options

Both synchronous and asynchronous local activity helpers accept an optional <Activity>LocalActivityOptions value as the final argument. This argument can be used to override the default workflow.LocalActivityOptions created using the defaults defined in the schema.

Local

Override the local activity with a caller-defined implementation.

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarLocal(ctx, &examplev1.BarInput{}, examplev1.NewBarLocalActivityOptions().
Local(func(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing custom Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}),
)
return &examplev1.FooOutput{}, err
}

WithLocalActivityOptions

Override the initial workflow.LocalActivityOptions value for an individual invocation. Schema defined defaults will be applied over this value.

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarLocal(ctx, &examplev1.BarInput{}, examplev1.NewBarLocalActivityOptions().
WithLocalActivityOptions(workflow.LocalActivityOptions{
ScheduleToCloseTimeout: time.Minute, // this overrides the 30s default defined in the schema
}),
)
return &examplev1.FooOutput{}, err
}

WithRetryPolicy

Set the activity RetryPolicy value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarLocal(ctx, &examplev1.BarInput{}, examplev1.NewBarLocalActivityOptions().
WithRetryPolicy(&temporal.RetryPolicy{
MaximumAttempts: 3,
}),
)
return &examplev1.FooOutput{}, err
}

WithScheduleToClose

Set the activity ScheduleToClose value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarLocal(ctx, &examplev1.BarInput{}, examplev1.NewBarLocalActivityOptions().
WithScheduleToClose(time.Second * 240),
)
return &examplev1.FooOutput{}, err
}

WithStartToCloseTimeout

Set the activity StartToCloseTimeout value

example.go
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarLocal(ctx, &examplev1.BarInput{}, examplev1.NewBarLocalActivityOptions().
WithStartToCloseTimeout(time.Second * 90),
)
return &examplev1.FooOutput{}, err
}