Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: support for common interface to get a session to support multiplexed session #2204

Merged
merged 12 commits into from
Dec 27, 2024
2 changes: 2 additions & 0 deletions observability-test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import * as pfy from '@google-cloud/promisify';
import {grpc} from 'google-gax';
import {MockError} from '../test/mockserver/mockspanner';
import {FakeSessionFactory} from '../test/database';
const {generateWithAllSpansHaveDBName} = require('./helper');

const fakePfy = extend({}, pfy, {
Expand Down Expand Up @@ -93,7 +94,7 @@

class FakeSession {
calledWith_: IArguments;
formattedName_: any;

Check warning on line 97 in observability-test/database.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
constructor() {
this.calledWith_ = arguments;
}
Expand Down Expand Up @@ -234,6 +235,7 @@
'./codec': {codec: fakeCodec},
'./partial-result-stream': {partialResultStream: fakePartialResultStream},
'./session-pool': {SessionPool: FakeSessionPool},
'./session-factory': {SessionFactory: FakeSessionFactory},
'./session': {Session: FakeSession},
'./table': {Table: FakeTable},
'./transaction-runner': {
Expand Down
21 changes: 9 additions & 12 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
} from 'google-gax';
import {Backup} from './backup';
import {BatchTransaction, TransactionIdentifier} from './batch-transaction';
import {SessionFactory, SessionFactoryInterface} from './session-factory';
import {
google as databaseAdmin,
google,
Expand Down Expand Up @@ -111,7 +112,6 @@ import {
setSpanErrorAndException,
traceConfig,
} from './instrument';

export type GetDatabaseRolesCallback = RequestCallback<
IDatabaseRole,
databaseAdmin.spanner.admin.database.v1.IListDatabaseRolesResponse
Expand Down Expand Up @@ -339,6 +339,7 @@ class Database extends common.GrpcServiceObject {
private instance: Instance;
formattedName_: string;
pool_: SessionPoolInterface;
sessionFactory_: SessionFactoryInterface;
queryOptions_?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions;
commonHeaders_: {[k: string]: string};
request: DatabaseRequest;
Expand Down Expand Up @@ -450,15 +451,6 @@ class Database extends common.GrpcServiceObject {
},
} as {} as ServiceObjectConfig);

this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(this, null)
: new SessionPool(this, poolOptions);
const sessionPoolInstance = this.pool_ as SessionPool;
if (sessionPoolInstance) {
sessionPoolInstance._observabilityOptions =
instance._observabilityOptions;
}
if (typeof poolOptions === 'object') {
this.databaseRole = poolOptions.databaseRole || null;
this.labels = poolOptions.labels || null;
Expand All @@ -480,8 +472,13 @@ class Database extends common.GrpcServiceObject {

// eslint-disable-next-line @typescript-eslint/no-explicit-any
this.requestStream = instance.requestStream as any;
this.pool_.on('error', this.emit.bind(this, 'error'));
this.pool_.open();
this.sessionFactory_ = new SessionFactory(this, name, poolOptions);
this.pool_ = this.sessionFactory_.getPool();
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved
const sessionPoolInstance = this.pool_ as SessionPool;
if (sessionPoolInstance) {
sessionPoolInstance._observabilityOptions =
instance._observabilityOptions;
}
this.queryOptions_ = Object.assign(
Object.assign({}, queryOptions),
Database.getEnvironmentQueryOptions()
Expand Down
8 changes: 6 additions & 2 deletions src/multiplexed-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import {EventEmitter} from 'events';
import {Database} from './database';
import {Session} from './session';
import {GetSessionCallback} from './session-pool';
import {GetSessionCallback} from './session-factory';
import {
ObservabilityOptions,
getActiveOrNoopSpan,
Expand All @@ -38,7 +38,7 @@ export const MUX_SESSION_CREATE_ERROR = 'mux-session-create-error';
* @constructs MultiplexedSessionInterface
* @param {Database} database The database to create a multiplexed session for.
*/
export interface MultiplexedSessionInterface {
export interface MultiplexedSessionInterface extends EventEmitter {
/**
* When called creates a multiplexed session.
*
Expand Down Expand Up @@ -71,6 +71,7 @@ export class MultiplexedSession
database: Database;
// frequency to create new mux session
refreshRate: number;
isMultiplexedEnabled: boolean;
_multiplexedSession: Session | null;
_refreshHandle!: NodeJS.Timer;
_observabilityOptions?: ObservabilityOptions;
Expand All @@ -81,6 +82,9 @@ export class MultiplexedSession
this.refreshRate = 7;
this._multiplexedSession = null;
this._observabilityOptions = database._observabilityOptions;
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true'
? (this.isMultiplexedEnabled = true)
: (this.isMultiplexedEnabled = false);
}

/**
Expand Down
161 changes: 161 additions & 0 deletions src/session-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*!
* Copyright 2024 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {Database, Session, Transaction} from '.';
import {
MultiplexedSession,
MultiplexedSessionInterface,
} from './multiplexed-session';
import {
SessionPool,
SessionPoolInterface,
SessionPoolOptions,
} from './session-pool';
import {SessionPoolConstructor} from './database';
import {ServiceObjectConfig} from '@google-cloud/common';
const common = require('./common-grpc/service-object');

/**
* @callback GetSessionCallback
* @param {?Error} error Request error, if any.
* @param {Session} session The read-write session.
* @param {Transaction} transaction The transaction object.
*/
export interface GetSessionCallback {
(
err: Error | null,
session?: Session | null,
transaction?: Transaction | null
): void;
}

/**
* Interface for implementing session-factory logic.
*
* @interface SessionFactoryInterface
*/
export interface SessionFactoryInterface {
/**
* When called returns a session.
*
* @name SessionFactoryInterface#getSession
* @param {GetSessionCallback} callback The callback function.
*/
getSession(callback: GetSessionCallback): void;
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved

/**
* When called returns the pool object.
*
* @name SessionFactoryInterface#getPool
*/
getPool(): SessionPoolInterface;

/**
* To be called when releasing a session.
*
* @name SessionFactoryInterface#release
* @param {Session} session The session to be released.
*/
release(session: Session): void;
}

/**
* Creates a SessionFactory object to manage the creation of
* session-pool and multiplexed session.
*
* @class
*
* @param {Database} database Database object.
* @param {String} name Name of the database.
* @param {SessionPoolOptions|SessionPoolInterface} options Session pool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SessionPoolConstructor right

Suggested change
* @param {SessionPoolOptions|SessionPoolInterface} options Session pool
* @param {SessionPoolOptions| SessionPoolConstructor} options Session pool

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please let me know if my understanding is incorrect ?

Copy link
Contributor Author

@alkatrivedi alkatrivedi Dec 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since, the SessionPoolConstructor is eventually creating SessionPoolInterface object, hence we can keep SessionPoolInterface here in the documentation

* configuration options or custom pool inteface.
*/
export class SessionFactory
extends common.GrpcServiceObject
implements SessionFactoryInterface
{
multiplexedSession_: MultiplexedSessionInterface;
pool_: SessionPoolInterface;
constructor(
database: Database,
name: String,
poolOptions?: SessionPoolConstructor | SessionPoolOptions
) {
super({
parent: database,
id: name,
} as {} as ServiceObjectConfig);
this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(database, null)
: new SessionPool(database, poolOptions);
this.pool_.on('error', this.emit.bind(database, 'error'));
this.pool_.open();
this.multiplexedSession_ = new MultiplexedSession(database);
// Multiplexed sessions should only be created if its enabled.
if ((this.multiplexedSession_ as MultiplexedSession).isMultiplexedEnabled) {
this.multiplexedSession_.on('error', this.emit.bind(database, 'error'));
this.multiplexedSession_.createSession();
}
}

/**
* Retrieves a session, either a regular session or a multiplexed session, based on the environment variable configuration.
*
* If the environment variable `GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS` is set to `true`, the method will attempt to
* retrieve a multiplexed session. Otherwise, it will retrieve a session from the regular pool.
*
* @param {GetSessionCallback} callback The callback function.
*/

getSession(callback: GetSessionCallback): void {
const sessionHandler = (this.multiplexedSession_ as MultiplexedSession)
.isMultiplexedEnabled
? this.multiplexedSession_
: this.pool_;

sessionHandler!.getSession((err, session) => callback(err, session));
}

/**
* Returns the regular session pool object.
*
* @returns {SessionPoolInterface} The session pool used by current instance.
*/

getPool(): SessionPoolInterface {
return this.pool_;
}

/**
* Releases a session back to the session pool.
*
* This method returns a session to the pool after it is no longer needed.
* It is a no-op for multiplexed sessions.
*
* @param {Session} session - The session to be released. This should be an instance of `Session` that was
* previously acquired from the session pool.
*
* @throws {Error} If the session is invalid or cannot be released.
*/
release(session: Session): void {
if (
!(this.multiplexedSession_ as MultiplexedSession).isMultiplexedEnabled
) {
this.pool_.release(session);
}
}
}
16 changes: 1 addition & 15 deletions src/session-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
setSpanErrorAndException,
startTrace,
} from './instrument';

import {GetSessionCallback} from './session-factory';
import {
isDatabaseNotFoundError,
isInstanceNotFoundError,
Expand Down Expand Up @@ -59,20 +59,6 @@ export interface GetWriteSessionCallback {
): void;
}

/**
* @callback GetSessionCallback
* @param {?Error} error Request error, if any.
* @param {Session} session The read-write session.
* @param {Transaction} transaction The transaction object.
*/
export interface GetSessionCallback {
(
err: Error | null,
session?: Session | null,
transaction?: Transaction | null
): void;
}

/**
* Interface for implementing custom session pooling logic, it should extend the
* {@link https://nodejs.org/api/events.html|EventEmitter} class and emit any
Expand Down
Loading
Loading