Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opentracing #478

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,13 @@ dependencies {
compile group: 'com.google.guava', name: 'guava', version: '28.1-jre'
compile group: 'com.cronutils', name: 'cron-utils', version: '9.0.0'
compile group: 'io.micrometer', name: 'micrometer-core', version: '1.1.2'
compile group: 'io.opentracing', name: 'opentracing-api', version: '0.33.0'
compile group: 'io.opentracing', name: 'opentracing-util', version: '0.33.0'

testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
testCompile group: 'io.opentracing', name: 'opentracing-mock', version: '0.33.0'
}

license {
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/com/uber/cadence/context/ContextPropagator.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
* Context Propagators are used to propagate information from workflow to activity, workflow to
* child workflow, and workflow to child thread (using {@link com.uber.cadence.workflow.Async}).
*
* <p>It is important to note that all threads share one ContextPropagator instance, so your
* implementation <b>must</b> be thread-safe and store any state in ThreadLocal variables.
*
* <p>A sample <code>ContextPropagator</code> that copies all {@link org.slf4j.MDC} entries starting
* with a given prefix along the code path looks like this:
*
Expand Down Expand Up @@ -136,4 +139,31 @@ public interface ContextPropagator {

/** Sets the current context */
void setCurrentContext(Object context);

/**
* This is a lifecycle method, called after the context has been propagated to the
* workflow/activity thread but the workflow/activity has not yet started.
*/
default void setUp() {
// No-op
}

/**
* This is a lifecycle method, called after the workflow/activity has completed. If the method
* finished without exception, {@code successful} will be true. Otherwise, it will be false and
* {@link #onError(Throwable)} will have already been called.
*/
default void finish(boolean successful) {
// No-op
}

/**
* This is a lifecycle method, called when the workflow/activity finishes by throwing an unhandled
* exception. {@link #finish(boolean)} is called after this method.
*
* @param t The unhandled exception that caused the workflow/activity to terminate
*/
default void onError(Throwable t) {
// No-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.context;

import com.uber.cadence.internal.logging.LoggerTag;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.log.Fields;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMap;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/** Support for OpenTracing spans */
public class OpenTracingContextPropagator implements ContextPropagator {

private static final Logger log = LoggerFactory.getLogger(OpenTracingContextPropagator.class);

private static ThreadLocal<SpanContext> currentOpenTracingSpanContext = new ThreadLocal<>();
private static ThreadLocal<Span> currentOpenTracingSpan = new ThreadLocal<>();
private static ThreadLocal<Scope> currentOpenTracingScope = new ThreadLocal<>();

public static void setCurrentOpenTracingSpanContext(SpanContext ctx) {
if (ctx != null) {
currentOpenTracingSpanContext.set(ctx);
}
}

public static SpanContext getCurrentOpenTracingSpanContext() {
return currentOpenTracingSpanContext.get();
}

@Override
public String getName() {
return "OpenTracing";
}

@Override
public Map<String, byte[]> serializeContext(Object context) {
Map<String, byte[]> serializedContext = new HashMap<>();
Map<String, String> contextMap = (Map<String, String>) context;
if (contextMap != null) {
for (Map.Entry<String, String> entry : contextMap.entrySet()) {
serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset()));
}
}
return serializedContext;
}

@Override
public Object deserializeContext(Map<String, byte[]> context) {
Map<String, String> contextMap = new HashMap<>();
for (Map.Entry<String, byte[]> entry : context.entrySet()) {
contextMap.put(entry.getKey(), new String(entry.getValue(), Charset.defaultCharset()));
}
return contextMap;
}

@Override
public Object getCurrentContext() {
Tracer currentTracer = GlobalTracer.get();
Span currentSpan = currentTracer.scopeManager().activeSpan();
if (currentSpan != null) {
HashMapTextMap contextTextMap = new HashMapTextMap();
currentTracer.inject(currentSpan.context(), Format.Builtin.TEXT_MAP, contextTextMap);
return contextTextMap.getBackingMap();
} else {
return null;
}
}

@Override
public void setCurrentContext(Object context) {
Tracer currentTracer = GlobalTracer.get();
Map<String, String> contextAsMap = (Map<String, String>) context;
if (contextAsMap != null) {
HashMapTextMap contextTextMap = new HashMapTextMap(contextAsMap);
setCurrentOpenTracingSpanContext(
currentTracer.extract(Format.Builtin.TEXT_MAP, contextTextMap));
}
}

@Override
public void setUp() {
Tracer openTracingTracer = GlobalTracer.get();
Tracer.SpanBuilder builder =
openTracingTracer
.buildSpan("cadence.workflow")
.withTag("resource.name", MDC.get(LoggerTag.WORKFLOW_TYPE));

if (getCurrentOpenTracingSpanContext() != null) {
builder.asChildOf(getCurrentOpenTracingSpanContext());
}

Span span = builder.start();
openTracingTracer.activateSpan(span);
currentOpenTracingSpan.set(span);
Scope scope = openTracingTracer.activateSpan(span);
currentOpenTracingScope.set(scope);
}

@Override
public void onError(Throwable t) {
Span span = currentOpenTracingSpan.get();
if (span != null) {
Tags.ERROR.set(span, true);
Map<String, Object> errorData = new HashMap<>();
errorData.put(Fields.EVENT, "error");
if (t != null) {
errorData.put(Fields.ERROR_OBJECT, t);
errorData.put(Fields.MESSAGE, t.getMessage());
}
span.log(errorData);
}
}

@Override
public void finish(boolean successful) {
Scope currentScope = currentOpenTracingScope.get();
Span currentSpan = currentOpenTracingSpan.get();

if (currentScope != null) {
currentScope.close();
}

if (currentSpan != null) {
currentSpan.finish();
}

currentOpenTracingScope.remove();
currentOpenTracingSpan.remove();
currentOpenTracingSpanContext.remove();
}

/** Just check for other instances of the same class */
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}

if (this == obj) {
return true;
}

if (this.getClass().equals(obj.getClass())) {
return true;
}

return false;
}

