diff --git a/observability-test/database.ts b/observability-test/database.ts index 39ebe9afc..f8cc714fa 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -47,6 +47,7 @@ import {Instance, MutationGroup, Spanner} from '../src'; import * as pfy from '@google-cloud/promisify'; import {grpc} from 'google-gax'; import {MockError} from '../test/mockserver/mockspanner'; +import {FakeSessionFactory} from '../test/database'; const {generateWithAllSpansHaveDBName} = require('./helper'); const fakePfy = extend({}, pfy, { @@ -234,6 +235,7 @@ describe('Database', () => { './codec': {codec: fakeCodec}, './partial-result-stream': {partialResultStream: fakePartialResultStream}, './session-pool': {SessionPool: FakeSessionPool}, + './session-factory': {SessionFactory: FakeSessionFactory}, './session': {Session: FakeSession}, './table': {Table: FakeTable}, './transaction-runner': { diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index c740a6dba..c60549776 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -505,7 +505,7 @@ describe('ObservabilityOptions injection and propagation', async () => { db.formattedName_ ); - it('run', () => { + it('run', done => { database.getTransaction((err, tx) => { assert.ifError(err); @@ -549,6 +549,8 @@ describe('ObservabilityOptions injection and propagation', async () => { true, `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` ); + + done(); }); }); }); @@ -608,16 +610,14 @@ describe('ObservabilityOptions injection and propagation', async () => { }); }); - it('runStream', () => { + it('runStream', done => { let rowCount = 0; database.getTransaction((err, tx) => { assert.ifError(err); tx! .runStream(selectSql) .on('data', () => rowCount++) - .on('error', () => { - assert.ifError; - }) + .on('error', assert.ifError) .on('stats', () => {}) .on('end', async () => { tx!.end(); @@ -657,6 +657,8 @@ describe('ObservabilityOptions injection and propagation', async () => { expectedEventNames, `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` ); + + done(); }); }); }); diff --git a/src/database.ts b/src/database.ts index 5fdf288a8..cf483d214 100644 --- a/src/database.ts +++ b/src/database.ts @@ -36,6 +36,7 @@ import { } from 'google-gax'; import {Backup} from './backup'; import {BatchTransaction, TransactionIdentifier} from './batch-transaction'; +import {SessionFactory, SessionFactoryInterface} from './session-factory'; import { google as databaseAdmin, google, @@ -102,7 +103,6 @@ import Policy = google.iam.v1.Policy; import FieldMask = google.protobuf.FieldMask; import IDatabase = google.spanner.admin.database.v1.IDatabase; import snakeCase = require('lodash.snakecase'); -import {SessionFactory, SessionFactoryInterface} from './session-factory'; import { ObservabilityOptions, Span, @@ -112,7 +112,7 @@ import { setSpanErrorAndException, traceConfig, } from './instrument'; - +import {MultiplexedSessionInterface} from './multiplexed-session'; export type GetDatabaseRolesCallback = RequestCallback< IDatabaseRole, databaseAdmin.spanner.admin.database.v1.IListDatabaseRolesResponse @@ -458,6 +458,7 @@ class Database extends common.GrpcServiceObject { } this.formattedName_ = formattedName_; this.instance = instance; + this._observabilityOptions = instance._observabilityOptions; this._traceConfig = { opts: this._observabilityOptions, dbName: this.formattedName_, @@ -472,7 +473,6 @@ class Database extends common.GrpcServiceObject { this.requestStream = instance.requestStream as any; this.sessionFactory_ = new SessionFactory(this, name, poolOptions); this.pool_ = this.sessionFactory_.getPool(); - this.multiplexedSession_ = this.sessionFactory_.getMultiplexedSession(); const sessionPoolInstance = this.pool_ as SessionPool; if (sessionPoolInstance) { sessionPoolInstance._observabilityOptions = diff --git a/src/session-factory.ts b/src/session-factory.ts index 899aa8625..903e761a0 100644 --- a/src/session-factory.ts +++ b/src/session-factory.ts @@ -27,7 +27,6 @@ import { import {SessionPoolConstructor} from './database'; import {ServiceObjectConfig} from '@google-cloud/common'; const common = require('./common-grpc/service-object'); - /** * @callback GetSessionCallback * @param {?Error} error Request error, if any. @@ -68,15 +67,28 @@ export class SessionFactory ? new (poolOptions as SessionPoolConstructor)(database, null) : new SessionPool(database, poolOptions); this.multiplexedSession_ = new MultiplexedSession(database); - this.pool_.on('error', this.emit.bind(this, 'error')); + this.pool_.on('error', this.emit.bind(database, 'error')); + + // create session pool this.pool_.open(); // multiplexed session should only get created if the env varaible is enabled if (process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true') { - this.multiplexedSession_.on('error', this.emit.bind(this, 'error')); + this.multiplexedSession_.on('error', this.emit.bind(database, 'error')); + + // create multiplexed session this.multiplexedSession_.createSession(); } } + /** + * Retrieves the session, either the regular session or the multiplexed session based upon the environment varibale + * If the environment variable GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS is set to `true` the method will attempt to + * retrieve the multiplexed session. Otherwise it will retrieve the session from the pool. + * + * The session is returned asynchronously via the provided callback, which will receive either an error or the session object. + * @param callback + */ + getSession(callback: GetSessionCallback): void { if (process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true') { this.multiplexedSession_?.getSession((err, session) => { @@ -89,10 +101,25 @@ export class SessionFactory } } + /** + * Returns the SessionPoolInterface used by the current instance, which provide access to the session pool + * for obtaining database sessions. + * + * @returns {SessionPoolInterface} The session pool used by current instance. + * This object allows interaction with the pool for acquiring and managing sessions. + */ + getPool(): SessionPoolInterface { return this.pool_; } + /** + * Returns the MultiplexedSession used bt the current instance. + * + * @returns {MultiplexedSessionInterface | undefined} The multiplexed session used by current instance. + * The object allows interaction with the multiplexed session. + */ + getMultiplexedSession(): MultiplexedSessionInterface | undefined { return this.multiplexedSession_; } diff --git a/src/session-pool.ts b/src/session-pool.ts index daa6e50f1..025216937 100644 --- a/src/session-pool.ts +++ b/src/session-pool.ts @@ -24,14 +24,13 @@ import {Transaction} from './transaction'; import {NormalCallback} from './common'; import {GoogleError, grpc, ServiceError} from 'google-gax'; import trace = require('stack-trace'); -import {GetSessionCallback} from './session-factory'; import { ObservabilityOptions, getActiveOrNoopSpan, setSpanErrorAndException, startTrace, } from './instrument'; - +import {GetSessionCallback} from './session-factory'; import { isDatabaseNotFoundError, isInstanceNotFoundError, diff --git a/test/database.ts b/test/database.ts index 63aabf74b..33bb64f8f 100644 --- a/test/database.ts +++ b/test/database.ts @@ -46,7 +46,7 @@ import { CommitOptions, MutationSet, } from '../src/transaction'; - +import {SessionFactory} from '../src/session-factory'; let promisified = false; const fakePfy = extend({}, pfy, { promisifyAll(klass, options) { @@ -91,7 +91,7 @@ function fakePartialResultStream(this: Function & {calledWith_: IArguments}) { return this; } -class FakeSession { +export class FakeSession { calledWith_: IArguments; formattedName_: any; constructor() { @@ -109,42 +109,46 @@ class FakeSession { } } -export class FakeSessionFactory extends EventEmitter { +export class FakeSessionPool extends EventEmitter { calledWith_: IArguments; constructor() { super(); this.calledWith_ = arguments; } - getSession(): FakeSession { - return new FakeSession(); - } - getPool(): FakeSessionPool { - return new FakeSessionPool(); - } - getMultiplexedSession(): FakeMultiplexedSession { - return new FakeMultiplexedSession(); - } + open() {} + getSession() {} + release() {} } -export class FakeSessionPool extends EventEmitter { +export class FakeMultiplexedSession extends EventEmitter { calledWith_: IArguments; constructor() { super(); this.calledWith_ = arguments; } - open() {} + createSession() {} getSession() {} - release() {} } -export class FakeMultiplexedSession extends EventEmitter { +export class FakeSessionFactory extends EventEmitter { calledWith_: IArguments; constructor() { super(); this.calledWith_ = arguments; } - createSession() {} - getSession() {} + getSession(): FakeSession | FakeMultiplexedSession { + if (process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS) { + return new FakeSession(); + } else { + return new FakeMultiplexedSession(); + } + } + getPool(): FakeSessionPool { + return new FakeSessionPool(); + } + getMultiplexedSession(): FakeMultiplexedSession { + return new FakeMultiplexedSession(); + } } class FakeTable { @@ -269,8 +273,9 @@ describe('Database', () => { './batch-transaction': {BatchTransaction: FakeBatchTransaction}, './codec': {codec: fakeCodec}, './partial-result-stream': {partialResultStream: fakePartialResultStream}, - './session': {Session: FakeSession}, + './session-pool': {SessionPool: FakeSessionPool}, './session-factory': {SessionFactory: FakeSessionFactory}, + './session': {Session: FakeSession}, './table': {Table: FakeTable}, './transaction-runner': { TransactionRunner: FakeTransactionRunner, @@ -322,6 +327,32 @@ describe('Database', () => { assert(database.formattedName_, formattedName); }); + it('should re-emit SessionPool errors', done => { + const error = new Error('err'); + + const sessionFactory = new SessionFactory(database, NAME); + + database.on('error', err => { + assert.strictEqual(err, error); + done(); + }); + + sessionFactory.pool_.emit('error', error); + }); + + it('should re-emit Multiplexed Session errors', done => { + process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true'; + const error = new Error('err'); + + const sessionFactory = new SessionFactory(database, NAME); + + database.on('error', err => { + assert.strictEqual(err, error); + done(); + }); + sessionFactory.multiplexedSession_?.emit('error', error); + }); + it('should inherit from ServiceObject', done => { const options = {}; diff --git a/test/session-factory.ts b/test/session-factory.ts index a0183acf8..50b949103 100644 --- a/test/session-factory.ts +++ b/test/session-factory.ts @@ -24,13 +24,17 @@ import * as db from '../src/database'; import {FakeTransaction} from './session-pool'; describe('SessionFactory', () => { + process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true'; let sessionFactory; + let fakeSession; + let fakeMuxSession; const sandbox = sinon.createSandbox(); const NAME = 'table-name'; const POOL_OPTIONS = {}; function noop() {} const DATABASE = { createSession: noop, + batchCreateSessions: noop, databaseRole: 'parent_role', } as unknown as Database; @@ -53,16 +57,33 @@ describe('SessionFactory', () => { }; beforeEach(() => { + fakeSession = createSession(); + fakeMuxSession = createMuxSession(); + sandbox.stub(DATABASE, 'batchCreateSessions').callsFake(() => { + return Promise.resolve([[fakeSession, fakeSession, fakeSession]]); + }); + sandbox + .stub(DATABASE, 'createSession') + .withArgs({multiplexed: true}) + .callsFake(() => { + return Promise.resolve([fakeMuxSession]); + }); sessionFactory = new SessionFactory(DATABASE, NAME, POOL_OPTIONS); sessionFactory.parent = DATABASE; }); afterEach(() => { sandbox.restore(); + }); + + after(() => { process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'false'; }); describe('instantiation', () => { + before(() => { + process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'false'; + }); it('should create a SessionPool object', () => { assert(sessionFactory.pool_ instanceof SessionPool); }); @@ -76,23 +97,12 @@ describe('SessionFactory', () => { FakePool.prototype.on = util.noop; FakePool.prototype.open = util.noop; - const getSession = new SessionFactory( + const sessionFactory = new SessionFactory( DATABASE, NAME, FakePool as {} as db.SessionPoolConstructor ); - assert(getSession.pool_ instanceof FakePool); - }); - - it('should re-emit SessionPool errors', done => { - const error = new Error('err'); - - sessionFactory.on('error', err => { - assert.strictEqual(err, error); - done(); - }); - - sessionFactory.pool_.emit('error', error); + assert(sessionFactory.pool_ instanceof FakePool); }); it('should open the pool', () => { @@ -105,18 +115,6 @@ describe('SessionFactory', () => { assert.strictEqual(openStub.callCount, 1); }); - it('should re-emit MultiplexedSession errors', done => { - process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true'; - const error = new Error('err'); - - sessionFactory.on('error', err => { - assert.strictEqual(err, error); - done(); - }); - - sessionFactory.pool_.emit('error', error); - }); - it('should initiate the multiplexed session creation if the env is enabled', () => { process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true'; const createSessionStub = sandbox @@ -130,81 +128,88 @@ describe('SessionFactory', () => { }); describe('getSession', () => { - let multiplexedSession; - let fakeMuxSession; - let sessionPool; - let fakeSession; - - beforeEach(() => { - multiplexedSession = new MultiplexedSession(DATABASE); - fakeMuxSession = createMuxSession(); - sessionPool = new SessionPool(DATABASE, POOL_OPTIONS); - fakeSession = createSession(); - }); + describe('for regular session', () => { + before(() => { + process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'false'; + }); - afterEach(() => { - process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'false'; - }); + it('should return the regular session if GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS env is disabled', done => { + ( + sandbox.stub(sessionFactory.pool_, 'getSession') as sinon.SinonStub + ).callsFake(callback => callback(null, fakeSession)); + sessionFactory.getSession((err, resp) => { + assert.strictEqual(err, null); + assert.strictEqual(resp, fakeSession); + done(); + }); + }); - it('should return the multiplexed session if GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS env is enabled', () => { - process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true'; - ( - sandbox.stub(multiplexedSession, 'getSession') as sinon.SinonStub - ).callsFake(callback => callback(null, fakeMuxSession)); - sessionFactory.getSession((err, resp) => { - assert.strictEqual(err, null); - assert.strictEqual(resp, fakeMuxSession); - assert.strictEqual(resp.multiplexed, true); - assert.strictEqual(fakeMuxSession.multiplexed, true); + it('should return the error from getSession if GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS env is disabled and regular session creation get failed', done => { + const fakeError = new Error(); + ( + sandbox.stub(sessionFactory.pool_, 'getSession') as sinon.SinonStub + ).callsFake(callback => callback(fakeError, null)); + sessionFactory.getSession((err, resp) => { + assert.strictEqual(err, fakeError); + assert.strictEqual(resp, null); + done(); + }); }); }); - it('should return the err for getSession if GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS env is enabled', () => { - process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true'; - const fakeError = new Error(); - ( - sandbox.stub(multiplexedSession, 'getSession') as sinon.SinonStub - ).callsFake(callback => callback(fakeError, null)); - sessionFactory.getSession((err, resp) => { - assert.strictEqual(err, fakeError); - assert.strictEqual(resp, null); + describe('for multiplexed session', () => { + before(() => { + process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true'; }); - }); - it('should return the multiplexed session if GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS env is disabled', () => { - (sandbox.stub(sessionPool, 'getSession') as sinon.SinonStub).callsFake( - callback => callback(null, fakeSession) - ); - sessionFactory.getSession((err, resp) => { - assert.strictEqual(err, null); - assert.strictEqual(resp, fakeSession); + it('should return the multiplexed session if GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS env is enabled', done => { + ( + sandbox.stub( + sessionFactory.multiplexedSession_, + 'getSession' + ) as sinon.SinonStub + ).callsFake(callback => callback(null, fakeMuxSession)); + sessionFactory.getSession((err, resp) => { + assert.strictEqual(err, null); + assert.strictEqual(resp, fakeMuxSession); + assert.strictEqual(resp?.multiplexed, true); + assert.strictEqual(fakeMuxSession.multiplexed, true); + done(); + }); }); - }); - it('should return the err for getSession if GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS env is disabled', () => { - const fakeError = new Error(); - (sandbox.stub(sessionPool, 'getSession') as sinon.SinonStub).callsFake( - callback => callback(fakeError, null) - ); - sessionFactory.getSession((err, resp) => { - assert.strictEqual(err, fakeError); - assert.strictEqual(resp, null); + it('should return the error from getSession if GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS env is enabled and multiplexed session creation get failed', done => { + const fakeError = new Error(); + ( + sandbox.stub( + sessionFactory.multiplexedSession_, + 'getSession' + ) as sinon.SinonStub + ).callsFake(callback => callback(fakeError, null)); + sessionFactory.getSession((err, resp) => { + assert.strictEqual(err, fakeError); + assert.strictEqual(resp, null); + done(); + }); }); }); }); describe('getPool', () => { - it('should return an instance of SessionPool', () => { - const sessionFactory = new SessionFactory(DATABASE, NAME, POOL_OPTIONS); - assert(sessionFactory.getPool() instanceof SessionPool); + it('should return the session pool object', () => { + const pool = sessionFactory.getPool(); + assert(pool instanceof SessionPool); + assert.deepStrictEqual(pool, sessionFactory.pool_); }); }); describe('getMultiplexedSession', () => { - it('should return an instance of MultiplexedSession', () => { - const sessionFactory = new SessionFactory(DATABASE, NAME, POOL_OPTIONS); - assert( - sessionFactory.getMultiplexedSession() instanceof MultiplexedSession + it('should return the multiplexed session object', () => { + const multiplexedSession = sessionFactory.getMultiplexedSession(); + assert(multiplexedSession instanceof MultiplexedSession); + assert.strictEqual( + multiplexedSession, + sessionFactory.multiplexedSession_ ); }); });