Skip to content

Commit

Permalink
fix(telemetry): assign our own span IDs (prisma#5091)
Browse files Browse the repository at this point in the history
Our logic assumed that span IDs were unique within a trace but that's
incorrect: they can be reused as long as the spans don't overlap in time
(e.g. sequential siblings). Now we assign our own IDs which are
guaranteed to be unique within a trace as long as the trace has less
than 18446744073709551616 (2**64) spans.

`tracing` tries not to reuse them immediately so this wasn't caught in
tests until we started running tests with WASM where the issue
immediately became evident (presumably due to the lack of a high quality
source of randomness but I haven't actually investigated what the reason
behind the difference in behaviour is).
  • Loading branch information
aqrln authored Dec 17, 2024
1 parent caaf939 commit 11f085a
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 49 deletions.
13 changes: 9 additions & 4 deletions libs/telemetry/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use serde::Serialize;

use crate::id::{RequestId, SpanId};
use crate::models::{LogLevel, SpanKind};
use crate::NextId;

#[derive(Debug, Clone)]
#[cfg_attr(test, derive(Serialize))]
Expand Down Expand Up @@ -37,10 +38,10 @@ pub(crate) struct SpanBuilder {
}

impl SpanBuilder {
pub fn new(name: &'static str, id: impl Into<SpanId>, attrs_size_hint: usize) -> Self {
pub fn new(name: &'static str, attrs_size_hint: usize) -> Self {
Self {
request_id: None,
id: id.into(),
id: SpanId::next(),
name: name.into(),
start_time: SystemTime::now(),
elapsed: ElapsedTimeCounter::start(),
Expand All @@ -54,6 +55,10 @@ impl SpanBuilder {
self.request_id
}

pub fn span_id(&self) -> SpanId {
self.id
}

pub fn set_request_id(&mut self, request_id: RequestId) {
self.request_id = Some(request_id);
}
Expand All @@ -74,10 +79,10 @@ impl SpanBuilder {
self.links.push(link);
}

pub fn end(self, parent_id: Option<impl Into<SpanId>>) -> CollectedSpan {
pub fn end(self, parent_id: Option<SpanId>) -> CollectedSpan {
CollectedSpan {
id: self.id,
parent_id: parent_id.map(Into::into),
parent_id,
name: self.name,
start_time: self.start_time,
duration: self.elapsed.elapsed_time(),
Expand Down
6 changes: 4 additions & 2 deletions libs/telemetry/src/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ impl AllowAttribute for InternalAttributesFilter {
mod tests {
use std::time::{Duration, SystemTime};

use crate::NextId;

use super::*;

use CaptureTarget::*;
Expand Down Expand Up @@ -327,7 +329,7 @@ mod tests {
let request_id = exporter.start_capturing(RequestId::next(), capture_all()).await;

let span = CollectedSpan {
id: tracing::span::Id::from_u64(1).into(),
id: SpanId::try_from(1).unwrap(),
parent_id: None,
name: "test_span".into(),
start_time: SystemTime::UNIX_EPOCH,
Expand Down Expand Up @@ -381,7 +383,7 @@ mod tests {
let request_id = exporter.start_capturing(RequestId::next(), capture_spans()).await;

let span = CollectedSpan {
id: tracing::span::Id::from_u64(1).into(),
id: SpanId::try_from(1).unwrap(),
parent_id: None,
name: "test_span".into(),
start_time: SystemTime::UNIX_EPOCH,
Expand Down
95 changes: 64 additions & 31 deletions libs/telemetry/src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ impl SerializableNonZeroU64 {
pub fn into_u64(self) -> u64 {
self.0.get()
}
}

impl TryFrom<u64> for SerializableNonZeroU64 {
type Error = u64;

pub fn from_u64(value: u64) -> Option<Self> {
NonZeroU64::new(value).map(Self)
fn try_from(value: u64) -> Result<Self, Self::Error> {
NonZeroU64::new(value).map(Self).ok_or(value)
}
}

Expand Down Expand Up @@ -66,64 +70,78 @@ impl<'de> Deserialize<'de> for SerializableNonZeroU64 {
}
}

/// A unique identifier for a span. It maps directly to [`tracing::span::Id`] assigned by
/// [`tracing_subscriber::registry::Registry`].
/// A unique identifier for a span.
///
/// We don't use the original span IDs assigned by the `tracing` `Subscriber`
/// because they are are only guaranteed to be unique among the spans active at
/// the same time. They may be reused after a span is closed (even for
/// successive sibling spans in the same trace as long as they don't overlap in
/// time), so they are ephemeral and cannot be stored. Since we do need to store
/// them and can only tolerate reuse across different traces but not in a single
/// trace, we generate our own IDs.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[repr(transparent)]
pub struct SpanId(SerializableNonZeroU64);

impl From<&tracing::span::Id> for SpanId {
fn from(id: &tracing::span::Id) -> Self {
Self(SerializableNonZeroU64(id.into_non_zero_u64()))
impl From<NonZeroU64> for SpanId {
fn from(value: NonZeroU64) -> Self {
Self(SerializableNonZeroU64(value))
}
}

impl From<tracing::span::Id> for SpanId {
fn from(id: tracing::span::Id) -> Self {
Self::from(&id)
impl TryFrom<u64> for SpanId {
type Error = u64;

fn try_from(value: u64) -> Result<Self, Self::Error> {
SerializableNonZeroU64::try_from(value).map(Self)
}
}

impl NextId for SpanId {}

/// A unique identifier for an engine trace, representing a tree of spans. These
/// internal traces *do not* correspond to OpenTelemetry trace IDs. One
/// OpenTelemetry trace may contain multiple Prisma Client operations, each of
/// them leading to one or more engine requests. Since engine traces map 1:1 to
/// requests to the engine, we call these trace IDs "request IDs" to
/// disambiguate and avoid confusion.
///
/// We don't use IDs of the root spans themselves for this purpose because span
/// IDs are only guaranteed to be unique among the spans active at the same
/// time. They may be reused after a span is closed, so they are not
/// historically unique. We store the collected spans and events for some short
/// time after the spans are closed until the client requests them, so we need
/// request IDs that are guaranteed to be unique for a very long period of time
/// (although they still don't necessarily have to be unique for the whole
/// lifetime of the process).
/// We store the collected spans and events for some short time after the spans
/// are closed until the client requests them, so we need request IDs that are
/// guaranteed to be unique for a very long period of time (although they still
/// don't necessarily have to be unique for the whole lifetime of the process).
///
/// We don't use the root span IDs as the request IDs to have more flexibility
/// and allow clients to generate the request IDs on the client side, rather
/// than having us send the generated request ID back to the client. This
/// guarantees we can still fetch the traces from the engine even in a case of
/// an error and no response sent back to the client.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[repr(transparent)]
pub struct RequestId(SerializableNonZeroU64);

impl RequestId {
pub fn next() -> Self {
static NEXT_ID: AtomicU64 = AtomicU64::new(1);

let mut id = 0;
while id == 0 {
id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
}

Self(SerializableNonZeroU64(NonZeroU64::new(id).unwrap()))
}

pub fn into_u64(self) -> u64 {
self.0.into_u64()
}
}

pub fn from_u64(value: u64) -> Option<Self> {
SerializableNonZeroU64::from_u64(value).map(Self)
impl From<NonZeroU64> for RequestId {
fn from(value: NonZeroU64) -> Self {
Self(SerializableNonZeroU64(value))
}
}

impl TryFrom<u64> for RequestId {
type Error = u64;

fn try_from(value: u64) -> Result<Self, Self::Error> {
SerializableNonZeroU64::try_from(value).map(Self)
}
}

impl NextId for RequestId {}

impl Default for RequestId {
fn default() -> Self {
Self::next()
Expand All @@ -137,3 +155,18 @@ impl FromStr for RequestId {
SerializableNonZeroU64::from_str(s).map(Self)
}
}

/// A trait for types that represent sequential IDs and can be losslessly
/// converted from [`NonZeroU64`].
pub trait NextId: Sized + From<NonZeroU64> {
fn next() -> Self {
static NEXT_ID: AtomicU64 = AtomicU64::new(1);

let mut id = 0;
while id == 0 {
id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
}

Self::from(NonZeroU64::new(id).unwrap())
}
}
28 changes: 21 additions & 7 deletions libs/telemetry/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ where
{
fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
let span = Self::require_span(id, &ctx);
let mut span_builder = SpanBuilder::new(span.name(), id, attrs.fields().len());
let mut span_builder = SpanBuilder::new(span.name(), attrs.fields().len());

if let Some(request_id) = span
.parent()
Expand All @@ -98,11 +98,16 @@ where
}

fn on_follows_from(&self, span: &Id, follows: &Id, ctx: Context<'_, S>) {
let followed_span = Self::require_span(follows, &ctx);
let Some(followed_id) = followed_span.extensions().get::<SpanBuilder>().map(|sb| sb.span_id()) else {
return;
};

let span = Self::require_span(span, &ctx);
let mut extensions = span.extensions_mut();

if let Some(span_builder) = extensions.get_mut::<SpanBuilder>() {
span_builder.add_link(follows.into());
span_builder.add_link(followed_id);
}
}

Expand All @@ -116,12 +121,18 @@ where
return;
};

let Some(request_id) = parent.extensions().get::<SpanBuilder>().and_then(|sb| sb.request_id()) else {
let extensions = parent.extensions();

let Some(span_builder) = extensions.get::<SpanBuilder>() else {
return;
};

let Some(request_id) = span_builder.request_id() else {
return;
};

let mut event_builder = EventBuilder::new(
parent.id().into(),
span_builder.span_id(),
event.metadata().target(),
event.metadata().level().into(),
event.metadata().fields().len(),
Expand All @@ -145,7 +156,10 @@ where
return;
};

let parent_id = span.parent().map(|parent| parent.id());
let parent_id = span
.parent()
.and_then(|parent| parent.extensions().get::<SpanBuilder>().map(|sb| sb.span_id()));

let collected_span = span_builder.end(parent_id);

self.collector.add_span(request_id, collected_span);
Expand Down Expand Up @@ -182,7 +196,7 @@ impl<Filter: AllowAttribute> field::Visit for SpanAttributeVisitor<'_, Filter> {
fn record_u64(&mut self, field: &field::Field, value: u64) {
match field.name() {
REQUEST_ID_FIELD => {
if let Some(request_id) = RequestId::from_u64(value) {
if let Ok(request_id) = RequestId::try_from(value) {
self.span_builder.set_request_id(request_id);
}
}
Expand Down Expand Up @@ -279,7 +293,7 @@ impl<Filter: AllowAttribute> field::Visit for EventAttributeVisitor<'_, Filter>
#[cfg(test)]
mod tests {
use crate::collector::{AllowAttribute, CollectedEvent, CollectedSpan};
use crate::id::RequestId;
use crate::id::{NextId, RequestId};

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion libs/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ pub mod time;
pub mod traceparent;

pub use exporter::Exporter;
pub use id::RequestId;
pub use id::{NextId, RequestId};
pub use layer::layer;
pub use traceparent::TraceParent;
2 changes: 1 addition & 1 deletion query-engine/query-engine/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use psl::parser_database::Files;
use query_core::{protocol::EngineProtocol, schema};
use request_handlers::{dmmf, RequestBody, RequestHandler};
use std::{env, sync::Arc};
use telemetry::RequestId;
use telemetry::{NextId, RequestId};

pub struct ExecuteRequest {
query: String,
Expand Down
2 changes: 1 addition & 1 deletion query-engine/query-engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use query_core::{
use request_handlers::{load_executor, ConnectorKind};
use std::{env, fmt, sync::Arc};
use telemetry::exporter::{CaptureSettings, CaptureTarget};
use telemetry::RequestId;
use telemetry::{NextId, RequestId};
use tracing::Instrument;

/// Prisma request context containing all immutable state of the process.
Expand Down
2 changes: 1 addition & 1 deletion query-engine/query-engine/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use telemetry::exporter::{CaptureSettings, CaptureTarget, TraceData};
use telemetry::{RequestId, TraceParent};
use telemetry::{NextId, RequestId, TraceParent};
use tracing::{Instrument, Span};

/// Starts up the graphql query engine server
Expand Down
2 changes: 1 addition & 1 deletion query-engine/query-engine/src/tests/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use enumflags2::make_bitflags;
use indoc::{formatdoc, indoc};
use query_core::protocol::EngineProtocol;
use serde_json::json;
use telemetry::RequestId;
use telemetry::{NextId, RequestId};

#[tokio::test]
async fn connection_string_problems_give_a_nice_error() {
Expand Down

0 comments on commit 11f085a

Please sign in to comment.