219 lines
6.6 KiB
Go
219 lines
6.6 KiB
Go
|
// Copyright 2020 The gVisor Authors.
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package syncevent
|
||
|
|
||
|
import (
|
||
|
"gvisor.dev/gvisor/pkg/sync"
|
||
|
)
|
||
|
|
||
|
// Broadcaster is an implementation of Source that supports any number of
|
||
|
// subscribed Receivers.
|
||
|
//
|
||
|
// The zero value of Broadcaster is valid and has no subscribed Receivers.
|
||
|
// Broadcaster is not copyable by value.
|
||
|
//
|
||
|
// All Broadcaster methods may be called concurrently from multiple goroutines.
|
||
|
type Broadcaster struct {
|
||
|
// Broadcaster is implemented as a hash table where keys are assigned by
|
||
|
// the Broadcaster and returned as SubscriptionIDs, making it safe to use
|
||
|
// the identity function for hashing. The hash table resolves collisions
|
||
|
// using linear probing and features Robin Hood insertion and backward
|
||
|
// shift deletion in order to support a relatively high load factor
|
||
|
// efficiently, which matters since the cost of Broadcast is linear in the
|
||
|
// size of the table.
|
||
|
|
||
|
// mu protects the following fields.
|
||
|
mu sync.Mutex
|
||
|
|
||
|
// Invariants: len(table) is 0 or a power of 2.
|
||
|
table []broadcasterSlot
|
||
|
|
||
|
// load is the number of entries in table with receiver != nil.
|
||
|
load int
|
||
|
|
||
|
lastID SubscriptionID
|
||
|
}
|
||
|
|
||
|
type broadcasterSlot struct {
|
||
|
// Invariants: If receiver == nil, then filter == NoEvents and id == 0.
|
||
|
// Otherwise, id != 0.
|
||
|
receiver *Receiver
|
||
|
filter Set
|
||
|
id SubscriptionID
|
||
|
}
|
||
|
|
||
|
const (
|
||
|
broadcasterMinNonZeroTableSize = 2 // must be a power of 2 > 1
|
||
|
|
||
|
broadcasterMaxLoadNum = 13
|
||
|
broadcasterMaxLoadDen = 16
|
||
|
)
|
||
|
|
||
|
// SubscribeEvents implements Source.SubscribeEvents.
|
||
|
func (b *Broadcaster) SubscribeEvents(r *Receiver, filter Set) SubscriptionID {
|
||
|
b.mu.Lock()
|
||
|
|
||
|
// Assign an ID for this subscription.
|
||
|
b.lastID++
|
||
|
id := b.lastID
|
||
|
|
||
|
// Expand the table if over the maximum load factor:
|
||
|
//
|
||
|
// load / len(b.table) > broadcasterMaxLoadNum / broadcasterMaxLoadDen
|
||
|
// load * broadcasterMaxLoadDen > broadcasterMaxLoadNum * len(b.table)
|
||
|
b.load++
|
||
|
if (b.load * broadcasterMaxLoadDen) > (broadcasterMaxLoadNum * len(b.table)) {
|
||
|
// Double the number of slots in the new table.
|
||
|
newlen := broadcasterMinNonZeroTableSize
|
||
|
if len(b.table) != 0 {
|
||
|
newlen = 2 * len(b.table)
|
||
|
}
|
||
|
if newlen <= cap(b.table) {
|
||
|
// Reuse excess capacity in the current table, moving entries not
|
||
|
// already in their first-probed positions to better ones.
|
||
|
newtable := b.table[:newlen]
|
||
|
newmask := uint64(newlen - 1)
|
||
|
for i := range b.table {
|
||
|
if b.table[i].receiver != nil && uint64(b.table[i].id)&newmask != uint64(i) {
|
||
|
entry := b.table[i]
|
||
|
b.table[i] = broadcasterSlot{}
|
||
|
broadcasterTableInsert(newtable, entry.id, entry.receiver, entry.filter)
|
||
|
}
|
||
|
}
|
||
|
b.table = newtable
|
||
|
} else {
|
||
|
newtable := make([]broadcasterSlot, newlen)
|
||
|
// Copy existing entries to the new table.
|
||
|
for i := range b.table {
|
||
|
if b.table[i].receiver != nil {
|
||
|
broadcasterTableInsert(newtable, b.table[i].id, b.table[i].receiver, b.table[i].filter)
|
||
|
}
|
||
|
}
|
||
|
// Switch to the new table.
|
||
|
b.table = newtable
|
||
|
}
|
||
|
}
|
||
|
|
||
|
broadcasterTableInsert(b.table, id, r, filter)
|
||
|
b.mu.Unlock()
|
||
|
return id
|
||
|
}
|
||
|
|
||
|
// Preconditions: table must not be full. len(table) is a power of 2.
|
||
|
func broadcasterTableInsert(table []broadcasterSlot, id SubscriptionID, r *Receiver, filter Set) {
|
||
|
entry := broadcasterSlot{
|
||
|
receiver: r,
|
||
|
filter: filter,
|
||
|
id: id,
|
||
|
}
|
||
|
mask := uint64(len(table) - 1)
|
||
|
i := uint64(id) & mask
|
||
|
disp := uint64(0)
|
||
|
for {
|
||
|
if table[i].receiver == nil {
|
||
|
table[i] = entry
|
||
|
return
|
||
|
}
|
||
|
// If we've been displaced farther from our first-probed slot than the
|
||
|
// element stored in this one, swap elements and switch to inserting
|
||
|
// the replaced one. (This is Robin Hood insertion.)
|
||
|
slotDisp := (i - uint64(table[i].id)) & mask
|
||
|
if disp > slotDisp {
|
||
|
table[i], entry = entry, table[i]
|
||
|
disp = slotDisp
|
||
|
}
|
||
|
i = (i + 1) & mask
|
||
|
disp++
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// UnsubscribeEvents implements Source.UnsubscribeEvents.
|
||
|
func (b *Broadcaster) UnsubscribeEvents(id SubscriptionID) {
|
||
|
b.mu.Lock()
|
||
|
|
||
|
mask := uint64(len(b.table) - 1)
|
||
|
i := uint64(id) & mask
|
||
|
for {
|
||
|
if b.table[i].id == id {
|
||
|
// Found the element to remove. Move all subsequent elements
|
||
|
// backward until we either find an empty slot, or an element that
|
||
|
// is already in its first-probed slot. (This is backward shift
|
||
|
// deletion.)
|
||
|
for {
|
||
|
next := (i + 1) & mask
|
||
|
if b.table[next].receiver == nil {
|
||
|
break
|
||
|
}
|
||
|
if uint64(b.table[next].id)&mask == next {
|
||
|
break
|
||
|
}
|
||
|
b.table[i] = b.table[next]
|
||
|
i = next
|
||
|
}
|
||
|
b.table[i] = broadcasterSlot{}
|
||
|
break
|
||
|
}
|
||
|
i = (i + 1) & mask
|
||
|
}
|
||
|
|
||
|
// If a table 1/4 of the current size would still be at or under the
|
||
|
// maximum load factor (i.e. the current table size is at least two
|
||
|
// expansions bigger than necessary), halve the size of the table to reduce
|
||
|
// the cost of Broadcast. Since we are concerned with iteration time and
|
||
|
// not memory usage, reuse the existing slice to reduce future allocations
|
||
|
// from table re-expansion.
|
||
|
b.load--
|
||
|
if len(b.table) > broadcasterMinNonZeroTableSize && (b.load*(4*broadcasterMaxLoadDen)) <= (broadcasterMaxLoadNum*len(b.table)) {
|
||
|
newlen := len(b.table) / 2
|
||
|
newtable := b.table[:newlen]
|
||
|
for i := newlen; i < len(b.table); i++ {
|
||
|
if b.table[i].receiver != nil {
|
||
|
broadcasterTableInsert(newtable, b.table[i].id, b.table[i].receiver, b.table[i].filter)
|
||
|
b.table[i] = broadcasterSlot{}
|
||
|
}
|
||
|
}
|
||
|
b.table = newtable
|
||
|
}
|
||
|
|
||
|
b.mu.Unlock()
|
||
|
}
|
||
|
|
||
|
// Broadcast notifies all Receivers subscribed to the Broadcaster of the subset
|
||
|
// of events to which they subscribed. The order in which Receivers are
|
||
|
// notified is unspecified.
|
||
|
func (b *Broadcaster) Broadcast(events Set) {
|
||
|
b.mu.Lock()
|
||
|
for i := range b.table {
|
||
|
if intersection := events & b.table[i].filter; intersection != 0 {
|
||
|
// We don't need to check if broadcasterSlot.receiver is nil, since
|
||
|
// if it is then broadcasterSlot.filter is 0.
|
||
|
b.table[i].receiver.Notify(intersection)
|
||
|
}
|
||
|
}
|
||
|
b.mu.Unlock()
|
||
|
}
|
||
|
|
||
|
// FilteredEvents returns the set of events for which Broadcast will notify at
|
||
|
// least one Receiver, i.e. the union of filters for all subscribed Receivers.
|
||
|
func (b *Broadcaster) FilteredEvents() Set {
|
||
|
var es Set
|
||
|
b.mu.Lock()
|
||
|
for i := range b.table {
|
||
|
es |= b.table[i].filter
|
||
|
}
|
||
|
b.mu.Unlock()
|
||
|
return es
|
||
|
}
|