linuxkit/linuxkit (https://github.com/linuxkit/linuxkit.git)

simplify cache locking (#4136)

Signed-off-by: Avi Deitcher <avi@deitcher.net>

commit 940c1b7b3b (parent 818bccf20f)
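In brief: the dedicated index-locking file cacheindex.go is deleted, along with the temporary lock file previously used to guard first-time cache initialization. In their place, Provider gains Lock and Unlock methods backed by a single .lock file at the root of the cache directory, and every mutating operation (Pull, ImagePull, ImageLoad, IndexWrite, DescriptorWrite, RemoveDescriptors) acquires that one lock up front and releases it when it returns. util.FileLock.Unlock is also hardened against nil handles and double-close.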
src/cmd/linuxkit/cache/cacheindex.go (vendored, deleted): 71 lines

@@ -1,71 +0,0 @@
-// ALL writes to index.json at the root of the cache directory
-// must be done through calls in this file. This is to ensure that it always does
-// proper locking.
-package cache
-
-import (
-	"errors"
-	"fmt"
-	"path/filepath"
-
-	v1 "github.com/google/go-containerregistry/pkg/v1"
-	"github.com/google/go-containerregistry/pkg/v1/match"
-	"github.com/linuxkit/linuxkit/src/cmd/linuxkit/util"
-	imagespec "github.com/opencontainers/image-spec/specs-go/v1"
-	log "github.com/sirupsen/logrus"
-)
-
-const (
-	indexFile = "index.json"
-)
-
-// DescriptorWrite writes a descriptor to the cache index; it validates that it has a name
-// and replaces any existing one
-func (p *Provider) DescriptorWrite(image string, desc v1.Descriptor) error {
-	if image == "" {
-		return errors.New("cannot write descriptor without reference name")
-	}
-	if desc.Annotations == nil {
-		desc.Annotations = map[string]string{}
-	}
-	desc.Annotations[imagespec.AnnotationRefName] = image
-	log.Debugf("writing descriptor for image %s", image)
-
-	// get our lock
-	lock, err := util.Lock(filepath.Join(p.dir, indexFile))
-	if err != nil {
-		return fmt.Errorf("unable to lock cache index for writing descriptor for %s: %v", image, err)
-	}
-	defer func() {
-		if err := lock.Unlock(); err != nil {
-			log.Errorf("unable to close lock for cache index after writing descriptor for %s: %v", image, err)
-		}
-	}()
-
-	// do we update an existing one? Or create a new one?
-	if err := p.cache.RemoveDescriptors(match.Name(image)); err != nil {
-		return fmt.Errorf("unable to remove old descriptors for %s: %v", image, err)
-	}
-
-	if err := p.cache.AppendDescriptor(desc); err != nil {
-		return fmt.Errorf("unable to append new descriptor for %s: %v", image, err)
-	}
-
-	return nil
-}
-
-// RemoveDescriptors removes all descriptors that match the provided matcher.
-// It does so in a parallel-access-safe way
-func (p *Provider) RemoveDescriptors(matcher match.Matcher) error {
-	// get our lock
-	lock, err := util.Lock(filepath.Join(p.dir, indexFile))
-	if err != nil {
-		return fmt.Errorf("unable to lock cache index for removing descriptor for %v: %v", matcher, err)
-	}
-	defer func() {
-		if err := lock.Unlock(); err != nil {
-			log.Errorf("unable to close lock for cache index after writing descriptor for %v: %v", matcher, err)
-		}
-	}()
-	return p.cache.RemoveDescriptors(matcher)
-}
src/cmd/linuxkit/cache/const.go (vendored, new file): 5 lines

@@ -0,0 +1,5 @@
+package cache
+
+const (
+	lockfile = ".lock"
+)
src/cmd/linuxkit/cache/open.go (vendored): 51 changes

@@ -3,42 +3,43 @@ package cache
 import (
 	"fmt"
 	"os"
-	"path/filepath"
 
 	"github.com/google/go-containerregistry/pkg/v1/empty"
 	"github.com/google/go-containerregistry/pkg/v1/layout"
-	"github.com/linuxkit/linuxkit/src/cmd/linuxkit/util"
-	log "github.com/sirupsen/logrus"
-)
-
-var (
-	newIndexLockFile = filepath.Join(os.TempDir(), "linuxkit-new-cache-index.lock")
 )
 
 // Get get or initialize the cache
-func Get(cache string) (layout.Path, error) {
+func (p *Provider) Get(cache string) (layout.Path, error) {
+	// ensure the dir exists
+	if err := os.MkdirAll(cache, os.ModePerm); err != nil {
+		return "", fmt.Errorf("unable to create cache directory %s: %v", cache, err)
+	}
+
+	// first try to read the layout from the path
+	// if it exists, we can use it
+	// if it does not exist, we will initialize it
+	//
+	// do not lock for first read, because we do not need the lock except for initialization
+	// and future writes, so why slow down reads?
+	l, err := layout.FromPath(cache)
+
 	// initialize the cache path if needed
-	p, err := layout.FromPath(cache)
 	if err != nil {
-		if err := os.WriteFile(newIndexLockFile, []byte{}, 0644); err != nil {
-			return "", fmt.Errorf("unable to create lock file %s for writing descriptor for new cache %s: %v", newIndexLockFile, cache, err)
+		if err := p.Lock(); err != nil {
+			return "", fmt.Errorf("unable to lock cache %s: %v", cache, err)
 		}
-		lock, err := util.Lock(newIndexLockFile)
-		if err != nil {
-			return "", fmt.Errorf("unable to retrieve lock for writing descriptor for new cache %s: %v", newIndexLockFile, err)
-		}
-		defer func() {
-			if err := lock.Unlock(); err != nil {
-				log.Errorf("unable to close lock for cache index after writing descriptor for new cache: %v", err)
-			}
-			if err := os.RemoveAll(newIndexLockFile); err != nil {
-				log.Errorf("unable to remove lock file %s after writing descriptor for new cache: %v", newIndexLockFile, err)
-			}
-		}()
-		p, err = layout.Write(cache, empty.Index)
-		if err != nil {
-			return p, fmt.Errorf("could not initialize cache at path %s: %v", cache, err)
-		}
+		defer p.Unlock()
+
+		// after lock, try to read the layout again
+		// in case another process initialized it while we were waiting for the lock
+		// if it still does not exist, we will initialize it
+		l, err = layout.FromPath(cache)
+		if err != nil {
+			l, err = layout.Write(cache, empty.Index)
+		}
+		if err != nil {
+			return l, fmt.Errorf("could not initialize cache at path %s: %v", cache, err)
+		}
	}
-	return p, nil
+	return l, nil
 }
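The rewritten Get is a double-checked initialization across processes: read without the lock on the fast path, and only when the layout is missing take the lock, re-read, and initialize. Below is a minimal, self-contained Go sketch of the same pattern; readState, initState, and the lock/unlock callbacks are hypothetical stand-ins for layout.FromPath, layout.Write(cache, empty.Index), and Provider.Lock/Unlock.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// readState and initState are hypothetical stand-ins for
// layout.FromPath and layout.Write(cache, empty.Index).
func readState(dir string) (string, error) {
	b, err := os.ReadFile(filepath.Join(dir, "index.json"))
	return string(b), err
}

func initState(dir string) (string, error) {
	if err := os.WriteFile(filepath.Join(dir, "index.json"), []byte("{}"), 0644); err != nil {
		return "", err
	}
	return "{}", nil
}

// get mirrors the shape of the new cache.Get: unlocked fast-path read,
// then lock, re-read under the lock, and initialize only if still missing.
func get(dir string, lock func() error, unlock func()) (string, error) {
	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
		return "", err
	}
	// fast path: plain reads need no lock
	if s, err := readState(dir); err == nil {
		return s, nil
	}
	if err := lock(); err != nil {
		return "", fmt.Errorf("unable to lock %s: %v", dir, err)
	}
	defer unlock()
	// re-check under the lock: another process may have initialized
	// the state while we were waiting
	if s, err := readState(dir); err == nil {
		return s, nil
	}
	return initState(dir)
}

func main() {
	noop := func() error { return nil }
	s, err := get(filepath.Join(os.TempDir(), "demo-cache"), noop, func() {})
	fmt.Println(s, err)
}

The second read under the lock is what prevents two processes from both writing a fresh index: the loser of the race sees the winner's file and returns it instead of clobbering it.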
src/cmd/linuxkit/cache/provider.go (vendored): 53 changes

@@ -1,21 +1,30 @@
 package cache
 
 import (
+	"fmt"
+	"path/filepath"
+	"sync"
+
 	"github.com/containerd/containerd/v2/core/content"
 	"github.com/containerd/containerd/v2/plugins/content/local"
 	"github.com/google/go-containerregistry/pkg/v1/layout"
+	"github.com/linuxkit/linuxkit/src/cmd/linuxkit/util"
+	log "github.com/sirupsen/logrus"
 )
 
 // Provider cache implementation of cacheProvider
 type Provider struct {
-	cache layout.Path
-	store content.Store
-	dir   string
+	cache   layout.Path
+	store   content.Store
+	dir     string
+	lock    *util.FileLock
+	lockMut sync.Mutex
 }
 
 // NewProvider create a new CacheProvider based in the provided directory
 func NewProvider(dir string) (*Provider, error) {
-	p, err := Get(dir)
+	p := &Provider{dir: dir, lockMut: sync.Mutex{}}
+	layout, err := p.Get(dir)
 	if err != nil {
 		return nil, err
 	}
@@ -23,5 +32,39 @@ func NewProvider(dir string) (*Provider, error) {
 	if err != nil {
 		return nil, err
 	}
-	return &Provider{p, store, dir}, nil
+	p.cache = layout
+	p.store = store
+	return p, nil
+}
+
+// Lock locks the cache directory to prevent concurrent access
+func (p *Provider) Lock() error {
+	// if the lock is already set, we do not need to do anything
+	if p.lock != nil {
+		return nil
+	}
+	p.lockMut.Lock()
+	defer p.lockMut.Unlock()
+	var lockFile = filepath.Join(p.dir, lockfile)
+	lock, err := util.Lock(lockFile)
+	if err != nil {
+		return fmt.Errorf("unable to retrieve cache lock %s: %v", lockFile, err)
+	}
+	p.lock = lock
+	return nil
+}
+
+// Unlock releases the lock on the cache directory
+func (p *Provider) Unlock() {
+	p.lockMut.Lock()
+	defer p.lockMut.Unlock()
+	// if the lock is not set, we do not need to do anything
+	if p.lock == nil {
+		return
+	}
+	var lockFile = filepath.Join(p.dir, lockfile)
+	if err := p.lock.Unlock(); err != nil {
+		log.Errorf("unable to close lock for cache %s: %v", lockFile, err)
+	}
+	p.lock = nil
 }
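Every mutating entry point now follows the same calling convention: take the provider-level lock, defer the unlock, then mutate. A hedged sketch of that convention as a standalone helper (withLock does not exist in the linuxkit codebase; it only names the shape the commit repeats inline):

package cache

import "fmt"

// locker matches the surface of the new Provider.Lock/Unlock methods.
type locker interface {
	Lock() error
	Unlock()
}

// withLock is a hypothetical helper illustrating the locking convention
// this commit standardizes on: lock, defer unlock, mutate.
func withLock(l locker, mutate func() error) error {
	if err := l.Lock(); err != nil {
		return fmt.Errorf("unable to lock cache: %v", err)
	}
	// safe to defer: Unlock is a no-op when the lock is not held
	defer l.Unlock()
	return mutate()
}

Two layers of serialization are in play here: the util.FileLock on the .lock file serializes processes, while lockMut serializes goroutines within one Provider. Lock is also idempotent: a second call while the lock is held returns nil immediately, and the first Unlock releases the single underlying file lock.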
src/cmd/linuxkit/cache/pull.go (vendored): 6 changes

@@ -180,6 +180,12 @@ func (p *Provider) Pull(name string, withArchReferences bool) error {
 		return fmt.Errorf("error getting manifest for trusted image %s: %v", name, err)
 	}
 
+	// lock the cache so we can write to it
+	if err := p.Lock(); err != nil {
+		return fmt.Errorf("unable to lock cache for writing: %v", err)
+	}
+	defer p.Unlock()
+
 	// first attempt as an index
 	ii, err := desc.ImageIndex()
 	if err == nil {
src/cmd/linuxkit/cache/write.go (vendored): 59 changes

@@ -76,6 +76,12 @@ func (p *Provider) ImagePull(ref *reference.Spec, platforms []imagespec.Platform
 		return fmt.Errorf("error getting manifest for image %s: %v", pullImageName, err)
 	}
 
+	// get our lock
+	if err := p.Lock(); err != nil {
+		return fmt.Errorf("unable to lock cache for removing descriptors: %v", err)
+	}
+	defer p.Unlock()
+
 	// first attempt as an index
 	ii, err := desc.ImageIndex()
 	if err == nil {
@@ -146,6 +152,11 @@ func (p *Provider) ImageLoad(r io.Reader) ([]v1.Descriptor, error) {
 		index bytes.Buffer
 	)
 	log.Debugf("ImageWriteTar to cache")
+	// get our lock
+	if err := p.Lock(); err != nil {
+		return nil, fmt.Errorf("unable to lock cache: %v", err)
+	}
+	defer p.Unlock()
 	for {
 		header, err := tr.Next()
 		if err == io.EOF {
@@ -244,6 +255,11 @@ func (p *Provider) IndexWrite(ref *reference.Spec, descriptors ...v1.Descriptor)
 		return fmt.Errorf("error parsing index: %v", err)
 	}
 	var im v1.IndexManifest
+	// get our lock
+	if err := p.Lock(); err != nil {
+		return fmt.Errorf("unable to lock cache: %v", err)
+	}
+	defer p.Unlock()
 	// do we update an existing one? Or create a new one?
 	if len(indexes) > 0 {
 		// we already had one, so update just the referenced index and return
@@ -452,3 +468,46 @@ func (p *Provider) ImageInRegistry(ref *reference.Spec, trustedRef, architecture
 	}
 	return false, nil
 }
+
+// DescriptorWrite writes a descriptor to the cache index; it validates that it has a name
+// and replaces any existing one
+func (p *Provider) DescriptorWrite(image string, desc v1.Descriptor) error {
+	if image == "" {
+		return errors.New("cannot write descriptor without reference name")
+	}
+	if desc.Annotations == nil {
+		desc.Annotations = map[string]string{}
+	}
+	desc.Annotations[imagespec.AnnotationRefName] = image
+	log.Debugf("writing descriptor for image %s", image)
+
+	// get our lock
+	if err := p.Lock(); err != nil {
+		return fmt.Errorf("unable to lock cache for writing descriptors: %v", err)
+	}
+	defer p.Unlock()
+
+	// get our lock
+	// do we update an existing one? Or create a new one?
+	if err := p.cache.RemoveDescriptors(match.Name(image)); err != nil {
+		return fmt.Errorf("unable to remove old descriptors for %s: %v", image, err)
+	}
+
+	if err := p.cache.AppendDescriptor(desc); err != nil {
+		return fmt.Errorf("unable to append new descriptor for %s: %v", image, err)
+	}
+
+	return nil
+}
+
+// RemoveDescriptors removes all descriptors that match the provided matcher.
+// It does so in a parallel-access-safe way
+func (p *Provider) RemoveDescriptors(matcher match.Matcher) error {
+	// get our lock
+	if err := p.Lock(); err != nil {
+		return fmt.Errorf("unable to lock cache for removing descriptors: %v", err)
+	}
+	defer p.Unlock()
+
+	return p.cache.RemoveDescriptors(matcher)
+}
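For orientation, a hedged sketch of driving the relocated DescriptorWrite from a caller. The cache directory, image name, and descriptor values are placeholders; the digest is the well-known SHA-256 of empty input, used only so the hex is well-formed.

package main

import (
	"log"

	v1 "github.com/google/go-containerregistry/pkg/v1"
	"github.com/google/go-containerregistry/pkg/v1/types"

	"github.com/linuxkit/linuxkit/src/cmd/linuxkit/cache"
)

func main() {
	// placeholder cache directory
	p, err := cache.NewProvider("/tmp/linuxkit-cache")
	if err != nil {
		log.Fatalf("new provider: %v", err)
	}
	// placeholder descriptor: SHA-256 of empty input, zero size
	desc := v1.Descriptor{
		MediaType: types.OCIManifestSchema1,
		Size:      0,
		Digest: v1.Hash{
			Algorithm: "sha256",
			Hex:       "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
		},
	}
	// DescriptorWrite takes the provider lock itself; callers do not
	// need to call p.Lock() first
	if err := p.DescriptorWrite("docker.io/library/alpine:latest", desc); err != nil {
		log.Fatalf("descriptor write: %v", err)
	}
}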
src/cmd/linuxkit/util (FileLock implementation)

@@ -35,6 +35,9 @@ func Lock(path string) (*FileLock, error) {
 
 // Unlock releases the lock and closes the file.
 func (l *FileLock) Unlock() error {
+	if l == nil || l.file == nil {
+		return fmt.Errorf("unlock: file handle is nil")
+	}
 	flock := unix.Flock_t{
 		Type:   unix.F_UNLCK,
 		Whence: int16(io.SeekStart),
@@ -44,7 +47,11 @@ func (l *FileLock) Unlock() error {
 	if err := unix.FcntlFlock(l.file.Fd(), unix.F_SETLKW, &flock); err != nil {
 		return fmt.Errorf("unlock: %w", err)
 	}
-	return l.file.Close()
+	if err := l.file.Close(); err != nil {
+		return fmt.Errorf("close lock file: %w", err)
+	}
+	l.file = nil // Prevent further use of the file handle
+	return nil
 }
 
 // CheckLock attempts to detect if the file is locked by another process.
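The Unlock hardening above sits on top of POSIX fcntl record locking. For reference, a minimal sketch of what the acquiring side, util.Lock (whose signature appears in the hunk header), plausibly looks like; the body is illustrative, not copied from linuxkit.

package util

import (
	"fmt"
	"io"
	"os"

	"golang.org/x/sys/unix"
)

// FileLock wraps an open file that holds an fcntl lock.
type FileLock struct {
	file *os.File
}

// Lock opens (creating if needed) path and blocks until it holds an
// exclusive lock on the whole file (F_SETLKW = "set lock, wait").
func Lock(path string) (*FileLock, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		return nil, fmt.Errorf("open lock file: %w", err)
	}
	flock := unix.Flock_t{
		Type:   unix.F_WRLCK,        // exclusive (write) lock
		Whence: int16(io.SeekStart), // offsets relative to file start
		Start:  0,
		Len:    0, // 0 = lock the whole file
	}
	if err := unix.FcntlFlock(f.Fd(), unix.F_SETLKW, &flock); err != nil {
		_ = f.Close()
		return nil, fmt.Errorf("lock: %w", err)
	}
	return &FileLock{file: f}, nil
}

The Unlock hunks above are the other half: F_UNLCK with the same Whence/Start/Len releases the record, and the added nil checks plus l.file = nil make a double Unlock fail with an error instead of panicking on a nil file handle.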