From 827588f3d168192f84ee08356243288bccf82da0 Mon Sep 17 00:00:00 2001 From: Zita Szupera Date: Mon, 26 Aug 2024 16:41:45 +0200 Subject: [PATCH] fix: throttle mark read API requests --- .../src/lib/channel.service.spec.ts | 85 ++++++++++++++++++- .../src/lib/channel.service.thread.spec.ts | 7 +- .../src/lib/channel.service.ts | 28 +++++- 3 files changed, 116 insertions(+), 4 deletions(-) diff --git a/projects/stream-chat-angular/src/lib/channel.service.spec.ts b/projects/stream-chat-angular/src/lib/channel.service.spec.ts index 74c24d89..57f8b7f4 100644 --- a/projects/stream-chat-angular/src/lib/channel.service.spec.ts +++ b/projects/stream-chat-angular/src/lib/channel.service.spec.ts @@ -1,4 +1,4 @@ -import { fakeAsync, TestBed, tick } from '@angular/core/testing'; +import { fakeAsync, flush, TestBed, tick } from '@angular/core/testing'; import { Subject } from 'rxjs'; import { first, take } from 'rxjs/operators'; import { @@ -208,6 +208,7 @@ describe('ChannelService', () => { events$.next({ eventType: 'connection.recovered' } as ClientEvent); tick(); + flush(); expect(spy).toHaveBeenCalledWith(channels); expect(activeChannelSpy).toHaveBeenCalledWith(channels[0]); @@ -577,6 +578,10 @@ describe('ChannelService', () => { it('should watch for new message events', async () => { await init(); + // wait for mark read throttle time + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); const spy = jasmine.createSpy(); service.activeChannelMessages$.subscribe(spy); const prevCount = (spy.calls.mostRecent().args[0] as Channel[]).length; @@ -991,6 +996,7 @@ describe('ChannelService', () => { it('should add the new channel to the top of the list, and start watching it, if user is added to a channel', fakeAsync(async () => { await init(); + flush(); const newChannel = generateMockChannels()[0]; newChannel.cid = 'newchannel'; newChannel.id = 'newchannel'; @@ -1030,6 +1036,7 @@ describe('ChannelService', () => { event: { channel: channel } as any as Event, }); tick(); + flush(); const channels = spy.calls.mostRecent().args[0] as Channel[]; const firstChannel = channels[0]; @@ -1059,6 +1066,7 @@ describe('ChannelService', () => { event: { channel: channel } as any as Event, }); tick(); + flush(); const channels = spy.calls.mostRecent().args[0] as Channel[]; @@ -2242,6 +2250,7 @@ describe('ChannelService', () => { it('should relaod active channel if active channel is not present after state reconnect', fakeAsync(async () => { await init(); + flush(); let activeChannel!: Channel; service.activeChannel$.subscribe((c) => (activeChannel = c!)); let channels!: Channel[]; @@ -2251,6 +2260,7 @@ describe('ChannelService', () => { mockChatClient.queryChannels.and.resolveTo(channels); events$.next({ eventType: 'connection.recovered' } as ClientEvent); tick(); + flush(); const spy = jasmine.createSpy(); service.activeChannel$.subscribe(spy); @@ -2276,6 +2286,7 @@ describe('ChannelService', () => { activeChannel.state.messages.push(newMessage); events$.next({ eventType: 'connection.recovered' } as ClientEvent); tick(); + flush(); expect(spy).not.toHaveBeenCalled(); expect(service.deselectActiveChannel).not.toHaveBeenCalled(); @@ -2639,4 +2650,76 @@ describe('ChannelService', () => { expect(customQuery).toHaveBeenCalledWith('next-page'); expect(hasMoreSpy).toHaveBeenCalledWith(false); }); + + it('should throttle mark read API calls', async () => { + await init(); + // wait for mark read throttle time + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); + + const activeChannel = service.activeChannel!; + spyOn(activeChannel, 'markRead'); + + (activeChannel as MockChannel).handleEvent('message.new', mockMessage()); + + expect(activeChannel.markRead).toHaveBeenCalledTimes(1); + + (activeChannel as MockChannel).handleEvent('message.new', mockMessage()); + + expect(activeChannel.markRead).toHaveBeenCalledTimes(1); + + // wait for mark read throttle time + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); + + expect(activeChannel.markRead).toHaveBeenCalledTimes(2); + }); + + it('should throttle mark read API calls - channel change', async () => { + await init(); + // wait for mark read throttle time + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); + + const activeChannel = service.activeChannel!; + spyOn(activeChannel, 'markRead'); + + (activeChannel as MockChannel).handleEvent('message.new', mockMessage()); + + expect(activeChannel.markRead).toHaveBeenCalledTimes(1); + + (activeChannel as MockChannel).handleEvent('message.new', mockMessage()); + + expect(activeChannel.markRead).toHaveBeenCalledTimes(1); + + service.setAsActiveChannel(service.channels[1]); + + expect(activeChannel.markRead).toHaveBeenCalledTimes(2); + }); + + it('should throttle mark read API calls - reset', async () => { + await init(); + // wait for mark read throttle time + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); + + const activeChannel = service.activeChannel!; + spyOn(activeChannel, 'markRead'); + + (activeChannel as MockChannel).handleEvent('message.new', mockMessage()); + + expect(activeChannel.markRead).toHaveBeenCalledTimes(1); + + (activeChannel as MockChannel).handleEvent('message.new', mockMessage()); + + expect(activeChannel.markRead).toHaveBeenCalledTimes(1); + + service.reset(); + + expect(activeChannel.markRead).toHaveBeenCalledTimes(2); + }); }); diff --git a/projects/stream-chat-angular/src/lib/channel.service.thread.spec.ts b/projects/stream-chat-angular/src/lib/channel.service.thread.spec.ts index 7c8b642b..b022a24c 100644 --- a/projects/stream-chat-angular/src/lib/channel.service.thread.spec.ts +++ b/projects/stream-chat-angular/src/lib/channel.service.thread.spec.ts @@ -1,4 +1,4 @@ -import { fakeAsync, TestBed, tick } from '@angular/core/testing'; +import { fakeAsync, flush, TestBed, tick } from '@angular/core/testing'; import { Subject } from 'rxjs'; import { first } from 'rxjs/operators'; import { @@ -235,6 +235,7 @@ describe('ChannelService - threads', () => { spy.calls.reset(); events$.next({ eventType: 'connection.recovered' } as ClientEvent); tick(); + flush(); expect(spy).toHaveBeenCalledWith(undefined); })); @@ -314,6 +315,10 @@ describe('ChannelService - threads', () => { it('should watch for new message events', async () => { await init(); + // wait for mark read throttle time + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); const spy = jasmine.createSpy(); const parentMessage = mockMessage(); await service.setAsActiveParentMessage(parentMessage); diff --git a/projects/stream-chat-angular/src/lib/channel.service.ts b/projects/stream-chat-angular/src/lib/channel.service.ts index d3da017f..d4cb2399 100644 --- a/projects/stream-chat-angular/src/lib/channel.service.ts +++ b/projects/stream-chat-angular/src/lib/channel.service.ts @@ -428,6 +428,8 @@ export class ChannelService< }; private dismissErrorNotification?: () => void; private areReadEventsPaused = false; + private markReadTimeout?: ReturnType; + private scheduledMarkReadRequest?: Function; constructor( private chatClientService: ChatClientService, @@ -563,6 +565,7 @@ export class ChannelService< return; } this.stopWatchForActiveChannelEvents(prevActiveChannel); + this.flushMarkReadQueue(); this.areReadEventsPaused = false; const readState = channel.state.read[this.chatClientService.chatClient.user?.id || '']; @@ -595,6 +598,7 @@ export class ChannelService< return; } this.stopWatchForActiveChannelEvents(activeChannel); + this.flushMarkReadQueue(); this.activeChannelMessagesSubject.next([]); this.activeChannelSubject.next(undefined); this.activeParentMessageIdSubject.next(undefined); @@ -2220,8 +2224,10 @@ export class ChannelService< this.usersTypingInThreadSubject.next([]); } - private markRead(channel: Channel) { - if ( + private markRead(channel: Channel, isThrottled = true) { + if (isThrottled) { + this.markReadThrottled(channel); + } else if ( this.canSendReadEvents && this.shouldMarkActiveChannelAsRead && !this.areReadEventsPaused @@ -2230,6 +2236,24 @@ export class ChannelService< } } + private markReadThrottled(channel: Channel) { + if (!this.markReadTimeout) { + this.markRead(channel, false); + this.markReadTimeout = setTimeout(() => { + this.flushMarkReadQueue(); + }, 2000); + } else { + this.scheduledMarkReadRequest = () => this.markRead(channel, false); + } + } + + private flushMarkReadQueue() { + this.scheduledMarkReadRequest?.(); + this.scheduledMarkReadRequest = undefined; + clearTimeout(this.markReadTimeout); + this.markReadTimeout = undefined; + } + private async _init(settings: { shouldSetActiveChannel: boolean; messagePageSize: number;