Skip to content

Commit

Permalink
Work on a copy of spans before sending to otel collector
Browse files Browse the repository at this point in the history
To avoid locking spans with an exclusive lock while sending the json
payload to the collector, we create a copy of both spans and spans_str
and immediately release the lock.

From the copy, we marshal the json payload to be sent and attempt to
send it. We keep this payload until send is successful.
  • Loading branch information
bonnefoa committed Jul 16, 2024
1 parent a042d16 commit b3231a4
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 51 deletions.
7 changes: 6 additions & 1 deletion src/pg_tracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,10 @@ typedef struct TracedPlanstate
typedef struct JsonContext
{
StringInfo str;
int num_spans;
int span_type_count[NUM_SPAN_TYPE];
const Span **span_type_to_spans[NUM_SPAN_TYPE];
const char *spans_str;
} JsonContext;

typedef struct SpanContext
Expand Down Expand Up @@ -439,7 +441,10 @@ extern int
extern void
marshal_spans_to_json(JsonContext * json_ctx);
extern void
build_json_context(JsonContext * json_ctx, const pgTracingSpans * pgTracingSpans);
build_json_context(JsonContext * json_ctx,
const pgTracingSpans * pgTracingSpans,
const char *spans_str,
int num_spans);

/* pg_tracing_operation_hash.c */
extern void
Expand Down
8 changes: 5 additions & 3 deletions src/pg_tracing_json.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,11 @@ append_span_attributes(const JsonContext * json_ctx, const Span * span)
append_attribute_int(str, "query.startup", span->startup, true, true);
if (span->parameter_offset != -1)
append_array_value_field(str, "query.parameters",
shared_str + span->parameter_offset, span->num_parameters, true);
json_ctx->spans_str + span->parameter_offset, span->num_parameters, true);

if (span->deparse_info_offset != -1)
append_attribute_string(str, "query.deparse_info",
shared_str + span->deparse_info_offset, true);
json_ctx->spans_str + span->deparse_info_offset, true);
}

if (span->sql_error_code > 0)
Expand Down Expand Up @@ -403,11 +403,13 @@ aggregate_span_by_type(JsonContext * json_ctx, const pgTracingSpans * pgTracingS
* Prepare json context for json marshalling
*/
void
build_json_context(JsonContext * json_ctx, const pgTracingSpans * pgTracingSpans)
build_json_context(JsonContext * json_ctx, const pgTracingSpans * pgTracingSpans, const char *spans_str, int num_spans)
{
json_ctx->str = makeStringInfo();
memset(json_ctx->span_type_count, 0, sizeof(json_ctx->span_type_count));
memset(json_ctx->span_type_to_spans, 0, sizeof(json_ctx->span_type_to_spans));
json_ctx->spans_str = spans_str;
json_ctx->num_spans = num_spans;

aggregate_span_by_type(json_ctx, pgTracingSpans);
}
Expand Down
142 changes: 99 additions & 43 deletions src/pg_tracing_otel.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ typedef struct OtelContext
{
CURL *curl; /* Curl handle */
struct curl_slist *headers; /* list of http headers common to all requests */
pgTracingSpans *spans; /* A copy of spans to send */
char *spans_str; /* A copy of span text */

const char *endpoint; /* Target otel collector */
int naptime; /* Duration between upload ot spans to the
* otel collector */
int connect_timeout_ms; /* Connection timeout in ms */
} OtelContext;

/* State and configuration of the otel exporter */
static OtelContext otelContext;
static OtelContext otel_context;

/* Dedicated memory contexts for otel exporter background worker. */
static MemoryContext otel_exporter_mem_ctx;
Expand Down Expand Up @@ -113,14 +116,67 @@ send_json_trace(OtelContext * octx, const char *json_span)
return res;
}

