Vendor after merging mtrmac/image:compress

This commit is contained in:
Miloslav Trmač
2016-09-06 21:01:41 +02:00
parent 9c1cb79754
commit d705644f22
10 changed files with 257 additions and 29 deletions

View File

@@ -1,14 +1,19 @@
package copy
import (
"bytes"
"compress/gzip"
"crypto/sha256"
"crypto/subtle"
"encoding/hex"
"errors"
"fmt"
"hash"
"io"
"reflect"
"strings"
"github.com/Sirupsen/logrus"
"github.com/containers/image/image"
"github.com/containers/image/signature"
"github.com/containers/image/transports"
@@ -120,27 +125,52 @@ func Image(ctx *types.SystemContext, policyContext *signature.PolicyContext, des
return fmt.Errorf("Can not copy signatures: %v", err)
}
}
canModifyManifest := len(sigs) == 0
configInfo, err := src.ConfigInfo()
srcConfigInfo, err := src.ConfigInfo()
if err != nil {
return fmt.Errorf("Error parsing manifest: %v", err)
}
if configInfo.Digest != "" {
if err := copyBlob(dest, rawSource, configInfo.Digest); err != nil {
if srcConfigInfo.Digest != "" {
destConfigInfo, err := copyBlob(dest, rawSource, srcConfigInfo, false)
if err != nil {
return err
}
if destConfigInfo.Digest != srcConfigInfo.Digest {
return fmt.Errorf("Internal error: copying uncompressed config blob %s changed digest to %s", srcConfigInfo.Digest, destConfigInfo.Digest)
}
}
layerInfos, err := src.LayerInfos()
srcLayerInfos, err := src.LayerInfos()
if err != nil {
return fmt.Errorf("Error parsing manifest: %v", err)
}
copiedLayers := map[string]struct{}{}
for _, info := range layerInfos {
if _, ok := copiedLayers[info.Digest]; !ok {
if err := copyBlob(dest, rawSource, info.Digest); err != nil {
destLayerInfos := []types.BlobInfo{}
copiedLayers := map[string]types.BlobInfo{}
for _, srcLayer := range srcLayerInfos {
destLayer, ok := copiedLayers[srcLayer.Digest]
if !ok {
destLayer, err = copyBlob(dest, rawSource, srcLayer, canModifyManifest)
if err != nil {
return err
}
copiedLayers[info.Digest] = struct{}{}
copiedLayers[srcLayer.Digest] = destLayer
}
destLayerInfos = append(destLayerInfos, destLayer)
}
manifestUpdates := types.ManifestUpdateOptions{}
if layerDigestsDiffer(srcLayerInfos, destLayerInfos) {
manifestUpdates.LayerInfos = destLayerInfos
}
if !reflect.DeepEqual(manifestUpdates, types.ManifestUpdateOptions{}) {
if !canModifyManifest {
return fmt.Errorf("Internal error: copy needs an updated manifest but that was known to be forbidden")
}
manifest, err = src.UpdatedManifest(manifestUpdates)
if err != nil {
return fmt.Errorf("Error creating an updated manifest: %v", err)
}
}
@@ -176,27 +206,117 @@ func Image(ctx *types.SystemContext, policyContext *signature.PolicyContext, des
return nil
}
func copyBlob(dest types.ImageDestination, src types.ImageSource, digest string) error {
stream, blobSize, err := src.GetBlob(digest)
if err != nil {
return fmt.Errorf("Error reading blob %s: %v", digest, err)
// layerDigestsDiffer return true iff the digests in a and b differ (ignoring sizes and possible other fields)
func layerDigestsDiffer(a, b []types.BlobInfo) bool {
if len(a) != len(b) {
return true
}
defer stream.Close()
for i := range a {
if a[i].Digest != b[i].Digest {
return true
}
}
return false
}
// copyBlob copies a blob with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied blob.
func copyBlob(dest types.ImageDestination, src types.ImageSource, srcInfo types.BlobInfo, canCompress bool) (types.BlobInfo, error) {
srcStream, srcBlobSize, err := src.GetBlob(srcInfo.Digest) // We currently completely ignore srcInfo.Size throughout.
if err != nil {
return types.BlobInfo{}, fmt.Errorf("Error reading blob %s: %v", srcInfo.Digest, err)
}
defer srcStream.Close()
// Be paranoid; in case PutBlob somehow managed to ignore an error from digestingReader,
// use a separate validation failure indicator.
// Note that we don't use a stronger "validationSucceeded" indicator, because
// dest.PutBlob may detect that the layer already exists, in which case we don't
// read stream to the end, and validation does not happen.
digestingReader, err := newDigestingReader(stream, digest)
digestingReader, err := newDigestingReader(srcStream, srcInfo.Digest)
if err != nil {
return fmt.Errorf("Error preparing to verify blob %s: %v", digest, err)
return types.BlobInfo{}, fmt.Errorf("Error preparing to verify blob %s: %v", srcInfo.Digest, err)
}
if _, err := dest.PutBlob(digestingReader, types.BlobInfo{Digest: digest, Size: blobSize}); err != nil {
return fmt.Errorf("Error writing blob: %v", err)
var destStream io.Reader = digestingReader
isCompressed, destStream, err := isStreamCompressed(destStream) // We could skip this in some cases, but let's keep the code path uniform
if err != nil {
return types.BlobInfo{}, fmt.Errorf("Error reading blob %s: %v", srcInfo.Digest, err)
}
var inputInfo types.BlobInfo
if !canCompress || isCompressed || !dest.ShouldCompressLayers() {
logrus.Debugf("Using original blob without modification")
inputInfo.Digest = srcInfo.Digest
inputInfo.Size = srcBlobSize
} else {
logrus.Debugf("Compressing blob on the fly")
pipeReader, pipeWriter := io.Pipe()
defer pipeReader.Close()
// If this fails while writing data, it will do pipeWriter.CloseWithError(); if it fails otherwise,
// e.g. because we have exited and due to pipeReader.Close() above further writing to the pipe has failed,
// we dont care.
go compressGoroutine(pipeWriter, destStream) // Closes pipeWriter
destStream = pipeReader
inputInfo.Digest = ""
inputInfo.Size = -1
}
uploadedInfo, err := dest.PutBlob(destStream, inputInfo)
if err != nil {
return types.BlobInfo{}, fmt.Errorf("Error writing blob: %v", err)
}
if digestingReader.validationFailed { // Coverage: This should never happen.
return fmt.Errorf("Internal error uploading blob %s, digest verification failed but was ignored", digest)
return types.BlobInfo{}, fmt.Errorf("Internal error uploading blob %s, digest verification failed but was ignored", srcInfo.Digest)
}
return nil
if inputInfo.Digest != "" && uploadedInfo.Digest != inputInfo.Digest {
return types.BlobInfo{}, fmt.Errorf("Internal error uploading blob %s, blob with digest %s uploaded with digest %s", srcInfo.Digest, inputInfo.Digest, uploadedInfo.Digest)
}
return uploadedInfo, nil
}
// compressionPrefixes is an internal implementation detail of isStreamCompressed
var compressionPrefixes = map[string][]byte{
"gzip": {0x1F, 0x8B, 0x08}, // gzip (RFC 1952)
"bzip2": {0x42, 0x5A, 0x68}, // bzip2 (decompress.c:BZ2_decompress)
"xz": {0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}, // xz (/usr/share/doc/xz/xz-file-format.txt)
}
// isStreamCompressed returns true if input is recognized as a compressed format.
// Because it consumes the start of input, other consumers must use the returned io.Reader instead to also read from the beginning.
func isStreamCompressed(input io.Reader) (bool, io.Reader, error) {
buffer := [8]byte{}
n, err := io.ReadAtLeast(input, buffer[:], len(buffer))
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
// This is a “real” error. We could just ignore it this time, process the data we have, and hope that the source will report the same error again.
// Instead, fail immediately with the original error cause instead of a possibly secondary/misleading error returned later.
return false, nil, err
}
isCompressed := false
for algo, prefix := range compressionPrefixes {
if bytes.HasPrefix(buffer[:n], prefix) {
logrus.Debugf("Detected compression format %s", algo)
isCompressed = true
break
}
}
if !isCompressed {
logrus.Debugf("No compression detected")
}
return isCompressed, io.MultiReader(bytes.NewReader(buffer[:n]), input), nil
}
// compressGoroutine reads all input from src and writes its compressed equivalent to dest.
func compressGoroutine(dest *io.PipeWriter, src io.Reader) {
err := errors.New("Internal error: unexpected panic in compressGoroutine")
defer func() { // Note that this is not the same as {defer dest.CloseWithError(err)}; we need err to be evaluated lazily.
dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close()
}()
zipper := gzip.NewWriter(dest)
defer zipper.Close()
_, err = io.Copy(zipper, src) // Sets err to nil, i.e. causes dest.Close()
}