Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zhli1142015 committed Dec 5, 2024
1 parent 89d888e commit d74a262
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 158 deletions.
2 changes: 1 addition & 1 deletion velox/functions/sparksql/specialforms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
velox_add_library(
velox_functions_spark_specialforms
AtLeastNNonNulls.cpp
FromJson.cpp
DecimalRound.cpp
FromJson.cpp
MakeDecimal.cpp
SparkCastExpr.cpp
SparkCastHooks.cpp)
Expand Down
296 changes: 139 additions & 157 deletions velox/functions/sparksql/specialforms/FromJson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ struct ParseJsonTypedImpl {
template <TypeKind kind, typename Dummy = void>
struct KindDispatcher {
static simdjson::error_code apply(Input, exec::GenericWriter&, bool) {
VELOX_NYI(
"Casting from JSON to {} is not supported.", TypeTraits<kind>::name);
return simdjson::error_code::UNEXPECTED_ERROR; // Make compiler happy.
VELOX_NYI("Parse json to {} is not supported.", TypeTraits<kind>::name);
return simdjson::error_code::UNEXPECTED_ERROR;
}
};

Expand Down Expand Up @@ -367,155 +366,111 @@ struct ParseJsonTypedImpl {
}
};

template <TypeKind kind>
simdjson::error_code parseJsonOneRow(
simdjson::padded_string_view input,
exec::VectorWriter<Any>& writer) {
SIMDJSON_ASSIGN_OR_RAISE(auto doc, simdjsonParse(input));
if (doc.is_null()) {
writer.commitNull();
} else {
SIMDJSON_TRY(ParseJsonTypedImpl<simdjson::ondemand::document&>::apply<kind>(
doc, writer.current(), true));
writer.commit(true);
}
return simdjson::SUCCESS;
}

