Skip to content

Commit d3993d0

Browse files
add BufWriter to limit calls to ZSTD_compressStream
1 parent c430e32 commit d3993d0

File tree

2 files changed

+23
-7
lines changed

2 files changed

+23
-7
lines changed

libdd-profiling/src/internal/observation/timestamped_observations.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ use crate::collections::identifiable::Id;
1212
use crate::internal::Timestamp;
1313
use crate::profiles::{DefaultObservationCodec as DefaultCodec, ObservationCodec};
1414
use byteorder::{NativeEndian, ReadBytesExt};
15-
use std::io::{self, Write};
15+
use std::io::{self, BufWriter, Write};
1616

1717
pub type TimestampedObservations = TimestampedObservationsImpl<DefaultCodec>;
1818

1919
pub struct TimestampedObservationsImpl<C: ObservationCodec> {
20-
compressed_timestamped_data: C::Encoder,
20+
compressed_timestamped_data: BufWriter<C::Encoder>,
2121
sample_types_len: usize,
2222
}
2323

@@ -40,10 +40,10 @@ impl<C: ObservationCodec> TimestampedObservationsImpl<C> {
4040

4141
pub fn try_new(sample_types_len: usize) -> io::Result<Self> {
4242
Ok(Self {
43-
compressed_timestamped_data: C::new_encoder(
44-
Self::DEFAULT_BUFFER_SIZE,
45-
Self::MAX_CAPACITY,
46-
)?,
43+
compressed_timestamped_data: BufWriter::with_capacity(
44+
C::recommended_input_buf_size(),
45+
C::new_encoder(Self::DEFAULT_BUFFER_SIZE, Self::MAX_CAPACITY)?,
46+
),
4747
sample_types_len,
4848
})
4949
}
@@ -74,8 +74,12 @@ impl<C: ObservationCodec> TimestampedObservationsImpl<C> {
7474
}
7575

7676
pub fn try_into_iter(self) -> io::Result<TimestampedObservationsIterImpl<C>> {
77+
let encoder = self
78+
.compressed_timestamped_data
79+
.into_inner()
80+
.map_err(|e| e.into_error())?;
7781
Ok(TimestampedObservationsIterImpl {
78-
decoder: C::encoder_into_decoder(self.compressed_timestamped_data)?,
82+
decoder: C::encoder_into_decoder(encoder)?,
7983
sample_types_len: self.sample_types_len,
8084
})
8185
}

libdd-profiling/src/profiles/compressor.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ pub trait ObservationCodec {
145145

146146
fn new_encoder(size_hint: usize, max_capacity: usize) -> io::Result<Self::Encoder>;
147147
fn encoder_into_decoder(encoder: Self::Encoder) -> io::Result<Self::Decoder>;
148+
149+
/// Returns the recommended input buffer size for the encoder.
150+
/// Used to size the `BufWriter` that wraps the encoder.
151+
fn recommended_input_buf_size() -> usize;
148152
}
149153

150154
#[allow(unused)]
@@ -161,6 +165,10 @@ impl ObservationCodec for NoopObservationCodec {
161165
fn encoder_into_decoder(encoder: Self::Encoder) -> io::Result<Self::Decoder> {
162166
Ok(io::Cursor::new(encoder))
163167
}
168+
169+
fn recommended_input_buf_size() -> usize {
170+
0
171+
}
164172
}
165173

166174
#[allow(unused)]
@@ -181,6 +189,10 @@ impl ObservationCodec for ZstdObservationCodec {
181189
Err((_enc, error)) => Err(error),
182190
}
183191
}
192+
193+
fn recommended_input_buf_size() -> usize {
194+
zstd::Encoder::<SizeRestrictedBuffer>::recommended_input_size()
195+
}
184196
}
185197

186198
#[cfg(not(miri))]

0 commit comments

Comments
 (0)