Skip to content
Draft
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
05e0666
Added base utilities and implementation for sender schedule (untested)
Jun 16, 2025
700eb7f
Merged-in dev-rework-multiplexer, post-fixed
Jun 25, 2025
371606f
Added SocketKeeper tracking. Added speed stats recording
Jul 9, 2025
85daec8
Merged und fixed
Jul 10, 2025
6a94985
Added stop handling and muxer config entry. STILL NOT TESTED.
Jul 11, 2025
5b4cb49
Fixed bug: twice locking in wait. STILL TESTS FAIL
Jul 11, 2025
ddc6ab2
Merged latest changes from dev and dev-rework-multiplexer
Aug 12, 2025
c5b1e2c
Added HeapSet::find_next function
Aug 13, 2025
c7e85ad
Some log fixes
Aug 14, 2025
25a6027
Updated from dev and post-fixed
Nov 6, 2025
b4943f1
Cleared a compile warning
Nov 6, 2025
34c46ea
Updated to latest dev
Nov 6, 2025
12c4285
Removed another warning error
Nov 6, 2025
4f60162
Extracted CiBuffer from CRcvBuffer. Removed offset-dep from CSndBuffe…
Nov 12, 2025
1fbe6b4
Added alternative sender buffer
Nov 21, 2025
86bc2c9
Implemented per-packet busy flags and locker object for sending packets
Nov 25, 2025
1213952
Fixed build break
Nov 25, 2025
97c005f
Fixed bug: missing setting srctime when extracting. Fixed bug: missin…
Nov 26, 2025
16d3f20
Rewritten insert_loss. Reshaped the code to collect all SndPktArray m…
Dec 4, 2025
e1162dd
Updated with latest cutoff-dev
Dec 4, 2025
c9d1ec6
Fixed some build breaks
Dec 4, 2025
5fc7180
Fixed some build breaks (test)
Dec 4, 2025
05a4f03
Restored working code with the old CSndBuffer
Dec 4, 2025
ee5ef58
Fixed preallocation and maximum storage capacity parameters
Dec 4, 2025
a76e5b1
Tracking a CI failure on Ubuntu
Dec 4, 2025
5d3c5be
[BUG] Fixed std::shared_mutex usage for C++17. Removed external clone…
Dec 8, 2025
eb255c1
Merge branch 'dev-fix-shared-mutex-selection' into dev-new-sender-buffer
Dec 8, 2025
0144e17
Logging: no passing string by value. Removed verbose entries for RcvB…
Dec 8, 2025
3455f5c
Unified rexmit extraction. Removed old implementation
Dec 10, 2025
7444735
Merge branch 'dev' into dev-new-sender-buffer
Dec 11, 2025
f17d9b9
Fixed bug in CRcvBuf: m_iEndOff incorrectly updated after removal. Re…
Dec 12, 2025
bad3676
Fixed build break on pedantic configurations
Dec 12, 2025
8396cf4
Merge branch 'dev' into dev-sender-schedule
Dec 12, 2025
a6e9cfd
Merged the changes with the new sender buffer
Dec 15, 2025
f1e5b89
Literal in comment
Dec 15, 2025
db168ac
Updated to latest dev
Apr 9, 2026
9468f92
Fixed: deleted outdated doc file
Apr 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/socketoptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ const SocketOption srt_options [] {
{ "bindtodevice", 0, SRTO_BINDTODEVICE, SocketOption::PRE, SocketOption::STRING, nullptr},
{ "retransmitalgo", 0, SRTO_RETRANSMITALGO, SocketOption::PRE, SocketOption::INT, nullptr },
{ "cryptomode", 0, SRTO_CRYPTOMODE, SocketOption::PRE, SocketOption::INT, nullptr },
{ "maxrexmitbw", 0, SRTO_MAXREXMITBW, SocketOption::POST, SocketOption::INT64, nullptr }
{ "maxrexmitbw", 0, SRTO_MAXREXMITBW, SocketOption::POST, SocketOption::INT64, nullptr },
{ "sendmode", 0, SRTO_SENDMODE, SocketOption::PRE, SocketOption::INT, nullptr }
};
}

Expand Down
37 changes: 19 additions & 18 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2261,7 +2261,8 @@ SRTSTATUS CUDTUnited::close(const SRTSOCKET u, int reason)
};
#endif

SocketKeeper k(*this, u, ERH_THROW);
SocketKeeper k = SOCKET_KEEP(u, ERH_THROW);

