Mutex
A simple example of a workflow-based mutex, inspired by the mutex sample in temporalio/samples-go.
example.proto
syntax = "proto3";
package example.mutex.v1;
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";
// Example exposes a workflow-based mutex (the Mutex workflow together with its
// AcquireLock update and ReleaseLock signal) and a sample workflow that uses it.
service Example {
// Service-level Temporal options: the task queue is "mutex".
option (temporal.v1.service) = {task_queue: "mutex"};
// AcquireLock requests a lock on a resource identified by `resource_id`
// and blocks until the lock is acquired, returning a `lease_id` that
// can be used to release the lock.
rpc AcquireLock(AcquireLockInput) returns (AcquireLockOutput) {
// Declares AcquireLock as a Temporal update named "mutex.v1.AcquireLock".
option (temporal.v1.update) = {name: "mutex.v1.AcquireLock"};
}
// Mutex is a workflow that manages concurrent access to a resource
// identified by `resource_id`.
rpc Mutex(MutexInput) returns (google.protobuf.Empty) {
// Workflow options: the workflow ID is derived from the input's resource ID,
// a retry policy is configured, and the workflow references the AcquireLock
// update and the ReleaseLock signal. The update entry sets `start: true` with
// a USE_EXISTING conflict policy.
// NOTE(review): presumably this lets callers start the workflow (or reuse a
// running one) as part of sending the update — confirm against the plugin docs.
option (temporal.v1.workflow) = {
name: "mutex.v1.Mutex"
id: 'mutex:${! resourceId }'
retry_policy: {
initial_interval: {seconds: 1}
backoff_coefficient: 2.0
max_interval: {seconds: 60}
max_attempts: 5
}
update: {
ref: 'AcquireLock'
start: true
workflow_id_conflict_policy: WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING
}
signal: {ref: 'ReleaseLock'}
};
// Activity options with a 10 second start-to-close timeout.
// NOTE(review): presumably these apply to the generated activities that wrap
// this workflow for use from other workflows — confirm.
option (temporal.v1.activity) = {
start_to_close_timeout: {seconds: 10}
};
}
// ReleaseLock releases a lock on a resource identified by `lease_id`.
rpc ReleaseLock(ReleaseLockInput) returns (google.protobuf.Empty) {
// Declares ReleaseLock as a Temporal signal named "mutex.v1.ReleaseLock".
option (temporal.v1.signal) = {name: "mutex.v1.ReleaseLock"};
}
// SampleWorkflowWithMutex is a sample workflow that demonstrates how to
// use the Mutex service.
rpc SampleWorkflowWithMutex(SampleWorkflowWithMutexInput) returns (google.protobuf.Empty) {
// The workflow ID is a fixed prefix followed by a generated UUID.
option (temporal.v1.workflow) = {id: 'SampleWorkflow1WithMutex_${! uuid_v4() }'};
}
}
// MutexInput is the input to the Mutex workflow.
message MutexInput {
// Identifier of the resource being guarded; it is interpolated into the
// Mutex workflow ID.
string resource_id = 1;
}
// AcquireLockInput is the input to the AcquireLock update.
message AcquireLockInput {
// How long a granted lease may be held before the Mutex workflow treats it
// as expired and moves on to the next request.
google.protobuf.Duration timeout = 1;
}
// AcquireLockOutput is the result of the AcquireLock update.
message AcquireLockOutput {
// Identifier of the granted lease; pass it to ReleaseLock to release the lock.
string lease_id = 1;
}
// ReleaseLockInput is the payload of the ReleaseLock signal.
message ReleaseLockInput {
// Identifier of the lease to release, as returned by AcquireLock.
string lease_id = 1;
}
// SampleWorkflowWithMutexInput is the input to the SampleWorkflowWithMutex
// workflow.
message SampleWorkflowWithMutexInput {
// Identifier of the resource to lock while the sample workflow runs.
string resource_id = 1;
// How long the sample workflow sleeps while holding the lock; the workflow
// falls back to 10 seconds when this is unset or zero.
google.protobuf.Duration sleep = 2;
}
main.go
package main
import (
"cmp"
"fmt"
"log"
"os"
"time"
mutexv1 "github.com/cludden/protoc-gen-go-temporal/gen/example/mutex/v1"
"github.com/cludden/protoc-gen-go-temporal/gen/example/mutex/v1/mutexv1xns"
"github.com/google/uuid"
"github.com/urfave/cli/v2"
"go.temporal.io/sdk/client"
tlog "go.temporal.io/sdk/log"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"google.golang.org/protobuf/types/known/durationpb"
)
// Workflows is the receiver for the workflow constructors in this example; an
// instance is handed to mutexv1.RegisterExampleWorkflows in main.
type Workflows struct{}

