Skip to main content

Updatable Timer

A simple example inspired by temporalio/samples-go/updatabletimer

example.proto
syntax="proto3";

package example.updatabletimer.v1;

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "temporal/v1/temporal.proto";

service Example {
option (temporal.v1.service) = {
task_queue: "updatable-timer"
};

// GetWakeUpTime retrieves the current timer expiration timestamp
rpc GetWakeUpTime(google.protobuf.Empty) returns (GetWakeUpTimeOutput) {
option (temporal.v1.query) = {};
}

// UpdatableTimer describes an updatable timer workflow
rpc UpdatableTimer(UpdatableTimerInput) returns (google.protobuf.Empty) {
option (temporal.v1.workflow) = {
name: "UpdatableTimer"
id: 'updatable-timer/${! name.or(uuid_v4()) }'
query: { ref: "GetWakeUpTime" }
signal: { ref: "UpdateWakeUpTime" }
};
}

// UpdateWakeUpTime updates the timer expiration timestamp
rpc UpdateWakeUpTime(UpdateWakeUpTimeInput) returns (google.protobuf.Empty) {
option (temporal.v1.signal) = {};
}
}

// GetWakeUpTimeOutput describes the input to a GetWakeUpTime query
message GetWakeUpTimeOutput {
google.protobuf.Timestamp wake_up_time = 1;
}

// UpdatableTimerInput describes the input to a UpdatableTimer workflow
message UpdatableTimerInput {
google.protobuf.Timestamp initial_wake_up_time = 1;
string name = 2;
}

// UpdateWakeUpTimeInput describes the input to a UpdateWakeUpTime signal
message UpdateWakeUpTimeInput {
google.protobuf.Timestamp wake_up_time = 1;
}
main.go
package main

import (
"log"
"os"

updatabletimerv1 "github.com/cludden/protoc-gen-go-temporal/gen/example/updatabletimer/v1"
"github.com/urfave/cli/v2"
"go.temporal.io/sdk/client"
tlog "go.temporal.io/sdk/log"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"google.golang.org/protobuf/types/known/timestamppb"
)

// UpdatableTimerWorkflow provides a updatabletimerv1.UpdatableTimerWorkflow implementation
type UpdatableTimerWorkflow struct {
*updatabletimerv1.UpdatableTimerWorkflowInput
log tlog.Logger
wakeUpTime *timestamppb.Timestamp
}

// NewUpdatableTimerWorkflow initializes a new updatabletimerv1.UpdatableTimerWorkflow value
func NewUpdatableTimerWorkflow(ctx workflow.Context, input *updatabletimerv1.UpdatableTimerWorkflowInput) (updatabletimerv1.UpdatableTimerWorkflow, error) {
return &UpdatableTimerWorkflow{input, workflow.GetLogger(ctx), input.Req.GetInitialWakeUpTime()}, nil
}

// Execute defines the entrypoint to a UpdatableTimer workflow
func (w *UpdatableTimerWorkflow) Execute(ctx workflow.Context) error {
for timerFired := false; !timerFired && ctx.Err() == nil; {
timerCtx, cancelTimer := workflow.WithCancel(ctx)
timer := workflow.NewTimer(timerCtx, w.wakeUpTime.AsTime().Sub(workflow.Now(ctx)))
w.log.Info("SleepUntil", "WakeUpTime", w.wakeUpTime)

workflow.NewSelector(ctx).
AddFuture(timer, func(f workflow.Future) {
if err := f.Get(timerCtx, nil); err != nil {
w.log.Info("Timer canceled")
} else {
w.log.Info("Timer fired")
timerFired = true
}
}).
AddReceive(w.UpdateWakeUpTime.Channel, func(workflow.ReceiveChannel, bool) {
defer cancelTimer()
w.wakeUpTime = w.UpdateWakeUpTime.ReceiveAsync().GetWakeUpTime()
w.log.Info("WakeUpTime updated", "WakeUpTime", w.wakeUpTime)
}).
Select(ctx)
}
return ctx.Err()
}

// GetWakeUpTime defines the entrypoint to a GetWakeUpTime query
func (w *UpdatableTimerWorkflow) GetWakeUpTime() (*updatabletimerv1.GetWakeUpTimeOutput, error) {
return &updatabletimerv1.GetWakeUpTimeOutput{WakeUpTime: w.wakeUpTime}, nil
}

func main() {
app, err := updatabletimerv1.NewExampleCli(
updatabletimerv1.NewExampleCliOptions().WithWorker(func(cmd *cli.Context, c client.Client) (worker.Worker, error) {
w := worker.New(c, updatabletimerv1.ExampleTaskQueue, worker.Options{})
updatabletimerv1.RegisterUpdatableTimerWorkflow(w, NewUpdatableTimerWorkflow)
return w, nil
}),
)
if err != nil {
log.Fatal(err)
}

if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}

Run this example

  1. Clone the examples

    git clone https://github.com/cludden/protoc-gen-go-temporal && cd protoc-gen-go-temporal
  2. Run a local Temporal server

    temporal server start-dev
  3. Run the example worker

    go run examples/updatabletimer/main.go worker
  4. Initialize an UpdatableTimer workflow with an expiration 1h in the future

    go run examples/updatabletimer/main.go updatable-timer \
    --name example \
    --initial-wake-up-time $(TZ=UTC date -v+1H "+%Y-%m-%dT%H:%M:%SZ") \
    -d
  5. Query the UpdatableTimer workflow

    go run examples/updatabletimer/main.go get-wake-up-time -w updatable-timer/example
  6. Update the timer expiration to 30s in the future

    go run examples/updatabletimer/main.go update-wake-up-time \
    -w updatable-timer/example \
    --wake-up-time $(TZ=UTC date -v+30S "+%Y-%m-%dT%H:%M:%SZ")