From c4556000c6e55450f1a9759d0b8835adc0c2b303 Mon Sep 17 00:00:00 2001 From: Marco Castelluccio Date: Thu, 26 Sep 2024 01:18:09 +0000 Subject: [PATCH] Bug 1920020 [wpt PR 48280] - DOM: Implement abortable async iterable Observables, a=testonly Automatic update from web-platform-tests DOM: Implement abortable async iterable Observables The IteratorRecord#return() function exists as an optional method that sync and async iterator records can supply [1] [2]. They allow for the language, or any consumer of an iterable, to signal to the iterable that the consumer will stop consuming values prematurely (i.e., before exhaustion). This method must be invoked when the consumer aborts its subscription to an Observable that was derived from an iterable. The abort reason is supplied to the `return()` iterator function for completeness. This CL: 1. Adds tests for sync & async iterables 2. Implements this for async iterables A follow-up CL will implement this for sync iterables. The semantics are specified in https://github.com/WICG/observable/pull/160. [1]: https://tc39.es/ecma262/#table-iterator-interface-optional-properties [2]: https://tc39.es/ecma262/#table-async-iterator-optional Bug: 40282760 Change-Id: Ie1091b24b233afecdec572feadc129bcc8a2d4d3 Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/5854985 Reviewed-by: Mason Freed Commit-Queue: Dominic Farolino Reviewed-by: Nate Chapin Cr-Commit-Position: refs/heads/main{#1359083} -- wpt-commits: 83154d0455e572de16e84ebee72f56df73d2ceb3 wpt-pr: 48280 UltraBlame original commit: 3bc676bcae7a77bead1736e9310110ec2cb53e56 --- .../tentative/observable-from.any.js | 440 ++++++++++++++++++ 1 file changed, 440 insertions(+) diff --git a/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js b/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js index b7b916379960..422e71801b04 100644 --- a/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js +++ b/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js @@ -682,6 +682,36 @@ promise_test(async t => { ]); }, "from(): Asynchronous iterable multiple in-flight subscriptions competing"); + + + + +test(() => { + const results = []; + + const array = [1, 2, 3, 4, 5]; + const source = Observable.from(array); + source.subscribe({ + next: v => { + results.push(v); + if (v === 3) { + + source.subscribe({ + next: v => results.push(v), + complete: () => results.push('inner complete'), + }); + } + }, + complete: () => results.push('outer complete'), + }); + + assert_array_equals(results, [ + 1, 2, 3, + 1, 2, 3, 4, 5, 'inner complete', + 4, 5, 'outer complete' + ]); +}, "from(): Sync iterable multiple in-flight subscriptions competing"); + promise_test(async () => { const async_generator = async function*() { yield 1; @@ -866,6 +896,126 @@ test(() => { }, "from(): Aborting sync iterable midway through iteration both stops iteration " + "and invokes `IteratorRecord#return()"); +promise_test(async t => { + const results = []; + const iterable = { + [Symbol.asyncIterator]() { + return { + val: 0, + next() { + results.push(`IteratorRecord#next() pushing ${this.val}`); + return { + value: this.val, + done: this.val++ === 10 ? true : false, + }; + }, + return(reason) { + results.push(`IteratorRecord#return() called with reason=${reason}`); + return {done: true}; + }, + }; + }, + }; + + const ac = new AbortController(); + await new Promise(resolve => { + Observable.from(iterable).subscribe(v => { + results.push(`Observing ${v}`); + if (v === 3) { + ac.abort(`Aborting because v=${v}`); + resolve(); + } + }, {signal: ac.signal}); + }); + + assert_array_equals(results, [ + "IteratorRecord#next() pushing 0", + "Observing 0", + "IteratorRecord#next() pushing 1", + "Observing 1", + "IteratorRecord#next() pushing 2", + "Observing 2", + "IteratorRecord#next() pushing 3", + "Observing 3", + "IteratorRecord#return() called with reason=Aborting because v=3", + ]); +}, "from(): Aborting async iterable midway through iteration both stops iteration " + + "and invokes `IteratorRecord#return()"); + +test(() => { + const iterable = { + [Symbol.iterator]() { + return { + val: 0, + next() { + return {value: this.val, done: this.val++ === 10 ? true : false}; + }, + + return(reason) {}, + }; + }, + }; + + let thrownError = null; + const ac = new AbortController(); + Observable.from(iterable).subscribe(v => { + if (v === 3) { + try { + ac.abort(`Aborting because v=${v}`); + } catch (e) { + thrownError = e; + } + } + }, {signal: ac.signal}); + + assert_not_equals(thrownError, null, "abort() threw an Error"); + assert_true(thrownError instanceof TypeError); + assert_true(thrownError.message.includes('return()')); + assert_true(thrownError.message.includes('Object')); +}, "from(): Sync iterable: `Iterator#return()` must return an Object, or an " + + "error is thrown"); + + +promise_test(async t => { + const iterable = { + [Symbol.asyncIterator]() { + return { + val: 0, + next() { + return {value: this.val, done: this.val++ === 10 ? true : false}; + }, + + return(reason) {}, + }; + }, + }; + + const unhandled_rejection_promise = new Promise((resolve, reject) => { + const unhandled_rejection_handler = e => resolve(e.reason); + self.addEventListener("unhandledrejection", unhandled_rejection_handler); + t.add_cleanup(() => + self.removeEventListener("unhandledrejection", unhandled_rejection_handler)); + + t.step_timeout(() => reject('Timeout'), 3000); + }); + + const ac = new AbortController(); + await new Promise(resolve => { + Observable.from(iterable).subscribe(v => { + if (v === 3) { + ac.abort(`Aborting because v=${v}`); + resolve(); + } + }, {signal: ac.signal}); + }); + + const reason = await unhandled_rejection_promise; + assert_true(reason instanceof TypeError); + assert_true(reason.message.includes('return()')); + assert_true(reason.message.includes('Object')); +}, "from(): Async iterable: `Iterator#return()` must return an Object, or a " + + "Promise rejects asynchronously"); + @@ -1116,3 +1266,293 @@ promise_test(async t => { assert_array_equals(results, [0, 1, 2, 3, "from the async generator"]); assert_true(generatorFinalized); }, "from(): Async generator finally block run when Observable errors"); + + +test(() => { + const results = []; + const iterable = { + [Symbol.iterator]() { + return { + val: 0, + next() { + results.push('next() called'); + return {value: this.val, done: this.val++ === 10 ? true : false}; + }, + return() { + results.push('return() about to throw an error'); + throw new Error('return() error'); + }, + }; + } + }; + + const ac = new AbortController(); + const source = Observable.from(iterable); + source.subscribe(v => { + if (v === 3) { + try { + ac.abort(); + } catch (e) { + results.push(`AbortController#abort() threw an error: ${e.message}`); + } + } + }, {signal: ac.signal}); + + assert_array_equals(results, [ + 'next() called', + 'next() called', + 'next() called', + 'next() called', + 'return() about to throw an error', + 'AbortController#abort() threw an error: return() error', + ]); +}, "from(): Sync iterable: error thrown from IteratorRecord#return() can be " + + "synchronously caught"); +promise_test(async t => { + const results = []; + const iterable = { + [Symbol.asyncIterator]() { + return { + val: 0, + next() { + results.push('next() called'); + return {value: this.val, done: this.val++ === 10 ? true : false}; + }, + return() { + results.push('return() about to throw an error'); + + + + + + throw new Error('return() error'); + }, + }; + } + }; + + const unhandled_rejection_promise = new Promise((resolve, reject) => { + const unhandled_rejection_handler = e => resolve(e.reason); + self.addEventListener("unhandledrejection", unhandled_rejection_handler); + t.add_cleanup(() => + self.removeEventListener("unhandledrejection", unhandled_rejection_handler)); + + t.step_timeout(() => reject('Timeout'), 1500); + }); + + const ac = new AbortController(); + const source = Observable.from(iterable); + await new Promise((resolve, reject) => { + source.subscribe(v => { + if (v === 3) { + try { + ac.abort(); + results.push('No error thrown synchronously'); + resolve('No error thrown synchronously'); + } catch (e) { + results.push(`AbortController#abort() threw an error: ${e.message}`); + reject(e); + } + } + }, {signal: ac.signal}); + }); + + assert_array_equals(results, [ + 'next() called', + 'next() called', + 'next() called', + 'next() called', + 'return() about to throw an error', + 'No error thrown synchronously', + ]); + + const reason = await unhandled_rejection_promise; + assert_true(reason instanceof Error); + assert_equals(reason.message, "return() error", + "Custom error text passed through rejected Promise"); +}, "from(): Async iterable: error thrown from IteratorRecord#return() is " + + "wrapped in rejected Promise"); + +test(() => { + const results = []; + const iterable = { + impl() { + return { + next() { + results.push('next() running'); + return {done: true}; + } + }; + } + }; + + iterable[Symbol.iterator] = iterable.impl; + { + const source = Observable.from(iterable); + source.subscribe({}, {signal: AbortSignal.abort()}); + assert_array_equals(results, []); + } + iterable[Symbol.iterator] = undefined; + iterable[Symbol.asyncIterator] = iterable.impl; + { + const source = Observable.from(iterable); + source.subscribe({}, {signal: AbortSignal.abort()}); + assert_array_equals(results, []); + } +}, "from(): Subscribing to an iterable Observable with an aborted signal " + + "does not call next()"); + +test(() => { + const results = []; + const ac = new AbortController(); + + const iterable = { + [Symbol.iterator]() { + ac.abort(); + return { + val: 0, + next() { + results.push('next() called'); + return {done: true}; + }, + return() { + results.push('return() called'); + } + }; + } + }; + + const source = Observable.from(iterable); + source.subscribe({ + next: v => results.push(v), + complete: () => results.push('complete'), + }, {signal: ac.signal}); + + assert_array_equals(results, []); +}, "from(): When iterable conversion aborts the subscription, next() is " + + "never called"); +test(() => { + const results = []; + const ac = new AbortController(); + + const iterable = { + [Symbol.asyncIterator]() { + ac.abort(); + return { + val: 0, + next() { + results.push('next() called'); + return {done: true}; + }, + return() { + results.push('return() called'); + } + }; + } + }; + + const source = Observable.from(iterable); + source.subscribe({ + next: v => results.push(v), + complete: () => results.push('complete'), + }, {signal: ac.signal}); + + assert_array_equals(results, []); +}, "from(): When async iterable conversion aborts the subscription, next() " + + "is never called"); + + + + + + + + +promise_test(async () => { + const results = []; + let resolveNext = null; + + const iterable = { + [Symbol.asyncIterator]() { + return { + next() { + results.push('next() called'); + return new Promise(resolve => { + resolveNext = resolve; + }); + }, + return() { + results.push('return() called'); + } + }; + } + }; + + const ac = new AbortController(); + const source = Observable.from(iterable); + source.subscribe({ + next: v => results.push(v), + complete: () => results.push('complete'), + }, {signal: ac.signal}); + + assert_array_equals(results, [ + "next() called", + ]); + + + ac.abort(); + + assert_array_equals(results, [ + "next() called", + "return() called", + ]); + + + + + + await new Promise(resolveOuter => { + resolveNext({ + get done() { + results.push('IteratorResult.done GETTER'); + resolveOuter(); + return true; + } + }); + }); + + assert_array_equals(results, [ + "next() called", + "return() called", + "IteratorResult.done GETTER", + + ]); +}, "from(): Aborting an async iterable subscription stops subsequent next() " + + "calls, but old next() Promise reactions are web-observable"); + +test(() => { + const results = []; + const iterable = { + [Symbol.iterator]() { + return { + val: 0, + next() { + return {value: this.val, done: this.val++ === 4 ? true : false}; + }, + return() { + results.push('return() called'); + }, + }; + } + }; + + const source = Observable.from(iterable); + const ac = new AbortController(); + source.subscribe({ + next: v => results.push(v), + complete: () => results.push('complete'), + }, {signal: ac.signal}); + + ac.abort(); + assert_array_equals(results, [0, 1, 2, 3, 'complete']); +}, "from(): Abort after complete does NOT call IteratorRecord#return()");