mirror of https://github.com/mudler/luet.git

update vendor

vendor/github.com/google/go-containerregistry/pkg/v1/remote/write.go (generated, vendored), 490 lines
@@ -23,15 +23,18 @@ import (
 	"net/http"
 	"net/url"
 	"strings"
+	"sync/atomic"
+	"syscall"
 	"time"
 
-	"github.com/google/go-containerregistry/pkg/internal/redact"
-	"github.com/google/go-containerregistry/pkg/internal/retry"
+	"github.com/google/go-containerregistry/internal/redact"
+	"github.com/google/go-containerregistry/internal/retry"
 	"github.com/google/go-containerregistry/pkg/logs"
 	"github.com/google/go-containerregistry/pkg/name"
 	v1 "github.com/google/go-containerregistry/pkg/v1"
 	"github.com/google/go-containerregistry/pkg/v1/partial"
 	"github.com/google/go-containerregistry/pkg/v1/remote/transport"
 	"github.com/google/go-containerregistry/pkg/v1/stream"
 	"github.com/google/go-containerregistry/pkg/v1/types"
 	"golang.org/x/sync/errgroup"
 )
@@ -42,62 +45,97 @@ type Taggable interface {
 }
 
 // Write pushes the provided img to the specified image reference.
-func Write(ref name.Reference, img v1.Image, options ...Option) error {
-	ls, err := img.Layers()
-	if err != nil {
-		return err
-	}
+func Write(ref name.Reference, img v1.Image, options ...Option) (rerr error) {
 	o, err := makeOptions(ref.Context(), options...)
 	if err != nil {
 		return err
 	}
+
+	var lastUpdate *v1.Update
+	if o.updates != nil {
+		lastUpdate = &v1.Update{}
+		lastUpdate.Total, err = countImage(img, o.allowNondistributableArtifacts)
+		if err != nil {
+			return err
+		}
+		defer close(o.updates)
+		defer func() { sendError(o.updates, rerr) }()
+	}
+	return writeImage(ref, img, o, lastUpdate)
+}
+
+func writeImage(ref name.Reference, img v1.Image, o *options, lastUpdate *v1.Update) error {
+	ls, err := img.Layers()
+	if err != nil {
+		return err
+	}
 	scopes := scopesForUploadingImage(ref.Context(), ls)
 	tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, scopes)
 	if err != nil {
 		return err
 	}
 	w := writer{
-		repo:    ref.Context(),
-		client:  &http.Client{Transport: tr},
-		context: o.context,
+		repo:       ref.Context(),
+		client:     &http.Client{Transport: tr},
+		context:    o.context,
+		updates:    o.updates,
+		lastUpdate: lastUpdate,
 	}
 
+	// Upload individual blobs and collect any errors.
+	blobChan := make(chan v1.Layer, 2*o.jobs)
+	g, ctx := errgroup.WithContext(o.context)
+	for i := 0; i < o.jobs; i++ {
+		// Start N workers consuming blobs to upload.
+		g.Go(func() error {
+			for b := range blobChan {
+				if err := w.uploadOne(b); err != nil {
+					return err
+				}
+			}
+			return nil
+		})
+	}
+
 	// Upload individual layers in goroutines and collect any errors.
 	// If we can dedupe by the layer digest, try to do so. If we can't determine
 	// the digest for whatever reason, we can't dedupe and might re-upload.
-	var g errgroup.Group
-	uploaded := map[v1.Hash]bool{}
-	for _, l := range ls {
-		l := l
+	g.Go(func() error {
+		defer close(blobChan)
+		uploaded := map[v1.Hash]bool{}
+		for _, l := range ls {
+			l := l
 
-		// Handle foreign layers.
-		mt, err := l.MediaType()
-		if err != nil {
-			return err
-		}
-		if !mt.IsDistributable() {
-			// TODO(jonjohnsonjr): Add "allow-nondistributable-artifacts" option.
-			continue
-		}
+			// Handle foreign layers.
+			mt, err := l.MediaType()
+			if err != nil {
+				return err
+			}
+			if !mt.IsDistributable() && !o.allowNondistributableArtifacts {
+				continue
+			}
 
-		// Streaming layers calculate their digests while uploading them. Assume
-		// an error here indicates we need to upload the layer.
-		h, err := l.Digest()
-		if err == nil {
-			// If we can determine the layer's digest ahead of
-			// time, use it to dedupe uploads.
-			if uploaded[h] {
-				continue // Already uploading.
-			}
-			uploaded[h] = true
-		}
-
-		// TODO(#803): Pipe through remote.WithJobs and upload these in parallel.
-		g.Go(func() error {
-			return w.uploadOne(l)
-		})
-	}
+			// Streaming layers calculate their digests while uploading them. Assume
+			// an error here indicates we need to upload the layer.
+			h, err := l.Digest()
+			if err == nil {
+				// If we can determine the layer's digest ahead of
+				// time, use it to dedupe uploads.
+				if uploaded[h] {
+					continue // Already uploading.
+				}
+				uploaded[h] = true
+			}
+			select {
+			case blobChan <- l:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+		return nil
+	})
 
 	if err := g.Wait(); err != nil {
 		return err
 	}
 
 	if l, err := partial.ConfigLayer(img); err != nil {
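Note: the rewritten writeImage above swaps the old one-goroutine-per-layer scheme for a bounded worker pool: o.jobs workers drain blobChan while a single producer dedupes layers and aborts on context cancellation. A minimal, self-contained sketch of the same errgroup pattern (names here are illustrative, not part of the vendored code):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	const jobs = 3
	items := []string{"a", "b", "c", "d", "e", "a"} // "a" is a duplicate

	ch := make(chan string, 2*jobs)
	g, ctx := errgroup.WithContext(context.Background())

	// Start N workers consuming items; the first error cancels ctx.
	for i := 0; i < jobs; i++ {
		g.Go(func() error {
			for it := range ch {
				fmt.Println("upload", it)
			}
			return nil
		})
	}

	// Single producer: dedupes, and stops early if a worker already failed.
	g.Go(func() error {
		defer close(ch) // unblocks the workers' range loops
		seen := map[string]bool{}
		for _, it := range items {
			if seen[it] {
				continue
			}
			seen[it] = true
			select {
			case ch <- it:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}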
@@ -138,6 +176,16 @@ type writer struct {
 	repo    name.Repository
 	client  *http.Client
 	context context.Context
+
+	updates    chan<- v1.Update
+	lastUpdate *v1.Update
+}
+
+func sendError(ch chan<- v1.Update, err error) error {
+	if err != nil && ch != nil {
+		ch <- v1.Update{Error: err}
+	}
+	return err
 }
 
 // url returns a url.Url for the specified path in the context of this remote image reference.
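Note: Write's new named return value (rerr) is what makes the deferred sendError call work: a deferred closure observes the final value of a named result, so the terminal error can be pushed into the updates channel before it is closed. A small sketch of that pattern under hypothetical names:

package main

import (
	"errors"
	"fmt"
)

type update struct{ err error }

// sendLast mirrors the sendError/defer pattern. Defers run LIFO, so the
// error send (registered second) happens before close (registered first).
func sendLast(ch chan update) (rerr error) {
	defer close(ch)
	defer func() {
		if rerr != nil {
			ch <- update{err: rerr}
		}
	}()
	return errors.New("upload failed")
}

func main() {
	ch := make(chan update, 1)
	_ = sendLast(ch)
	for u := range ch {
		fmt.Println("got terminal error:", u.err)
	}
}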
@@ -260,14 +308,54 @@ func (w *writer) initiateUpload(from, mount string) (location string, mounted bo
 	}
 }
 
+type progressReader struct {
+	rc io.ReadCloser
+
+	count      *int64 // number of bytes this reader has read, to support resetting on retry.
+	updates    chan<- v1.Update
+	lastUpdate *v1.Update
+}
+
+func (r *progressReader) Read(b []byte) (int, error) {
+	n, err := r.rc.Read(b)
+	if err != nil {
+		return n, err
+	}
+	atomic.AddInt64(r.count, int64(n))
+	// TODO: warn/debug log if sending takes too long, or if sending is blocked while context is cancelled.
+	r.updates <- v1.Update{
+		Total:    r.lastUpdate.Total,
+		Complete: atomic.AddInt64(&r.lastUpdate.Complete, int64(n)),
+	}
+	return n, nil
+}
+
+func (r *progressReader) Close() error { return r.rc.Close() }
+
 // streamBlob streams the contents of the blob to the specified location.
 // On failure, this will return an error. On success, this will return the location
 // header indicating how to commit the streamed blob.
-func (w *writer) streamBlob(ctx context.Context, blob io.ReadCloser, streamLocation string) (commitLocation string, err error) {
+func (w *writer) streamBlob(ctx context.Context, blob io.ReadCloser, streamLocation string) (commitLocation string, rerr error) {
+	reset := func() {}
+	defer func() {
+		if rerr != nil {
+			reset()
+		}
+	}()
+	if w.updates != nil {
+		var count int64
+		blob = &progressReader{rc: blob, updates: w.updates, lastUpdate: w.lastUpdate, count: &count}
+		reset = func() {
+			atomic.AddInt64(&w.lastUpdate.Complete, -count)
+			w.updates <- *w.lastUpdate
+		}
+	}
+
 	req, err := http.NewRequest(http.MethodPatch, streamLocation, blob)
 	if err != nil {
 		return "", err
 	}
 	req.Header.Set("Content-Type", "application/octet-stream")
 
 	resp, err := w.client.Do(req.WithContext(ctx))
 	if err != nil {
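Note: progressReader is the counting wrapper that feeds those updates. Every Read atomically adds to a per-attempt counter as well as to the shared lastUpdate.Complete, so a failed attempt can be rolled back by subtracting count (the reset closure above). A stripped-down sketch of the counting part (illustrative names, not the vendored type):

package main

import (
	"fmt"
	"io"
	"strings"
	"sync/atomic"
)

// countingReader mimics progressReader's bookkeeping: the counter lives
// behind a pointer so the caller can see, after a failed attempt, exactly
// how much that attempt had reported and subtract it from a running total.
type countingReader struct {
	r     io.Reader
	count *int64
}

func (c *countingReader) Read(b []byte) (int, error) {
	n, err := c.r.Read(b)
	atomic.AddInt64(c.count, int64(n))
	return n, err
}

func main() {
	var count int64
	cr := &countingReader{r: strings.NewReader("some blob data"), count: &count}
	if _, err := io.Copy(io.Discard, cr); err != nil {
		fmt.Println("copy failed; a reset would subtract", count, "from the total")
		return
	}
	fmt.Println("bytes read:", count) // 14
}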
@@ -299,6 +387,7 @@ func (w *writer) commitBlob(location, digest string) error {
 	if err != nil {
 		return err
 	}
+	req.Header.Set("Content-Type", "application/octet-stream")
 
 	resp, err := w.client.Do(req.WithContext(w.context))
 	if err != nil {
@@ -309,6 +398,35 @@ func (w *writer) commitBlob(location, digest string) error {
 	return transport.CheckError(resp, http.StatusCreated)
 }
 
+// incrProgress increments and sends a progress update, if WithProgress is used.
+func (w *writer) incrProgress(written int64) {
+	if w.updates == nil {
+		return
+	}
+	w.updates <- v1.Update{
+		Total:    w.lastUpdate.Total,
+		Complete: atomic.AddInt64(&w.lastUpdate.Complete, int64(written)),
+	}
+}
+
+var shouldRetry retry.Predicate = func(err error) bool {
+	// Various failure modes here, as we're often reading from and writing to
+	// the network.
+	if retry.IsTemporary(err) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.EPIPE) {
+		logs.Warn.Printf("retrying %v", err)
+		return true
+	}
+	return false
+}
+
+// Try this three times, waiting 1s after first failure, 3s after second.
+var backoff = retry.Backoff{
+	Duration: 1.0 * time.Second,
+	Factor:   3.0,
+	Jitter:   0.1,
+	Steps:    3,
+}
+
 // uploadOne performs a complete upload of a single layer.
 func (w *writer) uploadOne(l v1.Layer) error {
 	var from, mount string
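Note: shouldRetry and backoff are now package-level, so blob uploads and manifest PUTs share one policy. With Duration 1s, Factor 3.0 and Steps 3, retry.Retry makes at most three attempts, sleeping roughly 1s after the first failure and 3s after the second, plus up to 10% jitter. A standalone sketch that reproduces just the schedule arithmetic (it does not use the vendored retry package):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors retry.Backoff{Duration: 1s, Factor: 3, Steps: 3}: the wait is
	// multiplied by Factor after each failed attempt; Steps caps attempts.
	wait := 1 * time.Second
	const steps = 3
	for attempt := 1; attempt <= steps; attempt++ {
		fmt.Printf("attempt %d\n", attempt)
		if attempt < steps {
			fmt.Printf("  on failure, sleep ~%v (up to 10%% jitter)\n", wait)
			wait = time.Duration(float64(wait) * 3.0)
		}
	}
}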
@@ -320,6 +438,11 @@ func (w *writer) uploadOne(l v1.Layer) error {
 		return err
 	}
 	if existing {
+		size, err := l.Size()
+		if err != nil {
+			return err
+		}
+		w.incrProgress(size)
 		logs.Progress.Printf("existing blob: %v", h)
 		return nil
 	}
@@ -339,6 +462,11 @@ func (w *writer) uploadOne(l v1.Layer) error {
 	if err != nil {
 		return err
 	} else if mounted {
+		size, err := l.Size()
+		if err != nil {
+			return err
+		}
+		w.incrProgress(size)
 		h, err := l.Digest()
 		if err != nil {
 			return err
@@ -380,15 +508,11 @@ func (w *writer) uploadOne(l v1.Layer) error {
 		return nil
 	}
 
-	// Try this three times, waiting 1s after first failure, 3s after second.
-	backoff := retry.Backoff{
-		Duration: 1.0 * time.Second,
-		Factor:   3.0,
-		Jitter:   0.1,
-		Steps:    3,
-	}
-
-	return retry.Retry(tryUpload, retry.IsTemporary, backoff)
+	return retry.Retry(tryUpload, shouldRetry, backoff)
 }
 
+type withLayer interface {
+	Layer(v1.Hash) (v1.Layer, error)
+}
+
 func (w *writer) writeIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) error {
@@ -397,6 +521,11 @@ func (w *writer) writeIndex(ref name.Reference, ii v1.ImageIndex, options ...Opt
 		return err
 	}
 
+	o, err := makeOptions(ref.Context(), options...)
+	if err != nil {
+		return err
+	}
+
 	// TODO(#803): Pipe through remote.WithJobs and upload these in parallel.
 	for _, desc := range index.Manifests {
 		ref := ref.Context().Digest(desc.Digest.String())
@@ -415,7 +544,6 @@ func (w *writer) writeIndex(ref name.Reference, ii v1.ImageIndex, options ...Opt
 			if err != nil {
 				return err
 			}
-
 			if err := w.writeIndex(ref, ii); err != nil {
 				return err
 			}
@@ -424,12 +552,20 @@ func (w *writer) writeIndex(ref name.Reference, ii v1.ImageIndex, options ...Opt
 			if err != nil {
 				return err
 			}
 			// TODO: Ideally we could reuse this writer, but we need to know
 			// scopes before we do the token exchange. To be lazy here, just
 			// re-do the token exchange. MultiWrite fixes this.
-			if err := Write(ref, img, options...); err != nil {
+			if err := writeImage(ref, img, o, w.lastUpdate); err != nil {
 				return err
 			}
+		default:
+			// Workaround for #819.
+			if wl, ok := ii.(withLayer); ok {
+				layer, err := wl.Layer(desc.Digest)
+				if err != nil {
+					return err
+				}
+				if err := w.uploadOne(layer); err != nil {
+					return err
+				}
+			}
 		}
 	}
 
@@ -448,13 +584,13 @@ type withMediaType interface {
 //
 // Use reflection to either pull the v1.Descriptor out of remote.Descriptor or
 // create a descriptor based on the RawManifest and (optionally) MediaType.
-func unpackTaggable(t Taggable) (*v1.Descriptor, error) {
+func unpackTaggable(t Taggable) ([]byte, *v1.Descriptor, error) {
 	if d, ok := t.(*Descriptor); ok {
-		return &d.Descriptor, nil
+		return d.Manifest, &d.Descriptor, nil
 	}
 	b, err := t.RawManifest()
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	// A reasonable default if Taggable doesn't implement MediaType.
@@ -463,17 +599,17 @@ func unpackTaggable(t Taggable) (*v1.Descriptor, error) {
 	if wmt, ok := t.(withMediaType); ok {
 		m, err := wmt.MediaType()
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 		mt = m
 	}
 
 	h, sz, err := v1.SHA256(bytes.NewReader(b))
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
-	return &v1.Descriptor{
+	return b, &v1.Descriptor{
 		MediaType: mt,
 		Size:      sz,
 		Digest:    h,
@@ -482,37 +618,38 @@ func unpackTaggable(t Taggable) (*v1.Descriptor, error) {
 
 // commitManifest does a PUT of the image's manifest.
 func (w *writer) commitManifest(t Taggable, ref name.Reference) error {
-	raw, err := t.RawManifest()
-	if err != nil {
-		return err
-	}
-	desc, err := unpackTaggable(t)
-	if err != nil {
-		return err
+	tryUpload := func() error {
+		raw, desc, err := unpackTaggable(t)
+		if err != nil {
+			return err
+		}
+
+		u := w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), ref.Identifier()))
+
+		// Make the request to PUT the serialized manifest
+		req, err := http.NewRequest(http.MethodPut, u.String(), bytes.NewBuffer(raw))
+		if err != nil {
+			return err
+		}
+		req.Header.Set("Content-Type", string(desc.MediaType))
+
+		resp, err := w.client.Do(req.WithContext(w.context))
+		if err != nil {
+			return err
+		}
+		defer resp.Body.Close()
+
+		if err := transport.CheckError(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted); err != nil {
+			return err
+		}
+
+		// The image was successfully pushed!
+		logs.Progress.Printf("%v: digest: %v size: %d", ref, desc.Digest, desc.Size)
+		w.incrProgress(int64(len(raw)))
+		return nil
 	}
 
-	u := w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), ref.Identifier()))
-
-	// Make the request to PUT the serialized manifest
-	req, err := http.NewRequest(http.MethodPut, u.String(), bytes.NewBuffer(raw))
-	if err != nil {
-		return err
-	}
-	req.Header.Set("Content-Type", string(desc.MediaType))
-
-	resp, err := w.client.Do(req.WithContext(w.context))
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-
-	if err := transport.CheckError(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted); err != nil {
-		return err
-	}
-
-	// The image was successfully pushed!
-	logs.Progress.Printf("%v: digest: %v size: %d", ref, desc.Digest, len(raw))
-	return nil
+	return retry.Retry(tryUpload, shouldRetry, backoff)
 }
 
 func scopesForUploadingImage(repo name.Repository, layers []v1.Layer) []string {
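Note: unpackTaggable now returns the raw manifest bytes alongside the descriptor so the entire PUT, body included, lives inside tryUpload and can be re-run by retry.Retry. Since Taggable only requires RawManifest, any manifest-shaped bytes can be pushed. A sketch with a hypothetical custom Taggable, using the remote.Put added in this diff (registry name illustrative):

package main

import (
	"log"

	"github.com/google/go-containerregistry/pkg/name"
	"github.com/google/go-containerregistry/pkg/v1/remote"
)

// rawManifest is a hypothetical Taggable: RawManifest is the only required
// method. Without a MediaType method, commitManifest falls back to
// types.DockerManifestSchema2 for the Content-Type header.
type rawManifest struct {
	body []byte
}

func (r rawManifest) RawManifest() ([]byte, error) { return r.body, nil }

func main() {
	tag, err := name.NewTag("registry.example.com/repo:latest")
	if err != nil {
		log.Fatal(err)
	}
	t := rawManifest{body: []byte(`{"schemaVersion": 2}`)}
	if err := remote.Put(tag, t); err != nil {
		log.Fatal(err)
	}
}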
@@ -543,11 +680,12 @@ func scopesForUploadingImage(repo name.Repository, layers []v1.Layer) []string {
 // WriteIndex pushes the provided ImageIndex to the specified image reference.
 // WriteIndex will attempt to push all of the referenced manifests before
 // attempting to push the ImageIndex, to retain referential integrity.
-func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) error {
+func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) (rerr error) {
 	o, err := makeOptions(ref.Context(), options...)
 	if err != nil {
 		return err
 	}
+
 	scopes := []string{ref.Scope(transport.PushScope)}
 	tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, scopes)
 	if err != nil {
@@ -557,12 +695,132 @@ func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) error {
 		repo:    ref.Context(),
 		client:  &http.Client{Transport: tr},
 		context: o.context,
+		updates: o.updates,
 	}
 
+	if o.updates != nil {
+		w.lastUpdate = &v1.Update{}
+		w.lastUpdate.Total, err = countIndex(ii, o.allowNondistributableArtifacts)
+		if err != nil {
+			return err
+		}
+		defer close(o.updates)
+		defer func() { sendError(o.updates, rerr) }()
+	}
+
 	return w.writeIndex(ref, ii, options...)
 }
 
+// countImage counts the total size of all layers + config blob + manifest for
+// an image. It de-dupes duplicate layers.
+func countImage(img v1.Image, allowNondistributableArtifacts bool) (int64, error) {
+	var total int64
+	ls, err := img.Layers()
+	if err != nil {
+		return 0, err
+	}
+	seen := map[v1.Hash]bool{}
+	for _, l := range ls {
+		// Handle foreign layers.
+		mt, err := l.MediaType()
+		if err != nil {
+			return 0, err
+		}
+		if !mt.IsDistributable() && !allowNondistributableArtifacts {
+			continue
+		}
+
+		// TODO: support streaming layers which update the total count as they write.
+		if _, ok := l.(*stream.Layer); ok {
+			return 0, errors.New("cannot use stream.Layer and WithProgress")
+		}
+
+		// Dedupe layers.
+		d, err := l.Digest()
+		if err != nil {
+			return 0, err
+		}
+		if seen[d] {
+			continue
+		}
+		seen[d] = true
+
+		size, err := l.Size()
+		if err != nil {
+			return 0, err
+		}
+		total += size
+	}
+	b, err := img.RawConfigFile()
+	if err != nil {
+		return 0, err
+	}
+	total += int64(len(b))
+	size, err := img.Size()
+	if err != nil {
+		return 0, err
+	}
+	total += size
+	return total, nil
+}
+
+// countIndex counts the total size of all images + sub-indexes for an index.
+// It does not attempt to de-dupe duplicate images, etc.
+func countIndex(idx v1.ImageIndex, allowNondistributableArtifacts bool) (int64, error) {
+	var total int64
+	mf, err := idx.IndexManifest()
+	if err != nil {
+		return 0, err
+	}
+
+	for _, desc := range mf.Manifests {
+		switch desc.MediaType {
+		case types.OCIImageIndex, types.DockerManifestList:
+			sidx, err := idx.ImageIndex(desc.Digest)
+			if err != nil {
+				return 0, err
+			}
+			size, err := countIndex(sidx, allowNondistributableArtifacts)
+			if err != nil {
+				return 0, err
+			}
+			total += size
+		case types.OCIManifestSchema1, types.DockerManifestSchema2:
+			simg, err := idx.Image(desc.Digest)
+			if err != nil {
+				return 0, err
+			}
+			size, err := countImage(simg, allowNondistributableArtifacts)
+			if err != nil {
+				return 0, err
+			}
+			total += size
+		default:
+			// Workaround for #819.
+			if wl, ok := idx.(withLayer); ok {
+				layer, err := wl.Layer(desc.Digest)
+				if err != nil {
+					return 0, err
+				}
+				size, err := layer.Size()
+				if err != nil {
+					return 0, err
+				}
+				total += size
+			}
+		}
+	}
+
+	size, err := idx.Size()
+	if err != nil {
+		return 0, err
+	}
+	total += size
+	return total, nil
+}
+
 // WriteLayer uploads the provided Layer to the specified repo.
-func WriteLayer(repo name.Repository, layer v1.Layer, options ...Option) error {
+func WriteLayer(repo name.Repository, layer v1.Layer, options ...Option) (rerr error) {
 	o, err := makeOptions(repo, options...)
 	if err != nil {
 		return err
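Note: countImage's total is simple arithmetic: the sizes of unique, distributable layers, plus the raw config blob, plus the manifest (img.Size()); countIndex recurses over sub-indexes the same way, and the result becomes v1.Update.Total for WithProgress. A sketch that re-derives the same number for a throwaway image, assuming the pkg/v1/random helper from the same module:

package main

import (
	"fmt"
	"log"

	v1 "github.com/google/go-containerregistry/pkg/v1"
	"github.com/google/go-containerregistry/pkg/v1/random"
)

func main() {
	// Two layers of 1024 random bytes each (compressed sizes will differ).
	img, err := random.Image(1024, 2)
	if err != nil {
		log.Fatal(err)
	}

	// Re-derive what countImage computes: unique layers + config + manifest.
	var total int64
	seen := map[v1.Hash]bool{}
	layers, err := img.Layers()
	if err != nil {
		log.Fatal(err)
	}
	for _, l := range layers {
		d, err := l.Digest()
		if err != nil {
			log.Fatal(err)
		}
		if seen[d] {
			continue // duplicate layer, counted once
		}
		seen[d] = true
		s, err := l.Size()
		if err != nil {
			log.Fatal(err)
		}
		total += s
	}
	cfg, err := img.RawConfigFile()
	if err != nil {
		log.Fatal(err)
	}
	total += int64(len(cfg))
	m, err := img.Size() // manifest size
	if err != nil {
		log.Fatal(err)
	}
	total += m
	fmt.Println("expected v1.Update.Total:", total)
}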
@@ -576,18 +834,58 @@ func WriteLayer(repo name.Repository, layer v1.Layer, options ...Option) error {
 		repo:    repo,
 		client:  &http.Client{Transport: tr},
 		context: o.context,
+		updates: o.updates,
 	}
 
+	if o.updates != nil {
+		defer close(o.updates)
+		defer func() { sendError(o.updates, rerr) }()
+
+		// TODO: support streaming layers which update the total count as they write.
+		if _, ok := layer.(*stream.Layer); ok {
+			return errors.New("cannot use stream.Layer and WithProgress")
+		}
+		size, err := layer.Size()
+		if err != nil {
+			return err
+		}
+		w.lastUpdate = &v1.Update{Total: size}
+	}
 	return w.uploadOne(layer)
 }
 
-// Tag adds a tag to the given Taggable.
+// Tag adds a tag to the given Taggable via PUT /v2/.../manifests/<tag>
 //
 // Notable implementations of Taggable are v1.Image, v1.ImageIndex, and
 // remote.Descriptor.
 //
 // If t implements MediaType, we will use that for the Content-Type, otherwise
 // we will default to types.DockerManifestSchema2.
 //
 // Tag does not attempt to write anything other than the manifest, so callers
 // should ensure that all blobs or manifests that are referenced by t exist
 // in the target registry.
 func Tag(tag name.Tag, t Taggable, options ...Option) error {
-	o, err := makeOptions(tag.Context(), options...)
+	return Put(tag, t, options...)
+}
+
+// Put adds a manifest from the given Taggable via PUT /v1/.../manifest/<ref>
+//
+// Notable implementations of Taggable are v1.Image, v1.ImageIndex, and
+// remote.Descriptor.
+//
+// If t implements MediaType, we will use that for the Content-Type, otherwise
+// we will default to types.DockerManifestSchema2.
+//
+// Put does not attempt to write anything other than the manifest, so callers
+// should ensure that all blobs or manifests that are referenced by t exist
+// in the target registry.
+func Put(ref name.Reference, t Taggable, options ...Option) error {
+	o, err := makeOptions(ref.Context(), options...)
 	if err != nil {
 		return err
 	}
-	scopes := []string{tag.Scope(transport.PushScope)}
+	scopes := []string{ref.Scope(transport.PushScope)}
 
 	// TODO: This *always* does a token exchange. For some registries,
 	// that's pretty slow. Some ideas;
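Note: Tag is now sugar for Put, so retagging something that already exists in a registry is a single manifest PUT with no blob uploads. A sketch using remote.Get to fetch the existing manifest and remote.Tag to alias it (registry and names are illustrative):

package main

import (
	"log"

	"github.com/google/go-containerregistry/pkg/name"
	"github.com/google/go-containerregistry/pkg/v1/remote"
)

func main() {
	src, err := name.ParseReference("registry.example.com/app:v1.2.3")
	if err != nil {
		log.Fatal(err)
	}
	// remote.Descriptor implements Taggable and carries its raw manifest,
	// so Tag only re-PUTs the manifest bytes under the new tag.
	desc, err := remote.Get(src)
	if err != nil {
		log.Fatal(err)
	}
	tag, err := name.NewTag("registry.example.com/app:latest")
	if err != nil {
		log.Fatal(err)
	}
	if err := remote.Tag(tag, desc); err != nil {
		log.Fatal(err)
	}
}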
@@ -595,15 +893,15 @@ func Tag(tag name.Tag, t Taggable, options ...Option) error {
 	// * Allow callers to pass in a transport.Transport, typecheck
 	// it to allow them to reuse the transport across multiple calls.
 	// * WithTag option to do multiple manifest PUTs in commitManifest.
-	tr, err := transport.NewWithContext(o.context, tag.Context().Registry, o.auth, o.transport, scopes)
+	tr, err := transport.NewWithContext(o.context, ref.Context().Registry, o.auth, o.transport, scopes)
 	if err != nil {
 		return err
 	}
 	w := writer{
-		repo:    tag.Context(),
+		repo:    ref.Context(),
 		client:  &http.Client{Transport: tr},
 		context: o.context,
 	}
 
-	return w.commitManifest(t, tag)
+	return w.commitManifest(t, ref)
 }