Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ readme = "README.md"
keywords = ["fastcgi", "fcgi", "client", "tokio", "php"]

[dependencies]
bytes = "1.10.1"
futures = { version = "0.3.31", default-features = false }
thiserror = "1.0.32"
tokio = { version = "1.20.1", features = ["io-util", "sync", "time"] }
tokio-util = { version = "0.7.15", features = ["io"] }
tracing = "0.1.36"

[dev-dependencies]
Expand Down
30 changes: 15 additions & 15 deletions benches/async_client_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,30 @@ async fn test_client<S: AsyncRead + AsyncWrite + Unpin>(client: &mut Client<S, K
let script_name = script_name.to_str().unwrap();

let params = Params::default()
.set_request_method("GET")
.set_document_root(document_root)
.set_script_name("/index.php")
.set_script_filename(script_name)
.set_request_uri("/index.php")
.set_document_uri("/index.php")
.set_remote_addr("127.0.0.1")
.set_remote_port("12345")
.set_server_addr("127.0.0.1")
.set_server_port("80")
.set_server_name("jmjoy-pc")
.set_content_type("")
.set_content_length("0");
.request_method("GET")
.document_root(document_root)
.script_name("/index.php")
.script_filename(script_name)
.request_uri("/index.php")
.document_uri("/index.php")
.remote_addr("127.0.0.1")
.remote_port(12345)
.server_addr("127.0.0.1")
.server_port(80)
.server_name("jmjoy-pc")
.content_type("")
.content_length(0);

let output = client
.execute(Request::new(params, &mut io::empty()))
.await
.unwrap();

let stdout = String::from_utf8(output.get_stdout().unwrap_or(Default::default())).unwrap();
let stdout = String::from_utf8(output.stdout.unwrap_or(Default::default())).unwrap();
assert!(stdout.contains("Content-type: text/html; charset=UTF-8"));
assert!(stdout.contains("\r\n\r\n"));
assert!(stdout.contains("hello"));
assert_eq!(output.get_stderr(), None);
assert_eq!(output.stderr, None);
}

#[bench]
Expand Down
4 changes: 2 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, ShortConn> {
/// # Examples
///
/// ```
/// use fastcgi_client::{response::Content, Client, Params, Request};
/// use fastcgi_client::{response::Content, Client, Params, Request, StreamExt};
/// use tokio::{io, net::TcpStream};
///
/// async fn stream() {
Expand Down Expand Up @@ -112,7 +112,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, KeepAlive> {
/// # Examples
///
/// ```
/// use fastcgi_client::{response::Content, Client, Params, Request};
/// use fastcgi_client::{response::Content, Client, Params, Request, StreamExt};
/// use tokio::{io, net::TcpStream};
///
/// async fn stream() {
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ pub mod params;
pub mod request;
pub mod response;

/// Re Export StreamExt for .next support
pub use futures::stream::StreamExt;

pub use crate::{client::Client, error::*, params::Params, request::Request, response::Response};
261 changes: 117 additions & 144 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
fmt::{self, Debug},
pin::Pin,
str,
task::Poll,
};

use bytes::{Bytes, BytesMut};
use futures::stream::Stream;
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;
use tracing::debug;

use crate::{
meta::{EndRequestRec, Header, RequestType},
meta::{EndRequestRec, Header, RequestType, HEADER_LEN},
ClientError, ClientResult,
};
use std::{cmp::min, fmt, fmt::Debug, str};
use tokio::io::{AsyncRead, AsyncReadExt};
use tracing::debug;

/// Output of fastcgi request, contains STDOUT and STDERR.
#[derive(Default, Clone)]
Expand All @@ -37,177 +47,140 @@ impl Debug for Response {
}
}

