Skip to content

Commit

Permalink
Plumb tests with x-goog-spanner-request-id
Browse files Browse the repository at this point in the history
  • Loading branch information
odeke-em committed Dec 22, 2024
1 parent c86f1cc commit 84bdfb7
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 17 deletions.
20 changes: 17 additions & 3 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,11 @@ class Database extends common.GrpcServiceObject {
};
this.request = instance.request;
this._nthRequest = newAtomicCounter(0);
this._clientId = (this.parent.parent as Spanner)._nthClientId;
if (this.parent && this.parent.parent) {
this._clientId = (this.parent.parent as Spanner)._nthClientId;
} else {
this._clientId = instance._nthClientId;
}
this._observabilityOptions = instance._observabilityOptions;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
this.requestStream = instance.requestStream as any;
Expand Down Expand Up @@ -1030,7 +1034,11 @@ class Database extends common.GrpcServiceObject {
reqOpts.session.creatorRole =
options.databaseRole || this.databaseRole || null;

const headers = this.resourceHeader_;
const headers = this._metadataWithRequestId(
this._nextNthRequest(),
1,
this.resourceHeader_
);
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}
Expand Down Expand Up @@ -1951,6 +1959,12 @@ class Database extends common.GrpcServiceObject {
delete (gaxOpts as GetSessionsOptions).pageToken;
}

const headers = this._metadataWithRequestId(
this._nextNthRequest(),
1,
this.resourceHeader_
);

