-
Notifications
You must be signed in to change notification settings - Fork 110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor context prop support to allow things like opentelemtry to work properly with the cadence sdk #618
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,9 @@ | |
* Context Propagators are used to propagate information from workflow to activity, workflow to | ||
* child workflow, and workflow to child thread (using {@link com.uber.cadence.workflow.Async}). | ||
* | ||
* <p>It is important to note that all threads share one ContextPropagator instance, so your | ||
* implementation <b>must</b> be thread-safe and store any state in ThreadLocal variables. | ||
* | ||
* <p>A sample <code>ContextPropagator</code> that copies all {@link org.slf4j.MDC} entries starting | ||
* with a given prefix along the code path looks like this: | ||
* | ||
|
@@ -136,4 +139,31 @@ public interface ContextPropagator { | |
|
||
/** Sets the current context */ | ||
void setCurrentContext(Object context); | ||
|
||
/** | ||
* This is a lifecycle method, called after the context has been propagated to the | ||
* workflow/activity thread but the workflow/activity has not yet started. | ||
*/ | ||
default void setUp() { | ||
// No-op | ||
} | ||
|
||
/** | ||
* This is a lifecycle method, called after the workflow/activity has completed. If the method | ||
* finished without exception, {@code successful} will be true. Otherwise, it will be false and | ||
* {@link #onError(Throwable)} will have already been called. | ||
*/ | ||
default void finish() { | ||
// No-op | ||
} | ||
|
||
/** | ||
* This is a lifecycle method, called when the workflow/activity finishes by throwing an unhandled | ||
* exception. {@link #finish()} is called after this method. | ||
* | ||
* @param t The unhandled exception that caused the workflow/activity to terminate | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: |
||
*/ | ||
default void onError(Throwable t) { | ||
// No-op | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
/* | ||
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Modifications copyright (C) 2017 Uber Technologies, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). You may not | ||
* use this file except in compliance with the License. A copy of the License is | ||
* located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package com.uber.cadence.context; | ||
|
||
import io.opentelemetry.api.GlobalOpenTelemetry; | ||
import io.opentelemetry.api.baggage.Baggage; | ||
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; | ||
import io.opentelemetry.api.trace.Span; | ||
import io.opentelemetry.api.trace.SpanKind; | ||
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; | ||
import io.opentelemetry.context.Context; | ||
import io.opentelemetry.context.Scope; | ||
import io.opentelemetry.context.propagation.TextMapGetter; | ||
import io.opentelemetry.context.propagation.TextMapPropagator; | ||
import io.opentelemetry.context.propagation.TextMapSetter; | ||
import java.nio.charset.Charset; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import javax.annotation.Nullable; | ||
import org.slf4j.MDC; | ||
|
||
public class OpenTelemetryContextPropagator implements ContextPropagator { | ||
|
||
private static final TextMapPropagator w3cTraceContextPropagator = | ||
W3CTraceContextPropagator.getInstance(); | ||
private static final TextMapPropagator w3cBaggagePropagator = W3CBaggagePropagator.getInstance(); | ||
private static ThreadLocal<Scope> currentContextOtelScope = new ThreadLocal<>(); | ||
private static ThreadLocal<Span> currentOtelSpan = new ThreadLocal<>(); | ||
private static ThreadLocal<Scope> currentOtelScope = new ThreadLocal<>(); | ||
private static ThreadLocal<Iterable<String>> otelKeySet = new ThreadLocal<>(); | ||
private static final TextMapSetter<Map<String, String>> setter = Map::put; | ||
private static final TextMapGetter<Map<String, String>> getter = | ||
new TextMapGetter<Map<String, String>>() { | ||
@Override | ||
public Iterable<String> keys(Map<String, String> carrier) { | ||
return otelKeySet.get(); | ||
} | ||
|
||
@Nullable | ||
@Override | ||
public String get(Map<String, String> carrier, String key) { | ||
return MDC.get(key); | ||
} | ||
}; | ||
|
||
@Override | ||
public String getName() { | ||
return "OpenTelemetry"; | ||
} | ||
|
||
@Override | ||
public Map<String, byte[]> serializeContext(Object context) { | ||
Map<String, byte[]> serializedContext = new HashMap<>(); | ||
Map<String, String> contextMap = (Map<String, String>) context; | ||
if (contextMap != null) { | ||
for (Map.Entry<String, String> entry : contextMap.entrySet()) { | ||
serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset())); | ||
} | ||
} | ||
return serializedContext; | ||
} | ||
|
||
@Override | ||
public Object deserializeContext(Map<String, byte[]> context) { | ||
Map<String, String> contextMap = new HashMap<>(); | ||
for (Map.Entry<String, byte[]> entry : context.entrySet()) { | ||
contextMap.put(entry.getKey(), new String(entry.getValue(), Charset.defaultCharset())); | ||
} | ||
return contextMap; | ||
} | ||
|
||
@Override | ||
public Object getCurrentContext() { | ||
Map<String, String> carrier = new HashMap<>(); | ||
w3cTraceContextPropagator.inject(Context.current(), carrier, setter); | ||
w3cBaggagePropagator.inject(Context.current(), carrier, setter); | ||
return carrier; | ||
} | ||
|
||
@Override | ||
public void setCurrentContext(Object context) { | ||
Map<String, String> contextMap = (Map<String, String>) context; | ||
if (contextMap != null) { | ||
for (Map.Entry<String, String> entry : contextMap.entrySet()) { | ||
MDC.put(entry.getKey(), entry.getValue()); | ||
} | ||
otelKeySet.set(contextMap.keySet()); | ||
} | ||
} | ||
|
||
@Override | ||
@SuppressWarnings("MustBeClosedChecker") | ||
public void setUp() { | ||
Context context = | ||
Baggage.fromContext(w3cBaggagePropagator.extract(Context.current(), null, getter)) | ||
.toBuilder() | ||
.build() | ||
.storeInContext(w3cTraceContextPropagator.extract(Context.current(), null, getter)); | ||
|
||
currentContextOtelScope.set(context.makeCurrent()); | ||
|
||
Span span = | ||
GlobalOpenTelemetry.getTracer("cadence-client") | ||
.spanBuilder("cadence.workflow") | ||
Comment on lines
+118
to
+119
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not familar with opentelemtry -- How are these two properties used? I wonder if we should avoid harhcoding here because users may need to customize them. |
||
.setParent(context) | ||
.setSpanKind(SpanKind.CLIENT) | ||
.startSpan(); | ||
|
||
Scope scope = span.makeCurrent(); | ||
currentOtelSpan.set(span); | ||
currentOtelScope.set(scope); | ||
} | ||
|
||
@Override | ||
public void finish() { | ||
Scope scope = currentOtelScope.get(); | ||
if (scope != null) { | ||
scope.close(); | ||
} | ||
Span span = currentOtelSpan.get(); | ||
if (span != null) { | ||
span.end(); | ||
} | ||
Scope contextScope = currentContextOtelScope.get(); | ||
if (contextScope != null) { | ||
contextScope.close(); | ||
} | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why the |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,6 +52,7 @@ | |
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.stream.Collectors; | ||
|
||
class WorkflowStubImpl implements WorkflowStub { | ||
|
||
|
@@ -204,7 +205,18 @@ private Map<String, byte[]> extractContextsAndConvertToBytes( | |
} | ||
Map<String, byte[]> result = new HashMap<>(); | ||
for (ContextPropagator propagator : contextPropagators) { | ||
result.putAll(propagator.serializeContext(propagator.getCurrentContext())); | ||
// Get the serialized context from the propagator | ||
Map<String, byte[]> serializedContext = | ||
propagator.serializeContext(propagator.getCurrentContext()); | ||
// Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar | ||
Map<String, byte[]> namespacedSerializedContext = | ||
serializedContext | ||
.entrySet() | ||
.stream() | ||
.collect( | ||
Collectors.toMap( | ||
k -> propagator.getName() + ":" + k.getKey(), Map.Entry::getValue)); | ||
result.putAll(namespacedSerializedContext); | ||
Comment on lines
+208
to
+219
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe move to a util to reuse the code. It's repeated 4 times for ChildWF, WF, activity and localActivity. |
||
} | ||
return result; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where/what is
{@code successful}
?