From 7ddd87d446892de02cdfee461a4df6bd274be903 Mon Sep 17 00:00:00 2001 From: Danny Zaken Date: Mon, 4 Nov 2024 19:25:08 +0200 Subject: [PATCH] Added a separate clients pool for md_store in postgres_client. - Refactored `postgres_client.js` to manage multiple node-postgres client pools. - Updated `config.js` to introduce `POSTGRES_MAX_MD_CLIENTS` configuration. - Modified `md_store.js` to use a specific node-postgres pool for md_store operations. - Added environment variables to override POSTGRES_MAX_MD_CLIENTS in `run_ceph_test_on_test_container.sh`, otherwise max_connections is exceeded. Signed-off-by: Danny Zaken initial commit with separate pools Signed-off-by: Danny Zaken --- config.js | 7 +- src/deploy/NVA_build/standalone_deploy.sh | 2 +- src/server/object_services/md_store.js | 50 ++--- .../run_ceph_test_on_test_container.sh | 4 + src/util/postgres_client.js | 173 ++++++++++++------ 5 files changed, 153 insertions(+), 83 deletions(-) diff --git a/config.js b/config.js index 855de9bc58..18c4dd737a 100644 --- a/config.js +++ b/config.js @@ -199,7 +199,7 @@ config.S3_RESTORE_REQUEST_MAX_DAYS_BEHAVIOUR = 'TRUNCATE'; /** * S3_MAX_KEY_LENGTH controls the maximum key length that will be accepted * by NooBaa endpoints. - * + * * This value is 1024 bytes for S3 but the default is `Infinity` */ config.S3_MAX_KEY_LENGTH = Infinity; @@ -207,7 +207,7 @@ config.S3_MAX_KEY_LENGTH = Infinity; /** * S3_MAX_BUCKET_NAME_LENGTH controls the maximum bucket name length that * will be accepted by NooBaa endpoints. - * + * * This value is 63 bytes for S3 but the default is `Infinity` */ config.S3_MAX_BUCKET_NAME_LENGTH = Infinity; @@ -229,7 +229,8 @@ config.ROOT_KEY_MOUNT = '/etc/noobaa-server/root_keys'; config.DB_TYPE = /** @type {nb.DBType} */ (process.env.DB_TYPE || 'postgres'); -config.POSTGRES_MAX_CLIENTS = (process.env.LOCAL_MD_SERVER === 'true') ? 80 : 10; +config.POSTGRES_DEFAULT_MAX_CLIENTS = 10; +config.POSTGRES_MD_MAX_CLIENTS = (process.env.LOCAL_MD_SERVER === 'true') ? 70 : 10; /////////////////// // SYSTEM CONFIG // diff --git a/src/deploy/NVA_build/standalone_deploy.sh b/src/deploy/NVA_build/standalone_deploy.sh index 482c7be44d..5d26feffc1 100755 --- a/src/deploy/NVA_build/standalone_deploy.sh +++ b/src/deploy/NVA_build/standalone_deploy.sh @@ -15,7 +15,7 @@ const config = exports; config.DEFAULT_POOL_TYPE = 'HOSTS'; config.AGENT_RPC_PORT = '9999'; config.AGENT_RPC_PROTOCOL = 'tcp'; -config.POSTGRES_MAX_CLIENTS = 10; +config.POSTGRES_DEFAULT_MAX_CLIENTS = 10; EOF # setup_env is not needed when running inside a container because the container diff --git a/src/server/object_services/md_store.js b/src/server/object_services/md_store.js index 90cd1ad68c..9b815c46d5 100644 --- a/src/server/object_services/md_store.js +++ b/src/server/object_services/md_store.js @@ -30,33 +30,43 @@ const config = require('../../../config'); class MDStore { constructor(test_suffix = '') { + + + const postgres_pool = 'md'; + this._objects = db_client.instance().define_collection({ name: 'objectmds' + test_suffix, schema: object_md_schema, db_indexes: object_md_indexes, + postgres_pool, }); this._multiparts = db_client.instance().define_collection({ name: 'objectmultiparts' + test_suffix, schema: object_multipart_schema, db_indexes: object_multipart_indexes, + postgres_pool, }); this._parts = db_client.instance().define_collection({ name: 'objectparts' + test_suffix, schema: object_part_schema, db_indexes: object_part_indexes, + postgres_pool, }); this._chunks = db_client.instance().define_collection({ name: 'datachunks' + test_suffix, schema: data_chunk_schema, db_indexes: data_chunk_indexes, + postgres_pool, }); this._blocks = db_client.instance().define_collection({ name: 'datablocks' + test_suffix, schema: data_block_schema, db_indexes: data_block_indexes, + postgres_pool, }); this._sequences = db_client.instance().define_sequence({ name: 'mdsequences' + test_suffix, + postgres_pool, }); } @@ -248,19 +258,16 @@ class MDStore { async remove_objects_and_unset_latest(objs) { if (!objs || !objs.length) return; - await this._objects.updateMany( - { - _id: { - $in: objs.map(obj => obj._id), - } - }, - { - $set: { - deleted: new Date(), - version_past: true, - }, + await this._objects.updateMany({ + _id: { + $in: objs.map(obj => obj._id), } - ); + }, { + $set: { + deleted: new Date(), + version_past: true, + }, + }); } // 2, 3, 4 @@ -1362,7 +1369,7 @@ class MDStore { } /** - * + * * @param {{ * tier: nb.ID, * limit: number, @@ -1387,14 +1394,14 @@ class MDStore { } return this._chunks - .find(selectors, { - projection: { _id: 1 }, - hint: "tiering_index", - sort, - limit, - }) + .find(selectors, { + projection: { _id: 1 }, + hint: "tiering_index", + sort, + limit, + }) - .then(chunks => db_client.instance().uniq_ids(chunks, "_id")); + .then(chunks => db_client.instance().uniq_ids(chunks, "_id")); } @@ -1775,8 +1782,7 @@ class MDStore { find_deleted_blocks(max_delete_time, limit) { const query = { deleted: { - $lt: new Date(max_delete_time), - $exists: true // Force index usage + $lt: new Date(max_delete_time) }, }; return this._blocks.find(query, { diff --git a/src/test/system_tests/ceph_s3_tests/run_ceph_test_on_test_container.sh b/src/test/system_tests/ceph_s3_tests/run_ceph_test_on_test_container.sh index fa53b427ca..078dabd38c 100755 --- a/src/test/system_tests/ceph_s3_tests/run_ceph_test_on_test_container.sh +++ b/src/test/system_tests/ceph_s3_tests/run_ceph_test_on_test_container.sh @@ -25,6 +25,10 @@ export JWT_SECRET=123456789 export NOOBAA_ROOT_SECRET='AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=' export LOCAL_MD_SERVER=true +#The default max connections for postgres is 100. limit max clients to 10 per pool (per process). +export CONFIG_JS_POSTGRES_MD_MAX_CLIENTS=10 +export CONFIG_JS_POSTGRES_DEFAULT_MAX_CLIENTS=10 + export POSTGRES_HOST=${POSTGRES_HOST:-localhost} export MGMT_ADDR=wss://${NOOBAA_MGMT_SERVICE_HOST:-localhost}:${NOOBAA_MGMT_SERVICE_PORT:-5443} export BG_ADDR=wss://localhost:5445 diff --git a/src/util/postgres_client.js b/src/util/postgres_client.js index da80b2da1b..db809d8e96 100644 --- a/src/util/postgres_client.js +++ b/src/util/postgres_client.js @@ -48,7 +48,7 @@ const COMPARISON_OPS = [ // temporary solution for encode\decode // perfrom encode\decode json for every query to\from the DB // TODO: eventually we want to perform this using the ajv process -// in schema_utils - handle +// in schema_utils - handle function decode_json(schema, val) { if (!schema) { return val; @@ -88,7 +88,7 @@ function decode_json(schema, val) { return val; } -// convert certain types to a known representation +// convert certain types to a known representation function encode_json(schema, val) { if (!val || !schema) { return val; @@ -205,7 +205,7 @@ async function log_query(pg_client, query, tag, millitook, should_explain) { if (millitook > config.LONG_DB_QUERY_THRESHOLD) { dbg.warn( - `QUERY_LOG: LONG QUERY (OVER ${config.LONG_DB_QUERY_THRESHOLD} ms) - + `QUERY_LOG: LONG QUERY (OVER ${config.LONG_DB_QUERY_THRESHOLD} ms) - please check whether the DB and core pods have sufficient CPU and memory `, JSON.stringify(log_obj) ); @@ -255,14 +255,15 @@ async function _do_query(pg_client, q, transaction_counter) { const milliend = time_utils.millistamp(); const millitook = milliend - millistart; if (process.env.PG_ENABLE_QUERY_LOG === 'true' || millitook > config.LONG_DB_QUERY_THRESHOLD) { - // noticed that some failures in explain are invalidating the transaction. - // myabe did something wrong but for now don't try to EXPLAIN the query when in transaction. + // noticed that some failures in explain are invalidating the transaction. + // myabe did something wrong but for now don't try to EXPLAIN the query when in transaction. await log_query(pg_client, q, tag, millitook, /*should_explain*/ transaction_counter === 0); } return res; } catch (err) { if (err.routine === 'index_create' && err.code === '42P07') return; dbg.error(`postgres_client: ${tag}: failed with error:`, err); + await log_query(pg_client, q, tag, 0, /*should_explain*/ false); throw err; } } @@ -320,7 +321,7 @@ function buildPostgresArrayQuery(table_name, update, find) { function convert_array_query(table_name, encoded_update, encoded_find) { let query; - // translation of '.$.' is currently supported for findAndUpdateOne and more specifcally to $set operations. + // translation of '.$.' is currently supported for findAndUpdateOne and more specifcally to $set operations. const update_keys = encoded_update.$set && Object.keys(encoded_update.$set).filter(key => key.includes('.$.')); if (update_keys && update_keys.length) { query = buildPostgresArrayQuery(table_name, encoded_update.$set, encoded_find); @@ -329,15 +330,15 @@ function convert_array_query(table_name, encoded_update, encoded_find) { } class PgTransaction { - constructor(client) { + constructor(pg_pool) { this.transaction_id = trans_counter; trans_counter += 1; - this.client = client; + this.pg_pool = pg_pool; } async begin() { try { - this.pg_client = await this.client.pool.connect(); + this.pg_client = await this.pg_pool.connect(); } catch (err) { dbg.error(DB_CONNECT_ERROR_MESSAGE, err); throw new Error(DB_CONNECT_ERROR_MESSAGE); @@ -375,10 +376,10 @@ class PgTransaction { class BulkOp { - constructor({ client, name, schema }) { + constructor({ pg_pool, name, schema }) { this.name = name; this.schema = schema; - this.transaction = new PgTransaction(client); + this.transaction = new PgTransaction(pg_pool); this.queries = []; this.length = 0; // this.nInserted = 0; @@ -443,7 +444,7 @@ class BulkOp { nMatched, nModified, nRemoved, - // nUpserted is not used in our code. returning 0 + // nUpserted is not used in our code. returning 0 nUpserted: 0, getInsertedIds: not_implemented, getLastOp: not_implemented, @@ -526,6 +527,7 @@ class PostgresSequence { const { name, client } = params; this.name = name; this.client = client; + this.pool_key = params.postgres_pool || 'default'; } // Lazy migration of the old mongo style collection/table based @@ -566,10 +568,18 @@ class PostgresSequence { } } + get_pool() { + const pool = this.client.get_pool(this.pool_key); + if (!pool) { + throw new Error(`The postgres clients pool ${this.pool_key} disconnected`); + } + return pool; + } + async nextsequence() { if (this.init_promise) await this.init_promise; const q = { text: `SELECT nextval('${this.seqname()}')` }; - const res = await _do_query(this.client.pool, q, 0); + const res = await _do_query(this.get_pool(), q, 0); return Number.parseInt(res.rows[0].nextval, 10); } } @@ -592,7 +602,11 @@ class PostgresTable { this.db_indexes = [id_index, ...(db_indexes || [])]; this.schema = schema; this.client = client; - // calculate an advisory_lock_key from this collection by taking the first 32 bit + + // the pool to be used for the table + this.pool_key = table_params.postgres_pool || 'default'; + + // calculate an advisory_lock_key from this collection by taking the first 32 bit // of the sha256 of the table name const advisory_lock_key_string = crypto.createHash('sha256') .update(name) @@ -614,16 +628,26 @@ class PostgresTable { } } + + get_pool() { + const pool = this.client.get_pool(this.pool_key); + if (!pool) { + throw new Error(`The postgres clients pool ${this.pool_key} disconnected`); + } + return pool; + } + + initializeUnorderedBulkOp() { return new UnorderedBulkOp({ name: this.name, - client: this.client, + pg_pool: this.get_pool(), schema: this.schema }); } initializeOrderedBulkOp() { - return new OrderedBulkOp({ name: this.name, client: this.client, schema: this.schema }); + return new OrderedBulkOp({ name: this.name, pg_pool: this.get_pool(), schema: this.schema }); } async _create_table(pool) { @@ -680,7 +704,7 @@ class PostgresTable { async single_query(text, values, client, skip_init) { if (!skip_init) await this.init_promise; const q = { text, values }; - return _do_query(client || this.client.pool, q, 0); + return _do_query(client || this.get_pool(), q, 0); } get_id(data) { @@ -1019,7 +1043,7 @@ class PostgresTable { value: return_value, }); - //Working on each column + //Working on each column try { map = await this.single_query(map_reduce_query); } catch (err) { @@ -1177,10 +1201,10 @@ class PostgresTable { /** * findOneAndUpdate finds the first entry that matches the selector and applies the given update to it. - * + * * If upsert is true, it will create the entry if it doesn't exist - this will only create the entry * with _id, if more fields are needed to be created "atomically" then `upsert_fields` should be used. - * + * * `upsert_fields` is not available in mongo as mongo by default creates even the nested fields if missing. * @param {Record} query aka Selector * @param {Record} update updates to apply @@ -1188,8 +1212,8 @@ class PostgresTable { * upsert: boolean, * returnOriginal: boolean, * upsert_fields: Record - * }} options - * @returns + * }} options + * @returns */ async findOneAndUpdate(query, update, options) { if (options.returnOriginal !== false) { @@ -1237,7 +1261,7 @@ class PostgresTable { let pg_client; let locked; try { - pg_client = await this.client.pool.connect(); + pg_client = await this.get_pool().connect(); let update_res = await this._updateOneWithClient(pg_client, query, update, options); if (update_res.rowCount === 0) { // try to lock the advisory_lock_key for this table, try update and insert the first doc if 0 docs updated @@ -1285,7 +1309,7 @@ class PostgresTable { } async stats() { - // TODO + // TODO return { ns: 'TODO', count: Infinity, @@ -1400,10 +1424,7 @@ class PostgresClient extends EventEmitter { dbg.log0('disconnect called'); this._disconnected_state = true; this._connect_promise = null; - if (this.pool) { - this.pool.end(); - this.pool = null; - } + this._destroy_all_pools(); if (this.ssl_cert_info) { this.ssl_cert_info.removeListener(this._update_ssl_cert); } @@ -1423,10 +1444,26 @@ class PostgresClient extends EventEmitter { return this.new_pool_params.database; } + get_pool(name = 'default') { + return this.pools[name].instance; + } + constructor(params) { super(); this.tables = []; this.sequences = []; + + this.pools = { + default: { + instance: null, + size: config.POSTGRES_DEFAULT_MAX_CLIENTS + }, + md: { + instance: null, + size: config.POSTGRES_MD_MAX_CLIENTS + } + }; + const postgres_port = parseInt(process.env.POSTGRES_PORT || '5432', 10); if (process.env.POSTGRES_CONNECTION_STRING) { @@ -1446,8 +1483,6 @@ class PostgresClient extends EventEmitter { ...params, }; } - // TODO: check the effect of max clients. default is 10 - this.new_pool_params.max = config.POSTGRES_MAX_CLIENTS; // As we now also support external DB we don't want to print secret user data // so this code will mask out passwords from the printed pool params this.print_pool_params = _.omit(this.print_pool_params, 'password'); @@ -1530,8 +1565,8 @@ class PostgresClient extends EventEmitter { const seq = new PostgresSequence({ ...params, client: this }); this.sequences.push(seq); - if (this.pool) { - seq.init_promise = seq._create(this.pool).catch(_.noop); // TODO what is best to do when init_collection fails here? + if (this.default_pool) { + seq.init_promise = seq._create(this.default_pool).catch(_.noop); // TODO what is best to do when init_collection fails here? } return seq; @@ -1545,8 +1580,8 @@ class PostgresClient extends EventEmitter { const table = new PostgresTable({ ...table_params, client: this }); this.tables.push(table); - if (this.pool) { - table.init_promise = table._create_table(this.pool).catch(_.noop); // TODO what is best to do when init_collection fails here? + if (this.default_pool) { + table.init_promise = table._create_table(this.default_pool).catch(_.noop); // TODO what is best to do when init_collection fails here? } return table; @@ -1597,7 +1632,7 @@ class PostgresClient extends EventEmitter { } is_connected() { - return Boolean(this.pool); + return Boolean(this.default_pool); } async connect(skip_init_db) { @@ -1609,33 +1644,22 @@ class PostgresClient extends EventEmitter { } async _connect(skip_init_db) { - // TODO: check if we need to listen for events from pool (https://node-postgres.com/api/pool#events) - // this.pool = new Pool(this.new_pool_params); - - // await this._load_sql_functions(); - // await this.ta - // return this.wait_for_client_init(); - - let pool; let is_connected = false; if (process.env.POSTGRES_SSL_REQUIRED) await this._load_ssl_certs(); while (!is_connected) { try { if (this._disconnected_state) return; - if (this.pool) return; + if (this.default_pool) return; dbg.log0('_connect: called with', this.print_pool_params); - // this._set_connect_timeout(); - // client = await mongodb.MongoClient.connect(this.url, this.config); - pool = new Pool(this.new_pool_params); + + this._create_all_pools(); + if (skip_init_db !== 'skip_init_db') { - await this._init_collections(pool); + await this._init_collections(this.pools.default.instance); } dbg.log0('_connect: connected', this.print_pool_params); // this._reset_connect_timeout(); - this.pool = pool; - this.pool.on('error', err => { - dbg.error('got error on postgres pool', err); - }); + this.default_pool = this.pools.default.instance; this.emit('reconnect'); dbg.log0(`connected`); is_connected = true; @@ -1644,16 +1668,51 @@ class PostgresClient extends EventEmitter { // autoReconnect only works once initial connection is created, // so we need to handle retry in initial connect. dbg.error('_connect: initial connect failed, will retry', err.message); - if (pool) { - pool.end(); - pool = null; - this.pool = null; - } + this._destroy_all_pools(); + this.default_pool = null; await P.delay(3000); } } } + _create_pool(name) { + const pool = this.pools[name]; + if (!pool) { + throw new Error(`create_pool: the pool ${name} is not defined in pools object`); + } + if (!pool.instance) { + pool.instance = new Pool({ ...this.new_pool_params, max: pool.size }); + if (!pool._error_listener) { + pool.error_listener = err => { + dbg.error(`got error on postgres pool ${name}`, err); + }; + } + pool.instance.on('error', pool.error_listener); + } + } + + _create_all_pools() { + for (const pool_name of Object.keys(this.pools)) { + this._create_pool(pool_name); + } + } + + _destroy_pool(name) { + const pool = this.pools[name]; + if (pool && pool.instance) { + pool.instance.removeListener('error', pool.error_listener); + pool.instance.end(); + pool.instance = null; + pool.error_listener = null; + } + } + + _destroy_all_pools() { + for (const pool_name of Object.keys(this.pools)) { + this._destroy_pool(pool_name); + } + } + async _load_ssl_certs() { this.ssl_cert_info = await ssl_utils.get_ssl_cert_info('EXTERNAL_DB') || {}; /** @type {import('tls').ConnectionOptions} */