gvisor/pkg/compressio/compressio.go

// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package compressio provides parallel compression and decompression, as well
// as optional SHA-256 hashing.
//
// The stream format is defined as follows.
//
//	/------------------------------------------------------\
//	|                 chunk size (4-bytes)                  |
//	+------------------------------------------------------+
//	|              (optional) hash (32-bytes)               |
//	+------------------------------------------------------+
//	|           compressed data size (4-bytes)              |
//	+------------------------------------------------------+
//	|                   compressed data                     |
//	+------------------------------------------------------+
//	|              (optional) hash (32-bytes)               |
//	+------------------------------------------------------+
//	|           compressed data size (4-bytes)              |
//	+------------------------------------------------------+
//	|                   compressed data                     |
//	+------------------------------------------------------+
//	|                        ......                         |
//	\------------------------------------------------------/
//
// where each subsequent hash is calculated from the following items in order
//
//	compressed data
//	compressed data size
//	previous hash
//
// so the stream integrity cannot be compromised by switching and mixing
// compressed chunks.
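//
// A typical round trip looks roughly like the following (a sketch with error
// handling elided; key, data, and dst are placeholders, and the 1 MiB chunk
// size and flate.BestSpeed level are merely illustrative choices; a nil key
// disables hashing entirely):
//
//	var buf bytes.Buffer
//	w, _ := NewWriter(&buf, key, 1024*1024, flate.BestSpeed)
//	w.Write(data)
//	w.Close()
//	r, _ := NewReader(&buf, key)
//	io.Copy(dst, r)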
package compressio

import (
	"bytes"
	"compress/flate"
	"crypto/hmac"
	"crypto/sha256"
	"errors"
	"hash"
	"io"
	"runtime"
	"sync"

	"gvisor.googlesource.com/gvisor/pkg/binary"
)

var bufPool = sync.Pool{
	New: func() interface{} {
		return bytes.NewBuffer(nil)
	},
}

var chunkPool = sync.Pool{
	New: func() interface{} {
		return new(chunk)
	},
}

// chunk is a unit of work.
type chunk struct {
	// compressed is compressed data.
	//
	// This will always be returned to the bufPool directly when work has
	// finished (in schedule) and therefore must be allocated.
	compressed *bytes.Buffer

	// uncompressed is the uncompressed data.
	//
	// This is not returned to the bufPool automatically, since it may
	// correspond to an inline slice (provided directly to Read or Write).
	uncompressed *bytes.Buffer

	// The current hash object. Only used in compress mode.
	h hash.Hash

	// The hash from previous chunks. Only used in uncompress mode.
	lastSum []byte

	// The expected hash after the current chunk. Only used in uncompress
	// mode.
	sum []byte
}

// newChunk allocates a new chunk object (or pulls one from the pool). Buffers
// will be allocated if nil is provided for compressed or uncompressed.
func newChunk(lastSum []byte, sum []byte, compressed *bytes.Buffer, uncompressed *bytes.Buffer) *chunk {
	c := chunkPool.Get().(*chunk)
	c.lastSum = lastSum
	c.sum = sum
	if compressed != nil {
		c.compressed = compressed
	} else {
		c.compressed = bufPool.Get().(*bytes.Buffer)
	}
	if uncompressed != nil {
		c.uncompressed = uncompressed
	} else {
		c.uncompressed = bufPool.Get().(*bytes.Buffer)
	}
	return c
}

// result is the result of some work; it includes the original chunk.
type result struct {
	*chunk
	err error
}

// worker is a compression/decompression worker.
//
// The associated worker goroutine reads in uncompressed buffers from input and
// writes compressed buffers to its output. Alternatively, the worker reads
// compressed buffers from input and writes uncompressed buffers to its output.
//
// The goroutine will exit when input is closed, and the goroutine will close
// output.
type worker struct {
	hashPool *hashPool
	input    chan *chunk
	output   chan result
}

// work is the main work routine; see worker.
func (w *worker) work(compress bool, level int) {
	defer close(w.output)

	var h hash.Hash

	for c := range w.input {
		if h == nil && w.hashPool != nil {
			h = w.hashPool.getHash()
		}
		if compress {
			mw := io.Writer(c.compressed)
			if h != nil {
				mw = io.MultiWriter(mw, h)
			}

			// Encode this slice.
			fw, err := flate.NewWriter(mw, level)
			if err != nil {
				w.output <- result{c, err}
				continue
			}

			// Encode the input.
			if _, err := io.CopyN(fw, c.uncompressed, int64(c.uncompressed.Len())); err != nil {
				w.output <- result{c, err}
				continue
			}
			if err := fw.Close(); err != nil {
				w.output <- result{c, err}
				continue
			}

			// Write the hash, if enabled.
			if h != nil {
				binary.WriteUint32(h, binary.BigEndian, uint32(c.compressed.Len()))
				c.h = h
				h = nil
			}
		} else {
			// Check the hash of the compressed contents.
			if h != nil {
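				// The chain mirrors the writer: the compressed
				// bytes, then their length, then the previous
				// chunk's sum are mixed in before comparing;
				// see the package comment.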
				h.Write(c.compressed.Bytes())
				binary.WriteUint32(h, binary.BigEndian, uint32(c.compressed.Len()))
				io.CopyN(h, bytes.NewReader(c.lastSum), int64(len(c.lastSum)))
				sum := h.Sum(nil)
				h.Reset()
				if !hmac.Equal(c.sum, sum) {
					w.output <- result{c, ErrHashMismatch}
					continue
				}
			}

			// Decode this slice.
			fr := flate.NewReader(c.compressed)

			// Decode the input.
			if _, err := io.Copy(c.uncompressed, fr); err != nil {
				w.output <- result{c, err}
				continue
			}
		}

		// Send the output.
		w.output <- result{c, nil}
	}
}

type hashPool struct {
	// mu protects the hash list.
	mu sync.Mutex

	// key is the key used to create hash objects.
	key []byte

	// hashes is the hash object free list. Note that this cannot be
	// globally shared across readers or writers, as it is key-specific.
	hashes []hash.Hash
}

// getHash gets a hash object from the pool. It should only be called when the
// pool key is non-nil.
func (p *hashPool) getHash() hash.Hash {
	p.mu.Lock()
	defer p.mu.Unlock()

	if len(p.hashes) == 0 {
		return hmac.New(sha256.New, p.key)
	}

	h := p.hashes[len(p.hashes)-1]
	p.hashes = p.hashes[:len(p.hashes)-1]
	return h
}

func (p *hashPool) putHash(h hash.Hash) {
	h.Reset()

	p.mu.Lock()
	defer p.mu.Unlock()

	p.hashes = append(p.hashes, h)
}

// pool is common functionality for readers/writers.
type pool struct {
	// workers are the compression/decompression workers.
	workers []worker

	// chunkSize is the chunk size. This is the first four bytes in the
	// stream and is shared across both the reader and writer.
	chunkSize uint32

	// mu protects below; it is generally the responsibility of users to
	// acquire this mutex before calling any methods on the pool.
	mu sync.Mutex

	// nextInput is the next worker for input (scheduling).
	nextInput int

	// nextOutput is the next worker for output (result).
	nextOutput int

	// buf is the current active buffer; the exact semantics of this buffer
	// depend on whether this is a reader or a writer.
	buf *bytes.Buffer

	// lastSum records the hash of the last chunk processed.
	lastSum []byte

	// hashPool is the hash object pool. It cannot be embedded into pool
	// itself as worker refers to it and that would stop pool from being
	// GCed.
	hashPool *hashPool
}

// init initializes the worker pool.
//
// This should only be called once.
func (p *pool) init(key []byte, workers int, compress bool, level int) {
	if key != nil {
		p.hashPool = &hashPool{key: key}
	}
	p.workers = make([]worker, workers)
	for i := 0; i < len(p.workers); i++ {
		p.workers[i] = worker{
			hashPool: p.hashPool,
			input:    make(chan *chunk, 1),
			output:   make(chan result, 1),
		}
		go p.workers[i].work(compress, level) // S/R-SAFE: In save path only.
	}
	runtime.SetFinalizer(p, (*pool).stop)
}

// stop stops all workers.
func (p *pool) stop() {
	for i := 0; i < len(p.workers); i++ {
		close(p.workers[i].input)
	}
	p.workers = nil
	p.hashPool = nil
}

// handleResult calls the callback.
func handleResult(r result, callback func(*chunk) error) error {
	defer func() {
		r.chunk.compressed.Reset()
		bufPool.Put(r.chunk.compressed)
		chunkPool.Put(r.chunk)
	}()

	if r.err != nil {
		return r.err
	}

	return callback(r.chunk)
}

// schedule schedules the given buffers.
//
// If c is non-nil, then it will return as soon as the chunk is scheduled. If c
// is nil, then it will return only when no more work is left to do.
//
// If no callback function is provided, then the output channel will be
// ignored. You must be sure that the input is schedulable in this case.
func (p *pool) schedule(c *chunk, callback func(*chunk) error) error {
	for {
		var (
			inputChan  chan *chunk
			outputChan chan result
		)
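		// Workers are used round-robin, and results are drained in the
		// same order chunks were scheduled, so stream order is
		// preserved; nextInput-nextOutput bounds the work in flight.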
		if c != nil && len(p.workers) != 0 {
			inputChan = p.workers[(p.nextInput+1)%len(p.workers)].input
		}
		if callback != nil && p.nextOutput != p.nextInput && len(p.workers) != 0 {
			outputChan = p.workers[(p.nextOutput+1)%len(p.workers)].output
		}
		if inputChan == nil && outputChan == nil {
			return nil
		}

		select {
		case inputChan <- c:
			p.nextInput++
			return nil
		case r := <-outputChan:
			p.nextOutput++
			if err := handleResult(r, callback); err != nil {
				return err
			}
		}
	}
}

// reader chunks incoming reads and decompresses them.
type reader struct {
	pool

	// in is the source.
	in io.Reader
}

// NewReader returns a new compressed reader. If key is non-nil, the data stream
// is assumed to contain expected hash values, which will be compared against
// hash values computed from the compressed bytes. See package comments for
// details.
func NewReader(in io.Reader, key []byte) (io.Reader, error) {
	r := &reader{
		in: in,
	}

	// Use double buffering for read.
	r.init(key, 2*runtime.GOMAXPROCS(0), false, 0)

	var err error
	if r.chunkSize, err = binary.ReadUint32(in, binary.BigEndian); err != nil {
		return nil, err
	}

	if r.hashPool != nil {
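		// The stream's first sum covers only the chunk size (see the
		// package comment); verify it before reading any chunks so a
		// tampered header fails immediately.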
		h := r.hashPool.getHash()
		binary.WriteUint32(h, binary.BigEndian, r.chunkSize)
		r.lastSum = h.Sum(nil)
		r.hashPool.putHash(h)
		sum := make([]byte, len(r.lastSum))
		if _, err := io.ReadFull(r.in, sum); err != nil {
			return nil, err
		}
		if !hmac.Equal(r.lastSum, sum) {
			return nil, ErrHashMismatch
		}
	}

	return r, nil
}

// errNewBuffer is returned when a new buffer is completed.
var errNewBuffer = errors.New("buffer ready")

// ErrHashMismatch is returned if the hash does not match.
var ErrHashMismatch = errors.New("hash mismatch")

// Read implements io.Reader.Read.
func (r *reader) Read(p []byte) (int, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Total bytes completed; this is declared up front because it must be
	// adjustable by the callback below.
	done := 0

	// Total bytes pending in the asynchronous workers for buffers. This is
	// used to process the proper regions of the input as inline buffers.
	var (
		pendingPre    = r.nextInput - r.nextOutput
		pendingInline = 0
	)

	// Define our callback for completed work.
	callback := func(c *chunk) error {
		// Check for an inline buffer.
		if pendingPre == 0 && pendingInline > 0 {
			pendingInline--
			done += c.uncompressed.Len()
			return nil
		}

		// Copy the resulting buffer to our intermediate one, and return
		// errNewBuffer to ensure that we aren't called a second time.
		// This error code is handled specially below.
		//
		// The buffer will be freed and returned to the pool once it has
		// been fully drained.
		if pendingPre > 0 {
			pendingPre--
		}
		r.buf = c.uncompressed
		return errNewBuffer
	}

	for done < len(p) {
		// Do we have buffered data available?
		if r.buf != nil {
			n, err := r.buf.Read(p[done:])
			done += n
			if err == io.EOF {
				// This is the uncompressed buffer, it can be
				// returned to the pool at this point.
				r.buf.Reset()
				bufPool.Put(r.buf)
				r.buf = nil
			} else if err != nil {
				// Should never happen.
				defer r.stop()
				return done, err
			}
			continue
		}

		// Read the length of the next chunk and reset the reader. The
		// length is used to limit the reader.
		//
		// See writer.flush.
		l, err := binary.ReadUint32(r.in, binary.BigEndian)
		if err != nil {
			// This is generally okay as long as there are still
			// buffers outstanding. We actually just wait for
			// completion of those buffers here and continue our
			// loop.
			if err := r.schedule(nil, callback); err == nil {
				// We've actually finished all buffers; this is
				// the normal EOF exit path.
				defer r.stop()
				return done, io.EOF
			} else if err == errNewBuffer {
				// A new buffer is now available.
				continue
			} else {
				// Some other error occurred; we cannot process
				// any further.
				defer r.stop()
				return done, err
			}
		}

		// Read this chunk and schedule decompression.
		compressed := bufPool.Get().(*bytes.Buffer)
		if _, err := io.CopyN(compressed, r.in, int64(l)); err != nil {
			// Some other error occurred; see above.
			if err == io.EOF {
				err = io.ErrUnexpectedEOF
			}
			return done, err
		}

		var sum []byte
		if r.hashPool != nil {
			sum = make([]byte, len(r.lastSum))
			if _, err := io.ReadFull(r.in, sum); err != nil {
				if err == io.EOF {
					err = io.ErrUnexpectedEOF
				}
				return done, err
			}
		}

		// Are we doing inline decoding?
		//
		// Note that we need to check the length here against
		// bytes.MinRead, since the bytes library will choose to grow
		// the slice if the available capacity is not at least
		// bytes.MinRead. This limits inline decoding to chunkSizes
		// that are at least bytes.MinRead (which is not unreasonable).
		var c *chunk
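		// Each chunk still in flight (scheduled or inline) will fill
		// chunkSize bytes of p ahead of done, so this chunk's inline
		// target begins past all of that outstanding output.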
		start := done + ((pendingPre + pendingInline) * int(r.chunkSize))
		if len(p) >= start+int(r.chunkSize) && len(p) >= start+bytes.MinRead {
			c = newChunk(r.lastSum, sum, compressed, bytes.NewBuffer(p[start:start]))
			pendingInline++
		} else {
			c = newChunk(r.lastSum, sum, compressed, nil)
		}
		r.lastSum = sum
		if err := r.schedule(c, callback); err == errNewBuffer {
			// A new buffer was completed while we were reading.
			// That's great, but we need to force schedule the
			// current buffer so that it does not get lost.
			//
			// It is safe to pass nil as an output function here,
			// because we know that we just freed up a slot above.
			r.schedule(c, nil)
		} else if err != nil {
			// Some other error occurred; see above.
			defer r.stop()
			return done, err
		}
	}

	// Make sure that everything has been decoded successfully, otherwise
	// parts of p may not actually have completed.
	for pendingInline > 0 {
		if err := r.schedule(nil, func(c *chunk) error {
			if err := callback(c); err != nil {
				return err
			}
			// A nil return from the callback means that an inline
			// buffer has completed. The callback will have already
			// decremented pendingInline, so we just return an
			// error to check the top of the loop again.
			return errNewBuffer
		}); err != errNewBuffer {
			// Some other error occurred; see above.
			return done, err
		}
	}

	// Need to return done here, since it may have been adjusted by the
	// callback to compensate for partial reads on some inline buffer.
	return done, nil
}

// writer chunks and schedules writes.
type writer struct {
	pool

	// out is the underlying writer.
	out io.Writer

	// closed indicates whether the file has been closed.
	closed bool
}

// NewWriter returns a new compressed writer. If key is non-nil, hash values are
// generated and written out for compressed bytes. See package comments for
// details.
//
// The recommended chunkSize is on the order of 1M. Extra memory may be
// buffered (in the form of read-ahead, or buffered writes), and is limited to
// O(chunkSize * [1+GOMAXPROCS]).
func NewWriter(out io.Writer, key []byte, chunkSize uint32, level int) (io.WriteCloser, error) {
	w := &writer{
		pool: pool{
			chunkSize: chunkSize,
			buf:       bufPool.Get().(*bytes.Buffer),
		},
		out: out,
	}
	w.init(key, 1+runtime.GOMAXPROCS(0), true, level)

	if err := binary.WriteUint32(w.out, binary.BigEndian, chunkSize); err != nil {
		return nil, err
	}

	if w.hashPool != nil {
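		// Seed the hash chain: the initial sum covers only the chunk
		// size and is written out before any compressed data, matching
		// what NewReader verifies.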
		h := w.hashPool.getHash()
		binary.WriteUint32(h, binary.BigEndian, chunkSize)
		w.lastSum = h.Sum(nil)
		w.hashPool.putHash(h)
		if _, err := io.CopyN(w.out, bytes.NewReader(w.lastSum), int64(len(w.lastSum))); err != nil {
			return nil, err
		}
	}

	return w, nil
}

// flush writes a single buffer.
func (w *writer) flush(c *chunk) error {
	// Prefix each chunk with a length; this allows the reader to safely
	// limit reads while buffering.
	l := uint32(c.compressed.Len())
	if err := binary.WriteUint32(w.out, binary.BigEndian, l); err != nil {
		return err
	}

	// Write out to the stream.
	if _, err := io.CopyN(w.out, c.compressed, int64(c.compressed.Len())); err != nil {
		return err
	}

	if w.hashPool != nil {
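		// c.h already contains the compressed bytes and their length
		// (mixed in by the worker); folding in the previous sum
		// completes the chain described in the package comment.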
		io.CopyN(c.h, bytes.NewReader(w.lastSum), int64(len(w.lastSum)))
		sum := c.h.Sum(nil)
		w.hashPool.putHash(c.h)
		c.h = nil
		if _, err := io.CopyN(w.out, bytes.NewReader(sum), int64(len(sum))); err != nil {
			return err
		}
		w.lastSum = sum
	}

	return nil
}

// Write implements io.Writer.Write.
func (w *writer) Write(p []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Did we close already?
	if w.closed {
		return 0, io.ErrUnexpectedEOF
	}

	// See above; we need to track in the same way.
	var (
		pendingPre    = w.nextInput - w.nextOutput
		pendingInline = 0
	)
	callback := func(c *chunk) error {
		if pendingPre == 0 && pendingInline > 0 {
			pendingInline--
			return w.flush(c)
		}
		if pendingPre > 0 {
			pendingPre--
		}
		err := w.flush(c)
		c.uncompressed.Reset()
		bufPool.Put(c.uncompressed)
		return err
	}

	for done := 0; done < len(p); {
		// Construct an inline buffer if we're doing an inline
		// encoding; see above regarding the bytes.MinRead constraint.
		if w.buf.Len() == 0 && len(p) >= done+int(w.chunkSize) && len(p) >= done+bytes.MinRead {
			bufPool.Put(w.buf) // Return to the pool; never scheduled.
			w.buf = bytes.NewBuffer(p[done : done+int(w.chunkSize)])
			done += int(w.chunkSize)
			pendingInline++
		}

		// Do we need to flush w.buf? Note that this case should be hit
		// immediately following the inline case above.
		left := int(w.chunkSize) - w.buf.Len()
		if left == 0 {
			if err := w.schedule(newChunk(nil, nil, nil, w.buf), callback); err != nil {
				return done, err
			}
			// Reset the buffer, since this has now been scheduled
			// for compression. Note that this may be trampled
			// immediately by the bufPool.Put(w.buf) above if the
			// next buffer happens to be inline, but that's okay.
			w.buf = bufPool.Get().(*bytes.Buffer)
			continue
		}

		// Read from p into w.buf.
		toWrite := len(p) - done
		if toWrite > left {
			toWrite = left
		}
		n, err := w.buf.Write(p[done : done+toWrite])
		done += n
		if err != nil {
			return done, err
		}
	}

	// Make sure that everything has been flushed; we can't return until
	// all the contents of p have been used.
	for pendingInline > 0 {
		if err := w.schedule(nil, func(c *chunk) error {
			if err := callback(c); err != nil {
				return err
			}
			// The flush was successful, return errNewBuffer here
			// to break from the loop and check the condition
			// again.
			return errNewBuffer
		}); err != errNewBuffer {
			return len(p), err
		}
	}

	return len(p), nil
}

// Close implements io.Closer.Close.
func (w *writer) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Did we already close? After the call to Close, we always mark as
	// closed, regardless of whether the flush is successful.
	if w.closed {
		return io.ErrUnexpectedEOF
	}
	w.closed = true
	defer w.stop()

	// Schedule any remaining partial buffer; we pass w.flush directly here
	// because the final buffer is guaranteed to not be an inline buffer.
	if w.buf.Len() > 0 {
		if err := w.schedule(newChunk(nil, nil, nil, w.buf), w.flush); err != nil {
			return err
		}
	}

	// Flush all scheduled buffers; see above.
	if err := w.schedule(nil, w.flush); err != nil {
		return err
	}

	// Close the underlying writer (if necessary).
	if closer, ok := w.out.(io.Closer); ok {
		return closer.Close()
	}

	return nil
}