return startTrace('Database.getSessions', this._traceConfig, span => {
this.request<
google.spanner.v1.ISession,
Expand All @@ -1961,7 +1975,7 @@ class Database extends common.GrpcServiceObject {
method: 'listSessions',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
headers: headers,
},
(err, sessions, nextPageRequest, ...args) => {
if (err) {
Expand Down
52 changes: 44 additions & 8 deletions src/request_id_header.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,13 @@ class XGoogRequestHeaderInterceptor {
private nUnary: number;
private streamCalls: any[];
private unaryCalls: any[];
constructor() {
private prefixesToIgnore?: string[];
constructor(prefixesToIgnore?: string[]) {
this.nStream = 0;
this.streamCalls = [];
this.nUnary = 0;
this.unaryCalls = [];
this.prefixesToIgnore = prefixesToIgnore || [];
}

assertHasHeader(call): string | unknown {
Expand Down Expand Up @@ -118,35 +120,69 @@ class XGoogRequestHeaderInterceptor {
next(call);
}

generateServerInterceptor() {
return this.serverInterceptor.bind(this);
}

reset() {
this.nStream = 0;
this.streamCalls = [];
this.nUnary = 0;
this.unaryCalls = [];
}

serverInterceptor(methodDescriptor, call) {
const method = call.handler.path;
const isUnary = call.handler.type === 'unary';
const that = this;
const listener = new grpc.ServerListenerBuilder()
.withOnReceiveMetadata((metadata, next) => {
const gotReqId = metadata[X_GOOG_SPANNER_REQUEST_ID_HEADER];
if (!gotReqId) {
let i = 0;
const prefixesToIgnore: string[] = that.prefixesToIgnore || [];
for (i = 0; i < prefixesToIgnore.length; i++) {
const prefix = prefixesToIgnore[i];
console.log(`prefix: ${prefix}\nmethod: ${method}`);
if (method.startsWith(prefix)) {
next(metadata);
return;
}
}

const gotReqIds = metadata.get(X_GOOG_SPANNER_REQUEST_ID_HEADER);
if (!(gotReqIds && gotReqIds.length > 0)) {
call.sendStatus({
code: grpc.status.INVALID_ARGUMENT,
details: `${method} is missing ${X_GOOG_SPANNER_REQUEST_ID_HEADER} header`,
});
return;
}

if (gotReqIds.length !== 1) {
call.sendStatus({
code: grpc.status.INVALID_ARGUMENT,
details: `${method} set multiple ${X_GOOG_SPANNER_REQUEST_ID_HEADER} headers: ${gotReqIds}`,
});
return;
}

const gotReqId = gotReqIds[0].toString();
if (!gotReqId.match(X_GOOG_REQ_ID_REGEX)) {
call.sendStatus({
code: grpc.status.INVALID_ARGUMENT,
details: `${method} reqID header ${gotReqId} does not match ${X_GOOG_REQ_ID_REGEX}`,
});
return;
}

// Otherwise it matched all good.
if (isUnary) {
this.unaryCalls.push({method: method, reqId: gotReqId});
this.nUnary++;
that.unaryCalls.push({method: method, reqId: gotReqId});
that.nUnary++;
} else {
this.streamCalls.push({method: method, reqId: gotReqId});
this.nStream++;
that.streamCalls.push({method: method, reqId: gotReqId});
that.nStream++;
}

next(metadata);
})
.build();

Expand Down
14 changes: 12 additions & 2 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1975,7 +1975,12 @@ export class Transaction extends Dml {
statements,
} as spannerClient.spanner.v1.ExecuteBatchDmlRequest;

const headers = this.resourceHeader_;
const database = this.session.parent as Database;
const headers = this.session._metadataWithRequestId(
database._nextNthRequest(),
1,
this.resourceHeader_
);
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}
Expand Down Expand Up @@ -2208,13 +2213,18 @@ export class Transaction extends Dml {

span.addEvent('Starting Commit');

const database = this.session.parent as Database;
this.request(
{
client: 'SpannerClient',
method: 'commit',
reqOpts,
gaxOpts: gaxOpts,
headers: headers,
headers: this.session._metadataWithRequestId(
database._nextNthRequest(),
1,
headers
),
},
(err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => {
this.end();
Expand Down
17 changes: 13 additions & 4 deletions test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,12 @@ describe('Spanner with mock server', () => {
const fooNotFoundErr = Object.assign(new Error('Table FOO not found'), {
code: grpc.status.NOT_FOUND,
});
const xGoogReqIDInterceptor = new XGoogRequestHeaderInterceptor();
const xGoogReqIDInterceptor = new XGoogRequestHeaderInterceptor([
'/google.spanner.admin',
'/google.spanner.admin.database.v1.DatabaseAdmin',
]);
const server = new grpc.Server({
interceptors: [xGoogReqIDInterceptor.serverInterceptor],
interceptors: [xGoogReqIDInterceptor.generateServerInterceptor()],
});
const spannerMock = mock.createMockSpanner(server);
mockInstanceAdmin.createMockInstanceAdmin(server);
Expand All @@ -118,6 +121,10 @@ describe('Spanner with mock server', () => {
return instance.database(`database-${dbCounter++}`, options);
}

beforeEach(() => {
xGoogReqIDInterceptor.reset();
});

before(async () => {
sandbox = sinon.createSandbox();
port = await new Promise((resolve, reject) => {
Expand Down Expand Up @@ -5101,7 +5108,7 @@ describe('Spanner with mock server', () => {

const sentMetadata = spannerMock.getMetadata();
const sentRequests = spannerMock.getRequests();
const xGoogRequestHeaders = [];
const xGoogRequestHeaders: grpc.MetadataValue[] = [];
for (const index in sentMetadata) {
const req = sentRequests[index];
console.log(index, 'req', req.constructor.name);
Expand All @@ -5110,7 +5117,9 @@ describe('Spanner with mock server', () => {
for (const md of sentMetadata) {
const got = md.get('x-goog-spanner-request-id');
if (got) {
xGoogRequestHeaders.push(...got);
for (const value of got) {
xGoogRequestHeaders.push(value);
}
}
}
console.log('xGoogHeaders', xGoogRequestHeaders!);
Expand Down

0 comments on commit 84bdfb7

Please sign in to comment.