Skip to content
Draft
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
463085c
First trial version
Jul 29, 2022
7fbca8a
Completed balancing code (compiled, not tested)
Aug 1, 2022
6602b15
Updated to latest master
Aug 4, 2022
29735ff
First version compiled with new recv buffer. Nothing tested.
Aug 19, 2022
3549bb8
Merged last upstream. Fixed compiling against ENABLE_NEW_RCVBUFFER
Aug 22, 2022
027072a
Fixed referring to buffer pointer in the socket
Aug 24, 2022
7a5853b
Fixed some bux during the first run
Aug 24, 2022
44803f2
Merge remote-tracking branch 'origin/master' into dev-add-group-balan…
Aug 24, 2022
6862b97
Fixed getting group status with the reading function
Aug 25, 2022
087b8db
Fixed bux in receiver buffer. Fixed handshake resource creation order…
Aug 30, 2022
b5d95d2
Added a workaround to read-ready check for a group-member socket
Aug 31, 2022
f03b09e
Added specific algorithm for tracking group loss for balancing groups
Sep 2, 2022
38510ab
Fixed broken release build
ethouris Sep 2, 2022
23dec40
Fixed bug: wrong loss range found in the buffer. Added broadcast grou…
ethouris Sep 2, 2022
e739133
Added common group losses handling
Sep 22, 2022
92639ad
Implemented receiver loss management for new bonding
ethouris Sep 22, 2022
2de148e
Cosmetics
ethouris Sep 22, 2022
047f126
Merge branch 'master' into dev-add-group-balancing
ethouris Sep 22, 2022
d07e71a
Minor bugfixes. Some line reordering and comment fixes
ethouris Sep 22, 2022
dbc3729
Fixed one deadlock. Added verification in snd-loss-removal procedure
Sep 23, 2022
b514108
Added handling of balancing group for srt-test-live. Fixed permissions.
ethouris Sep 26, 2022
a1bb540
Fixed balancing mode sending. Fixed loss list selective extraction. A…
Oct 5, 2022
1f8faac
Improved and fixed balancing loss detection
ethouris Oct 24, 2022
faf9bec
Merged against latest master
Oct 25, 2022
deeaf05
Some cosmetic fixes after the update
Oct 26, 2022
974a6ed
Merged with master (changes for AEAD)
Oct 27, 2022
c103433
Updated taking pts for stats and logs only where needed. Added more e…
Nov 3, 2022
b3f768f
Improved logging for the receiver side
ethouris Nov 3, 2022
e979a68
Updated from master and post-fixed
Nov 3, 2022
a5d0e73
Merge remote-tracking branch 'refs/remotes/ethouris/dev-add-group-bal…
Nov 3, 2022
b15876e
Added some comments regarding the new TSBPD triggering rules
Nov 7, 2022
37ec000
Updated to the latest master and fixed
Nov 8, 2022
e217b7d
Updated to latest upstream
Nov 23, 2022
251c9c5
Updated, part 1. Not tested (only UT)
Dec 6, 2022
4b39b7b
Merge branch 'master' into dev-add-group-balancing
Dec 6, 2022
631adb0
Some cosmetics and updates from upstream
Dec 7, 2022
26d11de
Updated to latest upstream
Dec 7, 2022
db18435
Fixed atomic operator bug (reported by gcc-11)
Dec 7, 2022
9f7db49
Fixed: for backup link sync use the common group buffer start sequence
Dec 8, 2022
35450ca
Fixed deadlock on 2L locked GroupLock. Fixed crash when discarding a …
Dec 15, 2022
a826e3c
Trial fix for the rogue ACK in backup groups
Dec 16, 2022
b116838
Some cosmetic log fixes
ethouris Dec 16, 2022
eaf51e4
Updated to latest upstream
ethouris Dec 19, 2022
dc2dcdf
Updated lock info in the comment
Jul 18, 2023
4302615
Updated and fixed
Feb 21, 2024
7bdd82c
Merged and post-fixed
Feb 22, 2024
432b0c7
Fixed some problems around macros that caused build breaks
Feb 22, 2024
f416392
Fixed gcc extension causing BB on Windows
Feb 22, 2024
5b99754
Fixed some build breaks reported by CI
Feb 26, 2024
328c359
Fixed C++03 incompat case
Feb 26, 2024
2e182a8
Updated with latest changes in receiver buffer. NOT TESTED, one IPE i…
Jun 20, 2024
498d980
Updated and fixed
Sep 4, 2024
b9d0c48
Updted and fixed
Apr 9, 2025
5115561
Merged with latest dev and fixed. Fixed managed flag on accepted memb…
Feb 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/socketoptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ const SocketOption srt_options [] {
#if ENABLE_BONDING
{ "groupconnect", 0, SRTO_GROUPCONNECT, SocketOption::PRE, SocketOption::INT, nullptr},
{ "groupminstabletimeo", 0, SRTO_GROUPMINSTABLETIMEO, SocketOption::PRE, SocketOption::INT, nullptr},
{ "groupconfig", 0, SRTO_GROUPCONFIG, SocketOption::PRE, SocketOption::STRING, nullptr},
#endif
#ifdef SRT_ENABLE_BINDTODEVICE
{ "bindtodevice", 0, SRTO_BINDTODEVICE, SocketOption::PRE, SocketOption::STRING, nullptr},
Expand Down
63 changes: 30 additions & 33 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ void srt::CUDTSocket::setBrokenClosed()

bool srt::CUDTSocket::readReady()
{
#if ENABLE_BONDING

// If this is a group member socket, then reading happens exclusively from
// the group and the socket is only used as a connection point, packet
// dispatching and single link management. Data buffering and hence ability
// to deliver a packet through API is exclusively the matter of group,
// therefore a single socket is never "read ready".

if (m_GroupOf)
return false;
#endif
if (m_UDT.m_bConnected && m_UDT.isRcvBufferReady())
return true;

Expand Down Expand Up @@ -716,35 +727,17 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
g->m_bConnected = true;
}

// XXX PROLBEM!!! These events are subscribed here so that this is done once, lazily,
// but groupwise connections could be accepted from multiple listeners for the same group!
// m_listener MUST BE A CONTAINER, NOT POINTER!!!
// ALSO: Maybe checking "the same listener" is not necessary as subscruption may be done
// multiple times anyway?
if (!g->m_listener)
{
// Newly created group from the listener, which hasn't yet
// the listener set.
g->m_listener = ls;

// Listen on both first connected socket and continued sockets.
// This might help with jump-over situations, and in regular continued
// sockets the IN event won't be reported anyway.
int listener_modes = SRT_EPOLL_ACCEPT | SRT_EPOLL_UPDATE;
epoll_add_usock_INTERNAL(g->m_RcvEID, ls, &listener_modes);

// This listening should be done always when a first connected socket
// appears as accepted off the listener. This is for the sake of swait() calls
// inside the group receiving and sending functions so that they get
// interrupted when a new socket is connected.
}
// In the new recvbuffer mode (and common receiver buffer) there's no waiting for reception
// on a socket and no reading from a socket directly is being done; instead the reading API
// is directly bound to the group and reading happens directly from the group's buffer.
// This includes also a situation of a newly connected socket, which will be delivering packets
// into the same common receiver buffer for the group, so readable will be the group itself
// when it has its own common buffer read-ready, by whatever reason. Packets to the buffer
// will be delivered by the sockets' receiver threads, so all these things happen strictly
// in the background.

// Add also per-direction subscription for the about-to-be-accepted socket.
// Both first accepted socket that makes the group-accept and every next
// socket that adds a new link.
int read_modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
// Keep per-socket sender ready EID.
int write_modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
epoll_add_usock_INTERNAL(g->m_RcvEID, ns, &read_modes);
epoll_add_usock_INTERNAL(g->m_SndEID, ns, &write_modes);

// With app reader, do not set groupPacketArrival (block the
Expand Down Expand Up @@ -1439,7 +1432,7 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i
// Do it after setting all stored options, as some of them may
// influence some group data.

srt::groups::SocketData data = srt::groups::prepareSocketData(ns);
srt::groups::SocketData data = srt::groups::prepareSocketData(ns, g.type());
if (targets[tii].token != -1)
{
// Reuse the token, if specified by the caller
Expand Down Expand Up @@ -1530,7 +1523,6 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i
// connection succeeded or failed and whether the new socket is
// ready to use or needs to be closed.
epoll_add_usock_INTERNAL(g.m_SndEID, ns, &connect_modes);
epoll_add_usock_INTERNAL(g.m_RcvEID, ns, &connect_modes);

// Adding a socket on which we need to block to BOTH these tracking EIDs
// and the blocker EID. We'll simply remove from them later all sockets that
Expand Down Expand Up @@ -1652,7 +1644,6 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i
f->sndstate = SRT_GST_BROKEN;
f->rcvstate = SRT_GST_BROKEN;
epoll_remove_socket_INTERNAL(g.m_SndEID, ns);
epoll_remove_socket_INTERNAL(g.m_RcvEID, ns);
}
else
{
Expand Down Expand Up @@ -1738,7 +1729,6 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i

epoll_remove_socket_INTERNAL(eid, y->second);
epoll_remove_socket_INTERNAL(g.m_SndEID, y->second);
epoll_remove_socket_INTERNAL(g.m_RcvEID, y->second);
}
}

Expand Down Expand Up @@ -1778,7 +1768,6 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i

epoll_remove_socket_INTERNAL(eid, s);
epoll_remove_socket_INTERNAL(g.m_SndEID, s);
epoll_remove_socket_INTERNAL(g.m_RcvEID, s);

continue;
}
Expand Down Expand Up @@ -2287,6 +2276,8 @@ int srt::CUDTUnited::select(UDT::UDSET* readfds, UDT::UDSET* writefds, UDT::UDSE
return count;
}

// XXX This may crash when a member socket is added to selectEx.
// Consider revising to prevent a member socket from being used.
int srt::CUDTUnited::selectEx(const vector<SRTSOCKET>& fds,
vector<SRTSOCKET>* readfds,
vector<SRTSOCKET>* writefds,
Expand Down Expand Up @@ -2622,6 +2613,12 @@ void srt::CUDTUnited::checkBrokenSockets()
continue;
}
else

// Additional note on group receiver: with the new group
// receiver m_pRcvBuffer in the socket core is NULL always,
// but that's not a problem - you can close the member socket
// safely without worrying about reading data because they are
// in the group anyway.
{
CUDT& u = s->core();

Expand Down Expand Up @@ -3186,7 +3183,7 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, cons
m.m_pSndQueue = new CSndQueue;
m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
m.m_pRcvQueue = new CRcvQueue;
m.m_pRcvQueue->init(128, s->core().maxPayloadSize(), m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);
m.m_pRcvQueue->init(128, s->core().maxPayloadSize(), m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer, s->m_SocketID);

// Rewrite the port here, as it might be only known upon return
// from CChannel::open.
Expand Down
5 changes: 4 additions & 1 deletion srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ class CUDTSocket

// XXX Controversial as to whether it should be guarded by this lock.
// It is used in many places without the lock, and it is also atomic.
SRT_ATTR_GUARDED_BY(m_ControlLock)
// (blocked because it makes more ado than actual help)
//SRT_ATTR_GUARDED_BY(m_ControlLock)
sync::atomic<SRT_SOCKSTATUS> m_Status; //< current socket state

/// Time when the socket is closed.
Expand Down Expand Up @@ -486,6 +487,8 @@ class CUDTUnited
#endif

void checkBrokenSockets();

SRT_ATTR_REQUIRES(m_GlobControlLock)
void removeSocket(const SRTSOCKET u);

CEPoll m_EPoll; // handling epoll data structures and events
Expand Down
34 changes: 29 additions & 5 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,9 @@ namespace {
* m_iMaxPosOff: none? (modified on add and ack
*/

CRcvBuffer::CRcvBuffer(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool bMessageAPI)
CRcvBuffer::CRcvBuffer(int initSeqNo, size_t size, /*CUnitQueue* unitqueue, */ bool bMessageAPI)
: m_entries(size)
, m_szSize(size) // TODO: maybe just use m_entries.size()
, m_pUnitQueue(unitqueue)
, m_iStartSeqNo(initSeqNo) // NOTE: SRT_SEQNO_NONE is allowed here.
, m_iStartPos(0)
, m_iEndOff(0)
Expand All @@ -129,7 +128,7 @@ CRcvBuffer::~CRcvBuffer()
if (!it->pUnit)
continue;

m_pUnitQueue->makeUnitFree(it->pUnit);
it->pUnit->m_pParentQueue->makeUnitFree(it->pUnit);
it->pUnit = NULL;
}
}
Expand Down Expand Up @@ -201,7 +200,8 @@ CRcvBuffer::InsertInfo CRcvBuffer::insert(CUnit* unit)
}
SRT_ASSERT(m_entries[newpktpos].pUnit == NULL);

