Skip to content

Commit 0b38061

Browse files
authored
Merge pull request #48 from Jorropo/multicast-linux
add first ultra hacky AsyncUDP support
2 parents e54db5e + b3cfafd commit 0b38061

File tree

2 files changed

+256
-0
lines changed

2 files changed

+256
-0
lines changed

cores/portduino/AsyncUDP.cpp

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
#include "AsyncUDP.h"
2+
#include "Utility.h"
3+
4+
void _asyncudp_async_cb(uv_async_t *handle) {
5+
AsyncUDP *udp = (AsyncUDP *)handle->data;
6+
udp->_DO_NOT_CALL_async_cb();
7+
}
8+
9+
AsyncUDP::AsyncUDP() {
10+
_handler = NULL;
11+
_connected = false;
12+
uv_loop_init(&_loop);
13+
_async.data = this;
14+
uv_async_init(&_loop, &_async, _asyncudp_async_cb);
15+
}
16+
17+
AsyncUDP::~AsyncUDP() {
18+
_quit.store(true);
19+
uv_async_send(&_async);
20+
_ioThread.join();
21+
uv_loop_close(&_loop);
22+
}
23+
24+
asyncUDPSendTask::asyncUDPSendTask(uint8_t *data, size_t len, IPAddress addr, uint16_t port) {
25+
this->data = (uint8_t*)malloc(len);
26+
memcpy(this->data, data, len);
27+
this->len = len;
28+
this->addr = addr;
29+
this->port = port;
30+
}
31+
32+
void _asyncudp_alloc_buffer_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
33+
buf->base = (char *)malloc(suggested_size);
34+
buf->len = suggested_size;
35+
}
36+
37+
void _asyncudp_on_read_cb(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
38+
AsyncUDP *udp = (AsyncUDP *)handle->data;
39+
udp->_DO_NOT_CALL_uv_on_read(handle, nread, buf, addr, flags);
40+
}
41+
42+
void AsyncUDP::_DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
43+
_handlerMutex.lock();
44+
auto h = _handler;
45+
_handlerMutex.unlock();
46+
if (h) {
47+
AsyncUDPPacket packet((uint8_t*)buf->base, nread);
48+
h(packet);
49+
}
50+
free(buf->base);
51+
}
52+
53+
bool AsyncUDP::listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl) {
54+
if (_connected) {
55+
return false;
56+
}
57+
if (uv_udp_init(&_loop, &_socket) < 0) {
58+
portduinoError("FIXME: implement proper error handling; uv_udp_init failed");
59+
}
60+
_socket.data = this;
61+
// FIXME: don't do bytes → string → bytes IP conversion
62+
int maxIpLength = 3*4+3; // 3 digits per octet, 4 octets, 3 dots
63+
char addr_str[maxIpLength+1]; // +1 for null terminator
64+
snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]);
65+
addr_str[maxIpLength] = '\0';
66+
struct sockaddr uvAddr;
67+
uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr);
68+
if (uv_udp_bind(&_socket, (const struct sockaddr *)&uvAddr, 0) < 0) {
69+
portduinoError("FIXME: implement proper error handling; uv_udp_bind failed");
70+
}
71+
if (uv_udp_set_multicast_loop(&_socket, false) < 0) {
72+
portduinoError("FIXME: implement proper error handling; uv_udp_set_multicast_loop failed");
73+
}
74+
if (uv_udp_set_multicast_ttl(&_socket, ttl) < 0) {
75+
portduinoError("FIXME: implement proper error handling; uv_udp_set_multicast_ttl failed");
76+
}
77+
if (uv_udp_set_membership(&_socket, addr_str, NULL, UV_JOIN_GROUP) < 0) {
78+
portduinoError("FIXME: implement proper error handling; uv_udp_set_membership failed");
79+
}
80+
if (uv_udp_recv_start(&_socket, _asyncudp_alloc_buffer_cb, _asyncudp_on_read_cb) < 0) {
81+
portduinoError("FIXME: implement proper error handling; uv_udp_recv_start failed");
82+
}
83+
84+
_ioThread = std::thread([this](){
85+
uv_run(&_loop, UV_RUN_DEFAULT);
86+
});
87+
88+
_listenIP = addr;
89+
_connected = true;
90+
return true;
91+
}
92+
93+
size_t AsyncUDP::writeTo(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port) {
94+
auto task = std::make_unique<asyncUDPSendTask>((uint8_t*)data, len, addr, port);
95+
_sendQueueMutex.lock();
96+
_sendQueue.push_back(std::move(task));
97+
_sendQueueMutex.unlock();
98+
uv_async_send(&_async);
99+
return len;
100+
}
101+
102+
void AsyncUDP::_DO_NOT_CALL_async_cb() {
103+
_sendQueueMutex.lock();
104+
while (!_sendQueue.empty()) {
105+
auto task = std::move(_sendQueue.back());
106+
_sendQueue.pop_back();
107+
_sendQueueMutex.unlock();
108+
_doWrite(task->data, task->len, task->addr, task->port);
109+
_sendQueueMutex.lock();
110+
}
111+
_sendQueueMutex.unlock();
112+
if (_quit.load()) {
113+
uv_udp_recv_stop(&_socket);
114+
// FIXME: don't do bytes → string → bytes IP conversion
115+
int maxIpLength = 3*4+3; // 3 digits per octet, 4 octets, 3 dots
116+
char addr_str[maxIpLength+1]; // +1 for null terminator
117+
snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", _listenIP[0], _listenIP[1], _listenIP[2], _listenIP[3]);
118+
addr_str[maxIpLength] = '\0';
119+
uv_udp_set_membership(&_socket, addr_str, NULL, UV_LEAVE_GROUP);
120+
uv_stop(&_loop);
121+
}
122+
}
123+
124+
void _asyncudp_send_cb(uv_udp_send_t *req, int status) {
125+
free(req);
126+
}
127+
128+
void AsyncUDP::_doWrite(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port) {
129+
// FIXME: don't do bytes → string → bytes IP conversion
130+
int maxIpLength = 3*4+3; // 3 digits per octet, 4 octets, 3 dots
131+
char addr_str[maxIpLength+1]; // +1 for null terminator
132+
snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]);
133+
addr_str[maxIpLength] = '\0';
134+
135+
// FIXME: implement error handling rather than raising SIGSEGV
136+
struct sockaddr uvAddr;
137+
uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr);
138+
139+
uv_udp_send_t *req = (uv_udp_send_t *)malloc(sizeof(uv_udp_send_t));
140+
uv_buf_t msg;
141+
msg.base = (char *)data;
142+
msg.len = len;
143+
if (uv_udp_send(req, &_socket, &msg, 1, (const struct sockaddr *)&uvAddr, _asyncudp_send_cb) < 0) {
144+
portduinoError("FIXME: implement proper error handling; uv_udp_send failed");
145+
}
146+
}

