Skip to content

Commit e91e3d4

Browse files
authored
Merge branch 'master' into add-report-tablet-metrics
2 parents fc9cd09 + 1baeaef commit e91e3d4

File tree

55 files changed

+909
-387
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+909
-387
lines changed

be/src/common/config.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,7 @@ DEFINE_Bool(enable_graceful_exit_check, "false");
10911091
DEFINE_Bool(enable_debug_points, "false");
10921092

10931093
DEFINE_Int32(pipeline_executor_size, "0");
1094+
DEFINE_Int32(blocking_pipeline_executor_size, "0");
10941095
DEFINE_Bool(enable_workload_group_for_scan, "false");
10951096
DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
10961097

@@ -1626,6 +1627,8 @@ DEFINE_Validator(binary_plain_encoding_default_impl, [](const std::string& confi
16261627
return config == "v1" || config == "v2";
16271628
});
16281629

1630+
DEFINE_mBool(integer_type_default_use_plain_encoding, "true");
1631+
16291632
// clang-format off
16301633
#ifdef BE_TEST
16311634
// test s3

be/src/common/config.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,7 @@ DECLARE_Bool(enable_graceful_exit_check);
11221122
DECLARE_Bool(enable_debug_points);
11231123

11241124
DECLARE_Int32(pipeline_executor_size);
1125+
DECLARE_Int32(blocking_pipeline_executor_size);
11251126

11261127
// block file cache
11271128
DECLARE_Bool(enable_file_cache);
@@ -1666,6 +1667,8 @@ DECLARE_mString(aws_credentials_provider_version);
16661667

16671668
DECLARE_mString(binary_plain_encoding_default_impl);
16681669

1670+
DECLARE_mBool(integer_type_default_use_plain_encoding);
1671+
16691672
#ifdef BE_TEST
16701673
// test s3
16711674
DECLARE_String(test_s3_resource);

be/src/olap/memtable.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ namespace doris {
4545
#include "common/compile_check_begin.h"
4646

4747
bvar::Adder<int64_t> g_memtable_cnt("memtable_cnt");
48+
bvar::Adder<uint64_t> g_flush_cuz_memtable_full("flush_cuz_memtable_full");
4849

4950
using namespace ErrorCode;
5051

@@ -665,7 +666,12 @@ bool MemTable::need_flush() const {
665666
max_size = max_size * update_columns_size / _tablet_schema->num_columns();
666667
max_size = max_size > min_buffer_size ? max_size : min_buffer_size;
667668
}
668-
return memory_usage() >= max_size;
669+
670+
if (memory_usage() >= max_size) {
671+
g_flush_cuz_memtable_full << 1;
672+
return true;
673+
}
674+
return false;
669675
}
670676

671677
int64_t MemTable::_adaptive_write_buffer_size() const {

be/src/olap/memtable_memory_limiter.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ bvar::Status<int64_t> g_memtable_flush_memory("mm_limiter_mem_flush", 0);
4040
bvar::Status<int64_t> g_memtable_load_memory("mm_limiter_mem_load", 0);
4141
bvar::Status<int64_t> g_load_hard_mem_limit("mm_limiter_limit_hard", 0);
4242
bvar::Status<int64_t> g_load_soft_mem_limit("mm_limiter_limit_soft", 0);
43+
bvar::Adder<uint64_t> g_flush_cuz_load_mem_exceed_hard_limit("flush_cuz_hard_limit");
44+
bvar::Adder<uint64_t> g_flush_cuz_sys_mem_exceed_soft_limit("flush_cuz_soft_limit");
4345
bvar::Adder<int> g_memtable_memory_limit_flush_memtable_count("mm_limiter_flush_memtable_count");
4446
bvar::LatencyRecorder g_memtable_memory_limit_flush_size_bytes("mm_limiter_flush_size_bytes");
4547

@@ -115,7 +117,7 @@ int64_t MemTableMemoryLimiter::_need_flush() {
115117
int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit;
116118
int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark();
117119
int64_t limit3 = _process_used_mem_more_than_soft_mem_limit();
118-
int64_t need_flush = std::max(limit1, std::max(limit2, limit3));
120+
int64_t need_flush = std::max({limit1, limit2, limit3});
119121
return need_flush - _queue_mem_usage - _flush_mem_usage;
120122
}
121123

@@ -166,6 +168,13 @@ void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_c
166168
->process_memory_detail_str();
167169
LOG_LONG_STRING(INFO, log_str);
168170
}
171+
if (limit == Limit::HARD) {
172+
g_flush_cuz_load_mem_exceed_hard_limit << 1;
173+
} else if (limit == Limit::SOFT) {
174+
g_flush_cuz_sys_mem_exceed_soft_limit << 1;
175+
} else {
176+
// will not reach here
177+
}
169178
_flush_active_memtables(need_flush);
170179
}
171180
} while (_hard_limit_reached() && !_load_usage_low());

