mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-08-07 11:04:12 +00:00
Merge pull request #7211 from stevenhorsman/propogate-secrets
Propogate secrets, config maps etc into guest if sharedFS not available
This commit is contained in:
commit
a57e7ffe14
@ -8,8 +8,9 @@ use rustjail::{pipestream::PipeStream, process::StreamType};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use std::ffi::CString;
|
||||
use std::ffi::{CString, OsStr};
|
||||
use std::io;
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use ttrpc::{
|
||||
@ -1856,6 +1857,38 @@ fn do_copy_file(req: &CopyFileRequest) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
let sflag = stat::SFlag::from_bits_truncate(req.file_mode);
|
||||
|
||||
if sflag.contains(stat::SFlag::S_IFDIR) {
|
||||
fs::create_dir(&path).or_else(|e| {
|
||||
if e.kind() != std::io::ErrorKind::AlreadyExists {
|
||||
return Err(e);
|
||||
}
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
std::fs::set_permissions(&path, std::fs::Permissions::from_mode(req.file_mode))?;
|
||||
|
||||
unistd::chown(
|
||||
&path,
|
||||
Some(Uid::from_raw(req.uid as u32)),
|
||||
Some(Gid::from_raw(req.gid as u32)),
|
||||
)?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if sflag.contains(stat::SFlag::S_IFLNK) {
|
||||
let src = PathBuf::from(OsStr::from_bytes(&req.data));
|
||||
unistd::symlinkat(&src, None, &path)?;
|
||||
let path_str = CString::new(path.as_os_str().as_bytes())?;
|
||||
|
||||
let ret = unsafe { libc::lchown(path_str.as_ptr(), req.uid as u32, req.gid as u32) };
|
||||
Errno::result(ret).map(drop)?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut tmpfile = path.clone();
|
||||
tmpfile.set_extension("tmp");
|
||||
|
||||
|
@ -132,10 +132,6 @@ default_maxmemory = @DEFMAXMEMSZ@
|
||||
# - virtio-fs (default)
|
||||
# - virtio-fs-nydus
|
||||
# - none
|
||||
# WARNING: "none" should be carefully used, and only used in very few specific cases, as
|
||||
# any update to the mount will *NOT* be reflected during the lifecycle of the pod, causing
|
||||
# issues with rotation of secrets, certs, or configurations via kubernetes objects like
|
||||
# configMaps or secrets, as those will be copied into the guest at *pod* *creation* *time*.
|
||||
shared_fs = "@DEFSHAREDFS_CLH_VIRTIOFS@"
|
||||
|
||||
# Path to vhost-user-fs daemon.
|
||||
|
@ -179,10 +179,6 @@ disable_block_device_use = @DEFDISABLEBLOCK@
|
||||
# - virtio-9p
|
||||
# - virtio-fs-nydus
|
||||
# - none
|
||||
# WARNING: "none" should be carefully used, and only used in very few specific cases, as
|
||||
# any update to the mount will *NOT* be reflected during the lifecycle of the pod, causing
|
||||
# issues with rotation of secrets, certs, or configurations via kubernetes objects like
|
||||
# configMaps or secrets, as those will be copied into the guest at *pod* *creation* *time*.
|
||||
shared_fs = "@DEFSHAREDFS_QEMU_VIRTIOFS@"
|
||||
|
||||
# Path to vhost-user-fs daemon.
|
||||
|
@ -165,10 +165,6 @@ disable_block_device_use = @DEFDISABLEBLOCK@
|
||||
# - virtio-9p
|
||||
# - virtio-fs-nydus
|
||||
# - none
|
||||
# WARNING: "none" should be carefully used, and only used in very few specific cases, as
|
||||
# any update to the mount will *NOT* be reflected during the lifecycle of the pod, causing
|
||||
# issues with rotation of secrets, certs, or configurations via kubernetes objects like
|
||||
# configMaps or secrets, as those will be copied into the guest at *pod* *creation* *time*.
|
||||
shared_fs = "@DEFSHAREDFS_QEMU_SEV_VIRTIOFS@"
|
||||
|
||||
# Path to vhost-user-fs daemon.
|
||||
|
@ -177,10 +177,6 @@ disable_block_device_use = @DEFDISABLEBLOCK@
|
||||
# - virtio-9p
|
||||
# - virtio-fs-nydus
|
||||
# - none
|
||||
# WARNING: "none" should be carefully used, and only used in very few specific cases, as
|
||||
# any update to the mount will *NOT* be reflected during the lifecycle of the pod, causing
|
||||
# issues with rotation of secrets, certs, or configurations via kubernetes objects like
|
||||
# configMaps or secrets, as those will be copied into the guest at *pod* *creation* *time*.
|
||||
shared_fs = "@DEFSHAREDFS_QEMU_SNP_VIRTIOFS@"
|
||||
|
||||
# Path to vhost-user-fs daemon.
|
||||
|
@ -173,10 +173,6 @@ disable_block_device_use = @DEFDISABLEBLOCK@
|
||||
# - virtio-9p
|
||||
# - virtio-fs-nydus
|
||||
# - none
|
||||
# WARNING: "none" should be carefully used, and only used in very few specific cases, as
|
||||
# any update to the mount will *NOT* be reflected during the lifecycle of the pod, causing
|
||||
# issues with rotation of secrets, certs, or configurations via kubernetes objects like
|
||||
# configMaps or secrets, as those will be copied into the guest at *pod* *creation* *time*.
|
||||
shared_fs = "@DEFSHAREDFS_QEMU_TDX_VIRTIOFS@"
|
||||
|
||||
# Path to vhost-user-fs daemon.
|
||||
|
@ -179,10 +179,6 @@ disable_block_device_use = @DEFDISABLEBLOCK@
|
||||
# - virtio-9p
|
||||
# - virtio-fs-nydus
|
||||
# - none
|
||||
# WARNING: "none" should be carefully used, and only used in very few specific cases, as
|
||||
# any update to the mount will *NOT* be reflected during the lifecycle of the pod, causing
|
||||
# issues with rotation of secrets, certs, or configurations via kubernetes objects like
|
||||
# configMaps or secrets, as those will be copied into the guest at *pod* *creation* *time*.
|
||||
shared_fs = "@DEFSHAREDFS_QEMU_VIRTIOFS@"
|
||||
|
||||
# Path to vhost-user-fs daemon.
|
||||
|
@ -74,4 +74,12 @@ type FilesystemSharer interface {
|
||||
// UnshareRootFilesystem stops sharing a container bundle
|
||||
// rootfs.
|
||||
UnshareRootFilesystem(context.Context, *Container) error
|
||||
|
||||
// startFileEventWatcher is the event loop to detect changes in
|
||||
// specific volumes - configmap, secrets, downward-api, projected-volumes
|
||||
// and copy the changes to the guest
|
||||
StartFileEventWatcher(context.Context) error
|
||||
|
||||
// Stops the event loop for file watcher
|
||||
StopFileEventWatcher(context.Context)
|
||||
}
|
||||
|
@ -56,3 +56,10 @@ func (f *FilesystemShare) ShareRootFilesystem(ctx context.Context, c *Container)
|
||||
func (f *FilesystemShare) UnshareRootFilesystem(ctx context.Context, c *Container) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FilesystemShare) StartFileEventWatcher(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FilesystemShare) StopFileEventWatcher(ctx context.Context) {
|
||||
}
|
||||
|
@ -11,11 +11,14 @@ import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
@ -32,14 +35,29 @@ func unmountNoFollow(path string) error {
|
||||
|
||||
type FilesystemShare struct {
|
||||
sandbox *Sandbox
|
||||
watcher *fsnotify.Watcher
|
||||
// The same volume mount can be shared by multiple containers in the same sandbox (pod)
|
||||
srcDstMap map[string][]string
|
||||
srcDstMapLock sync.Mutex
|
||||
eventLoopStarted bool
|
||||
eventLoopStartedLock sync.Mutex
|
||||
watcherDoneChannel chan bool
|
||||
sync.Mutex
|
||||
prepared bool
|
||||
}
|
||||
|
||||
func NewFilesystemShare(s *Sandbox) (FilesystemSharer, error) {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Creating watcher returned error %w", err)
|
||||
}
|
||||
|
||||
return &FilesystemShare{
|
||||
prepared: false,
|
||||
sandbox: s,
|
||||
prepared: false,
|
||||
sandbox: s,
|
||||
watcherDoneChannel: make(chan bool),
|
||||
srcDstMap: make(map[string][]string),
|
||||
watcher: watcher,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -240,23 +258,95 @@ func (f *FilesystemShare) ShareFile(ctx context.Context, c *Container, m *Mount)
|
||||
if !caps.IsFsSharingSupported() {
|
||||
f.Logger().Debug("filesystem sharing is not supported, files will be copied")
|
||||
|
||||
fileInfo, err := os.Stat(m.Source)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var ignored bool
|
||||
srcRoot := filepath.Clean(m.Source)
|
||||
|
||||
walk := func(srcPath string, d fs.DirEntry, err error) error {
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
info, err := d.Info()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !(info.Mode().IsRegular() || info.Mode().IsDir() || (info.Mode()&os.ModeSymlink) == os.ModeSymlink) {
|
||||
f.Logger().WithField("ignored-file", srcPath).Debug("Ignoring file as FS sharing not supported")
|
||||
if srcPath == srcRoot {
|
||||
// Ignore the mount if this is not a regular file (excludes socket, device, ...) as it cannot be handled by
|
||||
// a simple copy. But this should not be treated as an error, only as a limitation.
|
||||
ignored = true
|
||||
return filepath.SkipDir
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
dstPath := filepath.Join(guestPath, srcPath[len(srcRoot):])
|
||||
f.Logger().Infof("ShareFile: Copying file from src (%s) to dest (%s)", srcPath, dstPath)
|
||||
//TODO: Improve the agent protocol, to handle the case for existing symlink.
|
||||
// Currently for an existing symlink, this will fail with EEXIST.
|
||||
err = f.sandbox.agent.copyFile(ctx, srcPath, dstPath)
|
||||
if err != nil {
|
||||
f.Logger().WithError(err).Error("Failed to copy file")
|
||||
return err
|
||||
}
|
||||
|
||||
// Add fsNotify watcher for volume mounts
|
||||
// Use regex for strict matching instead of strings.Contains
|
||||
// match for kubernetes.io~configmap, kubernetes.io~secret, kubernetes.io~projected, kubernetes.io~downward-api
|
||||
// as recommended in review comments for PR #7211
|
||||
|
||||
// Example directory structure for the volume mounts.
|
||||
// /var/lib/kubelet/pods/f51ae853-557e-4ce1-b60b-a1101b555612/volumes/kubernetes.io~configmap
|
||||
// /var/lib/kubelet/pods/f51ae853-557e-4ce1-b60b-a1101b555612/volumes/kubernetes.io~secret
|
||||
// /var/lib/kubelet/pods/f51ae853-557e-4ce1-b60b-a1101b555612/volumes/kubernetes.io~projected
|
||||
// /var/lib/kubelet/pods/f51ae853-557e-4ce1-b60b-a1101b555612/volumes/kubernetes.io~downward-api
|
||||
|
||||
// More relaxed regex for the pod UID
|
||||
// `^/var/lib/kubelet/pods/[a-fA-F0-9\-]+/volumes/kubernetes\.io~(configmap|secret|projected|downward-api)`
|
||||
|
||||
//TODO: Move this to a global variable and compile only once.
|
||||
regex := regexp.MustCompile(`^/var/lib/kubelet/pods/[a-fA-F0-9\-]{36}/volumes/kubernetes\.io~(configmap|secret|projected|downward-api)`)
|
||||
if regex.MatchString(srcPath) {
|
||||
// fsNotify doesn't add watcher recursively.
|
||||
// So we need to add the watcher for directories under kubernetes.io~configmap, kubernetes.io~secret,
|
||||
// kubernetes.io~downward-api and kubernetes.io~projected
|
||||
if info.Mode().IsDir() {
|
||||
// The cm dir is of the form /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~configmap/foo/{..data, key1, key2,...}
|
||||
// The secret dir is of the form /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~secret/foo/{..data, key1, key2,...}
|
||||
// The projected dir is of the form /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~projected/foo/{..data, key1, key2,...}
|
||||
// The downward-api dir is of the form /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~downward-api/foo/{..data, key1, key2,...}
|
||||
f.Logger().Infof("ShareFile: srcPath(%s) is a directory", srcPath)
|
||||
err := f.watchDir(srcPath)
|
||||
if err != nil {
|
||||
f.Logger().WithError(err).Error("Failed to watch directory")
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
f.Logger().Infof("ShareFile: srcPath(%s) is not a directory", srcPath)
|
||||
}
|
||||
// Add the source and destination to the global map which will be used by the event loop
|
||||
// to copy the modified content to the destination
|
||||
f.Logger().Infof("ShareFile: Adding srcPath(%s) dstPath(%s) to srcDstMap", srcPath, dstPath)
|
||||
// Lock the map before adding the entry
|
||||
f.srcDstMapLock.Lock()
|
||||
defer f.srcDstMapLock.Unlock()
|
||||
f.srcDstMap[srcPath] = append(f.srcDstMap[srcPath], dstPath)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ignore the mount if this is not a regular file (excludes
|
||||
// directory, socket, device, ...) as it cannot be handled by
|
||||
// a simple copy. But this should not be treated as an error,
|
||||
// only as a limitation.
|
||||
if !fileInfo.Mode().IsRegular() {
|
||||
f.Logger().WithField("ignored-file", m.Source).Debug("Ignoring non-regular file as FS sharing not supported")
|
||||
if err := filepath.WalkDir(srcRoot, walk); err != nil {
|
||||
c.Logger().WithField("failed-file", m.Source).Debugf("failed to copy file to sandbox: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
if ignored {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if err := f.sandbox.agent.copyFile(ctx, m.Source, guestPath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
// These mounts are created in the shared dir
|
||||
mountDest := filepath.Join(getMountPath(f.sandbox.ID()), filename)
|
||||
@ -491,3 +581,236 @@ func (f *FilesystemShare) UnshareRootFilesystem(ctx context.Context, c *Containe
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (f *FilesystemShare) watchDir(source string) error {
|
||||
|
||||
// Add a watcher for the configmap, secret, projected-volumes and downwar-api directories
|
||||
// /var/lib/kubelet/pods/<uid>/volumes/{kubernetes.io~configmap, kubernetes.io~secret, kubernetes.io~downward-api, kubernetes.io~projected-volume}
|
||||
|
||||
// Note: From fsNotify docs - https://pkg.go.dev/github.com/fsnotify/fsnotify
|
||||
// Watching individual files (rather than directories) is generally not
|
||||
// recommended as many tools update files atomically. Instead of "just"
|
||||
// writing to the file a temporary file will be written to first, and if
|
||||
// successful the temporary file is moved to to destination removing the
|
||||
// original, or some variant thereof. The watcher on the original file is
|
||||
// now lost, as it no longer exists.
|
||||
// Instead, watch the parent directory and use Event.Name to filter out files
|
||||
// you're not interested in.
|
||||
|
||||
// Also fsNotify doesn't add watcher recursively. So we need to walk the root directory and add the required watches
|
||||
|
||||
f.Logger().Infof("watchDir: Add fsnotify watcher for dir (%s)", source)
|
||||
watchList := f.watcher.WatchList()
|
||||
|
||||
for _, v := range watchList {
|
||||
if v == source {
|
||||
f.Logger().Infof("watchDir: Watcher for dir(%s) is already present", source)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
err := f.watcher.Add(source)
|
||||
if err != nil {
|
||||
f.Logger().WithError(err).Error("watchDir: Failed to add watcher to list")
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (f *FilesystemShare) StartFileEventWatcher(ctx context.Context) error {
|
||||
|
||||
// Acquire lock and check if eventLoopStarted
|
||||
// If not started set the event loop started flag
|
||||
f.eventLoopStartedLock.Lock()
|
||||
|
||||
// Check if the event loop is already started
|
||||
if f.eventLoopStarted {
|
||||
f.Logger().Info("StartFileEventWatcher: Event loop already started, returning")
|
||||
f.eventLoopStartedLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
f.Logger().Infof("StartFileEventWatcher: starting the event loop")
|
||||
|
||||
f.eventLoopStarted = true
|
||||
f.eventLoopStartedLock.Unlock()
|
||||
|
||||
// Regex for the temp directory with timestamp that is used to handle the updates by K8s
|
||||
// Examples
|
||||
// /var/lib/kubelet/pods/e33907eb-54c7-4113-a3dc-447f247084cc/volumes/kubernetes.io~secret/foosecret/..2023_07_27_07_13_00.1257228
|
||||
// /var/lib/kubelet/pods/e33907eb-54c7-4113-a3dc-447f247084cc/volumes/kubernetes.io~downward-api/fooinfo/..2023_07_27_07_13_00.3704578339
|
||||
// The timestamp is of the format 2023_07_27_07_13_00.3704578339 or 2023_07_27_07_13_00.1257228
|
||||
|
||||
var re = regexp.MustCompile(`(?m)\s*[0-9]{4}_[0-9]{2}_[0-9]{2}_[0-9]{2}_[0-9]{2}_[0-9]{2}.[0-9]*$`)
|
||||
|
||||
f.Logger().Debugf("StartFileEventWatcher: srcDstMap dump %v", f.srcDstMap)
|
||||
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-f.watcher.Events:
|
||||
if !ok {
|
||||
return fmt.Errorf("StartFileEventWatcher: watcher events channel closed")
|
||||
}
|
||||
f.Logger().Infof("StartFileEventWatcher: got an event %s %s", event.Op, event.Name)
|
||||
if event.Op&fsnotify.Remove == fsnotify.Remove {
|
||||
// Ref: (kubernetes) pkg/volume/util/atomic_writer.go to understand the configmap/secret update algo
|
||||
//
|
||||
// Write does an atomic projection of the given payload into the writer's target
|
||||
// directory. Input paths must not begin with '..'.
|
||||
//
|
||||
// The Write algorithm is:
|
||||
//
|
||||
// 1. The payload is validated; if the payload is invalid, the function returns
|
||||
// 2. The current timestamped directory is detected by reading the data directory
|
||||
// symlink
|
||||
// 3. The old version of the volume is walked to determine whether any
|
||||
// portion of the payload was deleted and is still present on disk.
|
||||
// 4. The data in the current timestamped directory is compared to the projected
|
||||
// data to determine if an update is required.
|
||||
// 5. A new timestamped dir is created
|
||||
// 6. The payload is written to the new timestamped directory
|
||||
// 7. Symlinks and directory for new user-visible files are created (if needed).
|
||||
//
|
||||
// For example, consider the files:
|
||||
// <target-dir>/podName
|
||||
// <target-dir>/user/labels
|
||||
// <target-dir>/k8s/annotations
|
||||
//
|
||||
// The user visible files are symbolic links into the internal data directory:
|
||||
// <target-dir>/podName -> ..data/podName
|
||||
// <target-dir>/usr -> ..data/usr
|
||||
// <target-dir>/k8s -> ..data/k8s
|
||||
//
|
||||
// The data directory itself is a link to a timestamped directory with
|
||||
// the real data:
|
||||
// <target-dir>/..data -> ..2016_02_01_15_04_05.12345678/
|
||||
// 8. A symlink to the new timestamped directory ..data_tmp is created that will
|
||||
// become the new data directory
|
||||
// 9. The new data directory symlink is renamed to the data directory; rename is atomic
|
||||
// 10. Old paths are removed from the user-visible portion of the target directory
|
||||
// 11. The previous timestamped directory is removed, if it exists
|
||||
|
||||
// In this code, we are relying on the REMOVE event to initate a copy of the updated data.
|
||||
// This ensures that the required data is updated and available for copying.
|
||||
// For REMOVE event, the event.Name (source) will be of the form:
|
||||
// /var/lib/kubelet/pods/<uid>/volumes/<k8s-special-dir>/foo/..2023_02_11_09_21_08.2202253910
|
||||
// For example, the event.Name (source) for configmap update will like this:
|
||||
// /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..2023_02_11_09_21_08.2202253910
|
||||
|
||||
source := event.Name
|
||||
f.Logger().Infof("StartFileEventWatcher: source for the event: %s", source)
|
||||
if re.FindString(source) != "" {
|
||||
// This block will be entered when the timestamped directory is removed.
|
||||
// This also indicates that foo/..data contains the updated info
|
||||
|
||||
volumeDir := filepath.Dir(source)
|
||||
f.Logger().Infof("StartFileEventWatcher: volumeDir (%s)", volumeDir)
|
||||
// eg. volumDir = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo
|
||||
|
||||
dataDir := filepath.Join(volumeDir, "..data")
|
||||
f.Logger().Infof("StartFileEventWatcher: dataDir (%s)", dataDir)
|
||||
// eg. dataDir = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..data
|
||||
|
||||
// Handle different destination for the same source
|
||||
// Acquire srcDstMapLock before reading srcDstMap
|
||||
f.srcDstMapLock.Lock()
|
||||
for _, destination := range f.srcDstMap[dataDir] {
|
||||
f.Logger().Infof("StartFileEventWatcher: Copy file from src (%s) to dst (%s)", dataDir, destination)
|
||||
// We explicitly ignore any errors here. Copy will continue for other files
|
||||
// Errors are logged in the copyFilesFromDataDir method
|
||||
_ = f.copyFilesFromDataDir(dataDir, destination)
|
||||
}
|
||||
f.srcDstMapLock.Unlock()
|
||||
}
|
||||
}
|
||||
case err, ok := <-f.watcher.Errors:
|
||||
if !ok {
|
||||
return fmt.Errorf("StartFileEventWatcher: watcher error channel closed")
|
||||
}
|
||||
// We continue explicitly here to avoid exiting the watcher loop
|
||||
f.Logger().Infof("StartFileEventWatcher: got an error event (%v)", err)
|
||||
continue
|
||||
case <-f.watcherDoneChannel:
|
||||
f.Logger().Info("StartFileEventWatcher: watcher closed")
|
||||
f.watcher.Close()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *FilesystemShare) copyFilesFromDataDir(src, dst string) error {
|
||||
|
||||
// The src is a symlink and is of the following form:
|
||||
// /var/lib/kubelet/pods/<uid>/volumes/<k8s-special-dir>/foo/..data
|
||||
// eg, for configmap, src = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..data
|
||||
// The dst is of the following form:
|
||||
// /run/kata-containers/shared/containers/<cid>-<volume>/..data
|
||||
// eg. dst = /run/kata-containers/shared/containers/e70739a6cc38daf15de916b4d22aad035d42bc977024f2c8cae6b0b607251d44-39407b03e4b448f1-config-volume/..data
|
||||
|
||||
// Get the symlink target
|
||||
// eg. srcdir = ..2023_02_09_06_40_51.2326009790
|
||||
srcdir, err := os.Readlink(src)
|
||||
if err != nil {
|
||||
f.Logger().Infof("copyFilesFromDataDir: Reading data symlink returned error (%v)", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Get the base directory path of src
|
||||
volumeDir := filepath.Dir(src)
|
||||
// eg. volumeDir = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo
|
||||
|
||||
dataDir := filepath.Join(volumeDir, srcdir)
|
||||
// eg. dataDir = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..2023_02_09_06_40_51.2326009790
|
||||
|
||||
f.Logger().Infof("copyFilesFromDataDir: full path to data symlink (%s)", dataDir)
|
||||
|
||||
// Using WalkDir is more efficient than Walk
|
||||
err = filepath.WalkDir(dataDir,
|
||||
func(path string, d fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
f.Logger().Infof("copyFilesFromDataDir: Error in file walk %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// eg. path = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..2023_02_09_06_40_51.2326009790/{key1, key2, ...}
|
||||
f.Logger().Infof("copyFilesFromDataDir: path (%s)", path)
|
||||
if !d.IsDir() {
|
||||
// Using filePath.Rel to handle these cases
|
||||
// /var/lib/kubelet/pods/2481b69e-9ac8-475a-9e11-88af1daca60e/volumes/kubernetes.io~projected/all-in-one/..2023_02_13_12_35_49.1380323032/config-dir1/config.file1
|
||||
// /var/lib/kubelet/pods/2481b69e-9ac8-475a-9e11-88af1daca60e/volumes/kubernetes.io~projected/all-in-one/..2023_02_13_12_35_49.1380323032/config.file2
|
||||
rel, err := filepath.Rel(dataDir, path)
|
||||
if err != nil {
|
||||
f.Logger().Infof("copyFilesFromDataDir: Unable to get relative path")
|
||||
return err
|
||||
}
|
||||
f.Logger().Debugf("copyFilesFromDataDir: dataDir(%s), path(%s), rel(%s)", dataDir, path, rel)
|
||||
// Form the destination path in the guest
|
||||
dstFile := filepath.Join(dst, rel)
|
||||
f.Logger().Infof("copyFilesFromDataDir: Copying file %s to dst %s", path, dstFile)
|
||||
err = f.sandbox.agent.copyFile(context.Background(), path, dstFile)
|
||||
if err != nil {
|
||||
f.Logger().Infof("copyFilesFromDataDir: Error in copying file %v", err)
|
||||
return err
|
||||
}
|
||||
f.Logger().Infof("copyFilesFromDataDir: Successfully copied file (%s)", path)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
f.Logger().Infof("copyFilesFromDataDir: Error in filepath.WalkDir (%v)", err)
|
||||
return err
|
||||
}
|
||||
|
||||
f.Logger().Infof("copyFilesFromDataDir: Done")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FilesystemShare) StopFileEventWatcher(ctx context.Context) {
|
||||
|
||||
f.Logger().Info("StopFileEventWatcher: Closing watcher")
|
||||
close(f.watcherDoneChannel)
|
||||
|
||||
}
|
||||
|
@ -2197,40 +2197,57 @@ func (k *kataAgent) setGuestDateTime(ctx context.Context, tv time.Time) error {
|
||||
func (k *kataAgent) copyFile(ctx context.Context, src, dst string) error {
|
||||
var st unix.Stat_t
|
||||
|
||||
err := unix.Stat(src, &st)
|
||||
err := unix.Lstat(src, &st)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not get file %s information: %v", src, err)
|
||||
}
|
||||
|
||||
b, err := os.ReadFile(src)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not read file %s: %v", src, err)
|
||||
cpReq := &grpc.CopyFileRequest{
|
||||
Path: dst,
|
||||
DirMode: uint32(DirMode),
|
||||
FileMode: st.Mode,
|
||||
Uid: int32(st.Uid),
|
||||
Gid: int32(st.Gid),
|
||||
}
|
||||
|
||||
fileSize := int64(len(b))
|
||||
var b []byte
|
||||
|
||||
switch sflag := st.Mode & unix.S_IFMT; sflag {
|
||||
case unix.S_IFREG:
|
||||
var err error
|
||||
// TODO: Support incremental file copying instead of loading whole file into memory
|
||||
b, err = os.ReadFile(src)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not read file %s: %v", src, err)
|
||||
}
|
||||
cpReq.FileSize = int64(len(b))
|
||||
|
||||
case unix.S_IFDIR:
|
||||
|
||||
case unix.S_IFLNK:
|
||||
symlink, err := os.Readlink(src)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not read symlink %s: %v", src, err)
|
||||
}
|
||||
cpReq.Data = []byte(symlink)
|
||||
|
||||
default:
|
||||
return fmt.Errorf("Unsupported file type: %o", sflag)
|
||||
}
|
||||
|
||||
k.Logger().WithFields(logrus.Fields{
|
||||
"source": src,
|
||||
"dest": dst,
|
||||
}).Debugf("Copying file from host to guest")
|
||||
|
||||
cpReq := &grpc.CopyFileRequest{
|
||||
Path: dst,
|
||||
DirMode: uint32(DirMode),
|
||||
FileMode: uint32(st.Mode),
|
||||
FileSize: fileSize,
|
||||
Uid: int32(st.Uid),
|
||||
Gid: int32(st.Gid),
|
||||
}
|
||||
|
||||
// Handle the special case where the file is empty
|
||||
if fileSize == 0 {
|
||||
_, err = k.sendReq(ctx, cpReq)
|
||||
if cpReq.FileSize == 0 {
|
||||
_, err := k.sendReq(ctx, cpReq)
|
||||
return err
|
||||
}
|
||||
|
||||
// Copy file by parts if it's needed
|
||||
remainingBytes := fileSize
|
||||
remainingBytes := cpReq.FileSize
|
||||
offset := int64(0)
|
||||
for remainingBytes > 0 {
|
||||
bytesToCopy := int64(len(b))
|
||||
|
@ -993,6 +993,49 @@ func TestKataCopyFile(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
}
|
||||
|
||||
func TestKataCopyFileWithSymlink(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
url, err := mock.GenerateKataMockHybridVSock()
|
||||
assert.NoError(err)
|
||||
defer mock.RemoveKataMockHybridVSock(url)
|
||||
|
||||
hybridVSockTTRPCMock := mock.HybridVSockTTRPCMock{}
|
||||
err = hybridVSockTTRPCMock.Start(url)
|
||||
assert.NoError(err)
|
||||
defer hybridVSockTTRPCMock.Stop()
|
||||
|
||||
k := &kataAgent{
|
||||
ctx: context.Background(),
|
||||
state: KataAgentState{
|
||||
URL: url,
|
||||
},
|
||||
}
|
||||
|
||||
tempDir := t.TempDir()
|
||||
|
||||
target := filepath.Join(tempDir, "target")
|
||||
err = os.WriteFile(target, []byte("abcdefghi123456789"), 0666)
|
||||
assert.NoError(err)
|
||||
|
||||
symlink := filepath.Join(tempDir, "symlink")
|
||||
os.Symlink(target, symlink)
|
||||
|
||||
dst, err := os.CreateTemp("", "dst")
|
||||
assert.NoError(err)
|
||||
assert.NoError(dst.Close())
|
||||
defer os.Remove(dst.Name())
|
||||
|
||||
orgGrpcMaxDataSize := grpcMaxDataSize
|
||||
grpcMaxDataSize = 1
|
||||
defer func() {
|
||||
grpcMaxDataSize = orgGrpcMaxDataSize
|
||||
}()
|
||||
|
||||
err = k.copyFile(context.Background(), symlink, dst.Name())
|
||||
assert.NoError(err)
|
||||
}
|
||||
|
||||
func TestKataCleanupSandbox(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
|
@ -331,6 +331,7 @@ func (s *Sandbox) Release(ctx context.Context) error {
|
||||
if s.monitor != nil {
|
||||
s.monitor.stop()
|
||||
}
|
||||
s.fsShare.StopFileEventWatcher(ctx)
|
||||
s.hypervisor.Disconnect(ctx)
|
||||
return s.agent.disconnect(ctx)
|
||||
}
|
||||
@ -613,6 +614,27 @@ func newSandbox(ctx context.Context, sandboxConfig SandboxConfig, factory Factor
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Start the event loop if not already started when fs sharing is not used
|
||||
if sandboxConfig.HypervisorConfig.SharedFS == config.NoSharedFS {
|
||||
// Start the StartFileEventWatcher method as a goroutine
|
||||
// to monitor the file events.
|
||||
go func() {
|
||||
if err := s.fsShare.StartFileEventWatcher(ctx); err != nil {
|
||||
s.Logger().WithError(err).Error("Failed to start file event watcher")
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
// Stop the file event watcher on error
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
s.Logger().WithError(retErr).Error("Stopping File Event Watcher")
|
||||
s.fsShare.StopFileEventWatcher(ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
}
|
||||
|
||||
coldPlugVFIO, err := s.coldOrHotPlugVFIO(&sandboxConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
Loading…
Reference in New Issue
Block a user