fixes
21e8 committed Dec 8, 2024
1 parent 7aeb1f8 commit c4fe8e7
Showing 11 changed files with 196 additions and 130 deletions.
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "0xalice-tgram-bot",
"version": "0.2.16",
"version": "0.2.17",
"description": "Batched Telegram notification bot for 0xAlice",
"type": "module",
"main": "./dist/cjs/index.js",
111 changes: 50 additions & 61 deletions src/__tests__/batcher.test.ts
@@ -1,8 +1,7 @@
import {
createMessageBatcher,
resetBatcher,
globalQueues,
timers,
// globalQueues,
// timers,
} from '../batcher';
import type { Message, MessageBatcher, MessageProcessor } from '../types';

@@ -15,6 +14,7 @@ describe('MessageBatcher', () => {
beforeEach(() => {
processedMessages = [];
mockProcessor = {
name: 'mock',
processBatch: jest.fn(async (messages) => {
processedMessages = messages;
}),
@@ -29,74 +29,59 @@ describe('MessageBatcher', () => {
}
jest.clearAllMocks();
jest.useRealTimers();
resetBatcher();
});
it('should set a timer and process messages after the specified delay', async () => {
jest.useFakeTimers();
const setTimeoutSpy = jest.spyOn(global, 'setTimeout');
it('should set a timer and process 1 message after the specified delay', async () => {
jest.useRealTimers();
const processBatchSpy = jest.fn();
const mockProcessor = {
name: 'test',
processBatch: processBatchSpy,
};

// Clear any existing queues
resetBatcher();
batcher = createMessageBatcher([mockProcessor], {
maxBatchSize: 5,
maxWaitMs: 100,
});

// Verify queue doesn't exist yet
expect(globalQueues.has('default')).toBe(false);

batcher = createMessageBatcher(
[
mockProcessor,
{
processBatch: processBatchSpy,
},
],
{
maxBatchSize: 5,
maxWaitMs: 100,
}
);
expect(batcher.queues.has('default')).toBe(false);

// Add a message
batcher.info('test message');

// Verify queue was initialized
expect(globalQueues.has('default')).toBe(true);
expect(globalQueues.get('default')).toHaveLength(1);
expect(batcher.queues.has('default')).toBe(true);
expect(batcher.queues.get('default')).toHaveLength(1);

// Verify that setTimeout was called to add a timer
expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), 100);
// expect(processBatchSpy).toHaveBeenCalled();

// Fast-forward time to trigger the timer
jest.advanceTimersByTime(100);
await Promise.resolve();
// Wait for the timer to expire and any promises to resolve
await new Promise((resolve) => setTimeout(resolve, 150));
await new Promise(process.nextTick);

// Verify the message was processed
expect(mockProcessor.processBatch).toHaveBeenCalledWith([
expect(processBatchSpy).toHaveBeenCalledWith([
{
chatId: 'default',
text: 'test message',
level: 'info',
error: undefined,
},
]);

setTimeoutSpy.mockRestore();
jest.useRealTimers();
});
it('should set a timer and process messages after the specified delay', async () => {
it('should set a timer and process 2 messages after the specified delay', async () => {
jest.useRealTimers();
const processBatchSpy = jest.fn();

// Clear any existing queues
resetBatcher();

// Verify queue doesn't exist yet
expect(globalQueues.has('default')).toBe(false);
expect(batcher.queues.has('default')).toBe(false);

batcher = createMessageBatcher(
[
mockProcessor,
{
name: 'mock',
processBatch: processBatchSpy,
},
],
Expand All @@ -110,8 +95,8 @@ describe('MessageBatcher', () => {
batcher.info('test message');

// Verify queue was initialized
expect(globalQueues.has('default')).toBe(true);
expect(globalQueues.get('default')).toHaveLength(1);
expect(batcher.queues.has('default')).toBe(true);
expect(batcher.queues.get('default')).toHaveLength(1);

// Wait for the timer to expire
await new Promise((resolve) => setTimeout(resolve, 200));
@@ -265,6 +250,7 @@ describe('MessageBatcher', () => {
it('should handle processor failures gracefully', async () => {
const consoleSpy = jest.spyOn(console, 'error').mockImplementation();
const failingProcessor: MessageProcessor = {
name: 'mock',
processBatch: jest.fn().mockRejectedValue(new Error('Process failed')),
};

@@ -277,7 +263,7 @@ describe('MessageBatcher', () => {
await batcher.flush();

expect(consoleSpy).toHaveBeenCalledWith(
'Processor 0 failed:',
'Processor mock failed:',
expect.any(Error)
);

@@ -334,7 +320,7 @@ describe('MessageBatcher', () => {
]);
});

it('should clear all queues when reset', async () => {
it('should flush items in queue', async () => {
batcher = createMessageBatcher([mockProcessor], {
maxBatchSize: 5,
maxWaitMs: 1000,
@@ -344,15 +330,17 @@ describe('MessageBatcher', () => {
batcher.warning('Message 2');

// Reset before processing
resetBatcher();
await batcher.flush();

expect(processedMessages).toHaveLength(0);
expect(processedMessages).toHaveLength(2);
});

it('should process messages synchronously with flushSync', async () => {
const processBatchSpy = jest.fn().mockResolvedValue(undefined);
const syncProcessor = { processBatch: processBatchSpy };
const syncProcessor = {
name: 'mock',
processBatch: processBatchSpy,
};

batcher = createMessageBatcher([syncProcessor], {
maxBatchSize: 5,
@@ -387,7 +375,10 @@ describe('MessageBatcher', () => {
const processBatchSpy = jest.fn(() =>
Promise.reject(new Error('Process failed'))
);
const failingProcessor = { processBatch: processBatchSpy };
const failingProcessor = {
name: 'mock',
processBatch: processBatchSpy,
};

batcher = createMessageBatcher([failingProcessor], {
maxBatchSize: 2,
@@ -427,6 +418,7 @@ describe('MessageBatcher', () => {
it('should handle sync processor errors without breaking the queue', () => {
const consoleSpy = jest.spyOn(console, 'error').mockImplementation();
const syncProcessor = {
name: 'mock',
processBatch: jest.fn().mockImplementation(() => {
throw new Error('Sync error');
}),
@@ -451,7 +443,6 @@ describe('MessageBatcher', () => {

it('should initialize queue for new chat ID', async () => {
// Reset the global queue to ensure we start fresh
resetBatcher();

batcher = createMessageBatcher([mockProcessor], {
maxBatchSize: 5,
@@ -476,15 +467,13 @@ describe('MessageBatcher', () => {
});

it('should initialize empty queue for new chat ID', () => {
resetBatcher();

batcher = createMessageBatcher([mockProcessor], {
maxBatchSize: 5,
maxWaitMs: 1000,
});

// Access the internal queue map
const queues = globalQueues as Map<string, Message[]>;
const queues = batcher.queues as Map<string, Message[]>;
expect(queues.has('default')).toBe(false);

// This should initialize the queue
@@ -508,14 +497,14 @@ describe('MessageBatcher', () => {
});

// Verify queue doesn't exist initially
expect(globalQueues.has('default')).toBe(false);
expect(batcher.queues.has('default')).toBe(false);

// Add a message which should initialize the queue
(batcher as any).queueMessage('test message', 'info');

// Verify queue was initialized with the message
expect(globalQueues.has('default')).toBe(true);
const queue = globalQueues.get('default');
expect(batcher.queues.has('default')).toBe(true);
const queue = batcher.queues.get('default');
expect(queue).toBeDefined();
expect(queue).toEqual([
{
@@ -529,7 +518,7 @@ describe('MessageBatcher', () => {

it('should handle undefined queue result from get', async () => {
// Clear any existing queues
globalQueues.clear();
batcher.queues.clear();

// Create a new batcher
batcher = createMessageBatcher([mockProcessor], {
Expand All @@ -538,7 +527,7 @@ describe('MessageBatcher', () => {
});

// Force the queue to be undefined for the first get
const queue = globalQueues.get('default');
const queue = batcher.queues.get('default');
expect(queue).toBeUndefined();

// Add a message - this should handle the undefined case
@@ -646,14 +635,14 @@ describe('MessageBatcher', () => {
(batcher as any).queueMessage('test message', 'info');

// Verify timer was created
expect(timers.size).toBe(1);
expect(batcher.timers.size).toBe(1);

// Advance time to trigger the timer
jest.advanceTimersByTime(1000);
await Promise.resolve();

// Verify timer was cleaned up
expect(timers.size).toBe(0);
expect(batcher.timers.size).toBe(0);

// Verify message was processed
expect(mockProcessor.processBatch).toHaveBeenCalledWith([
@@ -680,19 +669,19 @@ describe('MessageBatcher', () => {
(batcher as any).queueMessage('test message', 'info');

// Get the timer ID
const timerIds = Array.from(timers.values());
const timerIds = Array.from(batcher.timers.values());
expect(timerIds).toHaveLength(1);

// Advance time to trigger the callback
jest.advanceTimersByTime(1000);
await Promise.resolve();

// Verify timer was deleted
expect(timers.has('default')).toBe(false);
expect(timers.size).toBe(0);
expect(batcher.timers.has('default')).toBe(false);
expect(batcher.timers.size).toBe(0);
// Add another message to verify new timer can be created
(batcher as any).queueMessage('test message 2', 'info');
expect(timers.size).toBe(1);
expect(batcher.timers.size).toBe(1);

jest.useRealTimers();
});
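
Taken together, the test changes above suggest a refactor (inferred from the assertions in this file, not from the batcher source, which is outside this diff): processors now carry a name, and the per-chat queues and flush timers live on the batcher instance instead of the module-level globalQueues/timers cleared by resetBatcher. A rough TypeScript sketch of the shape these tests rely on:

import type { Message, MessageProcessor } from '../types';

// Assumed shape of a named processor; the `name` is what failure logs report
// (the tests expect 'Processor mock failed:').
const exampleProcessor: MessageProcessor = {
  name: 'example',
  processBatch: async (messages: Message[]): Promise<void> => {
    // deliver the batch somewhere (e.g. send one notification per message)
  },
};

// Instance-level state the tests now inspect (previously module-level globals):
//   batcher.queues — Map<string, Message[]>, one pending queue per chat ID
//   batcher.timers — Map<string, ReturnType<typeof setTimeout>>, one flush timer per chat ID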
13 changes: 12 additions & 1 deletion src/__tests__/processors/telegram.test.ts
@@ -17,8 +17,9 @@ describe('TelegramProcessor', () => {
json: () => Promise.resolve({}),
} as Response)
);
// Silence console.log except for specific tests
// Silence console output except for specific tests
jest.spyOn(console, 'log').mockImplementation();
jest.spyOn(console, 'error').mockImplementation();
});

afterEach(() => {
@@ -64,6 +65,7 @@ describe('TelegramProcessor', () => {
});

it('should throw error on failed API response', async () => {
const consoleSpy = jest.spyOn(console, 'error');
// Updated mock implementation for failed response
(global.fetch as jest.Mock).mockImplementationOnce(() =>
Promise.resolve({
Expand All @@ -86,6 +88,15 @@ describe('TelegramProcessor', () => {
await expect(processor.processBatch(messages)).rejects.toThrow(
'Telegram API error: Bad Request - Bad Request: message text is empty'
);

expect(consoleSpy).toHaveBeenCalledWith(
'[Telegram] API Response:',
expect.objectContaining({
ok: false,
error_code: 400,
description: expect.any(String)
})
);
});
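
The new console.error assertion implies (again inferred from the test; the processor source is not part of this diff) that on a non-OK response the processor logs the parsed Telegram payload before throwing. A minimal sketch of that failure path, where everything except the asserted '[Telegram] API Response:' prefix and error-message format is an assumption:

// Hypothetical helper illustrating the failure path the test above asserts.
async function sendTelegramMessage(apiUrl: string, body: string): Promise<void> {
  const response = await fetch(apiUrl, { method: 'POST', body });
  const data = await response.json();
  if (!data.ok) {
    // Logged shape matches the test: { ok: false, error_code, description }
    console.error('[Telegram] API Response:', data);
    throw new Error(
      `Telegram API error: ${response.statusText} - ${data.description}`
    );
  }
}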

it('should handle empty message batch', async () => {
