Skip to content

Commit

Permalink
feat: build Payload in encode_batch for LogBatchEncoder (apache#209)
Browse files Browse the repository at this point in the history
  • Loading branch information
ygf11 authored Aug 22, 2022
1 parent a7c44db commit 4dd2cc0
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 40 deletions.
20 changes: 8 additions & 12 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,10 @@ impl Instance {
region_id: table_data.wal_region_id(),
})?;

let log_batch = log_batch_encoder
.encode(&[payload])
.context(EncodePayloads {
table: &table_data.name,
region_id: table_data.wal_region_id(),
})?;
let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
table: &table_data.name,
region_id: table_data.wal_region_id(),
})?;

// Write log batch
let write_ctx = WriteContext::default();
Expand Down Expand Up @@ -290,12 +288,10 @@ impl Instance {
region_id: table_data.wal_region_id(),
})?;

let log_batch = log_batch_encoder
.encode(&[payload])
.context(EncodePayloads {
table: &table_data.name,
region_id: table_data.wal_region_id(),
})?;
let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
table: &table_data.name,
region_id: table_data.wal_region_id(),
})?;

// Write log batch
let write_ctx = WriteContext::default();
Expand Down
10 changes: 4 additions & 6 deletions analytic_engine/src/instance/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,10 @@ impl Instance {
table: &table_data.name,
region_id: table_data.wal_region_id(),
})?;
let log_batch = log_batch_encoder
.encode(&[payload])
.context(EncodePayloads {
table: &table_data.name,
region_id: table_data.wal_region_id(),
})?;
let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
table: &table_data.name,
region_id: table_data.wal_region_id(),
})?;

// Write to wal manager
let write_ctx = WriteContext::default();
Expand Down
12 changes: 2 additions & 10 deletions analytic_engine/src/meta/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl ManifestImpl {
.encoder(region_id)
.context(GetLogBatchEncoder { region_id })?;
let log_batch = log_batch_encoder
.encode(&[payload])
.encode(&payload)
.context(EncodePayloads { region_id })?;

let write_ctx = WriteContext::default();
Expand Down Expand Up @@ -331,21 +331,13 @@ impl MetaUpdateLogStore for RegionWal {
}

async fn store(&self, log_entries: &[MetaUpdateLogEntry]) -> Result<()> {
let mut payload_batch = Vec::with_capacity(log_entries.len());

// TODO(ygf11): maybe we can build payload in encode loop.
for entry in log_entries {
let payload = MetaUpdatePayload::from(entry);
payload_batch.push(payload);
}

let region_id = self.region_id;
let log_batch_encoder = self
.wal_manager
.encoder(self.region_id)
.context(GetLogBatchEncoder { region_id })?;
let log_batch = log_batch_encoder
.encode(&payload_batch)
.encode_batch::<MetaUpdatePayload, MetaUpdateLogEntry>(log_entries)
.context(EncodePayloads { region_id })?;

let write_ctx = WriteContext::default();
Expand Down
6 changes: 6 additions & 0 deletions analytic_engine/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ impl<'a> Payload for WritePayload<'a> {
}
}

impl<'a> From<&'a table_requests::WriteRequest> for WritePayload<'a> {
fn from(write_request: &'a table_requests::WriteRequest) -> Self {
Self::Write(write_request)
}
}

/// Payload decoded from wal
#[derive(Debug)]
pub enum ReadPayload {
Expand Down
6 changes: 6 additions & 0 deletions benchmarks/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,9 @@ impl<'a> Payload for WritePayload<'a> {
Ok(())
}
}

