diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index cc06c7142..309643987 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -363,7 +363,19 @@ jobs: rm -rf default.etcd rm -rf /dev/shm/etcd* python3 test/runner.py --with-io --with-migration + + - name: Run FUSE Tests + run: | + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib:/usr/local/lib64:/usr/local/lib/x86_64-linux-gnu + + export VINEYARD_DEVELOP=TRUE + export VINEYARD_DATA_DIR=`pwd`/gstest + export TMPDIR="${TMPDIR:-$(dirname $(mktemp))}" + rm -rf default.etcd + rm -rf /dev/shm/etcd* + python3 test/runner.py --with-fuse + - name: Find vineyard using CMake run: | cmake -S test/vineyard-cmake-example -B build/vineyard-cmake-example diff --git a/.gitmodules b/.gitmodules index ae14c5c47..2a6a0bb65 100644 --- a/.gitmodules +++ b/.gitmodules @@ -33,4 +33,5 @@ [submodule "modules/hosseinmoein-dataframe/thirdparty/DataFrame"] path = modules/hosseinmoein-dataframe/thirdparty/DataFrame url = https://github.com/hosseinmoein/DataFrame.git - shallow = true + shallow = true + diff --git a/cmake/FindParquet.cmake b/cmake/FindParquet.cmake new file mode 100644 index 000000000..84e8d4489 --- /dev/null +++ b/cmake/FindParquet.cmake @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +# - Find Parquet (parquet/api/reader.h, libparquet.a, libparquet.so) +# +# This module requires Arrow from which it uses +# arrow_find_package() +# +# This module defines +# PARQUET_FOUND, whether Parquet has been found +# PARQUET_IMPORT_LIB, path to libparquet's import library (Windows only) +# PARQUET_INCLUDE_DIR, directory containing headers +# PARQUET_LIBS, deprecated. Use PARQUET_LIB_DIR instead +# PARQUET_LIB_DIR, directory containing Parquet libraries +# PARQUET_SHARED_IMP_LIB, deprecated. Use PARQUET_IMPORT_LIB instead +# PARQUET_SHARED_LIB, path to libparquet's shared library +# PARQUET_SO_VERSION, shared object version of found Parquet such as "100" +# PARQUET_STATIC_LIB, path to libparquet.a + +if(DEFINED PARQUET_FOUND) + return() +endif() + +set(find_package_arguments) +if(${CMAKE_FIND_PACKAGE_NAME}_FIND_VERSION) + list(APPEND find_package_arguments "${${CMAKE_FIND_PACKAGE_NAME}_FIND_VERSION}") +endif() +if(${CMAKE_FIND_PACKAGE_NAME}_FIND_REQUIRED) + list(APPEND find_package_arguments REQUIRED) +endif() +if(${CMAKE_FIND_PACKAGE_NAME}_FIND_QUIETLY) + list(APPEND find_package_arguments QUIET) +endif() +find_package(Arrow ${find_package_arguments}) + +if(NOT "$ENV{PARQUET_HOME}" STREQUAL "") + file(TO_CMAKE_PATH "$ENV{PARQUET_HOME}" PARQUET_HOME) +endif() + +if((NOT PARQUET_HOME) AND ARROW_HOME) + set(PARQUET_HOME ${ARROW_HOME}) +endif() + +if(ARROW_FOUND) + arrow_find_package(PARQUET + "${PARQUET_HOME}" + parquet + parquet/api/reader.h + Parquet + parquet) + if(PARQUET_HOME) + if(PARQUET_INCLUDE_DIR) + file(READ "${PARQUET_INCLUDE_DIR}/parquet/parquet_version.h" + PARQUET_VERSION_H_CONTENT) + arrow_extract_macro_value(PARQUET_VERSION_MAJOR "PARQUET_VERSION_MAJOR" + "${PARQUET_VERSION_H_CONTENT}") + arrow_extract_macro_value(PARQUET_VERSION_MINOR "PARQUET_VERSION_MINOR" + "${PARQUET_VERSION_H_CONTENT}") + 
arrow_extract_macro_value(PARQUET_VERSION_PATCH "PARQUET_VERSION_PATCH" + "${PARQUET_VERSION_H_CONTENT}") + if("${PARQUET_VERSION_MAJOR}" STREQUAL "" + OR "${PARQUET_VERSION_MINOR}" STREQUAL "" + OR "${PARQUET_VERSION_PATCH}" STREQUAL "") + set(PARQUET_VERSION "0.0.0") + else() + set(PARQUET_VERSION + "${PARQUET_VERSION_MAJOR}.${PARQUET_VERSION_MINOR}.${PARQUET_VERSION_PATCH}") + endif() + + arrow_extract_macro_value(PARQUET_SO_VERSION_QUOTED "PARQUET_SO_VERSION" + "${PARQUET_VERSION_H_CONTENT}") + string(REGEX REPLACE "^\"(.+)\"$" "\\1" PARQUET_SO_VERSION + "${PARQUET_SO_VERSION_QUOTED}") + arrow_extract_macro_value(PARQUET_FULL_SO_VERSION_QUOTED "PARQUET_FULL_SO_VERSION" + "${PARQUET_VERSION_H_CONTENT}") + string(REGEX REPLACE "^\"(.+)\"$" "\\1" PARQUET_FULL_SO_VERSION + "${PARQUET_FULL_SO_VERSION_QUOTED}") + endif() + else() + if(PARQUET_USE_CMAKE_PACKAGE_CONFIG) + find_package(Parquet CONFIG) + elseif(PARQUET_USE_PKG_CONFIG) + pkg_get_variable(PARQUET_SO_VERSION parquet so_version) + pkg_get_variable(PARQUET_FULL_SO_VERSION parquet full_so_version) + endif() + endif() + set(PARQUET_ABI_VERSION "${PARQUET_SO_VERSION}") +endif() + +mark_as_advanced(PARQUET_ABI_VERSION + PARQUET_IMPORT_LIB + PARQUET_INCLUDE_DIR + PARQUET_LIBS + PARQUET_LIB_DIR + PARQUET_SHARED_IMP_LIB + PARQUET_SHARED_LIB + PARQUET_SO_VERSION + PARQUET_STATIC_LIB + PARQUET_VERSION) + +find_package_handle_standard_args( + Parquet + REQUIRED_VARS PARQUET_INCLUDE_DIR PARQUET_LIB_DIR PARQUET_SO_VERSION + VERSION_VAR PARQUET_VERSION) +set(PARQUET_FOUND ${Parquet_FOUND}) + +if(Parquet_FOUND AND NOT Parquet_FIND_QUIETLY) + message(STATUS "Parquet version: ${PARQUET_VERSION} (${PARQUET_FIND_APPROACH})") + message(STATUS "Found the Parquet shared library: ${PARQUET_SHARED_LIB}") + message(STATUS "Found the Parquet import library: ${PARQUET_IMPORT_LIB}") + message(STATUS "Found the Parquet static library: ${PARQUET_STATIC_LIB}") +endif() \ No newline at end of file diff --git a/modules/basic/ds/dataframe.cc 
b/modules/basic/ds/dataframe.cc index cfc25fef5..bb9671717 100644 --- a/modules/basic/ds/dataframe.cc +++ b/modules/basic/ds/dataframe.cc @@ -91,6 +91,13 @@ const std::shared_ptr DataFrame::AsBatch(bool copy) const { columns[i] = arrow::MakeArray(arrow::ArrayData::Make( FromAnyType(df_col->value_type()), num_rows, {nullptr, copied_buffer})); + + // Log type information only: reading element 0 with GetScalar(0) fails on + // a zero-row column and would trip the checked assignment (presumably fatal). + DLOG(INFO) << "at column" << i + << " value type: " << df_col->value_type() + << " meta data type name:" << df_col->meta().GetTypeName(); + fields[i] = std::make_shared( field_name, FromAnyType(df_col->value_type())); } diff --git a/modules/basic/ds/types.cc b/modules/basic/ds/types.cc index 68cbe99ff..bd63fc89d 100644 --- a/modules/basic/ds/types.cc +++ b/modules/basic/ds/types.cc @@ -30,6 +30,8 @@ AnyType ParseAnyType(const std::string& type_name) { return AnyType::UInt64; } else if (type_name == "float") { return AnyType::Float; + } else if (type_name == "float64") { + return AnyType::Double; } else if (type_name == "double") { return AnyType::Double; } else if (type_name == "string") { diff --git a/modules/fuse/CMakeLists.txt b/modules/fuse/CMakeLists.txt index 661859bce..73a75fe29 100644 --- a/modules/fuse/CMakeLists.txt +++ b/modules/fuse/CMakeLists.txt @@ -1,16 +1,18 @@ # build vineyard-fuse set(FUSE_SRC_FILES) -list(APPEND FUSE_SRC_FILES "fused.cc") -list(APPEND FUSE_SRC_FILES "adaptors/arrow.cc") -list(APPEND FUSE_SRC_FILES "adaptors/orc.cc") -list(APPEND FUSE_SRC_FILES "adaptors/parquet.cc") + +list(APPEND FUSE_SRC_FILES "adaptors/arrow_ipc/deserializer_registry.cc") +list(APPEND FUSE_SRC_FILES "adaptors/arrow_ipc/serializer_registry.cc") +list(APPEND FUSE_SRC_FILES "fuse_impl.cc") add_library(vineyard_fuse ${FUSE_SRC_FILES}) target_link_libraries(vineyard_fuse PUBLIC vineyard_client vineyard_basic ${ARROW_SHARED_LIB} ) +set_target_properties(vineyard_fuse PROPERTIES 
CXX_STANDARD 14) target_link_libraries(vineyard_fuse PUBLIC FUSE3::FUSE3) +target_compile_options(vineyard_fuse PUBLIC -DWITH_ARROW_IPC) if(BUILD_VINEYARD_FUSE_PARQUET) target_compile_options(vineyard_fuse PUBLIC -DWITH_PARQUET) if(TARGET parquet_shared) @@ -19,12 +21,17 @@ if(BUILD_VINEYARD_FUSE_PARQUET) target_link_libraries(vineyard_fuse PUBLIC parquet_static) endif() endif() +target_include_directories(vineyard_fuse PRIVATE ${CMAKE_SOURCE_DIR}) install_vineyard_target(vineyard_fuse) install_vineyard_headers("${CMAKE_CURRENT_SOURCE_DIR}") -add_executable(vineyard-fusermount fusermount.cc) -target_link_libraries(vineyard-fusermount PRIVATE vineyard_fuse) +set(FUSE_MOUNT_SRC_FILES) + +add_executable(vineyard-fusermount fusermount.cc) +target_include_directories(vineyard-fusermount PRIVATE ${CMAKE_SOURCE_DIR}) + +target_link_libraries(vineyard-fusermount PUBLIC vineyard_fuse) install_vineyard_target(vineyard-fusermount) if(BUILD_VINEYARD_TESTS) diff --git a/modules/fuse/adaptors/arrow_ipc/deserializer_registry.cc b/modules/fuse/adaptors/arrow_ipc/deserializer_registry.cc new file mode 100644 index 000000000..5ec0c5a90 --- /dev/null +++ b/modules/fuse/adaptors/arrow_ipc/deserializer_registry.cc @@ -0,0 +1,201 @@ +/** Copyright 2020-2022 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +#include "fuse/adaptors/arrow_ipc/deserializer_registry.h" + +namespace vineyard { +namespace fuse { + +std::shared_ptr extractVineyardMetaToArrowMeta( + std::shared_ptr obj) { + auto kvmeta = std::shared_ptr( + new arrow::KeyValueMetadata({}, {})); + + auto meta = obj->meta(); + + for (auto i : meta) { + std::string v = i.value().dump(); + kvmeta->Append(i.key(), v); + } + return kvmeta; +} + +template +std::shared_ptr numeric_array_arrow_ipc_view( + const std::shared_ptr& p) { + auto arr = std::dynamic_pointer_cast>(p); + DLOG(INFO) << "numeric_array_arrow_ipc_view" << type_name() + << " is called"; + std::shared_ptr ssink; + + CHECK_ARROW_ERROR_AND_ASSIGN(ssink, arrow::io::BufferOutputStream::Create()); + DLOG(INFO) << "buffer successfully created"; + + auto kvmeta = extractVineyardMetaToArrowMeta(arr); + auto schema = arrow::schema( + {arrow::field("a", ConvertToArrowType::TypeValue())}, kvmeta); + std::shared_ptr my_table = + arrow::Table::Make(schema, {arr->GetArray()}); + + std::shared_ptr writer; + CHECK_ARROW_ERROR_AND_ASSIGN(writer, + arrow::ipc::MakeFileWriter(ssink, schema)); + + VINEYARD_CHECK_OK(writer->WriteTable(*my_table)); + DLOG(INFO) << "table is written"; + writer->Close(); + std::shared_ptr buffer_; + DLOG(INFO) << "writer is closed"; + CHECK_ARROW_ERROR_AND_ASSIGN(buffer_, ssink->Finish()); + DLOG(INFO) << "buffer is extracted"; + return buffer_; +} + +std::shared_ptr string_array_arrow_ipc_view( + const std::shared_ptr& p) { + auto arr = std::dynamic_pointer_cast< + vineyard::BaseBinaryArray>(p); + DLOG(INFO) << "string_array_arrow_ipc_view is called"; + std::shared_ptr ssink; + + CHECK_ARROW_ERROR_AND_ASSIGN(ssink, arrow::io::BufferOutputStream::Create()); + DLOG(INFO) << "buffer successfully created"; + + auto kvmeta = extractVineyardMetaToArrowMeta(arr); + auto schema = arrow::schema( + {arrow::field("a", ConvertToArrowType::TypeValue())}, + kvmeta); + std::shared_ptr my_table = + arrow::Table::Make(schema, {arr->GetArray()}); + + 
std::shared_ptr writer; + CHECK_ARROW_ERROR_AND_ASSIGN(writer, + arrow::ipc::MakeFileWriter(ssink, schema)); + + VINEYARD_CHECK_OK(writer->WriteTable(*my_table)); + DLOG(INFO) << "table is written"; + writer->Close(); + std::shared_ptr buffer_; + DLOG(INFO) << "writer is closed"; + CHECK_ARROW_ERROR_AND_ASSIGN(buffer_, ssink->Finish()); + DLOG(INFO) << "buffer is extracted"; + return buffer_; +} + +std::shared_ptr bool_array_arrow_ipc_view( + const std::shared_ptr& p) { + auto arr = std::dynamic_pointer_cast(p); + DLOG(INFO) << "bool_array_arrow_ipc_view is called"; + std::shared_ptr ssink; + auto kvmeta = extractVineyardMetaToArrowMeta(arr); + CHECK_ARROW_ERROR_AND_ASSIGN(ssink, arrow::io::BufferOutputStream::Create()); + DLOG(INFO) << "buffer successfully created"; + + auto schema = arrow::schema( + {arrow::field("a", ConvertToArrowType::TypeValue())}, kvmeta); + std::shared_ptr my_table = + arrow::Table::Make(schema, {arr->GetArray()}); + + std::shared_ptr writer; + CHECK_ARROW_ERROR_AND_ASSIGN(writer, + arrow::ipc::MakeFileWriter(ssink, schema)); + + VINEYARD_CHECK_OK(writer->WriteTable(*my_table)); + DLOG(INFO) << "table is written"; + writer->Close(); + std::shared_ptr buffer_; + DLOG(INFO) << "writer is closed"; + CHECK_ARROW_ERROR_AND_ASSIGN(buffer_, ssink->Finish()); + DLOG(INFO) << "buffer is extracted"; + return buffer_; +} + +std::shared_ptr dataframe_arrow_ipc_view( + const std::shared_ptr& p) { + auto df = std::dynamic_pointer_cast(p); + auto kvmeta = extractVineyardMetaToArrowMeta(df); + + auto batch = df->AsBatch(true); + std::shared_ptr table; + VINEYARD_CHECK_OK(RecordBatchesToTable({batch}, &table)); + std::shared_ptr sink; + CHECK_ARROW_ERROR_AND_ASSIGN(sink, arrow::io::BufferOutputStream::Create()); + DLOG(INFO) << "dataframe_arrow_ipc_view is called";  // no per-column probing: column count and dtype vary + std::shared_ptr writer; + CHECK_ARROW_ERROR_AND_ASSIGN( + writer, arrow::ipc::MakeFileWriter(sink, batch->schema())); + VINEYARD_CHECK_OK(writer->WriteTable(*table)); + 
std::shared_ptr buffer; + writer->Close(); + CHECK_ARROW_ERROR_AND_ASSIGN(buffer, sink->Finish()); + return buffer; +} + +std::unordered_map +arrow_ipc_register_once() { + std::unordered_map + d_array_registry; +#define MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_FUSE_REGSITER( \ + T) \ + { \ + std::string array_type = "vineyard::NumericArray"; \ + array_type.append("<").append(type_name()).append(">"); \ + DLOG(INFO) << "register type: " << array_type << std::endl; \ + d_array_registry.emplace(array_type, &numeric_array_arrow_ipc_view); \ + } + + MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_FUSE_REGSITER(int8_t); + MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_FUSE_REGSITER( + int32_t); + MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_FUSE_REGSITER( + int16_t); + MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_FUSE_REGSITER( + int64_t); + MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_FUSE_REGSITER( + uint16_t); + MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_FUSE_REGSITER( + uint8_t); + MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_FUSE_REGSITER( + uint32_t); + MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_FUSE_REGSITER( + uint64_t); + MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_FUSE_REGSITER(float); + MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_FUSE_REGSITER(double); + + { + std::string t_name = type_name(); + DLOG(INFO) << "register type: " << t_name << std::endl; + + d_array_registry.emplace(t_name, &bool_array_arrow_ipc_view); + } + + { + std::string t_name = + type_name>(); + DLOG(INFO) << "register type: " << t_name << std::endl; + + d_array_registry.emplace(t_name, &string_array_arrow_ipc_view); + } + { + std::string t_name = type_name(); + DLOG(INFO) << "register type: " << t_name << std::endl; + + d_array_registry.emplace(t_name, &dataframe_arrow_ipc_view); + } + return d_array_registry; +} + +} // namespace fuse +} // namespace vineyard diff 
--git a/modules/fuse/adaptors/arrow_ipc/deserializer_registry.h b/modules/fuse/adaptors/arrow_ipc/deserializer_registry.h new file mode 100644 index 000000000..14c4e7a36 --- /dev/null +++ b/modules/fuse/adaptors/arrow_ipc/deserializer_registry.h @@ -0,0 +1,67 @@ +/** Copyright 2020-2022 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_ +#define MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_ + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/io/api.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/macros.h" + +#include "basic/ds/array.h" +#include "basic/ds/dataframe.h" +#include "client/client.h" +#include "client/ds/blob.h" +#include "client/ds/core_types.h" +#include "client/ds/i_object.h" +#include "common/util/env.h" +#include "common/util/logging.h" +#include "common/util/typename.h" +#include "common/util/uuid.h" + +namespace vineyard { +namespace fuse { + +using vineyard_deserializer_nt = std::shared_ptr (*)( + const std::shared_ptr&); + +template +std::shared_ptr numeric_array_arrow_ipc_view( + const std::shared_ptr& p); +std::shared_ptr string_array_arrow_ipc_view( + const std::shared_ptr& p); +std::shared_ptr bool_array_arrow_ipc_view( + const std::shared_ptr& p); +std::shared_ptr dataframe_arrow_ipc_view( + const 
std::shared_ptr& p); +std::unordered_map +arrow_ipc_register_once(); + +} // namespace fuse +} // namespace vineyard +#endif // MODULES_FUSE_ADAPTORS_ARROW_IPC_DESERIALIZER_REGISTRY_H_ diff --git a/modules/fuse/adaptors/arrow.cc b/modules/fuse/adaptors/arrow_ipc/serializer_registry.cc similarity index 91% rename from modules/fuse/adaptors/arrow.cc rename to modules/fuse/adaptors/arrow_ipc/serializer_registry.cc index dfe097991..0f91ae796 100644 --- a/modules/fuse/adaptors/arrow.cc +++ b/modules/fuse/adaptors/arrow_ipc/serializer_registry.cc @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -#include "fuse/adaptors/arrow.h" +#include "fuse/adaptors/arrow_ipc/serializer_registry.h" #include #include @@ -24,6 +24,7 @@ limitations under the License. #include "arrow/api.h" #include "arrow/io/api.h" #include "arrow/ipc/api.h" +#include "arrow/ipc/reader.h" #include "basic/ds/arrow.h" #include "basic/ds/arrow_utils.h" @@ -89,24 +90,22 @@ std::shared_ptr arrow_view( static void from_arrow_view(Client* client, std::string const& path, arrow::io::RandomAccessFile* fp) { - std::shared_ptr reader; + std::shared_ptr reader; CHECK_ARROW_ERROR_AND_ASSIGN(reader, - arrow::ipc::RecordBatchFileReader::Open(fp)); + arrow::ipc::RecordBatchStreamReader::Open(fp)); + std::shared_ptr table; std::vector> batches; - for (int64_t index = 0; index < reader->num_record_batches(); ++index) { - std::shared_ptr batch; - CHECK_ARROW_ERROR_AND_ASSIGN(batch, reader->ReadRecordBatch(index)); - batches.emplace_back(batch); - } - std::shared_ptr table; + VINEYARD_CHECK_OK(reader->ReadAll(&batches)); + VINEYARD_CHECK_OK(RecordBatchesToTable(batches, &table)); // build it into vineyard TableBuilder builder(*client, table); auto tb = builder.Seal(*client); VINEYARD_CHECK_OK(client->Persist(tb->id())); + DLOG(INFO) << tb->meta().ToString(); VINEYARD_CHECK_OK(client->PutName( tb->id(), path.substr(1, path.length() - 6 /* .arrow */ - 
1))); } diff --git a/modules/fuse/adaptors/arrow.h b/modules/fuse/adaptors/arrow_ipc/serializer_registry.h similarity index 87% rename from modules/fuse/adaptors/arrow.h rename to modules/fuse/adaptors/arrow_ipc/serializer_registry.h index 1d27cd6df..f12148a1c 100644 --- a/modules/fuse/adaptors/arrow.h +++ b/modules/fuse/adaptors/arrow_ipc/serializer_registry.h @@ -13,8 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. */ -#ifndef MODULES_FUSE_ADAPTORS_ARROW_H_ -#define MODULES_FUSE_ADAPTORS_ARROW_H_ +#ifndef MODULES_FUSE_ADAPTORS_ARROW_IPC_SERIALIZER_REGISTRY_H_ +#define MODULES_FUSE_ADAPTORS_ARROW_IPC_SERIALIZER_REGISTRY_H_ #include #include @@ -42,4 +42,4 @@ void from_arrow_view(Client* client, std::string const& path, } // namespace fuse } // namespace vineyard -#endif // MODULES_FUSE_ADAPTORS_ARROW_H_ +#endif // MODULES_FUSE_ADAPTORS_ARROW_IPC_SERIALIZER_REGISTRY_H_ diff --git a/modules/fuse/adaptors/formats.h b/modules/fuse/adaptors/formats.h deleted file mode 100644 index 516a8bdd6..000000000 --- a/modules/fuse/adaptors/formats.h +++ /dev/null @@ -1,23 +0,0 @@ -/** Copyright 2020-2022 Alibaba Group Holding Limited. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -#ifndef MODULES_FUSE_ADAPTORS_FORMATS_H_ -#define MODULES_FUSE_ADAPTORS_FORMATS_H_ - -#include "fuse/adaptors/arrow.h" -#include "fuse/adaptors/orc.h" -#include "fuse/adaptors/parquet.h" - -#endif // MODULES_FUSE_ADAPTORS_FORMATS_H_ diff --git a/modules/fuse/adaptors/orc.cc b/modules/fuse/adaptors/orc/orc.cc similarity index 94% rename from modules/fuse/adaptors/orc.cc rename to modules/fuse/adaptors/orc/orc.cc index 72f3383d2..09b1954b0 100644 --- a/modules/fuse/adaptors/orc.cc +++ b/modules/fuse/adaptors/orc/orc.cc @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -#include "fuse/adaptors/orc.h" +#include "modules/fuse/adaptors/orc/orc.h" #if defined(WITH_ORC) diff --git a/modules/fuse/adaptors/orc.h b/modules/fuse/adaptors/orc/orc.h similarity index 86% rename from modules/fuse/adaptors/orc.h rename to modules/fuse/adaptors/orc/orc.h index c1710a62b..45bf1d367 100644 --- a/modules/fuse/adaptors/orc.h +++ b/modules/fuse/adaptors/orc/orc.h @@ -13,8 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. */ -#ifndef MODULES_FUSE_ADAPTORS_ORC_H_ -#define MODULES_FUSE_ADAPTORS_ORC_H_ +#ifndef MODULES_FUSE_ADAPTORS_ORC_ORC_H_ +#define MODULES_FUSE_ADAPTORS_ORC_ORC_H_ #if defined(WITH_ORC) @@ -33,4 +33,4 @@ void orc_view(std::shared_ptr& df); #endif -#endif // MODULES_FUSE_ADAPTORS_ORC_H_ +#endif // MODULES_FUSE_ADAPTORS_ORC_ORC_H_ diff --git a/modules/fuse/adaptors/parquet.cc b/modules/fuse/adaptors/parquet/parquet.cc similarity index 97% rename from modules/fuse/adaptors/parquet.cc rename to modules/fuse/adaptors/parquet/parquet.cc index e765073c0..f776cdf6e 100644 --- a/modules/fuse/adaptors/parquet.cc +++ b/modules/fuse/adaptors/parquet/parquet.cc @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -#include "fuse/adaptors/parquet.h" +#include "fuse/adaptors/parquet/parquet.h" #if defined(WITH_PARQUET) diff --git a/modules/fuse/adaptors/parquet.h b/modules/fuse/adaptors/parquet/parquet.h similarity index 85% rename from modules/fuse/adaptors/parquet.h rename to modules/fuse/adaptors/parquet/parquet.h index df56e8769..16ef528a0 100644 --- a/modules/fuse/adaptors/parquet.h +++ b/modules/fuse/adaptors/parquet/parquet.h @@ -13,8 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. */ -#ifndef MODULES_FUSE_ADAPTORS_PARQUET_H_ -#define MODULES_FUSE_ADAPTORS_PARQUET_H_ +#ifndef MODULES_FUSE_ADAPTORS_PARQUET_PARQUET_H_ +#define MODULES_FUSE_ADAPTORS_PARQUET_PARQUET_H_ #if defined(WITH_PARQUET) @@ -34,4 +34,4 @@ std::shared_ptr parquet_view( #endif -#endif // MODULES_FUSE_ADAPTORS_PARQUET_H_ +#endif // MODULES_FUSE_ADAPTORS_PARQUET_PARQUET_H_ diff --git a/modules/fuse/fused.cc b/modules/fuse/fuse_impl.cc similarity index 63% rename from modules/fuse/fused.cc rename to modules/fuse/fuse_impl.cc index b0bfa4c9c..05c10500f 100644 --- a/modules/fuse/fused.cc +++ b/modules/fuse/fuse_impl.cc @@ -13,21 +13,29 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -#include "fuse/fused.h" +#include "fuse/fuse_impl.h" #include #include -#include "boost/algorithm/string/predicate.hpp" +#include "arrow/api.h" +#include "arrow/io/api.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/macros.h" #include "basic/ds/array.h" #include "basic/ds/dataframe.h" +#include "boost/algorithm/string/predicate.hpp" #include "client/client.h" #include "client/ds/blob.h" +#include "client/ds/core_types.h" #include "client/ds/i_object.h" #include "common/util/logging.h" #include "common/util/uuid.h" -#include "fuse/adaptors/formats.h" +#include "fuse/adaptors/arrow_ipc/deserializer_registry.h" +#include "fuse/adaptors/arrow_ipc/serializer_registry.h" namespace vineyard { @@ -35,11 +43,11 @@ namespace fuse { struct fs::fs_state_t fs::state {}; -static std::string name_from_path(std::string const& path) { +std::string name_from_path(std::string const& path) { return path.substr(1, path.length() - 6 /* .arrow */ - 1); } -static std::shared_ptr generate_fuse_view( +std::shared_ptr generate_fuse_view( std::shared_ptr object) { if (auto dataframe = std::dynamic_pointer_cast(object)) { return fuse::arrow_view(dataframe); @@ -56,7 +64,7 @@ static std::shared_ptr generate_fuse_view( int fs::fuse_getattr(const char* path, struct stat* stbuf, struct fuse_file_info*) { - VLOG(2) << "fuse: getattr on " << path; + DLOG(INFO) << "fuse: getattr on " << path; std::lock_guard guard(state.mtx_); memset(stbuf, 0, sizeof(struct stat)); @@ -90,46 +98,74 @@ int fs::fuse_getattr(const char* path, struct stat* stbuf, return -ENOENT; } ObjectID target = InvalidObjectID(); - auto status = state.client->GetName(name_from_path(path), target); - if (!status.ok()) { - return -ENOENT; - } - bool exists = false; - VINEYARD_CHECK_OK(state.client->Exists(target, exists)); - if (!exists) { - return -ENOENT; + auto prefix = name_from_path(path); + auto status = state.client->GetName(prefix, target); + if 
(status.ok()) { + bool exists = false; + VINEYARD_CHECK_OK(state.client->Exists(target, exists)); + if (!exists) { + return -ENOENT; + } + auto obj = state.client->GetObject(target); + DLOG(INFO) << "tryting to deserialize" << obj->meta().GetTypeName(); + auto d = + fs::state.ipc_desearilizer_registry.at(obj->meta().GetTypeName()); + auto buffer = d(obj); + state.views[path_string] = buffer; + + stbuf->st_size = buffer->size(); + } else { + auto obj = state.client->GetObject(ObjectIDFromString(prefix)); + if (obj == nullptr) {  // must precede the first obj->meta() dereference below + return -ENOENT; + } + + DLOG(INFO) << "trying to deserialize " << obj->meta().GetTypeName(); + auto d = + fs::state.ipc_desearilizer_registry.at(obj->meta().GetTypeName()); + auto buffer = d(obj); + state.views[path_string] = buffer; + stbuf->st_size = buffer->size(); } - - auto buffer = generate_fuse_view(state.client->GetObject(target)); - state.views.emplace(path, buffer); - stbuf->st_size = buffer->size(); return 0; } } int fs::fuse_open(const char* path, struct fuse_file_info* fi) { - VLOG(2) << "fuse: open " << path << " with mode " << fi->flags; - - if ((fi->flags & O_ACCMODE) != O_RDONLY) { + DLOG(INFO) << "fuse: open " << path << " with mode " << fi->flags; + if ((fi->flags & O_ACCMODE) != O_RDONLY) {  // compare, don't mask: O_RDONLY is 0 on Linux, so `& O_RDONLY` never rejects return -EACCES; } + std::lock_guard guard(state.mtx_); - std::string path_string(path); - ObjectID target = InvalidObjectID(); - VINEYARD_CHECK_OK(state.client->GetName(name_from_path(path_string), target)); + // check for the existence todo: switch the name store from the vineyard to + // fuse - auto object = state.client->GetObject(target); - if (object == nullptr) { - return -ENOENT; - } - auto loc = state.views.find(path_string); + // the opened file referenced by the user-defined name + auto filename = name_from_path(path); + auto target = InvalidObjectID(); + auto loc = state.views.find(path); + std::string path_string(path); + std::shared_ptr object = nullptr; if (loc == state.views.end()) { - auto buffer = 
generate_fuse_view(state.client->GetObject(target)); - state.views[path_string] = buffer; + if (state.client->GetName(filename, target).ok()) { + object = state.client->GetObject(target); + } + + if (object == nullptr) { + object = state.client->GetObject(ObjectIDFromString(filename));  // same key fuse_getattr parses; name_from_path already dropped the leading '/' + } + if (object == nullptr) { + return -ENOENT; + } else { + auto d = + fs::state.ipc_desearilizer_registry.at(object->meta().GetTypeName()); + state.views[path_string] = d(object); + } } + // bypass kernel's page cache to avoid knowing the size in `getattr`. - // // see also: // https://stackoverflow.com/questions/46267972/fuse-avoid-calculating-size-in-getattr fi->direct_io = 1; @@ -138,8 +174,9 @@ int fs::fuse_open(const char* path, struct fuse_file_info* fi) { int fs::fuse_read(const char* path, char* buf, size_t size, off_t offset, struct fuse_file_info* fi) { - VLOG(2) << "fuse: read " << path << " from " << offset << ", expect " << size - << " bytes"; + DLOG(INFO) << "fuse: read " << path << " from " << offset << ", expect " + << size << " bytes"; + std::unordered_map>::const_iterator loc; { @@ -164,10 +201,16 @@ int fs::fuse_read(const char* path, char* buf, size_t size, off_t offset, int fs::fuse_write(const char* path, const char* buf, size_t size, off_t offset, struct fuse_file_info* fi) { - VLOG(2) << "fuse: write " << path << " from " << offset << ", expect " << size - << " bytes"; + DLOG(INFO) << "fuse: write " << path << " from " << offset << ", expect " + << size << " bytes"; + std::unordered_map>::const_iterator loc; + + { + std::lock_guard guard(state.mtx_); - auto loc = state.mutable_views.find(path); + loc = state.mutable_views.find(path); + } if (loc == state.mutable_views.end()) { return -ENOENT; } @@ -181,17 +224,17 @@ int fs::fuse_write(const char* path, const char* buf, size_t size, off_t offset, } int fs::fuse_statfs(const char* path, struct statvfs*) { - VLOG(2) << "fuse: statfs " << path; + DLOG(INFO) << "fuse: statfs " << path; return 0; } int 
fs::fuse_flush(const char* path, struct fuse_file_info*) { - VLOG(2) << "fuse: flush " << path; + DLOG(INFO) << "fuse: flush " << path; return 0; } int fs::fuse_release(const char* path, struct fuse_file_info*) { - VLOG(2) << "fuse: release " << path; + DLOG(INFO) << "fuse: release " << path; { // TODO: ref count should be used @@ -217,19 +260,19 @@ int fs::fuse_release(const char* path, struct fuse_file_info*) { } int fs::fuse_getxattr(const char* path, const char* name, char*, size_t) { - VLOG(2) << "fuse: getxattr " << path << ": name"; + DLOG(INFO) << "fuse: getxattr " << path << ": name"; return 0; } int fs::fuse_opendir(const char* path, struct fuse_file_info* info) { - VLOG(2) << "fuse: opendir " << path; + DLOG(INFO) << "fuse: opendir " << path; return 0; } int fs::fuse_readdir(const char* path, void* buf, fuse_fill_dir_t filler, off_t offset, struct fuse_file_info* fi, enum fuse_readdir_flags flags) { - VLOG(2) << "fuse: readdir " << path; + DLOG(INFO) << "fuse: readdir " << path; if (strcmp(path, "/") != 0) { return -ENOENT; @@ -239,20 +282,20 @@ int fs::fuse_readdir(const char* path, void* buf, fuse_fill_dir_t filler, filler(buf, "..", NULL, 0, fuse_fill_dir_flags::FUSE_FILL_DIR_PLUS); std::unordered_map metas{}; - VINEYARD_CHECK_OK(state.client->ListData( - "vineyard::DataFrame", false, std::numeric_limits::max(), metas)); - VINEYARD_CHECK_OK(state.client->ListData("vineyard::RecordBatch", false, - std::numeric_limits::max(), - metas)); - VINEYARD_CHECK_OK(state.client->ListData( - "vineyard::Table", false, std::numeric_limits::max(), metas)); + VINEYARD_CHECK_OK(state.client->ListData( + ".*", true, std::numeric_limits::max(), metas)); for (auto const& item : metas) { - // std::string base = ObjectIDToString(item.first) + ".arrow"; - // filler(buf, base.c_str(), NULL, 0, - // fuse_fill_dir_flags::FUSE_FILL_DIR_PLUS); if (item.second.contains("__name")) { std::string base = item.second["__name"].get() + ".arrow"; + DLOG(INFO) << "open object with name" 
<< base; + filler(buf, base.c_str(), NULL, 0, + fuse_fill_dir_flags::FUSE_FILL_DIR_PLUS); + } else { + std::string base = ObjectIDToString(item.first).c_str(); + base.append(".arrow"); + DLOG(INFO) << "open object without name" << base; + filler(buf, base.c_str(), NULL, 0, fuse_fill_dir_flags::FUSE_FILL_DIR_PLUS); } @@ -261,37 +304,39 @@ int fs::fuse_readdir(const char* path, void* buf, fuse_fill_dir_t filler, } void* fs::fuse_init(struct fuse_conn_info* conn, struct fuse_config* cfg) { - VLOG(2) << "fuse: initfs with vineyard socket " << state.vineyard_socket; + DLOG(INFO) << "fuse: initfs with vineyard socket " << state.vineyard_socket; state.client.reset(new vineyard::Client()); state.client->Connect(state.vineyard_socket); fuse_apply_conn_info_opts(state.conn_opts, conn); - conn->max_read = conn->max_readahead; + // conn->max_read = conn->max_readahead; cfg->kernel_cache = 0; return NULL; } void fs::fuse_destroy(void* private_data) { - VLOG(2) << "fuse: destroy"; + DLOG(INFO) << "fuse: destroy"; state.views.clear(); + state.mutable_views.clear(); state.client->Disconnect(); } int fs::fuse_access(const char* path, int mode) { - VLOG(2) << "fuse: access " << path << " with mode " << mode; + DLOG(INFO) << "fuse: access " << path << " with mode " << mode; return mode; } int fs::fuse_create(const char* path, mode_t mode, struct fuse_file_info*) { - VLOG(2) << "fuse: create " << path << " with mode " << mode; + DLOG(INFO) << "fuse: create " << path << " with mode " << mode; if (state.mutable_views.find(path) != state.mutable_views.end()) { LOG(ERROR) << "fuse: create: file already exists" << path; return EEXIST; } + DLOG(INFO) << "creating " << path; state.mutable_views.emplace( path, std::shared_ptr(new arrow::BufferBuilder())); return 0; diff --git a/modules/fuse/fused.h b/modules/fuse/fuse_impl.h similarity index 90% rename from modules/fuse/fused.h rename to modules/fuse/fuse_impl.h index 4f199757f..4624604ea 100644 --- a/modules/fuse/fused.h +++ 
b/modules/fuse/fuse_impl.h @@ -13,8 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. */ -#ifndef MODULES_FUSE_FUSED_H_ -#define MODULES_FUSE_FUSED_H_ +#ifndef MODULES_FUSE_FUSE_IMPL_H_ +#define MODULES_FUSE_FUSE_IMPL_H_ #include #include @@ -35,6 +35,8 @@ limitations under the License. #include "client/client.h" +#include "adaptors/arrow_ipc/deserializer_registry.h" + namespace arrow { class Buffer; } @@ -48,10 +50,12 @@ struct fs { struct fuse_conn_info_opts* conn_opts; std::string vineyard_socket; std::shared_ptr client; + std::mutex mtx_; std::unordered_map> views; std::unordered_map> mutable_views; - std::mutex mtx_; + std::unordered_map + ipc_desearilizer_registry; } state; static int fuse_getattr(const char* path, struct stat* stbuf, @@ -92,4 +96,4 @@ struct fs { } // namespace vineyard -#endif // MODULES_FUSE_FUSED_H_ +#endif // MODULES_FUSE_FUSE_IMPL_H_ diff --git a/modules/fuse/fusermount.cc b/modules/fuse/fusermount.cc index 1072eb9d3..b54ef6f81 100644 --- a/modules/fuse/fusermount.cc +++ b/modules/fuse/fusermount.cc @@ -20,9 +20,10 @@ limitations under the License. #include #include +#include "adaptors/arrow_ipc/deserializer_registry.h" #include "common/util/env.h" #include "common/util/logging.h" -#include "fuse/fused.h" +#include "fuse/fuse_impl.h" /* * Command line options @@ -56,8 +57,11 @@ static void print_help(const char* progname) { static int process_args(struct fuse_args& args, int argc, char** argv) { // Set defaults -- we have to use strdup so that fuse_opt_parse can free // the defaults if other values are specified. 
- std::string env = vineyard::read_env("VINEYARD_IPC_SOCKET"); - options.vineyard_socket = strdup(env.c_str()); + if (!options.vineyard_socket) { + std::string env = vineyard::read_env("VINEYARD_IPC_SOCKET"); + + options.vineyard_socket = strdup(env.c_str()); + } /* Parse options */ if (fuse_opt_parse(&args, &options, option_spec, NULL) == -1) { @@ -82,6 +86,11 @@ static int process_args(struct fuse_args& args, int argc, char** argv) { // populate state vineyard::fuse::fs::state.vineyard_socket = options.vineyard_socket; + LOG(INFO) << "prepare to conncet to socket" + << vineyard::fuse::fs::state.vineyard_socket; + + vineyard::fuse::fs::state.ipc_desearilizer_registry = + vineyard::fuse::arrow_ipc_register_once(); return 0; } @@ -98,8 +107,9 @@ static const struct fuse_operations vineyard_fuse_operations = { .readdir = vineyard::fuse::fs::fuse_readdir, .init = vineyard::fuse::fs::fuse_init, .destroy = vineyard::fuse::fs::fuse_destroy, - .access = vineyard::fuse::fs::fuse_access, .create = vineyard::fuse::fs::fuse_create, + + // .access = vineyard::fuse::fs::fuse_access, }; int main(int argc, char* argv[]) { @@ -120,7 +130,6 @@ int main(int argc, char* argv[]) { // process conn args struct fuse_conn_info_opts* conn_opts = fuse_parse_conn_info_opts(&args); vineyard::fuse::fs::state.conn_opts = conn_opts; - LOG(INFO) << "Starting vineyard fuse driver ..."; ret = fuse_main(args.argc, args.argv, &vineyard_fuse_operations, NULL); fuse_opt_free_args(&args); diff --git a/modules/fuse/test/fuse_test.cc b/modules/fuse/test/fuse_test.cc deleted file mode 100644 index 35dc8c7e3..000000000 --- a/modules/fuse/test/fuse_test.cc +++ /dev/null @@ -1,37 +0,0 @@ -/** Copyright 2020-2022 Alibaba Group Holding Limited. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -#include - -#include -#include -#include - -#include "arrow/api.h" -#include "arrow/io/api.h" - -#include "basic/ds/array.h" -#include "client/client.h" -#include "client/ds/object_meta.h" -#include "common/util/logging.h" -#include "fuse/fused.h" - -using namespace vineyard; // NOLINT(build/namespaces) - -int main() { - printf("%d%d\n", FUSE_MAJOR_VERSION, FUSE_MINOR_VERSION); - printf("%d\n", fuse_version()); - return 0; -} diff --git a/modules/fuse/tests/__init__.py b/modules/fuse/tests/__init__.py new file mode 100644 index 000000000..b82d4fe11 --- /dev/null +++ b/modules/fuse/tests/__init__.py @@ -0,0 +1,17 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2020-2022 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/modules/fuse/tests/conftest.py b/modules/fuse/tests/conftest.py new file mode 100644 index 000000000..4fa37498e --- /dev/null +++ b/modules/fuse/tests/conftest.py @@ -0,0 +1,69 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2020-2022 Alibaba Group Holding Limited. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging + +import pytest + +import vineyard + +logging.basicConfig(level=logging.NOTSET) + + +def pytest_addoption(parser): + parser.addoption( + '--vineyard-ipc-socket', + action='store', + default='/tmp/vineyard.sock', + help='Location of vineyard IPC socket', + ) + + parser.addoption( + '--vineyard-endpoint', + action='store', + default='127.0.0.1:9600', + help='Address of vineyard RPC endpoint', + ) + + parser.addoption( + '--vineyard-fuse-mount-dir', + action='store', + default='/tmp/vineyard_fuse.default', + help='fusermount directory', + ) + + +@pytest.fixture(scope='session') +def vineyard_ipc_socket(request): + return request.config.option.vineyard_ipc_socket + + +@pytest.fixture(scope='session') +def vineyard_endpoint(request): + return request.config.option.vineyard_endpoint + + +@pytest.fixture(scope='session') +def vineyard_fuse_mount_dir(request): + return request.config.option.vineyard_fuse_mount_dir + + +@pytest.fixture(scope='session') +def vineyard_client(request): + ipc_socket = request.config.option.vineyard_ipc_socket + return vineyard.connect(ipc_socket) diff --git a/modules/fuse/tests/fuse_test.py b/modules/fuse/tests/fuse_test.py new file mode 100644 index 000000000..3f1a3aa17 --- /dev/null +++ b/modules/fuse/tests/fuse_test.py @@ -0,0 +1,160 @@ +#!/usr/env/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2020-2022 Alibaba Group Holding Limited. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os +from enum import Enum + +import numpy as np +import pandas as pd +import pyarrow as pa + + +class Type(Enum): + STRING = 1 + INT64 = 2 + DOUBLE = 3 + + +def generate_dataframe(size=(3, 4)): + height, width = size + ldf = pd.DataFrame( + np.random.randint(0, 100, size=(height, width)) * 2.3, + columns=[''.join(['a'] * i) for i in range(1, width + 1)], + ) + rdf = pd.DataFrame( + np.random.randint(0, 100, size=(height, width)), + columns=[''.join(['b'] * i) for i in range(1, width + 1)], + ) + return pd.concat([ldf, rdf], axis=1, join="inner") + + +def generate_string_array(length=20): + res = [] + alphabet = [ + 'a', + 'b', + 'c', + 'd', + 'e', + 'f', + 'g', + 'h', + 'i', + 'j', + 'k', + 'l', + 'm', + 'n', + 'o', + 'p', + 'q', + 'r', + 's', + 't', + 'u', + 'v', + 'w', + 'x', + 'y', + 'z', + ' ', + ] + + for _ in range(1, length): + s_length = np.random.randint(1, length) + res.append(''.join(np.random.choice(alphabet, s_length))) + + return res + + +def generate_array(type: Type, length=20): + f = { + Type.INT64: lambda x: np.random.randint(0, 1000, x), + Type.DOUBLE: lambda x: np.random.uniform(low=0, high=1000, size=x), + Type.STRING: generate_string_array, + } + return pa.array(f[type](length)) + + +def assert_dataframe(stored_df: pd.DataFrame, extracted_df: pa.Table): + pdf = pa.Table.from_pandas(stored_df) + assert extracted_df.equals(pdf), "data frame unmatch" + + +def assert_array(stored_arr: 
pa.Array, extracted_array: pa.Array): + assert stored_arr.equals(extracted_array), "array unmatch" + + +def read_data_from_fuse(vid, test_mount_dir): + with open(os.path.join(test_mount_dir, vid), 'rb') as source: + with pa.ipc.open_file(source) as reader: + data = reader.read_all() + return data + + +def compare_two_string_array(arr_str_1, arr_str_2): + a = arr_str_1 + b = arr_str_2 + if len(a) != len(b): + return False + else: + for i, j in zip(a, b): + if str(i) != str(j): + return False + return True + + +def test_fuse_int64_array(vineyard_client, vineyard_fuse_mount_dir): + data = generate_array(Type.INT64) + id = vineyard_client.put(data) + extracted_data = read_data_from_fuse( + str(id)[11:28] + ".arrow", vineyard_fuse_mount_dir + ) + + extracted_data = extracted_data.column("a").chunk(0) + assert_array(data, extracted_data) + + +def test_fuse_double_array(vineyard_client, vineyard_fuse_mount_dir): + data = generate_array(Type.DOUBLE) + id = vineyard_client.put(data) + extracted_data = read_data_from_fuse( + str(id)[11:28] + ".arrow", vineyard_fuse_mount_dir + ) + + extracted_data = extracted_data.column("a").chunk(0) + assert_array(data, extracted_data) + + +def test_fuse_string_array(vineyard_client, vineyard_fuse_mount_dir): + data = generate_array(Type.STRING) + id = vineyard_client.put(data) + extracted_data = read_data_from_fuse( + str(id)[11:28] + ".arrow", vineyard_fuse_mount_dir + ) + extracted_data = extracted_data.column("a").chunk(0) + assert compare_two_string_array(data, extracted_data), "string array not the same" + + +def test_fuse_df(vineyard_client, vineyard_fuse_mount_dir): + data = generate_dataframe() + + id = vineyard_client.put(data) + extracted_data = read_data_from_fuse( + str(id)[11:28] + ".arrow", vineyard_fuse_mount_dir + ) + assert_dataframe(data, extracted_data) diff --git a/python/vineyard/deploy/utils.py b/python/vineyard/deploy/utils.py index 98a59986b..69a63ea63 100644 --- a/python/vineyard/deploy/utils.py +++ 
b/python/vineyard/deploy/utils.py @@ -72,6 +72,7 @@ def find_executable(name, search_paths=None): def start_program( name, *args, verbose=False, nowait=False, search_paths=None, shell=False, **kwargs ): + # actually start a new program that will be running forever env, cmdargs = os.environ.copy(), list(args) for k, v in kwargs.items(): if k[0].isupper(): diff --git a/setup.py b/setup.py index b85b74cb8..0d0c829e0 100644 --- a/setup.py +++ b/setup.py @@ -121,6 +121,7 @@ def finalize_options(self): 'setup_ml.py', 'setup_ray.py', 'test/runner.py', + 'modules/fuse/tests', ] def extend_cmd_as_import(self, cmd) -> List[str]: diff --git a/test/runner.py b/test/runner.py index ad8793370..b4a366615 100755 --- a/test/runner.py +++ b/test/runner.py @@ -19,6 +19,7 @@ VINEYARD_CI_IPC_SOCKET = '/tmp/vineyard.ci.%s.sock' % time.time() +VINEYARD_FUSE_MOUNT_DIR = '/tmp/vineyard_fuse.%s' % time.time() find_executable_generic = None start_program_generic = None find_port = None @@ -90,6 +91,31 @@ def start_program(*args, **kwargs): return start_program_generic(*args, search_paths=[binary_dir], **kwargs) +@contextlib.contextmanager +def start_fuse(): + if platform.system() != 'Linux': + print('can not mount fuse on the non-linux system yet') + return + with contextlib.ExitStack() as stack: + vfm = find_executable("vineyard-fusermount") + os.mkdir(VINEYARD_FUSE_MOUNT_DIR) + + proc = start_program( + vfm, + '-f', + '-s', + '--vineyard-socket=%s' % VINEYARD_CI_IPC_SOCKET, + VINEYARD_FUSE_MOUNT_DIR, + ) + yield stack.enter_context(proc) + + # cleanup + try: + subprocess.run(['umount', '-f', '-l', VINEYARD_FUSE_MOUNT_DIR], check=False) + except Exception: # pylint: disable=broad-except + pass + + @contextlib.contextmanager def start_etcd(): with contextlib.ExitStack() as stack: @@ -431,6 +457,33 @@ def run_scale_in_out_tests(etcd_endpoints, instance_size=4): time.sleep(5) +def run_fuse_test(etcd_endpoints): + etcd_prefix = 'vineyard_test_%s' % time.time() + + with start_vineyardd( + 
etcd_endpoints, etcd_prefix, default_ipc_socket=VINEYARD_CI_IPC_SOCKET + ) as (_, rpc_socket_port), start_fuse() as _: + start_time = time.time() + subprocess.check_call( + [ + 'pytest', + '-s', + '-vvv', + '--durations=0', + '--log-cli-level', + 'DEBUG', + 'modules/fuse/tests', + '--vineyard-ipc-socket=%s' % VINEYARD_CI_IPC_SOCKET, + '--vineyard-endpoint=localhost:%s' % rpc_socket_port, + '--vineyard-fuse-mount-dir=%s' % VINEYARD_FUSE_MOUNT_DIR, + ], + cwd=os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'), + ) + print( + 'running fuse tests use %s seconds' % (time.time() - start_time), + ) + + def run_python_tests(etcd_endpoints, tests): etcd_prefix = 'vineyard_test_%s' % time.time() with start_vineyardd( @@ -689,6 +742,13 @@ def parse_sys_args(): default=False, help="Whether to run python contrib tests", ) + + arg_parser.add_argument( + '--with-fuse', + action='store_true', + default=False, + help="whether to run fuse test", + ) arg_parser.add_argument( '--tests', action='extend', @@ -696,6 +756,7 @@ def parse_sys_args(): type=str, help="Specify tests cases ro run", ) + return arg_parser, arg_parser.parse_args() @@ -727,13 +788,17 @@ def execute_tests(args): with start_etcd() as (_, etcd_endpoints): run_io_adaptor_distributed_tests(etcd_endpoints, args.with_migration) + if args.with_fuse: + with start_etcd() as (_, etcd_endpoints): + run_fuse_test(etcd_endpoints) + def main(): parser, args = parse_sys_args() - if not (args.with_cpp or args.with_python or args.with_io): + if not (args.with_cpp or args.with_python or args.with_io or args.with_fuse): print( - 'Error: \n\tat least one of of --with-{cpp,python,io} needs ' + 'Error: \n\tat least one of of --with-{cpp,python,io,fuse} needs ' 'to be specified\n' ) parser.print_help()