Fix bugs in SACK recovery.
Calls to sender.NextSeg do not need to iterate from the front of the writeList every time: within a given recovery episode we can cache the last nextSeg returned. There cannot be a lower-sequenced segment that matches the next call to NextSeg, as otherwise we would have returned that segment in the previous call. This fixes the issue of excessive CPU usage with large send buffers, where we spend a lot of time iterating from the front of the list on every NextSeg invocation.

In addition, the following bugs were fixed:
* NextSeg() no longer iterates over segments that were never sent when looking for segments for retransmission that match steps 1/3/4 of the NextSeg algorithm.
* Correctly set rescueRxt only if the rescue segment was actually sent.
* Correctly initialize rescueRxt/highRxt when entering SACK recovery.
* Correctly re-arm the timer only on retransmissions when SACK is in use, and not for every segment being sent as was done before.
* Copy over xmitTime and xmitCount on segment clone.
* Move writeNext along when skipping over SACKed segments. This is required to prevent spurious retransmissions where we end up retransmitting data that was never lost.

PiperOrigin-RevId: 310387671
This commit is contained in:
parent
16da7e790f
commit
08f4846ebe
|
@ -96,6 +96,8 @@ func (s *segment) clone() *segment {
|
|||
route: s.route.Clone(),
|
||||
viewToDeliver: s.viewToDeliver,
|
||||
rcvdTime: s.rcvdTime,
|
||||
xmitTime: s.xmitTime,
|
||||
xmitCount: s.xmitCount,
|
||||
}
|
||||
t.data = s.data.Clone(t.views[:])
|
||||
return t
|
||||
|
|
|
@ -598,22 +598,34 @@ func (s *sender) splitSeg(seg *segment, size int) {
|
|||
seg.data.CapLength(size)
|
||||
}
|
||||
|
||||
// NextSeg implements the RFC6675 NextSeg() operation. It returns segments that
|
||||
// match rule 1, 3 and 4 of the NextSeg() operation defined in RFC6675. Rule 2
|
||||
// is handled by the normal send logic.
|
||||
func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) {
|
||||
// NextSeg implements the RFC6675 NextSeg() operation.
|
||||
//
|
||||
// NextSeg starts scanning the writeList starting from nextSegHint and returns
|
||||
// the hint to be passed on the next call to NextSeg. This is required to avoid
|
||||
// iterating the write list repeatedly when NextSeg is invoked in a loop during
|
||||
// recovery. The returned hint will be nil if there are no more segments that
|
||||
// can match rules defined by NextSeg operation in RFC6675.
|
||||
//
|
||||
// rescueRtx will be true only if nextSeg is a rescue retransmission as
|
||||
// described by Step 4) of the NextSeg algorithm.
|
||||
func (s *sender) NextSeg(nextSegHint *segment) (nextSeg, hint *segment, rescueRtx bool) {
|
||||
var s3 *segment
|
||||
var s4 *segment
|
||||
smss := s.ep.scoreboard.SMSS()
|
||||
// Step 1.
|
||||
for seg := s.writeList.Front(); seg != nil; seg = seg.Next() {
|
||||
if !s.isAssignedSequenceNumber(seg) {
|
||||
for seg := nextSegHint; seg != nil; seg = seg.Next() {
|
||||
// Stop iteration if we hit a segment that has never been
|
||||
// transmitted (i.e. either it has no assigned sequence number
|
||||
// or if it does have one, it's >= the next sequence number
|
||||
// to be sent [i.e. >= s.sndNxt]).
|
||||
if !s.isAssignedSequenceNumber(seg) || s.sndNxt.LessThanEq(seg.sequenceNumber) {
|
||||
hint = nil
|
||||
break
|
||||
}
|
||||
segSeq := seg.sequenceNumber
|
||||
if seg.data.Size() > int(smss) {
|
||||
if smss := s.ep.scoreboard.SMSS(); seg.data.Size() > int(smss) {
|
||||
s.splitSeg(seg, int(smss))
|
||||
}
|
||||
|
||||
// See RFC 6675 Section 4
|
||||
//
|
||||
// 1. If there exists a smallest unSACKED sequence number
|
||||
|
@ -630,8 +642,9 @@ func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) {
|
|||
// NextSeg():
|
||||
// (1.c) IsLost(S2) returns true.
|
||||
if s.ep.scoreboard.IsLost(segSeq) {
|
||||
return seg, s3, s4
|
||||
return seg, seg.Next(), false
|
||||
}
|
||||
|
||||
// NextSeg():
|
||||
//
|
||||
// (3): If the conditions for rules (1) and (2)
|
||||
|
@ -643,6 +656,7 @@ func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) {
|
|||
// SHOULD be returned.
|
||||
if s3 == nil {
|
||||
s3 = seg
|
||||
hint = seg.Next()
|
||||
}
|
||||
}
|
||||
// NextSeg():
|
||||
|
@ -651,10 +665,12 @@ func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) {
|
|||
// but there exists outstanding unSACKED data, we
|
||||
// provide the opportunity for a single "rescue"
|
||||
// retransmission per entry into loss recovery. If
|
||||
// HighACK is greater than RescueRxt, the one
|
||||
// segment of upto SMSS octects that MUST include
|
||||
// the highest outstanding unSACKed sequence number
|
||||
// SHOULD be returned.
|
||||
// HighACK is greater than RescueRxt (or RescueRxt
|
||||
// is undefined), then one segment of upto SMSS
|
||||
// octects that MUST include the highest outstanding
|
||||
// unSACKed sequence number SHOULD be returned, and
|
||||
// RescueRxt set to RecoveryPoint. HighRxt MUST NOT
|
||||
// be updated.
|
||||
if s.fr.rescueRxt.LessThan(s.sndUna - 1) {
|
||||
if s4 != nil {
|
||||
if s4.sequenceNumber.LessThan(segSeq) {
|
||||
|
@ -663,12 +679,31 @@ func (s *sender) NextSeg() (nextSeg1, nextSeg3, nextSeg4 *segment) {
|
|||
} else {
|
||||
s4 = seg
|
||||
}
|
||||
s.fr.rescueRxt = s.fr.last
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, s3, s4
|
||||
// If we got here then no segment matched step (1).
|
||||
// Step (2): "If no sequence number 'S2' per rule (1)
|
||||
// exists but there exists available unsent data and the
|
||||
// receiver's advertised window allows, the sequence
|
||||
// range of one segment of up to SMSS octets of
|
||||
// previously unsent data starting with sequence number
|
||||
// HighData+1 MUST be returned."
|
||||
for seg := s.writeNext; seg != nil; seg = seg.Next() {
|
||||
if s.isAssignedSequenceNumber(seg) && seg.sequenceNumber.LessThan(s.sndNxt) {
|
||||
continue
|
||||
}
|
||||
// We do not split the segment here to <= smss as it has
|
||||
// potentially not been assigned a sequence number yet.
|
||||
return seg, nil, false
|
||||
}
|
||||
|
||||
if s3 != nil {
|
||||
return s3, hint, false
|
||||
}
|
||||
|
||||
return s4, nil, true
|
||||
}
|
||||
|
||||
// maybeSendSegment tries to send the specified segment and either coalesces
|
||||
|
@ -792,64 +827,47 @@ func (s *sender) maybeSendSegment(seg *segment, limit int, end seqnum.Value) (se
|
|||
// section 5, step C.
|
||||
func (s *sender) handleSACKRecovery(limit int, end seqnum.Value) (dataSent bool) {
|
||||
s.SetPipe()
|
||||
|
||||
if smss := int(s.ep.scoreboard.SMSS()); limit > smss {
|
||||
// Cap segment size limit to s.smss as SACK recovery requires
|
||||
// that all retransmissions or new segments send during recovery
|
||||
// be of <= SMSS.
|
||||
limit = smss
|
||||
}
|
||||
|
||||
nextSegHint := s.writeList.Front()
|
||||
for s.outstanding < s.sndCwnd {
|
||||
nextSeg, s3, s4 := s.NextSeg()
|
||||
var nextSeg *segment
|
||||
var rescueRtx bool
|
||||
nextSeg, nextSegHint, rescueRtx = s.NextSeg(nextSegHint)
|
||||
if nextSeg == nil {
|
||||
// NextSeg():
|
||||
return dataSent
|
||||
}
|
||||
if !s.isAssignedSequenceNumber(nextSeg) || s.sndNxt.LessThanEq(nextSeg.sequenceNumber) {
|
||||
// New data being sent.
|
||||
|
||||
// Step C.3 described below is handled by
|
||||
// maybeSendSegment which increments sndNxt when
|
||||
// a segment is transmitted.
|
||||
//
|
||||
// Step (2): "If no sequence number 'S2' per rule (1)
|
||||
// exists but there exists available unsent data and the
|
||||
// receiver's advertised window allows, the sequence
|
||||
// range of one segment of up to SMSS octets of
|
||||
// previously unsent data starting with sequence number
|
||||
// HighData+1 MUST be returned."
|
||||
for seg := s.writeNext; seg != nil; seg = seg.Next() {
|
||||
if s.isAssignedSequenceNumber(seg) && seg.sequenceNumber.LessThan(s.sndNxt) {
|
||||
continue
|
||||
}
|
||||
// Step C.3 described below is handled by
|
||||
// maybeSendSegment which increments sndNxt when
|
||||
// a segment is transmitted.
|
||||
//
|
||||
// Step C.3 "If any of the data octets sent in
|
||||
// (C.1) are above HighData, HighData must be
|
||||
// updated to reflect the transmission of
|
||||
// previously unsent data."
|
||||
if sent := s.maybeSendSegment(seg, limit, end); !sent {
|
||||
break
|
||||
}
|
||||
dataSent = true
|
||||
s.outstanding++
|
||||
s.writeNext = seg.Next()
|
||||
nextSeg = seg
|
||||
break
|
||||
}
|
||||
if nextSeg != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
rescueRtx := false
|
||||
if nextSeg == nil && s3 != nil {
|
||||
nextSeg = s3
|
||||
}
|
||||
if nextSeg == nil && s4 != nil {
|
||||
nextSeg = s4
|
||||
rescueRtx = true
|
||||
}
|
||||
if nextSeg == nil {
|
||||
break
|
||||
}
|
||||
segEnd := nextSeg.sequenceNumber.Add(nextSeg.logicalLen())
|
||||
if !rescueRtx && nextSeg.sequenceNumber.LessThan(s.sndNxt) {
|
||||
// RFC 6675, Step C.2
|
||||
// Step C.3 "If any of the data octets sent in
|
||||
// (C.1) are above HighData, HighData must be
|
||||
// updated to reflect the transmission of
|
||||
// previously unsent data."
|
||||
//
|
||||
// "If any of the data octets sent in (C.1) are below
|
||||
// HighData, HighRxt MUST be set to the highest sequence
|
||||
// number of the retransmitted segment unless NextSeg ()
|
||||
// rule (4) was invoked for this retransmission."
|
||||
s.fr.highRxt = segEnd - 1
|
||||
// We pass s.smss as the limit as the Step 2) requires that
|
||||
// new data sent should be of size s.smss or less.
|
||||
if sent := s.maybeSendSegment(nextSeg, limit, end); !sent {
|
||||
return dataSent
|
||||
}
|
||||
dataSent = true
|
||||
s.outstanding++
|
||||
s.writeNext = nextSeg.Next()
|
||||
continue
|
||||
}
|
||||
|
||||
// Now handle the retransmission case where we matched either step 1,3 or 4
|
||||
// of the NextSeg algorithm.
|
||||
// RFC 6675, Step C.4.
|
||||
//
|
||||
// "The estimate of the amount of data outstanding in the network
|
||||
|
@ -858,6 +876,22 @@ func (s *sender) handleSACKRecovery(limit int, end seqnum.Value) (dataSent bool)
|
|||
s.outstanding++
|
||||
dataSent = true
|
||||
s.sendSegment(nextSeg)
|
||||
|
||||
segEnd := nextSeg.sequenceNumber.Add(nextSeg.logicalLen())
|
||||
if rescueRtx {
|
||||
// We do the last part of rule (4) of NextSeg here to update
|
||||
// RescueRxt as until this point we don't know if we are going
|
||||
// to use the rescue transmission.
|
||||
s.fr.rescueRxt = s.fr.last
|
||||
} else {
|
||||
// RFC 6675, Step C.2
|
||||
//
|
||||
// "If any of the data octets sent in (C.1) are below
|
||||
// HighData, HighRxt MUST be set to the highest sequence
|
||||
// number of the retransmitted segment unless NextSeg ()
|
||||
// rule (4) was invoked for this retransmission."
|
||||
s.fr.highRxt = segEnd - 1
|
||||
}
|
||||
}
|
||||
return dataSent
|
||||
}
|
||||
|
@ -903,7 +937,7 @@ func (s *sender) sendData() {
|
|||
// "A TCP SHOULD set cwnd to no more than RW before beginning
|
||||
// transmission if the TCP has not sent data in the interval exceeding
|
||||
// the retrasmission timeout."
|
||||
if !s.fr.active && time.Now().Sub(s.lastSendTime) > s.rto {
|
||||
if !s.fr.active && s.state != RTORecovery && time.Now().Sub(s.lastSendTime) > s.rto {
|
||||
if s.sndCwnd > InitialCwnd {
|
||||
s.sndCwnd = InitialCwnd
|
||||
}
|
||||
|
@ -921,6 +955,9 @@ func (s *sender) sendData() {
|
|||
limit = cwndLimit
|
||||
}
|
||||
if s.isAssignedSequenceNumber(seg) && s.ep.sackPermitted && s.ep.scoreboard.IsSACKED(seg.sackBlock()) {
|
||||
// Move writeNext along so that we don't try and scan data that
|
||||
// has already been SACKED.
|
||||
s.writeNext = seg.Next()
|
||||
continue
|
||||
}
|
||||
if sent := s.maybeSendSegment(seg, limit, end); !sent {
|
||||
|
@ -966,6 +1003,8 @@ func (s *sender) enterFastRecovery() {
|
|||
s.fr.first = s.sndUna
|
||||
s.fr.last = s.sndNxt - 1
|
||||
s.fr.maxCwnd = s.sndCwnd + s.outstanding
|
||||
s.fr.highRxt = s.sndUna
|
||||
s.fr.rescueRxt = s.sndUna
|
||||
if s.ep.sackPermitted {
|
||||
s.state = SACKRecovery
|
||||
s.ep.stack.Stats().TCP.SACKRecovery.Increment()
|
||||
|
@ -1258,6 +1297,7 @@ func (s *sender) handleRcvdSegment(seg *segment) {
|
|||
if s.writeNext == seg {
|
||||
s.writeNext = seg.Next()
|
||||
}
|
||||
|
||||
s.writeList.Remove(seg)
|
||||
|
||||
// if SACK is enabled then Only reduce outstanding if
|
||||
|
@ -1329,7 +1369,23 @@ func (s *sender) sendSegment(seg *segment) *tcpip.Error {
|
|||
}
|
||||
seg.xmitTime = time.Now()
|
||||
seg.xmitCount++
|
||||
return s.sendSegmentFromView(seg.data, seg.flags, seg.sequenceNumber)
|
||||
err := s.sendSegmentFromView(seg.data, seg.flags, seg.sequenceNumber)
|
||||
|
||||
// Every time a packet containing data is sent (including a
|
||||
// retransmission), if SACK is enabled and we are retransmitting data
|
||||
// then use the conservative timer described in RFC6675 Section 6.0,
|
||||
// otherwise follow the standard time described in RFC6298 Section 5.1.
|
||||
if err != nil && seg.data.Size() != 0 {
|
||||
if s.fr.active && seg.xmitCount > 1 && s.ep.sackPermitted {
|
||||
s.resendTimer.enable(s.rto)
|
||||
} else {
|
||||
if !s.resendTimer.enabled() {
|
||||
s.resendTimer.enable(s.rto)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// sendSegmentFromView sends a new segment containing the given payload, flags
|
||||
|
@ -1345,19 +1401,5 @@ func (s *sender) sendSegmentFromView(data buffer.VectorisedView, flags byte, seq
|
|||
// Remember the max sent ack.
|
||||
s.maxSentAck = rcvNxt
|
||||
|
||||
// Every time a packet containing data is sent (including a
|
||||
// retransmission), if SACK is enabled then use the conservative timer
|
||||
// described in RFC6675 Section 4.0, otherwise follow the standard time
|
||||
// described in RFC6298 Section 5.2.
|
||||
if data.Size() != 0 {
|
||||
if s.ep.sackPermitted {
|
||||
s.resendTimer.enable(s.rto)
|
||||
} else {
|
||||
if !s.resendTimer.enabled() {
|
||||
s.resendTimer.enable(s.rto)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return s.ep.sendRaw(data, flags, seq, rcvNxt, rcvWnd)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue