gvisor/pkg/sentry/kernel/pipe/pipe.go

// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package pipe provides an in-memory implementation of a unidirectional
// pipe.
//
// The goal of this pipe is to emulate the pipe syscall in all of its
// edge cases and guarantees of atomic IO.
package pipe

import (
	"fmt"
	"sync"
	"sync/atomic"
	"syscall"

	"gvisor.googlesource.com/gvisor/pkg/ilist"
	"gvisor.googlesource.com/gvisor/pkg/sentry/context"
	"gvisor.googlesource.com/gvisor/pkg/sentry/fs"
	"gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
	"gvisor.googlesource.com/gvisor/pkg/syserror"
	"gvisor.googlesource.com/gvisor/pkg/waiter"
)

// DefaultPipeSize is the system-wide default size of a pipe in bytes:
// 64 KiB, matching Linux's default pipe capacity.
const DefaultPipeSize = 65536

// Pipe is an encapsulation of a platform-independent pipe.
// It manages a buffered byte queue shared between a reader/writer
// pair.
//
// +stateify savable
type Pipe struct {
	waiter.Queue `state:"nosave"`

	// Whether this is a named or anonymous pipe.
	isNamed bool

	// The dirent backing this pipe. Shared by all readers and writers.
	Dirent *fs.Dirent

	// The buffered byte queue.
	data ilist.List

	// Max size of the pipe in bytes. When this max has been reached,
	// writers will get EWOULDBLOCK.
	max int

	// Current size of the pipe in bytes.
	size int

	// Max number of bytes the pipe can guarantee to read or write
	// atomically.
	atomicIOBytes int

	// The number of active readers for this pipe. Load/store atomically.
	readers int32

	// The number of active writers for this pipe. Load/store atomically.
	writers int32

	// This flag indicates if this pipe ever had a writer. Note that this does
	// not necessarily indicate there is *currently* a writer, just that there
	// has been a writer at some point since the pipe was created.
	//
	// Protected by mu.
	hadWriter bool

	// Lock protecting all pipe internal state.
	mu sync.Mutex `state:"nosave"`
}
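
// Invariant sketch (an inference from the fields above, not a comment from
// the original file): size always equals the sum of the lengths of the
// buffers queued in data, and size never exceeds max while mu is released.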

// NewPipe initializes and returns a pipe. A pipe created by this function is
// persistent, and will remain valid even without any open fds to it. Named
// pipes for mknod(2) are created via this function. Note that the
// implementation of blocking semantics for opening the read and write ends
// of a named pipe is left to filesystems.
func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *Pipe {
	p := &Pipe{
		isNamed:       isNamed,
		max:           sizeBytes,
		atomicIOBytes: atomicIOBytes,
	}

	// Build the fs.Dirent of this pipe, shared by all fs.Files associated
	// with this pipe.
	perms := fs.FilePermissions{
		User: fs.PermMask{Read: true, Write: true},
	}
	iops := NewInodeOperations(ctx, perms, p)
	ino := pipeDevice.NextIno()
	sattr := fs.StableAttr{
		Type:      fs.Pipe,
		DeviceID:  pipeDevice.DeviceID(),
		InodeID:   ino,
		BlockSize: int64(atomicIOBytes),
	}
	ms := fs.NewPseudoMountSource()
	p.Dirent = fs.NewDirent(fs.NewInode(iops, ms, sattr), fmt.Sprintf("pipe:[%d]", ino))
	return p
}
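
// A minimal usage sketch (illustrative, not part of the original file): a
// filesystem implementing mknod(2) might create a persistent named pipe
// like this, where ctx is an assumed task context:
//
//	p := NewPipe(ctx, true /* isNamed */, DefaultPipeSize, usermem.PageSize)
//	// Each open of the FIFO then calls p.ROpen or p.WOpen; blocking until
//	// the peer end opens is the filesystem's responsibility, per the doc
//	// comment above.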

// NewConnectedPipe initializes a pipe and returns a pair of fs.File objects
// representing the read and write ends of the pipe. A pipe created by this
// function becomes invalid as soon as either the read or write end is
// closed, and errors on subsequent operations on either end. Pipes for
// pipe(2) and pipe2(2) are generally created this way.
func NewConnectedPipe(ctx context.Context, sizeBytes int, atomicIOBytes int) (*fs.File, *fs.File) {
	p := NewPipe(ctx, false /* isNamed */, sizeBytes, atomicIOBytes)
	return p.ROpen(ctx), p.WOpen(ctx)
}
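
// A sketch (assumed caller, not from this file) of how a pipe(2)-style
// syscall handler might consume NewConnectedPipe:
//
//	r, w := NewConnectedPipe(ctx, DefaultPipeSize, usermem.PageSize)
//	defer r.DecRef()
//	defer w.DecRef()
//	// Install r and w into the task's FD table (which takes its own
//	// references) and return the two fds to userspace.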

// ROpen opens the pipe for reading.
func (p *Pipe) ROpen(ctx context.Context) *fs.File {
	p.rOpen()
	return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true}, &Reader{
		ReaderWriter: ReaderWriter{Pipe: p},
	})
}

// WOpen opens the pipe for writing.
func (p *Pipe) WOpen(ctx context.Context) *fs.File {
	p.wOpen()
	return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Write: true}, &Writer{
		ReaderWriter: ReaderWriter{Pipe: p},
	})
}

// RWOpen opens the pipe for both reading and writing.
func (p *Pipe) RWOpen(ctx context.Context) *fs.File {
	p.rOpen()
	p.wOpen()
	return fs.NewFile(ctx, p.Dirent, fs.FileFlags{Read: true, Write: true}, &ReaderWriter{
		Pipe: p,
	})
}
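
// A hedged mapping (assumed call sites, not from this file) of these helpers
// to open(2) access modes on a pipe:
//
//	f := p.ROpen(ctx)  // O_RDONLY: registers one reader
//	f := p.WOpen(ctx)  // O_WRONLY: registers one writer
//	f := p.RWOpen(ctx) // O_RDWR: registers as both a reader and a writer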

// read reads data from the pipe into dst and returns the number of bytes
// read, or returns ErrWouldBlock if the pipe is empty.
func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) {
	if !p.HasReaders() {
		return 0, syscall.EBADF
	}

	// Don't block for a zero-length read even if the pipe is empty.
	if dst.NumBytes() == 0 {
		return 0, nil
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	// If there is nothing to read at the moment but there is a writer, tell
	// the caller to block.
	if p.size == 0 {
		if !p.HasWriters() {
			// There are no writers, return EOF.
			return 0, nil
		}
		return 0, syserror.ErrWouldBlock
	}

	// Drain buffers from the front of the queue until dst is full or the
	// pipe is empty.
	var n int64
	for b := p.data.Front(); b != nil; b = p.data.Front() {
		buffer := b.(*Buffer)
		n0, err := dst.CopyOut(ctx, buffer.bytes())
		n += int64(n0)
		p.size -= n0
		if buffer.truncate(n0) == 0 {
			// The buffer was fully consumed; unlink it.
			p.data.Remove(b)
		}
		dst = dst.DropFirst(n0)
		if dst.NumBytes() == 0 || err != nil {
			return n, err
		}
	}
	return n, nil
}
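
// A worked example of the loop above (illustrative numbers, not from the
// original file): with queued buffers of 3 and 5 bytes and room for 6 bytes
// in dst, the first buffer is fully copied and removed, then 3 bytes are
// copied from the second buffer, which truncate shrinks to 2 remaining
// bytes; read returns n=6 and p.size drops by 6.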

// write writes data from src into the pipe and returns the number of bytes
// written. If no bytes are written because the pipe is full (or has less than
// atomicIOBytes free capacity), write returns ErrWouldBlock.
func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if !p.HasWriters() {
		return 0, syscall.EBADF
	}
	if !p.HasReaders() {
		return 0, syscall.EPIPE
	}

	// POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
	// atomic, but requires no atomicity for writes larger than this. However,
	// Linux appears to provide stronger semantics than this in practice:
	// unmerged writes are done one PAGE_SIZE buffer at a time, so for larger
	// writes, the writing of each PIPE_BUF-sized chunk is atomic. We implement
	// this by writing at most atomicIOBytes at a time if we can't service the
	// write in its entirety.
	canWrite := src.NumBytes()
	if canWrite > int64(p.max-p.size) {
		if p.max-p.size >= p.atomicIOBytes {
			canWrite = int64(p.atomicIOBytes)
		} else {
			return 0, syserror.ErrWouldBlock
		}
	}

	// Copy data from user memory into a pipe-owned buffer.
	buf := make([]byte, canWrite)
	n, err := src.CopyIn(ctx, buf)
	if n > 0 {
		p.data.PushBack(newBuffer(buf[:n]))
		p.size += n
	}
	if int64(n) < src.NumBytes() && err == nil {
		// Partial write due to full pipe.
		err = syserror.ErrWouldBlock
	}
	return int64(n), err
}
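
// To make the capacity check concrete (illustrative numbers, not from the
// original file): with max=65536 and atomicIOBytes=4096, a 10000-byte write
// against 2048 free bytes returns ErrWouldBlock with nothing written, while
// the same write against 4096 free bytes copies exactly 4096 bytes and
// returns that short count alongside ErrWouldBlock.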

// rOpen signals a new reader of the pipe.
func (p *Pipe) rOpen() {
	atomic.AddInt32(&p.readers, 1)
}

// wOpen signals a new writer of the pipe.
func (p *Pipe) wOpen() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.hadWriter = true
	atomic.AddInt32(&p.writers, 1)
}

// rClose signals that a reader has closed their end of the pipe.
func (p *Pipe) rClose() {
	newReaders := atomic.AddInt32(&p.readers, -1)
	if newReaders < 0 {
		panic(fmt.Sprintf("Refcounting bug, pipe has negative readers: %v", newReaders))
	}
}

// wClose signals that a writer has closed their end of the pipe.
func (p *Pipe) wClose() {
	newWriters := atomic.AddInt32(&p.writers, -1)
	if newWriters < 0 {
		panic(fmt.Sprintf("Refcounting bug, pipe has negative writers: %v", newWriters))
	}
}

// HasReaders returns whether the pipe has any active readers.
func (p *Pipe) HasReaders() bool {
	return atomic.LoadInt32(&p.readers) > 0
}

// HasWriters returns whether the pipe has any active writers.
func (p *Pipe) HasWriters() bool {
	return atomic.LoadInt32(&p.writers) > 0
}

// rReadinessLocked calculates the read readiness.
//
// Precondition: mu must be held.
func (p *Pipe) rReadinessLocked() waiter.EventMask {
	ready := waiter.EventMask(0)
	if p.HasReaders() && p.data.Front() != nil {
		ready |= waiter.EventIn
	}
	if !p.HasWriters() && p.hadWriter {
		// POLLHUP must be suppressed until the pipe has had at least one
		// writer at some point. Otherwise a reader thread may poll and
		// immediately get a POLLHUP before the writer ever opens the pipe,
		// which the reader may interpret as the writer opening then closing
		// the pipe.
		ready |= waiter.EventHUp
	}
	return ready
}
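
// An illustrative timeline (not from the original file) of the hadWriter
// check: a reader opens a FIFO and polls before any writer arrives; with
// hadWriter still false, no POLLHUP is reported and the reader keeps
// waiting. Once a writer opens and later closes the pipe, hadWriter is true
// and writers is zero, so POLLHUP is reported from then on.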

// rReadiness returns a mask that states whether the read end of the pipe is
// ready for reading.
func (p *Pipe) rReadiness() waiter.EventMask {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.rReadinessLocked()
}

// wReadinessLocked calculates the write readiness.
//
// Precondition: mu must be held.
func (p *Pipe) wReadinessLocked() waiter.EventMask {
	ready := waiter.EventMask(0)
	if p.HasWriters() && p.size < p.max {
		ready |= waiter.EventOut
	}
	if !p.HasReaders() {
		ready |= waiter.EventErr
	}
	return ready
}

// wReadiness returns a mask that states whether the write end of the pipe
// is ready for writing.
func (p *Pipe) wReadiness() waiter.EventMask {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.wReadinessLocked()
}

// rwReadiness returns a mask that states whether a read-write handle to the
// pipe is ready for IO.
func (p *Pipe) rwReadiness() waiter.EventMask {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.rReadinessLocked() | p.wReadinessLocked()
}
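
// A minimal sketch (assumed waiter-package usage, not from this file) of
// blocking until the pipe becomes readable:
//
//	e, ch := waiter.NewChannelEntry(nil)
//	p.EventRegister(&e, waiter.EventIn)
//	defer p.EventUnregister(&e)
//	for p.rReadiness()&waiter.EventIn == 0 {
//		<-ch // woken when a writer queues data or closes
//	}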

// queuedSize returns the number of bytes currently buffered in the pipe.
func (p *Pipe) queuedSize() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.size
}