Skip to content

Commit

Permalink
feat: close connections that exceeds opts.idleTimeout (#159)
Browse files Browse the repository at this point in the history
* feat: close connections that exceeds opts.idleTimeout

fixes #148, closes #60 with a mocked Pool and Connection for sqlite driver

* test: SQLite driver.pool connections

* test: findOne({ where }) in sequelize adapter
  • Loading branch information
cyjake authored Aug 17, 2021
1 parent b77da6b commit 837a266
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 59 deletions.
27 changes: 27 additions & 0 deletions src/drivers/abstract/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,33 @@ module.exports = class AbstractDriver {
constructor(opts = {}) {
const { logger } = opts;
this.logger = logger instanceof Logger ? logger : new Logger(logger);
this.idleTimeout = opts.idleTimeout || 60;
this.options = opts;
}

closeConnection(connection) {
throw new Error('not implemented');
}

recycleConnections() {
const acquiredAt = new Map();
const timeout = this.idleTimeout * 1000;

this.pool.on('acquire', function onAcquire(connection) {
acquiredAt.set(connection, Date.now());
});

const checkIdle = () => {
const now = Date.now();
for (const [ connection, timestamp ] of acquiredAt) {
if (now - timestamp > timeout) {
this.closeConnection(connection);
acquiredAt.delete(connection);
}
}
setTimeout(checkIdle, timeout);
};
checkIdle();
}

/**
Expand Down
47 changes: 31 additions & 16 deletions src/drivers/mysql/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,31 @@ class MysqlDriver extends AbstractDriver {
* @param {boolean} opts.stringifyObjects - stringify object value in dataValues
*/
constructor(opts = {}) {
super(opts);
this.type = 'mysql';
this.pool = this.createPool(opts);
this.escape = this.pool.escape.bind(this.pool);
this.recycleConnections();
}

get escapeId() {
return this.pool.escapeId;
}

createPool(opts) {
// some RDMS use appName to locate the database instead of the actual db, though the table_schema stored in infomation_schema.columns is still the latter one.
const database = opts.appName || opts.database;
const client = opts.client || 'mysql';
const {
host, port, user, password,
connectionLimit, charset, stringifyObjects = false,
} = opts;

if (client !== 'mysql' && client !== 'mysql2') {
throw new Error(`Unsupported mysql client ${client}`);
}
const { host, port, user, password, connectionLimit, charset, stringifyObjects = false } = opts;
// some RDMS use appName to locate the database instead of the actual db, though the table_schema stored in infomation_schema.columns is still the latter one.
const database = opts.appName || opts.database;
super(opts);
this.type = 'mysql';
this.database = database;
this.pool = require(client).createPool({

return require(client).createPool({
connectionLimit,
host,
port,
Expand All @@ -41,12 +55,6 @@ class MysqlDriver extends AbstractDriver {
charset,
stringifyObjects,
});

this.escape = this.pool.escape.bind(this.pool);
}

get escapeId() {
return this.pool.escapeId;
}

getConnection() {
Expand All @@ -61,11 +69,16 @@ class MysqlDriver extends AbstractDriver {
});
}

closeConnection(connection) {
connection.release();
connection.destroy();
}

async query(query, values, opts = {}) {
const { pool, logger } = this;
const { connection } = opts;
const { logger } = this;
const connection = opts.connection || await this.getConnection();
const promise = new Promise((resolve, reject) => {
(connection || pool).query(query, values, (err, results, fields) => {
connection.query(query, values, (err, results, fields) => {
if (err) {
reject(err);
} else {
Expand All @@ -82,6 +95,8 @@ class MysqlDriver extends AbstractDriver {
} catch (err) {
logger.logQueryError(sql, err, Date.now() - start, opts);
throw err;
} finally {
if (!opts.connection) connection.release();
}

logger.logQuery(sql, Date.now() - start, opts);
Expand Down
3 changes: 2 additions & 1 deletion src/drivers/mysql/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ module.exports = {
* @param {string} newColumn the new column name
*/
async renameColumn(table, name, newName) {
const { database, escapeId } = this;
const { escapeId } = this;
const { database } = this.options;
const { columnName } = new Attribute(name);
const schemaInfo = await this.querySchemaInfo(database, table);
const { columnName: _, ...columnInfo } = schemaInfo[table].find(entry => {
Expand Down
21 changes: 16 additions & 5 deletions src/drivers/postgres/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,30 @@ function parameterize(sql, values) {
class PostgresDriver extends AbstractDriver {
constructor(opts = {}) {
super(opts);
const { host, port, user, password, database } = opts;

this.type = 'postgres';
this.pool = new Pool({ host, port, user, password, database });
this.pool = this.createPool(opts);
this.recycleConnections();
}

createPool(opts) {
const { host, port, user, password, database } = opts;
return new Pool({ host, port, user, password, database });
}

async getConnection() {
return await this.pool.connect();
}

async closeConnection(client) {
client.release();
await client.end();
}

async query(query, values, spell = {}) {
const { sql, nestTables } = typeof query === 'string' ? { sql: query } : query;
const { text } = parameterize(sql, values);
const { logger } = this;
const client = spell && spell.connection || this.pool;
const connection = spell.connection || await this.getConnection();
const command = sql.slice(0, sql.indexOf(' ')).toLowerCase();

async function tryQuery(...args) {
Expand All @@ -130,10 +139,12 @@ class PostgresDriver extends AbstractDriver {
let result;

try {
result = await client.query(...args);
result = await connection.query(...args);
} catch (err) {
logger.logQueryError(formatted, err, Date.now() - start, spell);
throw err;
} finally {
if (!spell.connection) connection.release();
}

logger.logQuery(formatted, Date.now() - start, spell);
Expand Down
105 changes: 77 additions & 28 deletions src/drivers/sqlite/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

const EventEmitter = require('events');
const strftime = require('strftime');

const AbstractDriver = require('../abstract');
Expand Down Expand Up @@ -34,11 +35,11 @@ function nest(rows, fields, spell) {
}

class Connection {
constructor({ client, database, mode, logger }) {
constructor({ client, database, mode, pool }) {
const { Database, OPEN_READWRITE, OPEN_CREATE } = client;
if (mode == null) mode = OPEN_READWRITE | OPEN_CREATE;
this.database = new Database(database, mode);
this.logger = logger;
this.pool = pool;
}

async query(query, values, spell) {
Expand All @@ -48,15 +49,14 @@ class Connection {
const result = await this.all(sql, values);
if (nestTables) return nest(result.rows, result.fields, spell);
return result;
} else {
return await this.run(sql, values);
}
return await this.run(sql, values);
}

all(sql, values) {
return new Promise((resolve, reject) => {
this.database.all(sql, values, (err, rows, fields) => {
if (err) reject(new Error(err.stack));
if (err) reject(err);
else resolve({ rows, fields });
});
});
Expand All @@ -70,39 +70,88 @@ class Connection {
});
});
}

release() {
this.pool.releaseConnection(this);
}

async destroy() {
const { connections } = this.pool;
const index = connections.indexOf(this);
if (index >= 0) connections.splice(index, 1);

return await new Promise((resolve, reject) => {
this.database.close(function(err) {
if (err) reject(err);
resolve();
});
});
}
}

class Pool extends EventEmitter {
constructor(opts) {
super(opts);
this.options = {
connectionLimit: 10,
...opts,
client: opts.client || 'sqlite3',
};
this.client = require(this.options.client);
this.connections = [];
this.queue = [];
}

async getConnection() {
const { connections, queue, client, options } = this;
for (const connection of connections) {
if (connection.idle) {
connection.idle = false;
this.emit('acquire', connection);
return connection;
}
}
if (connections.length < options.connectionLimit) {
const connection = new Connection({ ...options, client, pool: this });
connections.push(connection);
this.emit('acquire', connection);
return connection;
}
await new Promise(resolve => queue.push(resolve));
return await this.getConnection();
}

releaseConnection(connection) {
connection.idle = true;
this.emit('release', connection);

const { queue } = this;
while (queue.length > 0) {
const task = queue.shift();
task();
}
}
}

class SqliteDriver extends AbstractDriver {
constructor(opts = {}) {
super(opts);
const { logger } = this;
const client = require(opts.client || 'sqlite3');
this.type = 'sqlite';
this.connections = [ new Connection({ ...opts, client, logger }) ];
this.callbacks = [];
this.pool = this.createPool(opts);
this.recycleConnections();
}

async getConnection() {
const { connections, callbacks } = this;

if (connections.length > 0) {
const connection = connections.shift();
return Object.assign(connection, {
release() {
connections.push(connection);
while (callbacks.length > 0) {
const callback = callbacks.shift();
callback();
}
},
});
}
createPool(opts) {
return new Pool(opts);
}

await new Promise((resolve) => {
callbacks.push(resolve);
});
async getConnection() {
return await this.pool.getConnection();
}

return this.getConnection();
async closeConnection(connection) {
connection.release();
await connection.destroy();
}

async query(query, values, opts = {}) {
Expand Down
13 changes: 6 additions & 7 deletions test/unit/adapters/sequelize.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,8 @@ describe('=> Sequelize adapter', () => {
].map(opts => Post.create(opts)));

const stub = sinon.stub(logger, 'warn').callsFake((tag, message) => {
throw new Error(message);
}
);
throw new Error(message);
});

let posts = await Post.findAll({
where: {
Expand All @@ -364,7 +363,7 @@ describe('=> Sequelize adapter', () => {
assert.equal(posts[0].title, 'Leah');

await Post.destroy({ title: 'Leah' });
const post = await Post.findOne({ title: 'Leah' });
const post = await Post.findOne({ where: { title: 'Leah' } });
assert(!post);

posts = await Post.findAll({
Expand Down Expand Up @@ -494,7 +493,7 @@ describe('=> Sequelize adapter', () => {
{ title: 'Tyrael' },
].map(opts => Post.create(opts)));
await Post.destroy({ title: 'Leah' });
const post = await Post.findOne({ title: 'Leah' });
const post = await Post.findOne({ where: { title: 'Leah' } });
assert(!post);
const post1 = await Post.findOne({ where: { title: 'Leah' }, paranoid: false });
assert.equal(post1.title, 'Leah');
Expand All @@ -518,7 +517,7 @@ describe('=> Sequelize adapter', () => {
const post = await Post.findOne(posts[1].id);
assert.equal(post.title, 'Tyrael');

const post2 = await Post.findOne({ title: 'Leah' });
const post2 = await Post.findOne({ where: { title: 'Leah' } });
assert.equal(post2.title, 'Leah');
});

Expand All @@ -531,7 +530,7 @@ describe('=> Sequelize adapter', () => {
const post = await Post.find(posts[1].id);
assert.equal(post.title, 'Tyrael');

const post2 = await Post.find({ title: 'Leah' });
const post2 = await Post.find({ where: { title: 'Leah' } });
assert.equal(post2.title, 'Leah');
});

Expand Down
Loading

0 comments on commit 837a266

Please sign in to comment.