@@ -14,9 +14,12 @@ import (
"io/fs"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -32,14 +35,25 @@ func unmountNoFollow(path string) error {
type FilesystemShare struct {
sandbox * Sandbox
watcher * fsnotify . Watcher
srcDstMap map [ string ] string
watcherDoneChannel chan bool
sync . Mutex
prepared bool
}
func NewFilesystemShare ( s * Sandbox ) ( FilesystemSharer , error ) {
watcher , err := fsnotify . NewWatcher ( )
if err != nil {
return nil , fmt . Errorf ( "Creating watcher returned error %w" , err )
}
return & FilesystemShare {
prepared : false ,
sandbox : s ,
watcherDoneChannel : make ( chan bool ) ,
srcDstMap : make ( map [ string ] string ) ,
watcher : watcher ,
} , nil
}
@@ -266,8 +280,46 @@ func (f *FilesystemShare) ShareFile(ctx context.Context, c *Container, m *Mount)
}
dstPath := filepath . Join ( guestPath , srcPath [ len ( srcRoot ) : ] )
f . Logger ( ) . Infof ( "ShareFile: Copying file from src (%s) to dest (%s)" , srcPath , dstPath )
//TODO: Improve the agent protocol, to handle the case for existing symlink.
// Currently for an existing symlink, this will fail with EEXIST.
err = f . sandbox . agent . copyFile ( ctx , srcPath , dstPath )
if err != nil {
f . Logger ( ) . WithError ( err ) . Error ( "Failed to copy file" )
return err
}
return f . sandbox . agent . copyFile ( ctx , srcPath , dstPath )
// Add fsNotify watcher for volume mounts
if strings . Contains ( srcPath , "kubernetes.io~configmap" ) ||
strings . Contains ( srcPath , "kubernetes.io~secrets" ) ||
strings . Contains ( srcPath , "kubernetes.io~projected" ) ||
strings . Contains ( srcPath , "kubernetes.io~downward-api" ) {
// fsNotify doesn't add watcher recursively.
// So we need to add the watcher for directories under kubernetes.io~configmap, kubernetes.io~secrets,
// kubernetes.io~downward-api and kubernetes.io~projected
if info . Mode ( ) . IsDir ( ) {
// The cm dir is of the form /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~configmap/foo/{..data, key1, key2,...}
// The secrets dir is of the form /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~secrets/foo/{..data, key1, key2,...}
// The projected dir is of the form /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~projected/foo/{..data, key1, key2,...}
// The downward-api dir is of the form /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~downward-api/foo/{..data, key1, key2,...}
f . Logger ( ) . Infof ( "ShareFile: srcPath(%s) is a directory" , srcPath )
err := f . watchDir ( srcPath )
if err != nil {
f . Logger ( ) . WithError ( err ) . Error ( "Failed to watch directory" )
return err
}
} else {
f . Logger ( ) . Infof ( "ShareFile: srcPath(%s) is not a directory" , srcPath )
}
// Add the source and destination to the global map which will be used by the event loop
// to copy the modified content to the destination
f . Logger ( ) . Infof ( "ShareFile: Adding srcPath(%s) dstPath(%s) to srcDstMap" , srcPath , dstPath )
f . srcDstMap [ srcPath ] = dstPath
}
return nil
}
if err := filepath . WalkDir ( srcRoot , walk ) ; err != nil {
@@ -507,3 +559,217 @@ func (f *FilesystemShare) UnshareRootFilesystem(ctx context.Context, c *Containe
return nil
}
func ( f * FilesystemShare ) watchDir ( source string ) error {
// Add a watcher for the configmap, secrets, projected-volumes and downwar-api directories
// /var/lib/kubelet/pods/<uid>/volumes/{kubernetes.io~configmap, kubernetes.io~secrets, kubernetes.io~downward-api, kubernetes.io~projected-volume}
// Note: From fsNotify docs - https://pkg.go.dev/github.com/fsnotify/fsnotify
// Watching individual files (rather than directories) is generally not
// recommended as many tools update files atomically. Instead of "just"
// writing to the file a temporary file will be written to first, and if
// successful the temporary file is moved to to destination removing the
// original, or some variant thereof. The watcher on the original file is
// now lost, as it no longer exists.
// Instead, watch the parent directory and use Event.Name to filter out files
// you're not interested in.
// Also fsNotify doesn't add watcher recursively. So we need to walk the root directory and add the required watches
f . Logger ( ) . Infof ( "watchDir: Add fsnotify watcher for dir (%s)" , source )
watchList := f . watcher . WatchList ( )
for _ , v := range watchList {
if v == source {
f . Logger ( ) . Infof ( "watchDir: Watcher for dir(%s) is already present" , source )
return nil
}
}
err := f . watcher . Add ( source )
if err != nil {
f . Logger ( ) . WithError ( err ) . Error ( "watchDir: Failed to add watcher to list" )
return err
}
return nil
}
func ( f * FilesystemShare ) StartFileEventWatcher ( ctx context . Context ) error {
// Start event loop if watchList is not empty
if ( f . watcher == nil ) || len ( f . watcher . WatchList ( ) ) == 0 {
f . Logger ( ) . Info ( "StartFileEventWatcher: No watches found, returning" )
return nil
}
// Regex for the temp directory with timestamp that is used to handle the updates by K8s
var re = regexp . MustCompile ( ` (?m)\s*[0-9] { 4}_[0-9] { 2}_[0-9] { 2}_[0-9] { 2}_[0-9] { 2}_[0-9] { 2}.[0-9] { 10}$ ` )
f . Logger ( ) . Debugf ( "StartFileEventWatcher: srcDstMap dump %v" , f . srcDstMap )
// This is the event loop to watch for fsNotify events and copy the contents to the guest
for {
select {
case event , ok := <- f . watcher . Events :
if ! ok {
return fmt . Errorf ( "StartFileEventWatcher: Error in receiving events" )
}
f . Logger ( ) . Infof ( "StartFileEventWatcher: got an event %s %s" , event . Op , event . Name )
if event . Op & fsnotify . Remove == fsnotify . Remove {
// Ref: (kubernetes) pkg/volume/util/atomic_writer.go to understand the configmap/secrets update algo
//
// Write does an atomic projection of the given payload into the writer's target
// directory. Input paths must not begin with '..'.
//
// The Write algorithm is:
//
// 1. The payload is validated; if the payload is invalid, the function returns
// 2. The current timestamped directory is detected by reading the data directory
// symlink
// 3. The old version of the volume is walked to determine whether any
// portion of the payload was deleted and is still present on disk.
// 4. The data in the current timestamped directory is compared to the projected
// data to determine if an update is required.
// 5. A new timestamped dir is created
// 6. The payload is written to the new timestamped directory
// 7. Symlinks and directory for new user-visible files are created (if needed).
//
// For example, consider the files:
// <target-dir>/podName
// <target-dir>/user/labels
// <target-dir>/k8s/annotations
//
// The user visible files are symbolic links into the internal data directory:
// <target-dir>/podName -> ..data/podName
// <target-dir>/usr -> ..data/usr
// <target-dir>/k8s -> ..data/k8s
//
// The data directory itself is a link to a timestamped directory with
// the real data:
// <target-dir>/..data -> ..2016_02_01_15_04_05.12345678/
// 8. A symlink to the new timestamped directory ..data_tmp is created that will
// become the new data directory
// 9. The new data directory symlink is renamed to the data directory; rename is atomic
// 10. Old paths are removed from the user-visible portion of the target directory
// 11. The previous timestamped directory is removed, if it exists
// In this code, we are relying on the REMOVE event to initate a copy of the updated data.
// This ensures that the required data is updated and available for copying.
// For REMOVE event, the event.Name (source) will be of the form:
// /var/lib/kubelet/pods/<uid>/volumes/<k8s-special-dir>/foo/..2023_02_11_09_21_08.2202253910
// For example, the event.Name (source) for configmap update will like this:
// /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..2023_02_11_09_21_08.2202253910
source := event . Name
f . Logger ( ) . Infof ( "StartFileEventWatcher: source for the event: %s" , source )
if re . FindString ( source ) != "" {
// This block will be entered when the timestamped directory is removed.
// This also indicates that foo/..data contains the updated info
volumeDir := filepath . Dir ( source )
f . Logger ( ) . Infof ( "StartFileEventWatcher: volumeDir (%s)" , volumeDir )
// eg. volumDir = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo
dataDir := filepath . Join ( volumeDir , "..data" )
f . Logger ( ) . Infof ( "StartFileEventWatcher: dataDir (%s)" , dataDir )
// eg. dataDir = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..data
destination := f . srcDstMap [ dataDir ]
f . Logger ( ) . Infof ( "StartFileEventWatcher: Copy file from src (%s) to dst (%s)" , dataDir , destination )
err := f . copyFilesFromDataDir ( dataDir , destination )
if err != nil {
f . Logger ( ) . Infof ( "StartFileEventWatcher: got an error (%v) when copying file from src (%s) to dst (%s)" , err , dataDir , destination )
return err
}
}
}
case err , ok := <- f . watcher . Errors :
if ! ok {
return fmt . Errorf ( "StartFileEventWatcher: Error (%v) in receiving error events" , err )
}
f . Logger ( ) . Infof ( "StartFileEventWatcher: got an error event (%v)" , err )
return err
case <- f . watcherDoneChannel :
f . Logger ( ) . Info ( "StartFileEventWatcher: watcher closed" )
f . watcher . Close ( )
return nil
}
}
}
func ( f * FilesystemShare ) copyFilesFromDataDir ( src , dst string ) error {
// The src is a symlink and is of the following form:
// /var/lib/kubelet/pods/<uid>/volumes/<k8s-special-dir>/foo/..data
// eg, for configmap, src = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..data
// The dst is of the following form:
// /run/kata-containers/shared/containers/<cid>-<volume>/..data
// eg. dst = /run/kata-containers/shared/containers/e70739a6cc38daf15de916b4d22aad035d42bc977024f2c8cae6b0b607251d44-39407b03e4b448f1-config-volume/..data
// Get the symlink target
// eg. srcdir = ..2023_02_09_06_40_51.2326009790
srcdir , err := os . Readlink ( src )
if err != nil {
f . Logger ( ) . Infof ( "copyFilesFromDataDir: Reading data symlink returned error (%v)" , err )
return err
}
// Get the base directory path of src
volumeDir := filepath . Dir ( src )
// eg. volumeDir = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo
dataDir := filepath . Join ( volumeDir , srcdir )
// eg. dataDir = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..2023_02_09_06_40_51.2326009790
f . Logger ( ) . Infof ( "copyFilesFromDataDir: full path to data symlink (%s)" , dataDir )
// Using WalkDir is more efficient than Walk
err = filepath . WalkDir ( dataDir ,
func ( path string , d fs . DirEntry , err error ) error {
if err != nil {
f . Logger ( ) . Infof ( "copyFilesFromDataDir: Error in file walk %v" , err )
return err
}
// eg. path = /var/lib/kubelet/pods/b44e3261-7cf0-48d3-83b4-6094bba95dc8/volumes/kubernetes.io~configmap/foo/..2023_02_09_06_40_51.2326009790/{key1, key2, ...}
f . Logger ( ) . Infof ( "copyFilesFromDataDir: path (%s)" , path )
if ! d . IsDir ( ) {
// Using filePath.Rel to handle these cases
// /var/lib/kubelet/pods/2481b69e-9ac8-475a-9e11-88af1daca60e/volumes/kubernetes.io~projected/all-in-one/..2023_02_13_12_35_49.1380323032/config-dir1/config.file1
// /var/lib/kubelet/pods/2481b69e-9ac8-475a-9e11-88af1daca60e/volumes/kubernetes.io~projected/all-in-one/..2023_02_13_12_35_49.1380323032/config.file2
rel , err := filepath . Rel ( dataDir , path )
if err != nil {
f . Logger ( ) . Infof ( "copyFilesFromDataDir: Unable to get relative path" )
return err
}
f . Logger ( ) . Debugf ( "copyFilesFromDataDir: dataDir(%s), path(%s), rel(%s)" , dataDir , path , rel )
// Form the destination path in the guest
dstFile := filepath . Join ( dst , rel )
f . Logger ( ) . Infof ( "copyFilesFromDataDir: Copying file %s to dst %s" , path , dstFile )
err = f . sandbox . agent . copyFile ( context . Background ( ) , path , dstFile )
if err != nil {
f . Logger ( ) . Infof ( "copyFilesFromDataDir: Error in copying file %v" , err )
return err
}
f . Logger ( ) . Infof ( "copyFilesFromDataDir: Successfully copied file (%s)" , path )
}
return nil
} )
if err != nil {
f . Logger ( ) . Infof ( "copyFilesFromDataDir: Error in filepath.WalkDir (%v)" , err )
return err
}
f . Logger ( ) . Infof ( "copyFilesFromDataDir: Done" )
return nil
}
func ( f * FilesystemShare ) StopFileEventWatcher ( ctx context . Context ) {
f . Logger ( ) . Info ( "StopFileEventWatcher: Closing watcher" )
close ( f . watcherDoneChannel )
}