diff --git a/src/diskquota.c b/src/diskquota.c index cc25b70a..50eb0260 100644 --- a/src/diskquota.c +++ b/src/diskquota.c @@ -176,8 +176,8 @@ static bool is_altering_extension_to_default_version(char *version) { int spi_ret; - bool ret = false; - SPI_connect(); + bool ret = false; + bool connected_in_this_function = SPI_connect_if_not_yet(); spi_ret = SPI_execute("select default_version from pg_available_extensions where name ='diskquota'", true, 0); if (spi_ret != SPI_OK_SELECT) elog(ERROR, "[diskquota] failed to select diskquota default version during diskquota update."); @@ -194,7 +194,7 @@ is_altering_extension_to_default_version(char *version) if (strcmp(version, default_version) == 0) ret = true; } } - SPI_finish(); + SPI_finish_if(connected_in_this_function); return ret; } @@ -959,9 +959,9 @@ static void create_monitor_db_table(void) { const char *sql; - bool connected = false; - bool pushed_active_snap = false; - bool ret = true; + bool connected_in_this_function = false; + bool pushed_active_snap = false; + bool ret = true; /* * Create function diskquota.diskquota_fetch_table_stat in launcher @@ -990,21 +990,14 @@ create_monitor_db_table(void) */ PG_TRY(); { - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("[diskquota launcher] unable to connect to execute internal query. return code: %d.", - ret_code))); - } - connected = true; + connected_in_this_function = SPI_connect_if_not_yet(); PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; /* debug_query_string need to be set for SPI_execute utility functions. */ debug_query_string = sql; - ret_code = SPI_execute(sql, false, 0); + int ret_code = SPI_execute(sql, false, 0); if (ret_code != SPI_OK_UTILITY) { int saved_errno = errno; @@ -1024,7 +1017,7 @@ create_monitor_db_table(void) RESUME_INTERRUPTS(); } PG_END_TRY(); - if (connected) SPI_finish(); + SPI_finish_if(connected_in_this_function); if (pushed_active_snap) PopActiveSnapshot(); if (ret) CommitTransactionCommand(); @@ -1054,14 +1047,8 @@ init_database_list(void) StartTransactionCommand(); PushActiveSnapshot(GetTransactionSnapshot()); - ret = SPI_connect(); - if (ret != SPI_OK_CONNECT) - { - int saved_errno = errno; - ereport(ERROR, (errmsg("[diskquota launcher] SPI connect error, reason: %s, return code: %d.", - strerror(saved_errno), ret))); - } - ret = SPI_execute("select dbid from diskquota_namespace.database_list;", true, 0); + bool connected_in_this_function = SPI_connect_if_not_yet(); + ret = SPI_execute("select dbid from diskquota_namespace.database_list;", true, 0); if (ret != SPI_OK_SELECT) { int saved_errno = errno; @@ -1120,6 +1107,7 @@ init_database_list(void) } } num_db = num; + SPI_finish_if(connected_in_this_function); /* As update_monitor_db_mpp needs to execute sql, so can not put in the loop above */ for (int i = 0; i < diskquota_max_monitored_databases; i++) { @@ -1129,7 +1117,6 @@ init_database_list(void) update_monitor_db_mpp(dbEntry->dbid, ADD_DB_TO_MONITOR, LAUNCHER_SCHEMA); } } - SPI_finish(); PopActiveSnapshot(); CommitTransactionCommand(); /* TODO: clean invalid database */ @@ -1181,7 +1168,6 @@ static void do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_extension_ddl_message) { int old_num_db = num_db; - bool connected = false; bool pushed_active_snap = false; bool ret = true; @@ -1194,13 +1180,6 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ */ PG_TRY(); { - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("unable to connect to execute internal query. return code: %d.", ret_code))); - } - connected = true; PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; @@ -1235,7 +1214,6 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ } PG_END_TRY(); - if (connected) SPI_finish(); if (pushed_active_snap) PopActiveSnapshot(); if (ret) CommitTransactionCommand(); @@ -1251,12 +1229,6 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; Oid dbid = local_extension_ddl_message.dbid; - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("unable to connect to execute internal query. return code: %d.", ret_code))); - } switch (local_extension_ddl_message.cmd) { case CMD_CREATE_EXTENSION: @@ -1278,7 +1250,6 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ local_extension_ddl_message.cmd))); break; } - SPI_finish(); if (pushed_active_snap) PopActiveSnapshot(); CommitTransactionCommand(); } @@ -1372,8 +1343,9 @@ add_dbid_to_database_list(Oid dbid) { int ret; - Oid argt[1] = {OIDOID}; - Datum argv[1] = {ObjectIdGetDatum(dbid)}; + Oid argt[1] = {OIDOID}; + Datum argv[1] = {ObjectIdGetDatum(dbid)}; + bool connected_in_this_function = SPI_connect_if_not_yet(); ret = SPI_execute_with_args("select * from diskquota_namespace.database_list where dbid = $1", 1, argt, argv, NULL, true, 0); @@ -1391,7 +1363,7 @@ add_dbid_to_database_list(Oid dbid) ereport(WARNING, (errmsg("[diskquota launcher] database id %d is already actived, " "skip database_list update", dbid))); - return; + goto ret; } ret = SPI_execute_with_args("insert into diskquota_namespace.database_list values($1)", 1, argt, argv, NULL, false, @@ -1405,7 +1377,8 @@ add_dbid_to_database_list(Oid dbid) ret, strerror(saved_errno)))); } - return; +ret: + SPI_finish_if(connected_in_this_function); } /* @@ -1415,23 +1388,25 @@ add_dbid_to_database_list(Oid dbid) static void del_dbid_from_database_list(Oid dbid) { - int ret; + bool connected_in_this_function = SPI_connect_if_not_yet(); /* errors will be cached in outer function */ - ret = SPI_execute_with_args("delete from diskquota_namespace.database_list where dbid = $1", 1, - (Oid[]){ - OIDOID, - }, - (Datum[]){ - ObjectIdGetDatum(dbid), - }, - NULL, false, 0); + int ret = SPI_execute_with_args("delete from diskquota_namespace.database_list where dbid = $1", 1, + (Oid[]){ + OIDOID, + }, + (Datum[]){ + ObjectIdGetDatum(dbid), + }, + NULL, false, 0); if (ret != SPI_OK_DELETE) { int saved_errno = errno; ereport(ERROR, (errmsg("[diskquota launcher] del_dbid_from_database_list: reason: %s, ret_code: %d.", strerror(saved_errno), ret))); } + + SPI_finish_if(connected_in_this_function); } /* @@ -1625,10 +1600,8 @@ static const char * diskquota_status_schema_version() { static char ret_version[64]; - int ret = SPI_connect(); - Assert(ret = SPI_OK_CONNECT); - - ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0); + bool connected_in_this_function = SPI_connect_if_not_yet(); + int ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0); if (ret != SPI_OK_SELECT || SPI_processed != 1) { @@ -1655,11 +1628,11 @@ diskquota_status_schema_version() StrNCpy(ret_version, version, sizeof(ret_version) - 1); - SPI_finish(); + SPI_finish_if(connected_in_this_function); return ret_version; fail: - SPI_finish(); + SPI_finish_if(connected_in_this_function); return ""; } diff --git a/src/diskquota.h b/src/diskquota.h index 7c2bbb15..9c47acb5 100644 --- a/src/diskquota.h +++ b/src/diskquota.h @@ -319,4 +319,6 @@ extern HTAB *DiskquotaShmemInitHash(const char *name, long init_size, long max_s extern void refresh_monitored_dbid_cache(void); extern HASHACTION check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message, TimestampTz *last_overflow_report); +bool SPI_connect_if_not_yet(void); +void SPI_finish_if(bool connected_in_this_function); #endif diff --git a/src/diskquota_utility.c b/src/diskquota_utility.c index f306cef8..252d32cd 100644 --- a/src/diskquota_utility.c +++ b/src/diskquota_utility.c @@ -46,6 +46,7 @@ #include "utils/faultinjector.h" #include "utils/fmgroids.h" #include "utils/formatting.h" +#include "utils/memutils.h" #include "utils/numeric.h" #include "libpq-fe.h" #include "funcapi.h" @@ -123,8 +124,6 @@ static void check_role(Oid roleoid, char *rolname, int64 quota_limit_mb); Datum init_table_size_table(PG_FUNCTION_ARGS) { - int ret; - RangeVar *rv; Relation rel; /* @@ -157,10 +156,10 @@ init_table_size_table(PG_FUNCTION_ARGS) * They do not work on entry db since we do not support dispatching * from entry-db currently. */ - SPI_connect(); + bool connected_in_this_function = SPI_connect_if_not_yet(); /* delete all the table size info in table_size if exist. */ - ret = SPI_execute("truncate table diskquota.table_size", false, 0); + int ret = SPI_execute("truncate table diskquota.table_size", false, 0); if (ret != SPI_OK_UTILITY) elog(ERROR, "cannot truncate table_size table: error code %d", ret); ret = SPI_execute( @@ -199,7 +198,7 @@ init_table_size_table(PG_FUNCTION_ARGS) NULL, false, 0); if (ret != SPI_OK_UPDATE) elog(ERROR, "cannot update state table: error code %d", ret); - SPI_finish(); + SPI_finish_if(connected_in_this_function); PG_RETURN_VOID(); } @@ -433,17 +432,10 @@ diskquota_pause(PG_FUNCTION_ARGS) { dbid = PG_GETARG_OID(0); } - if (IS_QUERY_DISPATCHER()) - { - // pause current worker - if (SPI_OK_CONNECT != SPI_connect()) - { - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] unable to connect to execute SPI query"))); - } - update_monitor_db_mpp(dbid, PAUSE_DB_TO_MONITOR, EXTENSION_SCHEMA); - SPI_finish(); - } + + // pause current worker + if (IS_QUERY_DISPATCHER()) update_monitor_db_mpp(dbid, PAUSE_DB_TO_MONITOR, EXTENSION_SCHEMA); + PG_RETURN_VOID(); } @@ -466,16 +458,7 @@ diskquota_resume(PG_FUNCTION_ARGS) } // active current worker - if (IS_QUERY_DISPATCHER()) - { - if (SPI_OK_CONNECT != SPI_connect()) - { - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] unable to connect to execute SPI query"))); - } - update_monitor_db_mpp(dbid, RESUME_DB_TO_MONITOR, EXTENSION_SCHEMA); - SPI_finish(); - } + if (IS_QUERY_DISPATCHER()) update_monitor_db_mpp(dbid, RESUME_DB_TO_MONITOR, EXTENSION_SCHEMA); PG_RETURN_VOID(); } @@ -486,7 +469,6 @@ diskquota_resume(PG_FUNCTION_ARGS) static bool is_database_empty(void) { - int ret; TupleDesc tupdesc; bool is_empty = false; @@ -494,9 +476,9 @@ is_database_empty(void) * If error happens in is_database_empty, just return error messages to * the client side. So there is no need to catch the error. */ - SPI_connect(); + bool connected_in_this_function = SPI_connect_if_not_yet(); - ret = SPI_execute( + int ret = SPI_execute( "INSERT INTO diskquota.state SELECT (count(relname) = 0)::int " "FROM " " pg_class AS c, " @@ -531,7 +513,7 @@ is_database_empty(void) /* * And finish our transaction. */ - SPI_finish(); + SPI_finish_if(connected_in_this_function); return is_empty; } @@ -688,9 +670,7 @@ set_role_quota(PG_FUNCTION_ARGS) } check_role(roleoid, rolname, quota_limit_mb); - SPI_connect(); set_quota_config_internal(roleoid, quota_limit_mb, ROLE_QUOTA, INVALID_SEGRATIO, InvalidOid); - SPI_finish(); PG_RETURN_VOID(); } @@ -721,9 +701,7 @@ set_schema_quota(PG_FUNCTION_ARGS) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("disk quota can not be set to 0 MB"))); } - SPI_connect(); set_quota_config_internal(namespaceoid, quota_limit_mb, NAMESPACE_QUOTA, INVALID_SEGRATIO, InvalidOid); - SPI_finish(); PG_RETURN_VOID(); } @@ -766,10 +744,8 @@ set_role_tablespace_quota(PG_FUNCTION_ARGS) } check_role(roleoid, rolname, quota_limit_mb); - SPI_connect(); row_id = set_target_internal(roleoid, spcoid, quota_limit_mb, ROLE_TABLESPACE_QUOTA); set_quota_config_internal(row_id, quota_limit_mb, ROLE_TABLESPACE_QUOTA, INVALID_SEGRATIO, spcoid); - SPI_finish(); PG_RETURN_VOID(); } @@ -810,10 +786,8 @@ set_schema_tablespace_quota(PG_FUNCTION_ARGS) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("disk quota can not be set to 0 MB"))); } - SPI_connect(); row_id = set_target_internal(namespaceoid, spcoid, quota_limit_mb, NAMESPACE_TABLESPACE_QUOTA); set_quota_config_internal(row_id, quota_limit_mb, NAMESPACE_TABLESPACE_QUOTA, INVALID_SEGRATIO, spcoid); - SPI_finish(); PG_RETURN_VOID(); } @@ -833,6 +807,7 @@ set_quota_config_internal(Oid targetoid, int64 quota_limit_mb, QuotaType type, f /* Report error if diskquota is not ready. */ do_check_diskquota_state_is_ready(); + bool connected_in_this_function = SPI_connect_if_not_yet(); /* * If error happens in set_quota_config_internal, just return error messages to * the client side. So there is no need to catch the error. @@ -934,7 +909,7 @@ set_quota_config_internal(Oid targetoid, int64 quota_limit_mb, QuotaType type, f } } - return; + SPI_finish_if(connected_in_this_function); } static int @@ -944,7 +919,7 @@ set_target_internal(Oid primaryoid, Oid spcoid, int64 quota_limit_mb, QuotaType int row_id = -1; bool is_null = false; Datum v; - + bool connected_in_this_function = SPI_connect_if_not_yet(); /* * If error happens in set_target_internal, just return error messages to * the client side. So there is no need to catch the error. @@ -1025,6 +1000,9 @@ set_target_internal(Oid primaryoid, Oid spcoid, int64 quota_limit_mb, QuotaType Assert(is_null == false); row_id = DatumGetInt32(v); } + + SPI_finish_if(connected_in_this_function); + /* No need to update the target table */ return row_id; @@ -1172,10 +1150,7 @@ set_per_segment_quota(PG_FUNCTION_ARGS) ereportif(ratio == 0, ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("per segment quota ratio can not be set to 0"))); - if (SPI_OK_CONNECT != SPI_connect()) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("unable to connect to execute internal query"))); - } + bool connected_in_this_function = SPI_connect_if_not_yet(); /* * lock table quota_config table in exlusive mode * @@ -1228,7 +1203,7 @@ set_per_segment_quota(PG_FUNCTION_ARGS) /* * And finish our transaction. */ - SPI_finish(); + SPI_finish_if(connected_in_this_function); PG_RETURN_VOID(); } @@ -1236,11 +1211,10 @@ int worker_spi_get_extension_version(int *major, int *minor) { StartTransactionCommand(); - int ret = SPI_connect(); - Assert(ret = SPI_OK_CONNECT); + bool connected_in_this_function = SPI_connect_if_not_yet(); PushActiveSnapshot(GetTransactionSnapshot()); - ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0); + int ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0); if (SPI_processed == 0) { @@ -1283,7 +1257,7 @@ worker_spi_get_extension_version(int *major, int *minor) ret = 0; out: - SPI_finish(); + SPI_finish_if(connected_in_this_function); PopActiveSnapshot(); CommitTransactionCommand(); @@ -1302,23 +1276,23 @@ worker_spi_get_extension_version(int *major, int *minor) List * get_rel_oid_list(bool is_init) { - List *oidlist = NIL; - int ret; + List *oidlist = NIL; + bool connected_in_this_function = SPI_connect_if_not_yet(); #define SELECT_FROM_PG_CATALOG_PG_CLASS "select oid from pg_catalog.pg_class where oid >= $1 and relkind in ('r', 'm')" - ret = SPI_execute_with_args(is_init ? SELECT_FROM_PG_CATALOG_PG_CLASS - " union distinct" - " select tableid from diskquota.table_size where segid = -1" - : SELECT_FROM_PG_CATALOG_PG_CLASS, - 1, - (Oid[]){ - OIDOID, - }, - (Datum[]){ - ObjectIdGetDatum(FirstNormalObjectId), - }, - NULL, false, 0); + int ret = SPI_execute_with_args(is_init ? SELECT_FROM_PG_CATALOG_PG_CLASS + " union distinct" + " select tableid from diskquota.table_size where segid = -1" + : SELECT_FROM_PG_CATALOG_PG_CLASS, + 1, + (Oid[]){ + OIDOID, + }, + (Datum[]){ + ObjectIdGetDatum(FirstNormalObjectId), + }, + NULL, false, 0); #undef SELECT_FROM_PG_CATALOG_PG_CLASS @@ -1336,9 +1310,10 @@ get_rel_oid_list(bool is_init) oid = DatumGetObjectId(SPI_getbinval(tup, tupdesc, 1, &isnull)); if (!isnull) { - List *indexIds; - oidlist = lappend_oid(oidlist, oid); - indexIds = diskquota_get_index_list(oid); + List *indexIds; + MemoryContext oldcontext = MemoryContextSwitchTo(CurTransactionContext); + oidlist = lappend_oid(oidlist, oid); + indexIds = diskquota_get_index_list(oid); if (indexIds != NIL) { foreach (l, indexIds) @@ -1347,8 +1322,10 @@ get_rel_oid_list(bool is_init) } } list_free(indexIds); + MemoryContextSwitchTo(oldcontext); } } + SPI_finish_if(connected_in_this_function); return oidlist; } @@ -1587,6 +1564,7 @@ get_per_segment_ratio(Oid spcoid) if (!OidIsValid(spcoid)) return segratio; + bool connected_in_this_function = SPI_connect_if_not_yet(); /* * using row share lock to lock TABLESPACE_QUTAO * row to avoid concurrently updating the segratio @@ -1620,6 +1598,7 @@ get_per_segment_ratio(Oid spcoid) segratio = DatumGetFloat4(dat); } } + SPI_finish_if(connected_in_this_function); return segratio; } @@ -1709,3 +1688,29 @@ check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message, Time return HASH_FIND; } + +bool +SPI_connect_if_not_yet(void) +{ + if (SPI_context()) return false; + + int rc = SPI_connect(); + + ereportif(rc != SPI_OK_CONNECT, ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_connect failed"), + errdetail("%s", SPI_result_code_string(rc)))); + + return true; +} + +void +SPI_finish_if(bool connected_in_calling_function) +{ + if (!connected_in_calling_function || !SPI_context()) return; + + int rc = SPI_finish(); + + ereportif(rc != SPI_OK_FINISH, ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_finish failed"), + errdetail("%s", SPI_result_code_string(rc)))); +} diff --git a/src/gp_activetable.c b/src/gp_activetable.c index d328c81a..b216a14f 100644 --- a/src/gp_activetable.c +++ b/src/gp_activetable.c @@ -447,13 +447,6 @@ diskquota_fetch_table_stat(PG_FUNCTION_ARGS) { MemoryContext oldcontext; TupleDesc tupdesc; - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("unable to connect to execute internal query. return code: %d.", ret_code))); - } - SPI_finish(); /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); @@ -952,7 +945,8 @@ load_table_size(HTAB *local_table_stats_map) ActiveTableEntryCombined *quota_entry; SPIPlanPtr plan; Portal portal; - char *sql = "select tableid, size, segid from diskquota.table_size"; + char *sql = "select tableid, size, segid from diskquota.table_size"; + bool connected_in_this_function = SPI_connect_if_not_yet(); if ((plan = SPI_prepare(sql, 0, NULL)) == NULL) ereport(ERROR, (errmsg("[diskquota] SPI_prepare(\"%s\") failed", sql))); @@ -1028,6 +1022,7 @@ load_table_size(HTAB *local_table_stats_map) SPI_freetuptable(SPI_tuptable); SPI_cursor_close(portal); SPI_freeplan(plan); + SPI_finish_if(connected_in_this_function); } /* diff --git a/src/quotamodel.c b/src/quotamodel.c index d4dc8a90..2f45180c 100644 --- a/src/quotamodel.c +++ b/src/quotamodel.c @@ -671,10 +671,9 @@ vacuum_disk_quota_model(uint32 id) * Check whether the diskquota state is ready */ bool -check_diskquota_state_is_ready() +check_diskquota_state_is_ready(void) { bool is_ready = false; - bool connected = false; bool pushed_active_snap = false; bool ret = true; @@ -687,12 +686,6 @@ check_diskquota_state_is_ready() */ PG_TRY(); { - if (SPI_OK_CONNECT != SPI_connect()) - { - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] unable to connect to execute SPI query"))); - } - connected = true; PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; is_ready = do_check_diskquota_state_is_ready(); @@ -708,7 +701,6 @@ check_diskquota_state_is_ready() RESUME_INTERRUPTS(); } PG_END_TRY(); - if (connected) SPI_finish(); if (pushed_active_snap) PopActiveSnapshot(); if (ret) CommitTransactionCommand(); @@ -732,7 +724,8 @@ do_check_diskquota_state_is_ready(void) { int ret; TupleDesc tupdesc; - ret = SPI_execute("select state from diskquota.state", true, 0); + bool connected_in_this_function = SPI_connect_if_not_yet(); + ret = SPI_execute("select state from diskquota.state", true, 0); ereportif(ret != SPI_OK_SELECT, ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] check diskquota state SPI_execute failed: error code %d", ret))); @@ -759,6 +752,8 @@ do_check_diskquota_state_is_ready(void) state = isnull ? DISKQUOTA_UNKNOWN_STATE : DatumGetInt32(dat); bool is_ready = state == DISKQUOTA_READY_STATE; + SPI_finish_if(connected_in_this_function); + if (!is_ready && !diskquota_is_readiness_logged()) { diskquota_set_readiness_logged(); @@ -800,7 +795,6 @@ refresh_disk_quota_model(bool is_init) static void refresh_disk_quota_usage(bool is_init) { - bool connected = false; bool pushed_active_snap = false; bool ret = true; HTAB *local_active_table_stat_map = NULL; @@ -814,12 +808,6 @@ refresh_disk_quota_usage(bool is_init) */ PG_TRY(); { - if (SPI_OK_CONNECT != SPI_connect()) - { - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] unable to connect to execute SPI query"))); - } - connected = true; PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; /* @@ -861,7 +849,6 @@ refresh_disk_quota_usage(bool is_init) RESUME_INTERRUPTS(); } PG_END_TRY(); - if (connected) SPI_finish(); if (pushed_active_snap) PopActiveSnapshot(); if (ret) CommitTransactionCommand(); @@ -1153,7 +1140,6 @@ static void delete_from_table_size_map(char *str) { StringInfoData delete_statement; - int ret; initStringInfo(&delete_statement); appendStringInfo(&delete_statement, @@ -1161,10 +1147,12 @@ delete_from_table_size_map(char *str) "delete from diskquota.table_size " "where (tableid, segid) in ( SELECT * FROM deleted_table );", str); - ret = SPI_execute(delete_statement.data, false, 0); + bool connected_in_this_function = SPI_connect_if_not_yet(); + int ret = SPI_execute(delete_statement.data, false, 0); if (ret != SPI_OK_DELETE) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] delete_from_table_size_map SPI_execute failed: error code %d", ret))); + SPI_finish_if(connected_in_this_function); pfree(delete_statement.data); } @@ -1172,14 +1160,15 @@ static void insert_into_table_size_map(char *str) { StringInfoData insert_statement; - int ret; initStringInfo(&insert_statement); appendStringInfo(&insert_statement, "insert into diskquota.table_size values %s;", str); - ret = SPI_execute(insert_statement.data, false, 0); + bool connected_in_this_function = SPI_connect_if_not_yet(); + int ret = SPI_execute(insert_statement.data, false, 0); if (ret != SPI_OK_INSERT) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] insert_into_table_size_map SPI_execute failed: error code %d", ret))); + SPI_finish_if(connected_in_this_function); pfree(insert_statement.data); } @@ -1413,7 +1402,6 @@ truncateStringInfo(StringInfo str, int nchars) static bool load_quotas(void) { - bool connected = false; bool pushed_active_snap = false; bool ret = true; @@ -1426,13 +1414,6 @@ load_quotas(void) */ PG_TRY(); { - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("[diskquota] unable to connect to execute SPI query, return code: %d", ret_code))); - } - connected = true; PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; do_load_quotas(); @@ -1448,7 +1429,6 @@ load_quotas(void) RESUME_INTERRUPTS(); } PG_END_TRY(); - if (connected) SPI_finish(); if (pushed_active_snap) PopActiveSnapshot(); if (ret) CommitTransactionCommand(); @@ -1475,6 +1455,7 @@ do_load_quotas(void) */ clean_all_quota_limit(); + bool connected_in_this_function = SPI_connect_if_not_yet(); /* * read quotas from diskquota.quota_config and target table */ @@ -1556,7 +1537,7 @@ do_load_quotas(void) } } - return; + SPI_finish_if(connected_in_this_function); } /* @@ -2301,7 +2282,9 @@ update_monitor_db_mpp(Oid dbid, FetchTableStatType action, const char *schema) "SELECT %s.diskquota_fetch_table_stat(%d, '{%d}'::oid[]) FROM gp_dist_random('gp_id')", schema, action, dbid); /* Add current database to the monitored db cache on all segments */ - int ret = SPI_execute(sql_command.data, true, 0); + bool connected_in_this_function = SPI_connect_if_not_yet(); + int ret = SPI_execute(sql_command.data, true, 0); + SPI_finish_if(connected_in_this_function); pfree(sql_command.data); ereportif(ret != SPI_OK_SELECT, ERROR,