Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ class CUDTUnited
friend class CUDTGroup;
friend class CRendezvousQueue;
friend class CCryptoControl;
friend class TestMockCUDT;

public:
CUDTUnited();
Expand Down
156 changes: 113 additions & 43 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8688,9 +8688,24 @@ void srt::CUDT::updateSndLossListOnACK(int32_t ackdata_seqno)
leaveCS(m_StatsLock);
}

void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_point& currtime)
{
bool srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_point& currtime)
{
// Valid ACK payloads are either LITE (just an ack seqno) or at least SMALL
// (RCVLASTACK + RTT + RTTVAR + BUFFERLEFT = 16 B). Anything else would OOB-read.
const size_t pktlen = ctrlpkt.getLength();
const bool isLiteAck = pktlen == size_t(SEND_LITE_ACK);
if (!isLiteAck && pktlen < ACKD_TOTAL_SIZE_SMALL * ACKD_FIELD_SIZE)
{
LOGC(inlog.Warn, log << CONID() << "ACK: EPE: wrong payload size=" << pktlen
<< " expected 4 or at least SMALL ("
<< (ACKD_TOTAL_SIZE_SMALL * ACKD_FIELD_SIZE)
<< ") - rejecting");
return false;
}

const int32_t* ackdata = (const int32_t*)ctrlpkt.m_pcData;

// Note: minimum of one 4-byte field is granted before the call.
const int32_t ackdata_seqno = ackdata[ACKD_RCVLASTACK];

// Check the value of ACK in case when it was some rogue peer
Expand All @@ -8702,10 +8717,9 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_
// This check MUST BE DONE before making any operation on this number.
LOGC(inlog.Error, log << CONID() << "ACK: IPE/EPE: received invalid ACK value: " << ackdata_seqno
<< " " << std::hex << ackdata_seqno << " (IGNORED)");
return;
return false;
}

const bool isLiteAck = ctrlpkt.getLength() == (size_t)SEND_LITE_ACK;
HLOGC(inlog.Debug,
log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ackdata_seqno << " [ACK=" << m_iSndLastAck
<< "]" << (isLiteAck ? "[LITE]" : "[FULL]"));
Expand All @@ -8724,7 +8738,16 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_
m_tsLastRspAckTime = currtime;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK
}
return;
return true;
}

const size_t acksize = pktlen / ACKD_FIELD_SIZE; // ACTUAL VALUE

// Check minimum size acceptable. If less, reject it.
if (acksize < ACKD_TOTAL_SIZE_SMALL)
{
LOGC(inlog.Error, log << CONID() << "EPE: ACK msg received with too small size: " << pktlen);
return false;
}

// Decide to send ACKACK or not
Expand Down Expand Up @@ -8766,7 +8789,7 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_

updateBrokenConnection();
completeBrokenConnectionDependencies(SRT_ESECFAIL); // LOCKS!
return;
return false;
}

if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
Expand Down Expand Up @@ -8802,7 +8825,7 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_
if (CSeqNo::seqoff(m_iSndLastFullAck, ackdata_seqno) <= 0)
{
// discard it if it is a repeated ACK
return;
return true;
}
m_iSndLastFullAck = ackdata_seqno;
}
Expand All @@ -8822,24 +8845,12 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_
}
#endif

size_t acksize = ctrlpkt.getLength(); // TEMPORARY VALUE FOR CHECKING
bool wrongsize = 0 != (acksize % ACKD_FIELD_SIZE);
acksize = acksize / ACKD_FIELD_SIZE; // ACTUAL VALUE

if (wrongsize)
{
// Issue a log, but don't do anything but skipping the "odd" bytes from the payload.
LOGC(inlog.Warn,
log << CONID() << "Received UMSG_ACK payload is not evened up to 4-byte based field size - cutting to "
<< acksize << " fields");
}

// Start with checking the base size.
if (acksize < ACKD_TOTAL_SIZE_SMALL)
{
LOGC(inlog.Warn, log << CONID() << "Invalid ACK size " << acksize << " fields - less than minimum required!");
// Ack is already interpreted, just skip further parts.
return;
return false;
}
// This check covers fields up to ACKD_BUFFERLEFT.

Expand Down Expand Up @@ -8949,9 +8960,11 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_
enterCS(m_StatsLock);
m_stats.sndr.recvdAck.count(1);
leaveCS(m_StatsLock);

return true;
}

void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsArrival)
bool srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsArrival)
{
int32_t ack = 0;

Expand All @@ -8977,22 +8990,22 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr
}
#endif

return;
return false;
}

LOGC(inlog.Error,
log << CONID() << "ACK record not found, can't estimate RTT "
<< "(ACK number: " << ctrlpkt.getAckSeqNo() << ", last ACK sent: " << m_iAckSeqNo
<< ", RTT (EWMA): " << m_iSRTT << ")");
return;
return false;
}

if (rtt <= 0)
{
LOGC(inlog.Error,
log << CONID() << "IPE: invalid RTT estimate " << rtt
<< ", possible time shift. Clock: " << SRT_SYNC_CLOCK_STR);
return;
return false;
}

