gvisor/pkg/shim/proc/io.go

163 lines
3.8 KiB
Go

// Copyright 2018 The containerd Authors.
// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proc
import (
"context"
"fmt"
"io"
"os"
"sync"
"sync/atomic"
"github.com/containerd/containerd/log"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
"golang.org/x/sys/unix"
)
// TODO(random-liu): This file can be a util.
// bufPool hands out reusable 32 KiB scratch buffers for io.CopyBuffer. The
// pool stores *[]byte rather than []byte so that Get and Put move a pointer
// instead of boxing a slice header on every call.
var bufPool = sync.Pool{
	New: func() interface{} {
		const size = 32 * 1024
		scratch := make([]byte, size)
		return &scratch
	},
}
// copyPipes starts the goroutines that move the container's stdio between
// rio and the paths named by stdin, stdout and stderr.
//
// For stdout and stderr each path is checked with isFifo. A fifo is opened
// for writing and additionally for reading; the read end is not read from
// here and is closed once the copy finishes. Any other path is opened as a
// file with O_WRONLY|O_APPEND. When stdout and stderr name the same non-fifo
// path, the file is opened once and shared through a countingWriteCloser so
// that it is closed only after both copy goroutines have called Close. Each
// output goroutine is added to wg and calls wg.Done when its copy returns
// (before the destination is closed).
//
// If stdin is non-empty it is opened as a non-blocking, read-only fifo using
// a background context and copied into rio.Stdin(); that goroutine is not
// tracked by wg.
//
// An error is returned when a path cannot be inspected or opened. Goroutines
// already started for an earlier stream keep running in that case.
func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg *sync.WaitGroup) error {
	// startCopy builds the dest callback for one output stream. src is
	// invoked inside the goroutine so the stream is only fetched from rio once
	// the copy actually starts.
	startCopy := func(stream string, src func() io.Reader) func(wc io.WriteCloser, rc io.Closer) {
		return func(wc io.WriteCloser, rc io.Closer) {
			wg.Add(1)
			go func() {
				p := bufPool.Get().(*[]byte)
				defer bufPool.Put(p)
				if _, err := io.CopyBuffer(wc, src(), *p); err != nil {
					// Attach the error so the warning is actionable.
					log.G(ctx).WithError(err).Warn("error copying " + stream)
				}
				wg.Done()
				wc.Close()
				if rc != nil {
					rc.Close()
				}
			}()
		}
	}

	var sameFile *countingWriteCloser
	for _, i := range []struct {
		name string
		dest func(wc io.WriteCloser, rc io.Closer)
	}{
		{
			name: stdout,
			dest: startCopy("stdout", func() io.Reader { return rio.Stdout() }),
		}, {
			name: stderr,
			dest: startCopy("stderr", func() io.Reader { return rio.Stderr() }),
		},
	} {
		ok, err := isFifo(i.name)
		if err != nil {
			return err
		}
		var (
			fw io.WriteCloser
			fr io.Closer
		)
		if ok {
			if fw, err = fifo.OpenFifo(ctx, i.name, unix.O_WRONLY, 0); err != nil {
				return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", i.name, err)
			}
			if fr, err = fifo.OpenFifo(ctx, i.name, unix.O_RDONLY, 0); err != nil {
				// Do not leak the write end that was opened just above.
				fw.Close()
				return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", i.name, err)
			}
		} else {
			if sameFile != nil {
				// The stdout goroutine may already be calling Close on the
				// shared wrapper, so the counter must be bumped atomically.
				atomic.AddInt64(&sameFile.count, 1)
				i.dest(sameFile, nil)
				continue
			}
			if fw, err = os.OpenFile(i.name, unix.O_WRONLY|unix.O_APPEND, 0); err != nil {
				return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", i.name, err)
			}
			if stdout == stderr {
				sameFile = &countingWriteCloser{
					WriteCloser: fw,
					count:       1,
				}
				// Hand the counting wrapper (not the raw file) to the stdout
				// goroutine; otherwise stdout finishing would close the file
				// while stderr is still writing to it.
				fw = sameFile
			}
		}
		i.dest(fw, fr)
	}
	if stdin == "" {
		return nil
	}
	f, err := fifo.OpenFifo(context.Background(), stdin, unix.O_RDONLY|unix.O_NONBLOCK, 0)
	if err != nil {
		return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", stdin, err)
	}
	go func() {
		p := bufPool.Get().(*[]byte)
		defer bufPool.Put(p)
		// Best effort: a copy error simply ends stdin forwarding.
		io.CopyBuffer(rio.Stdin(), f, *p)
		rio.Stdin().Close()
		f.Close()
	}()
	return nil
}
// countingWriteCloser wraps an io.WriteCloser that is shared by several
// users. Close calls are swallowed while the counter is still positive after
// being decremented; the call that brings it to zero (or below) is forwarded
// to the wrapped WriteCloser.
type countingWriteCloser struct {
	io.WriteCloser
	// count is the number of Close calls still expected. It is decremented
	// atomically by Close.
	count int64
}

// Close atomically decrements the counter and closes the wrapped WriteCloser
// only when no further Close calls are outstanding. Otherwise it returns nil
// without touching the wrapped value.
func (c *countingWriteCloser) Close() error {
	remaining := atomic.AddInt64(&c.count, -1)
	if remaining <= 0 {
		return c.WriteCloser.Close()
	}
	return nil
}
// isFifo reports whether path refers to a named pipe.
//
// A path that does not exist yields (false, nil). Any other os.Stat failure
// is returned to the caller unchanged.
func isFifo(path string) (bool, error) {
	info, err := os.Stat(path)
	switch {
	case err == nil:
		return info.Mode()&os.ModeNamedPipe != 0, nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, err
	}
}