Skip to content
Open
76 changes: 55 additions & 21 deletions core/api/rpc/wsc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,26 @@ namespace fc::api::rpc {
return connect(ip, port, target, token);
}

outcome::result<void> Client::connect(const std::string &host,
const std::string &port,
const std::string &target,
const std::string &token) {
outcome::result<void> Client::connect(const std::string &host,
const std::string &port,
const std::string &target,
const std::string &token) {
boost::system::error_code ec;
socket.next_layer().connect({boost::asio::ip::make_address(host),
boost::lexical_cast<uint16_t>(port)},
ec);
socket->next_layer().connect({boost::asio::ip::make_address(host),
boost::lexical_cast<uint16_t>(port)},
ec);
if (ec) {
return ec;
}
if (not token.empty()) {
socket.set_option(
socket->set_option(
boost::beast::websocket::stream_base::decorator([&](auto &req) {
req.set(boost::beast::http::field::authorization,
"Bearer " + token);
}));
}
socket.handshake(host, target, ec);
socket->handshake(host, target, ec);
client_data = ClientData{host, port, target, token};
if (ec) {
return ec;
}
Expand Down Expand Up @@ -87,27 +88,28 @@ namespace fc::api::rpc {
}
}
chans.clear();
reconnect(3, std::chrono::seconds(5));
}

void Client::_flush() {
if (!writing && !write_queue.empty()) {
if (!writing && !write_queue.empty() && not reconnecting) {
auto &[id, buffer] = write_queue.front();
writing = true;
socket.async_write(boost::asio::buffer(buffer.data(), buffer.size()),
[=](auto &&ec, auto) {
std::lock_guard lock{mutex};
if (ec) {
return _error(ec);
}
writing = false;
write_queue.pop();
_flush();
});
socket->async_write(boost::asio::buffer(buffer.data(), buffer.size()),
[=](auto &&ec, auto) {
std::lock_guard lock{mutex};
if (ec) {
return _error(ec);
}
writing = false;
write_queue.pop();
_flush();
});
}
}

void Client::_read() {
socket.async_read(buffer, [=](auto &&ec, auto) {
socket->async_read(buffer, [=](auto &&ec, auto) {
if (ec) {
std::lock_guard lock{mutex};
return _error(ec);
Expand Down Expand Up @@ -185,4 +187,36 @@ namespace fc::api::rpc {
}
}
}

void Client::reconnect(int counter, std::chrono::milliseconds wait) {
if (reconnecting) return;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it thread-safe?

reconnecting = true;
bool rec_status{false};
logger_->info(
"Starting reconnect to {}:{}", client_data.host, client_data.port);
for (int i = 0; i < counter; i++) {
std::this_thread::sleep_for(wait);
socket.reset();
socket.emplace(io);
auto res = connect(client_data.host,
client_data.port,
client_data.target,
client_data.token);
if (not res.has_error()) {
rec_status = true;
break;
}
}
reconnecting = false;
if (rec_status) {
logger_->info("Reconnect to {}:{} was successful",
client_data.host,
client_data.port);
_flush();
} else {
logger_->error("Reconnect to {}:{} have been failed",
client_data.host,
client_data.port);
}
}
} // namespace fc::api::rpc
13 changes: 12 additions & 1 deletion core/api/rpc/wsc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,32 @@ namespace fc::api::rpc {
void setup(A &api) {
visit(api, [&](auto &m) { _setup(*this, m); });
}
struct ClientData {
std::string host;
std::string port;
std::string target;
std::string token;
} client_data;

void reconnect(int counter, std::chrono::milliseconds wait);

private:
std::thread thread;
IoThread thread_chan;
io_context io;
io_context &io2;
boost::asio::executor_work_guard<io_context::executor_type> work_guard;
boost::beast::websocket::stream<boost::asio::ip::tcp::socket> socket;
boost::optional<
boost::beast::websocket::stream<boost::asio::ip::tcp::socket>>
socket;
boost::beast::flat_buffer buffer;
std::mutex mutex;
uint64_t next_req{};
std::map<uint64_t, ResultCb> result_queue;
std::map<uint64_t, ChanCb> chans;
std::queue<std::pair<uint64_t, Bytes>> write_queue;
bool writing{false};
bool reconnecting{false};

template <typename M>
void _setup(Client &c, M &m);
Expand Down
10 changes: 5 additions & 5 deletions core/primitives/sector_file/sector_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ namespace fc::primitives::sector_file {
struct SectorPaths {
public:
SectorId id;
std::string unsealed;
std::string sealed;
std::string cache;
std::string update;
std::string update_cache;
std::string unsealed{};
std::string sealed{};
std::string cache{};
std::string update{};
std::string update_cache{};

void setPathByType(const SectorFileType &file_type,
const std::string &path);
Expand Down
10 changes: 9 additions & 1 deletion core/primitives/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace fc::primitives {
struct FsStat {
uint64_t capacity = 0;
uint64_t available = 0;
uint64_t fs_available = 0; // Available to use for sector storage
uint64_t fs_available = 0; // Available to use for sector storage
uint64_t reserved = 0;
uint64_t max = 0;
uint64_t used = 0;
Expand Down Expand Up @@ -91,6 +91,14 @@ namespace fc::primitives {
std::vector<std::string> gpus;
};

inline bool operator==(const WorkerResources &lhs,
const WorkerResources &rhs) {
return (lhs.physical_memory == rhs.physical_memory
&& lhs.swap_memory == rhs.swap_memory
&& lhs.reserved_memory == rhs.reserved_memory
&& lhs.cpus == rhs.cpus && lhs.gpus == rhs.gpus);
}

struct WorkerInfo {
std::string hostname;
WorkerResources resources;
Expand Down
3 changes: 3 additions & 0 deletions core/sector_storage/impl/local_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -904,4 +904,7 @@ namespace fc::sector_storage {

return call_id;
}
void LocalWorker::ping(std::function<void(const bool resp)> cb) {
Comment on lines 910 to +911
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}
void LocalWorker::ping(std::function<void(const bool resp)> cb) {
}
void LocalWorker::ping(std::function<void(const bool resp)> cb) {

cb(true);
}
} // namespace fc::sector_storage
2 changes: 2 additions & 0 deletions core/sector_storage/impl/local_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ namespace fc::sector_storage {
outcome::result<std::vector<primitives::StoragePath>> getAccessiblePaths()
override;

void ping(std::function<void(const bool resp)> cb) override;

private:
template <typename W, typename R>
outcome::result<CallId> asyncCall(const SectorRef &sector,
Expand Down
1 change: 0 additions & 1 deletion core/sector_storage/impl/manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ namespace fc::sector_storage {

worker_handler->worker = std::move(worker);
worker_handler->info = std::move(info);

scheduler_->newWorker(std::move(worker_handler));

return outcome::success();
Expand Down
4 changes: 4 additions & 0 deletions core/sector_storage/impl/remote_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,8 @@ namespace fc::sector_storage {
AcquireMode mode) {
return api_.Fetch(sector, file_type, path_type, mode);
}

void RemoteWorker::ping(std::function<void(const bool resp)> cb) {
api_.Version([=](auto res) { cb(!res.has_error()); });
}
} // namespace fc::sector_storage
2 changes: 2 additions & 0 deletions core/sector_storage/impl/remote_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ namespace fc::sector_storage {
const SectorRef &sector,
const PreCommit1Output &pre_commit_1_output) override;

void ping(std::function<void(const bool resp)> cb) override;

outcome::result<CallId> sealCommit1(const SectorRef &sector,
const SealRandomness &ticket,
const InteractiveRandomness &seed,
Expand Down
23 changes: 20 additions & 3 deletions core/sector_storage/impl/scheduler_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ namespace fc::sector_storage {

void SchedulerImpl::newWorker(std::unique_ptr<WorkerHandle> worker) {
std::unique_lock<std::mutex> lock(workers_lock_);
for(const auto &[key, value] : workers_){
if(*value == *worker){
return;
}
}
if (current_worker_id_ == std::numeric_limits<uint64_t>::max()) {
current_worker_id_ = 0; // TODO(ortyomka): maybe better mechanism
}
Expand Down Expand Up @@ -196,10 +201,22 @@ namespace fc::sector_storage {
return SchedulerErrors::kCannotSelectWorker;
}

WorkerID wid = acceptable[0];

std::promise<WorkerID> wid_promise;
std::future<WorkerID> wid_future = wid_promise.get_future();
auto done = std::make_shared<std::atomic_bool>();
for (const auto &cur : acceptable) {
workers_[cur]->worker->ping([&wid_promise, done, cur](const bool resp) {
if (resp && !done->exchange(true)) {
wid_promise.set_value(cur);
}
});
}
auto status = wid_future.wait_for(std::chrono::seconds(5));
if (status == std::future_status::timeout) {
return false;
}
WorkerID wid = wid_future.get();
assignWorker(wid, workers_[wid], request);

return true;
}

Expand Down
5 changes: 5 additions & 0 deletions core/sector_storage/selector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ namespace fc::sector_storage {
ActiveResources active;
};

inline bool operator==(const WorkerHandle &lhs, const WorkerHandle &rhs) {
return lhs.info.hostname == rhs.info.hostname
&& lhs.info.resources == rhs.info.resources;
}

class WorkerSelector {
public:
virtual ~WorkerSelector() = default;
Expand Down
2 changes: 2 additions & 0 deletions core/sector_storage/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ namespace fc::sector_storage {

virtual outcome::result<std::vector<primitives::StoragePath>>
getAccessiblePaths() = 0;

virtual void ping(std::function<void(const bool resp)> cb) = 0;
};

enum class CallErrorCode : uint64_t {
Expand Down
8 changes: 6 additions & 2 deletions test/core/sector_storage/scheduler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "storage/in_memory/in_memory_storage.hpp"
#include "testutil/mocks/sector_storage/selector_mock.hpp"
#include "testutil/outcome.hpp"
#include "testutil/mocks/sector_storage/worker_mock.hpp"

namespace fc::sector_storage {
using primitives::WorkerInfo;
Expand Down Expand Up @@ -70,9 +71,12 @@ namespace fc::sector_storage {
EXPECT_OUTCOME_TRUE(scheduler, SchedulerImpl::newScheduler(io_, kv_));

scheduler_ = scheduler;

auto worker_test = std::make_shared<WorkerMock>();
EXPECT_CALL(*worker_test, ping(_)).WillRepeatedly(testing::Invoke([](auto cb){
cb(true);
}));
std::unique_ptr<WorkerHandle> worker = std::make_unique<WorkerHandle>();

worker->worker = std::move(worker_test);
worker_name_ = "worker";

worker->info = WorkerInfo{
Expand Down
1 change: 1 addition & 0 deletions test/testutil/mocks/sector_storage/worker_mock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,6 @@ namespace fc::sector_storage {
gsl::span<const UnpaddedPieceSize>,
const UnpaddedPieceSize &,
int));
MOCK_METHOD1(ping, void(std::function<void(const bool)>));
};
} // namespace fc::sector_storage