Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: throttle mark read API requests #628

Merged
merged 2 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 112 additions & 1 deletion projects/stream-chat-angular/src/lib/channel.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fakeAsync, TestBed, tick } from '@angular/core/testing';
import { fakeAsync, flush, TestBed, tick } from '@angular/core/testing';
import { Subject } from 'rxjs';
import { first, take } from 'rxjs/operators';
import {
Expand Down Expand Up @@ -208,6 +208,7 @@ describe('ChannelService', () => {
events$.next({ eventType: 'connection.recovered' } as ClientEvent);

tick();
flush();

expect(spy).toHaveBeenCalledWith(channels);
expect(activeChannelSpy).toHaveBeenCalledWith(channels[0]);
Expand Down Expand Up @@ -577,6 +578,10 @@ describe('ChannelService', () => {

it('should watch for new message events', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});
const spy = jasmine.createSpy();
service.activeChannelMessages$.subscribe(spy);
const prevCount = (spy.calls.mostRecent().args[0] as Channel[]).length;
Expand Down Expand Up @@ -991,6 +996,7 @@ describe('ChannelService', () => {

it('should add the new channel to the top of the list, and start watching it, if user is added to a channel', fakeAsync(async () => {
await init();
flush();
const newChannel = generateMockChannels()[0];
newChannel.cid = 'newchannel';
newChannel.id = 'newchannel';
Expand Down Expand Up @@ -1030,6 +1036,7 @@ describe('ChannelService', () => {
event: { channel: channel } as any as Event<DefaultStreamChatGenerics>,
});
tick();
flush();

const channels = spy.calls.mostRecent().args[0] as Channel[];
const firstChannel = channels[0];
Expand Down Expand Up @@ -1059,6 +1066,7 @@ describe('ChannelService', () => {
event: { channel: channel } as any as Event<DefaultStreamChatGenerics>,
});
tick();
flush();

const channels = spy.calls.mostRecent().args[0] as Channel[];

Expand Down Expand Up @@ -2242,6 +2250,7 @@ describe('ChannelService', () => {

it('should relaod active channel if active channel is not present after state reconnect', fakeAsync(async () => {
await init();
flush();
let activeChannel!: Channel<DefaultStreamChatGenerics>;
service.activeChannel$.subscribe((c) => (activeChannel = c!));
let channels!: Channel<DefaultStreamChatGenerics>[];
Expand All @@ -2251,6 +2260,7 @@ describe('ChannelService', () => {
mockChatClient.queryChannels.and.resolveTo(channels);
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
tick();
flush();
const spy = jasmine.createSpy();
service.activeChannel$.subscribe(spy);

Expand All @@ -2276,6 +2286,7 @@ describe('ChannelService', () => {
activeChannel.state.messages.push(newMessage);
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
tick();
flush();

expect(spy).not.toHaveBeenCalled();
expect(service.deselectActiveChannel).not.toHaveBeenCalled();
Expand Down Expand Up @@ -2354,6 +2365,22 @@ describe('ChannelService', () => {
expect(service.activeChannelUnreadCount).toBe(0);
});

it(`should set last read message id to undefined if unread count is 0`, () => {
const activeChannel = generateMockChannels()[0];
activeChannel.id = 'next-active-channel';
activeChannel.state.read[user.id] = {
last_read: new Date(),
last_read_message_id: 'last-read-message-id',
unread_messages: 0,
user: user,
};

service.setAsActiveChannel(activeChannel);

expect(service.activeChannelLastReadMessageId).toBe(undefined);
expect(service.activeChannelUnreadCount).toBe(0);
});

it('should be able to select empty channel as active channel', () => {
const channel = generateMockChannels()[0];
channel.id = 'new-empty-channel';
Expand Down Expand Up @@ -2558,6 +2585,18 @@ describe('ChannelService', () => {

expect(service.activeChannelLastReadMessageId).toBe('last-read-message');
expect(service.activeChannelUnreadCount).toBe(12);

events$.next({
eventType: 'notification.mark_unread',
event: {
channel_id: service.activeChannel?.id,
unread_messages: 0,
last_read_message_id: 'last-read-message',
} as Event<DefaultStreamChatGenerics>,
});

expect(service.activeChannelLastReadMessageId).toBe(undefined);
expect(service.activeChannelUnreadCount).toBe(0);
});

it('should halt marking the channel as read if an unread call was made in that session', async () => {
Expand Down Expand Up @@ -2639,4 +2678,76 @@ describe('ChannelService', () => {
expect(customQuery).toHaveBeenCalledWith('next-page');
expect(hasMoreSpy).toHaveBeenCalledWith(false);
});

it('should throttle mark read API calls', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});

const activeChannel = service.activeChannel!;
spyOn(activeChannel, 'markRead');

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});

expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
});

it('should throttle mark read API calls - channel change', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});

const activeChannel = service.activeChannel!;
spyOn(activeChannel, 'markRead');

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

service.setAsActiveChannel(service.channels[1]);

expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
});

it('should throttle mark read API calls - reset', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});

const activeChannel = service.activeChannel!;
spyOn(activeChannel, 'markRead');

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

service.reset();

expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
});
});
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fakeAsync, TestBed, tick } from '@angular/core/testing';
import { fakeAsync, flush, TestBed, tick } from '@angular/core/testing';
import { Subject } from 'rxjs';
import { first } from 'rxjs/operators';
import {
Expand Down Expand Up @@ -235,6 +235,7 @@ describe('ChannelService - threads', () => {
spy.calls.reset();
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
tick();
flush();

expect(spy).toHaveBeenCalledWith(undefined);
}));
Expand Down Expand Up @@ -314,6 +315,10 @@ describe('ChannelService - threads', () => {

it('should watch for new message events', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});
const spy = jasmine.createSpy();
const parentMessage = mockMessage();
await service.setAsActiveParentMessage(parentMessage);
Expand Down
46 changes: 41 additions & 5 deletions projects/stream-chat-angular/src/lib/channel.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,9 @@ export class ChannelService<
};
private dismissErrorNotification?: () => void;
private areReadEventsPaused = false;
private markReadThrottleTime = 1050;
private markReadTimeout?: ReturnType<typeof setTimeout>;
private scheduledMarkReadRequest?: () => void;

constructor(
private chatClientService: ChatClientService<T>,
Expand Down Expand Up @@ -563,17 +566,19 @@ export class ChannelService<
return;
}
this.stopWatchForActiveChannelEvents(prevActiveChannel);
this.flushMarkReadQueue();
this.areReadEventsPaused = false;
const readState =
channel.state.read[this.chatClientService.chatClient.user?.id || ''];
this.activeChannelLastReadMessageId = readState?.last_read_message_id;
this.activeChannelUnreadCount = readState?.unread_messages || 0;
if (
channel.state.latestMessages[channel.state.latestMessages.length - 1]
?.id === this.activeChannelLastReadMessageId
?.id === this.activeChannelLastReadMessageId ||
this.activeChannelUnreadCount === 0
) {
this.activeChannelLastReadMessageId = undefined;
}
this.activeChannelUnreadCount = readState?.unread_messages || 0;
this.watchForActiveChannelEvents(channel);
this.addChannel(channel);
this.activeChannelSubject.next(channel);
Expand All @@ -595,6 +600,7 @@ export class ChannelService<
return;
}
this.stopWatchForActiveChannelEvents(activeChannel);
this.flushMarkReadQueue();
this.activeChannelMessagesSubject.next([]);
this.activeChannelSubject.next(undefined);
this.activeParentMessageIdSubject.next(undefined);
Expand Down Expand Up @@ -1567,6 +1573,9 @@ export class ChannelService<
this.ngZone.run(() => {
this.activeChannelLastReadMessageId = e.last_read_message_id;
this.activeChannelUnreadCount = e.unread_messages;
if (this.activeChannelUnreadCount === 0) {
this.activeChannelLastReadMessageId = undefined;
}
this.activeChannelSubject.next(this.activeChannel);
});
})
Expand Down Expand Up @@ -2220,16 +2229,43 @@ export class ChannelService<
this.usersTypingInThreadSubject.next([]);
}

private markRead(channel: Channel<T>) {
private markRead(channel: Channel<T>, isThrottled = true) {
if (
this.canSendReadEvents &&
this.shouldMarkActiveChannelAsRead &&
!this.areReadEventsPaused
!this.areReadEventsPaused &&
channel.countUnread() > 0
) {
void channel.markRead();
if (isThrottled) {
this.markReadThrottled(channel);
} else {
void channel.markRead();
}
}
}

private markReadThrottled(channel: Channel<T>) {
if (!this.markReadTimeout) {
this.markRead(channel, false);
this.markReadTimeout = setTimeout(() => {
this.flushMarkReadQueue();
}, this.markReadThrottleTime);
} else {
clearTimeout(this.markReadTimeout);
this.scheduledMarkReadRequest = () => this.markRead(channel, false);
this.markReadTimeout = setTimeout(() => {
this.flushMarkReadQueue();
}, this.markReadThrottleTime);
}
}

private flushMarkReadQueue() {
this.scheduledMarkReadRequest?.();
this.scheduledMarkReadRequest = undefined;
clearTimeout(this.markReadTimeout);
this.markReadTimeout = undefined;
}

private async _init(settings: {
shouldSetActiveChannel: boolean;
messagePageSize: number;
Expand Down
2 changes: 1 addition & 1 deletion projects/stream-chat-angular/src/lib/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export const generateMockChannels = (length = 25) => {
sendAction: () => {},
deleteImage: () => {},
deleteFile: () => {},
countUnread: () => {},
countUnread: () => 3,
markRead: () => {},
getReplies: () => {},
keystroke: () => {},
Expand Down
Loading