diff --git a/build.sbt b/build.sbt index 9b046cdb416..593087b68f3 100644 --- a/build.sbt +++ b/build.sbt @@ -82,6 +82,11 @@ val scroogeLibs = thriftLibs ++ Seq("com.twitter" %% "scrooge-core" % releaseVer val lz4Lib = "org.lz4" % "lz4-java" % "1.8.0" +val zipkinLibs = Seq( + "io.zipkin.java" % "zipkin" % "1.28.1", + "io.zipkin.zipkin2" % "zipkin" % "2.22.1" +) + def util(which: String) = "com.twitter" %% ("util-" + which) % releaseVersion excludeAll (ExclusionRule(organization = "junit"), @@ -482,7 +487,7 @@ lazy val finagleZipkinCore = Project( libraryDependencies ++= Seq( util("codec"), util("core"), - util("stats")) ++ scroogeLibs ++ jacksonLibs + util("stats")) ++ scroogeLibs ++ jacksonLibs ++ zipkinLibs ).dependsOn(finagleCore % "compile->compile;test->test", finagleThrift) lazy val finagleZipkinScribe = Project( diff --git a/finagle-zipkin-core/src/main/scala/BUILD b/finagle-zipkin-core/src/main/scala/BUILD index 5da8bb99959..fe0491bc504 100644 --- a/finagle-zipkin-core/src/main/scala/BUILD +++ b/finagle-zipkin-core/src/main/scala/BUILD @@ -12,6 +12,8 @@ scala_library( "3rdparty/jvm/com/fasterxml/jackson/core:jackson-core", "3rdparty/jvm/com/fasterxml/jackson/core:jackson-databind", "3rdparty/jvm/com/fasterxml/jackson/module:jackson-module-scala", + "3rdparty/jvm/io/zipkin/java:zipkin-core", + "3rdparty/jvm/io/zipkin/java:zipkin2", "3rdparty/jvm/org/apache/thrift:libthrift", "finagle/finagle-core/src/main", "finagle/finagle-thrift/src/main/java", diff --git a/finagle-zipkin-core/src/main/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracer.scala b/finagle-zipkin-core/src/main/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracer.scala new file mode 100644 index 00000000000..10d6a641059 --- /dev/null +++ b/finagle-zipkin-core/src/main/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracer.scala @@ -0,0 +1,143 @@ +package com.twitter.finagle.zipkin.core + +import com.github.benmanes.caffeine.cache.Caffeine +import com.github.benmanes.caffeine.cache.RemovalCause +import com.twitter.finagle.stats.NullStatsReceiver +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.finagle.tracing.Record +import com.twitter.finagle.tracing.TraceId +import com.twitter.finagle.zipkin.core.DurationFilteringTracer.BitMask +import com.twitter.finagle.zipkin.core.DurationFilteringTracer.Multiplier +import com.twitter.finagle.zipkin.core.DurationFilteringTracer.SomeFalse +import com.twitter.finagle.zipkin.core.DurationFilteringTracer.SomeTrue +import com.twitter.finagle.zipkin.core.DurationFilteringTracer.salt +import com.twitter.util.Duration +import com.twitter.util.Future +import java.io.FileOutputStream +import java.lang.ThreadLocal +import java.util.concurrent.ConcurrentMap +import org.apache.thrift.TSerializer +import scala.util.Random +import zipkin.internal.ApplyTimestampAndDuration +import zipkin.Codec +import zipkin2.codec.SpanBytesDecoder +import java.lang.{Long => JLong} +import scala.util.Using +import zipkin2.codec.SpanBytesEncoder + +object DurationFilteringTracer { + // Use same sampling params here as in com.twitter.finagle.zipkin.core.Sampler + private val Multiplier = (1 << 24).toFloat + private val BitMask = (Multiplier - 1).toInt + private val salt = new Random().nextInt() + + private val SomeTrue = Some(true) + private val SomeFalse = Some(false) +} + +class DurationFilteringTracer( + duration: Duration, + samplingRate: Float, + outputPath: String, + maxInFlightTraces: Int = 2000, + statsReceiver: StatsReceiver = NullStatsReceiver) + extends RawZipkinTracer { + + if (samplingRate < 0 || samplingRate > 1) { + throw new IllegalArgumentException( + "Sample rate not within the valid range of 0-1, was " + samplingRate + ) + } + + private[this] val persistedSpansCounter = statsReceiver.counter("persistedSpans") + private[this] val evictions = statsReceiver.counter("evictions") + + private[this] val thriftSerialiser = ThreadLocal.withInitial(() => new TSerializer()) + + // map from TraceID -> spans within that trace + private[this] val spanRoots: ConcurrentMap[Long, List[zipkin.Span]] = Caffeine + .newBuilder() + .asInstanceOf[Caffeine[Long, List[zipkin.Span]]] + .maximumSize(maxInFlightTraces) + .evictionListener((_: Long, v: Seq[zipkin.Span], _: RemovalCause) => { + evictions.incr() + }) + .build[Long, List[zipkin.Span]].asMap() + + // sentinel value that will get set for a trace ID when we've seen at least one span in that trace + // with duration >= threshold + private[this] val durationThresholdMetSentinel = List[zipkin.Span]() + + val cacheSizeGauge = statsReceiver.addGauge("cacheSize")(spanRoots.size().floatValue()) + + override def record(record: Record): Unit = { + if (sampleTrace(record.traceId).contains(true)) { + super.record(record) + } + } + + override def sampleTrace(traceId: TraceId): Option[Boolean] = { + // Same as in com.twitter.finagle.zipkin.core.Sampler, except here we don't check if + // the traceId has already had Some(false) set, since we want to consider all traceIds + if (((JLong.hashCode(traceId.traceId.toLong) ^ salt) & BitMask) < samplingRate * Multiplier) + SomeTrue + else + SomeFalse + } + + override def getSampleRate: Float = samplingRate + + override def sendSpans(spans: Seq[Span]): Future[Unit] = { + spans.map(convertToZipkinSpan).foreach { span => + if (span.duration >= duration.inMicroseconds) { + val existingSpansForTrace = spanRoots.put(span.traceId, durationThresholdMetSentinel) + persistSpans(span, existingSpansForTrace) + } else { + val existingSpansForTrace = spanRoots.compute( + span.traceId, + { + case (_, null) => List(span) // this is the first span for the trace + case (_, v) if v.eq(durationThresholdMetSentinel) => + durationThresholdMetSentinel // duration threshold has already been met + case (_, v) => + v.+:(span) // there are existing spans, but duration threshold not yet met + } + ) + + if (existingSpansForTrace.eq(durationThresholdMetSentinel)) { + persistSpans(span, List.empty) + } + } + } + + Future.Done + } + + override def isActivelyTracing(traceId: TraceId): Boolean = sampleTrace(traceId).contains(true) + + private[this] def convertToZipkinSpan(span: Span): zipkin.Span = { + val serialisedBytes = thriftSerialiser.get().serialize(span.toThrift) + val zipkinV1ThriftSpan = zipkin.Codec.THRIFT.readSpan(serialisedBytes) + ApplyTimestampAndDuration.apply(zipkinV1ThriftSpan) + } + + private[this] def persistSpans(parent: zipkin.Span, children: Seq[zipkin.Span]): Unit = { + val spansToPersist = if (children != null) children :+ parent else Seq(parent) + persistedSpansCounter.incr(spansToPersist.size) + Using(new FileOutputStream(outputPath, true)) { fileOutputStream => + spansToPersist.foreach { span => + val converted = convertV1SpanToV2(span) + fileOutputStream.write( + SpanBytesEncoder.JSON_V2 + .encode(converted)) + fileOutputStream.write('\n') + } + fileOutputStream.flush() + } + } + + private[this] def convertV1SpanToV2(span: zipkin.Span): zipkin2.Span = { + val spanBytesV1 = Codec.THRIFT.writeSpan(span) + SpanBytesDecoder.THRIFT.decodeOne(spanBytesV1) + } +} diff --git a/finagle-zipkin-core/src/test/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracerSpec.scala b/finagle-zipkin-core/src/test/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracerSpec.scala new file mode 100644 index 00000000000..056aa85dceb --- /dev/null +++ b/finagle-zipkin-core/src/test/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracerSpec.scala @@ -0,0 +1,148 @@ +package com.twitter.finagle.zipkin.core +package unit + +import org.scalatest.FunSuite +import com.twitter.conversions.DurationOps._ +import com.twitter.finagle.tracing.Trace +import com.twitter.finagle.util.DefaultTimer +import com.twitter.util.Await +import com.twitter.util.Duration +import com.twitter.util.Future +import com.twitter.util.FuturePool +import com.twitter.util.Futures +import com.twitter.util.Promise +import com.twitter.util.Return +import java.io.File +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import org.scalatest.concurrent.Eventually +import org.scalatest.time.Seconds +import org.scalatest.time.Span +import scala.io.Source +import zipkin2.codec.SpanBytesDecoder + +class DurationFilteringTracerSpec extends FunSuite with Eventually { + + implicit val config: PatienceConfig = PatienceConfig(timeout = scaled(Span(5, Seconds))) + private val futurePool = FuturePool(Executors.newFixedThreadPool(100)) + private implicit val timer = DefaultTimer + + test("Test only persists spans with duration greater than 100 milliseconds") { + val tempFile = File.createTempFile("traces", ".json") + val tracer = new DurationFilteringTracer(100.millis, 1F, tempFile.getPath) + + Trace.letTracers(Seq(tracer)) { + val future1 = Trace.traceLocalFuture("test") { + taskWithDuration(10.millis) + } + + val future2 = Trace.traceLocalFuture("test2") { + taskWithDuration(100.millis) + } + + Await.ready(Futures.join(future1, future2)) + } + + eventually { + val parsedSpans = parseSpansFromFile(tempFile.getPath) + assert(parsedSpans.size == 1) + assert(parsedSpans.head.name() == "test2") + } + } + + test("Test persists all child spans where parent has duration greater than 100 milliseconds") { + val tempFile = File.createTempFile("traces", ".json") + val tracer = new DurationFilteringTracer(100.millis, 1F, tempFile.getPath) + + Trace.letTracers(Seq(tracer)) { + val future1 = Trace.traceLocalFuture("test") { + taskWithDuration(10.millis) + } + + val future2 = Trace.traceLocalFuture("test2") { + taskWithDuration( + 100.millis, + childTask = Futures + .join( + Trace.traceLocalFuture("test3") { + taskWithDuration(10.millis) + }, + Trace.traceLocalFuture("test4") { + taskWithDuration(10.millis) + }).unit + ) + } + + Await.ready(Futures.join(future1, future2)) + } + + eventually { + val parsedSpans = parseSpansFromFile(tempFile.getPath) + assert(parsedSpans.size == 3) + assert(parsedSpans.exists(_.name() == "test2")) + assert(parsedSpans.exists(_.name() == "test3")) + assert(parsedSpans.exists(_.name() == "test4")) + } + } + + test("Keeps limited number of spans in memory") { + val tempFile = File.createTempFile("traces", ".json") + val tracer = + new DurationFilteringTracer(100.millis, 1F, tempFile.getPath, maxInFlightTraces = 99) + + val latch = new CountDownLatch(100) + val barrier = Promise[Unit]() + + val tasks = Trace.letTracers(Seq(tracer)) { + for (i <- 0.until(100)) yield { + Trace.traceLocalFuture(s"task$i")(futurePool { // top level tasks + Trace.traceLocalFuture(s"subtask$i")(Future()) // bottom level tasks + latch.countDown() + Await.ready(barrier) + }) + } + } + + latch.await() + Thread.sleep(100) // ensures tasks exceed 100ms threshold + barrier.update(Return()) + Await.ready(Future.join(tasks)) + + eventually { + val parsedSpans = parseSpansFromFile(tempFile.getPath) + assert(parsedSpans.size < 200) // all 100 top level spans at most 99 bottom level spans + assert(parsedSpans.count(_.name().contains("subtask")) < 100) + } + } + + test("Only collects limited number of spans based on sample rate") { + val tempFile = File.createTempFile("traces", ".json") + + val tracer = new DurationFilteringTracer(0.millis, 0.1F, tempFile.getPath) + + Trace.letTracers(Seq(tracer)) { + for (i <- 0 until 100) { + Trace.traceLocalFuture(s"task$i")(Future()) + } + } + + eventually { + val parsedSpans = parseSpansFromFile(tempFile.getPath) + assert(parsedSpans.size < 30) + } + } + + private[this] def taskWithDuration( + duration: Duration, + childTask: => Future[Unit] = Future() + ): Future[Unit] = { + Future.Unit.delayed(duration).flatMap(_ => childTask) + } + + private[this] def parseSpansFromFile(filename: String): Seq[zipkin2.Span] = { + Source + .fromFile(filename) + .getLines() + .map(line => SpanBytesDecoder.JSON_V2.decodeOne(line.getBytes())) + }.toSeq +}