Skip to content

Commit

Permalink
fix: simplify fetch promise hooking
Browse files Browse the repository at this point in the history
  • Loading branch information
sagivoululumigo committed May 20, 2024
1 parent 23559ea commit d5be678
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 70 deletions.
11 changes: 3 additions & 8 deletions src/extender.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ export const hookFunc = (func, options = {}) => {
return function (...args) {
safeBeforeHook.call(this, args, extenderContext);
const originalFnResult = originalFn.apply(this, args);
safeAfterHook.call(this, args, originalFnResult, extenderContext);
return originalFnResult;
// TODO: Make sure we always have a result to return
return safeAfterHook.call(this, args, originalFnResult, extenderContext);
};
};
return getWrappedFunc(func, wrapper);
Expand Down Expand Up @@ -78,12 +78,7 @@ export const hookPromiseAsyncHandlers = (originalPromise, options) => {
const safeThenHandler = safeExecuteAsync(thenHandler, `thenHandler of fail`);
const safeCatchHandler = safeExecuteAsync(catchHandler, `catchHandler of fail`);

const errorHandler = async (err) => {
safeCatchHandler(err);
throw err;
};

return originalPromise.then(safeThenHandler).catch(errorHandler);
return originalPromise.then(safeThenHandler).catch(safeCatchHandler);
};

function defineProperty(obj, name, value) {
Expand Down
32 changes: 32 additions & 0 deletions src/extender.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,36 @@ describe('extender', () => {
extender.hookPromise(p, { catchHandler: catchHandler });
await expect(p).rejects.toThrow();
});

test('hookPromiseAsyncHandlers -> thenHandler', async () => {
const orderArray = [];
const myFunc = async (seconds) => {
orderArray.push('myFunc - Start');
console.log(`myFunc - Sleeping for ${seconds} seconds`);
await new Promise((resolve) => setTimeout(resolve, seconds * 1000));
console.log('myFunc - Done sleeping');
orderArray.push('myFunc - End');
};

const wrappedPromise = extender.hookPromiseAsyncHandlers(myFunc(2), {
thenHandler: async (value) => {
orderArray.push('Then handler - Start');
console.log('Then handler - Sleeping for 1 second');
await new Promise((resolve) => setTimeout(resolve, 1000));
console.log('Then handler - Done sleeping');
orderArray.push('Then handler - End');
},
});

await wrappedPromise;
orderArray.push('Await wrapped promise - End');

expect(orderArray).toEqual([
'myFunc - Start',
'myFunc - End',
'Then handler - Start',
'Then handler - End',
'Await wrapped promise - End',
]);
});
});
105 changes: 44 additions & 61 deletions src/hooks/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { BaseHttp, ParseHttpRequestOptions, RequestData, UrlAndRequestOptions } from './baseHttp';
import * as logger from '../logger';
import { hookFunc, hookPromiseAsyncHandlers } from '../extender';
import { TextDecoder } from 'util';
import { getEventEntitySize } from '../utils';

