Skip to content

Decompression Threads #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 10 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,21 @@ repository = "https://github.com/ZeroTwo-Bot/zlib-stream-rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
log = "0.4.14"
env_logger = "0.9.0"
thiserror = "1.0.28"
log = "^0.4"
env_logger = "^0.9"
thiserror = "^1.0"
flate2 = { version = "1.0", features = ["cloudflare_zlib"], default-features = false }

futures-util = { version = "0.3.17", optional = true }
tokio = { version = "^1", features = ["rt-multi-thread"], optional = true }
futures-util = { version = "^0.3", optional = true }
futures = { version = "^0.3", optional = true }
tokio = { version = "^1", features = ["rt", "rt-multi-thread"], optional = true }

[dev-dependencies]
tokio = { version = "1.14.0", features = ["rt", "macros"] }
tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "macros"] }

[features]
default = ["stream"]
tokio-runtime = ["stream", "tokio"]
stream = ["futures-util"]
thread = ["futures"]
chunk = ["stream"]
tokio-runtime = ["tokio", "thread"]
57 changes: 57 additions & 0 deletions src/chunk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use futures_util::{Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub struct ChunkedByteStream<V: AsRef<[u8]> + Sized, T: Stream<Item = V> + Unpin> {
stream: T,
max_chunk_size: usize,
pending_frame: Option<Vec<u8>>,
}

impl<V: AsRef<[u8]> + Sized, T: Stream<Item = V> + Unpin> ChunkedByteStream<V, T> {
/// Creates a new ChunkedByteStream object with a defined chunk size which splits incoming
/// vec chunks into chunks of the defined `max_chunk_size`.
/// `max_chunk_size` must not be lower than 64 to avoid issues with the `ZlibStream` implementation.
pub fn new(stream: T, max_chunk_size: usize) -> Self {
Self {
stream,
max_chunk_size,
pending_frame: None,
}
}
}

impl<V: AsRef<[u8]> + Sized, T: Stream<Item = V> + Unpin> Stream for ChunkedByteStream<V, T> {
type Item = Vec<u8>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(frame) = &self.pending_frame {
let frame = frame.clone();
let send_frame = if frame.len() > self.max_chunk_size {
self.pending_frame = Some(frame[self.max_chunk_size..].to_owned());
frame[..self.max_chunk_size].to_owned()
} else {
self.pending_frame = None;
frame
};
return Poll::Ready(Some(send_frame));
}
match Pin::new(&mut self.stream.next()).poll(cx) {
Poll::Ready(data) => {
if let Some(data) = data {
let vec = data.as_ref().to_vec();
if vec.len() > self.max_chunk_size {
self.pending_frame = Some(vec[self.max_chunk_size..].to_owned());
Poll::Ready(Some(vec[..self.max_chunk_size].to_owned()))
} else {
Poll::Ready(Some(vec))
}
} else {
Poll::Ready(None)
}
}
Poll::Pending => Poll::Pending,
}
}
}
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#[cfg(feature = "chunk")]
pub mod chunk;
#[cfg(feature = "stream")]
pub mod stream;
#[cfg(feature = "thread")]
pub mod thread;

#[cfg(test)]
mod test;
Expand Down
78 changes: 73 additions & 5 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "thread")]
use crate::thread::ZlibStreamDecompressorThread;
use crate::{ZlibDecompressionError, ZlibStreamDecompressor};
use flate2::DecompressError;
use futures_util::{Stream, StreamExt};
Expand All @@ -6,26 +8,42 @@ use std::pin::Pin;
use std::task::{Context, Poll};

pub struct ZlibStream<V: AsRef<[u8]> + Sized, T: Stream<Item = V> + Unpin> {
#[cfg(not(feature = "thread"))]
decompressor: ZlibStreamDecompressor,
#[cfg(feature = "thread")]
decompressor: ZlibStreamDecompressorThread,
stream: T,
#[cfg(feature = "thread")]
thread_poll:
Option<futures::channel::oneshot::Receiver<Result<Vec<u8>, ZlibDecompressionError>>>,
}

