Skip to content

Commit

Permalink
[Enhancement] HashJoin no longer output columns that the upper node d…
Browse files Browse the repository at this point in the history
…oes not need (StarRocks#44355)

Why I'm doing:
Before the (branch-2.2) pr (StarRocks#3042), Hash join node should output all the columns of left and right table, because he don't know which column will be used by upper project node.

After the (branch-2.2) pr (StarRocks#3042), we will populate a default column for upgrade compatibility.

In branch-3.3 we can remove this logic.

The purpose of doing this is to prepare for the later integration of Join Lazymaterized code.

What I'm doing:
No need to populate a default column for upgrade compatibility.

Signed-off-by: trueeyu <[email protected]>
  • Loading branch information
trueeyu authored Apr 19, 2024
1 parent 420d78f commit 1b35620
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 46 deletions.
6 changes: 3 additions & 3 deletions be/src/exec/hash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ Status HashJoinNode::prepare(RuntimeState* state) {
_init_hash_table_param(&param);
_ht.create(param);

_probe_column_count = _ht.get_probe_column_count();
_build_column_count = _ht.get_build_column_count();
_output_probe_column_count = _ht.get_output_probe_column_count();
_output_build_column_count = _ht.get_output_build_column_count();

return Status::OK();
}
Expand Down Expand Up @@ -877,7 +877,7 @@ Status HashJoinNode::_process_other_conjunct(ChunkPtr* chunk) {
switch (_join_type) {
case TJoinOp::LEFT_OUTER_JOIN:
case TJoinOp::FULL_OUTER_JOIN:
return _process_outer_join_with_other_conjunct(chunk, _probe_column_count, _build_column_count);
return _process_outer_join_with_other_conjunct(chunk, _output_probe_column_count, _output_build_column_count);
case TJoinOp::RIGHT_OUTER_JOIN:
case TJoinOp::LEFT_SEMI_JOIN:
case TJoinOp::LEFT_ANTI_JOIN:
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/hash_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ class HashJoinNode final : public ExecNode {
ChunkPtr _probing_chunk = nullptr;

Columns _key_columns;
size_t _probe_column_count = 0;
size_t _build_column_count = 0;
size_t _output_probe_column_count = 0;
size_t _output_build_column_count = 0;
size_t _probe_chunk_count = 0;
size_t _output_chunk_count = 0;

Expand Down
11 changes: 6 additions & 5 deletions be/src/exec/hash_joiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ Status HashJoiner::prepare_builder(RuntimeState* state, RuntimeProfile* runtime_
_hash_join_builder->create(hash_table_param());
auto& ht = _hash_join_builder->hash_table();

_probe_column_count = ht.get_probe_column_count();
_build_column_count = ht.get_build_column_count();
_output_probe_column_count = ht.get_output_probe_column_count();
_output_build_column_count = ht.get_output_build_column_count();

return Status::OK();
}
Expand Down Expand Up @@ -321,8 +321,8 @@ void HashJoiner::reference_hash_table(HashJoiner* src_join_builder) {

// _hash_table_build_rows is root truth, it used to by _short_circuit_break().
_hash_table_build_rows = src_join_builder->_hash_table_build_rows;
_probe_column_count = src_join_builder->_probe_column_count;
_build_column_count = src_join_builder->_build_column_count;
_output_probe_column_count = src_join_builder->_output_probe_column_count;
_output_build_column_count = src_join_builder->_output_build_column_count;

_has_referenced_hash_table = true;

Expand Down Expand Up @@ -486,7 +486,8 @@ Status HashJoiner::_process_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_
switch (_join_type) {
case TJoinOp::LEFT_OUTER_JOIN:
case TJoinOp::FULL_OUTER_JOIN:
return _process_outer_join_with_other_conjunct(chunk, _probe_column_count, _build_column_count, hash_table);
return _process_outer_join_with_other_conjunct(chunk, _output_probe_column_count, _output_build_column_count,
hash_table);
case TJoinOp::RIGHT_OUTER_JOIN:
case TJoinOp::LEFT_SEMI_JOIN:
case TJoinOp::LEFT_ANTI_JOIN:
Expand Down
7 changes: 2 additions & 5 deletions be/src/exec/hash_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,6 @@ class HashJoiner final : public pipeline::ContextWithDependency {
}
}

[[nodiscard]] Status _build(RuntimeState* state);

[[nodiscard]] StatusOr<ChunkPtr> _pull_probe_output_chunk(RuntimeState* state);

[[nodiscard]] Status _calc_filter_for_other_conjunct(ChunkPtr* chunk, Filter& filter, bool& filter_all,
Expand All @@ -394,7 +392,6 @@ class HashJoiner final : public pipeline::ContextWithDependency {

TJoinOp::type _join_type = TJoinOp::INNER_JOIN;
std::atomic<HashJoinPhase> _phase = HashJoinPhase::BUILD;
bool _is_closed = false;

const std::vector<bool>& _is_null_safes;
// Equal conjuncts in Join On.
Expand Down Expand Up @@ -425,8 +422,8 @@ class HashJoiner final : public pipeline::ContextWithDependency {
// in-filter constructed from string-typed key columns reference the memory of this column, and the in-filter's
// lifetime can last beyond HashJoiner.
Columns _string_key_columns;
size_t _probe_column_count = 0;
size_t _build_column_count = 0;
size_t _output_probe_column_count = 0;
size_t _output_build_column_count = 0;

// hash table doesn't have reserved data
// bool _ht_has_remain = false;
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/join_hash_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ void JoinHashTable::create(const HashTableParam& param) {
_table_items->build_chunk = std::make_shared<Chunk>();
_table_items->with_other_conjunct = param.with_other_conjunct;
_table_items->join_type = param.join_type;
_table_items->row_desc = param.row_desc;
_table_items->mor_reader_mode = param.mor_reader_mode;

if (_table_items->join_type == TJoinOp::RIGHT_SEMI_JOIN || _table_items->join_type == TJoinOp::RIGHT_ANTI_JOIN ||
Expand Down Expand Up @@ -347,6 +346,7 @@ void JoinHashTable::create(const HashTableParam& param) {
std::find(param.predicate_slots.begin(), param.predicate_slots.end(), slot->id()) !=
param.predicate_slots.end()) {
hash_table_slot.need_output = true;
_table_items->output_probe_column_count++;
} else {
hash_table_slot.need_output = false;
}
Expand All @@ -367,6 +367,7 @@ void JoinHashTable::create(const HashTableParam& param) {
std::find(param.predicate_slots.begin(), param.predicate_slots.end(),
slot->id()) != param.predicate_slots.end())) {
hash_table_slot.need_output = true;
_table_items->output_build_column_count++;
} else {
hash_table_slot.need_output = false;
}
Expand Down
16 changes: 4 additions & 12 deletions be/src/exec/join_hash_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ struct JoinHashTableItems {
Columns key_columns;
Buffer<HashTableSlotDescriptor> build_slots;
Buffer<HashTableSlotDescriptor> probe_slots;
const RowDescriptor* row_desc;
// A hash value is the bucket index of the hash map. "JoinHashTableItems.first" is the
// buckets of the hash map, and it holds the index of the first key value saved in each bucket,
// while other keys can be found by following the indices saved in
Expand All @@ -117,7 +116,9 @@ struct JoinHashTableItems {
uint32_t bucket_size = 0;
uint32_t row_count = 0; // real row count
size_t build_column_count = 0;
size_t output_build_column_count = 0;
size_t probe_column_count = 0;
size_t output_probe_column_count = 0;
bool with_other_conjunct = false;
bool left_to_nullable = false;
bool right_to_nullable = false;
Expand Down Expand Up @@ -556,11 +557,6 @@ class JoinHashMapForEmpty {
// DCHECK_EQ(column->is_nullable(), to_nullable);
(*chunk)->append_column(std::move(column), slot->id());
}
} else {
ColumnPtr default_column =
ColumnHelper::create_column(slot->type(), column->is_nullable() || to_nullable);
default_column->append_default(_probe_state->count);
(*chunk)->append_column(std::move(default_column), slot->id());
}
}
}
Expand All @@ -575,18 +571,12 @@ class JoinHashMapForEmpty {
for (size_t i = 0; i < _table_items->build_column_count; i++) {
HashTableSlotDescriptor hash_table_slot = _table_items->build_slots[i];
SlotDescriptor* slot = hash_table_slot.slot;
ColumnPtr& column = _table_items->build_chunk->columns()[i];
if (hash_table_slot.need_output) {
// always output nulls.
DCHECK(to_nullable);
ColumnPtr dest_column = ColumnHelper::create_column(slot->type(), true);
dest_column->append_nulls(_probe_state->count);
(*chunk)->append_column(std::move(dest_column), slot->id());
} else {
ColumnPtr default_column =
ColumnHelper::create_column(slot->type(), column->is_nullable() || to_nullable);
default_column->append_default(_probe_state->count);
(*chunk)->append_column(std::move(default_column), slot->id());
}
}
}
Expand Down Expand Up @@ -771,7 +761,9 @@ class JoinHashTable {
Columns& get_key_columns() { return _table_items->key_columns; }
uint32_t get_row_count() const { return _table_items->row_count; }
size_t get_probe_column_count() const { return _table_items->probe_column_count; }
size_t get_output_probe_column_count() const { return _table_items->output_probe_column_count; }
size_t get_build_column_count() const { return _table_items->build_column_count; }
size_t get_output_build_column_count() const { return _table_items->output_build_column_count; }
size_t get_bucket_size() const { return _table_items->bucket_size; }
float get_keys_per_bucket() const;
void remove_duplicate_index(Filter* filter);
Expand Down
34 changes: 16 additions & 18 deletions be/src/exec/join_hash_map.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,28 +517,27 @@ void JoinHashMap<LT, BuildFunc, ProbeFunc>::_probe_output(ChunkPtr* probe_chunk,
for (size_t i = 0; i < _table_items->probe_column_count; i++) {
HashTableSlotDescriptor hash_table_slot = _table_items->probe_slots[i];
SlotDescriptor* slot = hash_table_slot.slot;
auto& column = (*probe_chunk)->get_column_by_slot_id(slot->id());
if (hash_table_slot.need_output) {
auto& column = (*probe_chunk)->get_column_by_slot_id(slot->id());
if (!column->is_nullable()) {
_copy_probe_column(&column, chunk, slot, to_nullable);
} else {
_copy_probe_nullable_column(&column, chunk, slot);
}
} else {
ColumnPtr default_column = ColumnHelper::create_column(slot->type(), column->is_nullable() || to_nullable);
default_column->append_default(_probe_state->count);
(*chunk)->append_column(std::move(default_column), slot->id());
}
}
}

template <LogicalType LT, class BuildFunc, class ProbeFunc>
void JoinHashMap<LT, BuildFunc, ProbeFunc>::_probe_null_output(ChunkPtr* chunk, size_t count) {
for (size_t i = 0; i < _table_items->probe_column_count; i++) {
SlotDescriptor* slot = _table_items->probe_slots[i].slot;
ColumnPtr column = ColumnHelper::create_column(slot->type(), true);
column->append_nulls(count);
(*chunk)->append_column(std::move(column), slot->id());
HashTableSlotDescriptor hash_table_slot = _table_items->probe_slots[i];
SlotDescriptor* slot = hash_table_slot.slot;
if (hash_table_slot.need_output) {
ColumnPtr column = ColumnHelper::create_column(slot->type(), true);
column->append_nulls(count);
(*chunk)->append_column(std::move(column), slot->id());
}
}
}

Expand All @@ -548,28 +547,27 @@ void JoinHashMap<LT, BuildFunc, ProbeFunc>::_build_output(ChunkPtr* chunk) {
for (size_t i = 0; i < _table_items->build_column_count; i++) {
HashTableSlotDescriptor hash_table_slot = _table_items->build_slots[i];
SlotDescriptor* slot = hash_table_slot.slot;
ColumnPtr& column = _table_items->build_chunk->columns()[i];
if (hash_table_slot.need_output) {
ColumnPtr& column = _table_items->build_chunk->columns()[i];
if (!column->is_nullable()) {
_copy_build_column(column, chunk, slot, to_nullable);
} else {
_copy_build_nullable_column(column, chunk, slot);
}
} else {
ColumnPtr default_column = ColumnHelper::create_column(slot->type(), column->is_nullable() || to_nullable);
default_column->append_default(_probe_state->count);
(*chunk)->append_column(std::move(default_column), slot->id());
}
}
}

template <LogicalType LT, class BuildFunc, class ProbeFunc>
void JoinHashMap<LT, BuildFunc, ProbeFunc>::_build_default_output(ChunkPtr* chunk, size_t count) {
for (size_t i = 0; i < _table_items->build_column_count; i++) {
SlotDescriptor* slot = _table_items->build_slots[i].slot;
ColumnPtr column = ColumnHelper::create_column(slot->type(), true);
column->append_nulls(count);
(*chunk)->append_column(std::move(column), slot->id());
auto hash_tablet_slot = _table_items->build_slots[i];
SlotDescriptor* slot = hash_tablet_slot.slot;
if (hash_tablet_slot.need_output) {
ColumnPtr column = ColumnHelper::create_column(slot->type(), true);
column->append_nulls(count);
(*chunk)->append_column(std::move(column), slot->id());
}
}
}

Expand Down

0 comments on commit 1b35620

Please sign in to comment.