Skip to content

Commit 9e72044

Browse files
authored
[improvement](spill) improve config and fix spill bugs (#33519)
1 parent 53f2143 commit 9e72044

15 files changed

+247
-119
lines changed

be/src/common/config.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,13 +1160,13 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15");
11601160
DEFINE_Int32(partition_disk_index_lru_size, "10000");
11611161
// limit the storage space that query spill files can use
11621162
DEFINE_String(spill_storage_root_path, "${DORIS_HOME}/storage");
1163-
DEFINE_mInt64(spill_storage_limit, "10737418240"); // 10G
1164-
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
1163+
DEFINE_String(spill_storage_limit, "20%"); // 20%
1164+
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
1165+
DEFINE_mInt32(spill_gc_file_count, "2000");
11651166
DEFINE_Int32(spill_io_thread_pool_per_disk_thread_num, "2");
11661167
DEFINE_Int32(spill_io_thread_pool_queue_size, "1024");
11671168
DEFINE_Int32(spill_async_task_thread_pool_thread_num, "2");
11681169
DEFINE_Int32(spill_async_task_thread_pool_queue_size, "1024");
1169-
DEFINE_mInt32(spill_mem_warning_water_mark_multiplier, "2");
11701170

11711171
DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
11721172

be/src/common/config.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1242,13 +1242,20 @@ DECLARE_mDouble(high_disk_avail_level_diff_usages);
12421242
// create tablet in partition random robin idx lru size, default 10000
12431243
DECLARE_Int32(partition_disk_index_lru_size);
12441244
DECLARE_String(spill_storage_root_path);
1245-
DECLARE_mInt64(spill_storage_limit);
1245+
// Spill storage limit specified as number of bytes
1246+
// ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes ('<float>[gG]'),
1247+
// or percentage of capaity ('<int>%').
1248+
// Defaults to bytes if no unit is given.
1249+
// Must larger than 0.
1250+
// If specified as percentage, the final limit value is:
1251+
// disk_capacity_bytes * storage_flood_stage_usage_percent * spill_storage_limit
1252+
DECLARE_String(spill_storage_limit);
12461253
DECLARE_mInt32(spill_gc_interval_ms);
1254+
DECLARE_mInt32(spill_gc_file_count);
12471255
DECLARE_Int32(spill_io_thread_pool_per_disk_thread_num);
12481256
DECLARE_Int32(spill_io_thread_pool_queue_size);
12491257
DECLARE_Int32(spill_async_task_thread_pool_thread_num);
12501258
DECLARE_Int32(spill_async_task_thread_pool_queue_size);
1251-
DECLARE_mInt32(spill_mem_warning_water_mark_multiplier);
12521259

12531260
DECLARE_mBool(check_segment_when_build_rowset_meta);
12541261

be/src/olap/storage_engine.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ class StorageEngine final : public BaseStorageEngine {
170170
// get all info of root_path
171171
Status get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos, bool need_update);
172172

173-
int64_t get_file_or_directory_size(const std::string& file_path);
173+
static int64_t get_file_or_directory_size(const std::string& file_path);
174174

175175
// get root path for creating tablet. The returned vector of root path should be round robin,
176176
// for avoiding that all the tablet would be deployed one disk.

be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,14 +249,15 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
249249

250250
auto execution_context = state->get_task_execution_context();
251251
_shared_state_holder = _shared_state->shared_from_this();
252+
auto query_id = state->query_id();
252253

253254
MonotonicStopWatch submit_timer;
254255
submit_timer.start();
255256
status = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
256-
[this, &parent, state, execution_context, submit_timer] {
257+
[this, &parent, state, query_id, execution_context, submit_timer] {
257258
auto execution_context_lock = execution_context.lock();
258259
if (!execution_context_lock) {
259-
LOG(INFO) << "query " << print_id(state->query_id())
260+
LOG(INFO) << "query " << print_id(query_id)
260261
<< " execution_context released, maybe query was cancelled.";
261262
return Status::Cancelled("Cancelled");
262263
}
@@ -267,13 +268,13 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
267268
if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
268269
if (!_shared_state->sink_status.ok()) {
269270
LOG(WARNING)
270-
<< "query " << print_id(state->query_id()) << " agg node "
271+
<< "query " << print_id(query_id) << " agg node "
271272
<< Base::_parent->id()
272273
<< " revoke_memory error: " << Base::_shared_state->sink_status;
273274
}
274275
_shared_state->close();
275276
} else {
276-
VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node "
277+
VLOG_DEBUG << "query " << print_id(query_id) << " agg node "
277278
<< Base::_parent->id() << " revoke_memory finish"
278279
<< ", eos: " << _eos;
279280
}

be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -205,16 +205,17 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
205205

206206
auto execution_context = state->get_task_execution_context();
207207
_shared_state_holder = _shared_state->shared_from_this();
208+
auto query_id = state->query_id();
208209

209210
MonotonicStopWatch submit_timer;
210211
submit_timer.start();
211212

212213
RETURN_IF_ERROR(
213214
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
214-
[this, state, execution_context, submit_timer] {
215+
[this, state, query_id, execution_context, submit_timer] {
215216
auto execution_context_lock = execution_context.lock();
216217
if (!execution_context_lock) {
217-
LOG(INFO) << "query " << print_id(state->query_id())
218+
LOG(INFO) << "query " << print_id(query_id)
218219
<< " execution_context released, maybe query was cancelled.";
219220
// FIXME: return status is meaningless?
220221
return Status::Cancelled("Cancelled");
@@ -225,14 +226,14 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
225226
Defer defer {[&]() {
226227
if (!_status.ok() || state->is_cancelled()) {
227228
if (!_status.ok()) {
228-
LOG(WARNING) << "query " << print_id(state->query_id())
229-
<< " agg node " << _parent->node_id()
229+
LOG(WARNING) << "query " << print_id(query_id) << " agg node "
230+
<< _parent->node_id()
230231
<< " merge spilled agg data error: " << _status;
231232
}
232233
_shared_state->close();
233234
} else if (_shared_state->spill_partitions.empty()) {
234-
VLOG_DEBUG << "query " << print_id(state->query_id())
235-
<< " agg node " << _parent->node_id()
235+
VLOG_DEBUG << "query " << print_id(query_id) << " agg node "
236+
<< _parent->node_id()
236237
<< " merge spilled agg data finish";
237238
}
238239
Base::_shared_state->in_mem_shared_state->aggregate_data_container

be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,7 @@ Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) {
493493
// to avoid prepare _child_x twice
494494
auto child_x = std::move(_child_x);
495495
RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state));
496+
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc));
496497
RETURN_IF_ERROR(_inner_probe_operator->set_child(child_x));
497498
DCHECK(_build_side_child != nullptr);
498499
_inner_probe_operator->set_build_side_child(_build_side_child);
@@ -648,6 +649,8 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
648649
}
649650
}
650651

652+
const auto partition_index = local_state._partition_cursor;
653+
auto& probe_blocks = local_state._probe_blocks[partition_index];
651654
if (local_state._need_to_setup_internal_operators) {
652655
*eos = false;
653656
bool has_data = false;
@@ -659,12 +662,13 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
659662
}
660663
RETURN_IF_ERROR(_setup_internal_operators(local_state, state));
661664
local_state._need_to_setup_internal_operators = false;
665+
auto& mutable_block = local_state._partitioned_blocks[partition_index];
666+
if (mutable_block && !mutable_block->empty()) {
667+
probe_blocks.emplace_back(mutable_block->to_block());
668+
}
662669
}
663-
664-
auto partition_index = local_state._partition_cursor;
665-
bool in_mem_eos_;
670+
bool in_mem_eos = false;
666671
auto* runtime_state = local_state._runtime_state.get();
667-
auto& probe_blocks = local_state._probe_blocks[partition_index];
668672
while (_inner_probe_operator->need_more_input_data(runtime_state)) {
669673
if (probe_blocks.empty()) {
670674
*eos = false;
@@ -682,14 +686,16 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
682686

683687
auto block = std::move(probe_blocks.back());
684688
probe_blocks.pop_back();
685-
RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, false));
689+
if (!block.empty()) {
690+
RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, false));
691+
}
686692
}
687693

