summary refs log tree commit diff
path: root/src/decoding.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/decoding.rs')
-rw-r--r--src/decoding.rs117
1 files changed, 117 insertions, 0 deletions
diff --git a/src/decoding.rs b/src/decoding.rs
new file mode 100644
index 0000000..cba4d76
--- /dev/null
+++ b/src/decoding.rs
@@ -0,0 +1,117 @@
+#![forbid(unsafe_code)]
+use crate::prelude::*;
+
+use crate::error;
+
+use pin_project::pin_project;
+use pin_utils::pin_mut;
+use std::future::Future;
+use std::pin::Pin;
+use std::str;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncBufRead, AsyncRead, BufReader};
+use tokio::sync::Mutex;
+
+
+#[pin_project]
+pub struct CharBufReader<R: AsyncRead + Unpin> {
+  #[pin] byte_reader: BufReader<R>,
+  #[pin] char_buffer: String,
+}
+
+
+#[pin_project]
+struct FillBufFuture<R: AsyncRead + Unpin> {
+  char_reader: Arc<Mutex<CharBufReader<R>>>,
+}
+
+
+impl<R: AsyncRead + Unpin> Future for FillBufFuture<R> {
+  type Output = Result<String>;
+
+  fn poll(self: Pin<&mut Self>, context: &mut Context<'_>)
+    -> Poll<Self::Output>
+  {
+    let future = self.project();
+    let char_reader: &mut Arc<Mutex<CharBufReader<R>>> = future.char_reader;
+    let char_reader_future = char_reader.lock();
+    pin_mut!(char_reader_future);
+    match char_reader_future.poll(context) {
+      Poll::Ready(mut char_reader) => {
+        let char_reader = &mut *char_reader;
+        let mut byte_reader: Pin<&mut BufReader<R>> = Pin::new(&mut char_reader.byte_reader);
+
+        loop {
+          match byte_reader.as_mut().poll_fill_buf(context).map_err(error::input)?
+          {
+            Poll::Ready(byte_buffer) => {
+              match str::from_utf8(&byte_buffer) {
+                Err(error) => {
+                  let n_valid = error.valid_up_to();
+                  if n_valid == 0 {
+                    byte_reader.as_mut().consume(1);
+                  } else {
+                    match str::from_utf8(&byte_buffer[..n_valid]) {
+                      Err(_) => {
+                        byte_reader.as_mut().consume(1);
+                      },
+                      Ok(chars) => {
+                        char_reader.char_buffer.push_str(chars);
+
+                        byte_reader.as_mut().consume(n_valid);
+
+                        break;
+                      },
+                    }
+                  }
+                }
+                Ok(chars) => {
+                  char_reader.char_buffer.push_str(chars);
+
+                  let n_to_consume = byte_buffer.len();
+                  byte_reader.as_mut().consume(n_to_consume);
+
+                  break;
+                }
+              }
+            }
+            Poll::Pending => {
+              return Poll::Pending;
+            }
+          }
+        }
+
+        return Poll::Ready(Ok(char_reader.char_buffer.to_string()));
+      }
+      Poll::Pending => {
+        return Poll::Pending;
+      }
+    }
+  }
+}
+
+
+impl<R: AsyncRead + Unpin> CharBufReader<R> {
+  pub fn new(input_stream: R) -> CharBufReader<R> {
+    let byte_reader = BufReader::new(input_stream);
+
+    CharBufReader {
+      byte_reader: byte_reader,
+      char_buffer: String::new(),
+    }
+  }
+
+  pub fn fill_buf(reader: Arc<Mutex<Self>>)
+    -> impl Future<Output = Result<String>>
+  {
+    FillBufFuture {
+      char_reader: reader,
+    }
+  }
+
+  pub fn consume(&mut self, amount: usize) {
+    self.char_buffer.replace_range(..amount, "");
+  }
+}
+