// If increasing delay is detected.
Expand Down Expand Up @@ -9047,12 +9060,13 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr
// Update last ACK that has been received by the sender
if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0)
m_iRcvLastAckAck = ack;
return true;
}

void srt::CUDT::processCtrlLossReport(const CPacket& ctrlpkt)
bool srt::CUDT::processCtrlLossReport(const CPacket& ctrlpkt)
{
const int32_t* losslist = (int32_t*)(ctrlpkt.m_pcData);
const size_t losslist_len = ctrlpkt.getLength() / 4;
const size_t losslist_len = ctrlpkt.getLength() / sizeof(int32_t);

bool secure = true;

Expand Down Expand Up @@ -9207,7 +9221,7 @@ void srt::CUDT::processCtrlLossReport(const CPacket& ctrlpkt)

updateBrokenConnection();
completeBrokenConnectionDependencies(SRT_ESECFAIL); // LOCKS!
return;
return false;
}

// the lost packet (retransmission) should be sent out immediately
Expand All @@ -9216,12 +9230,18 @@ void srt::CUDT::processCtrlLossReport(const CPacket& ctrlpkt)
enterCS(m_StatsLock);
m_stats.sndr.recvdNak.count(1);
leaveCS(m_StatsLock);

return true;
}

void srt::CUDT::processCtrlHS(const CPacket& ctrlpkt)
bool srt::CUDT::processCtrlHS(const CPacket& ctrlpkt)
{
CHandShake req;
req.load_from(ctrlpkt.m_pcData, ctrlpkt.getLength());
if (-1 == req.load_from(ctrlpkt.m_pcData, ctrlpkt.getLength()))
{
LOGC(inlog.Error, log << CONID() << "processCtrlHS: EPE: Handshake has wrong size: " << ctrlpkt.getLength());
return false;
}

HLOGC(inlog.Debug, log << CONID() << "processCtrl: got HS: " << req.show());

Expand Down Expand Up @@ -9328,21 +9348,51 @@ void srt::CUDT::processCtrlHS(const CPacket& ctrlpkt)
else
{
HLOGC(inlog.Debug, log << CONID() << "processCtrl: ... not INDUCTION, not ERROR, not rendezvous - IGNORED.");
return false;
}
return true;
}

void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt)
bool srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt)
{
// dropdata[0..1] are indexed unconditionally below.
if (ctrlpkt.getLength() < 2 * sizeof(int32_t))
{
LOGC(inlog.Warn, log << CONID() << "DROPREQ: payload " << ctrlpkt.getLength()
<< " bytes < " << (2 * sizeof(int32_t)) << " - rejecting");
return;
return false;
}

const int32_t* dropdata = (const int32_t*) ctrlpkt.m_pcData;

// The wire format carries a (lo, hi) seqno range. Reject reversed
// ranges: dropMessage walks a circular buffer from offset(lo) to
// offset(hi)+1 via incPos(); when seqcmp(lo, hi) > 0 the loop wraps
// and clears nearly the entire receive buffer (DoS primitive). The
// analogous LOSSREPORT path already rejects reversed ranges.

// This is for check only - one packet read will not spoil it
m_RcvBufferLock.lock();
const int32_t hookseq_begin = m_pRcvBuffer->getStartSeqNo();
m_RcvBufferLock.unlock();

int dist_begin = CSeqNo::seqoff(dropdata[0], hookseq_begin);
int dist_rel = CSeqNo::seqoff(dropdata[0], dropdata[1]);

if (abs(dist_begin) > CSeqNo::m_iSeqNoTH/2)
{
LOGC(inlog.Warn, log << CONID() << "EPE: rcv DROPREQ low rng %"
<< dropdata[0] << " - too distant to receiver buffer %" << hookseq_begin
<< " by " << dist_begin << " (exceeds threshold)");
return false;
}
if (dist_rel < 0 || dist_rel > CSeqNo::m_iSeqNoTH/2)
{
LOGC(inlog.Warn, log << CONID() << "EPE: rcv DROPREQ rng %"
<< dropdata[0] << " - %" << dropdata[1] << " - REVERSED RANGE, DISCARDING");
return false;
}

{
CUniqueSync rcvtscc (m_RecvLock, m_RcvTsbPdCond);
// With both TLPktDrop and TsbPd enabled, a message always consists only of one packet.
Expand Down Expand Up @@ -9404,9 +9454,11 @@ void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt)
HLOGC(inlog.Debug, log << CONID() << "DROPREQ: dropping %"
<< dropdata[0] << "-" << dropdata[1] << " current %" << m_iRcvCurrSeqNo);
}

return true;
}

void srt::CUDT::processCtrlShutdown()
bool srt::CUDT::processCtrlShutdown()
{
m_bShutdown = true;
m_bClosing = true;
Expand All @@ -9417,9 +9469,10 @@ void srt::CUDT::processCtrlShutdown()
// just we know about this state prematurely thanks to this message.
updateBrokenConnection();
completeBrokenConnectionDependencies(SRT_ECONNLOST); // LOCKS!
return true;
}

