Open
25 commits
8a92b06
Merge pull request #23 from DCP-CloudMesh/integration
jordanmao Mar 6, 2025
b5ffd7c
Use ip address from acceptConn in bootstrap to determine sender publi…
jordanmao Mar 12, 2025
61be411
remove ngrok
jordanmao Mar 12, 2025
2ae4aee
Update provider public ip address after bootstrapping
jordanmao Mar 13, 2025
61a30f1
more changes
jordanmao Mar 15, 2025
5712449
added ml/*.txt files to gitignore
jordanmao Mar 16, 2025
6042dcf
Have requester obtain its public ip in discovery response
jordanmao Mar 16, 2025
3622d32
avoid uses of const char* for host and port to prevent dangling point…
jordanmao Mar 16, 2025
fd3835b
Make bootstrap node public host and port configurable in env vars
jordanmao Mar 16, 2025
8782b05
Add tailscale installation instructions to README
jordanmao Mar 17, 2025
9945f94
Merge pull request #27 from DCP-CloudMesh/hamachi
jordanmao Mar 17, 2025
be1000c
Multiple Iterations Aggregation Cycles (#28)
PalaashKolhe Mar 17, 2025
059113e
testing
PalaashKolhe Mar 17, 2025
d38bed0
Fixed registration response deserialization bug
jordanmao Mar 18, 2025
dc92479
Added better command line processing for main.cpp
jordanmao Mar 23, 2025
27ffdf3
Update README
jordanmao Mar 24, 2025
99f661c
Merge pull request #29 from DCP-CloudMesh/better-cmd-line-args
jordanmao Mar 24, 2025
df06200
Seperate source (requestor) and target (provider) data or index file …
TonyxSun Feb 3, 2025
7f4bba9
Fix compilation error on Tony's Mac
TonyxSun Feb 2, 2025
55479fe
format
TonyxSun Feb 3, 2025
7efdf8f
Impl: two dir only, reduce risk of port timeout”
TonyxSun Feb 6, 2025
a4628b2
Fix bug with hanging fp's
TonyxSun Feb 25, 2025
c099e7a
Add ml txt to gitignore
TonyxSun Feb 27, 2025
3c82e9d
Work with main
TonyxSun Mar 24, 2025
48c1a72
Change timeout
TonyxSun Mar 24, 2025
1 change: 1 addition & 0 deletions .gitignore
@@ -33,3 +33,4 @@ third_party/libzmq*
CIFAR10/output
CIFAR10/*.txt
ml/proto
ml/*.txt
7 changes: 3 additions & 4 deletions BUILD
@@ -27,12 +27,13 @@ cc_library(
cc_library(
name = "src_files",
srcs = glob(["src/**/*.cpp"]),
hdrs = glob(["src/**/*.h"]),
hdrs = glob(["src/**/*.h"], allow_empty = True),
visibility = ["//visibility:public"],
deps = [
":include_files",
"@third_party//:libzmq",
"@third_party//:cppzmq",
"@boost//:program_options",
],
defines = local_defines,
)
@@ -62,9 +63,7 @@ cc_binary(
name = "provider",
srcs = ["main.cpp"],
defines = ["PROVIDER=1"] + local_defines,
deps = [
":src_files",
],
deps = [":src_files"],
)

cc_binary(
19 changes: 18 additions & 1 deletion MODULE.bazel
@@ -13,5 +13,22 @@ local_repository(
path = "third_party",
)

bazel_dep(name = "protobuf", version = "29.0-rc1")
bazel_dep(name = "protobuf", version = "29.0")
bazel_dep(name = "googletest", version = "1.15.2")

bazel_dep(name = "rules_boost", repo_name = "com_github_nelhage_rules_boost")
archive_override(
module_name = "rules_boost",
urls = ["https://github.com/nelhage/rules_boost/archive/refs/heads/master.tar.gz"],
strip_prefix = "rules_boost-master",
# It is recommended to edit the above URL and the below sha256 to point to a specific version of this repository.
# integrity = "sha256-...",
)
non_module_boost_repositories = use_extension("@com_github_nelhage_rules_boost//:boost/repositories.bzl", "non_module_dependencies")
use_repo(
non_module_boost_repositories,
"boost",
)
bazel_dep(name = "bazel_skylib", version = "1.7.1")
bazel_dep(name = "rules_proto", version = "7.0.2")
bazel_dep(name = "rules_cc", version = "0.0.16")
48 changes: 38 additions & 10 deletions README.md
@@ -7,6 +7,11 @@ Follow this to install bazel - https://bazel.build/install

We can install ZeroMQ for cpp as follows. Run the following starting from the `CloudMesh/third_party/` folder.

#### Tailscale

Tailscale is used to set up a P2P VPN network that connects the machines. Installation instructions can be found here:
https://tailscale.com/kb/1347/installation

#### Mac Dependencies

May need to install the following when troubleshooting issues
@@ -45,9 +50,15 @@ We install ZeroMQ for Python using `pip install pyzmq`.
- BUILD file - Contains the build instructions for the targets.
- MODULE.bazel file - Contains the module name and the dependencies.

## Compilation
## Compilation (Non Local - Multiple Machines)

To compile **BOOTSTRAP**, **PROVIDER** and **REQUESTER**, ensure that the `BOOTSTRAP_HOST` env variable is set, which can be done using the following command:
```
export BOOTSTRAP_HOST=___
```
The `BOOTSTRAP_PORT` env variable can also be set (defaults to 8080 if unset).

To compile **BOOTSTRAP**, **PROVIDER** and **REQUESTER**, run the following commands:
Then run the following commands to compile the source code:
### MacOS
```
bazel build //... --experimental_google_legacy_api --config=macos
@@ -76,23 +87,40 @@ To execute, run the following commands:
```
./bazel-bin/bootstrap
```
(8080 is reserved for bootstrap port so peers know where to connect)
(Uses port 8080)

### Provider
```
./bazel-bin/provider [8080]
```
(8080 is the default port, optional parameter)
./bazel-bin/provider -p <port number>
```
Example:
```
./bazel-bin/provider -p 8081
```
Program arguments can be viewed with
```
./bazel-bin/provider -h
```

### Requester

```
./bazel-bin/requester [8080 [r | c]]
./bazel-bin/requester -w <# workers> -e <# epochs> -p <port number> -m <mode ('c' or 'r')>
```
Request to compute task example
```
./bazel-bin/requester -w 3 -e 10 -p 8082 -m c
```
`8080` is the default port, optional parameter\
`r` is an optional parameter to request to receive the result of the computation (use same port as original request execution)
`c` is an optional parameter to request to provide the computation
Request to receive results example
```
./bazel-bin/requester -p 8082 -m r
```
The program execution for receiving results must use the same port as the execution of the compute request.

Program arguments can be viewed with
```
./bazel-bin/requester -h
```
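The requester flags documented above (`-w`, `-e`, `-p`, `-m`) can be illustrated with a small parser. The BUILD change adds `@boost//:program_options`, which suggests the PR parses arguments with Boost.Program_options; the sketch below deliberately swaps in POSIX `getopt` so that it has no external dependency. The struct `RequesterArgsSketch`, the function `parseRequesterArgs`, and every default value are hypothetical and not taken from `main.cpp`.

```cpp
#include <cassert>
#include <cstdlib>
#include <unistd.h>

// Hypothetical container for the requester options listed in the README.
// All defaults here are assumptions made for illustration only.
struct RequesterArgsSketch {
    unsigned int workers = 1;    // -w <# workers>
    unsigned int epochs = 1;     // -e <# epochs>
    unsigned short port = 8080;  // -p <port number>
    char mode = 'c';             // -m <'c' (compute) or 'r' (receive results)>
    bool ok = true;              // false if an option was unknown or invalid
};

// Parses "-w N -e N -p PORT -m MODE" using POSIX getopt (not Boost).
RequesterArgsSketch parseRequesterArgs(int argc, char* const argv[]) {
    RequesterArgsSketch args;
    optind = 1;  // restart scanning at the first argument
    opterr = 0;  // report errors through args.ok instead of stderr
    int opt = 0;
    while ((opt = getopt(argc, argv, "w:e:p:m:")) != -1) {
        switch (opt) {
        case 'w':
            args.workers = static_cast<unsigned int>(std::strtoul(optarg, nullptr, 10));
            break;
        case 'e':
            args.epochs = static_cast<unsigned int>(std::strtoul(optarg, nullptr, 10));
            break;
        case 'p': {
            unsigned long value = std::strtoul(optarg, nullptr, 10);
            if (value == 0 || value > 65535) {
                args.ok = false;  // not a usable TCP port
            } else {
                args.port = static_cast<unsigned short>(value);
            }
            break;
        }
        case 'm':
            args.mode = optarg[0];
            break;
        default:
            args.ok = false;  // unknown flag or missing value
            break;
        }
    }
    if (args.mode != 'c' && args.mode != 'r') {
        args.ok = false;
    }
    return args;
}
```

With the README's compute example (`-w 3 -e 10 -p 8082 -m c`), this sketch would yield `workers == 3`, `epochs == 10`, `port == 8082` and `mode == 'c'`.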

### Resources

4 changes: 2 additions & 2 deletions include/Networking/client.h
@@ -17,13 +17,13 @@ class Client {
Sends all bytes from a buffer with resilience to partial sends with the
option of num_retries (=-1 for blocking).
*/
ssize_t send_all_bytes(const char* buffer, size_t length, int flags,
ssize_t sendAllBytes(const char* buffer, size_t length, int flags,
int num_retries = 0);
void closeSocket();

public:
Client();
~Client();
int setupConn(const char* HOST, const char* PORT, const char* CONNTYPE);
int setupConn(const IpAddress& ipAddress, const char* CONNTYPE);
int sendMsg(const std::string& data, int num_retries = 0);
};
18 changes: 10 additions & 8 deletions include/Networking/server.h
@@ -10,26 +10,28 @@
#include <unistd.h>

class Server {
const char* HOST;
const char* PORT;
IpAddress publicIp;
const char* CONNTYPE;
IpAddress publicIP;
int server; // stores the current running server id
int activeConn; // stores the current active connection id
int server = -1; // stores the current running server id
int activeConn = -1; // stores the current active connection id

/*
Receives all bytes into a buffer with resilience to partial data sends.
*/
ssize_t recv_all_bytes(char* buffer, size_t length, int flags, int num_retries = 0);
ssize_t recvAllBytes(char* buffer, size_t length, int flags, int num_retries = 0);
void closeSocket();

public:
Server(const char* host, const char* port, const char* type);
Server(const IpAddress& addr, const char* type);
~Server();
void setupServer(); // prepare server for connection
bool acceptConn(); // blocking
bool acceptConn(IpAddress& clientAddr); // blocking
int receiveFromConn(std::string& msg, int num_retries = 0); // process the active conn
void replyToConn(std::string message); // reply to the active conn
void getFileFTP(std::string filename); // retrieve remote file
void
getFileIntoDirFTP(std::string filename,
std::string directory); // retrieve remote file into directory
void closeConn(); // close the active conn
};
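The header comments on `sendAllBytes` (client.h) and `recvAllBytes` (server.h) describe resilience to partial sends and receives, with `num_retries = -1` meaning blocking. The implementation files are not part of this excerpt, so the following is only a minimal sketch of how such loops are commonly written over POSIX `send`/`recv`. The free functions `sendAllBytesSketch`, `recvAllBytesSketch` and `roundTripOverSocketPair` are hypothetical names, and the choice to return the partial byte count once retries run out is an assumption.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Keeps calling send() until `length` bytes are written.
// numRetries < 0 retries transient errors indefinitely (assumed meaning of -1).
// Returns bytes sent (possibly partial if retries run out), or -1 on a hard error.
ssize_t sendAllBytesSketch(int fd, const char* buffer, size_t length, int flags,
                           int numRetries = 0) {
    assert(buffer != nullptr || length == 0);
    size_t done = 0;
    int retriesLeft = numRetries;
    while (done < length) {
        ssize_t n = send(fd, buffer + done, length - done, flags);
        if (n > 0) {
            done += static_cast<size_t>(n);  // partial send: advance and continue
            continue;
        }
        if (n < 0 && errno == EINTR) continue;  // interrupted by a signal
        bool transient = n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);
        if (!transient) return -1;
        if (numRetries >= 0 && retriesLeft-- <= 0) break;  // retries exhausted
    }
    return static_cast<ssize_t>(done);
}

// Mirror image for recv(); stops early if the peer closes the connection.
ssize_t recvAllBytesSketch(int fd, char* buffer, size_t length, int flags,
                           int numRetries = 0) {
    assert(buffer != nullptr || length == 0);
    size_t done = 0;
    int retriesLeft = numRetries;
    while (done < length) {
        ssize_t n = recv(fd, buffer + done, length - done, flags);
        if (n > 0) {
            done += static_cast<size_t>(n);
            continue;
        }
        if (n == 0) break;  // orderly shutdown by the peer
        if (errno == EINTR) continue;
        if (errno != EAGAIN && errno != EWOULDBLOCK) return -1;
        if (numRetries >= 0 && retriesLeft-- <= 0) break;
    }
    return static_cast<ssize_t>(done);
}

// Demo helper: pushes a short message through a local socket pair.
// Only suitable for messages that fit in the socket buffer, because the
// send completes before the receive starts.
std::string roundTripOverSocketPair(const std::string& message) {
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return "";
    std::string received(message.size(), '\0');
    ssize_t sent = sendAllBytesSketch(fds[0], message.data(), message.size(), 0);
    ssize_t got = recvAllBytesSketch(fds[1], &received[0], received.size(), 0);
    close(fds[0]);
    close(fds[1]);
    if (sent < 0 || got < 0) return "";
    received.resize(static_cast<size_t>(got));
    return received;
}
```

On a non-blocking socket the indefinite-retry path spins on `EAGAIN`; a production version would more likely wait with `poll` between attempts.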

5 changes: 2 additions & 3 deletions include/Peers/bootstrap_node.h
@@ -8,13 +8,12 @@

class BootstrapNode : public Peer {
public:
BootstrapNode(const char*, std::string);
BootstrapNode(std::string);
// ----------------- FIX LATER -----------------
BootstrapNode() {}
~BootstrapNode();

static const char* getServerIpAddress();
static const char* getServerPort();
static IpAddress getServerIpAddr();

void registerPeer(const std::string& peerUuid, const IpAddress& peerIpAddr);
AddressTable discoverPeers(const std::string& peerUuid,
6 changes: 3 additions & 3 deletions include/Peers/peer.h
@@ -9,8 +9,7 @@

class Peer {
protected:
const char* host;
const char* port;
IpAddress publicIp;
std::string uuid;

Server* server;
@@ -22,7 +21,8 @@ class Peer {
public:
Peer();
Peer(const std::string& uuid);
void setupServer(const char* host, const char* port);
void setPublicIp(const IpAddress& addr);
void setupServer(const IpAddress& addr);
virtual ~Peer();
};
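Commit 3622d32 replaces the `const char* host` / `const char* port` members with an `IpAddress` value to avoid dangling pointers. The sketch below illustrates why an owning member is safer. `IpAddressSketch` is a hypothetical stand-in for the project's `IpAddress` (declared in `utility.h`, which is not shown here), and its `host`/`port` fields are assumed.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for IpAddress; the real type may differ.
struct IpAddressSketch {
    std::string host;     // owns its characters
    unsigned short port;
};

// Risky pattern the commit moves away from (shown as a comment only):
//   const char* host = someTemporaryString.c_str();
//   // `host` dangles as soon as someTemporaryString is destroyed.

class PeerSketch {
    IpAddressSketch publicIp{"", 0};  // stored by value, so the peer owns a copy

  public:
    void setPublicIp(const IpAddressSketch& addr) { publicIp = addr; }
    const IpAddressSketch& getPublicIp() const { return publicIp; }
};

// Builds the address from a string that goes out of scope before the peer
// is used. This is safe only because PeerSketch copied the data.
PeerSketch makePeerFromTemporary() {
    PeerSketch peer;
    {
        std::string host = std::string("100.64.") + "0.1";
        peer.setPublicIp(IpAddressSketch{host, 8081});
    }  // `host` is destroyed here; the peer still holds its own copy
    return peer;
}
```

Had `PeerSketch` stored `host.c_str()` instead, reading it after the inner scope would be undefined behavior.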

23 changes: 18 additions & 5 deletions include/Peers/provider.h
@@ -9,6 +9,10 @@
#include "../RequestResponse/task_response.h"

#include "peer.h"
#include <algorithm>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
@@ -18,7 +22,11 @@ class Provider : public Peer {
bool isLocalBootstrap;
bool isLeader;
std::unique_ptr<TaskRequest> taskRequest;
std::unique_ptr<TaskResponse> taskResponse;
std::shared_ptr<TaskResponse> taskResponse;

std::string currentAggregatedModelStateDict;

std::thread* workloadThread;

ZMQSender ml_zmq_sender;
ZMQReceiver ml_zmq_receiver;
@@ -27,18 +35,23 @@ class Provider : public Peer {
ZMQReceiver aggregator_zmq_receiver;

public:
Provider(const char* port, std::string uuid);
Provider(unsigned short port, std::string uuid);
~Provider() noexcept;

void registerWithBootstrap();
void listen();
void leaderHandleTaskRequest(const IpAddress& requesterIpAddr);
void followerHandleTaskRequest();
void processData();
void processWorkload(); // worker function to manipulate the TaskRequest
std::string
void
initializeWorkloadToML(); // worker function to manipulate the TaskRequest
void processWorkload(); //

void
ingestTrainingData(); // worker function to load training data into memory
TaskResponse aggregateResults(std::vector<std::string> followerData);

TaskResponse
aggregateResults(std::vector<std::shared_ptr<TaskResponse>> followerData);
};

#endif
2 changes: 1 addition & 1 deletion include/Peers/requester.h
@@ -14,7 +14,7 @@ class Requester : protected Peer {
void divideTask();

public:
Requester(const char* port);
Requester(unsigned short port);
~Requester() noexcept;
void sendDiscoveryRequest(unsigned int numProviders);
void waitForDiscoveryResponse();
4 changes: 3 additions & 1 deletion include/RequestResponse/discovery_response.h
@@ -6,12 +6,14 @@
#include <string>

class DiscoveryResponse : public Payload {
IpAddress callerAddr;
AddressTable availablePeers;

public:
DiscoveryResponse();
DiscoveryResponse(const AddressTable& availablePeers);
DiscoveryResponse(const IpAddress& callerAddr, const AddressTable& availablePeers);

IpAddress getCallerPublicIpAddress() const;
AddressTable getAvailablePeers() const;
google::protobuf::Message* serializeToProto() const override;
void deserializeFromProto(
30 changes: 30 additions & 0 deletions include/RequestResponse/model_state_dict_params.h
@@ -0,0 +1,30 @@
#ifndef _MODEL_STATE_DICT_PARAMS_H
#define _MODEL_STATE_DICT_PARAMS_H

#include <string>
#include <vector>

#include "../utility.h"
#include "payload.h"

class ModelStateDictParams : public Payload {
// serialized bytes of the model state dict
std::string modelStateDict;
bool trainingIsComplete;

public:
ModelStateDictParams();
ModelStateDictParams(const std::string& modelStateDict);

std::string getTrainingData() const;
void setTrainingData(const std::string& modelStateDict);

bool getTrainingIsComplete() const;
void setTrainingIsComplete(bool trainingIsComplete);

google::protobuf::Message* serializeToProto() const override;
void deserializeFromProto(
const google::protobuf::Message& protoMessage) override;
};

#endif
4 changes: 3 additions & 1 deletion include/RequestResponse/payload.h
@@ -13,10 +13,12 @@ class Payload {
UNKNOWN,
ACKNOWLEDGEMENT,
REGISTRATION,
REGISTRATION_RESPONSE,
DISCOVERY_REQUEST,
DISCOVERY_RESPONSE,
TASK_REQUEST,
TASK_RESPONSE
TASK_RESPONSE,
MODEL_STATE_DICT_PARAMS,
};

Payload(Type type);
21 changes: 21 additions & 0 deletions include/RequestResponse/registration_response.h
@@ -0,0 +1,21 @@
#ifndef _REGISTRATION_RESPONSE_
#define _REGISTRATION_RESPONSE_

#include "payload.h"
#include "../utility.h"
#include <string>

class RegistrationResponse : public Payload {
IpAddress callerAddr;

public:
RegistrationResponse();
RegistrationResponse(const IpAddress& callerAddr);

IpAddress getCallerPublicIpAddress() const;
google::protobuf::Message* serializeToProto() const override;
void deserializeFromProto(
const google::protobuf::Message& protoMessage) override;
};

#endif // _REGISTRATION_RESPONSE_
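`RegistrationResponse` and `DiscoveryResponse` both carry a `callerAddr`, and commit b5ffd7c states that the bootstrap node takes the sender's address from `acceptConn`. The server implementation is not shown in this excerpt, so the following is only a sketch of how a peer address can be read from the `sockaddr_in` that `accept()` fills in. It assumes IPv4; `PeerAddrSketch`, `peerAddressFromSockaddr` and `acceptWithPeerAddress` are hypothetical names.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <netinet/in.h>
#include <string>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical stand-in for the project's IpAddress type.
struct PeerAddrSketch {
    std::string host;
    unsigned short port;
};

// Converts an IPv4 socket address (as filled in by accept) to host and port.
bool peerAddressFromSockaddr(const sockaddr_in& peer, PeerAddrSketch& out) {
    char text[INET_ADDRSTRLEN] = {0};
    if (inet_ntop(AF_INET, &peer.sin_addr, text, sizeof(text)) == nullptr) {
        return false;
    }
    out.host = text;
    out.port = ntohs(peer.sin_port);  // network to host byte order
    return true;
}

// Accepts one connection and reports who connected.
// Returns the connection descriptor, or -1 on failure.
int acceptWithPeerAddress(int listenFd, PeerAddrSketch& clientAddr) {
    sockaddr_in peer{};
    socklen_t len = sizeof(peer);
    int conn = accept(listenFd, reinterpret_cast<sockaddr*>(&peer), &len);
    if (conn < 0) return -1;
    if (!peerAddressFromSockaddr(peer, clientAddr)) {
        close(conn);
        return -1;
    }
    return conn;
}
```

The port reported by `accept()` is the client's ephemeral source port, not the port the peer listens on, so a design like this would still need the listening port to come from the message payload.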