Skip to content

Commit

Permalink
Set gather/gathermerge as parent of parallel workers
Browse files Browse the repository at this point in the history
  • Loading branch information
bonnefoa committed Sep 9, 2024
1 parent 2b481ac commit beabc49
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 22 deletions.
30 changes: 16 additions & 14 deletions expected/parallel.out
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ set local max_parallel_workers_per_gather=2;
1
(1 row)

/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000002-0000000000000002-01'*/ select 2 from pg_class limit 1;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000002-01'*/ select 2 from pg_class limit 1;
?column?
----------
2
(1 row)

/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000003-0000000000000003-00'*/ select 3 from pg_class limit 1;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000003-00'*/ select 3 from pg_class limit 1;
?column?
----------
3
(1 row)

-- Try with parallel tracing disabled
set local pg_tracing.trace_parallel_workers = false;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000004-0000000000000004-01'*/ select 4 from pg_class limit 1;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000004-01'*/ select 4 from pg_class limit 1;
?column?
----------
4
Expand All @@ -35,11 +35,15 @@ commit;
-- get tx block
select span_id as tx_block_id from pg_tracing_peek_spans where span_type='TransactionBlock' and trace_id='00000000000000000000000000000001' and parent_id='0000000000000001' \gset
-- get root top span id
select span_id as root_span_id from pg_tracing_peek_spans where span_type='Select query' and trace_id='00000000000000000000000000000001' and parent_id=:'tx_block_id' \gset
select span_id as root_span_id from pg_tracing_peek_spans where span_type='Select query' and trace_id='00000000000000000000000000000001' and parent_id=:'tx_block_id' limit 1 \gset
-- Get executor top span id
SELECT span_id as executor_span_id from pg_tracing_peek_spans where span_operation='ExecutorRun' and trace_id='00000000000000000000000000000001' and parent_id=:'root_span_id' \gset
-- Get Limit span id
SELECT span_id as limit_span_id from pg_tracing_peek_spans where span_operation='Limit' and trace_id='00000000000000000000000000000001' and parent_id=:'executor_span_id' \gset
-- Get Gather span id
SELECT span_id as gather_span_id from pg_tracing_peek_spans where span_operation='Gather' and trace_id='00000000000000000000000000000001' and parent_id=:'limit_span_id' \gset
-- Check the select spans that are attached to the root top span
SELECT trace_id, span_type, span_operation from pg_tracing_peek_spans where span_type='Select query' and parent_id=:'executor_span_id' order by span_operation;
SELECT trace_id, span_type, span_operation from pg_tracing_peek_spans where span_type='Select query' and parent_id=:'gather_span_id' order by span_operation;
trace_id | span_type | span_operation
----------------------------------+--------------+----------------
00000000000000000000000000000001 | Select query | Worker 0
Expand All @@ -51,9 +55,7 @@ SELECT trace_id from pg_tracing_peek_spans group by trace_id;
trace_id
----------------------------------
00000000000000000000000000000001
00000000000000000000000000000004
00000000000000000000000000000002
(3 rows)
(1 row)

-- Check number of executor spans
SELECT count(*) from pg_tracing_consume_spans where span_operation='ExecutorRun';
Expand Down Expand Up @@ -82,13 +84,13 @@ SELECT span_type, span_operation, lvl FROM peek_ordered_spans where trace_id='00
ExecutorRun | ExecutorRun | 2
Planner | Planner | 2
Limit | Limit | 3
Select query | Worker 0 | 3
Select query | Worker 1 | 3
ExecutorRun | ExecutorRun | 4
ExecutorRun | ExecutorRun | 4
Gather | Gather | 4
SeqScan | SeqScan on pg_class | 5
SeqScan | SeqScan on pg_class | 5
Select query | Worker 0 | 5
Select query | Worker 1 | 5
ExecutorRun | ExecutorRun | 6
ExecutorRun | ExecutorRun | 6
SeqScan | SeqScan on pg_class | 7
SeqScan | SeqScan on pg_class | 7
(11 rows)

-- Cleanup
Expand Down
14 changes: 9 additions & 5 deletions sql/parallel.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,27 @@ set local max_parallel_workers_per_gather=2;

-- Trace parallel queries
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000001-01'*/ select 1 from pg_class limit 1;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000002-0000000000000002-01'*/ select 2 from pg_class limit 1;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000003-0000000000000003-00'*/ select 3 from pg_class limit 1;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000002-01'*/ select 2 from pg_class limit 1;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000003-00'*/ select 3 from pg_class limit 1;

-- Try with parallel tracing disabled
set local pg_tracing.trace_parallel_workers = false;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000004-0000000000000004-01'*/ select 4 from pg_class limit 1;
/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000004-01'*/ select 4 from pg_class limit 1;
commit;

