Merge pull request #6342 from bpradipt/fsnotify

runtime: propagate configmap, secrets, downward-api etc changes for remote-hyp
This commit is contained in:
Steve Horsman
2023-03-31 09:03:44 +01:00
committed by GitHub
6 changed files with 293 additions and 4 deletions

View File

@@ -529,6 +529,12 @@ func (c *Container) mountSharedDirMounts(ctx context.Context, sharedDirMounts, i
sharedDirMounts[sharedDirMount.Destination] = sharedDirMount
}
// Start the event loop to watch for file change notifications
// The event loop will only start if there are watches added
if c.sandbox.fsShare != nil {
go c.sandbox.fsShare.StartFileEventWatcher(ctx)
}
return storages, nil
}

View File

@@ -74,4 +74,12 @@ type FilesystemSharer interface {
// UnshareRootFilesystem stops sharing a container bundle
// rootfs.
UnshareRootFilesystem(context.Context, *Container) error
// StartFileEventWatcher is the event loop to detect changes in
// specific volumes - configmap, secrets, downward-api, projected-volumes
// and copy the changes to the guest
StartFileEventWatcher(context.Context) error
// StopFileEventWatcher stops the event loop for the file watcher
StopFileEventWatcher(context.Context)
}

View File

@@ -56,3 +56,10 @@ func (f *FilesystemShare) ShareRootFilesystem(ctx context.Context, c *Container)
// UnshareRootFilesystem is a no-op in this implementation: it ignores both
// arguments and always returns nil.
func (f *FilesystemShare) UnshareRootFilesystem(ctx context.Context, c *Container) error {
return nil
}
// StartFileEventWatcher is a no-op in this implementation: no event loop is
// started and nil is returned immediately.
func (f *FilesystemShare) StartFileEventWatcher(ctx context.Context) error {
return nil
}
// StopFileEventWatcher is a no-op in this implementation; there is no event
// loop to stop.
func (f *FilesystemShare) StopFileEventWatcher(ctx context.Context) {
}

View File

