diff --git a/src/pg_tracing.h b/src/pg_tracing.h index 3078ccc..61a18e9 100644 --- a/src/pg_tracing.h +++ b/src/pg_tracing.h @@ -330,8 +330,10 @@ typedef struct TracedPlanstate typedef struct JsonContext { StringInfo str; + int num_spans; int span_type_count[NUM_SPAN_TYPE]; const Span **span_type_to_spans[NUM_SPAN_TYPE]; + const char *spans_str; } JsonContext; typedef struct SpanContext @@ -439,7 +441,10 @@ extern int extern void marshal_spans_to_json(JsonContext * json_ctx); extern void - build_json_context(JsonContext * json_ctx, const pgTracingSpans * pgTracingSpans); + build_json_context(JsonContext * json_ctx, + const pgTracingSpans * pgTracingSpans, + const char *spans_str, + int num_spans); /* pg_tracing_operation_hash.c */ extern void diff --git a/src/pg_tracing_json.c b/src/pg_tracing_json.c index b948e79..39a8be6 100644 --- a/src/pg_tracing_json.c +++ b/src/pg_tracing_json.c @@ -271,11 +271,11 @@ append_span_attributes(const JsonContext * json_ctx, const Span * span) append_attribute_int(str, "query.startup", span->startup, true, true); if (span->parameter_offset != -1) append_array_value_field(str, "query.parameters", - shared_str + span->parameter_offset, span->num_parameters, true); + json_ctx->spans_str + span->parameter_offset, span->num_parameters, true); if (span->deparse_info_offset != -1) append_attribute_string(str, "query.deparse_info", - shared_str + span->deparse_info_offset, true); + json_ctx->spans_str + span->deparse_info_offset, true); } if (span->sql_error_code > 0) @@ -403,11 +403,13 @@ aggregate_span_by_type(JsonContext * json_ctx, const pgTracingSpans * pgTracingS * Prepare json context for json marshalling */ void -build_json_context(JsonContext * json_ctx, const pgTracingSpans * pgTracingSpans) +build_json_context(JsonContext * json_ctx, const pgTracingSpans * pgTracingSpans, const char *spans_str, int num_spans) { json_ctx->str = makeStringInfo(); memset(json_ctx->span_type_count, 0, sizeof(json_ctx->span_type_count)); memset(json_ctx->span_type_to_spans, 0, sizeof(json_ctx->span_type_to_spans)); + json_ctx->spans_str = spans_str; + json_ctx->num_spans = num_spans; aggregate_span_by_type(json_ctx, pgTracingSpans); } diff --git a/src/pg_tracing_otel.c b/src/pg_tracing_otel.c index 1da50ad..b12ff44 100644 --- a/src/pg_tracing_otel.c +++ b/src/pg_tracing_otel.c @@ -25,6 +25,9 @@ typedef struct OtelContext { CURL *curl; /* Curl handle */ struct curl_slist *headers; /* list of http headers common to all requests */ + pgTracingSpans *spans; /* A copy of spans to send */ + char *spans_str; /* A copy of span text */ + const char *endpoint; /* Target otel collector */ int naptime; /* Duration between upload ot spans to the * otel collector */ @@ -32,7 +35,7 @@ typedef struct OtelContext } OtelContext; /* State and configuration of the otel exporter */ -static OtelContext otelContext; +static OtelContext otel_context; /* Dedicated memory contexts for otel exporter background worker. */ static MemoryContext otel_exporter_mem_ctx; @@ -113,14 +116,67 @@ send_json_trace(OtelContext * octx, const char *json_span) return res; } +static void +copy_spans_to_context(OtelContext * octx) +{ + Size span_size = sizeof(pgTracingSpans) + shared_spans->end * sizeof(Span); + + Assert(octx->spans == NULL); + Assert(octx->spans_str == NULL); + + /* Copy spans to send */ + octx->spans = palloc(span_size); + memcpy(octx->spans, shared_spans, span_size); + /* Copy shared str */ + octx->spans_str = palloc(pg_tracing_shared_state->extent); + memcpy(octx->spans_str, shared_str, pg_tracing_shared_state->extent); +} + +static void +send_json_to_otel_collector(OtelContext * octx, JsonContext * json_ctx) +{ + CURLcode ret; + + elog(INFO, "Sending %d spans to %s", json_ctx->num_spans, octx->endpoint); + ret = send_json_trace(octx, json_ctx->str->data); + if (ret == CURLE_OK) + { + pg_tracing_shared_state->stats.otel_sent_spans += json_ctx->num_spans; + /* Send was successful, free the spans and spans_str copy */ + MemoryContextReset(marshal_mem_ctx); + /* and reset our json_ctx stringinfo */ + json_ctx->str = NULL; + } + else + { + ereport(WARNING, errmsg("curl_easy_perform() failed: %s\n", + curl_easy_strerror(ret))); + + /* + * On a failure, we keep the json payload and will retry to send it on + * a next attempt + */ + pg_tracing_shared_state->stats.otel_failures++; + } +} + /* * Consume and send spans to the otel collector */ static void -send_spans_to_otel_collector(OtelContext * octx) +send_spans_to_otel_collector(OtelContext * octx, JsonContext * json_ctx) { int num_spans = 0; - JsonContext json_ctx; + + if (json_ctx->str && json_ctx->str->len > 0) + { + /* + * We have a previous json payload that we failed to send, try to send + * it + */ + send_json_to_otel_collector(octx, json_ctx); + return; + } LWLockAcquire(pg_tracing_shared_state->lock, LW_EXCLUSIVE); /* Check if we have spans to send */ @@ -131,39 +187,35 @@ send_spans_to_otel_collector(OtelContext * octx) } num_spans = shared_spans->end; + /* + * We copy the spans and spans_str in the otel context to release the lock + * as soon as possible + */ + copy_spans_to_context(octx); + + /* Copy is done, drop all spans */ + drop_all_spans_locked(); + /* and free the lock */ + LWLockRelease(pg_tracing_shared_state->lock); + /* Do marshalling within marshal memory context */ MemoryContextSwitchTo(marshal_mem_ctx); + /* Build the json context */ - build_json_context(&json_ctx, shared_spans); - marshal_spans_to_json(&json_ctx); + build_json_context(json_ctx, octx->spans, octx->spans_str, num_spans); + /* Do the json marshalling */ + marshal_spans_to_json(json_ctx); + MemoryContextSwitchTo(otel_exporter_mem_ctx); - if (json_ctx.str->len > 0) - { - CURLcode ret; + /* Marshalling is done, we can release our spans and spans_str copy */ + pfree(octx->spans); + pfree(octx->spans_str); + octx->spans = NULL; + octx->spans_str = NULL; - elog(INFO, "Sending %d spans to %s", num_spans, octx->endpoint); - ret = send_json_trace(octx, json_ctx.str->data); - if (ret == CURLE_OK) - { - /* Send was successful, drop all spans */ - pg_tracing_shared_state->stats.otel_sent_spans += num_spans; - drop_all_spans_locked(); - } - else - { - ereport(WARNING, errmsg("curl_easy_perform() failed: %s\n", - curl_easy_strerror(ret))); - pg_tracing_shared_state->stats.otel_failures++; - } - } - LWLockRelease(pg_tracing_shared_state->lock); - - /* - * Whether json was pushed or note, we can clear the memory used for - * marshalling - */ - MemoryContextReset(marshal_mem_ctx); + if (json_ctx->str->len > 0) + send_json_to_otel_collector(octx, json_ctx); } /* @@ -186,11 +238,11 @@ pg_tracing_start_worker(const char *endpoint, int naptime, int connect_timeout_m strcpy(worker.bgw_type, "pg_tracing otel exporter"); /* Initialize the otel context struct */ - otelContext.headers = NULL; - otelContext.curl = NULL; - otelContext.endpoint = endpoint; - otelContext.naptime = naptime; - otelContext.connect_timeout_ms = connect_timeout_ms; + otel_context.headers = NULL; + otel_context.curl = NULL; + otel_context.endpoint = endpoint; + otel_context.naptime = naptime; + otel_context.connect_timeout_ms = connect_timeout_ms; if (process_shared_preload_libraries_in_progress) { @@ -221,6 +273,10 @@ pg_tracing_start_worker(const char *endpoint, int naptime, int connect_timeout_m void pg_tracing_otel_exporter(Datum main_arg) { + JsonContext json_ctx; + + json_ctx.str = NULL; + /* Establish signal handlers; once that's done, unblock signals. */ pqsignal(SIGTERM, SignalHandlerForShutdownRequest); pqsignal(SIGHUP, SignalHandlerForConfigReload); @@ -260,7 +316,7 @@ pg_tracing_otel_exporter(Datum main_arg) * Create the content type header only once since it will always be the * same */ - otelContext.headers = curl_slist_append(otelContext.headers, "Content-Type: application/json"); + otel_context.headers = curl_slist_append(otel_context.headers, "Content-Type: application/json"); while (!ShutdownRequestPending) { @@ -270,21 +326,21 @@ pg_tracing_otel_exporter(Datum main_arg) ResetLatch(MyLatch); rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - otelContext.naptime, + otel_context.naptime, PG_WAIT_EXTENSION); /* Send spans if we have any */ if (rc & WL_TIMEOUT) - send_spans_to_otel_collector(&otelContext); + send_spans_to_otel_collector(&otel_context, &json_ctx); } /* Curl cleanup */ - curl_slist_free_all(otelContext.headers); - otelContext.headers = NULL; - if (otelContext.curl) + curl_slist_free_all(otel_context.headers); + otel_context.headers = NULL; + if (otel_context.curl) { - curl_easy_cleanup(otelContext.curl); - otelContext.curl = NULL; + curl_easy_cleanup(otel_context.curl); + otel_context.curl = NULL; } curl_global_cleanup(); } diff --git a/src/pg_tracing_sql_functions.c b/src/pg_tracing_sql_functions.c index ed3d0a6..74a7329 100644 --- a/src/pg_tracing_sql_functions.c +++ b/src/pg_tracing_sql_functions.c @@ -237,7 +237,7 @@ pg_tracing_json_spans(PG_FUNCTION_ARGS) cleanup_tracing(); LWLockAcquire(pg_tracing_shared_state->lock, LW_SHARED); - build_json_context(&json_ctx, shared_spans); + build_json_context(&json_ctx, shared_spans, shared_str, shared_spans->end); marshal_spans_to_json(&json_ctx); LWLockRelease(pg_tracing_shared_state->lock); diff --git a/t/002_connect_timeout.pl b/t/002_connect_timeout.pl index dab61d5..8fb9e08 100644 --- a/t/002_connect_timeout.pl +++ b/t/002_connect_timeout.pl @@ -22,15 +22,26 @@ $node->safe_psql("postgres", "CREATE EXTENSION pg_tracing;"); -# Create one span -$node->safe_psql("postgres", "/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000001-01'*/SELECT 1;\n"); +# Create one trace +$node->safe_psql("postgres", "/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000001-01'*/SELECT 1"); ok( $node->poll_query_until('postgres', "SELECT otel_failures >= 1 FROM pg_tracing_info;"), "Otel failures should be reported"); my $result = $node->safe_psql('postgres', "SELECT count(*) FROM pg_tracing_peek_spans;"); -is($result, qq(4), "Query's spans should still be present"); +is($result, qq(0), "Query's spans should have been consumed as they were copied"); + +# Create a second trace +$node->safe_psql("postgres", "/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000001-01'*/SELECT 1"); + +# Wait for more failures +ok( $node->poll_query_until('postgres', "SELECT otel_failures >= 3 FROM pg_tracing_info;"), + "Otel failures should be reported"); + +$result = + $node->safe_psql('postgres', "SELECT count(*) FROM pg_tracing_peek_spans;"); +is($result, qq(4), "Query's spans should still be present as we still attempt to send the previous payload"); # Cleanup $node->stop;