gvisor/pkg/syncevent/broadcaster.go

221 lines
6.6 KiB
Go

// Copyright 2020 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package syncevent
import (
"gvisor.dev/gvisor/pkg/sync"
)
// Broadcaster is an implementation of Source that supports any number of
// subscribed Receivers.
//
// The zero value of Broadcaster is valid and has no subscribed Receivers.
// Broadcaster is not copyable by value.
//
// All Broadcaster methods may be called concurrently from multiple goroutines.
type Broadcaster struct {
// Broadcaster is implemented as a hash table where keys are assigned by
// the Broadcaster and returned as SubscriptionIDs, making it safe to use
// the identity function for hashing. The hash table resolves collisions
// using linear probing and features Robin Hood insertion and backward
// shift deletion in order to support a relatively high load factor
// efficiently, which matters since the cost of Broadcast is linear in the
// size of the table.
//
// mu protects the following fields.
mu sync.Mutex
// table holds the hash table's slots; a slot whose receiver is nil is
// empty.
//
// Invariants: len(table) is 0 or a power of 2.
table []broadcasterSlot
// load is the number of entries in table with receiver != nil.
load int
// lastID is the SubscriptionID most recently assigned by SubscribeEvents.
// It is incremented before each assignment, so the first ID handed out by
// a zero-value Broadcaster is 1.
lastID SubscriptionID
}
// broadcasterSlot is one slot of a Broadcaster's hash table. A slot is either
// empty (receiver == nil) or holds exactly one subscription.
type broadcasterSlot struct {
// Invariants: If receiver == nil, then filter == NoEvents and id == 0.
// Otherwise, id != 0.
//
// receiver is the subscribed Receiver, or nil if the slot is empty.
receiver *Receiver
// filter is the set of events requested by the subscription; Broadcast
// only notifies receiver of events that are in filter.
filter Set
// id is the SubscriptionID assigned to the subscription. It doubles as
// the hash key: the slot probed first is id masked by len(table)-1.
id SubscriptionID
}
const (
// broadcasterMinNonZeroTableSize is the number of slots allocated when a
// subscription is added to an empty table. UnsubscribeEvents never shrinks
// a table that is already this small.
broadcasterMinNonZeroTableSize = 2 // must be a power of 2 > 1
// broadcasterMaxLoadNum / broadcasterMaxLoadDen is the maximum load factor
// (13/16). SubscribeEvents doubles the table when load / len(table) would
// exceed it; UnsubscribeEvents halves the table when a table a quarter of
// the current size would still be at or under it.
broadcasterMaxLoadNum = 13
broadcasterMaxLoadDen = 16
)
// SubscribeEvents implements Source.SubscribeEvents.
//
// It registers r to be notified of the events in filter and returns the
// SubscriptionID assigned to the new subscription. The table is doubled first
// if adding the entry would push it over the maximum load factor.
func (b *Broadcaster) SubscribeEvents(r *Receiver, filter Set) SubscriptionID {
	b.mu.Lock()

	// IDs are handed out sequentially. Incrementing before use keeps 0 free
	// to mean "empty slot".
	b.lastID++
	id := b.lastID
	b.load++

	// Grow when the load factor would exceed the maximum. The comparison
	//
	//	load / len(table) > broadcasterMaxLoadNum / broadcasterMaxLoadDen
	//
	// is cross-multiplied so that it can be evaluated in integers.
	if b.load*broadcasterMaxLoadDen > broadcasterMaxLoadNum*len(b.table) {
		old := b.table
		grown := broadcasterMinNonZeroTableSize
		if len(old) > 0 {
			grown = len(old) * 2
		}

		if cap(old) < grown {
			// Not enough spare capacity: allocate a table of the new size
			// and rehash every live entry into it.
			fresh := make([]broadcasterSlot, grown)
			for i := range old {
				if s := &old[i]; s.receiver != nil {
					broadcasterTableInsert(fresh, s.id, s.receiver, s.filter)
				}
			}
			b.table = fresh
		} else {
			// The backing array is already big enough (left over from an
			// earlier shrink): widen the slice in place and re-insert only
			// the entries that are not in their first-probed slot under the
			// wider mask.
			wide := old[:grown]
			wideMask := uint64(grown - 1)
			for i := range old {
				moved := old[i]
				if moved.receiver == nil || uint64(moved.id)&wideMask == uint64(i) {
					continue
				}
				old[i] = broadcasterSlot{}
				broadcasterTableInsert(wide, moved.id, moved.receiver, moved.filter)
			}
			b.table = wide
		}
	}

	broadcasterTableInsert(b.table, id, r, filter)
	b.mu.Unlock()
	return id
}
// broadcasterTableInsert stores the subscription (id, r, filter) in table. It
// probes linearly, starting at the slot selected by id, until it reaches an
// empty slot (one whose receiver is nil).
//
// Preconditions:
//   - table must not be full.
//   - len(table) is a power of 2.
func broadcasterTableInsert(table []broadcasterSlot, id SubscriptionID, r *Receiver, filter Set) {
	pending := broadcasterSlot{receiver: r, filter: filter, id: id}
	mask := uint64(len(table) - 1)

	// dist is how far pending currently is from its own first-probed slot.
	dist := uint64(0)
	for pos := uint64(id) & mask; ; pos = (pos + 1) & mask {
		slot := &table[pos]
		if slot.receiver == nil {
			*slot = pending
			return
		}
		// Robin Hood insertion: if the resident entry sits closer to its
		// first-probed slot than pending does, pending takes the slot and
		// the evicted resident becomes the entry being inserted.
		if resident := (pos - uint64(slot.id)) & mask; resident < dist {
			*slot, pending = pending, *slot
			dist = resident
		}
		dist++
	}
}
// UnsubscribeEvents implements Source.UnsubscribeEvents.
//
// id must be a SubscriptionID returned by an earlier call to
// b.SubscribeEvents that has not yet been unsubscribed. Passing any other
// value (including 0, which marks empty slots) is a caller bug and makes
// UnsubscribeEvents panic with a descriptive message; b is left unmodified
// and b.mu is released in that case.
func (b *Broadcaster) UnsubscribeEvents(id SubscriptionID) {
	b.mu.Lock()
	defer b.mu.Unlock()

	// For an empty table the mask underflows to all ones; that is harmless
	// because the bounded search below never indexes an empty table.
	mask := uint64(len(b.table) - 1)

	// Locate the slot holding id. The search visits each slot at most once,
	// so an unknown id cannot spin forever with the lock held, and id 0 is
	// rejected up front because it would otherwise match an empty slot.
	i := uint64(id) & mask
	found := false
	if id != 0 {
		for probes := len(b.table); probes > 0; probes-- {
			if b.table[i].id == id {
				found = true
				break
			}
			i = (i + 1) & mask
		}
	}
	if !found {
		panic("syncevent: UnsubscribeEvents called with a SubscriptionID that is not currently subscribed")
	}

	// Found the element to remove. Move all subsequent elements backward
	// until we either find an empty slot, or an element that is already in
	// its first-probed slot. (This is backward shift deletion.)
	for {
		next := (i + 1) & mask
		if b.table[next].receiver == nil || uint64(b.table[next].id)&mask == next {
			break
		}
		b.table[i] = b.table[next]
		i = next
	}
	b.table[i] = broadcasterSlot{}

	// If a table 1/4 of the current size would still be at or under the
	// maximum load factor (i.e. the current table size is at least two
	// expansions bigger than necessary), halve the size of the table to
	// reduce the cost of Broadcast. Since we are concerned with iteration
	// time and not memory usage, reuse the existing slice to reduce future
	// allocations from table re-expansion.
	b.load--
	if len(b.table) > broadcasterMinNonZeroTableSize && b.load*(4*broadcasterMaxLoadDen) <= broadcasterMaxLoadNum*len(b.table) {
		newlen := len(b.table) / 2
		newtable := b.table[:newlen]
		// Entries in the lower half stay where they are; entries in the
		// discarded upper half are re-inserted and their old slots zeroed
		// so that the spare capacity is clean if the table grows again.
		for j := newlen; j < len(b.table); j++ {
			if b.table[j].receiver != nil {
				broadcasterTableInsert(newtable, b.table[j].id, b.table[j].receiver, b.table[j].filter)
				b.table[j] = broadcasterSlot{}
			}
		}
		b.table = newtable
	}
}
// Broadcast notifies all Receivers subscribed to the Broadcaster of the subset
// of events to which they subscribed. The order in which Receivers are
// notified is unspecified.
//
// b.mu is held for the whole call, including while each Receiver's Notify
// runs.
func (b *Broadcaster) Broadcast(events Set) {
	b.mu.Lock()
	// Notify is implemented outside this file. Releasing the lock via defer
	// guarantees that, should a Notify call panic, b.mu is not left locked
	// for every later caller.
	defer b.mu.Unlock()

	for i := range b.table {
		slot := &b.table[i]
		// We don't need to check whether slot.receiver is nil: an empty
		// slot has filter == 0, so its intersection is always empty.
		if hit := events & slot.filter; hit != 0 {
			slot.receiver.Notify(hit)
		}
	}
}
// FilteredEvents returns the union of the filters of all subscribed
// Receivers, i.e. the set of events for which Broadcast will notify at least
// one Receiver. Empty slots contribute nothing because their filter is 0.
func (b *Broadcaster) FilteredEvents() Set {
	b.mu.Lock()
	defer b.mu.Unlock()

	var union Set
	for _, slot := range b.table {
		union |= slot.filter
	}
	return union
}