// Source: gvisor/pkg/tcpip/transport/tcp/endpoint.go (2070 lines, 56 KiB, Go).

// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tcp
import (
"fmt"
"math"
"strings"
"sync"
"sync/atomic"
"time"
"gvisor.dev/gvisor/pkg/rand"
"gvisor.dev/gvisor/pkg/sleep"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
"gvisor.dev/gvisor/pkg/tcpip/header"
"gvisor.dev/gvisor/pkg/tcpip/seqnum"
"gvisor.dev/gvisor/pkg/tcpip/stack"
"gvisor.dev/gvisor/pkg/tmutex"
"gvisor.dev/gvisor/pkg/waiter"
)
// EndpointState represents the state of a TCP endpoint.
type EndpointState uint32

// Endpoint states. Note that these are represented in a netstack-specific
// manner and may not be meaningful externally. Specifically, they need to be
// translated to Linux's representation for these states if presented to
// userspace.
const (
	// Endpoint states internal to netstack. These map to the TCP state CLOSED.
	StateInitial EndpointState = iota
	StateBound
	StateConnecting // Connect() called, but the initial SYN hasn't been sent.
	StateError

	// TCP protocol states.
	StateEstablished
	StateSynSent
	StateSynRecv
	StateFinWait1
	StateFinWait2
	StateTimeWait
	StateClose
	StateCloseWait
	StateLastAck
	StateListen
	StateClosing
)

// connected reports whether s belongs to the set of states in which an
// endpoint is connected to a peer.
func (s EndpointState) connected() bool {
	switch s {
	case StateEstablished, StateFinWait1, StateFinWait2, StateTimeWait, StateCloseWait, StateLastAck, StateClosing:
		return true
	default:
		return false
	}
}