m_pUnitQueue->makeUnitTaken(unit);
CUnitQueue* q = unit->m_pParentQueue;
q->makeUnitTaken(unit);
m_entries[newpktpos].pUnit = unit;
m_entries[newpktpos].status = EntryState_Avail;
countBytes(1, (int)unit->m_Packet.getLength());
Expand Down Expand Up @@ -1133,7 +1133,7 @@ void CRcvBuffer::releaseUnitInPos(CPos pos)
CUnit* tmp = m_entries[pos].pUnit;
m_entries[pos] = Entry(); // pUnit = NULL; status = Empty
if (tmp != NULL)
m_pUnitQueue->makeUnitFree(tmp);
tmp->m_pParentQueue->makeUnitFree(tmp);
}

bool CRcvBuffer::dropUnitInPos(CPos pos)
Expand Down Expand Up @@ -1631,5 +1631,29 @@ int32_t CRcvBuffer::getFirstLossSeq(int32_t fromseq, int32_t* pw_end)
return ret_seq;
}

void CRcvBuffer::getUnitSeriesInfo(int32_t fromseq, size_t maxsize, std::vector<SRTSOCKET>& w_sources)
{
const int offset = CSeqNo(fromseq) - m_iStartSeqNo;

// Check if it's still inside the buffer
if (offset < 0 || offset >= m_iMaxPosOff)
return;

// All you need to do is to check if there's a valid packet
// at given position
size_t pass = 0;
for (int off = offset; off < m_iMaxPosOff; ++off)
{
int pos = incPos(m_iStartPos, off);
if (m_entries[pos].pUnit)
{
w_sources.push_back(m_entries[pos].pUnit->m_pParentQueue->ownerID());
++pass;
if (pass == maxsize)
break;
}
}
}


} // namespace srt
9 changes: 7 additions & 2 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ class CRcvBuffer
typedef sync::steady_clock::duration duration;

