// Copyright 2018 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package pipe provides an in-memory implementation of a unidirectional
// pipe.
//
// The goal of this pipe is to emulate the pipe syscall in all of its
// edge cases and guarantees of atomic IO.
package pipe
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
"syscall"
|
|
|
|
|
|
|
|
"gvisor.googlesource.com/gvisor/pkg/abi/linux"
|
|
|
|
"gvisor.googlesource.com/gvisor/pkg/ilist"
|
|
|
|
"gvisor.googlesource.com/gvisor/pkg/sentry/context"
|
|
|
|
"gvisor.googlesource.com/gvisor/pkg/sentry/fs"
|
|
|
|
"gvisor.googlesource.com/gvisor/pkg/sentry/fs/fsutil"
|
|
|
|
"gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
|
|
|
|
"gvisor.googlesource.com/gvisor/pkg/syserror"
|
|
|
|
"gvisor.googlesource.com/gvisor/pkg/waiter"
|
|
|
|
)
|
|
|
|
|
|
|
|
// DefaultPipeSize is the system-wide default size of a pipe in bytes.
// 64 KiB, which matches the default pipe capacity on Linux.
const DefaultPipeSize = 65536
|
|
|
|
|
|
|
|
// Pipe is an encapsulation of a platform-independent pipe.
// It manages a buffered byte queue shared between a reader/writer
// pair.
//
// +stateify savable
type Pipe struct {
	waiter.Queue `state:"nosave"`

	// isNamed indicates whether this is a named or anonymous pipe.
	isNamed bool

	// dirent is the dirent backing this pipe. Shared by all readers and
	// writers.
	dirent *fs.Dirent

	// data is the buffered byte queue; elements are *Buffer.
	data ilist.List

	// max is the maximum size of the pipe in bytes. When this max has
	// been reached, writers will get EWOULDBLOCK.
	max int

	// size is the current size of the pipe in bytes.
	size int

	// atomicIOBytes is the maximum number of bytes the pipe can
	// guarantee to read or write atomically.
	atomicIOBytes int

	// readers is the number of active readers for this pipe. Load/store
	// atomically.
	readers int32

	// writers is the number of active writes for this pipe. Load/store
	// atomically.
	writers int32

	// hadWriter indicates if this pipe ever had a writer. Note that this
	// does not necessarily indicate there is *currently* a writer, just
	// that there has been a writer at some point since the pipe was
	// created. Used to suppress spurious POLLHUP on the read end.
	//
	// Protected by mu.
	hadWriter bool

	// mu is the lock protecting all pipe internal state.
	mu sync.Mutex `state:"nosave"`
}
|
|
|
|
|
|
|
|
// NewPipe initializes and returns a pipe. A pipe created by this function is
// persistent, and will remain valid even without any open fds to it. Named
// pipes for mknod(2) are created via this function. Note that the
// implementation of blocking semantics for opening the read and write ends of a
// named pipe are left to filesystems.
//
// sizeBytes is the pipe's maximum capacity; atomicIOBytes is the largest
// read/write the pipe guarantees to perform atomically (it is also used as
// the inode's block size).
func NewPipe(ctx context.Context, isNamed bool, sizeBytes, atomicIOBytes int) *Pipe {
	p := &Pipe{
		isNamed:       isNamed,
		max:           sizeBytes,
		atomicIOBytes: atomicIOBytes,
	}

	// Build the fs.Dirent of this pipe, shared by all fs.Files associated
	// with this pipe.
	ino := pipeDevice.NextIno()
	base := fsutil.NewSimpleInodeOperations(fsutil.InodeSimpleAttributes{
		FSType: linux.PIPEFS_MAGIC,
		UAttr: fs.WithCurrentTime(ctx, fs.UnstableAttr{
			// Owner and read/write-only-for-owner permissions come
			// from the creating context.
			Owner: fs.FileOwnerFromContext(ctx),
			Perms: fs.FilePermissions{
				User: fs.PermMask{Read: true, Write: true},
			},
			Links: 1,
		}),
	})
	sattr := fs.StableAttr{
		Type:      fs.Pipe,
		DeviceID:  pipeDevice.DeviceID(),
		InodeID:   ino,
		BlockSize: int64(atomicIOBytes),
	}
	// There is no real filesystem backing this pipe, so we pass in a nil
	// Filesystem.
	sb := fs.NewNonCachingMountSource(nil, fs.MountSourceFlags{})
	p.dirent = fs.NewDirent(fs.NewInode(NewInodeOperations(base, p), sb, sattr), fmt.Sprintf("pipe:[%d]", ino))

	return p
}
|
|
|
|
|
|
|
|
// NewConnectedPipe initializes a pipe and returns a pair of objects (which
|
|
|
|
// implement kio.File) representing the read and write ends of the pipe. A pipe
|
|
|
|
// created by this function becomes invalid as soon as either the read or write
|
|
|
|
// end is closed, and errors on subsequent operations on either end. Pipes
|
|
|
|
// for pipe(2) and pipe2(2) are generally created this way.
|
|
|
|
func NewConnectedPipe(ctx context.Context, sizeBytes int, atomicIOBytes int) (*fs.File, *fs.File) {
|
|
|
|
p := NewPipe(ctx, false /* isNamed */, sizeBytes, atomicIOBytes)
|
|
|
|
return p.ROpen(ctx), p.WOpen(ctx)
|
|
|
|
}
|
|
|
|
|
|
|
|
// ROpen opens the pipe for reading.
|
|
|
|
func (p *Pipe) ROpen(ctx context.Context) *fs.File {
|
|
|
|
p.rOpen()
|
|
|
|
return fs.NewFile(ctx, p.dirent, fs.FileFlags{Read: true}, &Reader{
|
|
|
|
ReaderWriter: ReaderWriter{Pipe: p},
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// WOpen opens the pipe for writing.
|
|
|
|
func (p *Pipe) WOpen(ctx context.Context) *fs.File {
|
|
|
|
p.wOpen()
|
|
|
|
return fs.NewFile(ctx, p.dirent, fs.FileFlags{Write: true}, &Writer{
|
|
|
|
ReaderWriter: ReaderWriter{Pipe: p},
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// RWOpen opens the pipe for both reading and writing.
|
|
|
|
func (p *Pipe) RWOpen(ctx context.Context) *fs.File {
|
|
|
|
p.rOpen()
|
|
|
|
p.wOpen()
|
|
|
|
return fs.NewFile(ctx, p.dirent, fs.FileFlags{Read: true, Write: true}, &ReaderWriter{
|
|
|
|
Pipe: p,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// read reads data from the pipe into dst and returns the number of bytes
// read, or returns ErrWouldBlock if the pipe is empty.
//
// It returns EBADF if the pipe has no active readers, (0, nil) on EOF (empty
// pipe with no writers), and ErrWouldBlock if the pipe is empty but a writer
// still exists.
func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) {
	if !p.HasReaders() {
		return 0, syscall.EBADF
	}

	// Don't block for a zero-length read even if the pipe is empty.
	if dst.NumBytes() == 0 {
		return 0, nil
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	// If there is nothing to read at the moment but there is a writer, tell the
	// caller to block.
	if p.size == 0 {
		if !p.HasWriters() {
			// There are no writers, return EOF.
			return 0, nil
		}
		return 0, syserror.ErrWouldBlock
	}
	var n int64
	// Consume buffers from the front of the queue until dst is full, the
	// queue is drained, or a copy-out error occurs.
	for b := p.data.Front(); b != nil; b = p.data.Front() {
		buffer := b.(*Buffer)
		n0, err := dst.CopyOut(ctx, buffer.bytes())
		n += int64(n0)
		p.size -= n0
		// Drop the buffer from the queue once fully consumed.
		if buffer.truncate(n0) == 0 {
			p.data.Remove(b)
		}
		dst = dst.DropFirst(n0)
		if dst.NumBytes() == 0 || err != nil {
			return n, err
		}
	}
	return n, nil
}
|
|
|
|
|
|
|
|
// write writes data from src into the pipe and returns the number of bytes
// written. If no bytes are written because the pipe is full (or has less than
// atomicIOBytes free capacity), write returns ErrWouldBlock.
//
// It returns EBADF if the pipe has no active writers and EPIPE if it has no
// active readers.
func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if !p.HasWriters() {
		return 0, syscall.EBADF
	}
	if !p.HasReaders() {
		return 0, syscall.EPIPE
	}

	// POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
	// atomic, but requires no atomicity for writes larger than this. However,
	// Linux appears to provide stronger semantics than this in practice:
	// unmerged writes are done one PAGE_SIZE buffer at a time, so for larger
	// writes, the writing of each PIPE_BUF-sized chunk is atomic. We implement
	// this by writing at most atomicIOBytes at a time if we can't service the
	// write in its entirety.
	canWrite := src.NumBytes()
	if canWrite > int64(p.max-p.size) {
		// The whole write doesn't fit. If at least atomicIOBytes of
		// space remains, write one atomic chunk; otherwise block.
		if p.max-p.size >= p.atomicIOBytes {
			canWrite = int64(p.atomicIOBytes)
		} else {
			return 0, syserror.ErrWouldBlock
		}
	}

	// Copy data from user memory into a pipe-owned buffer.
	buf := make([]byte, canWrite)
	n, err := src.CopyIn(ctx, buf)
	if n > 0 {
		p.data.PushBack(newBuffer(buf[:n]))
		p.size += n
	}
	if int64(n) < src.NumBytes() && err == nil {
		// Partial write due to full pipe.
		err = syserror.ErrWouldBlock
	}
	return int64(n), err
}
|
|
|
|
|
|
|
|
// rOpen signals a new reader of the pipe.
func (p *Pipe) rOpen() {
	// readers is accessed atomically everywhere, so no lock is needed.
	atomic.AddInt32(&p.readers, 1)
}
|
|
|
|
|
|
|
|
// wOpen signals a new writer of the pipe.
func (p *Pipe) wOpen() {
	p.mu.Lock()
	defer p.mu.Unlock()
	// hadWriter is protected by mu. It permanently records that a writer
	// has existed, which gates POLLHUP delivery in rReadinessLocked.
	p.hadWriter = true
	atomic.AddInt32(&p.writers, 1)
}
|
|
|
|
|
|
|
|
// rClose signals that a reader has closed their end of the pipe.
|
|
|
|
func (p *Pipe) rClose() {
|
|
|
|
newReaders := atomic.AddInt32(&p.readers, -1)
|
|
|
|
if newReaders < 0 {
|
|
|
|
panic(fmt.Sprintf("Refcounting bug, pipe has negative readers: %v", newReaders))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// wClose signals that a writer has closed their end of the pipe.
|
|
|
|
func (p *Pipe) wClose() {
|
|
|
|
newWriters := atomic.AddInt32(&p.writers, -1)
|
|
|
|
if newWriters < 0 {
|
|
|
|
panic(fmt.Sprintf("Refcounting bug, pipe has negative writers: %v.", newWriters))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// HasReaders returns whether the pipe has any active readers.
|
|
|
|
func (p *Pipe) HasReaders() bool {
|
|
|
|
return atomic.LoadInt32(&p.readers) > 0
|
|
|
|
}
|
|
|
|
|
|
|
|
// HasWriters returns whether the pipe has any active writers.
|
|
|
|
func (p *Pipe) HasWriters() bool {
|
|
|
|
return atomic.LoadInt32(&p.writers) > 0
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *Pipe) rReadinessLocked() waiter.EventMask {
|
|
|
|
ready := waiter.EventMask(0)
|
|
|
|
if p.HasReaders() && p.data.Front() != nil {
|
|
|
|
ready |= waiter.EventIn
|
|
|
|
}
|
|
|
|
if !p.HasWriters() && p.hadWriter {
|
2018-05-07 23:37:08 +00:00
|
|
|
// POLLHUP must be suppressed until the pipe has had at least one writer
|
2018-04-27 17:37:02 +00:00
|
|
|
// at some point. Otherwise a reader thread may poll and immediately get
|
|
|
|
// a POLLHUP before the writer ever opens the pipe, which the reader may
|
|
|
|
// interpret as the writer opening then closing the pipe.
|
|
|
|
ready |= waiter.EventHUp
|
|
|
|
}
|
|
|
|
return ready
|
|
|
|
}
|
|
|
|
|
|
|
|
// rReadiness returns a mask that states whether the read end of the pipe is
|
|
|
|
// ready for reading.
|
|
|
|
func (p *Pipe) rReadiness() waiter.EventMask {
|
|
|
|
p.mu.Lock()
|
|
|
|
defer p.mu.Unlock()
|
|
|
|
return p.rReadinessLocked()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *Pipe) wReadinessLocked() waiter.EventMask {
|
|
|
|
ready := waiter.EventMask(0)
|
|
|
|
if p.HasWriters() && p.size < p.max {
|
|
|
|
ready |= waiter.EventOut
|
|
|
|
}
|
|
|
|
if !p.HasReaders() {
|
|
|
|
ready |= waiter.EventErr
|
|
|
|
}
|
|
|
|
return ready
|
|
|
|
}
|
|
|
|
|
|
|
|
// wReadiness returns a mask that states whether the write end of the pipe
|
|
|
|
// is ready for writing.
|
|
|
|
func (p *Pipe) wReadiness() waiter.EventMask {
|
|
|
|
p.mu.Lock()
|
|
|
|
defer p.mu.Unlock()
|
|
|
|
return p.wReadinessLocked()
|
|
|
|
}
|
|
|
|
|
|
|
|
// rwReadiness returns a mask that states whether a read-write handle to the
|
|
|
|
// pipe is ready for IO.
|
|
|
|
func (p *Pipe) rwReadiness() waiter.EventMask {
|
|
|
|
p.mu.Lock()
|
|
|
|
defer p.mu.Unlock()
|
|
|
|
return p.rReadinessLocked() | p.wReadinessLocked()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *Pipe) queuedSize() int {
|
|
|
|
p.mu.Lock()
|
|
|
|
defer p.mu.Unlock()
|
|
|
|
return p.size
|
|
|
|
}
|