Skip to content

Commit

Permalink
Set Temporal-Namespace header on every namespace-specific gRPC request
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed May 11, 2024
1 parent 81cd5dc commit 8a724cd
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 2 deletions.
17 changes: 17 additions & 0 deletions internal/grpc_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ const (

// defaultKeepAliveTimeout is the keep alive timeout if one is not specified.
defaultKeepAliveTimeout = 15 * time.Second

// temporalNamespaceHeaderKey is the header key that should contain the target namespace of the request.
temporalNamespaceHeaderKey = "temporal-namespace"
)

func dial(params dialParameters) (*grpc.ClientConn, error) {
Expand Down Expand Up @@ -176,9 +179,23 @@ func requiredInterceptors(
interceptors = append(interceptors, interceptor)
}
}
// Add namespace provider interceptor
interceptors = append(interceptors, namespaceProviderInterceptor())
return interceptors
}

func namespaceProviderInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if nsReq, ok := req.(interface{ GetNamespace() string }); ok {
// Only add namespace if it doesn't already exist
if md, _ := metadata.FromOutgoingContext(ctx); len(md.Get(temporalNamespaceHeaderKey)) == 0 {
ctx = metadata.AppendToOutgoingContext(ctx, temporalNamespaceHeaderKey, nsReq.GetNamespace())
}
}
return invoker(ctx, method, req, reply, cc, opts...)
}
}

func trafficControllerInterceptor(controller TrafficController) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
err := controller.CheckCallAllowed(ctx, method, req, reply)
Expand Down
33 changes: 31 additions & 2 deletions internal/grpc_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,33 @@ func TestCredentialsAPIKey(t *testing.T) {
)
}

func TestNamespaceInterceptor(t *testing.T) {
srv, err := startTestGRPCServer()
require.NoError(t, err)
defer srv.Stop()

// Fixed string
client, err := DialClient(context.Background(), ClientOptions{
Namespace: "test-namespace",
HostPort: srv.addr,
})
require.NoError(t, err)
defer client.Close()
// Verify namespace header is set in the context even though getSystemInfo doesn't have it on the request
require.Equal(
t,
[]string{"test-namespace"},
metadata.ValueFromIncomingContext(srv.getSystemInfoRequestContext, temporalNamespaceHeaderKey),
)
// Verify namespace header is set on a request that does have namespace on the request
require.NoError(t, client.SignalWorkflow(context.Background(), "workflowid", "runid", "signalname", nil))
require.Equal(
t,
[]string{"test-namespace"},
metadata.ValueFromIncomingContext(srv.lastSignalWorkflowExecutionContext, temporalNamespaceHeaderKey),
)
}

func TestCredentialsMTLS(t *testing.T) {
// Just confirming option is set, not full end-to-end mTLS test

Expand Down Expand Up @@ -520,6 +547,7 @@ type testGRPCServer struct {
getSystemInfoRequestContext context.Context
getSystemInfoResponse workflowservice.GetSystemInfoResponse
getSystemInfoResponseError error
lastSignalWorkflowExecutionContext context.Context
signalWorkflowExecutionResponse workflowservice.SignalWorkflowExecutionResponse
signalWorkflowExecutionResponseError error
}
Expand Down Expand Up @@ -579,10 +607,11 @@ func (t *testGRPCServer) GetSystemInfo(
}

func (t *testGRPCServer) SignalWorkflowExecution(
context.Context,
*workflowservice.SignalWorkflowExecutionRequest,
ctx context.Context,
_ *workflowservice.SignalWorkflowExecutionRequest,
) (*workflowservice.SignalWorkflowExecutionResponse, error) {
atomic.AddInt32(&t.sigWfCount, 1)
t.lastSignalWorkflowExecutionContext = ctx
return &t.signalWorkflowExecutionResponse, t.signalWorkflowExecutionResponseError
}

Expand Down
3 changes: 3 additions & 0 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"

Expand Down Expand Up @@ -1167,6 +1168,8 @@ func (wc *WorkflowClient) loadCapabilities(ctx context.Context, getSystemInfoTim
}
ctx, cancel := context.WithTimeout(ctx, getSystemInfoTimeout)
defer cancel()
// Manually append the namespace to the header since the request does not have it.
ctx = metadata.AppendToOutgoingContext(ctx, temporalNamespaceHeaderKey, wc.namespace)
resp, err := wc.workflowService.GetSystemInfo(ctx, &workflowservice.GetSystemInfoRequest{})
// We ignore unimplemented
if _, isUnimplemented := err.(*serviceerror.Unimplemented); err != nil && !isUnimplemented {
Expand Down

0 comments on commit 8a724cd

Please sign in to comment.