Nexus
Nexus is a Temporal platform feature that enables reliable connectivity between Temporal Applications, promoting modular microservice architectures. This plugin generates Nexus service handlers that are compatible with protoc-gen-go-nexus, simplifying Nexus setup.
Overview
Nexus allows services to expose operations as well-defined service contracts that other services can reliably invoke. The plugin generates:
- Nexus Service Handlers: Server-side components that expose workflows as Nexus operations
- Registration Functions: Helpers for registering Nexus services with workers
Schema Definition
No additional configuration is required for Nexus services.
- Schema
- Go
syntax = "proto3";
package example.nexus.v1;
import "nexus/v1/options.proto";
import "temporal/v1/temporal.proto";
// GreetingService declares a single workflow-backed operation, Hello, and sets
// the service-level task_queue option to "greeting-v1".
service GreetingService {
option (temporal.v1.service) = {task_queue: "greeting-v1"};
// generates a friendly greeting based on the input name and language
rpc Hello(HelloInput) returns (HelloOutput) {
// The workflow id is an expression over the input fields `language` and `name`.
// NOTE(review): presumably evaluation fails with "language required" when
// language is unset, and name is slugified — confirm against the plugin's
// expression documentation.
option (temporal.v1.workflow) = {
id: 'example.nexus.v1.Hello/${! language.or(throw("language required")) }/${! name.slug() }'
};
}
}
// HelloInput is the request message of GreetingService.Hello.
message HelloInput {
// name of the person to greet; also referenced by the Hello workflow id expression.
string name = 1;
// language of the greeting; also referenced by the Hello workflow id expression.
Language language = 2;
}
// HelloOutput is the response message of GreetingService.Hello.
message HelloOutput {
// message carries the greeting text.
string message = 1;
}
// Language enumerates the greeting languages a caller can put in HelloInput.
enum Language {
// Zero value, used when no language was set.
LANGUAGE_UNSPECIFIED = 0;
LANGUAGE_ENGLISH = 1;
LANGUAGE_SPANISH = 2;
LANGUAGE_FRENCH = 3;
}
package greeting
import (
"context"
nexusv1 "path/to/gen/example/nexus/v1"
"path/to/gen/example/nexus/v1/nexusv1temporal"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
// workflows is the receiver type that hosts the GreetingService workflow
// constructors.
type workflows struct{}

// helloWorkflow is a single Hello workflow instance. It embeds the shared
// workflows value together with the generated input for the run.
type helloWorkflow struct {
	*workflows
	*nexusv1.HelloWorkflowInput
}
// Register adds the GreetingService workflows to r and then registers the
// generated Nexus service on the same registry, returning any error from the
// Nexus registration.
func Register(r worker.Registry) error {
	nexusv1.RegisterGreetingServiceWorkflows(r, &workflows{})

	if err := nexusv1temporal.RegisterGreetingServiceNexusService(r); err != nil {
		return err
	}
	return nil
}
// Hello is the workflow constructor: it wraps the receiver and the generated
// input in a new helloWorkflow and never fails.
func (w *workflows) Hello(ctx workflow.Context, input *nexusv1.HelloWorkflowInput) (nexusv1.HelloWorkflow, error) {
	wf := &helloWorkflow{
		workflows:          w,
		HelloWorkflowInput: input,
	}
	return wf, nil
}
// Execute implements the hello workflow logic. It returns a greeting for the
// requested name in the requested language: Spanish has a dedicated greeting,
// and every other language (including English) receives the English one.
func (w *helloWorkflow) Execute(ctx workflow.Context) (*nexusv1.HelloOutput, error) {
	var greeting string
	switch w.Req.Language {
	case nexusv1.Language_LANGUAGE_SPANISH:
		greeting = "¡Hola! "
	default:
		// English, and any language without a dedicated greeting.
		greeting = "Hello "
	}
	return &nexusv1.HelloOutput{Message: greeting + w.Req.Name + " 👋"}, nil
}
Service Configuration
Use tags to control which services and operations are exposed via Nexus.
Service Tags
Use the `nexus.v1.service` option to configure service-level behavior:
// EchoService carries the Nexus service tag "disabled" and sets the
// service-level task_queue option to "echo-v1".
service EchoService {
// Nexus service-level tag.
option (.nexus.v1.service).tags = "disabled";
option (temporal.v1.service) = {task_queue: "echo-v1"};
// Echo is declared as a workflow with no explicit workflow options.
rpc Echo(EchoInput) returns (EchoOutput) {
option (temporal.v1.workflow) = {};
}
}
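Services tagged with `disabled` will not generate Nexus handlers, allowing you to selectively expose services.
Operation Tags
Individual operations can also be tagged using the `nexus.v1.operation` option. Operation tags are matched against the `nexus-operation-include-tags` and `nexus-operation-exclude-tags` plugin options (see Plugin Configuration) to decide which operations are generated: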
// MyService shows per-operation Nexus tags on two workflow-backed operations.
service MyService {
// PublicOperation carries the Nexus operation tag "public".
rpc PublicOperation(Input) returns (Output) {
option (temporal.v1.workflow) = {};
option (nexus.v1.operation).tags = "public";
}
// InternalOperation carries the Nexus operation tag "internal".
rpc InternalOperation(Input) returns (Output) {
option (temporal.v1.workflow) = {};
option (nexus.v1.operation).tags = "internal";
}
}
Generated Code
The plugin generates several components for Nexus integration:
Service Handler
A Nexus service handler that exposes workflow operations:
// GreetingServiceNexusHandler is the generated Nexus handler for
// example.nexus.v1.GreetingService. It has no fields; its methods build the
// service's Nexus operations.
type GreetingServiceNexusHandler struct{}
// Hello returns the Nexus operation, registered under the given name, that
// executes the Hello workflow. The operation takes a *HelloInput and yields a
// *HelloOutput.
func (h *GreetingServiceNexusHandler) Hello(name string) nexus.Operation[*HelloInput, *HelloOutput] {
// NOTE(review): Must-prefixed constructor; presumably panics on invalid
// options rather than returning an error — confirm against the SDK.
return temporalnexus.MustNewWorkflowRunOperationWithOptions(
temporalnexus.WorkflowRunOperationOptions[*HelloInput, *HelloOutput]{
Name: name,
// Handler receives the operation input and start options and returns a
// handle to the workflow it starts.
Handler: func(ctx context.Context, input *HelloInput, opts nexus.StartOperationOptions) (temporalnexus.WorkflowHandle[*HelloOutput], error) {
// Builds workflow options and executes workflow
o, err := NewHelloOptions().Build(input.ProtoReflect())
if err != nil {
return nil, err
}
// Start the workflow identified by HelloWorkflowName, passing the
// operation input through unchanged.
return temporalnexus.ExecuteUntypedWorkflow[*HelloOutput](ctx, opts, o, HelloWorkflowName, input)
},
})
}
Registration Function
A helper function to register the Nexus service with a worker:
// RegisterGreetingServiceNexusService builds the GreetingService Nexus service
// around a new GreetingServiceNexusHandler and registers it with r. If the
// service cannot be constructed, the error is returned and nothing is
// registered.
func RegisterGreetingServiceNexusService(r worker.NexusServiceRegistry) error {
svc, err := nexusv1nexus.NewGreetingServiceNexusService(&GreetingServiceNexusHandler{})
if err != nil {
return err
}
r.RegisterNexusService(svc)
return nil
}
Usage Examples
Exposing a Service
Register the workflows and the Nexus service with a worker:
- Service Provider
- Service Consumer
package main
import (
"log"
nexusv1 "path/to/gen/example/nexus/v1"
"path/to/gen/example/nexus/v1/nexusv1temporal"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
// main runs the provider worker and exits with a non-zero status if it fails.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run dials Temporal, registers the GreetingService workflows and Nexus
// service on a worker for the GreetingService task queue, and blocks until the
// worker is interrupted or fails.
//
// Errors are returned instead of being passed to log.Fatal here: log.Fatal
// exits the process immediately, which would skip the deferred client Close.
func run() error {
	c, err := client.Dial(client.Options{})
	if err != nil {
		return err
	}
	defer c.Close()

	w := worker.New(c, nexusv1.GreetingServiceTaskQueue, worker.Options{})

	// Register workflows
	nexusv1.RegisterGreetingServiceWorkflows(w, &Workflows{})

	// Register Nexus service
	if err := nexusv1temporal.RegisterGreetingServiceNexusService(w); err != nil {
		return err
	}

	return w.Run(worker.InterruptCh())
}
package consumer
import (
nexusv1 "path/to/gen/example/nexus/v1"
"path/to/gen/example/nexus/v1/nexusv1nexustemporal"
"go.temporal.io/sdk/workflow"
)
// EchoWorkflow is a consumer-side workflow that calls GreetingService through
// Nexus.
type EchoWorkflow struct {
// greeting is the generated Nexus client used to invoke GreetingService operations.
greeting *nexusv1nexustemporal.GreetingServiceNexusClient
// Embedded generated workflow input; Execute reads the request through w.Req.
*nexusv1.EchoWorkflowInput
}
// Execute forwards the echo request's name and language to the greeting
// service over Nexus and returns the resulting greeting as the echo output.
// Any error from the Nexus call is returned unchanged.
func (w *EchoWorkflow) Execute(ctx workflow.Context) (*nexusv1.EchoOutput, error) {
	req := &nexusv1.HelloInput{
		Name:     w.Req.GetName(),
		Language: w.Req.GetLanguage(),
	}

	// Call the greeting service via Nexus
	reply, err := w.greeting.Hello(ctx, req, workflow.NexusOperationOptions{})
	if err != nil {
		return nil, err
	}

	out := &nexusv1.EchoOutput{Message: reply.GetMessage()}
	return out, nil
}
Creating Nexus Clients
Generate Nexus clients with protoc-gen-go-nexus-temporal and initialize them in your workflow constructors:
// Register builds a workflows value holding a Nexus client for the greeting
// service bound to endpoint, and registers the EchoService workflows with r.
// It always returns nil.
func Register(r worker.WorkflowRegistry, endpoint string) error {
	nexusv1.RegisterEchoServiceWorkflows(r, &workflows{
		// Nexus client for the greeting service
		greeting: nexusv1nexustemporal.NewGreetingServiceNexusClient(endpoint),
	})
	return nil
}
Asynchronous Operations
Use the async methods for fire-and-forget or concurrent operations:
// Execute starts two Hello operations without waiting in between, then waits
// for each result in turn and logs both greetings. The first error from either
// operation is returned.
func (w *MyWorkflow) Execute(ctx workflow.Context) error {
	opts := workflow.NexusOperationOptions{}

	// Kick off both operations before waiting on either one.
	aliceFuture := w.greeting.HelloAsync(ctx, &nexusv1.HelloInput{
		Name:     "Alice",
		Language: nexusv1.Language_LANGUAGE_ENGLISH,
	}, opts)
	bobFuture := w.greeting.HelloAsync(ctx, &nexusv1.HelloInput{
		Name:     "Bob",
		Language: nexusv1.Language_LANGUAGE_SPANISH,
	}, opts)

	// Collect the results in the order the operations were started.
	alice, err := aliceFuture.GetTyped(ctx)
	if err != nil {
		return err
	}
	bob, err := bobFuture.GetTyped(ctx)
	if err != nil {
		return err
	}

	logger := workflow.GetLogger(ctx)
	logger.Info("Greetings received", "alice", alice.Message, "bob", bob.Message)
	return nil
}
Configuration
Plugin Configuration
Configure Nexus code generation in your `buf.gen.yaml`:
plugins:
  # out and opt belong to the plugin entry, so they must be nested under it.
  - plugin: buf.build/cludden/protoc-gen-go-temporal
    out: gen
    opt:
      # Enable Nexus code generation.
      - nexus=true
      # Only generate handlers for services with these tags.
      - nexus-service-include-tags=public,external
      # Skip services with these tags.
      - nexus-service-exclude-tags=internal
      # Only generate operations with these tags.
      - nexus-operation-include-tags=public
      # Skip operations with these tags.
      - nexus-operation-exclude-tags=internal
Available options:
- `nexus=true`: Enable Nexus code generation
- `nexus-service-include-tags`: Only generate handlers for services with these tags
- `nexus-service-exclude-tags`: Skip services with these tags
- `nexus-operation-include-tags`: Only generate operations with these tags
- `nexus-operation-exclude-tags`: Skip operations with these tags
Nexus Endpoint Registration
Register Nexus endpoints to make services discoverable:
// registerEndpoint asks the operator service to create a Nexus endpoint called
// endpointName whose target is a worker on the GreetingService task queue in
// the "default" namespace. The create response is discarded; only the error is
// returned.
func registerEndpoint(ctx context.Context, client client.Client, endpointName string) error {
	target := &nexus.EndpointTarget{
		Variant: &nexus.EndpointTarget_Worker_{
			Worker: &nexus.EndpointTarget_Worker{
				Namespace: "default",
				TaskQueue: nexusv1.GreetingServiceTaskQueue,
			},
		},
	}
	req := &operatorservice.CreateNexusEndpointRequest{
		Spec: &nexus.EndpointSpec{
			Name:   endpointName,
			Target: target,
		},
	}

	if _, err := client.OperatorService().CreateNexusEndpoint(ctx, req); err != nil {
		return err
	}
	return nil
}