diff options
author | Irene Knapp <ireneista@gmail.com> | 2021-06-07 03:28:22 -0700 |
---|---|---|
committer | Irene Knapp <ireneista@gmail.com> | 2021-06-07 03:28:22 -0700 |
commit | 2d5e6c6a0758a39ac16de217bc4bd6bd53104d1c (patch) | |
tree | 2f5c9a57924af984f2247cf89a27fa16f7a53a53 /src | |
parent | 86990efcf8fa63decb05f462d745b95e4992dc77 (diff) |
wooooo! switch to using tokio; make ^C work properly
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs | 15 | ||||
-rw-r--r-- | src/prelude.rs | 3 | ||||
-rw-r--r-- | src/terminal.rs | 89 | ||||
-rw-r--r-- | src/terminal/decoding.rs | 35 |
4 files changed, 63 insertions, 79 deletions
diff --git a/src/main.rs b/src/main.rs index 935155c..e94ce0c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,28 +1,26 @@ #![forbid(unsafe_code)] -use crate::prelude::*; - use crate::result::Result; use crate::terminal::{Input, Terminal}; -use async_std::io; use std::collections::HashMap; use std::collections::HashSet; use std::env; use std::os::unix::fs::PermissionsExt; use std::process::{self, Command}; +use tokio::io::{self, AsyncWriteExt}; #[macro_use] extern crate lalrpop_util; lalrpop_mod!(pub commandline); pub mod error; pub mod path; -pub mod prelude; pub mod result; pub mod terminal; -fn main() -> Result<()> { - let result = async_std::task::block_on(async { repl().await }); +#[tokio::main] +async fn main() -> Result<()> { + let result = repl().await; process::exit(match result { Ok(()) => 0, Err(ref e) => { @@ -61,8 +59,9 @@ async fn repl() -> Result<()> { async fn prompt() -> Result<()> { - print!("\n$ "); - io::stdout().flush().await?; + let mut stdout = io::stdout(); + stdout.write_all("\n$ ".as_bytes()).await?; + stdout.flush().await?; Ok(()) } diff --git a/src/prelude.rs b/src/prelude.rs deleted file mode 100644 index f678ba1..0000000 --- a/src/prelude.rs +++ /dev/null @@ -1,3 +0,0 @@ -#![forbid(unsafe_code)] -pub use async_std::prelude::*; - diff --git a/src/terminal.rs b/src/terminal.rs index 73ba035..f42d153 100644 --- a/src/terminal.rs +++ b/src/terminal.rs @@ -1,15 +1,15 @@ #![forbid(unsafe_code)] -use crate::prelude::*; use crate::terminal::prelude::*; use crate::terminal::decoding::CharBufReader; -use async_std::io::{self, Read}; -use async_std::os::unix::io::{AsRawFd, RawFd}; -use async_std::sync::{Arc, Mutex}; use nix::sys::termios::{self, Termios}; -use signal_hook::consts::TERM_SIGNALS; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::Arc; +use tokio::io::{self, AsyncRead, AsyncWriteExt}; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::{mpsc, Mutex}; +use tokio::task; pub mod decoding; pub mod error; @@ -87,31 +87,41 @@ enum InputAction { } -pub struct Terminal<InputStream: Read + Unpin> { +pub struct Terminal<InputStream: AsyncRead + Unpin> { reader: Arc<Mutex<CharBufReader<InputStream>>>, - //reader: Arc<CharBufReader<InputStream>>, line_buffer: LineBuffer, file_descriptor: RawFd, initial_termios: Termios, - is_interrupted: Arc<AtomicBool>, + interrupt_receiver: mpsc::Receiver<()>, } -impl<InputStream: Read + AsRawFd + Unpin> Terminal<InputStream> { +async fn handle_signals(interrupt_sender: mpsc::Sender<()>) -> Result<()> +{ + let mut stream = signal(SignalKind::interrupt()).map_err(error::internal)?; + /* TODO make it work on other signals: + SignalKind::hangup(); + SignalKind::interrupt(); + SignalKind::terminate(); + SignalKind::quit(); + */ + + loop { + stream.recv().await; + interrupt_sender.send(()).await.map_err(error::internal)?; + } +} + + +impl<InputStream: AsyncRead + AsRawFd + Unpin> Terminal<InputStream> { pub fn init(input_stream: InputStream) -> Result<Terminal<InputStream>> { - let is_interrupted = Arc::new(AtomicBool::new(false)); + let (interrupt_sender, interrupt_receiver) = mpsc::channel(1); - for signal in TERM_SIGNALS { - signal_hook::flag::register(*signal, Arc::clone(&is_interrupted)) - .map_err(error::internal)?; - } + let _ = task::spawn(handle_signals(interrupt_sender)); let fd = input_stream.as_raw_fd(); let termios = termios::tcgetattr(fd).map_err(error::mode_setting)?; - let reader = Arc::new(Mutex::new(CharBufReader::new( - input_stream, Arc::clone(&is_interrupted)))); - //let reader = Arc::new(CharBufReader::new( - //input_stream, Arc::clone(&is_interrupted))); + let reader = Arc::new(Mutex::new(CharBufReader::new(input_stream))); let line_buffer = LineBuffer::new(); let terminal = Terminal { @@ -119,7 +129,7 @@ impl<InputStream: Read + AsRawFd + Unpin> Terminal<InputStream> { line_buffer: line_buffer, file_descriptor: fd, initial_termios: termios, - is_interrupted: is_interrupted, + interrupt_receiver: interrupt_receiver, }; terminal.init_modes()?; @@ -158,8 +168,6 @@ impl<InputStream: Read + AsRawFd + Unpin> Terminal<InputStream> { pub fn cleanup_modes(&self) -> Result<()> { - println!("de-initializing"); // DO NOT SUBMIT - let termios = self.initial_termios.clone(); termios::tcsetattr(self.file_descriptor, @@ -171,23 +179,20 @@ impl<InputStream: Read + AsRawFd + Unpin> Terminal<InputStream> { } - pub fn is_exiting(&self) -> bool { - self.is_interrupted.load(Ordering::Relaxed) - } - - pub async fn handle_input(&mut self) -> Result<Input> { - let is_interrupted = Arc::clone(&self.is_interrupted); - loop { let mut action: Option<InputAction> = None; - let string = CharBufReader::fill_buf(Arc::clone(&self.reader)).await.map_err(error::input)?; - - if is_interrupted.load(Ordering::Relaxed) { - break; - } + let string = tokio::select! { + result = CharBufReader::fill_buf(Arc::clone(&self.reader)) => { + let string: String = result.map_err(error::input)?; + string + } + _ = self.interrupt_receiver.recv() => { + return Ok(Input::End); + } + }; let mut chars = string.char_indices(); @@ -201,8 +206,11 @@ impl<InputStream: Read + AsRawFd + Unpin> Terminal<InputStream> { } _ => { self.line_buffer.insert(&c.to_string()); - print!("{}", c); - io::stdout().flush(); + + let mut stdout = io::stdout(); + stdout.write_all(format!("{}", c).as_bytes()).await + .map_err(error::internal)?; + stdout.flush().await.map_err(error::internal)?; } } @@ -230,13 +238,8 @@ impl<InputStream: Read + AsRawFd + Unpin> Terminal<InputStream> { } println!("line buffer {:?}", self.line_buffer); - if self.is_interrupted.load(Ordering::Relaxed) { - println!("exiting 3"); - Ok(Input::End) - } else { - let input = Input::String(self.line_buffer.as_string()); - Ok(input) - } + let input = Input::String(self.line_buffer.as_string()); + Ok(input) } } diff --git a/src/terminal/decoding.rs b/src/terminal/decoding.rs index 11f6046..018730e 100644 --- a/src/terminal/decoding.rs +++ b/src/terminal/decoding.rs @@ -3,33 +3,31 @@ use crate::terminal::prelude::*; use crate::terminal::error; -use async_std::future::Future; -use async_std::io::{BufRead, BufReader, Read}; -use async_std::sync::{Arc, Mutex}; -use async_std::task::{Context, Poll}; use pin_project::pin_project; use pin_utils::pin_mut; +use std::future::Future; use std::pin::Pin; use std::str; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, AsyncRead, BufReader}; +use tokio::sync::Mutex; #[pin_project] -pub struct CharBufReader<R: Read + Unpin> { +pub struct CharBufReader<R: AsyncRead + Unpin> { #[pin] byte_reader: BufReader<R>, #[pin] char_buffer: String, - is_interrupted: Arc<AtomicBool>, } #[pin_project] -struct FillBufFuture<R: Read + Unpin> { +struct FillBufFuture<R: AsyncRead + Unpin> { char_reader: Arc<Mutex<CharBufReader<R>>>, - //char_reader: Arc<CharBufReader<R>>, } -impl<R: Read + Unpin> Future for FillBufFuture<R> { +impl<R: AsyncRead + Unpin> Future for FillBufFuture<R> { type Output = Result<String>; fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) @@ -45,16 +43,9 @@ impl<R: Read + Unpin> Future for FillBufFuture<R> { let mut byte_reader: Pin<&mut BufReader<R>> = Pin::new(&mut char_reader.byte_reader); loop { - if char_reader.is_interrupted.load(Ordering::Relaxed) { - println!("char reader got interrupt"); - return Poll::Pending; - } - - println!("about to fill_buf"); match byte_reader.as_mut().poll_fill_buf(context).map_err(error::input)? { Poll::Ready(byte_buffer) => { - println!("done with fill_buf"); match str::from_utf8(&byte_buffer) { Err(error) => { let n_valid = error.valid_up_to(); @@ -86,7 +77,6 @@ impl<R: Read + Unpin> Future for FillBufFuture<R> { } } Poll::Pending => { - println!("fill_buf pending"); return Poll::Pending; } } @@ -95,7 +85,6 @@ impl<R: Read + Unpin> Future for FillBufFuture<R> { return Poll::Ready(Ok(char_reader.char_buffer.to_string())); } Poll::Pending => { - println!("char_reader mutex pending"); return Poll::Pending; } } @@ -103,21 +92,17 @@ impl<R: Read + Unpin> Future for FillBufFuture<R> { } -impl<R: Read + Unpin> CharBufReader<R> { - pub fn new(input_stream: R, is_interrupted: Arc<AtomicBool>) - -> CharBufReader<R> - { +impl<R: AsyncRead + Unpin> CharBufReader<R> { + pub fn new(input_stream: R) -> CharBufReader<R> { let byte_reader = BufReader::new(input_stream); CharBufReader { byte_reader: byte_reader, char_buffer: String::new(), - is_interrupted: is_interrupted, } } pub fn fill_buf(reader: Arc<Mutex<Self>>) - //pub fn fill_buf(reader: Arc<Self>) -> impl Future<Output = Result<String>> { FillBufFuture { |