Skip to content

Commit

Permalink
ZIO-2.0.0-RC4 (#516)
Browse files Browse the repository at this point in the history
* chore: upgrade zio to 2.0.0-RC3. need updated zio-http dep

* chore: upgrade zio to 2.0.0-RC4, use Scope instead of ZManaged now
  • Loading branch information
vladimir-lu authored Apr 6, 2022
1 parent 1993082 commit 4b52f03
Show file tree
Hide file tree
Showing 18 changed files with 233 additions and 213 deletions.
82 changes: 40 additions & 42 deletions opencensus/src/main/scala/zio/telemetry/opencensus/Live.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import io.opencensus.trace.Tracer

object Live {
val live: URLayer[Tracer, Tracing.Service] =
ZLayer.fromManaged(for {
tracer <- ZManaged.service[Tracer]
tracing = FiberRef.make[Span](BlankSpan.INSTANCE).map(new Live(tracer, _))
managed <- ZManaged.acquireReleaseWith(tracing)(_.end)
} yield managed)
ZLayer.scoped(for {
tracer <- ZIO.service[Tracer]
tracing = FiberRef.make[Span](BlankSpan.INSTANCE).map(new Live(tracer, _))
live <- ZIO.acquireRelease(tracing)(_.end)
} yield live)
}

class Live(tracer: Tracer, root: FiberRef[Span]) extends Tracing.Service {
Expand All @@ -30,30 +30,28 @@ class Live(tracer: Tracer, root: FiberRef[Span]) extends Tracing.Service {
toErrorStatus: ErrorMapper[E],
attributes: Map[String, AttributeValue]
)(effect: ZIO[R, E, A]): ZIO[R, E, A] =
for {
parent <- currentSpan_.get
res <- createSpan(parent, name, kind).use(
finalizeSpanUsingEffect(
putAttributes(attributes) *> effect,
toErrorStatus
)
)
} yield res
ZIO.scoped[R] {
for {
parent <- currentSpan_.get
span <- createSpan(parent, name, kind)
res <- finalizeSpanUsingEffect(putAttributes(attributes) *> effect, toErrorStatus)(span)
} yield res
}

def root[R, E, A](
name: String,
kind: Span.Kind,
toErrorStatus: ErrorMapper[E],
attributes: Map[String, AttributeValue]
)(effect: ZIO[R, E, A]): ZIO[R, E, A] =
for {
res <- createSpan(BlankSpan.INSTANCE, name, kind).use(
finalizeSpanUsingEffect(
putAttributes(attributes) *> effect,
toErrorStatus
)
)
} yield res
ZIO.scoped[R] {
createSpan(BlankSpan.INSTANCE, name, kind).flatMap(span =>
finalizeSpanUsingEffect(
putAttributes(attributes) *> effect,
toErrorStatus
)(span)
)
}

def fromRemoteSpan[R, E, A](
remote: SpanContext,
Expand All @@ -62,21 +60,21 @@ class Live(tracer: Tracer, root: FiberRef[Span]) extends Tracing.Service {
toErrorStatus: ErrorMapper[E],
attributes: Map[String, AttributeValue]
)(effect: ZIO[R, E, A]): ZIO[R, E, A] =
for {
res <- createSpanFromRemote(remote, name, kind).use(
finalizeSpanUsingEffect(
putAttributes(attributes) *> effect,
toErrorStatus
)
)
} yield res
ZIO.scoped[R] {
createSpanFromRemote(remote, name, kind).flatMap(span =>
finalizeSpanUsingEffect(
putAttributes(attributes) *> effect,
toErrorStatus
)(span)
)
}

def putAttributes(
attributes: Map[String, AttributeValue]
): ZIO[Any, Nothing, Unit] =
for {
current <- currentSpan_.get
_ <- UIO(attributes.foreach { case ((k, v)) =>
_ <- ZIO.succeed(attributes.foreach { case ((k, v)) =>
current.putAttribute(k, v)
})
} yield ()
Expand All @@ -85,29 +83,29 @@ class Live(tracer: Tracer, root: FiberRef[Span]) extends Tracing.Service {
parent: Span,
name: String,
kind: Span.Kind
): UManaged[Span] =
ZManaged.acquireReleaseWith(
UIO(
): URIO[Scope, Span] =
ZIO.acquireRelease(
ZIO.succeed(
tracer
.spanBuilderWithExplicitParent(name, parent)
.setSpanKind(kind)
.startSpan()
)
)(span => UIO(span.end))
)(span => ZIO.succeed(span.end()))

private def createSpanFromRemote(
parent: SpanContext,
name: String,
kind: Span.Kind
): UManaged[Span] =
ZManaged.acquireReleaseWith(
UIO(
): URIO[Scope, Span] =
ZIO.acquireRelease(
ZIO.succeed(
tracer
.spanBuilderWithRemoteParent(name, parent)
.setSpanKind(kind)
.startSpan()
)
)(span => UIO(span.end))
)(span => ZIO.succeed(span.end()))

private def finalizeSpanUsingEffect[R, E, A](
effect: ZIO[R, E, A],
Expand All @@ -126,13 +124,13 @@ class Live(tracer: Tracer, root: FiberRef[Span]) extends Tracing.Service {
): UIO[Unit] =
for {
current <- currentSpan
_ <- URIO(format.inject(current.getContext(), carrier, setter))
_ <- ZIO.succeed(format.inject(current.getContext, carrier, setter))
} yield ()

private[opencensus] def end: UIO[Unit] =
for {
span <- currentSpan_.get
_ <- UIO(span.end())
_ <- ZIO.succeed(span.end())
} yield ()

private def setErrorStatus[E](
Expand All @@ -142,6 +140,6 @@ class Live(tracer: Tracer, root: FiberRef[Span]) extends Tracing.Service {
): UIO[Unit] = {
val errorStatus =
cause.failureOption.flatMap(toErrorStatus.lift).getOrElse(Status.UNKNOWN)
UIO(span.setStatus(errorStatus))
ZIO.succeed(span.setStatus(errorStatus))
}
}
10 changes: 6 additions & 4 deletions opencensus/src/main/scala/zio/telemetry/opencensus/Tracing.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ object Tracing {
toErrorStatus: ErrorMapper[E] = defaultMapper[E],
attributes: Map[String, AttributeValue] = Map()
)(effect: ZIO[R, E, A]): ZIO[R with Service, E, A] =
Task(format.extract(carrier, getter)).foldZIO(
_ => root(name, kind, toErrorStatus)(effect),
remote => fromRemoteSpan(remote, name, kind, toErrorStatus, attributes)(effect)
)
ZIO
.attempt(format.extract(carrier, getter))
.foldZIO(
_ => root(name, kind, toErrorStatus)(effect),
remote => fromRemoteSpan(remote, name, kind, toErrorStatus, attributes)(effect)
)

def inject[C, R, E, A](
format: TextFormat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,36 @@ package zio.telemetry.opentelemetry.example

import sttp.model.Uri
import zhttp.service.server.ServerChannelFactory
import zhttp.service.{ EventLoopGroup, Server, ServerChannelFactory }
import zhttp.service.{EventLoopGroup, Server, ServerChannelFactory}
import zio.Console.printLine
import zio.config.getConfig
import zio.config.magnolia.{ descriptor, Descriptor }
import zio.config.magnolia.{Descriptor, descriptor}
import zio.config.typesafe.TypesafeConfig
import zio.telemetry.opentelemetry.Tracing
import zio.telemetry.opentelemetry.example.config.AppConfig
import zio.telemetry.opentelemetry.example.http.BackendApp
import zio.{ ZEnv, ZIO, ZIOAppDefault, ZLayer }
import zio.{ZIO, ZIOAppDefault}

object BackendServer extends ZIOAppDefault {
implicit val sttpUriDescriptor: Descriptor[Uri] =
Descriptor[String].transformOrFailLeft(Uri.parse)(_.toString)

type AppEnv = Tracing.Service with ServerChannelFactory with EventLoopGroup with AppConfig

val server =
getConfig[AppConfig].flatMap { conf =>
val port = conf.backend.host.port.getOrElse(9000)
(Server.port(port) ++ Server.app(BackendApp.routes)).make.use(_ =>
printLine(s"BackendServer started on port $port") *> ZIO.never
)
ZIO.scoped[AppEnv] {
for {
conf <- getConfig[AppConfig]
port = conf.backend.host.port.getOrElse(9000)
server = Server.port(port) ++ Server.app(BackendApp.routes)
_ <- server.make
_ <- printLine(s"BackendServer started on port $port") *> ZIO.never
} yield ()
}

val configLayer = TypesafeConfig.fromResourcePath(descriptor[AppConfig])

val appLayer: ZLayer[ZEnv with AppConfig, Throwable, Tracing.Service with ServerChannelFactory with EventLoopGroup] =
(JaegerTracer.live >>> Tracing.live) ++ ServerChannelFactory.auto ++ EventLoopGroup.auto(0)
val appLayer = (JaegerTracer.live >>> Tracing.live) ++ ServerChannelFactory.auto ++ EventLoopGroup.auto(0)

override def run =
server.provideSomeLayer(configLayer >+> appLayer).exitCode
ZIO.provideLayer(configLayer >+> appLayer)(server.exitCode)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ object JaegerTracer {
.service[AppConfig]
.flatMap(c =>
(for {
spanExporter <- Task(JaegerGrpcSpanExporter.builder().setEndpoint(c.get.tracer.host).build())
spanProcessor <- UIO(SimpleSpanProcessor.create(spanExporter))
tracerProvider <- UIO(SdkTracerProvider.builder().addSpanProcessor(spanProcessor).build())
openTelemetry <- UIO(OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build())
tracer <- UIO(openTelemetry.getTracer("zio.telemetry.opentelemetry.example.JaegerTracer"))
spanExporter <- ZIO.attempt(JaegerGrpcSpanExporter.builder().setEndpoint(c.get.tracer.host).build())
spanProcessor <- ZIO.succeed(SimpleSpanProcessor.create(spanExporter))
tracerProvider <- ZIO.succeed(SdkTracerProvider.builder().addSpanProcessor(spanProcessor).build())
openTelemetry <- ZIO.succeed(OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build())
tracer <- ZIO.succeed(openTelemetry.getTracer("zio.telemetry.opentelemetry.example.JaegerTracer"))
} yield tracer).toLayer
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,46 @@ package zio.telemetry.opentelemetry.example
import sttp.capabilities.WebSockets
import sttp.capabilities.zio.ZioStreams
import sttp.client3.SttpBackend
import zio.config.getConfig
import zio.config.typesafe.TypesafeConfig
import zio.config.magnolia.{ descriptor, Descriptor }
import zio.telemetry.opentelemetry.Tracing
import zio.telemetry.opentelemetry.example.config.AppConfig
import zio.telemetry.opentelemetry.example.http.{ Client, ProxyApp }
import zio.{ ExitCode, Task, URIO, ZEnv, ZIO, ZIOAppDefault, ZLayer, ZManaged }
import sttp.client3.asynchttpclient.zio.AsyncHttpClientZioBackend
import sttp.model.Uri
import zhttp.service.{ EventLoopGroup, Server, ServerChannelFactory }
import zhttp.service.server.ServerChannelFactory
import zhttp.service.{EventLoopGroup, Server, ServerChannelFactory}
import zio.Console.printLine
import zio.config.getConfig
import zio.config.magnolia._
import zio.config.typesafe.TypesafeConfig
import zio.telemetry.opentelemetry.Tracing
import zio.telemetry.opentelemetry.example.config.AppConfig
import zio.telemetry.opentelemetry.example.http.{Client, ProxyApp}
import zio.{Task, ZIO, ZIOAppDefault, ZLayer}

object ProxyServer extends ZIOAppDefault {
implicit val sttpUriDescriptor: Descriptor[Uri] =
Descriptor[String].transformOrFailLeft(Uri.parse)(_.toString)

type AppEnv = AppConfig with Client.Service with Tracing.Service with ServerChannelFactory with EventLoopGroup

val server =
getConfig[AppConfig].flatMap { conf =>
val port = conf.proxy.host.port.getOrElse(8080)
(Server.port(port) ++ Server.app(ProxyApp.routes)).make.use(_ =>
printLine(s"ProxyServer started on port $port") *> ZIO.never
)
ZIO.scoped[AppEnv] {
for {
conf <- getConfig[AppConfig]
port = conf.proxy.host.port.getOrElse(8080)
server = Server.port(port) ++ Server.app(ProxyApp.routes)
_ <- server.make
_ <- printLine(s"ProxyServer started on port $port") *> ZIO.never
} yield ()
}

val configLayer = TypesafeConfig.fromResourcePath(descriptor[AppConfig])

val httpBackend: ZLayer[Any, Throwable, SttpBackend[Task, ZioStreams with WebSockets]] =
ZManaged.acquireReleaseWith(AsyncHttpClientZioBackend())(_.close().ignore).toLayer
ZLayer.fromAcquireRelease(AsyncHttpClientZioBackend())(_.close().ignore)

val sttp: ZLayer[AppConfig, Throwable, Client.Service] = httpBackend >>> Client.live

val appEnv: ZLayer[
ZEnv with AppConfig,
Throwable,
Client.Service with Tracing.Service with ServerChannelFactory with EventLoopGroup
] =
val appEnv =
(JaegerTracer.live >>> Tracing.live) ++ sttp ++ ServerChannelFactory.auto ++ EventLoopGroup.auto(0)

override def run: URIO[ZEnv, ExitCode] =
server.provideSomeLayer(configLayer >+> appEnv).exitCode
override def run =
ZIO.provideLayer(configLayer >+> appEnv)(server.exitCode)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package zio.telemetry.opentelemetry.example.http
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.api.trace.{ SpanKind, StatusCode }
import io.opentelemetry.context.propagation.{ TextMapPropagator, TextMapSetter }
import zio.UIO
import zio.ZIO
import zio.telemetry.opentelemetry.Tracing.root
import zio.telemetry.opentelemetry.Tracing
import zhttp.http.{ !!, ->, /, Http, HttpApp, Method, Response }
Expand All @@ -22,7 +22,7 @@ object ProxyApp {
case Method.GET -> !! / "statuses" =>
root("/statuses", SpanKind.SERVER, errorMapper) {
for {
carrier <- UIO(mutable.Map[String, String]().empty)
carrier <- ZIO.succeed(mutable.Map[String, String]().empty)
_ <- Tracing.setAttribute("http.method", "get")
_ <- Tracing.addEvent("proxy-event")
_ <- Tracing.inject(propagator, carrier, setter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ private[opentelemetry] object ContextPropagation {
getter: TextMapGetter[C]
): UIO[Context] =
ZIO.uninterruptible {
UIO(propagator.extract(Context.root(), carrier, getter))
ZIO.succeed(propagator.extract(Context.root(), carrier, getter))
}

/**
Expand All @@ -27,6 +27,6 @@ private[opentelemetry] object ContextPropagation {
carrier: C,
setter: TextMapSetter[C]
): URIO[Tracing.Service, Unit] =
UIO(propagator.inject(context, carrier, setter))
ZIO.succeed(propagator.inject(context, carrier, setter))

}
Loading

0 comments on commit 4b52f03

Please sign in to comment.