From 20cca413f55dd19d3ff4d12ada55c55f397a37fa Mon Sep 17 00:00:00 2001 From: Michael Nedokushev Date: Thu, 16 Feb 2023 14:47:46 +0000 Subject: [PATCH] Backport OTEL autoinstrumentation from series/2.x --- .../opentelemetry/ContextStorage.scala | 62 ++++++++ .../opentelemetry/PropagatingSupervisor.scala | 50 ++++++ .../zio/telemetry/opentelemetry/Tracing.scala | 149 ++++++++++-------- 3 files changed, 191 insertions(+), 70 deletions(-) create mode 100644 opentelemetry/src/main/scala/zio/telemetry/opentelemetry/ContextStorage.scala create mode 100644 opentelemetry/src/main/scala/zio/telemetry/opentelemetry/PropagatingSupervisor.scala diff --git a/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/ContextStorage.scala b/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/ContextStorage.scala new file mode 100644 index 00000000..5ac1f1ff --- /dev/null +++ b/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/ContextStorage.scala @@ -0,0 +1,62 @@ +package zio.telemetry.opentelemetry + +import io.opentelemetry.context.Context +import zio.{ FiberRef, UIO, ZIO } + +private[opentelemetry] trait ContextStorage { + + def get: UIO[Context] + + def set(context: Context): UIO[Unit] + + def getAndSet(context: Context): UIO[Context] + + def updateAndGet(f: Context => Context): UIO[Context] + + def locally[R, E, A](context: Context)(effect: ZIO[R, E, A]): ZIO[R, E, A] + +} + +private[opentelemetry] object ContextStorage { + + /** + * Use provided [[FiberRef]] as a [[ContextStorage]]. + */ + def fiberRef(ref: FiberRef[Context]): ContextStorage = new ContextStorage { + override def get: UIO[Context] = ref.get + override def set(context: Context): UIO[Unit] = ref.set(context) + override def getAndSet(context: Context): UIO[Context] = ref.getAndSet(context) + override def updateAndGet(f: Context => Context): UIO[Context] = ref.updateAndGet(f) + override def locally[R, E, A](context: Context)(effect: ZIO[R, E, A]): ZIO[R, E, A] = ref.locally(context)(effect) + } + + /** + * Use OpenTelemetry's default context storage which is backed by a [[ThreadLocal]]. This makes sense only if + * [[PropagatingSupervisor]] is used. + */ + def threadLocal: ContextStorage = new ContextStorage { + + override def get: UIO[Context] = ZIO.succeed(Context.current()) + + override def set(context: Context): UIO[Unit] = ZIO.succeed(context.makeCurrent()).unit + + override def getAndSet(context: Context): UIO[Context] = + ZIO.succeed { + val old = Context.current() + val _ = context.makeCurrent() + old + }.uninterruptible + + override def updateAndGet(f: Context => Context): UIO[Context] = + ZIO.succeed { + val updated = f(Context.current()) + val _ = updated.makeCurrent() + updated + }.uninterruptible + + override def locally[R, E, A](context: Context)(effect: ZIO[R, E, A]): ZIO[R, E, A] = + ZIO.bracket(get <* set(context))(set)(_ => effect) + + } + +} diff --git a/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/PropagatingSupervisor.scala b/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/PropagatingSupervisor.scala new file mode 100644 index 00000000..04f2901f --- /dev/null +++ b/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/PropagatingSupervisor.scala @@ -0,0 +1,50 @@ +package zio.telemetry.opentelemetry + +import io.opentelemetry.api.trace.Span +import io.opentelemetry.context.Context +import zio.Supervisor.Propagation +import zio._ + +import java.util.concurrent.ConcurrentHashMap +import scala.annotation.nowarn + +@nowarn("msg=discarded non-Unit value") +final class PropagatingSupervisor extends Supervisor[Unit] { + + private val storage = new ConcurrentHashMap[Fiber.Id, Span]() + + override def value: UIO[Unit] = ZIO.unit + + override def unsafeOnStart[R, E, A]( + environment: R, + effect: ZIO[R, E, A], + parent: Option[Fiber.Runtime[Any, Any]], + fiber: Fiber.Runtime[E, A] + ): Propagation = { + val span = Span.current() + if (span != null) storage.put(fiber.id, span) + else storage.put(fiber.id, Span.fromContext(Context.root())) + + Propagation.Continue + } + + override def unsafeOnEnd[R, E, A](value: Exit[E, A], fiber: Fiber.Runtime[E, A]): Propagation = { + storage.remove(fiber.id) + Context.root().makeCurrent() + + Propagation.Continue + } + + override def unsafeOnSuspend[E, A](fiber: Fiber.Runtime[E, A]): Unit = { + val span = Span.current() + if (span != null) storage.put(fiber.id, span) + else storage.put(fiber.id, Span.fromContext(Context.root())) + Context.root().makeCurrent() + } + + override def unsafeOnResume[E, A](fiber: Fiber.Runtime[E, A]): Unit = { + val span = storage.get(fiber.id) + if (span != null) span.makeCurrent() + } + +} diff --git a/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/Tracing.scala b/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/Tracing.scala index 1bd6c9ec..15d555e2 100644 --- a/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/Tracing.scala +++ b/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/Tracing.scala @@ -18,7 +18,7 @@ import scala.jdk.CollectionConverters._ object Tracing { trait Service { private[opentelemetry] def currentNanos: UIO[Long] - private[opentelemetry] val currentContext: FiberRef[Context] + private[opentelemetry] val currentContext: ContextStorage private[opentelemetry] def createRoot(spanName: String, spanKind: SpanKind): UManaged[Context] private[opentelemetry] def createChildOf(parent: Context, spanName: String, spanKind: SpanKind): UManaged[Context] private[opentelemetry] def createChildOfUnsafe(parent: Context, spanName: String, spanKind: SpanKind): UIO[Context] @@ -29,7 +29,7 @@ object Tracing { private def currentNanos: URIO[Tracing, Long] = ZIO.serviceWith[Tracing.Service](_.currentNanos) - private def currentContext: URIO[Tracing, FiberRef[Context]] = + private def currentContext: URIO[Tracing, ContextStorage] = ZIO.access[Tracing](_.get.currentContext) private def createRoot(spanName: String, spanKind: SpanKind): URManaged[Tracing, Context] = @@ -331,78 +331,87 @@ object Tracing { getCurrentSpan.map(_.getSpanContext()) def managed(tracer: Tracer): URManaged[Clock, Service] = { - class Live(defaultContext: FiberRef[Context], clock: Clock.Service) extends Service { - private def endSpan(span: Span): UIO[Unit] = currentNanos.map(span.end(_, TimeUnit.NANOSECONDS)) - - def currentNanos: UIO[Long] = clock.currentTime(TimeUnit.NANOSECONDS) - - def createRoot(spanName: String, spanKind: SpanKind): UManaged[Context] = - for { - nanoSeconds <- currentNanos.toManaged_ - span <- ZManaged.make( - UIO( - tracer - .spanBuilder(spanName) - .setNoParent() - .setSpanKind(spanKind) - .setStartTimestamp(nanoSeconds, TimeUnit.NANOSECONDS) - .startSpan() - ) - )(endSpan) - } yield span.storeInContext(Context.root()) - - def createChildOf(parent: Context, spanName: String, spanKind: SpanKind): UManaged[Context] = - for { - nanoSeconds <- currentNanos.toManaged_ - span <- ZManaged.make( - UIO( - tracer - .spanBuilder(spanName) - .setParent(parent) - .setSpanKind(spanKind) - .setStartTimestamp(nanoSeconds, TimeUnit.NANOSECONDS) - .startSpan() - ) - )(endSpan) - } yield span.storeInContext(parent) - - def createChildOfUnsafe(parent: Context, spanName: String, spanKind: SpanKind): UIO[Context] = - for { - nanoSeconds <- currentNanos - span <- - UIO( - tracer - .spanBuilder(spanName) - .setParent(parent) - .setSpanKind(spanKind) - .setStartTimestamp(nanoSeconds, TimeUnit.NANOSECONDS) - .startSpan() - ) - } yield span.storeInContext(parent) - - override private[opentelemetry] def end: UIO[Any] = - for { - nanos <- currentNanos - context <- currentContext.get - span = Span.fromContext(context) - } yield span.end(nanos, TimeUnit.NANOSECONDS) - - override private[opentelemetry] def getTracer: UIO[Tracer] = - UIO.succeed(tracer) - - val currentContext: FiberRef[Context] = defaultContext - } - val tracing: URIO[Clock, Service] = for { - clock <- ZIO.access[Clock](_.get) - defaultContext <- FiberRef.make[Context](Context.root()) - } yield new Live(defaultContext, clock) + clock <- ZIO.access[Clock](_.get) + contextRef <- FiberRef.make[Context](Context.root()) + } yield new Live(tracer, ContextStorage.fiberRef(contextRef), clock) ZManaged.make(tracing)(_.end) } - def live: URLayer[Clock with Has[Tracer], Tracing] = ZLayer.fromManaged( - ZIO.access[Has[Tracer]](_.get).toManaged_.flatMap(managed) - ) + def live: URLayer[Clock with Has[Tracer], Tracing] = + ZLayer.fromManaged(ZIO.service[Tracer].toManaged_.flatMap(managed)) + + def propagating: URLayer[Clock with Has[Tracer], Tracing] = { + val tracing = for { + clock <- ZIO.access[Clock](_.get) + tracer <- ZIO.service[Tracer] + } yield new Live(tracer, ContextStorage.threadLocal, clock) + + ZLayer.fromManaged(ZManaged.make(tracing)(_.end)) + } + + private class Live(tracer: Tracer, defaultContext: ContextStorage, clock: Clock.Service) extends Service { + private def endSpan(span: Span): UIO[Unit] = currentNanos.map(span.end(_, TimeUnit.NANOSECONDS)) + + def currentNanos: UIO[Long] = clock.currentTime(TimeUnit.NANOSECONDS) + + def createRoot(spanName: String, spanKind: SpanKind): UManaged[Context] = + for { + nanoSeconds <- currentNanos.toManaged_ + span <- ZManaged.make( + UIO( + tracer + .spanBuilder(spanName) + .setNoParent() + .setSpanKind(spanKind) + .setStartTimestamp(nanoSeconds, TimeUnit.NANOSECONDS) + .startSpan() + ) + )(endSpan) + } yield span.storeInContext(Context.root()) + + def createChildOf(parent: Context, spanName: String, spanKind: SpanKind): UManaged[Context] = + for { + nanoSeconds <- currentNanos.toManaged_ + span <- ZManaged.make( + UIO( + tracer + .spanBuilder(spanName) + .setParent(parent) + .setSpanKind(spanKind) + .setStartTimestamp(nanoSeconds, TimeUnit.NANOSECONDS) + .startSpan() + ) + )(endSpan) + } yield span.storeInContext(parent) + + def createChildOfUnsafe(parent: Context, spanName: String, spanKind: SpanKind): UIO[Context] = + for { + nanoSeconds <- currentNanos + span <- + UIO( + tracer + .spanBuilder(spanName) + .setParent(parent) + .setSpanKind(spanKind) + .setStartTimestamp(nanoSeconds, TimeUnit.NANOSECONDS) + .startSpan() + ) + } yield span.storeInContext(parent) + + override private[opentelemetry] def end: UIO[Any] = + for { + nanos <- currentNanos + context <- currentContext.get + span = Span.fromContext(context) + } yield span.end(nanos, TimeUnit.NANOSECONDS) + + override private[opentelemetry] def getTracer: UIO[Tracer] = + UIO.succeed(tracer) + + val currentContext: ContextStorage = defaultContext + } + }