add convenient wrapper for eventfd
The same create/write/read pattern is copied around several places. It's easier to understand in a package with names and comments, and we can reuse the smart blocking code in package rawfile. PiperOrigin-RevId: 401647108
This commit is contained in:
parent
487651ac46
commit
e44b100654
|
@ -0,0 +1,22 @@
|
|||
load("//tools:defs.bzl", "go_library", "go_test")
|
||||
|
||||
package(licenses = ["notice"])
|
||||
|
||||
go_library(
|
||||
name = "eventfd",
|
||||
srcs = [
|
||||
"eventfd.go",
|
||||
],
|
||||
visibility = ["//:sandbox"],
|
||||
deps = [
|
||||
"//pkg/hostarch",
|
||||
"//pkg/tcpip/link/rawfile",
|
||||
"@org_golang_x_sys//unix:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "eventfd_test",
|
||||
srcs = ["eventfd_test.go"],
|
||||
library = ":eventfd",
|
||||
)
|
|
@ -0,0 +1,115 @@
|
|||
// Copyright 2021 The gVisor Authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package eventfd wraps Linux's eventfd(2) syscall.
|
||||
package eventfd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
"gvisor.dev/gvisor/pkg/hostarch"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/link/rawfile"
|
||||
)
|
||||
|
||||
const sizeofUint64 = 8
|
||||
|
||||
// Eventfd represents a Linux eventfd object.
|
||||
type Eventfd struct {
|
||||
fd int
|
||||
}
|
||||
|
||||
// Create returns an initialized eventfd.
|
||||
func Create() (Eventfd, error) {
|
||||
fd, _, err := unix.RawSyscall(unix.SYS_EVENTFD2, 0, 0, 0)
|
||||
if err != 0 {
|
||||
return Eventfd{}, fmt.Errorf("failed to create eventfd: %v", error(err))
|
||||
}
|
||||
if err := unix.SetNonblock(int(fd), true); err != nil {
|
||||
unix.Close(int(fd))
|
||||
return Eventfd{}, err
|
||||
}
|
||||
return Eventfd{int(fd)}, nil
|
||||
}
|
||||
|
||||
// Wrap returns an initialized Eventfd using the provided fd.
|
||||
func Wrap(fd int) Eventfd {
|
||||
return Eventfd{fd}
|
||||
}
|
||||
|
||||
// Close closes the eventfd, after which it should not be used.
|
||||
func (ev Eventfd) Close() error {
|
||||
return unix.Close(ev.fd)
|
||||
}
|
||||
|
||||
// Dup copies the eventfd, calling dup(2) on the underlying file descriptor.
|
||||
func (ev Eventfd) Dup() (Eventfd, error) {
|
||||
other, err := unix.Dup(ev.fd)
|
||||
if err != nil {
|
||||
return Eventfd{}, fmt.Errorf("failed to dup: %v", other)
|
||||
}
|
||||
return Eventfd{other}, nil
|
||||
}
|
||||
|
||||
// Notify alerts other users of the eventfd. Users can receive alerts by
|
||||
// calling Wait or Read.
|
||||
func (ev Eventfd) Notify() error {
|
||||
return ev.Write(1)
|
||||
}
|
||||
|
||||
// Write writes a specific value to the eventfd.
|
||||
func (ev Eventfd) Write(val uint64) error {
|
||||
var buf [sizeofUint64]byte
|
||||
hostarch.ByteOrder.PutUint64(buf[:], val)
|
||||
for {
|
||||
n, err := unix.Write(ev.fd, buf[:])
|
||||
if err == unix.EINTR {
|
||||
continue
|
||||
}
|
||||
if n != sizeofUint64 {
|
||||
panic(fmt.Sprintf("short write to eventfd: got %d bytes, wanted %d", n, sizeofUint64))
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Wait blocks until eventfd is non-zero (i.e. someone calls Notify or Write).
|
||||
func (ev Eventfd) Wait() error {
|
||||
_, err := ev.Read()
|
||||
return err
|
||||
}
|
||||
|
||||
// Read blocks until eventfd is non-zero (i.e. someone calls Notify or Write)
|
||||
// and returns the value read.
|
||||
func (ev Eventfd) Read() (uint64, error) {
|
||||
var tmp [sizeofUint64]byte
|
||||
n, err := rawfile.BlockingReadUntranslated(ev.fd, tmp[:])
|
||||
if err != 0 {
|
||||
return 0, err
|
||||
}
|
||||
if n == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
if n != sizeofUint64 {
|
||||
panic(fmt.Sprintf("short read from eventfd: got %d bytes, wanted %d", n, sizeofUint64))
|
||||
}
|
||||
return hostarch.ByteOrder.Uint64(tmp[:]), nil
|
||||
}
|
||||
|
||||
// FD returns the underlying file descriptor. Use with care, as this breaks the
|
||||
// Eventfd abstraction.
|
||||
func (ev Eventfd) FD() int {
|
||||
return ev.fd
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
// Copyright 2021 The gVisor Authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package eventfd
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestReadWrite(t *testing.T) {
|
||||
efd, err := Create()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to Create(): %v", err)
|
||||
}
|
||||
defer efd.Close()
|
||||
|
||||
// Make sure we can read actual values
|
||||
const want = 343
|
||||
if err := efd.Write(want); err != nil {
|
||||
t.Fatalf("failed to write value: %d", want)
|
||||
}
|
||||
|
||||
got, err := efd.Read()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read value: %v", err)
|
||||
}
|
||||
if got != want {
|
||||
t.Fatalf("Read(): got %d, but wanted %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWait(t *testing.T) {
|
||||
efd, err := Create()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to Create(): %v", err)
|
||||
}
|
||||
defer efd.Close()
|
||||
|
||||
// There's no way to test with certainty that Wait() blocks indefinitely, but
|
||||
// as a best-effort we can wait a bit on it.
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
errCh <- efd.Wait()
|
||||
}()
|
||||
select {
|
||||
case err := <-errCh:
|
||||
t.Fatalf("Wait() returned without a call to Notify(): %v", err)
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
|
||||
// Notify and check that Wait() returned.
|
||||
if err := efd.Notify(); err != nil {
|
||||
t.Fatalf("Notify() failed: %v", err)
|
||||
}
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
t.Fatalf("Read() failed: %v", err)
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("Read() did not return after Notify()")
|
||||
}
|
||||
}
|
|
@ -12,8 +12,7 @@ go_library(
|
|||
visibility = ["//pkg/sentry:internal"],
|
||||
deps = [
|
||||
"//pkg/abi/linux",
|
||||
"//pkg/fd",
|
||||
"//pkg/hostarch",
|
||||
"//pkg/eventfd",
|
||||
"//pkg/log",
|
||||
"@org_golang_x_sys//unix:go_default_library",
|
||||
],
|
||||
|
|
|
@ -21,9 +21,7 @@ import (
|
|||
"os"
|
||||
"path"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
"gvisor.dev/gvisor/pkg/fd"
|
||||
"gvisor.dev/gvisor/pkg/hostarch"
|
||||
"gvisor.dev/gvisor/pkg/eventfd"
|
||||
"gvisor.dev/gvisor/pkg/log"
|
||||
)
|
||||
|
||||
|
@ -54,7 +52,7 @@ func NotifyCurrentMemcgPressureCallback(f func(), level string) (func(), error)
|
|||
}
|
||||
defer eventControlFile.Close()
|
||||
|
||||
eventFD, err := newEventFD()
|
||||
eventFD, err := eventfd.Create()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -75,20 +73,11 @@ func NotifyCurrentMemcgPressureCallback(f func(), level string) (func(), error)
|
|||
const stopVal = 1 << 63
|
||||
stopCh := make(chan struct{})
|
||||
go func() { // S/R-SAFE: f provides synchronization if necessary
|
||||
rw := fd.NewReadWriter(eventFD.FD())
|
||||
var buf [sizeofUint64]byte
|
||||
for {
|
||||
n, err := rw.Read(buf[:])
|
||||
val, err := eventFD.Read()
|
||||
if err != nil {
|
||||
if err == unix.EINTR {
|
||||
continue
|
||||
}
|
||||
panic(fmt.Sprintf("failed to read from memory pressure level eventfd: %v", err))
|
||||
}
|
||||
if n != sizeofUint64 {
|
||||
panic(fmt.Sprintf("short read from memory pressure level eventfd: got %d bytes, wanted %d", n, sizeofUint64))
|
||||
}
|
||||
val := hostarch.ByteOrder.Uint64(buf[:])
|
||||
if val >= stopVal {
|
||||
// Assume this was due to the notifier's "destructor" (the
|
||||
// function returned by NotifyCurrentMemcgPressureCallback
|
||||
|
@ -101,30 +90,7 @@ func NotifyCurrentMemcgPressureCallback(f func(), level string) (func(), error)
|
|||
}
|
||||
}()
|
||||
return func() {
|
||||
rw := fd.NewReadWriter(eventFD.FD())
|
||||
var buf [sizeofUint64]byte
|
||||
hostarch.ByteOrder.PutUint64(buf[:], stopVal)
|
||||
for {
|
||||
n, err := rw.Write(buf[:])
|
||||
if err != nil {
|
||||
if err == unix.EINTR {
|
||||
continue
|
||||
}
|
||||
panic(fmt.Sprintf("failed to write to memory pressure level eventfd: %v", err))
|
||||
}
|
||||
if n != sizeofUint64 {
|
||||
panic(fmt.Sprintf("short write to memory pressure level eventfd: got %d bytes, wanted %d", n, sizeofUint64))
|
||||
}
|
||||
break
|
||||
}
|
||||
eventFD.Write(stopVal)
|
||||
<-stopCh
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newEventFD() (*fd.FD, error) {
|
||||
f, _, e := unix.Syscall(unix.SYS_EVENTFD2, 0, 0, 0)
|
||||
if e != 0 {
|
||||
return nil, fmt.Errorf("failed to create eventfd: %v", e)
|
||||
}
|
||||
return fd.New(int(f)), nil
|
||||
}
|
||||
|
|
|
@ -152,10 +152,22 @@ type PollEvent struct {
|
|||
// no data is available, it will block in a poll() syscall until the file
|
||||
// descriptor becomes readable.
|
||||
func BlockingRead(fd int, b []byte) (int, tcpip.Error) {
|
||||
n, err := BlockingReadUntranslated(fd, b)
|
||||
if err != 0 {
|
||||
return n, TranslateErrno(err)
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// BlockingReadUntranslated reads from a file descriptor that is set up as
|
||||
// non-blocking. If no data is available, it will block in a poll() syscall
|
||||
// until the file descriptor becomes readable. It returns the raw unix.Errno
|
||||
// value returned by the underlying syscalls.
|
||||
func BlockingReadUntranslated(fd int, b []byte) (int, unix.Errno) {
|
||||
for {
|
||||
n, _, e := unix.RawSyscall(unix.SYS_READ, uintptr(fd), uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)))
|
||||
if e == 0 {
|
||||
return int(n), nil
|
||||
return int(n), 0
|
||||
}
|
||||
|
||||
event := PollEvent{
|
||||
|
@ -165,7 +177,7 @@ func BlockingRead(fd int, b []byte) (int, tcpip.Error) {
|
|||
|
||||
_, e = BlockingPoll(&event, 1, nil)
|
||||
if e != 0 && e != unix.EINTR {
|
||||
return 0, TranslateErrno(e)
|
||||
return 0, e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ go_library(
|
|||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/cleanup",
|
||||
"//pkg/eventfd",
|
||||
"//pkg/log",
|
||||
"//pkg/sync",
|
||||
"//pkg/tcpip",
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"io/ioutil"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
"gvisor.dev/gvisor/pkg/eventfd"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -116,25 +117,25 @@ type queueSizes struct {
|
|||
|
||||
func createQueueFDs(s queueSizes) (QueueConfig, error) {
|
||||
success := false
|
||||
var fd uintptr
|
||||
var eventFD eventfd.Eventfd
|
||||
var dataFD, txPipeFD, rxPipeFD, sharedDataFD int
|
||||
defer func() {
|
||||
if success {
|
||||
return
|
||||
}
|
||||
closeFDs(QueueConfig{
|
||||
EventFD: int(fd),
|
||||
EventFD: eventFD,
|
||||
DataFD: dataFD,
|
||||
TxPipeFD: txPipeFD,
|
||||
RxPipeFD: rxPipeFD,
|
||||
SharedDataFD: sharedDataFD,
|
||||
})
|
||||
}()
|
||||
eventFD, _, errno := unix.RawSyscall(unix.SYS_EVENTFD2, 0, 0, 0)
|
||||
if errno != 0 {
|
||||
return QueueConfig{}, fmt.Errorf("eventfd failed: %v", error(errno))
|
||||
eventFD, err := eventfd.Create()
|
||||
if err != nil {
|
||||
return QueueConfig{}, fmt.Errorf("eventfd failed: %v", err)
|
||||
}
|
||||
dataFD, err := createFile(s.dataSize, false)
|
||||
dataFD, err = createFile(s.dataSize, false)
|
||||
if err != nil {
|
||||
return QueueConfig{}, fmt.Errorf("failed to create dataFD: %s", err)
|
||||
}
|
||||
|
@ -152,7 +153,7 @@ func createQueueFDs(s queueSizes) (QueueConfig, error) {
|
|||
}
|
||||
success = true
|
||||
return QueueConfig{
|
||||
EventFD: int(eventFD),
|
||||
EventFD: eventFD,
|
||||
DataFD: dataFD,
|
||||
TxPipeFD: txPipeFD,
|
||||
RxPipeFD: rxPipeFD,
|
||||
|
@ -191,7 +192,7 @@ func createFile(size int64, initQueue bool) (fd int, err error) {
|
|||
|
||||
func closeFDs(c QueueConfig) {
|
||||
unix.Close(c.DataFD)
|
||||
unix.Close(c.EventFD)
|
||||
c.EventFD.Close()
|
||||
unix.Close(c.TxPipeFD)
|
||||
unix.Close(c.RxPipeFD)
|
||||
unix.Close(c.SharedDataFD)
|
||||
|
|
|
@ -21,7 +21,7 @@ import (
|
|||
"sync/atomic"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/link/rawfile"
|
||||
"gvisor.dev/gvisor/pkg/eventfd"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
|
||||
)
|
||||
|
||||
|
@ -30,7 +30,7 @@ type rx struct {
|
|||
data []byte
|
||||
sharedData []byte
|
||||
q queue.Rx
|
||||
eventFD int
|
||||
eventFD eventfd.Eventfd
|
||||
}
|
||||
|
||||
// init initializes all state needed by the rx queue based on the information
|
||||
|
@ -68,7 +68,7 @@ func (r *rx) init(mtu uint32, c *QueueConfig) error {
|
|||
|
||||
// Duplicate the eventFD so that caller can close it but we can still
|
||||
// use it.
|
||||
efd, err := unix.Dup(c.EventFD)
|
||||
efd, err := c.EventFD.Dup()
|
||||
if err != nil {
|
||||
unix.Munmap(txPipe)
|
||||
unix.Munmap(rxPipe)
|
||||
|
@ -77,16 +77,6 @@ func (r *rx) init(mtu uint32, c *QueueConfig) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Set the eventfd as non-blocking.
|
||||
if err := unix.SetNonblock(efd, true); err != nil {
|
||||
unix.Munmap(txPipe)
|
||||
unix.Munmap(rxPipe)
|
||||
unix.Munmap(data)
|
||||
unix.Munmap(sharedData)
|
||||
unix.Close(efd)
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize state based on buffers.
|
||||
r.q.Init(txPipe, rxPipe, sharedDataPointer(sharedData))
|
||||
r.data = data
|
||||
|
@ -105,13 +95,13 @@ func (r *rx) cleanup() {
|
|||
|
||||
unix.Munmap(r.data)
|
||||
unix.Munmap(r.sharedData)
|
||||
unix.Close(r.eventFD)
|
||||
r.eventFD.Close()
|
||||
}
|
||||
|
||||
// notify writes to the tx.eventFD to indicate to the peer that there is data to
|
||||
// be read.
|
||||
func (r *rx) notify() {
|
||||
unix.Write(r.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
|
||||
r.eventFD.Notify()
|
||||
}
|
||||
|
||||
// postAndReceive posts the provided buffers (if any), and then tries to read
|
||||
|
@ -128,8 +118,7 @@ func (r *rx) postAndReceive(b []queue.RxBuffer, stopRequested *uint32) ([]queue.
|
|||
if len(b) != 0 && !r.q.PostBuffers(b) {
|
||||
r.q.EnableNotification()
|
||||
for !r.q.PostBuffers(b) {
|
||||
var tmp [8]byte
|
||||
rawfile.BlockingRead(r.eventFD, tmp[:])
|
||||
r.eventFD.Wait()
|
||||
if atomic.LoadUint32(stopRequested) != 0 {
|
||||
r.q.DisableNotification()
|
||||
return nil, 0
|
||||
|
@ -153,8 +142,7 @@ func (r *rx) postAndReceive(b []queue.RxBuffer, stopRequested *uint32) ([]queue.
|
|||
}
|
||||
|
||||
// Wait for notification.
|
||||
var tmp [8]byte
|
||||
rawfile.BlockingRead(r.eventFD, tmp[:])
|
||||
r.eventFD.Wait()
|
||||
if atomic.LoadUint32(stopRequested) != 0 {
|
||||
r.q.DisableNotification()
|
||||
return nil, 0
|
||||
|
|
|
@ -20,7 +20,7 @@ package sharedmem
|
|||
import (
|
||||
"golang.org/x/sys/unix"
|
||||
"gvisor.dev/gvisor/pkg/cleanup"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/link/rawfile"
|
||||
"gvisor.dev/gvisor/pkg/eventfd"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
|
||||
)
|
||||
|
@ -38,7 +38,7 @@ type serverRx struct {
|
|||
data []byte
|
||||
|
||||
// eventFD is used to notify the peer when transmission is completed.
|
||||
eventFD int
|
||||
eventFD eventfd.Eventfd
|
||||
|
||||
// sharedData the memory region to use to enable/disable notifications.
|
||||
sharedData []byte
|
||||
|
@ -78,16 +78,11 @@ func (s *serverRx) init(c *QueueConfig) error {
|
|||
|
||||
// Duplicate the eventFD so that caller can close it but we can still
|
||||
// use it.
|
||||
efd, err := unix.Dup(c.EventFD)
|
||||
efd, err := c.EventFD.Dup()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cu.Add(func() { unix.Close(efd) })
|
||||
|
||||
// Set the eventfd as non-blocking.
|
||||
if err := unix.SetNonblock(efd, true); err != nil {
|
||||
return err
|
||||
}
|
||||
cu.Add(func() { efd.Close() })
|
||||
|
||||
s.packetPipe.Init(packetPipeMem)
|
||||
s.completionPipe.Init(completionPipeMem)
|
||||
|
@ -104,7 +99,7 @@ func (s *serverRx) cleanup() {
|
|||
unix.Munmap(s.completionPipe.Bytes())
|
||||
unix.Munmap(s.data)
|
||||
unix.Munmap(s.sharedData)
|
||||
unix.Close(s.eventFD)
|
||||
s.eventFD.Close()
|
||||
}
|
||||
|
||||
// completionNotificationSize is size in bytes of a completion notification sent
|
||||
|
@ -143,6 +138,5 @@ func (s *serverRx) receive() []byte {
|
|||
}
|
||||
|
||||
func (s *serverRx) waitForPackets() {
|
||||
var tmp [8]byte
|
||||
rawfile.BlockingRead(s.eventFD, tmp[:])
|
||||
s.eventFD.Wait()
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package sharedmem
|
|||
import (
|
||||
"golang.org/x/sys/unix"
|
||||
"gvisor.dev/gvisor/pkg/cleanup"
|
||||
"gvisor.dev/gvisor/pkg/eventfd"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/buffer"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
|
||||
|
@ -40,7 +41,7 @@ type serverTx struct {
|
|||
data []byte
|
||||
|
||||
// eventFD is used to notify the peer when fill requests are fulfilled.
|
||||
eventFD int
|
||||
eventFD eventfd.Eventfd
|
||||
|
||||
// sharedData the memory region to use to enable/disable notifications.
|
||||
sharedData []byte
|
||||
|
@ -80,16 +81,11 @@ func (s *serverTx) init(c *QueueConfig) error {
|
|||
|
||||
// Duplicate the eventFD so that caller can close it but we can still
|
||||
// use it.
|
||||
efd, err := unix.Dup(c.EventFD)
|
||||
efd, err := c.EventFD.Dup()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cu.Add(func() { unix.Close(efd) })
|
||||
|
||||
// Set the eventfd as non-blocking.
|
||||
if err := unix.SetNonblock(efd, true); err != nil {
|
||||
return err
|
||||
}
|
||||
cu.Add(func() { efd.Close() })
|
||||
|
||||
cu.Release()
|
||||
|
||||
|
@ -107,7 +103,7 @@ func (s *serverTx) cleanup() {
|
|||
unix.Munmap(s.completionPipe.Bytes())
|
||||
unix.Munmap(s.data)
|
||||
unix.Munmap(s.sharedData)
|
||||
unix.Close(s.eventFD)
|
||||
s.eventFD.Close()
|
||||
}
|
||||
|
||||
// fillPacket copies the data in the provided views into buffers pulled from the
|
||||
|
@ -175,5 +171,5 @@ func (s *serverTx) transmit(views []buffer.View) bool {
|
|||
}
|
||||
|
||||
func (s *serverTx) notify() {
|
||||
unix.Write(s.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
|
||||
s.eventFD.Notify()
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import (
|
|||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
"gvisor.dev/gvisor/pkg/eventfd"
|
||||
"gvisor.dev/gvisor/pkg/log"
|
||||
"gvisor.dev/gvisor/pkg/sync"
|
||||
"gvisor.dev/gvisor/pkg/tcpip"
|
||||
|
@ -49,7 +49,7 @@ type QueueConfig struct {
|
|||
|
||||
// EventFD is a file descriptor for the event that is signaled when
|
||||
// data is becomes available in this queue.
|
||||
EventFD int
|
||||
EventFD eventfd.Eventfd
|
||||
|
||||
// TxPipeFD is a file descriptor for the tx pipe associated with the
|
||||
// queue.
|
||||
|
@ -70,7 +70,7 @@ type QueueConfig struct {
|
|||
// of FDs matches when reconstructing the config when serialized or sent
|
||||
// as part of control messages.
|
||||
func (q *QueueConfig) FDs() []int {
|
||||
return []int{q.DataFD, q.EventFD, q.TxPipeFD, q.RxPipeFD, q.SharedDataFD}
|
||||
return []int{q.DataFD, q.EventFD.FD(), q.TxPipeFD, q.RxPipeFD, q.SharedDataFD}
|
||||
}
|
||||
|
||||
// QueueConfigFromFDs constructs a QueueConfig out of a slice of ints where each
|
||||
|
@ -84,7 +84,7 @@ func QueueConfigFromFDs(fds []int) (QueueConfig, error) {
|
|||
}
|
||||
return QueueConfig{
|
||||
DataFD: fds[0],
|
||||
EventFD: fds[1],
|
||||
EventFD: eventfd.Wrap(fds[1]),
|
||||
TxPipeFD: fds[2],
|
||||
RxPipeFD: fds[3],
|
||||
SharedDataFD: fds[4],
|
||||
|
@ -223,7 +223,7 @@ func (e *endpoint) Close() {
|
|||
// Tell dispatch goroutine to stop, then write to the eventfd so that
|
||||
// it wakes up in case it's sleeping.
|
||||
atomic.StoreUint32(&e.stopRequested, 1)
|
||||
unix.Write(e.rx.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
|
||||
e.rx.eventFD.Notify()
|
||||
|
||||
// Cleanup the queues inline if the worker hasn't started yet; we also
|
||||
// know it won't start from now on because stopRequested is set to 1.
|
||||
|
|
|
@ -20,7 +20,6 @@ package sharedmem
|
|||
import (
|
||||
"sync/atomic"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
"gvisor.dev/gvisor/pkg/sync"
|
||||
"gvisor.dev/gvisor/pkg/tcpip"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/buffer"
|
||||
|
@ -122,7 +121,7 @@ func (e *serverEndpoint) Close() {
|
|||
// Tell dispatch goroutine to stop, then write to the eventfd so that it wakes
|
||||
// up in case it's sleeping.
|
||||
atomic.StoreUint32(&e.stopRequested, 1)
|
||||
unix.Write(e.rx.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
|
||||
e.rx.eventFD.Notify()
|
||||
|
||||
// Cleanup the queues inline if the worker hasn't started yet; we also know it
|
||||
// won't start from now on because stopRequested is set to 1.
|
||||
|
|
|
@ -619,7 +619,7 @@ func TestSimpleReceive(t *testing.T) {
|
|||
// Push completion.
|
||||
c.pushRxCompletion(uint32(len(contents)), bufs)
|
||||
c.rxq.rx.Flush()
|
||||
unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
|
||||
c.rxCfg.EventFD.Notify()
|
||||
|
||||
// Wait for packet to be received, then check it.
|
||||
c.waitForPackets(1, time.After(5*time.Second), "Timeout waiting for packet")
|
||||
|
@ -665,7 +665,7 @@ func TestRxBuffersReposted(t *testing.T) {
|
|||
// Complete the buffer.
|
||||
c.pushRxCompletion(buffers[i].Size, buffers[i:][:1])
|
||||
c.rxq.rx.Flush()
|
||||
unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
|
||||
c.rxCfg.EventFD.Notify()
|
||||
|
||||
// Wait for it to be reposted.
|
||||
bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted"))
|
||||
|
@ -681,7 +681,7 @@ func TestRxBuffersReposted(t *testing.T) {
|
|||
// Complete with two buffers.
|
||||
c.pushRxCompletion(2*bufferSize, buffers[2*i:][:2])
|
||||
c.rxq.rx.Flush()
|
||||
unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
|
||||
c.rxCfg.EventFD.Notify()
|
||||
|
||||
// Wait for them to be reposted.
|
||||
for j := 0; j < 2; j++ {
|
||||
|
@ -706,7 +706,7 @@ func TestReceivePostingIsFull(t *testing.T) {
|
|||
first := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for first buffer to be posted"))
|
||||
c.pushRxCompletion(first.Size, []queue.RxBuffer{first})
|
||||
c.rxq.rx.Flush()
|
||||
unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
|
||||
c.rxCfg.EventFD.Notify()
|
||||
|
||||
// Check that packet is received.
|
||||
c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
|
||||
|
@ -715,7 +715,7 @@ func TestReceivePostingIsFull(t *testing.T) {
|
|||
second := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for second buffer to be posted"))
|
||||
c.pushRxCompletion(second.Size, []queue.RxBuffer{second})
|
||||
c.rxq.rx.Flush()
|
||||
unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
|
||||
c.rxCfg.EventFD.Notify()
|
||||
|
||||
// Check that no packet is received yet, as the worker is blocked trying
|
||||
// to repost.
|
||||
|
@ -728,7 +728,7 @@ func TestReceivePostingIsFull(t *testing.T) {
|
|||
// Flush tx queue, which will allow the first buffer to be reposted,
|
||||
// and the second completion to be pulled.
|
||||
c.rxq.tx.Flush()
|
||||
unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
|
||||
c.rxCfg.EventFD.Notify()
|
||||
|
||||
// Check that second packet completes.
|
||||
c.waitForPackets(1, time.After(time.Second), "Timeout waiting for second completed packet")
|
||||
|
@ -750,7 +750,7 @@ func TestCloseWhileWaitingToPost(t *testing.T) {
|
|||
bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for initial buffer to be posted"))
|
||||
c.pushRxCompletion(bi.Size, []queue.RxBuffer{bi})
|
||||
c.rxq.rx.Flush()
|
||||
unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
|
||||
c.rxCfg.EventFD.Notify()
|
||||
|
||||
// Wait for packet to be indicated.
|
||||
c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"math"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
"gvisor.dev/gvisor/pkg/eventfd"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/buffer"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
|
||||
)
|
||||
|
@ -32,7 +33,7 @@ type tx struct {
|
|||
q queue.Tx
|
||||
ids idManager
|
||||
bufs bufferManager
|
||||
eventFD int
|
||||
eventFD eventfd.Eventfd
|
||||
sharedDataFD int
|
||||
}
|
||||
|
||||
|
@ -148,7 +149,7 @@ func (t *tx) transmit(bufs ...buffer.View) bool {
|
|||
// notify writes to the tx.eventFD to indicate to the peer that there is data to
|
||||
// be read.
|
||||
func (t *tx) notify() {
|
||||
unix.Write(t.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
|
||||
t.eventFD.Notify()
|
||||
}
|
||||
|
||||
// getBuffer returns a memory region mapped to the full contents of the given
|
||||
|
|
|
@ -10,6 +10,7 @@ go_library(
|
|||
],
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/eventfd",
|
||||
"//pkg/sync",
|
||||
"@org_golang_x_sys//unix:go_default_library",
|
||||
],
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"sync/atomic"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
"gvisor.dev/gvisor/pkg/eventfd"
|
||||
"gvisor.dev/gvisor/pkg/sync"
|
||||
)
|
||||
|
||||
|
@ -55,15 +56,6 @@ func socket(packet bool) (int, error) {
|
|||
return fd, nil
|
||||
}
|
||||
|
||||
// eventFD returns a new event FD with initial value 0.
|
||||
func eventFD() (int, error) {
|
||||
f, _, e := unix.Syscall(unix.SYS_EVENTFD2, 0, 0, 0)
|
||||
if e != 0 {
|
||||
return -1, e
|
||||
}
|
||||
return int(f), nil
|
||||
}
|
||||
|
||||
// Socket is a connected unix domain socket.
|
||||
type Socket struct {
|
||||
// gate protects use of fd.
|
||||
|
@ -78,7 +70,7 @@ type Socket struct {
|
|||
// efd is an event FD that is signaled when the socket is closing.
|
||||
//
|
||||
// efd is immutable and remains valid until Close/Release.
|
||||
efd int
|
||||
efd eventfd.Eventfd
|
||||
|
||||
// race is an atomic variable used to avoid triggering the race
|
||||
// detector. See comment in SocketPair below.
|
||||
|
@ -95,7 +87,7 @@ func NewSocket(fd int) (*Socket, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
efd, err := eventFD()
|
||||
efd, err := eventfd.Create()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -110,16 +102,14 @@ func NewSocket(fd int) (*Socket, error) {
|
|||
// closing the event FD.
|
||||
func (s *Socket) finish() error {
|
||||
// Signal any blocked or future polls.
|
||||
//
|
||||
// N.B. eventfd writes must be 8 bytes.
|
||||
if _, err := unix.Write(s.efd, []byte{1, 0, 0, 0, 0, 0, 0, 0}); err != nil {
|
||||
if err := s.efd.Notify(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Close the gate, blocking until all FD users leave.
|
||||
s.gate.Close()
|
||||
|
||||
return unix.Close(s.efd)
|
||||
return s.efd.Close()
|
||||
}
|
||||
|
||||
// Close closes the socket.
|
||||
|
|
|
@ -43,7 +43,7 @@ func (s *Socket) wait(write bool) error {
|
|||
},
|
||||
{
|
||||
// The eventfd, signaled when we are closing.
|
||||
Fd: int32(s.efd),
|
||||
Fd: int32(s.efd.FD()),
|
||||
Events: unix.POLLIN,
|
||||
},
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue