From e0fdeb7bcbb24822607267b08b2919af5e73f2db Mon Sep 17 00:00:00 2001 From: Ivan Artemiev <29709626+iartemiev@users.noreply.github.com> Date: Tue, 29 Oct 2024 13:25:43 -0400 Subject: [PATCH] fix(api-graphql): events url pattern; non-retryable error handling (#13970) --- .../__tests__/AWSAppSyncEventProvider.test.ts | 177 ++++++++++++++++++ .../api-graphql/__tests__/appsyncUrl.test.ts | 14 ++ packages/api-graphql/__tests__/events.test.ts | 3 +- .../AWSAppSyncEventsProvider/index.ts | 19 +- .../AWSAppSyncRealTimeProvider/index.ts | 21 ++- .../AWSWebSocketProvider/appsyncUrl.ts | 5 +- .../Providers/AWSWebSocketProvider/index.ts | 35 ++-- .../api-graphql/src/Providers/constants.ts | 4 + packages/aws-amplify/package.json | 2 +- 9 files changed, 261 insertions(+), 19 deletions(-) create mode 100644 packages/api-graphql/__tests__/AWSAppSyncEventProvider.test.ts create mode 100644 packages/api-graphql/__tests__/appsyncUrl.test.ts diff --git a/packages/api-graphql/__tests__/AWSAppSyncEventProvider.test.ts b/packages/api-graphql/__tests__/AWSAppSyncEventProvider.test.ts new file mode 100644 index 00000000000..d84f1daad89 --- /dev/null +++ b/packages/api-graphql/__tests__/AWSAppSyncEventProvider.test.ts @@ -0,0 +1,177 @@ +import { Observable, Observer } from 'rxjs'; +import { Reachability } from '@aws-amplify/core/internals/utils'; +import { ConsoleLogger } from '@aws-amplify/core'; +import { MESSAGE_TYPES } from '../src/Providers/constants'; +import * as constants from '../src/Providers/constants'; + +import { delay, FakeWebSocketInterface } from './helpers'; +import { ConnectionState as CS } from '../src/types/PubSub'; + +import { AWSAppSyncEventProvider } from '../src/Providers/AWSAppSyncEventsProvider'; + +describe('AppSyncEventProvider', () => { + describe('subscribe()', () => { + describe('returned observer', () => { + describe('connection logic with mocked websocket', () => { + let fakeWebSocketInterface: FakeWebSocketInterface; + const loggerSpy: jest.SpyInstance = jest.spyOn( + ConsoleLogger.prototype, + '_log', + ); + + let provider: AWSAppSyncEventProvider; + let reachabilityObserver: Observer<{ online: boolean }>; + + beforeEach(async () => { + // Set the network to "online" for these tests + jest + .spyOn(Reachability.prototype, 'networkMonitor') + .mockImplementationOnce(() => { + return new Observable(observer => { + reachabilityObserver = observer; + }); + }) + // Twice because we subscribe to get the initial state then again to monitor reachability + .mockImplementationOnce(() => { + return new Observable(observer => { + reachabilityObserver = observer; + }); + }); + + fakeWebSocketInterface = new FakeWebSocketInterface(); + provider = new AWSAppSyncEventProvider(); + + // Saving this spy and resetting it by hand causes badness + // Saving it causes new websockets to be reachable across past tests that have not fully closed + // Resetting it proactively causes those same past tests to be dealing with null while they reach a settled state + jest + .spyOn(provider as any, '_getNewWebSocket') + .mockImplementation(() => { + fakeWebSocketInterface.newWebSocket(); + return fakeWebSocketInterface.webSocket as WebSocket; + }); + + // Reduce retry delay for tests to 100ms + Object.defineProperty(constants, 'MAX_DELAY_MS', { + value: 100, + }); + // Reduce retry delay for tests to 100ms + Object.defineProperty(constants, 'RECONNECT_DELAY', { + value: 100, + }); + }); + + afterEach(async () => { + provider?.close(); + await fakeWebSocketInterface?.closeInterface(); + fakeWebSocketInterface?.teardown(); + loggerSpy.mockClear(); + }); + + test('subscription observer error is triggered when a connection is formed and a non-retriable connection_error data message is received', async () => { + expect.assertions(3); + + const socketCloseSpy = jest.spyOn( + fakeWebSocketInterface.webSocket, + 'close', + ); + fakeWebSocketInterface.webSocket.readyState = WebSocket.OPEN; + + const observer = provider.subscribe({ + appSyncGraphqlEndpoint: 'ws://localhost:8080', + }); + + observer.subscribe({ + error: e => { + expect(e.errors[0].message).toEqual( + 'Connection failed: UnauthorizedException', + ); + }, + }); + + await fakeWebSocketInterface?.readyForUse; + await fakeWebSocketInterface?.triggerOpen(); + + // Resolve the message delivery actions + await Promise.resolve( + fakeWebSocketInterface?.sendDataMessage({ + type: MESSAGE_TYPES.GQL_CONNECTION_ERROR, + errors: [ + { + errorType: 'UnauthorizedException', // - non-retriable + errorCode: 401, + }, + ], + }), + ); + + // Watching for raised exception to be caught and logged + expect(loggerSpy).toHaveBeenCalledWith( + 'DEBUG', + expect.stringContaining('error on bound '), + expect.objectContaining({ + message: expect.stringMatching('UnauthorizedException'), + }), + ); + + await delay(1); + + expect(socketCloseSpy).toHaveBeenCalledWith(3001); + }); + + test('subscription observer error is not triggered when a connection is formed and a retriable connection_error data message is received', async () => { + expect.assertions(2); + + const observer = provider.subscribe({ + appSyncGraphqlEndpoint: 'ws://localhost:8080', + }); + + observer.subscribe({ + error: x => {}, + }); + + const openSocketAttempt = async () => { + await fakeWebSocketInterface?.readyForUse; + await fakeWebSocketInterface?.triggerOpen(); + + // Resolve the message delivery actions + await Promise.resolve( + fakeWebSocketInterface?.sendDataMessage({ + type: MESSAGE_TYPES.GQL_CONNECTION_ERROR, + errors: [ + { + errorType: 'Retriable Test', + errorCode: 408, // Request timed out - retriable + }, + ], + }), + ); + await fakeWebSocketInterface?.resetWebsocket(); + }; + + // Go through two connection attempts to excercise backoff and retriable raise + await openSocketAttempt(); + await openSocketAttempt(); + + // Watching for raised exception to be caught and logged + expect(loggerSpy).toHaveBeenCalledWith( + 'DEBUG', + expect.stringContaining('error on bound '), + expect.objectContaining({ + message: expect.stringMatching('Retriable Test'), + }), + ); + + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.ConnectionDisrupted, + ]); + + expect(loggerSpy).toHaveBeenCalledWith( + 'DEBUG', + 'Connection failed: Retriable Test', + ); + }); + }); + }); + }); +}); diff --git a/packages/api-graphql/__tests__/appsyncUrl.test.ts b/packages/api-graphql/__tests__/appsyncUrl.test.ts new file mode 100644 index 00000000000..1f5ffdc25a9 --- /dev/null +++ b/packages/api-graphql/__tests__/appsyncUrl.test.ts @@ -0,0 +1,14 @@ +import { getRealtimeEndpointUrl } from '../src/Providers/AWSWebSocketProvider/appsyncUrl'; + +describe('getRealtimeEndpointUrl', () => { + test('events', () => { + const httpUrl = + 'https://abcdefghijklmnopqrstuvwxyz.appsync-api.us-east-1.amazonaws.com/event'; + + const res = getRealtimeEndpointUrl(httpUrl).toString(); + + expect(res).toEqual( + 'wss://abcdefghijklmnopqrstuvwxyz.appsync-realtime-api.us-east-1.amazonaws.com/event/realtime', + ); + }); +}); diff --git a/packages/api-graphql/__tests__/events.test.ts b/packages/api-graphql/__tests__/events.test.ts index 334d8e49b7c..a1f8054e3dd 100644 --- a/packages/api-graphql/__tests__/events.test.ts +++ b/packages/api-graphql/__tests__/events.test.ts @@ -3,6 +3,7 @@ import { AppSyncEventProvider } from '../src/Providers/AWSAppSyncEventsProvider' import { events } from '../src/'; import { appsyncRequest } from '../src/internals/events/appsyncRequest'; + import { GraphQLAuthMode } from '@aws-amplify/core/internals/utils'; const abortController = new AbortController(); @@ -38,7 +39,7 @@ jest.mock('../src/internals/events/appsyncRequest', () => { * so we're just sanity checking that the expected auth mode is passed to the provider in this test file. */ -describe('Events', () => { +describe('Events client', () => { afterAll(() => { jest.resetAllMocks(); jest.clearAllMocks(); diff --git a/packages/api-graphql/src/Providers/AWSAppSyncEventsProvider/index.ts b/packages/api-graphql/src/Providers/AWSAppSyncEventsProvider/index.ts index 6a25bc9ff25..5bfeddd89fa 100644 --- a/packages/api-graphql/src/Providers/AWSAppSyncEventsProvider/index.ts +++ b/packages/api-graphql/src/Providers/AWSAppSyncEventsProvider/index.ts @@ -9,7 +9,7 @@ import { } from '@aws-amplify/core/internals/utils'; import { CustomHeaders } from '@aws-amplify/data-schema/runtime'; -import { MESSAGE_TYPES } from '../constants'; +import { DEFAULT_KEEP_ALIVE_TIMEOUT, MESSAGE_TYPES } from '../constants'; import { AWSWebSocketProvider } from '../AWSWebSocketProvider'; import { awsRealTimeHeaderBasedAuth } from '../AWSWebSocketProvider/authHeaders'; @@ -44,7 +44,7 @@ interface DataResponse { const PROVIDER_NAME = 'AWSAppSyncEventsProvider'; const WS_PROTOCOL_NAME = 'aws-appsync-event-ws'; -class AWSAppSyncEventProvider extends AWSWebSocketProvider { +export class AWSAppSyncEventProvider extends AWSWebSocketProvider { constructor() { super({ providerName: PROVIDER_NAME, wsProtocolName: WS_PROTOCOL_NAME }); } @@ -187,6 +187,21 @@ class AWSAppSyncEventProvider extends AWSWebSocketProvider { type: MESSAGE_TYPES.EVENT_STOP, }; } + + protected _extractConnectionTimeout(data: Record): number { + const { connectionTimeoutMs = DEFAULT_KEEP_ALIVE_TIMEOUT } = data; + + return connectionTimeoutMs; + } + + protected _extractErrorCodeAndType(data: Record): { + errorCode: number; + errorType: string; + } { + const { errors: [{ errorType = '', errorCode = 0 } = {}] = [] } = data; + + return { errorCode, errorType }; + } } export const AppSyncEventProvider = new AWSAppSyncEventProvider(); diff --git a/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts index ade1cde704c..2ba5968a890 100644 --- a/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -10,7 +10,7 @@ import { } from '@aws-amplify/core/internals/utils'; import { CustomHeaders } from '@aws-amplify/data-schema/runtime'; -import { MESSAGE_TYPES } from '../constants'; +import { DEFAULT_KEEP_ALIVE_TIMEOUT, MESSAGE_TYPES } from '../constants'; import { AWSWebSocketProvider } from '../AWSWebSocketProvider'; import { awsRealTimeHeaderBasedAuth } from '../AWSWebSocketProvider/authHeaders'; @@ -158,4 +158,23 @@ export class AWSAppSyncRealTimeProvider extends AWSWebSocketProvider { type: MESSAGE_TYPES.GQL_STOP, }; } + + protected _extractConnectionTimeout(data: Record): number { + const { + payload: { connectionTimeoutMs = DEFAULT_KEEP_ALIVE_TIMEOUT } = {}, + } = data; + + return connectionTimeoutMs; + } + + protected _extractErrorCodeAndType(data: any): { + errorCode: number; + errorType: string; + } { + const { + payload: { errors: [{ errorType = '', errorCode = 0 } = {}] = [] } = {}, + } = data; + + return { errorCode, errorType }; + } } diff --git a/packages/api-graphql/src/Providers/AWSWebSocketProvider/appsyncUrl.ts b/packages/api-graphql/src/Providers/AWSWebSocketProvider/appsyncUrl.ts index 96e17c0fea3..b79c9cca1eb 100644 --- a/packages/api-graphql/src/Providers/AWSWebSocketProvider/appsyncUrl.ts +++ b/packages/api-graphql/src/Providers/AWSWebSocketProvider/appsyncUrl.ts @@ -13,7 +13,7 @@ const protocol = 'wss://'; const standardDomainPattern = /^https:\/\/\w{26}\.appsync-api\.\w{2}(?:(?:-\w{2,})+)-\d\.amazonaws.com(?:\.cn)?\/graphql$/i; const eventDomainPattern = - /^https:\/\/\w{26}\.ddpg-api\.\w{2}(?:(?:-\w{2,})+)-\d\.amazonaws.com(?:\.cn)?\/event$/i; + /^https:\/\/\w{26}\.\w+-api\.\w{2}(?:(?:-\w{2,})+)-\d\.amazonaws.com(?:\.cn)?\/event$/i; const customDomainPath = '/realtime'; export const isCustomDomain = (url: string): boolean => { @@ -31,7 +31,8 @@ export const getRealtimeEndpointUrl = ( if (isEventDomain(realtimeEndpoint)) { realtimeEndpoint = realtimeEndpoint .concat(customDomainPath) - .replace('ddpg-api', 'grt-gamma'); + .replace('ddpg-api', 'grt-gamma') + .replace('appsync-api', 'appsync-realtime-api'); } else if (isCustomDomain(realtimeEndpoint)) { realtimeEndpoint = realtimeEndpoint.concat(customDomainPath); } else { diff --git a/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts b/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts index 0acdfeea5ee..3553a008123 100644 --- a/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts +++ b/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts @@ -27,6 +27,7 @@ import { MAX_DELAY_MS, MESSAGE_TYPES, NON_RETRYABLE_CODES, + NON_RETRYABLE_ERROR_TYPES, SOCKET_STATUS, START_ACK_TIMEOUT, SUBSCRIPTION_STATUS, @@ -546,6 +547,15 @@ export abstract class AWSWebSocketProvider { { id: string; payload: string | Record; type: string }, ]; + protected abstract _extractConnectionTimeout( + data: Record, + ): number; + + protected abstract _extractErrorCodeAndType(data: Record): { + errorCode: number; + errorType: string; + }; + private _handleIncomingSubscriptionMessage(message: MessageEvent) { if (typeof message.data !== 'string') { return; @@ -629,14 +639,14 @@ export abstract class AWSWebSocketProvider { }); this.logger.debug( - `${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload)}`, + `${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload ?? data)}`, ); observer.error({ errors: [ { ...new GraphQLError( - `${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload)}`, + `${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload ?? data)}`, ), }, ], @@ -830,10 +840,10 @@ export abstract class AWSWebSocketProvider { ); const data = JSON.parse(message.data) as ParsedMessagePayload; - const { - type, - payload: { connectionTimeoutMs = DEFAULT_KEEP_ALIVE_TIMEOUT } = {}, - } = data; + + const { type } = data; + + const connectionTimeoutMs = this._extractConnectionTimeout(data); if (type === MESSAGE_TYPES.GQL_CONNECTION_ACK) { ackOk = true; @@ -844,11 +854,7 @@ export abstract class AWSWebSocketProvider { } if (type === MESSAGE_TYPES.GQL_CONNECTION_ERROR) { - const { - payload: { - errors: [{ errorType = '', errorCode = 0 } = {}] = [], - } = {}, - } = data; + const { errorType, errorCode } = this._extractErrorCodeAndType(data); // TODO(Eslint): refactor to reject an Error object instead of a plain object // eslint-disable-next-line prefer-promise-reject-errors @@ -920,7 +926,12 @@ export abstract class AWSWebSocketProvider { errorCode: number; }; - if (NON_RETRYABLE_CODES.includes(errorCode)) { + if ( + NON_RETRYABLE_CODES.includes(errorCode) || + // Event API does not currently return `errorCode`. This may change in the future. + // For now fall back to also checking known non-retryable error types + NON_RETRYABLE_ERROR_TYPES.includes(errorType) + ) { throw new NonRetryableError(errorType); } else if (errorType) { throw new Error(errorType); diff --git a/packages/api-graphql/src/Providers/constants.ts b/packages/api-graphql/src/Providers/constants.ts index 79641a32e68..5e82f672081 100644 --- a/packages/api-graphql/src/Providers/constants.ts +++ b/packages/api-graphql/src/Providers/constants.ts @@ -5,6 +5,10 @@ export { AMPLIFY_SYMBOL } from '@aws-amplify/core/internals/utils'; export const MAX_DELAY_MS = 5000; export const NON_RETRYABLE_CODES = [400, 401, 403]; +export const NON_RETRYABLE_ERROR_TYPES = [ + 'BadRequestException', + 'UnauthorizedException', +]; export const CONNECTION_STATE_CHANGE = 'ConnectionStateChange'; diff --git a/packages/aws-amplify/package.json b/packages/aws-amplify/package.json index 68a21ce9e5a..214bcc2e020 100644 --- a/packages/aws-amplify/package.json +++ b/packages/aws-amplify/package.json @@ -335,7 +335,7 @@ "name": "[API] generateClient (AppSync)", "path": "./dist/esm/api/index.mjs", "import": "{ generateClient }", - "limit": "44.0 kB" + "limit": "44.1 kB" }, { "name": "[API] REST API handlers",