mirror of
https://github.com/distribution/distribution.git
synced 2025-09-23 20:16:53 +00:00
registry/storage/cache/memory: Use LRU cache to bound cache size
Instead of letting the cache grow without bound, use a LRU to impose a size limit. The limit is configurable through a new `blobdescriptorsize` config key. Signed-off-by: Aaron Lehmann <alehmann@netflix.com>
This commit is contained in:
@@ -25,7 +25,7 @@ func TestWriteSeek(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
imageName, _ := reference.WithName("foo/bar")
|
||||
driver := testdriver.New()
|
||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
@@ -61,7 +61,7 @@ func TestSimpleBlobUpload(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
imageName, _ := reference.WithName("foo/bar")
|
||||
driver := testdriver.New()
|
||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
@@ -234,7 +234,7 @@ func TestSimpleBlobUpload(t *testing.T) {
|
||||
}
|
||||
|
||||
// Reuse state to test delete with a delete-disabled registry
|
||||
registry, err = NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableRedirect)
|
||||
registry, err = NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableRedirect)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
@@ -256,7 +256,7 @@ func TestSimpleBlobRead(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
imageName, _ := reference.WithName("foo/bar")
|
||||
driver := testdriver.New()
|
||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
@@ -368,7 +368,7 @@ func TestBlobMount(t *testing.T) {
|
||||
imageName, _ := reference.WithName("foo/bar")
|
||||
sourceImageName, _ := reference.WithName("foo/source")
|
||||
driver := testdriver.New()
|
||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
@@ -519,7 +519,7 @@ func TestLayerUploadZeroLength(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
imageName, _ := reference.WithName("foo/bar")
|
||||
driver := testdriver.New()
|
||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
|
195
registry/storage/cache/memory/memory.go
vendored
195
registry/storage/cache/memory/memory.go
vendored
@@ -2,26 +2,46 @@ package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"math"
|
||||
|
||||
"github.com/distribution/distribution/v3"
|
||||
"github.com/distribution/distribution/v3/reference"
|
||||
"github.com/distribution/distribution/v3/registry/storage/cache"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultSize is the default cache size to use if no size is explicitly
|
||||
// configured.
|
||||
DefaultSize = 10000
|
||||
|
||||
// UnlimitedSize indicates the cache size should not be limited.
|
||||
UnlimitedSize = math.MaxInt
|
||||
)
|
||||
|
||||
type descriptorCacheKey struct {
|
||||
digest digest.Digest
|
||||
repo string
|
||||
}
|
||||
|
||||
type inMemoryBlobDescriptorCacheProvider struct {
|
||||
global *mapBlobDescriptorCache
|
||||
repositories map[string]*mapBlobDescriptorCache
|
||||
mu sync.RWMutex
|
||||
lru *lru.ARCCache
|
||||
}
|
||||
|
||||
// NewInMemoryBlobDescriptorCacheProvider returns a new mapped-based cache for
|
||||
// storing blob descriptor data.
|
||||
func NewInMemoryBlobDescriptorCacheProvider() cache.BlobDescriptorCacheProvider {
|
||||
func NewInMemoryBlobDescriptorCacheProvider(size int) cache.BlobDescriptorCacheProvider {
|
||||
if size <= 0 {
|
||||
size = math.MaxInt
|
||||
}
|
||||
lruCache, err := lru.NewARC(size)
|
||||
if err != nil {
|
||||
// NewARC can only fail if size is <= 0, so this unreachable
|
||||
panic(err)
|
||||
}
|
||||
return &inMemoryBlobDescriptorCacheProvider{
|
||||
global: newMapBlobDescriptorCache(),
|
||||
repositories: make(map[string]*mapBlobDescriptorCache),
|
||||
lru: lruCache,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,39 +50,63 @@ func (imbdcp *inMemoryBlobDescriptorCacheProvider) RepositoryScoped(repo string)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
imbdcp.mu.RLock()
|
||||
defer imbdcp.mu.RUnlock()
|
||||
|
||||
return &repositoryScopedInMemoryBlobDescriptorCache{
|
||||
repo: repo,
|
||||
parent: imbdcp,
|
||||
repository: imbdcp.repositories[repo],
|
||||
repo: repo,
|
||||
parent: imbdcp,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (imbdcp *inMemoryBlobDescriptorCacheProvider) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
return imbdcp.global.Stat(ctx, dgst)
|
||||
if err := dgst.Validate(); err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
key := descriptorCacheKey{
|
||||
digest: dgst,
|
||||
}
|
||||
descriptor, ok := imbdcp.lru.Get(key)
|
||||
if ok {
|
||||
// Type assertion not really necessary, but included in case
|
||||
// it's necessary for the fuzzer
|
||||
if desc, ok := descriptor.(distribution.Descriptor); ok {
|
||||
return desc, nil
|
||||
}
|
||||
}
|
||||
return distribution.Descriptor{}, distribution.ErrBlobUnknown
|
||||
}
|
||||
|
||||
func (imbdcp *inMemoryBlobDescriptorCacheProvider) Clear(ctx context.Context, dgst digest.Digest) error {
|
||||
return imbdcp.global.Clear(ctx, dgst)
|
||||
key := descriptorCacheKey{
|
||||
digest: dgst,
|
||||
}
|
||||
imbdcp.lru.Remove(key)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (imbdcp *inMemoryBlobDescriptorCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
|
||||
_, err := imbdcp.Stat(ctx, dgst)
|
||||
if err == distribution.ErrBlobUnknown {
|
||||
|
||||
if dgst.Algorithm() != desc.Digest.Algorithm() && dgst != desc.Digest {
|
||||
// if the digests differ, set the other canonical mapping
|
||||
if err := imbdcp.global.SetDescriptor(ctx, desc.Digest, desc); err != nil {
|
||||
if err := imbdcp.SetDescriptor(ctx, desc.Digest, desc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// unknown, just set it
|
||||
return imbdcp.global.SetDescriptor(ctx, dgst, desc)
|
||||
}
|
||||
if err := dgst.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := cache.ValidateDescriptor(desc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key := descriptorCacheKey{
|
||||
digest: dgst,
|
||||
}
|
||||
imbdcp.lru.Add(key, desc)
|
||||
return nil
|
||||
}
|
||||
// we already know it, do nothing
|
||||
return err
|
||||
}
|
||||
@@ -71,98 +115,40 @@ func (imbdcp *inMemoryBlobDescriptorCacheProvider) SetDescriptor(ctx context.Con
|
||||
// repository cache. Instances are not thread-safe but the delegated
|
||||
// operations are.
|
||||
type repositoryScopedInMemoryBlobDescriptorCache struct {
|
||||
repo string
|
||||
parent *inMemoryBlobDescriptorCacheProvider // allows lazy allocation of repo's map
|
||||
repository *mapBlobDescriptorCache
|
||||
repo string
|
||||
parent *inMemoryBlobDescriptorCacheProvider // allows lazy allocation of repo's map
|
||||
}
|
||||
|
||||
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
rsimbdcp.parent.mu.Lock()
|
||||
repo := rsimbdcp.repository
|
||||
rsimbdcp.parent.mu.Unlock()
|
||||
|
||||
if repo == nil {
|
||||
return distribution.Descriptor{}, distribution.ErrBlobUnknown
|
||||
}
|
||||
|
||||
return repo.Stat(ctx, dgst)
|
||||
}
|
||||
|
||||
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Clear(ctx context.Context, dgst digest.Digest) error {
|
||||
rsimbdcp.parent.mu.Lock()
|
||||
repo := rsimbdcp.repository
|
||||
rsimbdcp.parent.mu.Unlock()
|
||||
|
||||
if repo == nil {
|
||||
return distribution.ErrBlobUnknown
|
||||
}
|
||||
|
||||
return repo.Clear(ctx, dgst)
|
||||
}
|
||||
|
||||
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
|
||||
rsimbdcp.parent.mu.Lock()
|
||||
repo := rsimbdcp.repository
|
||||
if repo == nil {
|
||||
// allocate map since we are setting it now.
|
||||
var ok bool
|
||||
// have to read back value since we may have allocated elsewhere.
|
||||
repo, ok = rsimbdcp.parent.repositories[rsimbdcp.repo]
|
||||
if !ok {
|
||||
repo = newMapBlobDescriptorCache()
|
||||
rsimbdcp.parent.repositories[rsimbdcp.repo] = repo
|
||||
}
|
||||
rsimbdcp.repository = repo
|
||||
}
|
||||
rsimbdcp.parent.mu.Unlock()
|
||||
|
||||
if err := repo.SetDescriptor(ctx, dgst, desc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return rsimbdcp.parent.SetDescriptor(ctx, dgst, desc)
|
||||
}
|
||||
|
||||
// mapBlobDescriptorCache provides a simple map-based implementation of the
|
||||
// descriptor cache.
|
||||
type mapBlobDescriptorCache struct {
|
||||
descriptors map[digest.Digest]distribution.Descriptor
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
var _ distribution.BlobDescriptorService = &mapBlobDescriptorCache{}
|
||||
|
||||
func newMapBlobDescriptorCache() *mapBlobDescriptorCache {
|
||||
return &mapBlobDescriptorCache{
|
||||
descriptors: make(map[digest.Digest]distribution.Descriptor),
|
||||
}
|
||||
}
|
||||
|
||||
func (mbdc *mapBlobDescriptorCache) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
if err := dgst.Validate(); err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
mbdc.mu.RLock()
|
||||
defer mbdc.mu.RUnlock()
|
||||
|
||||
desc, ok := mbdc.descriptors[dgst]
|
||||
if !ok {
|
||||
return distribution.Descriptor{}, distribution.ErrBlobUnknown
|
||||
key := descriptorCacheKey{
|
||||
digest: dgst,
|
||||
repo: rsimbdcp.repo,
|
||||
}
|
||||
|
||||
return desc, nil
|
||||
descriptor, ok := rsimbdcp.parent.lru.Get(key)
|
||||
if ok {
|
||||
// Type assertion not really necessary, but included in case
|
||||
// it's necessary for the fuzzer
|
||||
if desc, ok := descriptor.(distribution.Descriptor); ok {
|
||||
return desc, nil
|
||||
}
|
||||
}
|
||||
return distribution.Descriptor{}, distribution.ErrBlobUnknown
|
||||
}
|
||||
|
||||
func (mbdc *mapBlobDescriptorCache) Clear(ctx context.Context, dgst digest.Digest) error {
|
||||
mbdc.mu.Lock()
|
||||
defer mbdc.mu.Unlock()
|
||||
|
||||
delete(mbdc.descriptors, dgst)
|
||||
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Clear(ctx context.Context, dgst digest.Digest) error {
|
||||
key := descriptorCacheKey{
|
||||
digest: dgst,
|
||||
repo: rsimbdcp.repo,
|
||||
}
|
||||
rsimbdcp.parent.lru.Remove(key)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mbdc *mapBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
|
||||
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
|
||||
if err := dgst.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -171,9 +157,10 @@ func (mbdc *mapBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst dige
|
||||
return err
|
||||
}
|
||||
|
||||
mbdc.mu.Lock()
|
||||
defer mbdc.mu.Unlock()
|
||||
|
||||
mbdc.descriptors[dgst] = desc
|
||||
return nil
|
||||
key := descriptorCacheKey{
|
||||
digest: dgst,
|
||||
repo: rsimbdcp.repo,
|
||||
}
|
||||
rsimbdcp.parent.lru.Add(key, desc)
|
||||
return rsimbdcp.parent.SetDescriptor(ctx, dgst, desc)
|
||||
}
|
||||
|
2
registry/storage/cache/memory/memory_test.go
vendored
2
registry/storage/cache/memory/memory_test.go
vendored
@@ -9,5 +9,5 @@ import (
|
||||
// TestInMemoryBlobInfoCache checks the in memory implementation is working
|
||||
// correctly.
|
||||
func TestInMemoryBlobInfoCache(t *testing.T) {
|
||||
cachecheck.CheckBlobDescriptorCache(t, NewInMemoryBlobDescriptorCacheProvider())
|
||||
cachecheck.CheckBlobDescriptorCache(t, NewInMemoryBlobDescriptorCacheProvider(UnlimitedSize))
|
||||
}
|
||||
|
@@ -26,7 +26,7 @@ type setupEnv struct {
|
||||
func setupFS(t *testing.T) *setupEnv {
|
||||
d := inmemory.New()
|
||||
ctx := context.Background()
|
||||
registry, err := NewRegistry(ctx, d, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableRedirect, EnableSchema1)
|
||||
registry, err := NewRegistry(ctx, d, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableRedirect, EnableSchema1)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
@@ -207,7 +207,7 @@ func testEq(a, b []string, size int) bool {
|
||||
func setupBadWalkEnv(t *testing.T) *setupEnv {
|
||||
d := newBadListDriver()
|
||||
ctx := context.Background()
|
||||
registry, err := NewRegistry(ctx, d, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableRedirect, EnableSchema1)
|
||||
registry, err := NewRegistry(ctx, d, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableRedirect, EnableSchema1)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
|
@@ -59,7 +59,7 @@ func TestManifestStorage(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
testManifestStorage(t, true, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect, Schema1SigningKey(k), EnableSchema1)
|
||||
testManifestStorage(t, true, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect, Schema1SigningKey(k), EnableSchema1)
|
||||
}
|
||||
|
||||
func TestManifestStorageV1Unsupported(t *testing.T) {
|
||||
@@ -67,7 +67,7 @@ func TestManifestStorageV1Unsupported(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
testManifestStorage(t, false, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect, Schema1SigningKey(k))
|
||||
testManifestStorage(t, false, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect, Schema1SigningKey(k))
|
||||
}
|
||||
|
||||
func testManifestStorage(t *testing.T, schema1Enabled bool, options ...RegistryOption) {
|
||||
@@ -357,7 +357,7 @@ func testManifestStorage(t *testing.T, schema1Enabled bool, options ...RegistryO
|
||||
t.Errorf("Deleted manifest get returned non-nil")
|
||||
}
|
||||
|
||||
r, err := NewRegistry(ctx, env.driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableRedirect)
|
||||
r, err := NewRegistry(ctx, env.driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableRedirect)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
@@ -393,7 +393,7 @@ func testOCIManifestStorage(t *testing.T, testname string, includeMediaTypes boo
|
||||
|
||||
repoName, _ := reference.WithName("foo/bar")
|
||||
env := newManifestStoreTestEnv(t, repoName, "thetag",
|
||||
BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()),
|
||||
BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)),
|
||||
EnableDelete, EnableRedirect)
|
||||
|
||||
ctx := context.Background()
|
||||
|
Reference in New Issue
Block a user