public:
CRcvBuffer(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool bMessageAPI);
CRcvBuffer(int initSeqNo, size_t size, /*CUnitQueue* unitqueue, */ bool bMessageAPI);

~CRcvBuffer();

Expand Down Expand Up @@ -619,6 +619,7 @@ class CRcvBuffer
std::pair<int, int> getAvailablePacketsRange() const;

int32_t getFirstLossSeq(int32_t fromseq, int32_t* opt_end = NULL);
void getUnitSeriesInfo(int32_t fromseq, size_t maxsize, std::vector<SRTSOCKET>& w_sources);

bool empty() const
{
Expand Down Expand Up @@ -826,7 +827,11 @@ class CRcvBuffer
entries_t m_entries;

const size_t m_szSize; // size of the array of units (buffer)
CUnitQueue* m_pUnitQueue; // the shared unit queue

//XXX removed. In this buffer the units may come from various different
// queues, and the unit has a pointer pointing to the queue from which
// it comes, and it should be returned to the same queue.
//CUnitQueue* m_pUnitQueue; // the shared unit queue

CSeqNo m_iStartSeqNo;
CPos m_iStartPos; // the head position for I/O (inclusive)
Expand Down
20 changes: 20 additions & 0 deletions srtcore/buffer_snd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,12 +550,20 @@ void CSndBuffer::ackData(int offset)
{
ScopedLock bufferguard(m_BufLock);

// It is illegal to call this function without having first checked
// that the offset is within the range between 0 and the current count.
SRT_ASSERT(offset <= m_iCount);

bool move = false;
for (int i = 0; i < offset; ++i)
{
m_iBytesCount -= m_pFirstBlock->m_iLength;
if (m_pFirstBlock == m_pCurrBlock)
move = true;

// Sanity check to see if signing off for removal didn't
// exceed the last position of the used space.
SRT_ASSERT(m_pFirstBlock != m_pLastBlock);
m_pFirstBlock = m_pFirstBlock->m_pNext;
}
if (move)
Expand All @@ -566,6 +574,18 @@ void CSndBuffer::ackData(int offset)
updAvgBufSize(steady_clock::now());
}

void CSndBuffer::clear()
{
ScopedLock bufferguard(m_BufLock);

// Keep the m_pLastBlock intact and adjust the other
// fields to it. Blocks are still linked in circle.

m_pCurrBlock = m_pFirstBlock = m_pLastBlock;
m_iCount = 0;
m_iBytesCount = 0;
}

int CSndBuffer::getCurrBufSize() const
{
return m_iCount;
Expand Down
32 changes: 26 additions & 6 deletions srtcore/buffer_snd.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class CSndBuffer

void ackData(int offset);

void clear();

/// Read size of data still in the sending list.
/// @return Current size of the data in the sending list.
int getCurrBufSize() const;
Expand Down Expand Up @@ -241,12 +243,30 @@ class CSndBuffer
return m_iMsgNoBitset & MSGNO_SEQ::mask;
}

} * m_pBlock, *m_pFirstBlock, *m_pCurrBlock, *m_pLastBlock;

