diff --git a/Makefile b/Makefile index a0d303b..a56fe49 100644 --- a/Makefile +++ b/Makefile @@ -17,12 +17,14 @@ OBJS = \ src/pg_tracing_active_spans.o \ src/pg_tracing_explain.o \ src/pg_tracing_json.o \ + src/pg_tracing_operation_hash.o \ src/pg_tracing_otel.o \ src/pg_tracing_parallel.o \ src/pg_tracing_planstate.o \ src/pg_tracing_query_process.o \ src/pg_tracing_span.o \ src/pg_tracing_sql_functions.o \ + src/pg_tracing_strinfo.o \ src/version_compat.o REGRESSCHECKS = setup utility select insert trigger cursor json transaction diff --git a/expected/extended.out b/expected/extended.out index 2988726..270a5d9 100644 --- a/expected/extended.out +++ b/expected/extended.out @@ -80,7 +80,7 @@ SELECT span_type, span_operation, parameters, lvl FROM peek_ordered_spans; Utility query | COMMIT; | | 2 ProcessUtility | ProcessUtility | | 3 TransactionBlock | TransactionBlock | | 1 - Utility query | BEGIN | | 2 + Utility query | BEGIN; | | 2 ProcessUtility | ProcessUtility | | 3 Select query | SELECT $1 | {1} | 2 Planner | Planner | | 3 @@ -297,7 +297,7 @@ SELECT span_type, span_operation, parameters, lvl FROM peek_ordered_spans WHERE span_type | span_operation | parameters | lvl ------------------+------------------+------------+----- TransactionBlock | TransactionBlock | | 1 - Utility query | BEGIN | | 2 + Utility query | BEGIN; | | 2 ProcessUtility | ProcessUtility | | 3 Select query | SELECT $1 | {1} | 2 Planner | Planner | | 3 diff --git a/expected/trigger.out b/expected/trigger.out index d38e778..c754c44 100644 --- a/expected/trigger.out +++ b/expected/trigger.out @@ -193,7 +193,7 @@ SELECT trace_id, span_type, span_operation, lvl from peek_ordered_spans where tr 00000000000000000000000000000001 | Planner | Planner | 6 00000000000000000000000000000001 | ExecutorRun | ExecutorRun | 6 00000000000000000000000000000001 | Result | Result | 7 - 00000000000000000000000000000001 | Select query | SELECT 'SELECT 1' | 5 + 00000000000000000000000000000001 | Select query | SELECT $1 | 5 00000000000000000000000000000001 | ExecutorRun | ExecutorRun | 6 00000000000000000000000000000001 | Result | Result | 7 00000000000000000000000000000001 | TransactionCommit | TransactionCommit | 1 @@ -213,7 +213,7 @@ SELECT trace_id, span_type, span_operation, lvl from peek_ordered_spans where tr 00000000000000000000000000000002 | Planner | Planner | 4 00000000000000000000000000000002 | ExecutorRun | ExecutorRun | 4 00000000000000000000000000000002 | Result | Result | 5 - 00000000000000000000000000000002 | Select query | SELECT 'SELECT 1' | 3 + 00000000000000000000000000000002 | Select query | SELECT $1 | 3 00000000000000000000000000000002 | ExecutorRun | ExecutorRun | 4 00000000000000000000000000000002 | Result | Result | 5 00000000000000000000000000000002 | TransactionCommit | TransactionCommit | 1 @@ -233,7 +233,7 @@ SELECT trace_id, span_type, span_operation, lvl from peek_ordered_spans where tr 00000000000000000000000000000003 | Planner | Planner | 4 00000000000000000000000000000003 | ExecutorRun | ExecutorRun | 4 00000000000000000000000000000003 | Result | Result | 5 - 00000000000000000000000000000003 | Select query | SELECT 'SELECT 1' | 3 + 00000000000000000000000000000003 | Select query | SELECT $1 | 3 00000000000000000000000000000003 | ExecutorRun | ExecutorRun | 4 00000000000000000000000000000003 | Result | Result | 5 00000000000000000000000000000003 | TransactionCommit | TransactionCommit | 1 diff --git a/src/pg_tracing.c b/src/pg_tracing.c index c9645d0..cfef038 100644 --- a/src/pg_tracing.c +++ b/src/pg_tracing.c @@ -101,6 +101,7 @@ typedef struct PerLevelInfos /* GUC variables */ static int pg_tracing_max_span; /* Maximum number of spans to store */ +static int pg_tracing_shared_str_size; /* Size of the shared str area */ static int pg_tracing_max_parameter_str; /* Maximum number of spans to * store */ static bool pg_tracing_planstate_spans = true; /* Generate spans from the @@ -194,6 +195,11 @@ pgTracingSharedState *pg_tracing_shared_state = NULL; */ pgTracingSpans *shared_spans = NULL; +/* + * Shared buffer for strings + */ +char *shared_str = NULL; + /* * Store spans for the current trace. * They will be added to shared_spans at the end of the query tracing. @@ -201,10 +207,19 @@ pgTracingSpans *shared_spans = NULL; static pgTracingSpans * current_trace_spans; /* -* Text for spans are buffered in this stringinfo and written at -* the end of the query tracing in a single write. -*/ -static StringInfo current_trace_text; + * Stringinfo buffer used for operation_name + */ +static StringInfo operation_name_buffer; + +/* + * StringInfo buffer used for operation_name extracted from planstate + */ +static StringInfo plan_name_buffer; + +/* + * Stringinfo buffer used for deparse info and parameters + */ +static StringInfo parameters_buffer; /* Current nesting depth of Planner+ExecutorRun+ProcessUtility calls */ int nested_level = 0; @@ -306,6 +321,19 @@ _PG_init(void) NULL, NULL); + DefineCustomIntVariable("pg_tracing.shared_str_size", + "Size of the allocated string area used for spans' strings (operation_name, parameters, deparse infos...).", + NULL, + &pg_tracing_shared_str_size, + 5242880, + 0, + 52428800, + PGC_POSTMASTER, + 0, + NULL, + NULL, + NULL); + DefineCustomBoolVariable("pg_tracing.trace_parallel_workers", "Whether to generate samples from parallel workers.", NULL, @@ -525,8 +553,10 @@ pg_tracing_memsize(void) size = add_size(size, sizeof(pgTracingSpans)); /* the span variable array */ size = add_size(size, mul_size(pg_tracing_max_span, sizeof(Span))); - /* and the parallel workers context */ + /* the parallel workers context */ size = add_size(size, mul_size(max_parallel_workers, sizeof(pgTracingParallelContext))); + /* and the shared string */ + size = add_size(size, pg_tracing_shared_str_size); return size; } @@ -550,6 +580,7 @@ pg_tracing_shmem_startup(void) { bool found_pg_tracing; bool found_shared_spans; + bool found_shared_str; /* reset in case this is a restart within the postmaster */ pg_tracing_shared_state = NULL; @@ -565,6 +596,9 @@ pg_tracing_shmem_startup(void) sizeof(pgTracingSpans) + pg_tracing_max_span * sizeof(Span), &found_shared_spans); + shared_str = ShmemInitStruct("PgTracing Shared str", + pg_tracing_shared_str_size, + &found_shared_str); /* Initialize pg_tracing memory context */ pg_tracing_mem_ctx = AllocSetContextCreate(TopMemoryContext, @@ -573,6 +607,8 @@ pg_tracing_shmem_startup(void) /* Initialize shmem for trace propagation to parallel workers */ pg_tracing_shmem_parallel_startup(); + /* Initialize operation hash */ + init_operation_hash(); /* First time, let's init shared state */ if (!found_pg_tracing) @@ -585,6 +621,7 @@ pg_tracing_shmem_startup(void) shared_spans->end = 0; shared_spans->max = pg_tracing_max_span; } + LWLockRelease(AddinShmemInitLock); } @@ -745,34 +782,42 @@ is_query_id_filtered(uint64 query_id) } /* - * Append a string current_trace_text StringInfo + * Append str to shared_str + * shared_state lock should be acquired */ -int -add_str_to_trace_buffer(const char *str, int str_len) +Size +append_str_to_shared_str(const char *txt, int str_len) { - int position = current_trace_text->len; + Size extent = pg_tracing_shared_state->extent; - Assert(str_len > 0); + if (extent + str_len > pg_tracing_shared_str_size) + { + /* Not enough room in the shared_str buffer */ + pg_tracing_shared_state->stats.dropped_str++; + return -1; + } - appendBinaryStringInfo(current_trace_text, str, str_len); - appendStringInfoChar(current_trace_text, '\0'); - return position; + /* Copy txt to the shared buffer */ + memcpy(shared_str + pg_tracing_shared_state->extent, txt, str_len); + /* Update our tracked extent */ + pg_tracing_shared_state->extent += str_len; + return extent; } /* * Append parameters to StringInfo trace buffer */ static int -add_parameters_to_trace_buffer(ParamListInfo params) +append_parameters_to_trace_str_buffer(ParamListInfo params) { - int position = current_trace_text->len; + int position = parameters_buffer->len; for (int paramno = 0; paramno < params->numParams; paramno++) { ParamExternData *param = ¶ms->params[paramno]; if (param->isnull || !OidIsValid(param->ptype)) - appendStringInfoChar(current_trace_text, '\0'); + appendStringInfoChar(parameters_buffer, '\0'); else { Oid typoutput; @@ -781,7 +826,7 @@ add_parameters_to_trace_buffer(ParamListInfo params) getTypeOutputInfo(param->ptype, &typoutput, &typisvarlena); pstring = OidOutputFunctionCall(typoutput, param->value); - appendBinaryStringInfo(current_trace_text, pstring, strlen(pstring) + 1); + appendBinaryStringInfo(parameters_buffer, pstring, strlen(pstring) + 1); } } return position; @@ -863,8 +908,10 @@ drop_all_spans_locked(void) { /* Drop all spans */ shared_spans->end = 0; - /* Reset query file position */ + /* Reset shared str position */ pg_tracing_shared_state->extent = 0; + /* Remove all hash entries */ + reset_operation_hash(); /* Update last consume ts */ pg_tracing_shared_state->stats.last_consume = GetCurrentTimestamp(); pg_truncate(PG_TRACING_TEXT_FILE, 0); @@ -963,9 +1010,15 @@ process_query_desc(const Traceparent * traceparent, const QueryDesc *queryDesc, { uint64 parent_id = per_level_infos[nested_level].executor_run_span_id; TimestampTz parent_start = per_level_infos[nested_level].executor_start; + uint64 query_id = queryDesc->plannedstmt->queryId; - process_planstate(traceparent, queryDesc, sql_error_code, pg_tracing_deparse_plan, - parent_id, parent_start, parent_end); + if (query_id == 0) + query_id = current_query_id; + + process_planstate(traceparent, queryDesc, sql_error_code, + pg_tracing_deparse_plan, parent_id, query_id, + parent_start, parent_end, + parameters_buffer, plan_name_buffer); } } @@ -1188,7 +1241,8 @@ cleanup_tracing(void) static void end_tracing(void) { - Size file_position = 0; + Size parameter_pos = 0; + Size plan_name_pos = 0; /* We're still a nested query, tracing is not finished */ if (nested_level > 0) @@ -1196,20 +1250,30 @@ end_tracing(void) LWLockAcquire(pg_tracing_shared_state->lock, LW_EXCLUSIVE); - if (current_trace_text->len > 0) - { - /* Dump all buffered texts in file */ - text_store_file(pg_tracing_shared_state, current_trace_text->data, current_trace_text->len, - &file_position); - - /* Adjust file position of spans */ - for (int i = 0; i < current_trace_spans->end; i++) - adjust_file_offset(current_trace_spans->spans + i, file_position); - } + if (parameters_buffer->len > 0) + parameter_pos = append_str_to_shared_str(parameters_buffer->data, parameters_buffer->len); + if (plan_name_buffer->len > 0) + plan_name_pos = append_str_to_shared_str(plan_name_buffer->data, plan_name_buffer->len); /* We're at the end, add all stored spans to the shared memory */ for (int i = 0; i < current_trace_spans->end; i++) - add_span_to_shared_buffer_locked(¤t_trace_spans->spans[i]); + { + Span *span = ¤t_trace_spans->spans[i]; + + if (span->operation_name_offset != -1) + { + if (span->type >= SPAN_TOP_SELECT && span->type <= SPAN_TOP_UNKNOWN) + span->operation_name_offset = lookup_operation_name(span, operation_name_buffer->data + span->operation_name_offset); + else + span->operation_name_offset += plan_name_pos; + } + if (span->parameter_offset != -1) + span->parameter_offset += parameter_pos; + if (span->deparse_info_offset != -1) + span->deparse_info_offset += parameter_pos; + + add_span_to_shared_buffer_locked(span); + } /* Update our stats with the new trace */ pg_tracing_shared_state->stats.processed_traces++; @@ -1277,7 +1341,9 @@ initialize_trace_level(void) current_trace_spans = palloc0(sizeof(pgTracingSpans) + INITIAL_ALLOCATED_SPANS * sizeof(Span)); current_trace_spans->max = INITIAL_ALLOCATED_SPANS; - current_trace_text = makeStringInfo(); + operation_name_buffer = makeStringInfo(); + plan_name_buffer = makeStringInfo(); + parameters_buffer = makeStringInfo(); MemoryContextSwitchTo(oldcxt); } @@ -1330,7 +1396,6 @@ end_nested_level(const TimestampTz *input_span_end_time) static void initialise_span_context(SpanContext * span_context, Traceparent * traceparent, - StringInfo current_trace_text, const PlannedStmt *pstmt, const JumbleState *jstate, Query *query, const char *query_text) @@ -1344,7 +1409,8 @@ initialise_span_context(SpanContext * span_context, /* We have an ongoing transaction block. Use it as parent for level 0 */ span_context->traceparent->parent_id = tx_block_span.span_id; } - span_context->current_trace_text = current_trace_text; + span_context->parameters_buffer = parameters_buffer; + span_context->operation_name_buffer = operation_name_buffer; span_context->pstmt = pstmt; span_context->query = query; span_context->jstate = jstate; @@ -1355,6 +1421,9 @@ initialise_span_context(SpanContext * span_context, span_context->query_id = pstmt->queryId; else span_context->query_id = query->queryId; + + if (span_context->query_id == 0) + span_context->query_id = current_query_id; } /* @@ -1428,7 +1497,7 @@ pg_tracing_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jst /* Statement is sampled, initialize memory and push a new active span */ initialize_trace_level(); - initialise_span_context(&span_context, traceparent, current_trace_text, + initialise_span_context(&span_context, traceparent, NULL, jstate, query, pstate->p_sourcetext); if (query->utilityStmt && nodeTag(query->utilityStmt) == T_TransactionStmt) @@ -1440,7 +1509,7 @@ pg_tracing_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jst /* We have an explicit BEGIN, start a transaction block span */ begin_span(traceparent->trace_id, &tx_block_span, SPAN_TRANSACTION_BLOCK, NULL, traceparent->parent_id, span_context.query_id, GetCurrentTransactionStartTimestamp()); - /* Update traceparent context to use the tx_block span as parent */ + /* Update traceparent context to use the tx_block span as parent */ span_context.traceparent->parent_id = tx_block_span.span_id; } } @@ -1493,7 +1562,7 @@ pg_tracing_planner_hook(Query *query, const char *query_string, int cursorOption /* statement is sampled */ initialize_trace_level(); - initialise_span_context(&span_context, traceparent, current_trace_text, + initialise_span_context(&span_context, traceparent, NULL, NULL, query, query_string); push_active_span(pg_tracing_mem_ctx, &span_context, command_type_to_span_type(query->commandType), HOOK_PLANNER); @@ -1532,7 +1601,7 @@ pg_tracing_planner_hook(Query *query, const char *query_string, int cursorOption { Span *span = peek_active_span(); - span->parameter_offset = add_parameters_to_trace_buffer(params); + span->parameter_offset = append_parameters_to_trace_str_buffer(params); span->num_parameters = params->numParams; } return result; @@ -1586,7 +1655,7 @@ pg_tracing_ExecutorStart(QueryDesc *queryDesc, int eflags) /* Statement is sampled */ initialize_trace_level(); - initialise_span_context(&span_context, traceparent, current_trace_text, + initialise_span_context(&span_context, traceparent, queryDesc->plannedstmt, NULL, NULL, queryDesc->sourceText); push_active_span(pg_tracing_mem_ctx, &span_context, command_type_to_span_type(queryDesc->operation), HOOK_EXECUTOR); @@ -1656,7 +1725,7 @@ pg_tracing_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 cou /* ExecutorRun is sampled */ initialize_trace_level(); - initialise_span_context(&span_context, traceparent, current_trace_text, + initialise_span_context(&span_context, traceparent, queryDesc->plannedstmt, NULL, NULL, queryDesc->sourceText); push_active_span(pg_tracing_mem_ctx, &span_context, command_type_to_span_type(queryDesc->operation), HOOK_EXECUTOR); @@ -1768,7 +1837,7 @@ pg_tracing_ExecutorFinish(QueryDesc *queryDesc) /* Statement is sampled */ initialize_trace_level(); - initialise_span_context(&span_context, traceparent, current_trace_text, + initialise_span_context(&span_context, traceparent, queryDesc->plannedstmt, NULL, NULL, queryDesc->sourceText); push_active_span(pg_tracing_mem_ctx, &span_context, command_type_to_span_type(queryDesc->operation), HOOK_EXECUTOR); @@ -1902,6 +1971,9 @@ pg_tracing_ProcessUtility(PlannedStmt *pstmt, const char *queryString, reset_traceparent(&parse_traceparent); } + if (nested_level == 0) + current_query_id = pstmt->queryId; + if (!pg_tracing_enabled(traceparent, nested_level)) { /* No sampling, just go through the standard process utility */ @@ -1952,7 +2024,7 @@ pg_tracing_ProcessUtility(PlannedStmt *pstmt, const char *queryString, in_aborted_transaction = IsAbortedTransactionBlockState(); initialize_trace_level(); - initialise_span_context(&span_context, traceparent, current_trace_text, + initialise_span_context(&span_context, traceparent, pstmt, NULL, NULL, queryString); if (track_utility) { diff --git a/src/pg_tracing.h b/src/pg_tracing.h index da7c25e..3078ccc 100644 --- a/src/pg_tracing.h +++ b/src/pg_tracing.h @@ -240,6 +240,8 @@ typedef struct pgTracingStats int64 processed_spans; /* number of spans processed */ int64 dropped_traces; /* number of traces aborted due to full buffer */ int64 dropped_spans; /* number of dropped spans due to full buffer */ + int64 dropped_str; /* number of dropped str due to full + * shared_str */ int64 otel_sent_spans; /* number of spans sent to otel collector */ int64 otel_failures; /* number of failures to send spans to otel * collector */ @@ -255,7 +257,7 @@ typedef struct pgTracingSharedState { LWLock *lock; /* protects shared spans, shared state and * query file */ - Size extent; /* current extent of query file */ + Size extent; /* current extent of shared str */ pgTracingStats stats; /* global statistics for pg_tracing */ } pgTracingSharedState; @@ -267,6 +269,8 @@ typedef struct planstateTraceContext List *ancestors; List *deparse_ctx; List *rtable_names; + StringInfo deparse_info_buffer; + StringInfo plan_name_buffer; } planstateTraceContext; /* @@ -326,8 +330,6 @@ typedef struct TracedPlanstate typedef struct JsonContext { StringInfo str; - const char *qbuffer; - Size qbuffer_size; int span_type_count[NUM_SPAN_TYPE]; const Span **span_type_to_spans[NUM_SPAN_TYPE]; } JsonContext; @@ -336,7 +338,8 @@ typedef struct SpanContext { TimestampTz start_time; Traceparent *traceparent; - StringInfo current_trace_text; + StringInfo parameters_buffer; + StringInfo operation_name_buffer; const PlannedStmt *pstmt; const Query *query; const JumbleState *jstate; @@ -360,8 +363,9 @@ extern void fetch_parallel_context(Traceparent * traceparent); extern void process_planstate(const Traceparent * traceparent, const QueryDesc *queryDesc, - int sql_error_code, bool deparse_plan, uint64 parent_id, - TimestampTz parent_start, TimestampTz parent_end); + int sql_error_code, bool deparse_plan, uint64 parent_id, uint64 query_id, + TimestampTz parent_start, TimestampTz parent_end, + StringInfo deparse_info_buffer, StringInfo plan_name_buffer); extern void setup_ExecProcNode_override(MemoryContext context, QueryDesc *queryDesc); @@ -382,9 +386,6 @@ extern void extract_trace_context_from_query(Traceparent * traceparent, const ch extern ParseTraceparentErr parse_trace_context(Traceparent * traceparent, const char *trace_context_str, int trace_context_len); extern char *parse_code_to_err(ParseTraceparentErr err); extern const char *normalise_query(const char *query, int query_loc, int *query_len_p); -extern bool text_store_file(pgTracingSharedState * pg_tracing, const char *query, - int query_len, Size *query_offset); -extern char *qtext_load_file(Size *buffer_size); /* pg_tracing_span.c */ extern void begin_span(TraceId trace_id, Span * span, SpanType type, @@ -393,34 +394,33 @@ extern void begin_span(TraceId trace_id, Span * span, SpanType type, extern void end_span(Span * span, const TimestampTz *end_time); extern void reset_span(Span * span); extern const char *get_span_type(const Span * span); -extern const char *get_operation_name(const Span * span, const char *qbuffer, Size qbuffer_size); +extern const char *get_operation_name(const Span * span); extern void adjust_file_offset(Span * span, Size file_position); extern bool traceid_zero(TraceId trace_id); extern const char *span_type_to_str(SpanType span_type); /* pg_tracing_sql_functions.c */ -pgTracingStats get_empty_pg_tracing_stats(void); +extern pgTracingStats get_empty_pg_tracing_stats(void); /* pg_tracing_active_spans.c */ -Span *pop_active_span(const TimestampTz *end_time); -Span *peek_active_span(void); -Span *push_active_span(MemoryContext context, const SpanContext * span_context, - SpanType span_type, HookPhase hook_phase); -Span *push_child_active_span(MemoryContext context, const SpanContext * span_context, - SpanType span_type); +extern Span * pop_active_span(const TimestampTz *end_time); +extern Span * peek_active_span(void); +extern Span * push_active_span(MemoryContext context, const SpanContext * span_context, + SpanType span_type, HookPhase hook_phase); +extern Span * push_child_active_span(MemoryContext context, const SpanContext * span_context, + SpanType span_type); -void cleanup_active_spans(void); +extern void cleanup_active_spans(void); /* pg_tracing.c */ extern pgTracingSharedState * pg_tracing_shared_state; extern pgTracingSpans * shared_spans; - +extern char *shared_str; extern int nested_level; + extern void store_span(const Span * span); -extern int - add_str_to_trace_buffer(const char *str, int str_len); extern void cleanup_tracing(void); extern void @@ -428,12 +428,26 @@ extern void extern void pg_tracing_shmem_startup(void); extern void reset_traceparent(Traceparent * traceparent); +extern Size + append_str_to_shared_str(const char *txt, int str_len); + +/* pg_tracing_strinfo.c */ +extern int + appendStringInfoNT(StringInfo strinfo, const char *str, int str_len); /* pg_tracing_json.c */ extern void marshal_spans_to_json(JsonContext * json_ctx); extern void - build_json_context(JsonContext * json_ctx, const char *qbuffer, Size qbuffer_size, const pgTracingSpans * pgTracingSpans); + build_json_context(JsonContext * json_ctx, const pgTracingSpans * pgTracingSpans); + +/* pg_tracing_operation_hash.c */ +extern void + init_operation_hash(void); +extern void + reset_operation_hash(void); +extern Size + lookup_operation_name(const Span * span, const char *txt); /* pg_tracing_otel.c */ extern void diff --git a/src/pg_tracing_active_spans.c b/src/pg_tracing_active_spans.c index 2677ee2..062c7d7 100644 --- a/src/pg_tracing_active_spans.c +++ b/src/pg_tracing_active_spans.c @@ -170,17 +170,21 @@ begin_active_span(const SpanContext * span_context, Span * span, { /* jstate is available, normalise query and extract parameters' values */ int num_parameters = 0; - int parameter_offset = span_context->current_trace_text->len; - StringInfo trace_text = NULL; + int parameter_offset = span_context->parameters_buffer->len; + StringInfo parameters_buffer = NULL; if (span_context->export_parameters) - /* We want parameter's value, push trace_text StringInfo */ - trace_text = span_context->current_trace_text; + + /* + * We want parameters' value, propagate parameters_buffer + * StringInfo + */ + parameters_buffer = span_context->parameters_buffer; query_len = query->stmt_len; normalised_query = normalise_query_parameters(span_context->jstate, span_context->query_text, query->stmt_location, &query_len, - trace_text, &num_parameters); + parameters_buffer, &num_parameters); span->num_parameters = num_parameters; if (num_parameters > 0 && span_context->export_parameters) span->parameter_offset = parameter_offset; @@ -211,8 +215,8 @@ begin_active_span(const SpanContext * span_context, Span * span, normalised_query = normalise_query(span_context->query_text, stmt_location, &query_len); } if (query_len > 0) - span->operation_name_offset = add_str_to_trace_buffer(normalised_query, - query_len); + span->operation_name_offset = appendStringInfoNT(span_context->operation_name_buffer, normalised_query, + query_len); } /* diff --git a/src/pg_tracing_json.c b/src/pg_tracing_json.c index 75fb205..b948e79 100644 --- a/src/pg_tracing_json.c +++ b/src/pg_tracing_json.c @@ -269,13 +269,13 @@ append_span_attributes(const JsonContext * json_ctx, const Span * span) append_plan_counters(str, &span->plan_counters); append_attribute_int(str, "query.startup", span->startup, true, true); - if (span->parameter_offset != -1 && json_ctx->qbuffer_size > 0 && json_ctx->qbuffer_size > span->parameter_offset) + if (span->parameter_offset != -1) append_array_value_field(str, "query.parameters", - json_ctx->qbuffer + span->parameter_offset, span->num_parameters, true); + shared_str + span->parameter_offset, span->num_parameters, true); - if (span->deparse_info_offset != -1 && json_ctx->qbuffer_size > 0 && json_ctx->qbuffer_size > span->deparse_info_offset) + if (span->deparse_info_offset != -1) append_attribute_string(str, "query.deparse_info", - json_ctx->qbuffer + span->deparse_info_offset, true); + shared_str + span->deparse_info_offset, true); } if (span->sql_error_code > 0) @@ -299,7 +299,7 @@ append_span(const JsonContext * json_ctx, const Span * span) const char *operation_name; StringInfo str = json_ctx->str; - operation_name = get_operation_name(span, json_ctx->qbuffer, json_ctx->qbuffer_size); + operation_name = get_operation_name(span); pg_snprintf(trace_id, 33, INT64_HEX_FORMAT INT64_HEX_FORMAT, span->trace_id.traceid_left, @@ -403,11 +403,9 @@ aggregate_span_by_type(JsonContext * json_ctx, const pgTracingSpans * pgTracingS * Prepare json context for json marshalling */ void -build_json_context(JsonContext * json_ctx, const char *qbuffer, Size qbuffer_size, const pgTracingSpans * pgTracingSpans) +build_json_context(JsonContext * json_ctx, const pgTracingSpans * pgTracingSpans) { json_ctx->str = makeStringInfo(); - json_ctx->qbuffer = qbuffer; - json_ctx->qbuffer_size = qbuffer_size; memset(json_ctx->span_type_count, 0, sizeof(json_ctx->span_type_count)); memset(json_ctx->span_type_to_spans, 0, sizeof(json_ctx->span_type_to_spans)); diff --git a/src/pg_tracing_operation_hash.c b/src/pg_tracing_operation_hash.c new file mode 100644 index 0000000..42602df --- /dev/null +++ b/src/pg_tracing_operation_hash.c @@ -0,0 +1,102 @@ +/*------------------------------------------------------------------------- + * + * pg_tracing_operation_hash.c + * pg_tracing plan explain functions. + * + * IDENTIFICATION + * src/pg_tracing_operation_hash.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "utils/hsearch.h" +#include "storage/shmem.h" +#include "pg_tracing.h" + +static HTAB *operation_name_hash = NULL; + +typedef struct operationKey +{ + uint64 query_id; + SpanType span_type; +} operationKey; + +typedef struct operationEntry +{ + operationKey key; /* hash key of entry - MUST BE FIRST */ + Size query_offset; /* query text offset in external file */ +} operationEntry; + +/* + * Initialize operation hash in shared memory + */ +void +init_operation_hash(void) +{ + HASHCTL info; + + info.keysize = sizeof(operationKey); + info.entrysize = sizeof(operationEntry); + operation_name_hash = ShmemInitHash("pg_tracing hash", + 100, 50000, &info, + HASH_ELEM | HASH_BLOBS); +} + +/* + * Reset content of the operation hash table + */ +void +reset_operation_hash(void) +{ + HASH_SEQ_STATUS status; + operationEntry *hentry; + + Assert(operation_name_hash != NULL); + + /* Currently we just flush all entries; hard to be smarter ... */ + hash_seq_init(&status, operation_name_hash); + + while ((hentry = (operationEntry *) hash_seq_search(&status)) != NULL) + { + if (hash_search(operation_name_hash, + &hentry->key, + HASH_REMOVE, NULL) == NULL) + elog(ERROR, "hash table corrupted"); + } +} + +Size +lookup_operation_name(const Span * span, const char *txt) +{ + bool found; + Size offset; + operationKey key; + operationEntry *entry; + + key.query_id = span->query_id; + key.span_type = span->type; + Assert(span->query_id != 0); + + if (operation_name_hash == NULL) + { + HASHCTL info; + + info.keysize = sizeof(operationKey); + info.entrysize = sizeof(operationEntry); + operation_name_hash = ShmemInitHash("pg_tracing operation name hash", 100, + 10000, &info, HASH_ELEM | HASH_BLOBS); + } + entry = (operationEntry *) hash_search(operation_name_hash, &key, HASH_ENTER, &found); + + if (found) + { + return entry->query_offset; + } + + offset = pg_tracing_shared_state->extent; + append_str_to_shared_str(txt, strlen(txt) + 1); + /* Update hash's entry */ + entry->query_offset = offset; + + return offset; +} diff --git a/src/pg_tracing_otel.c b/src/pg_tracing_otel.c index 3b09666..1da50ad 100644 --- a/src/pg_tracing_otel.c +++ b/src/pg_tracing_otel.c @@ -120,24 +120,10 @@ static void send_spans_to_otel_collector(OtelContext * octx) { int num_spans = 0; - char *qbuffer; - Size qbuffer_size = 0; JsonContext json_ctx; - /* - * Do a quick check with a shared lock to see if there are any spans to - * send - */ - LWLockAcquire(pg_tracing_shared_state->lock, LW_SHARED); - if (shared_spans->end == 0) - { - LWLockRelease(pg_tracing_shared_state->lock); - return; - } - LWLockRelease(pg_tracing_shared_state->lock); - LWLockAcquire(pg_tracing_shared_state->lock, LW_EXCLUSIVE); - /* Recheck for spans to send */ + /* Check if we have spans to send */ if (shared_spans->end == 0) { LWLockRelease(pg_tracing_shared_state->lock); @@ -145,17 +131,10 @@ send_spans_to_otel_collector(OtelContext * octx) } num_spans = shared_spans->end; - qbuffer = qtext_load_file(&qbuffer_size); - if (qbuffer == NULL) - { - LWLockRelease(pg_tracing_shared_state->lock); - return; - } - /* Do marshalling within marshal memory context */ MemoryContextSwitchTo(marshal_mem_ctx); /* Build the json context */ - build_json_context(&json_ctx, qbuffer, qbuffer_size, shared_spans); + build_json_context(&json_ctx, shared_spans); marshal_spans_to_json(&json_ctx); MemoryContextSwitchTo(otel_exporter_mem_ctx); @@ -185,7 +164,6 @@ send_spans_to_otel_collector(OtelContext * octx) * marshalling */ MemoryContextReset(marshal_mem_ctx); - free(qbuffer); } /* diff --git a/src/pg_tracing_planstate.c b/src/pg_tracing_planstate.c index 6041071..01e7f98 100644 --- a/src/pg_tracing_planstate.c +++ b/src/pg_tracing_planstate.c @@ -621,7 +621,8 @@ create_span_node(PlanState *planstate, const planstateTraceContext * planstateTr operation_name = plan_to_rel_name(planstateTraceContext, planstate); len_operation_name = strlen(operation_name); if (len_operation_name > 0) - span.operation_name_offset = add_str_to_trace_buffer(operation_name, len_operation_name); + span.operation_name_offset = appendStringInfoNT(planstateTraceContext->plan_name_buffer, + operation_name, len_operation_name); /* deparse_ctx is NULL if deparsing was disabled */ if (planstateTraceContext->deparse_ctx != NULL) @@ -629,11 +630,13 @@ create_span_node(PlanState *planstate, const planstateTraceContext * planstateTr deparse_info = plan_to_deparse_info(planstateTraceContext, planstate); deparse_info_len = strlen(deparse_info); if (deparse_info_len > 0) - span.deparse_info_offset = add_str_to_trace_buffer(deparse_info, deparse_info_len); + span.deparse_info_offset = appendStringInfoNT(planstateTraceContext->deparse_info_buffer, + deparse_info, deparse_info_len); } } else if (subplan_name != NULL) - span.operation_name_offset = add_str_to_trace_buffer(subplan_name, strlen(subplan_name)); + span.operation_name_offset = appendStringInfoNT(planstateTraceContext->plan_name_buffer, + subplan_name, strlen(subplan_name)); span.node_counters.rows = (int64) planstate->instrument->ntuples / planstate->instrument->nloops; span.node_counters.nloops = (int64) planstate->instrument->nloops; @@ -658,12 +661,13 @@ create_span_node(PlanState *planstate, const planstateTraceContext * planstateTr void process_planstate(const Traceparent * traceparent, const QueryDesc *queryDesc, int sql_error_code, bool deparse_plan, uint64 parent_id, - TimestampTz parent_start, TimestampTz parent_end) + uint64 query_id, + TimestampTz parent_start, TimestampTz parent_end, + StringInfo deparse_info_buffer, StringInfo plan_name_buffer) { Bitmapset *rels_used = NULL; planstateTraceContext planstateTraceContext; TimestampTz latest_end = 0; - uint64 query_id = queryDesc->plannedstmt->queryId; if (queryDesc->planstate == NULL || queryDesc->planstate->instrument == NULL) return; @@ -671,6 +675,8 @@ process_planstate(const Traceparent * traceparent, const QueryDesc *queryDesc, planstateTraceContext.rtable_names = select_rtable_names_for_explain(queryDesc->plannedstmt->rtable, rels_used); planstateTraceContext.trace_id = traceparent->trace_id; planstateTraceContext.ancestors = NULL; + planstateTraceContext.deparse_info_buffer = deparse_info_buffer; + planstateTraceContext.plan_name_buffer = plan_name_buffer; planstateTraceContext.sql_error_code = sql_error_code; /* Prepare the planstate context for deparsing */ planstateTraceContext.deparse_ctx = NULL; diff --git a/src/pg_tracing_query_process.c b/src/pg_tracing_query_process.c index fe8b0f2..90a4973 100644 --- a/src/pg_tracing_query_process.c +++ b/src/pg_tracing_query_process.c @@ -393,154 +393,3 @@ normalise_query(const char *query, int query_loc, int *query_len_p) norm_query[n_quer_loc] = '\0'; return norm_query; } - -/* - * Store text in the pg_tracing stat file - */ -bool -text_store_file(pgTracingSharedState * pg_tracing, const char *text, int text_len, - Size *query_offset) -{ - Size off; - int fd; - - off = pg_tracing->extent; - pg_tracing->extent += text_len + 1; - - /* - * Don't allow the file to grow larger than what qtext_load_file can - * (theoretically) handle. This has been seen to be reachable on 32-bit - * platforms. - */ - if (unlikely(text_len >= MaxAllocHugeSize - off)) - { - errno = EFBIG; /* not quite right, but it'll do */ - fd = -1; - goto error; - } - - /* Now write the data into the successfully-reserved part of the file */ - fd = OpenTransientFile(PG_TRACING_TEXT_FILE, O_RDWR | O_CREAT | PG_BINARY); - if (fd < 0) - goto error; - - if (pg_pwrite(fd, text, text_len, off) != text_len) - goto error; - if (pg_pwrite(fd, "\0", 1, off + text_len) != 1) - goto error; - - CloseTransientFile(fd); - - /* - * Set offset once write was successful - */ - *query_offset = off; - - return true; - -error: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write file \"%s\": %m", - PG_TRACING_TEXT_FILE))); - - if (fd >= 0) - CloseTransientFile(fd); - - return false; -} - -/* - * Read the external query text file into a malloc'd buffer. - * - * Returns NULL (without throwing an error) if unable to read, eg file not - * there or insufficient memory. - * - * On success, the buffer size is also returned into *buffer_size. - */ -char * -qtext_load_file(Size *buffer_size) -{ - char *buf; - int fd; - struct stat stat; - Size nread; - - fd = OpenTransientFile(PG_TRACING_TEXT_FILE, O_RDONLY | PG_BINARY); - if (fd < 0) - { - if (errno != ENOENT) - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", - PG_TRACING_TEXT_FILE))); - return NULL; - } - - /* Get file length */ - if (fstat(fd, &stat)) - { - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not stat file \"%s\": %m", - PG_TRACING_TEXT_FILE))); - CloseTransientFile(fd); - return NULL; - } - - /* Allocate buffer; beware that off_t might be wider than size_t */ - if (stat.st_size <= MaxAllocHugeSize) - buf = (char *) malloc(stat.st_size); - else - buf = NULL; - if (buf == NULL) - { - ereport(LOG, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"), - errdetail("Could not allocate enough memory to read file \"%s\".", - PG_TRACING_TEXT_FILE))); - CloseTransientFile(fd); - return NULL; - } - - /* - * OK, slurp in the file. Windows fails if we try to read more than - * INT_MAX bytes at once, and other platforms might not like that either, - * so read a very large file in 1GB segments. - */ - nread = 0; - while (nread < stat.st_size) - { - int toread = Min(1024 * 1024 * 1024, stat.st_size - nread); - - /* - * If we get a short read and errno doesn't get set, the reason is - * probably that garbage collection truncated the file since we did - * the fstat(), so we don't log a complaint --- but we don't return - * the data, either, since it's most likely corrupt due to concurrent - * writes from garbage collection. - */ - errno = 0; - if (read(fd, buf + nread, toread) != toread) - { - if (errno) - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", - PG_TRACING_TEXT_FILE))); - free(buf); - CloseTransientFile(fd); - return NULL; - } - nread += toread; - } - - if (CloseTransientFile(fd) != 0) - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not close file \"%s\": %m", PG_TRACING_TEXT_FILE))); - - *buffer_size = nread; - return buf; -} diff --git a/src/pg_tracing_span.c b/src/pg_tracing_span.c index 657323a..5b020dd 100644 --- a/src/pg_tracing_span.c +++ b/src/pg_tracing_span.c @@ -54,6 +54,7 @@ begin_span(TraceId trace_id, Span * span, SpanType type, span->database_id = MyDatabaseId; span->user_id = GetUserId(); span->subxact_count = MyProc->subxidStatus.count; + Assert(query_id > 0); span->query_id = query_id; memset(&span->node_counters, 0, sizeof(NodeCounters)); memset(&span->plan_counters, 0, sizeof(PlanCounters)); @@ -283,7 +284,7 @@ is_span_top(SpanType span_type) * For node span, the name may be pulled from the stat file. */ const char * -get_operation_name(const Span * span, const char *qbuffer, Size qbuffer_size) +get_operation_name(const Span * span) { const char *span_type_str; const char *operation_str = NULL; @@ -296,9 +297,9 @@ get_operation_name(const Span * span, const char *qbuffer, Size qbuffer_size) } span_type_str = span_type_to_str(span->type); - if (span->operation_name_offset != -1 && qbuffer_size > 0 - && span->operation_name_offset <= qbuffer_size) - operation_str = qbuffer + span->operation_name_offset; + /* TODO: Check for maximum offset */ + if (span->operation_name_offset != -1) + operation_str = shared_str + span->operation_name_offset; else return span_type_str; diff --git a/src/pg_tracing_sql_functions.c b/src/pg_tracing_sql_functions.c index 6e048c8..ed3d0a6 100644 --- a/src/pg_tracing_sql_functions.c +++ b/src/pg_tracing_sql_functions.c @@ -34,6 +34,7 @@ get_empty_pg_tracing_stats(void) stats.processed_spans = 0; stats.dropped_traces = 0; stats.dropped_spans = 0; + stats.dropped_str = 0; stats.otel_sent_spans = 0; stats.otel_failures = 0; stats.last_consume = 0; @@ -137,11 +138,11 @@ add_node_counters(const NodeCounters * node_counters, int i, Datum *values) * Generate ArrayType with span's parameters */ static Datum -generate_array_parameters(const char *qbuffer, const Span * span) +generate_array_parameters(const Span * span) { Datum *entries; ArrayType *array; - const char *cursor = qbuffer + span->parameter_offset; + const char *cursor = shared_str + span->parameter_offset; entries = (Datum *) palloc(sizeof(Datum) * span->num_parameters); for (int j = 0; j < span->num_parameters; j++) @@ -162,8 +163,7 @@ generate_array_parameters(const char *qbuffer, const Span * span) * Build the tuple for a Span and add it to the output */ static void -add_result_span(ReturnSetInfo *rsinfo, Span * span, - const char *qbuffer, Size qbuffer_size) +add_result_span(ReturnSetInfo *rsinfo, Span * span) { #define PG_TRACING_TRACES_COLS 44 Datum values[PG_TRACING_TRACES_COLS] = {0}; @@ -177,7 +177,7 @@ add_result_span(ReturnSetInfo *rsinfo, Span * span, char span_id[17]; span_type = span_type_to_str(span->type); - operation_name = get_operation_name(span, qbuffer, qbuffer_size); + operation_name = get_operation_name(span); sql_error_code = unpack_sql_state(span->sql_error_code); pg_snprintf(trace_id, 33, INT64_HEX_FORMAT INT64_HEX_FORMAT, @@ -213,15 +213,13 @@ add_result_span(ReturnSetInfo *rsinfo, Span * span, i = add_node_counters(&span->node_counters, i, values); values[i++] = Int64GetDatumFast(span->startup); - if (span->parameter_offset != -1 - && qbuffer_size > 0 - && qbuffer_size > span->parameter_offset) - values[i++] = generate_array_parameters(qbuffer, span); + if (span->parameter_offset != -1) + values[i++] = generate_array_parameters(span); else nulls[i++] = 1; - if (span->deparse_info_offset != -1 && qbuffer_size > 0 && qbuffer_size > span->deparse_info_offset) - values[i++] = CStringGetTextDatum(qbuffer + span->deparse_info_offset); + if (span->deparse_info_offset != -1) + values[i++] = CStringGetTextDatum(shared_str + span->deparse_info_offset); } for (int j = i; j < PG_TRACING_TRACES_COLS; j++) @@ -233,22 +231,13 @@ add_result_span(ReturnSetInfo *rsinfo, Span * span, Datum pg_tracing_json_spans(PG_FUNCTION_ARGS) { - const char *qbuffer; - Size qbuffer_size = 0; JsonContext json_ctx; /* Don't trace this */ cleanup_tracing(); LWLockAcquire(pg_tracing_shared_state->lock, LW_SHARED); - qbuffer = qtext_load_file(&qbuffer_size); - if (qbuffer == NULL) - { - LWLockRelease(pg_tracing_shared_state->lock); - return (Datum) 0; - } - - build_json_context(&json_ctx, qbuffer, qbuffer_size, shared_spans); + build_json_context(&json_ctx, shared_spans); marshal_spans_to_json(&json_ctx); LWLockRelease(pg_tracing_shared_state->lock); @@ -267,8 +256,6 @@ pg_tracing_spans(PG_FUNCTION_ARGS) bool consume; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; Span *span; - char *qbuffer; - Size qbuffer_size = 0; LWLockMode lock_mode = LW_SHARED; consume = PG_GETARG_BOOL(0); @@ -295,28 +282,11 @@ pg_tracing_spans(PG_FUNCTION_ARGS) */ cleanup_tracing(); - qbuffer = qtext_load_file(&qbuffer_size); - if (qbuffer == NULL) - - /* - * It's possible to get NULL if file was truncated while we read it. - * Abort in this case. - */ - return (Datum) 0; - LWLockAcquire(pg_tracing_shared_state->lock, lock_mode); - /* There was a new write, reload the text file */ - if (pg_tracing_shared_state->extent <= qbuffer_size) - { - free(qbuffer); - qbuffer = qtext_load_file(&qbuffer_size); - } - Assert(pg_tracing_shared_state->extent <= qbuffer_size); - for (int i = 0; i < shared_spans->end; i++) { span = shared_spans->spans + i; - add_result_span(rsinfo, span, qbuffer, qbuffer_size); + add_result_span(rsinfo, span); } /* Consume is set, remove spans from the shared buffer */ @@ -324,7 +294,6 @@ pg_tracing_spans(PG_FUNCTION_ARGS) drop_all_spans_locked(); LWLockRelease(pg_tracing_shared_state->lock); - free(qbuffer); return (Datum) 0; } diff --git a/src/pg_tracing_strinfo.c b/src/pg_tracing_strinfo.c new file mode 100644 index 0000000..67d3736 --- /dev/null +++ b/src/pg_tracing_strinfo.c @@ -0,0 +1,29 @@ +/*------------------------------------------------------------------------- + * + * pg_tracing_strinfo.c + * utility functions for strinfo manipulation. + * + * IDENTIFICATION + * src/pg_tracing_strinfo.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "pg_tracing.h" + +/* + * Append a null terminated string to provided StringInfo. + * Returns the position where str was inserted + */ +int +appendStringInfoNT(StringInfo strinfo, const char *str, int str_len) +{ + int position = strinfo->len; + + Assert(str_len > 0); + + appendBinaryStringInfo(strinfo, str, str_len); + appendStringInfoChar(strinfo, '\0'); + return position; +}