gvisor/pkg/sentry/kernel/pipe/pipe.go

467 lines
11 KiB
Go

// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package pipe provides a pipe implementation.
package pipe
import (
"fmt"
"sync"
"sync/atomic"
"syscall"
"gvisor.dev/gvisor/pkg/sentry/context"
"gvisor.dev/gvisor/pkg/sentry/fs"
"gvisor.dev/gvisor/pkg/syserror"
"gvisor.dev/gvisor/pkg/waiter"
)
const (
// MinimumPipeSize is a hard limit on the minimum size of a pipe, in
// bytes (64 KiB). NewPipe and SetFifoSize raise smaller requests to
// this value.
MinimumPipeSize = 64 << 10
// DefaultPipeSize is the system-wide default size of a pipe in bytes.
DefaultPipeSize = MinimumPipeSize
// MaximumPipeSize is a hard limit on the maximum size of a pipe, in
// bytes (8 MiB). NewPipe lowers larger requests to this value, while
// SetFifoSize rejects them.
MaximumPipeSize = 8 << 20
)
// Pipe is an encapsulation of a platform-independent pipe.
// It manages a buffered byte queue shared between a reader/writer
// pair.
//
// +stateify savable
type Pipe struct {
// Queue is embedded, so its methods are promoted onto Pipe. It carries
// the state:"nosave" tag.
waiter.Queue `state:"nosave"`
// isNamed indicates whether this is a named pipe.
//
// This value is immutable.
isNamed bool
// atomicIOBytes is the largest I/O size for which the pipe guarantees
// atomicity: write refuses, rather than splits, a request of at most
// this many bytes that does not fit in the free space.
//
// This value is immutable.
atomicIOBytes int64
// The number of active readers for this pipe. It is incremented by
// rOpen and decremented by rClose.
//
// Access atomically.
readers int32
// The number of active writers for this pipe. It is incremented by
// wOpen and decremented by wClose.
//
// Access atomically.
writers int32
// mu protects all pipe internal state below.
mu sync.Mutex `state:"nosave"`
// data is the buffer queue of pipe contents. write appends at the back
// and read consumes from the front.
//
// This is protected by mu.
data bufferList
// max is the maximum size of the pipe in bytes. When this max has been
// reached, writers will get EWOULDBLOCK.
//
// This is protected by mu.
max int64
// size is the current size of the pipe in bytes.
//
// This is protected by mu.
size int64
// hadWriter indicates if this pipe ever had a writer. Note that this
// does not necessarily indicate there is *currently* a writer, just
// that there has been a writer at some point since the pipe was
// created. It is set by wOpen and consulted by rReadinessLocked.
//
// This is protected by mu.
hadWriter bool
}
// NewPipe allocates a pipe with no readers, no writers and no queued data.
//
// The requested values are bounded before use: sizeBytes is forced into
// [MinimumPipeSize, MaximumPipeSize] and atomicIOBytes into [1, sizeBytes]
// (using the already-bounded size).
func NewPipe(isNamed bool, sizeBytes, atomicIOBytes int64) *Pipe {
	// Bound the capacity first; the atomic unit is bounded against it.
	switch {
	case sizeBytes < MinimumPipeSize:
		sizeBytes = MinimumPipeSize
	case sizeBytes > MaximumPipeSize:
		sizeBytes = MaximumPipeSize
	}

	// A non-positive atomic unit becomes 1; one larger than the pipe is
	// reduced to the pipe's capacity.
	switch {
	case atomicIOBytes <= 0:
		atomicIOBytes = 1
	case atomicIOBytes > sizeBytes:
		atomicIOBytes = sizeBytes
	}

	p := new(Pipe)
	p.isNamed = isNamed
	p.max = sizeBytes
	p.atomicIOBytes = atomicIOBytes
	return p
}
// NewConnectedPipe creates an unnamed pipe and returns two files opened on
// it: the read end first, then the write end.
//
// Both files share one Dirent whose name is "pipe:[<inode number>]".
func NewConnectedPipe(ctx context.Context, sizeBytes, atomicIOBytes int64) (*fs.File, *fs.File) {
	p := NewPipe(false /* isNamed */, sizeBytes, atomicIOBytes)

	// Inode operations backing the pipe; only the User read and write
	// permission bits are set.
	iops := NewInodeOperations(ctx, fs.FilePermissions{
		User: fs.PermMask{Read: true, Write: true},
	}, p)

	inodeID := pipeDevice.NextIno()
	attr := fs.StableAttr{
		Type:     fs.Pipe,
		DeviceID: pipeDevice.DeviceID(),
		InodeID:  inodeID,
		// NOTE(review): this is the caller-supplied value, not the bounded
		// value that NewPipe stored in the pipe.
		BlockSize: atomicIOBytes,
	}
	mount := fs.NewPseudoMountSource(ctx)
	inode := fs.NewInode(ctx, iops, mount, attr)
	name := fmt.Sprintf("pipe:[%d]", inodeID)
	d := fs.NewDirent(ctx, inode, name)

	// Each Open call below takes its own reference on the Dirent, so the
	// reference obtained from NewDirent is dropped once both ends exist.
	defer d.DecRef()

	readEnd := p.Open(ctx, d, fs.FileFlags{Read: true})
	writeEnd := p.Open(ctx, d, fs.FileFlags{Write: true})
	return readEnd, writeEnd
}
// Open registers a new reader and/or writer on the pipe, as selected by
// flags, and returns a file for it. The file is always marked non-seekable.
//
// Precondition: at least one of flags.Read or flags.Write must be set;
// Open panics otherwise.
func (p *Pipe) Open(ctx context.Context, d *fs.Dirent, flags fs.FileFlags) *fs.File {
	flags.NonSeekable = true

	if !flags.Read && !flags.Write {
		// Precondition violated.
		panic("invalid pipe flags")
	}

	if flags.Read && flags.Write {
		// Read-write handle: counts as both a reader and a writer.
		p.rOpen()
		p.wOpen()
		return fs.NewFile(ctx, d, flags, &ReaderWriter{
			Pipe: p,
		})
	}

	if flags.Read {
		p.rOpen()
		return fs.NewFile(ctx, d, flags, &Reader{
			ReaderWriter: ReaderWriter{Pipe: p},
		})
	}

	// Only the write flag is set.
	p.wOpen()
	return fs.NewFile(ctx, d, flags, &Writer{
		ReaderWriter: ReaderWriter{Pipe: p},
	})
}
// readOps bundles the callbacks through which read and dup move data out of
// the pipe's buffers.
type readOps struct {
// left returns the bytes remaining.
left func() int64
// limit limits subsequent reads. read and dup call it with the pipe's
// current size when left() exceeds the amount of queued data.
limit func(int64)
// read performs the actual read operation on one buffer and returns the
// number of bytes it handled.
read func(*buffer) (int64, error)
}
// read consumes queued data through ops and returns the number of bytes
// consumed.
//
// A zero-length request returns (0, nil) without taking the lock. If the
// pipe is empty, read returns (0, nil) when there are no writers (EOF) and
// (0, ErrWouldBlock) otherwise. The request is limited to the amount of data
// currently queued.
//
// Precondition: this pipe must have readers.
func (p *Pipe) read(ctx context.Context, ops readOps) (int64, error) {
	// Don't block for a zero-length read even if the pipe is empty.
	if ops.left() == 0 {
		return 0, nil
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	if p.size == 0 {
		// Nothing queued: block only if a writer could still add data.
		if p.HasWriters() {
			return 0, syserror.ErrWouldBlock
		}
		// There are no writers, return EOF.
		return 0, nil
	}

	// Never ask for more than is queued.
	if queued := p.size; ops.left() > queued {
		ops.limit(queued)
	}

	var total int64
	for ops.left() > 0 {
		head := p.data.Front()
		if head == nil {
			break
		}

		// Hand the front buffer to the caller's copy routine and account
		// for whatever it consumed, even if it also reported an error.
		n, err := ops.read(head)
		total += n
		p.size -= n

		// A drained buffer leaves the queue and returns to the pool.
		if head.Empty() {
			p.data.Remove(head)
			bufferPool.Put(head)
		}

		if err != nil {
			return total, err
		}
	}
	return total, nil
}
// dup passes every queued buffer of this pipe to ops.read, in order, and
// returns the total number of bytes ops.read reported. dup itself neither
// removes buffers from the queue nor adjusts the pipe's size.
//
// A zero-length request returns (0, nil) immediately, matching read. If the
// pipe is empty, dup returns (0, nil) when there are no writers and
// (0, ErrWouldBlock) otherwise. The request is limited to the amount of data
// currently queued.
//
// There is no blocking behavior implemented here. The writer may propagate
// some blocking error. All the writes must be complete writes.
func (p *Pipe) dup(ctx context.Context, ops readOps) (int64, error) {
	// Don't block for a zero-length request even if the pipe is empty.
	// Without this guard an empty pipe with writers would report
	// ErrWouldBlock for a request that asks for nothing.
	if ops.left() == 0 {
		return 0, nil
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	// Is the pipe empty?
	if p.size == 0 {
		if !p.HasWriters() {
			// There are no writers, return EOF.
			return 0, nil
		}
		return 0, syserror.ErrWouldBlock
	}

	// Limit how much we consume.
	if ops.left() > p.size {
		ops.limit(p.size)
	}

	done := int64(0)
	for buf := p.data.Front(); buf != nil; buf = buf.Next() {
		n, err := ops.read(buf)
		done += n
		if err != nil {
			return done, err
		}
	}
	return done, nil
}
// writeOps bundles the callbacks through which write moves data into the
// pipe's buffers.
type writeOps struct {
// left returns the bytes remaining.
left func() int64
// limit should limit subsequent writes. write calls it with the free
// capacity of the pipe when a non-atomic request does not fit.
limit func(int64)
// write should write to the provided buffer and return the number of
// bytes it added.
write func(*buffer) (int64, error)
}
// write appends data supplied through ops to the pipe and returns the number
// of bytes added.
//
// It returns EPIPE if the pipe has no readers. A request of at most
// atomicIOBytes that does not fit in the free space is refused whole with
// ErrWouldBlock. A larger request is cut down to the free space; if fewer
// bytes than requested were added, the count is returned together with
// ErrWouldBlock.
//
// Precondition: this pipe must have writers.
func (p *Pipe) write(ctx context.Context, ops writeOps) (int64, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Can't write to a pipe with no readers.
	if !p.HasReaders() {
		return 0, syscall.EPIPE
	}

	// POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
	// atomic, but requires no atomicity for writes larger than this.
	requested := ops.left()
	free := p.max - p.size
	if requested > free {
		if requested <= p.atomicIOBytes {
			return 0, syserror.ErrWouldBlock
		}
		ops.limit(free)
	}

	var written int64
	for ops.left() > 0 {
		// Fill the tail buffer, starting a fresh one when there is none
		// or the current tail has no room left.
		tail := p.data.Back()
		if tail == nil || tail.Full() {
			tail = newBuffer()
			p.data.PushBack(tail)
		}

		// Account for whatever was copied, even if the copy also failed.
		n, err := ops.write(tail)
		written += n
		p.size += n
		if err != nil {
			return written, err
		}
	}

	if written < requested {
		// Partial write due to full pipe.
		return written, syserror.ErrWouldBlock
	}
	return written, nil
}
// rOpen records one more active reader by atomically incrementing the
// pipe's reader count.
func (p *Pipe) rOpen() {
	const delta = 1
	atomic.AddInt32(&p.readers, delta)
}
// wOpen records one more active writer. Under mu it marks the pipe as
// having had a writer and atomically increments the writer count.
func (p *Pipe) wOpen() {
	p.mu.Lock()
	p.hadWriter = true
	atomic.AddInt32(&p.writers, 1)
	p.mu.Unlock()
}
// rClose records that a reader has closed its end by atomically
// decrementing the reader count. It panics if the count becomes negative,
// which indicates a reference-counting bug.
func (p *Pipe) rClose() {
	if remaining := atomic.AddInt32(&p.readers, -1); remaining < 0 {
		panic(fmt.Sprintf("Refcounting bug, pipe has negative readers: %v", remaining))
	}
}
// wClose records that a writer has closed its end by atomically
// decrementing the writer count. It panics if the count becomes negative,
// which indicates a reference-counting bug.
func (p *Pipe) wClose() {
	if remaining := atomic.AddInt32(&p.writers, -1); remaining < 0 {
		panic(fmt.Sprintf("Refcounting bug, pipe has negative writers: %v.", remaining))
	}
}
// HasReaders reports whether the pipe's reader count, loaded atomically, is
// greater than zero.
func (p *Pipe) HasReaders() bool {
	active := atomic.LoadInt32(&p.readers)
	return active > 0
}
// HasWriters reports whether the pipe's writer count, loaded atomically, is
// greater than zero.
func (p *Pipe) HasWriters() bool {
	active := atomic.LoadInt32(&p.writers)
	return active > 0
}
// rReadinessLocked computes the event mask for the read end: EventIn when
// the pipe has readers and at least one queued buffer, and EventHUp when
// every writer is gone after at least one has existed.
//
// Precondition: mu must be held.
func (p *Pipe) rReadinessLocked() waiter.EventMask {
	var mask waiter.EventMask

	if p.HasReaders() && p.data.Front() != nil {
		mask |= waiter.EventIn
	}

	// POLLHUP must be suppressed until the pipe has had at least one writer
	// at some point. Otherwise a reader thread may poll and immediately get
	// a POLLHUP before the writer ever opens the pipe, which the reader may
	// interpret as the writer opening then closing the pipe.
	if p.hadWriter && !p.HasWriters() {
		mask |= waiter.EventHUp
	}

	return mask
}
// rReadiness takes mu and returns the event mask describing whether the
// read end of the pipe is ready for reading.
func (p *Pipe) rReadiness() waiter.EventMask {
	p.mu.Lock()
	defer p.mu.Unlock()

	mask := p.rReadinessLocked()
	return mask
}
// wReadinessLocked computes the event mask for the write end: EventOut when
// the pipe has writers and is below its maximum size, and EventErr when the
// pipe has no readers.
//
// Precondition: mu must be held.
func (p *Pipe) wReadinessLocked() waiter.EventMask {
	var mask waiter.EventMask

	if p.HasWriters() && p.size < p.max {
		mask |= waiter.EventOut
	}

	if !p.HasReaders() {
		mask |= waiter.EventErr
	}

	return mask
}
// wReadiness takes mu and returns the event mask describing whether the
// write end of the pipe is ready for writing.
func (p *Pipe) wReadiness() waiter.EventMask {
	p.mu.Lock()
	defer p.mu.Unlock()

	mask := p.wReadinessLocked()
	return mask
}
// rwReadiness takes mu and returns the union of the read-end and write-end
// event masks, for use by a read-write handle to the pipe.
func (p *Pipe) rwReadiness() waiter.EventMask {
	p.mu.Lock()
	defer p.mu.Unlock()

	mask := p.rReadinessLocked()
	mask |= p.wReadinessLocked()
	return mask
}
// queued takes mu and returns the pipe's current size, i.e. the number of
// bytes queued in it.
func (p *Pipe) queued() int64 {
	p.mu.Lock()
	defer p.mu.Unlock()

	pending := p.size
	return pending
}
// FifoSize implements fs.FifoSizer.FifoSize.
//
// It takes mu and returns the pipe's maximum size in bytes; the error is
// always nil. Both arguments are ignored.
func (p *Pipe) FifoSize(context.Context, *fs.File) (int64, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	capacity := p.max
	return capacity, nil
}
// SetFifoSize implements fs.FifoSizer.SetFifoSize.
//
// It changes the pipe's maximum size and returns the size actually applied:
//   - a negative size fails with EINVAL;
//   - a size above MaximumPipeSize fails with EPERM;
//   - a size below MinimumPipeSize is raised to MinimumPipeSize;
//   - a size smaller than the data currently queued fails with EBUSY.
//
// On failure the pipe is left unchanged and the returned size is 0.
func (p *Pipe) SetFifoSize(size int64) (int64, error) {
	switch {
	case size < 0:
		return 0, syserror.EINVAL
	case size > MaximumPipeSize:
		return 0, syserror.EPERM
	case size < MinimumPipeSize:
		size = MinimumPipeSize // Per spec.
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	// Shrinking below the queued data would strand bytes already written.
	if p.size > size {
		return 0, syserror.EBUSY
	}
	p.max = size
	return size, nil
}