From ac5cde312b3ac2e11704fa6f4203affb3291c7d0 Mon Sep 17 00:00:00 2001 From: Alexander Kondakov Date: Tue, 15 Oct 2024 06:18:13 +0300 Subject: [PATCH] Initial solution --- gpcontrib/arenadata_toolkit/Makefile | 25 +- .../arenadata_toolkit--1.6--1.7.sql | 98 ++ .../arenadata_toolkit.control | 2 +- .../expected/arenadata_toolkit_guc.out | 233 ++++ .../expected/arenadata_toolkit_test.out | 182 ++- .../expected/arenadata_toolkit_tracking.out | 217 +++ .../expected/upgrade_test.out | 7 +- .../sql/arenadata_toolkit_guc.sql | 144 ++ .../sql/arenadata_toolkit_tracking.sql | 103 ++ .../arenadata_toolkit/src/arenadata_toolkit.c | 43 + .../src/arenadata_toolkit_guc.c | 295 ++++ .../src/arenadata_toolkit_worker.c | 450 ++++++ gpcontrib/arenadata_toolkit/src/bloom.c | 80 ++ gpcontrib/arenadata_toolkit/src/bloom_set.c | 280 ++++ .../{arenadata_toolkit.c => src/dbsize.c} | 100 +- gpcontrib/arenadata_toolkit/src/drops_track.c | 223 +++ gpcontrib/arenadata_toolkit/src/file_hook.c | 147 ++ .../src/include/arenadata_toolkit_guc.h | 22 + .../src/include/arenadata_toolkit_worker.h | 6 + .../arenadata_toolkit/src/include/bloom.h | 23 + .../arenadata_toolkit/src/include/bloom_set.h | 41 + .../arenadata_toolkit/src/include/dbsize.h | 6 + .../src/include/drops_track.h | 16 + .../arenadata_toolkit/src/include/file_hook.h | 7 + .../arenadata_toolkit/src/include/tf_shmem.h | 18 + gpcontrib/arenadata_toolkit/src/tf_shmem.c | 56 + gpcontrib/arenadata_toolkit/src/track_files.c | 1227 +++++++++++++++++ 27 files changed, 3950 insertions(+), 101 deletions(-) create mode 100644 gpcontrib/arenadata_toolkit/arenadata_toolkit--1.6--1.7.sql create mode 100644 gpcontrib/arenadata_toolkit/expected/arenadata_toolkit_guc.out create mode 100644 gpcontrib/arenadata_toolkit/expected/arenadata_toolkit_tracking.out create mode 100644 gpcontrib/arenadata_toolkit/sql/arenadata_toolkit_guc.sql create mode 100644 gpcontrib/arenadata_toolkit/sql/arenadata_toolkit_tracking.sql create mode 100644 
gpcontrib/arenadata_toolkit/src/arenadata_toolkit.c create mode 100644 gpcontrib/arenadata_toolkit/src/arenadata_toolkit_guc.c create mode 100644 gpcontrib/arenadata_toolkit/src/arenadata_toolkit_worker.c create mode 100644 gpcontrib/arenadata_toolkit/src/bloom.c create mode 100644 gpcontrib/arenadata_toolkit/src/bloom_set.c rename gpcontrib/arenadata_toolkit/{arenadata_toolkit.c => src/dbsize.c} (85%) create mode 100644 gpcontrib/arenadata_toolkit/src/drops_track.c create mode 100644 gpcontrib/arenadata_toolkit/src/file_hook.c create mode 100644 gpcontrib/arenadata_toolkit/src/include/arenadata_toolkit_guc.h create mode 100644 gpcontrib/arenadata_toolkit/src/include/arenadata_toolkit_worker.h create mode 100644 gpcontrib/arenadata_toolkit/src/include/bloom.h create mode 100644 gpcontrib/arenadata_toolkit/src/include/bloom_set.h create mode 100644 gpcontrib/arenadata_toolkit/src/include/dbsize.h create mode 100644 gpcontrib/arenadata_toolkit/src/include/drops_track.h create mode 100644 gpcontrib/arenadata_toolkit/src/include/file_hook.h create mode 100644 gpcontrib/arenadata_toolkit/src/include/tf_shmem.h create mode 100644 gpcontrib/arenadata_toolkit/src/tf_shmem.c create mode 100644 gpcontrib/arenadata_toolkit/src/track_files.c diff --git a/gpcontrib/arenadata_toolkit/Makefile b/gpcontrib/arenadata_toolkit/Makefile index 446afcbc399d..665c4ae401e5 100644 --- a/gpcontrib/arenadata_toolkit/Makefile +++ b/gpcontrib/arenadata_toolkit/Makefile @@ -3,7 +3,7 @@ MODULES = arenadata_toolkit EXTENSION = arenadata_toolkit -EXTENSION_VERSION = 1.6 +EXTENSION_VERSION = 1.7 DATA = \ arenadata_toolkit--1.0.sql \ arenadata_toolkit--1.0--1.1.sql \ @@ -12,15 +12,29 @@ DATA = \ arenadata_toolkit--1.3--1.4.sql \ arenadata_toolkit--1.4--1.5.sql \ arenadata_toolkit--1.5--1.6.sql \ + arenadata_toolkit--1.6--1.7.sql \ DATA_built = $(EXTENSION)--$(EXTENSION_VERSION).sql -$(DATA_built): $(DATA) - cat $(DATA) > $(DATA_built) +MODULE_big = arenadata_toolkit +OBJS = \ + 
src/arenadata_toolkit_guc.o \ + src/bloom.o \ + src/bloom_set.o \ + src/drops_track.o \ + src/file_hook.o \ + src/tf_shmem.o \ + src/arenadata_toolkit.o \ + src/arenadata_toolkit_worker.o \ + src/track_files.o \ + src/dbsize.o \ + +PG_CFLAGS = -I$(libpq_srcdir) -I$(CURDIR)/src/include REGRESS = arenadata_toolkit_test arenadata_toolkit_skew_test adb_get_relfilenodes_test \ adb_collect_table_stats_test adb_vacuum_strategy_test adb_relation_storage_size_test \ - tablespace_location upgrade_test adb_hba_file_rules_view_test + tablespace_location upgrade_test adb_hba_file_rules_view_test \ + arenadata_toolkit_guc arenadata_toolkit_tracking REGRESS_OPTS += --init-file=$(top_srcdir)/src/test/regress/init_file ifdef USE_PGXS @@ -33,3 +47,6 @@ top_builddir = ../.. include $(top_builddir)/src/Makefile.global include $(top_srcdir)/contrib/contrib-global.mk endif + +$(DATA_built): $(DATA) + cat $(DATA) > $(DATA_built) diff --git a/gpcontrib/arenadata_toolkit/arenadata_toolkit--1.6--1.7.sql b/gpcontrib/arenadata_toolkit/arenadata_toolkit--1.6--1.7.sql new file mode 100644 index 000000000000..9038b32c8ef9 --- /dev/null +++ b/gpcontrib/arenadata_toolkit/arenadata_toolkit--1.6--1.7.sql @@ -0,0 +1,98 @@ +/* gpcontrib/arenadata_toolkit/arenadata_toolkit--1.6--1.7.sql */ + +CREATE FUNCTION arenadata_toolkit.tracking_register_db(dbid OID default 0) +returns BOOL AS '$libdir/arenadata_toolkit', +'tracking_register_db' LANGUAGE C; + +REVOKE ALL ON FUNCTION arenadata_toolkit.tracking_register_db(dbid OID) FROM public; + +CREATE FUNCTION arenadata_toolkit.tracking_unregister_db(dbid OID default 0) +returns BOOL AS '$libdir/arenadata_toolkit', +'tracking_unregister_db' LANGUAGE C; + +REVOKE ALL ON FUNCTION arenadata_toolkit.tracking_unregister_db(dbid OID) FROM public; + +CREATE FUNCTION arenadata_toolkit.tracking_register_schema(schemaname NAME, dbid OID default 0) +returns BOOL AS '$libdir/arenadata_toolkit', +'tracking_register_schema' LANGUAGE C EXECUTE ON master; + +REVOKE ALL ON 
FUNCTION arenadata_toolkit.tracking_register_schema(schema NAME, dbid OID) FROM public; + +CREATE FUNCTION arenadata_toolkit.tracking_unregister_schema(schema NAME, dbid OID default 0) +returns BOOL AS '$libdir/arenadata_toolkit', +'tracking_unregister_schema' LANGUAGE C EXECUTE ON master; + +REVOKE ALL ON FUNCTION arenadata_toolkit.tracking_unregister_schema(schema NAME, dbid OID) FROM public; + +CREATE FUNCTION arenadata_toolkit.tracking_set_relkinds(relkinds NAME, dbid OID DEFAULT 0) +returns BOOL AS '$libdir/arenadata_toolkit', +'tracking_set_relkinds' LANGUAGE C EXECUTE ON master; + +REVOKE ALL ON FUNCTION arenadata_toolkit.tracking_set_relkinds(relkinds NAME, dbid OID) FROM public; + +CREATE FUNCTION arenadata_toolkit.tracking_set_relstorages(relstorages NAME, dbid OID DEFAULT 0) +returns BOOL AS '$libdir/arenadata_toolkit', +'tracking_set_relstorages' LANGUAGE C EXECUTE ON master; + +REVOKE ALL ON FUNCTION arenadata_toolkit.tracking_set_relstorages(relstorages NAME, dbid OID) FROM public; + +CREATE FUNCTION arenadata_toolkit.tracking_set_snapshot_on_recovery(val BOOL, dbid OID DEFAULT 0) +returns BOOL AS '$libdir/arenadata_toolkit', +'tracking_set_snapshot_on_recovery' LANGUAGE C EXECUTE ON master; + +REVOKE ALL ON FUNCTION arenadata_toolkit.tracking_set_snapshot_on_recovery(val BOOL, dbid OID) FROM public; + +CREATE FUNCTION arenadata_toolkit.tracking_trigger_initial_snapshot(dbid OID DEFAULT 0) +returns BOOL AS '$libdir/arenadata_toolkit', +'tracking_trigger_initial_snapshot' LANGUAGE C; + +CREATE FUNCTION arenadata_toolkit.tracking_is_initial_snapshot_triggered(dbid OID DEFAULT 0) +returns BOOL AS '$libdir/arenadata_toolkit', +'tracking_is_initial_snapshot_triggered' LANGUAGE C; + +REVOKE ALL ON FUNCTION arenadata_toolkit.tracking_is_initial_snapshot_triggered(dbid OID) FROM public; + +CREATE FUNCTION arenadata_toolkit.tracking_is_initial_snapshot_triggered_master(dbid OID DEFAULT 0) +returns BOOL AS '$libdir/arenadata_toolkit', 
+'tracking_is_initial_snapshot_triggered' LANGUAGE C EXECUTE ON master; + +CREATE FUNCTION arenadata_toolkit.tracking_is_initial_snapshot_triggered_segments(dbid OID DEFAULT 0) +returns BOOL AS '$libdir/arenadata_toolkit', +'tracking_is_initial_snapshot_triggered' LANGUAGE C EXECUTE ON ALL segments; + +CREATE FUNCTION arenadata_toolkit.tracking_is_segment_initialized() +returns TABLE(segindex INT, is_initialized BOOL) AS '$libdir/arenadata_toolkit', +'tracking_is_segment_initialized' LANGUAGE C; + +REVOKE ALL ON FUNCTION arenadata_toolkit.tracking_is_segment_initialized() FROM public; + +CREATE FUNCTION arenadata_toolkit.tracking_get_track_main() +RETURNS TABLE(relid OID, relname NAME, relfilenode OID, size BIGINT, state "char", segid INT, +relnamespace OID, relkind "char", relstorage "char") AS '$libdir/arenadata_toolkit', +'tracking_get_track_main' LANGUAGE C; + +CREATE FUNCTION arenadata_toolkit.tracking_get_track() +RETURNS TABLE(relid OID, relname NAME, relfilenode OID, size BIGINT, state "char", segid INT, +relnamespace OID, relkind "char", relstorage "char") AS '$libdir/arenadata_toolkit', +'tracking_get_track' LANGUAGE C EXECUTE ON master; + +CREATE VIEW arenadata_toolkit.tables_track AS +SELECT t.*, coalesce(c.oid, i.indrelid, vm.relid, blk.relid, seg.relid) AS parent_relid +FROM arenadata_toolkit.tracking_get_track() AS t +LEFT JOIN pg_class AS c + ON c.reltoastrelid = t.relid AND t.relkind = 't' +LEFT JOIN pg_index AS i + ON i.indexrelid = t.relid AND t.relkind = 'i' +LEFT JOIN pg_catalog.pg_appendonly AS vm + ON vm.visimaprelid = t.relid AND t.relkind = 'M' +LEFT JOIN pg_catalog.pg_appendonly AS blk + ON blk.blkdirrelid = t.relid AND t.relkind = 'b' +LEFT JOIN pg_catalog.pg_appendonly AS seg + ON seg.segrelid = t.relid AND t.relkind = 'o'; + +CREATE VIEW arenadata_toolkit.is_initial_snapshot_triggered AS +SELECT CASE + WHEN TRUE = ALL(select arenadata_toolkit.tracking_is_initial_snapshot_triggered_segments()) + AND + 
arenadata_toolkit.tracking_is_initial_snapshot_triggered_master() + THEN 1 ELSE NULL END AS is_triggered; diff --git a/gpcontrib/arenadata_toolkit/arenadata_toolkit.control b/gpcontrib/arenadata_toolkit/arenadata_toolkit.control index 9d7f496cca3d..cf6ec33cb566 100644 --- a/gpcontrib/arenadata_toolkit/arenadata_toolkit.control +++ b/gpcontrib/arenadata_toolkit/arenadata_toolkit.control @@ -1,5 +1,5 @@ # arenadata_toolkit extension comment = 'extension is used for manipulation of objects created by adb-bundle' -default_version = '1.6' +default_version = '1.7' module_pathname = '$libdir/arenadata_toolkit' relocatable = false diff --git a/gpcontrib/arenadata_toolkit/expected/arenadata_toolkit_guc.out b/gpcontrib/arenadata_toolkit/expected/arenadata_toolkit_guc.out new file mode 100644 index 000000000000..017768eae89d --- /dev/null +++ b/gpcontrib/arenadata_toolkit/expected/arenadata_toolkit_guc.out @@ -0,0 +1,233 @@ +-- start_matchsubs +-- +-- m/ERROR: \[arenadata_toolkit\] exceeded maximum number of tracked databases \(track_files\.c:\d+\)/ +-- s/\d+/XXX/g +-- +-- end_matchsubs +-- Test database registering GUC. 
+CREATE DATABASE tracking1; +\c tracking1; +CREATE EXTENSION arenadata_toolkit; +SHOW arenadata_toolkit.tracking_is_db_tracked; + arenadata_toolkit.tracking_is_db_tracked +------------------------------------------ + off +(1 row) + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + datname | setconfig +---------+----------- +(0 rows) + +\c -; +\c tracking1; +SELECT arenadata_toolkit.tracking_register_db(); + tracking_register_db +---------------------- + t +(1 row) + +SHOW arenadata_toolkit.tracking_is_db_tracked; + arenadata_toolkit.tracking_is_db_tracked +------------------------------------------ + off +(1 row) + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + datname | setconfig +-----------+---------------------------------------------- + tracking1 | {arenadata_toolkit.tracking_is_db_tracked=t} +(1 row) + +SELECT arenadata_toolkit.tracking_unregister_db(); + tracking_unregister_db +------------------------ + t +(1 row) + +\c -; +\c tracking1; +SHOW arenadata_toolkit.tracking_is_db_tracked; + arenadata_toolkit.tracking_is_db_tracked +------------------------------------------ + off +(1 row) + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + datname | setconfig +-----------+---------------------------------------------- + tracking1 | {arenadata_toolkit.tracking_is_db_tracked=f} +(1 row) + +-- Prohibit manual GUC setting. +SET arenadata_toolkit.tracking_is_db_tracked = true; +ERROR: cannot change tracking status outside the tracking_register_db function +ALTER DATABASE tracking1 SET arenadata_toolkit.tracking_is_db_tracked = true; +ERROR: cannot change tracking status outside the tracking_register_db function +-- Test limit of tracking databases. 
+SHOW arenadata_toolkit.tracking_db_track_count; + arenadata_toolkit.tracking_db_track_count +------------------------------------------- + 5 +(1 row) + +CREATE DATABASE tracking2; +CREATE DATABASE tracking3; +CREATE DATABASE tracking4; +CREATE DATABASE tracking5; +CREATE DATABASE tracking6; +SELECT arenadata_toolkit.tracking_register_db(oid) FROM pg_database WHERE datname IN +('tracking1', 'tracking2', 'tracking3', 'tracking4', 'tracking5'); +ERROR: [arenadata_toolkit] exceeded maximum number of tracked databases (track_files.c:XXX) +SELECT arenadata_toolkit.tracking_register_db(oid) FROM pg_database WHERE datname IN +('tracking6'); +ERROR: [arenadata_toolkit] exceeded maximum number of tracked databases (track_files.c:XXX) +SELECT arenadata_toolkit.tracking_unregister_db(oid) FROM pg_database WHERE datname IN +('tracking1', 'tracking2', 'tracking3', 'tracking4', 'tracking5', 'tracking6'); + tracking_unregister_db +------------------------ + t + t + t + t + t + t +(6 rows) + +DROP DATABASE IF EXISTS tracking2; +DROP DATABASE IF EXISTS tracking3; +DROP DATABASE IF EXISTS tracking4; +DROP DATABASE IF EXISTS tracking5; +DROP DATABASE IF EXISTS tracking6; +-- Test arenadata_toolkit.tracking_snapshot_on_recovery GUC +SELECT arenadata_toolkit.tracking_set_snapshot_on_recovery(true); + tracking_set_snapshot_on_recovery +----------------------------------- + t +(1 row) + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + datname | setconfig +-----------+------------------------------------------------------------------------------------------------ + tracking1 | {arenadata_toolkit.tracking_is_db_tracked=f,arenadata_toolkit.tracking_snapshot_on_recovery=t} +(1 row) + +-- Prohibit manual GUC setting. 
+SET arenadata_toolkit.tracking_set_snapshot_on_recovery = false; +ALTER DATABASE tracking1 SET arenadata_toolkit.tracking_set_snapshot_on_recovery = false; +-- Test arenadata_toolkit.tracking_relstorages GUC +SELECT arenadata_toolkit.tracking_set_relstorages('f,a,x'); + tracking_set_relstorages +-------------------------- + t +(1 row) + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + datname | setconfig +-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + tracking1 | {arenadata_toolkit.tracking_is_db_tracked=f,arenadata_toolkit.tracking_snapshot_on_recovery=t,arenadata_toolkit.tracking_set_snapshot_on_recovery=false,"arenadata_toolkit.tracking_relstorages=f,a,x"} +(1 row) + +SELECT arenadata_toolkit.tracking_set_relstorages('v,v,v,,,'); + tracking_set_relstorages +-------------------------- + t +(1 row) + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + datname | setconfig +-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + tracking1 | {arenadata_toolkit.tracking_is_db_tracked=f,arenadata_toolkit.tracking_snapshot_on_recovery=t,arenadata_toolkit.tracking_set_snapshot_on_recovery=false,arenadata_toolkit.tracking_relstorages=v} +(1 row) + +SELECT arenadata_toolkit.tracking_set_relstorages('d,b,c'); +ERROR: Invalid relstorage type: d +HINT: Valid relstorages are: 'h', 'x', 'a', 'v', 'c', 'f' +-- Prohibit manual GUC setting. 
+SET arenadata_toolkit.tracking_relstorages = "h, a, x"; +ERROR: cannot change tracking status outside the tracking_register_relstorages function +ALTER DATABASE tracking1 SET arenadata_toolkit.tracking_relstorages = "h, a, x"; +ERROR: cannot change tracking status outside the tracking_register_relstorages function +-- Test arenadata_toolkit.tracking_relkinds GUC +SELECT arenadata_toolkit.tracking_set_relkinds('r,t,o,S'); + tracking_set_relkinds +----------------------- + t +(1 row) + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + datname | setconfig +-----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + tracking1 | {arenadata_toolkit.tracking_is_db_tracked=f,arenadata_toolkit.tracking_snapshot_on_recovery=t,arenadata_toolkit.tracking_set_snapshot_on_recovery=false,arenadata_toolkit.tracking_relstorages=v,"arenadata_toolkit.tracking_relkinds=r,t,o,S"} +(1 row) + +SELECT arenadata_toolkit.tracking_set_relkinds('m,M,o,,,'); + tracking_set_relkinds +----------------------- + t +(1 row) + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + datname | setconfig +-----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + tracking1 | {arenadata_toolkit.tracking_is_db_tracked=f,arenadata_toolkit.tracking_snapshot_on_recovery=t,arenadata_toolkit.tracking_set_snapshot_on_recovery=false,arenadata_toolkit.tracking_relstorages=v,"arenadata_toolkit.tracking_relkinds=m,M,o"} +(1 row) + +SELECT arenadata_toolkit.tracking_set_relkinds('d,b,c'); 
+ERROR: Invalid relkind: d +HINT: Valid relkinds are: 'r', 'i', 'S', 't', 'v', 'c', 'f', 'u', 'm', 'o', 'b', 'M' +-- Prohibit manual GUC setting. +SET arenadata_toolkit.tracking_relkinds = "h, a, x"; +ERROR: cannot change tracking status outside the tracking_register_relkinds function +ALTER DATABASE tracking1 SET arenadata_toolkit.tracking_relkinds = "h, a, x"; +ERROR: cannot change tracking status outside the tracking_register_relkinds function +-- Test arenadata_toolkit.tracking_schemas GUC +SELECT arenadata_toolkit.tracking_register_schema('arenadata_toolkit'); + tracking_register_schema +-------------------------- + t +(1 row) + +SELECT arenadata_toolkit.tracking_register_schema('public'); + tracking_register_schema +-------------------------- + t +(1 row) + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + datname | setconfig +-----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + tracking1 | {arenadata_toolkit.tracking_is_db_tracked=f,arenadata_toolkit.tracking_snapshot_on_recovery=t,arenadata_toolkit.tracking_set_snapshot_on_recovery=false,arenadata_toolkit.tracking_relstorages=v,"arenadata_toolkit.tracking_relkinds=m,M,o","arenadata_toolkit.tracking_schemas=arenadata_toolkit,public"} +(1 row) + +SELECT arenadata_toolkit.tracking_unregister_schema('public'); + tracking_unregister_schema +---------------------------- + t +(1 row) + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + datname | setconfig 
+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + tracking1 | {arenadata_toolkit.tracking_is_db_tracked=f,arenadata_toolkit.tracking_snapshot_on_recovery=t,arenadata_toolkit.tracking_set_snapshot_on_recovery=false,arenadata_toolkit.tracking_relstorages=v,"arenadata_toolkit.tracking_relkinds=m,M,o",arenadata_toolkit.tracking_schemas=arenadata_toolkit} +(1 row) + +SELECT arenadata_toolkit.tracking_register_schema('pg_pg'); +ERROR: schema pg_pg does not exist +-- Prohibit manual GUC setting. +SET arenadata_toolkit.tracking_schemas = "pg_catalog, mychema"; +ERROR: cannot change tracking status outside the tracking_register_schema function +ALTER DATABASE tracking1 SET arenadata_toolkit.tracking_schemas = "pg_catalog, mychema"; +ERROR: cannot change tracking status outside the tracking_register_schema function +\c contrib_regression; +DROP DATABASE tracking1; diff --git a/gpcontrib/arenadata_toolkit/expected/arenadata_toolkit_test.out b/gpcontrib/arenadata_toolkit/expected/arenadata_toolkit_test.out index 97e9c1ac4dd8..ef9b5a9c5b9c 100644 --- a/gpcontrib/arenadata_toolkit/expected/arenadata_toolkit_test.out +++ b/gpcontrib/arenadata_toolkit/expected/arenadata_toolkit_test.out @@ -112,31 +112,63 @@ SELECT objname, objtype, objstorage, objacl FROM toolkit_objects_info ORDER BY o db_files_history_backup_YYYYMMDDtHHMMSS | table | a | db_files_history_backup_YYYYMMDDtHHMMSS_1_prt_default_part | table | a | db_files_history_backup_YYYYMMDDtHHMMSS_1_prt_pYYYYMM | table | a | + is_initial_snapshot_triggered | table | v | operation_exclude | table | a | -(24 rows) + tables_track | table | v | + tracking_get_track | proc | - | + tracking_get_track_main | proc | - | + tracking_is_initial_snapshot_triggered | proc | - | 
{owner=X/owner} + tracking_is_initial_snapshot_triggered_master | proc | - | + tracking_is_initial_snapshot_triggered_segments | proc | - | + tracking_is_segment_initialized | proc | - | {owner=X/owner} + tracking_register_db | proc | - | {owner=X/owner} + tracking_register_schema | proc | - | {owner=X/owner} + tracking_set_relkinds | proc | - | {owner=X/owner} + tracking_set_relstorages | proc | - | {owner=X/owner} + tracking_set_snapshot_on_recovery | proc | - | {owner=X/owner} + tracking_trigger_initial_snapshot | proc | - | + tracking_unregister_db | proc | - | {owner=X/owner} + tracking_unregister_schema | proc | - | {owner=X/owner} +(40 rows) -- check that toolkit objects now depends on extension SELECT objname, objtype, extname, deptype FROM pg_depend d JOIN toolkit_objects_info objs ON d.objid = objs.objid JOIN pg_extension e ON d.refobjid = e.oid WHERE d.deptype = 'e' AND e.extname = 'arenadata_toolkit' ORDER BY objname; - objname | objtype | extname | deptype ----------------------------------------+---------+-------------------+--------- - __db_files_current | table | arenadata_toolkit | e - __db_files_current_unmapped | table | arenadata_toolkit | e - __db_segment_files | table | arenadata_toolkit | e - adb_collect_table_stats | proc | arenadata_toolkit | e - adb_create_tables | proc | arenadata_toolkit | e - adb_get_relfilenodes | proc | arenadata_toolkit | e - adb_hba_file_rules | proc | arenadata_toolkit | e - adb_hba_file_rules_view | table | arenadata_toolkit | e - adb_relation_storage_size | proc | arenadata_toolkit | e - adb_relation_storage_size_on_segments | proc | arenadata_toolkit | e - adb_skew_coefficients | table | arenadata_toolkit | e - adb_vacuum_strategy | proc | arenadata_toolkit | e - adb_vacuum_strategy_newest_first | proc | arenadata_toolkit | e - adb_vacuum_strategy_newest_last | proc | arenadata_toolkit | e -(14 rows) + objname | objtype | extname | deptype 
+-------------------------------------------------+---------+-------------------+--------- + __db_files_current | table | arenadata_toolkit | e + __db_files_current_unmapped | table | arenadata_toolkit | e + __db_segment_files | table | arenadata_toolkit | e + adb_collect_table_stats | proc | arenadata_toolkit | e + adb_create_tables | proc | arenadata_toolkit | e + adb_get_relfilenodes | proc | arenadata_toolkit | e + adb_hba_file_rules | proc | arenadata_toolkit | e + adb_hba_file_rules_view | table | arenadata_toolkit | e + adb_relation_storage_size | proc | arenadata_toolkit | e + adb_relation_storage_size_on_segments | proc | arenadata_toolkit | e + adb_skew_coefficients | table | arenadata_toolkit | e + adb_vacuum_strategy | proc | arenadata_toolkit | e + adb_vacuum_strategy_newest_first | proc | arenadata_toolkit | e + adb_vacuum_strategy_newest_last | proc | arenadata_toolkit | e + is_initial_snapshot_triggered | table | arenadata_toolkit | e + tables_track | table | arenadata_toolkit | e + tracking_get_track | proc | arenadata_toolkit | e + tracking_get_track_main | proc | arenadata_toolkit | e + tracking_is_initial_snapshot_triggered | proc | arenadata_toolkit | e + tracking_is_initial_snapshot_triggered_master | proc | arenadata_toolkit | e + tracking_is_initial_snapshot_triggered_segments | proc | arenadata_toolkit | e + tracking_is_segment_initialized | proc | arenadata_toolkit | e + tracking_register_db | proc | arenadata_toolkit | e + tracking_register_schema | proc | arenadata_toolkit | e + tracking_set_relkinds | proc | arenadata_toolkit | e + tracking_set_relstorages | proc | arenadata_toolkit | e + tracking_set_snapshot_on_recovery | proc | arenadata_toolkit | e + tracking_trigger_initial_snapshot | proc | arenadata_toolkit | e + tracking_unregister_db | proc | arenadata_toolkit | e + tracking_unregister_schema | proc | arenadata_toolkit | e +(30 rows) DROP EXTENSION arenadata_toolkit; DROP SCHEMA arenadata_toolkit CASCADE; @@ -153,53 +185,85 @@ 
SELECT arenadata_toolkit.adb_create_tables(); -- show toolkit objects (and their grants) that belongs to arenadata_toolkit schema after creating -- extension and calling adb_create_tables SELECT objname, objtype, objstorage, objacl FROM toolkit_objects_info ORDER BY objname; - objname | objtype | objstorage | objacl ----------------------------------------+---------+------------+-------------------------------- - __db_files_current | table | v | {owner=arwdDxt/owner,=r/owner} - __db_files_current_unmapped | table | v | {owner=arwdDxt/owner,=r/owner} - __db_segment_files | table | v | {owner=arwdDxt/owner,=r/owner} - adb_collect_table_stats | proc | - | {owner=X/owner} - adb_create_tables | proc | - | {owner=X/owner} - adb_get_relfilenodes | proc | - | {=X/owner,owner=X/owner} - adb_hba_file_rules | proc | - | {owner=X/owner} - adb_hba_file_rules_view | table | v | {owner=arwdDxt/owner} - adb_relation_storage_size | proc | - | {=X/owner,owner=X/owner} - adb_relation_storage_size_on_segments | proc | - | {=X/owner,owner=X/owner} - adb_skew_coefficients | table | v | {owner=arwdDxt/owner,=r/owner} - adb_vacuum_strategy | proc | - | {owner=X/owner} - adb_vacuum_strategy_newest_first | proc | - | {owner=X/owner} - adb_vacuum_strategy_newest_last | proc | - | {owner=X/owner} - arenadata_toolkit | schema | - | {owner=UC/owner,=U/owner} - daily_operation | table | a | {owner=arwdDxt/owner} - db_files_current | table | h | {owner=arwdDxt/owner,=r/owner} - db_files_history | table | a | {owner=arwdDxt/owner} - db_files_history_1_prt_default_part | table | a | {owner=arwdDxt/owner} - db_files_history_1_prt_pYYYYMM | table | a | {owner=arwdDxt/owner} - operation_exclude | table | a | {owner=arwdDxt/owner} -(21 rows) + objname | objtype | objstorage | objacl +-------------------------------------------------+---------+------------+-------------------------------- + __db_files_current | table | v | {owner=arwdDxt/owner,=r/owner} + __db_files_current_unmapped | table | v | 
{owner=arwdDxt/owner,=r/owner} + __db_segment_files | table | v | {owner=arwdDxt/owner,=r/owner} + adb_collect_table_stats | proc | - | {owner=X/owner} + adb_create_tables | proc | - | {owner=X/owner} + adb_get_relfilenodes | proc | - | {=X/owner,owner=X/owner} + adb_hba_file_rules | proc | - | {owner=X/owner} + adb_hba_file_rules_view | table | v | {owner=arwdDxt/owner} + adb_relation_storage_size | proc | - | {=X/owner,owner=X/owner} + adb_relation_storage_size_on_segments | proc | - | {=X/owner,owner=X/owner} + adb_skew_coefficients | table | v | {owner=arwdDxt/owner,=r/owner} + adb_vacuum_strategy | proc | - | {owner=X/owner} + adb_vacuum_strategy_newest_first | proc | - | {owner=X/owner} + adb_vacuum_strategy_newest_last | proc | - | {owner=X/owner} + arenadata_toolkit | schema | - | {owner=UC/owner,=U/owner} + daily_operation | table | a | {owner=arwdDxt/owner} + db_files_current | table | h | {owner=arwdDxt/owner,=r/owner} + db_files_history | table | a | {owner=arwdDxt/owner} + db_files_history_1_prt_default_part | table | a | {owner=arwdDxt/owner} + db_files_history_1_prt_pYYYYMM | table | a | {owner=arwdDxt/owner} + is_initial_snapshot_triggered | table | v | + operation_exclude | table | a | {owner=arwdDxt/owner} + tables_track | table | v | + tracking_get_track | proc | - | + tracking_get_track_main | proc | - | + tracking_is_initial_snapshot_triggered | proc | - | {owner=X/owner} + tracking_is_initial_snapshot_triggered_master | proc | - | + tracking_is_initial_snapshot_triggered_segments | proc | - | + tracking_is_segment_initialized | proc | - | {owner=X/owner} + tracking_register_db | proc | - | {owner=X/owner} + tracking_register_schema | proc | - | {owner=X/owner} + tracking_set_relkinds | proc | - | {owner=X/owner} + tracking_set_relstorages | proc | - | {owner=X/owner} + tracking_set_snapshot_on_recovery | proc | - | {owner=X/owner} + tracking_trigger_initial_snapshot | proc | - | + tracking_unregister_db | proc | - | {owner=X/owner} + 
tracking_unregister_schema | proc | - | {owner=X/owner} +(37 rows) -- check that toolkit objects now depends on extension SELECT objname, objtype, extname, deptype FROM pg_depend d JOIN toolkit_objects_info objs ON d.objid = objs.objid JOIN pg_extension e ON d.refobjid = e.oid WHERE d.deptype = 'e' AND e.extname = 'arenadata_toolkit' ORDER BY objname; - objname | objtype | extname | deptype ----------------------------------------+---------+-------------------+--------- - __db_files_current | table | arenadata_toolkit | e - __db_files_current_unmapped | table | arenadata_toolkit | e - __db_segment_files | table | arenadata_toolkit | e - adb_collect_table_stats | proc | arenadata_toolkit | e - adb_create_tables | proc | arenadata_toolkit | e - adb_get_relfilenodes | proc | arenadata_toolkit | e - adb_hba_file_rules | proc | arenadata_toolkit | e - adb_hba_file_rules_view | table | arenadata_toolkit | e - adb_relation_storage_size | proc | arenadata_toolkit | e - adb_relation_storage_size_on_segments | proc | arenadata_toolkit | e - adb_skew_coefficients | table | arenadata_toolkit | e - adb_vacuum_strategy | proc | arenadata_toolkit | e - adb_vacuum_strategy_newest_first | proc | arenadata_toolkit | e - adb_vacuum_strategy_newest_last | proc | arenadata_toolkit | e -(14 rows) + objname | objtype | extname | deptype +-------------------------------------------------+---------+-------------------+--------- + __db_files_current | table | arenadata_toolkit | e + __db_files_current_unmapped | table | arenadata_toolkit | e + __db_segment_files | table | arenadata_toolkit | e + adb_collect_table_stats | proc | arenadata_toolkit | e + adb_create_tables | proc | arenadata_toolkit | e + adb_get_relfilenodes | proc | arenadata_toolkit | e + adb_hba_file_rules | proc | arenadata_toolkit | e + adb_hba_file_rules_view | table | arenadata_toolkit | e + adb_relation_storage_size | proc | arenadata_toolkit | e + adb_relation_storage_size_on_segments | proc | arenadata_toolkit | e + 
adb_skew_coefficients | table | arenadata_toolkit | e + adb_vacuum_strategy | proc | arenadata_toolkit | e + adb_vacuum_strategy_newest_first | proc | arenadata_toolkit | e + adb_vacuum_strategy_newest_last | proc | arenadata_toolkit | e + is_initial_snapshot_triggered | table | arenadata_toolkit | e + tables_track | table | arenadata_toolkit | e + tracking_get_track | proc | arenadata_toolkit | e + tracking_get_track_main | proc | arenadata_toolkit | e + tracking_is_initial_snapshot_triggered | proc | arenadata_toolkit | e + tracking_is_initial_snapshot_triggered_master | proc | arenadata_toolkit | e + tracking_is_initial_snapshot_triggered_segments | proc | arenadata_toolkit | e + tracking_is_segment_initialized | proc | arenadata_toolkit | e + tracking_register_db | proc | arenadata_toolkit | e + tracking_register_schema | proc | arenadata_toolkit | e + tracking_set_relkinds | proc | arenadata_toolkit | e + tracking_set_relstorages | proc | arenadata_toolkit | e + tracking_set_snapshot_on_recovery | proc | arenadata_toolkit | e + tracking_trigger_initial_snapshot | proc | arenadata_toolkit | e + tracking_unregister_db | proc | arenadata_toolkit | e + tracking_unregister_schema | proc | arenadata_toolkit | e +(30 rows) DROP EXTENSION arenadata_toolkit; DROP SCHEMA arenadata_toolkit CASCADE; diff --git a/gpcontrib/arenadata_toolkit/expected/arenadata_toolkit_tracking.out b/gpcontrib/arenadata_toolkit/expected/arenadata_toolkit_tracking.out new file mode 100644 index 000000000000..3f2d7ef47e4e --- /dev/null +++ b/gpcontrib/arenadata_toolkit/expected/arenadata_toolkit_tracking.out @@ -0,0 +1,217 @@ +-- Tests for size tracking logic introduced in version 1.7 +-- start_matchsubs +-- m/ERROR: database \d+ is not tracked \(track_files\.c:\d+\)/ +-- s/\d+/XXX/g +-- end_matchsubs +CREATE DATABASE tracking_db1; +\c tracking_db1; +CREATE EXTENSION arenadata_toolkit; +-- 1. 
Test getting track on not registered database; +SELECT * FROM arenadata_toolkit.tracking_get_track(); +ERROR: database 44817 is not tracked (track_files.c:347) +CONTEXT: SQL statement "SELECT * FROM arenadata_toolkit.tracking_get_track_main()" +SELECT arenadata_toolkit.tracking_register_db(); + tracking_register_db +---------------------- + t +(1 row) + +-- 2. Test initial snapshot behaviour. Triggering initial snapshot leads to +-- setting up the bloom filter such that all relfilenodes are considered. +SELECT arenadata_toolkit.tracking_trigger_initial_snapshot(); + tracking_trigger_initial_snapshot +----------------------------------- + t +(1 row) + +SELECT is_triggered FROM arenadata_toolkit.is_initial_snapshot_triggered; + is_triggered +-------------- + 1 +(1 row) + +-- 3. If user hasn't registered any schema, the default schemas are used. +-- See arenadata_toolkit_guc.c. At commit the bloom filter is cleared. The next +-- call of tracking_get_track() will return nothing if database is not modified in between. +SELECT count(*) FROM arenadata_toolkit.tracking_get_track(); + count +------- + 1056 +(1 row) + +-- 4. Create table in specific schema and register that schema. +CREATE TABLE arenadata_toolkit.tracking_t1 (i INT) +WITH (appendonly=true, orientation=column) DISTRIBUTED BY (i); +SELECT arenadata_toolkit.tracking_register_schema('arenadata_toolkit'); + tracking_register_schema +-------------------------- + t +(1 row) + +-- Getting the track. Only created table with size 0 is expected; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + relname | size | state | segid | relkind | relstorage +-------------+------+-------+-------+---------+------------ + tracking_t1 | 0 | a | -1 | r | c + tracking_t1 | 0 | a | 0 | r | c + tracking_t1 | 0 | a | 1 | r | c + tracking_t1 | 0 | a | 2 | r | c +(4 rows) + +-- 5. Test data extending event. Bloom should capture it. 
+INSERT INTO arenadata_toolkit.tracking_t1 SELECT generate_series(1,100000); +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + relname | size | state | segid | relkind | relstorage +-------------+--------+-------+-------+---------+------------ + tracking_t1 | 134064 | a | 0 | r | c + tracking_t1 | 133528 | a | 1 | r | c + tracking_t1 | 133064 | a | 2 | r | c +(3 rows) + +-- 6. Dropping table. The track shows only relfilenodes without names and other additional info with status 'd'. +DROP TABLE arenadata_toolkit.tracking_t1; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + relname | size | state | segid | relkind | relstorage +---------+------+-------+-------+---------+------------ + | 0 | d | -1 | | + | 0 | d | -1 | | + | 0 | d | -1 | | + | 0 | d | -1 | | + | 0 | d | 0 | | + | 0 | d | 0 | | + | 0 | d | 0 | | + | 0 | d | 0 | | + | 0 | d | 1 | | + | 0 | d | 1 | | + | 0 | d | 1 | | + | 0 | d | 1 | | + | 0 | d | 2 | | + | 0 | d | 2 | | + | 0 | d | 2 | | + | 0 | d | 2 | | +(16 rows) + +-- 8. Test actions on commit and rollback +CREATE TABLE arenadata_toolkit.tracking_t1 (i INT) +WITH (appendonly=true, orientation=column) DISTRIBUTED BY (i); +INSERT INTO arenadata_toolkit.tracking_t1 SELECT generate_series(1,100000); +-- If the wrapping transaction rollbacks, the Bloom filter is not cleared up. +BEGIN; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + relname | size | state | segid | relkind | relstorage +-------------+--------+-------+-------+---------+------------ + tracking_t1 | 0 | a | -1 | r | c + tracking_t1 | 134064 | a | 0 | r | c + tracking_t1 | 133528 | a | 1 | r | c + tracking_t1 | 133064 | a | 2 | r | c +(4 rows) + +ROLLBACK; +-- If commits, filter is cleared. 
+BEGIN; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + relname | size | state | segid | relkind | relstorage +-------------+--------+-------+-------+---------+------------ + tracking_t1 | 0 | a | -1 | r | c + tracking_t1 | 134064 | a | 0 | r | c + tracking_t1 | 133528 | a | 1 | r | c + tracking_t1 | 133064 | a | 2 | r | c +(4 rows) + +COMMIT; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + relname | size | state | segid | relkind | relstorage +---------+------+-------+-------+---------+------------ +(0 rows) + +-- 9. Test repetitive track call within the same transaction. In case of +-- rollback only first changes shoul be present. +INSERT INTO arenadata_toolkit.tracking_t1 SELECT generate_series(1,10000); +BEGIN; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + relname | size | state | segid | relkind | relstorage +-------------+--------+-------+-------+---------+------------ + tracking_t1 | 147576 | a | 0 | r | c + tracking_t1 | 147112 | a | 1 | r | c + tracking_t1 | 146096 | a | 2 | r | c +(3 rows) + +CREATE TABLE arenadata_toolkit.tracking_t2 (j BIGINT) DISTRIBUTED BY (j); +INSERT INTO arenadata_toolkit.tracking_t2 SELECT generate_series(1,10000); +INSERT INTO arenadata_toolkit.tracking_t1 SELECT generate_series(1,10000); +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + relname | size | state | segid | relkind | relstorage +-------------+--------+-------+-------+---------+------------ + tracking_t1 | 161088 | a | 0 | r | c + tracking_t2 | 229376 | a | 0 | r | h + tracking_t1 | 160696 | a | 1 | r | c + tracking_t2 | 229376 | a | 1 | r | h + tracking_t1 | 159128 | a | 2 | r | c + tracking_t2 | 229376 | a | 2 | r | h +(6 rows) + +ROLLBACK; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); 
+ relname | size | state | segid | relkind | relstorage +-------------+--------+-------+-------+---------+------------ + | 0 | d | -1 | | + tracking_t1 | 161088 | a | 0 | r | c + | 0 | d | 0 | | + tracking_t1 | 160696 | a | 1 | r | c + | 0 | d | 1 | | + tracking_t1 | 159128 | a | 2 | r | c + | 0 | d | 2 | | +(7 rows) + +-- 10. Test relkind filtering. +CREATE TABLE arenadata_toolkit.tracking_t1 (i INT) +WITH (appendonly=true, orientation=column) DISTRIBUTED BY (i); +ERROR: relation "tracking_t1" already exists +INSERT INTO arenadata_toolkit.tracking_t1 SELECT generate_series(1,100000); +CREATE INDEX ON arenadata_toolkit.tracking_t1(i); +-- Want to see index and block dir relation. +SELECT arenadata_toolkit.tracking_register_schema('pg_aoseg'); + tracking_register_schema +-------------------------- + t +(1 row) + +SELECT arenadata_toolkit.tracking_set_relkinds('o,i'); + tracking_set_relkinds +----------------------- + t +(1 row) + +SELECT size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + size | state | segid | relkind | relstorage +---------+-------+-------+---------+------------ + 32768 | a | -1 | i | h + 32768 | a | -1 | i | h + 1638400 | a | 0 | i | h + 65536 | a | 0 | i | h + 1638400 | a | 1 | i | h + 65536 | a | 1 | i | h + 1638400 | a | 2 | i | h + 65536 | a | 2 | i | h +(8 rows) + +DROP TABLE arenadata_toolkit.tracking_t1; +-- Clean up +SELECT arenadata_toolkit.tracking_unregister_db(); + tracking_unregister_db +------------------------ + t +(1 row) + +\c contrib_regression; +DROP DATABASE tracking_db1; diff --git a/gpcontrib/arenadata_toolkit/expected/upgrade_test.out b/gpcontrib/arenadata_toolkit/expected/upgrade_test.out index 35cb2c00ccab..22915ab53bc9 100644 --- a/gpcontrib/arenadata_toolkit/expected/upgrade_test.out +++ b/gpcontrib/arenadata_toolkit/expected/upgrade_test.out @@ -175,7 +175,12 @@ ORDER BY 1; 1.5: column tablespace_location check 1.5: create the latest check 1.5: only alter check -(34 rows) + 1.6: 
alter and create_tables check + 1.6: alter, create_tables and collect_table_stats check + 1.6: column tablespace_location check + 1.6: create the latest check + 1.6: only alter check +(39 rows) -- Cleanup DROP FUNCTION do_upgrade_test_for_arenadata_toolkit(TEXT); diff --git a/gpcontrib/arenadata_toolkit/sql/arenadata_toolkit_guc.sql b/gpcontrib/arenadata_toolkit/sql/arenadata_toolkit_guc.sql new file mode 100644 index 000000000000..9ecf2ef897c2 --- /dev/null +++ b/gpcontrib/arenadata_toolkit/sql/arenadata_toolkit_guc.sql @@ -0,0 +1,144 @@ +-- start_matchsubs +-- +-- m/ERROR: \[arenadata_toolkit\] exceeded maximum number of tracked databases \(track_files\.c:\d+\)/ +-- s/\d+/XXX/g +-- +-- end_matchsubs +--start_ignore +DROP DATABASE IF EXISTS tracking1; +DROP DATABASE IF EXISTS tracking2; +DROP DATABASE IF EXISTS tracking3; +DROP DATABASE IF EXISTS tracking4; +DROP DATABASE IF EXISTS tracking5; +DROP DATABASE IF EXISTS tracking6; +--end_ignore + +-- Test database registering GUC. +CREATE DATABASE tracking1; +\c tracking1; +CREATE EXTENSION arenadata_toolkit; + +SHOW arenadata_toolkit.tracking_is_db_tracked; + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + +\c -; +\c tracking1; + +SELECT arenadata_toolkit.tracking_register_db(); + +SHOW arenadata_toolkit.tracking_is_db_tracked; + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + +SELECT arenadata_toolkit.tracking_unregister_db(); + +\c -; +\c tracking1; + +SHOW arenadata_toolkit.tracking_is_db_tracked; + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + +-- Prohibit manual GUC setting. +SET arenadata_toolkit.tracking_is_db_tracked = true; + +ALTER DATABASE tracking1 SET arenadata_toolkit.tracking_is_db_tracked = true; + +-- Test limit of tracking databases. 
+SHOW arenadata_toolkit.tracking_db_track_count; + +CREATE DATABASE tracking2; +CREATE DATABASE tracking3; +CREATE DATABASE tracking4; +CREATE DATABASE tracking5; +CREATE DATABASE tracking6; + +SELECT arenadata_toolkit.tracking_register_db(oid) FROM pg_database WHERE datname IN +('tracking1', 'tracking2', 'tracking3', 'tracking4', 'tracking5'); + +SELECT arenadata_toolkit.tracking_register_db(oid) FROM pg_database WHERE datname IN +('tracking6'); + +SELECT arenadata_toolkit.tracking_unregister_db(oid) FROM pg_database WHERE datname IN +('tracking1', 'tracking2', 'tracking3', 'tracking4', 'tracking5', 'tracking6'); + +DROP DATABASE IF EXISTS tracking2; +DROP DATABASE IF EXISTS tracking3; +DROP DATABASE IF EXISTS tracking4; +DROP DATABASE IF EXISTS tracking5; +DROP DATABASE IF EXISTS tracking6; + +-- Test arenadata_toolkit.tracking_snapshot_on_recovery GUC +SELECT arenadata_toolkit.tracking_set_snapshot_on_recovery(true); + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + +-- Prohibit manual GUC setting. +SET arenadata_toolkit.tracking_set_snapshot_on_recovery = false; + +ALTER DATABASE tracking1 SET arenadata_toolkit.tracking_set_snapshot_on_recovery = false; + +-- Test arenadata_toolkit.tracking_relstorages GUC +SELECT arenadata_toolkit.tracking_set_relstorages('f,a,x'); + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + +SELECT arenadata_toolkit.tracking_set_relstorages('v,v,v,,,'); + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + +SELECT arenadata_toolkit.tracking_set_relstorages('d,b,c'); + +-- Prohibit manual GUC setting. 
+SET arenadata_toolkit.tracking_relstorages = "h, a, x"; + +ALTER DATABASE tracking1 SET arenadata_toolkit.tracking_relstorages = "h, a, x"; + +-- Test arenadata_toolkit.tracking_relkinds GUC +SELECT arenadata_toolkit.tracking_set_relkinds('r,t,o,S'); + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + +SELECT arenadata_toolkit.tracking_set_relkinds('m,M,o,,,'); + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + +SELECT arenadata_toolkit.tracking_set_relkinds('d,b,c'); + +-- Prohibit manual GUC setting. +SET arenadata_toolkit.tracking_relkinds = "h, a, x"; + +ALTER DATABASE tracking1 SET arenadata_toolkit.tracking_relkinds = "h, a, x"; + +-- Test arenadata_toolkit.tracking_schemas GUC +SELECT arenadata_toolkit.tracking_register_schema('arenadata_toolkit'); + +SELECT arenadata_toolkit.tracking_register_schema('public'); + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + +SELECT arenadata_toolkit.tracking_unregister_schema('public'); + +SELECT datname, setconfig FROM pg_db_role_setting JOIN pg_database ON +setdatabase=oid WHERE datname=current_database(); + +SELECT arenadata_toolkit.tracking_register_schema('pg_pg'); + +-- Prohibit manual GUC setting. 
+SET arenadata_toolkit.tracking_schemas = "pg_catalog, mychema";
+
+ALTER DATABASE tracking1 SET arenadata_toolkit.tracking_schemas = "pg_catalog, mychema";
+
+\c contrib_regression;
+
+DROP DATABASE tracking1;
diff --git a/gpcontrib/arenadata_toolkit/sql/arenadata_toolkit_tracking.sql b/gpcontrib/arenadata_toolkit/sql/arenadata_toolkit_tracking.sql
new file mode 100644
index 000000000000..0465d606fecf
--- /dev/null
+++ b/gpcontrib/arenadata_toolkit/sql/arenadata_toolkit_tracking.sql
@@ -0,0 +1,103 @@
+-- Tests for size tracking logic introduced in version 1.7
+-- start_matchsubs
+-- m/ERROR: database \d+ is not tracked \(track_files\.c:\d+\)/
+-- s/\d+/XXX/g
+-- end_matchsubs
+--start_ignore
+DROP DATABASE IF EXISTS tracking_db1;
+--end_ignore
+CREATE DATABASE tracking_db1;
+\c tracking_db1;
+CREATE EXTENSION arenadata_toolkit;
+
+-- 1. Test getting track on not registered database;
+SELECT * FROM arenadata_toolkit.tracking_get_track();
+
+SELECT arenadata_toolkit.tracking_register_db();
+
+-- 2. Test initial snapshot behaviour. Triggering initial snapshot leads to
+-- setting up the bloom filter such that all relfilenodes are considered.
+SELECT arenadata_toolkit.tracking_trigger_initial_snapshot();
+SELECT is_triggered FROM arenadata_toolkit.is_initial_snapshot_triggered;
+
+-- 3. If user hasn't registered any schema, the default schemas are used.
+-- See arenadata_toolkit_guc.c. At commit the bloom filter is cleared. The next
+-- call of tracking_get_track() will return nothing if database is not modified in between.
+SELECT count(*) FROM arenadata_toolkit.tracking_get_track();
+
+-- 4. Create table in specific schema and register that schema.
+CREATE TABLE arenadata_toolkit.tracking_t1 (i INT)
+WITH (appendonly=true, orientation=column) DISTRIBUTED BY (i);
+
+SELECT arenadata_toolkit.tracking_register_schema('arenadata_toolkit');
+
+-- Getting the track.
Only created table with size 0 is expected; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + +-- 5. Test data extending event. Bloom should capture it. +INSERT INTO arenadata_toolkit.tracking_t1 SELECT generate_series(1,100000); +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + +-- 6. Dropping table. The track shows only relfilenodes without names and other additional info with status 'd'. +DROP TABLE arenadata_toolkit.tracking_t1; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + +-- 8. Test actions on commit and rollback +CREATE TABLE arenadata_toolkit.tracking_t1 (i INT) +WITH (appendonly=true, orientation=column) DISTRIBUTED BY (i); +INSERT INTO arenadata_toolkit.tracking_t1 SELECT generate_series(1,100000); + +-- If the wrapping transaction rollbacks, the Bloom filter is not cleared up. +BEGIN; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); +ROLLBACK; + +-- If commits, filter is cleared. +BEGIN; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); +COMMIT; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + +-- 9. Test repetitive track call within the same transaction. In case of +-- rollback only first changes shoul be present. 
+INSERT INTO arenadata_toolkit.tracking_t1 SELECT generate_series(1,10000); +BEGIN; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + +CREATE TABLE arenadata_toolkit.tracking_t2 (j BIGINT) DISTRIBUTED BY (j); +INSERT INTO arenadata_toolkit.tracking_t2 SELECT generate_series(1,10000); +INSERT INTO arenadata_toolkit.tracking_t1 SELECT generate_series(1,10000); + +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); +ROLLBACK; +SELECT relname, size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + +-- 10. Test relkind filtering. +CREATE TABLE arenadata_toolkit.tracking_t1 (i INT) +WITH (appendonly=true, orientation=column) DISTRIBUTED BY (i); +INSERT INTO arenadata_toolkit.tracking_t1 SELECT generate_series(1,100000); +CREATE INDEX ON arenadata_toolkit.tracking_t1(i); + +-- Want to see index and block dir relation. +SELECT arenadata_toolkit.tracking_register_schema('pg_aoseg'); +SELECT arenadata_toolkit.tracking_set_relkinds('o,i'); + +SELECT size, state, segid, relkind, relstorage +FROM arenadata_toolkit.tracking_get_track(); + +DROP TABLE arenadata_toolkit.tracking_t1; + +-- Clean up +SELECT arenadata_toolkit.tracking_unregister_db(); + +\c contrib_regression; +DROP DATABASE tracking_db1; diff --git a/gpcontrib/arenadata_toolkit/src/arenadata_toolkit.c b/gpcontrib/arenadata_toolkit/src/arenadata_toolkit.c new file mode 100644 index 000000000000..9a58b4c19f55 --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/arenadata_toolkit.c @@ -0,0 +1,43 @@ +#include "postgres.h" + +#include "access/xlog.h" +#include "cdb/cdbvars.h" +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "storage/shmem.h" + +#include "arenadata_toolkit_guc.h" +#include "arenadata_toolkit_worker.h" +#include "drops_track.h" +#include "file_hook.h" +#include "tf_shmem.h" + +void _PG_init(void); +void _PG_fini(void); + +void +_PG_init(void) +{ + if 
(!process_shared_preload_libraries_in_progress)
+		return;
+
+	tf_guc_define();
+	tf_shmem_init();
+	file_hook_init();
+
+	drops_track_init();
+
+	if (IS_QUERY_DISPATCHER())
+	{
+		arenadata_toolkit_worker_register();
+	}
+}
+
+void
+_PG_fini(void)
+{
+	/* Undo what _PG_init() set up; every subsystem is torn down exactly once. */
+	tf_shmem_deinit();
+	file_hook_deinit();
+	drops_track_deinit();
+}
diff --git a/gpcontrib/arenadata_toolkit/src/arenadata_toolkit_guc.c b/gpcontrib/arenadata_toolkit/src/arenadata_toolkit_guc.c
new file mode 100644
index 000000000000..ad4603984869
--- /dev/null
+++ b/gpcontrib/arenadata_toolkit/src/arenadata_toolkit_guc.c
@@ -0,0 +1,295 @@
+#include "arenadata_toolkit_guc.h"
+
+
+#include "cdb/cdbvars.h"
+#include "catalog/objectaccess.h"
+#include "catalog/pg_db_role_setting.h"
+#include <limits.h>
+#include "utils/guc.h"
+#include "tf_shmem.h"
+
+#define DEFAULT_BLOOM_SIZE 1000000
+#define DEFAULT_DB_TRACK_COUNT 5
+#define DEFAULT_IS_TRACKED false
+#define DEFAULT_DROPS_COUNT 100000
+#define DEFAULT_TRACKED_SCHEMAS "public,arenadata_toolkit,pg_catalog,pg_toast,pg_aoseg,information_schema"
+#define DEFAULT_GET_FULL_SNAPSHOT_ON_RECOVERY true
+#define DEFAULT_TRACKED_REL_STORAGES "h,a,c"
+#define DEFAULT_TRACKED_REL_KINDS "r,i,t,m,o,b,M"
+#define DEFAULT_NAPTIME 60
+
+#define MIN_BLOOM_SIZE 1
+#define MIN_DB_TRACK_COUNT 1
+#define MIN_DROPS_COUNT 1
+#define MIN_NAPTIME 1
+
+#define MAX_BLOOM_SIZE 128000000
+#define MAX_DB_TRACK_COUNT 1000
+#define MAX_DROPS_COUNT 1000000
+#define MAX_NAPTIME (INT_MAX / 1000)	/* seconds; the worker multiplies by 1000 (ms) in int arithmetic */
+
+int bloom_size = DEFAULT_BLOOM_SIZE;
+int db_track_count = DEFAULT_DB_TRACK_COUNT;
+bool is_tracked = DEFAULT_IS_TRACKED;
+bool get_full_snapshot_on_recovery = DEFAULT_GET_FULL_SNAPSHOT_ON_RECOVERY;
+int drops_count = DEFAULT_DROPS_COUNT;
+char *tracked_schemas = DEFAULT_TRACKED_SCHEMAS;
+char *tracked_rel_storages = DEFAULT_TRACKED_REL_STORAGES;
+char *tracked_rel_kinds = DEFAULT_TRACKED_REL_KINDS;
+int tracking_worker_naptime_sec = DEFAULT_NAPTIME;
+
+static bool is_tracked_unlocked = false;
+static bool is_get_full_snapshot_on_recovery_unlocked = false; +static bool is_schemas_unlocked = false; +static bool is_relkinds_unlocked = false; +static bool is_relstorages_unlocked = false; + +void +tf_guc_unlock_tracked_once(void) +{ + if (!is_tracked_unlocked) + is_tracked_unlocked = true; +} + +void +tf_guc_unlock_full_snapshot_on_recovery_once(void) +{ + if (!is_get_full_snapshot_on_recovery_unlocked) + is_get_full_snapshot_on_recovery_unlocked = true; +} + +void +tf_guc_unlock_schemas_once(void) +{ + if (!is_schemas_unlocked) + is_schemas_unlocked = true; +} + +void +tf_guc_unlock_relkinds_once(void) +{ + if (!is_relkinds_unlocked) + is_relkinds_unlocked = true; +} + +void +tf_guc_unlock_relstorages_once(void) +{ + if (!is_relstorages_unlocked) + is_relstorages_unlocked = true; +} + +/* Prohibit changing the GUC value manually except several cases. + * This is not called for RESET, so RESET is not guarded + */ +static bool +check_tracked(bool *newval, void **extra, GucSource source) +{ + if (IsInitProcessingMode() || Gp_role == GP_ROLE_EXECUTE || + (Gp_role == GP_ROLE_DISPATCH && is_tracked_unlocked)) + { + if (is_tracked_unlocked) + is_tracked_unlocked = false; + + if (source != PGC_S_DATABASE && source != PGC_S_DEFAULT && source != PGC_S_TEST) + return false; + + return true; + } + + GUC_check_errmsg("cannot change tracking status outside the tracking_register_db function"); + return false; +} + +/* Prohibit changing the GUC value manually except several cases. 
+ * This is not called for RESET, so RESET is not guarded + */ +static bool +check_get_full_snapshot_on_recovery(bool *newval, void **extra, GucSource source) +{ + if (IsInitProcessingMode() || Gp_role == GP_ROLE_EXECUTE || + (Gp_role == GP_ROLE_DISPATCH && is_get_full_snapshot_on_recovery_unlocked)) + { + if (is_get_full_snapshot_on_recovery_unlocked) + is_get_full_snapshot_on_recovery_unlocked = false; + + if (source != PGC_S_DATABASE && source != PGC_S_DEFAULT && source != PGC_S_TEST) + return false; + + return true; + } + + GUC_check_errmsg("cannot change tracking status outside the tracking_set_snapshot_on_recovery function"); + return false; +} + +static bool +check_relkinds(char **newval, void **extra, GucSource source) +{ + if (IsInitProcessingMode() || Gp_role == GP_ROLE_EXECUTE || + (Gp_role == GP_ROLE_DISPATCH && is_relkinds_unlocked)) + { + if (is_relkinds_unlocked) + is_relkinds_unlocked = false; + + if (source != PGC_S_DATABASE && source != PGC_S_DEFAULT && source != PGC_S_TEST) + return false; + + return true; + } + + GUC_check_errmsg("cannot change tracking status outside the tracking_register_relkinds function"); + return false; +} + +static bool +check_schemas(char **newval, void **extra, GucSource source) +{ + if (IsInitProcessingMode() || Gp_role == GP_ROLE_EXECUTE || + (Gp_role == GP_ROLE_DISPATCH && is_schemas_unlocked)) + { + if (is_schemas_unlocked) + is_schemas_unlocked = false; + + if (source != PGC_S_DATABASE && source != PGC_S_DEFAULT && source != PGC_S_TEST) + return false; + + return true; + } + + GUC_check_errmsg("cannot change tracking status outside the tracking_register_schema function"); + return false; +} + +static bool +check_relstorages(char **newval, void **extra, GucSource source) +{ + if (IsInitProcessingMode() || Gp_role == GP_ROLE_EXECUTE || + (Gp_role == GP_ROLE_DISPATCH && is_relstorages_unlocked)) + { + if (is_relstorages_unlocked) + is_relstorages_unlocked = false; + + if (source != PGC_S_DATABASE && source != 
PGC_S_DEFAULT && source != PGC_S_TEST) + return false; + + return true; + } + + GUC_check_errmsg("cannot change tracking status outside the tracking_register_relstorages function"); + return false; +} + +void +tf_guc_define(void) +{ + DefineCustomIntVariable("arenadata_toolkit.tracking_bloom_size", + "Size of bloom filter in bytes for each tracked database", + NULL, + &bloom_size, + DEFAULT_BLOOM_SIZE, + MIN_BLOOM_SIZE, + MAX_BLOOM_SIZE, + PGC_POSTMASTER, + 0, + NULL, + NULL, + NULL + ); + + DefineCustomIntVariable("arenadata_toolkit.tracking_db_track_count", + "Count of tracked databases.", + NULL, + &db_track_count, + DEFAULT_DB_TRACK_COUNT, + MIN_DB_TRACK_COUNT, + MAX_DB_TRACK_COUNT, + PGC_POSTMASTER, + 0, + NULL, + NULL, + NULL + ); + + DefineCustomBoolVariable("arenadata_toolkit.tracking_is_db_tracked", + "Is current database tracked.", + NULL, + &is_tracked, + DEFAULT_IS_TRACKED, + PGC_SUSET, + 0, + &check_tracked, + NULL, + NULL); + + DefineCustomBoolVariable("arenadata_toolkit.tracking_snapshot_on_recovery", + "Return full snapshot at startup/recovery.", + NULL, + &get_full_snapshot_on_recovery, + DEFAULT_GET_FULL_SNAPSHOT_ON_RECOVERY, + PGC_SUSET, + 0, + &check_get_full_snapshot_on_recovery, + NULL, + NULL); + + DefineCustomIntVariable("arenadata_toolkit.tracking_drops_count", + "Count of max monitored drop events.", + NULL, + &drops_count, + DEFAULT_DROPS_COUNT, + MIN_DROPS_COUNT, + MAX_DROPS_COUNT, + PGC_POSTMASTER, + 0, + NULL, + NULL, + NULL); + + DefineCustomStringVariable("arenadata_toolkit.tracking_schemas", + "Tracked schema names.", + NULL, + &tracked_schemas, + DEFAULT_TRACKED_SCHEMAS, + PGC_SUSET, + 0, + &check_schemas, + NULL, + NULL); + + DefineCustomStringVariable("arenadata_toolkit.tracking_relstorages", + "Tracked relation storage types.", + NULL, + &tracked_rel_storages, + DEFAULT_TRACKED_REL_STORAGES, + PGC_SUSET, + 0, + &check_relstorages, + NULL, + NULL); + + DefineCustomStringVariable("arenadata_toolkit.tracking_relkinds", + "Tracked 
relation kinds.",
+							   NULL,
+							   &tracked_rel_kinds,
+							   DEFAULT_TRACKED_REL_KINDS,
+							   PGC_SUSET,
+							   0,
+							   &check_relkinds,
+							   NULL,
+							   NULL);
+
+
+	DefineCustomIntVariable("arenadata_toolkit.tracking_worker_naptime_sec",
+							"Toolkit background worker nap time",
+							NULL,
+							&tracking_worker_naptime_sec,
+							DEFAULT_NAPTIME,
+							MIN_NAPTIME,
+							MAX_NAPTIME,
+							PGC_POSTMASTER,
+							0,
+							NULL,
+							NULL,
+							NULL);
+}
diff --git a/gpcontrib/arenadata_toolkit/src/arenadata_toolkit_worker.c b/gpcontrib/arenadata_toolkit/src/arenadata_toolkit_worker.c
new file mode 100644
index 000000000000..660102352438
--- /dev/null
+++ b/gpcontrib/arenadata_toolkit/src/arenadata_toolkit_worker.c
@@ -0,0 +1,450 @@
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_extension.h"
+#include "catalog/indexing.h"
+#include "cdb/cdbdisp_query.h"
+#include "cdb/cdbdispatchresult.h"
+#include "libpq-fe.h"
+#include "postmaster/bgworker.h"
+#include "storage/proc.h"
+#include "storage/ipc.h"
+#include "utils/snapmgr.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+
+#include "arenadata_toolkit_worker.h"
+#include "arenadata_toolkit_guc.h"
+#include "tf_shmem.h"
+
+#define EXTENSIONNAME "arenadata_toolkit"
+
+typedef struct
+{
+	Oid			dbid;
+	bool		get_full_snapshot_on_recovery;
+} tracked_db_t;
+
+static BackgroundWorker worker;
+
+/* flags set by signal handlers */
+static volatile sig_atomic_t got_sighup = false;
+static volatile sig_atomic_t got_sigterm = false;
+static volatile sig_atomic_t got_sigusr1 = false;
+
+/*
+ * Return true if the given setconfig array holds the entry
+ * "arenadata_toolkit.tracking_is_db_tracked=t"; otherwise return false.
+ */
+static bool
+is_db_tracked(ArrayType *array)
+{
+	bool		is_tracked = false;
+	Datum	   *elems;
+	bool	   *nulls;
+	int			nelems;
+
+	deconstruct_array(array, TEXTOID, -1, false, 'i',
+					  &elems, &nulls, &nelems);
+	for (int i = 0; i < nelems && !is_tracked; i++)
+	{
+		char	   *s;
+		char	   *name;
+		char	   *value;
+
+		if (nulls[i])
+			continue;
+
+		s = TextDatumGetCString(elems[i]);
+		ParseLongOption(s, &name, &value);
+		pfree(s);
+
+		/*
+		 * value is NULL for an entry without "=value"; such an entry can
+		 * never match.
+		 */
+		if (value != NULL &&
+			strcmp(name, "arenadata_toolkit.tracking_is_db_tracked") == 0 &&
+			strcmp(value, "t") == 0)
+			is_tracked = true;
+
+		/* Release both strings on every path, including a match. */
+		free(name);
+		free(value);
+	}
+
+	return is_tracked;
+}
+
+/*
+ * Return the "arenadata_toolkit.tracking_snapshot_on_recovery" value stored
+ * in the given setconfig array ("t" means true, anything else false), or the
+ * current value of the get_full_snapshot_on_recovery variable when the
+ * array has no such entry.
+ */
+static bool
+full_snapshot_on_recovery(ArrayType *array)
+{
+	bool		take_snapshot = false;
+	bool		found = false;
+	Datum	   *elems;
+	bool	   *nulls;
+	int			nelems;
+
+	deconstruct_array(array, TEXTOID, -1, false, 'i',
+					  &elems, &nulls, &nelems);
+
+	for (int i = 0; i < nelems && !found; i++)
+	{
+		char	   *s;
+		char	   *name;
+		char	   *value;
+
+		if (nulls[i])
+			continue;
+
+		s = TextDatumGetCString(elems[i]);
+		ParseLongOption(s, &name, &value);
+		pfree(s);
+
+		/* value is NULL for an entry without "=value"; it cannot match. */
+		if (value != NULL &&
+			strcmp(name, "arenadata_toolkit.tracking_snapshot_on_recovery") == 0)
+		{
+			found = true;
+			take_snapshot = (strcmp(value, "t") == 0);
+		}
+
+		/* Release both strings on every path, including a match. */
+		free(name);
+		free(value);
+	}
+
+	if (!found)
+		take_snapshot = get_full_snapshot_on_recovery;
+
+	return take_snapshot;
+}
+
+static List *
+get_uninitialized_segments()
+{
+	int			i;
+	CdbPgResults cdb_pgresults = {NULL, 0};
+	List	   *list = NIL;
+
+	CdbDispatchCommand("select * from arenadata_toolkit.tracking_is_segment_initialized()", 0, &cdb_pgresults);
+
+	for (i = 0; i < cdb_pgresults.numResults; i++)
+	{
+		struct pg_result *pgresult = cdb_pgresults.pg_results[i];
+
+		if (PQresultStatus(pgresult) != PGRES_TUPLES_OK)
+		{
+			/* Format the diagnostics first: clearing the results releases pgresult. */
+			char	   *msg = psprintf("%s %s", PQresStatus(PQresultStatus(pgresult)), PQresultErrorMessage(pgresult));
+			cdbdisp_clearCdbPgResults(&cdb_pgresults);
+			elog(ERROR, "is_initialized: resultStatus not tuples_Ok: %s", msg);
+		}
+		else
+		{
+			int32		segindex = 0;
+			bool		is_initialized = false;
+
+			segindex = atoi(PQgetvalue(pgresult, 0, 0));
+			is_initialized = strcmp(PQgetvalue(pgresult, 0, 1), "t") == 0;
+
+			elog(LOG, "get_uninitialized_segments, segindex: %d, is_initialized: %d",
segindex, is_initialized); + + if (!is_initialized) + list = lappend_int(list, segindex); + } + } + + cdbdisp_clearCdbPgResults(&cdb_pgresults); + + return list; +} + +/* + * Signal handler for SIGTERM + * Set a flag to let the main loop to terminate, and set our latch to wake + * it up. + */ +static void +tracking_sigterm(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_sigterm = true; + if (MyProc) + SetLatch(&MyProc->procLatch); + + errno = save_errno; +} + +/* + * Signal handler for SIGHUP + * Set a flag to tell the main loop to reread the config file, and set + * our latch to wake it up. + */ +static void +tracking_sighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_sighup = true; + if (MyProc) + SetLatch(&MyProc->procLatch); + + errno = save_errno; +} + +/* + * Signal handler for SIGUSR1 + * Set a flag to tell the launcher to handle extension ddl message + */ +static void +tracking_sigusr1(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_sigusr1 = true; + if (MyProc) + SetLatch(&MyProc->procLatch); + + errno = save_errno; +} + +static bool +extension_created() +{ + bool exists = false; + Relation rel; + SysScanDesc scandesc; + HeapTuple tuple; + ScanKeyData entry[1]; + + rel = heap_open(ExtensionRelationId, AccessShareLock); + + ScanKeyInit(&entry[0], + Anum_pg_extension_extname, + BTEqualStrategyNumber, F_NAMEEQ, + CStringGetDatum(EXTENSIONNAME)); + + scandesc = systable_beginscan(rel, ExtensionNameIndexId, true, + NULL, 1, entry); + + tuple = systable_getnext(scandesc); + + exists = HeapTupleIsValid(tuple); + + systable_endscan(scandesc); + heap_close(rel, AccessShareLock); + + return exists; +} + +static void +dispatch_register_to_master(List *dbids) +{ + ListCell *cell; + tracked_db_t *trackedDb; + + foreach(cell, dbids) + { + trackedDb = (tracked_db_t *) lfirst(cell); + + bloom_set_bind(&tf_shared_state->bloom_set, trackedDb->dbid); + bloom_set_trigger_bits(&tf_shared_state->bloom_set, trackedDb->dbid, + trackedDb->get_full_snapshot_on_recovery); 
+ } + + LWLockAcquire(tf_shared_state->bloom_set.lock, LW_EXCLUSIVE); + tf_shared_state->is_initialized = true; + LWLockRelease(tf_shared_state->bloom_set.lock); +} + +static void +dispatch_register_to_segments(List *dbids, List *uninitialized_segments) +{ + ListCell *cell; + tracked_db_t *trackedDb; + CdbPgResults cdb_pgresults = {NULL, 0}; + + if (uninitialized_segments == NIL) + return; + + foreach(cell, dbids) + { + trackedDb = (tracked_db_t *) lfirst(cell); + + char *cmd = psprintf("select arenadata_toolkit.tracking_register_db(%u)", trackedDb->dbid); + + CdbDispatchCommandToSegments(cmd, + 0, + uninitialized_segments, + &cdb_pgresults); + + if (trackedDb->get_full_snapshot_on_recovery) + { + cmd = psprintf("select arenadata_toolkit.tracking_trigger_initial_snapshot(%u)", trackedDb->dbid); + + CdbDispatchCommandToSegments(cmd, + 0, + uninitialized_segments, + &cdb_pgresults); + } + } +} + +static void +dispatch_register(bool dispatch_to_master, List *uninitialized_segments) +{ + Relation rel; + SysScanDesc scan; + HeapTuple tup; + List *dbids = NIL; + tracked_db_t *trackedDb; + + rel = heap_open(DbRoleSettingRelationId, RowExclusiveLock); + scan = systable_beginscan(rel, InvalidOid, false, NULL, 0, NULL); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + bool isnull; + Datum str_datum; + Datum oid_datum; + ArrayType *a; + + str_datum = heap_getattr(tup, Anum_pg_db_role_setting_setconfig, + RelationGetDescr(rel), &isnull); + if (isnull) + continue; + + oid_datum = heap_getattr(tup, Anum_pg_db_role_setting_setrole, + RelationGetDescr(rel), &isnull); + if (DatumGetObjectId(oid_datum) != InvalidOid) + continue; + + oid_datum = heap_getattr(tup, Anum_pg_db_role_setting_setdatabase, + RelationGetDescr(rel), &isnull); + if (DatumGetObjectId(oid_datum) == InvalidOid) + continue; + + a = DatumGetArrayTypeP(str_datum); + + if (is_db_tracked(a)) + { + trackedDb = (tracked_db_t *) palloc0(sizeof(tracked_db_t)); + + trackedDb->dbid = 
DatumGetObjectId(oid_datum); + trackedDb->get_full_snapshot_on_recovery = full_snapshot_on_recovery(a); + dbids = lappend(dbids, trackedDb); + } + } + + systable_endscan(scan); + heap_close(rel, RowExclusiveLock); + + if (dbids != NIL) + { + ListCell *cell; + + if (dispatch_to_master) + dispatch_register_to_master(dbids); + + dispatch_register_to_segments(dbids, uninitialized_segments); + + foreach(cell, dbids) + { + pfree(lfirst(cell)); + } + + list_free(dbids); + } + + LWLockAcquire(tf_shared_state->bloom_set.lock, LW_EXCLUSIVE); + tf_shared_state->bgworker_ready = true; + LWLockRelease(tf_shared_state->bloom_set.lock); +} + +/* scan pg_db_role_setting, find all databases, bind blooms if necessary */ +static void +arenadata_toolkit_worker(Datum main_arg) +{ + elog(LOG, "[arenadata toolkit] Starting background worker"); + + bool master_initialized = false; + + pqsignal(SIGHUP, tracking_sighup); + pqsignal(SIGTERM, tracking_sigterm); + pqsignal(SIGUSR1, tracking_sigusr1); + + BackgroundWorkerUnblockSignals(); + + BackgroundWorkerInitializeConnection(DB_FOR_COMMON_ACCESS, NULL); + + while (!got_sigterm) + { + int rc; + List *uninitialized_segments; + + CHECK_FOR_INTERRUPTS(); + + StartTransactionCommand(); + + if (extension_created()) + { + elog(LOG, "[arenadata toolkit] Getting uninitialized segments"); + uninitialized_segments = get_uninitialized_segments(uninitialized_segments); + + if (!master_initialized || list_length(uninitialized_segments) > 0) + { + elog(LOG, "Dispatching register to segments"); + dispatch_register(!master_initialized, uninitialized_segments); + list_free(uninitialized_segments); + uninitialized_segments = NIL; + master_initialized = true; + } + } + CommitTransactionCommand(); + + rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + tracking_worker_naptime_sec * 1000); + ResetLatch(&MyProc->procLatch); + + /* Emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + { + ereport(LOG, 
(errmsg("[arenadata toolkit] bgworker is being terminated by postmaster death."))); + proc_exit(1); + } + + if (got_sighup) + { + got_sighup = false; + } + } + + if (got_sigterm) + ereport(LOG, (errmsg("[arenadata toolkit] stop worker process"))); + + proc_exit(0); +} + +void +arenadata_toolkit_worker_register(void) +{ + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_DEFAULT_RESTART_INTERVAL; + worker.bgw_main = arenadata_toolkit_worker; + worker.bgw_notify_pid = 0; + worker.bgw_start_rule = NULL; + sprintf(worker.bgw_name, "arenadata_toolkit"); + + RegisterBackgroundWorker(&worker); +} diff --git a/gpcontrib/arenadata_toolkit/src/bloom.c b/gpcontrib/arenadata_toolkit/src/bloom.c new file mode 100644 index 000000000000..b1e24acc3f92 --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/bloom.c @@ -0,0 +1,80 @@ +/* + * Simple bloom filter without using postgres primitives. 
+ */ +#include "bloom.h" + +#include + +bloom_t * +bloom_init(const uint32_t bloom_size, void *mem) +{ + bloom_t *bloom = mem; + + bloom->size = bloom_size; + bloom_clear(bloom); + + return bloom; +} + +static uint32_t +calc_idx(bloom_t * bloom, uint64_t hash, uint8_t *bit_idx) +{ + uint64_t bloom_bit_idx = hash % (8 * bloom->size); + + *bit_idx = bloom_bit_idx % 8; + + return bloom_bit_idx / 8; +} + +int +bloom_isset(bloom_t * bloom, uint64_t hash) +{ + uint8_t bit_idx; + uint32_t byte_idx = calc_idx(bloom, hash, &bit_idx); + + return bloom->map[byte_idx] & (1 << bit_idx); +} + +void +bloom_set(bloom_t * bloom, uint64_t hash) +{ + uint8_t bit_idx; + uint32_t byte_idx = calc_idx(bloom, hash, &bit_idx); + + bloom->map[byte_idx] |= (1 << bit_idx); +} + +void +bloom_set_all(bloom_t * bloom) +{ + memset(bloom->map, 0xFF, bloom->size); + bloom->is_set_all = 1; +} + +void +bloom_clear(bloom_t * bloom) +{ + memset(bloom->map, 0, bloom->size); + bloom->is_set_all = 0; +} + +void +bloom_merge(bloom_t * dst, bloom_t * src) +{ + for (uint32_t i = 0; i < dst->size; i++) + dst->map[i] |= src->map[i]; + if (src->is_set_all) + dst->is_set_all = src->is_set_all; +} + +bloom_t * +bloom_copy(bloom_t * bloom, void *mem) +{ + bloom_t *copy; + + copy = bloom_init(bloom->size, mem); + memcpy(copy->map, bloom->map, bloom->size); + copy->is_set_all = bloom->is_set_all; + + return copy; +} diff --git a/gpcontrib/arenadata_toolkit/src/bloom_set.c b/gpcontrib/arenadata_toolkit/src/bloom_set.c new file mode 100644 index 000000000000..66e7907ea6c7 --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/bloom_set.c @@ -0,0 +1,280 @@ +/* + * Set of blooms. Main entry point to find a bloom and work with it. + * Used to track create, extend, truncate events. 
+ */
+
+#include "bloom_set.h"
+
+/*
+ * NOTE(review): the header name on this line was lost when the patch text
+ * was extracted.  pg_md5_binary(), used below, is declared in libpq/md5.h,
+ * so that header is restored here -- confirm against the original commit.
+ */
+#include <libpq/md5.h>
+
+/* address of the i-th bloom_entry_t inside the set's byte array */
+#define BLOOM_ENTRY_GET(set, i) ((void *) ((set)->bloom_entries + (i) * FULL_BLOOM_ENTRY_SIZE((set)->bloom_size)))
+
+/*
+ * Initialize one entry in 'mem': assign it an LWLock, mark it unbound and
+ * initialize its embedded bloom filter with 'bloom_size' map bytes.
+ */
+static bloom_entry_t *
+bloom_entry_init(const uint32_t bloom_size, void *mem)
+{
+	bloom_entry_t *bloom_entry = mem;
+
+	bloom_entry->lock = LWLockAssign();
+	bloom_entry->dbid = InvalidOid;
+	(void) bloom_init(bloom_size, &bloom_entry->bloom);
+
+	return bloom_entry;
+}
+
+/*
+ * Initialize a set of 'bloom_count' entries, each holding a filter of
+ * 'bloom_size' bytes, in the caller-provided memory 'mem'.  One LWLock is
+ * assigned for the set and one for every entry.
+ */
+bloom_set_t *
+bloom_set_init(const uint32_t bloom_count, const uint32_t bloom_size, void *mem)
+{
+	bloom_set_t *bloom_set = mem;
+
+	bloom_set->bloom_count = bloom_count;
+	bloom_set->bloom_size = bloom_size;
+	bloom_set->lock = LWLockAssign();
+
+	for (uint32_t i = 0; i < bloom_count; i++)
+	{
+		void	   *bloom_entry_mem = BLOOM_ENTRY_GET(bloom_set, i);
+
+		(void) bloom_entry_init(bloom_size, bloom_entry_mem);
+	}
+
+	return bloom_set;
+}
+
+/*
+ * Look up the entry bound to 'dbid' with a linear search.
+ *
+ * Returns the bound entry and sets *found to true when one exists.
+ * Otherwise *found is false and the result is the first unbound entry, or
+ * NULL when every entry is bound to some other database.
+ *
+ * The whole array is scanned before falling back to an unbound entry:
+ * unbinding leaves holes, so the entry for 'dbid' may sit behind a free one.
+ */
+static bloom_entry_t *
+find_bloom_entry(bloom_set_t * bloom_set, Oid dbid, bool *found)
+{
+	bloom_entry_t *free_entry = NULL;
+
+	*found = false;
+
+	for (uint32_t i = 0; i < bloom_set->bloom_count; i++)
+	{
+		bloom_entry_t *bloom_entry = BLOOM_ENTRY_GET(bloom_set, i);
+
+		if (bloom_entry->dbid == dbid)
+		{
+			/* an InvalidOid "match" is just an unbound entry */
+			*found = (dbid != InvalidOid);
+			return bloom_entry;
+		}
+
+		if (free_entry == NULL && bloom_entry->dbid == InvalidOid)
+			free_entry = bloom_entry;
+	}
+
+	return free_entry;
+}
+
+/*
+ * Bind an unused filter to 'dbid'.
+ *
+ * Returns true if the database is bound on return (already bound or bound
+ * now), false when no entry is available.
+ */
+bool
+bloom_set_bind(bloom_set_t * bloom_set, Oid dbid)
+{
+	bloom_entry_t *bloom_entry;
+	bool		found;
+
+	/* cheap check first: most calls find the database already bound */
+	LWLockAcquire(bloom_set->lock, LW_SHARED);
+	bloom_entry = find_bloom_entry(bloom_set, dbid, &found);
+	LWLockRelease(bloom_set->lock);
+
+	if (found)
+		return true;
+	else if (!bloom_entry)
+	{
+		return false;
+	}
+
+	/*
+	 * Repeat the search under the exclusive lock and claim the entry before
+	 * that lock is released, so that no other backend can pick the same
+	 * unbound entry in between.
+	 */
+	LWLockAcquire(bloom_set->lock, LW_EXCLUSIVE);
+	bloom_entry = find_bloom_entry(bloom_set, dbid, &found);
+	if (bloom_entry && !found)
+	{
+		LWLockAcquire(bloom_entry->lock, LW_EXCLUSIVE);
+		bloom_entry->dbid = dbid;
+		LWLockRelease(bloom_entry->lock);
+	}
+	LWLockRelease(bloom_set->lock);
+
+	if (!bloom_entry)
+	{
+		elog(WARNING, "Our bloom filter was stolen :(");
+		return false;
+	}
+
+	if (!found)
+		elog(DEBUG1, "Bloom binded %u", dbid);
+
+	return true;
+}
+
+/*
+ * Set (on == true) or clear (on == false) every bit of the filter bound to
+ * 'dbid'.  Returns false, after logging, when the database has no filter.
+ */
+bool
+bloom_set_trigger_bits(bloom_set_t * bloom_set, Oid dbid, bool on)
+{
+	bloom_entry_t *bloom_entry;
+	bool		found;
+
+	/* take the entry lock before the set lock is dropped, as the others do */
+	LWLockAcquire(bloom_set->lock, LW_SHARED);
+	bloom_entry = find_bloom_entry(bloom_set, dbid, &found);
+	if (found)
+		LWLockAcquire(bloom_entry->lock, LW_EXCLUSIVE);
+	LWLockRelease(bloom_set->lock);
+
+	if (!found)
+	{
+		elog(LOG, "[arenadata toolkit] tracking_initial_snapshot Bloom filter not found");
+		return false;
+	}
+
+	if (on)
+		bloom_set_all(&bloom_entry->bloom);
+	else
+		bloom_clear(&bloom_entry->bloom);
+	LWLockRelease(bloom_entry->lock);
+
+	return true;
+}
+
+/*
+ * Unbind the filter bound to 'dbid', if any, and clear it.
+ */
+void
+bloom_set_unbind(bloom_set_t * bloom_set, Oid dbid)
+{
+	bloom_entry_t *bloom_entry;
+	bool		found;
+
+	LWLockAcquire(bloom_set->lock, LW_SHARED);
+	bloom_entry = find_bloom_entry(bloom_set, dbid, &found);
+	LWLockRelease(bloom_set->lock);
+
+	if (!found)
+		return;
+
+	/*
+	 * Release the entry while the set is still locked exclusively, so that
+	 * searches never observe a half-updated binding.
+	 */
+	LWLockAcquire(bloom_set->lock, LW_EXCLUSIVE);
+	bloom_entry = find_bloom_entry(bloom_set, dbid, &found);
+	if (found)
+	{
+		LWLockAcquire(bloom_entry->lock, LW_EXCLUSIVE);
+		bloom_entry->dbid = InvalidOid;
+	}
+	LWLockRelease(bloom_set->lock);
+
+	if (found)
+	{
+		bloom_clear(&bloom_entry->bloom);
+		LWLockRelease(bloom_entry->lock);
+		elog(DEBUG1, "Bloom unbinded %u", dbid);
+	}
+}
+
+/*
+ * Hash 'len' bytes at 'buf': the 16-byte MD5 digest is folded into 64 bits
+ * by XOR-ing its two halves.
+ */
+uint64_t
+bloom_set_calc_hash(const void *buf, size_t len)
+{
+	struct wide_hash
+	{
+		uint64_t	i1;
+		uint64_t	i2;
+	};
+	/* zeroed so the result is defined even if hashing fails */
+	struct wide_hash w_hash = {0, 0};
+	bool		hash_res = pg_md5_binary(buf, len, &w_hash);
+
+	Assert(hash_res);
+	(void) hash_res;			/* unused when assertions are disabled */
+
+	return w_hash.i1 ^ w_hash.i2;
+}
+
+/*
+ * Find the filter bound to 'dbid' and set the bit derived from the hash of
+ * 'relNode'.  Does nothing when the database has no filter.
+ */
+void
+bloom_set_set(bloom_set_t * bloom_s, Oid dbid, Oid relNode)
+{
+	bloom_entry_t *bloom_entry;
+	bool		found;
+	uint64_t	hash;
+
+	LWLockAcquire(bloom_s->lock, LW_SHARED);
+	bloom_entry = find_bloom_entry(bloom_s, dbid, &found);
+	if (found)
+		LWLockAcquire(bloom_entry->lock, LW_EXCLUSIVE);
+	LWLockRelease(bloom_s->lock);
+
+	if (!found)
+		return;
+
+	hash = bloom_set_calc_hash(&relNode, sizeof(relNode));
+	bloom_set(&bloom_entry->bloom, hash);
+	LWLockRelease(bloom_entry->lock);
+
+	elog(DEBUG1, "Bloom set %u %u", dbid, relNode);
+}
+
+/*
+ * Find the filter bound to 'dbid', copy it into the caller-provided memory
+ * 'mem' and clear the original (which stays bound).  Returns the copy, or
+ * NULL when the database has no filter.
+ */
+bloom_t *
+bloom_set_move(bloom_set_t * bloom_set, Oid dbid, void *mem)
+{
+	bloom_entry_t *bloom_entry;
+	bool		found;
+	bloom_t    *copy;
+
+	LWLockAcquire(bloom_set->lock, LW_SHARED);
+	bloom_entry = find_bloom_entry(bloom_set, dbid, &found);
+	if (found)
+		LWLockAcquire(bloom_entry->lock, LW_EXCLUSIVE);
+	LWLockRelease(bloom_set->lock);
+
+	/* no bloom for the database */
+	if (!found)
+		return NULL;
+
+	copy = bloom_copy(&bloom_entry->bloom, mem);
+	bloom_clear(&bloom_entry->bloom);
+	LWLockRelease(bloom_entry->lock);
+
+	elog(DEBUG1, "Bloom moved %u", dbid);
+
+	return copy;
+}
+
+/*
+ * Find the filter bound to 'dbid' and merge the bits of 'm_bloom' into it.
+ * Returns false when 'm_bloom' is NULL or the database has no filter.
+ */
+bool
+bloom_set_merge(bloom_set_t * bloom_set, Oid dbid, bloom_t * m_bloom)
+{
+	bloom_entry_t *bloom_entry;
+	bool		found;
+
+	if (!m_bloom)
+		return false;
+
+	LWLockAcquire(bloom_set->lock, LW_SHARED);
+	bloom_entry = find_bloom_entry(bloom_set, dbid, &found);
+	if (found)
+		LWLockAcquire(bloom_entry->lock, LW_EXCLUSIVE);
+	LWLockRelease(bloom_set->lock);
+
+	if (!found)
+		return false;
+
+	bloom_merge(&bloom_entry->bloom, m_bloom);
+	LWLockRelease(bloom_entry->lock);
+
+	elog(DEBUG1, "Bloom merged %u", dbid);
+
+	return true;
+}
+
+bool
+bloom_set_is_all_bits_triggered(bloom_set_t * bloom_set, Oid dbid)
+{
+	bloom_entry_t *bloom_entry;
+	bool		found;
+	bool		is_triggered;
+
+	LWLockAcquire(bloom_set->lock, LW_SHARED);
+	bloom_entry = find_bloom_entry(bloom_set, dbid, &found);
+ 
LWLockRelease(bloom_set->lock); + + if (!found) + { + return false; + } + + LWLockAcquire(bloom_entry->lock, LW_SHARED); + is_triggered = bloom_entry->bloom.is_set_all; + LWLockRelease(bloom_entry->lock); + + return is_triggered; +} diff --git a/gpcontrib/arenadata_toolkit/arenadata_toolkit.c b/gpcontrib/arenadata_toolkit/src/dbsize.c similarity index 85% rename from gpcontrib/arenadata_toolkit/arenadata_toolkit.c rename to gpcontrib/arenadata_toolkit/src/dbsize.c index 11a5bd9868ff..9731f744f44d 100644 --- a/gpcontrib/arenadata_toolkit/arenadata_toolkit.c +++ b/gpcontrib/arenadata_toolkit/src/dbsize.c @@ -1,30 +1,27 @@ -#include #include -#include -#include #include "postgres.h" #include "access/aomd.h" #include "access/heapam.h" +#include "catalog/pg_tablespace.h" #include "cdb/cdbvars.h" #include "common/relpath.h" #include "fmgr.h" #include "funcapi.h" +#include "libpq/hba.h" #include "miscadmin.h" -#include "storage/fd.h" #include "nodes/execnodes.h" -#include "cdb/cdbvars.h" -#include "libpq/hba.h" +#include "storage/fd.h" +#include "storage/lock.h" #include "utils/builtins.h" #include "utils/relfilenodemap.h" #include "utils/timestamp.h" #include "utils/elog.h" #include "utils/rel.h" #include "utils/relcache.h" -#include "catalog/pg_tablespace.h" -#include "storage/lock.h" +#include "dbsize.h" PG_MODULE_MAGIC; /* @@ -35,11 +32,11 @@ PG_MODULE_MAGIC; static int64 calculate_relation_size(Relation rel, ForkNumber forknum); static int64 get_heap_storage_total_bytes(Relation rel, - ForkNumber forknum, char *relpath); + ForkNumber forknum, char *relpath); static int64 get_ao_storage_total_bytes(Relation rel, char *relpath); static bool calculate_ao_storage_perSegFile(const int segno, void *ctx); static void fill_relation_seg_path(char *buf, int bufLen, - const char *relpath, int segNo); + const char *relpath, int segNo); static int64 calculate_toast_table_size(Oid toastrelid, ForkNumber forknum); /* @@ -162,8 +159,8 @@ calculate_ao_storage_perSegFile(const int 
segno, void *ctx) static int64 calculate_toast_table_size(Oid toastrelid, ForkNumber forknum) { - Relation toastRel = relation_open(toastrelid, AccessShareLock); - int64 size = calculate_relation_size(toastRel, forknum); + Relation toastRel = relation_open(toastrelid, AccessShareLock); + int64 size = calculate_relation_size(toastRel, forknum); relation_close(toastRel, AccessShareLock); return size; @@ -223,7 +220,7 @@ get_ao_storage_total_bytes(Relation rel, char *relpath) * operations (for ex: CTAS) zero segment will store tuples). Thus * calculate segno=0 manually. */ - (void) calculate_ao_storage_perSegFile(0, &ctx); + (void)calculate_ao_storage_perSegFile(0, &ctx); ao_foreach_extent_file(calculate_ao_storage_perSegFile, &ctx); return ctx.total_size; @@ -231,37 +228,41 @@ get_ao_storage_total_bytes(Relation rel, char *relpath) typedef struct { - char *datpath; - DIR *dirdesc; - TupleDesc tupdesc; -} user_fctx_data; + char *datpath; + DIR *dirdesc; + TupleDesc tupdesc; +} user_fctx_data; /* * Name of file must be "XXX.X" or "XXX" * where XXX is Oid. OID must be not more than OID_MAX. 
*/ -static Oid get_oid_from_filename(const char *filename) +static Oid +get_oid_from_filename(const char *filename) { - unsigned long int oid, segment; - char trailer; + unsigned long int oid, + segment; + char trailer; + + int count = sscanf(filename, "%lu.%lu%c", &oid, &segment, &trailer); - int count = sscanf(filename, "%lu.%lu%c", &oid, &segment, &trailer); if (count < 1 || count > 2) return InvalidOid; if (oid > OID_MAX) return InvalidOid; - return (Oid) oid; + return (Oid)oid; } PG_FUNCTION_INFO_V1(adb_get_relfilenodes); -Datum adb_get_relfilenodes(PG_FUNCTION_ARGS) +Datum +adb_get_relfilenodes(PG_FUNCTION_ARGS) { - Oid datoid = MyDatabaseId; - Oid tablespace_oid = PG_GETARG_OID(0); + Oid datoid = MyDatabaseId; + Oid tablespace_oid = PG_GETARG_OID(0); - struct dirent *direntry; - user_fctx_data *fctx_data; + struct dirent *direntry; + user_fctx_data *fctx_data; FuncCallContext *funcctx; if (tablespace_oid == GLOBALTABLESPACE_OID) @@ -280,13 +281,14 @@ Datum adb_get_relfilenodes(PG_FUNCTION_ARGS) if (!fctx_data->dirdesc) { - /* Nothing to do: empty tablespace (maybe it has been just created)*/ + /* Nothing to do: empty tablespace (maybe it has been just + * created) */ MemoryContextSwitchTo(oldcontext); SRF_RETURN_DONE(funcctx); } if (get_call_result_type(fcinfo, NULL, &fctx_data->tupdesc) - != TYPEFUNC_COMPOSITE) + != TYPEFUNC_COMPOSITE) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("return type must be a row type"))); @@ -303,12 +305,12 @@ Datum adb_get_relfilenodes(PG_FUNCTION_ARGS) while ((direntry = ReadDir(fctx_data->dirdesc, fctx_data->datpath)) != NULL) { struct stat fst; - Datum values[10]; - bool nulls[10]; - char *filename; - Oid reloid; - Oid relfilenode_oid; - HeapTuple tuple; + Datum values[10]; + bool nulls[10]; + char *filename; + Oid reloid; + Oid relfilenode_oid; + HeapTuple tuple; CHECK_FOR_INTERRUPTS(); @@ -366,7 +368,33 @@ Datum adb_get_relfilenodes(PG_FUNCTION_ARGS) } PG_FUNCTION_INFO_V1(adb_hba_file_rules); -Datum 
adb_hba_file_rules(PG_FUNCTION_ARGS) +Datum +adb_hba_file_rules(PG_FUNCTION_ARGS) { return pg_hba_file_rules(fcinfo); } + +/* */ +int64 +dbsize_calc_size(Oid relid) +{ + Relation rel; + int64 size = 0; + + rel = try_relation_open(relid, AccessShareLock, false); + + if (rel == NULL) + return size; + + if (rel->rd_node.relNode == 0) + return size; + + size += calculate_relation_size(rel, MAIN_FORKNUM); + size += calculate_relation_size(rel, FSM_FORKNUM); + size += calculate_relation_size(rel, VISIBILITYMAP_FORKNUM); + size += calculate_relation_size(rel, INIT_FORKNUM); + + relation_close(rel, AccessShareLock); + + return size; +} diff --git a/gpcontrib/arenadata_toolkit/src/drops_track.c b/gpcontrib/arenadata_toolkit/src/drops_track.c new file mode 100644 index 000000000000..b54f7ce37e4f --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/drops_track.c @@ -0,0 +1,223 @@ +/* + * Track unlink hook events. + */ + +#include "drops_track.h" + +#include "lib/ilist.h" +#include "storage/lwlock.h" +#include "storage/ipc.h" +#include "storage/shmem.h" + +#include "arenadata_toolkit_guc.h" + +#define TRACK_NODE_GET(track, i) (void *)(track->nodes + i * sizeof(drops_track_node_t)); + +typedef struct +{ + Oid relNode; + Oid dbNode; +} track_relfilenode_t; + +/* doubly linked list node of dropped file nodes */ +typedef struct +{ + dlist_node node; + uint32_t idx; /* idx in 'nodes' array; just for info */ + track_relfilenode_t relfileNode; +} drops_track_node_t; + + +typedef struct +{ + LWLock *lock; + dlist_head head; + uint32_t used_count; /* count of used nodes */ + int unused_idx; /* next unused idx or -1 if unknown; for + * faster search */ + char nodes[FLEXIBLE_ARRAY_MEMBER]; /* array of drops_track_node_t */ +} drops_track_t; + +static shmem_startup_hook_type next_shmem_startup_hook = NULL; +static drops_track_t * drops_track; + +static Size +drops_track_calc_size() +{ + Size size; + + size = offsetof(drops_track_t, nodes); + size = add_size(size, mul_size(drops_count, 
sizeof(drops_track_node_t))); + + return size; +} + +static void +drops_track_hook(void) +{ + bool found; + Size size = drops_track_calc_size(); + + drops_track = ShmemInitStruct("adb_track_files_drops", size, &found); + + if (!found) + { + drops_track->lock = LWLockAssign(); + drops_track->used_count = 0; + drops_track->unused_idx = 0; + dlist_init(&drops_track->head); + + for (uint32_t i = 0; i < drops_count; i++) + { + drops_track_node_t *track_node = TRACK_NODE_GET(drops_track, i); + + track_node->relfileNode.relNode = InvalidOid; + track_node->relfileNode.dbNode = InvalidOid; + track_node->idx = i; + } + } + + if (next_shmem_startup_hook) + next_shmem_startup_hook(); +} + +void +drops_track_init(void) +{ + RequestAddinLWLocks(1); + RequestAddinShmemSpace(drops_track_calc_size()); + + next_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = drops_track_hook; +} + +void +drops_track_deinit(void) +{ + shmem_startup_hook = next_shmem_startup_hook; +} + +/* find unused node; this should be heavily reworked or optimized */ +static drops_track_node_t * find_empty_node() +{ + drops_track_node_t *track_node = NULL; + + if (drops_track->unused_idx >= 0) + { + track_node = TRACK_NODE_GET(drops_track, drops_track->unused_idx); + drops_track->unused_idx++; + if (drops_track->unused_idx >= drops_count) + drops_track->unused_idx = -1; + else + { + drops_track_node_t *unused_node = TRACK_NODE_GET(drops_track, drops_track->unused_idx); + + if (unused_node->relfileNode.relNode != InvalidOid) + drops_track->unused_idx = -1; + } + } + else + { + for (uint32_t i = 0; i < drops_count; i++) + { + track_node = TRACK_NODE_GET(drops_track, i); + if (track_node->relfileNode.relNode == InvalidOid) + break; + } + } + return track_node; +} + +/* add relNode to track; old node is dropped if no space */ +void +drops_track_add(RelFileNode relfileNode) +{ + drops_track_node_t *track_node; + + LWLockAcquire(drops_track->lock, LW_EXCLUSIVE); + + if (drops_track->used_count >= 
drops_count) + { + track_node = (drops_track_node_t *) dlist_pop_head_node(&drops_track->head); + elog(DEBUG1, "No space for drop track. Oldest node removed (%d).", track_node->relfileNode.relNode); + } + else + { + track_node = find_empty_node(); + drops_track->used_count++; + Assert(track_node); + } + + track_node->relfileNode.relNode = relfileNode.relNode; + track_node->relfileNode.dbNode = relfileNode.dbNode; + dlist_push_tail(&drops_track->head, &track_node->node); + + LWLockRelease(drops_track->lock); +} + +/* move relfilenodes from track to list */ +List * +drops_track_move(Oid dbid) +{ + List *oids = NIL; + dlist_mutable_iter iter; + + LWLockAcquire(drops_track->lock, LW_EXCLUSIVE); + + if (drops_track->used_count == 0) + { + LWLockRelease(drops_track->lock); + return oids; + } + + dlist_foreach_modify(iter, &drops_track->head) + { + drops_track_node_t *track_node = (drops_track_node_t *) iter.cur; + + /* newest in head, oldest in tail */ + if (track_node->relfileNode.dbNode == dbid) + { + oids = lcons_oid(track_node->relfileNode.relNode, oids); + drops_track->used_count--; + track_node->relfileNode.relNode = InvalidOid; + track_node->relfileNode.dbNode = InvalidOid; + dlist_delete(&track_node->node); + } + } + + LWLockRelease(drops_track->lock); + + return oids; +} + +/* undo moving of relfilenodes; old nodes are dropped if no space */ +void +drops_track_move_undo(List *oids, Oid dbid) +{ + ListCell *cell; + + if (oids == NIL) + return; + + LWLockAcquire(drops_track->lock, LW_EXCLUSIVE); + + foreach(cell, oids) + { + Oid oid = lfirst_oid(cell); + drops_track_node_t *track_node; + + if (drops_track->used_count >= drops_count) + { + elog(DEBUG1, "No space for move back. 
Oldest node removed (%d).", oid); + continue; + } + + track_node = find_empty_node(); + drops_track->used_count++; + track_node->relfileNode.relNode = oid; + track_node->relfileNode.dbNode = dbid; + dlist_push_head(&drops_track->head, &track_node->node); + } + + LWLockRelease(drops_track->lock); +} diff --git a/gpcontrib/arenadata_toolkit/src/file_hook.c b/gpcontrib/arenadata_toolkit/src/file_hook.c new file mode 100644 index 000000000000..c29042169358 --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/file_hook.c @@ -0,0 +1,147 @@ +/* + * File hooks to track events. + */ + +#include "file_hook.h" + +#include "postgres.h" +#include "storage/smgr.h" +#include "access/xact.h" +#include "catalog/namespace.h" +#include "utils/lsyscache.h" +#include "access/heapam.h" +#include "access/htup_details.h" +#include "catalog/objectaccess.h" + +#include "tf_shmem.h" +#include "arenadata_toolkit_guc.h" +#include "drops_track.h" + +static file_create_hook_type next_file_create_hook = NULL; +static file_extend_hook_type next_file_extend_hook = NULL; +static file_truncate_hook_type next_file_truncate_hook = NULL; +static file_unlink_hook_type next_file_unlink_hook = NULL; + +static bloom_t * non_committed_bloom = NULL; +static Oid non_committed_dbid = InvalidOid; + +static bool +is_file_node_trackable(RelFileNodeBackend * rnode) +{ + return !(rnode->node.dbNode == InvalidOid); +} + +static void +file_node_set(RelFileNodeBackend * rnode) +{ + if (!is_file_node_trackable(rnode)) + return; + + bloom_set_set(&tf_shared_state->bloom_set, rnode->node.dbNode, rnode->node.relNode); +} + +/* 'create' events stored in local bloom and merged only on commit, when changes are already in catalog */ +static void +xact_end_create_callback(XactEvent event, void *arg) +{ + if (event != XACT_EVENT_COMMIT && event != XACT_EVENT_ABORT) + return; + + elog(DEBUG1, "xact_end_create_callback"); + + if (event == XACT_EVENT_COMMIT) + bloom_set_merge(&tf_shared_state->bloom_set, non_committed_dbid, 
non_committed_bloom); + + pfree(non_committed_bloom); + non_committed_bloom = NULL; + non_committed_dbid = InvalidOid; +} + +static void +hook_create(RelFileNodeBackend rnode) +{ + uint64_t hash; + + if (next_file_create_hook) + next_file_create_hook(rnode); + + if (!is_file_node_trackable(&rnode)) + return; + + if (!non_committed_bloom) + { + non_committed_bloom = + MemoryContextAlloc(TopMemoryContext, FULL_BLOOM_SIZE(bloom_size)); + bloom_init(bloom_size, non_committed_bloom); + non_committed_dbid = rnode.node.dbNode; + RegisterXactCallbackOnce(xact_end_create_callback, NULL); + } + + elog(DEBUG1, "hook_create: %d %d %d %d", + rnode.backend, rnode.node.dbNode, rnode.node.spcNode, rnode.node.relNode); + + hash = bloom_set_calc_hash(&rnode.node.relNode, sizeof(rnode.node.relNode)); + bloom_set(non_committed_bloom, hash); + +} + +static void +hook_extend(RelFileNodeBackend rnode) +{ + if (next_file_extend_hook) + next_file_extend_hook(rnode); + + elog(DEBUG1, "hook_extend: %d %d %d %d", + rnode.backend, rnode.node.dbNode, rnode.node.spcNode, rnode.node.relNode); + + file_node_set(&rnode); +} + +static void +hook_truncate(RelFileNodeBackend rnode) +{ + if (next_file_truncate_hook) + next_file_truncate_hook(rnode); + + elog(DEBUG1, "hook_truncate: %d %d %d %d", + rnode.backend, rnode.node.dbNode, rnode.node.spcNode, rnode.node.relNode); + + file_node_set(&rnode); +} + +static void +hook_unlink(RelFileNodeBackend rnode) +{ + if (next_file_unlink_hook) + next_file_unlink_hook(rnode); + + elog(DEBUG1, "hook_unlink: %d %d %d %d", + rnode.backend, rnode.node.dbNode, rnode.node.spcNode, rnode.node.relNode); + + drops_track_add(rnode.node); +} + +void +file_hook_init() +{ + next_file_create_hook = file_create_hook; + file_create_hook = hook_create; + + next_file_extend_hook = file_extend_hook; + file_extend_hook = hook_extend; + + next_file_truncate_hook = file_truncate_hook; + file_truncate_hook = hook_truncate; + + next_file_unlink_hook = file_unlink_hook; + 
file_unlink_hook = hook_unlink; +} + +void +file_hook_deinit() +{ + file_create_hook = next_file_create_hook; + file_extend_hook = next_file_extend_hook; + file_truncate_hook = next_file_truncate_hook; + file_unlink_hook = next_file_unlink_hook; +} diff --git a/gpcontrib/arenadata_toolkit/src/include/arenadata_toolkit_guc.h b/gpcontrib/arenadata_toolkit/src/include/arenadata_toolkit_guc.h new file mode 100644 index 000000000000..155efe645fb5 --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/include/arenadata_toolkit_guc.h @@ -0,0 +1,22 @@ +#ifndef ARENADATA_TOOLKIT_GUC_H +#define ARENADATA_TOOLKIT_GUC_H + +#include "postgres.h" + +extern int bloom_size; +extern int db_track_count; +extern int drops_count; +extern bool get_full_snapshot_on_recovery; +extern char *tracked_schemas; +extern char *tracked_rel_storages; +extern char *tracked_rel_kinds; +extern int tracking_worker_naptime_sec; + +void tf_guc_unlock_tracked_once(void); +void tf_guc_unlock_full_snapshot_on_recovery_once(void); +void tf_guc_define(void); +void tf_guc_unlock_schemas_once(void); +void tf_guc_unlock_relkinds_once(void); +void tf_guc_unlock_relstorages_once(void); + +#endif /* ARENADATA_TOOLKIT_GUC_H */ diff --git a/gpcontrib/arenadata_toolkit/src/include/arenadata_toolkit_worker.h b/gpcontrib/arenadata_toolkit/src/include/arenadata_toolkit_worker.h new file mode 100644 index 000000000000..a07ba0ab75e3 --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/include/arenadata_toolkit_worker.h @@ -0,0 +1,6 @@ +#ifndef BLOOM_WORKER_H +#define BLOOM_WORKER_H + +void arenadata_toolkit_worker_register(void); + +#endif /* BLOOM_WORKER_H */ diff --git a/gpcontrib/arenadata_toolkit/src/include/bloom.h b/gpcontrib/arenadata_toolkit/src/include/bloom.h new file mode 100644 index 000000000000..598d044ad2ce --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/include/bloom.h @@ -0,0 +1,23 @@ +#ifndef BLOOM_H +#define BLOOM_H + +#include + +#define FULL_BLOOM_SIZE(size) (offsetof(bloom_t, map) + size) + +typedef 
struct +{ + uint32_t size; /* size in bytes of 'map' */ + int is_set_all; /* is all bits sets by bloom_set_all */ + char map[] /* filter itself, array of bytes */ ; +} bloom_t; + +bloom_t *bloom_init(const uint32_t bloom_size, void *mem); +int bloom_isset(bloom_t * bloom, uint64_t hash); +void bloom_set(bloom_t * bloom, uint64_t hash); +void bloom_set_all(bloom_t * bloom); +void bloom_clear(bloom_t * bloom); +void bloom_merge(bloom_t * dst, bloom_t * src); +bloom_t *bloom_copy(bloom_t * bloom, void *mem); + +#endif /* BLOOM_H */ diff --git a/gpcontrib/arenadata_toolkit/src/include/bloom_set.h b/gpcontrib/arenadata_toolkit/src/include/bloom_set.h new file mode 100644 index 000000000000..e2409d4b579e --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/include/bloom_set.h @@ -0,0 +1,41 @@ +#ifndef BLOOM_SET_H +#define BLOOM_SET_H + +#include "postgres.h" + +#include "storage/lwlock.h" + +#include "bloom.h" + +#define FULL_BLOOM_ENTRY_SIZE(size) (offsetof(bloom_entry_t, bloom) + FULL_BLOOM_SIZE(size)) +#define FULL_BLOOM_SET_SIZE(size, count) (offsetof(bloom_set_t, bloom_entries) + FULL_BLOOM_ENTRY_SIZE(size) * count) + +/* bloom filter extended by dbid */ +typedef struct +{ + LWLock *lock; + Oid dbid; /* dbid if binded, InvalidOid if unbinded */ + bloom_t bloom; +} bloom_entry_t; + +/* static set of all bloom filters */ +typedef struct +{ + LWLock *lock; + uint8_t bloom_count; /* count of bloom_entry_t in bloom_entries */ + uint32_t bloom_size; /* size of bloom filter */ + char bloom_entries[FLEXIBLE_ARRAY_MEMBER]; /* array of + * bloom_entry_t */ +} bloom_set_t; + +bloom_set_t *bloom_set_init(const uint32_t bloom_count, const uint32_t bloom_size, void *mem); +bool bloom_set_bind(bloom_set_t * bloom_set, Oid dbid); +void bloom_set_unbind(bloom_set_t * bloom_set, Oid dbid); +uint64_t bloom_set_calc_hash(const void *buf, size_t len); +void bloom_set_set(bloom_set_t * bloom_set, Oid dbid, Oid relNode); +bloom_t *bloom_set_move(bloom_set_t * bloom_set, Oid dbid, void 
*mem); +bool bloom_set_merge(bloom_set_t * bloom_set, Oid dbid, bloom_t * m_bloom); +bool bloom_set_trigger_bits(bloom_set_t * bloom_set, Oid dbid, bool on); +bool bloom_set_is_all_bits_triggered(bloom_set_t * bloom_set, Oid dbid); + +#endif /* BLOOM_SET_H */ diff --git a/gpcontrib/arenadata_toolkit/src/include/dbsize.h b/gpcontrib/arenadata_toolkit/src/include/dbsize.h new file mode 100644 index 000000000000..228cea9806db --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/include/dbsize.h @@ -0,0 +1,6 @@ +#ifndef DBSIZE_H +#define DBSIZE_H + +int64 dbsize_calc_size(Oid relid); + +#endif /* DBSIZE_H */ diff --git a/gpcontrib/arenadata_toolkit/src/include/drops_track.h b/gpcontrib/arenadata_toolkit/src/include/drops_track.h new file mode 100644 index 000000000000..e30c65f05632 --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/include/drops_track.h @@ -0,0 +1,16 @@ +#ifndef DROPS_TRACK_H +#define DROPS_TRACK_H + +#include "postgres.h" +#include "nodes/pg_list.h" +#include "storage/relfilenode.h" + +void drops_track_init(void); +void drops_track_deinit(void); + +void drops_track_add(RelFileNode relNode); +List *drops_track_move(Oid dbid); +void drops_track_move_undo(List *oids, Oid dbid); + + +#endif /* DROPS_TRACK_H */ diff --git a/gpcontrib/arenadata_toolkit/src/include/file_hook.h b/gpcontrib/arenadata_toolkit/src/include/file_hook.h new file mode 100644 index 000000000000..d5f4c3048b10 --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/include/file_hook.h @@ -0,0 +1,7 @@ +#ifndef FILE_HOOK_H +#define FILE_HOOK_H + +void file_hook_init(void); +void file_hook_deinit(void); + +#endif /* FILE_HOOK_H */ diff --git a/gpcontrib/arenadata_toolkit/src/include/tf_shmem.h b/gpcontrib/arenadata_toolkit/src/include/tf_shmem.h new file mode 100644 index 000000000000..98beae2abb68 --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/include/tf_shmem.h @@ -0,0 +1,18 @@ +#ifndef TF_SHMEM_H +#define TF_SHMEM_H + +#include "bloom_set.h" + +typedef struct +{ + bool bgworker_ready; 
/* is bgworker complete with its job */ + bool is_initialized; + bloom_set_t bloom_set; +} tf_shared_state_t; + +extern tf_shared_state_t * tf_shared_state; + +void tf_shmem_init(void); +void tf_shmem_deinit(void); + +#endif /* TF_SHMEM_H */ diff --git a/gpcontrib/arenadata_toolkit/src/tf_shmem.c b/gpcontrib/arenadata_toolkit/src/tf_shmem.c new file mode 100644 index 000000000000..9615c60b8682 --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/tf_shmem.c @@ -0,0 +1,56 @@ +#include "tf_shmem.h" + +#include "storage/ipc.h" +#include "storage/shmem.h" + +#include "arenadata_toolkit_guc.h" +#include "bloom_set.h" + +static shmem_startup_hook_type next_shmem_startup_hook = NULL; +tf_shared_state_t *tf_shared_state; + +static Size +tf_shmem_calc_size(void) +{ + Size size; + + size = offsetof(tf_shared_state_t, bloom_set); + size = add_size(size, FULL_BLOOM_SET_SIZE(bloom_size, db_track_count)); + + return size; +} + +static void +tf_shmem_hook(void) +{ + bool found; + Size size = tf_shmem_calc_size(); + + tf_shared_state = ShmemInitStruct("toolkit_track_files", size, &found); + + if (!found) + { + tf_shared_state->bgworker_ready = false; + bloom_set_init(db_track_count, bloom_size, &tf_shared_state->bloom_set); + } + + if (next_shmem_startup_hook) + next_shmem_startup_hook(); +} + +void +tf_shmem_init() +{ + /* don't forget to add additional locks */ + RequestAddinLWLocks(1 + db_track_count); + RequestAddinShmemSpace(tf_shmem_calc_size()); + + next_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = tf_shmem_hook; +} + +void +tf_shmem_deinit(void) +{ + shmem_startup_hook = next_shmem_startup_hook; +} diff --git a/gpcontrib/arenadata_toolkit/src/track_files.c b/gpcontrib/arenadata_toolkit/src/track_files.c new file mode 100644 index 000000000000..ab54725a00e3 --- /dev/null +++ b/gpcontrib/arenadata_toolkit/src/track_files.c @@ -0,0 +1,1227 @@ +#include "postgres.h" + + +#include "access/genam.h" +#include "access/xact.h" +#include "catalog/indexing.h" 
+#include "catalog/pg_namespace.h" +#include "catalog/pg_db_role_setting.h" +#include "cdb/cdbdisp_query.h" +#include "cdb/cdbdispatchresult.h" +#include "cdb/cdbvars.h" +#include "cdb/cdbutil.h" +#include "commands/dbcommands.h" +#include "executor/spi.h" +#include "fmgr.h" +#include "funcapi.h" +#include "libpq-fe.h" +#include "miscadmin.h" +#include "storage/shmem.h" +#include "utils/relcache.h" +#include "utils/builtins.h" +#include "utils/memutils.h" +#include "utils/fmgroids.h" +#include "utils/syscache.h" +#include "utils/lsyscache.h" + +#include "arenadata_toolkit_guc.h" +#include "arenadata_toolkit_worker.h" +#include "drops_track.h" +#include "dbsize.h" +#include "file_hook.h" +#include "tf_shmem.h" + +PG_FUNCTION_INFO_V1(tracking_register_db); +PG_FUNCTION_INFO_V1(tracking_unregister_db); +PG_FUNCTION_INFO_V1(tracking_set_snapshot_on_recovery); +PG_FUNCTION_INFO_V1(tracking_register_schema); +PG_FUNCTION_INFO_V1(tracking_unregister_schema); +PG_FUNCTION_INFO_V1(tracking_set_relkinds); +PG_FUNCTION_INFO_V1(tracking_set_relstorages); +PG_FUNCTION_INFO_V1(tracking_is_segment_initialized); +PG_FUNCTION_INFO_V1(tracking_trigger_initial_snapshot); +PG_FUNCTION_INFO_V1(tracking_is_initial_snapshot_triggered); +PG_FUNCTION_INFO_V1(tracking_get_track); +PG_FUNCTION_INFO_V1(tracking_get_track_main); + +typedef struct +{ + Relation pg_class_rel; + SysScanDesc scan; +} tf_main_func_state_t; + +typedef struct +{ + bloom_t *bloom; + bloom_t *rollback_bloom; + List *drops; + ListCell *next_drop; + List *relkinds; + List *relstorages; + List *schema_oids; +} tf_get_global_state_t; + +typedef struct +{ + CdbPgResults cdb_results; + int current_result; + int current_row; + + SPITupleTable *entry_result; + uint64 entry_processed; + int entry_current_row; + + FmgrInfo *inputFuncInfos; + Oid *typIOParams; +} tf_get_func_state_t; + +tf_get_global_state_t tf_get_global_state = {NULL, NULL, NIL, NULL, NIL, NIL, NIL}; + +/* if get function complete with commit, just free 
resources; if with abort, move bloom and drops back */ +static void +xact_end_get_callback(XactEvent event, void *arg) +{ + if (event != XACT_EVENT_COMMIT && event != XACT_EVENT_ABORT) + return; + + if (tf_get_global_state.bloom == NULL) + return; + + if (event == XACT_EVENT_ABORT) + { + if (tf_get_global_state.rollback_bloom) + bloom_set_merge(&tf_shared_state->bloom_set, MyDatabaseId, tf_get_global_state.rollback_bloom); + else + bloom_set_merge(&tf_shared_state->bloom_set, MyDatabaseId, tf_get_global_state.bloom); + drops_track_move_undo(tf_get_global_state.drops, MyDatabaseId); + } + + if (tf_get_global_state.bloom) + { + pfree(tf_get_global_state.bloom); + tf_get_global_state.bloom = NULL; + } + + if (tf_get_global_state.rollback_bloom) + { + pfree(tf_get_global_state.rollback_bloom); + tf_get_global_state.rollback_bloom = NULL; + } + + if (tf_get_global_state.drops != NIL) + { + pfree(tf_get_global_state.drops); + tf_get_global_state.drops = NIL; + tf_get_global_state.next_drop = NULL; + } + + if (tf_get_global_state.relkinds != NIL) + { + pfree(tf_get_global_state.relkinds); + tf_get_global_state.relkinds = NIL; + } + + if (tf_get_global_state.relstorages != NIL) + { + pfree(tf_get_global_state.relstorages); + tf_get_global_state.relstorages = NIL; + } + + if (tf_get_global_state.schema_oids != NIL) + { + pfree(tf_get_global_state.schema_oids); + tf_get_global_state.schema_oids = NIL; + } + +} + +static List * +split_string_to_list(const char *input) +{ + List *result = NIL; + char *input_copy; + char *token; + + if (input == NULL) + return NIL; + + input_copy = pstrdup(input); + + token = strtok(input_copy, ","); + + while (token != NULL) + { + if (*token != '\0') + { + result = lappend(result, pstrdup(token)); + } + + token = strtok(NULL, ","); + } + + pfree(input_copy); + + return result; +} + +static void +get_filters_from_guc() +{ + Relation rel; + ScanKeyData skey[2]; + SysScanDesc scan; + HeapTuple tuple; + char *current_schemas = NULL; + char 
*current_relkinds = NULL; + char *current_relstorages = NULL; + List *schema_names = NIL; + ListCell *lc; + + rel = heap_open(DbRoleSettingRelationId, AccessShareLock); /* read-only scan: a shared lock is sufficient */ + ScanKeyInit(&skey[0], + Anum_pg_db_role_setting_setdatabase, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(MyDatabaseId)); + + /* + * Lookup for not role specific configuration + */ + ScanKeyInit(&skey[1], + Anum_pg_db_role_setting_setrole, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(InvalidOid)); + scan = systable_beginscan(rel, DbRoleSettingDatidRolidIndexId, true, NULL, 2, skey); + + tuple = systable_getnext(scan); + if (HeapTupleIsValid(tuple)) + { + bool isnull; + Datum str_datum; + + str_datum = heap_getattr(tuple, Anum_pg_db_role_setting_setconfig, + RelationGetDescr(rel), &isnull); + if (!isnull) + { + ArrayType *array; + Datum *elems; + bool *nulls; + int nelems; + + array = DatumGetArrayTypeP(str_datum); + deconstruct_array(array, TEXTOID, -1, false, 'i', + &elems, &nulls, &nelems); + for (int i = 0; i < nelems; i++) + { + if (nulls[i]) + continue; + char *str = TextDatumGetCString(elems[i]); + + if (strncmp(str, "arenadata_toolkit.tracking_schemas=", 35) == 0) + current_schemas = pstrdup(str + 35); + else if (strncmp(str, "arenadata_toolkit.tracking_relstorages=", 39) == 0) + current_relstorages = pstrdup(str + 39); + else if (strncmp(str, "arenadata_toolkit.tracking_relkinds=", 36) == 0) + current_relkinds = pstrdup(str + 36); + pfree(str); + } + } + } + systable_endscan(scan); + heap_close(rel, AccessShareLock); + + if (current_schemas) + schema_names = split_string_to_list(current_schemas); + else + schema_names = split_string_to_list(tracked_schemas); + if (current_relstorages) + tf_get_global_state.relstorages = split_string_to_list(current_relstorages); + else + tf_get_global_state.relstorages = split_string_to_list(tracked_rel_storages); + if (current_relkinds) + tf_get_global_state.relkinds = split_string_to_list(current_relkinds); + else +
tf_get_global_state.relkinds = split_string_to_list(tracked_rel_kinds); + + foreach(lc, schema_names) + { + Oid nspOid; + char *name = (char *)lfirst(lc); + + nspOid = GetSysCacheOid1(NAMESPACENAME, CStringGetDatum(name)); + + if (!OidIsValid(nspOid)) + { + elog(DEBUG1, "[tracking_get_track] schema \"%s\" does not exist", name); + continue; + } + + tf_get_global_state.schema_oids = lappend_oid(tf_get_global_state.schema_oids, nspOid); + } + + if (schema_names) + pfree(schema_names); +} + + +static bool +schema_is_tracked(Oid schema) +{ + ListCell *lc; + + if (tf_get_global_state.schema_oids == NIL) + return false; + + foreach(lc, tf_get_global_state.schema_oids) + { + Oid tracked_schema = lfirst_oid(lc); + + if (tracked_schema == schema) + return true; + } + + return false; +} + +static bool +relkind_is_tracked(char relkind) +{ + ListCell *lc; + + if (tf_get_global_state.relkinds == NIL) + return false; + + foreach(lc, tf_get_global_state.relkinds) + { + char *tracked_relkind = (char *)lfirst(lc); + + if (tracked_relkind != NULL && *tracked_relkind == relkind) + return true; + } + + return false; +} + +static bool +relstorage_is_tracked(char relstorage) +{ + ListCell *lc; + + if (tf_get_global_state.relstorages == NIL) + return false; + + foreach(lc, tf_get_global_state.relstorages) + { + char *tracked_relstorage = (char *)lfirst(lc); + + if (tracked_relstorage != NULL && *tracked_relstorage == relstorage) + return true; + } + + return false; +} + +/* + * Main logic for getting the size track. 
+ */ +Datum +tracking_get_track_main(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + tf_main_func_state_t *state; + HeapTuple result; + Datum datums[9]; + bool nulls[9] = {0}; + + LWLockAcquire(tf_shared_state->bloom_set.lock, LW_EXCLUSIVE); + if (!tf_shared_state->bgworker_ready && Gp_role == GP_ROLE_DISPATCH) + { + LWLockRelease(tf_shared_state->bloom_set.lock); + elog(ERROR, "Can't get track before bgworker updates the tracking status."); + } + LWLockRelease(tf_shared_state->bloom_set.lock); + + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + + funcctx = SRF_FIRSTCALL_INIT(); + + RegisterXactCallbackOnce(xact_end_get_callback, NULL); + + oldcontext = MemoryContextSwitchTo(CurTransactionContext); + + if (tf_get_global_state.bloom == NULL) + { + tf_get_global_state.bloom = palloc(FULL_BLOOM_SIZE(bloom_size)); + if (!bloom_set_move(&tf_shared_state->bloom_set, MyDatabaseId, tf_get_global_state.bloom)) + elog(ERROR, "database %u is not tracked", MyDatabaseId); + } + else + { +/* + * This code is needed for the cases when there are several track requests within the same transaction. + * rollback_bloom stands for preserving initial filter state at the moment of the first function call within the + * transaction. + */ + if (tf_get_global_state.rollback_bloom == NULL) + { + tf_get_global_state.rollback_bloom = palloc(FULL_BLOOM_SIZE(bloom_size)); + bloom_copy(tf_get_global_state.bloom, tf_get_global_state.rollback_bloom); + } + bloom_clear(tf_get_global_state.bloom); + if (!bloom_set_move(&tf_shared_state->bloom_set, MyDatabaseId, tf_get_global_state.bloom)) + elog(ERROR, "database %u is not tracked", MyDatabaseId); + } + /* initial snapshot shouldn't return drops */ + if (tf_get_global_state.bloom && !tf_get_global_state.bloom->is_set_all) + { + tf_get_global_state.drops = drops_track_move(MyDatabaseId); + tf_get_global_state.next_drop = list_head(tf_get_global_state.drops); + } + + /* + * Let's retrieve tracking information only once for the transaction. 
+ */ + if (tf_get_global_state.schema_oids == NIL) + get_filters_from_guc(); + + if (tf_get_global_state.relstorages == NIL || + tf_get_global_state.relkinds == NIL || + tf_get_global_state.schema_oids == NIL) + elog(ERROR, "cannot get tracking configuration (schemas, relkinds, reltorage) for database %u", MyDatabaseId); + + MemoryContextSwitchTo(oldcontext); + + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + funcctx->tuple_desc = CreateTemplateTupleDesc(9, false); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)1, "relid", OIDOID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)2, "name", NAMEOID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)3, "relfilenode", OIDOID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)4, "size", INT8OID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)5, "state", CHAROID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)6, "gp_segment_id", INT4OID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)7, "relnamespace", OIDOID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)8, "relkind", CHAROID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)9, "relstorage", CHAROID, -1, 0); + funcctx->tuple_desc = BlessTupleDesc(funcctx->tuple_desc); + + state = (tf_main_func_state_t *) palloc0(sizeof(tf_main_func_state_t)); + funcctx->user_fctx = (void *)state; + + if (tf_get_global_state.bloom) + { + state->pg_class_rel = heap_open(RelationRelationId, AccessShareLock); + state->scan = systable_beginscan(state->pg_class_rel, InvalidOid, false, NULL, 0, NULL); + } + + MemoryContextSwitchTo(oldcontext); + } + + funcctx = SRF_PERCALL_SETUP(); + state = funcctx->user_fctx; + + while (true) + { + Oid filenode; + Oid relnamespace; + char relkind; + char relstorage; + HeapTuple pg_class_tuple; + uint64_t hash; + + if (!state->scan) + break; + + pg_class_tuple = systable_getnext(state->scan); + + if 
(!HeapTupleIsValid(pg_class_tuple)) + { + systable_endscan(state->scan); + heap_close(state->pg_class_rel, AccessShareLock); + state->scan = NULL; + state->pg_class_rel = NULL; + break; + } + + datums[6] = heap_getattr(pg_class_tuple, Anum_pg_class_relnamespace, RelationGetDescr(state->pg_class_rel), &nulls[6]); + relnamespace = DatumGetObjectId(datums[6]); + + if (!schema_is_tracked(relnamespace)) + continue; + + datums[7] = heap_getattr(pg_class_tuple, Anum_pg_class_relkind, RelationGetDescr(state->pg_class_rel), &nulls[7]); + relkind = DatumGetChar(datums[7]); /* Datum -> char, not the reverse conversion */ + + if (!relkind_is_tracked(relkind)) + continue; + + datums[8] = heap_getattr(pg_class_tuple, Anum_pg_class_relstorage, RelationGetDescr(state->pg_class_rel), &nulls[8]); + relstorage = DatumGetChar(datums[8]); /* Datum -> char, not the reverse conversion */ + + if (!relstorage_is_tracked(relstorage)) + continue; + + datums[0] = ObjectIdGetDatum(HeapTupleGetOid(pg_class_tuple)); + + datums[1] = heap_getattr(pg_class_tuple, Anum_pg_class_relname, RelationGetDescr(state->pg_class_rel), &nulls[1]); + + datums[2] = heap_getattr(pg_class_tuple, Anum_pg_class_relfilenode, RelationGetDescr(state->pg_class_rel), &nulls[2]); + filenode = DatumGetObjectId(datums[2]); + + if (nulls[2]) + continue; + + /* Bloom filter check */ + hash = bloom_set_calc_hash(&filenode, sizeof(filenode)); + if (!bloom_isset(tf_get_global_state.bloom, hash)) + continue; + + /* + * Taking a lock and calling relation_open in dbsize_calc_size is + * quite suboptimal. The size calculation strategy should be revised + * in future. + */ + datums[3] = Int64GetDatum(dbsize_calc_size(HeapTupleGetOid(pg_class_tuple))); + datums[4] = CharGetDatum(tf_get_global_state.bloom->is_set_all ?
'i' : 'a'); + datums[5] = Int32GetDatum(GpIdentity.segindex); + + result = heap_form_tuple(funcctx->tuple_desc, datums, nulls); + + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(result)); + } + + while (true) + { + Oid filenode; + + if (!tf_get_global_state.next_drop) + break; + + filenode = lfirst_oid(tf_get_global_state.next_drop); + tf_get_global_state.next_drop = lnext(tf_get_global_state.next_drop); + + nulls[0] = true; + nulls[1] = true; + datums[2] = ObjectIdGetDatum(filenode); /* explicit Oid -> Datum conversion, as for the other columns */ + datums[3] = Int64GetDatum(0); + datums[4] = CharGetDatum('d'); + datums[5] = Int32GetDatum(GpIdentity.segindex); + nulls[6] = true; + nulls[7] = true; + nulls[8] = true; + + result = heap_form_tuple(funcctx->tuple_desc, datums, nulls); + + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(result)); + } + + SRF_RETURN_DONE(funcctx); +} + +/* + * Function used in "arenadata_toolkit.tables_track" view. In order to keep bloom filter + * in consistent state across segments this function dispatches main tracking logic to the + * segments in a distributed transaction. + */ +Datum +tracking_get_track(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + tf_get_func_state_t *state; + HeapTuple result; + Datum values[9]; + bool nulls[9] = {0}; + + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext = CurrentMemoryContext; + + funcctx = SRF_FIRSTCALL_INIT(); + + /* + * If we use CdbDispatchCommandToSegments, we will face the problem + * that entry db slice won't be part of global transaction and + * immediately commits, killing the chance for bloom filter to + * restore. Therefore, the spi approach for retrieving track at -1 + * segment is chosen.
+ */ + if (SPI_connect() != SPI_OK_CONNECT) + ereport(ERROR, (errmsg("SPI_connect failed"))); + if (SPI_execute("SELECT * FROM arenadata_toolkit.tracking_get_track_main()", true, 0) != SPI_OK_SELECT) + ereport(ERROR, (errmsg("SPI_execute failed"))); + + MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + state = (tf_get_func_state_t *) palloc0(sizeof(tf_get_func_state_t)); + funcctx->user_fctx = (void *)state; + + state->entry_result = SPI_tuptable; + state->entry_processed = SPI_processed; + state->entry_current_row = 0; + + CdbDispatchCommand("SELECT * FROM arenadata_toolkit.tracking_get_track_main()", DF_NEED_TWO_PHASE | DF_CANCEL_ON_ERROR, + &state->cdb_results); + + state->current_result = 0; + state->current_row = 0; + + funcctx->tuple_desc = CreateTemplateTupleDesc(9, false); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)1, "relid", OIDOID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)2, "name", NAMEOID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)3, "relfilenode", OIDOID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)4, "size", INT8OID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)5, "state", CHAROID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)6, "gp_segment_id", INT4OID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)7, "relnamespace", OIDOID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)8, "relkind", CHAROID, -1, 0); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)9, "relstorage", CHAROID, -1, 0); + funcctx->tuple_desc = BlessTupleDesc(funcctx->tuple_desc); + + if (state->cdb_results.numResults > 0) + { + int natts = funcctx->tuple_desc->natts; + + state->inputFuncInfos = (FmgrInfo *)palloc0(natts * sizeof(FmgrInfo)); + state->typIOParams = (Oid *)palloc0(natts * sizeof(Oid)); + for (int i = 0; i < natts; i++) + { + Oid type = TupleDescAttr(funcctx->tuple_desc, i)->atttypid; + + getTypeInputInfo(type, 
&state->inputFuncInfos[i].fn_oid, &state->typIOParams[i]); + fmgr_info(state->inputFuncInfos[i].fn_oid, &state->inputFuncInfos[i]); + } + } + + MemoryContextSwitchTo(oldcontext); + } + + funcctx = SRF_PERCALL_SETUP(); + state = funcctx->user_fctx; + + if (state->entry_current_row < state->entry_processed) + { + HeapTuple inputTuple = state->entry_result->vals[state->entry_current_row]; + TupleDesc inputTupleDesc = state->entry_result->tupdesc; + + for (int i = 0; i < funcctx->tuple_desc->natts; i++) + { + values[i] = SPI_getbinval(inputTuple, inputTupleDesc, i + 1, &nulls[i]); + } + HeapTuple resultTuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + + state->entry_current_row++; + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(resultTuple)); + } + + /* Close the SPI connection exactly once; this point is reached on every later call. */ + if (state->entry_result != NULL) + { + SPI_finish(); + state->entry_result = NULL; + } + + while (state->current_result < state->cdb_results.numResults) + { + struct pg_result *pgresult = state->cdb_results.pg_results[state->current_result]; + + if (pgresult) + { + int nrows = PQntuples(pgresult); + int ncols = PQnfields(pgresult); + + if (state->current_row < nrows) + { + for (int col = 0; col < ncols; col++) + { + if (PQgetisnull(pgresult, state->current_row, col)) + { + values[col] = (Datum)0; + nulls[col] = true; + } + else + { + char *value = PQgetvalue(pgresult, state->current_row, col); + + values[col] = InputFunctionCall(&state->inputFuncInfos[col], value, state->typIOParams[col], -1); + } + } + result = heap_form_tuple(funcctx->tuple_desc, values, nulls); + state->current_row++; + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(result)); + } + else + { + state->current_row = 0; + state->current_result++; + } + } + else + state->current_result++; + } + + SRF_RETURN_DONE(funcctx); +} + +static void +track_db(Oid dbid, bool reg) +{ + if (Gp_role == GP_ROLE_DISPATCH) + { + AlterDatabaseSetStmt stmt; + VariableSetStmt v_stmt; + A_Const aconst = + {.type = T_A_Const,.val = {.type = T_String,.val.str = reg ?
"t" : "f"}}; + + stmt.type = T_AlterDatabaseSetStmt; + stmt.dbname = get_database_name(dbid); + + if (stmt.dbname == NULL) + elog(ERROR, "[arenadata_toolkit] database %u does not exist", dbid); + + stmt.setstmt = &v_stmt; + + v_stmt.type = T_VariableSetStmt; + v_stmt.kind = VAR_SET_VALUE; + v_stmt.name = "arenadata_toolkit.tracking_is_db_tracked"; + v_stmt.args = lappend(NIL, &aconst); + v_stmt.is_local = false; + + tf_guc_unlock_tracked_once(); + + AlterDatabaseSet(&stmt); + } + + if (!reg) + bloom_set_unbind(&tf_shared_state->bloom_set, dbid); + else if (!bloom_set_bind(&tf_shared_state->bloom_set, dbid)) + elog(ERROR, "[arenadata_toolkit] exceeded maximum number of tracked databases"); +} + +/* + * Registers current (if dbid is 0) or specific database as tracked by arenadata_toolkit tables tracking. + * Dispatches call to segments by itself. Binds a bloom filter to the registered database if possible. + */ +Datum +tracking_register_db(PG_FUNCTION_ARGS) +{ + Oid dbid = PG_GETARG_OID(0); + + dbid = dbid == InvalidOid ? MyDatabaseId : dbid; + elog(LOG, "[arenadata_toolkit] registering database %u for tracking", dbid); + + track_db(dbid, true); + + LWLockAcquire(tf_shared_state->bloom_set.lock, LW_EXCLUSIVE); + tf_shared_state->is_initialized = true; + LWLockRelease(tf_shared_state->bloom_set.lock); + + if (Gp_role == GP_ROLE_DISPATCH) + { + char *cmd = + psprintf("select arenadata_toolkit.tracking_register_db(%u)", dbid); + + CdbDispatchCommand(cmd, 0, NULL); + } + + PG_RETURN_BOOL(true); +} + +/* + * Stop tracking given database and unbind from bloom. + */ +Datum +tracking_unregister_db(PG_FUNCTION_ARGS) +{ + Oid dbid = PG_GETARG_OID(0); + + dbid = dbid == InvalidOid ? 
MyDatabaseId : dbid; + elog(LOG, "[arenadata_toolkit] unregistering database %u from tracking", dbid); + + track_db(dbid, false); + + if (Gp_role == GP_ROLE_DISPATCH) + { + char *cmd = + psprintf("select arenadata_toolkit.tracking_unregister_db(%u)", dbid); + + CdbDispatchCommand(cmd, 0, NULL); + } + + PG_RETURN_BOOL(true); +} + +Datum +tracking_set_snapshot_on_recovery(PG_FUNCTION_ARGS) +{ + bool set = PG_GETARG_OID(0); + Oid dbid = PG_GETARG_OID(1); + + dbid = dbid == InvalidOid ? MyDatabaseId : dbid; + + A_Const aconst = + {.type = T_A_Const,.val = {.type = T_String,.val.str = set ? "t" : "f"}}; + + if (Gp_role == GP_ROLE_DISPATCH) + { + AlterDatabaseSetStmt stmt; + VariableSetStmt v_stmt; + + stmt.type = T_AlterDatabaseSetStmt; + stmt.dbname = get_database_name(dbid); + stmt.setstmt = &v_stmt; + + if (stmt.dbname == NULL) + elog(ERROR, "[arenadata_toolkit] database %u does not exist", dbid); + + v_stmt.type = T_VariableSetStmt; + v_stmt.kind = VAR_SET_VALUE; + v_stmt.name = "arenadata_toolkit.tracking_snapshot_on_recovery"; + v_stmt.args = lappend(NIL, &aconst); + v_stmt.is_local = false; + + tf_guc_unlock_full_snapshot_on_recovery_once(); + + AlterDatabaseSet(&stmt); + } + + if (Gp_role == GP_ROLE_DISPATCH) + { + char *cmd = + psprintf("select arenadata_toolkit.tracking_set_snapshot_on_recovery(%s, %u)", + set ? 
"true" : "false", dbid); + + CdbDispatchCommand(cmd, 0, NULL); + } + + PG_RETURN_BOOL(true); +} + +/* Helper function to add or remove schema from configuration string */ +static char * +add_or_remove_schema(const char *schema_string, const char *schemaName, bool add) +{ + StringInfoData buf; + char *token; + char *str; + bool found = false; + + initStringInfo(&buf); + + if (schema_string && schema_string[0] != '\0') + { + str = pstrdup(schema_string); + token = strtok(str, ","); + while (token != NULL) + { + if (strcmp(token, schemaName) == 0) + { + found = true; + if (add) + { + appendStringInfo(&buf, "%s,", token); + } + } + else + { + appendStringInfo(&buf, "%s,", token); + } + token = strtok(NULL, ","); + } + pfree(str); + } + + if (add && !found) + { + appendStringInfo(&buf, "%s,", schemaName); + } + + if (buf.len > 0 && buf.data[buf.len - 1] == ',') + { + buf.data[buf.len - 1] = '\0'; + buf.len--; + } + + if (buf.len == 0) + { + pfree(buf.data); + return NULL; + } + + return buf.data; +} + +static void +track_schema(const char *schemaName, Oid dbid, bool reg) +{ + Relation rel; + ScanKeyData skey[2]; + SysScanDesc scan; + HeapTuple tuple; + char *current_schemas = NULL; + char *new_schemas = NULL; + AlterDatabaseSetStmt stmt; + VariableSetStmt v_stmt; + A_Const arg; + + rel = heap_open(DbRoleSettingRelationId, RowExclusiveLock); + ScanKeyInit(&skey[0], + Anum_pg_db_role_setting_setdatabase, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(dbid)); + + /* + * Lookup for not role specific configuration + */ + ScanKeyInit(&skey[1], + Anum_pg_db_role_setting_setrole, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(InvalidOid)); + scan = systable_beginscan(rel, DbRoleSettingDatidRolidIndexId, true, NULL, 2, skey); + + tuple = systable_getnext(scan); + if (HeapTupleIsValid(tuple)) + { + bool isnull; + Datum str_datum; + + str_datum = heap_getattr(tuple, Anum_pg_db_role_setting_setconfig, + RelationGetDescr(rel), &isnull); + if (!isnull) + { + ArrayType 
*array; + Datum *elems; + int nelems; + + array = DatumGetArrayTypeP(str_datum); + deconstruct_array(array, TEXTOID, -1, false, 'i', + &elems, NULL, &nelems); + for (int i = 0; i < nelems; i++) + { + char *str = TextDatumGetCString(elems[i]); + + if (strncmp(str, "arenadata_toolkit.tracking_schemas=", 35) == 0) + { + current_schemas = pstrdup(str + 35); + break; + } + pfree(str); + } + } + } + systable_endscan(scan); + heap_close(rel, RowExclusiveLock); + + new_schemas = add_or_remove_schema(current_schemas, schemaName, reg); + + stmt.type = T_AlterDatabaseSetStmt; + stmt.dbname = get_database_name(dbid); + + if (stmt.dbname == NULL) + elog(ERROR, "[arenadata_toolkit] database %u does not exist", dbid); + + stmt.setstmt = &v_stmt; + + v_stmt.type = T_VariableSetStmt; + v_stmt.name = "arenadata_toolkit.tracking_schemas"; + v_stmt.is_local = false; + + arg.type = T_A_Const; + arg.val.type = T_String; + arg.val.val.str = new_schemas; + arg.location = -1; + + if (new_schemas == NULL) + { + /* + * If new_schemas is NULL, we're removing the last schema, so let's + * just RESET the variable + */ + v_stmt.kind = VAR_RESET; + v_stmt.args = NIL; + } + else + { + v_stmt.kind = VAR_SET_VALUE; + v_stmt.args = list_make1(&arg); + } + + tf_guc_unlock_schemas_once(); + + AlterDatabaseSet(&stmt); + + if (current_schemas) + pfree(current_schemas); + if (new_schemas) + pfree(new_schemas); +} + +Datum +tracking_register_schema(PG_FUNCTION_ARGS) +{ + const char *schema_name = NameStr(*PG_GETARG_NAME(0)); + Oid dbid = PG_GETARG_OID(1); + + dbid = dbid == InvalidOid ? 
MyDatabaseId : dbid; + + if (!SearchSysCacheExists1(NAMESPACENAME, CStringGetDatum(schema_name))) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("schema %s does not exist", schema_name))); + + elog(LOG, "[arenadata_toolkit] registering schema %s in database %u for tracking", schema_name, dbid); + + track_schema(schema_name, dbid, true); + + PG_RETURN_BOOL(true); +} + +Datum +tracking_unregister_schema(PG_FUNCTION_ARGS) +{ + const char *schema_name = NameStr(*PG_GETARG_NAME(0)); + Oid dbid = PG_GETARG_OID(1); + + dbid = dbid == InvalidOid ? MyDatabaseId : dbid; + + if (!SearchSysCacheExists1(NAMESPACENAME, CStringGetDatum(schema_name))) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("schema with OID %s does not exist", schema_name))); + + elog(LOG, "[arenadata_toolkit] registering schema %s in database %u for tracking", schema_name, dbid); + + track_schema(schema_name, dbid, false); + + PG_RETURN_BOOL(true); +} + +static bool +is_valid_relkind(char relkind) +{ + return (relkind == 'r' || relkind == 'i' || relkind == 'S' || + relkind == 't' || relkind == 'v' || relkind == 'c' || + relkind == 'f' || relkind == 'u' || relkind == 'm' || + relkind == 'o' || relkind == 'b' || relkind == 'M'); +} + +Datum +tracking_set_relkinds(PG_FUNCTION_ARGS) +{ + char *relkinds_str = NameStr(*PG_GETARG_NAME(0)); + Oid dbid = PG_GETARG_OID(1); + char *token; + char *str_copy; + bool seen_relkinds[256] = {false}; + StringInfoData buf; + AlterDatabaseSetStmt stmt; + VariableSetStmt v_stmt; + A_Const arg; + + dbid = dbid == InvalidOid ? 
MyDatabaseId : dbid; + + initStringInfo(&buf); + str_copy = pstrdup(relkinds_str); + token = strtok(str_copy, ","); + while (token != NULL) + { + if (strlen(token) != 1 || !is_valid_relkind(token[0])) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Invalid relkind: %s", token), + errhint("Valid relkinds are: 'r', 'i', 'S', 't', 'v', 'c', 'f', 'u', 'm', 'o', 'b', 'M'"))); + + if (!seen_relkinds[(unsigned char)token[0]]) + { + appendStringInfoChar(&buf, token[0]); + appendStringInfoChar(&buf, ','); + seen_relkinds[(unsigned char)token[0]] = true; + } + token = strtok(NULL, ","); + } + pfree(str_copy); + + stmt.type = T_AlterDatabaseSetStmt; + stmt.dbname = get_database_name(dbid); + stmt.setstmt = &v_stmt; + + if (stmt.dbname == NULL) + elog(ERROR, "[arenadata_toolkit] database %u does not exist", dbid); + + v_stmt.type = T_VariableSetStmt; + v_stmt.name = "arenadata_toolkit.tracking_relkinds"; + v_stmt.is_local = false; + + arg.type = T_A_Const; + arg.val.type = T_String; + arg.val.val.str = buf.data; + arg.location = -1; + + if (buf.len > 0 && buf.data[buf.len - 1] == ',') + { + buf.data[buf.len - 1] = '\0'; + buf.len--; + } + + if (buf.len == 0) + { + v_stmt.kind = VAR_RESET; + v_stmt.args = NIL; + } + else + { + v_stmt.kind = VAR_SET_VALUE; + v_stmt.args = list_make1(&arg); + elog(LOG, "[arenadata_toolkit] setting relkinds %s in database %u for tracking", buf.data, dbid); + } + + tf_guc_unlock_relkinds_once(); + + AlterDatabaseSet(&stmt); + pfree(buf.data); + + PG_RETURN_BOOL(true); +} + +static bool +is_valid_relstorage(char relstorage) +{ + return (relstorage == 'h' || relstorage == 'a' || relstorage == 'c' || + relstorage == 'x' || relstorage == 'v' || relstorage == 'f'); +} + +Datum +tracking_set_relstorages(PG_FUNCTION_ARGS) +{ + char *relstorages_str = NameStr(*PG_GETARG_NAME(0)); + Oid dbid = PG_GETARG_OID(1); + char *token; + char *str_copy; + bool seen_relstorages[256] = {false}; + StringInfoData buf; + AlterDatabaseSetStmt stmt; 
+ VariableSetStmt v_stmt; + A_Const arg; + + dbid = dbid == InvalidOid ? MyDatabaseId : dbid; + + initStringInfo(&buf); + str_copy = pstrdup(relstorages_str); + token = strtok(str_copy, ","); + while (token != NULL) + { + if (strlen(token) != 1 || !is_valid_relstorage(token[0])) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Invalid relstorage type: %s", token), + errhint("Valid relstorages are: 'h', 'x', 'a', 'v', 'c', 'f'"))); + + if (!seen_relstorages[(unsigned char)token[0]]) + { + appendStringInfoChar(&buf, token[0]); + appendStringInfoChar(&buf, ','); + seen_relstorages[(unsigned char)token[0]] = true; + } + token = strtok(NULL, ","); + } + pfree(str_copy); + + stmt.type = T_AlterDatabaseSetStmt; + stmt.dbname = get_database_name(dbid); + + if (stmt.dbname == NULL) + elog(ERROR, "[arenadata_toolkit] database %u does not exist", dbid); + + stmt.setstmt = &v_stmt; + + v_stmt.type = T_VariableSetStmt; + v_stmt.name = "arenadata_toolkit.tracking_relstorages"; + v_stmt.is_local = false; + + arg.type = T_A_Const; + arg.val.type = T_String; + arg.val.val.str = buf.data; + arg.location = -1; + + if (buf.len > 0 && buf.data[buf.len - 1] == ',') + { + buf.data[buf.len - 1] = '\0'; + buf.len--; + } + + if (buf.len == 0) + { + v_stmt.kind = VAR_RESET; + v_stmt.args = NIL; + } + else + { + v_stmt.kind = VAR_SET_VALUE; + v_stmt.args = list_make1(&arg); + elog(LOG, "[arenadata_toolkit] setting relstorages %s in database %u for tracking", buf.data, dbid); + } + + tf_guc_unlock_relstorages_once(); + + AlterDatabaseSet(&stmt); + + pfree(buf.data); + + PG_RETURN_BOOL(true); +} + +Datum +tracking_trigger_initial_snapshot(PG_FUNCTION_ARGS) +{ + Oid dbid = PG_GETARG_OID(0); + + dbid = dbid == InvalidOid ? 
MyDatabaseId : dbid; + elog(LOG, "[arenadata_toolkit] tracking_trigger_initial_snapshot dbid: %d", dbid); + + if (!bloom_set_trigger_bits(&tf_shared_state->bloom_set, dbid, true)) + elog(ERROR, "Failed to find corresponding filter to database %u", dbid); + + if (Gp_role == GP_ROLE_DISPATCH) + { + char *cmd = psprintf("select arenadata_toolkit.tracking_trigger_initial_snapshot(%d)", dbid); + + CdbDispatchCommand(cmd, 0, NULL); + } + + PG_RETURN_BOOL(true); +} + +Datum +tracking_is_initial_snapshot_triggered(PG_FUNCTION_ARGS) +{ + Oid dbid = PG_GETARG_OID(0); + bool is_triggered = false; + + dbid = dbid == InvalidOid ? MyDatabaseId : dbid; + + is_triggered = bloom_set_is_all_bits_triggered(&tf_shared_state->bloom_set, dbid); + + elog(LOG, "[arenadata_toolkit] is_initial_snapshot_triggered:%d dbid: %d", is_triggered, dbid); + + PG_RETURN_BOOL(is_triggered); +} + +Datum +tracking_is_segment_initialized(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsi; + TupleDesc tupdesc; + HeapTuple tuple; + Datum values[2]; + bool nulls[2]; + Datum result; + + rsi = (ReturnSetInfo *)fcinfo->resultinfo; + tupdesc = rsi->expectedDesc; + + /* Populate an output tuple. */ + values[0] = Int32GetDatum(GpIdentity.segindex); + LWLockAcquire(tf_shared_state->bloom_set.lock, LW_EXCLUSIVE); + values[1] = BoolGetDatum(tf_shared_state->is_initialized); + LWLockRelease(tf_shared_state->bloom_set.lock); + nulls[0] = nulls[1] = false; + tuple = heap_form_tuple(tupdesc, values, nulls); + result = HeapTupleGetDatum(tuple); + + PG_RETURN_DATUM(result); +}