diff --git a/Makefile b/Makefile index c96aae2..ea4e976 100644 --- a/Makefile +++ b/Makefile @@ -4,8 +4,9 @@ MODULE_big = pg_query_state OBJS = pg_query_state.o signal_handler.o $(WIN32RES) EXTENSION = pg_query_state -EXTVERSION = 1.1 -DATA = pg_query_state--1.0--1.1.sql +EXTVERSION = 1.2 +DATA = pg_query_state--1.0--1.1.sql \ + pg_query_state--1.1--1.2.sql DATA_built = $(EXTENSION)--$(EXTVERSION).sql PGFILEDESC = "pg_query_state - facility to track progress of plan execution" @@ -13,9 +14,10 @@ EXTRA_CLEAN = ./isolation_output $(EXTENSION)--$(EXTVERSION).sql \ Dockerfile ./tests/*.pyc ./tmp_stress ISOLATION = corner_cases - ISOLATION_OPTS = --load-extension=pg_query_state +TAP_TESTS = 1 + ifdef USE_PGXS PG_CONFIG ?= pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/README.md b/README.md index 6c983c1..bf4684c 100644 --- a/README.md +++ b/README.md @@ -321,3 +321,117 @@ Do not hesitate to post your issues, questions and new ideas at the [issues](htt ## Authors [Maksim Milyutin](https://github.com/maksm90) Alexey Kondratov Postgres Professional Ltd., Russia + +## Function progress\_bar +```plpgsql +pg_progress_bar( + integer pid +) returns FLOAT +``` +extracts the current query state from backend with specified 'pid'. Then gets the numerical values of the actual rows and total rows and count progress for the whole query tree. Function returns numeric value from 0 to 1 describing the measure of query fulfillment. If there is no information about current state of the query, or the impossibility of counting, the corresponding messages will be displayed. + +## Function progress\_bar\_visual +```plpgsql +pg_progress_bar_visual( + integer pid, + integer delay +) returns VOID +``` +cyclically extracts and print the current query state in numeric value from backend with specified 'pid' every period specified by 'delay' in seconds. This is the looping version of the progress\_bar function that returns void value. + +**_Warning_**: Calling role have to be superuser or member of the role whose backend is being called. Otherwise function prints ERROR message `permission denied`. + +## Examples +Assume first backend executes some function: +```sql +postgres=# insert into table_name select generate_series(1,10000000); +``` +Other backend can get the follow output: +```sql +postgres=# SELECT pid FROM pg_stat_activity where query like 'insert%'; + pid +------- + 23877 +(1 row) + +postgres=# SELECT pg_progress_bar(23877); + pg_progress_bar +----------------- + 0.6087927 +(1 row) +``` +Or continuous version: +```sql +postgres=# SELECT pg_progress_bar_visual(23877, 1); +Progress = 0.043510 +Progress = 0.085242 +Progress = 0.124921 +Progress = 0.168168 +Progress = 0.213803 +Progress = 0.250362 +Progress = 0.292632 +Progress = 0.331454 +Progress = 0.367509 +Progress = 0.407450 +Progress = 0.448646 +Progress = 0.488171 +Progress = 0.530559 +Progress = 0.565558 +Progress = 0.608039 +Progress = 0.645778 +Progress = 0.654842 +Progress = 0.699006 +Progress = 0.735760 +Progress = 0.787641 +Progress = 0.832160 +Progress = 0.871077 +Progress = 0.911858 +Progress = 0.956362 +Progress = 0.995097 +Progress = 1.000000 + pg_progress_bar_visual +------------------------ + 1 +(1 row) +``` +Also uncountable queries exist. Assume first backend executes some function: +```sql +DELETE from table_name; +``` +Other backend can get the follow output: +```sql +postgres=# SELECT pid FROM pg_stat_activity where query like 'delete%'; + pid +------- + 23877 +(1 row) + +postgres=# SELECT pg_progress_bar(23877); +INFO: Counting Progress doesn't available + pg_progress_bar +----------------- + -1 +(1 row) + +postgres=# SELECT pg_progress_bar_visual(23877, 5); +INFO: Counting Progress doesn't available + pg_progress_bar_visual +------------------------ + -1 +(1 row) +``` + +## Reinstallation +If you already have a module 'pg_query_state' without progress bar functions installed, execute this in the module's directory: +``` +make install USE_PGXS=1 +``` +It is essential to restart the PostgreSQL instance. After that, execute the following queries in psql: +```sql +DROP EXTENSION IF EXISTS pg_query_state; +CREATE EXTENSION pg_query_state; +``` + +## Authors +Ekaterina Sokolova Postgres Professional Ltd., Russia +Vyacheslav Makarov Postgres Professional Ltd., Russia diff --git a/init.sql b/init.sql index 3a9bb61..2dc7363 100644 --- a/init.sql +++ b/init.sql @@ -15,3 +15,14 @@ CREATE FUNCTION pg_query_state(pid integer , leader_pid integer) AS 'MODULE_PATHNAME' LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION pg_progress_bar(pid integer) + RETURNS FLOAT + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION pg_progress_bar_visual(pid integer + , delay integer = 1) + RETURNS FLOAT + AS 'MODULE_PATHNAME', 'pg_progress_bar' + LANGUAGE C STRICT VOLATILE; diff --git a/pg_query_state--1.1--1.2.sql b/pg_query_state--1.1--1.2.sql new file mode 100644 index 0000000..594d5ff --- /dev/null +++ b/pg_query_state--1.1--1.2.sql @@ -0,0 +1,14 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "ALTER EXTENSION pg_query_state UPDATE TO '1.2'" to load this file. \quit + +CREATE FUNCTION pg_progress_bar(pid integer) + RETURNS FLOAT + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION pg_progress_bar_visual(pid integer + , delay integer = 1) + RETURNS FLOAT + AS 'MODULE_PATHNAME', 'pg_progress_bar' + LANGUAGE C STRICT VOLATILE; + \ No newline at end of file diff --git a/pg_query_state.c b/pg_query_state.c index 1949643..9329f78 100644 --- a/pg_query_state.c +++ b/pg_query_state.c @@ -63,11 +63,11 @@ static shm_mq_result receive_msg_by_parts(shm_mq_handle *mqh, Size *total, void **datap, int64 timeout, int *rc, bool nowait); /* Global variables */ -List *QueryDescStack = NIL; +List *QueryDescStack = NIL; static ProcSignalReason UserIdPollReason = INVALID_PROCSIGNAL; static ProcSignalReason QueryStatePollReason = INVALID_PROCSIGNAL; static ProcSignalReason WorkerPollReason = INVALID_PROCSIGNAL; -static bool module_initialized = false; +static bool module_initialized = false; static const char *be_state_str[] = { /* BackendState -> string repr */ "undefined", /* STATE_UNDEFINED */ "idle", /* STATE_IDLE */ @@ -103,8 +103,8 @@ static List *GetRemoteBackendQueryStates(PGPROC *leader, /* Shared memory variables */ shm_toc *toc = NULL; RemoteUserIdResult *counterpart_userid = NULL; -pg_qs_params *params = NULL; -shm_mq *mq = NULL; +pg_qs_params *params = NULL; +shm_mq *mq = NULL; /* * Estimate amount of shared memory needed. @@ -1247,3 +1247,259 @@ DetachPeer(void) ereport(LOG, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("pg_query_state peer is not responding"))); } + +/* + * Count progress of query execution like ratio of + * number of received to planned rows in persent. + * Changes of this function can lead to more plausible results. + */ +static double +CountProgress(char *plan_text) +{ + char *plan; /* Copy of plan_text */ + char *node; /* Part of plan with information about single node */ + char *rows; /* Pointer to rows */ + char *actual_rows_str; /* Actual rows in string format */ + char *plan_rows_str; /* Planned rows in string format */ + int len; /* Length of rows in string format */ + double actual_rows; /* Actual rows */ + double plan_rows; /* Planned rows */ + double progress = 0; /* Summary progress on nodes */ + int node_amount = 0; /* Amount of plantree nodes using in counting progress */ + + plan = palloc(sizeof(char) * (strlen(plan_text) + 1)); + strcpy(plan, plan_text); + node = strtok(plan, "["); /* Get information about upper node */ + while (node != NULL) + { + if (strstr(node, "Seq Scan") == NULL) + { + if (strstr(node, "ModifyTable") == NULL) + { + if (strstr(node, "Result") == NULL) + { + if ((rows = strstr(node, "Rows Removed by Filter")) != NULL) + { + node_amount++; + rows = (char *) (rows + strlen("Rows Removed by Filter\": ") * sizeof(char)); + + /* + * Filter node have 2 conditions: + * 1) Was not filtered (current progress = 0) + * 2) Was filtered (current progress = 1) + */ + if (rows[0] != '0') + progress += 1; + } + else if ((rows = strstr(node, "\"Actual Rows\": ")) != NULL) + { + node_amount++; + rows = (char *) (rows + strlen("\"Actual Rows\": ") * sizeof(char)); + len = strstr(rows, "\n") - rows; + if ((strstr(rows, ",") - rows) < len) + len = strstr(rows, ",") - rows; + actual_rows_str = palloc(sizeof(char) * (len + 1)); + actual_rows_str[len] = 0; + strncpy(actual_rows_str, rows, len); + actual_rows = strtod(actual_rows_str, NULL); + pfree(actual_rows_str); + + rows = strstr(node, "\"Plan Rows\": "); + rows = (char *) (rows + strlen("\"Plan Rows\": ") * sizeof(char)); + len = strstr(rows, ",") - rows; + plan_rows_str = palloc(sizeof(char) * (len + 1)); + plan_rows_str[len] = 0; + strncpy(plan_rows_str, rows, len); + plan_rows = strtod(plan_rows_str, NULL); + pfree(plan_rows_str); + + if (plan_rows > actual_rows) + progress += actual_rows / plan_rows; + else + progress += 1; + } + } + } + } + node = strtok(NULL, "["); + } + + pfree(plan); + if (node_amount > 0) + { + progress = progress / node_amount; + if (progress == 1) + progress = 0.999999; + } + else + return -1; + return progress; +} + +static double +GetCurrentNumericState(shm_mq_msg *msg) +{ + typedef struct + { + PGPROC *proc; + ListCell *frame_cursor; + int frame_index; + List *stack; + } proc_state; + + /* multicall context type */ + typedef struct + { + ListCell *proc_cursor; + List *procs; + } pg_qs_fctx; + + pg_qs_fctx *fctx; + List *qs_stack; + proc_state *p_state; + stack_frame *frame; + char *plan_text; + + fctx = (pg_qs_fctx *) palloc(sizeof(pg_qs_fctx)); + fctx->procs = NIL; + p_state = (proc_state *) palloc(sizeof(proc_state)); + qs_stack = deserialize_stack(msg->stack, msg->stack_depth); + p_state->proc = msg->proc; + p_state->stack = qs_stack; + p_state->frame_index = 0; + p_state->frame_cursor = list_head(qs_stack); + fctx->procs = lappend(fctx->procs, p_state); + fctx->proc_cursor = list_head(fctx->procs); + frame = (stack_frame *) lfirst(p_state->frame_cursor); + plan_text = frame->plan->vl_dat; + return CountProgress(plan_text); +} + +PG_FUNCTION_INFO_V1(pg_progress_bar); +Datum +pg_progress_bar(PG_FUNCTION_ARGS) +{ + pid_t pid = PG_GETARG_INT32(0); + int delay = 0; + PGPROC *proc; + Oid counterpart_user_id; + shm_mq_msg *msg; + List *bg_worker_procs = NIL; + List *msgs; + double progress; + double old_progress; + + if (PG_NARGS() == 2) + { + /* + * This is continuous mode, function 'pg_progress_bar_visual', + * we need to get delay value. + */ + delay = PG_GETARG_INT32(1); + if (delay < 1) + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("the value of \"delay\" must be positive integer"))); + } + + if (!module_initialized) + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("pg_query_state wasn't initialized yet"))); + + if (pid == MyProcPid) + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("attempt to extract state of current process"))); + + proc = BackendPidGetProc(pid); + if (!proc || +#if PG_VERSION_NUM >= 170000 + proc->vxid.procNumber == INVALID_PROC_NUMBER || +#else + proc->backendId == InvalidBackendId || +#endif + proc->databaseId == InvalidOid || + proc->roleId == InvalidOid) + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("backend with pid=%d not found", pid))); + + counterpart_user_id = GetRemoteBackendUserId(proc); + if (!(superuser() || GetUserId() == counterpart_user_id)) + ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied"))); + + old_progress = 0; + progress = 0; + if (SRF_IS_FIRSTCALL()) + { + pg_atomic_write_u32(&counterpart_userid->n_peers, 1); + params->reqid = ++reqid; + } + + bg_worker_procs = GetRemoteBackendWorkers(proc); + msgs = GetRemoteBackendQueryStates(proc, + bg_worker_procs, + 0, 1, 0, 0, 0, + EXPLAIN_FORMAT_JSON); + if (list_length(msgs) == 0) + elog(WARNING, "backend does not reply"); + msg = (shm_mq_msg *) linitial(msgs); + + switch (msg->result_code) + { + case QUERY_NOT_RUNNING: + elog(INFO, "query not runing"); + PG_RETURN_FLOAT8((float8) -1); + break; + case STAT_DISABLED: + elog(INFO, "query execution statistics disabled"); + PG_RETURN_FLOAT8((float8) -1); + default: + break; + } + if (msg->result_code == QS_RETURNED && delay == 0) + { + progress = GetCurrentNumericState(msg); + if (progress < 0) + { + elog(INFO, "Counting Progress doesn't available"); + PG_RETURN_FLOAT8((float8) -1); + } + else + PG_RETURN_FLOAT8((float8) progress); + } + else if (msg->result_code == QS_RETURNED) + { + while (msg->result_code == QS_RETURNED) + { + progress = GetCurrentNumericState(msg); + if (progress > old_progress) + { + elog(INFO, "\rProgress = %f", progress); + old_progress = progress; + } + else if (progress < 0) + { + elog(INFO, "Counting Progress doesn't available"); + break; + } + + for (int i = 0; i < delay; i++) + { + pg_usleep(1000000); + CHECK_FOR_INTERRUPTS(); + } + + bg_worker_procs = GetRemoteBackendWorkers(proc); + msgs = GetRemoteBackendQueryStates(proc, + bg_worker_procs, + 0, 1, 0, 0, 0, + EXPLAIN_FORMAT_JSON); + if (list_length(msgs) == 0) + elog(WARNING, "backend does not reply"); + msg = (shm_mq_msg *) linitial(msgs); + } + if (progress > -1) + elog(INFO, "\rProgress = 1.000000"); + PG_RETURN_FLOAT8((float8) 1); + } + PG_RETURN_FLOAT8((float8) -1); +} diff --git a/pg_query_state.control b/pg_query_state.control index fdf637e..9373bb0 100644 --- a/pg_query_state.control +++ b/pg_query_state.control @@ -1,5 +1,5 @@ # pg_query_state extension comment = 'tool for inspection query progress' -default_version = '1.1' +default_version = '1.2' module_pathname = '$libdir/pg_query_state' relocatable = true diff --git a/t/test_bad_progress_bar.pl b/t/test_bad_progress_bar.pl new file mode 100644 index 0000000..073cdb1 --- /dev/null +++ b/t/test_bad_progress_bar.pl @@ -0,0 +1,115 @@ +# pg_query_state/t/test_bad_progress_bar.pl +# +# Check uncorrect launches of functions pg_progress_bar(pid) +# and pg_progress_bar_visual(pid, delay) + +use strict; +use warnings; +use Test::More tests => 4; + +# List of checks for bad cases: +# 1) appealing to a bad pid +# ------- requires DBI and DBD::Pg modules ------- +# 2) extracting the state of the process itself + +my $node; +my $dbh_status; +my $pid_status; + +sub bad_pid +{ + note('Extracting from bad pid'); + my $stderr; + $node->psql('postgres', 'SELECT * from pg_progress_bar(-1)', stderr => \$stderr); + is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', "appealing to a bad pid for pg_progress_bar"); + $node->psql('postgres', 'SELECT * from pg_progress_bar(-1)_visual', stderr => \$stderr); + is ($stderr, 'psql::1: ERROR: backend with pid=-1 not found', "appealing to a bad pid for pg_progress_bar_visual"); +} + +sub self_status +{ + note('Extracting your own status'); + $dbh_status->do('SELECT * from pg_progress_bar(' . $pid_status . ')'); + is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', "extracting the state of the process itself for pg_progress_bar"); + $dbh_status->do('SELECT * from pg_progress_bar_visual(' . $pid_status . ')'); + is($dbh_status->errstr, 'ERROR: attempt to extract state of current process', "extracting the state of the process itself for pg_progress_bar_visual"); +} + +# start backend for function pg_progress_bar + +my $pg_15_modules; + +BEGIN +{ + $pg_15_modules = eval + { + require PostgreSQL::Test::Cluster; + require PostgreSQL::Test::Utils; + return 1; + }; + + unless (defined $pg_15_modules) + { + $pg_15_modules = 0; + + require PostgresNode; + require TestLib; + } +} + +note('PostgreSQL 15 modules are used: ' . ($pg_15_modules ? 'yes' : 'no')); + +if ($pg_15_modules) +{ + $node = PostgreSQL::Test::Cluster->new("master"); +} +else +{ + $node = PostgresNode->get_new_node("master"); +} + +$node->init; +$node->start; +$node->append_conf('postgresql.conf', "shared_preload_libraries = 'pg_query_state'"); +$node->restart; +$node->psql('postgres', 'CREATE EXTENSION pg_query_state;'); + +# 2 tests for 1 case +bad_pid(); + +# Check whether we have both DBI and DBD::pg + +my $dbdpg_rc = eval +{ + require DBI; + require DBD::Pg; + 1; +}; + +$dbdpg_rc = 0 unless defined $dbdpg_rc; + +if ($dbdpg_rc != 1) +{ + diag('DBI and DBD::Pg are not available, skip 2/4 tests'); +} + +SKIP: { + skip "DBI and DBD::Pg are not available", 2 if ($dbdpg_rc != 1); + + DBD::Pg->import(':async'); + $dbh_status = DBI->connect('DBI:Pg:' . $node->connstr($_)); + if ( !defined $dbh_status ) + { + die "Cannot connect to database for dbh with pg_progress_bar\n"; + } + + $pid_status = $dbh_status->{pg_pid}; + + # 2 tests for 2 case + self_status(); + + $dbh_status->disconnect; +} + +$node->stop('fast'); + diff --git a/tests/common.py b/tests/common.py index 6dab69a..c69a8c7 100644 --- a/tests/common.py +++ b/tests/common.py @@ -161,6 +161,54 @@ def onetime_query_state(config, async_conn, query, args={}, num_workers=0): set_guc(async_conn, 'enable_mergejoin', 'on') return result, notices +def progress_bar(config, pid): + conn = psycopg2.connect(**config) + curs = conn.cursor() + + curs.callproc('pg_progress_bar', (pid,)) + result = curs.fetchall() + notices = conn.notices[:] + conn.close() + + return result, notices + +def onetime_progress_bar(config, async_conn, query, args={}, num_workers=0): + """ + Get intermediate state of 'query' on connection 'async_conn' after number of 'steps' + of node executions from start of query + """ + + acurs = async_conn.cursor() + + set_guc(async_conn, 'enable_mergejoin', 'off') + set_guc(async_conn, 'max_parallel_workers_per_gather', num_workers) + acurs.execute(query) + + # extract progress of current query + MAX_PG_QS_RETRIES = 10 + DELAY_BETWEEN_RETRIES = 0.1 + pg_qs_args = { + 'config': config, + 'pid': async_conn.get_backend_pid(), + } + for k, v in args.items(): + pg_qs_args[k] = v + n_retries = 0 + while True: + result, notices = progress_bar(**pg_qs_args) + n_retries += 1 + if len(result) > 0: + break + if n_retries >= MAX_PG_QS_RETRIES: + # pg_query_state callings don't return any result, more likely run + # query has completed + break + time.sleep(DELAY_BETWEEN_RETRIES) + wait(async_conn) + + set_guc(async_conn, 'enable_mergejoin', 'on') + return result, notices + def set_guc(async_conn, param, value): acurs = async_conn.cursor() acurs.execute('set %s to %s' % (param, value)) diff --git a/tests/pg_qs_test_runner.py b/tests/pg_qs_test_runner.py index 944f77f..63b0270 100644 --- a/tests/pg_qs_test_runner.py +++ b/tests/pg_qs_test_runner.py @@ -70,6 +70,7 @@ class TeardownException(Exception): pass test_formats, test_timing_buffers_conflicts, test_insert_on_conflict, + test_progress_bar, ] def setup(con): diff --git a/tests/test_cases.py b/tests/test_cases.py index 498484b..3d4e9ae 100644 --- a/tests/test_cases.py +++ b/tests/test_cases.py @@ -408,3 +408,18 @@ def test_timing_buffers_conflicts(config): and 'WARNING: buffers statistics disabled\n' in notices common.n_close((acon,)) + +def test_progress_bar(config): + """test pg_progress_bar of simple query""" + + acon, = common.n_async_connect(config) + query = 'select * from foo join bar on foo.c1=bar.c1' + + qs, notices = common.onetime_progress_bar(config, acon, query) + assert qs[0][0] >= 0 and qs[0][0] < 1 + first_qs = qs[0][0] + + qs, _ = common.onetime_progress_bar(config, acon, query) + assert qs[0][0] >= first_qs and qs[0][0] < 1 + + common.n_close((acon,))