impl<'a> From<&'a Vec<u8>> for WritePayload<'a> {
fn from(data: &'a Vec<u8>) -> Self {
Self(data)
}
}
7 changes: 1 addition & 6 deletions benchmarks/src/wal_write_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,11 @@ impl WalWriteBench {
.expect("should succeed to open WalNamespaceImpl(Memory)");

let values = self.build_value_vec();
let payloads = values
.iter()
.map(|value| WritePayload(value))
.collect::<Vec<_>>();

let wal_encoder = wal
.encoder(1)
.expect("should succeed to create wal encoder");
let log_batch = wal_encoder
.encode(&payloads)
.encode_batch::<WritePayload, Vec<u8>>(values.as_slice())
.expect("should succeed to encode payload batch");

// Write to wal manager
Expand Down
32 changes: 29 additions & 3 deletions wal/src/kv_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,19 +527,45 @@ pub struct LogBatchEncoder {
}

impl LogBatchEncoder {
/// Create LogBatchEncoder with specific region_id.
pub fn create(region_id: RegionId) -> Self {
Self {
region_id,
log_encoding: LogEncoding::newest(),
}
}

pub fn encode<P: Payload>(self, payload_batch: &[P]) -> manager::Result<LogWriteBatch> {
/// Consume LogBatchEncoder and encode single payload to LogWriteBatch.
pub fn encode(self, payload: &impl Payload) -> manager::Result<LogWriteBatch> {
let mut write_batch = LogWriteBatch::new(self.region_id);
let mut buf = BytesMut::new();
for payload in payload_batch.iter() {
self.log_encoding
.encode_value(&mut buf, payload)
.map_err(|e| Box::new(e) as _)
.context(Encoding)?;

write_batch.push(LogWriteEntry {
payload: buf.to_vec(),
});

Ok(write_batch)
}

/// Consume LogBatchEncoder and encode raw payload batch to LogWriteBatch.
/// Note: To build payload from raw payload in `encode_batch`, raw payload
/// need implement From trait.
pub fn encode_batch<'a, P: Payload, I>(
self,
raw_payload_batch: &'a [I],
) -> manager::Result<LogWriteBatch>
where
&'a I: Into<P>,
{
let mut write_batch = LogWriteBatch::new(self.region_id);
let mut buf = BytesMut::new();
for raw_payload in raw_payload_batch.iter() {
self.log_encoding
.encode_value(&mut buf, payload)
.encode_value(&mut buf, &raw_payload.into())
.map_err(|e| Box::new(e) as _)
.context(Encoding)?;

Expand Down
3 changes: 2 additions & 1 deletion wal/src/table_kv_impl/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1699,9 +1699,10 @@ mod tests {
payload_batch.push(payload);
}

let log_entries = (start_sequence..end_sequence).collect::<Vec<_>>();
let wal_encoder = LogBatchEncoder::create(region_id);
let log_batch = wal_encoder
.encode(&payload_batch)
.encode_batch::<TestPayload, u32>(&log_entries)
.expect("should succeed to encode payload batch");
let write_ctx = manager::WriteContext::default();
namespace
Expand Down
11 changes: 9 additions & 2 deletions wal/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,17 @@ impl<B: WalBuilder> TestEnv<B> {
start: u32,
end: u32,
) -> (Vec<TestPayload>, LogWriteBatch) {
let payload_batch = self.build_payload_batch(start, end);
let log_entries = (start..end).collect::<Vec<_>>();

let log_batch_encoder = wal
.encoder(region_id)
.expect("should succeed to create log batch encoder");

let log_batch = log_batch_encoder
.encode(&payload_batch)
.encode_batch::<TestPayload, u32>(&log_entries)
.expect("should succeed to encode payloads");

let payload_batch = self.build_payload_batch(start, end);
(payload_batch, log_batch)
}

Expand Down Expand Up @@ -219,6 +220,12 @@ impl Payload for TestPayload {
}
}

impl From<&u32> for TestPayload {
fn from(v: &u32) -> Self {
Self { val: *v }
}
}

pub struct TestPayloadDecoder;

impl PayloadDecoder for TestPayloadDecoder {
Expand Down

0 comments on commit 4dd2cc0

Please sign in to comment.