cores/portduino/AsyncUDP.h

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
#ifndef ESPASYNCUDP_H
2+
#define ESPASYNCUDP_H
3+
4+
#include "IPAddress.h"
5+
#include "Print.h"
6+
#include <functional>
7+
#include <atomic>
8+
#include <mutex>
9+
#include <memory>
10+
#include <thread>
11+
#include <uv.h>
12+
13+
class AsyncUDP;
14+
15+
class AsyncUDPPacket final
16+
{
17+
private:
18+
uint8_t *_data;
19+
size_t _len;
20+
21+
protected:
22+
AsyncUDPPacket(uint8_t* byte, size_t len) {
23+
_data = byte;
24+
_len = len;
25+
};
26+
27+
public:
28+
uint8_t * data() {
29+
return _data;
30+
};
31+
size_t length() {
32+
return _len;
33+
};
34+
35+
friend AsyncUDP;
36+
};
37+
38+
class asyncUDPSendTask final {
39+
protected:
40+
uint8_t *data;
41+
size_t len;
42+
IPAddress addr;
43+
uint16_t port;
44+
45+
public:
46+
asyncUDPSendTask(uint8_t *data, size_t len, IPAddress addr, uint16_t port);
47+
48+
~asyncUDPSendTask() {
49+
free(data);
50+
};
51+
52+
friend AsyncUDP;
53+
};
54+
55+
typedef std::function<void(AsyncUDPPacket& packet)> AuPacketHandlerFunction;
56+
typedef std::function<void(void * arg, AsyncUDPPacket& packet)> AuPacketHandlerFunctionWithArg;
57+
58+
class AsyncUDP final
59+
{
60+
private:
61+
std::mutex _handlerMutex;
62+
AuPacketHandlerFunction _handler;
63+
64+
std::mutex _sendQueueMutex;
65+
// the queue is used because uv_udp_send is not threadsafe and uv_async can merge multiple calls into one callback
66+
std::vector<std::unique_ptr<asyncUDPSendTask>> _sendQueue;
67+
68+
std::atomic<bool> _quit;
69+
std::thread _ioThread;
70+
71+
bool _connected;
72+
IPAddress _listenIP;
73+
74+
uv_loop_t _loop;
75+
uv_udp_t _socket;
76+
uv_async_t _async;
77+
78+
public:
79+
AsyncUDP();
80+
~AsyncUDP();
81+
82+
void onPacket(AuPacketHandlerFunctionWithArg cb, void * arg=NULL) {
83+
onPacket(std::bind(cb, arg, std::placeholders::_1));
84+
};
85+
void onPacket(AuPacketHandlerFunction cb) {
86+
_handlerMutex.lock();
87+
_handler = cb;
88+
_handlerMutex.unlock();
89+
};
90+
91+
bool listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl=1);
92+
93+
size_t writeTo(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port);
94+
95+
IPAddress listenIP() {
96+
return _listenIP;
97+
};
98+
operator bool() {
99+
return _connected;
100+
};
101+
102+
// do not call, used internally as callback from libuv's C callback
103+
void _DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags);
104+
void _DO_NOT_CALL_async_cb();
105+
106+
private:
107+
void _doWrite(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port);
108+
};
109+
110+
#endif

0 commit comments

Comments
 (0)