Skip to content

Commit

Permalink
sort of owrks
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarred-Sumner committed Dec 27, 2024
1 parent 44465a8 commit 1e34965
Show file tree
Hide file tree
Showing 13 changed files with 211 additions and 95 deletions.
31 changes: 27 additions & 4 deletions src/bun.js/bindings/BunReadableStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "BunReadableStreamDefaultController.h"

#include "BunPromiseInlines.h"
#include <JavaScriptCore/VMTrapsInlines.h>

namespace Bun {

Expand Down Expand Up @@ -113,6 +114,8 @@ JSPromise* JSReadableStream::cancel(VM& vm, JSGlobalObject* globalObject, JSValu
auto* controller = this->controller();
JSObject* cancelAlgorithm = controller->cancelAlgorithm();
m_controller.clear();
if (!cancelAlgorithm)
return Bun::createFulfilledPromise(globalObject, jsUndefined());

JSC::CallData callData = JSC::getCallData(cancelAlgorithm);

Expand Down Expand Up @@ -288,12 +291,29 @@ void JSReadableStream::visitChildrenImpl(JSCell* cell, Visitor& visitor)
ASSERT_GC_OBJECT_INHERITS(thisObject, info());
Base::visitChildren(thisObject, visitor);

visitor.append(thisObject->m_reader);
visitor.append(thisObject->m_controller);
visitor.append(thisObject->m_storedError);
thisObject->visitAdditionalChildren(visitor);
}

template<typename Visitor>
void JSReadableStream::visitAdditionalChildren(Visitor& visitor)
{
visitor.append(m_reader);
visitor.append(m_controller);
visitor.append(m_storedError);
}

template<typename Visitor>
void JSReadableStream::visitOutputConstraintsImpl(JSCell* cell, Visitor& visitor)
{
auto* thisObject = jsCast<JSReadableStream*>(cell);
Base::visitOutputConstraints(cell, visitor);

thisObject->visitAdditionalChildren(visitor);
}

DEFINE_VISIT_CHILDREN(JSReadableStream);
DEFINE_VISIT_ADDITIONAL_CHILDREN(JSReadableStream);
DEFINE_VISIT_OUTPUT_CONSTRAINTS(JSReadableStream);

bool JSReadableStream::isLocked() const
{
Expand All @@ -317,6 +337,9 @@ void JSReadableStream::finishCreation(VM& vm)
Base::finishCreation(vm);
m_state = State::Readable;
m_disturbed = false;
m_reader.clear();
m_controller.clear();
m_storedError.clear();
}

void JSReadableStream::setController(JSC::VM& vm, JSReadableStreamDefaultController* controller)
Expand All @@ -333,7 +356,7 @@ void JSReadableStream::close(JSGlobalObject* globalObject)
{
m_state = State::Closed;
if (auto* reader = this->reader())
reader->closedPromise()->resolve(globalObject, jsUndefined());
reader->closedPromise()->fulfill(globalObject, jsUndefined());
}

void JSReadableStream::error(JSGlobalObject* globalObject, JSValue error)
Expand Down
5 changes: 4 additions & 1 deletion src/bun.js/bindings/BunReadableStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class JSReadableStream final : public JSC::JSNonFinalObject {

DECLARE_INFO;
DECLARE_VISIT_CHILDREN;
DECLARE_VISIT_OUTPUT_CONSTRAINTS;

template<typename Visitor>
void visitAdditionalChildren(Visitor& visitor);

// Public API for C++ usage
bool isLocked() const;
Expand All @@ -59,7 +63,6 @@ class JSReadableStream final : public JSC::JSNonFinalObject {
void setController(JSC::VM& vm, JSReadableStreamDefaultController*);
State state() const { return m_state; }
JSValue storedError() const { return m_storedError.get(); }
bool disturbed() const { return m_disturbed; }

private:
JSReadableStream(VM&, Structure*);
Expand Down
48 changes: 36 additions & 12 deletions src/bun.js/bindings/BunReadableStreamDefaultController.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

#include "JavaScriptCore/SlotVisitorMacros.h"
#include "root.h"

#include "JavaScriptCore/IteratorOperations.h"
Expand Down Expand Up @@ -44,14 +45,32 @@ void JSReadableStreamDefaultController::visitChildrenImpl(JSC::JSCell* cell, Vis
{
auto* thisObject = static_cast<JSReadableStreamDefaultController*>(cell);
Base::visitChildren(cell, visitor);
visitor.append(thisObject->m_underlyingSource);
visitor.append(thisObject->m_pullAlgorithm);
visitor.append(thisObject->m_cancelAlgorithm);
visitor.append(thisObject->m_stream);
thisObject->m_queue.visit<Visitor>(visitor);

thisObject->visitAdditionalChildren(visitor);
}

template<typename Visitor>
void JSReadableStreamDefaultController::visitAdditionalChildren(Visitor& visitor)
{
visitor.append(m_underlyingSource);
visitor.append(m_pullAlgorithm);
visitor.append(m_cancelAlgorithm);
visitor.append(m_stream);
m_queue.visit<Visitor>(visitor);
}

template<typename Visitor>
void JSReadableStreamDefaultController::visitOutputConstraintsImpl(JSCell* cell, Visitor& visitor)
{
auto* thisObject = jsCast<JSReadableStreamDefaultController*>(cell);
Base::visitOutputConstraints(cell, visitor);

thisObject->visitAdditionalChildren(visitor);
}

DEFINE_VISIT_CHILDREN(JSReadableStreamDefaultController);
DEFINE_VISIT_ADDITIONAL_CHILDREN(JSReadableStreamDefaultController);
DEFINE_VISIT_OUTPUT_CONSTRAINTS(JSReadableStreamDefaultController);

JSReadableStreamDefaultController* JSReadableStreamDefaultController::create(VM& vm, JSGlobalObject* globalObject, Structure* structure, JSReadableStream* stream)
{
Expand Down Expand Up @@ -99,6 +118,7 @@ void JSReadableStreamDefaultController::performPullSteps(VM& vm, JSGlobalObject*
{
auto* stream = this->stream();
ASSERT(stream);
vm.writeBarrier(readRequest);

if (!this->queue().isEmpty()) {
// Let chunk be ! DequeueValue(this).
Expand All @@ -120,7 +140,7 @@ void JSReadableStreamDefaultController::performPullSteps(VM& vm, JSGlobalObject*
return;
}

stream->reader()->addReadRequest(vm, readRequest);
stream->reader()->addReadRequest(vm, globalObject, readRequest);

// Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
this->callPullIfNeeded(globalObject);
Expand All @@ -139,7 +159,7 @@ JSValue JSReadableStreamDefaultController::enqueue(VM& vm, JSGlobalObject* globa
// 1. Let reader be stream.[[reader]].
// 2. Assert: reader.[[readRequests]] is not empty.
// 3. Let readRequest be reader.[[readRequests]][0].
JSPromise* readRequest = reader->takeFirst(vm);
JSPromise* readRequest = reader->takeFirst(vm, globalObject);
JSObject* result = JSC::createIteratorResultObject(globalObject, chunk, false);
readRequest->fulfill(globalObject, result);
callPullIfNeeded(globalObject);
Expand All @@ -162,7 +182,7 @@ void JSReadableStreamDefaultController::error(VM& vm, JSGlobalObject* globalObje
return;

// Reset queue
queue().resetQueue();
queue().resetQueue(vm, globalObject, this);

// Clear our algorithms so we stop executing them
clearAlgorithms();
Expand Down Expand Up @@ -361,17 +381,21 @@ bool JSReadableStreamDefaultController::shouldCallPull() const

void JSReadableStreamDefaultController::clearAlgorithms()
{
m_pullAlgorithm.clear();
m_cancelAlgorithm.clear();
m_underlyingSource.clear();
// m_pullAlgorithm.clear();
// m_cancelAlgorithm.clear();
// m_underlyingSource.clear();

queue().clearAlgorithms();
// queue().clearAlgorithms();
}

void JSReadableStreamDefaultController::finishCreation(VM& vm, JSReadableStream* stream)
{
Base::finishCreation(vm);
m_stream.set(vm, this, stream);
m_pullAlgorithm.clear();
m_cancelAlgorithm.clear();
m_underlyingSource.clear();
queue().resetQueue(vm, globalObject(), this);
}

const ClassInfo JSReadableStreamDefaultController::s_info = { "ReadableStreamDefaultController"_s, &Base::s_info, nullptr, nullptr, CREATE_METHOD_TABLE(JSReadableStreamDefaultController) };
Expand Down
5 changes: 5 additions & 0 deletions src/bun.js/bindings/BunReadableStreamDefaultController.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class JSReadableStreamDefaultController final : public JSC::JSDestructibleObject
~JSReadableStreamDefaultController()
{
// We want the queue destructor with the WTF::Vector to be called.
m_queue.~StreamQueue();
}

static JSReadableStreamDefaultController* create(JSC::VM&, JSC::JSGlobalObject*, JSC::Structure*, JSReadableStream*);
Expand All @@ -37,6 +38,10 @@ class JSReadableStreamDefaultController final : public JSC::JSDestructibleObject

DECLARE_INFO;
DECLARE_VISIT_CHILDREN;
DECLARE_VISIT_OUTPUT_CONSTRAINTS;

template<typename Visitor>
void visitAdditionalChildren(Visitor& visitor);

void performPullSteps(JSC::VM&, JSC::JSGlobalObject*, JSC::JSPromise* readRequest);

Expand Down
74 changes: 40 additions & 34 deletions src/bun.js/bindings/BunReadableStreamDefaultReader.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "root.h"

#include "JavaScriptCore/ArrayAllocationProfile.h"
#include "JavaScriptCore/JSGlobalObject.h"

#include <JavaScriptCore/LazyPropertyInlines.h>
#include "BunReadableStreamDefaultReader.h"
#include "BunClientData.h"
Expand All @@ -12,6 +15,8 @@
#include <JavaScriptCore/JSArray.h>
#include <JavaScriptCore/JSObjectInlines.h>
#include <JavaScriptCore/WriteBarrierInlines.h>
#include <JavaScriptCore/VMTrapsInlines.h>

namespace Bun {

using namespace JSC;
Expand All @@ -32,25 +37,39 @@ void JSReadableStreamDefaultReader::visitChildrenImpl(JSCell* cell, Visitor& vis
auto* reader = static_cast<JSReadableStreamDefaultReader*>(cell);
ASSERT_GC_OBJECT_INHERITS(reader, JSReadableStreamDefaultReader::info());
Base::visitChildren(reader, visitor);
visitor.append(reader->m_stream);
reader->m_readyPromise.visit(visitor);
reader->m_closedPromise.visit(visitor);
{
WTF::Locker lock(reader->m_gcLock);
for (auto request : reader->m_readRequests) {
visitor.appendUnbarriered(request);
}
}

reader->visitAdditionalChildren(visitor);
}

template<typename Visitor>
void JSReadableStreamDefaultReader::visitAdditionalChildren(Visitor& visitor)
{
m_readyPromise.visit(visitor);
m_closedPromise.visit(visitor);
visitor.append(m_stream);
visitor.append(m_readRequests);
}

DEFINE_VISIT_CHILDREN(JSReadableStreamDefaultReader);

template<typename Visitor>
void JSReadableStreamDefaultReader::visitOutputConstraintsImpl(JSCell* cell, Visitor& visitor)
{
auto* thisObject = jsCast<JSReadableStreamDefaultReader*>(cell);
Base::visitOutputConstraints(cell, visitor);

thisObject->visitAdditionalChildren(visitor);
}

DEFINE_VISIT_ADDITIONAL_CHILDREN(JSReadableStreamDefaultReader);
DEFINE_VISIT_OUTPUT_CONSTRAINTS(JSReadableStreamDefaultReader);

void JSReadableStreamDefaultReader::finishCreation(JSC::VM& vm, JSReadableStream* stream)
{
Base::finishCreation(vm);
ASSERT(inherits(info()));

m_stream.set(vm, this, stream);
m_readRequests.setMayBeNull(vm, this, JSC::constructEmptyArray(globalObject(), static_cast<JSC::ArrayAllocationProfile*>(nullptr), 0));
m_stream.setMayBeNull(vm, this, stream);

m_closedPromise.initLater(
[](const auto& init) {
Expand All @@ -66,31 +85,22 @@ void JSReadableStreamDefaultReader::finishCreation(JSC::VM& vm, JSReadableStream
});
}

JSPromise* JSReadableStreamDefaultReader::takeFirst(JSC::VM& vm)
JSPromise* JSReadableStreamDefaultReader::takeFirst(JSC::VM& vm, JSGlobalObject* globalObject)
{
JSPromise* promise;
{
WTF::Locker lock(m_gcLock);
promise = jsCast<JSPromise*>(m_readRequests.takeFirst());
if (!m_readRequests || m_readRequests->length() == 0) {
return nullptr;
}
vm.writeBarrier(this);
return promise;
auto* readRequests = m_readRequests.get();
JSValue first = readRequests->getIndex(globalObject, 0);
JSArray* replacement = readRequests->fastSlice(globalObject, readRequests, 1, readRequests->getArrayLength() - 1);
m_readRequests.set(vm, this, replacement);
return jsCast<JSPromise*>(first);
}

void JSReadableStreamDefaultReader::detach()
{
ASSERT(isActive());
m_stream.clear();
if (m_readyPromise.isInitialized()) {
m_readyPromise.setMayBeNull(vm(), this, nullptr);
}
{
WTF::Locker lock(m_gcLock);
m_readRequests.clear();
}
if (m_closedPromise.isInitialized()) {
m_closedPromise.setMayBeNull(vm(), this, nullptr);
}
}

void JSReadableStreamDefaultReader::releaseLock()
Expand All @@ -113,13 +123,9 @@ JSPromise* JSReadableStreamDefaultReader::cancel(JSC::VM& vm, JSGlobalObject* gl
return stream->cancel(vm, globalObject, reason);
}

void JSReadableStreamDefaultReader::addReadRequest(JSC::VM& vm, JSC::JSValue promise)
void JSReadableStreamDefaultReader::addReadRequest(JSC::VM& vm, JSC::JSGlobalObject* globalObject, JSC::JSValue promise)
{
{
WTF::Locker lock(m_gcLock);
m_readRequests.append(promise);
}
vm.writeBarrier(this, promise);
m_readRequests->push(globalObject, promise);
}

JSPromise* JSReadableStreamDefaultReader::read(JSC::VM& vm, JSGlobalObject* globalObject)
Expand Down
16 changes: 10 additions & 6 deletions src/bun.js/bindings/BunReadableStreamDefaultReader.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "JavaScriptCore/JSGlobalObject.h"
#include "root.h"

#include "JavaScriptCore/JSDestructibleObject.h"
Expand Down Expand Up @@ -38,6 +39,9 @@ class JSReadableStreamDefaultReader final : public JSC::JSDestructibleObject {

DECLARE_INFO;
DECLARE_VISIT_CHILDREN;
DECLARE_VISIT_OUTPUT_CONSTRAINTS;
template<typename Visitor>
void visitAdditionalChildren(Visitor& visitor);

template<typename, JSC::SubspaceAccess mode>
static JSC::GCClient::IsoSubspace* subspaceFor(JSC::VM& vm)
Expand All @@ -60,14 +64,15 @@ class JSReadableStreamDefaultReader final : public JSC::JSDestructibleObject {

bool isEmpty()
{
WTF::Locker lock(m_gcLock);
return m_readRequests.isEmpty();
if (!m_readRequests)
return true;
return m_readRequests->length() == 0;
}

// Implements ReadableStreamDefaultReader
void releaseLock();
JSC::JSPromise* takeFirst(JSC::VM&);
void addReadRequest(JSC::VM&, JSC::JSValue promise);
JSC::JSPromise* takeFirst(JSC::VM&, JSC::JSGlobalObject*);
void addReadRequest(JSC::VM&, JSC::JSGlobalObject* globalObject, JSC::JSValue promise);

private:
JSReadableStreamDefaultReader(JSC::VM& vm, JSC::Structure* structure)
Expand All @@ -77,8 +82,7 @@ class JSReadableStreamDefaultReader final : public JSC::JSDestructibleObject {

void finishCreation(JSC::VM&, JSReadableStream* stream);

WTF::Lock m_gcLock {};
WTF::Deque<JSC::JSValue> m_readRequests WTF_GUARDED_BY_LOCK(m_gcLock) = {};
mutable WriteBarrier<JSC::JSArray> m_readRequests;

// Internal slots defined by the spec
mutable JSC::WriteBarrier<JSObject> m_stream;
Expand Down
Loading

0 comments on commit 1e34965

Please sign in to comment.