-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathudp_consumer.cpp
66 lines (48 loc) · 1.93 KB
/
udp_consumer.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include <stdio.h>
#include <stdlib.h>
#include "data_structs.hpp"
#include "service_discovery.h"
#include "udp_multicast_receiver.hpp"
#include "tcp_client.hpp"
#include "container_impl.h"
#include "udp_consumer_task.hpp"
#include "tcp_consumer_forward_task.hpp"
#include "worker_thread.hpp"
/**
 * Entry point of the UDP multicast consumer process.
 *
 * Start-up sequence:
 *   1. create the shared containers (a hash set and a queue);
 *   2. open a non-blocking UDP multicast receiver on VI_MULTICAST_PORT and
 *      join VI_MULTICAST_GROUP on VI_INTERFACE_ADDRESS;
 *   3. open a non-blocking TCP client and retry connect() until it reports
 *      vi::tcp_socket::ok;
 *   4. start two vi::udp_consumer_task workers and one
 *      vi::tcp_consumer_forward_task worker, then join all three.
 *
 * @param argc unused.
 * @param argv unused.
 * @return 0 after all worker threads have been joined.
 */
int main(int argc, char *argv[])
{
    // Command-line arguments are not consumed; silence unused-parameter warnings.
    (void)argc;
    (void)argv;

    printf("UDP Multicast Consumer started...\n");

    printf("Initializing containers...\n");
    // Both containers are handed by pointer to every task created below.
    vi::container_hashset mainStorage;
    vi::container_queue tcpConsumerQueue;

    printf("Binding UDP receiver to interface: %s, group: %s, port: %d...\n", VI_INTERFACE_ADDRESS, VI_MULTICAST_GROUP, VI_MULTICAST_PORT);
    vi::udp_multicast_receiver r;
    r.connection().open()
        .reuse_address()
        .set_nonblocking()
        .listen(VI_MULTICAST_PORT)
        .join_multicast_group(VI_MULTICAST_GROUP, VI_INTERFACE_ADDRESS);

    printf("Connecting to TCP Consumer on %s:%d\n", VI_INTERFACE_ADDRESS, VI_SERIALIZATION_PORT);
    vi::tcp_client tc;
    tc.connection().open()
        .reuse_address()
        .set_nonblocking();

    // Keep retrying until the connection is reported as established.
    // Back off for one second between attempts: a zero-length sleep returns
    // immediately and would spin a CPU core (and hammer connect()) for as long
    // as the TCP consumer is not reachable.
    while (tc.connection().connect(VI_INTERFACE_ADDRESS, VI_SERIALIZATION_PORT) != vi::tcp_socket::ok)
    {
        sleep(1);
    }
    printf("Connection to TCP Consumer established...\n");

    printf("Starting threads...\n");
    // Two UDP consumer tasks share the same receiver, storage and queue;
    // the forward task gets the same storage and queue plus the TCP client.
    vi::udp_consumer_task task0(&mainStorage, &tcpConsumerQueue, &r);
    vi::udp_consumer_task task1(&mainStorage, &tcpConsumerQueue, &r);
    vi::tcp_consumer_forward_task task2(&mainStorage, &tcpConsumerQueue, &tc);

    vi::worker_thread<vi::udp_consumer_task> thread0(task0);
    vi::worker_thread<vi::udp_consumer_task> thread1(task1);
    vi::worker_thread<vi::tcp_consumer_forward_task> thread2(task2);

    thread0.start();
    thread1.start();
    thread2.start();

    // Block until every worker has finished before reporting shutdown.
    thread0.join();
    thread1.join();
    thread2.join();

    printf("UDP Multicast Consumer finished...\n");
    return 0;
}