Skip to content

Commit

Permalink
RUM-8051: Implement Head-based sampling for network instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
0xnm committed Jan 15, 2025
1 parent eebc972 commit 8852339
Show file tree
Hide file tree
Showing 28 changed files with 1,613 additions and 156 deletions.
21 changes: 21 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,27 @@ test-pyramid:single-fit-trace:
reports:
junit: "**/build/test-results/testReleaseUnitTest/*.xml"

test-pyramid:single-fit-okhttp:
tags: [ "arch:amd64" ]
image: $CI_IMAGE_DOCKER
stage: test-pyramid
timeout: 1h
cache:
key: $CI_COMMIT_REF_SLUG
paths:
- cache/caches/
- cache/notifications/
policy: pull
script:
- rm -rf ~/.gradle/daemon/
- export DD_AGENT_HOST="$BUILDENV_HOST_IP"
- GRADLE_OPTS="-Xmx3072m" DD_TAGS="test.configuration.variant:release" ./gradlew :reliability:single-fit:okhttp:testReleaseUnitTest --stacktrace --no-daemon --build-cache --gradle-user-home cache/ -Dorg.gradle.jvmargs=-javaagent:$DD_TRACER_FOLDER/dd-java-agent.jar=$DD_COMMON_AGENT_CONFIG
artifacts:
when: always
expire_in: 1 week
reports:
junit: "**/build/test-results/testReleaseUnitTest/*.xml"

# RUN INSTRUMENTED TESTS ON MIN API (21), LATEST API (34) and MEDIAN API (28)

