377 lines
9.2 KiB
Go
377 lines
9.2 KiB
Go
// Copyright 2020 The gVisor Authors.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package syncevent
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"testing"
|
|
|
|
"gvisor.dev/gvisor/pkg/sync"
|
|
"gvisor.dev/gvisor/pkg/waiter"
|
|
)
|
|
|
|
func TestBroadcasterFilter(t *testing.T) {
|
|
const numReceivers = 2 * MaxEvents
|
|
|
|
var br Broadcaster
|
|
ws := make([]Waiter, numReceivers)
|
|
for i := range ws {
|
|
ws[i].Init()
|
|
br.SubscribeEvents(ws[i].Receiver(), 1<<(i%MaxEvents))
|
|
}
|
|
for ev := 0; ev < MaxEvents; ev++ {
|
|
br.Broadcast(1 << ev)
|
|
for i := range ws {
|
|
want := NoEvents
|
|
if i%MaxEvents == ev {
|
|
want = 1 << ev
|
|
}
|
|
if got := ws[i].Receiver().PendingAndAckAll(); got != want {
|
|
t.Errorf("after Broadcast of event %d: waiter %d has pending event set %#x, wanted %#x", ev, i, got, want)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestBroadcasterManySubscriptions tests that subscriptions are not lost by
|
|
// table expansion/compaction.
|
|
func TestBroadcasterManySubscriptions(t *testing.T) {
|
|
const numReceivers = 5000 // arbitrary
|
|
|
|
var br Broadcaster
|
|
ws := make([]Waiter, numReceivers)
|
|
for i := range ws {
|
|
ws[i].Init()
|
|
}
|
|
|
|
ids := make([]SubscriptionID, numReceivers)
|
|
for i := 0; i < numReceivers; i++ {
|
|
// Subscribe receiver i.
|
|
ids[i] = br.SubscribeEvents(ws[i].Receiver(), 1)
|
|
// Check that receivers [0, i] are subscribed.
|
|
br.Broadcast(1)
|
|
for j := 0; j <= i; j++ {
|
|
if ws[j].Pending() != 1 {
|
|
t.Errorf("receiver %d did not receive an event after subscription of receiver %d", j, i)
|
|
}
|
|
ws[j].Ack(1)
|
|
}
|
|
}
|
|
|
|
// Generate a random order for unsubscriptions.
|
|
unsub := rand.Perm(numReceivers)
|
|
for i := 0; i < numReceivers; i++ {
|
|
// Unsubscribe receiver unsub[i].
|
|
br.UnsubscribeEvents(ids[unsub[i]])
|
|
// Check that receivers [unsub[0], unsub[i]] are not subscribed, and that
|
|
// receivers (unsub[i], unsub[numReceivers]) are still subscribed.
|
|
br.Broadcast(1)
|
|
for j := 0; j <= i; j++ {
|
|
if ws[unsub[j]].Pending() != 0 {
|
|
t.Errorf("unsub iteration %d: receiver %d received an event after unsubscription of receiver %d", i, unsub[j], unsub[i])
|
|
}
|
|
}
|
|
for j := i + 1; j < numReceivers; j++ {
|
|
if ws[unsub[j]].Pending() != 1 {
|
|
t.Errorf("unsub iteration %d: receiver %d did not receive an event after unsubscription of receiver %d", i, unsub[j], unsub[i])
|
|
}
|
|
ws[unsub[j]].Ack(1)
|
|
}
|
|
}
|
|
}
|
|
|
|
// receiverCountsNonZero lists the receiver counts used by benchmarks that
// need at least one receiver.
var receiverCountsNonZero = []int{1, 4, 16, 64}

// receiverCountsIncludingZero is receiverCountsNonZero with a leading 0, for
// benchmarks that also cover the no-receiver case.
var receiverCountsIncludingZero = append([]int{0}, receiverCountsNonZero...)
|
|
|
|
// BenchmarkBroadcasterX, BenchmarkMapX, and BenchmarkQueueX benchmark usage
|
|
// pattern X (described in terms of Broadcaster) with Broadcaster, a
|
|
// Mutex-protected map[*Receiver]Set, and waiter.Queue respectively.
|
|
|
|
// BenchmarkXxxSubscribeUnsubscribe measures the cost of a Subscribe/Unsubscribe
|
|
// cycle.
|
|
|
|
func BenchmarkBroadcasterSubscribeUnsubscribe(b *testing.B) {
|
|
var br Broadcaster
|
|
var w Waiter
|
|
w.Init()
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
id := br.SubscribeEvents(w.Receiver(), 1)
|
|
br.UnsubscribeEvents(id)
|
|
}
|
|
}
|
|
|
|
func BenchmarkMapSubscribeUnsubscribe(b *testing.B) {
|
|
var mu sync.Mutex
|
|
m := make(map[*Receiver]Set)
|
|
var w Waiter
|
|
w.Init()
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
mu.Lock()
|
|
m[w.Receiver()] = Set(1)
|
|
mu.Unlock()
|
|
mu.Lock()
|
|
delete(m, w.Receiver())
|
|
mu.Unlock()
|
|
}
|
|
}
|
|
|
|
func BenchmarkQueueSubscribeUnsubscribe(b *testing.B) {
|
|
var q waiter.Queue
|
|
e, _ := waiter.NewChannelEntry(nil)
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
q.EventRegister(&e, 1)
|
|
q.EventUnregister(&e)
|
|
}
|
|
}
|
|
|
|
// BenchmarkXxxSubscribeUnsubscribeBatch is similar to
|
|
// BenchmarkXxxSubscribeUnsubscribe, but subscribes and unsubscribes a large
|
|
// number of Receivers at a time in order to measure the amortized overhead of
|
|
// table expansion/compaction. (Since waiter.Queue is implemented using a
|
|
// linked list, BenchmarkQueueSubscribeUnsubscribe and
|
|
// BenchmarkQueueSubscribeUnsubscribeBatch should produce nearly the same
|
|
// result.)
|
|
|
|
// numBatchReceivers is the number of receivers that each
// BenchmarkXxxSubscribeUnsubscribeBatch benchmark subscribes and then
// unsubscribes per outer-loop iteration.
const numBatchReceivers = 1000
|
|
|
|
func BenchmarkBroadcasterSubscribeUnsubscribeBatch(b *testing.B) {
|
|
var br Broadcaster
|
|
ws := make([]Waiter, numBatchReceivers)
|
|
for i := range ws {
|
|
ws[i].Init()
|
|
}
|
|
ids := make([]SubscriptionID, numBatchReceivers)
|
|
|
|
// Generate a random order for unsubscriptions.
|
|
unsub := rand.Perm(numBatchReceivers)
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N/numBatchReceivers; i++ {
|
|
for j := 0; j < numBatchReceivers; j++ {
|
|
ids[j] = br.SubscribeEvents(ws[j].Receiver(), 1)
|
|
}
|
|
for j := 0; j < numBatchReceivers; j++ {
|
|
br.UnsubscribeEvents(ids[unsub[j]])
|
|
}
|
|
}
|
|
}
|
|
|
|
func BenchmarkMapSubscribeUnsubscribeBatch(b *testing.B) {
|
|
var mu sync.Mutex
|
|
m := make(map[*Receiver]Set)
|
|
ws := make([]Waiter, numBatchReceivers)
|
|
for i := range ws {
|
|
ws[i].Init()
|
|
}
|
|
|
|
// Generate a random order for unsubscriptions.
|
|
unsub := rand.Perm(numBatchReceivers)
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N/numBatchReceivers; i++ {
|
|
for j := 0; j < numBatchReceivers; j++ {
|
|
mu.Lock()
|
|
m[ws[j].Receiver()] = Set(1)
|
|
mu.Unlock()
|
|
}
|
|
for j := 0; j < numBatchReceivers; j++ {
|
|
mu.Lock()
|
|
delete(m, ws[unsub[j]].Receiver())
|
|
mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
func BenchmarkQueueSubscribeUnsubscribeBatch(b *testing.B) {
|
|
var q waiter.Queue
|
|
es := make([]waiter.Entry, numBatchReceivers)
|
|
for i := range es {
|
|
es[i], _ = waiter.NewChannelEntry(nil)
|
|
}
|
|
|
|
// Generate a random order for unsubscriptions.
|
|
unsub := rand.Perm(numBatchReceivers)
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N/numBatchReceivers; i++ {
|
|
for j := 0; j < numBatchReceivers; j++ {
|
|
q.EventRegister(&es[j], 1)
|
|
}
|
|
for j := 0; j < numBatchReceivers; j++ {
|
|
q.EventUnregister(&es[unsub[j]])
|
|
}
|
|
}
|
|
}
|
|
|
|
// BenchmarkXxxBroadcastRedundant measures how long it takes to Broadcast
|
|
// already-pending events to multiple Receivers.
|
|
|
|
func BenchmarkBroadcasterBroadcastRedundant(b *testing.B) {
|
|
for _, n := range receiverCountsIncludingZero {
|
|
b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
|
|
var br Broadcaster
|
|
ws := make([]Waiter, n)
|
|
for i := range ws {
|
|
ws[i].Init()
|
|
br.SubscribeEvents(ws[i].Receiver(), 1)
|
|
}
|
|
br.Broadcast(1)
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
br.Broadcast(1)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func BenchmarkMapBroadcastRedundant(b *testing.B) {
|
|
for _, n := range receiverCountsIncludingZero {
|
|
b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
|
|
var mu sync.Mutex
|
|
m := make(map[*Receiver]Set)
|
|
ws := make([]Waiter, n)
|
|
for i := range ws {
|
|
ws[i].Init()
|
|
m[ws[i].Receiver()] = Set(1)
|
|
}
|
|
mu.Lock()
|
|
for r := range m {
|
|
r.Notify(1)
|
|
}
|
|
mu.Unlock()
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
mu.Lock()
|
|
for r := range m {
|
|
r.Notify(1)
|
|
}
|
|
mu.Unlock()
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func BenchmarkQueueBroadcastRedundant(b *testing.B) {
|
|
for _, n := range receiverCountsIncludingZero {
|
|
b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
|
|
var q waiter.Queue
|
|
for i := 0; i < n; i++ {
|
|
e, _ := waiter.NewChannelEntry(nil)
|
|
q.EventRegister(&e, 1)
|
|
}
|
|
q.Notify(1)
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
q.Notify(1)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// BenchmarkXxxBroadcastAck measures how long it takes to Broadcast events to
|
|
// multiple Receivers, check that all Receivers have received the event, and
|
|
// clear the event from all Receivers.
|
|
|
|
func BenchmarkBroadcasterBroadcastAck(b *testing.B) {
|
|
for _, n := range receiverCountsNonZero {
|
|
b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
|
|
var br Broadcaster
|
|
ws := make([]Waiter, n)
|
|
for i := range ws {
|
|
ws[i].Init()
|
|
br.SubscribeEvents(ws[i].Receiver(), 1)
|
|
}
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
br.Broadcast(1)
|
|
for j := range ws {
|
|
if got, want := ws[j].Pending(), Set(1); got != want {
|
|
b.Fatalf("Receiver.Pending(): got %#x, wanted %#x", got, want)
|
|
}
|
|
ws[j].Ack(1)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func BenchmarkMapBroadcastAck(b *testing.B) {
|
|
for _, n := range receiverCountsNonZero {
|
|
b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
|
|
var mu sync.Mutex
|
|
m := make(map[*Receiver]Set)
|
|
ws := make([]Waiter, n)
|
|
for i := range ws {
|
|
ws[i].Init()
|
|
m[ws[i].Receiver()] = Set(1)
|
|
}
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
mu.Lock()
|
|
for r := range m {
|
|
r.Notify(1)
|
|
}
|
|
mu.Unlock()
|
|
for j := range ws {
|
|
if got, want := ws[j].Pending(), Set(1); got != want {
|
|
b.Fatalf("Receiver.Pending(): got %#x, wanted %#x", got, want)
|
|
}
|
|
ws[j].Ack(1)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func BenchmarkQueueBroadcastAck(b *testing.B) {
|
|
for _, n := range receiverCountsNonZero {
|
|
b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
|
|
var q waiter.Queue
|
|
chs := make([]chan struct{}, n)
|
|
for i := range chs {
|
|
e, ch := waiter.NewChannelEntry(nil)
|
|
q.EventRegister(&e, 1)
|
|
chs[i] = ch
|
|
}
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
q.Notify(1)
|
|
for _, ch := range chs {
|
|
select {
|
|
case <-ch:
|
|
default:
|
|
b.Fatalf("channel did not receive event")
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|