Merge pull request #249 from runcom/layers-federation

Support layers federation
This commit is contained in:
Antonio Murdaca 2016-11-30 22:14:30 +01:00 committed by GitHub
commit 980ff3eadd
16 changed files with 135 additions and 42 deletions

View File

@ -82,7 +82,7 @@ var layersCmd = cli.Command{
defer dest.Close()
for _, digest := range blobDigests {
r, blobSize, err := rawSource.GetBlob(digest)
r, blobSize, err := rawSource.GetBlob(types.BlobInfo{Digest: digest, Size: -1})
if err != nil {
return err
}

View File

@ -6,7 +6,7 @@ rm -rf vendor/
source 'hack/.vendor-helpers.sh'
clone git github.com/urfave/cli v1.17.0
clone git github.com/containers/image master
clone git github.com/containers/image layers-federation https://github.com/runcom/image
clone git gopkg.in/cheggaaa/pb.v1 ad4efe000aa550bb54918c06ebbadc0ff17687b9 https://github.com/cheggaaa/pb
clone git github.com/Sirupsen/logrus v0.10.0
clone git github.com/go-check/check v1

View File

@ -224,11 +224,27 @@ func copyLayers(manifestUpdates *types.ManifestUpdateOptions, dest types.ImageDe
for _, srcLayer := range srcInfos {
cl, ok := copiedLayers[srcLayer.Digest]
if !ok {
var (
destInfo types.BlobInfo
diffID digest.Digest
err error
)
if dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 {
// DiffIDs are, currently, needed only when converting from schema1.
// In which case src.LayerInfos will not have URLs because schema1
// does not support them.
if diffIDsAreNeeded {
return errors.New("getting DiffID for foreign layers is unimplemented")
}
destInfo = srcLayer
fmt.Fprintf(reportWriter, "Skipping foreign layer %q copy to %s\n", destInfo.Digest, dest.Reference().Transport().Name())
} else {
fmt.Fprintf(reportWriter, "Copying blob %s\n", srcLayer.Digest)
destInfo, diffID, err := copyLayer(dest, rawSource, srcLayer, diffIDsAreNeeded, canModifyManifest, reportWriter)
destInfo, diffID, err = copyLayer(dest, rawSource, srcLayer, diffIDsAreNeeded, canModifyManifest, reportWriter)
if err != nil {
return err
}
}
cl = copiedLayer{blobInfo: destInfo, diffID: diffID}
copiedLayers[srcLayer.Digest] = cl
}
@ -289,7 +305,7 @@ type diffIDResult struct {
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func copyLayer(dest types.ImageDestination, src types.ImageSource, srcInfo types.BlobInfo,
diffIDIsNeeded bool, canCompress bool, reportWriter io.Writer) (types.BlobInfo, digest.Digest, error) {
srcStream, srcBlobSize, err := src.GetBlob(srcInfo.Digest) // We currently completely ignore srcInfo.Size throughout.
srcStream, srcBlobSize, err := src.GetBlob(srcInfo)
if err != nil {
return types.BlobInfo{}, "", fmt.Errorf("Error reading blob %s: %v", srcInfo.Digest, err)
}

View File

@ -44,6 +44,12 @@ func (d *dirImageDestination) ShouldCompressLayers() bool {
return false
}
// AcceptsForeignLayerURLs returns false iff foreign layers in manifest should be actually
// uploaded to the image destination, true otherwise.
func (d *dirImageDestination) AcceptsForeignLayerURLs() bool {
return false
}
// PutBlob writes contents of stream and returns data representing the result (with all data filled in).
// inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it.
// inputInfo.Size is the expected length of stream, if known.

View File

@ -46,8 +46,8 @@ func (s *dirImageSource) GetTargetManifest(digest digest.Digest) ([]byte, string
}
// GetBlob returns a stream for the specified blob, and the blobs size (or -1 if unknown).
func (s *dirImageSource) GetBlob(digest digest.Digest) (io.ReadCloser, int64, error) {
r, err := os.Open(s.ref.layerPath(digest))
func (s *dirImageSource) GetBlob(info types.BlobInfo) (io.ReadCloser, int64, error) {
r, err := os.Open(s.ref.layerPath(info.Digest))
if err != nil {
return nil, 0, nil
}

View File

@ -129,6 +129,12 @@ func (d *daemonImageDestination) ShouldCompressLayers() bool {
return false
}
// AcceptsForeignLayerURLs returns false iff foreign layers in manifest should be actually
// uploaded to the image destination, true otherwise.
func (d *daemonImageDestination) AcceptsForeignLayerURLs() bool {
return false
}
// PutBlob writes contents of stream and returns data representing the result (with all data filled in).
// inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it.
// inputInfo.Size is the expected length of stream, if known.

View File

@ -335,16 +335,16 @@ func (s *daemonImageSource) GetTargetManifest(digest digest.Digest) ([]byte, str
}
// GetBlob returns a stream for the specified blob, and the blobs size (or -1 if unknown).
func (s *daemonImageSource) GetBlob(digest digest.Digest) (io.ReadCloser, int64, error) {
func (s *daemonImageSource) GetBlob(info types.BlobInfo) (io.ReadCloser, int64, error) {
if err := s.ensureCachedDataIsPresent(); err != nil {
return nil, 0, err
}
if digest == s.configDigest { // FIXME? Implement a more general algorithm matching instead of assuming sha256.
if info.Digest == s.configDigest { // FIXME? Implement a more general algorithm matching instead of assuming sha256.
return ioutil.NopCloser(bytes.NewReader(s.configBytes)), int64(len(s.configBytes)), nil
}
if li, ok := s.knownLayers[diffID(digest)]; ok { // diffID is a digest of the uncompressed tarball,
if li, ok := s.knownLayers[diffID(info.Digest)]; ok { // diffID is a digest of the uncompressed tarball,
stream, err := s.openTarComponent(li.path)
if err != nil {
return nil, 0, err
@ -352,7 +352,7 @@ func (s *daemonImageSource) GetBlob(digest digest.Digest) (io.ReadCloser, int64,
return stream, li.size, nil
}
return nil, 0, fmt.Errorf("Unknown blob %s", digest)
return nil, 0, fmt.Errorf("Unknown blob %s", info.Digest)
}
// GetSignatures returns the image's signatures. It may use a remote (= slow) service.

View File

@ -139,13 +139,14 @@ func (c *dockerClient) makeRequest(method, url string, headers map[string][]stri
}
url = fmt.Sprintf(baseURL, c.scheme, c.registry) + url
return c.makeRequestToResolvedURL(method, url, headers, stream, -1)
return c.makeRequestToResolvedURL(method, url, headers, stream, -1, true)
}
// makeRequestToResolvedURL creates and executes a http.Request with the specified parameters, adding authentication and TLS options for the Docker client.
// streamLen, if not -1, specifies the length of the data expected on stream.
// makeRequest should generally be preferred.
func (c *dockerClient) makeRequestToResolvedURL(method, url string, headers map[string][]string, stream io.Reader, streamLen int64) (*http.Response, error) {
// TODO(runcom): too many arguments here, use a struct
func (c *dockerClient) makeRequestToResolvedURL(method, url string, headers map[string][]string, stream io.Reader, streamLen int64, sendAuth bool) (*http.Response, error) {
req, err := http.NewRequest(method, url, stream)
if err != nil {
return nil, err
@ -162,7 +163,7 @@ func (c *dockerClient) makeRequestToResolvedURL(method, url string, headers map[
if c.ctx != nil && c.ctx.DockerRegistryUserAgent != "" {
req.Header.Add("User-Agent", c.ctx.DockerRegistryUserAgent)
}
if c.wwwAuthenticate != "" {
if c.wwwAuthenticate != "" && sendAuth {
if err := c.setupRequestAuth(req); err != nil {
return nil, err
}
@ -351,7 +352,7 @@ type pingResponse struct {
func (c *dockerClient) ping() (*pingResponse, error) {
ping := func(scheme string) (*pingResponse, error) {
url := fmt.Sprintf(baseURL, scheme, c.registry)
resp, err := c.makeRequestToResolvedURL("GET", url, nil, nil, -1)
resp, err := c.makeRequestToResolvedURL("GET", url, nil, nil, -1, true)
logrus.Debugf("Ping %s err %#v", url, err)
if err != nil {
return nil, err

View File

@ -9,7 +9,6 @@ import (
"net/url"
"os"
"path/filepath"
"strconv"
"github.com/Sirupsen/logrus"
"github.com/containers/image/manifest"
@ -66,6 +65,12 @@ func (d *dockerImageDestination) ShouldCompressLayers() bool {
return true
}
// AcceptsForeignLayerURLs returns false iff foreign layers in manifest should be actually
// uploaded to the image destination, true otherwise.
func (d *dockerImageDestination) AcceptsForeignLayerURLs() bool {
return true
}
// sizeCounter is an io.Writer which only counts the total size of its input.
type sizeCounter struct{ size int64 }
@ -93,11 +98,7 @@ func (d *dockerImageDestination) PutBlob(stream io.Reader, inputInfo types.BlobI
switch res.StatusCode {
case http.StatusOK:
logrus.Debugf("... already exists, not uploading")
blobLength, err := strconv.ParseInt(res.Header.Get("Content-Length"), 10, 64)
if err != nil {
return types.BlobInfo{}, err
}
return types.BlobInfo{Digest: inputInfo.Digest, Size: blobLength}, nil
return types.BlobInfo{Digest: inputInfo.Digest, Size: getBlobSize(res)}, nil
case http.StatusUnauthorized:
logrus.Debugf("... not authorized")
return types.BlobInfo{}, fmt.Errorf("not authorized to read from destination repository %s", d.ref.ref.RemoteName())
@ -129,7 +130,7 @@ func (d *dockerImageDestination) PutBlob(stream io.Reader, inputInfo types.BlobI
digester := digest.Canonical.New()
sizeCounter := &sizeCounter{}
tee := io.TeeReader(stream, io.MultiWriter(digester.Hash(), sizeCounter))
res, err = d.c.makeRequestToResolvedURL("PATCH", uploadLocation.String(), map[string][]string{"Content-Type": {"application/octet-stream"}}, tee, inputInfo.Size)
res, err = d.c.makeRequestToResolvedURL("PATCH", uploadLocation.String(), map[string][]string{"Content-Type": {"application/octet-stream"}}, tee, inputInfo.Size, true)
if err != nil {
logrus.Debugf("Error uploading layer chunked, response %#v", *res)
return types.BlobInfo{}, err
@ -148,7 +149,7 @@ func (d *dockerImageDestination) PutBlob(stream io.Reader, inputInfo types.BlobI
// TODO: check inputInfo.Digest == computedDigest https://github.com/containers/image/pull/70#discussion_r77646717
locationQuery.Set("digest", computedDigest.String())
uploadLocation.RawQuery = locationQuery.Encode()
res, err = d.c.makeRequestToResolvedURL("PUT", uploadLocation.String(), map[string][]string{"Content-Type": {"application/octet-stream"}}, nil, -1)
res, err = d.c.makeRequestToResolvedURL("PUT", uploadLocation.String(), map[string][]string{"Content-Type": {"application/octet-stream"}}, nil, -1, true)
if err != nil {
return types.BlobInfo{}, err
}

View File

@ -130,9 +130,42 @@ func (s *dockerImageSource) ensureManifestIsLoaded() error {
return nil
}
func (s *dockerImageSource) getExternalBlob(urls []string) (io.ReadCloser, int64, error) {
var (
resp *http.Response
err error
)
for _, url := range urls {
resp, err = s.c.makeRequestToResolvedURL("GET", url, nil, nil, -1, false)
if err == nil {
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("error fetching external blob from %q: %d", url, resp.StatusCode)
logrus.Debug(err)
continue
}
}
}
if resp.Body != nil && err == nil {
return resp.Body, getBlobSize(resp), nil
}
return nil, 0, err
}
func getBlobSize(resp *http.Response) int64 {
size, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
if err != nil {
size = -1
}
return size
}
// GetBlob returns a stream for the specified blob, and the blobs size (or -1 if unknown).
func (s *dockerImageSource) GetBlob(digest digest.Digest) (io.ReadCloser, int64, error) {
url := fmt.Sprintf(blobsURL, s.ref.ref.RemoteName(), digest.String())
func (s *dockerImageSource) GetBlob(info types.BlobInfo) (io.ReadCloser, int64, error) {
if len(info.URLs) != 0 {
return s.getExternalBlob(info.URLs)
}
url := fmt.Sprintf(blobsURL, s.ref.ref.RemoteName(), info.Digest.String())
logrus.Debugf("Downloading %s", url)
res, err := s.c.makeRequest("GET", url, nil, nil)
if err != nil {
@ -142,11 +175,7 @@ func (s *dockerImageSource) GetBlob(digest digest.Digest) (io.ReadCloser, int64,
// print url also
return nil, 0, fmt.Errorf("Invalid status code returned when fetching blob %d", res.StatusCode)
}
size, err := strconv.ParseInt(res.Header.Get("Content-Length"), 10, 64)
if err != nil {
size = -1
}
return res.Body, size, nil
return res.Body, getBlobSize(res), nil
}
func (s *dockerImageSource) GetSignatures() ([][]byte, error) {

View File

@ -31,6 +31,7 @@ type descriptor struct {
MediaType string `json:"mediaType"`
Size int64 `json:"size"`
Digest digest.Digest `json:"digest"`
URLs []string `json:"urls,omitempty"`
}
type manifestSchema2 struct {
@ -83,7 +84,11 @@ func (m *manifestSchema2) ConfigBlob() ([]byte, error) {
if m.src == nil {
return nil, fmt.Errorf("Internal error: neither src nor configBlob set in manifestSchema2")
}
stream, _, err := m.src.GetBlob(m.ConfigDescriptor.Digest)
stream, _, err := m.src.GetBlob(types.BlobInfo{
Digest: m.ConfigDescriptor.Digest,
Size: m.ConfigDescriptor.Size,
URLs: m.ConfigDescriptor.URLs,
})
if err != nil {
return nil, err
}
@ -107,7 +112,11 @@ func (m *manifestSchema2) ConfigBlob() ([]byte, error) {
func (m *manifestSchema2) LayerInfos() []types.BlobInfo {
blobs := []types.BlobInfo{}
for _, layer := range m.LayersDescriptors {
blobs = append(blobs, types.BlobInfo{Digest: layer.Digest, Size: layer.Size})
blobs = append(blobs, types.BlobInfo{
Digest: layer.Digest,
Size: layer.Size,
URLs: layer.URLs,
})
}
return blobs
}
@ -149,6 +158,7 @@ func (m *manifestSchema2) UpdatedImage(options types.ManifestUpdateOptions) (typ
for i, info := range options.LayerInfos {
copy.LayersDescriptors[i].Digest = info.Digest
copy.LayersDescriptors[i].Size = info.Size
copy.LayersDescriptors[i].URLs = info.URLs
}
}

View File

@ -24,6 +24,8 @@ const (
DockerV2Schema2LayerMediaType = "application/vnd.docker.image.rootfs.diff.tar.gzip"
// DockerV2ListMediaType MIME type represents Docker manifest schema 2 list
DockerV2ListMediaType = "application/vnd.docker.distribution.manifest.list.v2+json"
// DockerV2Schema2ForeignLayerMediaType is the MIME type used for schema 2 foreign layers.
DockerV2Schema2ForeignLayerMediaType = "application/vnd.docker.image.rootfs.foreign.diff.tar.gzip"
)
// DefaultRequestedManifestMIMETypes is a list of MIME types a types.ImageSource

View File

@ -52,6 +52,12 @@ func (d *ociImageDestination) ShouldCompressLayers() bool {
return false
}
// AcceptsForeignLayerURLs returns false iff foreign layers in manifest should be actually
// uploaded to the image destination, true otherwise.
func (d *ociImageDestination) AcceptsForeignLayerURLs() bool {
return false
}
// PutBlob writes contents of stream and returns data representing the result (with all data filled in).
// inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it.
// inputInfo.Size is the expected length of stream, if known.
@ -121,9 +127,13 @@ func createManifest(m []byte) ([]byte, string, error) {
return nil, "", err
}
om.MediaType = imgspecv1.MediaTypeImageManifest
for i := range om.Layers {
for i, l := range om.Layers {
if l.MediaType == manifest.DockerV2Schema2ForeignLayerMediaType {
om.Layers[i].MediaType = imgspecv1.MediaTypeImageLayerNonDistributable
} else {
om.Layers[i].MediaType = imgspecv1.MediaTypeImageLayer
}
}
om.Config.MediaType = imgspecv1.MediaTypeImageConfig
b, err := json.Marshal(om)
if err != nil {

View File

@ -72,8 +72,8 @@ func (s *ociImageSource) GetTargetManifest(digest digest.Digest) ([]byte, string
}
// GetBlob returns a stream for the specified blob, and the blob's size.
func (s *ociImageSource) GetBlob(digest digest.Digest) (io.ReadCloser, int64, error) {
path, err := s.ref.blobPath(digest)
func (s *ociImageSource) GetBlob(info types.BlobInfo) (io.ReadCloser, int64, error) {
path, err := s.ref.blobPath(info.Digest)
if err != nil {
return nil, 0, err
}

View File

@ -214,11 +214,11 @@ func (s *openshiftImageSource) GetManifest() ([]byte, string, error) {
}
// GetBlob returns a stream for the specified blob, and the blobs size (or -1 if unknown).
func (s *openshiftImageSource) GetBlob(digest digest.Digest) (io.ReadCloser, int64, error) {
func (s *openshiftImageSource) GetBlob(info types.BlobInfo) (io.ReadCloser, int64, error) {
if err := s.ensureImageIsResolved(); err != nil {
return nil, 0, err
}
return s.docker.GetBlob(digest)
return s.docker.GetBlob(info)
}
func (s *openshiftImageSource) GetSignatures() ([][]byte, error) {
@ -350,6 +350,12 @@ func (d *openshiftImageDestination) ShouldCompressLayers() bool {
return true
}
// AcceptsForeignLayerURLs returns false iff foreign layers in manifest should be actually
// uploaded to the image destination, true otherwise.
func (d *openshiftImageDestination) AcceptsForeignLayerURLs() bool {
return true
}
// PutBlob writes contents of stream and returns data representing the result (with all data filled in).
// inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it.
// inputInfo.Size is the expected length of stream, if known.

View File

@ -94,6 +94,7 @@ type ImageReference interface {
type BlobInfo struct {
Digest digest.Digest // "" if unknown.
Size int64 // -1 if unknown
URLs []string
}
// ImageSource is a service, possibly remote (= slow), to download components of a single image.
@ -116,7 +117,8 @@ type ImageSource interface {
// out of a manifest list.
GetTargetManifest(digest digest.Digest) ([]byte, string, error)
// GetBlob returns a stream for the specified blob, and the blobs size (or -1 if unknown).
GetBlob(digest digest.Digest) (io.ReadCloser, int64, error)
// The Digest field in BlobInfo is guaranteed to be provided; Size may be -1.
GetBlob(BlobInfo) (io.ReadCloser, int64, error)
// GetSignatures returns the image's signatures. It may use a remote (= slow) service.
GetSignatures() ([][]byte, error)
}
@ -145,6 +147,10 @@ type ImageDestination interface {
// ShouldCompressLayers returns true iff it is desirable to compress layer blobs written to this destination.
ShouldCompressLayers() bool
// AcceptsForeignLayerURLs returns false iff foreign layers in manifest should be actually
// uploaded to the image destination, true otherwise.
AcceptsForeignLayerURLs() bool
// PutBlob writes contents of stream and returns data representing the result (with all data filled in).
// inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it.
// inputInfo.Size is the expected length of stream, if known.
@ -210,7 +216,7 @@ type Image interface {
// ManifestUpdateOptions is a way to pass named optional arguments to Image.UpdatedManifest
type ManifestUpdateOptions struct {
LayerInfos []BlobInfo // Complete BlobInfos (size+digest) which should replace the originals, in order (the root layer first, and then successive layered layers)
LayerInfos []BlobInfo // Complete BlobInfos (size+digest+urls) which should replace the originals, in order (the root layer first, and then successive layered layers)
ManifestMIMEType string
// The values below are NOT requests to modify the image; they provide optional context which may or may not be used.
InformationOnly ManifestUpdateInformation