avoid redundant blob fetching

Signed-off-by: baojiangnan <baojn1998@163.com>
This commit is contained in:
baojiangnan
2022-01-20 11:52:34 +08:00
committed by baojiangnan.bjn
parent b56fb385f6
commit 17952924f3
2 changed files with 55 additions and 54 deletions

View File

@@ -79,35 +79,6 @@ func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter
return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
}
func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error {
defer func() {
mu.Lock()
delete(inflight, dgst)
mu.Unlock()
}()
var desc distribution.Descriptor
var err error
var bw distribution.BlobWriter
bw, err = pbs.localStore.Create(ctx)
if err != nil {
return err
}
desc, err = pbs.copyContent(ctx, dgst, bw)
if err != nil {
return err
}
_, err = bw.Commit(ctx, desc)
if err != nil {
return err
}
return nil
}
func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
served, err := pbs.serveLocal(ctx, w, r, dgst)
if err != nil {
@@ -126,6 +97,9 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter,
mu.Lock()
_, ok := inflight[dgst]
if ok {
// If the blob has been serving in other requests.
// Will return the blob from the remote store directly.
// TODO Maybe we could reuse the these blobs are serving remotely and caching locally.
mu.Unlock()
_, err := pbs.copyContent(ctx, dgst, w)
return err
@@ -133,33 +107,40 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter,
inflight[dgst] = struct{}{}
mu.Unlock()
// storeLocalCtx will be independent with ctx, because ctx it used to fetch remote image.
// There would be a situation, that is pulling remote bytes ends before pbs.storeLocal( 'Copy', 'Commit' ...)
// Then the registry fails to cache the layer, even though the layer had been served to client.
storeLocalCtx, cancel := context.WithCancel(context.Background())
go func(dgst digest.Digest) {
defer cancel()
if err := pbs.storeLocal(storeLocalCtx, dgst); err != nil {
dcontext.GetLogger(storeLocalCtx).Errorf("Error committing to storage: %s", err.Error())
}
defer func() {
mu.Lock()
delete(inflight, dgst)
mu.Unlock()
}()
blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
if err != nil {
dcontext.GetLogger(storeLocalCtx).Errorf("Error creating reference: %s", err)
return
}
if pbs.scheduler != nil && pbs.ttl != nil {
pbs.scheduler.AddBlob(blobRef, *pbs.ttl)
}
}(dgst)
_, err = pbs.copyContent(ctx, dgst, w)
bw, err := pbs.localStore.Create(ctx)
if err != nil {
cancel()
return err
}
// Serving client and storing locally over same fetching request.
// This can prevent a redundant blob fetching.
multiWriter := io.MultiWriter(w, bw)
desc, err := pbs.copyContent(ctx, dgst, multiWriter)
if err != nil {
return err
}
_, err = bw.Commit(ctx, desc)
if err != nil {
return err
}
blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
if err != nil {
dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
return err
}
if pbs.scheduler != nil && pbs.ttl != nil {
pbs.scheduler.AddBlob(blobRef, *pbs.ttl)
}
return nil
}