diff options
Diffstat (limited to 'src/decoding.rs')
-rw-r--r-- | src/decoding.rs | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/src/decoding.rs b/src/decoding.rs new file mode 100644 index 0000000..cba4d76 --- /dev/null +++ b/src/decoding.rs @@ -0,0 +1,117 @@ +#![forbid(unsafe_code)] +use crate::prelude::*; + +use crate::error; + +use pin_project::pin_project; +use pin_utils::pin_mut; +use std::future::Future; +use std::pin::Pin; +use std::str; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, AsyncRead, BufReader}; +use tokio::sync::Mutex; + + +#[pin_project] +pub struct CharBufReader<R: AsyncRead + Unpin> { + #[pin] byte_reader: BufReader<R>, + #[pin] char_buffer: String, +} + + +#[pin_project] +struct FillBufFuture<R: AsyncRead + Unpin> { + char_reader: Arc<Mutex<CharBufReader<R>>>, +} + + +impl<R: AsyncRead + Unpin> Future for FillBufFuture<R> { + type Output = Result<String>; + + fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) + -> Poll<Self::Output> + { + let future = self.project(); + let char_reader: &mut Arc<Mutex<CharBufReader<R>>> = future.char_reader; + let char_reader_future = char_reader.lock(); + pin_mut!(char_reader_future); + match char_reader_future.poll(context) { + Poll::Ready(mut char_reader) => { + let char_reader = &mut *char_reader; + let mut byte_reader: Pin<&mut BufReader<R>> = Pin::new(&mut char_reader.byte_reader); + + loop { + match byte_reader.as_mut().poll_fill_buf(context).map_err(error::input)? + { + Poll::Ready(byte_buffer) => { + match str::from_utf8(&byte_buffer) { + Err(error) => { + let n_valid = error.valid_up_to(); + if n_valid == 0 { + byte_reader.as_mut().consume(1); + } else { + match str::from_utf8(&byte_buffer[..n_valid]) { + Err(_) => { + byte_reader.as_mut().consume(1); + }, + Ok(chars) => { + char_reader.char_buffer.push_str(chars); + + byte_reader.as_mut().consume(n_valid); + + break; + }, + } + } + } + Ok(chars) => { + char_reader.char_buffer.push_str(chars); + + let n_to_consume = byte_buffer.len(); + byte_reader.as_mut().consume(n_to_consume); + + break; + } + } + } + Poll::Pending => { + return Poll::Pending; + } + } + } + + return Poll::Ready(Ok(char_reader.char_buffer.to_string())); + } + Poll::Pending => { + return Poll::Pending; + } + } + } +} + + +impl<R: AsyncRead + Unpin> CharBufReader<R> { + pub fn new(input_stream: R) -> CharBufReader<R> { + let byte_reader = BufReader::new(input_stream); + + CharBufReader { + byte_reader: byte_reader, + char_buffer: String::new(), + } + } + + pub fn fill_buf(reader: Arc<Mutex<Self>>) + -> impl Future<Output = Result<String>> + { + FillBufFuture { + char_reader: reader, + } + } + + pub fn consume(&mut self, amount: usize) { + self.char_buffer.replace_range(..amount, ""); + } +} + |