Skip to main content


A simple mutex example, inspired by temporalio/samples-go/mutex.

syntax = "proto3";

package example.mutex.v1;

import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";

// Example groups the signals and workflows of the mutex sample. The
// service-level option assigns the "mutex" task queue.
// NOTE(review): closing braces appear to be missing throughout this listing
// (likely an extraction artifact) — restore them before compiling.
service Example {
option (temporal.v1.service) = {task_queue: "mutex"};

// AcquireLock is declared as a signal carrying an AcquireLockInput.
rpc AcquireLock(AcquireLockInput) returns (google.protobuf.Empty) {
option (temporal.v1.signal) = {};

// LockAcquired is declared as a signal carrying a LockAcquiredInput.
rpc LockAcquired(LockAcquiredInput) returns (google.protobuf.Empty) {
option (temporal.v1.signal) = {};

// Mutex is declared as a workflow taking a MutexInput. Its workflow ID is
// built from the "mutex:" prefix plus the resourceId expression.
rpc Mutex(MutexInput) returns (google.protobuf.Empty) {
option (temporal.v1.workflow) = {
id: 'mutex:${! resourceId }'
// Retry policy: 1s initial interval, backoff coefficient 2.0, 60s maximum
// interval, at most 5 attempts.
retry_policy: {
initial_interval: {seconds: 1}
backoff_coefficient: 2.0
max_interval: {seconds: 60}
max_attempts: 5
// Signals referenced by this workflow. `start: true` on AcquireLock
// presumably enables a signal-with-start helper — confirm against the
// plugin documentation.
signal: {
ref: 'AcquireLock'
start: true
signal: {ref: 'ReleaseLock'}
// Activity option with a 10s start-to-close timeout.
option (temporal.v1.activity) = {
start_to_close_timeout: {seconds: 10}

// ReleaseLock is declared as a signal carrying a ReleaseLockInput.
rpc ReleaseLock(ReleaseLockInput) returns (google.protobuf.Empty) {
option (temporal.v1.signal) = {};

// SampleWorkflowWithMutex is declared as a workflow taking a
// SampleWorkflowWithMutexInput. Its workflow ID is a fixed prefix followed by
// the result of uuid_v4(), and it references the LockAcquired signal.
rpc SampleWorkflowWithMutex(SampleWorkflowWithMutexInput) returns (google.protobuf.Empty) {
option (temporal.v1.workflow) = {
id: 'SampleWorkflow1WithMutex_${! uuid_v4() }'
signal: {ref: 'LockAcquired'}

// MutexInput is the input of the Mutex workflow.
message MutexInput {
// Identifier of the resource being guarded.
string resource_id = 1;

// AcquireLockInput is the payload of the AcquireLock signal.
message AcquireLockInput {
// ID of the workflow requesting the lock.
string workflow_id = 1;
// Duration associated with the request; presumably the maximum time the lease
// may be held before it expires — confirm against the Mutex workflow.
google.protobuf.Duration timeout = 2;

// LockAcquiredInput is the payload of the LockAcquired signal.
message LockAcquiredInput {
// Identifier of the lease that was granted.
string lease_id = 1;

// ReleaseLockInput is the payload of the ReleaseLock signal.
message ReleaseLockInput {
// Identifier of the lease being released.
string lease_id = 1;

// SampleWorkflowWithMutexInput is the input of the SampleWorkflowWithMutex
// workflow.
message SampleWorkflowWithMutexInput {
// Identifier of the resource the workflow locks.
string resource_id = 1;
// Sleep duration for the workflow; presumably how long the guarded operation
// runs — confirm against the workflow implementation.
google.protobuf.Duration sleep = 2;
package main

import (

mutexv1 ""
tlog ""

// Workflows is the stateless receiver whose methods construct the example's
// workflow implementations.
type Workflows struct{}

// MutexWorkflow is the implementation type for the Mutex workflow.
// NOTE(review): only the log field is visible here, yet the constructor
// initializes the struct with two positional values (input, logger); an
// embedded input field and the closing brace appear to be missing from this
// listing.
type MutexWorkflow struct {
log tlog.Logger

// Mutex builds a MutexWorkflow from the supplied input and the logger obtained
// from workflow.GetLogger(ctx). It always returns a nil error.
func (w *Workflows) Mutex(ctx workflow.Context, input *mutexv1.MutexWorkflowInput) (mutexv1.MutexWorkflow, error) {
return &MutexWorkflow{input, workflow.GetLogger(ctx)}, nil

// Execute serves lock requests one at a time: for each pending AcquireLock
// request it generates a lease ID, signals LockAcquired to the requesting
// workflow, and then waits for either the request's timeout timer or a
// ReleaseLock signal carrying the same lease ID. It returns nil once no
// request is pending, or a wrapped error if lease generation or signaling
// fails.
// NOTE(review): this listing appears to have lost lines — closing braces, the
// selector that AddFuture/AddReceive belong to, the call that blocks on it, and
// any use of cancelTimer are not visible. Restore them from the upstream
// example before compiling.
func (w *MutexWorkflow) Execute(ctx workflow.Context) error {
for {
// Take the next request without blocking; nil ends the workflow.
req := w.AcquireLock.ReceiveAsync()
if req == nil {
w.log.Info("no more signals")
return nil

// The random lease ID is produced inside workflow.SideEffect rather than
// called directly from workflow code.
var leaseID string
if err := workflow.SideEffect(ctx, func(ctx workflow.Context) any {
return uuid.NewString()
}).Get(&leaseID); err != nil {
return fmt.Errorf("error generating lease id: %w", err)

// Tell the requesting workflow (by its workflow ID, empty run ID) which lease
// it now holds.
if err := mutexv1.LockAcquiredExternal(ctx, req.GetWorkflowId(), "", &mutexv1.LockAcquiredInput{
LeaseId: leaseID,
}); err != nil {
return fmt.Errorf("error signaling lock acquired: %w", err)

// Hold the lease until one of the two callbacks below sets done.
timerCtx, cancelTimer := workflow.WithCancel(ctx)
for done := false; !done; {
// Lease timeout: the timer runs for the duration requested by the caller.
AddFuture(workflow.NewTimer(timerCtx, req.GetTimeout().AsDuration()), func(workflow.Future) {
w.log.Info("lease expired")
done = true
// Explicit release: only a ReleaseLock whose lease ID matches the current
// lease ends the wait; other release signals are consumed and ignored.
AddReceive(w.ReleaseLock.Channel, func(workflow.ReceiveChannel, bool) {
if release := w.ReleaseLock.ReceiveAsync(); release.GetLeaseId() == leaseID {
done = true

// SampleWorkflowWithMutexWorkflow is the implementation type for the
// SampleWorkflowWithMutex workflow.
// NOTE(review): only the log field is visible here, yet the constructor
// initializes the struct with two positional values (input, logger); an
// embedded input field and the closing brace appear to be missing from this
// listing.
type SampleWorkflowWithMutexWorkflow struct {
log tlog.Logger

// SampleWorkflowWithMutex builds a SampleWorkflowWithMutexWorkflow from the
// supplied input and the logger obtained from workflow.GetLogger(ctx). It
// always returns a nil error.
func (w *Workflows) SampleWorkflowWithMutex(ctx workflow.Context, input *mutexv1.SampleWorkflowWithMutexWorkflowInput) (mutexv1.SampleWorkflowWithMutexWorkflow, error) {
return &SampleWorkflowWithMutexWorkflow{input, workflow.GetLogger(ctx)}, nil

// Execute requests the lock for the input's resource ID, waits for the
// LockAcquired signal, sleeps for the configured duration while holding the
// lease, and releases the lock in a deferred call. It returns the error from
// the lock request or from workflow.Sleep.
// NOTE(review): closing braces and parts of the MutexWithAcquireLockAsync call
// (its leading arguments and the literal that WorkflowId/Timeout belong to)
// appear to be missing from this listing.
func (w *SampleWorkflowWithMutexWorkflow) Execute(ctx workflow.Context) error {
w.log.Info("started", "resourceID", w.Req.GetResourceId())
// Request the lock, passing this workflow's own ID and a 10 minute timeout.
mutex, err := mutexv1xns.MutexWithAcquireLockAsync(
&mutexv1.MutexInput{ResourceId: w.Req.GetResourceId()},
WorkflowId: workflow.GetInfo(ctx).WorkflowExecution.ID,
Timeout: durationpb.New(time.Minute * 10),
if err != nil {
return err

// Block until the lease is delivered.
// NOTE(review): the second return value of Receive is discarded here —
// confirm that it is safe to ignore.
lease, _ := w.LockAcquired.Receive(ctx)
// Release on the way out; a failed release is logged, not returned.
defer func() {
if err := mutex.ReleaseLock(ctx, &mutexv1.ReleaseLockInput{LeaseId: lease.GetLeaseId()}); err != nil {
w.log.Error("failed to release lock", "error", err)
w.log.Info("resource lock acquired", "leaseID", lease.GetLeaseId())

w.log.Info("critical operation started")
// Use the requested sleep duration, falling back to 10 seconds when it is
// zero.
d := w.Req.GetSleep().AsDuration()
if d == 0 {
d = time.Second * 10
err = workflow.Sleep(ctx, d)
w.log.Info("critical operation finished")
return err

// main builds the example CLI with a worker factory and runs it with the
// process arguments. The factory creates a worker on mutexv1.ExampleTaskQueue
// and registers the example workflows and the mutexv1xns activities on it.
// NOTE(review): the bodies of both error checks and all closing braces are
// missing from this listing — restore the error handling before compiling.
func main() {
app, err := mutexv1.NewExampleCli(
mutexv1.NewExampleCliOptions().WithWorker(func(cmd *cli.Context, c client.Client) (worker.Worker, error) {
w := worker.New(c, mutexv1.ExampleTaskQueue, worker.Options{})
mutexv1.RegisterExampleWorkflows(w, &Workflows{})
// The xns activities are backed by an Example client built from the same
// Temporal client.
mutexv1xns.RegisterExampleActivities(w, mutexv1.NewExampleClient(c))
return w, nil
if err != nil {

if err := app.Run(os.Args); err != nil {

Run this example

  1. Clone the examples
    git clone https://github.com/cludden/protoc-gen-go-temporal && cd protoc-gen-go-temporal
  2. Run a local Temporal server
    temporal server start-dev
  3. In a different shell, run the example worker
    go run examples/mutex/main.go worker
  4. In a different shell, execute two or more workflows with the same resource-id
    go run examples/mutex/main.go sample-workflow-with-mutex --resource-id foo -d
    go run examples/mutex/main.go sample-workflow-with-mutex --resource-id foo -d