Skip to content

Commit

Permalink
chore: support for common interface to get a session to support multi…
Browse files Browse the repository at this point in the history
…plexed session (#2204)

This PR contains the common interface class SessionFactory which will be responsible for the creation of the Session Pool and the Multiplexed Session(if the env variable would be set to true) upon client initialization.

This PR also contains the getSession method which will return the session(multiplexed/regular) based upon the env variable value.
  • Loading branch information
alkatrivedi authored Dec 27, 2024
1 parent 559031d commit b467380
Show file tree
Hide file tree
Showing 9 changed files with 518 additions and 50 deletions.
2 changes: 2 additions & 0 deletions observability-test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import {Instance, MutationGroup, Spanner} from '../src';
import * as pfy from '@google-cloud/promisify';
import {grpc} from 'google-gax';
import {MockError} from '../test/mockserver/mockspanner';
import {FakeSessionFactory} from '../test/database';
const {generateWithAllSpansHaveDBName} = require('./helper');

const fakePfy = extend({}, pfy, {
Expand Down Expand Up @@ -234,6 +235,7 @@ describe('Database', () => {
'./codec': {codec: fakeCodec},
'./partial-result-stream': {partialResultStream: fakePartialResultStream},
'./session-pool': {SessionPool: FakeSessionPool},
'./session-factory': {SessionFactory: FakeSessionFactory},
'./session': {Session: FakeSession},
'./table': {Table: FakeTable},
'./transaction-runner': {
Expand Down
21 changes: 9 additions & 12 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
} from 'google-gax';
import {Backup} from './backup';
import {BatchTransaction, TransactionIdentifier} from './batch-transaction';
import {SessionFactory, SessionFactoryInterface} from './session-factory';
import {
google as databaseAdmin,
google,
Expand Down Expand Up @@ -111,7 +112,6 @@ import {
setSpanErrorAndException,
traceConfig,
} from './instrument';

export type GetDatabaseRolesCallback = RequestCallback<
IDatabaseRole,
databaseAdmin.spanner.admin.database.v1.IListDatabaseRolesResponse
Expand Down Expand Up @@ -339,6 +339,7 @@ class Database extends common.GrpcServiceObject {
private instance: Instance;
formattedName_: string;
pool_: SessionPoolInterface;
sessionFactory_: SessionFactoryInterface;
queryOptions_?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions;
commonHeaders_: {[k: string]: string};
request: DatabaseRequest;
Expand Down Expand Up @@ -450,15 +451,6 @@ class Database extends common.GrpcServiceObject {
},
} as {} as ServiceObjectConfig);

this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(this, null)
: new SessionPool(this, poolOptions);
const sessionPoolInstance = this.pool_ as SessionPool;
if (sessionPoolInstance) {
sessionPoolInstance._observabilityOptions =
instance._observabilityOptions;
}
if (typeof poolOptions === 'object') {
this.databaseRole = poolOptions.databaseRole || null;
this.labels = poolOptions.labels || null;
Expand All @@ -480,8 +472,13 @@ class Database extends common.GrpcServiceObject {

// eslint-disable-next-line @typescript-eslint/no-explicit-any
this.requestStream = instance.requestStream as any;
this.pool_.on('error', this.emit.bind(this, 'error'));
this.pool_.open();
this.sessionFactory_ = new SessionFactory(this, name, poolOptions);
this.pool_ = this.sessionFactory_.getPool();
const sessionPoolInstance = this.pool_ as SessionPool;
if (sessionPoolInstance) {
sessionPoolInstance._observabilityOptions =
instance._observabilityOptions;
}
this.queryOptions_ = Object.assign(
Object.assign({}, queryOptions),
Database.getEnvironmentQueryOptions()
Expand Down
8 changes: 6 additions & 2 deletions src/multiplexed-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import {EventEmitter} from 'events';
import {Database} from './database';
import {Session} from './session';
import {GetSessionCallback} from './session-pool';
import {GetSessionCallback} from './session-factory';
import {
ObservabilityOptions,
getActiveOrNoopSpan,
Expand All @@ -38,7 +38,7 @@ export const MUX_SESSION_CREATE_ERROR = 'mux-session-create-error';
* @constructs MultiplexedSessionInterface
* @param {Database} database The database to create a multiplexed session for.
*/
export interface MultiplexedSessionInterface {
export interface MultiplexedSessionInterface extends EventEmitter {
/**
* When called creates a multiplexed session.
*
Expand Down Expand Up @@ -71,6 +71,7 @@ export class MultiplexedSession
database: Database;
// frequency to create new mux session
refreshRate: number;
isMultiplexedEnabled: boolean;
_multiplexedSession: Session | null;
_refreshHandle!: NodeJS.Timer;
_observabilityOptions?: ObservabilityOptions;
Expand All @@ -81,6 +82,9 @@ export class MultiplexedSession
this.refreshRate = 7;
this._multiplexedSession = null;
this._observabilityOptions = database._observabilityOptions;
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true'
? (this.isMultiplexedEnabled = true)
: (this.isMultiplexedEnabled = false);
}

/**
Expand Down
161 changes: 161 additions & 0 deletions src/session-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*!
* Copyright 2024 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {Database, Session, Transaction} from '.';
import {
MultiplexedSession,
MultiplexedSessionInterface,
} from './multiplexed-session';
import {
SessionPool,
SessionPoolInterface,
SessionPoolOptions,
} from './session-pool';
import {SessionPoolConstructor} from './database';
import {ServiceObjectConfig} from '@google-cloud/common';
const common = require('./common-grpc/service-object');

/**
* @callback GetSessionCallback
* @param {?Error} error Request error, if any.
* @param {Session} session The read-write session.
* @param {Transaction} transaction The transaction object.
*/
export interface GetSessionCallback {
(
err: Error | null,
session?: Session | null,
transaction?: Transaction | null
): void;
}

/**
* Interface for implementing session-factory logic.
*
* @interface SessionFactoryInterface
*/
export interface SessionFactoryInterface {
/**
* When called returns a session.
*
* @name SessionFactoryInterface#getSession
* @param {GetSessionCallback} callback The callback function.
*/
getSession(callback: GetSessionCallback): void;

/**
* When called returns the pool object.
*
* @name SessionFactoryInterface#getPool
*/
getPool(): SessionPoolInterface;

/**
* To be called when releasing a session.
*
* @name SessionFactoryInterface#release
* @param {Session} session The session to be released.
*/
release(session: Session): void;
}

/**
* Creates a SessionFactory object to manage the creation of
* session-pool and multiplexed session.
*
* @class
*
* @param {Database} database Database object.
* @param {String} name Name of the database.
* @param {SessionPoolOptions|SessionPoolInterface} options Session pool
* configuration options or custom pool inteface.
*/
export class SessionFactory
extends common.GrpcServiceObject
implements SessionFactoryInterface
{
multiplexedSession_: MultiplexedSessionInterface;
pool_: SessionPoolInterface;
constructor(
database: Database,
name: String,
poolOptions?: SessionPoolConstructor | SessionPoolOptions
) {
super({
parent: database,
id: name,
} as {} as ServiceObjectConfig);
this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(database, null)
: new SessionPool(database, poolOptions);
this.pool_.on('error', this.emit.bind(database, 'error'));
this.pool_.open();
this.multiplexedSession_ = new MultiplexedSession(database);
// Multiplexed sessions should only be created if its enabled.
if ((this.multiplexedSession_ as MultiplexedSession).isMultiplexedEnabled) {
this.multiplexedSession_.on('error', this.emit.bind(database, 'error'));
this.multiplexedSession_.createSession();
}
}

/**
* Retrieves a session, either a regular session or a multiplexed session, based on the environment variable configuration.
*
* If the environment variable `GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS` is set to `true`, the method will attempt to
* retrieve a multiplexed session. Otherwise, it will retrieve a session from the regular pool.
*
* @param {GetSessionCallback} callback The callback function.
*/

getSession(callback: GetSessionCallback): void {
const sessionHandler = (this.multiplexedSession_ as MultiplexedSession)
.isMultiplexedEnabled
? this.multiplexedSession_
: this.pool_;

sessionHandler!.getSession((err, session) => callback(err, session));
}

/**
* Returns the regular session pool object.
*
* @returns {SessionPoolInterface} The session pool used by current instance.
*/

getPool(): SessionPoolInterface {
return this.pool_;
}

/**
* Releases a session back to the session pool.
*
* This method returns a session to the pool after it is no longer needed.
* It is a no-op for multiplexed sessions.
*
* @param {Session} session - The session to be released. This should be an instance of `Session` that was
* previously acquired from the session pool.
*
* @throws {Error} If the session is invalid or cannot be released.
*/
release(session: Session): void {
if (
!(this.multiplexedSession_ as MultiplexedSession).isMultiplexedEnabled
) {
this.pool_.release(session);
}
}
}
16 changes: 1 addition & 15 deletions src/session-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
setSpanErrorAndException,
startTrace,
} from './instrument';

import {GetSessionCallback} from './session-factory';
import {
isDatabaseNotFoundError,
isInstanceNotFoundError,
Expand Down Expand Up @@ -59,20 +59,6 @@ export interface GetWriteSessionCallback {
): void;
}

/**
* @callback GetSessionCallback
* @param {?Error} error Request error, if any.
* @param {Session} session The read-write session.
* @param {Transaction} transaction The transaction object.
*/
export interface GetSessionCallback {
(
err: Error | null,
session?: Session | null,
transaction?: Transaction | null
): void;
}

/**
* Interface for implementing custom session pooling logic, it should extend the
* {@link https://nodejs.org/api/events.html|EventEmitter} class and emit any
Expand Down
Loading

0 comments on commit b467380

Please sign in to comment.