impl<V: AsRef<[u8]> + Sized, T: Stream<Item = V> + Unpin> ZlibStream<V, T> {
/// Creates a new ZlibStream object with the default decompressor and the underlying
/// stream as data source
pub fn new(stream: T) -> Self {
#[cfg(not(feature = "thread"))]
let decompressor = Default::default();
#[cfg(feature = "thread")]
let decompressor = ZlibStreamDecompressorThread::spawn(Default::default());
Self {
decompressor: Default::default(),
decompressor,
stream,
#[cfg(feature = "thread")]
thread_poll: None,
}
}

/// Creates a new ZlibStream object with the specified decompressor and the underlying
/// stream as data source
pub fn new_with_decompressor(decompressor: ZlibStreamDecompressor, stream: T) -> Self {
#[cfg(feature = "thread")]
let decompressor = ZlibStreamDecompressorThread::spawn(decompressor);
Self {
decompressor,
stream,
#[cfg(feature = "thread")]
thread_poll: None,
}
}
}
Expand All @@ -34,15 +52,29 @@ impl<V: AsRef<[u8]> + Sized, T: Stream<Item = V> + Unpin> Stream for ZlibStream<
type Item = Result<Vec<u8>, DecompressError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
#[cfg(feature = "thread")]
if let Some(poll) = &mut self.thread_poll {
if let Some(poll) = poll_decompress_channel(poll, cx) {
match poll {
Poll::Ready(_) => {
self.thread_poll = None;
}
Poll::Pending => {
cx.waker().wake_by_ref();
}
}
return poll;
}
}
match Pin::new(&mut self.stream.next()).poll(cx) {
Poll::Ready(vec) => {
if let Some(vec) = vec {
#[cfg(feature = "tokio-runtime")]
let result = tokio::task::block_in_place(|| self.decompressor.decompress(vec));

#[cfg(not(feature = "tokio-runtime"))]
#[cfg(not(feature = "thread"))]
let result = self.decompressor.decompress(vec);
#[cfg(feature = "thread")]
let mut result = self.decompressor.decompress(vec.as_ref().to_vec());

#[cfg(not(feature = "thread"))]
match result {
Ok(data) => Poll::Ready(Some(Ok(data))),
Err(ZlibDecompressionError::NeedMoreData) => {
Expand All @@ -53,6 +85,20 @@ impl<V: AsRef<[u8]> + Sized, T: Stream<Item = V> + Unpin> Stream for ZlibStream<
Poll::Ready(Some(Err(err)))
}
}

#[cfg(feature = "thread")]
{
if let Some(poll) = poll_decompress_channel(&mut result, cx) {
if let Poll::Pending = poll {
self.thread_poll = Some(result);
cx.waker().wake_by_ref();
}
poll
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
}
} else {
Poll::Ready(None)
}
Expand All @@ -61,3 +107,25 @@ impl<V: AsRef<[u8]> + Sized, T: Stream<Item = V> + Unpin> Stream for ZlibStream<
}
}
}

#[cfg(feature = "thread")]
fn poll_decompress_channel(
channel: &mut futures::channel::oneshot::Receiver<Result<Vec<u8>, ZlibDecompressionError>>,
cx: &mut Context<'_>,
) -> Option<Poll<Option<Result<Vec<u8>, DecompressError>>>> {
match Pin::new(channel).poll(cx) {
Poll::Ready(outer) => match outer {
Ok(result) => match result {
Ok(data) => Some(Poll::Ready(Some(Ok(data)))),
Err(ZlibDecompressionError::DecompressError(err)) => {
Some(Poll::Ready(Some(Err(err))))
}
_ => None,
},
Err(_cancelled) => return Some(Poll::Ready(None)),
},
Poll::Pending => {
return Some(Poll::Pending);
}
}
}
47 changes: 44 additions & 3 deletions src/test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#[cfg(feature = "chunk")]
use crate::chunk::ChunkedByteStream;
#[cfg(feature = "stream")]
use crate::stream::ZlibStream;
use crate::{ZlibDecompressionError, ZlibStreamDecompressor};
use futures_util::{Stream, StreamExt};
use std::pin::Pin;
use futures_util::StreamExt;

fn payload() -> Vec<u8> {
vec![
Expand Down Expand Up @@ -72,7 +74,7 @@ async fn test_stream() {
let stream = futures_util::stream::iter(stream);
let mut stream = ZlibStream::new(stream);

let result = futures_util::future::poll_fn(move |cx| Pin::new(&mut stream).poll_next(cx)).await;
let result = stream.next().await;
assert_eq!(
inflated(),
String::from_utf8(
Expand Down Expand Up @@ -104,3 +106,42 @@ async fn test_stream_split() {
.unwrap()
)
}

#[cfg(feature = "chunk")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_chunk_stream() {
let chunk_size = 8usize;
let data = vec![payload()];

let stream = futures_util::stream::iter(data);
let mut stream = ChunkedByteStream::new(stream, chunk_size);
let mut concat = vec![];
while let Some(data) = stream.next().await {
concat.extend_from_slice(data.as_slice());
assert!(data.len() <= chunk_size, "Data size exceeded threshold!")
}

assert_eq!(concat, payload(), "Payloads aren't equal")
}

#[cfg(feature = "chunk")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_chunk_stream_zlib() {
let chunk_size = 55usize;
let data = vec![payload()];

let stream = futures_util::stream::iter(data);
let stream = ChunkedByteStream::new(stream, chunk_size);
let mut stream = ZlibStream::new(stream);

let result = stream.next().await;
assert_eq!(
inflated(),
String::from_utf8(
result
.expect("Poll returned end of stream")
.expect("Decompression failed")
)
.unwrap()
)
}
Loading