Skip to content

Commit

Permalink
Merge pull request #1307 from JS-AK/fix/transit/updated-remove-pendin…
Browse files Browse the repository at this point in the history
…g-streams

fix: updated closing dangling streams at src/transit.js
  • Loading branch information
icebob authored Nov 14, 2024
2 parents 6e15131 + c197d5c commit 7683348
Showing 1 changed file with 54 additions and 7 deletions.
61 changes: 54 additions & 7 deletions src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -1033,12 +1033,8 @@ class Transit {
// Close pending request streams of the node
this.pendingReqStreams.forEach(({ sender, stream }, id) => {
if (sender === nodeID) {
// Close the stream with error
if (!stream.destroyed) {
stream.destroy(new Error(`Request stream closed by ${nodeID}`));
}

this.pendingReqStreams.delete(id);
this._destroyStreamIfPossible(stream, `Stream closed by ${nodeID}`);
}
});

Expand All @@ -1054,12 +1050,63 @@ class Transit {
})
);

this.pendingReqStreams.delete(id);
this.pendingResStreams.delete(id);
this._deletePendingReqStream(id, nodeID);
this._deletePendingResStream(id, nodeID);
}
});
}

/**
* Internal method to delete a pending response stream from `pendingResStreams`
* and destroy it (if not already destroyed) with error.
*
* @param {String} id ID of the stream in `pendingResStreams`
* @param {String} origin NodeID of the origin of the destroy request
*
* @memberof Transit
*/
_deletePendingResStream(id, origin) {
const stream = this.pendingResStreams.get(id);
this.pendingResStreams.delete(id);

if (stream) {
this._destroyStreamIfPossible(stream, `Stream closed by ${origin}`);
}
}

/**
* Internal method to delete a pending request stream from `pendingReqStreams`
* and destroy it (if not already ended) with error.
*
* @param {String} id ID of the stream in `pendingReqStreams`
* @param {String} origin NodeID of the origin of the destroy request
*
* @memberof Transit
*/
_deletePendingReqStream(id, origin) {
const reqStream = this.pendingReqStreams.get(id);
const pass = reqStream ? reqStream.stream : undefined;
this.pendingReqStreams.delete(id);

if (pass) {
this._destroyStreamIfPossible(pass, `Stream closed by ${origin}`);
}
}

/**
* Internal method to destroy a stream if it is not already destroyed.
*
* @param {DuplexStream} stream - The stream to be destroyed.
* @param {String} errorMessage - The error message to be used when destroying.
*
* @memberof Transit
*/
_destroyStreamIfPossible(stream, errorMessage) {
if (!stream.destroyed && stream.destroy) {
stream.destroy(new Error(errorMessage));
}
}

/**
* Create error field in outgoing payload
*
Expand Down

0 comments on commit 7683348

Please sign in to comment.