// m_pBlock: The head pointer
// m_pFirstBlock: The first block
// m_pCurrBlock: The current block
// m_pLastBlock: The last block (if first == last, buffer is empty)
}
/// The head pointer. Only used in constructor to roll over the initial set of blocks.
* m_pBlock,
/// The edge of the buffer at the reading end, pointing to the oldest ever stored
/// block that can still be read (although it's not readable if it is equal to m_pLastBlock).
* m_pFirstBlock,
/// The pointer to the next block to be read when extracting packets to send over the
/// network as "unique" (first time sent). Not used when extracting a packet for
/// retransmission, although this field must be updated every time any block removal
/// happens in case it would become stale.
* m_pCurrBlock,
/// Points to the block considered past-the-end of the used block space, simultaneously
/// the edge of the buffer at the writing end. The distance between m_pFirstBlock and
/// m_pLastBlock in the order of linked objects through the m_pNext field should be
/// always equal to m_iCount.
* m_pLastBlock;

// NOTE: The actual reserved space size for the buffer is saved in m_iSize and this is
// the true number of allocated blocks. There is no pointer to point directly to the end
// of reserved space - this can only be determined by the series of buffers that are
// linked to one another through the m_pNext field, which is NULL in the last one. Note
// also that Blocks are linked in circle, while Buffers are linked up to the last one,
// and Buffers are the true holders of the pieces of memory, while a block operates on
// its fragment only, which's size is defined by m_iBlockLen.

struct Buffer
{
Expand Down
Loading