diff options
Diffstat (limited to 'src/terminal')
-rw-r--r-- | src/terminal/decoding.rs | 151 | ||||
-rw-r--r-- | src/terminal/error.rs | 14 | ||||
-rw-r--r-- | src/terminal/prelude.rs | 4 |
3 files changed, 126 insertions, 43 deletions
diff --git a/src/terminal/decoding.rs b/src/terminal/decoding.rs index e2654fb..11f6046 100644 --- a/src/terminal/decoding.rs +++ b/src/terminal/decoding.rs @@ -1,63 +1,128 @@ -use crate::terminal::error::{self, TerminalError}; +#![forbid(unsafe_code)] +use crate::terminal::prelude::*; -use std::io::{BufRead, BufReader, Read}; +use crate::terminal::error; + +use async_std::future::Future; +use async_std::io::{BufRead, BufReader, Read}; +use async_std::sync::{Arc, Mutex}; +use async_std::task::{Context, Poll}; +use pin_project::pin_project; +use pin_utils::pin_mut; +use std::pin::Pin; use std::str; +use std::sync::atomic::{AtomicBool, Ordering}; -pub struct CharBufReader<R: Read> { - byte_reader: BufReader<R>, - char_buffer: String, +#[pin_project] +pub struct CharBufReader<R: Read + Unpin> { + #[pin] byte_reader: BufReader<R>, + #[pin] char_buffer: String, + is_interrupted: Arc<AtomicBool>, } -impl<R: Read> CharBufReader<R> { - pub fn new(input_stream: R) -> CharBufReader<R> { - let byte_reader = BufReader::new(input_stream); +#[pin_project] +struct FillBufFuture<R: Read + Unpin> { + char_reader: Arc<Mutex<CharBufReader<R>>>, + //char_reader: Arc<CharBufReader<R>>, +} - CharBufReader { - byte_reader: byte_reader, - char_buffer: String::new(), - } - } - pub fn fill_buf(&mut self) - -> std::result::Result<&str, TerminalError> +impl<R: Read + Unpin> Future for FillBufFuture<R> { + type Output = Result<String>; + + fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) + -> Poll<Self::Output> { - loop { - let byte_buffer = self.byte_reader.fill_buf().map_err(error::input)?; - - match str::from_utf8(byte_buffer) { - Err(error) => { - let n_valid = error.valid_up_to(); - if n_valid == 0 { - self.byte_reader.consume(1); - } else { - match str::from_utf8(&byte_buffer[..n_valid]) { - Err(_) => { - self.byte_reader.consume(1); - }, - Ok(chars) => { - self.char_buffer.push_str(chars); - - self.byte_reader.consume(n_valid); - - break; - }, - } + let future = self.project(); + let char_reader: &mut Arc<Mutex<CharBufReader<R>>> = future.char_reader; + let char_reader_future = char_reader.lock(); + pin_mut!(char_reader_future); + match char_reader_future.poll(context) { + Poll::Ready(mut char_reader) => { + let char_reader = &mut *char_reader; + let mut byte_reader: Pin<&mut BufReader<R>> = Pin::new(&mut char_reader.byte_reader); + + loop { + if char_reader.is_interrupted.load(Ordering::Relaxed) { + println!("char reader got interrupt"); + return Poll::Pending; } - }, - Ok(chars) => { - self.char_buffer.push_str(chars); - let n_to_consume = byte_buffer.len(); - self.byte_reader.consume(n_to_consume); + println!("about to fill_buf"); + match byte_reader.as_mut().poll_fill_buf(context).map_err(error::input)? + { + Poll::Ready(byte_buffer) => { + println!("done with fill_buf"); + match str::from_utf8(&byte_buffer) { + Err(error) => { + let n_valid = error.valid_up_to(); + if n_valid == 0 { + byte_reader.as_mut().consume(1); + } else { + match str::from_utf8(&byte_buffer[..n_valid]) { + Err(_) => { + byte_reader.as_mut().consume(1); + }, + Ok(chars) => { + char_reader.char_buffer.push_str(chars); + + byte_reader.as_mut().consume(n_valid); + + break; + }, + } + } + } + Ok(chars) => { + char_reader.char_buffer.push_str(chars); - break; + let n_to_consume = byte_buffer.len(); + byte_reader.as_mut().consume(n_to_consume); + + break; + } + } + } + Poll::Pending => { + println!("fill_buf pending"); + return Poll::Pending; + } + } } + + return Poll::Ready(Ok(char_reader.char_buffer.to_string())); + } + Poll::Pending => { + println!("char_reader mutex pending"); + return Poll::Pending; } } + } +} + + +impl<R: Read + Unpin> CharBufReader<R> { + pub fn new(input_stream: R, is_interrupted: Arc<AtomicBool>) + -> CharBufReader<R> + { + let byte_reader = BufReader::new(input_stream); + + CharBufReader { + byte_reader: byte_reader, + char_buffer: String::new(), + is_interrupted: is_interrupted, + } + } - return Ok(&self.char_buffer); + pub fn fill_buf(reader: Arc<Mutex<Self>>) + //pub fn fill_buf(reader: Arc<Self>) + -> impl Future<Output = Result<String>> + { + FillBufFuture { + char_reader: reader, + } } pub fn consume(&mut self, amount: usize) { diff --git a/src/terminal/error.rs b/src/terminal/error.rs index 6666e49..795f973 100644 --- a/src/terminal/error.rs +++ b/src/terminal/error.rs @@ -1,7 +1,13 @@ +#![forbid(unsafe_code)] + +pub type Result<T> = std::result::Result<T, TerminalError>; + + #[derive(Clone,Debug,Eq,Hash,Ord,PartialEq,PartialOrd)] pub enum TerminalError { Input(String), ModeSetting(String), + Internal(String), } impl std::error::Error for TerminalError { } @@ -15,6 +21,9 @@ impl std::fmt::Display for TerminalError { TerminalError::ModeSetting(s) => f.write_fmt(format_args!( "Can't set terminal mode: {}", s)), + TerminalError::Internal(s) => + f.write_fmt(format_args!( + "Internal error regarding the terminal: {}", s)), } } } @@ -29,3 +38,8 @@ pub fn mode_setting(e: impl std::error::Error) -> TerminalError { TerminalError::ModeSetting(format!("{}", e)) } + +pub fn internal(e: impl std::error::Error) -> TerminalError { + TerminalError::Internal(format!("{}", e)) +} + diff --git a/src/terminal/prelude.rs b/src/terminal/prelude.rs new file mode 100644 index 0000000..bada817 --- /dev/null +++ b/src/terminal/prelude.rs @@ -0,0 +1,4 @@ +#![forbid(unsafe_code)] + +pub use crate::terminal::error::{Result, TerminalError}; + |