finagle/finagle-zipkin-core: Introduce local tracing implementation with duration filter and sampling

This phab introduces `DurationFilteringTracer`, which can be used to collect traces locally and write them to a file, with a configured sample rate and span duration threshold. The implementation converts Finagle spans to the zipkin2 JSON format so that they can be uploaded to the Zipkin UI or Grafana for analysis. For duration filtering, we need to keep sampled spans in memory until either a parent span completes with a duration greater than the configured threshold, or the topmost span of the trace is received. I use a Caffeine cache for this so that we won't run out of memory if the sampling rate is set too high or if there are orphaned spans.

Differential Revision: https://phabricator.twitter.biz/D1182242
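
As a rough usage sketch (the object name, 500 ms threshold, 10% sample rate, and output path below are illustrative values, not part of this change; the constructor arguments and `Trace.letTracers` wiring mirror the new class and the tests further down):

import com.twitter.conversions.DurationOps._
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.zipkin.core.DurationFilteringTracer

object DurationFilteringTracerExample {
  def main(args: Array[String]): Unit = {
    // Keep roughly 10% of traces, and persist a sampled trace only once one of its
    // spans runs for at least 500 milliseconds; spans are appended to the output
    // file as newline-delimited zipkin2 JSON (one span per line).
    val tracer = new DurationFilteringTracer(
      duration = 500.millis,
      samplingRate = 0.1f,
      outputPath = "/tmp/slow-traces.json")

    // Install the tracer for the enclosed work, as the tests below do.
    Trace.letTracers(Seq(tracer)) {
      // application code that produces spans goes here
    }
  }
}

Each line of the output file is a single zipkin2 JSON span, which is what the tests below parse back with `SpanBytesDecoder.JSON_V2`.
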
Tristan Pollitt authored and jenkins committed Nov 21, 2024
1 parent 49feab7 commit d6872e1
Showing 4 changed files with 299 additions and 1 deletion.
7 changes: 6 additions & 1 deletion build.sbt
@@ -82,6 +82,11 @@ val scroogeLibs = thriftLibs ++ Seq("com.twitter" %% "scrooge-core" % releaseVer

val lz4Lib = "org.lz4" % "lz4-java" % "1.8.0"

val zipkinLibs = Seq(
"io.zipkin.java" % "zipkin" % "1.28.1",
"io.zipkin.zipkin2" % "zipkin" % "2.22.1"
)

def util(which: String) =
"com.twitter" %% ("util-" + which) % releaseVersion excludeAll (ExclusionRule(organization =
"junit"),
@@ -482,7 +487,7 @@ lazy val finagleZipkinCore = Project(
libraryDependencies ++= Seq(
util("codec"),
util("core"),
util("stats")) ++ scroogeLibs ++ jacksonLibs
util("stats")) ++ scroogeLibs ++ jacksonLibs ++ zipkinLibs
).dependsOn(finagleCore % "compile->compile;test->test", finagleThrift)

lazy val finagleZipkinScribe = Project(
2 changes: 2 additions & 0 deletions finagle-zipkin-core/src/main/scala/BUILD
@@ -12,6 +12,8 @@ scala_library(
"3rdparty/jvm/com/fasterxml/jackson/core:jackson-core",
"3rdparty/jvm/com/fasterxml/jackson/core:jackson-databind",
"3rdparty/jvm/com/fasterxml/jackson/module:jackson-module-scala",
"3rdparty/jvm/io/zipkin/java:zipkin-core",
"3rdparty/jvm/io/zipkin/java:zipkin2",
"3rdparty/jvm/org/apache/thrift:libthrift",
"finagle/finagle-core/src/main",
"finagle/finagle-thrift/src/main/java",
143 changes: 143 additions & 0 deletions finagle-zipkin-core/src/main/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracer.scala
@@ -0,0 +1,143 @@
package com.twitter.finagle.zipkin.core

import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.RemovalCause
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.tracing.Record
import com.twitter.finagle.tracing.TraceId
import com.twitter.finagle.zipkin.core.DurationFilteringTracer.BitMask
import com.twitter.finagle.zipkin.core.DurationFilteringTracer.Multiplier
import com.twitter.finagle.zipkin.core.DurationFilteringTracer.SomeFalse
import com.twitter.finagle.zipkin.core.DurationFilteringTracer.SomeTrue
import com.twitter.finagle.zipkin.core.DurationFilteringTracer.salt
import com.twitter.util.Duration
import com.twitter.util.Future
import java.io.FileOutputStream
import java.lang.ThreadLocal
import java.util.concurrent.ConcurrentMap
import org.apache.thrift.TSerializer
import scala.util.Random
import zipkin.internal.ApplyTimestampAndDuration
import zipkin.Codec
import zipkin2.codec.SpanBytesDecoder
import java.lang.{Long => JLong}
import scala.util.Using
import zipkin2.codec.SpanBytesEncoder

object DurationFilteringTracer {
// Use same sampling params here as in com.twitter.finagle.zipkin.core.Sampler
private val Multiplier = (1 << 24).toFloat
private val BitMask = (Multiplier - 1).toInt
private val salt = new Random().nextInt()

private val SomeTrue = Some(true)
private val SomeFalse = Some(false)
}

class DurationFilteringTracer(
duration: Duration,
samplingRate: Float,
outputPath: String,
maxInFlightTraces: Int = 2000,
statsReceiver: StatsReceiver = NullStatsReceiver)
extends RawZipkinTracer {

if (samplingRate < 0 || samplingRate > 1) {
throw new IllegalArgumentException(
"Sample rate not within the valid range of 0-1, was " + samplingRate
)
}

private[this] val persistedSpansCounter = statsReceiver.counter("persistedSpans")
private[this] val evictions = statsReceiver.counter("evictions")

private[this] val thriftSerialiser = ThreadLocal.withInitial(() => new TSerializer())

// map from TraceID -> spans within that trace
private[this] val spanRoots: ConcurrentMap[Long, List[zipkin.Span]] = Caffeine
.newBuilder()
.asInstanceOf[Caffeine[Long, List[zipkin.Span]]]
.maximumSize(maxInFlightTraces)
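// bounded so orphaned or long-running traces cannot grow the cache without limit;
// size-based evictions drop those traces' buffered spans and are counted below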
.evictionListener((_: Long, v: Seq[zipkin.Span], _: RemovalCause) => {
evictions.incr()
})
.build[Long, List[zipkin.Span]].asMap()

// sentinel value that will get set for a trace ID when we've seen at least one span in that trace
// with duration >= threshold
private[this] val durationThresholdMetSentinel = List[zipkin.Span]()

val cacheSizeGauge = statsReceiver.addGauge("cacheSize")(spanRoots.size().floatValue())

override def record(record: Record): Unit = {
if (sampleTrace(record.traceId).contains(true)) {
super.record(record)
}
}

override def sampleTrace(traceId: TraceId): Option[Boolean] = {
// Same as in com.twitter.finagle.zipkin.core.Sampler, except here we don't check if
// the traceId has already had Some(false) set, since we want to consider all traceIds
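// e.g. with samplingRate = 0.25f the salted hash masked to 24 bits must fall below
// 0.25 * 2^24 = 4194304, so roughly a quarter of trace IDs are kept (assuming the
// hash values are uniformly distributed)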
if (((JLong.hashCode(traceId.traceId.toLong) ^ salt) & BitMask) < samplingRate * Multiplier)
SomeTrue
else
SomeFalse
}

override def getSampleRate: Float = samplingRate

override def sendSpans(spans: Seq[Span]): Future[Unit] = {
spans.map(convertToZipkinSpan).foreach { span =>
if (span.duration >= duration.inMicroseconds) {
val existingSpansForTrace = spanRoots.put(span.traceId, durationThresholdMetSentinel)
persistSpans(span, existingSpansForTrace)
} else {
val existingSpansForTrace = spanRoots.compute(
span.traceId,
{
case (_, null) => List(span) // this is the first span for the trace
case (_, v) if v.eq(durationThresholdMetSentinel) =>
durationThresholdMetSentinel // duration threshold has already been met
case (_, v) =>
v.+:(span) // there are existing spans, but duration threshold not yet met
}
)

if (existingSpansForTrace.eq(durationThresholdMetSentinel)) {
persistSpans(span, List.empty)
}
}
}

Future.Done
}

override def isActivelyTracing(traceId: TraceId): Boolean = sampleTrace(traceId).contains(true)

private[this] def convertToZipkinSpan(span: Span): zipkin.Span = {
val serialisedBytes = thriftSerialiser.get().serialize(span.toThrift)
val zipkinV1ThriftSpan = zipkin.Codec.THRIFT.readSpan(serialisedBytes)
ApplyTimestampAndDuration.apply(zipkinV1ThriftSpan)
}

private[this] def persistSpans(parent: zipkin.Span, children: Seq[zipkin.Span]): Unit = {
val spansToPersist = if (children != null) children :+ parent else Seq(parent)
persistedSpansCounter.incr(spansToPersist.size)
Using(new FileOutputStream(outputPath, true)) { fileOutputStream =>
spansToPersist.foreach { span =>
val converted = convertV1SpanToV2(span)
fileOutputStream.write(
SpanBytesEncoder.JSON_V2
.encode(converted))
fileOutputStream.write('\n')
}
fileOutputStream.flush()
}
}

private[this] def convertV1SpanToV2(span: zipkin.Span): zipkin2.Span = {
val spanBytesV1 = Codec.THRIFT.writeSpan(span)
SpanBytesDecoder.THRIFT.decodeOne(spanBytesV1)
}
}
148 changes: 148 additions & 0 deletions finagle-zipkin-core/src/test/scala/com/twitter/finagle/zipkin/core/unit/DurationFilteringTracerSpec.scala
@@ -0,0 +1,148 @@
package com.twitter.finagle.zipkin.core
package unit

import org.scalatest.FunSuite
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.util.DefaultTimer
import com.twitter.util.Await
import com.twitter.util.Duration
import com.twitter.util.Future
import com.twitter.util.FuturePool
import com.twitter.util.Futures
import com.twitter.util.Promise
import com.twitter.util.Return
import java.io.File
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import org.scalatest.concurrent.Eventually
import org.scalatest.time.Seconds
import org.scalatest.time.Span
import scala.io.Source
import zipkin2.codec.SpanBytesDecoder

class DurationFilteringTracerSpec extends FunSuite with Eventually {

implicit val config: PatienceConfig = PatienceConfig(timeout = scaled(Span(5, Seconds)))
private val futurePool = FuturePool(Executors.newFixedThreadPool(100))
private implicit val timer = DefaultTimer

test("Test only persists spans with duration greater than 100 milliseconds") {
val tempFile = File.createTempFile("traces", ".json")
val tracer = new DurationFilteringTracer(100.millis, 1F, tempFile.getPath)

Trace.letTracers(Seq(tracer)) {
val future1 = Trace.traceLocalFuture("test") {
taskWithDuration(10.millis)
}

val future2 = Trace.traceLocalFuture("test2") {
taskWithDuration(100.millis)
}

Await.ready(Futures.join(future1, future2))
}

eventually {
val parsedSpans = parseSpansFromFile(tempFile.getPath)
assert(parsedSpans.size == 1)
assert(parsedSpans.head.name() == "test2")
}
}

test("Test persists all child spans where parent has duration greater than 100 milliseconds") {
val tempFile = File.createTempFile("traces", ".json")
val tracer = new DurationFilteringTracer(100.millis, 1F, tempFile.getPath)

Trace.letTracers(Seq(tracer)) {
val future1 = Trace.traceLocalFuture("test") {
taskWithDuration(10.millis)
}

val future2 = Trace.traceLocalFuture("test2") {
taskWithDuration(
100.millis,
childTask = Futures
.join(
Trace.traceLocalFuture("test3") {
taskWithDuration(10.millis)
},
Trace.traceLocalFuture("test4") {
taskWithDuration(10.millis)
}).unit
)
}

Await.ready(Futures.join(future1, future2))
}

eventually {
val parsedSpans = parseSpansFromFile(tempFile.getPath)
assert(parsedSpans.size == 3)
assert(parsedSpans.exists(_.name() == "test2"))
assert(parsedSpans.exists(_.name() == "test3"))
assert(parsedSpans.exists(_.name() == "test4"))
}
}

test("Keeps limited number of spans in memory") {
val tempFile = File.createTempFile("traces", ".json")
val tracer =
new DurationFilteringTracer(100.millis, 1F, tempFile.getPath, maxInFlightTraces = 99)

val latch = new CountDownLatch(100)
val barrier = Promise[Unit]()

val tasks = Trace.letTracers(Seq(tracer)) {
for (i <- 0.until(100)) yield {
Trace.traceLocalFuture(s"task$i")(futurePool { // top level tasks
Trace.traceLocalFuture(s"subtask$i")(Future()) // bottom level tasks
latch.countDown()
Await.ready(barrier)
})
}
}

latch.await()
Thread.sleep(100) // ensures tasks exceed 100ms threshold
barrier.update(Return())
Await.ready(Future.join(tasks))

eventually {
val parsedSpans = parseSpansFromFile(tempFile.getPath)
assert(parsedSpans.size < 200) // all 100 top-level spans plus at most 99 bottom-level spans
assert(parsedSpans.count(_.name().contains("subtask")) < 100)
}
}

test("Only collects limited number of spans based on sample rate") {
val tempFile = File.createTempFile("traces", ".json")

val tracer = new DurationFilteringTracer(0.millis, 0.1F, tempFile.getPath)

Trace.letTracers(Seq(tracer)) {
for (i <- 0 until 100) {
Trace.traceLocalFuture(s"task$i")(Future())
}
}

eventually {
val parsedSpans = parseSpansFromFile(tempFile.getPath)
assert(parsedSpans.size < 30)
}
}

private[this] def taskWithDuration(
duration: Duration,
childTask: => Future[Unit] = Future.Done
): Future[Unit] = {
Future.Unit.delayed(duration).flatMap(_ => childTask)
}

private[this] def parseSpansFromFile(filename: String): Seq[zipkin2.Span] = {
Source
.fromFile(filename)
.getLines()
.map(line => SpanBytesDecoder.JSON_V2.decodeOne(line.getBytes()))
.toSeq
}
}
