Implement splice methods for pipes and sockets.

This also allows the tee(2) implementation to be enabled, since dup can now be
properly supported via WriteTo.

Note that this change necessitated some minor restructuring with the
fs.FileOperations splice methods. If the *fs.File is passed through directly,
then only public API methods are accessible, which will deadlock immediately
since the locking is already done by fs.Splice. Instead, we pass through an
abstract io.Reader or io.Writer, which elide locks and use the underlying
fs.FileOperations directly.

PiperOrigin-RevId: 268805207
This commit is contained in:
Adin Scannell 2019-09-12 17:42:14 -07:00 committed by gVisor bot
parent df5d377521
commit 7c6ab6a219
23 changed files with 770 additions and 281 deletions

View File

@ -515,6 +515,11 @@ type lockedReader struct {
// File is the file to read from. // File is the file to read from.
File *File File *File
// Offset is the offset to start at.
//
// This applies only to Read, not ReadAt.
Offset int64
} }
// Read implements io.Reader.Read. // Read implements io.Reader.Read.
@ -522,7 +527,8 @@ func (r *lockedReader) Read(buf []byte) (int, error) {
if r.Ctx.Interrupted() { if r.Ctx.Interrupted() {
return 0, syserror.ErrInterrupted return 0, syserror.ErrInterrupted
} }
n, err := r.File.FileOperations.Read(r.Ctx, r.File, usermem.BytesIOSequence(buf), r.File.offset) n, err := r.File.FileOperations.Read(r.Ctx, r.File, usermem.BytesIOSequence(buf), r.Offset)
r.Offset += n
return int(n), err return int(n), err
} }
@ -544,11 +550,21 @@ type lockedWriter struct {
// File is the file to write to. // File is the file to write to.
File *File File *File
// Offset is the offset to start at.
//
// This applies only to Write, not WriteAt.
Offset int64
} }
// Write implements io.Writer.Write. // Write implements io.Writer.Write.
func (w *lockedWriter) Write(buf []byte) (int, error) { func (w *lockedWriter) Write(buf []byte) (int, error) {
return w.WriteAt(buf, w.File.offset) if w.Ctx.Interrupted() {
return 0, syserror.ErrInterrupted
}
n, err := w.WriteAt(buf, w.Offset)
w.Offset += int64(n)
return int(n), err
} }
// WriteAt implements io.Writer.WriteAt. // WriteAt implements io.Writer.WriteAt.
@ -562,6 +578,9 @@ func (w *lockedWriter) WriteAt(buf []byte, offset int64) (int, error) {
// io.Copy, since our own Write interface does not have this same // io.Copy, since our own Write interface does not have this same
// contract. Enforce that here. // contract. Enforce that here.
for written < len(buf) { for written < len(buf) {
if w.Ctx.Interrupted() {
return written, syserror.ErrInterrupted
}
var n int64 var n int64
n, err = w.File.FileOperations.Write(w.Ctx, w.File, usermem.BytesIOSequence(buf[written:]), offset+int64(written)) n, err = w.File.FileOperations.Write(w.Ctx, w.File, usermem.BytesIOSequence(buf[written:]), offset+int64(written))
if n > 0 { if n > 0 {

View File

@ -15,6 +15,8 @@
package fs package fs
import ( import (
"io"
"gvisor.dev/gvisor/pkg/sentry/arch" "gvisor.dev/gvisor/pkg/sentry/arch"
"gvisor.dev/gvisor/pkg/sentry/context" "gvisor.dev/gvisor/pkg/sentry/context"
"gvisor.dev/gvisor/pkg/sentry/memmap" "gvisor.dev/gvisor/pkg/sentry/memmap"
@ -105,8 +107,11 @@ type FileOperations interface {
// on the destination, following by a buffered copy with standard Read // on the destination, following by a buffered copy with standard Read
// and Write operations. // and Write operations.
// //
// If dup is set, the data should be duplicated into the destination
// and retained.
//
// The same preconditions as Read apply. // The same preconditions as Read apply.
WriteTo(ctx context.Context, file *File, dst *File, opts SpliceOpts) (int64, error) WriteTo(ctx context.Context, file *File, dst io.Writer, count int64, dup bool) (int64, error)
// Write writes src to file at offset and returns the number of bytes // Write writes src to file at offset and returns the number of bytes
// written which must be greater than or equal to 0. Like Read, file // written which must be greater than or equal to 0. Like Read, file
@ -126,7 +131,7 @@ type FileOperations interface {
// source. See WriteTo for details regarding how this is called. // source. See WriteTo for details regarding how this is called.
// //
// The same preconditions as Write apply; FileFlags.Write must be set. // The same preconditions as Write apply; FileFlags.Write must be set.
ReadFrom(ctx context.Context, file *File, src *File, opts SpliceOpts) (int64, error) ReadFrom(ctx context.Context, file *File, src io.Reader, count int64) (int64, error)
// Fsync writes buffered modifications of file and/or flushes in-flight // Fsync writes buffered modifications of file and/or flushes in-flight
// operations to backing storage based on syncType. The range to sync is // operations to backing storage based on syncType. The range to sync is

View File

@ -15,6 +15,7 @@
package fs package fs
import ( import (
"io"
"sync" "sync"
"gvisor.dev/gvisor/pkg/refs" "gvisor.dev/gvisor/pkg/refs"
@ -268,9 +269,9 @@ func (f *overlayFileOperations) Read(ctx context.Context, file *File, dst userme
} }
// WriteTo implements FileOperations.WriteTo. // WriteTo implements FileOperations.WriteTo.
func (f *overlayFileOperations) WriteTo(ctx context.Context, file *File, dst *File, opts SpliceOpts) (n int64, err error) { func (f *overlayFileOperations) WriteTo(ctx context.Context, file *File, dst io.Writer, count int64, dup bool) (n int64, err error) {
err = f.onTop(ctx, file, func(file *File, ops FileOperations) error { err = f.onTop(ctx, file, func(file *File, ops FileOperations) error {
n, err = ops.WriteTo(ctx, file, dst, opts) n, err = ops.WriteTo(ctx, file, dst, count, dup)
return err // Will overwrite itself. return err // Will overwrite itself.
}) })
return return
@ -285,9 +286,9 @@ func (f *overlayFileOperations) Write(ctx context.Context, file *File, src userm
} }
// ReadFrom implements FileOperations.ReadFrom. // ReadFrom implements FileOperations.ReadFrom.
func (f *overlayFileOperations) ReadFrom(ctx context.Context, file *File, src *File, opts SpliceOpts) (n int64, err error) { func (f *overlayFileOperations) ReadFrom(ctx context.Context, file *File, src io.Reader, count int64) (n int64, err error) {
// See above; f.upper must be non-nil. // See above; f.upper must be non-nil.
return f.upper.FileOperations.ReadFrom(ctx, f.upper, src, opts) return f.upper.FileOperations.ReadFrom(ctx, f.upper, src, count)
} }
// Fsync implements FileOperations.Fsync. // Fsync implements FileOperations.Fsync.

View File

@ -15,6 +15,8 @@
package fsutil package fsutil
import ( import (
"io"
"gvisor.dev/gvisor/pkg/sentry/arch" "gvisor.dev/gvisor/pkg/sentry/arch"
"gvisor.dev/gvisor/pkg/sentry/context" "gvisor.dev/gvisor/pkg/sentry/context"
"gvisor.dev/gvisor/pkg/sentry/fs" "gvisor.dev/gvisor/pkg/sentry/fs"
@ -228,12 +230,12 @@ func (FileNoIoctl) Ioctl(context.Context, *fs.File, usermem.IO, arch.SyscallArgu
type FileNoSplice struct{} type FileNoSplice struct{}
// WriteTo implements fs.FileOperations.WriteTo. // WriteTo implements fs.FileOperations.WriteTo.
func (FileNoSplice) WriteTo(context.Context, *fs.File, *fs.File, fs.SpliceOpts) (int64, error) { func (FileNoSplice) WriteTo(context.Context, *fs.File, io.Writer, int64, bool) (int64, error) {
return 0, syserror.ENOSYS return 0, syserror.ENOSYS
} }
// ReadFrom implements fs.FileOperations.ReadFrom. // ReadFrom implements fs.FileOperations.ReadFrom.
func (FileNoSplice) ReadFrom(context.Context, *fs.File, *fs.File, fs.SpliceOpts) (int64, error) { func (FileNoSplice) ReadFrom(context.Context, *fs.File, io.Reader, int64) (int64, error) {
return 0, syserror.ENOSYS return 0, syserror.ENOSYS
} }

View File

@ -15,6 +15,7 @@
package fs package fs
import ( import (
"io"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -172,7 +173,7 @@ func (i *Inotify) Read(ctx context.Context, _ *File, dst usermem.IOSequence, _ i
} }
// WriteTo implements FileOperations.WriteTo. // WriteTo implements FileOperations.WriteTo.
func (*Inotify) WriteTo(context.Context, *File, *File, SpliceOpts) (int64, error) { func (*Inotify) WriteTo(context.Context, *File, io.Writer, int64, bool) (int64, error) {
return 0, syserror.ENOSYS return 0, syserror.ENOSYS
} }
@ -182,7 +183,7 @@ func (*Inotify) Fsync(context.Context, *File, int64, int64, SyncType) error {
} }
// ReadFrom implements FileOperations.ReadFrom. // ReadFrom implements FileOperations.ReadFrom.
func (*Inotify) ReadFrom(context.Context, *File, *File, SpliceOpts) (int64, error) { func (*Inotify) ReadFrom(context.Context, *File, io.Reader, int64) (int64, error) {
return 0, syserror.ENOSYS return 0, syserror.ENOSYS
} }

View File

@ -18,7 +18,6 @@ import (
"io" "io"
"sync/atomic" "sync/atomic"
"gvisor.dev/gvisor/pkg/secio"
"gvisor.dev/gvisor/pkg/sentry/context" "gvisor.dev/gvisor/pkg/sentry/context"
"gvisor.dev/gvisor/pkg/syserror" "gvisor.dev/gvisor/pkg/syserror"
) )
@ -33,146 +32,131 @@ func Splice(ctx context.Context, dst *File, src *File, opts SpliceOpts) (int64,
} }
// Check whether or not the objects being sliced are stream-oriented // Check whether or not the objects being sliced are stream-oriented
// (i.e. pipes or sockets). If yes, we elide checks and offset locks. // (i.e. pipes or sockets). For all stream-oriented files and files
srcPipe := IsPipe(src.Dirent.Inode.StableAttr) || IsSocket(src.Dirent.Inode.StableAttr) // where a specific offset is not requested, we acquire the file mutex.
dstPipe := IsPipe(dst.Dirent.Inode.StableAttr) || IsSocket(dst.Dirent.Inode.StableAttr) // This has two important side effects. First, it provides the standard
// protection against concurrent writes that would mutate the offset.
// Second, it prevents Splice deadlocks. Only internal anonymous files
// implement the ReadFrom and WriteTo methods directly, and since such
// anonymous files are referred to by a unique fs.File object, we know
// that the file mutex takes strict precedence over internal locks.
// Since we enforce lock ordering here, we can't deadlock by using
// a file in two different splice operations simultaneously.
srcPipe := !IsRegular(src.Dirent.Inode.StableAttr)
dstPipe := !IsRegular(dst.Dirent.Inode.StableAttr)
dstAppend := !dstPipe && dst.Flags().Append
srcLock := srcPipe || !opts.SrcOffset
dstLock := dstPipe || !opts.DstOffset || dstAppend
if !dstPipe && !opts.DstOffset && !srcPipe && !opts.SrcOffset { switch {
case srcLock && dstLock:
switch { switch {
case dst.UniqueID < src.UniqueID: case dst.UniqueID < src.UniqueID:
// Acquire dst first. // Acquire dst first.
if !dst.mu.Lock(ctx) { if !dst.mu.Lock(ctx) {
return 0, syserror.ErrInterrupted return 0, syserror.ErrInterrupted
} }
defer dst.mu.Unlock()
if !src.mu.Lock(ctx) { if !src.mu.Lock(ctx) {
dst.mu.Unlock()
return 0, syserror.ErrInterrupted return 0, syserror.ErrInterrupted
} }
defer src.mu.Unlock()
case dst.UniqueID > src.UniqueID: case dst.UniqueID > src.UniqueID:
// Acquire src first. // Acquire src first.
if !src.mu.Lock(ctx) { if !src.mu.Lock(ctx) {
return 0, syserror.ErrInterrupted return 0, syserror.ErrInterrupted
} }
defer src.mu.Unlock()
if !dst.mu.Lock(ctx) { if !dst.mu.Lock(ctx) {
src.mu.Unlock()
return 0, syserror.ErrInterrupted return 0, syserror.ErrInterrupted
} }
defer dst.mu.Unlock()
case dst.UniqueID == src.UniqueID: case dst.UniqueID == src.UniqueID:
// Acquire only one lock; it's the same file. This is a // Acquire only one lock; it's the same file. This is a
// bit of a edge case, but presumably it's possible. // bit of a edge case, but presumably it's possible.
if !dst.mu.Lock(ctx) { if !dst.mu.Lock(ctx) {
return 0, syserror.ErrInterrupted return 0, syserror.ErrInterrupted
} }
defer dst.mu.Unlock() srcLock = false // Only need one unlock.
} }
// Use both offsets (locked). // Use both offsets (locked).
opts.DstStart = dst.offset opts.DstStart = dst.offset
opts.SrcStart = src.offset opts.SrcStart = src.offset
} else if !dstPipe && !opts.DstOffset { case dstLock:
// Acquire only dst. // Acquire only dst.
if !dst.mu.Lock(ctx) { if !dst.mu.Lock(ctx) {
return 0, syserror.ErrInterrupted return 0, syserror.ErrInterrupted
} }
defer dst.mu.Unlock()
opts.DstStart = dst.offset // Safe: locked. opts.DstStart = dst.offset // Safe: locked.
} else if !srcPipe && !opts.SrcOffset { case srcLock:
// Acquire only src. // Acquire only src.
if !src.mu.Lock(ctx) { if !src.mu.Lock(ctx) {
return 0, syserror.ErrInterrupted return 0, syserror.ErrInterrupted
} }
defer src.mu.Unlock()
opts.SrcStart = src.offset // Safe: locked. opts.SrcStart = src.offset // Safe: locked.
} }
// Check append-only mode and the limit. var err error
if !dstPipe { if dstAppend {
unlock := dst.Dirent.Inode.lockAppendMu(dst.Flags().Append) unlock := dst.Dirent.Inode.lockAppendMu(dst.Flags().Append)
defer unlock() defer unlock()
if dst.Flags().Append {
if opts.DstOffset {
// We need to acquire the lock.
if !dst.mu.Lock(ctx) {
return 0, syserror.ErrInterrupted
}
defer dst.mu.Unlock()
}
// Figure out the appropriate offset to use.
if err := dst.offsetForAppend(ctx, &opts.DstStart); err != nil {
return 0, err
}
}
// Figure out the appropriate offset to use.
err = dst.offsetForAppend(ctx, &opts.DstStart)
}
if err == nil && !dstPipe {
// Enforce file limits. // Enforce file limits.
limit, ok := dst.checkLimit(ctx, opts.DstStart) limit, ok := dst.checkLimit(ctx, opts.DstStart)
switch { switch {
case ok && limit == 0: case ok && limit == 0:
return 0, syserror.ErrExceedsFileSizeLimit err = syserror.ErrExceedsFileSizeLimit
case ok && limit < opts.Length: case ok && limit < opts.Length:
opts.Length = limit // Cap the write. opts.Length = limit // Cap the write.
} }
} }
if err != nil {
if dstLock {
dst.mu.Unlock()
}
if srcLock {
src.mu.Unlock()
}
return 0, err
}
// Construct readers and writers for the splice. This is used to
// provide a safer locking path for the WriteTo/ReadFrom operations
// (since they will otherwise go through public interface methods which
// conflict with locking done above), and simplifies the fallback path.
w := &lockedWriter{
Ctx: ctx,
File: dst,
Offset: opts.DstStart,
}
r := &lockedReader{
Ctx: ctx,
File: src,
Offset: opts.SrcStart,
}
// Attempt to do a WriteTo; this is likely the most efficient. // Attempt to do a WriteTo; this is likely the most efficient.
// n, err := src.FileOperations.WriteTo(ctx, src, w, opts.Length, opts.Dup)
// The underlying implementation may be able to donate buffers. if n == 0 && err != nil && err != syserror.ErrWouldBlock && !opts.Dup {
newOpts := SpliceOpts{ // Attempt as a ReadFrom. If a WriteTo, a ReadFrom may also be
Length: opts.Length, // more efficient than a copy if buffers are cached or readily
SrcStart: opts.SrcStart, // available. (It's unlikely that they can actually be donated).
SrcOffset: !srcPipe, n, err = dst.FileOperations.ReadFrom(ctx, dst, r, opts.Length)
Dup: opts.Dup,
DstStart: opts.DstStart,
DstOffset: !dstPipe,
} }
n, err := src.FileOperations.WriteTo(ctx, src, dst, newOpts)
if n == 0 && err != nil {
// Attempt as a ReadFrom. If a WriteTo, a ReadFrom may also
// be more efficient than a copy if buffers are cached or readily
// available. (It's unlikely that they can actually be donate
n, err = dst.FileOperations.ReadFrom(ctx, dst, src, newOpts)
}
if n == 0 && err != nil {
// If we've failed up to here, and at least one of the sources
// is a pipe or socket, then we can't properly support dup.
// Return an error indicating that this operation is not
// supported.
if (srcPipe || dstPipe) && newOpts.Dup {
return 0, syserror.EINVAL
}
// We failed to splice the files. But that's fine; we just fall // Support one last fallback option, but only if at least one of
// back to a slow path in this case. This copies without doing // the source and destination are regular files. This is because
// any mode changes, so should still be more efficient. // if we block at some point, we could lose data. If the source is
var ( // not a pipe then reading is not destructive; if the destination
r io.Reader // is a regular file, then it is guaranteed not to block writing.
w io.Writer if n == 0 && err != nil && err != syserror.ErrWouldBlock && !opts.Dup && (!dstPipe || !srcPipe) {
) // Fallback to an in-kernel copy.
fw := &lockedWriter{ n, err = io.Copy(w, &io.LimitedReader{
Ctx: ctx, R: r,
File: dst, N: opts.Length,
} })
if newOpts.DstOffset {
// Use the provided offset.
w = secio.NewOffsetWriter(fw, newOpts.DstStart)
} else {
// Writes will proceed with no offset.
w = fw
}
fr := &lockedReader{
Ctx: ctx,
File: src,
}
if newOpts.SrcOffset {
// Limit to the given offset and length.
r = io.NewSectionReader(fr, opts.SrcStart, opts.Length)
} else {
// Limit just to the given length.
r = &io.LimitedReader{fr, opts.Length}
}
// Copy between the two.
n, err = io.Copy(w, r)
} }
// Update offsets, if required. // Update offsets, if required.
@ -185,5 +169,13 @@ func Splice(ctx context.Context, dst *File, src *File, opts SpliceOpts) (int64,
} }
} }
// Drop locks.
if dstLock {
dst.mu.Unlock()
}
if srcLock {
src.mu.Unlock()
}
return n, err return n, err
} }

View File

@ -15,6 +15,7 @@
package pipe package pipe
import ( import (
"io"
"sync" "sync"
"gvisor.dev/gvisor/pkg/sentry/safemem" "gvisor.dev/gvisor/pkg/sentry/safemem"
@ -67,6 +68,17 @@ func (b *buffer) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) {
return n, err return n, err
} }
// WriteFromReader fills the region after the buffer's write index with a
// single Read from r.
//
// At most count bytes are requested from r. The write index advances by the
// number of bytes r.Read reports, and that number is returned together with
// whatever error the read produced.
func (b *buffer) WriteFromReader(r io.Reader, count int64) (int64, error) {
	// Start with everything past the write index as the destination.
	free := b.data[b.write:]
	if int64(len(free)) > count {
		// Never ask the reader for more than the caller allows.
		free = free[:count]
	}
	nr, err := r.Read(free)
	b.write += nr
	return int64(nr), err
}
// ReadToBlocks implements safemem.Reader.ReadToBlocks. // ReadToBlocks implements safemem.Reader.ReadToBlocks.
func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) { func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) {
src := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.read:b.write])) src := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.read:b.write]))
@ -75,6 +87,19 @@ func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) {
return n, err return n, err
} }
// ReadToWriter offers the buffer's unread bytes to w with a single Write.
//
// At most count bytes of the region between the read and write indexes are
// passed to w. When dup is false the read index advances by the number of
// bytes w accepted; when dup is true the read index is left untouched, so the
// same bytes stay in the buffer. The number of bytes w accepted is returned
// along with w's error.
func (b *buffer) ReadToWriter(w io.Writer, count int64, dup bool) (int64, error) {
	pending := b.data[b.read:b.write]
	if int64(len(pending)) > count {
		pending = pending[:count]
	}
	nw, err := w.Write(pending)
	if dup {
		// Duplicating: report progress but do not consume anything.
		return int64(nw), err
	}
	b.read += nw
	return int64(nw), err
}
// bufferPool is a pool for buffers. // bufferPool is a pool for buffers.
var bufferPool = sync.Pool{ var bufferPool = sync.Pool{
New: func() interface{} { New: func() interface{} {

View File

@ -23,7 +23,6 @@ import (
"gvisor.dev/gvisor/pkg/sentry/context" "gvisor.dev/gvisor/pkg/sentry/context"
"gvisor.dev/gvisor/pkg/sentry/fs" "gvisor.dev/gvisor/pkg/sentry/fs"
"gvisor.dev/gvisor/pkg/sentry/usermem"
"gvisor.dev/gvisor/pkg/syserror" "gvisor.dev/gvisor/pkg/syserror"
"gvisor.dev/gvisor/pkg/waiter" "gvisor.dev/gvisor/pkg/waiter"
) )
@ -173,13 +172,24 @@ func (p *Pipe) Open(ctx context.Context, d *fs.Dirent, flags fs.FileFlags) *fs.F
} }
} }
// readOps bundles the callbacks that Pipe.read and Pipe.dup use to move data
// out of the pipe's buffers, so the same loop can serve different
// destinations.
type readOps struct {
	// left returns the bytes remaining.
	left func() int64
	// limit limits subsequent reads.
	limit func(int64)
	// read performs the actual read operation.
	read func(*buffer) (int64, error)
}
// read reads data from the pipe into dst and returns the number of bytes // read reads data from the pipe into dst and returns the number of bytes
// read, or returns ErrWouldBlock if the pipe is empty. // read, or returns ErrWouldBlock if the pipe is empty.
// //
// Precondition: this pipe must have readers. // Precondition: this pipe must have readers.
func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error) { func (p *Pipe) read(ctx context.Context, ops readOps) (int64, error) {
// Don't block for a zero-length read even if the pipe is empty. // Don't block for a zero-length read even if the pipe is empty.
if dst.NumBytes() == 0 { if ops.left() == 0 {
return 0, nil return 0, nil
} }
@ -196,12 +206,12 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
} }
// Limit how much we consume. // Limit how much we consume.
if dst.NumBytes() > p.size { if ops.left() > p.size {
dst = dst.TakeFirst64(p.size) ops.limit(p.size)
} }
done := int64(0) done := int64(0)
for dst.NumBytes() > 0 { for ops.left() > 0 {
// Pop the first buffer. // Pop the first buffer.
first := p.data.Front() first := p.data.Front()
if first == nil { if first == nil {
@ -209,10 +219,9 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
} }
// Copy user data. // Copy user data.
n, err := dst.CopyOutFrom(ctx, first) n, err := ops.read(first)
done += int64(n) done += int64(n)
p.size -= n p.size -= n
dst = dst.DropFirst64(n)
// Empty buffer? // Empty buffer?
if first.Empty() { if first.Empty() {
@ -230,12 +239,57 @@ func (p *Pipe) read(ctx context.Context, dst usermem.IOSequence) (int64, error)
return done, nil return done, nil
} }
// dup duplicates data from this pipe into the destination described by ops.
//
// dup itself never adjusts the pipe's size or unlinks buffers; it only walks
// the buffer list and hands each buffer to ops.read. No blocking behavior is
// implemented here, although the writer behind ops may propagate a blocking
// error of its own. All the writes must be complete writes.
func (p *Pipe) dup(ctx context.Context, ops readOps) (int64, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Nothing buffered: report would-block while writers remain, otherwise
	// return (0, nil).
	if p.size == 0 {
		if p.HasWriters() {
			return 0, syserror.ErrWouldBlock
		}
		return 0, nil
	}

	// Cap the request at what the pipe currently holds.
	if remaining := ops.left(); remaining > p.size {
		ops.limit(p.size)
	}

	var total int64
	buf := p.data.Front()
	for buf != nil {
		n, err := ops.read(buf)
		total += n
		if err != nil {
			return total, err
		}
		buf = buf.Next()
	}
	return total, nil
}
// writeOps bundles the callbacks that Pipe.write uses to move data into the
// pipe's buffers, so the same loop can serve different sources.
type writeOps struct {
	// left returns the bytes remaining.
	left func() int64
	// limit should limit subsequent writes.
	limit func(int64)
	// write should write to the provided buffer.
	write func(*buffer) (int64, error)
}
// write writes data from sv into the pipe and returns the number of bytes // write writes data from sv into the pipe and returns the number of bytes
// written. If no bytes are written because the pipe is full (or has less than // written. If no bytes are written because the pipe is full (or has less than
// atomicIOBytes free capacity), write returns ErrWouldBlock. // atomicIOBytes free capacity), write returns ErrWouldBlock.
// //
// Precondition: this pipe must have writers. // Precondition: this pipe must have writers.
func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error) { func (p *Pipe) write(ctx context.Context, ops writeOps) (int64, error) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
@ -246,17 +300,16 @@ func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error)
// POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be // POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
// atomic, but requires no atomicity for writes larger than this. // atomic, but requires no atomicity for writes larger than this.
wanted := src.NumBytes() wanted := ops.left()
if avail := p.max - p.size; wanted > avail { if avail := p.max - p.size; wanted > avail {
if wanted <= p.atomicIOBytes { if wanted <= p.atomicIOBytes {
return 0, syserror.ErrWouldBlock return 0, syserror.ErrWouldBlock
} }
// Limit to the available capacity. ops.limit(avail)
src = src.TakeFirst64(avail)
} }
done := int64(0) done := int64(0)
for src.NumBytes() > 0 { for ops.left() > 0 {
// Need a new buffer? // Need a new buffer?
last := p.data.Back() last := p.data.Back()
if last == nil || last.Full() { if last == nil || last.Full() {
@ -266,10 +319,9 @@ func (p *Pipe) write(ctx context.Context, src usermem.IOSequence) (int64, error)
} }
// Copy user data. // Copy user data.
n, err := src.CopyInTo(ctx, last) n, err := ops.write(last)
done += int64(n) done += int64(n)
p.size += n p.size += n
src = src.DropFirst64(n)
// Handle errors. // Handle errors.
if err != nil { if err != nil {

View File

@ -15,6 +15,7 @@
package pipe package pipe
import ( import (
"io"
"math" "math"
"syscall" "syscall"
@ -55,7 +56,45 @@ func (rw *ReaderWriter) Release() {
// Read implements fs.FileOperations.Read. // Read implements fs.FileOperations.Read.
func (rw *ReaderWriter) Read(ctx context.Context, _ *fs.File, dst usermem.IOSequence, _ int64) (int64, error) { func (rw *ReaderWriter) Read(ctx context.Context, _ *fs.File, dst usermem.IOSequence, _ int64) (int64, error) {
n, err := rw.Pipe.read(ctx, dst) n, err := rw.Pipe.read(ctx, readOps{
left: func() int64 {
return dst.NumBytes()
},
limit: func(l int64) {
dst = dst.TakeFirst64(l)
},
read: func(buf *buffer) (int64, error) {
n, err := dst.CopyOutFrom(ctx, buf)
dst = dst.DropFirst64(n)
return n, err
},
})
if n > 0 {
rw.Pipe.Notify(waiter.EventOut)
}
return n, err
}
// WriteTo implements fs.FileOperations.WriteTo.
func (rw *ReaderWriter) WriteTo(ctx context.Context, _ *fs.File, w io.Writer, count int64, dup bool) (int64, error) {
ops := readOps{
left: func() int64 {
return count
},
limit: func(l int64) {
count = l
},
read: func(buf *buffer) (int64, error) {
n, err := buf.ReadToWriter(w, count, dup)
count -= n
return n, err
},
}
if dup {
// There is no notification for dup operations.
return rw.Pipe.dup(ctx, ops)
}
n, err := rw.Pipe.read(ctx, ops)
if n > 0 { if n > 0 {
rw.Pipe.Notify(waiter.EventOut) rw.Pipe.Notify(waiter.EventOut)
} }
@ -64,7 +103,40 @@ func (rw *ReaderWriter) Read(ctx context.Context, _ *fs.File, dst usermem.IOSequ
// Write implements fs.FileOperations.Write. // Write implements fs.FileOperations.Write.
func (rw *ReaderWriter) Write(ctx context.Context, _ *fs.File, src usermem.IOSequence, _ int64) (int64, error) { func (rw *ReaderWriter) Write(ctx context.Context, _ *fs.File, src usermem.IOSequence, _ int64) (int64, error) {
n, err := rw.Pipe.write(ctx, src) n, err := rw.Pipe.write(ctx, writeOps{
left: func() int64 {
return src.NumBytes()
},
limit: func(l int64) {
src = src.TakeFirst64(l)
},
write: func(buf *buffer) (int64, error) {
n, err := src.CopyInTo(ctx, buf)
src = src.DropFirst64(n)
return n, err
},
})
if n > 0 {
rw.Pipe.Notify(waiter.EventIn)
}
return n, err
}
// ReadFrom implements fs.FileOperations.ReadFrom.
func (rw *ReaderWriter) ReadFrom(ctx context.Context, _ *fs.File, r io.Reader, count int64) (int64, error) {
n, err := rw.Pipe.write(ctx, writeOps{
left: func() int64 {
return count
},
limit: func(l int64) {
count = l
},
write: func(buf *buffer) (int64, error) {
n, err := buf.WriteFromReader(r, count)
count -= n
return n, err
},
})
if n > 0 { if n > 0 {
rw.Pipe.Notify(waiter.EventIn) rw.Pipe.Notify(waiter.EventIn)
} }

View File

@ -26,6 +26,7 @@ package epsocket
import ( import (
"bytes" "bytes"
"io"
"math" "math"
"reflect" "reflect"
"sync" "sync"
@ -227,7 +228,6 @@ type SocketOperations struct {
fsutil.FileNoopFlush `state:"nosave"` fsutil.FileNoopFlush `state:"nosave"`
fsutil.FileNoFsync `state:"nosave"` fsutil.FileNoFsync `state:"nosave"`
fsutil.FileNoMMap `state:"nosave"` fsutil.FileNoMMap `state:"nosave"`
fsutil.FileNoSplice `state:"nosave"`
fsutil.FileUseInodeUnstableAttr `state:"nosave"` fsutil.FileUseInodeUnstableAttr `state:"nosave"`
socket.SendReceiveTimeout socket.SendReceiveTimeout
*waiter.Queue *waiter.Queue
@ -412,17 +412,58 @@ func (s *SocketOperations) Read(ctx context.Context, _ *fs.File, dst usermem.IOS
return int64(n), nil return int64(n), nil
} }
// ioSequencePayload implements tcpip.Payload. It copies user memory bytes on demand // WriteTo implements fs.FileOperations.WriteTo.
// based on the requested size. func (s *SocketOperations) WriteTo(ctx context.Context, _ *fs.File, dst io.Writer, count int64, dup bool) (int64, error) {
s.readMu.Lock()
defer s.readMu.Unlock()
// Copy as much data as possible.
done := int64(0)
for count > 0 {
// This may return a blocking error.
if err := s.fetchReadView(); err != nil {
return done, err.ToError()
}
// Write to the underlying file.
n, err := dst.Write(s.readView)
done += int64(n)
count -= int64(n)
if dup {
// That's all we support for dup. This is generally
// supported by any Linux system calls, but the
// expectation is that now a caller will call read to
// actually remove these bytes from the socket.
return done, nil
}
// Drop that part of the view.
s.readView.TrimFront(n)
if err != nil {
return done, err
}
}
return done, nil
}
// ioSequencePayload implements tcpip.Payloader.
//
// It copies user memory bytes on demand based on the requested size.
type ioSequencePayload struct { type ioSequencePayload struct {
ctx context.Context ctx context.Context
src usermem.IOSequence src usermem.IOSequence
} }
// Get implements tcpip.Payload. // FullPayload implements tcpip.Payloader.FullPayload
func (i *ioSequencePayload) Get(size int) ([]byte, *tcpip.Error) { func (i *ioSequencePayload) FullPayload() ([]byte, *tcpip.Error) {
if size > i.Size() { return i.Payload(int(i.src.NumBytes()))
size = i.Size() }
// Payload implements tcpip.Payloader.Payload.
func (i *ioSequencePayload) Payload(size int) ([]byte, *tcpip.Error) {
if max := int(i.src.NumBytes()); size > max {
size = max
} }
v := buffer.NewView(size) v := buffer.NewView(size)
if _, err := i.src.CopyIn(i.ctx, v); err != nil { if _, err := i.src.CopyIn(i.ctx, v); err != nil {
@ -431,11 +472,6 @@ func (i *ioSequencePayload) Get(size int) ([]byte, *tcpip.Error) {
return v, nil return v, nil
} }
// Size implements tcpip.Payload.
func (i *ioSequencePayload) Size() int {
return int(i.src.NumBytes())
}
// DropFirst drops the first n bytes from underlying src. // DropFirst drops the first n bytes from underlying src.
func (i *ioSequencePayload) DropFirst(n int) { func (i *ioSequencePayload) DropFirst(n int) {
i.src = i.src.DropFirst(int(n)) i.src = i.src.DropFirst(int(n))
@ -469,6 +505,76 @@ func (s *SocketOperations) Write(ctx context.Context, _ *fs.File, src usermem.IO
return int64(n), nil return int64(n), nil
} }
// readerPayload implements tcpip.Payloader.
//
// It allocates a view and reads from a reader on-demand, based on available
// capacity in the endpoint.
type readerPayload struct {
	// ctx is the context supplied when the payload is constructed.
	// NOTE(review): neither FullPayload nor Payload reads it; confirm
	// whether it is still needed.
	ctx context.Context
	// r is the reader that Payload pulls data from.
	r io.Reader
	// count is the number of bytes that may still be read; Payload
	// decrements it by the size of each read that returns data.
	count int64
	// err holds the reader error saved by Payload (any error other than
	// syserror.ErrWouldBlock on a read that returned no data), kept so
	// that ReadFrom can return it.
	err error
}
// FullPayload implements tcpip.Payloader.FullPayload.
//
// It asks Payload for all of the bytes still permitted by r.count, so the
// result and error are whatever Payload returns for that size.
func (r *readerPayload) FullPayload() ([]byte, *tcpip.Error) {
return r.Payload(int(r.count))
}
// Payload implements tcpip.Payloader.Payload.
//
// It performs one Read of up to size bytes (capped at the remaining count)
// into a newly allocated view. If any data was read it is returned and the
// remaining count is reduced. Otherwise syserror.ErrWouldBlock maps to
// tcpip.ErrWouldBlock, and every other outcome is reported as
// tcpip.ErrBadAddress with the reader's error, if any, stored in r.err.
func (r *readerPayload) Payload(size int) ([]byte, *tcpip.Error) {
	if remaining := int(r.count); size > remaining {
		size = remaining
	}
	view := buffer.NewView(size)
	n, err := r.r.Read(view)
	switch {
	case n > 0:
		// Data takes precedence over any accompanying error: the error
		// may re-occur on a later read, but what was read can be
		// enqueued now.
		r.count -= int64(n)
		return view[:n], nil
	case err == syserror.ErrWouldBlock:
		return nil, tcpip.ErrWouldBlock
	case err != nil:
		r.err = err // Save for propagation.
		return nil, tcpip.ErrBadAddress
	default:
		// There is no data and no error. Return an error, which will
		// propagate r.err, which will be nil. This is the desired
		// result: (0, nil).
		return nil, tcpip.ErrBadAddress
	}
}
// ReadFrom implements fs.FileOperations.ReadFrom.
//
// It wraps r in a readerPayload limited to count bytes and hands it to the
// endpoint's Write. If the endpoint returns a channel, the calling task blocks
// on it and the write is retried once with Atomic set.
//
// Returns the number of bytes the endpoint accepted, and:
//   - syserror.ErrWouldBlock if the endpoint (or the reader) would block;
//   - the reader's own error (possibly nil, meaning the reader simply had no
//     data) when the payload signalled failure via tcpip.ErrBadAddress;
//   - the translated netstack error for any other endpoint failure, so that
//     errors unrelated to the reader are not silently reported as success.
//
// Precondition: if the endpoint returns a non-nil channel, ctx must be a
// *kernel.Task; the type assertion below is unchecked.
func (s *SocketOperations) ReadFrom(ctx context.Context, _ *fs.File, r io.Reader, count int64) (int64, error) {
	f := &readerPayload{ctx: ctx, r: r, count: count}
	n, resCh, err := s.Endpoint.Write(f, tcpip.WriteOptions{})
	if err == tcpip.ErrWouldBlock {
		return 0, syserror.ErrWouldBlock
	}

	if resCh != nil {
		t := ctx.(*kernel.Task)
		if err := t.Block(resCh); err != nil {
			return 0, syserr.FromError(err).ToError()
		}

		n, _, err = s.Endpoint.Write(f, tcpip.WriteOptions{
			// Reads may be destructive but should be very fast,
			// so we can't release the lock while copying data.
			Atomic: true,
		})
	}

	switch err {
	case nil:
		return int64(n), nil
	case tcpip.ErrWouldBlock:
		return int64(n), syserror.ErrWouldBlock
	case tcpip.ErrBadAddress:
		// This is the sentinel readerPayload.Payload uses for "the
		// reader failed or had nothing"; the real outcome is in f.err
		// (nil yields the desired (n, nil)).
		return int64(n), f.err
	default:
		// A genuine endpoint error. f.err is nil here, so returning it
		// would report success; translate the netstack error instead.
		return int64(n), syserr.TranslateNetstackError(err).ToError()
	}
}
// Readiness returns a mask of ready events for socket s. // Readiness returns a mask of ready events for socket s.
func (s *SocketOperations) Readiness(mask waiter.EventMask) waiter.EventMask { func (s *SocketOperations) Readiness(mask waiter.EventMask) waiter.EventMask {
r := s.Endpoint.Readiness(mask) r := s.Endpoint.Readiness(mask)
@ -2060,7 +2166,7 @@ func (s *SocketOperations) SendMsg(t *kernel.Task, src usermem.IOSequence, to []
n, _, err = s.Endpoint.Write(v, opts) n, _, err = s.Endpoint.Write(v, opts)
} }
dontWait := flags&linux.MSG_DONTWAIT != 0 dontWait := flags&linux.MSG_DONTWAIT != 0
if err == nil && (n >= int64(v.Size()) || dontWait) { if err == nil && (n >= v.src.NumBytes() || dontWait) {
// Complete write. // Complete write.
return int(n), nil return int(n), nil
} }
@ -2085,7 +2191,7 @@ func (s *SocketOperations) SendMsg(t *kernel.Task, src usermem.IOSequence, to []
return 0, syserr.TranslateNetstackError(err) return 0, syserr.TranslateNetstackError(err)
} }
if err == nil && v.Size() == 0 || err != nil && err != tcpip.ErrWouldBlock { if err == nil && v.src.NumBytes() == 0 || err != nil && err != tcpip.ErrWouldBlock {
return int(total), nil return int(total), nil
} }

View File

@ -320,8 +320,8 @@ var AMD64 = &kernel.SyscallTable{
272: syscalls.PartiallySupported("unshare", Unshare, "Mount, cgroup namespaces not supported. Network namespaces supported but must be empty.", nil), 272: syscalls.PartiallySupported("unshare", Unshare, "Mount, cgroup namespaces not supported. Network namespaces supported but must be empty.", nil),
273: syscalls.Error("set_robust_list", syserror.ENOSYS, "Obsolete.", nil), 273: syscalls.Error("set_robust_list", syserror.ENOSYS, "Obsolete.", nil),
274: syscalls.Error("get_robust_list", syserror.ENOSYS, "Obsolete.", nil), 274: syscalls.Error("get_robust_list", syserror.ENOSYS, "Obsolete.", nil),
275: syscalls.PartiallySupported("splice", Splice, "Stub implementation.", []string{"gvisor.dev/issue/138"}), // TODO(b/29354098) 275: syscalls.Supported("splice", Splice),
276: syscalls.ErrorWithEvent("tee", syserror.ENOSYS, "", []string{"gvisor.dev/issue/138"}), // TODO(b/29354098) 276: syscalls.Supported("tee", Tee),
277: syscalls.PartiallySupported("sync_file_range", SyncFileRange, "Full data flush is not guaranteed at this time.", nil), 277: syscalls.PartiallySupported("sync_file_range", SyncFileRange, "Full data flush is not guaranteed at this time.", nil),
278: syscalls.ErrorWithEvent("vmsplice", syserror.ENOSYS, "", []string{"gvisor.dev/issue/138"}), // TODO(b/29354098) 278: syscalls.ErrorWithEvent("vmsplice", syserror.ENOSYS, "", []string{"gvisor.dev/issue/138"}), // TODO(b/29354098)
279: syscalls.CapError("move_pages", linux.CAP_SYS_NICE, "", nil), // requires cap_sys_nice (mostly) 279: syscalls.CapError("move_pages", linux.CAP_SYS_NICE, "", nil), // requires cap_sys_nice (mostly)

View File

@ -29,9 +29,8 @@ func doSplice(t *kernel.Task, outFile, inFile *fs.File, opts fs.SpliceOpts, nonB
total int64 total int64
n int64 n int64
err error err error
ch chan struct{} inCh chan struct{}
inW bool outCh chan struct{}
outW bool
) )
for opts.Length > 0 { for opts.Length > 0 {
n, err = fs.Splice(t, outFile, inFile, opts) n, err = fs.Splice(t, outFile, inFile, opts)
@ -43,35 +42,33 @@ func doSplice(t *kernel.Task, outFile, inFile *fs.File, opts fs.SpliceOpts, nonB
break break
} }
// Are we a registered waiter? // Note that the blocking behavior here is a bit different than the
if ch == nil { // normal pattern. Because we need to have both data to read and data
ch = make(chan struct{}, 1) // to write simultaneously, we actually explicitly block on both of
// these cases in turn before returning to the splice operation.
if inFile.Readiness(EventMaskRead) == 0 {
if inCh == nil {
inCh = make(chan struct{}, 1)
inW, _ := waiter.NewChannelEntry(inCh)
inFile.EventRegister(&inW, EventMaskRead)
defer inFile.EventUnregister(&inW)
continue // Need to refresh readiness.
}
if err = t.Block(inCh); err != nil {
break
}
} }
if !inW && !inFile.Flags().NonBlocking { if outFile.Readiness(EventMaskWrite) == 0 {
w, _ := waiter.NewChannelEntry(ch) if outCh == nil {
inFile.EventRegister(&w, EventMaskRead) outCh = make(chan struct{}, 1)
defer inFile.EventUnregister(&w) outW, _ := waiter.NewChannelEntry(outCh)
inW = true // Registered. outFile.EventRegister(&outW, EventMaskWrite)
} else if !outW && !outFile.Flags().NonBlocking { defer outFile.EventUnregister(&outW)
w, _ := waiter.NewChannelEntry(ch) continue // Need to refresh readiness.
outFile.EventRegister(&w, EventMaskWrite) }
defer outFile.EventUnregister(&w) if err = t.Block(outCh); err != nil {
outW = true // Registered. break
} }
// Was anything registered? If no, everything is non-blocking.
if !inW && !outW {
break
}
if (!inW || inFile.Readiness(EventMaskRead) != 0) && (!outW || outFile.Readiness(EventMaskWrite) != 0) {
// Something became ready, try again without blocking.
continue
}
// Block until there's data.
if err = t.Block(ch); err != nil {
break
} }
} }
@ -149,7 +146,7 @@ func Sendfile(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Sysc
Length: count, Length: count,
SrcOffset: true, SrcOffset: true,
SrcStart: offset, SrcStart: offset,
}, false) }, outFile.Flags().NonBlocking)
// Copy out the new offset. // Copy out the new offset.
if _, err := t.CopyOut(offsetAddr, n+offset); err != nil { if _, err := t.CopyOut(offsetAddr, n+offset); err != nil {
@ -159,7 +156,7 @@ func Sendfile(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Sysc
// Send data using splice. // Send data using splice.
n, err = doSplice(t, outFile, inFile, fs.SpliceOpts{ n, err = doSplice(t, outFile, inFile, fs.SpliceOpts{
Length: count, Length: count,
}, false) }, outFile.Flags().NonBlocking)
} }
// We can only pass a single file to handleIOError, so pick inFile // We can only pass a single file to handleIOError, so pick inFile
@ -181,12 +178,6 @@ func Splice(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Syscal
return 0, nil, syserror.EINVAL return 0, nil, syserror.EINVAL
} }
// Only non-blocking is meaningful. Note that unlike in Linux, this
// flag is applied consistently. We will have either fully blocking or
// non-blocking behavior below, regardless of the underlying files
// being spliced to. It's unclear if this is a bug or not yet.
nonBlocking := (flags & linux.SPLICE_F_NONBLOCK) != 0
// Get files. // Get files.
outFile := t.GetFile(outFD) outFile := t.GetFile(outFD)
if outFile == nil { if outFile == nil {
@ -200,6 +191,13 @@ func Splice(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Syscal
} }
defer inFile.DecRef() defer inFile.DecRef()
// The operation is non-blocking if anything is non-blocking.
//
// N.B. This is a rather simplistic heuristic that avoids some
// poor edge case behavior since the exact semantics here are
// underspecified and vary between versions of Linux itself.
nonBlock := inFile.Flags().NonBlocking || outFile.Flags().NonBlocking || (flags&linux.SPLICE_F_NONBLOCK != 0)
// Construct our options. // Construct our options.
// //
// Note that exactly one of the underlying buffers must be a pipe. We // Note that exactly one of the underlying buffers must be a pipe. We
@ -257,7 +255,7 @@ func Splice(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Syscal
} }
// Splice data. // Splice data.
n, err := doSplice(t, outFile, inFile, opts, nonBlocking) n, err := doSplice(t, outFile, inFile, opts, nonBlock)
// See above; inFile is chosen arbitrarily here. // See above; inFile is chosen arbitrarily here.
return uintptr(n), nil, handleIOError(t, n != 0, err, kernel.ERESTARTSYS, "splice", inFile) return uintptr(n), nil, handleIOError(t, n != 0, err, kernel.ERESTARTSYS, "splice", inFile)
@ -275,9 +273,6 @@ func Tee(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.SyscallCo
return 0, nil, syserror.EINVAL return 0, nil, syserror.EINVAL
} }
// Only non-blocking is meaningful.
nonBlocking := (flags & linux.SPLICE_F_NONBLOCK) != 0
// Get files. // Get files.
outFile := t.GetFile(outFD) outFile := t.GetFile(outFD)
if outFile == nil { if outFile == nil {
@ -301,11 +296,14 @@ func Tee(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.SyscallCo
return 0, nil, syserror.EINVAL return 0, nil, syserror.EINVAL
} }
// The operation is non-blocking if anything is non-blocking.
nonBlock := inFile.Flags().NonBlocking || outFile.Flags().NonBlocking || (flags&linux.SPLICE_F_NONBLOCK != 0)
// Splice data. // Splice data.
n, err := doSplice(t, outFile, inFile, fs.SpliceOpts{ n, err := doSplice(t, outFile, inFile, fs.SpliceOpts{
Length: count, Length: count,
Dup: true, Dup: true,
}, nonBlocking) }, nonBlock)
// See above; inFile is chosen arbitrarily here. // See above; inFile is chosen arbitrarily here.
return uintptr(n), nil, handleIOError(t, n != 0, err, kernel.ERESTARTSYS, "tee", inFile) return uintptr(n), nil, handleIOError(t, n != 0, err, kernel.ERESTARTSYS, "tee", inFile)

View File

@ -27,6 +27,11 @@ const (
udpChecksum = 6 udpChecksum = 6
) )
const (
// UDPMaximumPacketSize is the largest possible UDP packet.
//
// 0xffff is 65535, the largest value representable in 16 bits.
UDPMaximumPacketSize = 0xffff
)
// UDPFields contains the fields of a UDP packet. It is used to describe the // UDPFields contains the fields of a UDP packet. It is used to describe the
// fields of a packet that needs to be encoded. // fields of a packet that needs to be encoded.
type UDPFields struct { type UDPFields struct {

View File

@ -65,13 +65,13 @@ func (*fakeTransportEndpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.Contr
return buffer.View{}, tcpip.ControlMessages{}, nil return buffer.View{}, tcpip.ControlMessages{}, nil
} }
func (f *fakeTransportEndpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) { func (f *fakeTransportEndpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
if len(f.route.RemoteAddress) == 0 { if len(f.route.RemoteAddress) == 0 {
return 0, nil, tcpip.ErrNoRoute return 0, nil, tcpip.ErrNoRoute
} }
hdr := buffer.NewPrependable(int(f.route.MaxHeaderLength())) hdr := buffer.NewPrependable(int(f.route.MaxHeaderLength()))
v, err := p.Get(p.Size()) v, err := p.FullPayload()
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }

View File

@ -261,31 +261,34 @@ type FullAddress struct {
Port uint16 Port uint16
} }
// Payload provides an interface around data that is being sent to an endpoint. // Payloader is an interface that provides data.
// This allows the endpoint to request the amount of data it needs based on //
// internal buffers without exposing them. 'p.Get(p.Size())' reads all the data. // This interface allows the endpoint to request the amount of data it needs
type Payload interface { // based on internal buffers without exposing them.
// Get returns a slice containing exactly 'min(size, p.Size())' bytes. type Payloader interface {
Get(size int) ([]byte, *Error) // FullPayload returns all available bytes.
FullPayload() ([]byte, *Error)
// Size returns the payload size. // Payload returns a slice containing at most size bytes.
Size() int Payload(size int) ([]byte, *Error)
} }
// SlicePayload implements Payload on top of slices for convenience. // SlicePayload implements Payloader for slices.
//
// This is typically used for tests.
type SlicePayload []byte type SlicePayload []byte
// Get implements Payload. // FullPayload implements Payloader.FullPayload.
func (s SlicePayload) Get(size int) ([]byte, *Error) { func (s SlicePayload) FullPayload() ([]byte, *Error) {
if size > s.Size() { return s, nil
size = s.Size()
}
return s[:size], nil
} }
// Size implements Payload. // Payload implements Payloader.Payload.
func (s SlicePayload) Size() int { func (s SlicePayload) Payload(size int) ([]byte, *Error) {
return len(s) if size > len(s) {
size = len(s)
}
return s[:size], nil
} }
// A ControlMessages contains socket control messages for IP sockets. // A ControlMessages contains socket control messages for IP sockets.
@ -338,7 +341,7 @@ type Endpoint interface {
// ErrNoLinkAddress and a notification channel is returned for the caller to // ErrNoLinkAddress and a notification channel is returned for the caller to
// block. Channel is closed once address resolution is complete (success or // block. Channel is closed once address resolution is complete (success or
// not). The channel is only non-nil in this case. // not). The channel is only non-nil in this case.
Write(Payload, WriteOptions) (int64, <-chan struct{}, *Error) Write(Payloader, WriteOptions) (int64, <-chan struct{}, *Error)
// Peek reads data without consuming it from the endpoint. // Peek reads data without consuming it from the endpoint.
// //
@ -432,6 +435,11 @@ type WriteOptions struct {
// EndOfRecord has the same semantics as Linux's MSG_EOR. // EndOfRecord has the same semantics as Linux's MSG_EOR.
EndOfRecord bool EndOfRecord bool
// Atomic means that all data fetched from Payloader must be written to the
// endpoint. If Atomic is false, then data fetched from the Payloader may be
// discarded if available endpoint buffer space is insufficient.
Atomic bool
} }
// SockOpt represents socket options which values have the int type. // SockOpt represents socket options which values have the int type.

View File

@ -204,7 +204,7 @@ func (e *endpoint) prepareForWrite(to *tcpip.FullAddress) (retry bool, err *tcpi
// Write writes data to the endpoint's peer. This method does not block // Write writes data to the endpoint's peer. This method does not block
// if the data cannot be written. // if the data cannot be written.
func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) { func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
// MSG_MORE is unimplemented. (This also means that MSG_EOR is a no-op.) // MSG_MORE is unimplemented. (This also means that MSG_EOR is a no-op.)
if opts.More { if opts.More {
return 0, nil, tcpip.ErrInvalidOptionValue return 0, nil, tcpip.ErrInvalidOptionValue
@ -289,7 +289,7 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-cha
} }
} }
v, err := p.Get(p.Size()) v, err := p.FullPayload()
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }

View File

@ -207,7 +207,7 @@ func (ep *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMes
} }
// Write implements tcpip.Endpoint.Write. // Write implements tcpip.Endpoint.Write.
func (ep *endpoint) Write(payload tcpip.Payload, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) { func (ep *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
// MSG_MORE is unimplemented. This also means that MSG_EOR is a no-op. // MSG_MORE is unimplemented. This also means that MSG_EOR is a no-op.
if opts.More { if opts.More {
return 0, nil, tcpip.ErrInvalidOptionValue return 0, nil, tcpip.ErrInvalidOptionValue
@ -220,9 +220,8 @@ func (ep *endpoint) Write(payload tcpip.Payload, opts tcpip.WriteOptions) (int64
return 0, nil, tcpip.ErrInvalidEndpointState return 0, nil, tcpip.ErrInvalidEndpointState
} }
payloadBytes, err := payload.Get(payload.Size()) payloadBytes, err := p.FullPayload()
if err != nil { if err != nil {
ep.mu.RUnlock()
return 0, nil, err return 0, nil, err
} }
@ -230,7 +229,7 @@ func (ep *endpoint) Write(payload tcpip.Payload, opts tcpip.WriteOptions) (int64
// destination address, route using that address. // destination address, route using that address.
if !ep.associated { if !ep.associated {
ip := header.IPv4(payloadBytes) ip := header.IPv4(payloadBytes)
if !ip.IsValid(payload.Size()) { if !ip.IsValid(len(payloadBytes)) {
ep.mu.RUnlock() ep.mu.RUnlock()
return 0, nil, tcpip.ErrInvalidOptionValue return 0, nil, tcpip.ErrInvalidOptionValue
} }

View File

@ -806,7 +806,7 @@ func (e *endpoint) isEndpointWritableLocked() (int, *tcpip.Error) {
} }
// Write writes data to the endpoint's peer. // Write writes data to the endpoint's peer.
func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) { func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
// Linux completely ignores any address passed to sendto(2) for TCP sockets // Linux completely ignores any address passed to sendto(2) for TCP sockets
// (without the MSG_FASTOPEN flag). Corking is unimplemented, so opts.More // (without the MSG_FASTOPEN flag). Corking is unimplemented, so opts.More
// and opts.EndOfRecord are also ignored. // and opts.EndOfRecord are also ignored.
@ -821,47 +821,52 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-cha
return 0, nil, err return 0, nil, err
} }
e.sndBufMu.Unlock() // We can release locks while copying data.
e.mu.RUnlock() //
// This is not possible if atomic is set, because we can't allow the
// Nothing to do if the buffer is empty. // available buffer space to be consumed by some other caller while we
if p.Size() == 0 { // are copying data in.
return 0, nil, nil if !opts.Atomic {
e.sndBufMu.Unlock()
e.mu.RUnlock()
} }
// Copy in memory without holding sndBufMu so that worker goroutine can // Fetch data.
// make progress independent of this operation. v, perr := p.Payload(avail)
v, perr := p.Get(avail) if perr != nil || len(v) == 0 {
if perr != nil { if opts.Atomic { // See above.
e.sndBufMu.Unlock()
e.mu.RUnlock()
}
// Note that perr may be nil if len(v) == 0.
return 0, nil, perr return 0, nil, perr
} }
e.mu.RLock() if !opts.Atomic { // See above.
e.sndBufMu.Lock() e.mu.RLock()
e.sndBufMu.Lock()
// Because we released the lock before copying, check state again // Because we released the lock before copying, check state again
// to make sure the endpoint is still in a valid state for a // to make sure the endpoint is still in a valid state for a write.
// write. avail, err = e.isEndpointWritableLocked()
avail, err = e.isEndpointWritableLocked() if err != nil {
if err != nil { e.sndBufMu.Unlock()
e.sndBufMu.Unlock() e.mu.RUnlock()
e.mu.RUnlock() return 0, nil, err
return 0, nil, err }
}
// Discard any excess data copied in due to avail being reduced due to a // Discard any excess data copied in due to avail being reduced due
// simultaneous write call to the socket. // to a simultaneous write call to the socket.
if avail < len(v) { if avail < len(v) {
v = v[:avail] v = v[:avail]
}
} }
// Add data to the send queue. // Add data to the send queue.
l := len(v)
s := newSegmentFromView(&e.route, e.id, v) s := newSegmentFromView(&e.route, e.id, v)
e.sndBufUsed += l e.sndBufUsed += len(v)
e.sndBufInQueue += seqnum.Size(l) e.sndBufInQueue += seqnum.Size(len(v))
e.sndQueue.PushBack(s) e.sndQueue.PushBack(s)
e.sndBufMu.Unlock() e.sndBufMu.Unlock()
// Release the endpoint lock to prevent deadlocks due to lock // Release the endpoint lock to prevent deadlocks due to lock
// order inversion when acquiring workMu. // order inversion when acquiring workMu.
@ -875,7 +880,8 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-cha
// Let the protocol goroutine do the work. // Let the protocol goroutine do the work.
e.sndWaker.Assert() e.sndWaker.Assert()
} }
return int64(l), nil, nil
return int64(len(v)), nil, nil
} }
// Peek reads data without consuming it from the endpoint. // Peek reads data without consuming it from the endpoint.

View File

@ -15,7 +15,6 @@
package udp package udp
import ( import (
"math"
"sync" "sync"
"gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip"
@ -277,17 +276,12 @@ func (e *endpoint) connectRoute(nicid tcpip.NICID, addr tcpip.FullAddress, netPr
// Write writes data to the endpoint's peer. This method does not block // Write writes data to the endpoint's peer. This method does not block
// if the data cannot be written. // if the data cannot be written.
func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) { func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, <-chan struct{}, *tcpip.Error) {
// MSG_MORE is unimplemented. (This also means that MSG_EOR is a no-op.) // MSG_MORE is unimplemented. (This also means that MSG_EOR is a no-op.)
if opts.More { if opts.More {
return 0, nil, tcpip.ErrInvalidOptionValue return 0, nil, tcpip.ErrInvalidOptionValue
} }
if p.Size() > math.MaxUint16 {
// Payload can't possibly fit in a packet.
return 0, nil, tcpip.ErrMessageTooLong
}
to := opts.To to := opts.To
e.mu.RLock() e.mu.RLock()
@ -370,10 +364,14 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-cha
} }
} }
v, err := p.Get(p.Size()) v, err := p.FullPayload()
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
if len(v) > header.UDPMaximumPacketSize {
// Payload can't possibly fit in a packet.
return 0, nil, tcpip.ErrMessageTooLong
}
ttl := route.DefaultTTL() ttl := route.DefaultTTL()
if header.IsV4MulticastAddress(route.RemoteAddress) || header.IsV6MulticastAddress(route.RemoteAddress) { if header.IsV4MulticastAddress(route.RemoteAddress) || header.IsV6MulticastAddress(route.RemoteAddress) {

View File

@ -1867,7 +1867,9 @@ cc_binary(
"//test/util:temp_path", "//test/util:temp_path",
"//test/util:test_main", "//test/util:test_main",
"//test/util:test_util", "//test/util:test_util",
"//test/util:thread_util",
"@com_google_absl//absl/strings", "@com_google_absl//absl/strings",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest", "@com_google_googletest//:gtest",
], ],
) )
@ -1901,6 +1903,7 @@ cc_binary(
"//test/util:test_util", "//test/util:test_util",
"//test/util:thread_util", "//test/util:thread_util",
"@com_google_absl//absl/strings", "@com_google_absl//absl/strings",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest", "@com_google_googletest//:gtest",
], ],
) )

View File

@ -168,6 +168,20 @@ TEST_P(PipeTest, Write) {
EXPECT_EQ(wbuf, rbuf); EXPECT_EQ(wbuf, rbuf);
} }
// Verifies that one page of data written to the pipe in a single write() is
// accepted in full and is returned unchanged by a single read().
TEST_P(PipeTest, WritePage) {
// Skipped unless CreateBlocking() succeeds (presumably: the parameterized
// pipe can be set up in blocking mode -- confirm against the fixture).
SKIP_IF(!CreateBlocking());
// Write exactly kPageSize bytes of random data.
std::vector<char> wbuf(kPageSize);
RandomizeBuffer(wbuf.data(), wbuf.size());
std::vector<char> rbuf(wbuf.size());
ASSERT_THAT(write(wfd_.get(), wbuf.data(), wbuf.size()),
SyscallSucceedsWithValue(wbuf.size()));
ASSERT_THAT(read(rfd_.get(), rbuf.data(), rbuf.size()),
SyscallSucceedsWithValue(rbuf.size()));
// The bytes read back must match the bytes written.
EXPECT_EQ(memcmp(rbuf.data(), wbuf.data(), wbuf.size()), 0);
}
TEST_P(PipeTest, NonBlocking) { TEST_P(PipeTest, NonBlocking) {
SKIP_IF(!CreateNonBlocking()); SKIP_IF(!CreateNonBlocking());

View File

@ -19,9 +19,12 @@
#include "gmock/gmock.h" #include "gmock/gmock.h"
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "test/util/file_descriptor.h" #include "test/util/file_descriptor.h"
#include "test/util/temp_path.h" #include "test/util/temp_path.h"
#include "test/util/test_util.h" #include "test/util/test_util.h"
#include "test/util/thread_util.h"
namespace gvisor { namespace gvisor {
namespace testing { namespace testing {
@ -442,6 +445,72 @@ TEST(SendFileTest, SendToNotARegularFile) {
EXPECT_THAT(sendfile(outf.get(), inf.get(), nullptr, 0), EXPECT_THAT(sendfile(outf.get(), inf.get(), nullptr, 0),
SyscallFailsWithErrno(EINVAL)); SyscallFailsWithErrno(EINVAL));
} }
// Verifies that sendfile() into a full, non-blocking pipe fails with
// EWOULDBLOCK instead of blocking.
TEST(SendFileTest, SendPipeWouldBlock) {
// Create temp file.
constexpr char kData[] =
"The fool doth think he is wise, but the wise man knows himself to be a "
"fool.";
// Exclude the terminating NUL from the amount to send.
constexpr int kDataSize = sizeof(kData) - 1;
const TempPath in_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFileWith(
GetAbsoluteTestTmpdir(), kData, TempPath::kDefaultFileMode));
// Open the input file as read only.
const FileDescriptor inf =
ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDONLY));
// Set up the output pipe (created with pipe2, so it is anonymous) with
// O_NONBLOCK on both ends.
int fds[2];
ASSERT_THAT(pipe2(fds, O_NONBLOCK), SyscallSucceeds());
const FileDescriptor rfd(fds[0]);
const FileDescriptor wfd(fds[1]);
// Fill up the pipe's buffer. Twice the reported capacity is offered; the
// non-blocking write is expected to accept exactly pipe_size bytes.
int pipe_size = -1;
ASSERT_THAT(pipe_size = fcntl(wfd.get(), F_GETPIPE_SZ), SyscallSucceeds());
std::vector<char> buf(2 * pipe_size);
ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()),
SyscallSucceedsWithValue(pipe_size));
// Nothing drains the pipe, so there is no room for the file data.
EXPECT_THAT(sendfile(wfd.get(), inf.get(), nullptr, kDataSize),
SyscallFailsWithErrno(EWOULDBLOCK));
}
// Verifies that sendfile() into a full, blocking pipe completes with the
// whole payload once another thread drains the pipe.
TEST(SendFileTest, SendPipeBlocks) {
// Create temp file.
constexpr char kData[] =
"The fault, dear Brutus, is not in our stars, but in ourselves.";
// Exclude the terminating NUL from the amount to send.
constexpr int kDataSize = sizeof(kData) - 1;
const TempPath in_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFileWith(
GetAbsoluteTestTmpdir(), kData, TempPath::kDefaultFileMode));
// Open the input file as read only.
const FileDescriptor inf =
ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDONLY));
// Set up the output pipe (created with pipe, so it is anonymous and uses
// the default blocking mode).
int fds[2];
ASSERT_THAT(pipe(fds), SyscallSucceeds());
const FileDescriptor rfd(fds[0]);
const FileDescriptor wfd(fds[1]);
// Fill up the pipe's buffer.
int pipe_size = -1;
ASSERT_THAT(pipe_size = fcntl(wfd.get(), F_GETPIPE_SZ), SyscallSucceeds());
std::vector<char> buf(pipe_size);
ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()),
SyscallSucceedsWithValue(pipe_size));
// After a short delay, drain the pipe from a second thread so that the
// sendfile below has room to write into.
ScopedThread t([&]() {
absl::SleepFor(absl::Milliseconds(100));
ASSERT_THAT(read(rfd.get(), buf.data(), buf.size()),
SyscallSucceedsWithValue(pipe_size));
});
// Issued while the pipe is still full; expected to transfer all of kData.
EXPECT_THAT(sendfile(wfd.get(), inf.get(), nullptr, kDataSize),
SyscallSucceedsWithValue(kDataSize));
}
} // namespace } // namespace
} // namespace testing } // namespace testing

View File

@ -14,12 +14,16 @@
#include <fcntl.h> #include <fcntl.h>
#include <sys/eventfd.h> #include <sys/eventfd.h>
#include <sys/resource.h>
#include <sys/sendfile.h> #include <sys/sendfile.h>
#include <sys/time.h>
#include <unistd.h> #include <unistd.h>
#include "gmock/gmock.h" #include "gmock/gmock.h"
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "test/util/file_descriptor.h" #include "test/util/file_descriptor.h"
#include "test/util/temp_path.h" #include "test/util/temp_path.h"
#include "test/util/test_util.h" #include "test/util/test_util.h"
@ -36,23 +40,23 @@ TEST(SpliceTest, TwoRegularFiles) {
const TempPath out_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile()); const TempPath out_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
// Open the input file as read only. // Open the input file as read only.
const FileDescriptor inf = const FileDescriptor in_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDONLY)); ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDONLY));
// Open the output file as write only. // Open the output file as write only.
const FileDescriptor outf = const FileDescriptor out_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(out_file.path(), O_WRONLY)); ASSERT_NO_ERRNO_AND_VALUE(Open(out_file.path(), O_WRONLY));
// Verify that it is rejected as expected; regardless of offsets. // Verify that it is rejected as expected; regardless of offsets.
loff_t in_offset = 0; loff_t in_offset = 0;
loff_t out_offset = 0; loff_t out_offset = 0;
EXPECT_THAT(splice(inf.get(), &in_offset, outf.get(), &out_offset, 1, 0), EXPECT_THAT(splice(in_fd.get(), &in_offset, out_fd.get(), &out_offset, 1, 0),
SyscallFailsWithErrno(EINVAL)); SyscallFailsWithErrno(EINVAL));
EXPECT_THAT(splice(inf.get(), nullptr, outf.get(), &out_offset, 1, 0), EXPECT_THAT(splice(in_fd.get(), nullptr, out_fd.get(), &out_offset, 1, 0),
SyscallFailsWithErrno(EINVAL)); SyscallFailsWithErrno(EINVAL));
EXPECT_THAT(splice(inf.get(), &in_offset, outf.get(), nullptr, 1, 0), EXPECT_THAT(splice(in_fd.get(), &in_offset, out_fd.get(), nullptr, 1, 0),
SyscallFailsWithErrno(EINVAL)); SyscallFailsWithErrno(EINVAL));
EXPECT_THAT(splice(inf.get(), nullptr, outf.get(), nullptr, 1, 0), EXPECT_THAT(splice(in_fd.get(), nullptr, out_fd.get(), nullptr, 1, 0),
SyscallFailsWithErrno(EINVAL)); SyscallFailsWithErrno(EINVAL));
} }
@ -75,8 +79,6 @@ TEST(SpliceTest, SamePipe) {
} }
TEST(TeeTest, SamePipe) { TEST(TeeTest, SamePipe) {
SKIP_IF(IsRunningOnGvisor());
// Create a new pipe. // Create a new pipe.
int fds[2]; int fds[2];
ASSERT_THAT(pipe(fds), SyscallSucceeds()); ASSERT_THAT(pipe(fds), SyscallSucceeds());
@ -95,11 +97,9 @@ TEST(TeeTest, SamePipe) {
} }
TEST(TeeTest, RegularFile) { TEST(TeeTest, RegularFile) {
SKIP_IF(IsRunningOnGvisor());
// Open some file. // Open some file.
const TempPath in_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile()); const TempPath in_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
const FileDescriptor inf = const FileDescriptor in_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDWR)); ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDWR));
// Create a new pipe. // Create a new pipe.
@ -109,9 +109,9 @@ TEST(TeeTest, RegularFile) {
const FileDescriptor wfd(fds[1]); const FileDescriptor wfd(fds[1]);
// Attempt to tee from the file. // Attempt to tee from the file.
EXPECT_THAT(tee(inf.get(), wfd.get(), kPageSize, 0), EXPECT_THAT(tee(in_fd.get(), wfd.get(), kPageSize, 0),
SyscallFailsWithErrno(EINVAL)); SyscallFailsWithErrno(EINVAL));
EXPECT_THAT(tee(rfd.get(), inf.get(), kPageSize, 0), EXPECT_THAT(tee(rfd.get(), in_fd.get(), kPageSize, 0),
SyscallFailsWithErrno(EINVAL)); SyscallFailsWithErrno(EINVAL));
} }
@ -142,7 +142,7 @@ TEST(SpliceTest, FromEventFD) {
constexpr uint64_t kEventFDValue = 1; constexpr uint64_t kEventFDValue = 1;
int efd; int efd;
ASSERT_THAT(efd = eventfd(kEventFDValue, 0), SyscallSucceeds()); ASSERT_THAT(efd = eventfd(kEventFDValue, 0), SyscallSucceeds());
const FileDescriptor inf(efd); const FileDescriptor in_fd(efd);
// Create a new pipe. // Create a new pipe.
int fds[2]; int fds[2];
@ -152,7 +152,7 @@ TEST(SpliceTest, FromEventFD) {
// Splice 8-byte eventfd value to pipe. // Splice 8-byte eventfd value to pipe.
constexpr int kEventFDSize = 8; constexpr int kEventFDSize = 8;
EXPECT_THAT(splice(inf.get(), nullptr, wfd.get(), nullptr, kEventFDSize, 0), EXPECT_THAT(splice(in_fd.get(), nullptr, wfd.get(), nullptr, kEventFDSize, 0),
SyscallSucceedsWithValue(kEventFDSize)); SyscallSucceedsWithValue(kEventFDSize));
// Contents should be equal. // Contents should be equal.
@ -166,7 +166,7 @@ TEST(SpliceTest, FromEventFD) {
TEST(SpliceTest, FromEventFDOffset) { TEST(SpliceTest, FromEventFDOffset) {
int efd; int efd;
ASSERT_THAT(efd = eventfd(0, 0), SyscallSucceeds()); ASSERT_THAT(efd = eventfd(0, 0), SyscallSucceeds());
const FileDescriptor inf(efd); const FileDescriptor in_fd(efd);
// Create a new pipe. // Create a new pipe.
int fds[2]; int fds[2];
@ -179,7 +179,7 @@ TEST(SpliceTest, FromEventFDOffset) {
// This is not allowed because eventfd doesn't support pread. // This is not allowed because eventfd doesn't support pread.
constexpr int kEventFDSize = 8; constexpr int kEventFDSize = 8;
loff_t in_off = 0; loff_t in_off = 0;
EXPECT_THAT(splice(inf.get(), &in_off, wfd.get(), nullptr, kEventFDSize, 0), EXPECT_THAT(splice(in_fd.get(), &in_off, wfd.get(), nullptr, kEventFDSize, 0),
SyscallFailsWithErrno(EINVAL)); SyscallFailsWithErrno(EINVAL));
} }
@ -200,28 +200,29 @@ TEST(SpliceTest, ToEventFDOffset) {
int efd; int efd;
ASSERT_THAT(efd = eventfd(0, 0), SyscallSucceeds()); ASSERT_THAT(efd = eventfd(0, 0), SyscallSucceeds());
const FileDescriptor outf(efd); const FileDescriptor out_fd(efd);
// Attempt to splice 8-byte eventfd value to pipe with offset. // Attempt to splice 8-byte eventfd value to pipe with offset.
// //
// This is not allowed because eventfd doesn't support pwrite. // This is not allowed because eventfd doesn't support pwrite.
loff_t out_off = 0; loff_t out_off = 0;
EXPECT_THAT(splice(rfd.get(), nullptr, outf.get(), &out_off, kEventFDSize, 0), EXPECT_THAT(
SyscallFailsWithErrno(EINVAL)); splice(rfd.get(), nullptr, out_fd.get(), &out_off, kEventFDSize, 0),
SyscallFailsWithErrno(EINVAL));
} }
TEST(SpliceTest, ToPipe) { TEST(SpliceTest, ToPipe) {
// Open the input file. // Open the input file.
const TempPath in_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile()); const TempPath in_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
const FileDescriptor inf = const FileDescriptor in_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDWR)); ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDWR));
// Fill with some random data. // Fill with some random data.
std::vector<char> buf(kPageSize); std::vector<char> buf(kPageSize);
RandomizeBuffer(buf.data(), buf.size()); RandomizeBuffer(buf.data(), buf.size());
ASSERT_THAT(write(inf.get(), buf.data(), buf.size()), ASSERT_THAT(write(in_fd.get(), buf.data(), buf.size()),
SyscallSucceedsWithValue(kPageSize)); SyscallSucceedsWithValue(kPageSize));
ASSERT_THAT(lseek(inf.get(), 0, SEEK_SET), SyscallSucceedsWithValue(0)); ASSERT_THAT(lseek(in_fd.get(), 0, SEEK_SET), SyscallSucceedsWithValue(0));
// Create a new pipe. // Create a new pipe.
int fds[2]; int fds[2];
@ -230,7 +231,7 @@ TEST(SpliceTest, ToPipe) {
const FileDescriptor wfd(fds[1]); const FileDescriptor wfd(fds[1]);
// Splice to the pipe. // Splice to the pipe.
EXPECT_THAT(splice(inf.get(), nullptr, wfd.get(), nullptr, kPageSize, 0), EXPECT_THAT(splice(in_fd.get(), nullptr, wfd.get(), nullptr, kPageSize, 0),
SyscallSucceedsWithValue(kPageSize)); SyscallSucceedsWithValue(kPageSize));
// Contents should be equal. // Contents should be equal.
@ -243,13 +244,13 @@ TEST(SpliceTest, ToPipe) {
TEST(SpliceTest, ToPipeOffset) { TEST(SpliceTest, ToPipeOffset) {
// Open the input file. // Open the input file.
const TempPath in_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile()); const TempPath in_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
const FileDescriptor inf = const FileDescriptor in_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDWR)); ASSERT_NO_ERRNO_AND_VALUE(Open(in_file.path(), O_RDWR));
// Fill with some random data. // Fill with some random data.
std::vector<char> buf(kPageSize); std::vector<char> buf(kPageSize);
RandomizeBuffer(buf.data(), buf.size()); RandomizeBuffer(buf.data(), buf.size());
ASSERT_THAT(write(inf.get(), buf.data(), buf.size()), ASSERT_THAT(write(in_fd.get(), buf.data(), buf.size()),
SyscallSucceedsWithValue(kPageSize)); SyscallSucceedsWithValue(kPageSize));
// Create a new pipe. // Create a new pipe.
@ -261,7 +262,7 @@ TEST(SpliceTest, ToPipeOffset) {
// Splice to the pipe. // Splice to the pipe.
loff_t in_offset = kPageSize / 2; loff_t in_offset = kPageSize / 2;
EXPECT_THAT( EXPECT_THAT(
splice(inf.get(), &in_offset, wfd.get(), nullptr, kPageSize / 2, 0), splice(in_fd.get(), &in_offset, wfd.get(), nullptr, kPageSize / 2, 0),
SyscallSucceedsWithValue(kPageSize / 2)); SyscallSucceedsWithValue(kPageSize / 2));
// Contents should be equal to only the second part. // Contents should be equal to only the second part.
@ -286,22 +287,22 @@ TEST(SpliceTest, FromPipe) {
// Open the input file. // Open the input file.
const TempPath out_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile()); const TempPath out_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
const FileDescriptor outf = const FileDescriptor out_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(out_file.path(), O_RDWR)); ASSERT_NO_ERRNO_AND_VALUE(Open(out_file.path(), O_RDWR));
// Splice to the output file. // Splice to the output file.
EXPECT_THAT(splice(rfd.get(), nullptr, outf.get(), nullptr, kPageSize, 0), EXPECT_THAT(splice(rfd.get(), nullptr, out_fd.get(), nullptr, kPageSize, 0),
SyscallSucceedsWithValue(kPageSize)); SyscallSucceedsWithValue(kPageSize));
// The offset of the output should be equal to kPageSize. We assert that and // The offset of the output should be equal to kPageSize. We assert that and
// reset to zero so that we can read the contents and ensure they match. // reset to zero so that we can read the contents and ensure they match.
EXPECT_THAT(lseek(outf.get(), 0, SEEK_CUR), EXPECT_THAT(lseek(out_fd.get(), 0, SEEK_CUR),
SyscallSucceedsWithValue(kPageSize)); SyscallSucceedsWithValue(kPageSize));
ASSERT_THAT(lseek(outf.get(), 0, SEEK_SET), SyscallSucceedsWithValue(0)); ASSERT_THAT(lseek(out_fd.get(), 0, SEEK_SET), SyscallSucceedsWithValue(0));
// Contents should be equal. // Contents should be equal.
std::vector<char> rbuf(kPageSize); std::vector<char> rbuf(kPageSize);
ASSERT_THAT(read(outf.get(), rbuf.data(), rbuf.size()), ASSERT_THAT(read(out_fd.get(), rbuf.data(), rbuf.size()),
SyscallSucceedsWithValue(kPageSize)); SyscallSucceedsWithValue(kPageSize));
EXPECT_EQ(memcmp(rbuf.data(), buf.data(), buf.size()), 0); EXPECT_EQ(memcmp(rbuf.data(), buf.data(), buf.size()), 0);
} }
@ -321,18 +322,19 @@ TEST(SpliceTest, FromPipeOffset) {
// Open the input file. // Open the input file.
const TempPath out_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile()); const TempPath out_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
const FileDescriptor outf = const FileDescriptor out_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(out_file.path(), O_RDWR)); ASSERT_NO_ERRNO_AND_VALUE(Open(out_file.path(), O_RDWR));
// Splice to the output file. // Splice to the output file.
loff_t out_offset = kPageSize / 2; loff_t out_offset = kPageSize / 2;
EXPECT_THAT(splice(rfd.get(), nullptr, outf.get(), &out_offset, kPageSize, 0), EXPECT_THAT(
SyscallSucceedsWithValue(kPageSize)); splice(rfd.get(), nullptr, out_fd.get(), &out_offset, kPageSize, 0),
SyscallSucceedsWithValue(kPageSize));
// Content should reflect the splice. We write to a specific offset in the // Content should reflect the splice. We write to a specific offset in the
// file, so the internals should now be allocated sparsely. // file, so the internals should now be allocated sparsely.
std::vector<char> rbuf(kPageSize); std::vector<char> rbuf(kPageSize);
ASSERT_THAT(read(outf.get(), rbuf.data(), rbuf.size()), ASSERT_THAT(read(out_fd.get(), rbuf.data(), rbuf.size()),
SyscallSucceedsWithValue(kPageSize)); SyscallSucceedsWithValue(kPageSize));
std::vector<char> zbuf(kPageSize / 2); std::vector<char> zbuf(kPageSize / 2);
memset(zbuf.data(), 0, zbuf.size()); memset(zbuf.data(), 0, zbuf.size());
@ -404,8 +406,6 @@ TEST(SpliceTest, Blocking) {
} }
TEST(TeeTest, Blocking) { TEST(TeeTest, Blocking) {
SKIP_IF(IsRunningOnGvisor());
// Create two new pipes. // Create two new pipes.
int first[2], second[2]; int first[2], second[2];
ASSERT_THAT(pipe(first), SyscallSucceeds()); ASSERT_THAT(pipe(first), SyscallSucceeds());
@ -440,6 +440,49 @@ TEST(TeeTest, Blocking) {
EXPECT_EQ(memcmp(rbuf.data(), buf.data(), kPageSize), 0); EXPECT_EQ(memcmp(rbuf.data(), buf.data(), kPageSize), 0);
} }
// Checks tee(2) when the destination pipe is already full: the call is
// expected to wait until a helper thread drains the destination, and then to
// duplicate all kPageSize bytes from the source pipe into it.
TEST(TeeTest, BlockingWrite) {
// Create two new pipes: `first` is the tee source, `second` the destination.
int first[2], second[2];
ASSERT_THAT(pipe(first), SyscallSucceeds());
const FileDescriptor rfd1(first[0]);
const FileDescriptor wfd1(first[1]);
ASSERT_THAT(pipe(second), SyscallSucceeds());
const FileDescriptor rfd2(second[0]);
const FileDescriptor wfd2(second[1]);
// Make some data available to be read from the source pipe.
std::vector<char> buf1(kPageSize);
RandomizeBuffer(buf1.data(), buf1.size());
ASSERT_THAT(write(wfd1.get(), buf1.data(), buf1.size()),
SyscallSucceedsWithValue(kPageSize));
// Fill up the destination pipe's buffer (capacity queried via F_GETPIPE_SZ)
// so that it has no room for the tee'd data.
int pipe_size = -1;
ASSERT_THAT(pipe_size = fcntl(wfd2.get(), F_GETPIPE_SZ), SyscallSucceeds());
std::vector<char> buf2(pipe_size);
ASSERT_THAT(write(wfd2.get(), buf2.data(), buf2.size()),
SyscallSucceedsWithValue(pipe_size));
// Helper thread: after a short delay (presumably so the tee below has
// already started waiting), drain the filler bytes from the destination pipe.
ScopedThread t([&]() {
absl::SleepFor(absl::Milliseconds(100));
ASSERT_THAT(read(rfd2.get(), buf2.data(), buf2.size()),
SyscallSucceedsWithValue(pipe_size));
});
// Attempt a tee immediately; it should block until the helper thread has
// made room, and then report the full kPageSize.
EXPECT_THAT(tee(rfd1.get(), wfd2.get(), kPageSize, 0),
SyscallSucceedsWithValue(kPageSize));
// Thread should be joinable.
t.Join();
// Content should reflect the tee: the destination now holds exactly the
// bytes that were written to the source pipe.
std::vector<char> rbuf(kPageSize);
ASSERT_THAT(read(rfd2.get(), rbuf.data(), rbuf.size()),
SyscallSucceedsWithValue(kPageSize));
EXPECT_EQ(memcmp(rbuf.data(), buf1.data(), kPageSize), 0);
}
TEST(SpliceTest, NonBlocking) { TEST(SpliceTest, NonBlocking) {
// Create two new pipes. // Create two new pipes.
int first[2], second[2]; int first[2], second[2];
@ -457,8 +500,6 @@ TEST(SpliceTest, NonBlocking) {
} }
TEST(TeeTest, NonBlocking) { TEST(TeeTest, NonBlocking) {
SKIP_IF(IsRunningOnGvisor());
// Create two new pipes. // Create two new pipes.
int first[2], second[2]; int first[2], second[2];
ASSERT_THAT(pipe(first), SyscallSucceeds()); ASSERT_THAT(pipe(first), SyscallSucceeds());
@ -473,6 +514,79 @@ TEST(TeeTest, NonBlocking) {
SyscallFailsWithErrno(EAGAIN)); SyscallFailsWithErrno(EAGAIN));
} }
// Checks that tee(2) can duplicate a multi-page payload (8 * kPageSize bytes)
// in a single call, and that the data remains readable from the source pipe
// afterwards.
TEST(TeeTest, MultiPage) {
// Create two new pipes: `first` is the tee source, `second` the destination.
int first[2], second[2];
ASSERT_THAT(pipe(first), SyscallSucceeds());
const FileDescriptor rfd1(first[0]);
const FileDescriptor wfd1(first[1]);
ASSERT_THAT(pipe(second), SyscallSucceeds());
const FileDescriptor rfd2(second[0]);
const FileDescriptor wfd2(second[1]);
// Make some data available to be read.
std::vector<char> wbuf(8 * kPageSize);
RandomizeBuffer(wbuf.data(), wbuf.size());
ASSERT_THAT(write(wfd1.get(), wbuf.data(), wbuf.size()),
SyscallSucceedsWithValue(wbuf.size()));
// Attempt a tee immediately; it should complete.
EXPECT_THAT(tee(rfd1.get(), wfd2.get(), wbuf.size(), 0),
SyscallSucceedsWithValue(wbuf.size()));
// Content should reflect the tee.
std::vector<char> rbuf(wbuf.size());
ASSERT_THAT(read(rfd2.get(), rbuf.data(), rbuf.size()),
SyscallSucceedsWithValue(rbuf.size()));
EXPECT_EQ(memcmp(rbuf.data(), wbuf.data(), rbuf.size()), 0);
// The source pipe must still contain the same bytes: tee duplicates the
// data rather than consuming it.
ASSERT_THAT(read(rfd1.get(), rbuf.data(), rbuf.size()),
SyscallSucceedsWithValue(rbuf.size()));
EXPECT_EQ(memcmp(rbuf.data(), wbuf.data(), rbuf.size()), 0);
}
// Checks that splicing from a pipe into a regular file whose offset already
// sits at the RLIMIT_FSIZE limit fails with EFBIG, and that the pipe's data is
// still available to read afterwards.
TEST(SpliceTest, FromPipeMaxFileSize) {
// Create a new pipe.
int fds[2];
ASSERT_THAT(pipe(fds), SyscallSucceeds());
const FileDescriptor rfd(fds[0]);
const FileDescriptor wfd(fds[1]);
// Fill with some random data.
std::vector<char> buf(kPageSize);
RandomizeBuffer(buf.data(), buf.size());
ASSERT_THAT(write(wfd.get(), buf.data(), buf.size()),
SyscallSucceedsWithValue(kPageSize));
// Open the output file, grow it to 13 MiB, and move the file offset to the
// end so that any further write would extend the file.
const TempPath out_file = ASSERT_NO_ERRNO_AND_VALUE(TempPath::CreateFile());
const FileDescriptor out_fd =
ASSERT_NO_ERRNO_AND_VALUE(Open(out_file.path(), O_RDWR));
EXPECT_THAT(ftruncate(out_fd.get(), 13 << 20), SyscallSucceeds());
EXPECT_THAT(lseek(out_fd.get(), 0, SEEK_END),
SyscallSucceedsWithValue(13 << 20));
// Set our file size limit to the file's current size (13 MiB).
// SIGXFSZ is blocked first, presumably so that the signal raised when the
// limit is exceeded does not terminate the test process and the syscall's
// EFBIG result can be observed instead.
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGXFSZ);
TEST_PCHECK(sigprocmask(SIG_BLOCK, &set, nullptr) == 0);
rlimit rlim = {};
rlim.rlim_cur = rlim.rlim_max = (13 << 20);
EXPECT_THAT(setrlimit(RLIMIT_FSIZE, &rlim), SyscallSucceeds());
// Splice to the output file; this must fail because it would write past the
// file size limit.
EXPECT_THAT(
splice(rfd.get(), nullptr, out_fd.get(), nullptr, 3 * kPageSize, 0),
SyscallFailsWithErrno(EFBIG));
// The failed splice must not have consumed the pipe's data: reading the pipe
// should return the original contents.
std::vector<char> rbuf(kPageSize);
ASSERT_THAT(read(rfd.get(), rbuf.data(), rbuf.size()),
SyscallSucceedsWithValue(kPageSize));
EXPECT_EQ(memcmp(rbuf.data(), buf.data(), buf.size()), 0);
}
} // namespace } // namespace
} // namespace testing } // namespace testing