688694
RETURN_IF_ERROR(_inner_probe_operator->pull(local_state._runtime_state.get(), output_block,
689-
&in_mem_eos_));
695+
&in_mem_eos));
690696

691697
*eos = false;
692-
if (in_mem_eos_) {
698+
if (in_mem_eos) {
693699
local_state._partition_cursor++;
694700
if (local_state._partition_cursor == _partition_count) {
695701
*eos = true;
@@ -829,6 +835,10 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
829835
RETURN_IF_ERROR(local_state.finish_spilling(0));
830836
}
831837

838+
if (local_state._child_block->rows() == 0 && !local_state._child_eos) {
839+
return Status::OK();
840+
}
841+
832842
Defer defer([&] { local_state._child_block->clear_column_data(); });
833843
if (need_to_spill) {
834844
SCOPED_TIMER(local_state.exec_time_counter());

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
9393
DCHECK_EQ(_spilling_streams_count, 0);
9494

9595
if (!_shared_state->need_to_spill) {
96+
profile()->add_info_string("Spilled", "true");
97+
_shared_state->need_to_spill = true;
9698
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
9799
_shared_state->inner_shared_state->hash_table_variants.reset();
98100
auto row_desc = p._child_x->row_desc();
@@ -172,7 +174,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
172174
}
173175

174176
if (_spilling_streams_count > 0) {
175-
_shared_state->need_to_spill = true;
176177
std::unique_lock<std::mutex> lock(_spill_lock);
177178
if (_spilling_streams_count > 0) {
178179
_dependency->block();
@@ -202,7 +203,8 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
202203
SCOPED_TIMER(_partition_shuffle_timer);
203204
auto* channel_ids = reinterpret_cast<uint64_t*>(_partitioner->get_channel_ids());
204205
std::vector<uint32_t> partition_indexes[p._partition_count];
205-
for (uint32_t i = 0; i != rows; ++i) {
206+
DCHECK_LT(begin, end);
207+
for (size_t i = begin; i != end; ++i) {
206208
partition_indexes[channel_ids[i]].emplace_back(i);
207209
}
208210

be/src/pipeline/exec/spill_sort_sink_operator.cpp

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,10 @@ Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
105105
auto* sink_local_state = _runtime_state->get_sink_local_state();
106106
DCHECK(sink_local_state != nullptr);
107107

108-
_profile->add_info_string("TOP-N", *sink_local_state->profile()->get_info_string("TOP-N"));
108+
RETURN_IF_ERROR(sink_local_state->open(state));
109109

110-
return sink_local_state->open(state);
110+
_profile->add_info_string("TOP-N", *sink_local_state->profile()->get_info_string("TOP-N"));
111+
return Status::OK();
111112
}
112113

113114
SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id,
@@ -230,17 +231,19 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
230231

231232
auto execution_context = state->get_task_execution_context();
232233
_shared_state_holder = _shared_state->shared_from_this();
234+
auto query_id = state->query_id();
233235

234236
MonotonicStopWatch submit_timer;
235237
submit_timer.start();
236238

237239
status = ExecEnv::GetInstance()
238240
->spill_stream_mgr()
239241
->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir())
240-
->submit_func([this, state, &parent, execution_context, submit_timer] {
242+
->submit_func([this, state, query_id, &parent, execution_context,
243+
submit_timer] {
241244
auto execution_context_lock = execution_context.lock();
242245
if (!execution_context_lock) {
243-
LOG(INFO) << "query " << print_id(state->query_id())
246+
LOG(INFO) << "query " << print_id(query_id)
244247
<< " execution_context released, maybe query was cancelled.";
245248
return Status::OK();
246249
}
@@ -250,16 +253,14 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
250253
Defer defer {[&]() {
251254
if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
252255
if (!_shared_state->sink_status.ok()) {
253-
LOG(WARNING) << "query " << print_id(state->query_id())
254-
<< " sort node " << _parent->id()
255-
<< " revoke memory error: "
256+
LOG(WARNING) << "query " << print_id(query_id) << " sort node "
257+
<< _parent->id() << " revoke memory error: "
256258
<< _shared_state->sink_status;
257259
}
258260
_shared_state->close();
259261
} else {
260-
VLOG_DEBUG << "query " << print_id(state->query_id())
261-
<< " sort node " << _parent->id()
262-
<< " revoke memory finish";
262+
VLOG_DEBUG << "query " << print_id(query_id) << " sort node "
263+
<< _parent->id() << " revoke memory finish";
263264
}
264265

265266
_spilling_stream->end_spill(_shared_state->sink_status);

be/src/pipeline/exec/spill_sort_source_operator.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,15 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
8989

9090
auto execution_context = state->get_task_execution_context();
9191
_shared_state_holder = _shared_state->shared_from_this();
92+
auto query_id = state->query_id();
9293

9394
MonotonicStopWatch submit_timer;
9495
submit_timer.start();
9596

96-
auto spill_func = [this, state, &parent, execution_context, submit_timer] {
97+
auto spill_func = [this, state, query_id, &parent, execution_context, submit_timer] {
9798
auto execution_context_lock = execution_context.lock();
9899
if (!execution_context_lock) {
99-
LOG(INFO) << "query " << print_id(state->query_id())
100+
LOG(INFO) << "query " << print_id(query_id)
100101
<< " execution_context released, maybe query was cancelled.";
101102
return Status::OK();
102103
}
@@ -107,7 +108,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
107108
Defer defer {[&]() {
108109
if (!_status.ok() || state->is_cancelled()) {
109110
if (!_status.ok()) {
110-
LOG(WARNING) << "query " << print_id(state->query_id()) << " sort node "
111+
LOG(WARNING) << "query " << print_id(query_id) << " sort node "
111112
<< _parent->node_id() << " merge spill data error: " << _status;
112113
}
113114
_shared_state->close();
@@ -116,16 +117,16 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
116117
}
117118
_current_merging_streams.clear();
118119
} else {
119-
VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node "
120-
<< _parent->node_id() << " merge spill data finish";
120+
VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->node_id()
121+
<< " merge spill data finish";
121122
}
122123
_dependency->Dependency::set_ready();
123124
}};
124125
vectorized::Block merge_sorted_block;
125126
vectorized::SpillStreamSPtr tmp_stream;
126127
while (!state->is_cancelled()) {
127128
int max_stream_count = _calc_spill_blocks_to_merge();
128-
VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << _parent->id()
129+
VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->id()
129130
<< " merge spill streams, streams count: "
130131
<< _shared_state->sorted_streams.size()
131132
<< ", curren merge max stream count: " << max_stream_count;

be/src/runtime/exec_env_init.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,12 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
151151
if (ready()) {
152152
return Status::OK();
153153
}
154+
std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>> spill_store_map;
155+
for (const auto& spill_path : spill_store_paths) {
156+
spill_store_map.emplace(spill_path.path, std::make_unique<vectorized::SpillDataDir>(
157+
spill_path.path, spill_path.capacity_bytes,
158+
spill_path.storage_medium));
159+
}
154160
init_doris_metrics(store_paths);
155161
_store_paths = store_paths;
156162
_tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(_store_paths);
@@ -246,7 +252,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
246252
_wal_manager = WalManager::create_shared(this, config::group_commit_wal_path);
247253
_dns_cache = new DNSCache();
248254
_write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>();
249-
_spill_stream_mgr = new vectorized::SpillStreamManager(spill_store_paths);
255+
_spill_stream_mgr = new vectorized::SpillStreamManager(std::move(spill_store_map));
250256
_backend_client_cache->init_metrics("backend");
251257
_frontend_client_cache->init_metrics("frontend");
252258
_broker_client_cache->init_metrics("broker");

0 commit comments

Comments
 (0)