Activities
Activities are implemented as methods on a Go struct
that satisfies the <Service>Activities
interface type generated by this plugin.
- Go
- Schema
package main
import (
	"context"
	"log"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	examplev1 "path/to/gen/example/v1"
)
type Activities struct {}
func (a *Activities) Hello(ctx context.Context, input *examplev1.HelloInput) (*examplev1.HelloOutput, error) {
return &examplev1.HelloOutput{}, nil
}
// main connects to Temporal, registers the example.v1.Example activities on a
// worker polling the generated task queue, and runs that worker until the
// process is interrupted.
func main() {
	// Fail fast if the client cannot be created; the worker is unusable
	// without it.
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalf("dialing temporal: %v", err)
	}

	w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{})

	// Register all example.v1.Example activities with the worker
	examplev1.RegisterExampleActivities(w, &Activities{})

	// Run blocks until the interrupt channel fires or the worker fails.
	if err := w.Run(worker.InterruptCh()); err != nil {
		log.Fatalf("running worker: %v", err)
	}
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
option (temporal.v1.service).task_queue = "example-v1";
// Hello returns a friendly greeting
rpc Hello(HelloInput) returns (HelloOutput) {
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 60 }
};
}
}
Parameters
The signature of an activity method varies based on whether or not the activity definition specifies a non-empty input and/or output message type.
- Input & Output Parameters
- No Output Parameter
- No Input Parameter
- No Parameters
Most activities should specify both an input and output message type, even if the type is empty. This is to support the addition of fields to either the input or output (or both) in the future without needing to introduce a breaking change.
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
// Hello returns a friendly greeting
rpc Hello(HelloInput) returns (HelloOutput) {
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 60 }
};
}
}
package main
import (
"context"
examplev1 "path/to/gen/example/v1"
)
type Activities struct {}
func (a *Activities) Hello(ctx context.Context, input *examplev1.HelloInput) (*examplev1.HelloOutput, error) {
return &examplev1.HelloOutput{}, nil
}
An Activity output can be omitted using the native google.protobuf.Empty type. This modifies the signature of the Activity method to have a single return value of type error. Note that this also requires an additional google/protobuf/empty.proto protobuf import statement.
syntax="proto3";
package example.v1;
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";
service Example {
// Hello returns a friendly greeting
rpc Hello(HelloInput) returns (google.protobuf.Empty) {
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 60 }
};
}
}
package main
import (
"context"
examplev1 "path/to/gen/example/v1"
)
type Activities struct {}
func (a *Activities) Hello(ctx context.Context, input *examplev1.HelloInput) error {
return nil
}
An Activity input can be omitted using the native google.protobuf.Empty type. This modifies the signature of the Activity method to have a single input argument of type context.Context. Note that this also requires an additional google/protobuf/empty.proto protobuf import statement.
syntax="proto3";
package example.v1;
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";
service Example {
// Hello returns a friendly greeting
rpc Hello(google.protobuf.Empty) returns (HelloOutput) {
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 60 }
};
}
}
package main
import (
"context"
examplev1 "path/to/gen/example/v1"
)
type Activities struct {}
func (a *Activities) Hello(ctx context.Context) (*examplev1.HelloOutput, error) {
return &examplev1.HelloOutput{}, nil
}
An Activity can omit both input and output parameters using the native google.protobuf.Empty type. This modifies the signature of the Activity method to accept a single argument of type context.Context and return a single value of type error. Note that this also requires an additional google/protobuf/empty.proto protobuf import statement.
syntax="proto3";
package example.v1;
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";
service Example {
// Hello returns a friendly greeting
rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty) {
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 60 }
};
}
}
package main
import (
"context"
examplev1 "path/to/gen/example/v1"
)
type Activities struct {}
func (a *Activities) Hello(ctx context.Context) error {
return nil
}
Registration
The plugin generates helpers for registering your activities with a Temporal worker.
- Go
- Schema
package main
import (
"context"
"log"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
type Activities struct {}
func (a *Activities) Hello(ctx context.Context, input *examplev1.HelloInput) (*examplev1.HelloOutput, error) {
return &examplev1.HelloOutput{}, nil
}
// main connects to Temporal, registers the example.v1.Example activities on a
// worker polling the generated task queue, and runs that worker until the
// process is interrupted.
func main() {
	// initialize temporal client and worker
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalf("dialing temporal: %v", err)
	}
	w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{})

	// Register all example.v1.Example activities with the worker
	examplev1.RegisterExampleActivities(w, &Activities{})

	// Run blocks until the interrupt channel fires or the worker fails.
	if err := w.Run(worker.InterruptCh()); err != nil {
		log.Fatalf("running worker: %v", err)
	}
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
option (temporal.v1.service) = {
task_queue: "example-v1"
};
// Hello returns a friendly greeting
rpc Hello(HelloInput) returns (HelloOutput) {
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 60 }
};
}
}
Single Activity Workflows (Tasks)
For convenience, it's possible to define a workflow and activity using the same RPC method.
- Schema
- Go
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
// Hello returns a friendly greeting
rpc Hello(HelloInput) returns (HelloOutput) {
option (temporal.v1.workflow) = {};
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 60 }
};
}
}
package main
import (
"context"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)
type (
Workflows struct {}
Activities struct {}
HelloWorkflow struct {
*examplev1.HelloWorkflowInput
}
)
func (w *Workflows) Hello(ctx workflow.Context, input *examplev1.HelloWorkflowInput) (examplev1.HelloWorkflow, error) {
return &HelloWorkflow{input}, nil
}
func (w *HelloWorkflow) Execute(ctx workflow.Context) (*examplev1.HelloOutput, error) {
workflow.GetLogger(ctx).Info("executing hello workflow", "input", w.Req)
return examplev1.Hello(ctx, w.Req)
}
func (a *Activities) Hello(ctx context.Context, input *examplev1.HelloInput) (*examplev1.HelloOutput, error) {
activity.GetLogger(ctx).Info("executing hello activity", "input", input)
return &examplev1.HelloOutput{}, nil
}
Invocation
Synchronous
The plugin generates typed helpers for executing activities synchronously with the appropriate workflow.ActivityOptions derived from the defaults defined in the schema. These options can be overridden on a per-invocation basis by providing an optional ActivityOptions argument as the final argument to the function.
- Go
- Schema
package main
import (
"context"
"fmt"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)
type (
Workflows struct {}
Activities struct {}
FooWorkflow struct {
*examplev1.FooWorkflowInput
}
)
func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
bar, err := examplev1.Bar(ctx, &examplev1.BarInput{})
if err != nil {
return nil, fmt.Errorf("Bar activity error: %w", err)
}
workflow.GetLogger(ctx).Info("Bar activity success", "bar", bar)
return &examplev1.FooOutput{}, nil
}
func (a *Activities) Bar(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
schedule_to_close_timeout: { seconds: 30 }
};
}
}
Asynchronous
The plugin generates typed helpers for executing activities asynchronously with the appropriate workflow.ActivityOptions derived from the defaults defined in the schema. These helpers start the activity and return a Future. As with their synchronous counterparts, the options can be overridden on a per-invocation basis by providing an optional ActivityOptions argument as the final argument to the function.
- Go
- Schema
package main
import (
"context"
"fmt"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)
type (
Workflows struct {}
Activities struct {}
FooWorkflow struct {
*examplev1.FooWorkflowInput
}
)
func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
f, err := examplev1.BarAsync(ctx, &examplev1.BarInput{})
if err != nil {
return nil, fmt.Errorf("error starting Bar activity: %w", err)
}
bar, err := f.Get(ctx)
if err != nil {
return nil, fmt.Errorf("Bar activity error: %w", err)
}
workflow.GetLogger(ctx).Info("Bar activity success", "bar", bar)
return &examplev1.FooOutput{}, nil
}
func (a *Activities) Bar(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
schedule_to_close_timeout: { seconds: 30 }
};
}
}
Local
The plugin generates typed helpers for executing activities locally (both synchronously and asynchronously) with the appropriate workflow.LocalActivityOptions derived from the defaults defined in the schema. These options can be overridden on a per-invocation basis by providing an optional LocalActivityOptions argument as the final argument to the function.
- Go
- Go (Custom)
- Schema
package main
import (
"context"
"fmt"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)
type (
Workflows struct {}
Activities struct {}
FooWorkflow struct {
*examplev1.FooWorkflowInput
}
)
func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
bar, err := examplev1.BarLocal(ctx, &examplev1.BarInput{})
if err != nil {
return nil, fmt.Errorf("error starting local Bar activity: %w", err)
}
workflow.GetLogger(ctx).Info("Bar local activity success", "bar", bar)
return &examplev1.FooOutput{}, nil
}
func (a *Activities) Bar(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}
It's possible to override a local activity execution definition using the Local local activity option.
package main
import (
"context"
"fmt"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)
type (
Workflows struct {}
Activities struct {}
FooWorkflow struct {
*examplev1.FooWorkflowInput
}
)
func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}
// Execute runs the Bar activity as a local activity, replacing the registered
// implementation with a caller-defined function via the Local option.
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
	bar, err := examplev1.BarLocal(ctx, &examplev1.BarInput{}, examplev1.NewBarLocalActivityOptions().
		// The message types live in the generated package, so they must be
		// qualified with examplev1 when referenced from package main.
		Local(func(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
			activity.GetLogger(ctx).Info("executing custom Bar local activity", "input", input)
			return &examplev1.BarOutput{}, nil
		}),
	)
	if err != nil {
		return nil, fmt.Errorf("error starting local Bar activity: %w", err)
	}
	workflow.GetLogger(ctx).Info("Bar local activity success", "bar", bar)
	return &examplev1.FooOutput{}, nil
}
func (a *Activities) Bar(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
schedule_to_close_timeout: { seconds: 30 }
};
}
}
Future
For each Activity, a corresponding <Activity>Future
struct is generated with methods for conveniently interacting with the activity execution that wraps the underlying workflow.Future returned by the SDK. All asynchronous activity helpers return this value.
Get
Blocks until the activity completes, returning the activity output or error.
- Go
- Schema
package main
import (
"context"
"fmt"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)
type (
Workflows struct {}
Activities struct {}
FooWorkflow struct {
*examplev1.FooWorkflowInput
}
)
func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
f, err := examplev1.BarAsync(ctx, &examplev1.BarInput{})
if err != nil {
return nil, fmt.Errorf("error starting Bar activity: %w", err)
}
bar, err := f.Get(ctx)
if err != nil {
return nil, fmt.Errorf("Bar activity error: %w", err)
}
workflow.GetLogger(ctx).Info("Bar activity success", "bar", bar)
return &examplev1.FooOutput{}, nil
}
func (a *Activities) Bar(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
schedule_to_close_timeout: { seconds: 30 }
};
}
}
Select
Registers activity completion callback on the given selector.
- Go
- Schema
package main
import (
"context"
"fmt"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)
type (
Workflows struct {}
Activities struct {}
FooWorkflow struct {
*examplev1.FooWorkflowInput
}
)
func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}
// Execute starts the Bar and Baz activities asynchronously and registers a
// completion callback for each one on a shared selector using the generated
// Select helper.
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
	barF, err := examplev1.BarAsync(ctx, &examplev1.BarInput{})
	if err != nil {
		return nil, fmt.Errorf("error starting Bar activity: %w", err)
	}
	bazF, err := examplev1.BazAsync(ctx, &examplev1.BazInput{})
	if err != nil {
		return nil, fmt.Errorf("error starting Baz activity: %w", err)
	}

	sel := workflow.NewSelector(ctx)
	barF.Select(sel, func(bar *examplev1.BarOutput) {
		workflow.GetLogger(ctx).Info("Bar activity success", "bar", bar)
	})
	bazF.Select(sel, func(baz *examplev1.BazOutput) {
		workflow.GetLogger(ctx).Info("Baz activity success", "baz", baz)
	})

	// Select blocks until one of the registered futures is ready and runs its
	// callback; call it once per future to handle every completion.
	sel.Select(ctx)
	return &examplev1.FooOutput{}, nil
}
func (a *Activities) Bar(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}
func (a *Activities) Baz(ctx context.Context, input *examplev1.BazInput) (*examplev1.BazOutput, error) {
activity.GetLogger(ctx).Info("executing Baz activity", "input", input)
return &examplev1.BazOutput{}, nil
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
schedule_to_close_timeout: { seconds: 30 }
};
}
rpc Baz(BazInput) returns (BazOutput) {
option (temporal.v1.activity) = {
schedule_to_close_timeout: { seconds: 30 }
};
}
}
Future
In addition to the methods, the underlying Future is available via the public Future
field.
- Go
- Schema
package main
import (
"context"
"fmt"
examplev1 "path/to/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)
type (
Workflows struct {}
Activities struct {}
FooWorkflow struct {
*examplev1.FooWorkflowInput
}
)
func (w *Workflows) Foo(ctx workflow.Context, input *examplev1.FooWorkflowInput) (examplev1.FooWorkflow, error) {
return &FooWorkflow{input}, nil
}
// Execute starts the Bar and Baz activities asynchronously and waits on them
// through the SDK selector, using the underlying workflow.Future exposed by
// each generated future's public Future field.
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
	barF, err := examplev1.BarAsync(ctx, &examplev1.BarInput{})
	if err != nil {
		return nil, fmt.Errorf("error starting Bar activity: %w", err)
	}
	bazF, err := examplev1.BazAsync(ctx, &examplev1.BazInput{})
	if err != nil {
		return nil, fmt.Errorf("error starting Baz activity: %w", err)
	}

	// Selector.Select has no return value, so the chain is evaluated as a
	// statement rather than assigned.
	workflow.NewSelector(ctx).
		AddFuture(barF.Future, func(workflow.Future) {
			bar, err := barF.Get(ctx)
			if err != nil {
				workflow.GetLogger(ctx).Error("Bar activity error", "error", err)
				return
			}
			workflow.GetLogger(ctx).Info("Bar activity success", "bar", bar)
		}).
		AddFuture(bazF.Future, func(workflow.Future) {
			// Read the Baz result from bazF, the future this callback is
			// registered for.
			baz, err := bazF.Get(ctx)
			if err != nil {
				workflow.GetLogger(ctx).Error("Baz activity error", "error", err)
				return
			}
			workflow.GetLogger(ctx).Info("Baz activity success", "baz", baz)
		}).
		Select(ctx)
	return &examplev1.FooOutput{}, nil
}
func (a *Activities) Bar(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}
func (a *Activities) Baz(ctx context.Context, input *examplev1.BazInput) (*examplev1.BazOutput, error) {
activity.GetLogger(ctx).Info("executing Baz activity", "input", input)
return &examplev1.BazOutput{}, nil
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
schedule_to_close_timeout: { seconds: 30 }
};
}
rpc Baz(BazInput) returns (BazOutput) {
option (temporal.v1.activity) = {
schedule_to_close_timeout: { seconds: 30 }
};
}
}
Options
Both synchronous and asynchronous activity helpers accept an optional <Activity>ActivityOptions
value as the final argument. This argument can be used to override the default workflow.ActivityOptions created using the defaults defined in the schema.
WithActivityOptions
Override the initial workflow.ActivityOptions value for an individual invocation. Schema defined defaults will be applied over this value.
- Go
- Schema
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithActivityOptions(workflow.ActivityOptions{
ScheduleToCloseTimeout: time.Minute, // this overrides the 30s default defined in the schema
}),
)
return &examplev1.FooOutput{}, err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
schedule_to_close_timeout: { seconds: 30 }
};
}
}
WithHeartbeatTimeout
Set the activity HeartbeatTimeout
value
- Go
- Schema
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithHeartbeatTimeout(time.Second * 20),
)
return &examplev1.FooOutput{}, err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
heartbeat_timeout: { seconds: 30 }
};
}
}
WithRetryPolicy
Set the activity RetryPolicy
value
- Go
- Schema
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithRetryPolicy(&temporal.RetryPolicy{
MaximumAttempts: 3,
}),
)
return &examplev1.FooOutput{}, err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
retry_policy: {
max_attempts: 5
}
};
}
}
WithScheduleToCloseTimeout
Set the activity ScheduleToCloseTimeout
value
- Go
- Schema
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithScheduleToCloseTimeout(time.Second * 240), // 4m
)
return &examplev1.FooOutput{}, err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
schedule_to_close_timeout: { seconds: 300 } // 5m
};
}
}
WithScheduleToStartTimeout
Set the activity ScheduleToStartTimeout
value
- Go
- Schema
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithScheduleToStartTimeout(time.Second * 10),
)
return &examplev1.FooOutput{}, err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
schedule_to_start_timeout: { seconds: 5 }
};
}
}
WithStartToCloseTimeout
Set the activity StartToCloseTimeout
value
- Go
- Schema
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithStartToCloseTimeout(time.Second * 90),
)
return &examplev1.FooOutput{}, err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 60 }
};
}
}
WithTaskQueue
Set the activity TaskQueue
value
- Go
- Schema
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithTaskQueue("example-v3"),
)
return &examplev1.FooOutput{}, err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
option (temporal.v1.service).task_queue = "example-v1";
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
task_queue: "example-v2"
};
}
}
WithWaitForCancellation
Set the activity WaitForCancellation
value
- Go
- Schema
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.Bar(ctx, &examplev1.BarInput{}, examplev1.NewBarActivityOptions().
WithWaitForCancellation(false),
)
return &examplev1.FooOutput{}, err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
wait_for_cancellation: true
};
}
}
Local Options
Both synchronous and asynchronous local activity helpers accept an optional <Activity>LocalActivityOptions
value as the final argument. This argument can be used to override the default workflow.LocalActivityOptions created using the defaults defined in the schema.
Local
Override the local activity with a caller-defined implementation.
- Go
- Schema
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarLocal(ctx, &examplev1.BarInput{}, examplev1.NewBarLocalActivityOptions().
Local(func(ctx context.Context, input *examplev1.BarInput) (*examplev1.BarOutput, error) {
activity.GetLogger(ctx).Info("executing custom Bar activity", "input", input)
return &examplev1.BarOutput{}, nil
}),
)
return &examplev1.FooOutput{}, err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
schedule_to_close_timeout: { seconds: 30 }
};
}
}
WithLocalActivityOptions
Override the initial workflow.LocalActivityOptions value for an individual invocation. Schema defined defaults will be applied over this value.
- Go
- Schema
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarLocal(ctx, &examplev1.BarInput{}, examplev1.NewBarLocalActivityOptions().
WithLocalActivityOptions(workflow.LocalActivityOptions{
ScheduleToCloseTimeout: time.Minute, // this overrides the 30s default defined in the schema
}),
)
return &examplev1.FooOutput{}, err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
schedule_to_close_timeout: { seconds: 30 }
};
}
}
WithRetryPolicy
Set the activity RetryPolicy
value
- Go
- Schema
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarLocal(ctx, &examplev1.BarInput{}, examplev1.NewBarLocalActivityOptions().
WithRetryPolicy(&temporal.RetryPolicy{
MaximumAttempts: 3,
}),
)
return &examplev1.FooOutput{}, err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
retry_policy: {
max_attempts: 5
}
};
}
}
WithScheduleToClose
Set the activity ScheduleToClose
value
- Go
- Schema
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarLocal(ctx, &examplev1.BarInput{}, examplev1.NewBarLocalActivityOptions().
WithScheduleToClose(time.Second * 240),
)
return &examplev1.FooOutput{}, err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
schedule_to_close_timeout: { seconds: 300 }
};
}
}
WithStartToCloseTimeout
Set the activity StartToCloseTimeout
value
- Go
- Schema
func (w *FooWorkflow) Execute(ctx workflow.Context) (*examplev1.FooOutput, error) {
_, err := examplev1.BarLocal(ctx, &examplev1.BarInput{}, examplev1.NewBarLocalActivityOptions().
WithStartToCloseTimeout(time.Second * 90),
)
return &examplev1.FooOutput{}, err
}
syntax="proto3";
package example.v1;
import "temporal/v1/temporal.proto";
service Example {
rpc Foo(FooInput) returns (FooOutput) {
option (temporal.v1.workflow) = {};
}
rpc Bar(BarInput) returns (BarOutput) {
option (temporal.v1.activity) = {
start_to_close_timeout: { seconds: 60 }
};
}
}