Allow streaming data into Archive #427

@chrisduerr

Description

This is somewhat of a follow-up to #272; however, since that issue was rather old and imprecise, I think it's best to open a new one.

At the bottom I've added an example of what I believe should work but doesn't; if you'd rather skip the explanation, most of the context should be understandable from the code alone.

To outline my goal first: I would like to download a .tar.bz2 file and write some of its archived files directly to disk without ever storing intermediates on the filesystem. An intermediate memory buffer is acceptable, but since the files could be big this must work with a memory buffer smaller than the compressed file size (in theory it should work without a memory buffer).

In #272 I've already outlined that reading data from an Archive in a streaming fashion is no problem. However, I've been running into problems when trying to figure out how to get parts of a tar archive into an Archive without breaking the ability to continue using that archive.

Fundamentally, the idea for streaming data into an archive is simple: create a reader that returns WouldBlock when the end of the input chunk is reached (or Ok(0), but that's less 'correct'), then keep returning WouldBlock on read until the next chunk is written to it. From Archive, I'd expect that you should be able to construct an entries iterator before writing any chunks; calling next on it should then advance it to the end of the currently available data. The most 'difficult' part of this would be reading files from an Entry, since the entire data cannot be read at once, but just returning WouldBlock here should allow the consumer to go back to writing more data to the 'pipe'.
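To make the idea above concrete, here is a minimal, stdlib-only sketch of such a chunk-fed reader (the `ChunkReader` name and `push_chunk` method are hypothetical, not part of tar-rs): it hands out whatever data has been pushed so far and reports `WouldBlock` instead of EOF when it runs dry.

```rust
use std::collections::VecDeque;
use std::io::{self, Read};

/// Hypothetical chunk-fed reader: returns `WouldBlock` once the currently
/// available data is exhausted, instead of `Ok(0)` (which would signal EOF).
struct ChunkReader {
    pending: VecDeque<u8>,
}

impl ChunkReader {
    fn new() -> Self {
        Self { pending: VecDeque::new() }
    }

    /// Feed the next chunk of the stream into the reader.
    fn push_chunk(&mut self, chunk: &[u8]) {
        self.pending.extend(chunk);
    }
}

impl Read for ChunkReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.pending.is_empty() {
            // Signal "no data *yet*" rather than end of stream.
            return Err(io::Error::new(io::ErrorKind::WouldBlock, "no data available"));
        }

        let n = buf.len().min(self.pending.len());
        for (dst, src) in buf.iter_mut().zip(self.pending.drain(..n)) {
            *dst = src;
        }
        Ok(n)
    }
}

fn main() {
    let mut reader = ChunkReader::new();
    reader.push_chunk(b"hello");

    let mut buf = [0; 8];
    assert_eq!(reader.read(&mut buf).unwrap(), 5);

    // Exhausted: the next read reports `WouldBlock` until more data arrives.
    assert_eq!(reader.read(&mut buf).unwrap_err().kind(), io::ErrorKind::WouldBlock);
}
```

The expectation described above is that a consumer like Archive would treat this `WouldBlock` as "pause and come back later" rather than as a fatal error.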

So in theory I think this should work just fine from what I understand of this crate; however, in practice there are some issues. When an error like WouldBlock is encountered, that error is propagated upwards and the archive is marked as done. When returning Ok(0) prematurely, an error is created internally about an unexpected EOF and the archive is also marked as done. This means we can no longer use the iterator once the underlying data is exhausted.

tar-rs/src/archive.rs

Lines 565 to 585 in 20a6509

impl<'a> Iterator for EntriesFields<'a> {
    type Item = io::Result<Entry<'a, io::Empty>>;

    fn next(&mut self) -> Option<io::Result<Entry<'a, io::Empty>>> {
        if self.done {
            None
        } else {
            match self.next_entry() {
                Ok(Some(e)) => Some(Ok(e)),
                Ok(None) => {
                    self.done = true;
                    None
                }
                Err(e) => {
                    self.done = true;
                    Some(Err(e))
                }
            }
        }
    }
}

Creating a new entries() iterator is also not possible, since the archive can only create an iterator over entries while at "position 0".

I believe the following example is a usage of the API that could (and arguably should) work with Archive, but to my knowledge it does not work:

Streaming Write to Archive
use std::cell::Cell;
use std::io::{self, Read, Write};
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};

use tar::Archive;

#[tokio::main]
async fn main() {
    let bytes = include_bytes!("/home/undeadleech/downloads/0.tar");

    let mut buffer = SharedBuffer::<8192>::new();
    let mut archive = Archive::new(buffer.clone());

    let mut entries = archive.entries().unwrap();
    for (i, chunk) in bytes.chunks(8000).enumerate() {
        println!("PROCESSING CHUNK {i}");

        if let Err(err) = buffer.write_all(chunk) {
            println!(" !! TERMINAL ERR: {err}");
            break;
        };

        while let Some(entry) = entries.next() {
            match entry {
                Ok(entry) => println!(" => FILE: {:?}", entry.path()),
                Err(err) => println!(" !! ERR: {err}"),
            }
        }

        println!("\n");
    }
}

#[derive(Clone)]
struct SharedBuffer<const N: usize> {
    buffer: Rc<Buffer<N>>,
}

impl<const N: usize> SharedBuffer<N> {
    fn new() -> Self {
        Self { buffer: Rc::new(Buffer::new()) }
    }
}

impl<const N: usize> Read for SharedBuffer<N> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
        self.buffer.read(buf)
    }
}

impl<const N: usize> Write for SharedBuffer<N> {
    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
        self.buffer.write(buf)
    }

    fn flush(&mut self) -> Result<(), io::Error> {
        Ok(())
    }
}

struct Buffer<const N: usize> {
    buffer: Cell<[u8; N]>,
    pending: AtomicUsize,
}

impl<const N: usize> Buffer<N> {
    fn new() -> Self {
        Self { buffer: Cell::new([0; N]), pending: AtomicUsize::new(0) }
    }

    fn read(&self, buf: &mut [u8]) -> Result<usize, io::Error> {
        let pending = self.pending.load(Ordering::Relaxed);
        if pending == 0 {
            println!("READ WITH NO DATA PENDING");

            // TODO: Neither of these two work.
            return Err(std::io::Error::new(
                std::io::ErrorKind::WouldBlock,
                "no more data (would block)",
            ));
            // return Ok(0);
        }

        let to_read = buf.len().min(pending);
        let mut data = self.buffer.get();
        buf[..to_read].copy_from_slice(&data[..to_read]);

        // If there's more data pending, stay in read mode and update the buffer.
        let new_pending = pending - to_read;
        if new_pending > 0 {
            data.rotate_left(to_read);
            self.buffer.set(data);
        }

        self.pending.store(new_pending, Ordering::Relaxed);

        println!("READ {to_read} BYTES FROM BUFFER");

        Ok(to_read)
    }

    fn write(&self, buf: &[u8]) -> Result<usize, io::Error> {
        if self.pending.load(Ordering::Relaxed) != 0 {
            println!("WRITE WITH DATA ALREADY PENDING");

            // TODO: Neither of these two work.
            return Err(std::io::Error::new(
                std::io::ErrorKind::WouldBlock,
                "wrote twice without reading old data (would block)",
            ));
            // return Ok(0);
        }

        let to_write = buf.len().min(N);
        let mut data = [0; N];
        data[..to_write].copy_from_slice(&buf[..to_write]);
        self.buffer.set(data);

        self.pending.store(to_write, Ordering::Relaxed);

        println!("WROTE {to_write} BYTES TO BUFFER");

        Ok(to_write)
    }
}

This produces the following output:

PROCESSING CHUNK 0
WROTE 8000 BYTES TO BUFFER
READ 512 BYTES FROM BUFFER
 => FILE: Ok("valhalla/tiles/0/000/747.gph.gz")
READ 7488 BYTES FROM BUFFER
READ WITH NO DATA PENDING
 !! ERR: no more data (would block)


PROCESSING CHUNK 1
WROTE 8000 BYTES TO BUFFER


PROCESSING CHUNK 2
WRITE WITH DATA ALREADY PENDING
 !! TERMINAL ERR: wrote twice without reading old data (would block)

It is technically possible to avoid this issue by never reading enough data for the tar extractor to hit the end of the buffer, but my simple (working) implementation of that is so convoluted and cursed that I can't imagine it's the intended solution?

A Cursed Solution
use std::cell::Cell;
use std::io::{self, Read, Write};
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};

use tar::{Archive, Entry};

// Never read buffers smaller than this, since reading partial headers is
// terminal.
const MIN_PENDING_SIZE: usize = 1024;

#[tokio::main]
async fn main() {
    let bytes = include_bytes!("/home/undeadleech/downloads/0.tar");

    let mut buffer = SharedBuffer::<8192>::new();
    let mut archive = Archive::new(buffer.clone());

    let mut entries = archive.entries().unwrap();
    let mut current_entry: Option<Entry<_>> = None;

    let mut total = 0;
    'outer: for (i, chunk) in bytes.chunks(8192).enumerate() {
        println!("\n");
        println!("PROCESSING CHUNK {i} ({} | {total})", chunk.len());
        total += chunk.len();

        let mut written = 0;
        while written < chunk.len() {
            if written != 0 {
                println!("\n");
                println!("CONTINUING CHUNK {i} ({}/{})", written, chunk.len());
            }

            match buffer.write(&chunk[written..]) {
                Ok(new) => written += new,
                Err(err) => panic!(" !! TERMINAL ERR: {err}"),
            }

            if let Some(entry) = &mut current_entry {
                // Empty the buffer, by advancing the entries reader without running into
                // WouldBlock errors.
                let pending = buffer.buffer.pending.load(Ordering::Relaxed);
                println!("SKIPPING OVER UP TO {pending} BYTES");
                match entry.read(&mut vec![0; pending]) {
                    Ok(read) if read < pending => {
                        current_entry = None;
                    },
                    Ok(_) => continue,
                    Err(err) if err.kind() == io::ErrorKind::WouldBlock => current_entry = None,
                    Err(err) => panic!("{err}"),
                }
            }

            // Avoid running into WouldBlock.
            if buffer.buffer.pending.load(Ordering::Relaxed) < MIN_PENDING_SIZE {
                continue;
            }

            println!("GETTING NEXT ENTRY");
            let entry = match entries.next() {
                Some(entry) => entry,
                // We must break early here, since padding at the end of the archive could
                // overflow our buffer.
                None => break 'outer,
            };

            match entry {
                Ok(mut entry) => {
                    println!(" => FILE: {:?}", entry.path());

                    // Empty the buffer, by advancing the entries reader without running into
                    // WouldBlock errors.
                    let pending = buffer.buffer.pending.load(Ordering::Relaxed);
                    println!("INITIALLY SKIPPING OVER UP TO {pending} BYTES");
                    match entry.read(&mut vec![0; pending]) {
                        Ok(read) if read < pending => (),
                        Ok(_) => current_entry = Some(entry),
                        Err(err) if err.kind() == io::ErrorKind::WouldBlock => (),
                        Err(err) => panic!(" !! TERMINAL ERR: {err}"),
                    }
                },
                Err(err) => panic!(" !! TERMINAL ERR: {err}"),
            }
        }
    }

    println!("SUCCESS!!!");
}

#[derive(Clone)]
struct SharedBuffer<const N: usize> {
    buffer: Rc<Buffer<N>>,
}

impl<const N: usize> SharedBuffer<N> {
    fn new() -> Self {
        Self { buffer: Rc::new(Buffer::new()) }
    }
}

impl<const N: usize> Read for SharedBuffer<N> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
        self.buffer.read(buf)
    }
}

impl<const N: usize> Write for SharedBuffer<N> {
    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
        self.buffer.write(buf)
    }

    fn flush(&mut self) -> Result<(), io::Error> {
        Ok(())
    }
}

struct Buffer<const N: usize> {
    buffer: Cell<[u8; N]>,
    pending: AtomicUsize,
}

impl<const N: usize> Buffer<N> {
    fn new() -> Self {
        Self { buffer: Cell::new([0; N]), pending: AtomicUsize::new(0) }
    }

    fn read(&self, buf: &mut [u8]) -> Result<usize, io::Error> {
        let pending = self.pending.load(Ordering::Relaxed);
        if pending == 0 {
            println!("READ WITH NO DATA PENDING");

            return Err(io::Error::new(io::ErrorKind::WouldBlock, "no more data (would block)"));
        }

        let to_read = buf.len().min(pending);
        let mut data = self.buffer.get();
        buf[..to_read].copy_from_slice(&data[..to_read]);

        // If there's more data pending, stay in read mode and update the buffer.
        let new_pending = pending - to_read;
        if new_pending > 0 {
            data.rotate_left(to_read);
            self.buffer.set(data);
        }

        self.pending.store(new_pending, Ordering::Relaxed);

        println!("READ {to_read} BYTES FROM BUFFER");

        Ok(to_read)
    }

    fn write(&self, buf: &[u8]) -> Result<usize, io::Error> {
        let pending = self.pending.load(Ordering::Relaxed);
        if pending == N {
            println!("WRITE WITH DATA ALREADY PENDING");

            return Err(io::Error::new(
                io::ErrorKind::WouldBlock,
                "wrote with buffer already full (would block)",
            ));
        }

        let to_write = buf.len().min(N - pending);
        let mut data = self.buffer.replace([0; N]);
        data[pending..pending + to_write].copy_from_slice(&buf[..to_write]);
        self.buffer.set(data);

        self.pending.store(pending + to_write, Ordering::Relaxed);

        println!("WROTE {to_write} BYTES TO BUFFER");

        Ok(to_write)
    }
}

I've also managed to make this work with flate2 gzip decompression; however, at this point I've lost all grasp on how things might be working under the hood, so I'm not entirely sure whether this just uses an infinite buffer in flate2 which makes it magically work, or whether it's actually streaming. This didn't work with bzip2, at least; I'm still going to leave it here because I've already wasted too much time on it.

I later tried this with a bigger file, and it turns out something in the middle does allocate an arbitrarily sized buffer that will just lead to the OOM killer knocking on your door. So, as far as I can tell, no matter what you do there's no way to stream tar data in chunks with this crate.

Cursed Decompression
use std::cell::Cell;
use std::io::{self, Read, Write};
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};

use flate2::read::GzDecoder;
use tar::{Archive, Entry};

#[tokio::main]
async fn main() {
    let bytes = include_bytes!("/home/undeadleech/downloads/0.tar.gz");

    /// This buffer size must be big enough to fit any tar header, since partial
    /// read of a header is unrecoverable.
    const BUFSIZE: usize = 8192;

    // Create a buffer that we can dump unused file contents into.
    //
    // This must be smaller than the maximum buffer size, since the data available
    // by the decompressor will be at least BUFSIZE, but anything smaller than
    // that risks running into WouldBlock.
    let mut sink = vec![0; BUFSIZE];

    let mut buffer = SharedBuffer::<BUFSIZE>::new();
    let decoder = GzDecoder::new(buffer.clone());
    let mut archive = Archive::new(decoder);

    let mut entries = archive.entries().unwrap();
    let mut current_entry: Option<Entry<_>> = None;

    let mut total = 0;
    'outer: for (i, chunk) in bytes.chunks(BUFSIZE).enumerate() {
        println!("\n");
        println!("PROCESSING CHUNK {i} ({} | {total})", chunk.len());
        total += chunk.len();

        let mut written = 0;
        while written < chunk.len() {
            if written != 0 {
                println!("\n");
                println!("CONTINUING CHUNK {i} ({}/{})", written, chunk.len());
            }

            // Fill buffer up with as much data as possible.
            //
            // We always need to make sure enough data is available so when we call
            // `entries.next()` we don't run into `WouldBlock`. For files we can handle
            // `WouldBlock` ourselves, so that is not an issue.
            if buffer.buffer.pending.load(Ordering::Relaxed) < BUFSIZE {
                match buffer.write(&chunk[written..]) {
                    Ok(new) => written += new,
                    Err(err) => panic!(" !! TERMINAL ERR: {err}"),
                }
            }

            // If we're currently processing an entry, read all of the file's content.
            if let Some(entry) = &mut current_entry {
                println!("SKIPPING OVER FILE CONTENT");

                match entry.read(&mut sink) {
                    Ok(read) => {
                        println!("  SKIPPED {read} BYTES");

                        // Ok(0) indicates more data could be read (otherwise it would be
                        // `WouldBlock`), but the file is done.
                        if read == 0 {
                            current_entry = None;
                        }

                        continue;
                    },
                    Err(err) if err.kind() == io::ErrorKind::WouldBlock => (),
                    Err(err) => panic!("{err}"),
                }
            }

            println!("GETTING NEXT ENTRY");
            let entry = match entries.next() {
                Some(entry) => entry,
                // We must break early here, since padding at the end of the archive could
                // overflow our buffer.
                None => break 'outer,
            };

            match entry {
                Ok(mut entry) => {
                    println!(" => FILE: {:?}", entry.path());
                    current_entry = Some(entry);
                },
                Err(err) => panic!(" !! TERMINAL ERR: {err}"),
            }
        }
    }

    println!("SUCCESS!!!");
}

#[derive(Clone)]
struct SharedBuffer<const N: usize> {
    buffer: Rc<Buffer<N>>,
}

impl<const N: usize> SharedBuffer<N> {
    fn new() -> Self {
        Self { buffer: Rc::new(Buffer::new()) }
    }
}

impl<const N: usize> Read for SharedBuffer<N> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
        self.buffer.read(buf)
    }
}

impl<const N: usize> Write for SharedBuffer<N> {
    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
        self.buffer.write(buf)
    }

    fn flush(&mut self) -> Result<(), io::Error> {
        Ok(())
    }
}

struct Buffer<const N: usize> {
    buffer: Cell<[u8; N]>,
    pending: AtomicUsize,
}

impl<const N: usize> Buffer<N> {
    fn new() -> Self {
        Self { buffer: Cell::new([0; N]), pending: AtomicUsize::new(0) }
    }

    fn read(&self, buf: &mut [u8]) -> Result<usize, io::Error> {
        let pending = self.pending.load(Ordering::Relaxed);
        if pending == 0 {
            println!("READ WITH NO DATA PENDING");

            return Err(io::Error::new(io::ErrorKind::WouldBlock, "no more data (would block)"));
        }

        let to_read = buf.len().min(pending);
        let mut data = self.buffer.get();
        buf[..to_read].copy_from_slice(&data[..to_read]);

        // If there's more data pending, stay in read mode and update the buffer.
        let new_pending = pending - to_read;
        if new_pending > 0 {
            data.rotate_left(to_read);
            self.buffer.set(data);
        }

        self.pending.store(new_pending, Ordering::Relaxed);

        println!("READ {to_read} BYTES FROM BUFFER");

        Ok(to_read)
    }

    fn write(&self, buf: &[u8]) -> Result<usize, io::Error> {
        let pending = self.pending.load(Ordering::Relaxed);
        if pending == N {
            println!("WRITE WITH DATA ALREADY PENDING");

            return Err(io::Error::new(
                io::ErrorKind::WouldBlock,
                "wrote with buffer already full (would block)",
            ));
        }

        let to_write = buf.len().min(N - pending);
        let mut data = self.buffer.replace([0; N]);
        data[pending..pending + to_write].copy_from_slice(&buf[..to_write]);
        self.buffer.set(data);

        self.pending.store(pending + to_write, Ordering::Relaxed);

        println!("WROTE {to_write} BYTES TO BUFFER");

        Ok(to_write)
    }
}
