Skip to content

Commit

Permalink
Fix HashJoin hanging for non-autoStarted iterators
Browse files Browse the repository at this point in the history
  • Loading branch information
rubensworks committed Jun 14, 2024
1 parent 36d70cb commit 8ec9c78
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions join/HashJoin.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,27 @@ class HashJoin extends AsyncIterator
this.left.on('error', (error) => this.destroy(error));
this.right.on('error', (error) => this.destroy(error));

if (this.left.readable || this.right.readable)
{
this.readable = true;
}
this.readable = false;

this.left.on('end', allowJoining.bind(this));

function allowJoining ()
{
if (this.leftMap.size <= 0)
return this.close();
this.readable = true;
this.right.on('readable', () => this.readable = true);
this.right.on('end', () => { if (!this.hasResults()) this._end(); });
}

this.on('newListener', (eventName) =>
{
if (eventName === 'data' && !this.addedDataListener)
if (eventName === 'data')
{
this.addedDataListener = true;
this._addDataListener();
this._addDataListenerIfNeeded();
}
})
});
if (this.left.readable)
this._addDataListenerIfNeeded();
this.left.on('readable', () => this._addDataListenerIfNeeded());
}

hasResults ()
Expand All @@ -63,11 +60,7 @@ class HashJoin extends AsyncIterator

read ()
{
if (!this.addedDataListener)
{
this.addedDataListener = true;
this._addDataListener();
}
this._addDataListenerIfNeeded();

while(true) {
if (this.ended || !this.readable)
Expand Down Expand Up @@ -98,6 +91,14 @@ class HashJoin extends AsyncIterator
}
}

_addDataListenerIfNeeded() {
if (!this.addedDataListener)
{
this.addedDataListener = true;
this._addDataListener();
}
}

_addDataListener()
{
this.left.on('data', addItem.bind(this));
Expand Down

0 comments on commit 8ec9c78

Please sign in to comment.