-
Notifications
You must be signed in to change notification settings - Fork 962
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core/muxing: Redesign StreamMuxer
trait
#2648
core/muxing: Redesign StreamMuxer
trait
#2648
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Things found in self-review.
Great. I meant to touch this code part since some time. I will take a look. Thanks @thomaseizinger for proposing the patch set. |
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
As part of fixing the benchmarks, I ran them against
|
This comment was marked as resolved.
This comment was marked as resolved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some very cool simplifications in here.
Any chance this pull request can be split into multiple autonomous ones to ease the review process?
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
8bc4dfb
to
cc75a1a
Compare
This comment was marked as resolved.
This comment was marked as resolved.
67c8901
to
2b0f2de
Compare
This comment was marked as resolved.
This comment was marked as resolved.
muxers/mplex/tests/async_write.rs
Outdated
let mut buf = Vec::new(); | ||
outbound.read_to_end(&mut buf).await.unwrap(); | ||
let mut buf = vec![0u8; 11]; | ||
outbound.read_exact(&mut buf).await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would that not defeat the purpose of the unit test?
Above:
// Tests that `AsyncWrite::close` implies flush.
Not using read_to_end
thus no longer tests the closing and flushing, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah you might be right. I wasn't too happy about these changes but extracted them as a separate commit so we can discuss it easier.
I suspect there might be a bug somewhere but I couldn't spot it so far.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I fixed it! The important commit is cec1176.
This means I can drop this commit from the PR history but that will require a force-push. I've noted it in the PR description as a task for the final clean-up once this PR is closer to being merged :)
This comment was marked as resolved.
This comment was marked as resolved.
I would guess that https://github.com/flamegraph-rs/flamegraph proves useful here. One can compile the benchmarks to a binary and then run them via |
Great idea. I am familiar with flamegraph-rs, just didn't think of using it. |
This is analogous to other sub-modules like `transport` within `libp2p-core` and thus contributes to consistency within the code base. Additionally, it will reduce the diff of libp2p#2648.
We are already enforcing that the associated type must convert to `io::Error`. We might as well just make all functions return an `io::Error` directly.
Visually separating this makes it easier to read.
The current `StreamMuxer` API is based around a poll-based approach where each substream needs to be passed by value to read/write/close it. The muxer itself however is not available in the end-user abstractions like `ConnectionHandler`. To circumvent this, the `StreamMuxerBox` type exists which allows a `StreamMuxer` to be cloned. Together with `SubstreamRef`, this allows each substream to own a reference to the muxer it was created with and pass itself to the muxer within implementations of `AsyncRead` and `AsyncWrite`. These implementations are convenient as they allow an end-user to simply read and write to the substream without holding a reference to anything else. We can achieve the same goal by changing the `StreamMuxerBox` abstraction to use a `SubstreamBox`. Users of `libp2p-core` can continue to depend on the `Substream` associated type of `StreamMuxer`.
e9506aa
to
d44b0a6
Compare
EDIT: I managed to extract the crucial bit into a dedicated PR: #2706 Best to benchmark that in isolation. OLD COMMENT BELOW: Interestingly enough, I can no longer reproduce the performance problems. Would you mind trying @elenaf9 / @mxinden? What I did was:
Funnily, I can now actually see a performance improvement of 5% for some of the TCP benchmarks and even 10% for the memory benchmarks.
|
Reduce diff.
Reduce diff.
Fix compile errors.
This aligns the public API of the `libp2p-mplex` module with the one from `libp2p-yamux`. This change has two benefits: 1. For standalone users of `libp2p-mplex`, the substreams itself are now useful, similar to `libp2p-yamux` and don't necessarily need to be polled via the `StreamMuxer`. The `StreamMuxer` only forwards to the `Async{Read,Write}` implementations. 2. This will reduce the diff of libp2p#2648 because we can chunk the one giant commit into smaller atomic ones.
Fix docs error.
…2706) This aligns the public API of the `libp2p-mplex` module with the one from `libp2p-yamux`. This change has two benefits: 1. For standalone users of `libp2p-mplex`, the substreams itself are now useful, similar to `libp2p-yamux` and don't necessarily need to be polled via the `StreamMuxer`. The `StreamMuxer` only forwards to the `Async{Read,Write}` implementations. 2. This will reduce the diff of #2648 because we can chunk the one giant commit into smaller atomic ones.
If you want, you can benchmark #2706 against master before it was merged :) |
@melekes given that #2622 now also implements the Let us know in case there is something we should be considering (with regards to WebRTC). I would expect the overall change set to simplify the WebRTC muxing implementation. |
I am closing this PR in favor of a ticket where we can track all of this: #2722 |
@thomaseizinger: Ran on a
where 3c120ef is the base and ea487ae includes the change. I don't see any of this as a sign of a regression. |
I did not get the chance to review the But either way I think the changes of this PR are the right direction. |
I think worst case, you can follow an approach similar to the mplex muxer! |
Description
This PR re-designs the
StreamMuxer
trait to:&mut self
for all methodspoll
", same as we do for many other parts ofrust-libp2p
Splitting out separate PRs
StreamMuxer::Error
to io::Error #2664Poll
import #2685StreamMuxer::flush_all
function #2669close
topoll_close
#2666Sync
bounds #2667StreamMuxerBox
#2668Yamux::close
to use?
#2677ConnectionError
in public API withio::Error
rust-yamux#135StreamMuxerEvent::map_inbound_stream
#2691boxed
module #2703AsyncRead
andAsyncWrite
forSubstream
#2706StreamMuxer::Substream
to implementAsync{Read,Write}
#2707Into<io::Error>
bound onStreamMuxer
withstd::error::Error
#2710Additional tasks
Substream
associated type on libp2p-core levelBytes
/[u8; X]
?)StreamMuxer
usePin<&mut self>
Include "first close, then flush" logic withinSubstreamBox
Releaseyamux
once ReplaceConnectionError
in public API withio::Error
rust-yamux#135 is mergedUpdate PR with latest yamux releaselibp2p-yamux
.Links to any relevant issues
Open Questions
Is there a better name forAsyncReadWriteBox
?Change checklist