gvisor/pkg/flipcall/endpoint_unsafe.go

239 lines
8.6 KiB
Go

// Copyright 2019 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flipcall
import (
"fmt"
"math"
"reflect"
"sync/atomic"
"syscall"
"unsafe"
)
// An Endpoint provides the ability to synchronously transfer data and control
// to a connected peer Endpoint, which may be in another process.
//
// Since the Endpoint control transfer model is synchronous, at any given time
// one Endpoint "has control" (designated the *active* Endpoint), and the other
// is "waiting for control" (designated the *inactive* Endpoint). Users of the
// flipcall package arbitrarily designate one Endpoint as initially-active, and
// the other as initially-inactive; in a client/server protocol, the client
// Endpoint is usually initially-active (able to send a request) and the server
// Endpoint is usually initially-inactive (waiting for a request). The
// initially-active Endpoint writes data to be sent to Endpoint.Data(), and
// then synchronously transfers control to the inactive Endpoint by calling
// Endpoint.SendRecv(), becoming the inactive Endpoint in the process. The
// initially-inactive Endpoint waits for control by calling
// Endpoint.RecvFirst(); receiving control causes it to become the active
// Endpoint. After this, the protocol is symmetric: the active Endpoint reads
// data sent by the peer by reading from Endpoint.Data(), writes data to be
// sent to the peer into Endpoint.Data(), and then calls Endpoint.SendRecv() to
// exchange roles with the peer, which blocks until the peer has done the same.
type Endpoint struct {
// shutdown is non-zero if Endpoint.Shutdown() has been called. shutdown is
// accessed using atomic memory operations.
shutdown uint32
// dataCap is the size of the datagram part of the packet window in bytes.
// dataCap is immutable.
dataCap uint32
// packet is the beginning of the packet window. packet is immutable.
packet unsafe.Pointer
ctrl endpointControlState
}
// Init must be called on zero-value Endpoints before first use. If it
// succeeds, Destroy() must be called once the Endpoint is no longer in use.
//
// ctrlMode specifies how connected Endpoints will exchange control. Both
// connected Endpoints must specify the same value for ctrlMode.
//
// pwd represents the packet window used to exchange data with the peer
// Endpoint. FD may differ between Endpoints if they are in different
// processes, but must represent the same file. The packet window must
// initially be filled with zero bytes.
func (ep *Endpoint) Init(ctrlMode ControlMode, pwd PacketWindowDescriptor) error {
if pwd.Length < pageSize {
return fmt.Errorf("packet window size (%d) less than minimum (%d)", pwd.Length, pageSize)
}
if pwd.Length > math.MaxUint32 {
return fmt.Errorf("packet window size (%d) exceeds maximum (%d)", pwd.Length, math.MaxUint32)
}
m, _, e := syscall.Syscall6(syscall.SYS_MMAP, 0, uintptr(pwd.Length), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, uintptr(pwd.FD), uintptr(pwd.Offset))
if e != 0 {
return fmt.Errorf("failed to mmap packet window: %v", e)
}
ep.dataCap = uint32(pwd.Length) - uint32(packetHeaderBytes)
ep.packet = (unsafe.Pointer)(m)
if err := ep.initControlState(ctrlMode); err != nil {
ep.unmapPacket()
return err
}
return nil
}
// NewEndpoint is a convenience function that returns an initialized Endpoint
// allocated on the heap.
func NewEndpoint(ctrlMode ControlMode, pwd PacketWindowDescriptor) (*Endpoint, error) {
var ep Endpoint
if err := ep.Init(ctrlMode, pwd); err != nil {
return nil, err
}
return &ep, nil
}
func (ep *Endpoint) unmapPacket() {
syscall.Syscall(syscall.SYS_MUNMAP, uintptr(ep.packet), uintptr(ep.dataCap)+packetHeaderBytes, 0)
ep.dataCap = 0
ep.packet = nil
}
// Destroy releases resources owned by ep. No other Endpoint methods may be
// called after Destroy.
func (ep *Endpoint) Destroy() {
ep.unmapPacket()
}
// Packets consist of an 8-byte header followed by an arbitrarily-sized
// datagram. The header consists of:
//
// - A 4-byte native-endian sequence number, which is incremented by the active
// Endpoint after it finishes writing to the packet window. The sequence number
// is needed to handle spurious wakeups.
//
// - A 4-byte native-endian datagram length in bytes.
const (
sizeofUint32 = unsafe.Sizeof(uint32(0))
packetHeaderBytes = 2 * sizeofUint32
)
func (ep *Endpoint) seq() *uint32 {
return (*uint32)(ep.packet)
}
func (ep *Endpoint) dataLen() *uint32 {
return (*uint32)((unsafe.Pointer)(uintptr(ep.packet) + sizeofUint32))
}
// DataCap returns the maximum datagram size supported by ep in bytes.
func (ep *Endpoint) DataCap() uint32 {
return ep.dataCap
}
func (ep *Endpoint) data() unsafe.Pointer {
return unsafe.Pointer(uintptr(ep.packet) + packetHeaderBytes)
}
// Data returns the datagram part of ep's packet window as a byte slice.
//
// Note that the packet window is shared with the potentially-untrusted peer
// Endpoint, which may concurrently mutate the contents of the packet window.
// Thus:
//
// - Readers must not assume that two reads of the same byte in Data() will
// return the same result. In other words, readers should read any given byte
// in Data() at most once.
//
// - Writers must not assume that they will read back the same data that they
// have written. In other words, writers should avoid reading from Data() at
// all.
func (ep *Endpoint) Data() []byte {
var bs []byte
bsReflect := (*reflect.SliceHeader)((unsafe.Pointer)(&bs))
bsReflect.Data = uintptr(ep.data())
bsReflect.Len = int(ep.DataCap())
bsReflect.Cap = bsReflect.Len
return bs
}
// SendRecv transfers control to the peer Endpoint, causing its call to
// Endpoint.SendRecv() or Endpoint.RecvFirst() to return with the given
// datagram length, then blocks until the peer Endpoint calls
// Endpoint.SendRecv() or Endpoint.SendLast().
//
// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has
// returned an error. ep.SendLast() has never been called.
func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) {
dataCap := ep.DataCap()
if dataLen > dataCap {
return 0, fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap)
}
atomic.StoreUint32(ep.dataLen(), dataLen)
if err := ep.doRoundTrip(); err != nil {
return 0, err
}
recvDataLen := atomic.LoadUint32(ep.dataLen())
if recvDataLen > dataCap {
return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap)
}
return recvDataLen, nil
}
// RecvFirst blocks until the peer Endpoint calls Endpoint.SendRecv(), then
// returns the datagram length specified by that call.
//
// Preconditions: ep.SendRecv(), ep.RecvFirst(), and ep.SendLast() have never
// been called.
func (ep *Endpoint) RecvFirst() (uint32, error) {
if err := ep.doWaitFirst(); err != nil {
return 0, err
}
recvDataLen := atomic.LoadUint32(ep.dataLen())
if dataCap := ep.DataCap(); recvDataLen > dataCap {
return 0, fmt.Errorf("received packet with invalid datagram length %d (maximum %d)", recvDataLen, dataCap)
}
return recvDataLen, nil
}
// SendLast causes the peer Endpoint's call to Endpoint.SendRecv() or
// Endpoint.RecvFirst() to return with the given datagram length.
//
// Preconditions: No previous call to ep.SendRecv() or ep.RecvFirst() has
// returned an error. ep.SendLast() has never been called.
func (ep *Endpoint) SendLast(dataLen uint32) error {
dataCap := ep.DataCap()
if dataLen > dataCap {
return fmt.Errorf("can't send packet with datagram length %d (maximum %d)", dataLen, dataCap)
}
atomic.StoreUint32(ep.dataLen(), dataLen)
if err := ep.doNotifyLast(); err != nil {
return err
}
return nil
}
// Shutdown causes concurrent and future calls to ep.SendRecv(),
// ep.RecvFirst(), and ep.SendLast() to unblock and return errors. It does not
// wait for concurrent calls to return.
func (ep *Endpoint) Shutdown() {
if atomic.SwapUint32(&ep.shutdown, 1) == 0 {
ep.interruptForShutdown()
}
}
func (ep *Endpoint) isShutdown() bool {
return atomic.LoadUint32(&ep.shutdown) != 0
}
type endpointShutdownError struct{}
// Error implements error.Error.
func (endpointShutdownError) Error() string {
return "Endpoint.Shutdown() has been called"
}