From 405723e1ce2d13bc0c47b4133f932e48a29d4548 Mon Sep 17 00:00:00 2001 From: Christoph Berg Date: Wed, 28 Sep 2016 12:38:41 +0200 Subject: [PATCH] Fix build against 9.6 parser/keywords.h is now common/keywords.h The regression expected output files need a new variant because plpgsql ERRORs will now show an extra CONTEXT line. The net diff for the two added _1.out files is: ``` --- sql/pgq_node/expected/pgq_node_test.out 2013-02-08 10:44:34.000000000 +0100 +++ sql/pgq_node/expected/pgq_node_test_1.out 2016-09-28 12:36:52.311636581 +0200 @@ -269,6 +269,7 @@ select * from pgq_node.is_root_node('cqueue'); ERROR: queue does not exist: cqueue +CONTEXT: PL/pgSQL function pgq_node.is_root_node(text) line 19 at RAISE select * from pgq_node.get_consumer_state('bqueue', 'random_consumer'); ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error ----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+----------- --- sql/pgq_coop/expected/pgq_coop_test.out 2013-02-08 10:44:34.000000000 +0100 +++ sql/pgq_coop/expected/pgq_coop_test_1.out 2016-09-28 12:36:52.311636581 +0200 @@ -137,6 +137,7 @@ select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons2', 0); ERROR: subconsumer has active batch +CONTEXT: PL/pgSQL function pgq_coop.unregister_subconsumer(text,text,text,integer) line 42 at RAISE select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons2', 1); unregister_subconsumer ------------------------ ``` --- sql/pgq/triggers/stringutil.c | 4 + sql/pgq_coop/expected/pgq_coop_test_1.out | 193 +++++++++ sql/pgq_node/expected/pgq_node_test_1.out | 473 ++++++++++++++++++++++ 3 files changed, 670 insertions(+) create mode 100644 sql/pgq_coop/expected/pgq_coop_test_1.out create mode 100644 sql/pgq_node/expected/pgq_node_test_1.out diff --git a/sql/pgq/triggers/stringutil.c b/sql/pgq/triggers/stringutil.c index b52ec622..8d20e325 100644 --- a/sql/pgq/triggers/stringutil.c +++ b/sql/pgq/triggers/stringutil.c @@ -19,7 +19,11 @@ #include #include #include +#if PG_VERSION_NUM >= 90600 +#include +#else #include +#endif #include #include "stringutil.h" diff --git a/sql/pgq_coop/expected/pgq_coop_test_1.out b/sql/pgq_coop/expected/pgq_coop_test_1.out new file mode 100644 index 00000000..d8e483da --- /dev/null +++ b/sql/pgq_coop/expected/pgq_coop_test_1.out @@ -0,0 +1,193 @@ +select pgq.create_queue('testqueue'); + create_queue +-------------- + 1 +(1 row) + +update pgq.queue set queue_ticker_max_count = 1 where queue_name = 'testqueue'; +-- register +select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons1'); + register_subconsumer +---------------------- + 1 +(1 row) + +select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons1'); + register_subconsumer +---------------------- + 0 +(1 row) + +select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons1'); + register_subconsumer +---------------------- + 0 +(1 row) + +select pgq_coop.register_subconsumer('testqueue', 'maincons', 'subcons2'); + register_subconsumer +---------------------- + 1 +(1 row) + +-- process events +select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1'); + next_batch +------------ + +(1 row) + +select pgq.insert_event('testqueue', 'ev0', 'data'); + insert_event +-------------- + 1 +(1 row) + +select pgq.insert_event('testqueue', 'ev1', 'data'); + insert_event +-------------- + 2 +(1 row) + +select pgq.insert_event('testqueue', 'ev2', 'data'); + insert_event +-------------- + 3 +(1 row) + +select pgq.ticker(); + ticker +-------- + 1 +(1 row) + +select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1'); + next_batch +------------ + 1 +(1 row) + +select pgq_coop.next_batch('testqueue', 'maincons', 'subcons1'); + next_batch +------------ + 1 +(1 row) + +select pgq_coop.next_batch('testqueue', 'maincons', 'subcons2'); + next_batch +------------ + +(1 row) + +select pgq.insert_event('testqueue', 'ev3', 'data'); + insert_event +-------------- + 4 +(1 row) + +select pgq.insert_event('testqueue', 'ev4', 'data'); + insert_event +-------------- + 5 +(1 row) + +select pgq.insert_event('testqueue', 'ev5', 'data'); + insert_event +-------------- + 6 +(1 row) + +select pgq.ticker(); + ticker +-------- + 1 +(1 row) + +select pgq_coop.next_batch('testqueue', 'maincons', 'subcons2'); + next_batch +------------ + 2 +(1 row) + +select pgq_coop.finish_batch(2); + finish_batch +-------------- + 1 +(1 row) + +-- test takeover +select pgq_coop.next_batch('testqueue', 'maincons', 'subcons2', '1 hour'); + next_batch +------------ + +(1 row) + +update pgq.subscription set sub_active = '2005-01-01' where sub_batch is not null; +select pgq_coop.next_batch('testqueue', 'maincons', 'subcons2', '1 hour'); + next_batch +------------ + 1 +(1 row) + +select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons1', 0); + unregister_subconsumer +------------------------ + 0 +(1 row) + +select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons2', 0); +ERROR: subconsumer has active batch +CONTEXT: PL/pgSQL function pgq_coop.unregister_subconsumer(text,text,text,integer) line 42 at RAISE +select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons2', 1); + unregister_subconsumer +------------------------ + 1 +(1 row) + +select pgq_coop.unregister_subconsumer('testqueue', 'maincons', 'subcons2', 0); + unregister_subconsumer +------------------------ + 0 +(1 row) + +-- test auto-creation +select pgq_coop.next_batch('testqueue', 'cmain', 'sc1'); + next_batch +------------ + +(1 row) + +select pgq_coop.next_batch('testqueue', 'cmain', 'sc2'); + next_batch +------------ + +(1 row) + +select consumer_name, last_tick from pgq.get_consumer_info(); + consumer_name | last_tick +---------------+----------- + cmain | 3 + cmain.sc1 | + cmain.sc2 | + maincons | 3 +(4 rows) + +-- test unregistering with pure pgq api +select pgq.unregister_consumer('testqueue', 'cmain.sc2'); + unregister_consumer +--------------------- + 1 +(1 row) + +select pgq.unregister_consumer('testqueue', 'cmain'); + unregister_consumer +--------------------- + 2 +(1 row) + +select consumer_name, last_tick from pgq.get_consumer_info(); + consumer_name | last_tick +---------------+----------- + maincons | 3 +(1 row) + diff --git a/sql/pgq_node/expected/pgq_node_test_1.out b/sql/pgq_node/expected/pgq_node_test_1.out new file mode 100644 index 00000000..85dca321 --- /dev/null +++ b/sql/pgq_node/expected/pgq_node_test_1.out @@ -0,0 +1,473 @@ +select * from pgq_node.register_location('aqueue', 'node1', 'dbname=node1', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.register_location('aqueue', 'node2', 'dbname=node2', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.register_location('aqueue', 'node3', 'dbname=node3', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.register_location('aqueue', 'node4', 'dbname=node44', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.register_location('aqueue', 'node4', 'dbname=node4', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.register_location('aqueue', 'node5', 'dbname=node4', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.get_queue_locations('aqueue'); + node_name | node_location | dead +-----------+---------------+------ + node1 | dbname=node1 | f + node2 | dbname=node2 | f + node3 | dbname=node3 | f + node4 | dbname=node4 | f + node5 | dbname=node4 | f +(5 rows) + +select * from pgq_node.unregister_location('aqueue', 'node5'); + ret_code | ret_note +----------+---------- + 200 | Ok +(1 row) + +select * from pgq_node.unregister_location('aqueue', 'node5'); + ret_code | ret_note +----------+---------------------------------- + 301 | Location not found: aqueue/node5 +(1 row) + +select * from pgq_node.get_queue_locations('aqueue'); + node_name | node_location | dead +-----------+---------------+------ + node1 | dbname=node1 | f + node2 | dbname=node2 | f + node3 | dbname=node3 | f + node4 | dbname=node4 | f +(4 rows) + +select * from pgq_node.create_node('aqueue', 'root', 'node1', 'node1_worker', null, null, null); + ret_code | ret_note +----------+-------------------------------------------------------------- + 200 | Node "node1" initialized for queue "aqueue" with type "root" +(1 row) + +select * from pgq_node.register_subscriber('aqueue', 'node2', 'node2_worker', null); + ret_code | ret_note | global_watermark +----------+------------------------------+------------------ + 200 | Subscriber registered: node2 | 1 +(1 row) + +select * from pgq_node.register_subscriber('aqueue', 'node3', 'node3_worker', null); + ret_code | ret_note | global_watermark +----------+------------------------------+------------------ + 200 | Subscriber registered: node3 | 1 +(1 row) + +select * from pgq_node.maint_watermark('aqueue'); + maint_watermark +----------------- + 0 +(1 row) + +select * from pgq_node.maint_watermark('aqueue-x'); + maint_watermark +----------------- + 0 +(1 row) + +select * from pgq_node.get_consumer_info('aqueue'); + consumer_name | provider_node | last_tick_id | paused | uptodate | cur_error +---------------+---------------+--------------+--------+----------+----------- + node1_worker | node1 | 1 | f | f | +(1 row) + +select * from pgq_node.unregister_subscriber('aqueue', 'node3'); + ret_code | ret_note +----------+-------------------------------- + 200 | Subscriber unregistered: node3 +(1 row) + +select queue_name, consumer_name, last_tick from pgq.get_consumer_info(); + queue_name | consumer_name | last_tick +------------+-------------------+----------- + aqueue | .global_watermark | 1 + aqueue | .node2.watermark | 1 + aqueue | node2_worker | 1 +(3 rows) + +select * from pgq_node.get_worker_state('aqueue'); + ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error | worker_name | global_watermark | local_watermark | local_queue_top | combined_queue | combined_type +----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+-----------+--------------+------------------+-----------------+-----------------+----------------+--------------- + 100 | Ok | root | node1 | 1 | node1 | dbname=node1 | f | f | | node1_worker | 1 | 1 | 1 | | +(1 row) + +update pgq.queue set queue_ticker_max_lag = '0', queue_ticker_idle_period = '0'; +select * from pgq.ticker('aqueue'); + ticker +-------- + 2 +(1 row) + +select * from pgq.ticker('aqueue'); + ticker +-------- + 3 +(1 row) + +select * from pgq_node.set_subscriber_watermark('aqueue', 'node2', 3); + ret_code | ret_note +----------+--------------------------- + 200 | .node2.watermark set to 3 +(1 row) + +select queue_name, consumer_name, last_tick from pgq.get_consumer_info(); + queue_name | consumer_name | last_tick +------------+-------------------+----------- + aqueue | .global_watermark | 1 + aqueue | .node2.watermark | 3 + aqueue | node2_worker | 1 +(3 rows) + +select * from pgq_node.set_node_attrs('aqueue', 'test=1'); + ret_code | ret_note +----------+------------------------- + 200 | Node attributes updated +(1 row) + +select * from pgq_node.get_node_info('aqueue'); + ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick | node_attrs +----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------+------------ + 100 | Ok | root | node1 | 1 | 1 | node1 | dbname=node1 | | | node1_worker | f | f | 3 | test=1 +(1 row) + +select * from pgq_node.get_subscriber_info('aqueue'); + node_name | worker_name | node_watermark +-----------+--------------+---------------- + node2 | node2_worker | 3 +(1 row) + +-- branch node +select * from pgq_node.register_location('bqueue', 'node1', 'dbname=node1', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.register_location('bqueue', 'node2', 'dbname=node2', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.register_location('bqueue', 'node3', 'dbname=node3', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.create_node('bqueue', 'branch', 'node2', 'node2_worker', 'node1', 1, null); + ret_code | ret_note +----------+---------------------------------------------------------------- + 200 | Node "node2" initialized for queue "bqueue" with type "branch" +(1 row) + +select * from pgq_node.register_consumer('bqueue', 'random_consumer', 'node1', 1); + ret_code | ret_note +----------+----------------------------------------------------- + 200 | Consumer random_consumer registered on queue bqueue +(1 row) + +select * from pgq_node.register_consumer('bqueue', 'random_consumer2', 'node1', 1); + ret_code | ret_note +----------+------------------------------------------------------ + 200 | Consumer random_consumer2 registered on queue bqueue +(1 row) + +select * from pgq_node.local_state; + queue_name | consumer_name | provider_node | last_tick_id | cur_error | paused | uptodate +------------+------------------+---------------+--------------+-----------+--------+---------- + aqueue | node1_worker | node1 | 1 | | f | f + bqueue | node2_worker | node1 | 1 | | f | f + bqueue | random_consumer | node1 | 1 | | f | f + bqueue | random_consumer2 | node1 | 1 | | f | f +(4 rows) + +select * from pgq_node.node_info; + queue_name | node_type | node_name | worker_name | combined_queue | node_attrs +------------+-----------+-----------+--------------+----------------+------------ + aqueue | root | node1 | node1_worker | | test=1 + bqueue | branch | node2 | node2_worker | | +(2 rows) + +select * from pgq_node.get_node_info('aqueue'); + ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick | node_attrs +----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------+------------ + 100 | Ok | root | node1 | 1 | 1 | node1 | dbname=node1 | | | node1_worker | f | f | 3 | test=1 +(1 row) + +select * from pgq_node.get_node_info('bqueue'); + ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick | node_attrs +----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------+------------ + 100 | Ok | branch | node2 | 1 | 1 | node1 | dbname=node1 | | | node2_worker | f | f | 1 | +(1 row) + +select * from pgq_node.get_node_info('cqueue'); + ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick | node_attrs +----------+-----------------------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+-------------+---------------+-----------------+------------------+------------ + 404 | Unknown queue: cqueue | | | | | | | | | | | | | +(1 row) + +select * from pgq_node.get_worker_state('aqueue'); + ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error | worker_name | global_watermark | local_watermark | local_queue_top | combined_queue | combined_type +----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+-----------+--------------+------------------+-----------------+-----------------+----------------+--------------- + 100 | Ok | root | node1 | 1 | node1 | dbname=node1 | f | f | | node1_worker | 1 | 1 | 3 | | +(1 row) + +select * from pgq_node.get_worker_state('bqueue'); + ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error | worker_name | global_watermark | local_watermark | local_queue_top | combined_queue | combined_type +----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+-----------+--------------+------------------+-----------------+-----------------+----------------+--------------- + 100 | Ok | branch | node2 | 1 | node1 | dbname=node1 | f | f | | node2_worker | 1 | 1 | 1 | | +(1 row) + +select * from pgq_node.get_worker_state('cqueue'); + ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error | worker_name | global_watermark | local_watermark | local_queue_top | combined_queue | combined_type +----------+-----------------------+-----------+-----------+----------------+---------------+-------------------+--------+----------+-----------+-------------+------------------+-----------------+-----------------+----------------+--------------- + 404 | Unknown queue: cqueue | | | | | | | | | | | | | | +(1 row) + +select * from pgq_node.is_root_node('aqueue'); + is_root_node +-------------- + t +(1 row) + +select * from pgq_node.is_root_node('bqueue'); + is_root_node +-------------- + f +(1 row) + +select * from pgq_node.is_root_node('cqueue'); +ERROR: queue does not exist: cqueue +CONTEXT: PL/pgSQL function pgq_node.is_root_node(text) line 19 at RAISE +select * from pgq_node.get_consumer_state('bqueue', 'random_consumer'); + ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error +----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+----------- + 100 | Ok | branch | node2 | 1 | node1 | dbname=node1 | f | f | +(1 row) + +select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2'); + ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error +----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+----------- + 100 | Ok | branch | node2 | 1 | node1 | dbname=node1 | f | f | +(1 row) + +select * from pgq_node.set_consumer_error('bqueue', 'random_consumer2', 'failure'); + ret_code | ret_note +----------+------------------------------------------- + 100 | Consumer random_consumer2 error = failure +(1 row) + +select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2'); + ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error +----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+----------- + 100 | Ok | branch | node2 | 1 | node1 | dbname=node1 | f | f | failure +(1 row) + +select * from pgq_node.set_consumer_completed('bqueue', 'random_consumer2', 2); + ret_code | ret_note +----------+---------------------------------------------- + 100 | Consumer random_consumer2 compleded tick = 2 +(1 row) + +select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2'); + ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error +----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+----------- + 100 | Ok | branch | node2 | 2 | node1 | dbname=node1 | f | f | +(1 row) + +select * from pgq_node.set_consumer_paused('bqueue', 'random_consumer2', true); + ret_code | ret_note +----------+-------------------------------------------- + 200 | Consumer random_consumer2 tagged as paused +(1 row) + +select * from pgq_node.set_consumer_uptodate('bqueue', 'random_consumer2', true); + ret_code | ret_note +----------+----------------------- + 200 | Consumer uptodate = 1 +(1 row) + +select * from pgq_node.change_consumer_provider('bqueue', 'random_consumer2', 'node3'); + ret_code | ret_note +----------+--------------------------------------- + 200 | Consumer provider node set to : node3 +(1 row) + +select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2'); + ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error +----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+----------- + 100 | Ok | branch | node2 | 2 | node3 | dbname=node3 | t | f | +(1 row) + +select * from pgq_node.unregister_consumer('bqueue', 'random_consumer2'); + ret_code | ret_note +----------+---------------------------------------------------- + 200 | Consumer random_consumer2 unregistered from bqueue +(1 row) + +select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2'); + ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error +----------+-------------------------------------------+-----------+-----------+----------------+---------------+-------------------+--------+----------+----------- + 404 | Unknown consumer: bqueue/random_consumer2 | branch | node2 | | | | | | +(1 row) + +select * from pgq_node.get_node_info('bqueue'); + ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick | node_attrs +----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------+------------ + 100 | Ok | branch | node2 | 1 | 1 | node1 | dbname=node1 | | | node2_worker | f | f | 1 | +(1 row) + +set session_replication_role = 'replica'; +select * from pgq_node.demote_root('aqueue', 1, 'node3'); + ret_code | ret_note | last_tick +----------+--------------------------------------+----------- + 200 | Step 1: Writing disabled for: aqueue | +(1 row) + +select * from pgq_node.demote_root('aqueue', 1, 'node3'); + ret_code | ret_note | last_tick +----------+--------------------------------------+----------- + 200 | Step 1: Writing disabled for: aqueue | +(1 row) + +select * from pgq_node.demote_root('aqueue', 2, 'node3'); + ret_code | ret_note | last_tick +----------+------------------------------------+----------- + 200 | Step 2: Inserted last tick: aqueue | 4 +(1 row) + +select * from pgq_node.demote_root('aqueue', 2, 'node3'); + ret_code | ret_note | last_tick +----------+------------------------------------+----------- + 200 | Step 2: Inserted last tick: aqueue | 5 +(1 row) + +select * from pgq_node.demote_root('aqueue', 3, 'node3'); + ret_code | ret_note | last_tick +----------+----------------------------------------+----------- + 200 | Step 3: Demoted root to branch: aqueue | 5 +(1 row) + +select * from pgq_node.demote_root('aqueue', 3, 'node3'); + ret_code | ret_note | last_tick +----------+---------------+----------- + 301 | Node not root | +(1 row) + +-- leaf node +select * from pgq_node.register_location('mqueue', 'node1', 'dbname=node1', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.register_location('mqueue', 'node2', 'dbname=node2', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.register_location('mqueue', 'node3', 'dbname=node3', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.create_node('mqueue', 'leaf', 'node2', 'node2_worker', 'node1', 13, 'aqueue'); + ret_code | ret_note +----------+-------------------------------------------------------------- + 200 | Node "node2" initialized for queue "mqueue" with type "leaf" +(1 row) + +select * from pgq_node.get_worker_state('mqueue'); + ret_code | ret_note | node_type | node_name | completed_tick | provider_node | provider_location | paused | uptodate | cur_error | worker_name | global_watermark | local_watermark | local_queue_top | combined_queue | combined_type +----------+----------+-----------+-----------+----------------+---------------+-------------------+--------+----------+-----------+--------------+------------------+-----------------+-----------------+----------------+--------------- + 100 | Ok | leaf | node2 | 13 | node1 | dbname=node1 | f | f | | node2_worker | | 13 | | aqueue | branch +(1 row) + +select * from pgq_node.drop_node('asd', 'asd'); + ret_code | ret_note +----------+------------------- + 200 | Node dropped: asd +(1 row) + +select * from pgq_node.drop_node('mqueue', 'node3'); + ret_code | ret_note +----------+--------------------- + 200 | Node dropped: node3 +(1 row) + +select * from pgq_node.drop_node('mqueue', 'node2'); + ret_code | ret_note +----------+--------------------- + 200 | Node dropped: node2 +(1 row) + +select * from pgq_node.drop_node('mqueue', 'node1'); + ret_code | ret_note +----------+--------------------- + 200 | Node dropped: node1 +(1 row) + +select * from pgq_node.drop_node('aqueue', 'node5'); + ret_code | ret_note +----------+--------------------- + 200 | Node dropped: node5 +(1 row) + +select * from pgq_node.drop_node('aqueue', 'node4'); + ret_code | ret_note +----------+--------------------- + 200 | Node dropped: node4 +(1 row) + +select * from pgq_node.drop_node('aqueue', 'node1'); + ret_code | ret_note +----------+--------------------- + 200 | Node dropped: node1 +(1 row) + +select * from pgq_node.drop_node('aqueue', 'node2'); + ret_code | ret_note +----------+--------------------- + 200 | Node dropped: node2 +(1 row) + +select * from pgq_node.drop_node('aqueue', 'node3'); + ret_code | ret_note +----------+--------------------- + 200 | Node dropped: node3 +(1 row) + +\q