IF_HEAVY_LOGGING(ScopedExitLog slog(k.socket));
HLOGC(smlog.Debug, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy());

Expand Down Expand Up @@ -2394,7 +2395,7 @@ void CUDTSocket::breakNonAcceptedSockets()
HLOGC(smlog.Debug, log << "breakNonAcceptedSockets: found " << accepted.size() << " leaky accepted sockets");
for (vector<SRTSOCKET>::iterator i = accepted.begin(); i != accepted.end(); ++i)
{
CUDTUnited::SocketKeeper sk(m_UDT.uglobal(), *i);
SocketKeeper sk = SOCKET_KEEP(*i, ERH_RETURN);
if (sk.socket)
{
sk.socket->m_UDT.m_bBroken = true;
Expand Down Expand Up @@ -4212,7 +4213,7 @@ SRTSTATUS CUDT::getGroupData(SRTSOCKET groupid, SRT_SOCKGROUPDATA* pdata, size_t
return APIError(MJ_NOTSUP, MN_INVAL, 0);
}

CUDTUnited::GroupKeeper k(uglobal(), groupid, CUDTUnited::ERH_RETURN);
CUDTUnited::GroupKeeper k(uglobal(), groupid, ERH_RETURN);
if (!k.group)
{
return APIError(MJ_NOTSUP, MN_INVAL, 0);
Expand Down Expand Up @@ -4386,7 +4387,7 @@ SRTSOCKET CUDT::connectLinks(SRTSOCKET grp, SRT_SOCKGROUPCONFIG targets[], int a

try
{
CUDTUnited::GroupKeeper k(uglobal(), grp, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), grp, ERH_THROW);
return uglobal().groupConnect(k.group, targets, arraysize);
}
catch (CUDTException& e)
Expand Down Expand Up @@ -4509,13 +4510,13 @@ SRTSTATUS CUDT::getsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, void* pw_optva
#if ENABLE_BONDING
if (CUDT::isgroup(u))
{
CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW);
k.group->getOpt(optname, (pw_optval), (*pw_optlen));
return SRT_STATUS_OK;
}
#endif

CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
CUDT& udt = uglobal().locateSocket(u, ERH_THROW)->core();
udt.getOpt(optname, (pw_optval), (*pw_optlen));
return SRT_STATUS_OK;
}
Expand All @@ -4540,13 +4541,13 @@ SRTSTATUS CUDT::setsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, const void* op
#if ENABLE_BONDING
if (CUDT::isgroup(u))
{
CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW);
k.group->setOpt(optname, optval, optlen);
return SRT_STATUS_OK;
}
#endif

CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
CUDT& udt = uglobal().locateSocket(u, ERH_THROW)->core();
udt.setOpt(optname, optval, optlen);
return SRT_STATUS_OK;
}
Expand Down Expand Up @@ -4585,12 +4586,12 @@ int CUDT::sendmsg2(SRTSOCKET u, const char* buf, int len, SRT_MSGCTRL& w_m)
#if ENABLE_BONDING
if (CUDT::isgroup(u))
{
CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW);
return k.group->send(buf, len, (w_m));
}
#endif

return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().sendmsg2(buf, len, (w_m));
return uglobal().locateSocket(u, ERH_THROW)->core().sendmsg2(buf, len, (w_m));
}
catch (const CUDTException& e)
{
Expand Down Expand Up @@ -4629,12 +4630,12 @@ int CUDT::recvmsg2(SRTSOCKET u, char* buf, int len, SRT_MSGCTRL& w_m)
#if ENABLE_BONDING
if (CUDT::isgroup(u))
{
CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW);
return k.group->recv(buf, len, (w_m));
}
#endif