-- get tx block
select span_id as tx_block_id from pg_tracing_peek_spans where span_type='TransactionBlock' and trace_id='00000000000000000000000000000001' and parent_id='0000000000000001' \gset
-- get root top span id
select span_id as root_span_id from pg_tracing_peek_spans where span_type='Select query' and trace_id='00000000000000000000000000000001' and parent_id=:'tx_block_id' \gset
select span_id as root_span_id from pg_tracing_peek_spans where span_type='Select query' and trace_id='00000000000000000000000000000001' and parent_id=:'tx_block_id' limit 1 \gset
-- Get executor top span id
SELECT span_id as executor_span_id from pg_tracing_peek_spans where span_operation='ExecutorRun' and trace_id='00000000000000000000000000000001' and parent_id=:'root_span_id' \gset
-- Get Limit span id
SELECT span_id as limit_span_id from pg_tracing_peek_spans where span_operation='Limit' and trace_id='00000000000000000000000000000001' and parent_id=:'executor_span_id' \gset
-- Get Gather span id
SELECT span_id as gather_span_id from pg_tracing_peek_spans where span_operation='Gather' and trace_id='00000000000000000000000000000001' and parent_id=:'limit_span_id' \gset

-- Check the select spans that are attached to the root top span
SELECT trace_id, span_type, span_operation from pg_tracing_peek_spans where span_type='Select query' and parent_id=:'executor_span_id' order by span_operation;
SELECT trace_id, span_type, span_operation from pg_tracing_peek_spans where span_type='Select query' and parent_id=:'gather_span_id' order by span_operation;

-- Check generated trace_id
SELECT trace_id from pg_tracing_peek_spans group by trace_id;
Expand Down
6 changes: 5 additions & 1 deletion src/pg_tracing.c
Original file line number Diff line number Diff line change
Expand Up @@ -1832,6 +1832,7 @@ pg_tracing_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 cou
Span *executor_run_span;
Traceparent *traceparent = &executor_traceparent;
int num_nodes;
uint64 parallel_workers_parent_id = 0;

if (!pg_tracing_enabled(traceparent, nested_level) || queryDesc->totaltime == NULL)
{
Expand Down Expand Up @@ -1876,7 +1877,10 @@ pg_tracing_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 cou
* child processes
*/
if (queryDesc->plannedstmt->parallelModeNeeded && pg_tracing_trace_parallel_workers)
add_parallel_context(traceparent, executor_run_span->span_id);
{
parallel_workers_parent_id = generate_parallel_workers_parent_id();
add_parallel_context(traceparent, parallel_workers_parent_id);
}

/*
* Setup ExecProcNode override to capture node start if planstate spans
Expand Down
2 changes: 2 additions & 0 deletions src/pg_tracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ extern TimestampTz
get_span_end_from_planstate(PlanState *planstate, TimestampTz plan_start, TimestampTz root_end);
extern int
number_nodes_from_planstate(PlanState *planstate);
extern uint64
generate_parallel_workers_parent_id(void);

/* pg_tracing_query_process.c */
extern const char *normalise_query_parameters(const SpanContext * span_context, Span * span,
Expand Down
35 changes: 33 additions & 2 deletions src/pg_tracing_planstate.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ static int index_planstart = 0;
/* Maximum elements allocated in the traced_planstates array */
static int max_planstart = 0;

/* Span id of parallel worker's parent */
static uint64 parallel_workers_parent_id = 0;

static Span
create_span_node(PlanState *planstate, const planstateTraceContext * planstateTraceContext,
uint64 *span_id, uint64 parent_id, uint64 query_id, SpanType node_type,
Expand All @@ -51,6 +54,16 @@ cleanup_planstarts(void)
index_planstart = 0;
}

/*
* Generate a new span id that will be used for worker's parent
*/
uint64
generate_parallel_workers_parent_id(void)
{
parallel_workers_parent_id = pg_prng_int64(&pg_global_prng_state);
return parallel_workers_parent_id;
}

/*
* Fetch the node start of a planstate
*/
Expand Down Expand Up @@ -166,6 +179,8 @@ drop_traced_planstates(void)
static TupleTableSlot *
ExecProcNodeFirstPgTracing(PlanState *node)
{
uint64 span_id;

if (max_planstart == 0)

/*
Expand All @@ -186,10 +201,26 @@ ExecProcNodeFirstPgTracing(PlanState *node)
max_planstart * sizeof(TracedPlanstate));
}

switch (nodeTag(node))
{
case T_GatherState:
case T_GatherMergeState:

/*
* This is the parent node for the parallel workers, use the
* corrent span_id
*/
span_id = parallel_workers_parent_id;
break;
default:
/* Normal node, generate a random span_id */
span_id = pg_prng_uint64(&pg_global_prng_state);
}

/* Register planstate start */
traced_planstates[index_planstart].planstate = node;
traced_planstates[index_planstart].node_start = GetCurrentTimestamp();
traced_planstates[index_planstart].span_id = pg_prng_uint64(&pg_global_prng_state);
traced_planstates[index_planstart].span_id = span_id;
traced_planstates[index_planstart].nested_level = nested_level;
index_planstart++;

Expand Down Expand Up @@ -410,7 +441,7 @@ create_spans_from_planstate(PlanState *planstate, planstateTraceContext * planst
if (haschildren && planstateTraceContext->deparse_ctx)
planstateTraceContext->ancestors = lcons(planstate->plan, planstateTraceContext->ancestors);

/* We only need to walk the subplans if they are executed */
/* We only need to walk the subplans if they are executed */
subplan_executed = is_subplan_executed(planstate);
/* Walk the outerplan */
if (outerPlanState(planstate) && subplan_executed)
Expand Down

0 comments on commit beabc49

Please sign in to comment.