summary refs log tree commit diff
path: root/src/terminal/decoding.rs
diff options
context:
space:
mode:
authorIrene Knapp <ireneista@gmail.com>2021-03-11 20:15:47 -0800
committerIrene Knapp <ireneista@gmail.com>2021-03-11 20:15:47 -0800
commit86990efcf8fa63decb05f462d745b95e4992dc77 (patch)
tree46990fd1c749ccca0b6477650973d83174e5e264 /src/terminal/decoding.rs
parenta159d4dd29491a1eb88163e63ceb41a903dc47b6 (diff)
refactor everything into smaller modules; move to using async-std; make the read loop async. some stuff doesn't work yet but it needed to be done and this is as clean a state as it's likely to get in, so we're committing it as a base to build on.
Diffstat (limited to 'src/terminal/decoding.rs')
-rw-r--r--src/terminal/decoding.rs151
1 files changed, 108 insertions, 43 deletions
diff --git a/src/terminal/decoding.rs b/src/terminal/decoding.rs
index e2654fb..11f6046 100644
--- a/src/terminal/decoding.rs
+++ b/src/terminal/decoding.rs
@@ -1,63 +1,128 @@
-use crate::terminal::error::{self, TerminalError};
+#![forbid(unsafe_code)]
+use crate::terminal::prelude::*;
 
-use std::io::{BufRead, BufReader, Read};
+use crate::terminal::error;
+
+use async_std::future::Future;
+use async_std::io::{BufRead, BufReader, Read};
+use async_std::sync::{Arc, Mutex};
+use async_std::task::{Context, Poll};
+use pin_project::pin_project;
+use pin_utils::pin_mut;
+use std::pin::Pin;
 use std::str;
+use std::sync::atomic::{AtomicBool, Ordering};
 
 
-pub struct CharBufReader<R: Read> {
-  byte_reader: BufReader<R>,
-  char_buffer: String,
+#[pin_project]
+pub struct CharBufReader<R: Read + Unpin> {
+  #[pin] byte_reader: BufReader<R>,
+  #[pin] char_buffer: String,
+  is_interrupted: Arc<AtomicBool>,
 }
 
 
-impl<R: Read> CharBufReader<R> {
-  pub fn new(input_stream: R) -> CharBufReader<R> {
-    let byte_reader = BufReader::new(input_stream);
+#[pin_project]
+struct FillBufFuture<R: Read + Unpin> {
+  char_reader: Arc<Mutex<CharBufReader<R>>>,
+  //char_reader: Arc<CharBufReader<R>>,
+}
 
-    CharBufReader {
-      byte_reader: byte_reader,
-      char_buffer: String::new(),
-    }
-  }
 
-  pub fn fill_buf(&mut self)
-    -> std::result::Result<&str, TerminalError>
+impl<R: Read + Unpin> Future for FillBufFuture<R> {
+  type Output = Result<String>;
+
+  fn poll(self: Pin<&mut Self>, context: &mut Context<'_>)
+    -> Poll<Self::Output>
   {
-    loop {
-      let byte_buffer = self.byte_reader.fill_buf().map_err(error::input)?;
-
-      match str::from_utf8(byte_buffer) {
-        Err(error) => {
-          let n_valid = error.valid_up_to();
-          if n_valid == 0 {
-            self.byte_reader.consume(1);
-          } else {
-            match str::from_utf8(&byte_buffer[..n_valid]) {
-              Err(_) => {
-                self.byte_reader.consume(1);
-              },
-              Ok(chars) => {
-                self.char_buffer.push_str(chars);
-
-                self.byte_reader.consume(n_valid);
-
-                break;
-              },
-            }
+    let future = self.project();
+    let char_reader: &mut Arc<Mutex<CharBufReader<R>>> = future.char_reader;
+    let char_reader_future = char_reader.lock();
+    pin_mut!(char_reader_future);
+    match char_reader_future.poll(context) {
+      Poll::Ready(mut char_reader) => {
+        let char_reader = &mut *char_reader;
+        let mut byte_reader: Pin<&mut BufReader<R>> = Pin::new(&mut char_reader.byte_reader);
+
+        loop {
+          if char_reader.is_interrupted.load(Ordering::Relaxed) {
+            println!("char reader got interrupt");
+            return Poll::Pending;
           }
-        },
-        Ok(chars) => {
-          self.char_buffer.push_str(chars);
 
-          let n_to_consume = byte_buffer.len();
-          self.byte_reader.consume(n_to_consume);
+          println!("about to fill_buf");
+          match byte_reader.as_mut().poll_fill_buf(context).map_err(error::input)?
+          {
+            Poll::Ready(byte_buffer) => {
+              println!("done with fill_buf");
+              match str::from_utf8(&byte_buffer) {
+                Err(error) => {
+                  let n_valid = error.valid_up_to();
+                  if n_valid == 0 {
+                    byte_reader.as_mut().consume(1);
+                  } else {
+                    match str::from_utf8(&byte_buffer[..n_valid]) {
+                      Err(_) => {
+                        byte_reader.as_mut().consume(1);
+                      },
+                      Ok(chars) => {
+                        char_reader.char_buffer.push_str(chars);
+
+                        byte_reader.as_mut().consume(n_valid);
+
+                        break;
+                      },
+                    }
+                  }
+                }
+                Ok(chars) => {
+                  char_reader.char_buffer.push_str(chars);
 
-          break;
+                  let n_to_consume = byte_buffer.len();
+                  byte_reader.as_mut().consume(n_to_consume);
+
+                  break;
+                }
+              }
+            }
+            Poll::Pending => {
+              println!("fill_buf pending");
+              return Poll::Pending;
+            }
+          }
         }
+
+        return Poll::Ready(Ok(char_reader.char_buffer.to_string()));
+      }
+      Poll::Pending => {
+        println!("char_reader mutex pending");
+        return Poll::Pending;
       }
     }
+  }
+}
+
+
+impl<R: Read + Unpin> CharBufReader<R> {
+  pub fn new(input_stream: R, is_interrupted: Arc<AtomicBool>)
+    -> CharBufReader<R>
+  {
+    let byte_reader = BufReader::new(input_stream);
+
+    CharBufReader {
+      byte_reader: byte_reader,
+      char_buffer: String::new(),
+      is_interrupted: is_interrupted,
+    }
+  }
 
-    return Ok(&self.char_buffer);
+  pub fn fill_buf(reader: Arc<Mutex<Self>>)
+  //pub fn fill_buf(reader: Arc<Self>)
+    -> impl Future<Output = Result<String>>
+  {
+    FillBufFuture {
+      char_reader: reader,
+    }
   }
 
   pub fn consume(&mut self, amount: usize) {