Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 71 additions & 8 deletions src/unzip/seekable_http_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const DEFAULT_MAX_BLOCK: usize = 1024 * 1024;
/// an expensive rewind.
const DEFAULT_SKIP_AHEAD_THRESHOLD: u64 = 2 * 1024 * 1024; // 2MB

/// keep track of number of retries we do when we encounter read failure from stream
/// keeping this number 3 for now,
const MAX_RETRIES: usize = 3;

/// A hint to the [`SeekableHttpReaderEngine`] about the expected access pattern.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum AccessPattern {
Expand Down Expand Up @@ -381,6 +385,12 @@ impl SeekableHttpReaderEngine {
log::debug!("Immediate cache success");
return Ok(bytes_read_from_cache);
}
if state.read_failed_somewhere {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this being set?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was when we consider two thread scenarios; when one reader thread enters this method and has already notified the second thread of the failure, but second thread is not 'on time' to receive the notification, this could result in the problem described in the issue.

but there could be better way of handling this.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, this kind of resulted in problems for us, we checked in this change, we could see that there were issues of partial downloads. but when we revert the change, we still see the original problem, so maybe the fix is something more subtle.

return Err(std::io::Err::new(
std::io::ErrorKind::BrokenPipe,
"another thread experienced a problem"
));
}
// - If no, check if read in progress
let mut reading_stuff = state.reader.take();
// Is there read in progress?
Expand Down Expand Up @@ -487,26 +497,77 @@ impl SeekableHttpReaderEngine {
reader_created = true;
};

let (reader, reader_pos) = reading_stuff.reader.as_mut().unwrap();
if pos > *reader_pos {
let (mut reader, mut reader_pos) = reading_stuff.reader.take().unwrap();
if pos > reader_pos {
log::debug!(
"Read: reading ahead from 0x{:x} to 0x{:x} without skipping",
*reader_pos,
reader_pos,
pos
);
}
while pos >= *reader_pos {
while pos >= reader_pos {
// Fast forward beyond the desired position, recording any reads in the cache
// for later.
let to_read = min(max_block, self.len as usize - *reader_pos as usize);
let to_read = min(max_block, self.len as usize - reader_pos as usize);
let mut new_block = vec![0u8; to_read];
reader.read_exact(&mut new_block)?;

let mut retries = 0;
while retries < MAX_RETRIES {
match reader.read_exact(&mut new_block) {
Ok(_) => break,
Err(e) => match e.kind() {
ErrorKind::UnexpectedEof | ErrorKind::Interrupted => {
retries += 1;
// This is a bit of a hack, but it seems to be the only way
// to get the underlying stream to retry.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain more what happens? i am a bit hesitant about this way of retry

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure there is a better way to perform retries in rust,
so, here when I want to retry, I ensure that we use a new stream reader, and use 'take()' to get full ownership of the reader for the thread in execution. and we retry only in case of specific failures. now that we have a new reader,
ensure that we update the reading_stuff with the right information.

log::error!(
"Error reading from stream: {:?} - retrying {}/{}",
e,
retries,
MAX_RETRIES
);
// Recreate the HTTP stream since the server errored out on the old stream
reading_stuff.reader = Some((
BufReader::new(
reading_stuff
.range_fetcher
.fetch_range(reader_pos)
.map_err(|e| {
std::io::Error::new(
ErrorKind::Unsupported,
e.to_string(),
)
})?,
),
reader_pos,
));
let (new_reader, new_reader_pos) = reading_stuff.reader.take().unwrap();
// Update the reader and reader_pos variables
reader = new_reader;
reader_pos = new_reader_pos;
}
_ => {
log::error!("Error reading from stream: {:?}", e);
return Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"error reading from stream",
));
}
},
}
}
if retries == MAX_RETRIES {
return Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"reached max number of retries reading from stream",
));
}
// claim STATE mutex
let mut state = self.state.lock().unwrap();
state.insert(*reader_pos, new_block);
state.insert(reader_pos, new_block);
// Tell any waiting threads they should re-check the cache
self.read_completed.notify_all();
*reader_pos += to_read as u64;
reader_pos += to_read as u64;
}
// Because the above condition is >=, and because we know the request was not
// to read at the very end of the file, we know we now have some data in the
Expand All @@ -520,6 +581,8 @@ impl SeekableHttpReaderEngine {
if reader_created {
state.stats.num_http_streams += 1;
}
// Update reading_stuff with right information
reading_stuff.reader = Some((reader, reader_pos));
// return the underlying reader to the state so that some other
// thread can use it
state.reader = Some(reading_stuff);
Expand Down