From 86990efcf8fa63decb05f462d745b95e4992dc77 Mon Sep 17 00:00:00 2001 From: Irene Knapp Date: Thu, 11 Mar 2021 20:15:47 -0800 Subject: refactor everything into smaller modules; move to using async-std; make the read loop async. some stuff doesn't work yet but it needed to be done and this is as clean a state as it's likely to get in, so we're committing it as a base to build on. --- src/terminal/decoding.rs | 151 +++++++++++++++++++++++++++++++++-------------- src/terminal/error.rs | 14 +++++ src/terminal/prelude.rs | 4 ++ 3 files changed, 126 insertions(+), 43 deletions(-) create mode 100644 src/terminal/prelude.rs (limited to 'src/terminal') diff --git a/src/terminal/decoding.rs b/src/terminal/decoding.rs index e2654fb..11f6046 100644 --- a/src/terminal/decoding.rs +++ b/src/terminal/decoding.rs @@ -1,63 +1,128 @@ -use crate::terminal::error::{self, TerminalError}; +#![forbid(unsafe_code)] +use crate::terminal::prelude::*; -use std::io::{BufRead, BufReader, Read}; +use crate::terminal::error; + +use async_std::future::Future; +use async_std::io::{BufRead, BufReader, Read}; +use async_std::sync::{Arc, Mutex}; +use async_std::task::{Context, Poll}; +use pin_project::pin_project; +use pin_utils::pin_mut; +use std::pin::Pin; use std::str; +use std::sync::atomic::{AtomicBool, Ordering}; -pub struct CharBufReader { - byte_reader: BufReader, - char_buffer: String, +#[pin_project] +pub struct CharBufReader { + #[pin] byte_reader: BufReader, + #[pin] char_buffer: String, + is_interrupted: Arc, } -impl CharBufReader { - pub fn new(input_stream: R) -> CharBufReader { - let byte_reader = BufReader::new(input_stream); +#[pin_project] +struct FillBufFuture { + char_reader: Arc>>, + //char_reader: Arc>, +} - CharBufReader { - byte_reader: byte_reader, - char_buffer: String::new(), - } - } - pub fn fill_buf(&mut self) - -> std::result::Result<&str, TerminalError> +impl Future for FillBufFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) + -> Poll { - loop { - let byte_buffer = self.byte_reader.fill_buf().map_err(error::input)?; - - match str::from_utf8(byte_buffer) { - Err(error) => { - let n_valid = error.valid_up_to(); - if n_valid == 0 { - self.byte_reader.consume(1); - } else { - match str::from_utf8(&byte_buffer[..n_valid]) { - Err(_) => { - self.byte_reader.consume(1); - }, - Ok(chars) => { - self.char_buffer.push_str(chars); - - self.byte_reader.consume(n_valid); - - break; - }, - } + let future = self.project(); + let char_reader: &mut Arc>> = future.char_reader; + let char_reader_future = char_reader.lock(); + pin_mut!(char_reader_future); + match char_reader_future.poll(context) { + Poll::Ready(mut char_reader) => { + let char_reader = &mut *char_reader; + let mut byte_reader: Pin<&mut BufReader> = Pin::new(&mut char_reader.byte_reader); + + loop { + if char_reader.is_interrupted.load(Ordering::Relaxed) { + println!("char reader got interrupt"); + return Poll::Pending; } - }, - Ok(chars) => { - self.char_buffer.push_str(chars); - let n_to_consume = byte_buffer.len(); - self.byte_reader.consume(n_to_consume); + println!("about to fill_buf"); + match byte_reader.as_mut().poll_fill_buf(context).map_err(error::input)? + { + Poll::Ready(byte_buffer) => { + println!("done with fill_buf"); + match str::from_utf8(&byte_buffer) { + Err(error) => { + let n_valid = error.valid_up_to(); + if n_valid == 0 { + byte_reader.as_mut().consume(1); + } else { + match str::from_utf8(&byte_buffer[..n_valid]) { + Err(_) => { + byte_reader.as_mut().consume(1); + }, + Ok(chars) => { + char_reader.char_buffer.push_str(chars); + + byte_reader.as_mut().consume(n_valid); + + break; + }, + } + } + } + Ok(chars) => { + char_reader.char_buffer.push_str(chars); - break; + let n_to_consume = byte_buffer.len(); + byte_reader.as_mut().consume(n_to_consume); + + break; + } + } + } + Poll::Pending => { + println!("fill_buf pending"); + return Poll::Pending; + } + } } + + return Poll::Ready(Ok(char_reader.char_buffer.to_string())); + } + Poll::Pending => { + println!("char_reader mutex pending"); + return Poll::Pending; } } + } +} + + +impl CharBufReader { + pub fn new(input_stream: R, is_interrupted: Arc) + -> CharBufReader + { + let byte_reader = BufReader::new(input_stream); + + CharBufReader { + byte_reader: byte_reader, + char_buffer: String::new(), + is_interrupted: is_interrupted, + } + } - return Ok(&self.char_buffer); + pub fn fill_buf(reader: Arc>) + //pub fn fill_buf(reader: Arc) + -> impl Future> + { + FillBufFuture { + char_reader: reader, + } } pub fn consume(&mut self, amount: usize) { diff --git a/src/terminal/error.rs b/src/terminal/error.rs index 6666e49..795f973 100644 --- a/src/terminal/error.rs +++ b/src/terminal/error.rs @@ -1,7 +1,13 @@ +#![forbid(unsafe_code)] + +pub type Result = std::result::Result; + + #[derive(Clone,Debug,Eq,Hash,Ord,PartialEq,PartialOrd)] pub enum TerminalError { Input(String), ModeSetting(String), + Internal(String), } impl std::error::Error for TerminalError { } @@ -15,6 +21,9 @@ impl std::fmt::Display for TerminalError { TerminalError::ModeSetting(s) => f.write_fmt(format_args!( "Can't set terminal mode: {}", s)), + TerminalError::Internal(s) => + f.write_fmt(format_args!( + "Internal error regarding the terminal: {}", s)), } } } @@ -29,3 +38,8 @@ pub fn mode_setting(e: impl std::error::Error) -> TerminalError { TerminalError::ModeSetting(format!("{}", e)) } + +pub fn internal(e: impl std::error::Error) -> TerminalError { + TerminalError::Internal(format!("{}", e)) +} + diff --git a/src/terminal/prelude.rs b/src/terminal/prelude.rs new file mode 100644 index 0000000..bada817 --- /dev/null +++ b/src/terminal/prelude.rs @@ -0,0 +1,4 @@ +#![forbid(unsafe_code)] + +pub use crate::terminal::error::{Result, TerminalError}; + -- cgit 1.4.1