gvisor/pkg/compressio/compressio.go

// Copyright 2018 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package compressio provides parallel compression and decompression.
package compressio

import (
	"bytes"
	"compress/flate"
	"errors"
	"io"
	"runtime"
	"sync"

	"gvisor.googlesource.com/gvisor/pkg/binary"
)

var bufPool = sync.Pool{
	New: func() interface{} {
		return bytes.NewBuffer(nil)
	},
}

var chunkPool = sync.Pool{
	New: func() interface{} {
		return new(chunk)
	},
}

// chunk is a unit of work.
type chunk struct {
	// compressed is compressed data.
	//
	// This will always be returned to the bufPool directly when work has
	// finished (in schedule) and therefore must be allocated.
	compressed *bytes.Buffer

	// uncompressed is the uncompressed data.
	//
	// This is not returned to the bufPool automatically, since it may
	// correspond to an inline slice (provided directly to Read or Write).
	uncompressed *bytes.Buffer
}

// newChunk allocates a new chunk object (or pulls one from the pool). Buffers
// will be allocated if nil is provided for compressed or uncompressed.
func newChunk(compressed *bytes.Buffer, uncompressed *bytes.Buffer) *chunk {
	c := chunkPool.Get().(*chunk)
	if compressed != nil {
		c.compressed = compressed
	} else {
		c.compressed = bufPool.Get().(*bytes.Buffer)
	}
	if uncompressed != nil {
		c.uncompressed = uncompressed
	} else {
		c.uncompressed = bufPool.Get().(*bytes.Buffer)
	}
	return c
}

// result is the result of some work; it includes the original chunk.
type result struct {
	*chunk
	err error
}

// worker is a compression/decompression worker.
//
// The associated worker goroutine reads in uncompressed buffers from input and
// writes compressed buffers to its output. Alternatively, the worker reads
// compressed buffers from input and writes uncompressed buffers to its output.
//
// The goroutine will exit when input is closed, and the goroutine will close
// output.
type worker struct {
	input  chan *chunk
	output chan result
}

// work is the main work routine; see worker.
func (w *worker) work(compress bool, level int) {
	defer close(w.output)
	for c := range w.input {
		if compress {
			// Encode this slice.
			fw, err := flate.NewWriter(c.compressed, level)
			if err != nil {
				w.output <- result{c, err}
				continue
			}

			// Encode the input.
			if _, err := io.Copy(fw, c.uncompressed); err != nil {
				w.output <- result{c, err}
				continue
			}
			if err := fw.Close(); err != nil {
				w.output <- result{c, err}
				continue
			}
		} else {
			// Decode this slice.
			fr := flate.NewReader(c.compressed)

			// Decode the input.
			if _, err := io.Copy(c.uncompressed, fr); err != nil {
				w.output <- result{c, err}
				continue
			}
		}

		// Send the output.
		w.output <- result{c, nil}
	}
}

// pool is common functionality for reader/writers.
type pool struct {
	// workers are the compression/decompression workers.
	workers []worker

	// chunkSize is the chunk size. This is the first four bytes in the
	// stream and is shared across both the reader and writer.
	chunkSize uint32

	// mu protects below; it is generally the responsibility of users to
	// acquire this mutex before calling any methods on the pool.
	mu sync.Mutex

	// nextInput is the next worker for input (scheduling).
	nextInput int

	// nextOutput is the next worker for output (result).
	nextOutput int

	// buf is the current active buffer; the exact semantics of this buffer
	// depend on whether this is a reader or a writer.
	buf *bytes.Buffer
}

// init initializes the worker pool.
//
// This should only be called once.
func (p *pool) init(compress bool, level int) {
	p.workers = make([]worker, 1+runtime.GOMAXPROCS(0))
	for i := 0; i < len(p.workers); i++ {
		p.workers[i] = worker{
			input:  make(chan *chunk, 1),
			output: make(chan result, 1),
		}
		go p.workers[i].work(compress, level) // S/R-SAFE: In save path only.
	}
	runtime.SetFinalizer(p, (*pool).stop)
}

// stop stops all workers.
func (p *pool) stop() {
	for i := 0; i < len(p.workers); i++ {
		close(p.workers[i].input)
	}
	p.workers = nil
}

// handleResult calls the callback.
func handleResult(r result, callback func(*chunk) error) error {
	defer func() {
		r.chunk.compressed.Reset()
		bufPool.Put(r.chunk.compressed)
		chunkPool.Put(r.chunk)
	}()
	if r.err != nil {
		return r.err
	}
	return callback(r.chunk)
}

// schedule schedules the given buffers.
//
// If c is non-nil, then it will return as soon as the chunk is scheduled. If c
// is nil, then it will return only when no more work is left to do.
//
// If no callback function is provided, then the output channel will be
// ignored. You must be sure that the input is schedulable in this case.
func (p *pool) schedule(c *chunk, callback func(*chunk) error) error {
	for {
		var (
			inputChan  chan *chunk
			outputChan chan result
		)
		if c != nil {
			inputChan = p.workers[(p.nextInput+1)%len(p.workers)].input
		}
		if callback != nil && p.nextOutput != p.nextInput {
			outputChan = p.workers[(p.nextOutput+1)%len(p.workers)].output
		}
		if inputChan == nil && outputChan == nil {
			return nil
		}

		select {
		case inputChan <- c:
			p.nextInput++
			return nil
		case r := <-outputChan:
			p.nextOutput++
			if err := handleResult(r, callback); err != nil {
				return err
			}
		}
	}
}

// reader chunks reads and decompresses.
type reader struct {
	pool

	// in is the source.
	in io.Reader
}

// NewReader returns a new compressed reader.
func NewReader(in io.Reader) (io.Reader, error) {
	r := &reader{
		in: in,
	}
	r.init(false, 0)
	var err error
	if r.chunkSize, err = binary.ReadUint32(r.in, binary.BigEndian); err != nil {
		return nil, err
	}
	return r, nil
}
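
// A minimal caller-side sketch (not part of the original file): decompressing
// a stream previously produced by NewWriter. The names in, dst, and err are
// hypothetical, and error handling is abbreviated.
//
//	r, err := compressio.NewReader(in) // in is an io.Reader positioned at the stream start.
//	if err != nil {
//		return err
//	}
//	var dst bytes.Buffer
//	if _, err := io.Copy(&dst, r); err != nil { // Read decompresses chunk by chunk.
//		return err
//	}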

// errNewBuffer is returned when a new buffer is completed.
var errNewBuffer = errors.New("buffer ready")

// Read implements io.Reader.Read.
func (r *reader) Read(p []byte) (int, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Total bytes completed; this is declared up front because it must be
	// adjustable by the callback below.
	done := 0

	// Total bytes pending in the asynchronous workers for buffers. This is
	// used to process the proper regions of the input as inline buffers.
	var (
		pendingPre    = r.nextInput - r.nextOutput
		pendingInline = 0
	)

	// Define our callback for completed work.
	callback := func(c *chunk) error {
		// Check for an inline buffer.
		if pendingPre == 0 && pendingInline > 0 {
			pendingInline--
			done += c.uncompressed.Len()
			return nil
		}

		// Copy the resulting buffer to our intermediate one, and
		// return errNewBuffer to ensure that we aren't called a second
		// time. This error code is handled specially below.
		//
		// The buffer will be reset and returned to the pool once it
		// has been fully read (see the io.EOF case below).
		if pendingPre > 0 {
			pendingPre--
		}
		r.buf = c.uncompressed
		return errNewBuffer
	}

	for done < len(p) {
		// Do we have buffered data available?
		if r.buf != nil {
			n, err := r.buf.Read(p[done:])
			done += n
			if err == io.EOF {
				// This is the uncompressed buffer; it can be
				// returned to the pool at this point.
				r.buf.Reset()
				bufPool.Put(r.buf)
				r.buf = nil
			} else if err != nil {
				// Should never happen.
				defer r.stop()
				return done, err
			}
			continue
		}

		// Read the length of the next chunk; it is used to limit the
		// read of the compressed data below.
		//
		// See writer.flush.
		l, err := binary.ReadUint32(r.in, binary.BigEndian)
		if err != nil {
			// This is generally okay as long as there are still
			// buffers outstanding. We actually just wait for
			// completion of those buffers here and continue our
			// loop.
			if err := r.schedule(nil, callback); err == nil {
				// We've actually finished all buffers; this is
				// the normal EOF exit path.
				defer r.stop()
				return done, io.EOF
			} else if err == errNewBuffer {
				// A new buffer is now available.
				continue
			} else {
				// Some other error occurred; we cannot
				// process any further.
				defer r.stop()
				return done, err
			}
		}

		// Read this chunk and schedule decompression.
		compressed := bufPool.Get().(*bytes.Buffer)
		if _, err := io.Copy(compressed, &io.LimitedReader{
			R: r.in,
			N: int64(l),
		}); err != nil {
			// Some other error occurred; see above.
			return done, err
		}

		// Are we doing inline decoding?
		//
		// Note that we need to check the length here against
		// bytes.MinRead, since the bytes library will choose to grow
		// the slice if the available capacity is not at least
		// bytes.MinRead. This limits inline decoding to chunkSizes
		// that are at least bytes.MinRead (which is not unreasonable).
		var c *chunk
		start := done + ((pendingPre + pendingInline) * int(r.chunkSize))
		if len(p) >= start+int(r.chunkSize) && len(p) >= start+bytes.MinRead {
			c = newChunk(compressed, bytes.NewBuffer(p[start:start]))
			pendingInline++
		} else {
			c = newChunk(compressed, nil)
		}
		if err := r.schedule(c, callback); err == errNewBuffer {
			// A new buffer was completed while we were reading.
			// That's great, but we need to force schedule the
			// current buffer so that it does not get lost.
			//
			// It is safe to pass nil as the callback here, because
			// we know that we just freed up a slot above.
			r.schedule(c, nil)
		} else if err != nil {
			// Some other error occurred; see above.
			defer r.stop()
			return done, err
		}
	}

	// Make sure that everything has been decoded successfully, otherwise
	// parts of p may not actually have completed.
	for pendingInline > 0 {
		if err := r.schedule(nil, func(c *chunk) error {
			if err := callback(c); err != nil {
				return err
			}

			// The nil case means that an inline buffer has
			// completed. The callback will have already accounted
			// for it (pendingInline was decremented), so we just
			// return an error to check the top of the loop again.
			return errNewBuffer
		}); err != errNewBuffer {
			// Some other error occurred; see above.
			return done, err
		}
	}

	// Need to return done here, since it may have been adjusted by the
	// callback to compensate for partial reads on some inline buffer.
	return done, nil
}

// writer chunks and schedules writes.
type writer struct {
	pool

	// out is the underlying writer.
	out io.Writer

	// closed indicates whether the file has been closed.
	closed bool
}

// NewWriter returns a new compressed writer.
//
// The recommended chunkSize is on the order of 1M. Extra memory may be
// buffered (in the form of read-ahead, or buffered writes), and is limited to
// O(chunkSize * [1+GOMAXPROCS]).
func NewWriter(out io.Writer, chunkSize uint32, level int) (io.WriteCloser, error) {
	w := &writer{
		pool: pool{
			chunkSize: chunkSize,
			buf:       bufPool.Get().(*bytes.Buffer),
		},
		out: out,
	}
	w.init(true, level)
	if err := binary.WriteUint32(w.out, binary.BigEndian, chunkSize); err != nil {
		return nil, err
	}
	return w, nil
}
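
// A minimal caller-side sketch (not part of the original file): compressing
// data into an underlying writer. The names out, data, and err are
// hypothetical; the 1 MiB chunk size and flate.BestSpeed level are arbitrary
// choices, and error handling is abbreviated.
//
//	w, err := compressio.NewWriter(out, 1024*1024, flate.BestSpeed)
//	if err != nil {
//		return err
//	}
//	if _, err := w.Write(data); err != nil { // Chunks and compresses in parallel.
//		return err
//	}
//	if err := w.Close(); err != nil { // Flushes any remaining partial chunk.
//		return err
//	}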

// flush writes a single buffer.
func (w *writer) flush(c *chunk) error {
	// Prefix each chunk with a length; this allows the reader to safely
	// limit reads while buffering.
	l := uint32(c.compressed.Len())
	if err := binary.WriteUint32(w.out, binary.BigEndian, l); err != nil {
		return err
	}

	// Write out to the stream.
	_, err := io.Copy(w.out, c.compressed)
	return err
}
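
// For reference, the stream layout produced by NewWriter and flush (and
// consumed by NewReader and Read) is, with all integers in big-endian order:
//
//	[4-byte chunkSize]
//	[4-byte compressed length][compressed chunk data]
//	[4-byte compressed length][compressed chunk data]
//	...
//
// This summary is inferred from the code above rather than taken from
// separate documentation.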

// Write implements io.Writer.Write.
func (w *writer) Write(p []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Did we close already?
	if w.closed {
		return 0, io.ErrUnexpectedEOF
	}

	// See above; we need to track in the same way.
	var (
		pendingPre    = w.nextInput - w.nextOutput
		pendingInline = 0
	)
	callback := func(c *chunk) error {
		if pendingPre == 0 && pendingInline > 0 {
			pendingInline--
			return w.flush(c)
		}
		if pendingPre > 0 {
			pendingPre--
		}
		err := w.flush(c)
		c.uncompressed.Reset()
		bufPool.Put(c.uncompressed)
		return err
	}

	for done := 0; done < len(p); {
		// Construct an inline buffer if we're doing an inline
		// encoding; see above regarding the bytes.MinRead constraint.
		if w.buf.Len() == 0 && len(p) >= done+int(w.chunkSize) && len(p) >= done+bytes.MinRead {
			bufPool.Put(w.buf) // Return to the pool; never scheduled.
			w.buf = bytes.NewBuffer(p[done : done+int(w.chunkSize)])
			done += int(w.chunkSize)
			pendingInline++
		}

		// Do we need to flush w.buf? Note that this case should be hit
		// immediately following the inline case above.
		left := int(w.chunkSize) - w.buf.Len()
		if left == 0 {
			if err := w.schedule(newChunk(nil, w.buf), callback); err != nil {
				return done, err
			}
			// Reset the buffer, since this has now been scheduled
			// for compression. Note that this may be trampled
			// immediately by the bufPool.Put(w.buf) above if the
			// next buffer happens to be inline, but that's okay.
			w.buf = bufPool.Get().(*bytes.Buffer)
			continue
		}

		// Read from p into w.buf.
		toWrite := len(p) - done
		if toWrite > left {
			toWrite = left
		}
		n, err := w.buf.Write(p[done : done+toWrite])
		done += n
		if err != nil {
			return done, err
		}
	}

	// Make sure that everything has been flushed; we can't return until
	// all the contents from p have been used.
	for pendingInline > 0 {
		if err := w.schedule(nil, func(c *chunk) error {
			if err := callback(c); err != nil {
				return err
			}

			// The flush was successful; return errNewBuffer here
			// to break from the loop and check the condition
			// again.
			return errNewBuffer
		}); err != errNewBuffer {
			return len(p), err
		}
	}
	return len(p), nil
}

// Close implements io.Closer.Close.
func (w *writer) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Did we already close? After the call to Close, we always mark as
	// closed, regardless of whether the flush is successful.
	if w.closed {
		return io.ErrUnexpectedEOF
	}
	w.closed = true
	defer w.stop()

	// Schedule any remaining partial buffer; we pass w.flush directly here
	// because the final buffer is guaranteed to not be an inline buffer.
	if w.buf.Len() > 0 {
		if err := w.schedule(newChunk(nil, w.buf), w.flush); err != nil {
			return err
		}
	}

	// Flush all scheduled buffers; see above.
	if err := w.schedule(nil, w.flush); err != nil {
		return err
	}

	// Close the underlying writer (if necessary).
	if closer, ok := w.out.(io.Closer); ok {
		return closer.Close()
	}
	return nil
}