Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 47 additions & 8 deletions data/src/shard_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use std::time::SystemTime;
use bytes::Bytes;
use cas_client::Client;
use error_printer::ErrorPrinter;
use mdb_shard::ShardFileManager;
use mdb_shard::cas_structs::MDBCASInfo;
use mdb_shard::constants::MDB_SHARD_MAX_TARGET_SIZE;
use mdb_shard::file_structs::{FileDataSequenceEntry, MDBFileInfo};
use mdb_shard::session_directory::{ShardMergeResult, consolidate_shards_in_directory, merge_shards_background};
use mdb_shard::shard_in_memory::MDBInMemoryShard;
use mdb_shard::{MDBShardFile, MDBShardFileHeader, ShardFileManager};
use merklehash::MerkleHash;
use tempfile::TempDir;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -271,13 +271,7 @@ impl SessionShardInterface {
debug!("Uploading shard {shard_prefix}/{:?} from staging area to CAS.", &si.shard_hash);

let data: Bytes = if !shard_client.use_shard_footer() {
let split_off_index = si.shard.metadata.file_lookup_offset as usize;
// Read only the portion of the shard file up to the file_lookup_offset,
// which excludes the footer and lookup sections.
let mut file = File::open(&si.path)?;
let mut buf = vec![0u8; split_off_index];
file.read_exact(&mut buf)?;
Bytes::from(buf)
read_shard_to_bytes_remove_footer(&si)?
} else {
std::fs::read(&si.path)?.into()
};
Expand Down Expand Up @@ -328,3 +322,48 @@ impl SessionShardInterface {
Ok(shard_bytes_uploaded.load(Ordering::Relaxed))
}
}

fn read_shard_to_bytes_remove_footer(si: &Arc<MDBShardFile>) -> Result<Bytes> {
let split_off_index = si.shard.metadata.file_lookup_offset as usize;
// Read only the portion of the shard file up to the file_lookup_offset,
// which excludes the footer and lookup sections.
let mut file = File::open(&si.path)?;
let mut buf = vec![0u8; split_off_index];
file.read_exact(&mut buf)?;
// re-write the header to set footer_size to 0.
let mut header = si.shard.header.clone();
header.footer_size = 0;
header.serialize(&mut (&mut buf[..size_of::<MDBShardFileHeader>()]))?;
#[cfg(debug_assertions)]
{
let new_header =
MDBShardFileHeader::deserialize(&mut std::io::Cursor::new(&buf[..size_of::<MDBShardFileHeader>()]))?;
debug_assert_eq!(new_header.footer_size, 0);
}
Ok(Bytes::from(buf))
}

#[cfg(test)]
mod tests {
use std::io::Cursor;

use super::*;

#[test]
fn test_read_shard_to_bytes_remove_footer() -> Result<()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing the #[test] attribute

let tmp_dir = TempDir::with_prefix("test_read_shard_to_bytes_remove_footer")?;
let tmp_dir_path = tmp_dir.path();

let mdb_in_mem = MDBInMemoryShard::default();
let temp_shard_file_path = mdb_in_mem.write_to_directory(tmp_dir_path, None)?;

let shard_file = MDBShardFile::load_from_file(&temp_shard_file_path)?;
assert_eq!(shard_file.shard.header.footer_size, size_of::<mdb_shard::MDBShardFileFooter>() as u64);

let no_footer_shard_buf = read_shard_to_bytes_remove_footer(&shard_file)?;
let buf_shard_header =
MDBShardFileHeader::deserialize(&mut Cursor::new(&no_footer_shard_buf[..size_of::<MDBShardFileHeader>()]))?;
assert_eq!(buf_shard_header.footer_size, 0);
Ok(())
}
}
Loading