@@ -14,9 +14,12 @@ import (
"io/fs"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -31,15 +34,26 @@ func unmountNoFollow(path string) error {
}
type FilesystemShare struct {
sandbox *Sandbox
sandbox *Sandbox
watcher *fsnotify.Watcher
srcDstMap map[string]string
watcherDoneChannel chan bool
sync.Mutex
prepared bool
}
func NewFilesystemShare(s *Sandbox) (FilesystemSharer, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, fmt.Errorf("Creating watcher returned error %w", err)
}
return &FilesystemShare{
prepared: false,
sandbox: s,
prepared: false,
sandbox: s,
watcherDoneChannel: make(chan bool),
srcDstMap: make(map[string]string),
watcher: watcher,
}, nil
}
@@ -266,8 +280,46 @@ func (f *FilesystemShare) ShareFile(ctx context.Context, c *Container, m *Mount)
}
dstPath := filepath.Join(guestPath, srcPath[len(srcRoot):])
f.Logger().Infof("ShareFile: Copying file from src (%s) to dest (%s)", srcPath, dstPath)
//TODO: Improve the agent protocol, to handle the case for existing symlink.
// Currently for an existing symlink, this will fail with EEXIST.
err = f.sandbox.agent.copyFile(ctx, srcPath, dstPath)
if err != nil {
f.Logger().WithError(err).Error("Failed to copy file")
return err
}
return f.sandbox.agent.copyFile(ctx, srcPath, dstPath)
// Add fsNotify watcher for volume mounts
if strings.Contains(srcPath, "kubernetes.io~configmap") ||
strings.Contains(srcPath, "kubernetes.io~secrets") ||
strings.Contains(srcPath, "kubernetes.io~projected") ||
strings.Contains(srcPath, "kubernetes.io~downward-api") {
// fsNotify doesn't add watcher recursively.
// So we need to add the watcher for directories under kubernetes.io~configmap, kubernetes.io~secrets,
// kubernetes.io~downward-api and kubernetes.io~projected
if info.Mode().IsDir() {
// The cm dir is of the form /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~configmap/foo/{..data, key1, key2,...}
// The secrets dir is of the form /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~secrets/foo/{..data, key1, key2,...}
// The projected dir is of the form /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~projected/foo/{..data, key1, key2,...}
// The downward-api dir is of the form /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~downward-api/foo/{..data, key1, key2,...}
f.Logger().Infof("ShareFile: srcPath(%s) is a directory", srcPath)
err := f.watchDir(srcPath)
if err != nil {
f.Logger().WithError(err).Error("Failed to watch directory")
return err
}
} else {
f.Logger().Infof("ShareFile: srcPath(%s) is not a directory", srcPath)
}
// Add the source and destination to the global map which will be used by the event loop
// to copy the modified content to the destination
f.Logger().Infof("ShareFile: Adding srcPath(%s) dstPath(%s) to srcDstMap", srcPath, dstPath)
f.srcDstMap[srcPath] = dstPath
}
return nil
}
if err := filepath.WalkDir(srcRoot, walk); err != nil {
@@ -507,3 +559,217 @@ func (f *FilesystemShare) UnshareRootFilesystem(ctx context.Context, c *Containe
return nil
}
// watchDir registers source with the fsnotify watcher unless it is already on
// the watch list.
//
// It is called for directories belonging to Kubernetes configmap, secret,
// projected and downward-API volumes, which live under
// /var/lib/kubelet/pods/<uid>/volumes/.
//
// Directories are watched rather than individual files, following the
// fsnotify documentation (https://pkg.go.dev/github.com/fsnotify/fsnotify):
// many tools update files atomically by writing a temporary file and moving it
// over the destination, which removes the original and silently drops any
// watch that was placed on it. Watching the parent directory and filtering on
// Event.Name avoids that.
//
// fsnotify does not watch recursively, so the caller has to invoke watchDir
// once for every directory it is interested in.
//
// It returns nil when the watch exists or was added, and the error from
// Watcher.Add otherwise.
func (f *FilesystemShare) watchDir(source string) error {
	f.Logger().Infof("watchDir: Add fsnotify watcher for dir (%s)", source)

	// Nothing to do when the directory is already being watched.
	for _, watched := range f.watcher.WatchList() {
		if watched != source {
			continue
		}
		f.Logger().Infof("watchDir: Watcher for dir(%s) is already present", source)
		return nil
	}

	if addErr := f.watcher.Add(source); addErr != nil {
		f.Logger().WithError(addErr).Error("watchDir: Failed to add watcher to list")
		return addErr
	}

	return nil
}
// timestampedDirRegexp matches a path that ends in the timestamped directory
// name used by the Kubernetes atomic writer, for example
// "..2023_02_11_09_21_08.2202253910".
//
// The digits after the final dot are a random decimal number whose width is
// not fixed (the Kubernetes documentation example is
// "..2016_02_01_15_04_05.12345678"), so any non-empty run of digits is
// accepted. The dot separating the timestamp from the suffix is matched
// literally.
var timestampedDirRegexp = regexp.MustCompile(`[0-9]{4}_[0-9]{2}_[0-9]{2}_[0-9]{2}_[0-9]{2}_[0-9]{2}\.[0-9]+$`)

// StartFileEventWatcher runs the event loop that propagates updates of
// watched volume directories to the guest.
//
// The loop only starts when the watcher exists and has at least one watch;
// otherwise the function returns nil immediately. Once running it blocks until
// one of the following happens:
//   - the done channel is closed (see StopFileEventWatcher): the watcher is
//     closed and nil is returned;
//   - the watcher's Events or Errors channel is closed, or an error event is
//     received: an error is returned;
//   - copying updated files to the guest fails: that error is returned.
//
// ctx is accepted to satisfy the FilesystemSharer interface and is not
// consulted by the loop.
func (f *FilesystemShare) StartFileEventWatcher(ctx context.Context) error {
	// Start event loop if watchList is not empty
	if (f.watcher == nil) || len(f.watcher.WatchList()) == 0 {
		f.Logger().Info("StartFileEventWatcher: No watches found, returning")
		return nil
	}

	f.Logger().Debugf("StartFileEventWatcher: srcDstMap dump %v", f.srcDstMap)

	// This is the event loop to watch for fsNotify events and copy the contents to the guest
	for {
		select {
		case event, ok := <-f.watcher.Events:
			if !ok {
				return fmt.Errorf("StartFileEventWatcher: Error in receiving events")
			}
			f.Logger().Infof("StartFileEventWatcher: got an event %s %s", event.Op, event.Name)

			// Only REMOVE events trigger a copy; see syncVolumeAfterRemove.
			if event.Op&fsnotify.Remove != fsnotify.Remove {
				continue
			}
			if err := f.syncVolumeAfterRemove(event.Name); err != nil {
				return err
			}

		case err, ok := <-f.watcher.Errors:
			if !ok {
				return fmt.Errorf("StartFileEventWatcher: Error (%v) in receiving error events", err)
			}
			f.Logger().Infof("StartFileEventWatcher: got an error event (%v)", err)
			return err

		case <-f.watcherDoneChannel:
			f.Logger().Info("StartFileEventWatcher: watcher closed")
			f.watcher.Close()
			return nil
		}
	}
}

// syncVolumeAfterRemove handles a REMOVE event for source. When source is a
// timestamped data directory, the current contents of the sibling "..data"
// symlink are copied to the guest destination recorded in srcDstMap.
//
// Ref: (kubernetes) pkg/volume/util/atomic_writer.go to understand the
// configmap/secrets update algorithm:
//
//  1. The payload is validated; if the payload is invalid, the function returns
//  2. The current timestamped directory is detected by reading the data
//     directory symlink
//  3. The old version of the volume is walked to determine whether any
//     portion of the payload was deleted and is still present on disk.
//  4. The data in the current timestamped directory is compared to the
//     projected data to determine if an update is required.
//  5. A new timestamped dir is created
//  6. The payload is written to the new timestamped directory
//  7. Symlinks and directory for new user-visible files are created (if
//     needed). For example, consider the files:
//     <target-dir>/podName
//     <target-dir>/user/labels
//     <target-dir>/k8s/annotations
//     The user visible files are symbolic links into the internal data
//     directory:
//     <target-dir>/podName -> ..data/podName
//     <target-dir>/usr -> ..data/usr
//     <target-dir>/k8s -> ..data/k8s
//     The data directory itself is a link to a timestamped directory with
//     the real data:
//     <target-dir>/..data -> ..2016_02_01_15_04_05.12345678/
//  8. A symlink to the new timestamped directory ..data_tmp is created that
//     will become the new data directory
//  9. The new data directory symlink is renamed to the data directory; rename
//     is atomic
//  10. Old paths are removed from the user-visible portion of the target
//     directory
//  11. The previous timestamped directory is removed, if it exists
//
// The REMOVE of the previous timestamped directory (step 11) is the last step,
// so relying on it to initiate the copy ensures that ..data already points at
// the updated content.
//
// For a REMOVE event, source is of the form:
//
//	/var/lib/kubelet/pods/<uid>/volumes/<k8s-special-dir>/foo/..2023_02_11_09_21_08.2202253910
//
// It returns nil when source is not a timestamped directory or when no
// destination is recorded for the volume, and the copy error otherwise.
func (f *FilesystemShare) syncVolumeAfterRemove(source string) error {
	f.Logger().Infof("StartFileEventWatcher: source for the event: %s", source)

	if !timestampedDirRegexp.MatchString(source) {
		return nil
	}

	// The timestamped directory was removed, which indicates that foo/..data
	// contains the updated info.
	// eg. volumeDir = /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~configmap/foo
	volumeDir := filepath.Dir(source)
	f.Logger().Infof("StartFileEventWatcher: volumeDir (%s)", volumeDir)

	// eg. dataDir = /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~configmap/foo/..data
	dataDir := filepath.Join(volumeDir, "..data")
	f.Logger().Infof("StartFileEventWatcher: dataDir (%s)", dataDir)

	// Without a recorded destination the copy would target paths relative to
	// an empty directory name in the guest, so skip the event instead.
	destination, ok := f.srcDstMap[dataDir]
	if !ok {
		f.Logger().Warnf("StartFileEventWatcher: no destination recorded for src (%s), skipping copy", dataDir)
		return nil
	}

	f.Logger().Infof("StartFileEventWatcher: Copy file from src (%s) to dst (%s)", dataDir, destination)
	if err := f.copyFilesFromDataDir(dataDir, destination); err != nil {
		f.Logger().Infof("StartFileEventWatcher: got an error (%v) when copying file from src (%s) to dst (%s)", err, dataDir, destination)
		return err
	}

	return nil
}
// copyFilesFromDataDir resolves the src symlink and copies every
// non-directory entry found beneath its target to the guest, preserving the
// path relative to the target underneath dst.
//
// src is a symlink of the form:
//
//	/var/lib/kubelet/pods/<uid>/volumes/<k8s-special-dir>/foo/..data
//
// dst is of the form:
//
//	/run/kata-containers/shared/containers/<cid>-<volume>/..data
//
// The first error encountered (reading the link, walking the tree, computing
// a relative path or copying a file) aborts the operation and is returned.
func (f *FilesystemShare) copyFilesFromDataDir(src, dst string) error {
	// eg. linkTarget = ..2023_02_09_06_40_51.2326009790
	linkTarget, err := os.Readlink(src)
	if err != nil {
		f.Logger().Infof("copyFilesFromDataDir: Reading data symlink returned error (%v)", err)
		return err
	}

	// The link target is joined onto the directory that holds the symlink.
	// eg. dataDir = /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~configmap/foo/..2023_02_09_06_40_51.2326009790
	dataDir := filepath.Join(filepath.Dir(src), linkTarget)
	f.Logger().Infof("copyFilesFromDataDir: full path to data symlink (%s)", dataDir)

	// copyEntry is invoked for every entry below dataDir.
	copyEntry := func(path string, d fs.DirEntry, walkErr error) error {
		if walkErr != nil {
			f.Logger().Infof("copyFilesFromDataDir: Error in file walk %v", walkErr)
			return walkErr
		}

		// eg. path = <dataDir>/{key1, key2, ...}
		f.Logger().Infof("copyFilesFromDataDir: path (%s)", path)

		// Directories themselves are not copied, only the entries inside them.
		if d.IsDir() {
			return nil
		}

		// The relative path keeps nested layouts intact, e.g. both
		//   <dataDir>/config-dir1/config.file1
		//   <dataDir>/config.file2
		rel, relErr := filepath.Rel(dataDir, path)
		if relErr != nil {
			f.Logger().Infof("copyFilesFromDataDir: Unable to get relative path")
			return relErr
		}
		f.Logger().Debugf("copyFilesFromDataDir: dataDir(%s), path(%s), rel(%s)", dataDir, path, rel)

		// Form the destination path in the guest
		dstFile := filepath.Join(dst, rel)
		f.Logger().Infof("copyFilesFromDataDir: Copying file %s to dst %s", path, dstFile)

		if copyErr := f.sandbox.agent.copyFile(context.Background(), path, dstFile); copyErr != nil {
			f.Logger().Infof("copyFilesFromDataDir: Error in copying file %v", copyErr)
			return copyErr
		}

		f.Logger().Infof("copyFilesFromDataDir: Successfully copied file (%s)", path)
		return nil
	}

	// WalkDir is used in preference to Walk for efficiency.
	if walkErr := filepath.WalkDir(dataDir, copyEntry); walkErr != nil {
		f.Logger().Infof("copyFilesFromDataDir: Error in filepath.WalkDir (%v)", walkErr)
		return walkErr
	}

	f.Logger().Infof("copyFilesFromDataDir: Done")
	return nil
}
// StopFileEventWatcher signals the event loop started by
// StartFileEventWatcher to exit by closing the done channel.
//
// Calling it again after the channel has been closed is a no-op rather than a
// "close of closed channel" panic. The guard covers repeated sequential calls;
// it does not make concurrent calls safe.
//
// ctx is accepted to satisfy the FilesystemSharer interface and is not used.
func (f *FilesystemShare) StopFileEventWatcher(ctx context.Context) {
	f.Logger().Info("StopFileEventWatcher: Closing watcher")

	select {
	case <-f.watcherDoneChannel:
		// NOTE(review): no sender on this channel is visible here, so a
		// successful receive is taken to mean it is already closed; confirm.
	default:
		close(f.watcherDoneChannel)
	}
}

View File

@@ -1257,6 +1257,7 @@ func (k *kataAgent) createContainer(ctx context.Context, sandbox *Sandbox, c *Co
sharedDirMounts := make(map[string]Mount)
ignoredMounts := make(map[string]Mount)
k.Logger().Info("mounting shared dir mounts")
shareStorages, err := c.mountSharedDirMounts(ctx, sharedDirMounts, ignoredMounts)
if err != nil {
return nil, err

View File

@@ -329,6 +329,7 @@ func (s *Sandbox) Release(ctx context.Context) error {
if s.monitor != nil {
s.monitor.stop()
}
s.fsShare.StopFileEventWatcher(ctx)
s.hypervisor.Disconnect(ctx)
return s.agent.disconnect(ctx)
}