diff --git a/src/database.ts b/src/database.ts index 8c5d200ab..8e73fbef0 100644 --- a/src/database.ts +++ b/src/database.ts @@ -484,7 +484,11 @@ class Database extends common.GrpcServiceObject { }; this.request = instance.request; this._nthRequest = newAtomicCounter(0); - this._clientId = (this.parent.parent as Spanner)._nthClientId; + if (this.parent && this.parent.parent) { + this._clientId = (this.parent.parent as Spanner)._nthClientId; + } else { + this._clientId = instance._nthClientId; + } this._observabilityOptions = instance._observabilityOptions; // eslint-disable-next-line @typescript-eslint/no-explicit-any this.requestStream = instance.requestStream as any; @@ -1030,7 +1034,11 @@ class Database extends common.GrpcServiceObject { reqOpts.session.creatorRole = options.databaseRole || this.databaseRole || null; - const headers = this.resourceHeader_; + const headers = this._metadataWithRequestId( + this._nextNthRequest(), + 1, + this.resourceHeader_ + ); if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); } @@ -1951,6 +1959,12 @@ class Database extends common.GrpcServiceObject { delete (gaxOpts as GetSessionsOptions).pageToken; } + const headers = this._metadataWithRequestId( + this._nextNthRequest(), + 1, + this.resourceHeader_ + ); + return startTrace('Database.getSessions', this._traceConfig, span => { this.request< google.spanner.v1.ISession, @@ -1961,7 +1975,7 @@ class Database extends common.GrpcServiceObject { method: 'listSessions', reqOpts, gaxOpts, - headers: this.resourceHeader_, + headers: headers, }, (err, sessions, nextPageRequest, ...args) => { if (err) { diff --git a/src/request_id_header.ts b/src/request_id_header.ts index c29b72430..049ee52d8 100644 --- a/src/request_id_header.ts +++ b/src/request_id_header.ts @@ -80,11 +80,13 @@ class XGoogRequestHeaderInterceptor { private nUnary: number; private streamCalls: any[]; private unaryCalls: any[]; - constructor() { + private prefixesToIgnore?: string[]; + constructor(prefixesToIgnore?: string[]) { this.nStream = 0; this.streamCalls = []; this.nUnary = 0; this.unaryCalls = []; + this.prefixesToIgnore = prefixesToIgnore || []; } assertHasHeader(call): string | unknown { @@ -118,13 +120,36 @@ class XGoogRequestHeaderInterceptor { next(call); } + generateServerInterceptor() { + return this.serverInterceptor.bind(this); + } + + reset() { + this.nStream = 0; + this.streamCalls = []; + this.nUnary = 0; + this.unaryCalls = []; + } + serverInterceptor(methodDescriptor, call) { const method = call.handler.path; const isUnary = call.handler.type === 'unary'; + const that = this; const listener = new grpc.ServerListenerBuilder() .withOnReceiveMetadata((metadata, next) => { - const gotReqId = metadata[X_GOOG_SPANNER_REQUEST_ID_HEADER]; - if (!gotReqId) { + let i = 0; + const prefixesToIgnore: string[] = that.prefixesToIgnore || []; + for (i = 0; i < prefixesToIgnore.length; i++) { + const prefix = prefixesToIgnore[i]; + console.log(`prefix: ${prefix}\nmethod: ${method}`); + if (method.startsWith(prefix)) { + next(metadata); + return; + } + } + + const gotReqIds = metadata.get(X_GOOG_SPANNER_REQUEST_ID_HEADER); + if (!(gotReqIds && gotReqIds.length > 0)) { call.sendStatus({ code: grpc.status.INVALID_ARGUMENT, details: `${method} is missing ${X_GOOG_SPANNER_REQUEST_ID_HEADER} header`, @@ -132,21 +157,32 @@ class XGoogRequestHeaderInterceptor { return; } + if (gotReqIds.length !== 1) { + call.sendStatus({ + code: grpc.status.INVALID_ARGUMENT, + details: `${method} set multiple ${X_GOOG_SPANNER_REQUEST_ID_HEADER} headers: ${gotReqIds}`, + }); + return; + } + + const gotReqId = gotReqIds[0].toString(); if (!gotReqId.match(X_GOOG_REQ_ID_REGEX)) { call.sendStatus({ code: grpc.status.INVALID_ARGUMENT, details: `${method} reqID header ${gotReqId} does not match ${X_GOOG_REQ_ID_REGEX}`, }); + return; } - // Otherwise it matched all good. if (isUnary) { - this.unaryCalls.push({method: method, reqId: gotReqId}); - this.nUnary++; + that.unaryCalls.push({method: method, reqId: gotReqId}); + that.nUnary++; } else { - this.streamCalls.push({method: method, reqId: gotReqId}); - this.nStream++; + that.streamCalls.push({method: method, reqId: gotReqId}); + that.nStream++; } + + next(metadata); }) .build(); diff --git a/src/transaction.ts b/src/transaction.ts index 2cc7962c9..26315576e 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -1975,7 +1975,12 @@ export class Transaction extends Dml { statements, } as spannerClient.spanner.v1.ExecuteBatchDmlRequest; - const headers = this.resourceHeader_; + const database = this.session.parent as Database; + const headers = this.session._metadataWithRequestId( + database._nextNthRequest(), + 1, + this.resourceHeader_ + ); if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); } @@ -2208,13 +2213,18 @@ export class Transaction extends Dml { span.addEvent('Starting Commit'); + const database = this.session.parent as Database; this.request( { client: 'SpannerClient', method: 'commit', reqOpts, gaxOpts: gaxOpts, - headers: headers, + headers: this.session._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + ), }, (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { this.end(); diff --git a/test/spanner.ts b/test/spanner.ts index f745bfb87..80f20492c 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -102,9 +102,12 @@ describe('Spanner with mock server', () => { const fooNotFoundErr = Object.assign(new Error('Table FOO not found'), { code: grpc.status.NOT_FOUND, }); - const xGoogReqIDInterceptor = new XGoogRequestHeaderInterceptor(); + const xGoogReqIDInterceptor = new XGoogRequestHeaderInterceptor([ + '/google.spanner.admin', + '/google.spanner.admin.database.v1.DatabaseAdmin', + ]); const server = new grpc.Server({ - interceptors: [xGoogReqIDInterceptor.serverInterceptor], + interceptors: [xGoogReqIDInterceptor.generateServerInterceptor()], }); const spannerMock = mock.createMockSpanner(server); mockInstanceAdmin.createMockInstanceAdmin(server); @@ -118,6 +121,10 @@ describe('Spanner with mock server', () => { return instance.database(`database-${dbCounter++}`, options); } + beforeEach(() => { + xGoogReqIDInterceptor.reset(); + }); + before(async () => { sandbox = sinon.createSandbox(); port = await new Promise((resolve, reject) => { @@ -5101,7 +5108,7 @@ describe('Spanner with mock server', () => { const sentMetadata = spannerMock.getMetadata(); const sentRequests = spannerMock.getRequests(); - const xGoogRequestHeaders = []; + const xGoogRequestHeaders: grpc.MetadataValue[] = []; for (const index in sentMetadata) { const req = sentRequests[index]; console.log(index, 'req', req.constructor.name); @@ -5110,7 +5117,9 @@ describe('Spanner with mock server', () => { for (const md of sentMetadata) { const got = md.get('x-goog-spanner-request-id'); if (got) { - xGoogRequestHeaders.push(...got); + for (const value of got) { + xGoogRequestHeaders.push(value); + } } } console.log('xGoogHeaders', xGoogRequestHeaders!);