pub enum Content<'a> {
Stdout(&'a [u8]),
Stderr(&'a [u8]),
}

#[derive(PartialEq)]
enum ReadStep {
Content,
Padding,
pub enum Content {
Stdout(Bytes),
Stderr(Bytes),
}

/// Generated by
/// [Client::execute_once_stream](crate::client::Client::execute_once_stream) or
/// [Client::execute_stream](crate::client::Client::execute_stream).
///
/// The [ResponseStream] does not implement `futures::Stream`, because
/// `futures::Stream` does not yet support GAT, so manually provide the
/// [next](ResponseStream::next) method, which support the `while let` syntax.
pub struct ResponseStream<S: AsyncRead + Unpin> {
stream: S,
stream: ReaderStream<S>,
id: u16,

ended: bool,

eof: bool,
header: Option<Header>,

content_buf: Vec<u8>,
content_read: usize,

read_step: ReadStep,
buf: BytesMut,
}

impl<S: AsyncRead + Unpin> ResponseStream<S> {
#[inline]
pub(crate) fn new(stream: S, id: u16) -> Self {
Self {
stream,
stream: ReaderStream::new(stream),
id,
ended: false,
eof: false,
header: None,
content_buf: vec![0; 4096],
content_read: 0,
read_step: ReadStep::Content,
buf: BytesMut::new(),
}
}

pub async fn next(&mut self) -> Option<ClientResult<Content<'_>>> {
if self.ended {
#[inline]
fn read_header(&mut self) -> Option<Header> {
if self.buf.len() < HEADER_LEN {
return None;
}
let buf = self.buf.split_to(HEADER_LEN);
let header = (&buf as &[u8]).try_into().expect("failed to read header");
Some(Header::new_from_buf(header))
}

loop {
if self.header.is_none() {
match Header::new_from_stream(&mut self.stream).await {
Ok(header) => {
self.header = Some(header);
}
Err(err) => {
self.ended = true;
return Some(Err(err.into()));
}
};
}
#[inline]
fn read_content(&mut self) -> Option<Bytes> {
let header = self.header.as_ref().unwrap();
let block_length = header.content_length as usize + header.padding_length as usize;
if self.buf.len() < block_length {
return None;
}
let content = self.buf.split_to(header.content_length as usize);
let _ = self.buf.split_to(header.padding_length as usize);
self.header = None;
Some(content.freeze())
}

let header = self.header.as_ref().unwrap();

match header.r#type.clone() {
RequestType::Stdout => match self.read_step {
ReadStep::Content => {
return self
.read_to_content(
header.content_length as usize,
Content::Stdout,
Self::prepare_for_read_padding,
)
.await;
}
ReadStep::Padding => {
self.read_to_content(
header.padding_length as usize,
Content::Stdout,
Self::prepare_for_read_header,
)
.await;
continue;
}
},
RequestType::Stderr => match self.read_step {
ReadStep::Content => {
return self
.read_to_content(
header.content_length as usize,
Content::Stderr,
Self::prepare_for_read_padding,
)
.await;
}
ReadStep::Padding => {
self.read_to_content(
header.padding_length as usize,
Content::Stderr,
Self::prepare_for_read_header,
)
.await;
continue;
}
},
RequestType::EndRequest => {
let end_request_rec =
match EndRequestRec::from_header(header, &mut self.stream).await {
Ok(rec) => rec,
Err(err) => {
self.ended = true;
return Some(Err(err.into()));
}
};
debug!(id = self.id, ?end_request_rec, "Receive from stream.");

self.ended = true;

return match end_request_rec
.end_request
.protocol_status
.convert_to_client_result(end_request_rec.end_request.app_status)
{
Ok(_) => None,
Err(err) => Some(Err(err)),
};
fn process_message(&mut self) -> Result<Option<Content>, ClientError> {
if self.buf.is_empty() {
return Ok(None);
}
if self.header.is_none() {
match self.read_header() {
Some(header) => self.header = Some(header),
None => return Ok(None),
}
}
let header = self.header.as_ref().unwrap();
match header.r#type.clone() {
RequestType::Stdout => {
if let Some(data) = self.read_content() {
return Ok(Some(Content::Stdout(data)));
}
r#type => {
self.ended = true;
return Some(Err(ClientError::UnknownRequestType {
request_type: r#type,
}));
}
RequestType::Stderr => {
if let Some(data) = self.read_content() {
return Ok(Some(Content::Stderr(data)));
}
}
}
}
RequestType::EndRequest => {
let header = header.clone();
let Some(data) = self.read_content() else {
return Ok(None);
};

async fn read_to_content<'a, T: 'a>(
&'a mut self, length: usize, content_fn: impl FnOnce(&'a [u8]) -> T,
prepare_for_next_fn: impl FnOnce(&mut Self),
) -> Option<ClientResult<T>> {
let content_len = self.content_buf.len();
let read = match self
.stream
.read(&mut self.content_buf[..min(content_len, length - self.content_read)])
.await
{
Ok(read) => read,
Err(err) => {
self.ended = true;
return Some(Err(err.into()));
}
};
let end = EndRequestRec::new_from_buf(header, &data);
debug!(id = self.id, ?end, "Receive from stream.");

self.content_read += read;
if self.content_read >= length {
self.content_read = 0;
prepare_for_next_fn(self);
self.eof = true;
end.end_request
.protocol_status
.convert_to_client_result(end.end_request.app_status)?;
return Ok(None);
}
r#type => {
self.eof = true;
return Err(ClientError::UnknownRequestType {
request_type: r#type,
});
}
}

Some(Ok(content_fn(&self.content_buf[..read])))
Ok(None)
}
}

fn prepare_for_read_padding(&mut self) {
self.read_step = ReadStep::Padding;
}
impl<S> Stream for ResponseStream<S>
where
S: AsyncRead + Unpin,
{
type Item = ClientResult<Content>;

fn prepare_for_read_header(&mut self) {
self.header = None;
self.read_step = ReadStep::Content;
fn poll_next(
mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let mut pending = false;
loop {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(Ok(data))) => {
self.buf.extend_from_slice(&data);

match self.process_message() {
Ok(Some(data)) => return Poll::Ready(Some(Ok(data))),
Ok(None) if self.eof => return Poll::Ready(None),
Ok(None) => continue,
Err(err) => return Poll::Ready(Some(Err(err))),
}
}
Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
Poll::Ready(None) => break,
Poll::Pending => {
pending = true;
break;
}
}
}
match self.process_message() {
Ok(Some(data)) => Poll::Ready(Some(Ok(data))),
Ok(None) if !self.eof && pending => Poll::Pending,
Ok(None) => Poll::Ready(None),
Err(err) => Poll::Ready(Some(Err(err))),
}
}
}
Loading