Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass in Python callback to native FlightSQLServer, invoke in GetFlightInfoStatement #492

Merged
merged 19 commits into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ find_package(Boost REQUIRED)

add_subdirectory(third_party)

add_library(brad_server_lib OBJECT
server/brad_server_simple.cc
server/brad_sql_info.cc
server/brad_statement_batch_reader.cc
server/brad_statement.cc
server/brad_tables_schema_batch_reader.cc)
# Concurrent hash table
FetchContent_Declare(
libcuckoo
GIT_REPOSITORY https://github.com/efficient/libcuckoo.git
GIT_TAG 784d0f5d147b9a73f897ae55f6c3712d9a91b058
)
FetchContent_MakeAvailable(libcuckoo)
sopzha marked this conversation as resolved.
Show resolved Hide resolved

add_library(sqlite_server_lib OBJECT
sqlite_server/sqlite_server.cc
Expand All @@ -31,12 +32,18 @@ add_library(sqlite_server_lib OBJECT
sqlite_server/sqlite_tables_schema_batch_reader.cc
sqlite_server/sqlite_type_info.cc)

pybind11_add_module(pybind_brad_server pybind/brad_server.cc)
pybind11_add_module(pybind_brad_server pybind/brad_server.cc
server/brad_server_simple.cc
server/brad_sql_info.cc
server/brad_statement_batch_reader.cc
server/brad_statement.cc
server/brad_tables_schema_batch_reader.cc)

target_link_libraries(pybind_brad_server
PRIVATE Arrow::arrow_shared
PRIVATE ArrowFlight::arrow_flight_shared
PRIVATE ArrowFlightSql::arrow_flight_sql_shared
PRIVATE brad_server_lib)
PUBLIC libcuckoo)

