Skip to content

Commit

Permalink
fix: throttle mark read API requests
Browse files Browse the repository at this point in the history
  • Loading branch information
szuperaz committed Aug 27, 2024
1 parent 164b676 commit 827588f
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 4 deletions.
85 changes: 84 additions & 1 deletion projects/stream-chat-angular/src/lib/channel.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fakeAsync, TestBed, tick } from '@angular/core/testing';
import { fakeAsync, flush, TestBed, tick } from '@angular/core/testing';
import { Subject } from 'rxjs';
import { first, take } from 'rxjs/operators';
import {
Expand Down Expand Up @@ -208,6 +208,7 @@ describe('ChannelService', () => {
events$.next({ eventType: 'connection.recovered' } as ClientEvent);

tick();
flush();

expect(spy).toHaveBeenCalledWith(channels);
expect(activeChannelSpy).toHaveBeenCalledWith(channels[0]);
Expand Down Expand Up @@ -577,6 +578,10 @@ describe('ChannelService', () => {

it('should watch for new message events', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, 500);
});
const spy = jasmine.createSpy();
service.activeChannelMessages$.subscribe(spy);
const prevCount = (spy.calls.mostRecent().args[0] as Channel[]).length;
Expand Down Expand Up @@ -991,6 +996,7 @@ describe('ChannelService', () => {

it('should add the new channel to the top of the list, and start watching it, if user is added to a channel', fakeAsync(async () => {
await init();
flush();
const newChannel = generateMockChannels()[0];
newChannel.cid = 'newchannel';
newChannel.id = 'newchannel';
Expand Down Expand Up @@ -1030,6 +1036,7 @@ describe('ChannelService', () => {
event: { channel: channel } as any as Event<DefaultStreamChatGenerics>,
});
tick();
flush();

const channels = spy.calls.mostRecent().args[0] as Channel[];
const firstChannel = channels[0];
Expand Down Expand Up @@ -1059,6 +1066,7 @@ describe('ChannelService', () => {
event: { channel: channel } as any as Event<DefaultStreamChatGenerics>,
});
tick();
flush();

const channels = spy.calls.mostRecent().args[0] as Channel[];

Expand Down Expand Up @@ -2242,6 +2250,7 @@ describe('ChannelService', () => {

it('should relaod active channel if active channel is not present after state reconnect', fakeAsync(async () => {
await init();
flush();
let activeChannel!: Channel<DefaultStreamChatGenerics>;
service.activeChannel$.subscribe((c) => (activeChannel = c!));
let channels!: Channel<DefaultStreamChatGenerics>[];
Expand All @@ -2251,6 +2260,7 @@ describe('ChannelService', () => {
mockChatClient.queryChannels.and.resolveTo(channels);
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
tick();
flush();
const spy = jasmine.createSpy();
service.activeChannel$.subscribe(spy);

Expand All @@ -2276,6 +2286,7 @@ describe('ChannelService', () => {
activeChannel.state.messages.push(newMessage);
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
tick();
flush();

expect(spy).not.toHaveBeenCalled();
expect(service.deselectActiveChannel).not.toHaveBeenCalled();
Expand Down Expand Up @@ -2639,4 +2650,76 @@ describe('ChannelService', () => {
expect(customQuery).toHaveBeenCalledWith('next-page');
expect(hasMoreSpy).toHaveBeenCalledWith(false);
});

it('should throttle mark read API calls', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, 500);
});

const activeChannel = service.activeChannel!;
spyOn(activeChannel, 'markRead');

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, 500);
});

expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
});

it('should throttle mark read API calls - channel change', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, 500);
});

const activeChannel = service.activeChannel!;
spyOn(activeChannel, 'markRead');

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

service.setAsActiveChannel(service.channels[1]);

expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
});

it('should throttle mark read API calls - reset', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, 500);
});

const activeChannel = service.activeChannel!;
spyOn(activeChannel, 'markRead');

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

service.reset();

expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
});
});
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fakeAsync, TestBed, tick } from '@angular/core/testing';
import { fakeAsync, flush, TestBed, tick } from '@angular/core/testing';
import { Subject } from 'rxjs';
import { first } from 'rxjs/operators';
import {
Expand Down Expand Up @@ -235,6 +235,7 @@ describe('ChannelService - threads', () => {
spy.calls.reset();
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
tick();
flush();

expect(spy).toHaveBeenCalledWith(undefined);
}));
Expand Down Expand Up @@ -314,6 +315,10 @@ describe('ChannelService - threads', () => {

it('should watch for new message events', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, 500);
});
const spy = jasmine.createSpy();
const parentMessage = mockMessage();
await service.setAsActiveParentMessage(parentMessage);
Expand Down
28 changes: 26 additions & 2 deletions projects/stream-chat-angular/src/lib/channel.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ export class ChannelService<
};
private dismissErrorNotification?: () => void;
private areReadEventsPaused = false;
private markReadTimeout?: ReturnType<typeof setTimeout>;
private scheduledMarkReadRequest?: Function;

constructor(
private chatClientService: ChatClientService<T>,
Expand Down Expand Up @@ -563,6 +565,7 @@ export class ChannelService<
return;
}
this.stopWatchForActiveChannelEvents(prevActiveChannel);
this.flushMarkReadQueue();
this.areReadEventsPaused = false;
const readState =
channel.state.read[this.chatClientService.chatClient.user?.id || ''];
Expand Down Expand Up @@ -595,6 +598,7 @@ export class ChannelService<
return;
}
this.stopWatchForActiveChannelEvents(activeChannel);
this.flushMarkReadQueue();
this.activeChannelMessagesSubject.next([]);
this.activeChannelSubject.next(undefined);
this.activeParentMessageIdSubject.next(undefined);
Expand Down Expand Up @@ -2220,8 +2224,10 @@ export class ChannelService<
this.usersTypingInThreadSubject.next([]);
}

private markRead(channel: Channel<T>) {
if (
private markRead(channel: Channel<T>, isThrottled = true) {
if (isThrottled) {
this.markReadThrottled(channel);
} else if (
this.canSendReadEvents &&
this.shouldMarkActiveChannelAsRead &&
!this.areReadEventsPaused
Expand All @@ -2230,6 +2236,24 @@ export class ChannelService<
}
}

private markReadThrottled(channel: Channel<T>) {
if (!this.markReadTimeout) {
this.markRead(channel, false);
this.markReadTimeout = setTimeout(() => {
this.flushMarkReadQueue();
}, 2000);
} else {
this.scheduledMarkReadRequest = () => this.markRead(channel, false);
}
}

private flushMarkReadQueue() {
this.scheduledMarkReadRequest?.();
this.scheduledMarkReadRequest = undefined;
clearTimeout(this.markReadTimeout);
this.markReadTimeout = undefined;
}

private async _init(settings: {
shouldSetActiveChannel: boolean;
messagePageSize: number;
Expand Down

0 comments on commit 827588f

Please sign in to comment.