Cleanup multicast routing table.

The purpose of this change is twofold:

1. Simplify AddInstalledRoute by returning a PacketBuffer slice instead of a
PendingRoute. This obviates PendingRoute.Dequeue and PendingRoute.IsEmpty.
2. Address PacketBuffer lifetime issues. With this change, the routing table
will call PacketBuffer.Clone() if it decides to enqueue the packet. When
AddInstalledRoute is called, the caller will then assume ownership of the
relevant packets and is expected to call PacketBuffer.DecRef() after
forwarding.

Updates #7338.

PiperOrigin-RevId: 446740297
This commit is contained in:
Nate Hurley 2022-05-05 09:43:58 -07:00 committed by gVisor bot
parent b86c98c82b
commit 09b7a17066
3 changed files with 71 additions and 64 deletions

View File

@ -50,6 +50,7 @@ func Example() {
// Create a route table from a specified config.
table := multicast.RouteTable{}
defer table.Close()
config := multicast.DefaultConfig(clock)
if err := table.Init(config); err != nil {
@ -86,20 +87,13 @@ func Example() {
// To transition a pending route to the installed state, call:
route := table.NewInstalledRoute(inputNICID, defaultOutgoingInterfaces)
pendingRoute, ok := table.AddInstalledRoute(routeKey, route)
if !ok {
return
}
pendingPackets := table.AddInstalledRoute(routeKey, route)
// If there was a pending route, then the caller is responsible for
// flushing any pending packets.
for !pendingRoute.IsEmpty() {
pkt, err := pendingRoute.Dequeue()
if err != nil {
panic(fmt.Sprintf("pendingRoute.Dequeue() = (_, %s)", err))
}
for _, pkt := range pendingPackets {
forwardPkt(pkt, route)
pkt.DecRef()
}
// To obtain the last used time of the route, call:

View File

@ -151,29 +151,16 @@ type PendingRoute struct {
expiration tcpip.MonotonicTime
}
func (p *PendingRoute) releasePackets() {
for _, pkt := range p.packets {
pkt.DecRef()
}
}
func (p *PendingRoute) isExpired(currentTime tcpip.MonotonicTime) bool {
return currentTime.After(p.expiration)
}
// Dequeue removes the first element in the queue and returns it.
//
// If the queue is empty, then an error will be returned.
func (p *PendingRoute) Dequeue() (*stack.PacketBuffer, error) {
if len(p.packets) == 0 {
return nil, errors.New("dequeue called on queue empty")
}
val := p.packets[0]
p.packets[0] = nil
p.packets = p.packets[1:]
return val, nil
}
// IsEmpty returns true if the queue contains no more elements. Otherwise,
// returns false.
func (p *PendingRoute) IsEmpty() bool {
return len(p.packets) == 0
}
const (
// DefaultMaxPendingQueueSize corresponds to the number of elements that can
// be in the packet queue for a pending route.
@ -248,6 +235,24 @@ func (r *RouteTable) Init(config Config) error {
return nil
}
// Close cleans up resources held by the table.
//
// Calling this will stop the cleanup routine and release any packets owned by
// the table.
func (r *RouteTable) Close() {
if r.cleanupPendingRoutesTimer != nil {
r.cleanupPendingRoutesTimer.Stop()
}
r.pendingMu.Lock()
defer r.pendingMu.Unlock()
for key, route := range r.pendingRoutes {
delete(r.pendingRoutes, key)
route.releasePackets()
}
}
func (r *RouteTable) cleanupPendingRoutes() {
currentTime := r.config.Clock.NowMonotonic()
r.pendingMu.Lock()
@ -256,6 +261,7 @@ func (r *RouteTable) cleanupPendingRoutes() {
for key, route := range r.pendingRoutes {
if route.isExpired(currentTime) {
delete(r.pendingRoutes, key)
route.releasePackets()
}
}
r.cleanupPendingRoutesTimer.Reset(DefaultCleanupInterval)
@ -324,9 +330,9 @@ func (e PendingRouteState) String() string {
// GetRouteOrInsertPending attempts to fetch the installed route that matches
// the provided key.
//
// If no matching installed route is found, then the pkt is queued in a
// pending route. The GetRouteResult.PendingRouteState will indicate whether
// the pkt was queued in a new pending route or an existing one.
// If no matching installed route is found, then the pkt is cloned and queued
// in a pending route. The GetRouteResult.PendingRouteState will indicate
// whether the pkt was queued in a new pending route or an existing one.
//
// If the relevant pending route queue is at max capacity, then
// ErrNoBufferSpace is returned. In such a case, callers are typically expected
@ -349,7 +355,7 @@ func (r *RouteTable) GetRouteOrInsertPending(key RouteKey, pkt *stack.PacketBuff
// https://github.com/torvalds/linux/blob/ae085d7f936/net/ipv4/ipmr.c#L1147
return GetRouteResult{}, ErrNoBufferSpace
}
pendingRoute.packets = append(pendingRoute.packets, pkt)
pendingRoute.packets = append(pendingRoute.packets, pkt.Clone())
r.pendingRoutes[key] = pendingRoute
return GetRouteResult{PendingRouteState: pendingRouteState, InstalledRoute: nil}, nil
@ -365,13 +371,11 @@ func (r *RouteTable) getOrCreatePendingRouteRLocked(key RouteKey) (PendingRoute,
// AddInstalledRoute adds the provided route to the table.
//
// Returns true if the route was previously in the pending state. Otherwise,
// returns false.
//
// If the route was previously pending, then the caller is responsible for
// flushing the returned pending route packet queue. Conversely, if the route
// was not pending, then any existing installed route will be overwritten.
func (r *RouteTable) AddInstalledRoute(key RouteKey, route *InstalledRoute) (PendingRoute, bool) {
// Packets that were queued while the route was in the pending state are
// returned. The caller assumes ownership of these packets and is responsible
// for forwarding and relasing them. If an installed route already exists for
// the provided key, then it is overwritten.
func (r *RouteTable) AddInstalledRoute(key RouteKey, route *InstalledRoute) []*stack.PacketBuffer {
r.installedMu.Lock()
defer r.installedMu.Unlock()
r.installedRoutes[key] = route
@ -384,9 +388,11 @@ func (r *RouteTable) AddInstalledRoute(key RouteKey, route *InstalledRoute) (Pen
// Ignore the pending route if it is expired. It may be in this state since
// the cleanup process is only run periodically.
if !ok || pendingRoute.isExpired(r.config.Clock.NowMonotonic()) {
return PendingRoute{}, false
pendingRoute.releasePackets()
return nil
}
return pendingRoute, true
return pendingRoute.packets
}
// RemoveInstalledRoute deletes any installed route that matches the provided

View File

@ -118,6 +118,7 @@ func TestInit(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
table := RouteTable{}
defer table.Close()
err := table.Init(tc.config)
if tc.invokeTwice {
@ -133,6 +134,7 @@ func TestInit(t *testing.T) {
func TestNewInstalledRoute(t *testing.T) {
table := RouteTable{}
defer table.Close()
clock := faketime.NewManualClock()
clock.Advance(5 * time.Second)
@ -151,6 +153,7 @@ func TestNewInstalledRoute(t *testing.T) {
func TestPendingRouteStates(t *testing.T) {
table := RouteTable{}
defer table.Close()
config := defaultConfig(withMaxPendingQueueSize(2))
if err := table.Init(config); err != nil {
t.Fatalf("table.Init(%#v): %s", config, err)
@ -213,6 +216,7 @@ func TestPendingRouteExpiration(t *testing.T) {
clock := faketime.NewManualClock()
table := RouteTable{}
defer table.Close()
config := defaultConfig(withClock(clock))
if err := table.Init(config); err != nil {
@ -242,15 +246,24 @@ func TestAddInstalledRouteWithPending(t *testing.T) {
pkt := newPacketBuffer("foo")
defer pkt.DecRef()
cmpOpts := []cmp.Option{
cmp.Transformer("AsViews", func(pkt *stack.PacketBuffer) []buffer.View {
return pkt.Views()
}),
cmp.Comparer(func(a []buffer.View, b []buffer.View) bool {
return cmp.Equal(a, b)
}),
}
testCases := []struct {
name string
advance time.Duration
want *stack.PacketBuffer
want []*stack.PacketBuffer
}{
{
name: "not expired",
advance: DefaultPendingRouteExpiration,
want: pkt,
want: []*stack.PacketBuffer{pkt},
},
{
name: "expired",
@ -264,6 +277,7 @@ func TestAddInstalledRouteWithPending(t *testing.T) {
clock := faketime.NewManualClock()
table := RouteTable{}
defer table.Close()
config := defaultConfig(withClock(clock))
if err := table.Init(config); err != nil {
@ -279,26 +293,14 @@ func TestAddInstalledRouteWithPending(t *testing.T) {
clock.Advance(test.advance)
route := table.NewInstalledRoute(inputNICID, defaultOutgoingInterfaces)
pendingRoute, wasPending := table.AddInstalledRoute(defaultRouteKey, route)
pendingPackets := table.AddInstalledRoute(defaultRouteKey, route)
if test.want == nil {
if wasPending {
t.Errorf("got table.AddInstalledRoute(%#v, %#v) = (%#v, true), want = (_, false)", defaultRouteKey, route, pendingRoute)
}
} else {
if !wasPending {
t.Fatalf("got table.AddInstalledRoute(%#v, %#v) = (nil, false), want = (_, true)", defaultRouteKey, route)
}
if diff := cmp.Diff(test.want, pendingPackets, cmpOpts...); diff != "" {
t.Errorf("tableAddInstalledRoute(%#v, %#v) mismatch (-want +got):\n%s", defaultRouteKey, route, diff)
}
pkt, err := pendingRoute.Dequeue()
if err != nil {
t.Fatalf("got pendingRoute.Dequeue() = (_, %v), want = (_, nil)", err)
}
if !cmp.Equal(test.want.Views(), pkt.Views()) {
t.Errorf("got pkt = %v, want = %v", pkt.Views(), test.want.Views())
}
for _, pendingPkt := range pendingPackets {
pendingPkt.DecRef()
}
// Verify that the pending route is actually deleted.
@ -313,6 +315,7 @@ func TestAddInstalledRouteWithPending(t *testing.T) {
func TestAddInstalledRouteWithNoPending(t *testing.T) {
table := RouteTable{}
defer table.Close()
config := defaultConfig()
if err := table.Init(config); err != nil {
t.Fatalf("table.Init(%#v): %s", config, err)
@ -324,8 +327,8 @@ func TestAddInstalledRouteWithNoPending(t *testing.T) {
pkt := newPacketBuffer("hello")
defer pkt.DecRef()
for _, route := range [...]*InstalledRoute{firstRoute, secondRoute} {
if pendingRoute, wasPending := table.AddInstalledRoute(defaultRouteKey, route); wasPending {
t.Errorf("got table.AddInstalledRoute(%#v, %#v) = (%#v, true), want = (_, false)", defaultRouteKey, route, pendingRoute)
if pendingPackets := table.AddInstalledRoute(defaultRouteKey, route); pendingPackets != nil {
t.Errorf("got table.AddInstalledRoute(%#v, %#v) = %#v, want = false", defaultRouteKey, route, pendingPackets)
}
// AddInstalledRoute is invoked for the same routeKey two times. Verify
@ -349,6 +352,7 @@ func TestAddInstalledRouteWithNoPending(t *testing.T) {
func TestRemoveInstalledRoute(t *testing.T) {
table := RouteTable{}
defer table.Close()
config := defaultConfig()
if err := table.Init(config); err != nil {
t.Fatalf("table.Init(%#v): %s", config, err)
@ -378,6 +382,7 @@ func TestRemoveInstalledRoute(t *testing.T) {
func TestRemoveInstalledRouteWithNoMatchingRoute(t *testing.T) {
table := RouteTable{}
defer table.Close()
config := defaultConfig()
if err := table.Init(config); err != nil {
t.Fatalf("table.Init(%#v): %s", config, err)
@ -390,6 +395,7 @@ func TestRemoveInstalledRouteWithNoMatchingRoute(t *testing.T) {
func TestGetLastUsedTimestampWithNoMatchingRoute(t *testing.T) {
table := RouteTable{}
defer table.Close()
config := defaultConfig()
if err := table.Init(config); err != nil {
t.Fatalf("table.Init(%#v): %s", config, err)
@ -427,6 +433,7 @@ func TestSetLastUsedTimestamp(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
table := RouteTable{}
defer table.Close()
config := defaultConfig(withClock(clock))
if err := table.Init(config); err != nil {
t.Fatalf("table.Init(%#v): %s", config, err)