Skip to content

Commit

Permalink
refactor: improve event middleware types (#15995)
Browse files Browse the repository at this point in the history
  • Loading branch information
atomrc authored Oct 16, 2023
1 parent 94ebd2e commit 6b34cbf
Show file tree
Hide file tree
Showing 12 changed files with 398 additions and 445 deletions.
8 changes: 8 additions & 0 deletions src/script/conversation/EventBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ export type MessageAddEvent = Omit<
replacing_message_id?: string;
previews?: string[];
expects_read_confirmation?: boolean;
quote?:
| string
| {
message_id: string;
user_id: string;
hash: Uint8Array;
}
| {error: {type: string}};
}>,
'id'
> & {
Expand Down
4 changes: 4 additions & 0 deletions src/script/event/EventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ export type IncomingEvent = BackendEvent | ClientConversationEvent;
export interface EventProcessor {
processEvent(event: IncomingEvent, source: EventSource): Promise<void>;
}

export interface EventMiddleware {
processEvent(event: IncomingEvent): Promise<IncomingEvent>;
}
20 changes: 12 additions & 8 deletions src/script/event/EventRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import {queue} from 'Util/PromiseQueue';
import {TIME_IN_MILLIS} from 'Util/TimeUtil';

import {ClientEvent} from './Client';
import {EventProcessor, IncomingEvent} from './EventProcessor';
import {EventMiddleware, EventProcessor, IncomingEvent} from './EventProcessor';
import type {EventService} from './EventService';
import {EventSource} from './EventSource';
import {EVENT_TYPE} from './EventType';
Expand Down Expand Up @@ -69,7 +69,7 @@ export class EventRepository {
notificationsHandled: number;
notificationsTotal: number;
lastEventDate: ko.Observable<string | undefined>;
eventProcessMiddlewares: Function[] = [];
eventProcessMiddlewares: EventMiddleware[] = [];
/** event processors are classes that are able to react and process an incoming event */
eventProcessors: EventProcessor[] = [];

Expand Down Expand Up @@ -123,15 +123,19 @@ export class EventRepository {
}

/**
* Will set a middleware to run before the EventRepository actually processes the event.
* Middleware is just a function with the following signature (Event) => Promise<Event>.
*
* @param middlewares middlewares to run when a new event is about to be processed
* Will register a pipeline that transforms an event before it is being processed by the EventProcessors.
* Those middleware are run sequentially one after the other. Thus the order at which they are defined matters.
* When one middleware fails the entire even handling process will stop and no further middleware will be executed.
*/
setEventProcessMiddlewares(middlewares: Function[]) {
setEventProcessMiddlewares(middlewares: EventMiddleware[]) {
this.eventProcessMiddlewares = middlewares;
}

/**
* EventProcessors are classes that are able to react and process an incoming event.
* They will all be executed in parallel. If one processor fails the other ones are not impacted
* @param processors
*/
setEventProcessors(processors: EventProcessor[]) {
this.eventProcessors = processors;
}
Expand Down Expand Up @@ -427,7 +431,7 @@ export class EventRepository {
*/
private async processEvent(event: IncomingEvent | ClientConversationEvent, source: EventSource) {
for (const eventProcessMiddleware of this.eventProcessMiddlewares) {
event = await eventProcessMiddleware(event);
event = await eventProcessMiddleware.processEvent(event);
}

const shouldSaveEvent = EventTypeHandling.STORE.includes(event.type as CONVERSATION_EVENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,55 @@

import {Quote} from '@wireapp/protocol-messaging';

import {EventBuilder, MessageAddEvent} from 'src/script/conversation/EventBuilder';
import {Conversation} from 'src/script/entity/Conversation';
import {User} from 'src/script/entity/User';
import {ClientEvent} from 'src/script/event/Client';
import {QuotedMessageMiddleware} from 'src/script/event/preprocessor/QuotedMessageMiddleware';
import {MessageHasher} from 'src/script/message/MessageHasher';
import {QuoteEntity} from 'src/script/message/QuoteEntity';
import {EventRecord} from 'src/script/storage';
import {arrayToBase64} from 'Util/util';
import {createUuid} from 'Util/uuid';

import {TestFactory} from '../../../helper/TestFactory';
import {QuotedMessageMiddleware} from './QuotedMessageMiddleware';

describe('QuotedMessageMiddleware', () => {
const testFactory = new TestFactory();
let quotedMessageMiddleware;
import {EventService} from '../EventService';

beforeEach(() => {
return testFactory.exposeEventActors().then(() => {
quotedMessageMiddleware = new QuotedMessageMiddleware(testFactory.event_service);
});
});
function buildQuotedMessageMiddleware() {
const eventService = {
loadEvent: jest.fn(() => []),
loadEventsReplyingToMessage: jest.fn(),
loadReplacingEvent: jest.fn(),
replaceEvent: jest.fn(),
} as unknown as jest.Mocked<EventService>;

return [new QuotedMessageMiddleware(eventService), {eventService}] as const;
}

describe('QuotedMessageMiddleware', () => {
const conversation = new Conversation(createUuid());
conversation.selfUser(new User());

describe('processEvent', () => {
it('ignores messages that do not have quotes', () => {
const event = {
data: {
content: 'salut',
quote: undefined,
},
type: ClientEvent.CONVERSATION.MESSAGE_ADD,
};
it('ignores messages that do not have quotes', async () => {
const [quotedMessageMiddleware] = buildQuotedMessageMiddleware();
const event = EventBuilder.buildMessageAdd(conversation, 0, 'salut');

return quotedMessageMiddleware.processEvent(event).then(decoratedEvent => {
expect(decoratedEvent).toBe(event);
});
const decoratedEvent = await quotedMessageMiddleware.processEvent(event);
expect(decoratedEvent).toEqual(event);
});

it('adds an error if quoted message is not found', async () => {
spyOn(quotedMessageMiddleware.eventService, 'loadEvent').and.returnValue(Promise.resolve(undefined));
const [quotedMessageMiddleware, {eventService}] = buildQuotedMessageMiddleware();
eventService.loadEvent.mockResolvedValue(undefined);

const expectedError = {
type: QuoteEntity.ERROR.MESSAGE_NOT_FOUND,
};

const quote = new Quote({
quotedMessageId: 'invalid-message-uuid',
quotedMessageSha256: '',
quotedMessageSha256: new Uint8Array(),
});

const base64Quote = arrayToBase64(Quote.encode(quote).finish());
Expand All @@ -73,29 +79,29 @@ describe('QuotedMessageMiddleware', () => {
quote: base64Quote,
},
type: ClientEvent.CONVERSATION.MESSAGE_ADD,
};
} as MessageAddEvent;

const parsedEvent = await quotedMessageMiddleware.processEvent(event);
const parsedEvent: any = await quotedMessageMiddleware.processEvent(event);

expect(parsedEvent.data.quote.quotedMessageId).toBeUndefined();
expect(parsedEvent.data.quote.error).toEqual(expectedError);
});

it('decorates event with the quote metadata if validation is successful', async () => {
const [quotedMessageMiddleware, {eventService}] = buildQuotedMessageMiddleware();
const quotedMessage = {
data: {
content: 'salut',
},
from: 'user-id',
time: 100,
type: ClientEvent.CONVERSATION.MESSAGE_ADD,
};
spyOn(MessageHasher, 'validateHash').and.returnValue(Promise.resolve(true));
spyOn(quotedMessageMiddleware.eventService, 'loadEvent').and.returnValue(Promise.resolve(quotedMessage));
} as any;
jest.spyOn(MessageHasher, 'validateHash').mockResolvedValue(true);
eventService.loadEvent.mockResolvedValue(quotedMessage);

const quote = new Quote({
quotedMessageId: 'message-uuid',
quotedMessageSha256: '7fec6710751f67587b6f6109782257cd7c56b5d29570824132e8543e18242f1b',
quotedMessageSha256: new Uint8Array(),
});

const base64Quote = arrayToBase64(Quote.encode(quote).finish());
Expand All @@ -106,22 +112,22 @@ describe('QuotedMessageMiddleware', () => {
content: 'salut',
quote: base64Quote,
},
time: 100,
type: ClientEvent.CONVERSATION.MESSAGE_ADD,
};
} as MessageAddEvent;

const parsedEvent = await quotedMessageMiddleware.processEvent(event);
const parsedEvent: any = await quotedMessageMiddleware.processEvent(event);

expect(parsedEvent.data.quote.message_id).toEqual('message-uuid');
expect(parsedEvent.data.quote.user_id).toEqual('user-id');
});

it('updates quotes in DB when a message is edited', () => {
const [quotedMessageMiddleware, {eventService}] = buildQuotedMessageMiddleware();
const originalMessage = {
data: {
content: 'hello',
},
};
} as EventRecord;
const replies = [
{
data: {
Expand All @@ -140,35 +146,32 @@ describe('QuotedMessageMiddleware', () => {
},
},
];
spyOn(quotedMessageMiddleware.eventService, 'loadEvent').and.returnValue(Promise.resolve(originalMessage));
spyOn(quotedMessageMiddleware.eventService, 'loadEventsReplyingToMessage').and.returnValue(
Promise.resolve(replies),
);
spyOn(quotedMessageMiddleware.eventService, 'replaceEvent').and.returnValue(Promise.resolve());
eventService.loadEvent.mockResolvedValue(originalMessage);
eventService.loadEventsReplyingToMessage.mockResolvedValue(replies);

const event = {
conversation: 'conversation-uuid',
data: {
replacing_message_id: 'original-id',
},
id: 'new-id',
time: 100,
type: ClientEvent.CONVERSATION.MESSAGE_ADD,
};
} as any;

jest.useFakeTimers();

return quotedMessageMiddleware.processEvent(event).then(() => {
jest.advanceTimersByTime(1);

expect(quotedMessageMiddleware.eventService.replaceEvent).toHaveBeenCalledWith(
expect(eventService.replaceEvent).toHaveBeenCalledWith(
jasmine.objectContaining({data: jasmine.objectContaining({quote: {message_id: 'new-id'}})}),
);
jest.useRealTimers();
});
});

it('invalidates quotes in DB when a message is deleted', () => {
const [quotedMessageMiddleware, {eventService}] = buildQuotedMessageMiddleware();
const originalMessage = {
data: {
content: 'hello',
Expand All @@ -192,24 +195,21 @@ describe('QuotedMessageMiddleware', () => {
},
},
];
spyOn(quotedMessageMiddleware.eventService, 'loadEvent').and.returnValue(Promise.resolve(originalMessage));
spyOn(quotedMessageMiddleware.eventService, 'loadEventsReplyingToMessage').and.returnValue(
Promise.resolve(replies),
);
spyOn(quotedMessageMiddleware.eventService, 'replaceEvent').and.returnValue(Promise.resolve());
spyOn(eventService, 'loadEvent').and.returnValue(Promise.resolve(originalMessage));
spyOn(eventService, 'loadEventsReplyingToMessage').and.returnValue(Promise.resolve(replies));
spyOn(eventService, 'replaceEvent').and.returnValue(Promise.resolve());

const event = {
conversation: 'conversation-uuid',
data: {
replacing_message_id: 'original-id',
},
id: 'new-id',
time: 100,
type: ClientEvent.CONVERSATION.MESSAGE_DELETE,
};
} as any;

return quotedMessageMiddleware.processEvent(event).then(() => {
expect(quotedMessageMiddleware.eventService.replaceEvent).toHaveBeenCalledWith(
expect(eventService.replaceEvent).toHaveBeenCalledWith(
jasmine.objectContaining({
data: jasmine.objectContaining({quote: {error: {type: QuoteEntity.ERROR.MESSAGE_NOT_FOUND}}}),
}),
Expand Down
44 changes: 22 additions & 22 deletions src/script/event/preprocessor/QuotedMessageMiddleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@

import {Quote} from '@wireapp/protocol-messaging';

import {DeleteEvent, MessageAddEvent} from 'src/script/conversation/EventBuilder';
import {getLogger, Logger} from 'Util/Logger';
import {base64ToArray} from 'Util/util';

import {QuoteEntity} from '../../message/QuoteEntity';
import {EventRecord} from '../../storage/record/EventRecord';
import {EventRecord, StoredEvent} from '../../storage/record/EventRecord';
import {ClientEvent} from '../Client';
import {EventMiddleware, IncomingEvent} from '../EventProcessor';
import type {EventService} from '../EventService';

export class QuotedMessageMiddleware {
private readonly eventService: EventService;
export class QuotedMessageMiddleware implements EventMiddleware {
private readonly logger: Logger;

constructor(eventService: EventService) {
this.eventService = eventService;
constructor(private readonly eventService: EventService) {
this.logger = getLogger('QuotedMessageMiddleware');
}

Expand All @@ -43,17 +43,15 @@ export class QuotedMessageMiddleware {
* @param event event in the DB format
* @returns the original event if no quote is found (or does not validate). The decorated event if the quote is valid
*/
async processEvent(event: EventRecord): Promise<EventRecord> {
async processEvent(event: IncomingEvent): Promise<IncomingEvent> {
switch (event.type) {
case ClientEvent.CONVERSATION.MESSAGE_ADD: {
if (event.data.replacing_message_id) {
return this._handleEditEvent(event);
}
return this._handleAddEvent(event);
const originalMessageId = event.data.replacing_message_id;
return originalMessageId ? this.handleEditEvent(event, originalMessageId) : this.handleAddEvent(event);
}

case ClientEvent.CONVERSATION.MESSAGE_DELETE: {
return this._handleDeleteEvent(event);
return this.handleDeleteEvent(event);
}

default: {
Expand All @@ -62,9 +60,9 @@ export class QuotedMessageMiddleware {
}
}

private async _handleDeleteEvent(event: EventRecord): Promise<EventRecord> {
private async handleDeleteEvent(event: DeleteEvent): Promise<DeleteEvent> {
const originalMessageId = event.data.message_id;
const {replies} = await this._findRepliesToMessage(event.conversation, originalMessageId);
const {replies} = await this.findRepliesToMessage(event.conversation, originalMessageId);
this.logger.info(`Invalidating '${replies.length}' replies to deleted message '${originalMessageId}'`);
replies.forEach(reply => {
reply.data.quote = {error: {type: QuoteEntity.ERROR.MESSAGE_NOT_FOUND}};
Expand All @@ -73,27 +71,29 @@ export class QuotedMessageMiddleware {
return event;
}

private async _handleEditEvent(event: EventRecord): Promise<EventRecord> {
const originalMessageId = event.data.replacing_message_id;
const {originalEvent, replies} = await this._findRepliesToMessage(event.conversation, originalMessageId);
private async handleEditEvent(event: MessageAddEvent, originalMessageId: string): Promise<MessageAddEvent> {
const {originalEvent, replies} = await this.findRepliesToMessage(event.conversation, originalMessageId);
if (!originalEvent) {
return event;
}

this.logger.info(`Updating '${replies.length}' replies to updated message '${originalMessageId}'`);
replies.forEach(reply => {
reply.data.quote.message_id = event.id;
const quote = reply.data.quote;
if (quote && typeof quote !== 'string' && 'message_id' in quote && 'id' in event) {
quote.message_id = event.id as string;
}
// we want to update the messages quoting the original message later, thus the timeout
setTimeout(() => this.eventService.replaceEvent(reply));
});
const decoratedData = {...event.data, quote: originalEvent.data.quote};
return {...event, data: decoratedData};
}

private async _handleAddEvent(event: EventRecord): Promise<EventRecord> {
const rawQuote = event.data && event.data.quote;
private async handleAddEvent(event: MessageAddEvent): Promise<MessageAddEvent> {
const rawQuote = event.data.quote;

if (!rawQuote) {
if (!rawQuote || typeof rawQuote !== 'string') {
return event;
}

Expand Down Expand Up @@ -132,10 +132,10 @@ export class QuotedMessageMiddleware {
return {...event, data: decoratedData};
}

private async _findRepliesToMessage(
private async findRepliesToMessage(
conversationId: string,
messageId: string,
): Promise<{originalEvent?: EventRecord; replies: EventRecord[]}> {
): Promise<{originalEvent?: EventRecord; replies: StoredEvent<MessageAddEvent>[]}> {
const originalEvent = await this.eventService.loadEvent(conversationId, messageId);

if (!originalEvent) {
Expand Down
Loading

0 comments on commit 6b34cbf

Please sign in to comment.