Updatable Timer
A simple example inspired by temporalio/samples-go/updatabletimer
example.proto
syntax="proto3";
package example.updatabletimer.v1;
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "temporal/v1/temporal.proto";
service Example {
option (temporal.v1.service) = {
task_queue: "updatable-timer"
};
// GetWakeUpTime retrieves the current timer expiration timestamp
rpc GetWakeUpTime(google.protobuf.Empty) returns (GetWakeUpTimeOutput) {
option (temporal.v1.query) = {};
}
// UpdatableTimer describes an updatable timer workflow
rpc UpdatableTimer(UpdatableTimerInput) returns (google.protobuf.Empty) {
option (temporal.v1.workflow) = {
name: "UpdatableTimer"
id: 'updatable-timer/${! name.or(uuid_v4()) }'
query: { ref: "GetWakeUpTime" }
signal: { ref: "UpdateWakeUpTime" }
};
}
// UpdateWakeUpTime updates the timer expiration timestamp
rpc UpdateWakeUpTime(UpdateWakeUpTimeInput) returns (google.protobuf.Empty) {
option (temporal.v1.signal) = {};
}
}
// GetWakeUpTimeOutput describes the input to a GetWakeUpTime query
message GetWakeUpTimeOutput {
google.protobuf.Timestamp wake_up_time = 1;
}
// UpdatableTimerInput describes the input to a UpdatableTimer workflow
message UpdatableTimerInput {
google.protobuf.Timestamp initial_wake_up_time = 1;
string name = 2;
}
// UpdateWakeUpTimeInput describes the input to a UpdateWakeUpTime signal
message UpdateWakeUpTimeInput {
google.protobuf.Timestamp wake_up_time = 1;
}
main.go
package main
import (
"log"
"os"
updatabletimerv1 "github.com/cludden/protoc-gen-go-temporal/gen/example/updatabletimer/v1"
"github.com/urfave/cli/v2"
"go.temporal.io/sdk/client"
tlog "go.temporal.io/sdk/log"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"google.golang.org/protobuf/types/known/timestamppb"
)
// UpdatableTimerWorkflow provides a updatabletimerv1.UpdatableTimerWorkflow implementation
type UpdatableTimerWorkflow struct {
*updatabletimerv1.UpdatableTimerWorkflowInput
log tlog.Logger
wakeUpTime *timestamppb.Timestamp
}
// NewUpdatableTimerWorkflow initializes a new updatabletimerv1.UpdatableTimerWorkflow value
func NewUpdatableTimerWorkflow(ctx workflow.Context, input *updatabletimerv1.UpdatableTimerWorkflowInput) (updatabletimerv1.UpdatableTimerWorkflow, error) {
return &UpdatableTimerWorkflow{input, workflow.GetLogger(ctx), input.Req.GetInitialWakeUpTime()}, nil
}
// Execute defines the entrypoint to a UpdatableTimer workflow
func (w *UpdatableTimerWorkflow) Execute(ctx workflow.Context) error {
for timerFired := false; !timerFired && ctx.Err() == nil; {
timerCtx, cancelTimer := workflow.WithCancel(ctx)
timer := workflow.NewTimer(timerCtx, w.wakeUpTime.AsTime().Sub(workflow.Now(ctx)))
w.log.Info("SleepUntil", "WakeUpTime", w.wakeUpTime)
workflow.NewSelector(ctx).
AddFuture(timer, func(f workflow.Future) {
if err := f.Get(timerCtx, nil); err != nil {
w.log.Info("Timer canceled")
} else {
w.log.Info("Timer fired")
timerFired = true
}
}).
AddReceive(w.UpdateWakeUpTime.Channel, func(workflow.ReceiveChannel, bool) {
defer cancelTimer()
w.wakeUpTime = w.UpdateWakeUpTime.ReceiveAsync().GetWakeUpTime()
w.log.Info("WakeUpTime updated", "WakeUpTime", w.wakeUpTime)
}).
Select(ctx)
}
return ctx.Err()
}
// GetWakeUpTime defines the entrypoint to a GetWakeUpTime query
func (w *UpdatableTimerWorkflow) GetWakeUpTime() (*updatabletimerv1.GetWakeUpTimeOutput, error) {
return &updatabletimerv1.GetWakeUpTimeOutput{WakeUpTime: w.wakeUpTime}, nil
}
func main() {
app, err := updatabletimerv1.NewExampleCli(
updatabletimerv1.NewExampleCliOptions().WithWorker(func(cmd *cli.Context, c client.Client) (worker.Worker, error) {
w := worker.New(c, updatabletimerv1.ExampleTaskQueue, worker.Options{})
updatabletimerv1.RegisterUpdatableTimerWorkflow(w, NewUpdatableTimerWorkflow)
return w, nil
}),
)
if err != nil {
log.Fatal(err)
}
if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}
Run this example
-
Clone the examples
git clone https://github.com/cludden/protoc-gen-go-temporal && cd protoc-gen-go-temporal
-
Run a local Temporal server
temporal server start-dev
-
Run the example worker
go run examples/updatabletimer/main.go worker
-
Initialize an
UpdatableTimer
workflow with an expiration1h
in the futurego run examples/updatabletimer/main.go updatable-timer \
--name example \
--initial-wake-up-time $(TZ=UTC date -v+1H "+%Y-%m-%dT%H:%M:%SZ") \
-d -
Query the
UpdatableTimer
workflowgo run examples/updatabletimer/main.go get-wake-up-time -w updatable-timer/example
-
Update the timer expiration to
30s
in the futurego run examples/updatabletimer/main.go update-wake-up-time \
-w updatable-timer/example \
--wake-up-time $(TZ=UTC date -v+30S "+%Y-%m-%dT%H:%M:%SZ")