add_executable(flight_sql_example_client flight_sql_example_client.cc)
target_link_libraries(flight_sql_example_client
Expand All @@ -55,14 +62,6 @@ target_link_libraries(flight_sql_example_server
${SQLite3_LIBRARIES}
${Boost_LIBRARIES})

add_executable(flight_sql_brad_server flight_sql_brad_server.cc)
target_link_libraries(flight_sql_brad_server
PRIVATE Arrow::arrow_shared
PRIVATE ArrowFlight::arrow_flight_shared
PRIVATE ArrowFlightSql::arrow_flight_sql_shared
PRIVATE brad_server_lib
gflags)

add_executable(brad_front_end brad_front_end.cc)
target_link_libraries(brad_front_end
PRIVATE Arrow::arrow_shared
Expand Down
2 changes: 2 additions & 0 deletions cpp/pybind/brad_server.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include <pybind11/pybind11.h>
#include <pybind11/functional.h>
#include <pybind11/stl.h>

#include <iostream>

Expand Down
77 changes: 62 additions & 15 deletions cpp/server/brad_server_simple.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ using arrow::internal::checked_cast;
using namespace arrow::flight;
using namespace arrow::flight::sql;

arrow::Result<Ticket> EncodeTransactionQuery(
const std::string &query,
std::string GetQueryTicket(
const std::string &autoincrement_id,
const std::string &transaction_id) {
std::string transaction_query = transaction_id;
transaction_query += ':';
transaction_query += query;
return transaction_id + ':' + autoincrement_id;
}

arrow::Result<Ticket> EncodeTransactionQuery(
const std::string &query_ticket) {
ARROW_ASSIGN_OR_RAISE(auto ticket_string,
CreateStatementQueryTicket(transaction_query));
CreateStatementQueryTicket(query_ticket));
return Ticket{std::move(ticket_string)};
}

Expand All @@ -40,8 +42,27 @@ arrow::Result<std::pair<std::string, std::string>> DecodeTransactionQuery(
return arrow::Status::Invalid("Malformed ticket");
}
std::string transaction_id = ticket.substr(0, divider);
std::string query = ticket.substr(divider + 1);
return std::make_pair(std::move(query), std::move(transaction_id));
std::string autoincrement_id = ticket.substr(divider + 1);
return std::make_pair(std::move(autoincrement_id), std::move(transaction_id));
}

std::vector<std::vector<std::any>> TransformQueryResult(
std::vector<py::tuple> query_result) {
std::vector<std::vector<std::any>> transformed_query_result;
for (const auto &row : query_result) {
std::vector<std::any> transformed_row{};
for (const auto &field : row) {
if (py::isinstance<py::int_>(field)) {
transformed_row.push_back(std::make_any<int>(py::cast<int>(field)));
} else if (py::isinstance<py::float_>(field)) {
transformed_row.push_back(std::make_any<float>(py::cast<float>(field)));
} else {
transformed_row.push_back(std::make_any<std::string>(py::cast<std::string>(field)));
}
}
transformed_query_result.push_back(transformed_row);
}
return transformed_query_result;
geoffxy marked this conversation as resolved.
Show resolved Hide resolved
}

BradFlightSqlServer::BradFlightSqlServer() = default;
sopzha marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -50,7 +71,6 @@ BradFlightSqlServer::~BradFlightSqlServer() = default;

std::shared_ptr<BradFlightSqlServer>
BradFlightSqlServer::Create() {
// std::shared_ptr<BradFlightSqlServer> result(new BradFlightSqlServer());
std::shared_ptr<BradFlightSqlServer> result =
std::make_shared<BradFlightSqlServer>();
for (const auto &id_to_result : GetSqlInfoResultMap()) {
Expand All @@ -59,9 +79,15 @@ std::shared_ptr<BradFlightSqlServer>
return result;
}

void BradFlightSqlServer::InitWrapper(const std::string &host, int port) {
void BradFlightSqlServer::InitWrapper(
const std::string &host,
int port,
std::function<std::vector<py::tuple>(std::string)> handle_query) {
auto location = arrow::flight::Location::ForGrpcTcp(host, port).ValueOrDie();
arrow::flight::FlightServerOptions options(location);

handle_query_ = handle_query;

this->Init(options);
}

Expand All @@ -79,10 +105,28 @@ arrow::Result<std::unique_ptr<FlightInfo>>
const StatementQuery &command,
const FlightDescriptor &descriptor) {
const std::string &query = command.query;
ARROW_ASSIGN_OR_RAISE(auto statement, BradStatement::Create(query));
ARROW_ASSIGN_OR_RAISE(auto schema, statement->GetSchema());

const std::string &autoincrement_id = std::to_string(++autoincrement_id_);
const std::string &query_ticket = GetQueryTicket(autoincrement_id, command.transaction_id);
ARROW_ASSIGN_OR_RAISE(auto ticket,
EncodeTransactionQuery(query, command.transaction_id));
EncodeTransactionQuery(query_ticket));

std::vector<std::vector<std::any>> transformed_query_result;

{
py::gil_scoped_acquire guard;
std::vector<py::tuple> query_result = handle_query_(query);
transformed_query_result = TransformQueryResult(query_result);
}

{
std::scoped_lock guard(query_data_mutex_);
sopzha marked this conversation as resolved.
Show resolved Hide resolved
query_data_.insert(query_ticket, transformed_query_result);
}

ARROW_ASSIGN_OR_RAISE(auto statement, BradStatement::Create(transformed_query_result));
ARROW_ASSIGN_OR_RAISE(auto schema, statement->GetSchema());

std::vector<FlightEndpoint> endpoints{
FlightEndpoint{std::move(ticket), {}, std::nullopt, ""}};

Expand All @@ -103,11 +147,14 @@ arrow::Result<std::unique_ptr<FlightDataStream>>
const StatementQueryTicket &command) {
ARROW_ASSIGN_OR_RAISE(auto pair,
DecodeTransactionQuery(command.statement_handle));
const std::string &sql = pair.first;
const std::string &autoincrement_id = pair.first;
const std::string transaction_id = pair.second;

const std::string &query_ticket = transaction_id + ':' + autoincrement_id;
const auto query_result = query_data_.find(query_ticket);
sopzha marked this conversation as resolved.
Show resolved Hide resolved

std::shared_ptr<BradStatement> statement;
ARROW_ASSIGN_OR_RAISE(statement, BradStatement::Create(sql));
ARROW_ASSIGN_OR_RAISE(statement, BradStatement::Create(query_result));
sopzha marked this conversation as resolved.
Show resolved Hide resolved

std::shared_ptr<BradStatementBatchReader> reader;
ARROW_ASSIGN_OR_RAISE(reader, BradStatementBatchReader::Create(statement));
Expand Down
22 changes: 21 additions & 1 deletion cpp/server/brad_server_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,21 @@
#include <cstdint>
#include <memory>
#include <string>
#include <functional>
#include <any>
#include <atomic>
#include <mutex>
sopzha marked this conversation as resolved.
Show resolved Hide resolved

#include <arrow/flight/sql/server.h>
#include <arrow/result.h>

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Another minor thing:) We should still have the includes for atomic and functional (since they are used in this class) 😄. We should add #include <vector> and #include <string> too. My previous comment was referring to removing mutex since we aren't using it in the class anymore.

#include "libcuckoo/cuckoohash_map.hh"

#include <pybind11/pybind11.h>

namespace py = pybind11;
using namespace pybind11::literals;

namespace brad {

class BradFlightSqlServer : public arrow::flight::sql::FlightSqlServerBase {
Expand All @@ -17,7 +28,9 @@ class BradFlightSqlServer : public arrow::flight::sql::FlightSqlServerBase {

static std::shared_ptr<BradFlightSqlServer> Create();

void InitWrapper(const std::string &host, int port);
void InitWrapper(const std::string &host,
int port,
std::function<std::vector<py::tuple>(std::string)>);

void ServeWrapper();

Expand All @@ -33,6 +46,13 @@ class BradFlightSqlServer : public arrow::flight::sql::FlightSqlServerBase {
DoGetStatement(
const arrow::flight::ServerCallContext &context,
const arrow::flight::sql::StatementQueryTicket &command) override;

sopzha marked this conversation as resolved.
Show resolved Hide resolved
std::function<std::vector<py::tuple>(std::string)> handle_query_;

libcuckoo::cuckoohash_map<std::string, std::vector<std::vector<std::any>>> query_data_;
std::mutex query_data_mutex_;

std::atomic<uint64_t> autoincrement_id_;
};

} // namespace brad
57 changes: 23 additions & 34 deletions cpp/server/brad_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,47 +24,36 @@ arrow::Result<std::shared_ptr<BradStatement>> BradStatement::Create(
return result;
}

BradStatement::~BradStatement() {
arrow::Result<std::shared_ptr<BradStatement>> BradStatement::Create(
std::vector<std::vector<std::any>> query_result) {
std::shared_ptr<BradStatement> result(
new BradStatement(query_result));
sopzha marked this conversation as resolved.
Show resolved Hide resolved
return result;
}

arrow::Result<std::shared_ptr<arrow::Schema>> BradStatement::GetSchema() const {
std::vector<std::shared_ptr<arrow::Field>> fields;
fields.push_back(arrow::field("Day", arrow::int8()));
fields.push_back(arrow::field("Month", arrow::int8()));
fields.push_back(arrow::field("Year", arrow::int16()));
return arrow::schema(fields);
BradStatement::BradStatement(std::vector<std::vector<std::any>> query_result) {
sopzha marked this conversation as resolved.
Show resolved Hide resolved
query_result_ = query_result;
}

arrow::Result<std::shared_ptr<arrow::RecordBatch>> BradStatement::FetchResult() {
sopzha marked this conversation as resolved.
Show resolved Hide resolved
arrow::Int8Builder int8builder;
int8_t days_raw[5] = {1, 12, 17, 23, 28};
ARROW_RETURN_NOT_OK(int8builder.AppendValues(days_raw, 5));
std::shared_ptr<arrow::Array> days;
ARROW_ASSIGN_OR_RAISE(days, int8builder.Finish());

int8_t months_raw[5] = {1, 3, 5, 7, 1};
ARROW_RETURN_NOT_OK(int8builder.AppendValues(months_raw, 5));
std::shared_ptr<arrow::Array> months;
ARROW_ASSIGN_OR_RAISE(months, int8builder.Finish());

arrow::Int16Builder int16builder;
int16_t years_raw[5] = {1990, 2000, 1995, 2000, 1995};
ARROW_RETURN_NOT_OK(int16builder.AppendValues(years_raw, 5));
std::shared_ptr<arrow::Array> years;
ARROW_ASSIGN_OR_RAISE(years, int16builder.Finish());

std::shared_ptr<arrow::RecordBatch> record_batch;
BradStatement::~BradStatement() {
}

arrow::Result<std::shared_ptr<arrow::Schema>> result = GetSchema();
if (result.ok()) {
std::shared_ptr<arrow::Schema> schema = result.ValueOrDie();
record_batch = arrow::RecordBatch::Make(schema,
days->length(),
{days, months, years});
return record_batch;
arrow::Result<std::shared_ptr<arrow::Schema>> BradStatement::GetSchema() const {
std::vector<std::shared_ptr<arrow::Field>> fields;
const std::vector<std::any> &row = query_result_[0];
sopzha marked this conversation as resolved.
Show resolved Hide resolved

for (const auto &field : row) {
std::string field_type = field.type().name();
if (field_type == "i") {
fields.push_back(arrow::field("INT FIELD", arrow::int8()));
sopzha marked this conversation as resolved.
Show resolved Hide resolved
} else if (field_type == "f") {
fields.push_back(arrow::field("FLOAT FIELD", arrow::float16()));
sopzha marked this conversation as resolved.
Show resolved Hide resolved
} else {
fields.push_back(arrow::field("STRING FIELD", arrow::utf8()));
}
sopzha marked this conversation as resolved.
Show resolved Hide resolved
}

return arrow::Status::OK();
return arrow::schema(fields);
sopzha marked this conversation as resolved.
Show resolved Hide resolved
}

std::string* BradStatement::GetBradStmt() const { return stmt_; }
Expand Down
15 changes: 13 additions & 2 deletions cpp/server/brad_statement.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
#pragma once

#include <memory>
#include <any>
#include <string>

#include <arrow/flight/sql/column_metadata.h>
#include <arrow/type_fwd.h>

#include <pybind11/pybind11.h>

namespace py = pybind11;
using namespace pybind11::literals;
sopzha marked this conversation as resolved.
Show resolved Hide resolved

namespace brad {

/// \brief Create an object ColumnMetadata using the column type and
Expand All @@ -23,16 +29,21 @@ class BradStatement {
static arrow::Result<std::shared_ptr<BradStatement>> Create(
const std::string& sql);

static arrow::Result<std::shared_ptr<BradStatement>> Create(
const std::vector<std::vector<std::any>>);

BradStatement(std::vector<std::vector<std::any>>);

~BradStatement();

/// \brief Creates an Arrow Schema based on the results of this statement.
/// \return The resulting Schema.
arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema() const;

arrow::Result<std::shared_ptr<arrow::RecordBatch>> FetchResult();

std::string* GetBradStmt() const;

std::vector<std::vector<std::any>> query_result_;
sopzha marked this conversation as resolved.
Show resolved Hide resolved

private:
std::string* stmt_;

Expand Down
2 changes: 1 addition & 1 deletion cpp/server/brad_statement_batch_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ arrow::Status BradStatementBatchReader::ReadNext(std::shared_ptr<arrow::RecordBa
return arrow::Status::OK();
}

ARROW_ASSIGN_OR_RAISE(*out, statement_->FetchResult());
// ARROW_ASSIGN_OR_RAISE(*out, statement_->FetchResult());
sopzha marked this conversation as resolved.
Show resolved Hide resolved
already_executed_ = true;
return arrow::Status::OK();
}
Expand Down
5 changes: 3 additions & 2 deletions src/brad/front_end/flight_sql_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import threading
from typing import Callable

# pylint: disable-next=import-error,no-name-in-module,unused-import
import brad.native.pybind_brad_server as brad_server
Expand All @@ -8,9 +9,9 @@


class BradFlightSqlServer:
def __init__(self, host: str, port: int) -> None:
def __init__(self, host: str, port: int, callback: Callable) -> None:
self._flight_sql_server = brad_server.BradFlightSqlServer()
self._flight_sql_server.init(host, port)
self._flight_sql_server.init(host, port, callback)
self._thread = threading.Thread(name="BradFlightSqlServer", target=self._serve)

def start(self) -> None:
Expand Down
Loading
Loading