Skip to content

Commit

Permalink
simplify Reader implementation, move request code over there
Browse files Browse the repository at this point in the history
  • Loading branch information
fasterthanlime committed Aug 4, 2020
1 parent 9989099 commit c428a19
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 55 deletions.
27 changes: 14 additions & 13 deletions htfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use color_eyre::Report;
use futures::io::AsyncRead;
use futures::lock::Mutex;
use reqwest::Method;
use std::{collections::HashMap, fmt};
use std::{collections::HashMap, fmt, sync::Arc};
use url::Url;

mod reader;
Expand Down Expand Up @@ -41,7 +41,7 @@ impl fmt::Debug for File {

impl File {
#[tracing::instrument]
pub async fn new(url: Url) -> Result<Self, Report> {
pub async fn new(url: Url) -> Result<Arc<Self>, Report> {
let client = reqwest::Client::new();
let req = client
.request(Method::GET, url.clone())
Expand All @@ -66,26 +66,27 @@ impl File {
connections,
blocks: Default::default(),
};
Ok(f)
Ok(Arc::new(f))
}

pub async fn get_reader(&self, offset: u64) -> Result<impl AsyncRead, Report> {
pub async fn get_reader(self: &Arc<Self>, offset: u64) -> Result<impl AsyncRead, Report> {
if offset > self.size {
Err(Error::ReadAfterEnd {
file_end: self.size,
requested: offset,
})?
} else {
let req = self
.client
.request(Method::GET, self.url.clone())
.header("range", format!("bytes={}-", offset))
.build()?;
let res = self.client.execute(req).await?;
let reader = response_reader::as_reader(res);
let reader = Reader2::new(reader);
// let req = self
// .client
// .request(Method::GET, self.url.clone())
// .header("range", format!("bytes={}-", offset))
// .build()?;
// let res = self.client.execute(req).await?;
// let reader = response_reader::as_reader(res);
// let reader = Reader2::new(Arc::clone(self), reader);
// Ok(reader)

Ok(reader)
Ok(Reader2::new(Arc::clone(self), offset))
}
}

Expand Down
88 changes: 48 additions & 40 deletions htfs/src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
use crate::{response_reader, File};
use futures::io::AsyncRead;
use futures::prelude::*;
use reqwest::Method;
use std::{
fmt::{self, Debug},
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

pub struct ReaderInner<R>
where
R: AsyncRead + Unpin,
{
reader: R,
pub struct ReaderInner {
file: Arc<File>,
offset: u64,
buf: Vec<u8>,
}

impl<R> ReaderInner<R>
fn make_io_error<E>(e: E) -> io::Error
where
R: AsyncRead + Unpin,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
async fn private_read(mut self, n: usize) -> (Self, io::Result<usize>) {
io::Error::new(io::ErrorKind::Other, e)
}

impl ReaderInner {
async fn read_internal(&mut self, n: usize) -> io::Result<usize> {
tracing::debug!("waiting...");
tokio::time::delay_for(Duration::from_millis(200)).await;
tracing::debug!("reading!");
Expand All @@ -31,24 +36,32 @@ where
self.buf.push(0);
}

let res = self.reader.read(&mut self.buf[..n]).await;
(self, res)
let req = self
.file
.client
.request(Method::GET, self.file.url.clone())
.header("range", format!("bytes={}-", self.offset))
.build()
.map_err(make_io_error)?;

let res = self.file.client.execute(req).map_err(make_io_error).await?;
let mut reader = response_reader::as_reader(res);

let res = reader.read(&mut self.buf[..n]).await;
if let Ok(n) = &res {
self.offset += *n as u64;
}
res
}
}

enum State<R>
where
R: AsyncRead + Unpin + 'static,
{
Idle(ReaderInner<R>),
Pending(Pin<Box<dyn Future<Output = (ReaderInner<R>, io::Result<usize>)> + 'static>>),
enum State {
Idle(ReaderInner),
Pending(Pin<Box<dyn Future<Output = (ReaderInner, io::Result<usize>)> + 'static>>),
Transitional,
}

impl<R> fmt::Debug for State<R>
where
R: AsyncRead + Unpin,
{
impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
State::Idle(_) => write!(f, "Idle")?,
Expand All @@ -59,40 +72,29 @@ where
}
}

pub struct Reader2<R>
where
R: AsyncRead + Unpin + 'static,
{
state: State<R>,
pub struct Reader2 {
state: State,
}

impl<R> Reader2<R>
where
R: AsyncRead + Unpin,
{
pub fn new(reader: R) -> Self {
impl Reader2 {
pub fn new(file: Arc<File>, offset: u64) -> Self {
Self {
state: State::Idle(ReaderInner {
reader,
file,
offset,
buf: Default::default(),
}),
}
}
}

impl<R> Debug for Pin<&mut Reader2<R>>
where
R: AsyncRead + Unpin,
{
impl Debug for Pin<&mut Reader2> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Reader(State={:?})", self.state)
}
}

impl<R> AsyncRead for Reader2<R>
where
R: AsyncRead + Unpin,
{
impl AsyncRead for Reader2 {
#[tracing::instrument(skip(cx))]
fn poll_read(
mut self: Pin<&mut Self>,
Expand All @@ -102,7 +104,13 @@ where
let mut state = State::Transitional;
std::mem::swap(&mut self.state, &mut state);
let mut fut = match state {
State::Idle(r) => Box::pin(r.private_read(buf.len())),
State::Idle(mut r) => {
let len = buf.len();
Box::pin(async move {
let res = r.read_internal(len).await;
(r, res)
})
}
State::Pending(fut) => fut,
State::Transitional => unreachable!(),
};
Expand Down
8 changes: 6 additions & 2 deletions htfs/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ async fn some_test_inner() -> Result<(), Report> {
let mut buf = vec![0u8; 7];
let mut reader = f.get_reader(34).await?;

for i in 0..3 {
let slices = &["<title>", "Example", " Domain"];

for (i, &slice) in slices.iter().enumerate() {
reader.read_exact(&mut buf).await?;
log::info!("{:?}", String::from_utf8_lossy(&buf[..]));
let s = String::from_utf8_lossy(&buf[..]);
log::info!("{:?}", s);
assert_eq!(&s, slice);
}

Ok(())
Expand Down

0 comments on commit c428a19

Please sign in to comment.