From dbf4cbe3101d0a00820d5545e2617cf616d84d85 Mon Sep 17 00:00:00 2001 From: sherlock Date: Tue, 19 Mar 2024 15:25:19 +0700 Subject: [PATCH] fixup! fixup! TW-1586: add testing to guarantee that only 1 file is downloading at once --- lib/utils/task_queue/worker_queue.dart | 18 +- test/worker_queue_test.dart | 233 ++++++++++++++++++------- 2 files changed, 177 insertions(+), 74 deletions(-) diff --git a/lib/utils/task_queue/worker_queue.dart b/lib/utils/task_queue/worker_queue.dart index 8c9a8c345a..2b9e0ac966 100644 --- a/lib/utils/task_queue/worker_queue.dart +++ b/lib/utils/task_queue/worker_queue.dart @@ -8,7 +8,7 @@ typedef OnTaskCompleted = void Function(String? taskId); abstract class WorkerQueue { final Queue queue = Queue(); - Completer? completer; + Completer? _completer; String get workerName; @@ -21,13 +21,13 @@ abstract class WorkerQueue { } Future _processTask() async { - if (completer != null) { - return completer!.future; + if (_completer != null) { + return _completer!.future; } Logs().i( 'WorkerQueue<$workerName>::addTask(): QUEUE is ready', ); - completer = Completer(); + _completer = Completer(); if (queue.isNotEmpty) { final firstTask = queue.removeFirst(); Logs().i('WorkerQueue<$workerName>::_processTask(): ${firstTask.id}'); @@ -37,14 +37,14 @@ abstract class WorkerQueue { .catchError(_handleTaskExecuteError) .whenComplete(() => firstTask.onTaskCompleted?.call()); } else { - completer?.complete(); + _completer?.complete(); } - return completer!.future; + return _completer!.future; } void _handleTaskExecuteCompleted(dynamic value) { Logs().i('WorkerQueue<$workerName>::_handleTaskExecuteCompleted(): $value'); - completer?.complete(value); + _completer?.complete(value); _releaseCompleter(); if (queue.isNotEmpty) { _processTask(); @@ -53,7 +53,7 @@ abstract class WorkerQueue { void _handleTaskExecuteError(error) { Logs().i('WorkerQueue<$workerName>::_handleTaskExecuteError(): $error'); - completer?.complete(error); + _completer?.complete(error); _releaseCompleter(); if (queue.isNotEmpty) { _processTask(); @@ -61,7 +61,7 @@ abstract class WorkerQueue { } void _releaseCompleter() { - completer = null; + _completer = null; } Future release() async { diff --git a/test/worker_queue_test.dart b/test/worker_queue_test.dart index 613f7abd6d..c865367a76 100644 --- a/test/worker_queue_test.dart +++ b/test/worker_queue_test.dart @@ -20,19 +20,22 @@ void main() { // Testing group for worker queue group("worker queue test", () { // Test case for adding tasks without cancelation or error - test("add 4 tasks without any cancel or error in worker queue", () async { - final completedValues = []; - final returnValues = []; + test(""" + WHEN add 4 tasks without any cancel or error in worker queue, + SHOULD completed tasks should be order and in time + """, () async { + final completedTasks = []; + final alreadyRunTasks = []; final tasks = List.generate(4, (index) { return generateTask( '${index + 1}', - () async => await Future.delayed(const Duration(seconds: 5), () { - returnValues.add(index + 1); + () async => await Future.delayed(const Duration(seconds: 2), () { + alreadyRunTasks.add(index + 1); return index + 1; }), onTaskCompleted: () { Logs().i('task${index + 1} completed'); - completedValues.add(index + 1); + completedTasks.add(index + 1); }, ); }); @@ -40,68 +43,69 @@ void main() { final workerQueue = DownloadWorkerQueue(); tasks.forEach(workerQueue.addTask); - // Adjust the waiting time to slightly exceed the total execution time of all tasks - await Future.delayed(const Duration(seconds: 4)); + await Future.delayed(const Duration(seconds: 1)); expect(workerQueue.queue.length, 3); - await Future.delayed(const Duration(seconds: 4)); + await Future.delayed(const Duration(seconds: 1)); expect(workerQueue.queue.length, 2); - await Future.delayed(const Duration(seconds: 14)); + await Future.delayed(const Duration(seconds: 7)); // Verify that all tasks completed in the expected order - expect(completedValues, [1, 2, 3, 4]); - expect(returnValues, [1, 2, 3, 4]); + expect(completedTasks, [1, 2, 3, 4]); + expect(alreadyRunTasks, [1, 2, 3, 4]); }); - // Test case for handling a task with an error - test("handle a task error gracefully while processing tasks in queue", - () async { - final completedValues = []; - final returnValues = []; + test(""" + WHEN add 4 tasks consecutively, + THEN there is an error task while the other task is running + SHOULD completed tasks should be order + """, () async { + final completedTasks = []; + final alreadyRunTasks = []; final errorValues = []; final tasks = [ generateTask( '1', - () async => await Future.delayed(const Duration(seconds: 5), () { - returnValues.add(1); + () async => await Future.delayed(const Duration(seconds: 2), () { + alreadyRunTasks.add(1); return 1; }), onTaskCompleted: () { Logs().i('task1 completed'); - completedValues.add(1); + completedTasks.add(1); }, ), generateTask( '2', () async { - await Future.delayed(const Duration(seconds: 2)); + await Future.delayed(const Duration(seconds: 1)); errorValues.add('task2 error'); throw Exception('task2 error'); }, onTaskCompleted: () { Logs().i('task2 completed'); - completedValues.add(2); + completedTasks.add(2); }, ), generateTask( '3', - () async => await Future.delayed(const Duration(seconds: 5), () { - returnValues.add(3); + () async => await Future.delayed(const Duration(seconds: 2), () { + alreadyRunTasks.add(3); return 3; }), onTaskCompleted: () { Logs().i('task3 completed'); - completedValues.add(3); + completedTasks.add(3); }, ), generateTask( '4', - () async => await Future.delayed(const Duration(seconds: 5), () { - returnValues.add(4); + () async => await Future.delayed(const Duration(seconds: 2), () { + alreadyRunTasks.add(4); return 4; }), onTaskCompleted: () { Logs().i('task4 completed'); - completedValues.add(4); + completedTasks.add(4); }, ), ]; @@ -109,35 +113,33 @@ void main() { final workerQueue = DownloadWorkerQueue(); tasks.forEach(workerQueue.addTask); - await Future.delayed(const Duration(seconds: 22)); - // Verify that completedValues includes tasks 1, 3, and 4 but not the errored task 2 - expect(completedValues, containsAllInOrder([1, 3, 4])); - expect(returnValues, [ - 1, - 3, - 4, - ]); // Task 2 does not contribute to returnValues due to the error + await Future.delayed(const Duration(seconds: 10)); + // Verify that completedTasks includes tasks 1, 3, and 4 but not the errored task 2 + expect(completedTasks, containsAllInOrder([1, 3, 4])); + expect(alreadyRunTasks, containsAllInOrder([1, 3, 4])); expect( errorValues, ['task2 error'], ); // Verify that task 2's error is recorded }); - test( - "Handle a new task is coming to queue when there is task is procress in queue", - () async { - final completedValues = []; - final returnValues = []; + test(""" + WHEN a task is processing in queue with 4 tasks + THEN add a new task to the queue + SHOULD the new task should be added to the end of the queue + """, () async { + final completedTasks = []; + final alreadyRunTasks = []; final tasks = List.generate(4, (index) { return generateTask( '${index + 1}', - () async => await Future.delayed(const Duration(seconds: 5), () { - returnValues.add(index + 1); + () async => await Future.delayed(const Duration(seconds: 2), () { + alreadyRunTasks.add(index + 1); return index + 1; }), onTaskCompleted: () { Logs().i('task${index + 1} completed'); - completedValues.add(index + 1); + completedTasks.add(index + 1); }, ); }); @@ -145,48 +147,84 @@ void main() { final workerQueue = DownloadWorkerQueue(); tasks.forEach(workerQueue.addTask); - // Adjust the waiting time to slightly exceed the total execution time of all tasks - await Future.delayed(const Duration(seconds: 4)); + await Future.delayed(const Duration(seconds: 1)); expect(workerQueue.queue.length, 3); workerQueue.addTask( Task( id: '5', runnable: () async => - await Future.delayed(const Duration(seconds: 5), () { - returnValues.add(5); + await Future.delayed(const Duration(seconds: 2), () { + alreadyRunTasks.add(5); return 5; }), onTaskCompleted: () { Logs().i('task5 completed'); - completedValues.add(5); + completedTasks.add(5); }, ), ); expect(workerQueue.queue.length, 4); + await Future.delayed(const Duration(seconds: 10)); + // Verify that all tasks completed in the expected order + expect(completedTasks, containsAllInOrder([1, 2, 3, 4])); + expect(alreadyRunTasks, containsAllInOrder([1, 2, 3, 4])); + }); - await Future.delayed(const Duration(seconds: 4)); + test(""" + WHEN add 4 tasks with the same task id to the worker queue, + SHOULD the queue executes task in order and consider the duplicate task id as normal task + """, () async { + final completedTasks = []; + final alreadyRunTasks = []; + final tasks = List.generate(4, (index) { + return generateTask( + '1', + () async => await Future.delayed(const Duration(seconds: 2), () { + alreadyRunTasks.add(index + 1); + return index + 1; + }), + onTaskCompleted: () { + Logs().i('task${index + 1} completed'); + completedTasks.add(index + 1); + }, + ); + }); + + final workerQueue = DownloadWorkerQueue(); + tasks.forEach(workerQueue.addTask); + + await Future.delayed(const Duration(seconds: 1)); expect(workerQueue.queue.length, 3); - await Future.delayed(const Duration(seconds: 20)); + expect(workerQueue.queue.first.id, '1'); + + await Future.delayed(const Duration(seconds: 1)); + expect(workerQueue.queue.length, 2); + await Future.delayed(const Duration(seconds: 7)); // Verify that all tasks completed in the expected order - expect(completedValues, [1, 2, 3, 4, 5]); - expect(returnValues, [1, 2, 3, 4, 5]); + expect(completedTasks, containsAllInOrder([1, 2, 3, 4])); + expect(alreadyRunTasks, containsAllInOrder([1, 2, 3, 4])); }); - test("Add a task with same id to the queue", () async { - final completedValues = []; - final returnValues = []; + test(""" + WHEN add 4 tasks with the same task id to the worker queue, + THEN in task complete, there is async task + SHOULD async task do not block any task in the queue + """, () async { + final completedTasks = []; + final alreadyRunTasks = []; final tasks = List.generate(4, (index) { return generateTask( '1', - () async => await Future.delayed(const Duration(seconds: 5), () { - returnValues.add(index + 1); + () async => await Future.delayed(const Duration(seconds: 2), () { + alreadyRunTasks.add(index + 1); return index + 1; }), - onTaskCompleted: () { + onTaskCompleted: () async { Logs().i('task${index + 1} completed'); - completedValues.add(index + 1); + await Future.delayed(const Duration(seconds: 1)); + completedTasks.add(index + 1); }, ); }); @@ -194,16 +232,81 @@ void main() { final workerQueue = DownloadWorkerQueue(); tasks.forEach(workerQueue.addTask); - // Adjust the waiting time to slightly exceed the total execution time of all tasks + await Future.delayed(const Duration(seconds: 9)); + // Verify that all tasks completed in the expected order + expect(alreadyRunTasks, containsAllInOrder([1, 2, 3, 4])); await Future.delayed(const Duration(seconds: 4)); + expect(completedTasks, containsAllInOrder([1, 2, 3, 4])); + }); + + test(""" + WHEN add 4 tasks at once to the queue using Future.wait + SHOULD the tasks are executed in order and the queue is empty after all tasks are completed + """, () async { + final completedTasks = []; + final alreadyRunTasks = []; + + final tasks = List.generate(4, (index) { + return generateTask( + '1', + () async => await Future.delayed(const Duration(seconds: 2), () { + alreadyRunTasks.add(index + 1); + return index + 1; + }), + onTaskCompleted: () async { + Logs().i('task${index + 1} completed'); + completedTasks.add(index + 1); + }, + ); + }); + + final workerQueue = DownloadWorkerQueue(); + Future.wait(tasks.map((task) => workerQueue.addTask(task))); + await Future.delayed(const Duration(seconds: 1)); expect(workerQueue.queue.length, 3); + expect(workerQueue.queue.first.id, '1'); - await Future.delayed(const Duration(seconds: 4)); + await Future.delayed(const Duration(seconds: 9)); + // Verify that all tasks completed in the expected order + expect(alreadyRunTasks, containsAllInOrder([1, 2, 3, 4])); + expect(completedTasks, containsAllInOrder([1, 2, 3, 4])); + }); + + test(""" + WHEN add 4 tasks to the worker queue + THEN task 1 is processing + THEN remove the last task from the queue + SHOULD the queue executes task except the removed task + """, () async { + final completedTasks = []; + final alreadyRunTasks = []; + + final tasks = List.generate(4, (index) { + return generateTask( + '1', + () async => await Future.delayed(const Duration(seconds: 2), () { + alreadyRunTasks.add(index + 1); + return index + 1; + }), + onTaskCompleted: () async { + Logs().i('task${index + 1} completed'); + completedTasks.add(index + 1); + }, + ); + }); + + final workerQueue = DownloadWorkerQueue(); + Future.wait(tasks.map((task) => workerQueue.addTask(task))); + await Future.delayed(const Duration(seconds: 1)); + + workerQueue.queue.removeLast(); expect(workerQueue.queue.length, 2); - await Future.delayed(const Duration(seconds: 14)); + expect(workerQueue.queue.first.id, '1'); + + await Future.delayed(const Duration(seconds: 7)); // Verify that all tasks completed in the expected order - expect(completedValues, [1, 2, 3, 4]); - expect(returnValues, [1, 2, 3, 4]); + expect(alreadyRunTasks, containsAllInOrder([1, 2, 3])); + expect(completedTasks, containsAllInOrder([1, 2, 3])); }); }); }