// MutexWorkflow holds the state of a single Mutex workflow execution.
type MutexWorkflow struct {
	*mutexv1.MutexWorkflowInput

	// log is the workflow logger captured when the workflow is constructed.
	log tlog.Logger

	// requests is the FIFO queue of lock requests that have not been granted
	// yet. AcquireLock appends to it and Execute pops from the front.
	requests []*leaseRequest
}

// leaseRequest is one queued AcquireLock call waiting to be granted the lock.
type leaseRequest struct {
	// input is the AcquireLock input, including the lease timeout.
	input *mutexv1.AcquireLockInput

	// f resolves the future the AcquireLock handler is blocked on, with either
	// the lease ID or an error.
	f workflow.Settable
}
// Mutex constructs the state for a new Mutex workflow execution: it keeps the
// generated workflow input, captures the workflow logger, and starts with an
// empty request queue.
func (w *Workflows) Mutex(
	ctx workflow.Context,
	input *mutexv1.MutexWorkflowInput,
) (mutexv1.MutexWorkflow, error) {
	wf := &MutexWorkflow{
		MutexWorkflowInput: input,
		log:                workflow.GetLogger(ctx),
		requests:           []*leaseRequest{},
	}
	return wf, nil
}
// Execute runs the mutex loop: it grants the lock to queued requests one at a
// time, in the order they were queued, and returns nil once the queue is empty.
//
// For each request it generates a lease ID, resolves the request's future with
// that ID, and then holds the lock until the lease is released or expires.
func (w *MutexWorkflow) Execute(ctx workflow.Context) error {
	var req *leaseRequest
	for len(w.requests) > 0 {
		// Pop the oldest request. AcquireLock may append more requests while
		// this loop is blocked holding a lease.
		if req, w.requests = w.requests[0], w.requests[1:]; req == nil {
			w.log.Info("no more lock requests")
			return nil
		}

		// The lease ID is random, so it is produced through a side effect to
		// keep the workflow deterministic on replay.
		var leaseID string
		if err := workflow.SideEffect(ctx, func(ctx workflow.Context) any {
			return uuid.NewString()
		}).Get(&leaseID); err != nil {
			req.f.SetError(fmt.Errorf("error generating lease id: %w", err))
			continue
		}

		// Unblock the waiting AcquireLock handler: the caller now owns the lock.
		req.f.SetValue(leaseID)

		w.holdLease(ctx, leaseID, req.input.GetTimeout().AsDuration())
	}
	return nil
}