void srt::CUDT::processCtrlUserDefined(const CPacket& ctrlpkt)
bool srt::CUDT::processCtrlUserDefined(const CPacket& ctrlpkt)
{
HLOGC(inlog.Debug, log << CONID() << "CONTROL EXT MSG RECEIVED:"
<< MessageTypeStr(ctrlpkt.getType(), ctrlpkt.getExtendedType())
Expand Down Expand Up @@ -9447,31 +9500,44 @@ void srt::CUDT::processCtrlUserDefined(const CPacket& ctrlpkt)
{
updateCC(TEV_CUSTOM, EventVariant(&ctrlpkt));
}
return true;
}

void srt::CUDT::processCtrl(const CPacket &ctrlpkt)
bool srt::CUDT::processCtrl(const CPacket &ctrlpkt)
{
// Just heard from the peer, reset the expiration count.
m_iEXPCount = 1;
const steady_clock::time_point currtime = steady_clock::now();
m_tsLastRspTime = currtime;

// Extra check for the payload size:
// - must be aligned to int32_t
// - cannot be 0 (msgs with no args use 4-byte zero-filled padding).
size_t pktlen = ctrlpkt.getLength();
if (!pktlen || pktlen % sizeof(int32_t) != 0)
{
LOGC(inlog.Error, log << CONID() << "EPE: incoming UMSG: " << ctrlpkt.getType() << " INVALID SIZE: " << pktlen
<< " (expected > 0 and aligned to " << sizeof(int32_t) << " bytes)");
return false;
}

HLOGC(inlog.Debug,
log << CONID() << "incoming UMSG:" << ctrlpkt.getType() << " ("
<< MessageTypeStr(ctrlpkt.getType(), ctrlpkt.getExtendedType()) << ") socket=%" << ctrlpkt.id());

bool result = false;
switch (ctrlpkt.getType())
{
case UMSG_ACK: // 010 - Acknowledgement
processCtrlAck(ctrlpkt, currtime);
result = processCtrlAck(ctrlpkt, currtime);
break;

case UMSG_ACKACK: // 110 - Acknowledgement of Acknowledgement
processCtrlAckAck(ctrlpkt, currtime);
result = processCtrlAckAck(ctrlpkt, currtime);
break;

case UMSG_LOSSREPORT: // 011 - Loss Report
processCtrlLossReport(ctrlpkt);
result = processCtrlLossReport(ctrlpkt);
break;

case UMSG_CGWARNING: // 100 - Delay Warning
Expand All @@ -9485,19 +9551,19 @@ void srt::CUDT::processCtrl(const CPacket &ctrlpkt)
break;

case UMSG_KEEPALIVE: // 001 - Keep-alive
processKeepalive(ctrlpkt, currtime);
result = processKeepalive(ctrlpkt, currtime);
break;

case UMSG_HANDSHAKE: // 000 - Handshake
processCtrlHS(ctrlpkt);
result = processCtrlHS(ctrlpkt);
break;

case UMSG_SHUTDOWN: // 101 - Shutdown
processCtrlShutdown();
result = processCtrlShutdown();
break;

case UMSG_DROPREQ: // 111 - Msg drop request
processCtrlDropReq(ctrlpkt);
result = processCtrlDropReq(ctrlpkt);
break;

case UMSG_PEERERROR: // 1000 - An error has happened to the peer side
Expand All @@ -9507,16 +9573,18 @@ void srt::CUDT::processCtrl(const CPacket &ctrlpkt)
// if recvfile() fails (e.g., due to disk fail), blocked sendfile/send should return immediately
// giving the app a chance to fix the issue
m_bPeerHealth = false;
result = true;

break;

case UMSG_EXT: // 0x7FFF - reserved and user defined messages
processCtrlUserDefined(ctrlpkt);
result = processCtrlUserDefined(ctrlpkt);
break;

default:
break;
}
return result;
}

void srt::CUDT::updateSrtRcvSettings()
Expand Down Expand Up @@ -12534,7 +12602,7 @@ bool srt::CUDT::runAcceptHook(CUDT *acore, const sockaddr* peer, const CHandShak
return true;
}

void srt::CUDT::processKeepalive(const CPacket& ctrlpkt, const time_point& tsArrival)
bool srt::CUDT::processKeepalive(const CPacket& ctrlpkt, const time_point& tsArrival)
{
// Here can be handled some protocol definition
// for extra data sent through keepalive.
Expand Down Expand Up @@ -12563,6 +12631,8 @@ void srt::CUDT::processKeepalive(const CPacket& ctrlpkt, const time_point& tsArr
m_pRcvBuffer->updateTsbPdTimeBase(ctrlpkt.getMsgTimeStamp());
if (m_config.bDriftTracer)
m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, -1);

return true;
}

namespace srt {
Expand Down
Loading
Loading