Skip to content

Commit

Permalink
add cache manager to fuse
Browse files Browse the repository at this point in the history
Signed-off-by: sitan liu <[email protected]>
  • Loading branch information
liusitan committed Aug 25, 2022
1 parent 2f2c7f7 commit 4740031
Show file tree
Hide file tree
Showing 10 changed files with 377 additions and 27 deletions.
71 changes: 71 additions & 0 deletions modules/fuse/cache_manager/manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/** Copyright 2020-2022 Alibaba Group Holding Limited.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef MODULES_FUSE_CACHE_MANAGER_MANAGER_H_
#define MODULES_FUSE_CACHE_MANAGER_MANAGER_H_
#include <iostream>
#include <list>
#include <memory>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include "arrow/buffer.h"
#include "common/util/logging.h"
#include "common/util/status.h"
namespace vineyard {
namespace fuse {

namespace cache_manager {
template <typename K, typename V>
struct KeyValue {
using KeyType = K;
using ValType = std::shared_ptr<V>;
const KeyType key;
ValType value;
KeyValue(KeyType k, ValType v) : key(k), value(v) { ; }
KeyValue(const KeyValue<K, V>& kv) : key(kv.key), value(kv.value) { ; }
};

template <typename KeyValue>
class CacheManager {
private:
std::list<KeyValue> myList;
std::unordered_map<typename KeyValue::KeyType,
typename std::list<KeyValue>::iterator>
myMap;
size_t capacityBytes;
size_t curBytes;
void popToNBytes(size_t n);
bool WithInCapacity(size_t data);

public:
explicit CacheManager(size_t capacity);
CacheManager();
void resize(size_t capacity);
Status put(const typename KeyValue::KeyType& key,
typename KeyValue::ValType val);
typename KeyValue::ValType get(const typename KeyValue::KeyType& key);
bool has(const typename KeyValue::KeyType& key);
std::list<KeyValue> getLinkedList();
typename KeyValue::ValType operator[](const typename KeyValue::KeyType& key);
size_t getCurBytes();
size_t getCapacityBytes();
void destroy();
};

} // namespace cache_manager
} // namespace fuse
}; // namespace vineyard
#endif // MODULES_FUSE_CACHE_MANAGER_MANAGER_H_
115 changes: 115 additions & 0 deletions modules/fuse/cache_manager/manager.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#ifndef MODULES_FUSE_CACHE_MANAGER_MANAGER_HPP_
#define MODULES_FUSE_CACHE_MANAGER_MANAGER_HPP_

#include "fuse/cache_manager/manager.h"
#include "arrow/buffer.h"

namespace vineyard {
namespace fuse {

namespace cache_manager {
template <typename KV>
void CacheManager<KV>::popToNBytes(size_t n) {
while (this->curBytes > n) {
auto keyToBeDel = myList.back().key;
auto dataToBeDel = myList.back().value;
this->curBytes -= dataToBeDel->capacity();
myList.pop_back();
myMap.erase(keyToBeDel);
DLOG(INFO) << "remove key: " << keyToBeDel << " value: " << dataToBeDel->ToString()<< " remaining bytes: "<<this->curBytes;
}
}
template <class KV>
bool CacheManager<KV>::WithInCapacity(size_t data) {
return data <= capacityBytes;
}
template<class KV>
CacheManager<KV>::CacheManager(size_t capacityBytes):capacityBytes(capacityBytes),curBytes(0){
}
template<class KV>
CacheManager<KV>::CacheManager():capacityBytes(0),curBytes(0){
}
template<class KV>
void CacheManager<KV>::resize(size_t targetCapacityBytes){
capacityBytes = targetCapacityBytes;
}
template<class KV>
void CacheManager<KV>::destroy(){
this->~CacheManager();
}
template<class KV>
bool CacheManager<KV>::has(const typename KV::KeyType& key){
return myMap.find(key)!= myMap.end();
}
template<class KV>
typename KV::ValType CacheManager<KV>::operator[](const typename KV::KeyType& key) {
return get(key);
}

template<class KV>
size_t CacheManager<KV>::getCapacityBytes(){
return this->capacityBytes;
}
template<class KV>
size_t CacheManager<KV>::getCurBytes(){
return this->curBytes;
}
template <class KV>
Status CacheManager<KV>::put(const typename KV::KeyType& key, typename KV::ValType v) {

if (WithInCapacity(v->capacity())) {

auto found_map_iter = myMap.find(key);

if (found_map_iter != myMap.end()) {
DLOG(INFO) << "update key: " << key << " value: " << v->ToString()<<std::endl;

auto found_key = found_map_iter->first;
auto& found_kv = found_map_iter->second;

curBytes -= found_kv->value->capacity();
popToNBytes(capacityBytes - v->capacity());
myList.splice(myList.begin(), this->myList, found_kv);
found_kv->value = v;
return Status::OK();
} else {
DLOG(INFO) << "put key: " << key << " value: " << v->ToString()<<std::endl;
popToNBytes(capacityBytes - v->capacity());
myList.emplace_front(key,v);
// decltype(myMap[key])::nothing;
myMap[key] = myList.begin();
this->curBytes += v->capacity();
return Status::OK();
}
} else {
DLOG(INFO)<<"this keyvalue is too large to put int"<<std::endl;
return Status::NotEnoughMemory("");
}
}
template <class KV>

std::list<KV> CacheManager<KV>::getLinkedList(){
return myList;

}

template <class KV>
typename KV::ValType CacheManager<KV>::get(const typename KV::KeyType& key) {
auto found_iter = myMap.find(key);
if (found_iter == myMap.end()) // key doesn't exist
{
DLOG(INFO)<< "not found key " << key;

return nullptr;}
DLOG(INFO)<< "found key " << key;

myList.splice(
myList.begin(), myList,
found_iter->second); // move the node corresponding to key to front
return found_iter->second->value;
}
} // namespace cache_manager

} // namespace fuse
} // namespace vineyard
#endif // MODULES_FUSE_CACHE_MANAGER_MANAGER_HPP_
32 changes: 15 additions & 17 deletions modules/fuse/fuse_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,8 @@ int fs::fuse_getattr(const char* path, struct stat* stbuf,
stbuf->st_nlink = 1;

{
auto iter = state.views.find(path);
if (iter != state.views.end()) {
stbuf->st_size = iter->second->size();
if (state.views.has(path)) {
stbuf->st_size = state.views.get(path)->size();
return 0;
}
}
Expand Down Expand Up @@ -111,8 +110,7 @@ int fs::fuse_getattr(const char* path, struct stat* stbuf,
auto d =
fs::state.ipc_desearilizer_registry.at(obj->meta().GetTypeName());
auto buffer = d(obj);
state.views[path_string] = buffer;

state.views.put(path_string, buffer);
stbuf->st_size = buffer->size();
} else {
auto obj = state.client->GetObject(ObjectIDFromString(prefix));
Expand All @@ -124,7 +122,7 @@ int fs::fuse_getattr(const char* path, struct stat* stbuf,
auto d =
fs::state.ipc_desearilizer_registry.at(obj->meta().GetTypeName());
auto buffer = d(obj);
state.views[path_string] = buffer;
state.views.put(path_string, buffer);
stbuf->st_size = buffer->size();
}
return 0;
Expand All @@ -145,10 +143,9 @@ int fs::fuse_open(const char* path, struct fuse_file_info* fi) {
// the opened file referenced by the user-defined name
auto filename = name_from_path(path);
auto target = InvalidObjectID();
auto loc = state.views.find(path);
std::string path_string(path);
std::shared_ptr<vineyard::Object> object = nullptr;
if (loc == state.views.end()) {
if (!state.views.has(path)) {
if (state.client->GetName(filename, target).ok()) {
object = state.client->GetObject(target);
}
Expand All @@ -161,7 +158,7 @@ int fs::fuse_open(const char* path, struct fuse_file_info* fi) {
} else {
auto d =
fs::state.ipc_desearilizer_registry.at(object->meta().GetTypeName());
state.views[path_string] = d(object);
state.views.put(path_string, d(object));
}
}

Expand All @@ -177,16 +174,17 @@ int fs::fuse_read(const char* path, char* buf, size_t size, off_t offset,
DLOG(INFO) << "fuse: read " << path << " from " << offset << ", expect "
<< size << " bytes";

std::unordered_map<std::string,
std::shared_ptr<arrow::Buffer>>::const_iterator loc;
std::shared_ptr<arrow::Buffer> buffer;
{
std::lock_guard<std::mutex> guard(state.mtx_);
loc = state.views.find(path);
}
if (loc == state.views.end()) {
return -ENOENT;

std::string path_string(path);

if (!state.views.has(path)) {
return -ENOENT;
}
buffer = state.views[path_string];
}
auto buffer = loc->second;
if (offset >= buffer->size()) {
return 0;
} else {
Expand Down Expand Up @@ -319,7 +317,7 @@ void* fs::fuse_init(struct fuse_conn_info* conn, struct fuse_config* cfg) {
void fs::fuse_destroy(void* private_data) {
DLOG(INFO) << "fuse: destroy";

state.views.clear();
state.views.destroy();
state.mutable_views.clear();
state.client->Disconnect();
}
Expand Down
10 changes: 5 additions & 5 deletions modules/fuse/fuse_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ limitations under the License.

#include "adaptors/arrow_ipc/deserializer_registry.h"

namespace arrow {
class Buffer;
}

#include "cache_manager/manager.h"
#include "cache_manager/manager.hpp"
namespace vineyard {

namespace fuse {
Expand All @@ -51,7 +49,9 @@ struct fs {
std::string vineyard_socket;
std::shared_ptr<Client> client;
std::mutex mtx_;
std::unordered_map<std::string, std::shared_ptr<arrow::Buffer>> views;
cache_manager::CacheManager<
cache_manager::KeyValue<std::string, arrow::Buffer>>
views;
std::unordered_map<std::string, std::shared_ptr<arrow::BufferBuilder>>
mutable_views;
std::unordered_map<std::string, vineyard::fuse::vineyard_deserializer_nt>
Expand Down
16 changes: 12 additions & 4 deletions modules/fuse/fusermount.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ limitations under the License.
#include "common/util/env.h"
#include "common/util/logging.h"
#include "fuse/fuse_impl.h"
#include "modules/fuse/cache_manager/manager.h"

/*
* Command line options
Expand All @@ -34,6 +35,7 @@ limitations under the License.
*/
static struct options {
const char* vineyard_socket;
size_t cache_size;
int show_help;
} options;

Expand All @@ -42,14 +44,16 @@ static struct options {

static const struct fuse_opt option_spec[] = {
OPTION("--vineyard-socket=%s", vineyard_socket),
OPTION("--help", show_help), OPTION("-h", show_help), FUSE_OPT_END};

OPTION("--max-cache-size=%d", cache_size), OPTION("--help", show_help),
OPTION("-h", show_help), FUSE_OPT_END};
static void print_help(const char* progname) {
printf("usage: %s [options] <mountpoint>\n\n", progname);
printf(
"Vineyard specific options:\n"
" --vineyard-socket=<s> Path of UNIX-domain socket of vineyard "
"server\n"
" --max-cache-size=<size> Size of cache in bytes\n"

" (default: \"$VINEYARD_IPC_SOCKET\")\n"
"\n");
}
Expand All @@ -62,6 +66,9 @@ static int process_args(struct fuse_args& args, int argc, char** argv) {

options.vineyard_socket = strdup(env.c_str());
}
if (!options.cache_size) {
options.cache_size = 1 * 1024 * 1024 * 1024;
}

/* Parse options */
if (fuse_opt_parse(&args, &options, option_spec, NULL) == -1) {
Expand All @@ -86,9 +93,10 @@ static int process_args(struct fuse_args& args, int argc, char** argv) {

// populate state
vineyard::fuse::fs::state.vineyard_socket = options.vineyard_socket;
vineyard::fuse::fs::state.views.resize(options.cache_size);
LOG(INFO) << "prepare to conncet to socket"
<< vineyard::fuse::fs::state.vineyard_socket;

<< vineyard::fuse::fs::state.vineyard_socket << " with cache size "
<< options.cache_size;
vineyard::fuse::fs::state.ipc_desearilizer_registry =
vineyard::fuse::arrow_ipc_register_once();
return 0;
Expand Down
11 changes: 11 additions & 0 deletions modules/fuse/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ def pytest_addoption(parser):
default='/tmp/vineyard_fuse.default',
help='fusermount directory',
)
parser.addoption(
'--vineyard-fuse-process-pid',
action='store',
default=None,
help='fusermount directory',
)


@pytest.fixture(scope='session')
def vineyard_fuse_process_pid(request):
return request.config.option.vineyard_fuse_process_pid


@pytest.fixture(scope='session')
Expand Down
Loading

0 comments on commit 4740031

Please sign in to comment.