return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().recvmsg2(buf, len, (w_m));
return uglobal().locateSocket(u, ERH_THROW)->core().recvmsg2(buf, len, (w_m));
}
catch (const CUDTException& e)
{
Expand All @@ -4651,7 +4652,7 @@ int64_t CUDT::sendfile(SRTSOCKET u, fstream& ifs, int64_t& offset, int64_t size,
{
try
{
CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
CUDT& udt = uglobal().locateSocket(u, ERH_THROW)->core();
return udt.sendfile(ifs, offset, size, block);
}
catch (const CUDTException& e)
Expand All @@ -4673,7 +4674,7 @@ int64_t CUDT::recvfile(SRTSOCKET u, fstream& ofs, int64_t& offset, int64_t size,
{
try
{
return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().recvfile(ofs, offset, size, block);
return uglobal().locateSocket(u, ERH_THROW)->core().recvfile(ofs, offset, size, block);
}
catch (const CUDTException& e)
{
Expand Down Expand Up @@ -4978,7 +4979,7 @@ SRTSTATUS CUDT::bstats(SRTSOCKET u, CBytePerfMon* perf, bool clear, bool instant

try
{
CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
CUDT& udt = uglobal().locateSocket(u, ERH_THROW)->core();
udt.bstats(perf, clear, instantaneous);
return SRT_STATUS_OK;
}
Expand All @@ -4998,7 +4999,7 @@ SRTSTATUS CUDT::groupsockbstats(SRTSOCKET u, CBytePerfMon* perf, bool clear)
{
try
{
CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW);
k.group->bstatsSocket(perf, clear);
return SRT_STATUS_OK;
}
Expand All @@ -5020,7 +5021,7 @@ CUDT* CUDT::getUDTHandle(SRTSOCKET u)
{
try
{
return &uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
return &uglobal().locateSocket(u, ERH_THROW)->core();
}
catch (const CUDTException& e)
{
Expand All @@ -5042,7 +5043,7 @@ SRT_SOCKSTATUS CUDT::getsockstate(SRTSOCKET u)
#if ENABLE_BONDING
if (CUDT::isgroup(u))
{
CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
CUDTUnited::GroupKeeper k(uglobal(), u, ERH_THROW);
return k.group->getStatus();
}
#endif
Expand Down
48 changes: 0 additions & 48 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,6 @@ class CUDTUnited
static const size_t MAX_CLOSE_RECORD_SIZE = 10;

public:
enum ErrorHandling
{
ERH_RETURN,
ERH_THROW,
ERH_ABORT
};
static std::string CONID(SRTSOCKET sock);

/// initialize the UDT library.
Expand Down Expand Up @@ -545,48 +539,6 @@ class CUDTUnited

void closeAllSockets();

public:
struct SocketKeeper
{
CUDTSocket* socket;

SocketKeeper(): socket(NULL) {}

// This is intended for API functions to lock the socket's existence
// for the lifetime of their call.
SocketKeeper(CUDTUnited& glob, SRTSOCKET id, ErrorHandling erh = ERH_RETURN) { socket = glob.locateAcquireSocket(id, erh); }

// This is intended for TSBPD thread that should lock the socket's
// existence until it exits.
SocketKeeper(CUDTUnited& glob, CUDTSocket* s)
{
acquire(glob, s);
}

// Note: acquire doesn't check if the keeper already keeps anything.
// This is only for a use together with an empty constructor.
bool acquire(CUDTUnited& glob, CUDTSocket* s)
{
if (s == NULL)
{
socket = NULL;
return false;
}

const bool caught = glob.acquireSocket(s);
socket = caught ? s : NULL;
return caught;
}

~SocketKeeper()
{
if (socket)
{
SRT_ASSERT(socket->isStillBusy() > 0);
socket->apiRelease();
}
}
};

private:

Expand Down
74 changes: 58 additions & 16 deletions srtcore/buffer_snd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ CSndBuffer::CSndBuffer(int ip_family, int size, int maxpld, int authtag)
m_pBuffer->m_pNext = NULL;

// circular linked list for out bound packets
m_pBlock = new Block;
Block* pb = m_pBlock;
m_pBlock = new CSndBlock;
CSndBlock* pb = m_pBlock;
char* pc = m_pBuffer->m_pcData;

for (int i = 0; i < m_iSize; ++i)
Expand All @@ -99,7 +99,7 @@ CSndBuffer::CSndBuffer(int ip_family, int size, int maxpld, int authtag)

if (i < m_iSize - 1)
{
pb->m_pNext = new Block;
pb->m_pNext = new CSndBlock;
pb = pb->m_pNext;
}
}
Expand All @@ -112,10 +112,10 @@ CSndBuffer::CSndBuffer(int ip_family, int size, int maxpld, int authtag)

CSndBuffer::~CSndBuffer()
{
Block* pb = m_pBlock->m_pNext;
CSndBlock* pb = m_pBlock->m_pNext;
while (pb != m_pBlock)
{
Block* temp = pb;
CSndBlock* temp = pb;
pb = pb->m_pNext;
delete temp;
}
Expand Down Expand Up @@ -170,7 +170,7 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
// If there's more than one packet, this function must increase it by itself
// and then return the accordingly modified sequence number in the reference.

Block* s = m_pLastBlock;
CSndBlock* s = m_pLastBlock;

if (w_msgno == SRT_MSGNO_NONE) // DEFAULT-UNCHANGED msgno supplied
{
Expand Down Expand Up @@ -258,7 +258,7 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
log << CONID() << "addBufferFromFile: adding " << iPktLen << " packets (" << len
<< " bytes) to send, msgno=" << m_iNextMsgNo);

Block* s = m_pLastBlock;
CSndBlock* s = m_pLastBlock;
int total = 0;
for (int i = 0; i < iNumBlocks; ++i)
{
Expand Down Expand Up @@ -343,7 +343,7 @@ int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime,
m_pCurrBlock->m_iMsgNoBitset |= MSGNO_ENCKEYSPEC::wrap(kflgs);
}

Block* p = m_pCurrBlock;
CSndBlock* p = m_pCurrBlock;
w_packet.set_msgflags(m_pCurrBlock->m_iMsgNoBitset);
w_srctime = m_pCurrBlock->m_tsOriginTime;
m_pCurrBlock = m_pCurrBlock->m_pNext;
Expand Down Expand Up @@ -381,7 +381,7 @@ int32_t CSndBuffer::getMsgNoAt(const int offset)
{
ScopedLock bufferguard(m_BufLock);

Block* p = m_pFirstBlock;
CSndBlock* p = m_pFirstBlock;

if (p)
{
Expand All @@ -401,7 +401,7 @@ int32_t CSndBuffer::getMsgNoAt(const int offset)
// XXX Suboptimal procedure to keep the blocks identifiable
// by sequence number. Consider using some circular buffer.
int i;
Block* ee SRT_ATR_UNUSED = 0;
CSndBlock* ee SRT_ATR_UNUSED = 0;
for (i = 0; i < offset && p; ++i)
{
ee = p;
Expand All @@ -423,14 +423,56 @@ int32_t CSndBuffer::getMsgNoAt(const int offset)
return p->getMsgSeq();
}

bool CSndBuffer::getPacketRangeSize(int32_t seqlo, int32_t seqhi, int& w_packets, int& w_bytes)
{
ScopedLock bufferguard(m_BufLock);

int npackets = 0, nbytes = 0;

// XXX Suboptimal procedure to keep the blocks identifiable
// by sequence number. Consider using some circular buffer.
CSndBlock* p = m_pFirstBlock;
for ( ; p != m_pLastBlock; p = p->m_pNext)
{
if (p->m_iSeqNo == seqlo)
break;
}
if (p == m_pLastBlock)
return false;

for ( ; p != m_pLastBlock; p = p->m_pNext)
{
++npackets;
nbytes += p->m_iLength;
if (p->m_iSeqNo == seqhi)
break;
}

w_packets = npackets;
w_bytes = nbytes;

if (p == m_pLastBlock)
{
if (p->m_iSeqNo == seqhi)
return true;

// This means it caught some initial packets, but didn't reach
// the end of range. This way it will have some packets, but
// this was an error
return false;
}

return true;
}

int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time_point& w_srctime, DropRange& w_drop)
{
// NOTE: w_packet.m_iSeqNo is expected to be set to the value
// of the sequence number with which this packet should be sent.

ScopedLock bufferguard(m_BufLock);

Block* p = m_pFirstBlock;
CSndBlock* p = m_pFirstBlock;

// XXX Suboptimal procedure to keep the blocks identifiable
// by sequence number. Consider using some circular buffer.
Expand Down Expand Up @@ -533,7 +575,7 @@ int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time
sync::steady_clock::time_point CSndBuffer::getPacketRexmitTime(const int offset)
{
ScopedLock bufferguard(m_BufLock);
const Block* p = m_pFirstBlock;
const CSndBlock* p = m_pFirstBlock;

// XXX Suboptimal procedure to keep the blocks identifiable
// by sequence number. Consider using some circular buffer.
Expand Down Expand Up @@ -721,20 +763,20 @@ void CSndBuffer::increase()
p->m_pNext = nbuf;

// new packet blocks
Block* nblk = NULL;
CSndBlock* nblk = NULL;
try
{
nblk = new Block;
nblk = new CSndBlock;
}
catch (...)
{
delete nblk;
throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
}
Block* pb = nblk;
CSndBlock* pb = nblk;
for (int i = 1; i < unitsize; ++i)
{
pb->m_pNext = new Block;
pb->m_pNext = new CSndBlock;
pb = pb->m_pNext;
}

Expand Down
Loading
Loading