test-pyramid:legacy-integration-instrumented-min-api:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void setSamplingPriority(final DDSpan span) {
final boolean priorityWasSet;

if (sampler.sample(span)) {
priorityWasSet = span.context().setSamplingPriority(com.datadog.legacy.trace.api.sampling.PrioritySampling.SAMPLER_KEEP);
priorityWasSet = span.context().setSamplingPriority(PrioritySampling.SAMPLER_KEEP);
} else {
priorityWasSet = span.context().setSamplingPriority(PrioritySampling.SAMPLER_DROP);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public void inject(final DDSpanContext context, final TextMapInject carrier) {
// adding the tags
carrier.put(DATADOG_TAGS_KEY, MOST_SIGNIFICANT_TRACE_ID_KEY + "=" + mostSignificantTraceId);


// always use max sampling priority for Android traces
carrier.put(SAMPLING_PRIORITY_KEY, "1");
if (context.lockSamplingPriority()) {
carrier.put(SAMPLING_PRIORITY_KEY, String.valueOf(context.getSamplingPriority()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.datadog.android.event.NoOpEventMapper
import com.datadog.android.trace.internal.domain.event.ContextAwareMapper
import com.datadog.android.trace.internal.storage.ContextAwareSerializer
import com.datadog.android.trace.model.SpanEvent
import com.datadog.legacy.trace.api.sampling.PrioritySampling
import com.datadog.trace.common.writer.Writer
import com.datadog.trace.core.DDSpan
import java.util.Locale
Expand All @@ -41,10 +42,14 @@ internal class OtelTraceWriter(
sdkCore.getFeature(Feature.TRACING_FEATURE_NAME)
?.withWriteContext { datadogContext, eventBatchWriter ->
// TODO RUM-4092 Add the capability in the serializer to handle multiple spans in one payload
trace.forEach { span ->
@Suppress("ThreadSafety") // called in the worker context
writeSpan(datadogContext, eventBatchWriter, span)
}
trace
.filter {
it.samplingPriority() !in DROP_SAMPLING_PRIORITIES
}
.forEach { span ->
@Suppress("ThreadSafety") // called in the worker context
writeSpan(datadogContext, eventBatchWriter, span)
}
}
}

Expand Down Expand Up @@ -90,5 +95,6 @@ internal class OtelTraceWriter(

companion object {
internal const val ERROR_SERIALIZING = "Error serializing %s model"
internal val DROP_SAMPLING_PRIORITIES = setOf(PrioritySampling.SAMPLER_DROP, PrioritySampling.USER_DROP)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.datadog.android.event.EventMapper
import com.datadog.android.trace.internal.domain.event.ContextAwareMapper
import com.datadog.android.trace.internal.storage.ContextAwareSerializer
import com.datadog.android.trace.model.SpanEvent
import com.datadog.legacy.trace.api.sampling.PrioritySampling
import com.datadog.legacy.trace.common.writer.Writer
import com.datadog.opentracing.DDSpan
import java.util.Locale
Expand All @@ -39,9 +40,13 @@ internal class TraceWriter(
if (trace == null) return
sdkCore.getFeature(Feature.TRACING_FEATURE_NAME)
?.withWriteContext { datadogContext, eventBatchWriter ->
trace.forEach { span ->
writeSpan(datadogContext, eventBatchWriter, span)
}
trace
.filter {
it.samplingPriority == null || it.samplingPriority !in DROP_SAMPLING_PRIORITIES
}
.forEach { span ->
writeSpan(datadogContext, eventBatchWriter, span)
}
}
}

Expand Down Expand Up @@ -86,5 +91,6 @@ internal class TraceWriter(

companion object {
internal const val ERROR_SERIALIZING = "Error serializing %s model"
internal val DROP_SAMPLING_PRIORITIES = setOf(PrioritySampling.SAMPLER_DROP, PrioritySampling.USER_DROP)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ internal class OtelTraceWriterTest {
@Test
fun `M write spans W write()`(forge: Forge) {
// GIVEN
val ddSpans = forge.aList { getForgery<DDSpan>() }.toMutableList()
val ddSpans = forge.aList { getForgery<DDSpan>() }
.filter { it.samplingPriority() !in OtelTraceWriter.DROP_SAMPLING_PRIORITIES }
.toMutableList()
val spanEvents = ddSpans.map { forge.getForgery<SpanEvent>() }
val serializedSpans = ddSpans.map { forge.aString() }

Expand All @@ -131,10 +133,30 @@ internal class OtelTraceWriterTest {
verifyNoMoreInteractions(mockEventBatchWriter)
}

@Test
fun `M not write spans with drop sampling priority W write()`(forge: Forge) {
// GIVEN
val ddSpans = forge.aList { getForgery<DDSpan>() }
.filter { it.samplingPriority() in OtelTraceWriter.DROP_SAMPLING_PRIORITIES }
.toMutableList()

// WHEN
testedWriter.write(ddSpans)

// THEN
verifyNoInteractions(mockEventBatchWriter)

ddSpans.forEach {
it.finish()
}
}

@Test
fun `M not write non-mapped spans W write()`(forge: Forge) {
// GIVEN
val ddSpans = forge.aList { getForgery<DDSpan>() }.toMutableList()
val ddSpans = forge.aList { getForgery<DDSpan>() }
.filter { it.samplingPriority() !in OtelTraceWriter.DROP_SAMPLING_PRIORITIES }
.toMutableList()
val spanEvents = ddSpans.map { forge.getForgery<SpanEvent>() }
val mappedEvents = spanEvents.map { forge.aNullable { it } }

Expand Down Expand Up @@ -168,7 +190,9 @@ internal class OtelTraceWriterTest {
@Test
fun `M not write non-serialized spans W write()`(forge: Forge) {
// GIVEN
val ddSpans = forge.aList { getForgery<DDSpan>() }.toMutableList()
val ddSpans = forge.aList { getForgery<DDSpan>() }
.filter { it.samplingPriority() !in OtelTraceWriter.DROP_SAMPLING_PRIORITIES }
.toMutableList()
val spanEvents = ddSpans.map { forge.getForgery<SpanEvent>() }

val serializedSpans = spanEvents.map { forge.aNullable { aString() } }
Expand Down Expand Up @@ -213,7 +237,9 @@ internal class OtelTraceWriterTest {
@Test
fun `M log error and proceed W write() { serialization failed }`(forge: Forge) {
// GIVEN
val ddSpans = forge.aList { getForgery<DDSpan>() }.toMutableList()
val ddSpans = forge.aList { getForgery<DDSpan>() }
.filter { it.samplingPriority() !in OtelTraceWriter.DROP_SAMPLING_PRIORITIES }
.toMutableList()
val spanEvents = ddSpans.map { forge.getForgery<SpanEvent>() }
val serializedSpans = ddSpans.map { forge.aString() }

Expand Down Expand Up @@ -261,7 +287,9 @@ internal class OtelTraceWriterTest {
@Test
fun `M request event write context once W write()`(forge: Forge) {
// GIVEN
val ddSpans = forge.aList { getForgery<DDSpan>() }.toMutableList()
val ddSpans = forge.aList { getForgery<DDSpan>() }
.filter { it.samplingPriority() !in OtelTraceWriter.DROP_SAMPLING_PRIORITIES }
.toMutableList()

// WHEN
testedWriter.write(ddSpans)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ internal class TraceWriterTest {
@Test
fun `M write spans W write()`(forge: Forge) {
// GIVEN
val ddSpans = forge.aList { getForgery<DDSpan>() }.toMutableList()
val ddSpans = forge.aList { getForgery<DDSpan>() }
.filter { it.samplingPriority !in TraceWriter.DROP_SAMPLING_PRIORITIES }
.toMutableList()
val spanEvents = ddSpans.map { forge.getForgery<SpanEvent>() }
val serializedSpans = ddSpans.map { forge.aString() }

Expand Down Expand Up @@ -138,11 +140,32 @@ internal class TraceWriterTest {
}
}

@Test
fun `M not write spans with drop sampling priority W write() { drop sampling decision }`(forge: Forge) {
// GIVEN
val ddSpans = forge.aList { getForgery<DDSpan>() }
.filter { it.samplingPriority in TraceWriter.DROP_SAMPLING_PRIORITIES }
.toMutableList()

// WHEN
testedWriter.write(ddSpans)

// THEN
verifyNoInteractions(mockEventBatchWriter)

ddSpans.forEach {
it.finish()
}
}

@Test
fun `M not write non-mapped spans W write()`(forge: Forge) {
// GIVEN
val ddSpans = forge.aList { getForgery<DDSpan>() }.toMutableList()
val spanEvents = ddSpans.map { forge.getForgery<SpanEvent>() }
val ddSpans = forge.aList { getForgery<DDSpan>() }
.filter { it.samplingPriority !in TraceWriter.DROP_SAMPLING_PRIORITIES }
.toMutableList()
val spanEvents = ddSpans
.map { forge.getForgery<SpanEvent>() }
val mappedEvents = spanEvents.map { forge.aNullable { it } }

val serializedSpans = mappedEvents.filterNotNull().map { forge.aString() }
Expand Down Expand Up @@ -182,7 +205,9 @@ internal class TraceWriterTest {
@Test
fun `M not write non-serialized spans W write()`(forge: Forge) {
// GIVEN
val ddSpans = forge.aList { getForgery<DDSpan>() }.toMutableList()
val ddSpans = forge.aList { getForgery<DDSpan>() }
.filter { it.samplingPriority !in TraceWriter.DROP_SAMPLING_PRIORITIES }
.toMutableList()
val spanEvents = ddSpans.map { forge.getForgery<SpanEvent>() }

val serializedSpans = spanEvents.map { forge.aNullable { aString() } }
Expand Down Expand Up @@ -234,7 +259,9 @@ internal class TraceWriterTest {
@Test
fun `M log error and proceed W write() { serialization failed }`(forge: Forge) {
// GIVEN
val ddSpans = forge.aList { getForgery<DDSpan>() }.toMutableList()
val ddSpans = forge.aList { getForgery<DDSpan>() }
.filter { it.samplingPriority !in TraceWriter.DROP_SAMPLING_PRIORITIES }
.toMutableList()
val spanEvents = ddSpans.map { forge.getForgery<SpanEvent>() }
val serializedSpans = ddSpans.map { forge.aString() }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package com.datadog.android.utils.forge

import com.datadog.trace.api.DD128bTraceId
import com.datadog.trace.api.sampling.PrioritySampling
import com.datadog.trace.bootstrap.instrumentation.api.AgentSpanLink
import com.datadog.trace.core.DDSpan
import com.datadog.trace.core.DDSpanContext
Expand All @@ -31,7 +32,13 @@ internal class CoreDDSpanForgeryFactory : ForgeryFactory<DDSpan> {
val lowOrderTraceId = forge.aLong(min = 0)
val spanId = forge.aLong(min = 1)
val parentId = forge.aLong(min = 1)
val samplingPriority = forge.anInt()
val samplingPriority = forge.anElementFrom(
PrioritySampling.UNSET,
PrioritySampling.SAMPLER_DROP,
PrioritySampling.USER_DROP,
PrioritySampling.SAMPLER_KEEP,
PrioritySampling.USER_KEEP
).toInt()
val tagsAndMetrics = tags + metrics
val mockSpanContext: DDSpanContext = mock {
whenever(it.baggageItems).thenReturn(baggageItems)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package com.datadog.android.utils.forge

import com.datadog.legacy.trace.api.sampling.PrioritySampling
import com.datadog.opentracing.DDSpanContext
import fr.xgouchet.elmyr.Forge
import fr.xgouchet.elmyr.ForgeryFactory
Expand All @@ -26,7 +27,13 @@ internal class DDSpanContextForgeryFactory : ForgeryFactory<DDSpanContext> {
val serviceName = forge.anAlphabeticalString()
val spanType = forge.anAlphabeticalString()
val origin = forge.anAlphabeticalString()
val samplingPriority = forge.anInt()
val samplingPriority = forge.anElementFrom(
PrioritySampling.UNSET,
PrioritySampling.SAMPLER_DROP,
PrioritySampling.USER_DROP,
PrioritySampling.SAMPLER_KEEP,
PrioritySampling.USER_KEEP
)
val baggageItems = forge.aMap(size = forge.anInt(min = 0, max = 10)) {
anAlphabeticalString() to anAlphabeticalString()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package com.datadog.android.utils.forge

import com.datadog.legacy.trace.api.Config
import com.datadog.legacy.trace.api.sampling.PrioritySampling
import com.datadog.opentracing.DDSpan
import com.datadog.opentracing.DDTracer
import com.datadog.opentracing.DDTracer.DDSpanBuilder
Expand Down Expand Up @@ -34,6 +35,15 @@ internal class SpanForgeryFactory : ForgeryFactory<DDSpan> {
isWithErrorFlag,
tags
).start() as DDSpan
if (forge.aBool()) {
span.samplingPriority = forge.anElementFrom(
PrioritySampling.UNSET,
PrioritySampling.SAMPLER_DROP,
PrioritySampling.USER_DROP,
PrioritySampling.SAMPLER_KEEP,
PrioritySampling.USER_KEEP
)
}
metrics.forEach {
span.context().setMetric(it.key, it.value)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package com.datadog.opentracing.propagation

import com.datadog.android.trace.internal.domain.event.BigIntegerUtils
import com.datadog.android.utils.forge.Configurator
import com.datadog.legacy.trace.api.sampling.PrioritySampling
import com.datadog.opentracing.DDSpanContext
import com.datadog.tools.unit.setFieldValue
import fr.xgouchet.elmyr.Forge
Expand Down Expand Up @@ -89,7 +90,14 @@ internal class DatadogHttpCodecTest {
headers[DatadogHttpCodec.LEAST_SIGNIFICANT_TRACE_ID_KEY]
).isEqualTo(fakeLeastSignificant64BitsTraceId)
assertThat(headers[DatadogHttpCodec.DATADOG_TAGS_KEY]).isEqualTo(expectedInjectedTags())
assertThat(headers[DatadogHttpCodec.SAMPLING_PRIORITY_KEY]).isEqualTo("1")
assertThat(headers[DatadogHttpCodec.SAMPLING_PRIORITY_KEY])
.let {
if (fakeDDSpanContext.samplingPriority != PrioritySampling.UNSET) {
it.isEqualTo(fakeDDSpanContext.samplingPriority.toString())
} else {
it.isNull()
}
}
assertThat(headers[DatadogHttpCodec.SPAN_ID_KEY])
.isEqualTo(fakeDDSpanContext.spanId.toString())
fakeDDSpanContext.baggageItems.forEach { (key, value) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package com.datadog.android.okhttp.otel

import com.datadog.android.okhttp.TraceContext
import com.datadog.legacy.trace.api.sampling.PrioritySampling
import com.datadog.opentelemetry.trace.OtelSpan
import com.datadog.trace.core.DDSpanContext
import io.opentelemetry.api.trace.Span
import okhttp3.Request

Expand All @@ -17,11 +19,23 @@ import okhttp3.Request
* @return the modified Request.Builder instance
*/
fun Request.Builder.addParentSpan(span: Span): Request.Builder {
val context = span.spanContext
// because the DatadogInterceptor is using the old legacy DDTracer for handling the spans around request
// we need to provide the legacy sampling codes here.
val prioritySampling = if (context.isSampled) PrioritySampling.USER_KEEP else PrioritySampling.UNSET
@Suppress("UnsafeThirdPartyFunctionCall") // the context will always be a TraceContext
tag(TraceContext::class.java, TraceContext(context.traceId, context.spanId, prioritySampling))
// very fragile and assumes that Datadog Tracer is used
// we need to trigger sampling decision at this point, because we are doing context propagation out of OpenTelemetry
if (span is OtelSpan) {
val agentSpanContext = span.agentSpanContext
if (agentSpanContext is DDSpanContext) {
agentSpanContext.trace.setSamplingPriorityIfNecessary()
}
@Suppress("UnsafeThirdPartyFunctionCall") // the context will always be a TraceContext
tag(
TraceContext::class.java,
TraceContext(span.spanContext.traceId, span.spanContext.spanId, agentSpanContext.samplingPriority)
)
} else {
val context = span.spanContext
val prioritySampling = if (context.isSampled) PrioritySampling.USER_KEEP else PrioritySampling.UNSET
@Suppress("UnsafeThirdPartyFunctionCall") // the context will always be a TraceContext
tag(TraceContext::class.java, TraceContext(context.traceId, context.spanId, prioritySampling))
}
return this
}
Loading

0 comments on commit 8852339

Please sign in to comment.