Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion cpp/src/arrow/compute/row/grouper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,12 @@ struct GrouperImpl : public Grouper {
continue;
}

if (is_binary_view_like(key_type.id())) {
impl->encoders_[i] =
std::make_unique<internal::BinaryViewKeyEncoder>(key_type.GetSharedPtr());
continue;
}

if (key_type.id() == Type::NA) {
impl->encoders_[i] = std::make_unique<internal::NullKeyEncoder>();
continue;
Expand Down Expand Up @@ -556,7 +562,10 @@ struct GrouperFastImpl : public Grouper {
}
#if ARROW_LITTLE_ENDIAN
for (size_t i = 0; i < key_types.size(); ++i) {
if (is_large_binary_like(key_types[i].id())) {
// View keys fall back to GrouperImpl; the fast row format can't hold
// their variadic buffers.
if (is_large_binary_like(key_types[i].id()) ||
is_binary_view_like(key_types[i].id())) {
return false;
}
}
Expand Down
23 changes: 21 additions & 2 deletions cpp/src/arrow/compute/row/grouper_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/string.h"

#include "arrow/compute/cast.h"
#include "arrow/compute/row/grouper.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type_traits.h"
#include "arrow/util/benchmark_util.h"

namespace arrow {
Expand Down Expand Up @@ -62,8 +64,20 @@ static ExecBatch MakeRandomExecBatch(const DataTypeVector& types, int64_t num_ro
std::vector<Datum> values;
values.resize(num_types);
for (int i = 0; i < num_types; ++i) {
auto field = ::arrow::field("", types[i], metadata);
values[i] = rng.ArrayOf(*field, num_rows, alignment, memory_pool);
// The "unique" cardinality knob isn't honored for view types, so generate the
// plain equivalent and cast it (outside the timed loop) to match {utf8}.
const auto& type = types[i];
const bool is_view = is_binary_view_like(*type);
std::shared_ptr<DataType> gen_type = type;
if (is_view) {
gen_type = type->id() == Type::STRING_VIEW ? utf8() : binary();
}
auto field = ::arrow::field("", gen_type, metadata);
Datum value = rng.ArrayOf(*field, num_rows, alignment, memory_pool);
if (is_view) {
ASSIGN_OR_ABORT(value, compute::Cast(value, type));
}
values[i] = std::move(value);
}

return ExecBatch(std::move(values), num_rows);
Expand Down Expand Up @@ -125,6 +139,8 @@ BENCHMARK_CAPTURE(GrouperWithMultiTypes, "{boolean}", {boolean()})->Apply(SetArg
BENCHMARK_CAPTURE(GrouperWithMultiTypes, "{int32}", {int32()})->Apply(SetArgs);
BENCHMARK_CAPTURE(GrouperWithMultiTypes, "{int64}", {int64()})->Apply(SetArgs);
BENCHMARK_CAPTURE(GrouperWithMultiTypes, "{utf8}", {utf8()})->Apply(SetArgs);
// View keys use the generic GrouperImpl path (the {utf8} fast path rejects them).
BENCHMARK_CAPTURE(GrouperWithMultiTypes, "{utf8_view}", {utf8_view()})->Apply(SetArgs);
BENCHMARK_CAPTURE(GrouperWithMultiTypes, "{fixed_size_binary(32)}",
{fixed_size_binary(32)})
->Apply(SetArgs);
Expand All @@ -147,6 +163,9 @@ BENCHMARK_CAPTURE(GrouperWithMultiTypes, "{int32, boolean, utf8}",
BENCHMARK_CAPTURE(GrouperWithMultiTypes, "{int32, int64, boolean, utf8}",
{int32(), int64(), boolean(), utf8()})
->Apply(SetArgs);
BENCHMARK_CAPTURE(GrouperWithMultiTypes, "{int32, int64, boolean, utf8_view}",
{int32(), int64(), boolean(), utf8_view()})
->Apply(SetArgs);
BENCHMARK_CAPTURE(GrouperWithMultiTypes,
"{utf8, int32, int64, fixed_size_binary(32), boolean}",
{utf8(), int32(), int64(), fixed_size_binary(32), boolean()})
Expand Down
74 changes: 72 additions & 2 deletions cpp/src/arrow/compute/row/grouper_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "arrow/array.h"
#include "arrow/array/util.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/cast.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/row/grouper.h"
#include "arrow/compute/row/grouper_internal.h"
Expand Down Expand Up @@ -120,6 +121,8 @@ void TestGroupClassSupportedKeys(

ASSERT_OK(make_func({utf8(), binary(), large_utf8(), large_binary()}));

ASSERT_OK(make_func({utf8_view(), binary_view()}));

ASSERT_OK(make_func({fixed_size_binary(16), fixed_size_binary(32)}));

ASSERT_OK(make_func({decimal128(32, 10), decimal256(76, 20)}));
Expand Down Expand Up @@ -754,13 +757,24 @@ struct TestGrouper {
auto group_ids = id_batch.make_array();
ValidateOutput(*group_ids);

// Take has no view kernel yet (GH-43010); validate via the non-view cast.
// Output type is checked separately by ExpectUniques.
auto as_takeable = [](std::shared_ptr<Array> arr) -> std::shared_ptr<Array> {
if (is_binary_view_like(*arr->type())) {
auto target = arr->type_id() == Type::STRING_VIEW ? utf8() : binary();
arr = Cast(Datum(arr), target).ValueOrDie().make_array();
}
return arr;
};

for (int i = 0; i < key_batch.num_values(); ++i) {
SCOPED_TRACE(ToChars(i) + "th key array");
auto original =
key_batch[i].is_array()
? key_batch[i].make_array()
: *MakeArrayFromScalar(*key_batch[i].scalar(), key_batch.length);
ASSERT_OK_AND_ASSIGN(auto encoded, Take(*uniques_[i].make_array(), *group_ids));
ASSERT_OK_AND_ASSIGN(auto encoded,
Take(*as_takeable(uniques_[i].make_array()), *group_ids));
std::shared_ptr<Array> expected = original;
if (can_be_null && original->type_id() != Type::NA) {
// To compute the expected output, mask out the original entries that
Expand Down Expand Up @@ -791,7 +805,7 @@ struct TestGrouper {
expected_data->null_count = kUnknownNullCount;
expected = MakeArray(expected_data);
}
AssertArraysEqual(*expected, *encoded, /*verbose=*/true,
AssertArraysEqual(*as_takeable(expected), *encoded, /*verbose=*/true,
EqualOptions().nans_equal(true));
}
}
Expand Down Expand Up @@ -913,6 +927,42 @@ TEST(Grouper, StringKey) {
}
}

TEST(Grouper, StringViewKey) {
// Mix inline (<=12 byte) and out-of-line view keys.
for (auto ty : {utf8_view(), binary_view()}) {
ARROW_SCOPED_TRACE("key type = ", *ty);
{
TestGrouper g({ty});
g.ExpectConsume(R"([["eh"], ["eh"]])", "[0, 0]");
g.ExpectConsume(R"([["eh"], ["eh"]])", "[0, 0]");
g.ExpectConsume(R"([["be"], [null]])", "[1, 2]");
g.ExpectConsume(R"([["a long out-of-line view"], ["a long out-of-line view"]])",
"[3, 3]");
g.ExpectUniques(R"([["eh"], ["be"], [null], ["a long out-of-line view"]])");
}
{
TestGrouper g({ty});
g.ExpectPopulate(R"([["eh"], ["eh"]])");
g.ExpectPopulate(R"([["be"], [null]])");
g.ExpectLookup(R"([["be"], [null], ["da"]])", "[1, 2, null]");
}
}
}

TEST(Grouper, StringViewInt64Key) {
for (auto ty : {utf8_view(), binary_view()}) {
ARROW_SCOPED_TRACE("key type = ", *ty);
TestGrouper g({ty, int64()});

g.ExpectConsume(R"([["eh", 0], ["eh", 0]])", "[0, 0]");
g.ExpectConsume(R"([["eh", 0], ["eh", null]])", "[0, 1]");
g.ExpectConsume(R"([["eh", 1], ["bee", 1]])", "[2, 3]");
g.ExpectConsume(R"([["eh", null], ["bee", 1]])", "[1, 3]");

g.ExpectUniques(R"([["eh", 0], ["eh", null], ["eh", 1], ["bee", 1]])");
}
}

TEST(Grouper, DictKey) {
// For dictionary keys, all batches must share a single dictionary.
// Eventually, differing dictionaries will be unified and indices transposed
Expand Down Expand Up @@ -1074,6 +1124,10 @@ FieldVector AnnotateForRandomGeneration(FieldVector fields) {
} else if (is_binary_like(*field->type())) {
// (note this is unsupported for large binary types)
field = field->WithMergedMetadata(key_value_metadata({"unique"}, {"100"}));
} else if (is_binary_view_like(*field->type())) {
// Views don't support the "unique" knob; small lengths keep cardinality low.
field = field->WithMergedMetadata(
key_value_metadata({"min_length", "max_length"}, {"0", "5"}));
}
field = field->WithMergedMetadata(key_value_metadata({"null_probability"}, {"0.1"}));
}
Expand Down Expand Up @@ -1130,6 +1184,22 @@ TEST(Grouper, RandomStringInt64DoubleInt32Keys) {
TestRandomLookup(TestGrouper({utf8(), int64(), float64(), int32()}));
}

TEST(Grouper, RandomStringViewKeys) {
for (auto ty : {utf8_view(), binary_view()}) {
ARROW_SCOPED_TRACE("key type = ", *ty);
TestRandomConsume(TestGrouper({ty}));
TestRandomLookup(TestGrouper({ty}));
}
}

TEST(Grouper, RandomStringViewInt64Keys) {
for (auto ty : {utf8_view(), binary_view()}) {
ARROW_SCOPED_TRACE("key type = ", *ty);
TestRandomConsume(TestGrouper({ty, int64()}));
TestRandomLookup(TestGrouper({ty, int64()}));
}
}

TEST(Grouper, NullKeys) {
{
TestGrouper g({null()});
Expand Down
105 changes: 105 additions & 0 deletions cpp/src/arrow/compute/row/row_encoder_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "arrow/compute/row/row_encoder_internal.h"

#include "arrow/array/builder_binary.h"
#include "arrow/util/bitmap_writer.h"
#include "arrow/util/logging_internal.h"

Expand Down Expand Up @@ -256,6 +257,105 @@ Result<std::shared_ptr<ArrayData>> DictionaryKeyEncoder::Decode(uint8_t** encode
return data;
}

void BinaryViewKeyEncoder::AddLength(const ExecValue& data, int64_t batch_length,
int32_t* lengths) {
if (data.is_array()) {
int64_t i = 0;
ARROW_DCHECK_EQ(data.array.length, batch_length);
VisitArraySpanInline<BinaryViewType>(
data.array,
[&](std::string_view bytes) {
lengths[i++] +=
kExtraByteForNull + sizeof(Offset) + static_cast<int32_t>(bytes.size());
},
[&] { lengths[i++] += kExtraByteForNull + sizeof(Offset); });
} else {
const Scalar& scalar = *data.scalar;
const int32_t buffer_size =
scalar.is_valid
? static_cast<int32_t>(UnboxScalar<BinaryViewType>::Unbox(scalar).size())
: 0;
for (int64_t i = 0; i < batch_length; i++) {
lengths[i] += kExtraByteForNull + sizeof(Offset) + buffer_size;
}
}
}

void BinaryViewKeyEncoder::AddLengthNull(int32_t* length) {
*length += kExtraByteForNull + sizeof(Offset);
}

Status BinaryViewKeyEncoder::Encode(const ExecValue& data, int64_t batch_length,
uint8_t** encoded_bytes) {
auto handle_next_valid_value = [&encoded_bytes](std::string_view bytes) {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kValidByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(bytes.size()));
encoded_ptr += sizeof(Offset);
memcpy(encoded_ptr, bytes.data(), bytes.size());
encoded_ptr += bytes.size();
};
auto handle_next_null_value = [&encoded_bytes]() {
auto& encoded_ptr = *encoded_bytes++;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
};
if (data.is_array()) {
ARROW_DCHECK_EQ(data.length(), batch_length);
VisitArraySpanInline<BinaryViewType>(data.array, handle_next_valid_value,
handle_next_null_value);
} else {
const auto& scalar = data.scalar_as<BaseBinaryScalar>();
if (scalar.is_valid) {
const auto bytes = std::string_view{*scalar.value};
for (int64_t i = 0; i < batch_length; i++) {
handle_next_valid_value(bytes);
}
} else {
for (int64_t i = 0; i < batch_length; i++) {
handle_next_null_value();
}
}
}
return Status::OK();
}

void BinaryViewKeyEncoder::EncodeNull(uint8_t** encoded_bytes) {
auto& encoded_ptr = *encoded_bytes;
*encoded_ptr++ = kNullByte;
util::SafeStore(encoded_ptr, static_cast<Offset>(0));
encoded_ptr += sizeof(Offset);
}

Result<std::shared_ptr<ArrayData>> BinaryViewKeyEncoder::Decode(uint8_t** encoded_bytes,
int32_t length,
MemoryPool* pool) {
// Build a fresh view array; MakeBuilder gives the type-faithful builder and
// Append handles inline vs. out-of-line storage.
std::unique_ptr<ArrayBuilder> builder;
RETURN_NOT_OK(MakeBuilder(pool, type_, &builder));
auto& view_builder = checked_cast<BinaryViewBuilder&>(*builder);
RETURN_NOT_OK(view_builder.Reserve(length));

for (int32_t i = 0; i < length; ++i) {
uint8_t*& encoded_ptr = encoded_bytes[i];
const bool is_valid = (*encoded_ptr++ == kValidByte);
const auto key_length = util::SafeLoadAs<Offset>(encoded_ptr);
encoded_ptr += sizeof(Offset);
if (is_valid) {
RETURN_NOT_OK(view_builder.Append(encoded_ptr, key_length));
} else {
RETURN_NOT_OK(view_builder.AppendNull());
}
encoded_ptr += key_length; // zero for null rows
}

std::shared_ptr<Array> out;
RETURN_NOT_OK(view_builder.Finish(&out));
return out->data();
}

void RowEncoder::Init(const std::vector<TypeHolder>& column_types, ExecContext* ctx) {
ctx_ = ctx;
encoders_.resize(column_types.size());
Expand Down Expand Up @@ -301,6 +401,11 @@ void RowEncoder::Init(const std::vector<TypeHolder>& column_types, ExecContext*
continue;
}

if (is_binary_view_like(type.id())) {
encoders_[i] = std::make_shared<BinaryViewKeyEncoder>(type.GetSharedPtr());
continue;
}

// We should not get here
ARROW_DCHECK(false);
}
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/arrow/compute/row/row_encoder_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,31 @@ struct VarLengthKeyEncoder : KeyEncoder {
std::shared_ptr<DataType> type_;
};

// Encodes BinaryView/StringView keys in the same variable-length row format as
// VarLengthKeyEncoder. The encoded row copies the key bytes, so Decode builds a
// fresh view array rather than aliasing the input's variadic buffers.
struct ARROW_COMPUTE_EXPORT BinaryViewKeyEncoder : KeyEncoder {
// On-row length prefix; matches the binary (int32-offset) encoder's format.
using Offset = int32_t;

explicit BinaryViewKeyEncoder(std::shared_ptr<DataType> type)
: type_(std::move(type)) {}

void AddLength(const ExecValue& data, int64_t batch_length, int32_t* lengths) override;

void AddLengthNull(int32_t* length) override;

Status Encode(const ExecValue& data, int64_t batch_length,
uint8_t** encoded_bytes) override;

void EncodeNull(uint8_t** encoded_bytes) override;

Result<std::shared_ptr<ArrayData>> Decode(uint8_t** encoded_bytes, int32_t length,
MemoryPool* pool) override;

std::shared_ptr<DataType> type_;
};

struct ARROW_COMPUTE_EXPORT NullKeyEncoder : KeyEncoder {
void AddLength(const ExecValue&, int64_t batch_length, int32_t* lengths) override {}

Expand Down
Loading