static void
copy_spans_to_context(OtelContext * octx)
{
Size span_size = sizeof(pgTracingSpans) + shared_spans->end * sizeof(Span);

Assert(octx->spans == NULL);
Assert(octx->spans_str == NULL);

/* Copy spans to send */
octx->spans = palloc(span_size);
memcpy(octx->spans, shared_spans, span_size);
/* Copy shared str */
octx->spans_str = palloc(pg_tracing_shared_state->extent);
memcpy(octx->spans_str, shared_str, pg_tracing_shared_state->extent);
}

static void
send_json_to_otel_collector(OtelContext * octx, JsonContext * json_ctx)
{
CURLcode ret;

elog(INFO, "Sending %d spans to %s", json_ctx->num_spans, octx->endpoint);
ret = send_json_trace(octx, json_ctx->str->data);
if (ret == CURLE_OK)
{
pg_tracing_shared_state->stats.otel_sent_spans += json_ctx->num_spans;
/* Send was successful, free the spans and spans_str copy */
MemoryContextReset(marshal_mem_ctx);
/* and reset our json_ctx stringinfo */
json_ctx->str = NULL;
}
else
{
ereport(WARNING, errmsg("curl_easy_perform() failed: %s\n",
curl_easy_strerror(ret)));

/*
* On a failure, we keep the json payload and will retry to send it on
* a next attempt
*/
pg_tracing_shared_state->stats.otel_failures++;
}
}

/*
* Consume and send spans to the otel collector
*/
static void
send_spans_to_otel_collector(OtelContext * octx)
send_spans_to_otel_collector(OtelContext * octx, JsonContext * json_ctx)
{
int num_spans = 0;
JsonContext json_ctx;

if (json_ctx->str && json_ctx->str->len > 0)
{
/*
* We have a previous json payload that we failed to send, try to send
* it
*/
send_json_to_otel_collector(octx, json_ctx);
return;
}

LWLockAcquire(pg_tracing_shared_state->lock, LW_EXCLUSIVE);
/* Check if we have spans to send */
Expand All @@ -131,39 +187,35 @@ send_spans_to_otel_collector(OtelContext * octx)
}
num_spans = shared_spans->end;

/*
* We copy the spans and spans_str in the otel context to release the lock
* as soon as possible
*/
copy_spans_to_context(octx);

/* Copy is done, drop all spans */
drop_all_spans_locked();
/* and free the lock */
LWLockRelease(pg_tracing_shared_state->lock);

/* Do marshalling within marshal memory context */
MemoryContextSwitchTo(marshal_mem_ctx);

/* Build the json context */
build_json_context(&json_ctx, shared_spans);
marshal_spans_to_json(&json_ctx);
build_json_context(json_ctx, octx->spans, octx->spans_str, num_spans);
/* Do the json marshalling */
marshal_spans_to_json(json_ctx);

MemoryContextSwitchTo(otel_exporter_mem_ctx);

if (json_ctx.str->len > 0)
{
CURLcode ret;
/* Marshalling is done, we can release our spans and spans_str copy */
pfree(octx->spans);
pfree(octx->spans_str);
octx->spans = NULL;
octx->spans_str = NULL;

elog(INFO, "Sending %d spans to %s", num_spans, octx->endpoint);
ret = send_json_trace(octx, json_ctx.str->data);
if (ret == CURLE_OK)
{
/* Send was successful, drop all spans */
pg_tracing_shared_state->stats.otel_sent_spans += num_spans;
drop_all_spans_locked();
}
else
{
ereport(WARNING, errmsg("curl_easy_perform() failed: %s\n",
curl_easy_strerror(ret)));
pg_tracing_shared_state->stats.otel_failures++;
}
}
LWLockRelease(pg_tracing_shared_state->lock);

/*
* Whether json was pushed or note, we can clear the memory used for
* marshalling
*/
MemoryContextReset(marshal_mem_ctx);
if (json_ctx->str->len > 0)
send_json_to_otel_collector(octx, json_ctx);
}

