From 1b356208bd88e153971995a179b5c2f186cbed5a Mon Sep 17 00:00:00 2001 From: trueeyu Date: Fri, 19 Apr 2024 11:27:26 +0800 Subject: [PATCH] [Enhancement] HashJoin no longer output columns that the upper node does not need (#44355) Why I'm doing: Before the (branch-2.2) pr (#3042), Hash join node should output all the columns of left and right table, because he don't know which column will be used by upper project node. After the (branch-2.2) pr (#3042), we will populate a default column for upgrade compatibility. In branch-3.3 we can remove this logic. The purpose of doing this is to prepare for the later integration of Join Lazymaterized code. What I'm doing: No need to populate a default column for upgrade compatibility. Signed-off-by: trueeyu --- be/src/exec/hash_join_node.cpp | 6 +++--- be/src/exec/hash_join_node.h | 4 ++-- be/src/exec/hash_joiner.cpp | 11 ++++++----- be/src/exec/hash_joiner.h | 7 ++----- be/src/exec/join_hash_map.cpp | 3 ++- be/src/exec/join_hash_map.h | 16 ++++------------ be/src/exec/join_hash_map.tpp | 34 ++++++++++++++++------------------ 7 files changed, 35 insertions(+), 46 deletions(-) diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index e1cb06a205f5d5..0b97289c656d73 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -180,8 +180,8 @@ Status HashJoinNode::prepare(RuntimeState* state) { _init_hash_table_param(¶m); _ht.create(param); - _probe_column_count = _ht.get_probe_column_count(); - _build_column_count = _ht.get_build_column_count(); + _output_probe_column_count = _ht.get_output_probe_column_count(); + _output_build_column_count = _ht.get_output_build_column_count(); return Status::OK(); } @@ -877,7 +877,7 @@ Status HashJoinNode::_process_other_conjunct(ChunkPtr* chunk) { switch (_join_type) { case TJoinOp::LEFT_OUTER_JOIN: case TJoinOp::FULL_OUTER_JOIN: - return _process_outer_join_with_other_conjunct(chunk, _probe_column_count, _build_column_count); + return _process_outer_join_with_other_conjunct(chunk, _output_probe_column_count, _output_build_column_count); case TJoinOp::RIGHT_OUTER_JOIN: case TJoinOp::LEFT_SEMI_JOIN: case TJoinOp::LEFT_ANTI_JOIN: diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h index 87a5fa442e0310..08e7bcfcd3297e 100644 --- a/be/src/exec/hash_join_node.h +++ b/be/src/exec/hash_join_node.h @@ -119,8 +119,8 @@ class HashJoinNode final : public ExecNode { ChunkPtr _probing_chunk = nullptr; Columns _key_columns; - size_t _probe_column_count = 0; - size_t _build_column_count = 0; + size_t _output_probe_column_count = 0; + size_t _output_build_column_count = 0; size_t _probe_chunk_count = 0; size_t _output_chunk_count = 0; diff --git a/be/src/exec/hash_joiner.cpp b/be/src/exec/hash_joiner.cpp index ea15ad70eb8888..e4d81ee52800ea 100644 --- a/be/src/exec/hash_joiner.cpp +++ b/be/src/exec/hash_joiner.cpp @@ -107,8 +107,8 @@ Status HashJoiner::prepare_builder(RuntimeState* state, RuntimeProfile* runtime_ _hash_join_builder->create(hash_table_param()); auto& ht = _hash_join_builder->hash_table(); - _probe_column_count = ht.get_probe_column_count(); - _build_column_count = ht.get_build_column_count(); + _output_probe_column_count = ht.get_output_probe_column_count(); + _output_build_column_count = ht.get_output_build_column_count(); return Status::OK(); } @@ -321,8 +321,8 @@ void HashJoiner::reference_hash_table(HashJoiner* src_join_builder) { // _hash_table_build_rows is root truth, it used to by _short_circuit_break(). _hash_table_build_rows = src_join_builder->_hash_table_build_rows; - _probe_column_count = src_join_builder->_probe_column_count; - _build_column_count = src_join_builder->_build_column_count; + _output_probe_column_count = src_join_builder->_output_probe_column_count; + _output_build_column_count = src_join_builder->_output_build_column_count; _has_referenced_hash_table = true; @@ -486,7 +486,8 @@ Status HashJoiner::_process_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_ switch (_join_type) { case TJoinOp::LEFT_OUTER_JOIN: case TJoinOp::FULL_OUTER_JOIN: - return _process_outer_join_with_other_conjunct(chunk, _probe_column_count, _build_column_count, hash_table); + return _process_outer_join_with_other_conjunct(chunk, _output_probe_column_count, _output_build_column_count, + hash_table); case TJoinOp::RIGHT_OUTER_JOIN: case TJoinOp::LEFT_SEMI_JOIN: case TJoinOp::LEFT_ANTI_JOIN: diff --git a/be/src/exec/hash_joiner.h b/be/src/exec/hash_joiner.h index 8e6bbdc4e7b6d2..0abb5350ef5f8c 100644 --- a/be/src/exec/hash_joiner.h +++ b/be/src/exec/hash_joiner.h @@ -366,8 +366,6 @@ class HashJoiner final : public pipeline::ContextWithDependency { } } - [[nodiscard]] Status _build(RuntimeState* state); - [[nodiscard]] StatusOr _pull_probe_output_chunk(RuntimeState* state); [[nodiscard]] Status _calc_filter_for_other_conjunct(ChunkPtr* chunk, Filter& filter, bool& filter_all, @@ -394,7 +392,6 @@ class HashJoiner final : public pipeline::ContextWithDependency { TJoinOp::type _join_type = TJoinOp::INNER_JOIN; std::atomic _phase = HashJoinPhase::BUILD; - bool _is_closed = false; const std::vector& _is_null_safes; // Equal conjuncts in Join On. @@ -425,8 +422,8 @@ class HashJoiner final : public pipeline::ContextWithDependency { // in-filter constructed from string-typed key columns reference the memory of this column, and the in-filter's // lifetime can last beyond HashJoiner. Columns _string_key_columns; - size_t _probe_column_count = 0; - size_t _build_column_count = 0; + size_t _output_probe_column_count = 0; + size_t _output_build_column_count = 0; // hash table doesn't have reserved data // bool _ht_has_remain = false; diff --git a/be/src/exec/join_hash_map.cpp b/be/src/exec/join_hash_map.cpp index 00911911e80c9c..7b6b46990ae2db 100644 --- a/be/src/exec/join_hash_map.cpp +++ b/be/src/exec/join_hash_map.cpp @@ -319,7 +319,6 @@ void JoinHashTable::create(const HashTableParam& param) { _table_items->build_chunk = std::make_shared(); _table_items->with_other_conjunct = param.with_other_conjunct; _table_items->join_type = param.join_type; - _table_items->row_desc = param.row_desc; _table_items->mor_reader_mode = param.mor_reader_mode; if (_table_items->join_type == TJoinOp::RIGHT_SEMI_JOIN || _table_items->join_type == TJoinOp::RIGHT_ANTI_JOIN || @@ -347,6 +346,7 @@ void JoinHashTable::create(const HashTableParam& param) { std::find(param.predicate_slots.begin(), param.predicate_slots.end(), slot->id()) != param.predicate_slots.end()) { hash_table_slot.need_output = true; + _table_items->output_probe_column_count++; } else { hash_table_slot.need_output = false; } @@ -367,6 +367,7 @@ void JoinHashTable::create(const HashTableParam& param) { std::find(param.predicate_slots.begin(), param.predicate_slots.end(), slot->id()) != param.predicate_slots.end())) { hash_table_slot.need_output = true; + _table_items->output_build_column_count++; } else { hash_table_slot.need_output = false; } diff --git a/be/src/exec/join_hash_map.h b/be/src/exec/join_hash_map.h index fd34207da3c18c..b093bab6191eb6 100644 --- a/be/src/exec/join_hash_map.h +++ b/be/src/exec/join_hash_map.h @@ -102,7 +102,6 @@ struct JoinHashTableItems { Columns key_columns; Buffer build_slots; Buffer probe_slots; - const RowDescriptor* row_desc; // A hash value is the bucket index of the hash map. "JoinHashTableItems.first" is the // buckets of the hash map, and it holds the index of the first key value saved in each bucket, // while other keys can be found by following the indices saved in @@ -117,7 +116,9 @@ struct JoinHashTableItems { uint32_t bucket_size = 0; uint32_t row_count = 0; // real row count size_t build_column_count = 0; + size_t output_build_column_count = 0; size_t probe_column_count = 0; + size_t output_probe_column_count = 0; bool with_other_conjunct = false; bool left_to_nullable = false; bool right_to_nullable = false; @@ -556,11 +557,6 @@ class JoinHashMapForEmpty { // DCHECK_EQ(column->is_nullable(), to_nullable); (*chunk)->append_column(std::move(column), slot->id()); } - } else { - ColumnPtr default_column = - ColumnHelper::create_column(slot->type(), column->is_nullable() || to_nullable); - default_column->append_default(_probe_state->count); - (*chunk)->append_column(std::move(default_column), slot->id()); } } } @@ -575,18 +571,12 @@ class JoinHashMapForEmpty { for (size_t i = 0; i < _table_items->build_column_count; i++) { HashTableSlotDescriptor hash_table_slot = _table_items->build_slots[i]; SlotDescriptor* slot = hash_table_slot.slot; - ColumnPtr& column = _table_items->build_chunk->columns()[i]; if (hash_table_slot.need_output) { // always output nulls. DCHECK(to_nullable); ColumnPtr dest_column = ColumnHelper::create_column(slot->type(), true); dest_column->append_nulls(_probe_state->count); (*chunk)->append_column(std::move(dest_column), slot->id()); - } else { - ColumnPtr default_column = - ColumnHelper::create_column(slot->type(), column->is_nullable() || to_nullable); - default_column->append_default(_probe_state->count); - (*chunk)->append_column(std::move(default_column), slot->id()); } } } @@ -771,7 +761,9 @@ class JoinHashTable { Columns& get_key_columns() { return _table_items->key_columns; } uint32_t get_row_count() const { return _table_items->row_count; } size_t get_probe_column_count() const { return _table_items->probe_column_count; } + size_t get_output_probe_column_count() const { return _table_items->output_probe_column_count; } size_t get_build_column_count() const { return _table_items->build_column_count; } + size_t get_output_build_column_count() const { return _table_items->output_build_column_count; } size_t get_bucket_size() const { return _table_items->bucket_size; } float get_keys_per_bucket() const; void remove_duplicate_index(Filter* filter); diff --git a/be/src/exec/join_hash_map.tpp b/be/src/exec/join_hash_map.tpp index 1e3b84589beff2..917933181ac2a7 100644 --- a/be/src/exec/join_hash_map.tpp +++ b/be/src/exec/join_hash_map.tpp @@ -517,17 +517,13 @@ void JoinHashMap::_probe_output(ChunkPtr* probe_chunk, for (size_t i = 0; i < _table_items->probe_column_count; i++) { HashTableSlotDescriptor hash_table_slot = _table_items->probe_slots[i]; SlotDescriptor* slot = hash_table_slot.slot; - auto& column = (*probe_chunk)->get_column_by_slot_id(slot->id()); if (hash_table_slot.need_output) { + auto& column = (*probe_chunk)->get_column_by_slot_id(slot->id()); if (!column->is_nullable()) { _copy_probe_column(&column, chunk, slot, to_nullable); } else { _copy_probe_nullable_column(&column, chunk, slot); } - } else { - ColumnPtr default_column = ColumnHelper::create_column(slot->type(), column->is_nullable() || to_nullable); - default_column->append_default(_probe_state->count); - (*chunk)->append_column(std::move(default_column), slot->id()); } } } @@ -535,10 +531,13 @@ void JoinHashMap::_probe_output(ChunkPtr* probe_chunk, template void JoinHashMap::_probe_null_output(ChunkPtr* chunk, size_t count) { for (size_t i = 0; i < _table_items->probe_column_count; i++) { - SlotDescriptor* slot = _table_items->probe_slots[i].slot; - ColumnPtr column = ColumnHelper::create_column(slot->type(), true); - column->append_nulls(count); - (*chunk)->append_column(std::move(column), slot->id()); + HashTableSlotDescriptor hash_table_slot = _table_items->probe_slots[i]; + SlotDescriptor* slot = hash_table_slot.slot; + if (hash_table_slot.need_output) { + ColumnPtr column = ColumnHelper::create_column(slot->type(), true); + column->append_nulls(count); + (*chunk)->append_column(std::move(column), slot->id()); + } } } @@ -548,17 +547,13 @@ void JoinHashMap::_build_output(ChunkPtr* chunk) { for (size_t i = 0; i < _table_items->build_column_count; i++) { HashTableSlotDescriptor hash_table_slot = _table_items->build_slots[i]; SlotDescriptor* slot = hash_table_slot.slot; - ColumnPtr& column = _table_items->build_chunk->columns()[i]; if (hash_table_slot.need_output) { + ColumnPtr& column = _table_items->build_chunk->columns()[i]; if (!column->is_nullable()) { _copy_build_column(column, chunk, slot, to_nullable); } else { _copy_build_nullable_column(column, chunk, slot); } - } else { - ColumnPtr default_column = ColumnHelper::create_column(slot->type(), column->is_nullable() || to_nullable); - default_column->append_default(_probe_state->count); - (*chunk)->append_column(std::move(default_column), slot->id()); } } } @@ -566,10 +561,13 @@ void JoinHashMap::_build_output(ChunkPtr* chunk) { template void JoinHashMap::_build_default_output(ChunkPtr* chunk, size_t count) { for (size_t i = 0; i < _table_items->build_column_count; i++) { - SlotDescriptor* slot = _table_items->build_slots[i].slot; - ColumnPtr column = ColumnHelper::create_column(slot->type(), true); - column->append_nulls(count); - (*chunk)->append_column(std::move(column), slot->id()); + auto hash_tablet_slot = _table_items->build_slots[i]; + SlotDescriptor* slot = hash_tablet_slot.slot; + if (hash_tablet_slot.need_output) { + ColumnPtr column = ColumnHelper::create_column(slot->type(), true); + column->append_nulls(count); + (*chunk)->append_column(std::move(column), slot->id()); + } } }