From c4fe8e7eb3fc5f29d4c4261b87d9194821097842 Mon Sep 17 00:00:00 2001 From: 21e8 Date: Sun, 8 Dec 2024 15:00:52 +0100 Subject: [PATCH] fixes --- package.json | 2 +- src/__tests__/batcher.test.ts | 111 ++++++++--------- src/__tests__/processors/telegram.test.ts | 13 +- src/batcher.ts | 139 ++++++++++++++-------- src/index.ts | 2 + src/processors/console.ts | 4 +- src/processors/custom.ts | 19 +++ src/processors/telegram.ts | 1 + src/scripts/test-timeout.ts | 28 ++--- src/types.ts | 5 + src/utils/errorClassifier.ts | 2 - 11 files changed, 196 insertions(+), 130 deletions(-) create mode 100644 src/processors/custom.ts diff --git a/package.json b/package.json index 430248d..3c4a7af 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "0xalice-tgram-bot", - "version": "0.2.16", + "version": "0.2.17", "description": "Batched Telegram notification bot for 0xAlice", "type": "module", "main": "./dist/cjs/index.js", diff --git a/src/__tests__/batcher.test.ts b/src/__tests__/batcher.test.ts index 96d3768..fbbc026 100644 --- a/src/__tests__/batcher.test.ts +++ b/src/__tests__/batcher.test.ts @@ -1,8 +1,7 @@ import { createMessageBatcher, - resetBatcher, - globalQueues, - timers, + // globalQueues, + // timers, } from '../batcher'; import type { Message, MessageBatcher, MessageProcessor } from '../types'; @@ -15,6 +14,7 @@ describe('MessageBatcher', () => { beforeEach(() => { processedMessages = []; mockProcessor = { + name: 'mock', processBatch: jest.fn(async (messages) => { processedMessages = messages; }), @@ -29,49 +29,37 @@ describe('MessageBatcher', () => { } jest.clearAllMocks(); jest.useRealTimers(); - resetBatcher(); }); - it('should set a timer and process messages after the specified delay', async () => { - jest.useFakeTimers(); - const setTimeoutSpy = jest.spyOn(global, 'setTimeout'); + it('should set a timer and process 1 message after the specified delay', async () => { + jest.useRealTimers(); const processBatchSpy = jest.fn(); + const mockProcessor = { + name: 'test', + processBatch: processBatchSpy, + }; // Clear any existing queues - resetBatcher(); + batcher = createMessageBatcher([mockProcessor], { + maxBatchSize: 5, + maxWaitMs: 100, + }); // Verify queue doesn't exist yet - expect(globalQueues.has('default')).toBe(false); - - batcher = createMessageBatcher( - [ - mockProcessor, - { - processBatch: processBatchSpy, - }, - ], - { - maxBatchSize: 5, - maxWaitMs: 100, - } - ); + expect(batcher.queues.has('default')).toBe(false); // Add a message batcher.info('test message'); // Verify queue was initialized - expect(globalQueues.has('default')).toBe(true); - expect(globalQueues.get('default')).toHaveLength(1); + expect(batcher.queues.has('default')).toBe(true); + expect(batcher.queues.get('default')).toHaveLength(1); - // Verify that setTimeout was called to add a timer - expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), 100); - // expect(processBatchSpy).toHaveBeenCalled(); - - // Fast-forward time to trigger the timer - jest.advanceTimersByTime(100); - await Promise.resolve(); + // Wait for the timer to expire and any promises to resolve + await new Promise((resolve) => setTimeout(resolve, 150)); + await new Promise(process.nextTick); // Verify the message was processed - expect(mockProcessor.processBatch).toHaveBeenCalledWith([ + expect(processBatchSpy).toHaveBeenCalledWith([ { chatId: 'default', text: 'test message', @@ -79,24 +67,21 @@ describe('MessageBatcher', () => { error: undefined, }, ]); - - setTimeoutSpy.mockRestore(); - jest.useRealTimers(); }); - it('should set a timer and process messages after the specified delay', async () => { + it('should set a timer and process 2 messages after the specified delay', async () => { jest.useRealTimers(); const processBatchSpy = jest.fn(); // Clear any existing queues - resetBatcher(); // Verify queue doesn't exist yet - expect(globalQueues.has('default')).toBe(false); + expect(batcher.queues.has('default')).toBe(false); batcher = createMessageBatcher( [ mockProcessor, { + name: 'mock', processBatch: processBatchSpy, }, ], @@ -110,8 +95,8 @@ describe('MessageBatcher', () => { batcher.info('test message'); // Verify queue was initialized - expect(globalQueues.has('default')).toBe(true); - expect(globalQueues.get('default')).toHaveLength(1); + expect(batcher.queues.has('default')).toBe(true); + expect(batcher.queues.get('default')).toHaveLength(1); // Wait for the timer to expire await new Promise((resolve) => setTimeout(resolve, 200)); @@ -265,6 +250,7 @@ describe('MessageBatcher', () => { it('should handle processor failures gracefully', async () => { const consoleSpy = jest.spyOn(console, 'error').mockImplementation(); const failingProcessor: MessageProcessor = { + name: 'mock', processBatch: jest.fn().mockRejectedValue(new Error('Process failed')), }; @@ -277,7 +263,7 @@ describe('MessageBatcher', () => { await batcher.flush(); expect(consoleSpy).toHaveBeenCalledWith( - 'Processor 0 failed:', + 'Processor mock failed:', expect.any(Error) ); @@ -334,7 +320,7 @@ describe('MessageBatcher', () => { ]); }); - it('should clear all queues when reset', async () => { + it('should flush items in queue', async () => { batcher = createMessageBatcher([mockProcessor], { maxBatchSize: 5, maxWaitMs: 1000, @@ -344,15 +330,17 @@ describe('MessageBatcher', () => { batcher.warning('Message 2'); // Reset before processing - resetBatcher(); await batcher.flush(); - expect(processedMessages).toHaveLength(0); + expect(processedMessages).toHaveLength(2); }); it('should process messages synchronously with flushSync', async () => { const processBatchSpy = jest.fn().mockResolvedValue(undefined); - const syncProcessor = { processBatch: processBatchSpy }; + const syncProcessor = { + name: 'mock', + processBatch: processBatchSpy, + }; batcher = createMessageBatcher([syncProcessor], { maxBatchSize: 5, @@ -387,7 +375,10 @@ describe('MessageBatcher', () => { const processBatchSpy = jest.fn(() => Promise.reject(new Error('Process failed')) ); - const failingProcessor = { processBatch: processBatchSpy }; + const failingProcessor = { + name: 'mock', + processBatch: processBatchSpy, + }; batcher = createMessageBatcher([failingProcessor], { maxBatchSize: 2, @@ -427,6 +418,7 @@ describe('MessageBatcher', () => { it('should handle sync processor errors without breaking the queue', () => { const consoleSpy = jest.spyOn(console, 'error').mockImplementation(); const syncProcessor = { + name: 'mock', processBatch: jest.fn().mockImplementation(() => { throw new Error('Sync error'); }), @@ -451,7 +443,6 @@ describe('MessageBatcher', () => { it('should initialize queue for new chat ID', async () => { // Reset the global queue to ensure we start fresh - resetBatcher(); batcher = createMessageBatcher([mockProcessor], { maxBatchSize: 5, @@ -476,15 +467,13 @@ describe('MessageBatcher', () => { }); it('should initialize empty queue for new chat ID', () => { - resetBatcher(); - batcher = createMessageBatcher([mockProcessor], { maxBatchSize: 5, maxWaitMs: 1000, }); // Access the internal queue map - const queues = globalQueues as Map; + const queues = batcher.queues as Map; expect(queues.has('default')).toBe(false); // This should initialize the queue @@ -508,14 +497,14 @@ describe('MessageBatcher', () => { }); // Verify queue doesn't exist initially - expect(globalQueues.has('default')).toBe(false); + expect(batcher.queues.has('default')).toBe(false); // Add a message which should initialize the queue (batcher as any).queueMessage('test message', 'info'); // Verify queue was initialized with the message - expect(globalQueues.has('default')).toBe(true); - const queue = globalQueues.get('default'); + expect(batcher.queues.has('default')).toBe(true); + const queue = batcher.queues.get('default'); expect(queue).toBeDefined(); expect(queue).toEqual([ { @@ -529,7 +518,7 @@ describe('MessageBatcher', () => { it('should handle undefined queue result from get', async () => { // Clear any existing queues - globalQueues.clear(); + batcher.queues.clear(); // Create a new batcher batcher = createMessageBatcher([mockProcessor], { @@ -538,7 +527,7 @@ describe('MessageBatcher', () => { }); // Force the queue to be undefined for the first get - const queue = globalQueues.get('default'); + const queue = batcher.queues.get('default'); expect(queue).toBeUndefined(); // Add a message - this should handle the undefined case @@ -646,14 +635,14 @@ describe('MessageBatcher', () => { (batcher as any).queueMessage('test message', 'info'); // Verify timer was created - expect(timers.size).toBe(1); + expect(batcher.timers.size).toBe(1); // Advance time to trigger the timer jest.advanceTimersByTime(1000); await Promise.resolve(); // Verify timer was cleaned up - expect(timers.size).toBe(0); + expect(batcher.timers.size).toBe(0); // Verify message was processed expect(mockProcessor.processBatch).toHaveBeenCalledWith([ @@ -680,7 +669,7 @@ describe('MessageBatcher', () => { (batcher as any).queueMessage('test message', 'info'); // Get the timer ID - const timerIds = Array.from(timers.values()); + const timerIds = Array.from(batcher.timers.values()); expect(timerIds).toHaveLength(1); // Advance time to trigger the callback @@ -688,11 +677,11 @@ describe('MessageBatcher', () => { await Promise.resolve(); // Verify timer was deleted - expect(timers.has('default')).toBe(false); - expect(timers.size).toBe(0); + expect(batcher.timers.has('default')).toBe(false); + expect(batcher.timers.size).toBe(0); // Add another message to verify new timer can be created (batcher as any).queueMessage('test message 2', 'info'); - expect(timers.size).toBe(1); + expect(batcher.timers.size).toBe(1); jest.useRealTimers(); }); diff --git a/src/__tests__/processors/telegram.test.ts b/src/__tests__/processors/telegram.test.ts index 77a0761..2e98844 100644 --- a/src/__tests__/processors/telegram.test.ts +++ b/src/__tests__/processors/telegram.test.ts @@ -17,8 +17,9 @@ describe('TelegramProcessor', () => { json: () => Promise.resolve({}), } as Response) ); - // Silence console.log except for specific tests + // Silence console output except for specific tests jest.spyOn(console, 'log').mockImplementation(); + jest.spyOn(console, 'error').mockImplementation(); }); afterEach(() => { @@ -64,6 +65,7 @@ describe('TelegramProcessor', () => { }); it('should throw error on failed API response', async () => { + const consoleSpy = jest.spyOn(console, 'error'); // Updated mock implementation for failed response (global.fetch as jest.Mock).mockImplementationOnce(() => Promise.resolve({ @@ -86,6 +88,15 @@ describe('TelegramProcessor', () => { await expect(processor.processBatch(messages)).rejects.toThrow( 'Telegram API error: Bad Request - Bad Request: message text is empty' ); + + expect(consoleSpy).toHaveBeenCalledWith( + '[Telegram] API Response:', + expect.objectContaining({ + ok: false, + error_code: 400, + description: expect.any(String) + }) + ); }); it('should handle empty message batch', async () => { diff --git a/src/batcher.ts b/src/batcher.ts index f53cea6..5780c5e 100644 --- a/src/batcher.ts +++ b/src/batcher.ts @@ -7,23 +7,46 @@ import { } from './types'; // Export for testing -export const globalQueues: Map = new Map(); -export const timers: Map = new Map(); - +let globalBatcher: MessageBatcher | null = null; export function createMessageBatcher( processors: MessageProcessor[], config: Required ): MessageBatcher { + if (globalBatcher) { + return globalBatcher; + } + let processInterval: NodeJS.Timeout | null = null; + const queues: Map = new Map(); + const timers: Map = new Map(); + let extraProcessors: MessageProcessor[] = []; + const processorNames = new Set(processors.map((p) => p.name)); function startProcessing(): void { - processInterval = setInterval(() => { - for (const chatId of globalQueues.keys()) { - processBatch(chatId); + processInterval = setInterval(async () => { + for (const chatId of queues.keys()) { + await processBatch(chatId); } }, config.maxWaitMs); } + function addExtraProcessor(processor: MessageProcessor): void { + if (!processorNames.has(processor.name)) { + console.error(`Processor ${processor.name} not found in main processors`); + return; + } + extraProcessors.push(processor); + } + + function removeExtraProcessor(processor: MessageProcessor): void { + if (!processorNames.has(processor.name)) { + console.error(`Processor ${processor.name} not found in main processors`); + return; + } + extraProcessors = extraProcessors.filter((p) => p !== processor); + processorNames.delete(processor.name); + } + function info(message: string): void { queueMessage(message, 'info'); } @@ -35,17 +58,18 @@ export function createMessageBatcher( function error(message: string, error?: Error | string): void { queueMessage(message, 'error', error); } + function queueMessage( message: string, level: NotificationLevel, error?: Error | string ): void { const chatId = 'default'; - if (!globalQueues.has(chatId)) { - globalQueues.set(chatId, []); + if (!queues.has(chatId)) { + queues.set(chatId, []); } - const queue = globalQueues.get(chatId) ?? []; + const queue = queues.get(chatId) ?? []; queue.push({ chatId, text: message, level, error }); // Set a timeout to process this batch if maxBatchSize isn't reached @@ -64,8 +88,24 @@ export function createMessageBatcher( } } + // Helper function for concurrent processing + async function processInBatches( + items: T[], + concurrency: number, + processor: (item: T) => Promise + ): Promise { + const chunks = []; + for (let i = 0; i < items.length; i += concurrency) { + chunks.push(items.slice(i, i + concurrency)); + } + + for (const chunk of chunks) { + await Promise.all(chunk.map(processor)); + } + } + async function processBatch(chatId: string): Promise { - const queue = globalQueues.get(chatId); + const queue = queues.get(chatId); if (!queue?.length) return; // Clear any pending timer for this batch @@ -76,53 +116,57 @@ export function createMessageBatcher( } const batch = [...queue]; - globalQueues.set(chatId, []); - - const results = await Promise.allSettled( - processors.map((processor) => processor.processBatch(batch)) - ); - - for (let i = 0; i < results.length; i++) { - const result = results[i]; - if (result.status === 'rejected') { - console.error(`Processor ${i} failed:`, result.reason); - } + queues.set(chatId, []); + + try { + const allProcessors = [...processors, ...extraProcessors]; + await processInBatches( + allProcessors, + 3, // Process 3 processors concurrently + async (processor) => { + try { + await processor.processBatch(batch); + } catch (error) { + console.error(`Processor ${processor.name} failed:`, error); + } + } + ); + } catch (error) { + console.error('Error processing batch:', error); } } function processBatchSync(chatId: string): void { - const queue = globalQueues.get(chatId); + const queue = queues.get(chatId); if (!queue?.length) return; const batch = [...queue]; - globalQueues.set(chatId, []); - - for (const item of batch) { - for (const processor of processors) { - try { - if (processor.processBatchSync) { - processor.processBatchSync([item]); - } else { - // Handle async processBatch by ignoring the Promise - (processor.processBatch([item]) as Promise).catch((error) => { - console.error(`Processor failed:`, error); - }); - } - } catch (error) { - console.error(`Processor failed:`, error); + queues.set(chatId, []); + + for (const processor of processors) { + try { + if (processor.processBatchSync) { + processor.processBatchSync(batch); + } else { + // Handle async processBatch by ignoring the Promise + (processor.processBatch(batch) as Promise).catch((error) => { + console.error(`Processor ${processor.name} failed:`, error); + }); } + } catch (error) { + console.error(`Processor ${processor.name} failed:`, error); } } } async function flush(): Promise { - for (const chatId of globalQueues.keys()) { + for (const chatId of queues.keys()) { await processBatch(chatId); } } function flushSync(): void { - for (const chatId of globalQueues.keys()) { + for (const chatId of queues.keys()) { processBatchSync(chatId); } } @@ -136,25 +180,24 @@ export function createMessageBatcher( clearTimeout(timer); } timers.clear(); + queues.clear(); } - // Start processing on creation startProcessing(); - // Return a new instance - return { + globalBatcher = { info, warning, error, queueMessage, - processBatch: processBatch, + processBatch, flush, flushSync, destroy, + queues, + timers, + addExtraProcessor, + removeExtraProcessor, }; -} - -// Reset function now only clears the queue -export function resetBatcher(): void { - globalQueues.clear(); + return globalBatcher; } diff --git a/src/index.ts b/src/index.ts index a0e4d80..ee2ff2c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,6 +3,8 @@ export * from './types'; export { createTelegramProcessor } from './processors/telegram'; export type { TelegramConfig } from './types'; +export { createCustomProcessor } from './processors/custom'; + export { createConsoleProcessor } from './processors/console'; export { diff --git a/src/processors/console.ts b/src/processors/console.ts index aa68515..d38c4e8 100644 --- a/src/processors/console.ts +++ b/src/processors/console.ts @@ -18,5 +18,5 @@ export function createConsoleProcessor(): MessageProcessor { } } - return { processBatch }; -} \ No newline at end of file + return { processBatch, name: 'console' }; +} diff --git a/src/processors/custom.ts b/src/processors/custom.ts new file mode 100644 index 0000000..5c0f4dc --- /dev/null +++ b/src/processors/custom.ts @@ -0,0 +1,19 @@ +import { Message } from '../types'; + +import { MessageProcessor } from '../types'; + +export function createCustomProcessor({ + name, + processBatch, + processBatchSync, +}: { + name: string; + processBatch: (messages: Message[]) => Promise; + processBatchSync?: (messages: Message[]) => void; +}): MessageProcessor { + return { + name, + processBatch: processBatch || processBatchSync, + processBatchSync: processBatchSync || processBatch, + }; +} diff --git a/src/processors/telegram.ts b/src/processors/telegram.ts index 49f0f17..499c863 100644 --- a/src/processors/telegram.ts +++ b/src/processors/telegram.ts @@ -93,5 +93,6 @@ export function createTelegramProcessor( return { processBatch, + name: 'telegram', }; } diff --git a/src/scripts/test-timeout.ts b/src/scripts/test-timeout.ts index 7105302..3c478a0 100644 --- a/src/scripts/test-timeout.ts +++ b/src/scripts/test-timeout.ts @@ -2,6 +2,7 @@ import { createMessageBatcher } from '../batcher.js'; import { type Message } from '../types.js'; const mockProcessor = { + name: 'mock', processBatch: async (messages: Message[]) => { console.log('Processing batch:', messages); }, @@ -9,30 +10,27 @@ const mockProcessor = { async function testTimeout() { console.log('Starting timeout test...'); - - const batcher = createMessageBatcher( - [mockProcessor], - { - maxBatchSize: 5, - maxWaitMs: 2000, // 2 seconds for easier observation - } - ); + + const batcher = createMessageBatcher([mockProcessor], { + maxBatchSize: 5, + maxWaitMs: 2000, // 2 seconds for easier observation + }); console.log('Sending first message...'); batcher.info('First message'); - + // Wait 1 second - await new Promise(resolve => setTimeout(resolve, 1000)); - + await new Promise((resolve) => setTimeout(resolve, 1000)); + console.log('Sending second message...'); batcher.info('Second message'); - + // Wait 3 seconds to ensure the timeout triggers - await new Promise(resolve => setTimeout(resolve, 3000)); - + await new Promise((resolve) => setTimeout(resolve, 3000)); + console.log('Test complete. Cleaning up...'); batcher.destroy(); } // Run the test -testTimeout().catch(console.error); \ No newline at end of file +testTimeout().catch(console.error); diff --git a/src/types.ts b/src/types.ts index fae826e..0140735 100644 --- a/src/types.ts +++ b/src/types.ts @@ -20,6 +20,7 @@ export interface TelegramConfig { } export interface MessageProcessor { + name: string; processBatch(messages: Message[]): void | Promise; processBatchSync?(messages: Message[]): void; } @@ -33,6 +34,10 @@ export interface MessageBatcher { flush(): Promise; flushSync(): void; destroy(): void; + queues: Map; + timers: Map; + addExtraProcessor(processor: MessageProcessor): void; + removeExtraProcessor(processor: MessageProcessor): void; } export type ProcessorOptions = { diff --git a/src/utils/errorClassifier.ts b/src/utils/errorClassifier.ts index 6a02fb8..46dfe3a 100644 --- a/src/utils/errorClassifier.ts +++ b/src/utils/errorClassifier.ts @@ -109,8 +109,6 @@ export async function classifyError( const now = Date.now(); const patterns = getPatterns(); - console.log('Patterns:', patterns); - console.log('Message:', message); for (const [pattern, category, severity, aggregation] of patterns) { let matches = false;