mirror of
				https://github.com/distribution/distribution.git
				synced 2025-10-26 14:55:21 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			249 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			249 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package proxy
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"net/http"
 | |
| 	"net/url"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/docker/distribution"
 | |
| 	"github.com/docker/distribution/configuration"
 | |
| 	"github.com/docker/distribution/context"
 | |
| 	"github.com/docker/distribution/reference"
 | |
| 	"github.com/docker/distribution/registry/client"
 | |
| 	"github.com/docker/distribution/registry/client/auth"
 | |
| 	"github.com/docker/distribution/registry/client/transport"
 | |
| 	"github.com/docker/distribution/registry/proxy/scheduler"
 | |
| 	"github.com/docker/distribution/registry/storage"
 | |
| 	"github.com/docker/distribution/registry/storage/driver"
 | |
| )
 | |
| 
 | |
| // proxyingRegistry fetches content from a remote registry and caches it locally
 | |
| type proxyingRegistry struct {
 | |
| 	embedded       distribution.Namespace // provides local registry functionality
 | |
| 	scheduler      *scheduler.TTLExpirationScheduler
 | |
| 	remoteURL      url.URL
 | |
| 	authChallenger authChallenger
 | |
| }
 | |
| 
 | |
| // NewRegistryPullThroughCache creates a registry acting as a pull through cache
 | |