class FromJsonExpr : public SpecialForm {
/// @brief Parses a JSON string into the specified data type. Supports ROW,
/// ARRAY, and MAP as root types. Key Behavior:
/// - Failure Handling: Returns `NULL` for invalid JSON or incompatible values.
/// - Boolean: Only `true` and `false` are valid; others return `NULL`.
/// - Integral Types: Accepts only integers; floats or strings return `NULL`.
/// - Float/Double: All numbers are valid; strings like `"NaN"` , `"INF"`
/// `"Infinity"` are accepted, others return `NULL`.
/// - Array: Accepts JSON objects only if the array is the root type with ROW
/// child type.
/// - Map: Keys must be `VARCHAR` type.
/// - Row: Partial parsing is supported, but JSON arrays cannot be parsed into a
/// ROW type.
class FromJsonFunction final : public exec::VectorFunction {
public:
/// @param type The target type of the cast expression
/// @param expr The expression to gerenate input
/// @param trackCpuUsage Whether to track CPU usage
FromJsonExpr(TypePtr type, ExprPtr&& expr, bool trackCpuUsage)
: SpecialForm(
type,
std::vector<ExprPtr>({expr}),
FromJsonCallToSpecialForm::kFromJson,
false /* supportsFlatNoNullsFastPath */,
trackCpuUsage) {
if (!isSupportedType(type)) {
VELOX_UNSUPPORTED("Unsupported type {}.", type->toString());
}
simdjsonErrorsToExceptions(errors_);
}

void evalSpecialForm(
const SelectivityVector& rows,
EvalCtx& context,
VectorPtr& result) override {
VectorPtr input;
inputs_[0]->eval(rows, context, input);
auto toType = std::const_pointer_cast<const Type>(type_);
apply(rows, input, context, toType, result);
// Return 'input' back to the vector pool in 'context' so it can be reused.
context.releaseVector(input);
}

private:
void computePropagatesNulls() override {
propagatesNulls_ = false;
}

// Peal data.
void apply(
const SelectivityVector& rows,
const VectorPtr& input,
exec::EvalCtx& context,
const TypePtr& toType,
VectorPtr& result) {
LocalSelectivityVector remainingRows(context, rows);

context.deselectErrors(*remainingRows);

LocalDecodedVector decoded(context, *input, *remainingRows);
auto* rawNulls = decoded->nulls(remainingRows.get());

if (rawNulls) {
remainingRows->deselectNulls(
rawNulls, remainingRows->begin(), remainingRows->end());
}

VectorPtr localResult;
if (!remainingRows->hasSelections()) {
localResult =
BaseVector::createNullConstant(toType, rows.end(), context.pool());
} else if (decoded->isIdentityMapping()) {
applyPeeled(
*remainingRows, *decoded->base(), context, toType, localResult);
} else {
withContextSaver([&](ContextSaver& saver) {
LocalSelectivityVector newRowsHolder(*context.execCtx());

LocalDecodedVector localDecoded(context);
std::vector<VectorPtr> peeledVectors;
auto peeledEncoding = PeeledEncoding::peel(
{input}, *remainingRows, localDecoded, true, peeledVectors);
VELOX_CHECK_EQ(peeledVectors.size(), 1);
if (peeledVectors[0]->isLazy()) {
peeledVectors[0] =
peeledVectors[0]->as<LazyVector>()->loadedVectorShared();
}
auto newRows =
peeledEncoding->translateToInnerRows(*remainingRows, newRowsHolder);
// Save context and set the peel.
context.saveAndReset(saver, *remainingRows);
context.setPeeledEncoding(peeledEncoding);
applyPeeled(*newRows, *peeledVectors[0], context, toType, localResult);

localResult = context.getPeeledEncoding()->wrap(
toType, context.pool(), localResult, *remainingRows);
});
}
context.moveOrCopyResult(localResult, *remainingRows, result);
context.releaseVector(localResult);

// If there are nulls or rows that encountered errors in the input, add
// nulls to the result at the same rows.
VELOX_CHECK_NOT_NULL(result);
if (rawNulls || context.errors()) {
EvalCtx::addNulls(
rows, remainingRows->asRange().bits(), context, toType, result);
}
}

void applyPeeled(
const SelectivityVector& rows,
const BaseVector& input,
std::vector<VectorPtr>& args, // Not using const ref so we can reuse args
const TypePtr& outputType,
exec::EvalCtx& context,
const TypePtr& toType,
VectorPtr& result) {
context.ensureWritable(rows, toType, result);
VectorPtr& result) const final {
VELOX_USER_CHECK_EQ(args.size(), 1, "from_json expects one argument.");
VELOX_USER_CHECK(
args[0]->isConstantEncoding() || args[0]->isFlatEncoding(),
"Single-arg deterministic functions receive their only argument as flat or constant vector.");
context.ensureWritable(rows, outputType, result);
result->clearNulls(rows);
switch (result->typeKind()) {
case TypeKind::ARRAY: {
parseJson<TypeKind::ARRAY>(input, context, rows, *result);
if (args[0]->isConstantEncoding()) {
parseJsonConstant<TypeKind::ARRAY>(args[0], context, rows, *result);
} else {
parseJsonFlat<TypeKind::ARRAY>(args[0], context, rows, *result);
}
break;
}
case TypeKind::MAP: {
parseJson<TypeKind::MAP>(input, context, rows, *result);
if (args[0]->isConstantEncoding()) {
parseJsonConstant<TypeKind::MAP>(args[0], context, rows, *result);
} else {
parseJsonFlat<TypeKind::MAP>(args[0], context, rows, *result);
}
break;
}
case TypeKind::ROW: {
parseJson<TypeKind::ROW>(input, context, rows, *result);
if (args[0]->isConstantEncoding()) {
parseJsonConstant<TypeKind::ROW>(args[0], context, rows, *result);
} else {
parseJsonFlat<TypeKind::ROW>(args[0], context, rows, *result);
}
break;
}
default:
VELOX_UNSUPPORTED("INVALID_JSON_SCHEMA");
}
}

private:
template <TypeKind kind>
void parseJson(
const BaseVector& input,
void parseJsonConstant(
VectorPtr& input,
exec::EvalCtx& context,
const SelectivityVector& rows,
BaseVector& result) const {
// Result is guaranteed to be a flat writable vector.
auto* flatResult = result.as<typename KindToFlatVector<kind>::type>();
exec::VectorWriter<Any> writer;
writer.init(*flatResult);
// Input is guaranteed to be in flat or constant encodings when passed in.
auto* inputVector = input.as<SimpleVector<StringView>>();
const auto constInput = input->asUnchecked<ConstantVector<StringView>>();
if (constInput->isNullAt(0)) {
context.applyToSelectedNoThrow(rows, [&](auto row) {
writer.setOffset(row);
writer.commitNull();
});
} else {
const auto constant = constInput->valueAt(0);
paddedInput_.resize(constant.size() + simdjson::SIMDJSON_PADDING);
memcpy(paddedInput_.data(), constant.data(), constant.size());
simdjson::padded_string_view paddedInput(
paddedInput_.data(), constant.size(), paddedInput_.size());

simdjson::ondemand::document jsonDoc;
auto error = simdjsonParse(paddedInput).get(jsonDoc);

context.applyToSelectedNoThrow(rows, [&](auto row) {
writer.setOffset(row);
if (error != simdjson::SUCCESS ||
paseJsonOneRow<kind>(jsonDoc, writer) != simdjson::SUCCESS) {
writer.commitNull();
}
});
}

writer.finish();
}

template <TypeKind kind>
void parseJsonFlat(
VectorPtr& input,
exec::EvalCtx& context,
const SelectivityVector& rows,
BaseVector& result) const {
auto* flatResult = result.as<typename KindToFlatVector<kind>::type>();
exec::VectorWriter<Any> writer;
writer.init(*flatResult);
auto* inputVector = input->asUnchecked<FlatVector<StringView>>();
size_t maxSize = 0;
rows.applyToSelected([&](auto row) {
if (inputVector->isNullAt(row)) {
Expand All @@ -535,49 +490,67 @@ class FromJsonExpr : public SpecialForm {
memcpy(paddedInput_.data(), input.data(), input.size());
simdjson::padded_string_view paddedInput(
paddedInput_.data(), input.size(), paddedInput_.size());
if (auto error = parseJsonOneRow<kind>(paddedInput, writer)) {
simdjson::ondemand::document doc;
auto error = simdjsonParse(paddedInput).get(doc);
if (error != simdjson::SUCCESS ||
paseJsonOneRow<kind>(doc, writer) != simdjson::SUCCESS) {
writer.commitNull();
}
});
writer.finish();
}

bool isSupportedType(const TypePtr& other, bool isRootType = true) const {
switch (other->kind()) {
case TypeKind::ARRAY:
return isSupportedType(other->childAt(0), false);
case TypeKind::ROW:
for (const auto& child : other->as<TypeKind::ROW>().children()) {
if (!isSupportedType(child, false)) {
return false;
}
}
return true;
case TypeKind::MAP:
return (
other->childAt(0)->kind() == TypeKind::VARCHAR &&
isSupportedType(other->childAt(1), false));
case TypeKind::BOOLEAN:
case TypeKind::BIGINT:
case TypeKind::INTEGER:
case TypeKind::SMALLINT:
case TypeKind::TINYINT:
case TypeKind::DOUBLE:
case TypeKind::REAL:
case TypeKind::VARCHAR: {
if (other->isDate() || other->isDecimal()) {
template <TypeKind kind>
static simdjson::error_code paseJsonOneRow(
simdjson::ondemand::document& doc,
exec::VectorWriter<Any>& writer) {
if (doc.is_null()) {
writer.commitNull();
} else {
SIMDJSON_TRY(
ParseJsonTypedImpl<simdjson::ondemand::document&>::apply<kind>(
doc, writer.current(), true));
writer.commit(true);
}
return simdjson::SUCCESS;
}

mutable std::string paddedInput_;
};

bool isSupportedType(const TypePtr& other, bool isRootType = true) {
switch (other->kind()) {
case TypeKind::ARRAY:
return isSupportedType(other->childAt(0), false);
case TypeKind::ROW:
for (const auto& child : other->as<TypeKind::ROW>().children()) {
if (!isSupportedType(child, false)) {
return false;
}
return !isRootType;
}
default:
return true;
case TypeKind::MAP:
return (
other->childAt(0)->kind() == TypeKind::VARCHAR &&
isSupportedType(other->childAt(1), false));
case TypeKind::BOOLEAN:
case TypeKind::BIGINT:
case TypeKind::INTEGER:
case TypeKind::SMALLINT:
case TypeKind::TINYINT:
case TypeKind::DOUBLE:
case TypeKind::REAL:
case TypeKind::VARCHAR: {
if (other->isDate() || other->isDecimal()) {
return false;
}
return !isRootType;
}
default:
return false;
}
}

std::exception_ptr errors_[simdjson::NUM_ERROR_CODES];
mutable std::string paddedInput_;
};
} // namespace

TypePtr FromJsonCallToSpecialForm::resolveType(
Expand All @@ -596,7 +569,16 @@ exec::ExprPtr FromJsonCallToSpecialForm::constructSpecialForm(
TypeKind::VARCHAR,
"The first argument of from_json should be of varchar type.");

return std::make_shared<FromJsonExpr>(
type, std::move(args[0]), trackCpuUsage);
if (!isSupportedType(type)) {
VELOX_UNSUPPORTED("Unsupported type {}.", type->toString());
}

return std::make_shared<exec::Expr>(
type,
std::move(args),
std::make_shared<FromJsonFunction>(),
exec::VectorFunctionMetadata{},
kFromJson,
trackCpuUsage);
}
} // namespace facebook::velox::functions::sparksql

0 comments on commit d74a262

Please sign in to comment.