Merge pull request #1523 from majek:fix-1522-silly-window-rx
PiperOrigin-RevId: 289019953
This commit is contained in:
commit
b08da42285
|
@ -885,8 +885,14 @@ func (e *endpoint) ModerateRecvBuf(copied int) {
|
||||||
// reject valid data that might already be in flight as the
|
// reject valid data that might already be in flight as the
|
||||||
// acceptable window will shrink.
|
// acceptable window will shrink.
|
||||||
if rcvWnd > e.rcvBufSize {
|
if rcvWnd > e.rcvBufSize {
|
||||||
|
availBefore := e.receiveBufferAvailableLocked()
|
||||||
e.rcvBufSize = rcvWnd
|
e.rcvBufSize = rcvWnd
|
||||||
e.notifyProtocolGoroutine(notifyReceiveWindowChanged)
|
availAfter := e.receiveBufferAvailableLocked()
|
||||||
|
mask := uint32(notifyReceiveWindowChanged)
|
||||||
|
if crossed, above := e.windowCrossedACKThreshold(availAfter - availBefore); crossed && above {
|
||||||
|
mask |= notifyNonZeroReceiveWindow
|
||||||
|
}
|
||||||
|
e.notifyProtocolGoroutine(mask)
|
||||||
}
|
}
|
||||||
|
|
||||||
// We only update prevCopied when we grow the buffer because in cases
|
// We only update prevCopied when we grow the buffer because in cases
|
||||||
|
@ -955,11 +961,12 @@ func (e *endpoint) readLocked() (buffer.View, *tcpip.Error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
e.rcvBufUsed -= len(v)
|
e.rcvBufUsed -= len(v)
|
||||||
// If the window was zero before this read and if the read freed up
|
|
||||||
// enough buffer space for the scaled window to be non-zero then notify
|
// If the window was small before this read and if the read freed up
|
||||||
// the protocol goroutine to send a window update.
|
// enough buffer space, to either fit an aMSS or half a receive buffer
|
||||||
if e.zeroWindow && !e.zeroReceiveWindow(e.rcv.rcvWndScale) {
|
// (whichever smaller), then notify the protocol goroutine to send a
|
||||||
e.zeroWindow = false
|
// window update.
|
||||||
|
if crossed, above := e.windowCrossedACKThreshold(len(v)); crossed && above {
|
||||||
e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
|
e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1133,16 +1140,38 @@ func (e *endpoint) Peek(vec [][]byte) (int64, tcpip.ControlMessages, *tcpip.Erro
|
||||||
return num, tcpip.ControlMessages{}, nil
|
return num, tcpip.ControlMessages{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// zeroReceiveWindow checks if the receive window to be announced now would be
|
// windowCrossedACKThreshold checks if the receive window to be announced now
|
||||||
// zero, based on the amount of available buffer and the receive window scaling.
|
// would be under aMSS or under half receive buffer, whichever smaller. This is
|
||||||
|
// useful as a receive side silly window syndrome prevention mechanism. If
|
||||||
|
// window grows to reasonable value, we should send ACK to the sender to inform
|
||||||
|
// the rx space is now large. We also want ensure a series of small read()'s
|
||||||
|
// won't trigger a flood of spurious tiny ACK's.
|
||||||
//
|
//
|
||||||
// It must be called with rcvListMu held.
|
// For large receive buffers, the threshold is aMSS - once reader reads more
|
||||||
func (e *endpoint) zeroReceiveWindow(scale uint8) bool {
|
// than aMSS we'll send ACK. For tiny receive buffers, the threshold is half of
|
||||||
if e.rcvBufUsed >= e.rcvBufSize {
|
// receive buffer size. This is chosen arbitrairly.
|
||||||
return true
|
// crossed will be true if the window size crossed the ACK threshold.
|
||||||
|
// above will be true if the new window is >= ACK threshold and false
|
||||||
|
// otherwise.
|
||||||
|
func (e *endpoint) windowCrossedACKThreshold(deltaBefore int) (crossed bool, above bool) {
|
||||||
|
newAvail := e.receiveBufferAvailableLocked()
|
||||||
|
oldAvail := newAvail - deltaBefore
|
||||||
|
if oldAvail < 0 {
|
||||||
|
oldAvail = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
return ((e.rcvBufSize - e.rcvBufUsed) >> scale) == 0
|
threshold := int(e.amss)
|
||||||
|
if threshold > e.rcvBufSize/2 {
|
||||||
|
threshold = e.rcvBufSize / 2
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case oldAvail < threshold && newAvail >= threshold:
|
||||||
|
return true, true
|
||||||
|
case oldAvail >= threshold && newAvail < threshold:
|
||||||
|
return true, false
|
||||||
|
}
|
||||||
|
return false, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetSockOptBool sets a socket option.
|
// SetSockOptBool sets a socket option.
|
||||||
|
@ -1204,10 +1233,16 @@ func (e *endpoint) SetSockOptInt(opt tcpip.SockOptInt, v int) *tcpip.Error {
|
||||||
size = math.MaxInt32 / 2
|
size = math.MaxInt32 / 2
|
||||||
}
|
}
|
||||||
|
|
||||||
|
availBefore := e.receiveBufferAvailableLocked()
|
||||||
e.rcvBufSize = size
|
e.rcvBufSize = size
|
||||||
|
availAfter := e.receiveBufferAvailableLocked()
|
||||||
|
|
||||||
e.rcvAutoParams.disabled = true
|
e.rcvAutoParams.disabled = true
|
||||||
if e.zeroWindow && !e.zeroReceiveWindow(scale) {
|
|
||||||
e.zeroWindow = false
|
// Immediately send an ACK to uncork the sender silly window
|
||||||
|
// syndrome prevetion, when our available space grows above aMSS
|
||||||
|
// or half receive buffer, whichever smaller.
|
||||||
|
if crossed, above := e.windowCrossedACKThreshold(availAfter - availBefore); crossed && above {
|
||||||
mask |= notifyNonZeroReceiveWindow
|
mask |= notifyNonZeroReceiveWindow
|
||||||
}
|
}
|
||||||
e.rcvListMu.Unlock()
|
e.rcvListMu.Unlock()
|
||||||
|
@ -2225,13 +2260,10 @@ func (e *endpoint) readyToRead(s *segment) {
|
||||||
if s != nil {
|
if s != nil {
|
||||||
s.incRef()
|
s.incRef()
|
||||||
e.rcvBufUsed += s.data.Size()
|
e.rcvBufUsed += s.data.Size()
|
||||||
// Check if the receive window is now closed. If so make sure
|
// Increase counter if the receive window falls down below MSS
|
||||||
// we set the zero window before we deliver the segment to ensure
|
// or half receive buffer size, whichever smaller.
|
||||||
// that a subsequent read of the segment will correctly trigger
|
if crossed, above := e.windowCrossedACKThreshold(-s.data.Size()); crossed && !above {
|
||||||
// a non-zero notification.
|
|
||||||
if avail := e.receiveBufferAvailableLocked(); avail>>e.rcv.rcvWndScale == 0 {
|
|
||||||
e.stats.ReceiveErrors.ZeroRcvWindowState.Increment()
|
e.stats.ReceiveErrors.ZeroRcvWindowState.Increment()
|
||||||
e.zeroWindow = true
|
|
||||||
}
|
}
|
||||||
e.rcvList.PushBack(s)
|
e.rcvList.PushBack(s)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -98,12 +98,6 @@ func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) {
|
||||||
// in such cases we may need to send an ack to indicate to our peer that it can
|
// in such cases we may need to send an ack to indicate to our peer that it can
|
||||||
// resume sending data.
|
// resume sending data.
|
||||||
func (r *receiver) nonZeroWindow() {
|
func (r *receiver) nonZeroWindow() {
|
||||||
if (r.rcvAcc-r.rcvNxt)>>r.rcvWndScale != 0 {
|
|
||||||
// We never got around to announcing a zero window size, so we
|
|
||||||
// don't need to immediately announce a nonzero one.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Immediately send an ack.
|
// Immediately send an ack.
|
||||||
r.ep.snd.sendAck()
|
r.ep.snd.sendAck()
|
||||||
}
|
}
|
||||||
|
|
|
@ -2091,10 +2091,14 @@ func TestZeroScaledWindowReceive(t *testing.T) {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read some data. An ack should be sent in response to that.
|
// Read at least 1MSS of data. An ack should be sent in response to that.
|
||||||
v, _, err := c.EP.Read(nil)
|
sz := 0
|
||||||
if err != nil {
|
for sz < defaultMTU {
|
||||||
t.Fatalf("Read failed: %v", err)
|
v, _, err := c.EP.Read(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Read failed: %v", err)
|
||||||
|
}
|
||||||
|
sz += len(v)
|
||||||
}
|
}
|
||||||
|
|
||||||
checker.IPv4(t, c.GetPacket(),
|
checker.IPv4(t, c.GetPacket(),
|
||||||
|
@ -2103,7 +2107,7 @@ func TestZeroScaledWindowReceive(t *testing.T) {
|
||||||
checker.DstPort(context.TestPort),
|
checker.DstPort(context.TestPort),
|
||||||
checker.SeqNum(uint32(c.IRS)+1),
|
checker.SeqNum(uint32(c.IRS)+1),
|
||||||
checker.AckNum(uint32(790+sent)),
|
checker.AckNum(uint32(790+sent)),
|
||||||
checker.Window(uint16(len(v)>>ws)),
|
checker.Window(uint16(sz>>ws)),
|
||||||
checker.TCPFlags(header.TCPFlagAck),
|
checker.TCPFlags(header.TCPFlagAck),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
@ -6556,3 +6560,140 @@ func TestKeepaliveWithUserTimeout(t *testing.T) {
|
||||||
t.Errorf("got c.Stack().Stats().TCP.EstablishedTimedout = %v, want = %v", got, want)
|
t.Errorf("got c.Stack().Stats().TCP.EstablishedTimedout = %v, want = %v", got, want)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIncreaseWindowOnReceive(t *testing.T) {
|
||||||
|
// This test ensures that the endpoint sends an ack,
|
||||||
|
// after recv() when the window grows to more than 1 MSS.
|
||||||
|
c := context.New(t, defaultMTU)
|
||||||
|
defer c.Cleanup()
|
||||||
|
|
||||||
|
const rcvBuf = 65535 * 10
|
||||||
|
c.CreateConnected(789, 30000, rcvBuf)
|
||||||
|
|
||||||
|
// Write chunks of ~30000 bytes. It's important that two
|
||||||
|
// payloads make it equal or longer than MSS.
|
||||||
|
remain := rcvBuf
|
||||||
|
sent := 0
|
||||||
|
data := make([]byte, defaultMTU/2)
|
||||||
|
lastWnd := uint16(0)
|
||||||
|
|
||||||
|
for remain > len(data) {
|
||||||
|
c.SendPacket(data, &context.Headers{
|
||||||
|
SrcPort: context.TestPort,
|
||||||
|
DstPort: c.Port,
|
||||||
|
Flags: header.TCPFlagAck,
|
||||||
|
SeqNum: seqnum.Value(790 + sent),
|
||||||
|
AckNum: c.IRS.Add(1),
|
||||||
|
RcvWnd: 30000,
|
||||||
|
})
|
||||||
|
sent += len(data)
|
||||||
|
remain -= len(data)
|
||||||
|
|
||||||
|
lastWnd = uint16(remain)
|
||||||
|
if remain > 0xffff {
|
||||||
|
lastWnd = 0xffff
|
||||||
|
}
|
||||||
|
checker.IPv4(t, c.GetPacket(),
|
||||||
|
checker.PayloadLen(header.TCPMinimumSize),
|
||||||
|
checker.TCP(
|
||||||
|
checker.DstPort(context.TestPort),
|
||||||
|
checker.SeqNum(uint32(c.IRS)+1),
|
||||||
|
checker.AckNum(uint32(790+sent)),
|
||||||
|
checker.Window(lastWnd),
|
||||||
|
checker.TCPFlags(header.TCPFlagAck),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastWnd == 0xffff || lastWnd == 0 {
|
||||||
|
t.Fatalf("expected small, non-zero window: %d", lastWnd)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We now have < 1 MSS in the buffer space. Read the data! An
|
||||||
|
// ack should be sent in response to that. The window was not
|
||||||
|
// zero, but it grew to larger than MSS.
|
||||||
|
if _, _, err := c.EP.Read(nil); err != nil {
|
||||||
|
t.Fatalf("Read failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, _, err := c.EP.Read(nil); err != nil {
|
||||||
|
t.Fatalf("Read failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// After reading two packets, we surely crossed MSS. See the ack:
|
||||||
|
checker.IPv4(t, c.GetPacket(),
|
||||||
|
checker.PayloadLen(header.TCPMinimumSize),
|
||||||
|
checker.TCP(
|
||||||
|
checker.DstPort(context.TestPort),
|
||||||
|
checker.SeqNum(uint32(c.IRS)+1),
|
||||||
|
checker.AckNum(uint32(790+sent)),
|
||||||
|
checker.Window(uint16(0xffff)),
|
||||||
|
checker.TCPFlags(header.TCPFlagAck),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIncreaseWindowOnBufferResize(t *testing.T) {
|
||||||
|
// This test ensures that the endpoint sends an ack,
|
||||||
|
// after available recv buffer grows to more than 1 MSS.
|
||||||
|
c := context.New(t, defaultMTU)
|
||||||
|
defer c.Cleanup()
|
||||||
|
|
||||||
|
const rcvBuf = 65535 * 10
|
||||||
|
c.CreateConnected(789, 30000, rcvBuf)
|
||||||
|
|
||||||
|
// Write chunks of ~30000 bytes. It's important that two
|
||||||
|
// payloads make it equal or longer than MSS.
|
||||||
|
remain := rcvBuf
|
||||||
|
sent := 0
|
||||||
|
data := make([]byte, defaultMTU/2)
|
||||||
|
lastWnd := uint16(0)
|
||||||
|
|
||||||
|
for remain > len(data) {
|
||||||
|
c.SendPacket(data, &context.Headers{
|
||||||
|
SrcPort: context.TestPort,
|
||||||
|
DstPort: c.Port,
|
||||||
|
Flags: header.TCPFlagAck,
|
||||||
|
SeqNum: seqnum.Value(790 + sent),
|
||||||
|
AckNum: c.IRS.Add(1),
|
||||||
|
RcvWnd: 30000,
|
||||||
|
})
|
||||||
|
sent += len(data)
|
||||||
|
remain -= len(data)
|
||||||
|
|
||||||
|
lastWnd = uint16(remain)
|
||||||
|
if remain > 0xffff {
|
||||||
|
lastWnd = 0xffff
|
||||||
|
}
|
||||||
|
checker.IPv4(t, c.GetPacket(),
|
||||||
|
checker.PayloadLen(header.TCPMinimumSize),
|
||||||
|
checker.TCP(
|
||||||
|
checker.DstPort(context.TestPort),
|
||||||
|
checker.SeqNum(uint32(c.IRS)+1),
|
||||||
|
checker.AckNum(uint32(790+sent)),
|
||||||
|
checker.Window(lastWnd),
|
||||||
|
checker.TCPFlags(header.TCPFlagAck),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastWnd == 0xffff || lastWnd == 0 {
|
||||||
|
t.Fatalf("expected small, non-zero window: %d", lastWnd)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Increasing the buffer from should generate an ACK,
|
||||||
|
// since window grew from small value to larger equal MSS
|
||||||
|
c.EP.SetSockOptInt(tcpip.ReceiveBufferSizeOption, rcvBuf*2)
|
||||||
|
|
||||||
|
// After reading two packets, we surely crossed MSS. See the ack:
|
||||||
|
checker.IPv4(t, c.GetPacket(),
|
||||||
|
checker.PayloadLen(header.TCPMinimumSize),
|
||||||
|
checker.TCP(
|
||||||
|
checker.DstPort(context.TestPort),
|
||||||
|
checker.SeqNum(uint32(c.IRS)+1),
|
||||||
|
checker.AckNum(uint32(790+sent)),
|
||||||
|
checker.Window(uint16(0xffff)),
|
||||||
|
checker.TCPFlags(header.TCPFlagAck),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue