diff --git a/src/runtime/virtcontainers/fs_share.go b/src/runtime/virtcontainers/fs_share.go
index fa8374c3b8..b5000291cc 100644
--- a/src/runtime/virtcontainers/fs_share.go
+++ b/src/runtime/virtcontainers/fs_share.go
@@ -74,4 +74,14 @@ type FilesystemSharer interface {
 	// UnshareRootFilesystem stops sharing a container bundle
 	// rootfs.
 	UnshareRootFilesystem(context.Context, *Container) error
+
+	// StartFileEventWatcher is the event loop to detect changes in
+	// specific volumes - configmap, secrets, downward-api, projected-volumes
+	// and copy the changes to the guest. Implementations may block until
+	// StopFileEventWatcher is called, so run it in its own goroutine.
+	StartFileEventWatcher(context.Context) error
+
+	// StopFileEventWatcher stops the event loop for the file watcher.
+	// It is safe to call more than once.
+	StopFileEventWatcher(context.Context)
 }
diff --git a/src/runtime/virtcontainers/fs_share_darwin.go b/src/runtime/virtcontainers/fs_share_darwin.go
index 126ddfb638..5f4bbec17f 100644
--- a/src/runtime/virtcontainers/fs_share_darwin.go
+++ b/src/runtime/virtcontainers/fs_share_darwin.go
@@ -56,3 +56,12 @@ func (f *FilesystemShare) ShareRootFilesystem(ctx context.Context, c *Container)
 func (f *FilesystemShare) UnshareRootFilesystem(ctx context.Context, c *Container) error {
 	return nil
 }
+
+// StartFileEventWatcher is a no-op on Darwin: no file watcher is set up.
+func (f *FilesystemShare) StartFileEventWatcher(ctx context.Context) error {
+	return nil
+}
+
+// StopFileEventWatcher is a no-op on Darwin.
+func (f *FilesystemShare) StopFileEventWatcher(ctx context.Context) {
+}
diff --git a/src/runtime/virtcontainers/fs_share_linux.go b/src/runtime/virtcontainers/fs_share_linux.go
index cb5aff391e..5bafb9e403 100644
--- a/src/runtime/virtcontainers/fs_share_linux.go
+++ b/src/runtime/virtcontainers/fs_share_linux.go
@@ -14,9 +14,11 @@ import (
 	"io/fs"
 	"os"
 	"path/filepath"
+	"regexp"
 	"sync"
 	"syscall"
 
+	"github.com/fsnotify/fsnotify"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 
@@ -33,14 +35,35 @@ func unmountNoFollow(path string) error {
 
 type FilesystemShare struct {
 	sandbox *Sandbox
+	watcher *fsnotify.Watcher
+	// srcDstMap maps a shared host path to every guest path it was copied
+	// to. The same volume mount can be shared by multiple containers in the
+	// same sandbox (pod), hence several destinations per source.
+	srcDstMap     map[string][]string
+	srcDstMapLock sync.Mutex
+	// eventLoopStarted records that StartFileEventWatcher owns the watcher
+	eventLoopStarted     bool
+	eventLoopStartedLock sync.Mutex
+	// watcherDoneChannel is closed, exactly once thanks to watcherStopOnce,
+	// to ask the event loop to exit
+	watcherDoneChannel chan bool
+	watcherStopOnce    sync.Once
 	sync.Mutex
 	prepared bool
 }
 
 func NewFilesystemShare(s *Sandbox) (FilesystemSharer, error) {
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		return nil, fmt.Errorf("creating file event watcher: %w", err)
+	}
+
 	return &FilesystemShare{
-		prepared: false,
-		sandbox:  s,
+		prepared:           false,
+		sandbox:            s,
+		watcherDoneChannel: make(chan bool),
+		srcDstMap:          make(map[string][]string),
+		watcher:            watcher,
 	}, nil
 }
 
@@ -267,8 +290,52 @@ func (f *FilesystemShare) ShareFile(ctx context.Context, c *Container, m *Mount)
 			}
 
 			dstPath := filepath.Join(guestPath, srcPath[len(srcRoot):])
+			f.Logger().Infof("ShareFile: Copying file from src (%s) to dest (%s)", srcPath, dstPath)
+			//TODO: Improve the agent protocol, to handle the case for existing symlink.
+			// Currently for an existing symlink, this will fail with EEXIST.
+			err = f.sandbox.agent.copyFile(ctx, srcPath, dstPath)
+			if err != nil {
+				f.Logger().WithError(err).Error("Failed to copy file")
+				return err
+			}
 
-			return f.sandbox.agent.copyFile(ctx, srcPath, dstPath)
+			// Add fsNotify watcher for volume mounts
+			// Use regex for strict matching instead of strings.Contains
+			// match for kubernetes.io~configmap, kubernetes.io~secret, kubernetes.io~projected, kubernetes.io~downward-api
+			// as recommended in review comments for PR #7211
+
+			// Example directory structure for the volume mounts.
+			// /var/lib/kubelet/pods/f51ae853-557e-4ce1-b60b-a1101b555612/volumes/kubernetes.io~configmap
+			// /var/lib/kubelet/pods/f51ae853-557e-4ce1-b60b-a1101b555612/volumes/kubernetes.io~secret
+			// /var/lib/kubelet/pods/f51ae853-557e-4ce1-b60b-a1101b555612/volumes/kubernetes.io~projected
+			// /var/lib/kubelet/pods/f51ae853-557e-4ce1-b60b-a1101b555612/volumes/kubernetes.io~downward-api
+			if k8sWatchableVolumeRegex.MatchString(srcPath) {
+				// fsNotify doesn't add watcher recursively.
+				// So we need to add the watcher for directories under kubernetes.io~configmap, kubernetes.io~secret,
+				// kubernetes.io~downward-api and kubernetes.io~projected
+				if info.Mode().IsDir() {
+					// The cm dir is of the form /var/lib/kubelet/pods/<pod-uid>/volumes/kubernetes.io~configmap/foo/{..data, key1, key2,...}
+					// The secret dir is of the form /var/lib/kubelet/pods/<pod-uid>/volumes/kubernetes.io~secret/foo/{..data, key1, key2,...}
+					// The projected dir is of the form /var/lib/kubelet/pods/<pod-uid>/volumes/kubernetes.io~projected/foo/{..data, key1, key2,...}
+					// The downward-api dir is of the form /var/lib/kubelet/pods/<pod-uid>/volumes/kubernetes.io~downward-api/foo/{..data, key1, key2,...}
+					f.Logger().Infof("ShareFile: srcPath(%s) is a directory", srcPath)
+					if err := f.watchDir(srcPath); err != nil {
+						f.Logger().WithError(err).Error("Failed to watch directory")
+						return err
+					}
+				} else {
+					f.Logger().Infof("ShareFile: srcPath(%s) is not a directory", srcPath)
+				}
+				// Add the source and destination to the map which will be used by the event loop
+				// to copy the modified content to the destination
+				f.Logger().Infof("ShareFile: Adding srcPath(%s) dstPath(%s) to srcDstMap", srcPath, dstPath)
+				// The event loop reads the map from its own goroutine, so lock around the update
+				f.srcDstMapLock.Lock()
+				f.srcDstMap[srcPath] = append(f.srcDstMap[srcPath], dstPath)
+				f.srcDstMapLock.Unlock()
+			}
+
+			return nil
 		}
 
 		if err := filepath.WalkDir(srcRoot, walk); err != nil {
@@ -512,3 +579,290 @@ func (f *FilesystemShare) UnshareRootFilesystem(ctx context.Context, c *Containe
 
 	return nil
 }
+
+// Regular expressions used by the file event watcher. They are compiled once
+// when the package is initialised instead of on every walked file or event.
+var (
+	// k8sWatchableVolumeRegex matches host paths located under the configmap,
+	// secret, projected or downward-api volume directory of a pod, e.g.
+	// /var/lib/kubelet/pods/f51ae853-557e-4ce1-b60b-a1101b555612/volumes/kubernetes.io~configmap
+	//
+	// More relaxed regex for the pod UID
+	// `^/var/lib/kubelet/pods/[a-fA-F0-9\-]+/volumes/kubernetes\.io~(configmap|secret|projected|downward-api)`
+	k8sWatchableVolumeRegex = regexp.MustCompile(`^/var/lib/kubelet/pods/[a-fA-F0-9\-]{36}/volumes/kubernetes\.io~(configmap|secret|projected|downward-api)`)
+
+	// k8sTimestampedDirRegex matches the base name of the temp directory with
+	// timestamp that is used to handle the updates by K8s, e.g.
+	// ..2023_07_27_07_13_00.1257228 or ..2023_07_27_07_13_00.3704578339
+	k8sTimestampedDirRegex = regexp.MustCompile(`^\.\.[0-9]{4}_[0-9]{2}_[0-9]{2}_[0-9]{2}_[0-9]{2}_[0-9]{2}\.[0-9]+$`)
+)
+
+// watchDir adds an fsnotify watch on the directory source, unless the watcher
+// already lists it.
+//
+// Note: From fsNotify docs - https://pkg.go.dev/github.com/fsnotify/fsnotify
+// Watching individual files (rather than directories) is generally not
+// recommended as many tools update files atomically. Instead of "just"
+// writing to the file a temporary file will be written to first, and if
+// successful the temporary file is moved to the destination removing the
+// original, or some variant thereof. The watcher on the original file is
+// now lost, as it no longer exists.
+// Instead, watch the parent directory and use Event.Name to filter out files
+// you're not interested in.
+//
+// Also fsNotify doesn't add watcher recursively, so a watch has to be added
+// for each directory of interest.
+func (f *FilesystemShare) watchDir(source string) error {
+	f.Logger().Infof("watchDir: Add fsnotify watcher for dir (%s)", source)
+
+	for _, v := range f.watcher.WatchList() {
+		if v == source {
+			f.Logger().Infof("watchDir: Watcher for dir(%s) is already present", source)
+			return nil
+		}
+	}
+
+	if err := f.watcher.Add(source); err != nil {
+		f.Logger().WithError(err).Error("watchDir: Failed to add watcher to list")
+		return err
+	}
+
+	return nil
+}
+
+// StartFileEventWatcher runs the event loop that copies updated volume
+// content to the guest. It returns nil immediately when the loop is already
+// running or the watcher has already been stopped. Otherwise it blocks until
+// StopFileEventWatcher is called (nil is returned) or one of the fsnotify
+// channels gets closed (an error is returned).
+//
+// NOTE(review): ctx is not consulted; the loop only ends through
+// StopFileEventWatcher or a closed fsnotify channel.
+func (f *FilesystemShare) StartFileEventWatcher(ctx context.Context) error {
+	// Acquire lock and check if eventLoopStarted
+	// If not started set the event loop started flag
+	f.eventLoopStartedLock.Lock()
+	if f.eventLoopStarted {
+		f.eventLoopStartedLock.Unlock()
+		f.Logger().Info("StartFileEventWatcher: Event loop already started, returning")
+		return nil
+	}
+
+	// StopFileEventWatcher may run before this loop gets a chance to start.
+	// It then closes the watcher itself, so there is nothing left to serve.
+	select {
+	case <-f.watcherDoneChannel:
+		f.eventLoopStartedLock.Unlock()
+		f.Logger().Info("StartFileEventWatcher: watcher already stopped, not starting the event loop")
+		return nil
+	default:
+	}
+
+	f.eventLoopStarted = true
+	f.eventLoopStartedLock.Unlock()
+
+	f.Logger().Infof("StartFileEventWatcher: starting the event loop")
+
+	// ShareFile may update srcDstMap from another goroutine, so only read it under its lock
+	f.srcDstMapLock.Lock()
+	f.Logger().Debugf("StartFileEventWatcher: srcDstMap dump %v", f.srcDstMap)
+	f.srcDstMapLock.Unlock()
+
+	for {
+		select {
+		case event, ok := <-f.watcher.Events:
+			if !ok {
+				return fmt.Errorf("StartFileEventWatcher: watcher events channel closed")
+			}
+			f.Logger().Infof("StartFileEventWatcher: got an event %s %s", event.Op, event.Name)
+			if event.Op&fsnotify.Remove == fsnotify.Remove {
+				f.handleRemoveEvent(event.Name)
+			}
+		case err, ok := <-f.watcher.Errors:
+			if !ok {
+				return fmt.Errorf("StartFileEventWatcher: watcher error channel closed")
+			}
+			// We continue explicitly here to avoid exiting the watcher loop
+			f.Logger().Infof("StartFileEventWatcher: got an error event (%v)", err)
+			continue
+		case <-f.watcherDoneChannel:
+			f.Logger().Info("StartFileEventWatcher: watcher closed")
+			f.watcher.Close()
+			return nil
+		}
+	}
+}
+
+// handleRemoveEvent reacts to a REMOVE event on source. When source is a
+// Kubernetes timestamped directory, the content behind the sibling ..data
+// symlink is copied to every guest destination registered for it.
+func (f *FilesystemShare) handleRemoveEvent(source string) {
+	// Ref: (kubernetes) pkg/volume/util/atomic_writer.go to understand the configmap/secret update algo
+	//
+	// Write does an atomic projection of the given payload into the writer's target
+	// directory. Input paths must not begin with '..'.
+	//
+	// The Write algorithm is:
+	//
+	//  1.  The payload is validated; if the payload is invalid, the function returns
+	//  2.  The current timestamped directory is detected by reading the data directory
+	//      symlink
+	//  3.  The old version of the volume is walked to determine whether any
+	//      portion of the payload was deleted and is still present on disk.
+	//  4.  The data in the current timestamped directory is compared to the projected
+	//      data to determine if an update is required.
+	//  5.  A new timestamped dir is created
+	//  6.  The payload is written to the new timestamped directory
+	//  7.  Symlinks and directory for new user-visible files are created (if needed).
+	//
+	//      For example, consider the files:
+	//        <target-dir>/podName
+	//        <target-dir>/user/labels
+	//        <target-dir>/k8s/annotations
+	//
+	//      The user visible files are symbolic links into the internal data directory:
+	//        <target-dir>/podName -> ..data/podName
+	//        <target-dir>/usr -> ..data/usr
+	//        <target-dir>/k8s -> ..data/k8s
+	//
+	//      The data directory itself is a link to a timestamped directory with
+	//      the real data:
+	//        <target-dir>/..data -> ..2016_02_01_15_04_05.12345678/
+	//  8.  A symlink to the new timestamped directory ..data_tmp is created that will
+	//      become the new data directory
+	//  9.  The new data directory symlink is renamed to the data directory; rename is atomic
+	//  10. Old paths are removed from the user-visible portion of the target directory
+	//  11. The previous timestamped directory is removed, if it exists
+	//
+	// In this code, we are relying on the REMOVE event to initiate a copy of the updated data.
+	// This ensures that the required data is updated and available for copying.
+	// For REMOVE event, the event.Name (source) will be of the form:
+	// /var/lib/kubelet/pods/<pod-uid>/volumes/<volume-type>/foo/..2023_02_11_09_21_08.2202253910
+	// For example, the event.Name (source) for configmap update will look like this:
+	// /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..2023_02_11_09_21_08.2202253910
+	f.Logger().Infof("StartFileEventWatcher: source for the event: %s", source)
+	if !k8sTimestampedDirRegex.MatchString(filepath.Base(source)) {
+		return
+	}
+
+	// This point is reached when the timestamped directory is removed.
+	// This also indicates that foo/..data contains the updated info
+	volumeDir := filepath.Dir(source)
+	f.Logger().Infof("StartFileEventWatcher: volumeDir (%s)", volumeDir)
+	// eg. volumeDir = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo
+
+	dataDir := filepath.Join(volumeDir, "..data")
+	f.Logger().Infof("StartFileEventWatcher: dataDir (%s)", dataDir)
+	// eg. dataDir = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..data
+
+	// Handle different destination for the same source.
+	// Only a snapshot is taken under srcDstMapLock: the copies below go through
+	// the agent and must not keep ShareFile waiting on the lock meanwhile.
+	f.srcDstMapLock.Lock()
+	destinations := append([]string(nil), f.srcDstMap[dataDir]...)
+	f.srcDstMapLock.Unlock()
+
+	for _, destination := range destinations {
+		f.Logger().Infof("StartFileEventWatcher: Copy file from src (%s) to dst (%s)", dataDir, destination)
+		// We explicitly ignore any errors here. Copy will continue for other files
+		// Errors are logged in the copyFilesFromDataDir method
+		_ = f.copyFilesFromDataDir(dataDir, destination)
+	}
+}
+
+// copyFilesFromDataDir copies every non-directory entry found under the
+// directory that the ..data symlink src points to, to the same relative
+// path below dst in the guest. It stops at, and returns, the first error.
+func (f *FilesystemShare) copyFilesFromDataDir(src, dst string) error {
+	// The src is a symlink and is of the following form:
+	// /var/lib/kubelet/pods/<pod-uid>/volumes/<volume-type>/foo/..data
+	// eg, for configmap, src = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..data
+	// The dst is, for example:
+	// /run/kata-containers/shared/containers/e70739a6cc38daf15de916b4d22aad035d42bc977024f2c8cae6b0b607251d44-39407b03e4b448f1-config-volume/..data
+
+	// Get the symlink target
+	// eg. srcdir = ..2023_02_09_06_40_51.2326009790
+	srcdir, err := os.Readlink(src)
+	if err != nil {
+		f.Logger().Infof("copyFilesFromDataDir: Reading data symlink returned error (%v)", err)
+		return err
+	}
+
+	// Get the base directory path of src
+	volumeDir := filepath.Dir(src)
+	// eg. volumeDir = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo
+
+	dataDir := filepath.Join(volumeDir, srcdir)
+	// eg. dataDir = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..2023_02_09_06_40_51.2326009790
+
+	f.Logger().Infof("copyFilesFromDataDir: full path to data symlink (%s)", dataDir)
+
+	// Using WalkDir is more efficient than Walk
+	err = filepath.WalkDir(dataDir,
+		func(path string, d fs.DirEntry, err error) error {
+			if err != nil {
+				f.Logger().Infof("copyFilesFromDataDir: Error in file walk %v", err)
+				return err
+			}
+
+			// eg. path = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..2023_02_09_06_40_51.2326009790/{key1, key2, ...}
+			f.Logger().Infof("copyFilesFromDataDir: path (%s)", path)
+			if d.IsDir() {
+				return nil
+			}
+
+			// Using filePath.Rel to handle these cases
+			// /var/lib/kubelet/pods/2481b69e-9ac8-475a-9e11-88af1daca60e/volumes/kubernetes.io~projected/all-in-one/..2023_02_13_12_35_49.1380323032/config-dir1/config.file1
+			// /var/lib/kubelet/pods/2481b69e-9ac8-475a-9e11-88af1daca60e/volumes/kubernetes.io~projected/all-in-one/..2023_02_13_12_35_49.1380323032/config.file2
+			rel, err := filepath.Rel(dataDir, path)
+			if err != nil {
+				f.Logger().Infof("copyFilesFromDataDir: Unable to get relative path")
+				return err
+			}
+			f.Logger().Debugf("copyFilesFromDataDir: dataDir(%s), path(%s), rel(%s)", dataDir, path, rel)
+			// Form the destination path in the guest
+			dstFile := filepath.Join(dst, rel)
+			f.Logger().Infof("copyFilesFromDataDir: Copying file %s to dst %s", path, dstFile)
+			// NOTE(review): a background context is used instead of the one given to
+			// StartFileEventWatcher; confirm that context's lifetime before switching.
+			err = f.sandbox.agent.copyFile(context.Background(), path, dstFile)
+			if err != nil {
+				f.Logger().Infof("copyFilesFromDataDir: Error in copying file %v", err)
+				return err
+			}
+			f.Logger().Infof("copyFilesFromDataDir: Successfully copied file (%s)", path)
+			return nil
+		})
+
+	if err != nil {
+		f.Logger().Infof("copyFilesFromDataDir: Error in filepath.WalkDir (%v)", err)
+		return err
+	}
+
+	f.Logger().Infof("copyFilesFromDataDir: Done")
+	return nil
+}
+
+// StopFileEventWatcher asks the event loop to exit and makes sure the
+// fsnotify watcher gets closed. It is safe to call more than once: both the
+// error path of newSandbox and Sandbox.Release call it.
+func (f *FilesystemShare) StopFileEventWatcher(ctx context.Context) {
+	f.Logger().Info("StopFileEventWatcher: Closing watcher")
+
+	// Closing a closed channel panics, so only the first call does the work
+	f.watcherStopOnce.Do(func() {
+		close(f.watcherDoneChannel)
+
+		// A running event loop closes the watcher on its way out. When the
+		// loop was never started nobody else would, and the watcher would be
+		// leaked, so close it here.
+		f.eventLoopStartedLock.Lock()
+		defer f.eventLoopStartedLock.Unlock()
+		if f.eventLoopStarted {
+			return
+		}
+		if err := f.watcher.Close(); err != nil {
+			f.Logger().WithError(err).Warn("StopFileEventWatcher: Failed to close watcher")
+		}
+	})
+}
diff --git a/src/runtime/virtcontainers/sandbox.go b/src/runtime/virtcontainers/sandbox.go
index 3a3c275bb5..aba798a905 100644
--- a/src/runtime/virtcontainers/sandbox.go
+++ b/src/runtime/virtcontainers/sandbox.go
@@ -331,6 +331,7 @@ func (s *Sandbox) Release(ctx context.Context) error {
 	if s.monitor != nil {
 		s.monitor.stop()
 	}
+	s.fsShare.StopFileEventWatcher(ctx)
 	s.hypervisor.Disconnect(ctx)
 	return s.agent.disconnect(ctx)
 }
@@ -613,6 +614,27 @@ func newSandbox(ctx context.Context, sandboxConfig SandboxConfig, factory Factor
 		return nil, err
 	}
 
+	// Start the event loop if not already started when fs sharing is not used
+	if sandboxConfig.HypervisorConfig.SharedFS == config.NoSharedFS {
+		// Start the StartFileEventWatcher method as a goroutine
+		// to monitor the file events.
+		go func() {
+			if err := s.fsShare.StartFileEventWatcher(ctx); err != nil {
+				s.Logger().WithError(err).Error("Failed to start file event watcher")
+				return
+			}
+		}()
+
+		// Stop the file event watcher on error
+		defer func() {
+			if retErr != nil {
+				s.Logger().WithError(retErr).Error("Stopping File Event Watcher")
+				s.fsShare.StopFileEventWatcher(ctx)
+			}
+		}()
+
+	}
+
 	coldPlugVFIO, err := s.coldOrHotPlugVFIO(&sandboxConfig)
 	if err != nil {
 		return nil, err