| func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Namespace, driver driver.StorageDriver, config configuration.Proxy) (distribution.Namespace, error) {
 | |
| 	remoteURL, err := url.Parse(config.RemoteURL)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	v := storage.NewVacuum(ctx, driver)
 | |
| 	s := scheduler.New(ctx, driver, "/scheduler-state.json")
 | |
| 	s.OnBlobExpire(func(ref reference.Reference) error {
 | |
| 		var r reference.Canonical
 | |
| 		var ok bool
 | |
| 		if r, ok = ref.(reference.Canonical); !ok {
 | |
| 			return fmt.Errorf("unexpected reference type : %T", ref)
 | |
| 		}
 | |
| 
 | |
| 		repo, err := registry.Repository(ctx, r)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		blobs := repo.Blobs(ctx)
 | |
| 
 | |
| 		// Clear the repository reference and descriptor caches
 | |
| 		err = blobs.Delete(ctx, r.Digest())
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		err = v.RemoveBlob(r.Digest().String())
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| 
 | |
| 	s.OnManifestExpire(func(ref reference.Reference) error {
 | |
| 		var r reference.Canonical
 | |
| 		var ok bool
 | |
| 		if r, ok = ref.(reference.Canonical); !ok {
 | |
| 			return fmt.Errorf("unexpected reference type : %T", ref)
 | |
| 		}
 | |
| 
 | |
| 		repo, err := registry.Repository(ctx, r)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		manifests, err := repo.Manifests(ctx)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		err = manifests.Delete(ctx, r.Digest())
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		return nil
 | |
| 	})
 | |
| 
 | |
| 	err = s.Start()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	cs, err := configureAuth(config.Username, config.Password)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return &proxyingRegistry{
 | |
| 		embedded:  registry,
 | |
| 		scheduler: s,
 | |
| 		remoteURL: *remoteURL,
 | |
| 		authChallenger: &remoteAuthChallenger{
 | |
| 			remoteURL: *remoteURL,
 | |
| 			cm:        auth.NewSimpleChallengeManager(),
 | |
| 			cs:        cs,
 | |
| 		},
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (pr *proxyingRegistry) Scope() distribution.Scope {
 | |
| 	return distribution.GlobalScope
 | |
| }
 | |
| 
 | |
| func (pr *proxyingRegistry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
 | |
| 	return pr.embedded.Repositories(ctx, repos, last)
 | |
| }
 | |
| 
 | |
| func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named) (distribution.Repository, error) {
 | |
| 	c := pr.authChallenger
 | |
| 
 | |
| 	tr := transport.NewTransport(http.DefaultTransport,
 | |
| 		auth.NewAuthorizer(c.challengeManager(), auth.NewTokenHandler(http.DefaultTransport, c.credentialStore(), name.Name(), "pull")))
 | |
| 
 | |
| 	localRepo, err := pr.embedded.Repository(ctx, name)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	localManifests, err := localRepo.Manifests(ctx, storage.SkipLayerVerification())
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	remoteRepo, err := client.NewRepository(ctx, name, pr.remoteURL.String(), tr)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	remoteManifests, err := remoteRepo.Manifests(ctx)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return &proxiedRepository{
 | |
| 		blobStore: &proxyBlobStore{
 | |
| 			localStore:     localRepo.Blobs(ctx),
 | |
| 			remoteStore:    remoteRepo.Blobs(ctx),
 | |
| 			scheduler:      pr.scheduler,
 | |
| 			repositoryName: name,
 | |
| 			authChallenger: pr.authChallenger,
 | |
| 		},
 | |
| 		manifests: &proxyManifestStore{
 | |
| 			repositoryName:  name,
 | |
| 			localManifests:  localManifests, // Options?
 | |
| 			remoteManifests: remoteManifests,
 | |
| 			ctx:             ctx,
 | |
| 			scheduler:       pr.scheduler,
 | |
| 			authChallenger:  pr.authChallenger,
 | |
| 		},
 | |
| 		name: name,
 | |
| 		tags: &proxyTagService{
 | |
| 			localTags:      localRepo.Tags(ctx),
 | |
| 			remoteTags:     remoteRepo.Tags(ctx),
 | |
| 			authChallenger: pr.authChallenger,
 | |
| 		},
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (pr *proxyingRegistry) Blobs() distribution.BlobEnumerator {
 | |
| 	return pr.embedded.Blobs()
 | |
| }
 | |
| 
 | |
| func (pr *proxyingRegistry) BlobStatter() distribution.BlobStatter {
 | |
| 	return pr.embedded.BlobStatter()
 | |
| }
 | |
| 
 | |
| // authChallenger encapsulates a request to the upstream to establish credential challenges
 | |
| type authChallenger interface {
 | |
| 	tryEstablishChallenges(context.Context) error
 | |
| 	challengeManager() auth.ChallengeManager
 | |
| 	credentialStore() auth.CredentialStore
 | |
| }
 | |
| 
 | |
| type remoteAuthChallenger struct {
 | |
| 	remoteURL url.URL
 | |
| 	sync.Mutex
 | |
| 	cm auth.ChallengeManager
 | |
| 	cs auth.CredentialStore
 | |
| }
 | |
| 
 | |
| func (r *remoteAuthChallenger) credentialStore() auth.CredentialStore {
 | |
| 	return r.cs
 | |
| }
 | |
| 
 | |
| func (r *remoteAuthChallenger) challengeManager() auth.ChallengeManager {
 | |
| 	return r.cm
 | |
| }
 | |
| 
 | |
| // tryEstablishChallenges will attempt to get a challenge type for the upstream if none currently exist
 | |
| func (r *remoteAuthChallenger) tryEstablishChallenges(ctx context.Context) error {
 | |
| 	r.Lock()
 | |
| 	defer r.Unlock()
 | |
| 
 | |
| 	remoteURL := r.remoteURL
 | |
| 	remoteURL.Path = "/v2/"
 | |
| 	challenges, err := r.cm.GetChallenges(r.remoteURL)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if len(challenges) > 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// establish challenge type with upstream
 | |
| 	if err := ping(r.cm, remoteURL.String(), challengeHeader); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	context.GetLogger(ctx).Infof("Challenge established with upstream : %s %s", remoteURL, r.cm)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // proxiedRepository uses proxying blob and manifest services to serve content
 | |
| // locally, or pulling it through from a remote and caching it locally if it doesn't
 | |
| // already exist
 | |
| type proxiedRepository struct {
 | |
| 	blobStore distribution.BlobStore
 | |
| 	manifests distribution.ManifestService
 | |
| 	name      reference.Named
 | |
| 	tags      distribution.TagService
 | |
| }
 | |
| 
 | |
| func (pr *proxiedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
 | |
| 	return pr.manifests, nil
 | |
| }
 | |
| 
 | |
| func (pr *proxiedRepository) Blobs(ctx context.Context) distribution.BlobStore {
 | |
| 	return pr.blobStore
 | |
| }
 | |
| 
 | |
| func (pr *proxiedRepository) Named() reference.Named {
 | |
| 	return pr.name
 | |
| }
 | |
| 
 | |
| func (pr *proxiedRepository) Tags(ctx context.Context) distribution.TagService {
 | |
| 	return pr.tags
 | |
| }
 |