Skip to content

Commit a35d8e5

Browse files
author
Geoffroy Couprie
committed
WIP: support intermediate flushes when encoding
due to the use of ready!(), whenever the underlying reader returns Poll::Pending, it is transmitted directly to the caller, so there is no flush until the entire data has been read
1 parent ada65c6 commit a35d8e5

File tree

1 file changed

+71
-10
lines changed

1 file changed

+71
-10
lines changed

src/tokio/bufread/generic/encoder.rs

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};
1313
enum State {
1414
Encoding,
1515
Flushing,
16+
Finishing,
1617
Done,
1718
}
1819

@@ -57,27 +58,80 @@ impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
5758
output: &mut PartialBuffer<&mut [u8]>,
5859
) -> Poll<Result<()>> {
5960
let mut this = self.project();
61+
let mut read = 0usize;
6062

6163
loop {
64+
println!("encoder state: {:?}", this.state);
6265
*this.state = match this.state {
6366
State::Encoding => {
64-
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
65-
if input.is_empty() {
66-
State::Flushing
67-
} else {
68-
let mut input = PartialBuffer::new(input);
69-
this.encoder.encode(&mut input, output)?;
70-
let len = input.written().len();
71-
this.reader.as_mut().consume(len);
72-
State::Encoding
67+
let res = this.reader.as_mut().poll_fill_buf(cx);
68+
69+
match res {
70+
Poll::Pending => {
71+
println!(
72+
"[{}] encoder got pending, read={}",
73+
std::time::SystemTime::now()
74+
.duration_since(std::time::UNIX_EPOCH)
75+
.unwrap()
76+
.as_secs(),
77+
read
78+
);
79+
if read == 0 {
80+
return Poll::Pending;
81+
} else {
82+
println!(
83+
"will flush, output unwritten = {}",
84+
output.unwritten().len()
85+
);
86+
87+
State::Flushing
88+
}
89+
}
90+
Poll::Ready(res) => {
91+
println!(
92+
"[{}]encoder: res_err={:?}",
93+
std::time::SystemTime::now()
94+
.duration_since(std::time::UNIX_EPOCH)
95+
.unwrap()
96+
.as_secs(),
97+
match &res {
98+
Ok(_) => String::new(),
99+
Err(e) => e.clone().to_string(),
100+
}
101+
);
102+
let input = res?;
103+
104+
if input.is_empty() {
105+
State::Finishing
106+
} else {
107+
println!(
108+
"encoder got input: {}",
109+
std::str::from_utf8(input).unwrap()
110+
);
111+
let mut input = PartialBuffer::new(input);
112+
this.encoder.encode(&mut input, output)?;
113+
let len = input.written().len();
114+
this.reader.as_mut().consume(len);
115+
read += len;
116+
State::Encoding
117+
}
118+
}
73119
}
74120
}
75121

76122
State::Flushing => {
123+
if this.encoder.flush(output)? {
124+
State::Encoding
125+
} else {
126+
State::Flushing
127+
}
128+
}
129+
130+
State::Finishing => {
77131
if this.encoder.finish(output)? {
78132
State::Done
79133
} else {
80-
State::Flushing
134+
State::Finishing
81135
}
82136
}
83137

@@ -87,7 +141,14 @@ impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
87141
if let State::Done = *this.state {
88142
return Poll::Ready(Ok(()));
89143
}
144+
145+
println!(
146+
"should send chunk: output.unwritten().len()={}, output.written().len()={}",
147+
output.unwritten().len(),
148+
output.written().len()
149+
);
90150
if output.unwritten().is_empty() {
151+
println!("returning chunk");
91152
return Poll::Ready(Ok(()));
92153
}
93154
}

0 commit comments

Comments
 (0)