Skip to content

Commit

Permalink
msglist: Throttle fetchOlder retries
Browse files Browse the repository at this point in the history
This approach is different from how a BackoffMachine is typically used,
because the message list doesn't send and retry requests in a loop; its
caller retries rapidly on scroll changes, and we want to ignore the
excessive requests.

Fixes: #945

Signed-off-by: Zixuan James Li <[email protected]>
  • Loading branch information
PIG208 authored and gnprice committed Dec 11, 2024
1 parent 31a6d9c commit 381370e
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 13 deletions.
73 changes: 62 additions & 11 deletions lib/model/message_list.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import 'dart:async';

import 'package:collection/collection.dart';
import 'package:flutter/foundation.dart';

import '../api/backoff.dart';
import '../api/model/events.dart';
import '../api/model/model.dart';
import '../api/route/messages.dart';
Expand Down Expand Up @@ -89,9 +92,32 @@ mixin _MessageSequence {
bool _haveOldest = false;

/// Whether we are currently fetching the next batch of older messages.
///
/// When this is true, [fetchOlder] is a no-op.
/// That method is called frequently by Flutter's scrolling logic,
/// and this field helps us avoid spamming the same request just to get
/// the same response each time.
///
/// See also [fetchOlderCoolingDown].
bool get fetchingOlder => _fetchingOlder;
bool _fetchingOlder = false;

/// Whether [fetchOlder] had a request error recently.
///
/// When this is true, [fetchOlder] is a no-op.
/// That method is called frequently by Flutter's scrolling logic,
/// and this field mitigates spamming the same request and getting
/// the same error each time.
///
/// "Recently" is decided by a [BackoffMachine] that resets
/// when a [fetchOlder] request succeeds.
///
/// See also [fetchingOlder].
bool get fetchOlderCoolingDown => _fetchOlderCoolingDown;
bool _fetchOlderCoolingDown = false;

BackoffMachine? _fetchOlderCooldownBackoffMachine;

/// The parsed message contents, as a list parallel to [messages].
///
/// The i'th element is the result of parsing the i'th element of [messages].
Expand All @@ -107,7 +133,7 @@ mixin _MessageSequence {
/// before, between, or after the messages.
///
/// This information is completely derived from [messages] and
/// the flags [haveOldest] and [fetchingOlder].
/// the flags [haveOldest], [fetchingOlder] and [fetchOlderCoolingDown].
/// It exists as an optimization, to memoize that computation.
final QueueList<MessageListItem> items = QueueList();

Expand Down Expand Up @@ -241,6 +267,8 @@ mixin _MessageSequence {
_fetched = false;
_haveOldest = false;
_fetchingOlder = false;
_fetchOlderCoolingDown = false;
_fetchOlderCooldownBackoffMachine = null;
contents.clear();
items.clear();
}
Expand Down Expand Up @@ -289,8 +317,10 @@ mixin _MessageSequence {
/// Update [items] to include markers at start and end as appropriate.
void _updateEndMarkers() {
assert(fetched);
assert(!(haveOldest && fetchingOlder));
final startMarker = switch ((fetchingOlder, haveOldest)) {
assert(!(fetchingOlder && fetchOlderCoolingDown));
final effectiveFetchingOlder = fetchingOlder || fetchOlderCoolingDown;
assert(!(effectiveFetchingOlder && haveOldest));
final startMarker = switch ((effectiveFetchingOlder, haveOldest)) {
(true, _) => const MessageListLoadingItem(MessageListDirection.older),
(_, true) => const MessageListHistoryStartItem(),
(_, _) => null,
Expand Down Expand Up @@ -470,7 +500,7 @@ class MessageListView with ChangeNotifier, _MessageSequence {
Future<void> fetchInitial() async {
// TODO(#80): fetch from anchor firstUnread, instead of newest
// TODO(#82): fetch from a given message ID as anchor
assert(!fetched && !haveOldest && !fetchingOlder);
assert(!fetched && !haveOldest && !fetchingOlder && !fetchOlderCoolingDown);
assert(messages.isEmpty && contents.isEmpty);
// TODO schedule all this in another isolate
final generation = this.generation;
Expand Down Expand Up @@ -498,20 +528,28 @@ class MessageListView with ChangeNotifier, _MessageSequence {
Future<void> fetchOlder() async {
if (haveOldest) return;
if (fetchingOlder) return;
if (fetchOlderCoolingDown) return;
assert(fetched);
assert(messages.isNotEmpty);
_fetchingOlder = true;
_updateEndMarkers();
notifyListeners();
final generation = this.generation;
bool hasFetchError = false;
try {
final result = await getMessages(store.connection,
narrow: narrow.apiEncode(),
anchor: NumericAnchor(messages[0].id),
includeAnchor: false,
numBefore: kMessageListFetchBatchSize,
numAfter: 0,
);
final GetMessagesResult result;
try {
result = await getMessages(store.connection,
narrow: narrow.apiEncode(),
anchor: NumericAnchor(messages[0].id),
includeAnchor: false,
numBefore: kMessageListFetchBatchSize,
numAfter: 0,
);
} catch (e) {
hasFetchError = true;
rethrow;
}
if (this.generation > generation) return;

if (result.messages.isNotEmpty
Expand All @@ -532,6 +570,19 @@ class MessageListView with ChangeNotifier, _MessageSequence {
} finally {
if (this.generation == generation) {
_fetchingOlder = false;
if (hasFetchError) {
assert(!fetchOlderCoolingDown);
_fetchOlderCoolingDown = true;
unawaited((_fetchOlderCooldownBackoffMachine ??= BackoffMachine())
.wait().then((_) {
if (this.generation != generation) return;
_fetchOlderCoolingDown = false;
_updateEndMarkers();
notifyListeners();
}));
} else {
_fetchOlderCooldownBackoffMachine = null;
}
_updateEndMarkers();
notifyListeners();
}
Expand Down
110 changes: 108 additions & 2 deletions test/model/message_list_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import 'dart:convert';
import 'package:checks/checks.dart';
import 'package:http/http.dart' as http;
import 'package:test/scaffolding.dart';
import 'package:zulip/api/backoff.dart';
import 'package:zulip/api/exception.dart';
import 'package:zulip/api/model/events.dart';
import 'package:zulip/api/model/model.dart';
import 'package:zulip/api/model/narrow.dart';
Expand Down Expand Up @@ -238,6 +240,40 @@ void main() {
..messages.length.equals(30);
});

test('fetchOlder nop during backoff', () => awaitFakeAsync((async) async {
final olderMessages = List.generate(5, (i) => eg.streamMessage());
final initialMessages = List.generate(5, (i) => eg.streamMessage());
await prepare(narrow: const CombinedFeedNarrow());
await prepareMessages(foundOldest: false, messages: initialMessages);
check(connection.takeRequests()).single;

connection.prepare(httpStatus: 400, json: {
'result': 'error', 'code': 'BAD_REQUEST', 'msg': 'Bad request'});
check(async.pendingTimers).isEmpty();
await check(model.fetchOlder()).throws<ZulipApiException>();
checkNotified(count: 2);
check(model).fetchOlderCoolingDown.isTrue();
check(connection.takeRequests()).single;

await model.fetchOlder();
checkNotNotified();
check(model).fetchOlderCoolingDown.isTrue();
check(model).fetchingOlder.isFalse();
check(connection.lastRequest).isNull();

// Wait long enough that a first backoff is sure to finish.
async.elapse(const Duration(seconds: 1));
check(model).fetchOlderCoolingDown.isFalse();
checkNotifiedOnce();
check(connection.lastRequest).isNull();

connection.prepare(json: olderResult(
anchor: 1000, foundOldest: false, messages: olderMessages).toJson());
await model.fetchOlder();
checkNotified(count: 2);
check(connection.takeRequests()).single;
}));

test('fetchOlder handles servers not understanding includeAnchor', () async {
const narrow = CombinedFeedNarrow();
await prepare(narrow: narrow);
Expand Down Expand Up @@ -1020,6 +1056,70 @@ void main() {
checkNotNotified();
}));

test('fetchOlder backoff A starts, _reset, move fetch finishes,'
' fetchOlder backoff B starts, fetchOlder backoff A ends', () => awaitFakeAsync((async) async {
addTearDown(() => BackoffMachine.debugDuration = null);
await prepareNarrow(narrow, initialMessages);

connection.prepare(httpStatus: 400, json: {
'result': 'error', 'code': 'BAD_REQUEST', 'msg': 'Bad request'});
BackoffMachine.debugDuration = const Duration(seconds: 1);
await check(model.fetchOlder()).throws<ZulipApiException>();
final backoffTimerA = async.pendingTimers.single;
check(model).fetchOlderCoolingDown.isTrue();
check(model).fetched.isTrue();
checkHasMessages(initialMessages);
checkNotified(count: 2);

connection.prepare(json: newestResult(
foundOldest: false,
messages: initialMessages + movedMessages,
).toJson());
await store.handleEvent(eg.updateMessageEventMoveTo(
origTopic: movedMessages[0].topic,
origStreamId: otherStream.streamId,
newMessages: movedMessages,
));
// Check that _reset was called.
check(model).fetched.isFalse();
checkHasMessages([]);
checkNotifiedOnce();
check(model).fetchOlderCoolingDown.isFalse();
check(backoffTimerA.isActive).isTrue();

async.elapse(Duration.zero);
check(model).fetched.isTrue();
checkHasMessages(initialMessages + movedMessages);
checkNotifiedOnce();
check(model).fetchOlderCoolingDown.isFalse();
check(backoffTimerA.isActive).isTrue();

connection.prepare(httpStatus: 400, json: {
'result': 'error', 'code': 'BAD_REQUEST', 'msg': 'Bad request'});
BackoffMachine.debugDuration = const Duration(seconds: 2);
await check(model.fetchOlder()).throws<ZulipApiException>();
final backoffTimerB = async.pendingTimers.last;
check(model).fetchOlderCoolingDown.isTrue();
check(backoffTimerA.isActive).isTrue();
check(backoffTimerB.isActive).isTrue();
checkNotified(count: 2);

// When `backoffTimerA` ends, `fetchOlderCoolingDown` remains `true`
// because the backoff was from a previous generation.
async.elapse(const Duration(seconds: 1));
check(model).fetchOlderCoolingDown.isTrue();
check(backoffTimerA.isActive).isFalse();
check(backoffTimerB.isActive).isTrue();
checkNotNotified();

// When `backoffTimerB` ends, `fetchOlderCoolingDown` gets reset.
async.elapse(const Duration(seconds: 1));
check(model).fetchOlderCoolingDown.isFalse();
check(backoffTimerA.isActive).isFalse();
check(backoffTimerB.isActive).isFalse();
checkNotifiedOnce();
}));

test('fetchInitial, _reset, initial fetch finishes, move fetch finishes', () => awaitFakeAsync((async) async {
await prepareNarrow(narrow, null);

Expand Down Expand Up @@ -1750,10 +1850,15 @@ void checkInvariants(MessageListView model) {
check(model)
..messages.isEmpty()
..haveOldest.isFalse()
..fetchingOlder.isFalse();
..fetchingOlder.isFalse()
..fetchOlderCoolingDown.isFalse();
}
if (model.haveOldest) {
check(model).fetchingOlder.isFalse();
check(model).fetchOlderCoolingDown.isFalse();
}
if (model.fetchingOlder) {
check(model).fetchOlderCoolingDown.isFalse();
}

for (final message in model.messages) {
Expand Down Expand Up @@ -1793,7 +1898,7 @@ void checkInvariants(MessageListView model) {
if (model.haveOldest) {
check(model.items[i++]).isA<MessageListHistoryStartItem>();
}
if (model.fetchingOlder) {
if (model.fetchingOlder || model.fetchOlderCoolingDown) {
check(model.items[i++]).isA<MessageListLoadingItem>();
}
for (int j = 0; j < model.messages.length; j++) {
Expand Down Expand Up @@ -1849,4 +1954,5 @@ extension MessageListViewChecks on Subject<MessageListView> {
Subject<bool> get fetched => has((x) => x.fetched, 'fetched');
Subject<bool> get haveOldest => has((x) => x.haveOldest, 'haveOldest');
Subject<bool> get fetchingOlder => has((x) => x.fetchingOlder, 'fetchingOlder');
Subject<bool> get fetchOlderCoolingDown => has((x) => x.fetchOlderCoolingDown, 'fetchOlderCoolingDown');
}

0 comments on commit 381370e

Please sign in to comment.