diff --git a/join/HashJoin.js b/join/HashJoin.js index e5f601b..6ea664c 100644 --- a/join/HashJoin.js +++ b/join/HashJoin.js @@ -23,17 +23,12 @@ class HashJoin extends AsyncIterator this.left.on('error', (error) => this.destroy(error)); this.right.on('error', (error) => this.destroy(error)); - if (this.left.readable || this.right.readable) - { - this.readable = true; - } + this.readable = false; this.left.on('end', allowJoining.bind(this)); function allowJoining () { - if (this.leftMap.size <= 0) - return this.close(); this.readable = true; this.right.on('readable', () => this.readable = true); this.right.on('end', () => { if (!this.hasResults()) this._end(); }); @@ -41,12 +36,14 @@ class HashJoin extends AsyncIterator this.on('newListener', (eventName) => { - if (eventName === 'data' && !this.addedDataListener) + if (eventName === 'data') { - this.addedDataListener = true; - this._addDataListener(); + this._addDataListenerIfNeeded(); } - }) + }); + if (this.left.readable) + this._addDataListenerIfNeeded(); + this.left.on('readable', () => this._addDataListenerIfNeeded()); } hasResults () @@ -63,11 +60,7 @@ class HashJoin extends AsyncIterator read () { - if (!this.addedDataListener) - { - this.addedDataListener = true; - this._addDataListener(); - } + this._addDataListenerIfNeeded(); while(true) { if (this.ended || !this.readable) @@ -98,6 +91,14 @@ class HashJoin extends AsyncIterator } } + _addDataListenerIfNeeded() { + if (!this.addedDataListener) + { + this.addedDataListener = true; + this._addDataListener(); + } + } + _addDataListener() { this.left.on('data', addItem.bind(this));