// String implements fmt.Stringer.String.
//
// Every declared state maps to a fixed upper-case name. A value outside the
// declared set is rendered as "UNKNOWN(<n>)" instead of panicking, so that
// formatting or logging an unexpected state cannot crash the caller.
func (s EndpointState) String() string {
	switch s {
	case StateInitial:
		return "INITIAL"
	case StateBound:
		return "BOUND"
	case StateConnecting:
		return "CONNECTING"
	case StateError:
		return "ERROR"
	case StateEstablished:
		return "ESTABLISHED"
	case StateSynSent:
		return "SYN-SENT"
	case StateSynRecv:
		return "SYN-RCVD"
	case StateFinWait1:
		return "FIN-WAIT1"
	case StateFinWait2:
		return "FIN-WAIT2"
	case StateTimeWait:
		return "TIME-WAIT"
	case StateClose:
		return "CLOSED"
	case StateCloseWait:
		return "CLOSE-WAIT"
	case StateLastAck:
		return "LAST-ACK"
	case StateListen:
		return "LISTEN"
	case StateClosing:
		return "CLOSING"
	default:
		// Convert to the underlying integer so that formatting can never
		// re-enter String.
		return fmt.Sprintf("UNKNOWN(%d)", uint32(s))
	}
}
// Reasons for notifying the protocol goroutine. Each reason occupies its own
// bit so that several reasons can be combined in a single uint32 mask (see
// notifyProtocolGoroutine and fetchNotifications).
const (
	// notifyNonZeroReceiveWindow is raised when the receive window stops
	// being zero (see readLocked and SetSockOpt).
	notifyNonZeroReceiveWindow = 1 << iota
	// notifyReceiveWindowChanged is raised when rcvBufSize is changed (see
	// ModerateRecvBuf and SetSockOpt).
	notifyReceiveWindowChanged
	// notifyClose is raised by Close() when a worker goroutine is running
	// and must perform the cleanup.
	notifyClose
	notifyMTUChanged
	notifyDrain
	notifyReset
	// notifyKeepaliveChanged is raised when any keepalive option is set.
	notifyKeepaliveChanged
	// notifyMSSChanged is raised when the user sets tcpip.MaxSegOption.
	notifyMSSChanged
)
// SACKInfo holds TCP SACK related information for a given endpoint.
//
// +stateify savable
type SACKInfo struct {
	// Blocks holds the SACK blocks tracked for this endpoint; at most
	// MaxSACKBlocks are tracked per endpoint.
	Blocks [MaxSACKBlocks]header.SACKBlock

	// NumBlocks is the number of valid SACK blocks stored in the
	// Blocks array above.
	NumBlocks int
}
// rcvBufAutoTuneParams holds the state variables used to compute the
// auto-tuned receive buffer size (see ModerateRecvBuf).
//
// +stateify savable
type rcvBufAutoTuneParams struct {
	// measureTime is the time at which the current measurement
	// was started.
	measureTime time.Time `state:".(unixTime)"`

	// copied is the number of bytes copied out of the receive
	// buffers since this measure began.
	copied int

	// prevCopied is the number of bytes copied out of the receive
	// buffers in the previous RTT period.
	prevCopied int

	// rtt is the non-smoothed minimum RTT as measured by observing the time
	// between when a byte is first acknowledged and the receipt of data
	// that is at least one window beyond the sequence number that was
	// acknowledged.
	rtt time.Duration

	// rttMeasureSeqNumber is the highest acceptable sequence number at the
	// time this RTT measurement period began.
	rttMeasureSeqNumber seqnum.Value

	// rttMeasureTime is the absolute time at which the current rtt
	// measurement period began.
	rttMeasureTime time.Time `state:".(unixTime)"`

	// disabled is true if an explicit receive buffer is set for the
	// endpoint.
	disabled bool
}
// endpoint represents a TCP endpoint. This struct serves as the interface
// between users of the endpoint and the protocol implementation; it is legal to
// have concurrent goroutines make calls into the endpoint, they are properly
// synchronized. The protocol implementation, however, runs in a single
// goroutine.
//
// +stateify savable
type endpoint struct {
// workMu is used to arbitrate which goroutine may perform protocol
// work. Only the main protocol goroutine is expected to call Lock() on
// it, but other goroutines (e.g., send) may call TryLock() to eagerly
// perform work without having to wait for the main one to wake up.
workMu tmutex.Mutex `state:"nosave"`
// The following fields are initialized at creation time and do not
// change throughout the lifetime of the endpoint.
stack *stack.Stack `state:"manual"`
netProto tcpip.NetworkProtocolNumber
waiterQueue *waiter.Queue `state:"wait"`
// lastError represents the last error that the endpoint reported;
// access to it is protected by the following mutex.
lastErrorMu sync.Mutex `state:"nosave"`
lastError *tcpip.Error `state:".(string)"`
// The following fields are used to manage the receive queue. The
// protocol goroutine adds ready-for-delivery segments to rcvList,
// which are returned by Read() calls to users.
//
// Once the peer has closed its send side, rcvClosed is set to true
// to indicate to users that no more data is coming.
//
// rcvListMu can be taken after the endpoint mu below.
rcvListMu sync.Mutex `state:"nosave"`
rcvList segmentList `state:"wait"`
rcvClosed bool
rcvBufSize int
rcvBufUsed int
rcvAutoParams rcvBufAutoTuneParams
// zeroWindow indicates that the window was closed due to receive buffer
// space being filled up. This is set by the worker goroutine before
// moving a segment to the rcvList. This setting is cleared by the
// endpoint when a Read() call reads enough data for the new window to
// be non-zero.
zeroWindow bool
// The following fields are protected by the mutex.
mu sync.RWMutex `state:"nosave"`
id stack.TransportEndpointID
state EndpointState `state:".(EndpointState)"`
isPortReserved bool `state:"manual"`
isRegistered bool
boundNICID tcpip.NICID `state:"manual"`
route stack.Route `state:"manual"`
v6only bool
isConnectNotified bool
// TCP should never broadcast but Linux nevertheless supports enabling/
// disabling SO_BROADCAST, albeit as a NOOP.
broadcast bool
// effectiveNetProtos contains the network protocols actually in use. In
// most cases it will only contain "netProto", but in cases like IPv6
// endpoints with v6only set to false, this could include multiple
// protocols (e.g., IPv6 and IPv4) or a single different protocol (e.g.,
// IPv4 when IPv6 endpoint is bound or connected to an IPv4 mapped
// address).
effectiveNetProtos []tcpip.NetworkProtocolNumber `state:"manual"`
// hardError is meaningful only when state is stateError, it stores the
// error to be returned when read/write syscalls are called and the
// endpoint is in this state. hardError is protected by mu.
hardError *tcpip.Error `state:".(string)"`
// workerRunning specifies if a worker goroutine is running.
workerRunning bool
// workerCleanup specifies if the worker goroutine must perform cleanup
// before exitting. This can only be set to true when workerRunning is
// also true, and they're both protected by the mutex.
workerCleanup bool
// sendTSOk is used to indicate when the TS Option has been negotiated.
// When sendTSOk is true every non-RST segment should carry a TS as per
// RFC7323#section-1.1
sendTSOk bool
// recentTS is the timestamp that should be sent in the TSEcr field of
// the timestamp for future segments sent by the endpoint. This field is
// updated if required when a new segment is received by this endpoint.
recentTS uint32
// tsOffset is a randomized offset added to the value of the
// TSVal field in the timestamp option.
tsOffset uint32
// shutdownFlags represent the current shutdown state of the endpoint.
shutdownFlags tcpip.ShutdownFlags
// sackPermitted is set to true if the peer sends the TCPSACKPermitted
// option in the SYN/SYN-ACK.
sackPermitted bool
// sack holds TCP SACK related information for this endpoint.
sack SACKInfo
// reusePort is set to true if SO_REUSEPORT is enabled.
reusePort bool
// delay enables Nagle's algorithm.
//
// delay is a boolean (0 is false) and must be accessed atomically.
delay uint32
// cork holds back segments until full.
//
// cork is a boolean (0 is false) and must be accessed atomically.
cork uint32
// scoreboard holds TCP SACK Scoreboard information for this endpoint.
scoreboard *SACKScoreboard
// The options below aren't implemented, but we remember the user
// settings because applications expect to be able to set/query these
// options.
reuseAddr bool
// slowAck holds the negated state of quick ack. It is stubbed out and
// does nothing.
//
// slowAck is a boolean (0 is false) and must be accessed atomically.
slowAck uint32
// segmentQueue is used to hand received segments to the protocol
// goroutine. Segments are queued as long as the queue is not full,
// and dropped when it is.
segmentQueue segmentQueue `state:"wait"`
Fixes to TCP listen behavior. Netstack listen loop can get stuck if cookies are in-use and the app is slow to accept incoming connections. Further we continue to complete handshake for a connection even if the backlog is full. This creates a problem when a lots of connections come in rapidly and we end up with lots of completed connections just hanging around to be delivered. These fixes change netstack behaviour to mirror what linux does as described here in the following article http://veithen.io/2014/01/01/how-tcp-backlog-works-in-linux.html Now when cookies are not in-use Netstack will silently drop the ACK to a SYN-ACK and not complete the handshake if the backlog is full. This will result in the connection staying in a half-complete state. Eventually the sender will retransmit the ACK and if backlog has space we will transition to a connected state and deliver the endpoint. Similarly when cookies are in use we do not try and create an endpoint unless there is space in the accept queue to accept the newly created endpoint. If there is no space then we again silently drop the ACK as we can just recreate it when the ACK is retransmitted by the peer. We also now use the backlog to cap the size of the SYN-RCVD queue for a given endpoint. So at any time there can be N connections in the backlog and N in a SYN-RCVD state if the application is not accepting connections. Any new SYNs will be dropped. This CL also fixes another small bug where we mark a new endpoint which has not completed handshake as connected. We should wait till handshake successfully completes before marking it connected. Updates #236 PiperOrigin-RevId: 250717817
2019-05-30 17:47:11 +00:00
// synRcvdCount is the number of connections for this endpoint that are
// in SYN-RCVD state.
synRcvdCount int
// userMSS if non-zero is the MSS value explicitly set by the user
// for this endpoint using the TCP_MAXSEG setsockopt.
userMSS int
// The following fields are used to manage the send buffer. When
// segments are ready to be sent, they are added to sndQueue and the
// protocol goroutine is signaled via sndWaker.
//
// When the send side is closed, the protocol goroutine is notified via
// sndCloseWaker, and sndClosed is set to true.
sndBufMu sync.Mutex `state:"nosave"`
sndBufSize int
sndBufUsed int
sndClosed bool
sndBufInQueue seqnum.Size
sndQueue segmentList `state:"wait"`
sndWaker sleep.Waker `state:"manual"`
sndCloseWaker sleep.Waker `state:"manual"`
// cc stores the name of the Congestion Control algorithm to use for
// this endpoint.
cc tcpip.CongestionControlOption
// The following are used when a "packet too big" control packet is
// received. They are protected by sndBufMu. They are used to
// communicate to the main protocol goroutine how many such control
// messages have been received since the last notification was processed
// and what was the smallest MTU seen.
packetTooBigCount int
sndMTU int
// newSegmentWaker is used to indicate to the protocol goroutine that
// it needs to wake up and handle new segments queued to it.
newSegmentWaker sleep.Waker `state:"manual"`
// notificationWaker is used to indicate to the protocol goroutine that
// it needs to wake up and check for notifications.
notificationWaker sleep.Waker `state:"manual"`
// notifyFlags is a bitmask of flags used to indicate to the protocol
// goroutine what it was notified; this is only accessed atomically.
notifyFlags uint32 `state:"nosave"`
// keepalive manages TCP keepalive state. When the connection is idle
// (no data sent or received) for keepaliveIdle, we start sending
// keepalives every keepalive.interval. If we send keepalive.count
// without hearing a response, the connection is closed.
keepalive keepalive
// acceptedChan is used by a listening endpoint protocol goroutine to
// send newly accepted connections to the endpoint so that they can be
// read by Accept() calls.
acceptedChan chan *endpoint `state:".([]*endpoint)"`
// The following are only used from the protocol goroutine, and
// therefore don't need locks to protect them.
rcv *receiver `state:"wait"`
snd *sender `state:"wait"`
// The goroutine drain completion notification channel.
drainDone chan struct{} `state:"nosave"`
// The goroutine undrain notification channel.
undrain chan struct{} `state:"nosave"`
// probe if not nil is invoked on every received segment. It is passed
// a copy of the current state of the endpoint.
probe stack.TCPProbeFunc `state:"nosave"`
// The following are only used to assist the restore run to re-connect.
bindAddress tcpip.Address
connectingAddress tcpip.Address
// amss is the advertised MSS to the peer by this endpoint.
amss uint16
gso *stack.GSO
}
// StopWork halts packet processing by acquiring the endpoint's work mutex.
// It must be paired with a later call to ResumeWork. Only to be used in
// tests.
func (e *endpoint) StopWork() {
	e.workMu.Lock()
}
// ResumeWork resumes packet processing by releasing the endpoint's work
// mutex, which StopWork acquires. Only to be used in tests.
func (e *endpoint) ResumeWork() {
	e.workMu.Unlock()
}
// keepalive is a synchronization wrapper used to appease stateify. See the
// comment in endpoint, where it is used.
//
// The embedded mutex is taken around every access to the option fields in
// SetSockOpt and GetSockOpt.
//
// +stateify savable
type keepalive struct {
	sync.Mutex `state:"nosave"`
	// enabled records the value set via tcpip.KeepaliveEnabledOption.
	enabled bool
	// idle records the value set via tcpip.KeepaliveIdleOption; newEndpoint
	// defaults it to 2 hours.
	idle time.Duration
	// interval records the value set via tcpip.KeepaliveIntervalOption;
	// newEndpoint defaults it to 75 seconds.
	interval time.Duration
	// count records the value set via tcpip.KeepaliveCountOption;
	// newEndpoint defaults it to 9.
	count int
	// unacked is presumably the number of keepalives sent without a
	// response; it is not used in this part of the file -- TODO confirm.
	unacked int
	timer   timer       `state:"nosave"`
	waker   sleep.Waker `state:"nosave"`
}
// newEndpoint allocates a TCP endpoint in StateInitial for the given stack,
// network protocol and waiter queue.
//
// Buffer sizes start from the package defaults and are overridden by the
// stack-wide transport protocol options when those can be queried; the same
// applies to the congestion control algorithm and receive buffer moderation.
// The returned endpoint has its work mutex initialized and already locked.
func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQueue *waiter.Queue) *endpoint {
	ep := &endpoint{
		stack:       stack,
		netProto:    netProto,
		waiterQueue: waiterQueue,
		state:       StateInitial,
		rcvBufSize:  DefaultReceiveBufferSize,
		sndBufSize:  DefaultSendBufferSize,
		sndMTU:      int(math.MaxInt32),
		reuseAddr:   true,
	}

	// Keepalive parameters follow the Linux defaults.
	ep.keepalive.idle = 2 * time.Hour
	ep.keepalive.interval = 75 * time.Second
	ep.keepalive.count = 9

	// Stack-wide overrides; each one is applied only if the option can be
	// retrieved from the stack.
	var sndOpt SendBufferSizeOption
	if err := stack.TransportProtocolOption(ProtocolNumber, &sndOpt); err == nil {
		ep.sndBufSize = sndOpt.Default
	}

	var rcvOpt ReceiveBufferSizeOption
	if err := stack.TransportProtocolOption(ProtocolNumber, &rcvOpt); err == nil {
		ep.rcvBufSize = rcvOpt.Default
	}

	var ccOpt tcpip.CongestionControlOption
	if err := stack.TransportProtocolOption(ProtocolNumber, &ccOpt); err == nil {
		ep.cc = ccOpt
	}

	var moderate tcpip.ModerateReceiveBufferOption
	if err := stack.TransportProtocolOption(ProtocolNumber, &moderate); err == nil {
		ep.rcvAutoParams.disabled = !bool(moderate)
	}

	if probe := stack.GetTCPProbe(); probe != nil {
		ep.probe = probe
	}

	ep.segmentQueue.setLimit(MaxUnprocessedSegments)

	// The work mutex starts out held.
	ep.workMu.Init()
	ep.workMu.Lock()

	ep.tsOffset = timeStampOffset()
	return ep
}
// Readiness returns the current readiness of the endpoint. For example, if
// waiter.EventIn is set, the endpoint is immediately readable.
//
// Only events present in mask are ever reported.
func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
	e.mu.RLock()
	defer e.mu.RUnlock()

	wantIn := mask&waiter.EventIn != 0
	wantOut := mask&waiter.EventOut != 0

	var ready waiter.EventMask
	state := e.state
	switch {
	case state == StateClose, state == StateError:
		// Ready for anything.
		ready = mask

	case state == StateListen:
		// Readable when the accepted channel has a pending connection.
		if wantIn && len(e.acceptedChan) > 0 {
			ready |= waiter.EventIn
		}

	case state.connected():
		// Writable if the send side is closed or the send buffer has room.
		if wantOut {
			e.sndBufMu.Lock()
			writable := e.sndClosed || e.sndBufUsed < e.sndBufSize
			e.sndBufMu.Unlock()
			if writable {
				ready |= waiter.EventOut
			}
		}

		// Readable if data is pending or the receive side is closed.
		if wantIn {
			e.rcvListMu.Lock()
			readable := e.rcvBufUsed > 0 || e.rcvClosed
			e.rcvListMu.Unlock()
			if readable {
				ready |= waiter.EventIn
			}
		}

		// Every remaining state (initial, bound, connecting, SYN-SENT,
		// SYN-RCVD) is ready for nothing.
	}
	return ready
}
// fetchNotifications atomically clears the endpoint's notification flags and
// returns the bitmask that was set before clearing.
func (e *endpoint) fetchNotifications() uint32 {
	return atomic.SwapUint32(&e.notifyFlags, 0)
}
// notifyProtocolGoroutine atomically ORs the flags in n into the endpoint's
// notification bitmask. The notification waker is asserted only when this
// call moves the bitmask from empty to non-empty; if all requested flags are
// already set the call is a no-op.
func (e *endpoint) notifyProtocolGoroutine(n uint32) {
	for {
		cur := atomic.LoadUint32(&e.notifyFlags)
		if cur&n == n {
			// Nothing new to record.
			return
		}
		if !atomic.CompareAndSwapUint32(&e.notifyFlags, cur, cur|n) {
			// Lost a race with another updater; retry with a fresh value.
			continue
		}
		if cur == 0 {
			// First flag to be set since the mask was last cleared, so
			// the protocol goroutine has to be woken up.
			e.notificationWaker.Assert()
		}
		return
	}
}
// Close puts the endpoint in a closed state and frees all resources associated
// with it. It must be called only once and with no other concurrent calls to
// the endpoint.
//
// If a worker goroutine is running, the cleanup is delegated to it via the
// notifyClose notification; otherwise the cleanup is performed inline.
func (e *endpoint) Close() {
	// Issue a shutdown so that the peer knows we won't send any more data
	// if we're connected, or stop accepting if we're listening.
	e.Shutdown(tcpip.ShutdownWrite | tcpip.ShutdownRead)

	e.mu.Lock()

	// For listening sockets, we always release ports inline so that they
	// are immediately available for reuse after Close() is called. If also
	// registered, we unregister as well otherwise the next user would fail
	// in Listen() when trying to register.
	if e.state == StateListen && e.isPortReserved {
		if e.isRegistered {
			e.stack.UnregisterTransportEndpoint(e.boundNICID, e.effectiveNetProtos, ProtocolNumber, e.id, e)
			e.isRegistered = false
		}

		e.stack.ReleasePort(e.effectiveNetProtos, ProtocolNumber, e.id.LocalAddress, e.id.LocalPort)
		e.isPortReserved = false
	}

	// Either perform the local cleanup or kick the worker to make sure it
	// knows it needs to cleanup.
	//
	// The endpoint is recorded as dangling first; cleanupLocked removes it
	// from that set once the cleanup is complete.
	tcpip.AddDanglingEndpoint(e)
	if !e.workerRunning {
		e.cleanupLocked()
	} else {
		e.workerCleanup = true
		e.notifyProtocolGoroutine(notifyClose)
	}

	e.mu.Unlock()
}
// cleanupLocked frees all resources associated with the endpoint. It is called
// after Close() is called and the worker goroutine (if any) is done with its
// work.
//
// Close() invokes it with e.mu held.
func (e *endpoint) cleanupLocked() {
	// Close all endpoints that might have been accepted by TCP but not by
	// the client.
	if e.acceptedChan != nil {
		// Closing the channel first lets the range below terminate once
		// the already-queued endpoints have been drained.
		close(e.acceptedChan)
		for n := range e.acceptedChan {
			n.mu.Lock()
			n.resetConnectionLocked(tcpip.ErrConnectionAborted)
			n.mu.Unlock()
			n.Close()
		}
		e.acceptedChan = nil
	}
	e.workerCleanup = false

	// Undo the registration and the port reservation if they are still
	// in place.
	if e.isRegistered {
		e.stack.UnregisterTransportEndpoint(e.boundNICID, e.effectiveNetProtos, ProtocolNumber, e.id, e)
		e.isRegistered = false
	}

	if e.isPortReserved {
		e.stack.ReleasePort(e.effectiveNetProtos, ProtocolNumber, e.id.LocalAddress, e.id.LocalPort)
		e.isPortReserved = false
	}

	e.route.Release()
	// Close() adds the endpoint to the dangling set before cleanup starts.
	tcpip.DeleteDanglingEndpoint(e)
}
// initialReceiveWindow returns the initial receive window to advertise in the
// SYN/SYN-ACK.
//
// The result is the available receive buffer space, clamped first to
// math.MaxUint16 and then to twice InitialCwnd segments of the route's MSS.
func (e *endpoint) initialReceiveWindow() int {
	wnd := e.receiveBufferAvailable()
	if wnd > math.MaxUint16 {
		wnd = math.MaxUint16
	}

	if limit := InitialCwnd * int(mssForRoute(&e.route)) * 2; wnd > limit {
		wnd = limit
	}
	return wnd
}
// ModerateRecvBuf adjusts the receive buffer and the advertised window
// based on the number of bytes copied to user space.
//
// copied is the number of bytes just handed to the application. The call is
// a no-op when auto-tuning is disabled. Until a full RTT has elapsed since
// the current measurement started (or while no RTT is known), the bytes are
// only accumulated. Once an RTT has elapsed, the receive buffer may grow,
// never shrink, and a new measurement period begins.
func (e *endpoint) ModerateRecvBuf(copied int) {
	e.rcvListMu.Lock()
	defer e.rcvListMu.Unlock()

	if e.rcvAutoParams.disabled {
		return
	}

	now := time.Now()
	if rtt := e.rcvAutoParams.rtt; rtt == 0 || now.Sub(e.rcvAutoParams.measureTime) < rtt {
		e.rcvAutoParams.copied += copied
		return
	}

	prevRTTCopied := e.rcvAutoParams.copied + copied
	prevCopied := e.rcvAutoParams.prevCopied
	if prevRTTCopied > prevCopied {
		// The minimal receive window based on what was copied by the app
		// in the immediate preceding RTT and some extra buffer for 16
		// segments to account for variations.
		// We multiply by 2 to account for packet losses.
		rcvWnd := prevRTTCopied*2 + 16*int(e.amss)

		// Scale for slow start based on bytes copied in this RTT vs previous.
		// The ratio is undefined when nothing was recorded for the previous
		// period, so the scaling is skipped in that case rather than
		// dividing by zero.
		if prevCopied != 0 {
			grow := (rcvWnd * (prevRTTCopied - prevCopied)) / prevCopied

			// Multiply growth factor by 2 again to account for sender being
			// in slow-start where the sender grows its congestion window
			// by 100% per RTT.
			rcvWnd += grow * 2
		}

		// Make sure auto tuned buffer size can always receive up to 2x
		// the initial window of 10 segments.
		if minRcvWnd := int(e.amss) * InitialCwnd * 2; rcvWnd < minRcvWnd {
			rcvWnd = minRcvWnd
		}

		// Cap the auto tuned buffer size by the maximum permissible
		// receive buffer size.
		if max := e.maxReceiveBufferSize(); rcvWnd > max {
			rcvWnd = max
		}

		// We do not adjust downwards as that can cause the receiver to
		// reject valid data that might already be in flight as the
		// acceptable window will shrink.
		if rcvWnd > e.rcvBufSize {
			e.rcvBufSize = rcvWnd
			e.notifyProtocolGoroutine(notifyReceiveWindowChanged)
		}

		// We only update prevCopied when we grow the buffer because in cases
		// where prevCopied > prevRTTCopied the existing buffer is already big
		// enough to handle the current rate and we don't need to do any
		// adjustments.
		e.rcvAutoParams.prevCopied = prevRTTCopied
	}

	// Start a new measurement period.
	e.rcvAutoParams.measureTime = now
	e.rcvAutoParams.copied = 0
}
// Read reads data from the endpoint.
//
// The address argument is ignored. At most one view of pending data is
// returned per call; the returned control messages are always empty.
func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	e.rcvListMu.Lock()
	defer e.rcvListMu.Unlock()

	// Reading is permitted while connected, once closed, or whenever unread
	// data is still pending. The last case matters because receiving a RST
	// moves the endpoint to StateError, and pending data must be drained
	// before ECONNRESET is reported.
	state := e.state
	if e.rcvBufUsed == 0 && state != StateClose && !state.connected() {
		if state == StateError {
			return buffer.View{}, tcpip.ControlMessages{}, e.hardError
		}
		return buffer.View{}, tcpip.ControlMessages{}, tcpip.ErrInvalidEndpointState
	}

	data, err := e.readLocked()
	return data, tcpip.ControlMessages{}, err
}
// readLocked removes and returns the next undelivered view from the front of
// the receive list. Read() calls it with both e.mu and e.rcvListMu held.
//
// With no pending data it returns tcpip.ErrClosedForReceive if the receive
// side is closed or the endpoint is not connected, and tcpip.ErrWouldBlock
// otherwise.
func (e *endpoint) readLocked() (buffer.View, *tcpip.Error) {
	if e.rcvBufUsed == 0 {
		if !e.rcvClosed && e.state.connected() {
			return buffer.View{}, tcpip.ErrWouldBlock
		}
		return buffer.View{}, tcpip.ErrClosedForReceive
	}

	seg := e.rcvList.Front()
	segViews := seg.data.Views()
	out := segViews[seg.viewToDeliver]
	seg.viewToDeliver++

	// Drop the segment once its last view has been handed out.
	if seg.viewToDeliver >= len(segViews) {
		e.rcvList.Remove(seg)
		seg.decRef()
	}

	e.rcvBufUsed -= len(out)

	// If the window was zero before this read and the read freed enough
	// buffer space for the scaled window to be non-zero, ask the protocol
	// goroutine to send a window update.
	if e.zeroWindow && !e.zeroReceiveWindow(e.rcv.rcvWndScale) {
		e.zeroWindow = false
		e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow)
	}

	return out, nil
}
// Write writes data to the endpoint's peer.
//
// It queues at most as many bytes of p as fit in the remaining send buffer
// space and returns the number of bytes queued. The returned channel is
// always nil. It returns tcpip.ErrWouldBlock when the send buffer is full,
// tcpip.ErrClosedForSend when the endpoint is not connected or the send side
// is closed, and the stored hard error when the endpoint is in StateError.
func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (uintptr, <-chan struct{}, *tcpip.Error) {
	// Linux completely ignores any address passed to sendto(2) for TCP sockets
	// (without the MSG_FASTOPEN flag). Corking is unimplemented, so opts.More
	// and opts.EndOfRecord are also ignored.

	e.mu.RLock()
	defer e.mu.RUnlock()

	// The endpoint cannot be written to if it's not connected.
	if !e.state.connected() {
		switch e.state {
		case StateError:
			return 0, nil, e.hardError
		default:
			return 0, nil, tcpip.ErrClosedForSend
		}
	}

	// Nothing to do if the buffer is empty.
	if p.Size() == 0 {
		return 0, nil, nil
	}

	e.sndBufMu.Lock()

	// Check if the connection has already been closed for sends.
	if e.sndClosed {
		e.sndBufMu.Unlock()
		return 0, nil, tcpip.ErrClosedForSend
	}

	// Check against the limit.
	avail := e.sndBufSize - e.sndBufUsed
	if avail <= 0 {
		e.sndBufMu.Unlock()
		return 0, nil, tcpip.ErrWouldBlock
	}

	// Fetch the payload, asking for no more than the space available.
	v, perr := p.Get(avail)
	if perr != nil {
		e.sndBufMu.Unlock()
		return 0, nil, perr
	}

	l := len(v)
	s := newSegmentFromView(&e.route, e.id, v)

	// Add data to the send queue.
	e.sndBufUsed += l
	e.sndBufInQueue += seqnum.Size(l)
	e.sndQueue.PushBack(s)

	e.sndBufMu.Unlock()

	// Send from this goroutine if the work mutex is free; otherwise wake
	// the protocol goroutine, which currently owns it.
	if e.workMu.TryLock() {
		// Do the work inline.
		e.handleWrite()
		e.workMu.Unlock()
	} else {
		// Let the protocol goroutine do the work.
		e.sndWaker.Assert()
	}
	return uintptr(l), nil, nil
}
// Peek reads data without consuming it from the endpoint.
//
// Pending data is copied into the buffers of vec in order until either the
// data or the buffers are exhausted; the number of bytes copied is returned.
// The caller's vec is not modified.
//
// This method does not block if there is no data pending.
func (e *endpoint) Peek(vec [][]byte) (uintptr, tcpip.ControlMessages, *tcpip.Error) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	// The endpoint can be read if it's connected, or if it's already closed
	// but has some pending unread data.
	if s := e.state; !s.connected() && s != StateClose {
		if s == StateError {
			return 0, tcpip.ControlMessages{}, e.hardError
		}
		return 0, tcpip.ControlMessages{}, tcpip.ErrInvalidEndpointState
	}

	e.rcvListMu.Lock()
	defer e.rcvListMu.Unlock()

	if e.rcvBufUsed == 0 {
		if e.rcvClosed || !e.state.connected() {
			return 0, tcpip.ControlMessages{}, tcpip.ErrClosedForReceive
		}
		return 0, tcpip.ControlMessages{}, tcpip.ErrWouldBlock
	}

	// Make a copy of vec so we can modify the slice headers.
	vec = append([][]byte(nil), vec...)

	var num uintptr

	// Walk every segment in the receive list, starting each one at its
	// first undelivered view.
	for s := e.rcvList.Front(); s != nil; s = s.Next() {
		views := s.data.Views()

		for i := s.viewToDeliver; i < len(views); i++ {
			v := views[i]

			for len(v) > 0 {
				// All destination buffers are full.
				if len(vec) == 0 {
					return num, tcpip.ControlMessages{}, nil
				}
				// Current destination buffer is full; move to the next.
				if len(vec[0]) == 0 {
					vec = vec[1:]
					continue
				}

				n := copy(vec[0], v)
				v = v[n:]
				vec[0] = vec[0][n:]
				num += uintptr(n)
			}
		}
	}

	return num, tcpip.ControlMessages{}, nil
}
// zeroReceiveWindow reports whether the receive window that would be
// announced now is zero, given the free receive buffer space and the receive
// window scale.
//
// It must be called with rcvListMu held.
func (e *endpoint) zeroReceiveWindow(scale uint8) bool {
	// Either the buffer is completely used, or what is left disappears
	// once the window scale is applied.
	return e.rcvBufUsed >= e.rcvBufSize || (e.rcvBufSize-e.rcvBufUsed)>>scale == 0
}
// SetSockOpt sets a socket option.
//
// opt is dispatched on its dynamic type. Option types that are not handled
// here are silently accepted and nil is returned for them. Several options
// also notify the protocol goroutine so that the change takes effect.
func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error {
	switch v := opt.(type) {
	case tcpip.DelayOption:
		if v == 0 {
			atomic.StoreUint32(&e.delay, 0)

			// Handle delayed data.
			e.sndWaker.Assert()
		} else {
			atomic.StoreUint32(&e.delay, 1)
		}
		return nil

	case tcpip.CorkOption:
		if v == 0 {
			atomic.StoreUint32(&e.cork, 0)

			// Handle the corked data.
			e.sndWaker.Assert()
		} else {
			atomic.StoreUint32(&e.cork, 1)
		}
		return nil

	case tcpip.ReuseAddressOption:
		e.mu.Lock()
		e.reuseAddr = v != 0
		e.mu.Unlock()
		return nil

	case tcpip.ReusePortOption:
		e.mu.Lock()
		e.reusePort = v != 0
		e.mu.Unlock()
		return nil

	case tcpip.QuickAckOption:
		// slowAck stores the negation of the quick-ack setting.
		if v == 0 {
			atomic.StoreUint32(&e.slowAck, 1)
		} else {
			atomic.StoreUint32(&e.slowAck, 0)
		}
		return nil

	case tcpip.MaxSegOption:
		// Reject values outside the permitted TCP MSS range.
		userMSS := v
		if userMSS < header.TCPMinimumMSS || userMSS > header.TCPMaximumMSS {
			return tcpip.ErrInvalidOptionValue
		}
		e.mu.Lock()
		e.userMSS = int(userMSS)
		e.mu.Unlock()
		e.notifyProtocolGoroutine(notifyMSSChanged)
		return nil

	case tcpip.ReceiveBufferSizeOption:
		// Make sure the receive buffer size is within the min and max
		// allowed.
		var rs ReceiveBufferSizeOption
		size := int(v)
		if err := e.stack.TransportProtocolOption(ProtocolNumber, &rs); err == nil {
			if size < rs.Min {
				size = rs.Min
			}
			if size > rs.Max {
				size = rs.Max
			}
		}

		mask := uint32(notifyReceiveWindowChanged)

		e.rcvListMu.Lock()

		// Make sure the receive buffer size allows us to send a
		// non-zero window size.
		scale := uint8(0)
		if e.rcv != nil {
			scale = e.rcv.rcvWndScale
		}
		if size>>scale == 0 {
			size = 1 << scale
		}

		// Make sure 2*size doesn't overflow.
		if size > math.MaxInt32/2 {
			size = math.MaxInt32 / 2
		}

		e.rcvBufSize = size
		// An explicitly set receive buffer size turns off auto-tuning.
		e.rcvAutoParams.disabled = true
		if e.zeroWindow && !e.zeroReceiveWindow(scale) {
			e.zeroWindow = false
			mask |= notifyNonZeroReceiveWindow
		}
		e.rcvListMu.Unlock()

		e.notifyProtocolGoroutine(mask)
		return nil

	case tcpip.SendBufferSizeOption:
		// Make sure the send buffer size is within the min and max
		// allowed.
		size := int(v)
		var ss SendBufferSizeOption
		if err := e.stack.TransportProtocolOption(ProtocolNumber, &ss); err == nil {
			if size < ss.Min {
				size = ss.Min
			}
			if size > ss.Max {
				size = ss.Max
			}
		}

		e.sndBufMu.Lock()
		e.sndBufSize = size
		e.sndBufMu.Unlock()
		return nil

	case tcpip.V6OnlyOption:
		// We only recognize this option on v6 endpoints.
		if e.netProto != header.IPv6ProtocolNumber {
			return tcpip.ErrInvalidEndpointState
		}

		e.mu.Lock()
		defer e.mu.Unlock()

		// We only allow this to be set when we're in the initial state.
		if e.state != StateInitial {
			return tcpip.ErrInvalidEndpointState
		}

		e.v6only = v != 0
		return nil

	case tcpip.KeepaliveEnabledOption:
		e.keepalive.Lock()
		e.keepalive.enabled = v != 0
		e.keepalive.Unlock()
		e.notifyProtocolGoroutine(notifyKeepaliveChanged)
		return nil

	case tcpip.KeepaliveIdleOption:
		e.keepalive.Lock()
		e.keepalive.idle = time.Duration(v)
		e.keepalive.Unlock()
		e.notifyProtocolGoroutine(notifyKeepaliveChanged)
		return nil

	case tcpip.KeepaliveIntervalOption:
		e.keepalive.Lock()
		e.keepalive.interval = time.Duration(v)
		e.keepalive.Unlock()
		e.notifyProtocolGoroutine(notifyKeepaliveChanged)
		return nil

	case tcpip.KeepaliveCountOption:
		e.keepalive.Lock()
		e.keepalive.count = int(v)
		e.keepalive.Unlock()
		e.notifyProtocolGoroutine(notifyKeepaliveChanged)
		return nil

	case tcpip.BroadcastOption:
		e.mu.Lock()
		e.broadcast = v != 0
		e.mu.Unlock()
		return nil

	case tcpip.CongestionControlOption:
		// Query the available cc algorithms in the stack and
		// validate that the specified algorithm is actually
		// supported in the stack.
		var avail tcpip.AvailableCongestionControlOption
		if err := e.stack.TransportProtocolOption(ProtocolNumber, &avail); err != nil {
			return err
		}
		// The available algorithms are a space-separated list.
		availCC := strings.Split(string(avail), " ")
		for _, cc := range availCC {
			if v == tcpip.CongestionControlOption(cc) {
				// Record the new algorithm and snapshot the state
				// under the endpoint mutex.
				e.mu.Lock()
				state := e.state
				e.cc = v
				e.mu.Unlock()
				switch state {
				case StateEstablished:
					// Acquire the work mutex as we may need to
					// reinitialize the congestion control state.
					// The state is re-checked because e.mu was
					// released in between.
					e.workMu.Lock()
					e.mu.Lock()
					if e.state == state {
						e.snd.cc = e.snd.initCongestionControl(e.cc)
					}
					e.mu.Unlock()
					e.workMu.Unlock()
				}
				return nil
			}
		}

		// Linux returns ENOENT when an invalid congestion
		// control algorithm is specified.
		return tcpip.ErrNoSuchFile
	default:
		// Unrecognized options are accepted without effect.
		return nil
	}
}
// readyReceiveSize returns the number of bytes ready to be received. It
// fails with tcpip.ErrInvalidEndpointState on a listening endpoint.
func (e *endpoint) readyReceiveSize() (int, *tcpip.Error) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	// A listening endpoint has no receive queue size to report.
	if e.state == StateListen {
		return 0, tcpip.ErrInvalidEndpointState
	}

	e.rcvListMu.Lock()
	pending := e.rcvBufUsed
	e.rcvListMu.Unlock()

	return pending, nil
}
// GetSockOptInt implements tcpip.Endpoint.GetSockOptInt.
//
// Only tcpip.ReceiveQueueSizeOption is supported; any other option yields -1
// and tcpip.ErrUnknownProtocolOption.
func (e *endpoint) GetSockOptInt(opt tcpip.SockOpt) (int, *tcpip.Error) {
	if opt == tcpip.ReceiveQueueSizeOption {
		return e.readyReceiveSize()
	}
	return -1, tcpip.ErrUnknownProtocolOption
}
// GetSockOpt implements tcpip.Endpoint.GetSockOpt.
//
// opt is either a tcpip.ErrorOption value or a pointer to one of the option
// types handled below; pointer options are filled in place. Boolean-like
// options are reported as 0 or 1. Unrecognized option types yield
// ErrUnknownProtocolOption.
func (e *endpoint) GetSockOpt(opt interface{}) *tcpip.Error {
	switch o := opt.(type) {
	case tcpip.ErrorOption:
		// Reading the last error also clears it.
		e.lastErrorMu.Lock()
		err := e.lastError
		e.lastError = nil
		e.lastErrorMu.Unlock()
		return err

	case *tcpip.MaxSegOption:
		// This is just stubbed out. Linux never returns the user_mss
		// value as it either returns the defaultMSS or returns the
		// actual current MSS. Netstack just returns the defaultMSS
		// always for now.
		*o = header.TCPDefaultMSS
		return nil

	case *tcpip.SendBufferSizeOption:
		e.sndBufMu.Lock()
		*o = tcpip.SendBufferSizeOption(e.sndBufSize)
		e.sndBufMu.Unlock()
		return nil

	case *tcpip.ReceiveBufferSizeOption:
		e.rcvListMu.Lock()
		*o = tcpip.ReceiveBufferSizeOption(e.rcvBufSize)
		e.rcvListMu.Unlock()
		return nil

	case *tcpip.DelayOption:
		*o = 0
		if v := atomic.LoadUint32(&e.delay); v != 0 {
			*o = 1
		}
		return nil

	case *tcpip.CorkOption:
		*o = 0
		if v := atomic.LoadUint32(&e.cork); v != 0 {
			*o = 1
		}
		return nil

	case *tcpip.ReuseAddressOption:
		e.mu.RLock()
		v := e.reuseAddr
		e.mu.RUnlock()

		*o = 0
		if v {
			*o = 1
		}
		return nil

	case *tcpip.ReusePortOption:
		e.mu.RLock()
		v := e.reusePort
		e.mu.RUnlock()

		*o = 0
		if v {
			*o = 1
		}
		return nil

	case *tcpip.QuickAckOption:
		// Quick ACK is reported as the inverse of the slowAck flag.
		*o = 1
		if v := atomic.LoadUint32(&e.slowAck); v != 0 {
			*o = 0
		}
		return nil

	case *tcpip.V6OnlyOption:
		// We only recognize this option on v6 endpoints.
		if e.netProto != header.IPv6ProtocolNumber {
			return tcpip.ErrUnknownProtocolOption
		}

		// Read-only access: a shared lock is sufficient, matching the
		// other getters in this function.
		e.mu.RLock()
		v := e.v6only
		e.mu.RUnlock()

		*o = 0
		if v {
			*o = 1
		}
		return nil

	case *tcpip.TCPInfoOption:
		*o = tcpip.TCPInfoOption{}

		e.mu.RLock()
		snd := e.snd
		e.mu.RUnlock()

		// RTT fields stay zero when no sender is attached to the endpoint.
		if snd != nil {
			snd.rtt.Lock()
			o.RTT = snd.rtt.srtt
			o.RTTVar = snd.rtt.rttvar
			snd.rtt.Unlock()
		}
		return nil

	case *tcpip.KeepaliveEnabledOption:
		e.keepalive.Lock()
		v := e.keepalive.enabled
		e.keepalive.Unlock()

		*o = 0
		if v {
			*o = 1
		}
		return nil

	case *tcpip.KeepaliveIdleOption:
		e.keepalive.Lock()
		*o = tcpip.KeepaliveIdleOption(e.keepalive.idle)
		e.keepalive.Unlock()
		return nil

	case *tcpip.KeepaliveIntervalOption:
		e.keepalive.Lock()
		*o = tcpip.KeepaliveIntervalOption(e.keepalive.interval)
		e.keepalive.Unlock()
		return nil

	case *tcpip.KeepaliveCountOption:
		e.keepalive.Lock()
		*o = tcpip.KeepaliveCountOption(e.keepalive.count)
		e.keepalive.Unlock()
		return nil

	case *tcpip.OutOfBandInlineOption:
		// We don't currently support disabling this option.
		*o = 1
		return nil

	case *tcpip.BroadcastOption:
		e.mu.RLock()
		v := e.broadcast
		e.mu.RUnlock()

		*o = 0
		if v {
			*o = 1
		}
		return nil

	case *tcpip.CongestionControlOption:
		e.mu.RLock()
		*o = e.cc
		e.mu.RUnlock()
		return nil

	default:
		return tcpip.ErrUnknownProtocolOption
	}
}
// checkV4Mapped determines the network protocol to use for addr. When addr
// holds a v4-mapped address it is rewritten in place to its trailing IPv4 form
// (or to the empty address if that form is all zeroes) and IPv4 is returned;
// on a v6only endpoint such an address is rejected with ErrNoRoute. Otherwise
// the endpoint's own network protocol is returned.
//
// ErrInvalidEndpointState is returned when the endpoint already has a local
// address whose length differs from the (possibly rewritten) addr.
func (e *endpoint) checkV4Mapped(addr *tcpip.FullAddress) (tcpip.NetworkProtocolNumber, *tcpip.Error) {
	proto := e.netProto

	if header.IsV4MappedAddress(addr.Addr) {
		// A v6only endpoint must not be reachable through a v4-mapped address.
		if e.v6only {
			return 0, tcpip.ErrNoRoute
		}

		proto = header.IPv4ProtocolNumber
		v4 := addr.Addr[header.IPv6AddressSize-header.IPv4AddressSize:]
		if v4 == "\x00\x00\x00\x00" {
			v4 = ""
		}
		addr.Addr = v4
	}

	// Reject a request whose address length does not match the address we
	// are already bound to; empty addresses on either side always pass.
	boundLen, wantLen := len(e.id.LocalAddress), len(addr.Addr)
	if boundLen != 0 && wantLen != 0 && boundLen != wantLen {
		return 0, tcpip.ErrInvalidEndpointState
	}

	return proto, nil
}
// Connect connects the endpoint to its peer at addr, performing a handshake
// and running the protocol goroutine. An address with neither an IP address
// nor a port is rejected with ErrAddressFamilyNotSupported.
func (e *endpoint) Connect(addr tcpip.FullAddress) *tcpip.Error {
	// AF_UNSPEC isn't supported.
	if unspecified := addr.Addr == "" && addr.Port == 0; unspecified {
		return tcpip.ErrAddressFamilyNotSupported
	}

	const (
		handshake = true
		run       = true
	)
	return e.connect(addr, handshake, run)
}
// connect connects the endpoint to its peer. In the normal non-S/R case, the
// new connection is expected to run the main goroutine and perform handshake.
// In restore of previously connected endpoints, both ends will be passively
// created (so no new handshaking is done); for stack-accepted connections not
// yet accepted by the app, they are restored without running the main goroutine
// here.
//
// handshake selects whether a handshake is performed; when false the queued
// segments are re-pointed at the new route and the endpoint moves straight to
// StateEstablished. run selects whether the protocol goroutine is started.
//
// On success the function returns ErrConnectStarted. Any returned error for
// which IgnoreStats() is false is counted in FailedConnectionAttempts.
func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) (err *tcpip.Error) {
	e.mu.Lock()
	defer e.mu.Unlock()
	defer func() {
		if err != nil && !err.IgnoreStats() {
			e.stack.Stats().TCP.FailedConnectionAttempts.Increment()
		}
	}()

	// Remember the address exactly as supplied; checkV4Mapped may rewrite it.
	connectingAddr := addr.Addr

	netProto, err := e.checkV4Mapped(&addr)
	if err != nil {
		return err
	}

	if e.state.connected() {
		// The endpoint is already connected. If caller hasn't been
		// notified yet, return success.
		if !e.isConnectNotified {
			e.isConnectNotified = true
			return nil
		}
		// Otherwise return that it's already connected.
		return tcpip.ErrAlreadyConnected
	}

	nicid := addr.NIC
	switch e.state {
	case StateBound:
		// If we're already bound to a NIC but the caller is requesting
		// that we use a different one now, we cannot proceed.
		if e.boundNICID == 0 {
			break
		}

		if nicid != 0 && nicid != e.boundNICID {
			return tcpip.ErrNoRoute
		}

		nicid = e.boundNICID

	case StateInitial:
		// Nothing to do. We'll eventually fill-in the gaps in the ID (if any)
		// when we find a route.

	case StateConnecting, StateSynSent, StateSynRecv:
		// A connection request has already been issued but hasn't completed
		// yet.
		return tcpip.ErrAlreadyConnecting

	case StateError:
		return e.hardError

	default:
		return tcpip.ErrInvalidEndpointState
	}

	// Find a route to the desired destination.
	r, err := e.stack.FindRoute(nicid, e.id.LocalAddress, addr.Addr, netProto, false /* multicastLoop */)
	if err != nil {
		return err
	}
	defer r.Release()

	origID := e.id

	netProtos := []tcpip.NetworkProtocolNumber{netProto}
	e.id.LocalAddress = r.LocalAddress
	e.id.RemoteAddress = r.RemoteAddress
	e.id.RemotePort = addr.Port

	if e.id.LocalPort != 0 {
		// The endpoint is bound to a port, attempt to register it.
		err := e.stack.RegisterTransportEndpoint(nicid, netProtos, ProtocolNumber, e.id, e, e.reusePort)
		if err != nil {
			return err
		}
	} else {
		// The endpoint doesn't have a local port yet, so try to get
		// one. Make sure that it isn't one that will result in the same
		// address/port for both local and remote (otherwise this
		// endpoint would be trying to connect to itself).
		sameAddr := e.id.LocalAddress == e.id.RemoteAddress
		if _, err := e.stack.PickEphemeralPort(func(p uint16) (bool, *tcpip.Error) {
			if sameAddr && p == e.id.RemotePort {
				return false, nil
			}
			if !e.stack.IsPortAvailable(netProtos, ProtocolNumber, e.id.LocalAddress, p, false) {
				return false, nil
			}

			id := e.id
			id.LocalPort = p
			// Capture the registration result explicitly so that an
			// unexpected failure is reported to the port picker rather
			// than being replaced by the enclosing function's (nil) err.
			switch regErr := e.stack.RegisterTransportEndpoint(nicid, netProtos, ProtocolNumber, id, e, e.reusePort); regErr {
			case nil:
				e.id = id
				return true, nil
			case tcpip.ErrPortInUse:
				return false, nil
			default:
				return false, regErr
			}
		}); err != nil {
			return err
		}
	}

	// Remove the port reservation. This can happen when Bind is called
	// before Connect: in such a case we don't want to hold on to
	// reservations anymore.
	if e.isPortReserved {
		e.stack.ReleasePort(e.effectiveNetProtos, ProtocolNumber, origID.LocalAddress, origID.LocalPort)
		e.isPortReserved = false
	}

	e.isRegistered = true
	e.state = StateConnecting
	e.route = r.Clone()
	e.boundNICID = nicid
	e.effectiveNetProtos = netProtos
	e.connectingAddress = connectingAddr

	e.initGSO()

	// Connect in the restore phase does not perform handshake. Restore its
	// connection setting here.
	if !handshake {
		e.segmentQueue.mu.Lock()
		for _, l := range []segmentList{e.segmentQueue.list, e.sndQueue, e.snd.writeList} {
			for s := l.Front(); s != nil; s = s.Next() {
				s.id = e.id
				s.route = r.Clone()
				e.sndWaker.Assert()
			}
		}
		e.segmentQueue.mu.Unlock()
		e.snd.updateMaxPayloadSize(int(e.route.MTU()), 0)
		e.state = StateEstablished
	}

	if run {
		e.workerRunning = true
		e.stack.Stats().TCP.ActiveConnectionOpenings.Increment()
		go e.protocolMainLoop(handshake) // S/R-SAFE: will be drained before save.
	}

	return tcpip.ErrConnectStarted
}
// ConnectEndpoint is not supported for TCP endpoints; it always fails with
// ErrInvalidEndpointState.
func (*endpoint) ConnectEndpoint(tcpip.Endpoint) *tcpip.Error {
	return tcpip.ErrInvalidEndpointState
}
// Shutdown closes the read and/or write end of the endpoint connection to its
// peer. The requested flags are accumulated into the endpoint's shutdown
// flags. Endpoints that are neither connected nor listening yield
// ErrNotConnected.
func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.shutdownFlags |= flags

	switch {
	case e.state == StateListen:
		// Tell protocolListenLoop to stop.
		if flags&tcpip.ShutdownRead != 0 {
			e.notifyProtocolGoroutine(notifyClose)
		}
		return nil

	case !e.state.connected():
		return tcpip.ErrNotConnected
	}

	// From here on the endpoint is connected.
	readShut := e.shutdownFlags&tcpip.ShutdownRead != 0
	writeShut := e.shutdownFlags&tcpip.ShutdownWrite != 0

	if readShut {
		// Mark read side as closed.
		e.rcvListMu.Lock()
		e.rcvClosed = true
		unread := e.rcvBufUsed
		e.rcvListMu.Unlock()

		// If we're fully closed and we have unread data we need to abort
		// the connection with a RST.
		if writeShut && unread > 0 {
			e.notifyProtocolGoroutine(notifyReset)
			return nil
		}
	}

	if !writeShut {
		return nil
	}

	e.sndBufMu.Lock()
	if e.sndClosed {
		// The write side was already closed earlier; nothing more to queue.
		e.sndBufMu.Unlock()
		return nil
	}

	// Queue fin segment.
	fin := newSegmentFromView(&e.route, e.id, nil)
	e.sndQueue.PushBack(fin)
	e.sndBufInQueue++

	// Mark endpoint as closed.
	e.sndClosed = true
	e.sndBufMu.Unlock()

	// Tell protocol goroutine to close.
	e.sndCloseWaker.Assert()
	return nil
}
// Listen puts the endpoint in "listen" mode, which allows it to accept new
// connections. backlog is the capacity of the queue of accepted connections.
//
// Calling Listen on an endpoint that is already listening resizes that queue,
// provided the connections already queued fit in the new capacity. Otherwise
// the endpoint must be in the bound state. Any returned error for which
// IgnoreStats() is false is counted in FailedConnectionAttempts.
func (e *endpoint) Listen(backlog int) (err *tcpip.Error) {
	e.mu.Lock()
	defer e.mu.Unlock()
	defer func() {
		if err != nil && !err.IgnoreStats() {
			e.stack.Stats().TCP.FailedConnectionAttempts.Increment()
		}
	}()

	// Allow the backlog to be adjusted if the endpoint is not shutting down.
	// When the endpoint shuts down, it sets workerCleanup to true, and from
	// that point onward, acceptedChan is the responsibility of the cleanup()
	// method (and should not be touched anywhere else, including here).
	if e.state == StateListen && !e.workerCleanup {
		switch {
		case len(e.acceptedChan) > backlog:
			// The pending connections would not fit in the new queue.
			return tcpip.ErrInvalidEndpointState
		case cap(e.acceptedChan) == backlog:
			// Already the requested size.
			return nil
		}

		// Swap in a channel of the new capacity and move every pending
		// connection over from the old one.
		old := e.acceptedChan
		resized := make(chan *endpoint, backlog)
		e.acceptedChan = resized
		close(old)
		for pending := range old {
			resized <- pending
		}
		return nil
	}

	// Endpoint must be bound before it can transition to listen mode.
	if e.state != StateBound {
		return tcpip.ErrInvalidEndpointState
	}

	// Register the endpoint.
	if regErr := e.stack.RegisterTransportEndpoint(e.boundNICID, e.effectiveNetProtos, ProtocolNumber, e.id, e, e.reusePort); regErr != nil {
		return regErr
	}

	e.isRegistered = true
	e.state = StateListen
	if e.acceptedChan == nil {
		e.acceptedChan = make(chan *endpoint, backlog)
	}
	e.workerRunning = true

	rcvWnd := seqnum.Size(e.receiveBufferAvailable())
	go e.protocolListenLoop(rcvWnd) // S/R-SAFE: drained on save.

	return nil
}
// startAcceptedLoop records waiterQueue as the endpoint's waiter queue, marks
// the worker as running and launches the main protocol loop (without a
// handshake) in a new goroutine for an accepted connection.
func (e *endpoint) startAcceptedLoop(waiterQueue *waiter.Queue) {
	e.waiterQueue = waiterQueue
	e.workerRunning = true

	const handshake = false
	go e.protocolMainLoop(handshake) // S/R-SAFE: drained on save.
}
// Accept returns a new endpoint if a peer has established a connection
// to an endpoint previously set to listen mode.
func (e *endpoint) Accept() (tcpip.Endpoint, *waiter.Queue, *tcpip.Error) {
e.mu.RLock()
defer e.mu.RUnlock()
// Endpoint must be in listen state before it can accept connections.
if e.state != StateListen {
return nil, nil, tcpip.ErrInvalidEndpointState
}
// Get the new accepted endpoint.
var n *endpoint
select {
case n = <-e.acceptedChan:
default:
return nil, nil, tcpip.ErrWouldBlock
}
// Start the protocol goroutine.
wq := &waiter.Queue{}
n.startAcceptedLoop(wq)
Fixes to TCP listen behavior. Netstack listen loop can get stuck if cookies are in-use and the app is slow to accept incoming connections. Further we continue to complete handshake for a connection even if the backlog is full. This creates a problem when a lots of connections come in rapidly and we end up with lots of completed connections just hanging around to be delivered. These fixes change netstack behaviour to mirror what linux does as described here in the following article http://veithen.io/2014/01/01/how-tcp-backlog-works-in-linux.html Now when cookies are not in-use Netstack will silently drop the ACK to a SYN-ACK and not complete the handshake if the backlog is full. This will result in the connection staying in a half-complete state. Eventually the sender will retransmit the ACK and if backlog has space we will transition to a connected state and deliver the endpoint. Similarly when cookies are in use we do not try and create an endpoint unless there is space in the accept queue to accept the newly created endpoint. If there is no space then we again silently drop the ACK as we can just recreate it when the ACK is retransmitted by the peer. We also now use the backlog to cap the size of the SYN-RCVD queue for a given endpoint. So at any time there can be N connections in the backlog and N in a SYN-RCVD state if the application is not accepting connections. Any new SYNs will be dropped. This CL also fixes another small bug where we mark a new endpoint which has not completed handshake as connected. We should wait till handshake successfully completes before marking it connected. Updates #236 PiperOrigin-RevId: 250717817
2019-05-30 17:47:11 +00:00
e.stack.Stats().TCP.PassiveConnectionOpenings.Increment()
return n, wq, nil
}
// Bind binds the endpoint to a specific local port and optionally address.
//
// Binding is only permitted from the initial state; otherwise ErrAlreadyBound
// is returned. When a non-empty address is given it must be one of the stack's
// local addresses, else ErrBadLocalAddress is returned and the port
// reservation made here is undone.
func (e *endpoint) Bind(addr tcpip.FullAddress) (err *tcpip.Error) {
	e.mu.Lock()
	defer e.mu.Unlock()

	// Don't allow binding once endpoint is not in the initial state
	// anymore. This is because once the endpoint goes into a connected or
	// listen state, it is already bound.
	if e.state != StateInitial {
		return tcpip.ErrAlreadyBound
	}

	// Record the address as supplied, before any v4-mapped rewriting.
	e.bindAddress = addr.Addr
	netProto, err := e.checkV4Mapped(&addr)
	if err != nil {
		return err
	}

	// Expand netProtos to include v4 and v6 if the caller is binding to a
	// wildcard (empty) address, and this is an IPv6 endpoint with v6only
	// set to false.
	var netProtos []tcpip.NetworkProtocolNumber
	if dualStack := netProto == header.IPv6ProtocolNumber && !e.v6only && addr.Addr == ""; dualStack {
		netProtos = []tcpip.NetworkProtocolNumber{
			header.IPv6ProtocolNumber,
			header.IPv4ProtocolNumber,
		}
	} else {
		netProtos = []tcpip.NetworkProtocolNumber{netProto}
	}

	port, err := e.stack.ReservePort(netProtos, ProtocolNumber, addr.Addr, addr.Port, e.reusePort)
	if err != nil {
		return err
	}

	e.isPortReserved = true
	e.effectiveNetProtos = netProtos
	e.id.LocalPort = port

	// Any failure beyond this point must release the port reservation and
	// roll back the binding state set in this function.
	defer func() {
		if err == nil {
			return
		}
		e.stack.ReleasePort(netProtos, ProtocolNumber, addr.Addr, port)
		e.isPortReserved = false
		e.effectiveNetProtos = nil
		e.id.LocalPort = 0
		e.id.LocalAddress = ""
		e.boundNICID = 0
	}()

	// If an address is specified, we must ensure that it's one of our
	// local addresses.
	if addr.Addr != "" {
		nicID := e.stack.CheckLocalAddress(addr.NIC, netProto, addr.Addr)
		if nicID == 0 {
			return tcpip.ErrBadLocalAddress
		}

		e.boundNICID = nicID
		e.id.LocalAddress = addr.Addr
	}

	// Mark endpoint as bound.
	e.state = StateBound

	return nil
}
// GetLocalAddress returns the local address, port and bound NIC recorded for
// the endpoint. It never fails.
func (e *endpoint) GetLocalAddress() (tcpip.FullAddress, *tcpip.Error) {
	e.mu.RLock()
	local := tcpip.FullAddress{
		NIC:  e.boundNICID,
		Addr: e.id.LocalAddress,
		Port: e.id.LocalPort,
	}
	e.mu.RUnlock()

	return local, nil
}
// GetRemoteAddress returns the remote address and port the endpoint is
// connected to, together with the bound NIC. It fails with ErrNotConnected
// when the endpoint is not in a connected state.
func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, *tcpip.Error) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	if e.state.connected() {
		return tcpip.FullAddress{
			NIC:  e.boundNICID,
			Addr: e.id.RemoteAddress,
			Port: e.id.RemotePort,
		}, nil
	}

	return tcpip.FullAddress{}, tcpip.ErrNotConnected
}
// HandlePacket is called by the stack when new packets arrive to this transport
// endpoint. Segments that fail to parse or carry an invalid checksum are
// counted and discarded; valid ones are queued for the worker goroutine, or
// dropped (and counted) when the queue refuses them.
func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv buffer.VectorisedView) {
	seg := newSegment(r, id, vv)
	stats := e.stack.Stats()

	switch {
	case !seg.parse():
		stats.MalformedRcvdPackets.Increment()
		stats.TCP.InvalidSegmentsReceived.Increment()
		seg.decRef()
		return

	case !seg.csumValid:
		stats.MalformedRcvdPackets.Increment()
		stats.TCP.ChecksumErrors.Increment()
		seg.decRef()
		return
	}

	stats.TCP.ValidSegmentsReceived.Increment()
	if seg.flags&header.TCPFlagRst != 0 {
		stats.TCP.ResetsReceived.Increment()
	}

	// Send packet to worker goroutine.
	if !e.segmentQueue.enqueue(seg) {
		// The queue is full, so we drop the segment.
		stats.DroppedPackets.Increment()
		seg.decRef()
		return
	}
	e.newSegmentWaker.Assert()
}
// HandleControlPacket implements stack.TransportEndpoint.HandleControlPacket.
//
// Only ControlPacketTooBig is acted upon: the event is counted, the send MTU
// is lowered to extra when extra is smaller, and the protocol goroutine is
// told that the MTU changed. All other control types are ignored.
func (e *endpoint) HandleControlPacket(id stack.TransportEndpointID, typ stack.ControlType, extra uint32, vv buffer.VectorisedView) {
	if typ != stack.ControlPacketTooBig {
		return
	}

	reportedMTU := int(extra)

	e.sndBufMu.Lock()
	e.packetTooBigCount++
	if reportedMTU < e.sndMTU {
		e.sndMTU = reportedMTU
	}
	e.sndBufMu.Unlock()

	e.notifyProtocolGoroutine(notifyMTUChanged)
}
// updateSndBufferUsage is called by the protocol goroutine when room opens up
// in the send buffer. The number of newly available bytes is v.
//
// Writers are only notified when usage crosses from at-or-above half of the
// send buffer size to below it. This ensures that we don't wake up writers to
// queue just 1-2 segments and go back to sleep.
func (e *endpoint) updateSndBufferUsage(v int) {
	e.sndBufMu.Lock()
	half := e.sndBufSize >> 1
	wasAboveHalf := e.sndBufUsed >= half
	e.sndBufUsed -= v
	crossedBelow := wasAboveHalf && e.sndBufUsed < half
	e.sndBufMu.Unlock()

	if crossedBelow {
		e.waiterQueue.Notify(waiter.EventOut)
	}
}
// readyToRead is called by the protocol goroutine when a new segment is ready
// to be read, or when the connection is closed for receiving (in which case
// s will be nil). Readers are notified in both cases.
func (e *endpoint) readyToRead(s *segment) {
	e.rcvListMu.Lock()
	if s == nil {
		// No segment means the receive side has been closed.
		e.rcvClosed = true
	} else {
		s.incRef()
		e.rcvBufUsed += s.data.Size()

		// Check if the receive window is now closed. If so make sure
		// we set the zero window before we deliver the segment to ensure
		// that a subsequent read of the segment will correctly trigger
		// a non-zero notification.
		if e.receiveBufferAvailableLocked()>>e.rcv.rcvWndScale == 0 {
			e.zeroWindow = true
		}
		e.rcvList.PushBack(s)
	}
	e.rcvListMu.Unlock()

	e.waiterQueue.Notify(waiter.EventIn)
}
// receiveBufferAvailableLocked calculates how many bytes are still available
// in the receive buffer. The result is never negative.
// rcvListMu must be held when this function is called.
func (e *endpoint) receiveBufferAvailableLocked() int {
	if e.rcvBufUsed < e.rcvBufSize {
		return e.rcvBufSize - e.rcvBufUsed
	}

	// We may use more bytes than the buffer size when the receive buffer
	// shrinks; report no room in that case.
	return 0
}
// receiveBufferAvailable calculates how many bytes are still available in the
// receive buffer, taking rcvListMu for the duration of the computation.
func (e *endpoint) receiveBufferAvailable() int {
	e.rcvListMu.Lock()
	defer e.rcvListMu.Unlock()
	return e.receiveBufferAvailableLocked()
}
// receiveBufferSize returns the configured receive buffer size (rcvBufSize),
// read under rcvListMu.
func (e *endpoint) receiveBufferSize() int {
	e.rcvListMu.Lock()
	defer e.rcvListMu.Unlock()
	return e.rcvBufSize
}
// maxReceiveBufferSize returns the Max field of the stack's TCP
// ReceiveBufferSizeOption, falling back to MaxBufferSize when the option
// cannot be queried.
func (e *endpoint) maxReceiveBufferSize() int {
	var opt ReceiveBufferSizeOption
	queryErr := e.stack.TransportProtocolOption(ProtocolNumber, &opt)
	if queryErr != nil {
		// As a fallback return the hardcoded max buffer size.
		return MaxBufferSize
	}
	return opt.Max
}
// rcvWndScaleForHandshake computes the receive window scale to offer to the
// peer when window scaling is enabled (true by default). If auto-tuning is
// disabled then the window scaling factor is based on the size of the
// receiveBuffer otherwise we use the max permissible receive buffer size to
// compute the scale.
func (e *endpoint) rcvWndScaleForHandshake() int {
	scaleBasis := e.receiveBufferSize()

	e.rcvListMu.Lock()
	tuningOff := e.rcvAutoParams.disabled
	e.rcvListMu.Unlock()

	// With auto-tuning active the buffer may grow up to the maximum, so the
	// scale has to accommodate that size instead of the current one.
	if !tuningOff {
		scaleBasis = e.maxReceiveBufferSize()
	}
	return FindWndScale(seqnum.Size(scaleBasis))
}
// updateRecentTimestamp updates the recent timestamp using the algorithm
// described in https://tools.ietf.org/html/rfc7323#section-4.3
//
// recentTS is replaced by tsVal only when timestamps are enabled on the
// endpoint, tsVal is newer than the stored value, and segSeq is not beyond
// maxSentAck.
func (e *endpoint) updateRecentTimestamp(tsVal uint32, maxSentAck seqnum.Value, segSeq seqnum.Value) {
	if !e.sendTSOk {
		return
	}
	if !seqnum.Value(e.recentTS).LessThan(seqnum.Value(tsVal)) {
		return
	}
	if !segSeq.LessThanEq(maxSentAck) {
		return
	}
	e.recentTS = tsVal
}
// maybeEnableTimestamp marks the timestamp option enabled for this endpoint if
// the SYN options indicate that timestamp option was negotiated. It also
// initializes the recentTS with the value provided in synOpts.TSVal.
func (e *endpoint) maybeEnableTimestamp(synOpts *header.TCPSynOptions) {
	if !synOpts.TS {
		return
	}
	e.sendTSOk = true
	e.recentTS = synOpts.TSVal
}
// timestamp returns the timestamp value to be used in the TSVal field of the
// timestamp option for outgoing TCP segments for a given endpoint. It is the
// current millisecond clock shifted by the endpoint's tsOffset.
func (e *endpoint) timestamp() uint32 {
	offset := e.tsOffset
	return tcpTimeStamp(offset)
}
// tcpTimeStamp returns the current wall-clock time in milliseconds, truncated
// to 32 bits, plus the provided offset (wrapping on overflow). This is not
// inlined into the endpoint method as it's used when SYN cookies are in use
// and the endpoint is not created at the time when the SYN cookie is sent.
func tcpTimeStamp(offset uint32) uint32 {
	now := time.Now()
	wholeSecMillis := now.Unix() * 1000
	fracMillis := int64(now.Nanosecond() / 1e6)
	return uint32(wholeSecMillis+fracMillis) + offset
}
// timeStampOffset returns a randomized timestamp offset to be used when sending
// timestamp values in a timestamp option for a TCP segment. It panics if the
// random source fails.
func timeStampOffset() uint32 {
	var raw [4]byte
	if _, err := rand.Read(raw[:]); err != nil {
		panic(err)
	}

	// Produce a random tsOffset that gets added to the timestamp every time
	// one is sent while the Timestamp option is enabled.
	//
	// See https://tools.ietf.org/html/rfc7323#section-5.4 for details on
	// why this is required.
	//
	// NOTE: This is not completely to spec as normally this should be
	// initialized in a manner analogous to how sequence numbers are
	// randomized per connection basis. But for now this is sufficient.
	//
	// The bytes are assembled little-endian: raw[0] is the least
	// significant byte.
	var offset uint32
	for i := len(raw) - 1; i >= 0; i-- {
		offset = offset<<8 | uint32(raw[i])
	}
	return offset
}
// maybeEnableSACKPermitted marks the SACKPermitted option enabled for this
// endpoint if the SYN options indicate that the SACK option was negotiated and
// the TCP stack is configured to enable TCP SACK option.
func (e *endpoint) maybeEnableSACKPermitted(synOpts *header.TCPSynOptions) {
	var stackSACK SACKEnabled
	if err := e.stack.TransportProtocolOption(ProtocolNumber, &stackSACK); err != nil {
		// Stack doesn't support SACK. So just return.
		return
	}

	// Both sides have to agree: the stack setting and the peer's SYN.
	if !bool(stackSACK) || !synOpts.SACKPermitted {
		return
	}
	e.sackPermitted = true
}
// maxOptionSize returns the maximum size of TCP options, measured by building
// the options with a full set of header.TCPMaxSACKBlocks SACK blocks. The
// options buffer is handed back via putOptions before returning.
func (e *endpoint) maxOptionSize() (size int) {
	var fullSACK [header.TCPMaxSACKBlocks]header.SACKBlock
	opts := e.makeOptions(fullSACK[:])
	defer putOptions(opts)
	return len(opts)
}
// completeState makes a full copy of the endpoint and returns it. This is used
// before invoking the probe. The state returned may not be fully consistent if
// there are intervening syscalls when the state is being copied.
func (e *endpoint) completeState() stack.TCPEndpointState {
	var s stack.TCPEndpointState
	s.SegTime = time.Now()

	// Endpoint ID.
	e.mu.Lock()
	s.ID = stack.TCPEndpointID(e.id)
	e.mu.Unlock()

	// Receive buffer accounting and auto-tuning parameters.
	e.rcvListMu.Lock()
	s.RcvBufSize = e.rcvBufSize
	s.RcvBufUsed = e.rcvBufUsed
	s.RcvClosed = e.rcvClosed
	s.RcvAutoParams.MeasureTime = e.rcvAutoParams.measureTime
	s.RcvAutoParams.CopiedBytes = e.rcvAutoParams.copied
	s.RcvAutoParams.PrevCopiedBytes = e.rcvAutoParams.prevCopied
	s.RcvAutoParams.RTT = e.rcvAutoParams.rtt
	s.RcvAutoParams.RTTMeasureSeqNumber = e.rcvAutoParams.rttMeasureSeqNumber
	s.RcvAutoParams.RTTMeasureTime = e.rcvAutoParams.rttMeasureTime
	s.RcvAutoParams.Disabled = e.rcvAutoParams.disabled
	e.rcvListMu.Unlock()

	// TCP option state (timestamps and SACK).
	s.SendTSOk = e.sendTSOk
	s.RecentTS = e.recentTS
	s.TSOffset = e.tsOffset
	s.SACKPermitted = e.sackPermitted
	sackBlocks := make([]header.SACKBlock, e.sack.NumBlocks)
	copy(sackBlocks, e.sack.Blocks[:e.sack.NumBlocks])
	s.SACK.Blocks = sackBlocks
	s.SACK.ReceivedBlocks, s.SACK.MaxSACKED = e.scoreboard.Copy()

	// Send buffer accounting.
	e.sndBufMu.Lock()
	s.SndBufSize = e.sndBufSize
	s.SndBufUsed = e.sndBufUsed
	s.SndClosed = e.sndClosed
	s.SndBufInQueue = e.sndBufInQueue
	s.PacketTooBigCount = e.packetTooBigCount
	s.SndMTU = e.sndMTU
	e.sndBufMu.Unlock()

	// Receiver state.
	s.Receiver = stack.TCPReceiverState{
		RcvNxt:         e.rcv.rcvNxt,
		RcvAcc:         e.rcv.rcvAcc,
		RcvWndScale:    e.rcv.rcvWndScale,
		PendingBufUsed: e.rcv.pendingBufUsed,
		PendingBufSize: e.rcv.pendingBufSize,
	}

	// Sender state, assembled locally and stored once complete.
	snd := e.snd
	sender := stack.TCPSenderState{
		LastSendTime: snd.lastSendTime,
		DupAckCount:  snd.dupAckCount,
		FastRecovery: stack.TCPFastRecoveryState{
			Active:    snd.fr.active,
			First:     snd.fr.first,
			Last:      snd.fr.last,
			MaxCwnd:   snd.fr.maxCwnd,
			HighRxt:   snd.fr.highRxt,
			RescueRxt: snd.fr.rescueRxt,
		},
		SndCwnd:          snd.sndCwnd,
		Ssthresh:         snd.sndSsthresh,
		SndCAAckCount:    snd.sndCAAckCount,
		Outstanding:      snd.outstanding,
		SndWnd:           snd.sndWnd,
		SndUna:           snd.sndUna,
		SndNxt:           snd.sndNxt,
		RTTMeasureSeqNum: snd.rttMeasureSeqNum,
		RTTMeasureTime:   snd.rttMeasureTime,
		Closed:           snd.closed,
		RTO:              snd.rto,
		MaxPayloadSize:   snd.maxPayloadSize,
		SndWndScale:      snd.sndWndScale,
		MaxSentAck:       snd.maxSentAck,
	}

	snd.rtt.Lock()
	sender.SRTT = snd.rtt.srtt
	sender.SRTTInited = snd.rtt.srttInited
	snd.rtt.Unlock()

	// Cubic details are only filled in when the sender's congestion control
	// is a *cubicState.
	if cubic, ok := snd.cc.(*cubicState); ok {
		sender.Cubic = stack.TCPCubicState{
			WMax:                    cubic.wMax,
			WLastMax:                cubic.wLastMax,
			T:                       cubic.t,
			TimeSinceLastCongestion: time.Since(cubic.t),
			C:                       cubic.c,
			K:                       cubic.k,
			Beta:                    cubic.beta,
			WC:                      cubic.wC,
			WEst:                    cubic.wEst,
		}
	}
	s.Sender = sender

	return s
}
// initGSO sets up the endpoint's GSO descriptor from its current route. It
// does nothing when the route does not advertise stack.CapabilityGSO. It
// panics if the route's network protocol is neither IPv4 nor IPv6.
func (e *endpoint) initGSO() {
	if e.route.Capabilities()&stack.CapabilityGSO == 0 {
		return
	}

	gso := &stack.GSO{}
	switch e.route.NetProto {
	case header.IPv4ProtocolNumber:
		gso.Type = stack.GSOTCPv4
		gso.L3HdrLen = header.IPv4MinimumSize
	case header.IPv6ProtocolNumber:
		gso.Type = stack.GSOTCPv6
		gso.L3HdrLen = header.IPv6MinimumSize
	default:
		// Report the value that was actually switched on; the endpoint's
		// own netProto may differ from the route's protocol.
		panic(fmt.Sprintf("Unknown netProto: %v", e.route.NetProto))
	}
	gso.NeedsCsum = true
	gso.CsumOffset = header.TCPChecksumOffset
	gso.MaxSize = e.route.GSOMaxSize()
	e.gso = gso
}
// State implements tcpip.Endpoint.State. It exports the endpoint's protocol
// state for diagnostics, as the numeric value of the EndpointState.
func (e *endpoint) State() uint32 {
	e.mu.Lock()
	current := e.state
	e.mu.Unlock()
	return uint32(current)
}
// mssForRoute returns the route's MTU minus the minimum TCP header size,
// converted to uint16.
// NOTE(review): the result is not clamped, so an MTU outside the range the
// conversion can represent would wrap — confirm callers never see that.
func mssForRoute(r *stack.Route) uint16 {
	payload := r.MTU() - header.TCPMinimumSize
	return uint16(payload)
}