/*
Expand All @@ -186,11 +238,11 @@ pg_tracing_start_worker(const char *endpoint, int naptime, int connect_timeout_m
strcpy(worker.bgw_type, "pg_tracing otel exporter");

/* Initialize the otel context struct */
otelContext.headers = NULL;
otelContext.curl = NULL;
otelContext.endpoint = endpoint;
otelContext.naptime = naptime;
otelContext.connect_timeout_ms = connect_timeout_ms;
otel_context.headers = NULL;
otel_context.curl = NULL;
otel_context.endpoint = endpoint;
otel_context.naptime = naptime;
otel_context.connect_timeout_ms = connect_timeout_ms;

if (process_shared_preload_libraries_in_progress)
{
Expand Down Expand Up @@ -221,6 +273,10 @@ pg_tracing_start_worker(const char *endpoint, int naptime, int connect_timeout_m
void
pg_tracing_otel_exporter(Datum main_arg)
{
JsonContext json_ctx;

json_ctx.str = NULL;

/* Establish signal handlers; once that's done, unblock signals. */
pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
Expand Down Expand Up @@ -260,7 +316,7 @@ pg_tracing_otel_exporter(Datum main_arg)
* Create the content type header only once since it will always be the
* same
*/
otelContext.headers = curl_slist_append(otelContext.headers, "Content-Type: application/json");
otel_context.headers = curl_slist_append(otel_context.headers, "Content-Type: application/json");

while (!ShutdownRequestPending)
{
Expand All @@ -270,21 +326,21 @@ pg_tracing_otel_exporter(Datum main_arg)
ResetLatch(MyLatch);
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
otelContext.naptime,
otel_context.naptime,
PG_WAIT_EXTENSION);

/* Send spans if we have any */
if (rc & WL_TIMEOUT)
send_spans_to_otel_collector(&otelContext);
send_spans_to_otel_collector(&otel_context, &json_ctx);
}

/* Curl cleanup */
curl_slist_free_all(otelContext.headers);
otelContext.headers = NULL;
if (otelContext.curl)
curl_slist_free_all(otel_context.headers);
otel_context.headers = NULL;
if (otel_context.curl)
{
curl_easy_cleanup(otelContext.curl);
otelContext.curl = NULL;
curl_easy_cleanup(otel_context.curl);
otel_context.curl = NULL;
}
curl_global_cleanup();
}
2 changes: 1 addition & 1 deletion src/pg_tracing_sql_functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ pg_tracing_json_spans(PG_FUNCTION_ARGS)
cleanup_tracing();

LWLockAcquire(pg_tracing_shared_state->lock, LW_SHARED);
build_json_context(&json_ctx, shared_spans);
build_json_context(&json_ctx, shared_spans, shared_str, shared_spans->end);
marshal_spans_to_json(&json_ctx);
LWLockRelease(pg_tracing_shared_state->lock);

Expand Down
17 changes: 14 additions & 3 deletions t/002_connect_timeout.pl
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,26 @@
$node->safe_psql("postgres",
"CREATE EXTENSION pg_tracing;");

# Create one span
$node->safe_psql("postgres", "/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000001-01'*/SELECT 1;\n");
# Create one trace
$node->safe_psql("postgres", "/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000001-01'*/SELECT 1");

ok( $node->poll_query_until('postgres', "SELECT otel_failures >= 1 FROM pg_tracing_info;"),
"Otel failures should be reported");

my $result =
$node->safe_psql('postgres', "SELECT count(*) FROM pg_tracing_peek_spans;");
is($result, qq(4), "Query's spans should still be present");
is($result, qq(0), "Query's spans should have been consumed as they were copied");

# Create a second trace
$node->safe_psql("postgres", "/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000001-01'*/SELECT 1");

# Wait for more failures
ok( $node->poll_query_until('postgres', "SELECT otel_failures >= 3 FROM pg_tracing_info;"),
"Otel failures should be reported");

$result =
$node->safe_psql('postgres', "SELECT count(*) FROM pg_tracing_peek_spans;");
is($result, qq(4), "Query's spans should still be present as we still attempt to send the previous payload");

# Cleanup
$node->stop;
Expand Down

0 comments on commit b3231a4

Please sign in to comment.