// holdLease blocks until the lease identified by leaseID is released through a
// ReleaseLock signal carrying that ID, or until timeout elapses.
func (w *MutexWorkflow) holdLease(ctx workflow.Context, leaseID string, timeout time.Duration) {
	timerCtx, cancelTimer := workflow.WithCancel(ctx)

	// Start the expiry timer exactly once per lease. Recreating it on every
	// loop pass would let a ReleaseLock signal for a different lease restart
	// the full timeout and leave the earlier timers pending.
	expiry := workflow.NewTimer(timerCtx, timeout)

	done := false
	selector := workflow.NewSelector(ctx).
		AddFuture(expiry, func(workflow.Future) {
			w.log.Info("lease expired")
			done = true
		}).
		AddReceive(w.ReleaseLock.Channel, func(workflow.ReceiveChannel, bool) {
			// Signals for any other lease ID are consumed and ignored.
			if r := w.ReleaseLock.ReceiveAsync(); r.GetLeaseId() == leaseID {
				cancelTimer()
				done = true
			}
		})

	// Each Select call handles one event; keep waiting until the lease ends.
	for !done {
		selector.Select(ctx)
	}
}
// AcquireLock handles the AcquireLock update. It queues a lease request and
// blocks until Execute resolves it, then returns the granted lease ID. If the
// request is resolved with an error, that error is returned wrapped.
func (w *MutexWorkflow) AcquireLock(
	ctx workflow.Context,
	input *mutexv1.AcquireLockInput,
) (*mutexv1.AcquireLockOutput, error) {
	granted, settable := workflow.NewFuture(ctx)

	// Queue the request; Execute resolves the future when it is this request's turn.
	pending := &leaseRequest{input: input, f: settable}
	w.requests = append(w.requests, pending)

	var id string
	err := granted.Get(ctx, &id)
	if err != nil {
		return nil, fmt.Errorf("error waiting for lease id: %w", err)
	}

	out := &mutexv1.AcquireLockOutput{LeaseId: id}
	return out, nil
}
// SampleWorkflowWithMutexWorkflow holds the state of a single
// SampleWorkflowWithMutex workflow execution.
type SampleWorkflowWithMutexWorkflow struct {
// Generated workflow input, embedded so its fields (such as Req) are
// accessible directly on the workflow value.
*mutexv1.SampleWorkflowWithMutexWorkflowInput
// log is the workflow logger captured when the workflow is constructed.
log tlog.Logger
}
// SampleWorkflowWithMutex constructs the state for a new
// SampleWorkflowWithMutex execution, keeping the generated workflow input and
// capturing the workflow logger.
func (w *Workflows) SampleWorkflowWithMutex(
	ctx workflow.Context,
	input *mutexv1.SampleWorkflowWithMutexWorkflowInput,
) (mutexv1.SampleWorkflowWithMutexWorkflow, error) {
	wf := &SampleWorkflowWithMutexWorkflow{
		SampleWorkflowWithMutexWorkflowInput: input,
		log:                                  workflow.GetLogger(ctx),
	}
	return wf, nil
}
// Execute acquires the lock for the requested resource, simulates a critical
// operation by sleeping, and releases the lock before returning.
//
// The sleep lasts for the requested duration, or 10 seconds when none is set.
// It returns the error from acquiring the lock or from the sleep. A failure to
// release the lock is logged rather than returned.
func (w *SampleWorkflowWithMutexWorkflow) Execute(ctx workflow.Context) error {
	w.log.Info("started", "resourceID", w.Req.GetResourceId())

	// Request the lock with a 10 minute lease timeout and block until granted.
	lease, mutex, err := mutexv1xns.MutexWithAcquireLock(
		ctx,
		&mutexv1.MutexInput{ResourceId: w.Req.GetResourceId()},
		&mutexv1.AcquireLockInput{
			Timeout: durationpb.New(time.Minute * 10),
		},
	)
	if err != nil {
		return err
	}
	defer func() {
		// Release on a disconnected context: if this workflow was cancelled,
		// ctx is already cancelled and a release issued on it would fail,
		// leaving the lock held until its lease times out.
		releaseCtx, cancel := workflow.NewDisconnectedContext(ctx)
		defer cancel()
		if err := mutex.ReleaseLock(releaseCtx, &mutexv1.ReleaseLockInput{
			LeaseId: lease.GetLeaseId(),
		}); err != nil {
			w.log.Error("failed to release lock", "error", err)
		}
	}()
	w.log.Info("resource lock acquired", "leaseID", lease.GetLeaseId())

	w.log.Info("critical operation started")
	err = workflow.Sleep(ctx, cmp.Or(w.Req.GetSleep().AsDuration(), time.Second*10))
	w.log.Info("critical operation finished")
	return err
}
func main() {
app, err := mutexv1.NewExampleCli(
mutexv1.NewExampleCliOptions().
WithWorker(func(cmd *cli.Context, c client.Client) (worker.Worker, error) {
w := worker.New(c, mutexv1.ExampleTaskQueue, worker.Options{})
mutexv1.RegisterExampleWorkflows(w, &Workflows{})
mutexv1xns.RegisterExampleActivities(w, mutexv1.NewExampleClient(c))
return w, nil
}),
)
if err != nil {
log.Fatal(err)
}
if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}
Run this example
- Clone the examples
git clone https://github.com/cludden/protoc-gen-go-temporal && cd protoc-gen-go-temporal
- Run a local Temporal server
temporal server start-dev
- In a different shell, run the example worker
go run examples/mutex/main.go worker
- In a different shell, execute two or more workflows with the same resource-id
go run examples/mutex/main.go sample-workflow-with-mutex --resource-id foo -d
go run examples/mutex/main.go sample-workflow-with-mutex --resource-id foo -d