1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
#![forbid(unsafe_code)]
use crate::terminal::prelude::*;
use crate::terminal::error;
use pin_project::pin_project;
use pin_utils::pin_mut;
use std::future::Future;
use std::pin::Pin;
use std::str;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncBufRead, AsyncRead, BufReader};
use tokio::sync::Mutex;
#[pin_project]
pub struct CharBufReader<R: AsyncRead + Unpin> {
#[pin] byte_reader: BufReader<R>,
#[pin] char_buffer: String,
}
#[pin_project]
struct FillBufFuture<R: AsyncRead + Unpin> {
char_reader: Arc<Mutex<CharBufReader<R>>>,
}
impl<R: AsyncRead + Unpin> Future for FillBufFuture<R> {
type Output = Result<String>;
fn poll(self: Pin<&mut Self>, context: &mut Context<'_>)
-> Poll<Self::Output>
{
let future = self.project();
let char_reader: &mut Arc<Mutex<CharBufReader<R>>> = future.char_reader;
let char_reader_future = char_reader.lock();
pin_mut!(char_reader_future);
match char_reader_future.poll(context) {
Poll::Ready(mut char_reader) => {
let char_reader = &mut *char_reader;
let mut byte_reader: Pin<&mut BufReader<R>> = Pin::new(&mut char_reader.byte_reader);
loop {
match byte_reader.as_mut().poll_fill_buf(context).map_err(error::input)?
{
Poll::Ready(byte_buffer) => {
match str::from_utf8(&byte_buffer) {
Err(error) => {
let n_valid = error.valid_up_to();
if n_valid == 0 {
byte_reader.as_mut().consume(1);
} else {
match str::from_utf8(&byte_buffer[..n_valid]) {
Err(_) => {
byte_reader.as_mut().consume(1);
},
Ok(chars) => {
char_reader.char_buffer.push_str(chars);
byte_reader.as_mut().consume(n_valid);
break;
},
}
}
}
Ok(chars) => {
char_reader.char_buffer.push_str(chars);
let n_to_consume = byte_buffer.len();
byte_reader.as_mut().consume(n_to_consume);
break;
}
}
}
Poll::Pending => {
return Poll::Pending;
}
}
}
return Poll::Ready(Ok(char_reader.char_buffer.to_string()));
}
Poll::Pending => {
return Poll::Pending;
}
}
}
}
impl<R: AsyncRead + Unpin> CharBufReader<R> {
pub fn new(input_stream: R) -> CharBufReader<R> {
let byte_reader = BufReader::new(input_stream);
CharBufReader {
byte_reader: byte_reader,
char_buffer: String::new(),
}
}
pub fn fill_buf(reader: Arc<Mutex<Self>>)
-> impl Future<Output = Result<String>>
{
FillBufFuture {
char_reader: reader,
}
}
pub fn consume(&mut self, amount: usize) {
self.char_buffer.replace_range(..amount, "");
}
}
|