Skip to content

Commit

Permalink
Add tracing of parallel queries
Browse files Browse the repository at this point in the history
On parallel query, save the tracecontext in a shared memory. Parallel
worker will be able to pull the tracecontext from it and generate spans
on their own.
  • Loading branch information
bonnefoa committed Mar 22, 2024
1 parent 89a8192 commit da0af6c
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 9 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ OBJS = \
$(WIN32RES) \
src/pg_tracing.o \
src/pg_tracing_query_process.o \
src/pg_tracing_parallel.o \
src/pg_tracing_span.o

REGRESSCHECKS = utility select extended insert trigger sample \
subxact full_buffer nested wal cleanup
parallel subxact full_buffer nested wal cleanup
REGRESSCHECKS_OPTS = --no-locale --encoding=UTF8 --temp-config pg_tracing.conf --temp-instance=./tmp_check

PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand Down
32 changes: 24 additions & 8 deletions doc/pg_tracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,26 @@ The `pg_tracing_consume_spans` and `pg_tracing_peek_spans` views are defined in

## Parameters

### pg_tracing.buffer_mode (enum)

Controls span buffer's behaviour when `pg_tracing.max_span` spans is reached. If `keep_on_full`, the existing buffer is kept while new spans are dropped. If `drop_on_full`, the existing buffer is dropped and new spans are added. The default value is `keep_on_full`.

### pg_tracing.caller_sample_rate (real)

Controls the fraction of statements with SQLCommenter tracecontext and an enabled sampled flag that will generate spans. The default value is 1.

### pg_tracing.export_parameters (boolean)

Controls whether the query's parameters should be exported in spans metadata. The default value is `on`.

### pg_tracing.filter_query_ids (string)

Restrict sampling to the provided queryIds. An empty value won't filter any queries. The default value is empty.

### pg_tracing.max_parameter_size (integer)

Controls the maximum size of the parameter string. The default value is 1024.

### pg_tracing.max_span (integer)

