Skip to content

Commit

Permalink
fix(fetcher): crash on basis file download failure
Browse files Browse the repository at this point in the history
  • Loading branch information
PhotonQuantum committed May 31, 2023
1 parent 1581058 commit b31ac59
Showing 1 changed file with 33 additions and 6 deletions.
39 changes: 33 additions & 6 deletions rsync-fetcher/src/rsync/downloader.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::fmt::{Debug, Display, Formatter};
use std::io::SeekFrom;
use std::path::PathBuf;
use std::sync::Arc;

use eyre::{eyre, Result};
use eyre::{bail, eyre, Result};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, TryStreamExt};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncSeekExt, BufReader};
use tokio::sync::mpsc;
use tracing::debug;
use tracing::{debug, warn};

use rsync_core::s3::S3Opts;
use rsync_core::utils::ToHex;
Expand Down Expand Up @@ -54,6 +55,16 @@ struct DownloadEntry<'a> {
path: &'a [u8],
}

impl<'a> Debug for DownloadEntry<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DownloadEntry")
.field("idx", &self.idx)
.field("blake2b_hash", &format!("{:x}", self.blake2b_hash.as_hex()))
.field("path", &String::from_utf8_lossy(self.path))
.finish()
}
}

impl Downloader {
pub async fn tasks(self, transfer_plan: &[TransferItem]) -> Result<()> {
let (tx, rx) = flume::bounded(BASIS_BUFFER_LIMIT);
Expand Down Expand Up @@ -112,7 +123,7 @@ impl Downloader {
.truncate(true)
.open(&basis_path)
.await?;
let obj = self
let try_obj = self
.s3
.get_object()
.bucket(&self.s3_opts.bucket)
Expand All @@ -122,9 +133,25 @@ impl Downloader {
entry.blake2b_hash.as_hex()
))
.send()
.await?;
let mut body = BufReader::new(obj.body.into_async_read());
tokio::io::copy(&mut body, &mut basis_file).await?;
.await;
match try_obj {
Ok(obj) => {
let mut body = BufReader::new(obj.body.into_async_read());
tokio::io::copy(&mut body, &mut basis_file).await?;
}
Err(e) => {
let e = e.into_service_error();
if e.is_no_such_key() {
warn!(
?entry,
"INCONSISTENCY: basis file exists in metadata but not present in S3. \
Fallback to full file download"
);
} else {
bail!(e);
}
}
}

basis_file.seek(SeekFrom::Start(0)).await?;
permit.send((entry.idx, basis_file));
Expand Down

0 comments on commit b31ac59

Please sign in to comment.