Skip to content

Commit

Permalink
msglist: Throttle fetchOlder retries
Browse files Browse the repository at this point in the history
This approach is different from how a BackoffMachine is typically used,
because the message list doesn't send and retry requests in a loop; its
caller retries rapidly on scroll changes, and we want to ignore the
excessive requests.

The test drops irrelevant requests with `connection.takeRequests`
without checking, as we are only interested in verifying that no request
was sent.

Fixes: #945

Signed-off-by: Zixuan James Li <[email protected]>
  • Loading branch information
PIG208 committed Nov 12, 2024
1 parent 67fbf9e commit 71e0376
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 14 deletions.
76 changes: 63 additions & 13 deletions lib/model/message_list.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import 'dart:async';

import 'package:collection/collection.dart';
import 'package:flutter/foundation.dart';

import '../api/backoff.dart';
import '../api/model/events.dart';
import '../api/model/model.dart';
import '../api/route/messages.dart';
Expand Down Expand Up @@ -89,9 +92,32 @@ mixin _MessageSequence {
bool _haveOldest = false;

/// Whether we are currently fetching the next batch of older messages.
///
/// When this is true, [fetchOlder] is a no-op.
/// That method is called frequently by Flutter's scrolling logic,
/// and this field helps us avoid spamming the same request just to get
/// the same response each time.
///
/// See also [fetchOlderCoolingDown].
bool get fetchingOlder => _fetchingOlder;
bool _fetchingOlder = false;

/// Whether [fetchOlder] had a request error recently.
///
/// When this is true, [fetchOlder] is a no-op.
/// That method is called frequently by Flutter's scrolling logic,
/// and this field mitigates spamming the same request and getting
/// the same error each time.
///
/// "Recently" is decided by a [BackoffMachine] that resets
/// when a [fetchOlder] request succeeds.
///
/// See also [fetchingOlder].
bool get fetchOlderCoolingDown => _fetchOlderCoolingDown;
bool _fetchOlderCoolingDown = false;

BackoffMachine? _fetchOlderCooldownBackoffMachine;

/// The parsed message contents, as a list parallel to [messages].
///
/// The i'th element is the result of parsing the i'th element of [messages].
Expand All @@ -107,7 +133,7 @@ mixin _MessageSequence {
/// before, between, or after the messages.
///
/// This information is completely derived from [messages] and
/// the flags [haveOldest] and [fetchingOlder].
/// the flags [haveOldest], [fetchingOlder] and [fetchOlderCoolingDown].
/// It exists as an optimization, to memoize that computation.
final QueueList<MessageListItem> items = QueueList();

Expand Down Expand Up @@ -241,6 +267,8 @@ mixin _MessageSequence {
_fetched = false;
_haveOldest = false;
_fetchingOlder = false;
_fetchOlderCoolingDown = false;
_fetchOlderCooldownBackoffMachine = null;
contents.clear();
items.clear();
}
Expand Down Expand Up @@ -290,10 +318,12 @@ mixin _MessageSequence {
void _updateEndMarkers() {
assert(fetched);
assert(!(haveOldest && fetchingOlder));
final startMarker = switch ((fetchingOlder, haveOldest)) {
(true, _) => const MessageListLoadingItem(MessageListDirection.older),
(_, true) => const MessageListHistoryStartItem(),
(_, _) => null,
assert(!(fetchingOlder && fetchOlderCoolingDown));
final startMarker = switch ((fetchingOlder, haveOldest, fetchOlderCoolingDown)) {
(true, _, _) => const MessageListLoadingItem(MessageListDirection.older),
(_, true, _) => const MessageListHistoryStartItem(),
(_, _, true) => const MessageListLoadingItem(MessageListDirection.older),
(_, _, _) => null,
};
final hasStartMarker = switch (items.firstOrNull) {
MessageListLoadingItem() => true,
Expand Down Expand Up @@ -470,7 +500,7 @@ class MessageListView with ChangeNotifier, _MessageSequence {
Future<void> fetchInitial() async {
// TODO(#80): fetch from anchor firstUnread, instead of newest
// TODO(#82): fetch from a given message ID as anchor
assert(!fetched && !haveOldest && !fetchingOlder);
assert(!fetched && !haveOldest && !fetchingOlder && !fetchOlderCoolingDown);
assert(messages.isEmpty && contents.isEmpty);
// TODO schedule all this in another isolate
final generation = this.generation;
Expand Down Expand Up @@ -498,20 +528,28 @@ class MessageListView with ChangeNotifier, _MessageSequence {
Future<void> fetchOlder() async {
if (haveOldest) return;
if (fetchingOlder) return;
if (fetchOlderCoolingDown) return;
assert(fetched);
assert(messages.isNotEmpty);
_fetchingOlder = true;
_updateEndMarkers();
notifyListeners();
final generation = this.generation;
bool hasFetchError = false;
try {
final result = await getMessages(store.connection,
narrow: narrow.apiEncode(),
anchor: NumericAnchor(messages[0].id),
includeAnchor: false,
numBefore: kMessageListFetchBatchSize,
numAfter: 0,
);
final GetMessagesResult result;
try {
result = await getMessages(store.connection,
narrow: narrow.apiEncode(),
anchor: NumericAnchor(messages[0].id),
includeAnchor: false,
numBefore: kMessageListFetchBatchSize,
numAfter: 0,
);
} catch (e) {
hasFetchError = true;
rethrow;
}
if (this.generation > generation) return;

if (result.messages.isNotEmpty
Expand All @@ -529,12 +567,24 @@ class MessageListView with ChangeNotifier, _MessageSequence {

_insertAllMessages(0, fetchedMessages);
_haveOldest = result.foundOldest;
_fetchOlderCooldownBackoffMachine = null;
} finally {
if (this.generation != generation) {
// ignore: control_flow_in_finally
return;
}
_fetchingOlder = false;
if (hasFetchError) {
assert(!fetchOlderCoolingDown);
_fetchOlderCoolingDown = true;
unawaited((_fetchOlderCooldownBackoffMachine ??= BackoffMachine())
.wait().then((_) {
if (this.generation != generation) return;
_fetchOlderCoolingDown = false;
_updateEndMarkers();
notifyListeners();
}));
}
_updateEndMarkers();
notifyListeners();
}
Expand Down
68 changes: 67 additions & 1 deletion test/model/message_list_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:convert';
import 'package:checks/checks.dart';
import 'package:http/http.dart' as http;
import 'package:test/scaffolding.dart';
import 'package:zulip/api/exception.dart';
import 'package:zulip/api/model/events.dart';
import 'package:zulip/api/model/model.dart';
import 'package:zulip/api/model/narrow.dart';
Expand Down Expand Up @@ -236,6 +237,34 @@ void main() {
..messages.length.equals(30);
});

test('fetchOlder nop during backoff', () => awaitFakeAsync((async) async {
final olderMessages = List.generate(5, (i) => eg.streamMessage());
final initialMessages = List.generate(5, (i) => eg.streamMessage());
await prepare(narrow: const CombinedFeedNarrow());
await prepareMessages(foundOldest: false, messages: initialMessages);

connection.prepare(httpStatus: 400, json: {
'result': 'error', 'code': 'BAD_REQUEST', 'msg': 'Bad request'});
check(async.pendingTimers).isEmpty();
await check(model.fetchOlder()).throws<ZulipApiException>();
checkNotified(count: 2);
check(model).fetchOlderCoolingDown.isTrue();

connection.takeRequests();
await model.fetchOlder();
checkNotNotified();
check(model).fetchOlderCoolingDown.isTrue();
check(model).fetchingOlder.isFalse();

async.flushTimers();
check(model).fetchOlderCoolingDown.isFalse();

connection.prepare(json: olderResult(
anchor: 1000, foundOldest: false, messages: olderMessages).toJson());
await model.fetchOlder();
checkNotified(count: 2);
}));

test('fetchOlder handles servers not understanding includeAnchor', () async {
const narrow = CombinedFeedNarrow();
await prepare(narrow: narrow);
Expand Down Expand Up @@ -983,6 +1012,42 @@ void main() {
checkNotifiedOnce();
}));

test('fetchOlder backoff start, _reset, fetchOlder backoff ends, move fetch finishes', () => awaitFakeAsync((async) async {
await prepareNarrow(narrow, initialMessages);

connection.prepare(httpStatus: 400, json: {
'result': 'error', 'code': 'BAD_REQUEST', 'msg': 'Bad request'});
await check(model.fetchOlder()).throws<ZulipApiException>();
final backoffTimer = async.pendingTimers.single;
check(model).fetchOlderCoolingDown.isTrue();
checkHasMessages(initialMessages);
checkNotified(count: 2);

connection.prepare(delay: const Duration(seconds: 2), json: newestResult(
foundOldest: false,
messages: initialMessages + movedMessages,
).toJson());
await store.handleEvent(eg.updateMessageEventMoveTo(
origTopic: movedMessages[0].topic,
origStreamId: otherStream.streamId,
newMessages: movedMessages,
));
check(model).fetchOlderCoolingDown.isFalse();
check(async.pendingTimers).contains(backoffTimer);
checkHasMessages([]);
checkNotifiedOnce();

// The first backoff is expected to be short enough to complete.
async.elapse(const Duration(seconds: 1));
check(model).fetchOlderCoolingDown.isFalse();
check(async.pendingTimers).not((x) => x.contains(backoffTimer));
checkNotNotified();

async.elapse(const Duration(seconds: 1));
checkHasMessages(initialMessages + movedMessages);
checkNotifiedOnce();
}));

test('fetchOlder, _reset, move fetch finishes, fetchOlder returns', () => awaitFakeAsync((async) async {
await prepareNarrow(narrow, initialMessages);

Expand Down Expand Up @@ -1791,7 +1856,7 @@ void checkInvariants(MessageListView model) {
if (model.haveOldest) {
check(model.items[i++]).isA<MessageListHistoryStartItem>();
}
if (model.fetchingOlder) {
if (model.fetchingOlder || model.fetchOlderCoolingDown) {
check(model.items[i++]).isA<MessageListLoadingItem>();
}
for (int j = 0; j < model.messages.length; j++) {
Expand Down Expand Up @@ -1847,6 +1912,7 @@ extension MessageListViewChecks on Subject<MessageListView> {
Subject<bool> get fetched => has((x) => x.fetched, 'fetched');
Subject<bool> get haveOldest => has((x) => x.haveOldest, 'haveOldest');
Subject<bool> get fetchingOlder => has((x) => x.fetchingOlder, 'fetchingOlder');
Subject<bool> get fetchOlderCoolingDown => has((x) => x.fetchOlderCoolingDown, 'fetchOlderCoolingDown');
}

/// A GetMessagesResult the server might return on an `anchor=newest` request.
Expand Down

0 comments on commit 71e0376

Please sign in to comment.