Shopping Cart
A stateful shopping cart example demonstrating workflows with queries, signals, updates, and cross-namespace activities.
This example showcases:
- Stateful workflows that maintain cart state across updates
- Queries to inspect current cart contents
- Signals to trigger checkout
- Updates to modify cart contents with validation
- Cross-namespace execution with update-with-start patterns
- Continue-as-new handling for long-running workflows
syntax = "proto3";
package example.shoppingcart.v1;
import "google/protobuf/empty.proto";
import "temporal/v1/temporal.proto";
service ShoppingCart {
option (temporal.v1.service) = {task_queue: "example-shoppingcart-v1"};
rpc ShoppingCart(ShoppingCartInput) returns (ShoppingCartOutput) {
option (temporal.v1.workflow) = {
name: "example.shoppingcart.v1.ShoppingCart"
id: 'example.shoppingcart.v1.ShoppingCart/${! nanoid() }'
query: {ref: "Describe"}
signal: {ref: "Checkout"}
update: {
ref: "UpdateCart"
start: true
xns: {
heartbeat_interval: {seconds: 10}
parent_close_policy: PARENT_CLOSE_POLICY_REQUEST_CANCEL
}
workflow_id_conflict_policy: WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING
}
};
}
rpc Describe(DescribeInput) returns (DescribeOutput) {
option (temporal.v1.query) = {};
}
rpc UpdateCart(UpdateCartInput) returns (UpdateCartOutput) {
option (temporal.v1.update) = {
id: 'example.shoppingcart.v1.UpdateCart/${! nanoid() }'
validate: true
};
}
rpc Checkout(CheckoutInput) returns (google.protobuf.Empty) {
option (temporal.v1.signal) = {};
}
}
message CartState {
map<string, int32> items = 1; // item_id -> quantity
}
message CheckoutInput {}
message DescribeInput {}
message DescribeOutput {
CartState cart = 1;
}
message ShoppingCartInput {
CartState cart = 1;
}
message ShoppingCartOutput {
CartState cart = 1;
}
message UpdateCartInput {
UpdateCartAction action = 1;
string item_id = 2; // item_id to add or remove
}
message UpdateCartOutput {
CartState cart = 1;
}
enum UpdateCartAction {
UPDATE_CART_ACTION_UNSPECIFIED = 0;
UPDATE_CART_ACTION_ADD = 1;
UPDATE_CART_ACTION_REMOVE = 2;
}
package shoppingcart
import (
shoppingcartv1 "github.com/cludden/protoc-gen-go-temporal/gen/example/shoppingcart/v1"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"google.golang.org/protobuf/proto"
)
type (
Workflows struct{}
ShoppingCartWorkflow struct {
*Workflows
shoppingcartv1.ShoppingCartWorkflowInput
cart *shoppingcartv1.CartState
}
)
func (w *Workflows) ShoppingCart(
ctx workflow.Context,
input *shoppingcartv1.ShoppingCartWorkflowInput,
) (shoppingcartv1.ShoppingCartWorkflow, error) {
cart := input.Req.GetCart()
if cart.GetItems() == nil {
cart = &shoppingcartv1.CartState{
Items: make(map[string]int32),
}
}
return &ShoppingCartWorkflow{
Workflows: w,
ShoppingCartWorkflowInput: *input,
cart: cart,
}, nil
}
func (w *ShoppingCartWorkflow) Execute(
ctx workflow.Context,
) (*shoppingcartv1.ShoppingCartOutput, error) {
if err := workflow.Await(ctx, func() bool {
if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
return true
}
if checkout := w.Checkout.ReceiveAsync(); checkout != nil {
return true
}
return false
}); err != nil {
return nil, err
}
if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
if err := workflow.Await(ctx, func() bool {
return workflow.AllHandlersFinished(ctx)
}); err != nil {
return nil, err
}
return nil, workflow.NewContinueAsNewError(
ctx,
shoppingcartv1.ShoppingCartWorkflowName,
&shoppingcartv1.ShoppingCartInput{
Cart: w.cart,
},
)
}
return &shoppingcartv1.ShoppingCartOutput{
Cart: w.cart,
}, nil
}
func (w *ShoppingCartWorkflow) Describe(
*shoppingcartv1.DescribeInput,
) (*shoppingcartv1.DescribeOutput, error) {
return &shoppingcartv1.DescribeOutput{
Cart: w.cart,
}, nil
}
func (w *ShoppingCartWorkflow) UpdateCart(
ctx workflow.Context,
input *shoppingcartv1.UpdateCartInput,
) (*shoppingcartv1.UpdateCartOutput, error) {
switch input.GetAction() {
case shoppingcartv1.UpdateCartAction_UPDATE_CART_ACTION_ADD:
w.cart.Items[input.GetItemId()] += 1
case shoppingcartv1.UpdateCartAction_UPDATE_CART_ACTION_REMOVE:
w.cart.Items[input.GetItemId()] -= 1
if w.cart.Items[input.GetItemId()] <= 0 {
delete(w.cart.Items, input.GetItemId())
}
default:
return nil, temporal.NewNonRetryableApplicationError(
"Invalid update cart action",
"Unimplemented",
nil,
)
}
return &shoppingcartv1.UpdateCartOutput{
Cart: proto.Clone(w.cart).(*shoppingcartv1.CartState),
}, nil
}
func (w *ShoppingCartWorkflow) ValidateUpdateCart(
ctx workflow.Context,
input *shoppingcartv1.UpdateCartInput,
) error {
if input.GetAction() == shoppingcartv1.UpdateCartAction_UPDATE_CART_ACTION_UNSPECIFIED ||
input.GetItemId() == "" {
return temporal.NewNonRetryableApplicationError(
"Invalid update cart request",
"InvalidArgument",
nil,
)
}
if input.GetAction() == shoppingcartv1.UpdateCartAction_UPDATE_CART_ACTION_REMOVE &&
w.cart.GetItems()[input.GetItemId()] == 0 {
return temporal.NewNonRetryableApplicationError("Item not found in cart", "NotFound", nil)
}
return nil
}
package main
import (
"fmt"
"log"
"log/slog"
"os"
"time"
"github.com/cludden/protoc-gen-go-temporal/examples/shoppingcart"
shoppingcartv1 "github.com/cludden/protoc-gen-go-temporal/gen/example/shoppingcart/v1"
"github.com/cludden/protoc-gen-go-temporal/gen/example/shoppingcart/v1/shoppingcartv1xns"
"github.com/urfave/cli/v2"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
tlog "go.temporal.io/sdk/log"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"google.golang.org/protobuf/types/known/durationpb"
)
func main() {
app, err := shoppingcartv1.NewShoppingCartCli(
shoppingcartv1.NewShoppingCartCliOptions().
WithClient(newClient("shoppingcart")),
)
if err != nil {
log.Fatalf("failed to create shopping cart CLI: %v", err)
}
app.Commands = append(app.Commands, &cli.Command{
Name: "start",
Usage: "Start the shopping cart workers",
Action: start,
})
app.Flags = append(app.Flags, &cli.StringFlag{
Name: "log-level",
Aliases: []string{"l"},
Value: "debug",
Usage: "Set the logging level (debug, info, warn, error)",
})
if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}
func start(cmd *cli.Context) error {
logger := newLogger(cmd)
c, err := newClient("default")(cmd)
if err != nil {
return fmt.Errorf("failed to create client: %w", err)
}
defer c.Close()
// register shoppingcart namespace
_, _ = c.WorkflowService().
RegisterNamespace(cmd.Context, &workflowservice.RegisterNamespaceRequest{
Namespace: "shoppingcart",
WorkflowExecutionRetentionPeriod: durationpb.New(24 * time.Hour),
})
// create client for shoppingcart namespace
sc, err := client.NewClientFromExistingWithContext(cmd.Context, c, client.Options{
Namespace: "shoppingcart",
Logger: tlog.NewStructuredLogger(logger),
})
if err != nil {
return fmt.Errorf("failed to create shopping cart client: %w", err)
}
defer sc.Close()
// create and start default worker
dw := worker.New(c, "default", worker.Options{
MaxHeartbeatThrottleInterval: 0,
DefaultHeartbeatThrottleInterval: 0,
})
shoppingcartv1xns.RegisterShoppingCartActivities(dw, shoppingcartv1.NewShoppingCartClient(sc))
// register default workflow
dw.RegisterWorkflowWithOptions(func(ctx workflow.Context) error {
_, run, err := shoppingcartv1xns.ShoppingCartWithUpdateCart(
ctx,
&shoppingcartv1.ShoppingCartInput{},
&shoppingcartv1.UpdateCartInput{
Action: shoppingcartv1.UpdateCartAction_UPDATE_CART_ACTION_ADD,
ItemId: "foo",
},
)
if err != nil {
return err
}
workflow.Go(ctx, func(ctx workflow.Context) {
workflow.GetSignalChannel(ctx, shoppingcartv1.CheckoutSignalName).Receive(ctx, nil)
if err := run.Checkout(ctx, &shoppingcartv1.CheckoutInput{}); err != nil {
workflow.GetLogger(ctx).Error("failed to checkout", "error", err)
}
})
if _, err := run.Get(ctx); err != nil {
workflow.GetLogger(ctx).Error("failed to get shopping cart", "error", err)
}
return nil
}, workflow.RegisterOptions{Name: "default"})
if err := dw.Start(); err != nil {
return fmt.Errorf("failed to start worker: %w", err)
}
defer dw.Stop()
// create and start shoppingcart worker
sw := worker.New(sc, shoppingcartv1.ShoppingCartTaskQueue, worker.Options{})
shoppingcartv1.RegisterShoppingCartWorkflows(sw, &shoppingcart.Workflows{})
if err := sw.Start(); err != nil {
return fmt.Errorf("failed to start shopping cart worker: %w", err)
}
defer sw.Stop()
<-cmd.Context.Done()
log.Println("shutting down workers...")
return nil
}
func newClient(namespace string) func(cmd *cli.Context) (client.Client, error) {
return func(cmd *cli.Context) (client.Client, error) {
return client.DialContext(cmd.Context, client.Options{
Namespace: namespace,
Logger: tlog.NewStructuredLogger(newLogger(cmd)),
})
}
}
func newLogger(cmd *cli.Context) *slog.Logger {
var level slog.Level
switch cmd.String("log-level") {
case "error":
level = slog.LevelError
case "warn":
level = slog.LevelWarn
case "info":
level = slog.LevelInfo
default:
level = slog.LevelDebug
}
return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: level,
}))
}
Run this example
- Clone the examples
git clone https://github.com/cludden/protoc-gen-go-temporal && cd protoc-gen-go-temporal
- Run a local Temporal server
temporal server start-dev
- Start the example workers
go run examples/shoppingcart/cmd/shoppingcart/main.go start
- In a different shell, start a shopping cart workflow and add an item
go run examples/shoppingcart/cmd/shoppingcart/main.go shopping-cart-with-update-cart \
--req='{}' \
--update-cart-input='{"action": "UPDATE_CART_ACTION_ADD", "itemId": "apple"}' \
-d - Query the cart contents
go run examples/shoppingcart/cmd/shoppingcart/main.go describe \
-w <workflow-id-from-step-4> - Add more items using updates
go run examples/shoppingcart/cmd/shoppingcart/main.go update-cart \
-w <workflow-id-from-step-4> \
--input='{"action": "UPDATE_CART_ACTION_ADD", "itemId": "banana"}' - Checkout the cart (completes the workflow)
go run examples/shoppingcart/cmd/shoppingcart/main.go checkout \
-w <workflow-id-from-step-4>
Key Features Demonstrated
Stateful Workflow
The ShoppingCart
workflow maintains cart state across its execution, persisting item quantities in a map.
Query Support
The Describe
query allows external inspection of the current cart state without affecting workflow execution.
Signal Handling
The Checkout
signal triggers workflow completion, demonstrating asynchronous communication patterns.
Update with Validation
The UpdateCart
update includes validation logic that prevents invalid operations (e.g., removing items not in the cart).
Cross-Namespace Activities
The example demonstrates cross-namespace execution patterns using update-with-start, allowing workflows to be initiated from different namespaces.
Continue-as-New
The workflow monitors for continue-as-new suggestions and handles them gracefully, ensuring long-running cart sessions don't hit history size limits.