Skip to content

Commit

Permalink
WIP: yamux backed channel
Browse files Browse the repository at this point in the history
  • Loading branch information
robinhundt committed Feb 12, 2024
1 parent e9c1497 commit fb6e780
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 0 deletions.
46 changes: 46 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions crates/seec-yamux/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "seec-yamux"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
yamux = "0.13.1"
futures = "0.3.30"
tokio = { version = "1.36.0", features = ["macros", "sync"]}
53 changes: 53 additions & 0 deletions crates/seec-yamux/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::collections::VecDeque;
use std::future::{Future, poll_fn};
use std::task::Poll;
use futures::{AsyncRead, AsyncWrite};
use tokio::sync::{mpsc, oneshot};
use yamux::Stream;

pub struct Connection<T> {
inner: yamux::Connection<T>,
cmd_recv: mpsc::Receiver<Cmd>,
buffered_inbound: VecDeque<Stream>,
requested_inbound: VecDeque<oneshot::Sender<Stream>>,
requested_outbound: VecDeque<oneshot::Sender<Stream>>,
}

pub enum Cmd {
NewChannel((oneshot::Sender<Stream>, oneshot::Sender<Stream>))
}


impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
pub fn spawn(mut self) -> impl Future<Output = ()> {
poll_fn(move |cx| {
match self.inner.poll_next_inbound(cx) {
Poll::Ready(Some(Ok(stream))) => self.buffered_inbound.push_back(stream),
_ => ()
}
match self.cmd_recv.poll_recv(cx) {
Poll::Ready(Some(Cmd::NewChannel((outbound, inbound)))) => {
if let Some(stream) = self.buffered_inbound.pop_front() {
inbound.send(stream).unwrap();
self.requested_outbound.push_back(outbound);
} else {
self.requested_inbound.push_back(inbound);
self.requested_outbound.push_back(outbound);
}
},
_ => ()
}
if !self.requested_outbound.is_empty() {
match self.inner.poll_new_outbound(cx) {
Poll::Ready(Ok(stream)) => {
if let Some(outbound) = self.requested_outbound.pop_front() {
outbound.send(stream).unwrap();
}
}
_ => ()
}
}
Poll::Pending
})
}
}

0 comments on commit fb6e780

Please sign in to comment.