Merge pull request #307 from cyphar/vendor-ci193

[donotmerge] vendor: update to c/i@copy-obey-destination-compression
This commit is contained in:
Antonio Murdaca 2017-02-17 14:03:28 +01:00 committed by GitHub
commit b9826f0c42
3 changed files with 71 additions and 22 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/containers/image/image"
"github.com/containers/image/manifest"
"github.com/containers/image/pkg/compression"
"github.com/containers/image/signature"
"github.com/containers/image/transports"
"github.com/containers/image/types"
@ -370,7 +371,7 @@ func (ic *imageCopier) copyLayer(srcInfo types.BlobInfo) (types.BlobInfo, digest
// and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller.
func (ic *imageCopier) copyLayerFromStream(srcStream io.Reader, srcInfo types.BlobInfo,
diffIDIsNeeded bool) (types.BlobInfo, <-chan diffIDResult, error) {
var getDiffIDRecorder func(decompressorFunc) io.Writer // = nil
var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil
var diffIDChan chan diffIDResult
err := errors.New("Internal error: unexpected panic in copyLayer") // For pipeWriter.CloseWithError below
@ -381,7 +382,7 @@ func (ic *imageCopier) copyLayerFromStream(srcStream io.Reader, srcInfo types.Bl
pipeWriter.CloseWithError(err) // CloseWithError(nil) is equivalent to Close()
}()
getDiffIDRecorder = func(decompressor decompressorFunc) io.Writer {
getDiffIDRecorder = func(decompressor compression.DecompressorFunc) io.Writer {
// If this fails, e.g. because we have exited and due to pipeWriter.CloseWithError() above further
// reading from the pipe has failed, we dont really care.
// We only read from diffIDChan if the rest of the flow has succeeded, and when we do read from it,
@ -399,7 +400,7 @@ func (ic *imageCopier) copyLayerFromStream(srcStream io.Reader, srcInfo types.Bl
}
// diffIDComputationGoroutine reads all input from layerStream, uncompresses using decompressor if necessary, and sends its digest, and status, if any, to dest.
func diffIDComputationGoroutine(dest chan<- diffIDResult, layerStream io.ReadCloser, decompressor decompressorFunc) {
func diffIDComputationGoroutine(dest chan<- diffIDResult, layerStream io.ReadCloser, decompressor compression.DecompressorFunc) {
result := diffIDResult{
digest: "",
err: errors.New("Internal error: unexpected panic in diffIDComputationGoroutine"),
@ -411,7 +412,7 @@ func diffIDComputationGoroutine(dest chan<- diffIDResult, layerStream io.ReadClo
}
// computeDiffID reads all input from layerStream, uncompresses it using decompressor if necessary, and returns its digest.
func computeDiffID(stream io.Reader, decompressor decompressorFunc) (digest.Digest, error) {
func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc) (digest.Digest, error) {
if decompressor != nil {
s, err := decompressor(stream)
if err != nil {
@ -428,7 +429,7 @@ func computeDiffID(stream io.Reader, decompressor decompressorFunc) (digest.Dige
// perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied blob.
func (ic *imageCopier) copyBlobFromStream(srcStream io.Reader, srcInfo types.BlobInfo,
getOriginalLayerCopyWriter func(decompressor decompressorFunc) io.Writer,
getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer,
canCompress bool) (types.BlobInfo, error) {
// The copying happens through a pipeline of connected io.Readers.
// === Input: srcStream
@ -446,8 +447,8 @@ func (ic *imageCopier) copyBlobFromStream(srcStream io.Reader, srcInfo types.Blo
var destStream io.Reader = digestingReader
// === Detect compression of the input stream.
// This requires us to “peek ahead” into the stream to read the initial part, which requires us to chain through another io.Reader returned by detectCompression.
decompressor, destStream, err := detectCompression(destStream) // We could skip this in some cases, but let's keep the code path uniform
// This requires us to “peek ahead” into the stream to read the initial part, which requires us to chain through another io.Reader returned by DetectCompression.
decompressor, destStream, err := compression.DetectCompression(destStream) // We could skip this in some cases, but let's keep the code path uniform
if err != nil {
return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest)
}

View File

@ -10,6 +10,7 @@ import (
"path"
"github.com/containers/image/manifest"
"github.com/containers/image/pkg/compression"
"github.com/containers/image/types"
"github.com/docker/docker/client"
"github.com/opencontainers/go-digest"
@ -334,6 +335,18 @@ func (s *daemonImageSource) GetTargetManifest(digest digest.Digest) ([]byte, str
return nil, "", errors.Errorf(`Manifest lists are not supported by "docker-daemon:"`)
}
type readCloseWrapper struct {
io.Reader
closeFunc func() error
}
func (r readCloseWrapper) Close() error {
if r.closeFunc != nil {
return r.closeFunc()
}
return nil
}
// GetBlob returns a stream for the specified blob, and the blobs size (or -1 if unknown).
func (s *daemonImageSource) GetBlob(info types.BlobInfo) (io.ReadCloser, int64, error) {
if err := s.ensureCachedDataIsPresent(); err != nil {
@ -349,7 +362,37 @@ func (s *daemonImageSource) GetBlob(info types.BlobInfo) (io.ReadCloser, int64,
if err != nil {
return nil, 0, err
}
return stream, li.size, nil
// In order to handle the fact that digests != diffIDs (and thus that a
// caller which is trying to verify the blob will run into problems),
// we need to decompress blobs. This is a bit ugly, but it's a
// consequence of making everything addressable by their DiffID rather
// than by their digest...
//
// In particular, because the v2s2 manifest being generated uses
// DiffIDs, any caller of GetBlob is going to be asking for DiffIDs of
// layers not their _actual_ digest. The result is that copy/... will
// be verifing a "digest" which is not the actual layer's digest (but
// is instead the DiffID).
decompressFunc, reader, err := compression.DetectCompression(stream)
if err != nil {
return nil, 0, errors.Wrapf(err, "Detecting compression in blob %s", info.Digest)
}
if decompressFunc != nil {
reader, err = decompressFunc(reader)
if err != nil {
return nil, 0, errors.Wrapf(err, "Decompressing blob %s stream", info.Digest)
}
}
newStream := readCloseWrapper{
Reader: reader,
closeFunc: stream.Close,
}
return newStream, li.size, nil
}
return nil, 0, errors.Errorf("Unknown blob %s", info.Digest)

View File

@ -1,4 +1,4 @@
package copy
package compression
import (
"bytes"
@ -11,32 +11,37 @@ import (
"github.com/Sirupsen/logrus"
)
// decompressorFunc, given a compressed stream, returns the decompressed stream.
type decompressorFunc func(io.Reader) (io.Reader, error)
// DecompressorFunc returns the decompressed stream, given a compressed stream.
type DecompressorFunc func(io.Reader) (io.Reader, error)
func gzipDecompressor(r io.Reader) (io.Reader, error) {
// GzipDecompressor is a DecompressorFunc for the gzip compression algorithm.
func GzipDecompressor(r io.Reader) (io.Reader, error) {
return gzip.NewReader(r)
}
func bzip2Decompressor(r io.Reader) (io.Reader, error) {
// Bzip2Decompressor is a DecompressorFunc for the bzip2 compression algorithm.
func Bzip2Decompressor(r io.Reader) (io.Reader, error) {
return bzip2.NewReader(r), nil
}
func xzDecompressor(r io.Reader) (io.Reader, error) {
// XzDecompressor is a DecompressorFunc for the xz compression algorithm.
func XzDecompressor(r io.Reader) (io.Reader, error) {
return nil, errors.New("Decompressing xz streams is not supported")
}
// compressionAlgos is an internal implementation detail of detectCompression
// compressionAlgos is an internal implementation detail of DetectCompression
var compressionAlgos = map[string]struct {
prefix []byte
decompressor decompressorFunc
decompressor DecompressorFunc
}{
"gzip": {[]byte{0x1F, 0x8B, 0x08}, gzipDecompressor}, // gzip (RFC 1952)
"bzip2": {[]byte{0x42, 0x5A, 0x68}, bzip2Decompressor}, // bzip2 (decompress.c:BZ2_decompress)
"xz": {[]byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}, xzDecompressor}, // xz (/usr/share/doc/xz/xz-file-format.txt)
"gzip": {[]byte{0x1F, 0x8B, 0x08}, GzipDecompressor}, // gzip (RFC 1952)
"bzip2": {[]byte{0x42, 0x5A, 0x68}, Bzip2Decompressor}, // bzip2 (decompress.c:BZ2_decompress)
"xz": {[]byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}, XzDecompressor}, // xz (/usr/share/doc/xz/xz-file-format.txt)
}
// detectCompression returns a decompressorFunc if the input is recognized as a compressed format, nil otherwise.
// DetectCompression returns a DecompressorFunc if the input is recognized as a compressed format, nil otherwise.
// Because it consumes the start of input, other consumers must use the returned io.Reader instead to also read from the beginning.
func detectCompression(input io.Reader) (decompressorFunc, io.Reader, error) {
func DetectCompression(input io.Reader) (DecompressorFunc, io.Reader, error) {
buffer := [8]byte{}
n, err := io.ReadAtLeast(input, buffer[:], len(buffer))
@ -46,7 +51,7 @@ func detectCompression(input io.Reader) (decompressorFunc, io.Reader, error) {
return nil, nil, err
}
var decompressor decompressorFunc
var decompressor DecompressorFunc
for name, algo := range compressionAlgos {
if bytes.HasPrefix(buffer[:n], algo.prefix) {
logrus.Debugf("Detected compression format %s", name)