be/src/olap/memtable_writer.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
#include "vec/core/block.h"
4747

4848
namespace doris {
49+
bvar::Adder<uint64_t> g_flush_cuz_rowscnt_oveflow("flush_cuz_rowscnt_oveflow");
50+
4951
using namespace ErrorCode;
5052

5153
MemTableWriter::MemTableWriter(const WriteRequest& req) : _req(req) {}
@@ -108,6 +110,7 @@ Status MemTableWriter::write(const vectorized::Block* block,
108110
DBUG_EXECUTE_IF("MemTableWriter.too_many_raws",
109111
{ raw_rows = std::numeric_limits<int32_t>::max(); });
110112
if (raw_rows + row_idxs.size() > std::numeric_limits<int32_t>::max()) {
113+
g_flush_cuz_rowscnt_oveflow << 1;
111114
RETURN_IF_ERROR(_flush_memtable());
112115
}
113116

be/src/olap/rowset/segment_v2/encoding_info.cpp

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include <gen_cpp/segment_v2.pb.h>
2121

22+
#include <array>
2223
#include <iterator>
2324
#include <type_traits>
2425
#include <unordered_map>
@@ -317,6 +318,80 @@ EncodingInfoResolver::~EncodingInfoResolver() {
317318
_encoding_map.clear();
318319
}
319320

321+
namespace {
322+
bool is_integer_type(FieldType type) {
323+
return type == FieldType::OLAP_FIELD_TYPE_TINYINT ||
324+
type == FieldType::OLAP_FIELD_TYPE_SMALLINT || type == FieldType::OLAP_FIELD_TYPE_INT ||
325+
type == FieldType::OLAP_FIELD_TYPE_BIGINT || type == FieldType::OLAP_FIELD_TYPE_LARGEINT;
326+
}
327+
328+
bool is_binary_type(FieldType type) {
329+
return type == FieldType::OLAP_FIELD_TYPE_CHAR || type == FieldType::OLAP_FIELD_TYPE_VARCHAR ||
330+
type == FieldType::OLAP_FIELD_TYPE_STRING || type == FieldType::OLAP_FIELD_TYPE_JSONB ||
331+
type == FieldType::OLAP_FIELD_TYPE_VARIANT || type == FieldType::OLAP_FIELD_TYPE_HLL ||
332+
type == FieldType::OLAP_FIELD_TYPE_BITMAP ||
333+
type == FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE ||
334+
type == FieldType::OLAP_FIELD_TYPE_AGG_STATE;
335+
}
336+
} // namespace
337+
338+
EncodingTypePB EncodingInfoResolver::get_default_encoding(FieldType type,
339+
bool optimize_value_seek) const {
340+
// Predicate for default encoding transformation
341+
// Parameters: (type, current_default_encoding, optimize_value_seek)
342+
// Returns: true if the transformation should be applied
343+
using Predicate = std::function<bool(FieldType, EncodingTypePB, bool)>;
344+
345+
// Hook for transforming default encoding: predicate -> target encoding
346+
struct EncodingTransform {
347+
Predicate predicate;
348+
EncodingTypePB target_encoding;
349+
};
350+
351+
// Static array of hooks for default encoding transformations
352+
static const std::vector<EncodingTransform> hooks = {
353+
// Hook 1: Binary types - PLAIN_ENCODING -> PLAIN_ENCODING_V2
354+
// Applies when: type is binary, encoding is PLAIN_ENCODING, and config enables v2
355+
EncodingTransform {
356+
.predicate =
357+
[](FieldType type, EncodingTypePB encoding, bool optimize_value_seek) {
358+
return encoding == PLAIN_ENCODING && is_binary_type(type) &&
359+
config::binary_plain_encoding_default_impl == "v2";
360+
},
361+
.target_encoding = PLAIN_ENCODING_V2},
362+
363+
// Hook 2: Integer types - any encoding -> PLAIN_ENCODING
364+
// Applies when: type is integer and config enables plain encoding for integers
365+
EncodingTransform {
366+
.predicate =
367+
[](FieldType type, EncodingTypePB encoding, bool optimize_value_seek) {
368+
return is_integer_type(type) &&
369+
config::integer_type_default_use_plain_encoding;
370+
},
371+
.target_encoding = PLAIN_ENCODING}};
372+
373+
auto& encoding_map =
374+
optimize_value_seek ? _value_seek_encoding_map : _default_encoding_type_map;
375+
auto it = encoding_map.find(type);
376+
if (it != encoding_map.end()) {
377+
EncodingTypePB encoding = it->second;
378+
379+
// Apply hooks in order to transform the default encoding
380+
for (const auto& hook : hooks) {
381+
if (hook.predicate(type, encoding, optimize_value_seek)) {
382+
// Verify target encoding is available for this type
383+
if (_encoding_map.contains(std::make_pair(type, hook.target_encoding))) {
384+
encoding = hook.target_encoding;
385+
break; // Apply only the first matching hook
386+
}
387+
}
388+
}
389+
390+
return encoding;
391+
}
392+
return UNKNOWN_ENCODING;
393+
}
394+
320395
Status EncodingInfoResolver::get(FieldType data_type, EncodingTypePB encoding_type,
321396
const EncodingInfo** out) {
322397
if (encoding_type == DEFAULT_ENCODING) {

be/src/olap/rowset/segment_v2/encoding_info.h

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -103,21 +103,7 @@ class EncodingInfoResolver {
103103
EncodingInfoResolver();
104104
~EncodingInfoResolver();
105105

106-
EncodingTypePB get_default_encoding(FieldType type, bool optimize_value_seek) const {
107-
auto& encoding_map =
108-
optimize_value_seek ? _value_seek_encoding_map : _default_encoding_type_map;
109-
auto it = encoding_map.find(type);
110-
if (it != encoding_map.end()) {
111-
EncodingTypePB encoding = it->second;
112-
// For binary types, use PLAIN_ENCODING_V2 if config::binary_plain_encoding_default_impl is "v2"
113-
if (encoding == PLAIN_ENCODING && config::binary_plain_encoding_default_impl == "v2" &&
114-
_encoding_map.contains(std::make_pair(type, PLAIN_ENCODING_V2))) {
115-
return PLAIN_ENCODING_V2;
116-
}
117-
return encoding;
118-
}
119-
return UNKNOWN_ENCODING;
120-
}
106+
EncodingTypePB get_default_encoding(FieldType type, bool optimize_value_seek) const;
121107

122108
Status get(FieldType data_type, EncodingTypePB encoding_type, const EncodingInfo** out);
123109

be/src/olap/rowset/segment_v2/plain_page.h

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "olap/types.h"
2626
#include "util/coding.h"
2727
#include "util/faststring.h"
28+
#include "vec/common/unaligned.h"
2829

2930
namespace doris {
3031
#include "common/compile_check_begin.h"
@@ -33,32 +34,36 @@ namespace segment_v2 {
3334
static const size_t PLAIN_PAGE_HEADER_SIZE = sizeof(uint32_t);
3435

3536
template <FieldType Type>
36-
class PlainPageBuilder : public PageBuilderHelper<PlainPageBuilder<Type> > {
37+
class PlainPageBuilder : public PageBuilderHelper<PlainPageBuilder<Type>> {
3738
public:
3839
using Self = PlainPageBuilder<Type>;
3940
friend class PageBuilderHelper<Self>;
4041

4142
Status init() override {
4243
// Reserve enough space for the page, plus a bit of slop since
4344
// we often overrun the page by a few values.
44-
RETURN_IF_CATCH_EXCEPTION(_buffer.reserve(_options.data_page_size + 1024));
45+
RETURN_IF_CATCH_EXCEPTION(_buffer.reserve(_options.data_page_size));
4546
return reset();
4647
}
4748

48-
bool is_page_full() override { return _buffer.size() > _options.data_page_size; }
49+
bool is_page_full() override { return _remain_element_capacity == 0; }
4950

5051
Status add(const uint8_t* vals, size_t* count) override {
51-
if (is_page_full()) {
52+
if (is_page_full() || *count == 0) {
5253
*count = 0;
5354
return Status::OK();
5455
}
5556
size_t old_size = _buffer.size();
57+
size_t to_add = std::min(_remain_element_capacity, *count);
5658
// This may need a large memory, should return error if could not allocated
5759
// successfully, to avoid BE OOM.
58-
RETURN_IF_CATCH_EXCEPTION(_buffer.resize(old_size + *count * SIZE_OF_TYPE));
59-
memcpy(&_buffer[old_size], vals, *count * SIZE_OF_TYPE);
60-
_count += *count;
61-
_raw_data_size += *count * SIZE_OF_TYPE;
60+
RETURN_IF_CATCH_EXCEPTION(_buffer.resize(old_size + to_add * SIZE_OF_TYPE));
61+
memcpy(&_buffer[old_size], vals, to_add * SIZE_OF_TYPE);
62+
_count += to_add;
63+
_raw_data_size += to_add * SIZE_OF_TYPE;
64+
65+
*count = to_add;
66+
_remain_element_capacity -= to_add;
6267
return Status::OK();
6368
}
6469

@@ -78,11 +83,12 @@ class PlainPageBuilder : public PageBuilderHelper<PlainPageBuilder<Type> > {
7883

7984
Status reset() override {
8085
RETURN_IF_CATCH_EXCEPTION({
81-
_buffer.reserve(_options.data_page_size + 1024);
86+
_buffer.reserve(_options.data_page_size);
8287
_count = 0;
8388
_raw_data_size = 0;
8489
_buffer.clear();
8590
_buffer.resize(PLAIN_PAGE_HEADER_SIZE);
91+
_remain_element_capacity = _options.data_page_size / SIZE_OF_TYPE;
8692
});
8793
return Status::OK();
8894
}
@@ -115,6 +121,7 @@ class PlainPageBuilder : public PageBuilderHelper<PlainPageBuilder<Type> > {
115121
faststring _buffer;
116122
PageBuilderOptions _options;
117123
size_t _count;
124+
size_t _remain_element_capacity {0};
118125
uint64_t _raw_data_size = 0;
119126
typedef typename TypeTraits<Type>::CppType CppType;
120127
enum { SIZE_OF_TYPE = TypeTraits<Type>::size };
@@ -204,7 +211,48 @@ class PlainPageDecoder : public PageDecoder {
204211
}
205212

206213
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) override {
207-
return Status::NotSupported("plain page not implement vec op now");
214+
DCHECK(_parsed);
215+
if (*n == 0 || _cur_idx >= _num_elems) [[unlikely]] {
216+
return Status::OK();
217+
}
218+
219+
size_t max_fetch = std::min(*n, static_cast<size_t>(_num_elems - _cur_idx));
220+
const void* src_data = &_data[PLAIN_PAGE_HEADER_SIZE + _cur_idx * SIZE_OF_TYPE];
221+
222+
dst->insert_many_fix_len_data((const char*)src_data, max_fetch);
223+
224+
*n = max_fetch;
225+
_cur_idx += max_fetch;
226+
227+
return Status::OK();
228+
}
229+
230+
Status read_by_rowids(const rowid_t* rowids, ordinal_t page_first_ordinal, size_t* n,
231+
vectorized::MutableColumnPtr& dst) override {
232+
DCHECK(_parsed);
233+
if (*n == 0) [[unlikely]] {
234+
return Status::OK();
235+
}
236+
237+
auto total = *n;
238+
auto read_count = 0;
239+
_buffer.resize(total);
240+
for (size_t i = 0; i < total; ++i) {
241+
ordinal_t ord = rowids[i] - page_first_ordinal;
242+
if (UNLIKELY(ord >= _num_elems)) {
243+
break;
244+
}
245+
246+
_buffer[read_count++] =
247+
unaligned_load<CppType>(&_data[PLAIN_PAGE_HEADER_SIZE + ord * SIZE_OF_TYPE]);
248+
}
249+
250+
if (LIKELY(read_count > 0)) {
251+
dst->insert_many_fix_len_data((char*)_buffer.data(), read_count);
252+
}
253+
254+
*n = read_count;
255+
return Status::OK();
208256
}
209257

210258
size_t count() const override {
@@ -225,6 +273,8 @@ class PlainPageDecoder : public PageDecoder {
225273
uint32_t _cur_idx;
226274
typedef typename TypeTraits<Type>::CppType CppType;
227275
enum { SIZE_OF_TYPE = TypeTraits<Type>::size };
276+
277+
std::vector<std::conditional_t<std::is_same_v<CppType, bool>, uint8_t, CppType>> _buffer;
228278
};
229279

230280
} // namespace segment_v2

be/src/pipeline/task_scheduler.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,11 @@ class TaskScheduler {
8080

8181
class HybridTaskScheduler MOCK_REMOVE(final) : public TaskScheduler {
8282
public:
83-
HybridTaskScheduler(int core_num, std::string name,
83+
HybridTaskScheduler(int exec_thread_num, int blocking_exec_thread_num, std::string name,
8484
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
85-
: _blocking_scheduler(core_num * 2, name + "_blocking_scheduler", cgroup_cpu_ctl),
86-
_simple_scheduler(core_num, name + "_simple_scheduler", cgroup_cpu_ctl) {}
85+
: _blocking_scheduler(blocking_exec_thread_num, name + "_blocking_scheduler",
86+
cgroup_cpu_ctl),
87+
_simple_scheduler(exec_thread_num, name + "_simple_scheduler", cgroup_cpu_ctl) {}
8788

8889
Status submit(PipelineTaskSPtr task) override;
8990

0 commit comments

Comments
 (0)