Rate limit the unimplemented syscall event handler.

This introduces two new types of Emitters:
1. MultiEmitter, which will forward events to other registered Emitters, and
2. RateLimitedEmitter, which will forward events to a wrapped Emitter, subject
	to given rate limits.

The methods in the eventchannel package itself act like a multiEmitter, but is
not actually an Emitter. Now we have a DefaultEmitter, and the methods in
eventchannel simply forward calls to the DefaultEmitter.

The unimplemented syscall handler now uses a RateLimetedEmitter that wraps the
DefaultEmitter.

PiperOrigin-RevId: 260612770
This commit is contained in:
Nicolas Lacasse 2019-07-29 17:11:27 -07:00 committed by gVisor bot
parent f0507e1db1
commit 5fdb945a0d
7 changed files with 294 additions and 28 deletions

View File

@ -144,6 +144,12 @@ go_repository(
importpath = "golang.org/x/sys",
)
go_repository(
name = "org_golang_x_time",
commit = "9d24e82272b4f38b78bc8cff74fa936d31ccd8ef",
importpath = "golang.org/x/time",
)
go_repository(
name = "org_golang_x_tools",
commit = "aa82965741a9fecd12b026fbb3d3c6ed3231b8f8",

View File

@ -1,4 +1,4 @@
load("//tools/go_stateify:defs.bzl", "go_library")
load("//tools/go_stateify:defs.bzl", "go_library", "go_test")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
package(licenses = ["notice"])
@ -7,6 +7,7 @@ go_library(
name = "eventchannel",
srcs = [
"event.go",
"rate.go",
],
importpath = "gvisor.dev/gvisor/pkg/eventchannel",
visibility = ["//:sandbox"],
@ -16,6 +17,7 @@ go_library(
"//pkg/unet",
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_golang_protobuf//ptypes:go_default_library_gen",
"@org_golang_x_time//rate:go_default_library",
],
)
@ -30,3 +32,12 @@ go_proto_library(
proto = ":eventchannel_proto",
visibility = ["//:sandbox"],
)
go_test(
name = "eventchannel_test",
srcs = ["event_test.go"],
embed = [":eventchannel"],
deps = [
"@com_github_golang_protobuf//proto:go_default_library",
],
)

View File

@ -43,18 +43,36 @@ type Emitter interface {
Close() error
}
var (
mu sync.Mutex
emitters = make(map[Emitter]struct{})
)
// DefaultEmitter is the default emitter. Calls to Emit and AddEmitter are sent
// to this Emitter.
var DefaultEmitter = &multiEmitter{}
// Emit is a helper method that calls DefaultEmitter.Emit.
func Emit(msg proto.Message) error {
_, err := DefaultEmitter.Emit(msg)
return err
}
// AddEmitter is a helper method that calls DefaultEmitter.AddEmitter.
func AddEmitter(e Emitter) {
DefaultEmitter.AddEmitter(e)
}
// multiEmitter is an Emitter that forwards messages to multiple Emitters.
type multiEmitter struct {
// mu protects emitters.
mu sync.Mutex
// emitters is initialized lazily in AddEmitter.
emitters map[Emitter]struct{}
}
// Emit emits a message using all added emitters.
func Emit(msg proto.Message) error {
mu.Lock()
defer mu.Unlock()
func (me *multiEmitter) Emit(msg proto.Message) (bool, error) {
me.mu.Lock()
defer me.mu.Unlock()
var err error
for e := range emitters {
for e := range me.emitters {
hangup, eerr := e.Emit(msg)
if eerr != nil {
if err == nil {
@ -68,18 +86,36 @@ func Emit(msg proto.Message) error {
}
if hangup {
log.Infof("Hangup on eventchannel emitter %v.", e)
delete(emitters, e)
delete(me.emitters, e)
}
}
return err
return false, err
}
// AddEmitter adds a new emitter.
func AddEmitter(e Emitter) {
mu.Lock()
defer mu.Unlock()
emitters[e] = struct{}{}
func (me *multiEmitter) AddEmitter(e Emitter) {
me.mu.Lock()
defer me.mu.Unlock()
if me.emitters == nil {
me.emitters = make(map[Emitter]struct{})
}
me.emitters[e] = struct{}{}
}
// Close closes all emitters. If any Close call errors, it returns the first
// one encountered.
func (me *multiEmitter) Close() error {
me.mu.Lock()
defer me.mu.Unlock()
var err error
for e := range me.emitters {
if eerr := e.Close(); err == nil && eerr != nil {
err = eerr
}
delete(me.emitters, e)
}
return err
}
func marshal(msg proto.Message) ([]byte, error) {

View File

@ -0,0 +1,146 @@
// Copyright 2019 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package eventchannel
import (
"fmt"
"sync"
"testing"
"time"
"github.com/golang/protobuf/proto"
)
// testEmitter is an emitter that can be used in tests. It records all events
// emitted, and whether it has been closed.
type testEmitter struct {
// mu protects all fields below.
mu sync.Mutex
// events contains all emitted events.
events []proto.Message
// closed records whether Close() was called.
closed bool
}
// Emit implements Emitter.Emit.
func (te *testEmitter) Emit(msg proto.Message) (bool, error) {
te.mu.Lock()
defer te.mu.Unlock()
te.events = append(te.events, msg)
return false, nil
}
// Close implements Emitter.Close.
func (te *testEmitter) Close() error {
te.mu.Lock()
defer te.mu.Unlock()
if te.closed {
return fmt.Errorf("closed called twice")
}
te.closed = true
return nil
}
// testMessage implements proto.Message for testing.
type testMessage struct {
proto.Message
// name is the name of the message, used by tests to compare messages.
name string
}
func TestMultiEmitter(t *testing.T) {
// Create three testEmitters, tied together in a multiEmitter.
me := &multiEmitter{}
var emitters []*testEmitter
for i := 0; i < 3; i++ {
te := &testEmitter{}
emitters = append(emitters, te)
me.AddEmitter(te)
}
// Emit three messages to multiEmitter.
names := []string{"foo", "bar", "baz"}
for _, name := range names {
m := testMessage{name: name}
if _, err := me.Emit(m); err != nil {
t.Fatal("me.Emit(%v) failed: %v", m, err)
}
}
// All three emitters should have all three events.
for _, te := range emitters {
if got, want := len(te.events), len(names); got != want {
t.Fatalf("emitter got %d events, want %d", got, want)
}
for i, name := range names {
if got := te.events[i].(testMessage).name; got != name {
t.Errorf("emitter got message with name %q, want %q", got, name)
}
}
}
// Close multiEmitter.
if err := me.Close(); err != nil {
t.Fatal("me.Close() failed: %v", err)
}
// All testEmitters should be closed.
for _, te := range emitters {
if !te.closed {
t.Errorf("te.closed got false, want true")
}
}
}
func TestRateLimitedEmitter(t *testing.T) {
// Create a RateLimittedEmitter that wraps a testEmitter.
te := &testEmitter{}
max := float64(5) // events per second
burst := 10 // events
rle := RateLimitedEmitterFrom(te, max, burst)
// Send 50 messages in one shot.
for i := 0; i < 50; i++ {
if _, err := rle.Emit(testMessage{}); err != nil {
t.Fatalf("rle.Emit failed: %v", err)
}
}
// We should have received only 10 messages.
if got, want := len(te.events), 10; got != want {
t.Errorf("got %d events, want %d", got, want)
}
// Sleep for a second and then send another 50.
time.Sleep(1 * time.Second)
for i := 0; i < 50; i++ {
if _, err := rle.Emit(testMessage{}); err != nil {
t.Fatalf("rle.Emit failed: %v", err)
}
}
// We should have at least 5 more message, plus maybe a few more if the
// test ran slowly.
got, wantAtLeast, wantAtMost := len(te.events), 15, 20
if got < wantAtLeast {
t.Errorf("got %d events, want at least %d", got, wantAtLeast)
}
if got > wantAtMost {
t.Errorf("got %d events, want at most %d", got, wantAtMost)
}
}

54
pkg/eventchannel/rate.go Normal file
View File

@ -0,0 +1,54 @@
// Copyright 2019 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package eventchannel
import (
"github.com/golang/protobuf/proto"
"golang.org/x/time/rate"
)
// rateLimitedEmitter wraps an emitter and limits events to the given limits.
// Events that would exceed the limit are discarded.
type rateLimitedEmitter struct {
inner Emitter
limiter *rate.Limiter
}
// RateLimitedEmitterFrom creates a new event channel emitter that wraps the
// existing emitter and enforces rate limits. The limits are imposed via a
// token bucket, with `maxRate` events per second, with burst size of `burst`
// events. See the golang.org/x/time/rate package and
// https://en.wikipedia.org/wiki/Token_bucket for more information about token
// buckets generally.
func RateLimitedEmitterFrom(inner Emitter, maxRate float64, burst int) Emitter {
return &rateLimitedEmitter{
inner: inner,
limiter: rate.NewLimiter(rate.Limit(maxRate), burst),
}
}
// Emit implements EventEmitter.Emit.
func (rle *rateLimitedEmitter) Emit(msg proto.Message) (bool, error) {
if !rle.limiter.Allow() {
// Drop event.
return false, nil
}
return rle.inner.Emit(msg)
}
// Close implements EventEmitter.Close.
func (rle *rateLimitedEmitter) Close() error {
return rle.inner.Close()
}

View File

@ -197,6 +197,11 @@ type Kernel struct {
// caches. Not all caches use it, only the caches that use host resources use
// the limiter. It may be nil if disabled.
DirentCacheLimiter *fs.DirentCacheLimiter
// unimplementedSyscallEmitter is used to emit unimplemented syscall
// events. This is initialized lazily on the first unimplemented
// syscall.
unimplementedSyscallEmitter eventchannel.Emitter `state:"nosave"`
}
// InitKernelArgs holds arguments to Init.
@ -290,7 +295,6 @@ func (k *Kernel) Init(args InitKernelArgs) error {
k.monotonicClock = &timekeeperClock{tk: args.Timekeeper, c: sentrytime.Monotonic}
k.futexes = futex.NewManager()
k.netlinkPorts = port.New()
return nil
}
@ -1168,16 +1172,6 @@ func (k *Kernel) SupervisorContext() context.Context {
}
}
// EmitUnimplementedEvent emits an UnimplementedSyscall event via the event
// channel.
func (k *Kernel) EmitUnimplementedEvent(ctx context.Context) {
t := TaskFromContext(ctx)
eventchannel.Emit(&uspb.UnimplementedSyscall{
Tid: int32(t.ThreadID()),
Registers: t.Arch().StateData().Proto(),
})
}
// SocketEntry represents a socket recorded in Kernel.sockets. It implements
// refs.WeakRefUser for sockets stored in the socket table.
//
@ -1272,3 +1266,23 @@ func (ctx supervisorContext) Value(key interface{}) interface{} {
return nil
}
}
// Rate limits for the number of unimplemented syscall evants.
const (
unimplementedSyscallsMaxRate = 100 // events per second
unimplementedSyscallBurst = 1000 // events
)
// EmitUnimplementedEvent emits an UnimplementedSyscall event via the event
// channel.
func (k *Kernel) EmitUnimplementedEvent(ctx context.Context) {
if k.unimplementedSyscallEmitter == nil {
k.unimplementedSyscallEmitter = eventchannel.RateLimitedEmitterFrom(eventchannel.DefaultEmitter, unimplementedSyscallsMaxRate, unimplementedSyscallBurst)
}
t := TaskFromContext(ctx)
k.unimplementedSyscallEmitter.Emit(&uspb.UnimplementedSyscall{
Tid: int32(t.ThreadID()),
Registers: t.Arch().StateData().Proto(),
})
}

View File

@ -30,8 +30,7 @@ import (
const _AUDIT_ARCH_X86_64 = 0xc000003e
// AMD64 is a table of Linux amd64 syscall API with the corresponding syscall
// numbers from Linux 4.4. The entries commented out are those syscalls we
// don't currently support.
// numbers from Linux 4.4.
var AMD64 = &kernel.SyscallTable{
OS: abi.Linux,
Arch: arch.AMD64,