Skip to content

Commit

Permalink
Implement TransportRequestPromise, TransportRequestCallback in AWS Si…
Browse files Browse the repository at this point in the history
…gner (#823)

It was previously not possible to abort a request that used the AWS SigV4
signer transport.

Fixes #819

Signed-off-by: Tim Cooper <[email protected]>
Co-authored-by: Tim Cooper <[email protected]>
  • Loading branch information
u873838 and Tim Cooper authored Jul 22, 2024
1 parent f7dc49f commit aba5fda
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Bumps `eslint-plugin-prettier` from 5.1.3 to 5.2.1
- Bumps `semver` from 7.6.2 to 7.6.3
### Changed
- Return a transport object from `AwsSigv4SignerTransport.request` that has an `.abort()` method that allows in-flight requests to be canceled
### Deprecated
### Removed
### Fixed
Expand Down
102 changes: 74 additions & 28 deletions lib/aws/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ const Connection = require('../Connection');
const Transport = require('../Transport');
const aws4 = require('aws4');
const AwsSigv4SignerError = require('./errors');
const { RequestAbortedError } = require('../errors');
const crypto = require('crypto');
const { toMs } = Transport.internals;

const noop = () => {};

function giveAwsCredentialProviderLoader(getAwsSDKCredentialsProvider) {
return function loadAwsCredentialProvider() {
return new Promise((resolve, reject) => {
Expand Down Expand Up @@ -118,45 +121,88 @@ function giveAwsV4Signer(awsDefaultCredentialsProvider) {
}

if (!expired) {
if (typeof callback === 'undefined') {
if (callback === undefined) {
return super.request(params, options);
} else {
return super.request(params, options, callback);
}
super.request(params, options, callback);
return;
}

// In AWS SDK V2 Credentials.refreshPromise should be available.
if (currentCredentials && typeof currentCredentials.refreshPromise === 'function') {
if (typeof callback === 'undefined') {
return currentCredentials.refreshPromise().then(() => {
return super.request(params, options);
});
} else {
let p = null;

// promises support
if (callback === undefined) {
let onFulfilled = null;
let onRejected = null;
p = new Promise((resolve, reject) => {
onFulfilled = resolve;
onRejected = reject;
});
callback = function callback(err, result) {
err ? onRejected(err) : onFulfilled(result);
};
}

const meta = {
aborted: false,
};

let request = { abort: noop };

const transportReturn = {
then(onFulfilled, onRejected) {
if (p != null) {
return p.then(onFulfilled, onRejected);
}
},
catch(onRejected) {
if (p != null) {
return p.catch(onRejected);
}
},
abort() {
meta.aborted = true;
request.abort();
return this;
},
finally(onFinally) {
if (p != null) {
return p.finally(onFinally);
}
},
};

const makeRequest = () => {
// In AWS SDK V2 Credentials.refreshPromise should be available.
if (currentCredentials && typeof currentCredentials.refreshPromise === 'function') {
currentCredentials
.refreshPromise()
.then(() => {
super.request(params, options, callback);
if (meta.aborted) {
return callback(new RequestAbortedError());
}
request = super.request(params, options, callback);
})
.catch(callback);
return;
}
}
// For AWS SDK V3.
else {
opts
.getCredentials()
.then((credentials) => {
if (meta.aborted) {
return callback(new RequestAbortedError());
}
credentialsState.credentials = credentials;
request = super.request(params, options, callback);
})
.catch(callback);
}
};

// For AWS SDK V3 or when the client has not acquired credentials yet.
if (typeof callback === 'undefined') {
return opts.getCredentials().then((credentials) => {
credentialsState.credentials = credentials;
return super.request(params, options);
});
} else {
opts
.getCredentials()
.then((credentials) => {
credentialsState.credentials = credentials;
super.request(params, options, callback);
})
.catch(callback);
}
makeRequest();

return transportReturn;
}
}

Expand Down
148 changes: 147 additions & 1 deletion test/unit/lib/aws/awssigv4signer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const AwsSigv4SignerError = require('../../../../lib/aws/errors');
const { Connection } = require('../../../../index');
const { Client, buildServer } = require('../../../utils');
const { debug } = require('console');
const { RequestAbortedError } = require('../../../../lib/errors');

test('Sign with SigV4', (t) => {
t.plan(4);
Expand Down Expand Up @@ -697,6 +698,151 @@ test('Should create child client', (t) => {
}
count++;
});
t.not_same(child.transport._auth, child2.transport._auth);
t.notSame(child.transport._auth, child2.transport._auth);
});
});

test('pre-request abort (promises)', (t) => {
t.plan(1);

function handler() {
t.fail('Request should have been aborted');
}

buildServer(handler, ({ port }, server) => {
const mockCreds = {
accessKeyId: uuidv4(),
secretAccessKey: uuidv4(),
};

const AwsSigv4SignerOptions = {
region: 'us-east-1',
getCredentials: () =>
new Promise((resolve) => {
setTimeout(() => resolve(mockCreds), 100);
}),
};

const client = new Client({
...AwsSigv4Signer(AwsSigv4SignerOptions),
node: `http://localhost:${port}`,
});

const promise = client.search({
index: 'test',
q: 'foo:bar',
});

promise
.then(() => {
t.fail('Should fail');
})
.catch((err) => {
t.ok(err instanceof RequestAbortedError);
})
.finally(() => {
server.stop();
});

promise.abort();
});
});

test('pre-request abort (callback)', (t) => {
t.plan(1);

function handler() {
t.fail('Request should have been aborted');
}

buildServer(handler, ({ port }, server) => {
const mockCreds = {
accessKeyId: uuidv4(),
secretAccessKey: uuidv4(),
};

const AwsSigv4SignerOptions = {
region: 'us-east-1',
getCredentials: () =>
new Promise((resolve) => {
setTimeout(() => resolve(mockCreds), 100);
}),
};

const client = new Client({
...AwsSigv4Signer(AwsSigv4SignerOptions),
node: `http://localhost:${port}`,
});

const cb = client.search(
{
index: 'test',
q: 'foo:bar',
},
(err) => {
t.ok(err instanceof RequestAbortedError);
server.stop();
}
);

cb.abort();
});
});

test('in-flight abort', (t) => {
t.plan(4);

let handlerStartFn = null;
const handleStart = new Promise((resolve) => {
handlerStartFn = resolve;
});

function handler(req) {
t.ok(req);
req.on('close', () => {
t.ok(true);
});
handlerStartFn();
}

buildServer(handler, ({ port }, server) => {
const mockCreds = {
accessKeyId: uuidv4(),
secretAccessKey: uuidv4(),
};

const AwsSigv4SignerOptions = {
region: 'us-east-1',
getCredentials: () =>
new Promise((resolve) => {
setTimeout(() => resolve(mockCreds), 100);
}),
};

const client = new Client({
...AwsSigv4Signer(AwsSigv4SignerOptions),
node: `http://localhost:${port}`,
});

const promise = client.search({
index: 'test',
q: 'foo:bar',
});

promise
.then(() => {
t.fail('Should fail');
})
.catch((err) => {
t.ok(err instanceof RequestAbortedError);
})
.finally(() => {
server.stop();
});

handleStart.then(() => {
t.ok(true);
promise.abort();
});
});
});

0 comments on commit aba5fda

Please sign in to comment.