Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions velox/experimental/cudf/exec/CudfAssignUniqueId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ CudfAssignUniqueId::CudfAssignUniqueId(
const std::shared_ptr<const core::AssignUniqueIdNode>& planNode,
int32_t uniqueTaskId,
std::shared_ptr<std::atomic_int64_t> rowIdPool)
: Operator(
: CudfOperatorBase(
operatorId,
driverCtx,
planNode->outputType(),
operatorId,
planNode->id(),
"CudfAssignUniqueId"),
NvtxHelper(
"CudfAssignUniqueId",
nvtx3::rgb{160, 82, 45}, // Sienna
operatorId,
fmt::format("[{}]", planNode->id())),
NvtxMethodFlag::kAll,
std::nullopt,
planNode),
rowIdPool_(std::move(rowIdPool)) {
VELOX_USER_CHECK_LT(
uniqueTaskId,
Expand All @@ -50,17 +50,14 @@ CudfAssignUniqueId::CudfAssignUniqueId(
maxRowIdCounterValue_ = 0;
}

void CudfAssignUniqueId::addInput(RowVectorPtr input) {
VELOX_NVTX_OPERATOR_FUNC_RANGE();
void CudfAssignUniqueId::doAddInput(RowVectorPtr input) {
auto numInput = input->size();
VELOX_CHECK_NE(
numInput, 0, "CudfAssignUniqueId::addInput received empty set of rows");
input_ = std::move(input);
}

RowVectorPtr CudfAssignUniqueId::getOutput() {
VELOX_NVTX_OPERATOR_FUNC_RANGE();

RowVectorPtr CudfAssignUniqueId::doGetOutput() {
if (input_ == nullptr) {
return nullptr;
}
Expand Down
12 changes: 6 additions & 6 deletions velox/experimental/cudf/exec/CudfAssignUniqueId.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
#pragma once

#include "velox/experimental/cudf/exec/NvtxHelper.h"
#include "velox/experimental/cudf/exec/CudfOperator.h"
#include "velox/experimental/cudf/vector/CudfVector.h"

#include "velox/exec/Operator.h"
#include "velox/vector/FlatVector.h"

namespace facebook::velox::cudf_velox {

class CudfAssignUniqueId : public exec::Operator, public NvtxHelper {
class CudfAssignUniqueId : public CudfOperatorBase {
public:
CudfAssignUniqueId(
int32_t operatorId,
Expand All @@ -44,10 +44,6 @@ class CudfAssignUniqueId : public exec::Operator, public NvtxHelper {
return input_ == nullptr;
}

void addInput(RowVectorPtr input) override;

RowVectorPtr getOutput() override;

// Always reports kNotBlocked; the ContinueFuture argument is unused.
exec::BlockingReason isBlocked(ContinueFuture* /*future*/) override {
return exec::BlockingReason::kNotBlocked;
}
Expand All @@ -59,6 +55,10 @@ class CudfAssignUniqueId : public exec::Operator, public NvtxHelper {

bool isFinished() override;

protected:
void doAddInput(RowVectorPtr input) override;
RowVectorPtr doGetOutput() override;

private:
std::unique_ptr<cudf::column> generateIdColumn(
vector_size_t size,
Expand Down
19 changes: 9 additions & 10 deletions velox/experimental/cudf/exec/CudfBatchConcat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ CudfBatchConcat::CudfBatchConcat(
int32_t operatorId,
exec::DriverCtx* driverCtx,
std::shared_ptr<const core::PlanNode> planNode)
: exec::Operator(
: CudfOperatorBase(
operatorId,
driverCtx,
planNode->outputType(),
operatorId,
planNode->id(),
"CudfBatchConcat"),
CudfOperator(
operatorId,
planNode->id(),
nvtx3::rgb{211, 211, 211} /* LightGrey */),
"CudfBatchConcat",
nvtx3::rgb{211, 211, 211}, /* LightGrey */
NvtxMethodFlag::kAll,
std::nullopt,
planNode),
driverCtx_(driverCtx),
targetRows_(CudfConfig::getInstance().batchSizeMinThreshold) {}

void CudfBatchConcat::addInput(RowVectorPtr input) {
void CudfBatchConcat::doAddInput(RowVectorPtr input) {
auto cudfVector = std::dynamic_pointer_cast<CudfVector>(input);
VELOX_CHECK_NOT_NULL(cudfVector, "CudfBatchConcat expects CudfVector input");

Expand All @@ -48,8 +48,7 @@ void CudfBatchConcat::addInput(RowVectorPtr input) {
buffer_.push_back(std::move(cudfVector));
}

RowVectorPtr CudfBatchConcat::getOutput() {
VELOX_NVTX_OPERATOR_FUNC_RANGE();
RowVectorPtr CudfBatchConcat::doGetOutput() {
// Drain the queue if there is any output to be flushed
if (!outputQueue_.empty()) {
auto table = std::move(outputQueue_.front());
Expand Down
10 changes: 5 additions & 5 deletions velox/experimental/cudf/exec/CudfBatchConcat.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

namespace facebook::velox::cudf_velox {

class CudfBatchConcat : public exec::Operator, public CudfOperator {
class CudfBatchConcat : public CudfOperatorBase {
public:
CudfBatchConcat(
int32_t operatorId,
Expand All @@ -38,16 +38,16 @@ class CudfBatchConcat : public exec::Operator, public CudfOperator {
currentNumRows_ < targetRows_;
}

void addInput(RowVectorPtr input) override;

RowVectorPtr getOutput() override;

// Always reports kNotBlocked; the ContinueFuture argument is unused.
exec::BlockingReason isBlocked(ContinueFuture* /*future*/) override {
return exec::BlockingReason::kNotBlocked;
}

bool isFinished() override;

protected:
void doAddInput(RowVectorPtr input) override;
RowVectorPtr doGetOutput() override;

private:
exec::DriverCtx* const driverCtx_;
std::vector<CudfVectorPtr> buffer_;
Expand Down
43 changes: 20 additions & 23 deletions velox/experimental/cudf/exec/CudfConversion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,18 @@ CudfFromVelox::CudfFromVelox(
RowTypePtr outputType,
exec::DriverCtx* driverCtx,
std::string planNodeId)
: exec::Operator(
: CudfOperatorBase(
operatorId,
driverCtx,
outputType,
operatorId,
planNodeId,
"CudfFromVelox"),
NvtxHelper(
"CudfFromVelox",
nvtx3::rgb{255, 140, 0}, // Orange
operatorId,
fmt::format("[{}]", planNodeId)) {}
NvtxMethodFlag::kAll,
std::nullopt,
std::nullopt) {}

void CudfFromVelox::addInput(RowVectorPtr input) {
VELOX_NVTX_OPERATOR_FUNC_RANGE();
void CudfFromVelox::doAddInput(RowVectorPtr input) {
if (input->size() > 0) {
// Materialize lazy vectors
for (auto& child : input->children()) {
Expand All @@ -101,8 +100,7 @@ void CudfFromVelox::addInput(RowVectorPtr input) {
}
}

RowVectorPtr CudfFromVelox::getOutput() {
VELOX_NVTX_OPERATOR_FUNC_RANGE();
RowVectorPtr CudfFromVelox::doGetOutput() {
const auto targetOutputSize =
preferredGpuBatchSizeRows(operatorCtx_->driverCtx()->queryConfig());

Expand Down Expand Up @@ -159,10 +157,10 @@ RowVectorPtr CudfFromVelox::getOutput() {
input->pool(), outputType_, size, std::move(tbl), stream);
}

void CudfFromVelox::close() {
void CudfFromVelox::doClose() {
// TODO(kn): Remove default stream after redesign of CudfFromVelox
cudf::get_default_stream(cudf::allow_default_stream).synchronize();
exec::Operator::close();
Operator::close();
inputs_.clear();
}

Expand All @@ -171,23 +169,23 @@ CudfToVelox::CudfToVelox(
RowTypePtr outputType,
exec::DriverCtx* driverCtx,
std::string planNodeId)
: exec::Operator(
: CudfOperatorBase(
operatorId,
driverCtx,
outputType,
operatorId,
planNodeId,
"CudfToVelox"),
NvtxHelper(
"CudfToVelox",
nvtx3::rgb{148, 0, 211}, // Purple
operatorId,
fmt::format("[{}]", planNodeId)) {}
NvtxMethodFlag::kAll,
std::nullopt,
std::nullopt) {}

// Looks up the kPassthroughMode key in the query config reached through this
// operator's driver context, passing true as the default value.
bool CudfToVelox::isPassthroughMode() const {
  const auto& queryConfig = operatorCtx_->driverCtx()->queryConfig();
  return queryConfig.get<bool>(kPassthroughMode, true);
}

void CudfToVelox::addInput(RowVectorPtr input) {
void CudfToVelox::doAddInput(RowVectorPtr input) {
// Accumulate inputs
if (input->size() > 0) {
auto cudfInput = std::dynamic_pointer_cast<CudfVector>(input);
Expand All @@ -204,8 +202,7 @@ std::optional<uint64_t> CudfToVelox::averageRowSize() {
return averageRowSize_;
}

RowVectorPtr CudfToVelox::getOutput() {
VELOX_NVTX_OPERATOR_FUNC_RANGE();
RowVectorPtr CudfToVelox::doGetOutput() {
if (finished_ || inputs_.empty()) {
finished_ = noMoreInput_ && inputs_.empty();
return nullptr;
Expand Down Expand Up @@ -311,8 +308,8 @@ RowVectorPtr CudfToVelox::getOutput() {
return output;
}

void CudfToVelox::close() {
exec::Operator::close();
void CudfToVelox::doClose() {
Operator::close();
inputs_.clear();
}

Expand Down
24 changes: 11 additions & 13 deletions velox/experimental/cudf/exec/CudfConversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#pragma once

#include "velox/experimental/cudf/exec/NvtxHelper.h"
#include "velox/experimental/cudf/exec/CudfOperator.h"
#include "velox/experimental/cudf/vector/CudfVector.h"

#include "velox/exec/Driver.h"
Expand All @@ -29,7 +29,7 @@

namespace facebook::velox::cudf_velox {

class CudfFromVelox : public exec::Operator, public NvtxHelper {
class CudfFromVelox : public CudfOperatorBase {
public:
static constexpr const char* kGpuBatchSizeRows =
"velox.cudf.gpu_batch_size_rows";
Expand All @@ -44,10 +44,6 @@ class CudfFromVelox : public exec::Operator, public NvtxHelper {
return !finished_;
}

void addInput(RowVectorPtr input) override;

RowVectorPtr getOutput() override;

// Always reports kNotBlocked; the ContinueFuture argument is unused.
exec::BlockingReason isBlocked(ContinueFuture* /*future*/) override {
return exec::BlockingReason::kNotBlocked;
}
Expand All @@ -56,15 +52,18 @@ class CudfFromVelox : public exec::Operator, public NvtxHelper {
return finished_;
}

void close() override;
protected:
void doAddInput(RowVectorPtr input) override;
RowVectorPtr doGetOutput() override;
void doClose() override;

private:
std::vector<RowVectorPtr> inputs_;
std::size_t currentOutputSize_ = 0;
bool finished_ = false;
};

class CudfToVelox : public exec::Operator, public NvtxHelper {
class CudfToVelox : public CudfOperatorBase {
public:
static constexpr const char* kPassthroughMode =
"velox.cudf.to_velox.passthrough_mode";
Expand All @@ -79,10 +78,6 @@ class CudfToVelox : public exec::Operator, public NvtxHelper {
return !finished_;
}

void addInput(RowVectorPtr input) override;

RowVectorPtr getOutput() override;

// Always reports kNotBlocked; the ContinueFuture argument is unused.
exec::BlockingReason isBlocked(ContinueFuture* /*future*/) override {
return exec::BlockingReason::kNotBlocked;
}
Expand All @@ -91,7 +86,10 @@ class CudfToVelox : public exec::Operator, public NvtxHelper {
return finished_;
}

void close() override;
protected:
void doAddInput(RowVectorPtr input) override;
RowVectorPtr doGetOutput() override;
void doClose() override;

private:
bool isPassthroughMode() const;
Expand Down
19 changes: 9 additions & 10 deletions velox/experimental/cudf/exec/CudfFilterProject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,17 @@ CudfFilterProject::CudfFilterProject(
velox::exec::DriverCtx* driverCtx,
const std::shared_ptr<const core::FilterNode>& filter,
const std::shared_ptr<const core::ProjectNode>& project)
: Operator(
: CudfOperatorBase(
operatorId,
driverCtx,
project ? project->outputType() : filter->outputType(),
operatorId,
project ? project->id() : filter->id(),
"CudfFilterProject"),
NvtxHelper(
"CudfFilterProject",
nvtx3::rgb{220, 20, 60}, // Crimson
operatorId,
fmt::format("[{}]", project ? project->id() : filter->id())),
NvtxMethodFlag::kAll,
std::nullopt,
project ? std::static_pointer_cast<const core::PlanNode>(project)
: std::static_pointer_cast<const core::PlanNode>(filter)),
hasFilter_(filter != nullptr),
project_(project),
filter_(filter) {
Expand Down Expand Up @@ -227,13 +228,11 @@ void CudfFilterProject::initialize() {
project_.reset();
}

void CudfFilterProject::addInput(RowVectorPtr input) {
void CudfFilterProject::doAddInput(RowVectorPtr input) {
input_ = std::move(input);
}

RowVectorPtr CudfFilterProject::getOutput() {
VELOX_NVTX_OPERATOR_FUNC_RANGE();

RowVectorPtr CudfFilterProject::doGetOutput() {
if (allInputProcessed()) {
return nullptr;
}
Expand Down
Loading