Expand All @@ -16,9 +15,9 @@ interface ResponseData {

interface RequestExtenderContext {
isTracedDisabled?: boolean;
awsRequestId: string;
transactionId: string;
requestRandomId: string;
awsRequestId?: string;
transactionId?: string;
requestRandomId?: string;
currentSpan?: any;
requestData?: RequestData;
response?: Response;
Expand All @@ -45,21 +44,37 @@ export class FetchInstrumentation {
}

private static attachHooks(): void {
const originalFetch = fetch;

// @ts-ignore
fetch = hookFunc(fetch, {
beforeHook: FetchInstrumentation.beforeFetch,
afterHook: FetchInstrumentation.onFetchPromiseReturned,
});
fetch = async (...args): Promise<Response> => {
const context: RequestExtenderContext = {};
// TODO: Make all the extra logic fail safe
// edit args (add headers for example)
const modifiedArgs = FetchInstrumentation.beforeFetch(args, context);

try {
// TODO: Switch to explicit args and not generic array
// @ts-ignore
const response = await originalFetch(...modifiedArgs);
context.response = response;
await FetchInstrumentation.createResponseSpan(context);
return response;
} catch (error) {
await FetchInstrumentation.createResponseSpan(context);
throw error;
}
};
}

/**
* Runs before the fetch command is executed
* @param {any[]} args The arguments passed to the fetch function
* @param {RequestExtenderContext} extenderContext A blank object that will be passed to the next hooks for fetch
* @private
*/
private static beforeFetch(args: any[], extenderContext: RequestExtenderContext): void {
logger.debug('Fetch instrumentor - before fetch function call', { args, extenderContext });
private static beforeFetch(args: any[], extenderContext: RequestExtenderContext): any[] {
logger.debug('Fetch instrumentor - before fetch function call', {
args,
extenderContext,
});
extenderContext.isTracedDisabled = true;
const { url, options } = FetchInstrumentation.parseRequestArguments(args);
const requestTracingData = BaseHttp.onRequestCreated({
Expand Down Expand Up @@ -97,52 +112,14 @@ export class FetchInstrumentation {
extenderContext.currentSpan = httpSpan;
}

let modifiedArgs = args;
if (addedHeaders) {
options.headers = headers;
FetchInstrumentation.addHeadersToFetchArguments(args, options);
modifiedArgs = FetchInstrumentation.addHeadersToFetchArguments(args, options);
}
extenderContext.isTracedDisabled = false;
}

/**
* Runs after the fetch promise is created but before it is returned to the user.
* Here we alter the promise to resolve to our own function that will record the response data.
* @param {any[]} args The arguments passed to the fetch function call
* @param {Promise<any>} originalFnResult The original promise returned by the fetch function
* @param {RequestExtenderContext} extenderContext The context object passed to the next hooks for fetch
* @private
*/
private static onFetchPromiseReturned(
args: any[],
originalFnResult: Promise<any>,
extenderContext: RequestExtenderContext
): void {
logger.debug('onFetchPromiseReturned', { args, originalFnResult, extenderContext });
if (extenderContext.isTracedDisabled) {
return;
}
if (!(originalFnResult instanceof Promise)) {
logger.debug('Fetch instrumentation after fetch: original function result is not a promise');
return;
}

const { transactionId, awsRequestId, requestData, requestRandomId } = extenderContext;

return hookPromiseAsyncHandlers(originalFnResult, {
thenHandler: async (response: Response) => {
return FetchInstrumentation.onFetchPromiseResolved({
transactionId,
awsRequestId,
requestData,
requestRandomId,
response,
});
},
catchHandler: async (args: any) => {
// TODO: Figure out what to do here
logger.debug(`afterFetch promise catch (args: ${args})`);
},
});
return modifiedArgs;
}

/**
Expand All @@ -155,13 +132,14 @@ export class FetchInstrumentation {
* @param {Response} response The response object returned by the fetch promise
* @private
*/
private static async onFetchPromiseResolved({
private static async createResponseSpan({
transactionId,
awsRequestId,
requestData,
requestRandomId,
response,
}: RequestExtenderContext): Promise<void> {
console.log('onFetchPromiseResolved - Start');
if (!response) {
logger.debug(`Fetch instrumentation response handler - no response: ${response}`);
return;
Expand All @@ -176,14 +154,16 @@ export class FetchInstrumentation {
requestRandomId,
response: responseData,
});

const bodyStream = clonedResponse.body;
if (bodyStream) {
logger.debug('Fetch instrumentation - body found in response');
const textDecoder = new TextDecoder();
// @ts-ignore
for await (const chunk: Uint8Array of bodyStream) {
try {
// TODO: reuse the decoder object
const chunkString = new TextDecoder().decode(chunk);
const chunkString = textDecoder.decode(chunk);
const { truncated } = responseDataWriterHandler(['data', chunkString]);
if (truncated) {
// No need to consume the rest of the body if it reached the limit
Expand All @@ -198,6 +178,7 @@ export class FetchInstrumentation {

logger.debug('Fetch instrumentation - response end');
responseDataWriterHandler(['end']);
console.log('onFetchPromiseResolved - End');
}

/**
Expand Down Expand Up @@ -226,13 +207,15 @@ export class FetchInstrumentation {
* @param {ParseHttpRequestOptions} options
* @private
*/
private static addHeadersToFetchArguments(args: any[], options: ParseHttpRequestOptions): void {
if (args.length === 1) {
args.push({ headers: options.headers });
private static addHeadersToFetchArguments(args: any[], options: ParseHttpRequestOptions): any[] {
const modifiedArgs = [...args];
if (modifiedArgs.length === 1) {
modifiedArgs.push({ headers: options.headers });
}
if (args.length === 2) {
Object.assign(args[1], { headers: options.headers });
if (modifiedArgs.length === 2) {
modifiedArgs[1] = { ...modifiedArgs[1], headers: options.headers };
}
return modifiedArgs;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ export function safeExecuteAsync<T>(
callback: Function,
message: string = 'Error in Lumigo tracer',
logLevel: string = logger.LOG_LEVELS.WARNING,
defaultReturn: T = undefined
defaultReturn = new Promise((resolve, reject) => resolve(undefined))
): Function {
return async function (...args) {
try {
Expand Down

0 comments on commit d5be678

Please sign in to comment.