Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/storage_bench/StorageBench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,13 @@ bool runBenchmarks() {
endpointRawStrs.clear();
boost::split(endpointRawStrs, FLAGS_mgmtdEndpoints, boost::is_any_of(", "));

std::vector<net::Address> mgmtdEndpoints;
std::vector<net::NamedAddress> mgmtdEndpoints;

for (auto str : endpointRawStrs) {
boost::trim(str);
if (str.empty()) continue;

auto endpoint = net::Address::fromString(str);
auto endpoint = net::NamedAddress::from(str).value();
mgmtdEndpoints.push_back(endpoint);
XLOGF(WARN, "Add mgmtd endpoint: {}", endpoint);
}
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/storage_bench/StorageBench.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class StorageBench : public test::UnitTestFabric {
const std::string statsFilePath = "./perfstats.csv";
const std::vector<std::string> ibvDevices = {};
const std::vector<std::string> ibnetZones = {};
const std::vector<net::Address> mgmtdEndpoints = {};
const std::vector<net::NamedAddress> mgmtdEndpoints = {};
const std::string clusterId = kClusterId;
const uint32_t chainTableId = 0;
const uint32_t chainTableVersion = 0;
Expand Down
16 changes: 15 additions & 1 deletion src/client/mgmtd/MgmtdClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
#include <folly/experimental/coro/Promise.h>
#include <folly/experimental/coro/Sleep.h>
#include <folly/logging/xlog.h>
#include <iterator>
#include <memory>
#include <scn/scn.h>

#include "common/app/ApplicationBase.h"
#include "common/utils/BackgroundRunner.h"
#include "common/utils/LogCommands.h"
#include "common/utils/Result.h"
#include "common/utils/Status.h"
#include "core/utils/ServiceOperation.h"
#include "core/utils/runOp.h"

Expand Down Expand Up @@ -461,7 +464,18 @@ struct MgmtdClient::Impl {
if (serverAddrs.empty()) {
return makeError(StatusCode::kInvalidConfig, "Empty mgmtdServers");
}
for (auto addr : serverAddrs) {
std::vector<net::Address> resolved;
for (const auto &addr : serverAddrs) {
auto res = addr.resolve(std::back_inserter(resolved));
if (res.hasError()) {
XLOG(WARN, "resolve mgmtd address: {}", res.error().describe());
}
}
if (resolved.empty()) {
return makeError(StatusCode::kInvalidConfig, "No resolved mgmtdServers");
}

for (auto addr : resolved) {
if (addr == net::Address(0))
return makeError(StatusCode::kInvalidConfig, "Invalid MGMTD address: " + addr.toString());
if (config_.network_type() && addr.type != *config_.network_type()) {
Expand Down
3 changes: 2 additions & 1 deletion src/client/mgmtd/MgmtdClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
#include "RoutingInfo.h"
#include "common/utils/ConfigBase.h"
#include "common/utils/Duration.h"
#include "common/utils/NamedAddress.h"
#include "stubs/common/IStubFactory.h"
#include "stubs/mgmtd/IMgmtdServiceStub.h"

namespace hf3fs::client {
class MgmtdClient {
public:
struct Config : public ConfigBase<Config> {
CONFIG_ITEM(mgmtd_server_addresses, std::vector<net::Address>{});
CONFIG_ITEM(mgmtd_server_addresses, std::vector<net::NamedAddress>{});
CONFIG_ITEM(work_queue_size, 100, ConfigCheckers::checkPositive);
CONFIG_ITEM(network_type, std::optional<net::Address::Type>{});

Expand Down
71 changes: 71 additions & 0 deletions src/common/utils/NamedAddress.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#include "common/utils/NamedAddress.h"

#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>

namespace hf3fs::net {

template<typename It>
requires std::output_iterator<It, Address>
Result<Void> NamedAddress::resolve(It out) const {
struct addrinfo req{
.ai_family = AF_INET,
.ai_socktype = SOCK_STREAM,
};
struct addrinfo *res;
int err = getaddrinfo(node.c_str(), service.c_str(), &req, &res);
if (err != 0) {
return MAKE_ERROR_F(StatusCode::kInvalidFormat, "failed to resolve {}:{}: {}", node, service, gai_strerror(err));
}
SCOPE_EXIT { freeaddrinfo(res); };

auto iter = res;
while (iter != nullptr) {
if (iter->ai_family == AF_INET) {
auto sin = (struct sockaddr_in *)iter->ai_addr;
if (sin->sin_family == AF_INET) {
*out++ = Address(sin->sin_addr.s_addr, ntohs(sin->sin_port), type);
}
}
iter = iter->ai_next;
}
return Void{};
}

template Result<Void> NamedAddress::resolve(std::back_insert_iterator<std::vector<Address>> out) const;

Result<NamedAddress> NamedAddress::from(std::string_view sv) {
constexpr std::string_view delimiter = "://";
auto pos = sv.find(delimiter);
auto tp = Address::Type::TCP;
if (pos != sv.npos) {
auto tpStr = sv.substr(0, pos);
auto opt = magic_enum::enum_cast<Address::Type>(tpStr, magic_enum::case_insensitive);
if (!opt) {
return makeError(StatusCode::kInvalidFormat, "invalid address type: {}", tpStr);
}
tp = *opt;
sv = sv.substr(pos + delimiter.size());
}
pos = sv.find_last_of(':');
if (pos == sv.npos) {
return makeError(StatusCode::kInvalidFormat, "service not found in address: {}", sv);
}
return NamedAddress(std::string(sv.substr(0, pos)), std::string(sv.substr(pos + 1)), tp);
}

NamedAddress to_named(Address addr) {
return NamedAddress(addr.ipStr(), std::to_string(addr.port), addr.type);
}

std::vector<NamedAddress> to_named(const std::vector<Address> &addrs) {
std::vector<NamedAddress> named;
named.reserve(addrs.size());
for (auto a : addrs) {
named.push_back(to_named(a));
}
return named;
}

}
38 changes: 38 additions & 0 deletions src/common/utils/NamedAddress.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#pragma once

#include <string>
#include <iterator>

#include "common/utils/Address.h"
#include "common/utils/Result.h"

namespace hf3fs::net {

struct NamedAddress {
std::string node, service;
Address::Type type = Address::Type::TCP;

NamedAddress(std::string node, std::string service, Address::Type type)
: node(node), service(service), type(type) {}

bool operator==(const NamedAddress &other) const {
return node == other.node && service == other.service && type == other.type;
}

template<typename It>
requires std::output_iterator<It, Address>
Result<Void> resolve(It out) const;

std::string toString() const {
return fmt::format("{}://{}:{}", magic_enum::enum_name(type), node, service);
}

static Result<NamedAddress> from(std::string_view sv);
};

inline auto format_as(const NamedAddress &a) { return a.toString(); }

net::NamedAddress to_named(net::Address addr);
std::vector<net::NamedAddress> to_named(const std::vector<net::Address> &addrs);

}
2 changes: 1 addition & 1 deletion tests/client/ClientWithConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ struct MgmtdClientWithConfig {
MgmtdClientWithConfig(String clusterId,
stubs::ClientContextCreator clientContextCreator,
std::vector<net::Address> mgmtdServerAddrs) {
config.set_mgmtd_server_addresses(mgmtdServerAddrs);
config.set_mgmtd_server_addresses(to_named(mgmtdServerAddrs));
config.set_enable_auto_refresh(false);
config.set_enable_auto_heartbeat(false);
config.set_enable_auto_extend_client_session(false);
Expand Down
2 changes: 1 addition & 1 deletion tests/client/ServerWithConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ struct MetaServerWithConfig : ServerWithConfig<meta::server::MetaServer> {
MetaServerWithConfig(String clusterId, flat::NodeId nodeId, std::vector<net::Address> mgmtdServerAddrs)
: Base(std::move(clusterId), nodeId) {
auto &mgmtdClientConfig = config.mgmtd_client();
mgmtdClientConfig.set_mgmtd_server_addresses(mgmtdServerAddrs);
mgmtdClientConfig.set_mgmtd_server_addresses(to_named(mgmtdServerAddrs));
}
};

Expand Down
25 changes: 12 additions & 13 deletions tests/client/TestMgmtdClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ TEST_F(MgmtdClientTest, testWhenNoPrimary) {
};

MgmtdClient::Config config;
config.set_mgmtd_server_addresses({ada, adb, adc});
config.set_mgmtd_server_addresses({to_named(ada), to_named(adb), to_named(adc)});
config.set_enable_auto_refresh(false);
config.set_enable_auto_heartbeat(false);
MgmtdClient client("", std::make_unique<StubFactory>(), config);
Expand All @@ -158,7 +158,7 @@ TEST_F(MgmtdClientTest, testWhenNoPrimary) {

TEST_F(MgmtdClientTest, testInvalidAddress) {
MgmtdClient::Config config;
config.set_mgmtd_server_addresses({ada, ade});
config.set_mgmtd_server_addresses({to_named(ada), to_named(ade)});
MgmtdClient client("", nullptr, config);

folly::coro::blockingWait([&]() -> CoTask<void> {
Expand All @@ -169,7 +169,7 @@ TEST_F(MgmtdClientTest, testInvalidAddress) {

TEST_F(MgmtdClientTest, testAddressTypeMismatch) {
MgmtdClient::Config config;
config.set_mgmtd_server_addresses({ada, adf});
config.set_mgmtd_server_addresses({to_named(ada), to_named(adf)});
config.set_network_type(net::Address::TCP);
MgmtdClient client("", nullptr, config);

Expand All @@ -195,7 +195,7 @@ TEST_F(MgmtdClientTest, testWhenLastIsPrimary) {
};

MgmtdClient::Config config;
config.set_mgmtd_server_addresses({ada, adb, adc});
config.set_mgmtd_server_addresses({to_named(ada), to_named(adb), to_named(adc)});
config.set_enable_auto_refresh(false);
config.set_enable_auto_heartbeat(false);
MgmtdClient client("", std::make_unique<StubFactory>(), config);
Expand Down Expand Up @@ -229,7 +229,7 @@ TEST_F(MgmtdClientTest, testWhenPrimaryNotInConfig) {
};

MgmtdClient::Config config;
config.set_mgmtd_server_addresses({ada, adb, adc});
config.set_mgmtd_server_addresses({to_named(ada), to_named(adb), to_named(adc)});
config.set_enable_auto_refresh(false);
config.set_enable_auto_heartbeat(false);
MgmtdClient client("", std::make_unique<StubFactory>(), config);
Expand Down Expand Up @@ -272,7 +272,7 @@ TEST_F(MgmtdClientTest, testWhenGetPrimaryLoop) {
};

MgmtdClient::Config config;
config.set_mgmtd_server_addresses(addresses);
config.set_mgmtd_server_addresses(to_named(addresses));
config.set_enable_auto_refresh(false);
config.set_enable_auto_heartbeat(false);
MgmtdClient client("", std::make_unique<StubFactory>(addresses), config);
Expand Down Expand Up @@ -321,7 +321,7 @@ TEST_F(MgmtdClientTest, testRetryOnRefreshFail) {
};

MgmtdClient::Config config;
config.set_mgmtd_server_addresses(addresses);
config.set_mgmtd_server_addresses(to_named(addresses));
config.set_enable_auto_refresh(false);
config.set_enable_auto_heartbeat(false);
MgmtdClient client("", std::make_unique<StubFactory>(), config);
Expand Down Expand Up @@ -356,7 +356,7 @@ TEST_F(MgmtdClientTest, testRefreshRoutingInfoCallback) {
};

MgmtdClient::Config config;
config.set_mgmtd_server_addresses(addresses);
config.set_mgmtd_server_addresses(to_named(addresses));
config.set_enable_auto_refresh(false);
config.set_enable_auto_heartbeat(false);
MgmtdClient client("", std::make_unique<StubFactory>(), config);
Expand Down Expand Up @@ -400,7 +400,7 @@ TEST_F(MgmtdClientTest, testRetryAllAvailableAddresses) {
};

MgmtdClient::Config config;
config.set_mgmtd_server_addresses({adj});
config.set_mgmtd_server_addresses({to_named(adj)});
config.set_enable_auto_refresh(false);
config.set_enable_auto_heartbeat(false);
config.set_network_type(net::Address::TCP);
Expand Down Expand Up @@ -462,7 +462,7 @@ TEST_F(MgmtdClientTest, testRetryEndWhenNoPrimary) {
};

MgmtdClient::Config config;
config.set_mgmtd_server_addresses({adj});
config.set_mgmtd_server_addresses({to_named(adj)});
config.set_enable_auto_refresh(false);
config.set_enable_auto_heartbeat(false);
config.set_network_type(net::Address::TCP);
Expand Down Expand Up @@ -522,7 +522,7 @@ TEST_F(MgmtdClientTest, testRetryUnknownAddrs) {
};

MgmtdClient::Config config;
config.set_mgmtd_server_addresses({adj, add});
config.set_mgmtd_server_addresses({to_named(adj), to_named(add)});
config.set_enable_auto_refresh(false);
config.set_enable_auto_heartbeat(false);
config.set_network_type(net::Address::TCP);
Expand Down Expand Up @@ -562,7 +562,6 @@ TEST_F(MgmtdClientTest, testRetryUnknownAddrs) {
}

TEST_F(MgmtdClientTest, testSetGetConfigViaInvoke) {
std::vector<net::Address> addresses = {ada};
struct StubFactory : public stubs::IStubFactory<mgmtd::IMgmtdServiceStub> {
std::map<flat::NodeType, flat::ConfigInfo> configs;
std::unique_ptr<mgmtd::IMgmtdServiceStub> create(net::Address) {
Expand All @@ -585,7 +584,7 @@ TEST_F(MgmtdClientTest, testSetGetConfigViaInvoke) {
};

MgmtdClient::Config config;
config.set_mgmtd_server_addresses(addresses);
config.set_mgmtd_server_addresses({to_named(ada)});
config.set_enable_auto_refresh(false);
config.set_enable_auto_heartbeat(false);
MgmtdClient client("", std::make_unique<StubFactory>(), config);
Expand Down
4 changes: 2 additions & 2 deletions tests/lib/UnitTestFabric.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ std::unique_ptr<storage::StorageServer> UnitTestFabric::createStorageServer(size
serverConfig.targets().storage_target().kv_store().set_type(metaStoreType);
serverConfig.targets().storage_target().file_store().set_preopen_chunk_size_list(
std::set(chunkSizeList.begin(), chunkSizeList.end()));
serverConfig.mgmtd().set_mgmtd_server_addresses(mgmtdAddressList);
serverConfig.mgmtd().set_mgmtd_server_addresses(to_named(mgmtdAddressList));
serverConfig.coroutines_pool_read().set_threads_num(32);
serverConfig.coroutines_pool_read().set_coroutines_num(4096);
serverConfig.coroutines_pool_update().set_threads_num(32);
Expand Down Expand Up @@ -417,7 +417,7 @@ bool UnitTestFabric::setUpStorageSystem() {

mgmtdForClient_.reset((new FakeMgmtdClient(rawRoutingInfo_))->asCommon());
} else {
mgmtdClientConfig_.set_mgmtd_server_addresses(mgmtdAddressList);
mgmtdClientConfig_.set_mgmtd_server_addresses(to_named(mgmtdAddressList));
mgmtdClientConfig_.set_enable_auto_refresh(true);
mgmtdClientConfig_.set_enable_auto_heartbeat(false);
mgmtdClientConfig_.set_auto_refresh_interval(mgmtdServer_.config.service().check_status_interval());
Expand Down
2 changes: 1 addition & 1 deletion tests/meta/MetaTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ class MockCluster {
CONFIG_OBJ(mock_mgmtd, mgmtd::MockMgmtd::Config);
CONFIG_OBJ(mgmtd_client, client::MgmtdClientForServer::Config, [](client::MgmtdClientForServer::Config &cfg) {
cfg.set_enable_auto_heartbeat(false); // currently heartbeat is not needed in meta test
cfg.set_mgmtd_server_addresses({net::Address::from("TCP://127.0.0.1:8000").value()}); // just a fake TCP address.
cfg.set_mgmtd_server_addresses({net::NamedAddress::from("TCP://127.0.0.1:8000").value()}); // just a fake TCP address.
});

CONFIG_OBJ_ARRAY(chain_tables, ChainTableConfig, 64, [](std::array<ChainTableConfig, 64> &cfg) {
Expand Down
2 changes: 1 addition & 1 deletion tests/migration/TestMigrationService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ TEST_F(TestTestMigrationService, StartAndStopServer) {
const auto &mgmtdAddressList = mgmtdServer_.collectAddressList("Mgmtd");

server::MigrationServer::Config config;
config.mgmtd_client().set_mgmtd_server_addresses(mgmtdAddressList);
config.mgmtd_client().set_mgmtd_server_addresses(to_named(mgmtdAddressList));
server::MigrationServer server(config);

auto result = server.setup();
Expand Down