From b31ac599118713bb720954e2d798f788fde8664f Mon Sep 17 00:00:00 2001 From: LightQuantum Date: Wed, 31 May 2023 16:07:08 +0800 Subject: [PATCH] fix(fetcher): crash on basis file download failure --- rsync-fetcher/src/rsync/downloader.rs | 39 ++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/rsync-fetcher/src/rsync/downloader.rs b/rsync-fetcher/src/rsync/downloader.rs index 84f5b3d..a346500 100644 --- a/rsync-fetcher/src/rsync/downloader.rs +++ b/rsync-fetcher/src/rsync/downloader.rs @@ -1,14 +1,15 @@ +use std::fmt::{Debug, Display, Formatter}; use std::io::SeekFrom; use std::path::PathBuf; use std::sync::Arc; -use eyre::{eyre, Result}; +use eyre::{bail, eyre, Result}; use futures::stream::FuturesUnordered; use futures::{FutureExt, TryStreamExt}; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncSeekExt, BufReader}; use tokio::sync::mpsc; -use tracing::debug; +use tracing::{debug, warn}; use rsync_core::s3::S3Opts; use rsync_core::utils::ToHex; @@ -54,6 +55,16 @@ struct DownloadEntry<'a> { path: &'a [u8], } +impl<'a> Debug for DownloadEntry<'a> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DownloadEntry") + .field("idx", &self.idx) + .field("blake2b_hash", &format!("{:x}", self.blake2b_hash.as_hex())) + .field("path", &String::from_utf8_lossy(self.path)) + .finish() + } +} + impl Downloader { pub async fn tasks(self, transfer_plan: &[TransferItem]) -> Result<()> { let (tx, rx) = flume::bounded(BASIS_BUFFER_LIMIT); @@ -112,7 +123,7 @@ impl Downloader { .truncate(true) .open(&basis_path) .await?; - let obj = self + let try_obj = self .s3 .get_object() .bucket(&self.s3_opts.bucket) @@ -122,9 +133,25 @@ impl Downloader { entry.blake2b_hash.as_hex() )) .send() - .await?; - let mut body = BufReader::new(obj.body.into_async_read()); - tokio::io::copy(&mut body, &mut basis_file).await?; + .await; + match try_obj { + Ok(obj) => { + let mut body = BufReader::new(obj.body.into_async_read()); + tokio::io::copy(&mut body, &mut basis_file).await?; + } + Err(e) => { + let e = e.into_service_error(); + if e.is_no_such_key() { + warn!( + ?entry, + "INCONSISTENCY: basis file exists in metadata but not present in S3. \ + Fallback to full file download" + ); + } else { + bail!(e); + } + } + } basis_file.seek(SeekFrom::Start(0)).await?; permit.send((entry.idx, basis_file));