Specifies the maximum number of spans stored by the extension. If more spans are generated, the span buffer will be emptied if `pg_tracing.buffer_mode` is set to `drop_on_full`. If `pg_tracing.buffer_mode` is set to `keep_on_full`, the new spans will be dropped and tracing will be aborted. The default value is 5000. This parameter can only be set at server start.
Expand All @@ -129,13 +149,13 @@ FROM pg_shmem_allocations
WHERE name ='PgTracing Spans';
```

### pg_tracing.buffer_mode (enum)
### pg_tracing.sample_rate (real)

Controls span buffer's behaviour when `pg_tracing.max_span` spans is reached. If `keep_on_full`, the existing buffer is kept while new spans are dropped. If `drop_on_full`, the existing buffer is dropped and new spans are added. The default value is `keep_on_full`.
Controls the fraction of statements that generate spans. Statements with tracecontext propagated with SQLCommenter and sampled flag enabled are not impacted by this parameter. For traces with nested statements, either all will be explained or none. The default value is 0.

### pg_tracing.max_parameter_size (integer)
### pg_tracing.trace_parallel_workers (boolean)

Controls the maximum size of the parameter string. The default value is 1024.
Controls whether spans should be generated for workers created by parallel queries.

### pg_tracing.track (enum)

Expand All @@ -153,10 +173,6 @@ Controls the fraction of statements that generate spans. Statements with traceco

Controls the fraction of statements with SQLCommenter tracecontext and an enabled sampled flag that will generate spans. The default value is 1.

### pg_tracing.filter_query_ids (string)

Restrict sampling to the provided queryIds. An empty value won't filter any queries. The default value is empty.

### pg_tracing.export_parameters (boolean)

Controls whether the query's parameters should be exported in spans metadata. The default value is `on`.
62 changes: 62 additions & 0 deletions expected/parallel.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
begin;
-- encourage use of parallel plans
set local parallel_setup_cost=0;
set local parallel_tuple_cost=0;
set local min_parallel_table_scan_size=0;
set local max_parallel_workers_per_gather=2;
-- Trace parallel queries
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000001-01'*/ select 1 from pg_class limit 1;
?column?
----------
1
(1 row)

/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000002-0000000000000002-01'*/ select 2 from pg_class limit 1;
?column?
----------
2
(1 row)

/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000003-0000000000000003-00'*/ select 3 from pg_class limit 1;
?column?
----------
3
(1 row)

-- Try with parallel tracing disabled
set local pg_tracing.trace_parallel_workers = false;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000004-0000000000000004-01'*/ select 4 from pg_class limit 1;
?column?
----------
4
(1 row)

commit;
-- Get root top span id
SELECT span_id as root_span_id from pg_tracing_peek_spans where span_type='Select query' and trace_id='00000000000000000000000000000001' and parent_id='0000000000000001' \gset
-- Get executor top span id
SELECT span_id as executor_span_id from pg_tracing_peek_spans where span_operation='ExecutorRun' and trace_id='00000000000000000000000000000001' and parent_id=:'root_span_id' \gset
-- Check the select spans that are attached to the root top span
SELECT trace_id, span_type, span_operation from pg_tracing_peek_spans where span_type='Select query' and parent_id=:'executor_span_id' order by span_operation;
trace_id | span_type | span_operation
----------------------------------+--------------+----------------
00000000000000000000000000000001 | Select query | Worker 0
00000000000000000000000000000001 | Select query | Worker 1
(2 rows)

-- Check generated trace_id
SELECT trace_id from pg_tracing_peek_spans group by trace_id;
trace_id
----------------------------------
00000000000000000000000000000001
00000000000000000000000000000004
00000000000000000000000000000002
(3 rows)

-- Check number of executor spans
SELECT count(*) from pg_tracing_consume_spans where span_type='Executor';
count
-------
7
(1 row)

30 changes: 30 additions & 0 deletions sql/parallel.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
begin;
-- encourage use of parallel plans
set local parallel_setup_cost=0;
set local parallel_tuple_cost=0;
set local min_parallel_table_scan_size=0;
set local max_parallel_workers_per_gather=2;

-- Trace parallel queries
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000001-01'*/ select 1 from pg_class limit 1;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000002-0000000000000002-01'*/ select 2 from pg_class limit 1;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000003-0000000000000003-00'*/ select 3 from pg_class limit 1;

-- Try with parallel tracing disabled
set local pg_tracing.trace_parallel_workers = false;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000004-0000000000000004-01'*/ select 4 from pg_class limit 1;
commit;

-- Get root top span id
SELECT span_id as root_span_id from pg_tracing_peek_spans where span_type='Select query' and trace_id='00000000000000000000000000000001' and parent_id='0000000000000001' \gset
-- Get executor top span id
SELECT span_id as executor_span_id from pg_tracing_peek_spans where span_operation='ExecutorRun' and trace_id='00000000000000000000000000000001' and parent_id=:'root_span_id' \gset

-- Check the select spans that are attached to the root top span
SELECT trace_id, span_type, span_operation from pg_tracing_peek_spans where span_type='Select query' and parent_id=:'executor_span_id' order by span_operation;

-- Check generated trace_id
SELECT trace_id from pg_tracing_peek_spans group by trace_id;

-- Check number of executor spans
SELECT count(*) from pg_tracing_consume_spans where span_type='Executor';
68 changes: 68 additions & 0 deletions src/pg_tracing.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ typedef struct pgTracingQueryIdFilter
static int pg_tracing_max_span; /* Maximum number of spans to store */
static int pg_tracing_max_parameter_str; /* Maximum number of spans to
* store */
static bool pg_tracing_trace_parallel_workers = true; /* True to generate
* spans from parallel
* workers */
static double pg_tracing_sample_rate = 0; /* Sample rate applied to queries
* without SQLCommenter */
static double pg_tracing_caller_sample_rate = 1; /* Sample rate applied to
Expand Down Expand Up @@ -287,6 +290,17 @@ _PG_init(void)
NULL,
NULL);

DefineCustomBoolVariable("pg_tracing.trace_parallel_workers",
"Whether to generate samples from parallel workers.",
NULL,
&pg_tracing_trace_parallel_workers,
true,
PGC_USERSET,
0,
NULL,
NULL,
NULL);

DefineCustomEnumVariable("pg_tracing.track",
"Selects which statements are tracked by pg_tracing.",
NULL,
Expand Down Expand Up @@ -415,6 +429,8 @@ pg_tracing_memsize(void)
size = add_size(size, sizeof(pgTracingSpans));
/* the span variable array */
size = add_size(size, mul_size(pg_tracing_max_span, sizeof(Span)));
/* and the parallel workers context */
size = add_size(size, mul_size(max_parallel_workers, sizeof(pgTracingParallelContext)));

return size;
}
Expand Down Expand Up @@ -522,6 +538,23 @@ add_str_to_trace_buffer(const char *str, int str_len)
return position;
}


/*
* Add the worker name to the provided stringinfo
*/
static int
add_worker_name_to_trace_buffer(StringInfo str_info, int parallel_worker_number)
{
int position = str_info->cursor;

Assert(str_len > 0);

appendStringInfo(str_info, "Worker %d", parallel_worker_number);
appendStringInfoChar(str_info, '\0');
str_info->cursor = str_info->len;
return position;
}

/*
* Store a span in the current_trace_spans buffer
*/
Expand Down Expand Up @@ -767,6 +800,9 @@ pg_tracing_shmem_startup(void)
"pg_tracing memory context",
ALLOCSET_DEFAULT_SIZES);

/* Initialize shmem for trace propagation to parallel workers */
pg_tracing_shmem_parallel_startup();

/* First time, let's init shared state */
if (!found_pg_tracing)
{
Expand Down Expand Up @@ -928,6 +964,17 @@ extract_trace_context(struct pgTracingTraceContext *trace_context, ParseState *p
if (pg_tracing_sample_rate == 0 && pg_tracing_caller_sample_rate == 0)
return;

/*
* In a parallel worker, check the parallel context shared buffer to see
* if the leader left a trace context
*/
if (IsParallelWorker())
{
if (pg_tracing_trace_parallel_workers)
fetch_parallel_context(trace_context);
return;
}

Assert(trace_context->root_span.span_id == 0);
Assert(traceid_zero(trace_context->traceparent.trace_id));

Expand Down Expand Up @@ -975,6 +1022,8 @@ cleanup_tracing(void)
!current_trace_context.traceparent.sampled)
/* No need for cleaning */
return;
if (pg_tracing_trace_parallel_workers)
remove_parallel_context();
MemoryContextReset(pg_tracing_mem_ctx);
reset_trace_context(&root_trace_context);
reset_trace_context(&current_trace_context);
Expand Down Expand Up @@ -1199,6 +1248,16 @@ begin_top_span(pgTracingTraceContext * trace_context, Span * top_span,
NULL, parent_id,
per_level_buffers[exec_nested_level].query_id, &start_time);

if (IsParallelWorker())
{
/*
* In a parallel worker, we use the worker name as the span's
* operation
*/
top_span->operation_name_offset = add_worker_name_to_trace_buffer(current_trace_text, ParallelWorkerNumber);
return;
}

if (jstate && jstate->clocations_count > 0 && query != NULL)
{
/* jstate is available, normalise query and extract parameters' values */
Expand Down Expand Up @@ -1572,6 +1631,7 @@ pg_tracing_ExecutorStart(QueryDesc *queryDesc, int eflags)
* ExecutorRun hook: track nesting depth and create ExecutorRun span.
* ExecutorRun can create nested queries so we need to create ExecutorRun span
* as a top span.
* If the plan needs to create parallel workers, push the trace context in the parallel shared buffer.
*/
static void
pg_tracing_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
Expand All @@ -1592,6 +1652,14 @@ pg_tracing_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 cou
begin_span(trace_context->traceparent.trace_id, executor_run_span,
SPAN_EXECUTOR_RUN, NULL, parent_id,
per_level_buffers[exec_nested_level].query_id, &span_start_time);

/*
* If this query starts parallel worker, push the trace context for
* the child processes
*/
if (queryDesc->plannedstmt->parallelModeNeeded && pg_tracing_trace_parallel_workers)
add_parallel_context(trace_context, executor_run_span->span_id,
per_level_buffers[exec_nested_level].query_id);
}

exec_nested_level++;
Expand Down
26 changes: 26 additions & 0 deletions src/pg_tracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,32 @@ typedef struct pgTracingTraceContext
Span root_span; /* Top span for exec_nested_level 0 */
} pgTracingTraceContext;

/*
* A trace context for a specific parallel context
*/
typedef struct pgTracingParallelContext
{
BackendId leader_backend_id; /* Backend id of the leader, set to
* InvalidBackendId if unused */
pgTracingTraceContext trace_context;
} pgTracingParallelContext;

/*
* Store context for parallel workers
*/
typedef struct pgTracingParallelWorkers
{
slock_t mutex;
pgTracingParallelContext trace_contexts[FLEXIBLE_ARRAY_MEMBER];
} pgTracingParallelWorkers;

/* pg_tracing_parallel.c */
extern void pg_tracing_shmem_parallel_startup(void);
extern void add_parallel_context(const struct pgTracingTraceContext *trace_context,
uint64 parent_id, uint64 query_id);
extern void remove_parallel_context(void);
extern void fetch_parallel_context(pgTracingTraceContext * trace_context);

/* pg_tracing_query_process.c */
extern const char *normalise_query_parameters(const JumbleState *jstate, const char *query,
int query_loc, int *query_len_p, char **param_str,
Expand Down
Loading

0 comments on commit da0af6c

Please sign in to comment.