Skip to content

Commit

Permalink
fix: flush of one table might be triggered multiple times (apache#236)
Browse files Browse the repository at this point in the history
* fix: flush of one table might be triggered multiple times

* add: checking table ownership before performing operations

* fix: checking worker id before performing operations

* fix: format number

* fix: fmt

* add: Add verification of worker id and table id to write, flush, and drop
  • Loading branch information
dust1 authored Sep 13, 2022
1 parent df2bfed commit c150bad
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 8 deletions.
12 changes: 12 additions & 0 deletions analytic_engine/src/instance/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ impl Instance {
return Ok(false);
}

worker_local
.validate_table_data(
&table_data.name,
table_data.id.as_u64() as usize,
self.write_group_worker_num,
)
.context(OperateByWriteWorker {
space_id: table_data.space_id,
table: &table_data.name,
table_id: table_data.id,
})?;

// Fixme(xikai): Trigger a force flush so that the data of the table in the wal
// is marked for deletable. However, the overhead of the flushing can
// be avoided.
Expand Down
8 changes: 8 additions & 0 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,14 @@ impl Instance {
worker_local: &mut WorkerLocal,
table_data: &TableDataRef,
) -> Result<TableFlushRequest> {
worker_local
.validate_table_data(
&table_data.name,
table_data.id.as_u64() as usize,
self.write_group_worker_num,
)
.context(BackgroundFlushFailed)?;

let current_version = table_data.current_version();
let last_sequence = table_data.last_sequence();
// Switch (freeze) all mutable memtables. And update segment duration if
Expand Down
23 changes: 20 additions & 3 deletions analytic_engine/src/instance/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,15 @@ impl Instance {
}
);

let worker_id = worker_local.worker_id();
worker_local
.validate_table_data(
&table_data.name,
table_data.id.as_u64() as usize,
self.write_group_worker_num,
)
.context(Write)?;

// Checks schema compatibility.
table_data
.schema()
Expand All @@ -334,7 +343,7 @@ impl Instance {

if self.should_flush_instance() {
if let Some(space) = self.space_store.find_maximum_memory_usage_space() {
if let Some(table) = space.find_maximum_memory_usage_table() {
if let Some(table) = space.find_maximum_memory_usage_table(worker_id) {
info!("Trying to flush table {} bytes {} in space {} because engine total memtable memory usage exceeds db_write_buffer_size {}.",
table.name,
table.memtable_memory_usage(),
Expand All @@ -347,7 +356,7 @@ impl Instance {
}

if space.should_flush_space() {
if let Some(table) = space.find_maximum_memory_usage_table() {
if let Some(table) = space.find_maximum_memory_usage_table(worker_id) {
info!("Trying to flush table {} bytes {} in space {} because space total memtable memory usage exceeds space_write_buffer_size {}.",
table.name,
table.memtable_memory_usage() ,
Expand All @@ -368,10 +377,18 @@ impl Instance {
/// Write log_batch into wal, return the sequence number of log_batch.
async fn write_to_wal(
&self,
_worker_local: &WorkerLocal,
worker_local: &WorkerLocal,
table_data: &TableData,
encoded_rows: Vec<ByteVec>,
) -> Result<SequenceNumber> {
worker_local
.validate_table_data(
&table_data.name,
table_data.id.as_u64() as usize,
self.write_group_worker_num,
)
.context(Write)?;

// Convert into pb
let mut write_req_pb = table_requests::WriteRequest::new();
// Use the table schema instead of the schema in request to avoid schema
Expand Down
32 changes: 32 additions & 0 deletions analytic_engine/src/instance/write_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ pub enum Error {
Channel {
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display(
"Failed to manipulate table data, this table:{} does not belong to this worker: {}",
table,
worker_id
))]
DataNotLegal { table: String, worker_id: usize },
}

define_result!(Error);
Expand Down Expand Up @@ -285,6 +292,27 @@ impl WorkerLocal {
let data = self.data.clone();
CompactionNotifier::new(data)
}

/// Returns the id of the write worker this local state belongs to.
///
/// The id is read from the shared worker data; it identifies which
/// shard of tables (`table_id % worker_num`) this worker owns.
pub fn worker_id(&self) -> usize {
    let shared = self.data.as_ref();
    shared.id
}

/// Verify that the table identified by `table_id` is owned by this worker.
///
/// Tables are sharded across write workers by `table_id % worker_num`;
/// when the remainder differs from this worker's id, the caller is about
/// to operate on a table that belongs to another worker, so a
/// `DataNotLegal` error is returned. `table_name` is used only to build
/// the error message.
///
/// NOTE(review): assumes `worker_num > 0` — a zero value would panic on
/// the modulo. Confirm callers always pass the real worker count.
pub fn validate_table_data(
    &self,
    table_name: &str,
    table_id: usize,
    worker_num: usize,
) -> Result<()> {
    let owning_worker = table_id % worker_num;
    let this_worker = self.data.as_ref().id;
    if owning_worker == this_worker {
        Ok(())
    } else {
        DataNotLegal {
            table: table_name,
            worker_id: this_worker,
        }
        .fail()
    }
}
}

/// Write table command.
Expand Down Expand Up @@ -643,6 +671,10 @@ impl WriteGroup {

WriteHandle { worker_data }
}

/// Returns how many write workers belong to this group.
///
/// Used as the modulus when sharding tables across workers.
pub fn worker_num(&self) -> usize {
    let workers = &self.worker_datas;
    workers.len()
}
}

/// Data of write worker
Expand Down
8 changes: 5 additions & 3 deletions analytic_engine/src/space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,15 @@ impl Space {
self.write_buffer_size > 0 && self.memtable_memory_usage() >= self.write_buffer_size
}

/// Find the table in the space whose memtable consumes the most memory.
/// Find the table whose memtable consumes the most memory in the space by
/// specifying Worker.
#[inline]
pub fn find_maximum_memory_usage_table(&self) -> Option<TableDataRef> {
pub fn find_maximum_memory_usage_table(&self, worker_index: usize) -> Option<TableDataRef> {
let worker_num = self.write_group.worker_num();
self.table_datas
.read()
.unwrap()
.find_maximum_memory_usage_table()
.find_maximum_memory_usage_table(worker_num, worker_index)
}

#[inline]
Expand Down
10 changes: 8 additions & 2 deletions analytic_engine/src/table/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,10 +529,16 @@ impl TableDataSet {
self.table_datas.len()
}

/// Find the table which consumes maximum memtable memory usage.
pub fn find_maximum_memory_usage_table(&self) -> Option<TableDataRef> {
/// Find the table that the current WorkerLocal belongs to and consumes the
/// largest memtable memory usage.
pub fn find_maximum_memory_usage_table(
&self,
worker_num: usize,
worker_index: usize,
) -> Option<TableDataRef> {
self.table_datas
.values()
.filter(|t| t.id.as_u64() as usize % worker_num == worker_index)
.max_by_key(|t| t.memtable_memory_usage())
.cloned()
}
Expand Down

0 comments on commit c150bad

Please sign in to comment.