// Copyright 2019 The gVisor Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pipe import ( "io" "math" "syscall" "gvisor.dev/gvisor/pkg/abi/linux" "gvisor.dev/gvisor/pkg/amutex" "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/sentry/arch" "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/usermem" "gvisor.dev/gvisor/pkg/waiter" ) // This file contains Pipe file functionality that is tied to neither VFS nor // the old fs architecture. // Release cleans up the pipe's state. func (p *Pipe) Release() { p.rClose() p.wClose() // Wake up readers and writers. p.Notify(waiter.EventIn | waiter.EventOut) } // Read reads from the Pipe into dst. func (p *Pipe) Read(ctx context.Context, dst usermem.IOSequence) (int64, error) { n, err := p.read(ctx, readOps{ left: func() int64 { return dst.NumBytes() }, limit: func(l int64) { dst = dst.TakeFirst64(l) }, read: func(buf *buffer) (int64, error) { n, err := dst.CopyOutFrom(ctx, buf) dst = dst.DropFirst64(n) return n, err }, }) if n > 0 { p.Notify(waiter.EventOut) } return n, err } // WriteTo writes to w from the Pipe. func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool) (int64, error) { ops := readOps{ left: func() int64 { return count }, limit: func(l int64) { count = l }, read: func(buf *buffer) (int64, error) { n, err := buf.ReadToWriter(w, count, dup) count -= n return n, err }, } if dup { // There is no notification for dup operations. return p.dup(ctx, ops) } n, err := p.read(ctx, ops) if n > 0 { p.Notify(waiter.EventOut) } return n, err } // Write writes to the Pipe from src. func (p *Pipe) Write(ctx context.Context, src usermem.IOSequence) (int64, error) { n, err := p.write(ctx, writeOps{ left: func() int64 { return src.NumBytes() }, limit: func(l int64) { src = src.TakeFirst64(l) }, write: func(buf *buffer) (int64, error) { n, err := src.CopyInTo(ctx, buf) src = src.DropFirst64(n) return n, err }, }) if n > 0 { p.Notify(waiter.EventIn) } return n, err } // ReadFrom reads from r to the Pipe. func (p *Pipe) ReadFrom(ctx context.Context, r io.Reader, count int64) (int64, error) { n, err := p.write(ctx, writeOps{ left: func() int64 { return count }, limit: func(l int64) { count = l }, write: func(buf *buffer) (int64, error) { n, err := buf.WriteFromReader(r, count) count -= n return n, err }, }) if n > 0 { p.Notify(waiter.EventIn) } return n, err } // Readiness returns the ready events in the underlying pipe. func (p *Pipe) Readiness(mask waiter.EventMask) waiter.EventMask { return p.rwReadiness() & mask } // Ioctl implements ioctls on the Pipe. func (p *Pipe) Ioctl(ctx context.Context, io usermem.IO, args arch.SyscallArguments) (uintptr, error) { // Switch on ioctl request. switch int(args[1].Int()) { case linux.FIONREAD: v := p.queued() if v > math.MaxInt32 { v = math.MaxInt32 // Silently truncate. } // Copy result to user-space. _, err := usermem.CopyObjectOut(ctx, io, args[2].Pointer(), int32(v), usermem.IOOpts{ AddressSpaceActive: true, }) return 0, err default: return 0, syscall.ENOTTY } } // waitFor blocks until the underlying pipe has at least one reader/writer is // announced via 'wakeupChan', or until 'sleeper' is cancelled. Any call to this // function will block for either readers or writers, depending on where // 'wakeupChan' points. // // mu must be held by the caller. waitFor returns with mu held, but it will // drop mu before blocking for any reader/writers. func waitFor(mu *sync.Mutex, wakeupChan *chan struct{}, sleeper amutex.Sleeper) bool { // Ideally this function would simply use a condition variable. However, the // wait needs to be interruptible via 'sleeper', so we must sychronize via a // channel. The synchronization below relies on the fact that closing a // channel unblocks all receives on the channel. // Does an appropriate wakeup channel already exist? If not, create a new // one. This is all done under f.mu to avoid races. if *wakeupChan == nil { *wakeupChan = make(chan struct{}) } // Grab a local reference to the wakeup channel since it may disappear as // soon as we drop f.mu. wakeup := *wakeupChan // Drop the lock and prepare to sleep. mu.Unlock() cancel := sleeper.SleepStart() // Wait for either a new reader/write to be signalled via 'wakeup', or // for the sleep to be cancelled. select { case <-wakeup: sleeper.SleepFinish(true) case <-cancel: sleeper.SleepFinish(false) } // Take the lock and check if we were woken. If we were woken and // interrupted, the former takes priority. mu.Lock() select { case <-wakeup: return true default: return false } } // newHandleLocked signals a new pipe reader or writer depending on where // 'wakeupChan' points. This unblocks any corresponding reader or writer // waiting for the other end of the channel to be opened, see Fifo.waitFor. // // Precondition: the mutex protecting wakeupChan must be held. func newHandleLocked(wakeupChan *chan struct{}) { if *wakeupChan != nil { close(*wakeupChan) *wakeupChan = nil } }