Skip to content

fix: save in-progress data for interrupted reads #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 59 additions & 16 deletions src/archive.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
cmp,
io::Cursor,
pin::Pin,
sync::{Arc, Mutex},
};
Expand Down Expand Up @@ -173,9 +174,9 @@ impl<R: Read + Unpin> Archive<R> {
archive: self,
current: (0, None, 0, None),
fields: None,
gnu_longlink: None,
gnu_longname: None,
pax_extensions: None,
gnu_longlink: (false, None),
gnu_longname: (false, None),
pax_extensions: (false, None),
})
}

Expand Down Expand Up @@ -270,9 +271,9 @@ pub struct Entries<R: Read + Unpin> {
archive: Archive<R>,
current: (u64, Option<Header>, usize, Option<GnuExtSparseHeader>),
fields: Option<EntryFields<Archive<R>>>,
gnu_longname: Option<Vec<u8>>,
gnu_longlink: Option<Vec<u8>>,
pax_extensions: Option<Vec<u8>>,
gnu_longname: (bool, Option<Cursor<Vec<u8>>>),
gnu_longlink: (bool, Option<Cursor<Vec<u8>>>),
pax_extensions: (bool, Option<Cursor<Vec<u8>>>),
}

macro_rules! ready_opt_err {
Expand Down Expand Up @@ -318,45 +319,87 @@ impl<R: Read + Unpin> Stream for Entries<R> {
let is_recognized_header =
fields.header.as_gnu().is_some() || fields.header.as_ustar().is_some();
if is_recognized_header && fields.header.entry_type().is_gnu_longname() {
if this.gnu_longname.is_some() {
if this.gnu_longname.0 {
return Poll::Ready(Some(Err(other(
"two long name entries describing \
the same member",
))));
}
let cursor = this.gnu_longname.1.get_or_insert(Cursor::new(Vec::new()));

if let Poll::Ready(result) = Pin::new(fields).poll_read_all(cx, cursor) {
if let Err(err) = result {
return Poll::Ready(Some(Err(err)));
}
} else {
return Poll::Pending;
}

*this.gnu_longname = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
this.gnu_longname.0 = true;
*this.fields = None;
continue;
}

if is_recognized_header && fields.header.entry_type().is_gnu_longlink() {
if this.gnu_longlink.is_some() {
if this.gnu_longlink.0 {
return Poll::Ready(Some(Err(other(
"two long name entries describing \
"two long link entries describing \
the same member",
))));
}
*this.gnu_longlink = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
let cursor = this.gnu_longlink.1.get_or_insert(Cursor::new(Vec::new()));

if let Poll::Ready(result) = Pin::new(fields).poll_read_all(cx, cursor) {
if let Err(err) = result {
return Poll::Ready(Some(Err(err)));
}
} else {
return Poll::Pending;
}

this.gnu_longlink.0 = true;
*this.fields = None;
continue;
}

if is_recognized_header && fields.header.entry_type().is_pax_local_extensions() {
if this.pax_extensions.is_some() {
if this.pax_extensions.0 {
return Poll::Ready(Some(Err(other(
"two pax extensions entries describing \
the same member",
))));
}
*this.pax_extensions = Some(ready_err!(Pin::new(fields).poll_read_all(cx)));
let cursor = this.pax_extensions.1.get_or_insert(Cursor::new(Vec::new()));

if let Poll::Ready(result) = Pin::new(fields).poll_read_all(cx, cursor) {
if let Err(err) = result {
return Poll::Ready(Some(Err(err)));
}
} else {
return Poll::Pending;
}

this.pax_extensions.0 = true;
*this.fields = None;
continue;
}

fields.long_pathname = this.gnu_longname.take();
fields.long_linkname = this.gnu_longlink.take();
fields.pax_extensions = this.pax_extensions.take();
if this.gnu_longname.0 {
fields.long_pathname = this.gnu_longname.1.take().map(|cursor| cursor.into_inner());
this.gnu_longname.0 = false;
}
if this.gnu_longlink.0 {
fields.long_linkname = this.gnu_longlink.1.take().map(|cursor| cursor.into_inner());
this.gnu_longlink.0 = false;
}
if this.pax_extensions.0 {
fields.pax_extensions = this
.pax_extensions
.1
.take()
.map(|cursor| cursor.into_inner());
this.pax_extensions.0 = false;
}

let (next, _, current_pos, current_ext) = &mut this.current;
ready_err!(poll_parse_sparse_header(
Expand Down
19 changes: 14 additions & 5 deletions src/entry.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{
borrow::Cow,
cmp, fmt, marker,
cmp, fmt,
io::{Cursor, Write},
marker,
pin::Pin,
task::{Context, Poll},
};
Expand Down Expand Up @@ -337,15 +339,22 @@ impl<R: Read + Unpin> EntryFields<R> {
pub(crate) fn poll_read_all(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<Vec<u8>>> {
out: &mut Cursor<Vec<u8>>,
) -> Poll<io::Result<()>> {
// Preallocate some data but don't let ourselves get too crazy now.
let cap = cmp::min(self.size, 128 * 1024);
let mut buf = Vec::with_capacity(cap as usize);

// Copied from futures::ReadToEnd
match async_std::task::ready!(poll_read_all_internal(self, cx, &mut buf)) {
Ok(_) => Poll::Ready(Ok(buf)),
Err(err) => Poll::Ready(Err(err)),
match poll_read_all_internal(self, cx, &mut buf) {
Poll::Ready(t) => {
out.write_all(&buf)?;
Poll::Ready(t.map(|_| ()))
}
Poll::Pending => {
out.write_all(&buf)?;
Poll::Pending
}
}
}

Expand Down
Loading