Monitor progress tracking for permission change

Hemant Kumar 2025-02-20 16:39:38 -05:00
parent 65321bf5ee
commit 49dabe56d0
9 changed files with 119 additions and 6 deletions

@@ -61,6 +61,7 @@ const (
     VolumeResizeFailed = "VolumeResizeFailed"
     VolumeResizeSuccess = "VolumeResizeSuccessful"
     FileSystemResizeFailed = "FileSystemResizeFailed"
+    VolumePermissionChangeInProgress = "VolumePermissionChangeInProgress"
     FileSystemResizeSuccess = "FileSystemResizeSuccessful"
     FailedMapVolume = "FailedMapVolume"
     WarnAlreadyMountedVolume = "AlreadyMountedVolume"
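
For context on how this new reason is consumed: a minimal sketch (not part of the diff) of emitting it on a pod through a record.EventRecorder, mirroring what the VolumeOwnership helper introduced later in this commit does; the recorder, pod, dir, and processed values are the caller's own.

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/record"
    "k8s.io/kubernetes/pkg/kubelet/events"
)

// reportPermissionChangeProgress is an illustrative helper, not part of the commit.
func reportPermissionChangeProgress(recorder record.EventRecorder, pod *v1.Pod, dir string, processed int64) {
    msg := fmt.Sprintf("Setting volume ownership for %s, processed %d files", dir, processed)
    recorder.Event(pod, v1.EventTypeWarning, events.VolumePermissionChangeInProgress, msg)
}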

@@ -246,7 +246,8 @@ func (b *configMapVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
     setPerms := func(_ string) error {
         // This may be the first time writing and new files get created outside the timestamp subdirectory:
         // change the permissions on the whole volume and not only in the timestamp directory.
-        return volume.SetVolumeOwnership(b, dir, mounterArgs.FsGroup, nil /*fsGroupChangePolicy*/, volumeutil.FSGroupCompleteHook(b.plugin, nil))
+        ownerShipChanger := volume.NewVolumeOwnership(b, dir, mounterArgs.FsGroup, nil /*fsGroupChangePolicy*/, volumeutil.FSGroupCompleteHook(b.plugin, nil))
+        return ownerShipChanger.ChangePermissions()
     }
     err = writer.Write(payload, setPerms)
     if err != nil {

@@ -335,7 +335,9 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
         // Driver doesn't support applying FSGroup. Kubelet must apply it instead.
         // fullPluginName helps to distinguish different driver from csi plugin
-        err := volume.SetVolumeOwnership(c, dir, mounterArgs.FsGroup, mounterArgs.FSGroupChangePolicy, util.FSGroupCompleteHook(c.plugin, c.spec))
+        ownershipChanger := volume.NewVolumeOwnership(c, dir, mounterArgs.FsGroup, mounterArgs.FSGroupChangePolicy, util.FSGroupCompleteHook(c.plugin, c.spec))
+        ownershipChanger.AddProgressNotifier(c.pod, mounterArgs.Recorder)
+        err = ownershipChanger.ChangePermissions()
         if err != nil {
             // At this point mount operation is successful:
             // 1. Since volume can not be used by the pod because of invalid permissions, we must return error
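
Across the touched mounters this is the same replacement: the one-shot volume.SetVolumeOwnership call becomes a small builder-style flow, and only callers that have both a pod and an event recorder (here, the CSI mounter; see the MounterArgs change below) opt into progress events. A rough sketch of the pattern, where mounter, dir, fsGroup, policy, hook, pod, and recorder stand in for the caller's own values:

ownership := volume.NewVolumeOwnership(mounter, dir, fsGroup, policy, hook)
if pod != nil && recorder != nil {
    // Optional: emit VolumePermissionChangeInProgress events while the recursive walk runs.
    ownership.AddProgressNotifier(pod, recorder)
}
if err := ownership.ChangePermissions(); err != nil {
    // Handle the failure exactly as the old SetVolumeOwnership error was handled.
    return err
}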

@@ -217,7 +217,8 @@ func (b *downwardAPIVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
     setPerms := func(_ string) error {
         // This may be the first time writing and new files get created outside the timestamp subdirectory:
         // change the permissions on the whole volume and not only in the timestamp directory.
-        return volume.SetVolumeOwnership(b, dir, mounterArgs.FsGroup, nil /*fsGroupChangePolicy*/, volumeutil.FSGroupCompleteHook(b.plugin, nil))
+        ownershipChanger := volume.NewVolumeOwnership(b, dir, mounterArgs.FsGroup, nil /*fsGroupChangePolicy*/, volumeutil.FSGroupCompleteHook(b.plugin, nil))
+        return ownershipChanger.ChangePermissions()
     }
     err = writer.Write(data, setPerms)
     if err != nil {

@@ -18,10 +18,11 @@ package emptydir

 import (
     "fmt"
-    "k8s.io/kubernetes/pkg/kubelet/util/swap"
     "os"
     "path/filepath"

+    "k8s.io/kubernetes/pkg/kubelet/util/swap"
+
     "k8s.io/klog/v2"
     "k8s.io/mount-utils"
     utilstrings "k8s.io/utils/strings"

@@ -278,7 +279,8 @@ func (ed *emptyDir) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
         err = fmt.Errorf("unknown storage medium %q", ed.medium)
     }

-    volume.SetVolumeOwnership(ed, dir, mounterArgs.FsGroup, nil /*fsGroupChangePolicy*/, volumeutil.FSGroupCompleteHook(ed.plugin, nil))
+    ownershipChanger := volume.NewVolumeOwnership(ed, dir, mounterArgs.FsGroup, nil /*fsGroupChangePolicy*/, volumeutil.FSGroupCompleteHook(ed.plugin, nil))
+    ownershipChanger.ChangePermissions()

     // If setting up the quota fails, just log a message but don't actually error out.
     // We'll use the old du mechanism in this case, at least until we support

@@ -91,7 +91,8 @@ func diskSetUp(manager diskManager, b fcDiskMounter, volPath string, mounter mou
     }

     if !b.readOnly {
-        volume.SetVolumeOwnership(&b, volPath, fsGroup, fsGroupChangePolicy, util.FSGroupCompleteHook(b.plugin, nil))
+        ownershipChanger := volume.NewVolumeOwnership(&b, volPath, fsGroup, fsGroupChangePolicy, util.FSGroupCompleteHook(b.plugin, nil))
+        ownershipChanger.ChangePermissions()
     }

     return nil

@@ -584,6 +584,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
             FsGroup: fsGroup,
             DesiredSize: volumeToMount.DesiredSizeLimit,
             FSGroupChangePolicy: fsGroupChangePolicy,
+            Recorder: og.recorder,
             SELinuxLabel: volumeToMount.SELinuxLabel,
         })
     // Update actual state of world

@@ -23,6 +23,7 @@ import (
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
+    "k8s.io/client-go/tools/record"
 )

 // Volume represents a directory used by pods or hosts on a node. All method

@@ -130,6 +131,7 @@ type MounterArgs struct {
     FSGroupChangePolicy *v1.PodFSGroupChangePolicy
     DesiredSize *resource.Quantity
     SELinuxLabel string
+    Recorder record.EventRecorder
 }

 // Mounter interface provides methods to set up/mount the volume.

@@ -20,14 +20,19 @@ limitations under the License.

 package volume

 import (
+    "context"
+    "fmt"
     "path/filepath"
+    "sync/atomic"
     "syscall"

     "os"
     "time"

     v1 "k8s.io/api/core/v1"
+    "k8s.io/client-go/tools/record"
     "k8s.io/klog/v2"
+    "k8s.io/kubernetes/pkg/kubelet/events"
     "k8s.io/kubernetes/pkg/volume/util/types"
 )

@@ -35,8 +40,105 @@ const (
     rwMask = os.FileMode(0660)
     roMask = os.FileMode(0440)
     execMask = os.FileMode(0110)
+    progressReportDuration = 60 * time.Second
 )

+type VolumeOwnership struct {
+    mounter Mounter
+    dir string
+    fsGroup *int64
+    fsGroupChangePolicy *v1.PodFSGroupChangePolicy
+    completionCallback func(types.CompleteFuncParam)
+
+    // for monitoring progress of permission change operation
+    pod *v1.Pod
+    fileCounter atomic.Int64
+    recorder record.EventRecorder
+}
+
+func NewVolumeOwnership(mounter Mounter, dir string, fsGroup *int64, fsGroupChangePolicy *v1.PodFSGroupChangePolicy, completeFunc func(types.CompleteFuncParam)) *VolumeOwnership {
+    vo := &VolumeOwnership{
+        mounter: mounter,
+        dir: dir,
+        fsGroup: fsGroup,
+        fsGroupChangePolicy: fsGroupChangePolicy,
+        completionCallback: completeFunc,
+    }
+    vo.fileCounter.Store(0)
+    return vo
+}
+
+func (vo *VolumeOwnership) AddProgressNotifier(pod *v1.Pod, recorder record.EventRecorder) *VolumeOwnership {
+    vo.pod = pod
+    vo.recorder = recorder
+    return vo
+}
+
+func (vo *VolumeOwnership) ChangePermissions() error {
+    if vo.fsGroup == nil {
+        return nil
+    }
+
+    if skipPermissionChange(vo.mounter, vo.dir, vo.fsGroup, vo.fsGroupChangePolicy) {
+        klog.V(3).InfoS("Skipping permission and ownership change for volume", "path", vo.dir)
+        return nil
+    }
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    timer := time.AfterFunc(30*time.Second, func() {
+        vo.initiateProgressMonitor(ctx)
+    })
+    defer timer.Stop()
+
+    return vo.changePermissionsRecursively()
+}
+
+func (vo *VolumeOwnership) initiateProgressMonitor(ctx context.Context) {
+    klog.Warningf("Setting volume ownership for %s and fsGroup set. If the volume has a lot of files then setting volume ownership could be slow, see https://github.com/kubernetes/kubernetes/issues/69699", vo.dir)
+    if vo.pod != nil {
+        go vo.monitorProgress(ctx)
+    }
+}
+
+func (vo *VolumeOwnership) changePermissionsRecursively() error {
+    err := walkDeep(vo.dir, func(path string, info os.FileInfo, err error) error {
+        if err != nil {
+            return err
+        }
+        vo.fileCounter.Add(1)
+        return changeFilePermission(path, vo.fsGroup, vo.mounter.GetAttributes().ReadOnly, info)
+    })
+
+    if vo.completionCallback != nil {
+        vo.completionCallback(types.CompleteFuncParam{
+            Err: &err,
+        })
+    }
+    return err
+}
+
+func (vo *VolumeOwnership) monitorProgress(ctx context.Context) {
+    ticker := time.NewTicker(progressReportDuration)
+    defer ticker.Stop()
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-ticker.C:
+            vo.logWarning()
+        }
+    }
+}
+
+func (vo *VolumeOwnership) logWarning() {
+    msg := fmt.Sprintf("Setting volume ownership for %s, processed %d files", vo.dir, vo.fileCounter.Load())
+    klog.Warning(msg)
+    vo.recorder.Event(vo.pod, v1.EventTypeWarning, events.VolumePermissionChangeInProgress, msg)
+}
+
 // SetVolumeOwnership modifies the given volume to be owned by
 // fsGroup, and sets SetGid so that newly created files are owned by
 // fsGroup. If fsGroup is nil nothing is done.
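
For reference, a minimal sketch (not part of the commit) of how the new progress reporting surfaces, using client-go's FakeRecorder; everything except the volume package API and the event formatting is a placeholder, and the timings follow the code above (warning after 30 seconds, then one event per progressReportDuration while the walk is still running).

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/record"
    "k8s.io/kubernetes/pkg/volume"
)

// observeOwnershipProgress is illustrative only; mounter, pod, dir, and fsGroup are supplied by the caller.
func observeOwnershipProgress(mounter volume.Mounter, pod *v1.Pod, dir string, fsGroup int64) {
    recorder := record.NewFakeRecorder(16)

    vo := volume.NewVolumeOwnership(mounter, dir, &fsGroup, nil /*fsGroupChangePolicy*/, nil /*completeFunc*/)
    vo.AddProgressNotifier(pod, recorder)

    // The recursive chown/chmod walk; on a volume with many files it can run for minutes.
    go func() { _ = vo.ChangePermissions() }()

    // After ~30s the kubelet log warning starts; from then on, roughly once a minute,
    // FakeRecorder receives a string such as:
    //   Warning VolumePermissionChangeInProgress Setting volume ownership for <dir>, processed N files
    fmt.Println(<-recorder.Events)
}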