@Override
public int hashCode() {
return this.getClass().hashCode();
}

private class HashMapTextMap implements TextMap {

private final HashMap<String, String> backingMap = new HashMap<>();

public HashMapTextMap() {
// Noop
}

public HashMapTextMap(Map<String, String> spanData) {
backingMap.putAll(spanData);
}

@Override
public Iterator<Map.Entry<String, String>> iterator() {
return backingMap.entrySet().iterator();
}

@Override
public void put(String key, String value) {
backingMap.put(key, value);
}

public HashMap<String, String> getBackingMap() {
return backingMap;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** This class holds the current set of context propagators */
public class ContextThreadLocal {

private static final Logger log = LoggerFactory.getLogger(ContextThreadLocal.class);

private static WorkflowThreadLocal<List<ContextPropagator>> contextPropagators =
WorkflowThreadLocal.withInitial(
new Supplier<List<ContextPropagator>>() {
Expand Down Expand Up @@ -57,6 +61,11 @@ public static Map<String, Object> getCurrentContextForPropagation() {
return contextData;
}

/**
* Injects the context data into the thread for each configured context propagator
*
* @param contextData The context data received from the server
*/
public static void propagateContextToCurrentThread(Map<String, Object> contextData) {
if (contextData == null || contextData.isEmpty()) {
return;
Expand All @@ -67,4 +76,49 @@ public static void propagateContextToCurrentThread(Map<String, Object> contextDa
}
}
}

/** Calls {@link ContextPropagator#setUp()} for each propagator */
public static void setUpContextPropagators() {
for (ContextPropagator propagator : contextPropagators.get()) {
try {
propagator.setUp();
} catch (Throwable t) {
// Don't let an error in one propagator block the others
log.error("Error calling setUp() on a contextpropagator", t);
}
}
}

/**
* Calls {@link ContextPropagator#onError(Throwable)} for each propagator
*
* @param t The Throwable that caused the workflow/activity to finish
*/
public static void onErrorContextPropagators(Throwable t) {
for (ContextPropagator propagator : contextPropagators.get()) {
try {
propagator.onError(t);
} catch (Throwable t1) {
// Don't let an error in one propagator block the others
log.error("Error calling onError() on a contextpropagator", t1);
}
}
}

/**
* Calls {@link ContextPropagator#finish(boolean)} for each propagator
*
* @param successful True if the workflow/activity completed without unhandled exception, false
* otherwise
*/
public static void finishContextPropagators(boolean successful) {
for (ContextPropagator propagator : contextPropagators.get()) {
try {
propagator.finish(successful);
} catch (Throwable t) {
// Don't let an error in one propagator block the others
log.error("Error calling finish() on a contextpropagator", t);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class WorkflowContext {

Expand Down Expand Up @@ -166,7 +167,18 @@ Map<String, Object> getPropagatedContexts() {

Map<String, Object> contextData = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
contextData.put(propagator.getName(), propagator.deserializeContext(headerData));
// Only send the context propagator the fields that belong to them
// Change the map from MyPropagator:foo -> bar to foo -> bar
Map<String, byte[]> filteredData =
headerData
.entrySet()
.stream()
.filter(e -> e.getKey().startsWith(propagator.getName()))
.collect(
Collectors.toMap(
e -> e.getKey().substring(propagator.getName().length() + 1),
Map.Entry::getValue));
contextData.put(propagator.getName(), propagator.deserializeContext(filteredData));
}

return contextData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -449,7 +450,18 @@ private Map<String, byte[]> extractContextsAndConvertToBytes(
}
Map<String, byte[]> result = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
// Get the serialized context from the propagator
Map<String, byte[]> serializedContext =
propagator.serializeContext(propagator.getCurrentContext());
// Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar
Map<String, byte[]> namespacedSerializedContext =
serializedContext
.entrySet()
.stream()
.collect(
Collectors.toMap(
e -> propagator.getName() + ":" + e.getKey(), Map.Entry::getValue));
result.putAll(namespacedSerializedContext);
}
return result;
}
Expand Down
Loading