From 940c1b7b3b0503a8d62e072514d705e6ff54fbcd Mon Sep 17 00:00:00 2001 From: Avi Deitcher Date: Mon, 30 Jun 2025 20:58:50 +0300 Subject: [PATCH] simplify cache locking (#4136) Signed-off-by: Avi Deitcher --- src/cmd/linuxkit/cache/cacheindex.go | 71 -------------------------- src/cmd/linuxkit/cache/const.go | 5 ++ src/cmd/linuxkit/cache/open.go | 51 +++++++++--------- src/cmd/linuxkit/cache/provider.go | 53 +++++++++++++++++-- src/cmd/linuxkit/cache/pull.go | 6 +++ src/cmd/linuxkit/cache/write.go | 59 +++++++++++++++++++++ src/cmd/linuxkit/util/filelock_unix.go | 9 +++- 7 files changed, 152 insertions(+), 102 deletions(-) delete mode 100644 src/cmd/linuxkit/cache/cacheindex.go create mode 100644 src/cmd/linuxkit/cache/const.go diff --git a/src/cmd/linuxkit/cache/cacheindex.go b/src/cmd/linuxkit/cache/cacheindex.go deleted file mode 100644 index 31bd6328b..000000000 --- a/src/cmd/linuxkit/cache/cacheindex.go +++ /dev/null @@ -1,71 +0,0 @@ -// ALL writes to index.json at the root of the cache directory -// must be done through calls in this file. This is to ensure that it always does -// proper locking. -package cache - -import ( - "errors" - "fmt" - "path/filepath" - - v1 "github.com/google/go-containerregistry/pkg/v1" - "github.com/google/go-containerregistry/pkg/v1/match" - "github.com/linuxkit/linuxkit/src/cmd/linuxkit/util" - imagespec "github.com/opencontainers/image-spec/specs-go/v1" - log "github.com/sirupsen/logrus" -) - -const ( - indexFile = "index.json" -) - -// DescriptorWrite writes a descriptor to the cache index; it validates that it has a name -// and replaces any existing one -func (p *Provider) DescriptorWrite(image string, desc v1.Descriptor) error { - if image == "" { - return errors.New("cannot write descriptor without reference name") - } - if desc.Annotations == nil { - desc.Annotations = map[string]string{} - } - desc.Annotations[imagespec.AnnotationRefName] = image - log.Debugf("writing descriptor for image %s", image) - - // get our lock - lock, err := util.Lock(filepath.Join(p.dir, indexFile)) - if err != nil { - return fmt.Errorf("unable to lock cache index for writing descriptor for %s: %v", image, err) - } - defer func() { - if err := lock.Unlock(); err != nil { - log.Errorf("unable to close lock for cache index after writing descriptor for %s: %v", image, err) - } - }() - - // do we update an existing one? Or create a new one? - if err := p.cache.RemoveDescriptors(match.Name(image)); err != nil { - return fmt.Errorf("unable to remove old descriptors for %s: %v", image, err) - } - - if err := p.cache.AppendDescriptor(desc); err != nil { - return fmt.Errorf("unable to append new descriptor for %s: %v", image, err) - } - - return nil -} - -// RemoveDescriptors removes all descriptors that match the provided matcher. -// It does so in a parallel-access-safe way -func (p *Provider) RemoveDescriptors(matcher match.Matcher) error { - // get our lock - lock, err := util.Lock(filepath.Join(p.dir, indexFile)) - if err != nil { - return fmt.Errorf("unable to lock cache index for removing descriptor for %v: %v", matcher, err) - } - defer func() { - if err := lock.Unlock(); err != nil { - log.Errorf("unable to close lock for cache index after writing descriptor for %v: %v", matcher, err) - } - }() - return p.cache.RemoveDescriptors(matcher) -} diff --git a/src/cmd/linuxkit/cache/const.go b/src/cmd/linuxkit/cache/const.go new file mode 100644 index 000000000..b38f8c48a --- /dev/null +++ b/src/cmd/linuxkit/cache/const.go @@ -0,0 +1,5 @@ +package cache + +const ( + lockfile = ".lock" +) diff --git a/src/cmd/linuxkit/cache/open.go b/src/cmd/linuxkit/cache/open.go index 7c2025398..f7b7ebc84 100644 --- a/src/cmd/linuxkit/cache/open.go +++ b/src/cmd/linuxkit/cache/open.go @@ -3,42 +3,43 @@ package cache import ( "fmt" "os" - "path/filepath" "github.com/google/go-containerregistry/pkg/v1/empty" "github.com/google/go-containerregistry/pkg/v1/layout" - "github.com/linuxkit/linuxkit/src/cmd/linuxkit/util" - log "github.com/sirupsen/logrus" -) - -var ( - newIndexLockFile = filepath.Join(os.TempDir(), "linuxkit-new-cache-index.lock") ) // Get get or initialize the cache -func Get(cache string) (layout.Path, error) { +func (p *Provider) Get(cache string) (layout.Path, error) { + // ensure the dir exists + if err := os.MkdirAll(cache, os.ModePerm); err != nil { + return "", fmt.Errorf("unable to create cache directory %s: %v", cache, err) + } + + // first try to read the layout from the path + // if it exists, we can use it + // if it does not exist, we will initialize it + // + // do not lock for first read, because we do not need the lock except for initialization + // and future writes, so why slow down reads? + l, err := layout.FromPath(cache) + // initialize the cache path if needed - p, err := layout.FromPath(cache) if err != nil { - if err := os.WriteFile(newIndexLockFile, []byte{}, 0644); err != nil { - return "", fmt.Errorf("unable to create lock file %s for writing descriptor for new cache %s: %v", newIndexLockFile, cache, err) + if err := p.Lock(); err != nil { + return "", fmt.Errorf("unable to lock cache %s: %v", cache, err) } - lock, err := util.Lock(newIndexLockFile) + defer p.Unlock() + + // after lock, try to read the layout again + // in case another process initialized it while we were waiting for the lock + // if it still does not exist, we will initialize it + l, err = layout.FromPath(cache) if err != nil { - return "", fmt.Errorf("unable to retrieve lock for writing descriptor for new cache %s: %v", newIndexLockFile, err) - } - defer func() { - if err := lock.Unlock(); err != nil { - log.Errorf("unable to close lock for cache index after writing descriptor for new cache: %v", err) + l, err = layout.Write(cache, empty.Index) + if err != nil { + return l, fmt.Errorf("could not initialize cache at path %s: %v", cache, err) } - if err := os.RemoveAll(newIndexLockFile); err != nil { - log.Errorf("unable to remove lock file %s after writing descriptor for new cache: %v", newIndexLockFile, err) - } - }() - p, err = layout.Write(cache, empty.Index) - if err != nil { - return p, fmt.Errorf("could not initialize cache at path %s: %v", cache, err) } } - return p, nil + return l, nil } diff --git a/src/cmd/linuxkit/cache/provider.go b/src/cmd/linuxkit/cache/provider.go index ec2a12f6d..da0b6b969 100644 --- a/src/cmd/linuxkit/cache/provider.go +++ b/src/cmd/linuxkit/cache/provider.go @@ -1,21 +1,30 @@ package cache import ( + "fmt" + "path/filepath" + "sync" + "github.com/containerd/containerd/v2/core/content" "github.com/containerd/containerd/v2/plugins/content/local" "github.com/google/go-containerregistry/pkg/v1/layout" + "github.com/linuxkit/linuxkit/src/cmd/linuxkit/util" + log "github.com/sirupsen/logrus" ) // Provider cache implementation of cacheProvider type Provider struct { - cache layout.Path - store content.Store - dir string + cache layout.Path + store content.Store + dir string + lock *util.FileLock + lockMut sync.Mutex } // NewProvider create a new CacheProvider based in the provided directory func NewProvider(dir string) (*Provider, error) { - p, err := Get(dir) + p := &Provider{dir: dir, lockMut: sync.Mutex{}} + layout, err := p.Get(dir) if err != nil { return nil, err } @@ -23,5 +32,39 @@ func NewProvider(dir string) (*Provider, error) { if err != nil { return nil, err } - return &Provider{p, store, dir}, nil + p.cache = layout + p.store = store + return p, nil +} + +// Lock locks the cache directory to prevent concurrent access +func (p *Provider) Lock() error { + // if the lock is already set, we do not need to do anything + if p.lock != nil { + return nil + } + p.lockMut.Lock() + defer p.lockMut.Unlock() + var lockFile = filepath.Join(p.dir, lockfile) + lock, err := util.Lock(lockFile) + if err != nil { + return fmt.Errorf("unable to retrieve cache lock %s: %v", lockFile, err) + } + p.lock = lock + return nil +} + +// Unlock releases the lock on the cache directory +func (p *Provider) Unlock() { + p.lockMut.Lock() + defer p.lockMut.Unlock() + // if the lock is not set, we do not need to do anything + if p.lock == nil { + return + } + var lockFile = filepath.Join(p.dir, lockfile) + if err := p.lock.Unlock(); err != nil { + log.Errorf("unable to close lock for cache %s: %v", lockFile, err) + } + p.lock = nil } diff --git a/src/cmd/linuxkit/cache/pull.go b/src/cmd/linuxkit/cache/pull.go index c5206085d..b10e2ef91 100644 --- a/src/cmd/linuxkit/cache/pull.go +++ b/src/cmd/linuxkit/cache/pull.go @@ -180,6 +180,12 @@ func (p *Provider) Pull(name string, withArchReferences bool) error { return fmt.Errorf("error getting manifest for trusted image %s: %v", name, err) } + // lock the cache so we can write to it + if err := p.Lock(); err != nil { + return fmt.Errorf("unable to lock cache for writing: %v", err) + } + defer p.Unlock() + // first attempt as an index ii, err := desc.ImageIndex() if err == nil { diff --git a/src/cmd/linuxkit/cache/write.go b/src/cmd/linuxkit/cache/write.go index 890b83723..83da2775e 100644 --- a/src/cmd/linuxkit/cache/write.go +++ b/src/cmd/linuxkit/cache/write.go @@ -76,6 +76,12 @@ func (p *Provider) ImagePull(ref *reference.Spec, platforms []imagespec.Platform return fmt.Errorf("error getting manifest for image %s: %v", pullImageName, err) } + // get our lock + if err := p.Lock(); err != nil { + return fmt.Errorf("unable to lock cache for removing descriptors: %v", err) + } + defer p.Unlock() + // first attempt as an index ii, err := desc.ImageIndex() if err == nil { @@ -146,6 +152,11 @@ func (p *Provider) ImageLoad(r io.Reader) ([]v1.Descriptor, error) { index bytes.Buffer ) log.Debugf("ImageWriteTar to cache") + // get our lock + if err := p.Lock(); err != nil { + return nil, fmt.Errorf("unable to lock cache: %v", err) + } + defer p.Unlock() for { header, err := tr.Next() if err == io.EOF { @@ -244,6 +255,11 @@ func (p *Provider) IndexWrite(ref *reference.Spec, descriptors ...v1.Descriptor) return fmt.Errorf("error parsing index: %v", err) } var im v1.IndexManifest + // get our lock + if err := p.Lock(); err != nil { + return fmt.Errorf("unable to lock cache: %v", err) + } + defer p.Unlock() // do we update an existing one? Or create a new one? if len(indexes) > 0 { // we already had one, so update just the referenced index and return @@ -452,3 +468,46 @@ func (p *Provider) ImageInRegistry(ref *reference.Spec, trustedRef, architecture } return false, nil } + +// DescriptorWrite writes a descriptor to the cache index; it validates that it has a name +// and replaces any existing one +func (p *Provider) DescriptorWrite(image string, desc v1.Descriptor) error { + if image == "" { + return errors.New("cannot write descriptor without reference name") + } + if desc.Annotations == nil { + desc.Annotations = map[string]string{} + } + desc.Annotations[imagespec.AnnotationRefName] = image + log.Debugf("writing descriptor for image %s", image) + + // get our lock + if err := p.Lock(); err != nil { + return fmt.Errorf("unable to lock cache for writing descriptors: %v", err) + } + defer p.Unlock() + + // get our lock + // do we update an existing one? Or create a new one? + if err := p.cache.RemoveDescriptors(match.Name(image)); err != nil { + return fmt.Errorf("unable to remove old descriptors for %s: %v", image, err) + } + + if err := p.cache.AppendDescriptor(desc); err != nil { + return fmt.Errorf("unable to append new descriptor for %s: %v", image, err) + } + + return nil +} + +// RemoveDescriptors removes all descriptors that match the provided matcher. +// It does so in a parallel-access-safe way +func (p *Provider) RemoveDescriptors(matcher match.Matcher) error { + // get our lock + if err := p.Lock(); err != nil { + return fmt.Errorf("unable to lock cache for removing descriptors: %v", err) + } + defer p.Unlock() + + return p.cache.RemoveDescriptors(matcher) +} diff --git a/src/cmd/linuxkit/util/filelock_unix.go b/src/cmd/linuxkit/util/filelock_unix.go index 80f3807a1..185e73d96 100644 --- a/src/cmd/linuxkit/util/filelock_unix.go +++ b/src/cmd/linuxkit/util/filelock_unix.go @@ -35,6 +35,9 @@ func Lock(path string) (*FileLock, error) { // Unlock releases the lock and closes the file. func (l *FileLock) Unlock() error { + if l == nil || l.file == nil { + return fmt.Errorf("unlock: file handle is nil") + } flock := unix.Flock_t{ Type: unix.F_UNLCK, Whence: int16(io.SeekStart), @@ -44,7 +47,11 @@ func (l *FileLock) Unlock() error { if err := unix.FcntlFlock(l.file.Fd(), unix.F_SETLKW, &flock); err != nil { return fmt.Errorf("unlock: %w", err) } - return l.file.Close() + if err := l.file.Close(); err != nil { + return fmt.Errorf("close lock file: %w", err) + } + l.file = nil // Prevent further use of the file handle + return nil } // CheckLock attempts to detect if the file is locked by another process.