diff --git a/packages/core/__tests__/Providers/AWSCloudWatchProvider-test.ts b/packages/core/__tests__/Providers/AWSCloudWatchProvider-test.ts index 6079438114d..ee5b362f50a 100644 --- a/packages/core/__tests__/Providers/AWSCloudWatchProvider-test.ts +++ b/packages/core/__tests__/Providers/AWSCloudWatchProvider-test.ts @@ -14,6 +14,7 @@ import { DescribeLogStreamsCommand, PutLogEventsCommand, } from '@aws-sdk/client-cloudwatch-logs'; +import { loggers } from 'winston'; const credentials = { accessKeyId: 'accessKeyId', @@ -178,33 +179,52 @@ describe('AWSCloudWatchProvider', () => { }); describe('pushLogs test', () => { + let provider; + let mockInitiateLogPushInterval; + beforeEach(() => { + provider = new AWSCloudWatchProvider(testConfig); + mockInitiateLogPushInterval = jest + .spyOn(provider as any, '_initiateLogPushInterval') + .mockImplementation(); + }); + afterEach(() => { + jest.clearAllMocks(); + }); it('should add the provided logs to the log queue', () => { - const provider = new AWSCloudWatchProvider(testConfig); provider.pushLogs([{ message: 'hello', timestamp: 1111 }]); let logQueue = provider.getLogQueue(); - expect(logQueue).toHaveLength(1); provider.pushLogs([ - { - message: 'goodbye', - timestamp: 1112, - }, - { - message: 'ohayou', - timestamp: 1113, - }, - { - message: 'konbanwa', - timestamp: 1114, - }, + { message: 'goodbye', timestamp: 1112 }, + { message: 'ohayou', timestamp: 1113 }, + { message: 'konbanwa', timestamp: 1114 }, ]); logQueue = provider.getLogQueue(); - expect(logQueue).toHaveLength(4); }); + it('should reset retry mechanism when _maxRetriesReached is true', () => { + provider['_maxRetriesReached'] = true; + provider['_retryCount'] = 6; + + provider.pushLogs([{ message: 'test', timestamp: Date.now() }]); + + expect(provider['_retryCount']).toBe(0); + expect(provider['_maxRetriesReached']).toBe(false); + expect(mockInitiateLogPushInterval).toHaveBeenCalledTimes(2); + }); + it('should not reset retry mechanism when _maxRetriesReached is false', () => { + provider['_maxRetriesReached'] = false; + provider['_retryCount'] = 3; + + provider.pushLogs([{ message: 'test', timestamp: Date.now() }]); + + expect(provider['_retryCount']).toBe(3); + expect(provider['_maxRetriesReached']).toBe(false); + expect(mockInitiateLogPushInterval).toHaveBeenCalledTimes(1); + }); }); describe('_safeUploadLogEvents test', () => { @@ -397,4 +417,88 @@ describe('AWSCloudWatchProvider', () => { }); }); }); + describe('_initiateLogPushInterval', () => { + let provider: AWSCloudWatchProvider; + let safeUploadLogEventsSpy: jest.SpyInstance; + let getDocUploadPermissibilitySpy: jest.SpyInstance; + let setIntervalSpy: jest.SpyInstance; + + beforeEach(() => { + jest.useFakeTimers(); + provider = new AWSCloudWatchProvider(testConfig); + safeUploadLogEventsSpy = jest.spyOn( + provider as any, + '_safeUploadLogEvents' + ); + getDocUploadPermissibilitySpy = jest.spyOn( + provider as any, + '_getDocUploadPermissibility' + ); + setIntervalSpy = jest.spyOn(global, 'setInterval'); + }); + + afterEach(() => { + jest.useRealTimers(); + jest.restoreAllMocks(); + }); + + it('should clear existing timer and set a new one', () => { + (provider as any)._timer = setInterval(() => {}, 1000); + (provider as any)._initiateLogPushInterval(); + + expect(setIntervalSpy).toHaveBeenCalledTimes(1); + }); + + it('should not upload logs if max retries are reached', async () => { + (provider as any)._maxRetriesReached = true; + (provider as any)._initiateLogPushInterval(); + + jest.advanceTimersByTime(2000); + await Promise.resolve(); + + expect(safeUploadLogEventsSpy).not.toHaveBeenCalled(); + }); + + it('should upload logs if conditions are met', async () => { + getDocUploadPermissibilitySpy.mockReturnValue(true); + safeUploadLogEventsSpy.mockResolvedValue(undefined); + + (provider as any)._initiateLogPushInterval(); + + jest.advanceTimersByTime(2000); + await Promise.resolve(); + + expect(safeUploadLogEventsSpy).toHaveBeenCalledTimes(1); + expect((provider as any)._retryCount).toBe(0); + }); + + it('should increment retry count on error', async () => { + getDocUploadPermissibilitySpy.mockReturnValue(true); + safeUploadLogEventsSpy.mockRejectedValue(new Error('Test error')); + + (provider as any)._initiateLogPushInterval(); + + jest.advanceTimersByTime(2000); + await Promise.resolve(); + + expect((provider as any)._retryCount).toBe(0); + }); + + it('should stop retrying after max retries', async () => { + getDocUploadPermissibilitySpy.mockReturnValue(true); + safeUploadLogEventsSpy.mockRejectedValue(new Error('Test error')); + (provider as any)._maxRetries = 3; + + (provider as any)._initiateLogPushInterval(); + + for (let i = 0; i < 4; i++) { + jest.advanceTimersByTime(2000); + await Promise.resolve(); // Allow any pending promise to resolve + } + + expect((provider as any)._retryCount).toBe(2); + + expect(safeUploadLogEventsSpy).toHaveBeenCalledTimes(4); + }); + }); }); diff --git a/packages/core/src/Providers/AWSCloudWatchProvider.ts b/packages/core/src/Providers/AWSCloudWatchProvider.ts index 37d4a688649..6348c846697 100644 --- a/packages/core/src/Providers/AWSCloudWatchProvider.ts +++ b/packages/core/src/Providers/AWSCloudWatchProvider.ts @@ -56,7 +56,8 @@ class AWSCloudWatchProvider implements LoggingProvider { private _timer; private _nextSequenceToken: string | undefined; private _maxRetries = 5; - private _retryCount; + private _retryCount = 0; + private _maxRetriesReached: boolean = false; constructor(config?: AWSCloudWatchProviderOptions) { this.configure(config); @@ -211,6 +212,12 @@ class AWSCloudWatchProvider implements LoggingProvider { public pushLogs(logs: InputLogEvent[]): void { logger.debug('pushing log events to Cloudwatch...'); this._dataTracker.logEvents = [...this._dataTracker.logEvents, ...logs]; + + if (this._maxRetriesReached) { + this._retryCount = 0; + this._maxRetriesReached = false; + this._initiateLogPushInterval(); + } } private async _validateLogGroupExistsAndCreate( @@ -493,6 +500,10 @@ class AWSCloudWatchProvider implements LoggingProvider { } this._timer = setInterval(async () => { + if (this._maxRetriesReached) { + return; + } + try { if (this._getDocUploadPermissibility()) { await this._safeUploadLogEvents(); @@ -500,15 +511,16 @@ class AWSCloudWatchProvider implements LoggingProvider { } } catch (err) { this._retryCount++; - if (this._retryCount < this._maxRetries) { + if (this._retryCount > this._maxRetries) { logger.error( - `error when calling _safeUploadLogEvents in the timer interval - ${err}` - ); - } else if (this._retryCount === this._maxRetries) { - logger.error( - `CloudWatch log upload failed after ${this._maxRetries} attempts. Suppressing further error logs. Upload attempts will continue in the background.` + `Max retries (${this._maxRetries}) reached. Stopping log uploads.` ); + this._maxRetriesReached = true; + return; } + logger.error( + `error when calling _safeUploadLogEvents in the timer interval - ${err}` + ); } }, 2000); }