Use fine grained locks while sending NUD probes

Previously when sending probe messages, we would hold a shared lock
which led to deadlocks (due to synchronous packet looping (e.g. pipe
and loopback link endpoints)) and lock contention.

Writing packets may be an expensive operation which could prevent other
goroutines from doing meaningful work if a shared lock is held while
writing packets.

This change updates the NUD timers to not hold shared locks while
sending packets.

PiperOrigin-RevId: 356048697
This commit is contained in:
Ghanan Gowripalan 2021-02-06 12:42:25 -08:00 committed by gVisor bot
parent a83c8585af
commit 4943347137
1 changed files with 157 additions and 97 deletions

View File

@ -70,6 +70,13 @@ const (
Failed
)
// timer pairs a tcpip.Timer with a cancellation flag so that a callback
// that has already started running can detect that the timer was stopped
// (Stop alone cannot prevent an already-fired callback from proceeding).
type timer struct {
	// done indicates to the timer that the timer was stopped.
	//
	// NOTE(review): this is a *bool shared with the timer's callback;
	// presumably reads/writes are serialized by the owning entry's mutex
	// (cancelTimerLocked writes it under e.mu) — confirm against callers.
	done *bool

	// timer is the underlying scheduled timer.
	timer tcpip.Timer
}
// neighborEntry implements a neighbor entry's individual node behavior, as per
// RFC 4861 section 7.3.3. Neighbor Unreachability Detection operates in
// parallel with the sending of packets to a neighbor, necessitating the
@ -95,7 +102,8 @@ type neighborEntry struct {
onResolve []func(LinkResolutionResult)
isRouter bool
job *tcpip.Job
timer timer
}
}
@ -138,6 +146,20 @@ func newStaticNeighborEntry(cache *neighborCache, addr tcpip.Address, linkAddr t
return n
}
// remainingCounter is a mutex-guarded count of probe messages still to be
// sent, shared between a probe timer's invocations without requiring the
// entry's shared lock.
type remainingCounter struct {
	mu struct {
		sync.Mutex

		// remaining is the number of probes left to send before resolution
		// is considered to have timed out.
		remaining uint32
	}
}
// init (re)initializes the counter so that limit probes remain to be sent.
func (r *remainingCounter) init(limit uint32) {
	r.mu.Lock()
	r.mu.remaining = limit
	r.mu.Unlock()
}
// notifyCompletionLocked notifies those waiting for address resolution, with
// the link address if resolution completed successfully.
//
@ -192,13 +214,16 @@ func (e *neighborEntry) dispatchRemoveEventLocked() {
}
}
// cancelJobLocked cancels the currently scheduled action, if there is one.
// cancelTimerLocked cancels the currently scheduled action, if there is one.
// Entries in Unknown, Stale, or Static state do not have a scheduled action.
//
// Precondition: e.mu MUST be locked.
func (e *neighborEntry) cancelJobLocked() {
if job := e.mu.job; job != nil {
job.Cancel()
// cancelTimerLocked stops any pending timer and marks it done so that a
// callback which already fired returns early, then clears the timer state.
//
// Precondition: e.mu MUST be locked.
func (e *neighborEntry) cancelTimerLocked() {
	if e.mu.timer.timer == nil {
		return
	}
	e.mu.timer.timer.Stop()
	*e.mu.timer.done = true
	e.mu.timer = timer{}
}
@ -208,7 +233,7 @@ func (e *neighborEntry) cancelJobLocked() {
// removeLocked prepares the entry for removal: records the update time,
// dispatches a remove event, cancels any scheduled work, and notifies
// waiters that resolution did not succeed.
//
// Precondition: e.mu MUST be locked.
func (e *neighborEntry) removeLocked() {
	e.mu.neigh.UpdatedAtNanos = e.cache.nic.stack.clock.NowNanoseconds()
	e.dispatchRemoveEventLocked()
	// NOTE(review): both the pre-change (cancelJobLocked) and post-change
	// (cancelTimerLocked) calls appear here — this looks like a diff whose
	// -/+ markers were stripped; only cancelTimerLocked should remain in the
	// updated code. Confirm against the upstream commit.
	e.cancelJobLocked()
	e.cancelTimerLocked()
	e.notifyCompletionLocked(false /* succeeded */)
}
@ -218,7 +243,7 @@ func (e *neighborEntry) removeLocked() {
//
// Precondition: e.mu MUST be locked.
func (e *neighborEntry) setStateLocked(next NeighborState) {
e.cancelJobLocked()
e.cancelTimerLocked()
prev := e.mu.neigh.State
e.mu.neigh.State = next
@ -230,47 +255,90 @@ func (e *neighborEntry) setStateLocked(next NeighborState) {
panic(fmt.Sprintf("should never transition to Incomplete with setStateLocked; neigh = %#v, prev state = %s", e.mu.neigh, prev))
case Reachable:
e.mu.job = e.cache.nic.stack.newJob(&e.mu, func() {
e.setStateLocked(Stale)
e.dispatchChangeEventLocked()
})
e.mu.job.Schedule(e.nudState.ReachableTime())
// Protected by e.mu.
done := false
case Delay:
e.mu.job = e.cache.nic.stack.newJob(&e.mu, func() {
e.setStateLocked(Probe)
e.dispatchChangeEventLocked()
})
e.mu.job.Schedule(config.DelayFirstProbeTime)
e.mu.timer = timer{
done: &done,
timer: e.cache.nic.stack.Clock().AfterFunc(e.nudState.ReachableTime(), func() {
e.mu.Lock()
defer e.mu.Unlock()
case Probe:
var retryCounter uint32
var sendUnicastProbe func()
if done {
// The timer was stopped because the entry changed state.
return
}
sendUnicastProbe = func() {
if retryCounter == config.MaxUnicastProbes {
e.dispatchRemoveEventLocked()
e.setStateLocked(Failed)
return
}
if err := e.cache.linkRes.LinkAddressRequest(e.mu.neigh.Addr, "" /* localAddr */, e.mu.neigh.LinkAddr); err != nil {
e.dispatchRemoveEventLocked()
e.setStateLocked(Failed)
return
}
retryCounter++
e.mu.job = e.cache.nic.stack.newJob(&e.mu, sendUnicastProbe)
e.mu.job.Schedule(config.RetransmitTimer)
e.setStateLocked(Stale)
e.dispatchChangeEventLocked()
}),
}
case Delay:
// Protected by e.mu.
done := false
e.mu.timer = timer{
done: &done,
timer: e.cache.nic.stack.Clock().AfterFunc(config.DelayFirstProbeTime, func() {
e.mu.Lock()
defer e.mu.Unlock()
if done {
// The timer was stopped because the entry changed state.
return
}
e.setStateLocked(Probe)
e.dispatchChangeEventLocked()
}),
}
case Probe:
// Protected by e.mu.
done := false
var remaining remainingCounter
remaining.init(config.MaxUnicastProbes)
addr := e.mu.neigh.Addr
linkAddr := e.mu.neigh.LinkAddr
// Send a probe in another goroutine to free this thread of execution
// for finishing the state transition. This is necessary to avoid
// deadlock where sending and processing probes are done synchronously,
// such as loopback and integration tests.
e.mu.job = e.cache.nic.stack.newJob(&e.mu, sendUnicastProbe)
e.mu.job.Schedule(immediateDuration)
// for finishing the state transition. This is necessary to escape the
// currently held lock so we can send the probe message without holding
// a shared lock.
e.mu.timer = timer{
done: &done,
timer: e.cache.nic.stack.Clock().AfterFunc(0, func() {
// Okay to hold this lock while writing packets since we use a different
// lock per probe timer so there will not be any lock contention.
remaining.mu.Lock()
defer remaining.mu.Unlock()
var err tcpip.Error
timedoutResolution := remaining.mu.remaining == 0
if !timedoutResolution {
err = e.cache.linkRes.LinkAddressRequest(addr, "" /* localAddr */, linkAddr)
}
e.mu.Lock()
defer e.mu.Unlock()
if done {
// The timer was stopped because the entry changed state.
return
}
if timedoutResolution || err != nil {
e.dispatchRemoveEventLocked()
e.setStateLocked(Failed)
return
}
remaining.mu.remaining--
e.mu.timer.timer.Reset(config.RetransmitTimer)
}),
}
case Failed:
e.notifyCompletionLocked(false /* succeeded */)
@ -300,66 +368,58 @@ func (e *neighborEntry) handlePacketQueuedLocked(localAddr tcpip.Address) {
e.mu.neigh.UpdatedAtNanos = e.cache.nic.stack.clock.NowNanoseconds()
e.dispatchAddEventLocked()
config := e.nudState.Config()
var retryCounter uint32
var sendMulticastProbe func()
// Protected by e.mu.
done := false
sendMulticastProbe = func() {
if retryCounter == config.MaxMulticastProbes {
// "If no Neighbor Advertisement is received after
// MAX_MULTICAST_SOLICIT solicitations, address resolution has failed.
// The sender MUST return ICMP destination unreachable indications with
// code 3 (Address Unreachable) for each packet queued awaiting address
// resolution." - RFC 4861 section 7.2.2
//
// There is no need to send an ICMP destination unreachable indication
// since the failure to resolve the address is expected to only occur
// on this node. Thus, redirecting traffic is currently not supported.
//
// "If the error occurs on a node other than the node originating the
// packet, an ICMP error message is generated. If the error occurs on
// the originating node, an implementation is not required to actually
// create and send an ICMP error packet to the source, as long as the
// upper-layer sender is notified through an appropriate mechanism
// (e.g. return value from a procedure call). Note, however, that an
// implementation may find it convenient in some cases to return errors
// to the sender by taking the offending packet, generating an ICMP
// error message, and then delivering it (locally) through the generic
// error-handling routines." - RFC 4861 section 2.1
e.dispatchRemoveEventLocked()
e.setStateLocked(Failed)
return
}
// As per RFC 4861 section 7.2.2:
//
// If the source address of the packet prompting the solicitation is the
// same as one of the addresses assigned to the outgoing interface, that
// address SHOULD be placed in the IP Source Address of the outgoing
// solicitation.
//
if err := e.cache.linkRes.LinkAddressRequest(e.mu.neigh.Addr, localAddr, ""); err != nil {
// There is no need to log the error here; the NUD implementation may
// assume a working link. A valid link should be the responsibility of
// the NIC/stack.LinkEndpoint.
e.dispatchRemoveEventLocked()
e.setStateLocked(Failed)
return
}
retryCounter++
e.mu.job = e.cache.nic.stack.newJob(&e.mu, sendMulticastProbe)
e.mu.job.Schedule(config.RetransmitTimer)
}
var remaining remainingCounter
remaining.init(config.MaxMulticastProbes)
addr := e.mu.neigh.Addr
// Send a probe in another goroutine to free this thread of execution
// for finishing the state transition. This is necessary to avoid
// deadlock where sending and processing probes are done synchronously,
// such as loopback and integration tests.
e.mu.job = e.cache.nic.stack.newJob(&e.mu, sendMulticastProbe)
e.mu.job.Schedule(immediateDuration)
// for finishing the state transition. This is necessary to escape the
// currently held lock so we can send the probe message without holding
// a shared lock.
e.mu.timer = timer{
done: &done,
timer: e.cache.nic.stack.Clock().AfterFunc(0, func() {
// Okay to hold this lock while writing packets since we use a different
// lock per probe timer so there will not be any lock contention.
remaining.mu.Lock()
defer remaining.mu.Unlock()
var err tcpip.Error
timedoutResolution := remaining.mu.remaining == 0
if !timedoutResolution {
// As per RFC 4861 section 7.2.2:
//
// If the source address of the packet prompting the solicitation is
// the same as one of the addresses assigned to the outgoing interface,
// that address SHOULD be placed in the IP Source Address of the
// outgoing solicitation.
//
err = e.cache.linkRes.LinkAddressRequest(addr, localAddr, "" /* linkAddr */)
}
e.mu.Lock()
defer e.mu.Unlock()
if done {
// The timer was stopped because the entry changed state.
return
}
if timedoutResolution || err != nil {
e.dispatchRemoveEventLocked()
e.setStateLocked(Failed)
return
}
remaining.mu.remaining--
e.mu.timer.timer.Reset(config.RetransmitTimer)
}),
}
case Stale:
e.setStateLocked(Delay)