From ff3e5e06a79bc69ad3d7ccedd277542b6712514b Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Mon, 8 Jun 2020 10:31:38 +0200
Subject: [PATCH] GenericEphemeralVolume: initial implementation

The implementation consists of

- identifying all places where VolumeSource.PersistentVolumeClaim has a
  special meaning and then ensuring that the same code path is taken for
  an ephemeral volume, together with the new ownership check

- adding a controller that produces the PVCs for each embedded
  VolumeSource.EphemeralVolume

- relaxing the PVC protection controller such that it removes the
  finalizer even before the pod is deleted (only if the
  GenericEphemeralVolume feature is enabled): this is needed to break a
  cycle where foreground deletion of the pod blocks on removing the PVC,
  which in turn waits for deletion of the pod

The controller was derived from the endpointslices controller.
---
 cmd/kube-controller-manager/app/BUILD         |   1 +
 .../app/controllermanager.go                  |   1 +
 cmd/kube-controller-manager/app/core.go       |  18 ++
 pkg/controller/BUILD                          |   1 +
 .../attachdetach/attach_detach_controller.go  |   4 +-
 .../volume/attachdetach/metrics/metrics.go    |   2 +-
 .../volume/attachdetach/util/util.go          |  42 ++-
 pkg/controller/volume/common/BUILD            |   2 +
 pkg/controller/volume/common/common.go        |  41 ++-
 pkg/controller/volume/ephemeral/BUILD         |  62 ++++
 pkg/controller/volume/ephemeral/OWNERS        |   6 +
 pkg/controller/volume/ephemeral/controller.go | 286 ++++++++++++++++++
 .../volume/ephemeral/controller_test.go       | 221 ++++++++++++++
 pkg/controller/volume/ephemeral/doc.go        |  21 ++
 .../persistentvolume/pv_controller_base.go    |   2 +-
 .../pvc_protection_controller.go              |  77 ++++-
 .../pvc_protection_controller_test.go         |   9 +-
 .../volume/scheduling/scheduler_binder.go     |  27 +-
 .../desired_state_of_world_populator.go       |  41 ++-
 .../desired_state_of_world_populator_test.go  |  10 +-
 pkg/scheduler/core/BUILD                      |   3 +
 pkg/scheduler/core/generic_scheduler.go       |  22 +-
 .../plugins/volumebinding/volume_binding.go   |  13 +-
 plugin/pkg/auth/authorizer/node/graph.go      |  10 +-
 .../rbac/bootstrappolicy/controller_policy.go |  11 +
 25 files changed, 861 insertions(+), 72 deletions(-)
 create mode 100644 pkg/controller/volume/ephemeral/BUILD
 create mode 100644 pkg/controller/volume/ephemeral/OWNERS
 create mode 100644 pkg/controller/volume/ephemeral/controller.go
 create mode 100644 pkg/controller/volume/ephemeral/controller_test.go
 create mode 100644 pkg/controller/volume/ephemeral/doc.go

diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD
index 14962b7bda0..7ca0218ba11 100644
--- a/cmd/kube-controller-manager/app/BUILD
+++ b/cmd/kube-controller-manager/app/BUILD
@@ -74,6 +74,7 @@ go_library(
        "//pkg/controller/ttl:go_default_library",
        "//pkg/controller/ttlafterfinished:go_default_library",
        "//pkg/controller/volume/attachdetach:go_default_library",
+       "//pkg/controller/volume/ephemeral:go_default_library",
        "//pkg/controller/volume/expand:go_default_library",
        "//pkg/controller/volume/persistentvolume:go_default_library",
        "//pkg/controller/volume/persistentvolume/config:go_default_library",
diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 6eb12fe70dc..83588a53550 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -423,6 +423,7 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
 	controllers["pv-protection"] = startPVProtectionController
 	controllers["ttl-after-finished"] = startTTLAfterFinishedController
 	controllers["root-ca-cert-publisher"] = startRootCACertPublisher
+	controllers["ephemeral-volume"] = startEphemeralVolumeController

 	return controllers
 }
diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go
index c36a1069ee7..68e1be8d427 100644
--- a/cmd/kube-controller-manager/app/core.go
+++ b/cmd/kube-controller-manager/app/core.go
@@ -57,6 +57,7 @@ import (
 	ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl"
 	"k8s.io/kubernetes/pkg/controller/ttlafterfinished"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
+	"k8s.io/kubernetes/pkg/controller/volume/ephemeral"
 	"k8s.io/kubernetes/pkg/controller/volume/expand"
 	persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
 	"k8s.io/kubernetes/pkg/controller/volume/pvcprotection"
@@ -373,6 +374,22 @@ func startVolumeExpandController(ctx ControllerContext) (http.Handler, bool, err
 	return nil, false, nil
 }

+func startEphemeralVolumeController(ctx ControllerContext) (http.Handler, bool, error) {
+	if utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
+		ephemeralController, err := ephemeral.NewController(
+			ctx.ClientBuilder.ClientOrDie("ephemeral-volume-controller"),
+			ctx.InformerFactory.Core().V1().Pods(),
+			ctx.InformerFactory.Core().V1().PersistentVolumeClaims())
+		if err != nil {
+			return nil, true, fmt.Errorf("failed to start ephemeral volume controller: %v", err)
+		}
+		// TODO (before beta at the latest): make this configurable similar to the EndpointController
+		go ephemeralController.Run(1 /* int(ctx.ComponentConfig.EphemeralController.ConcurrentEphemeralVolumeSyncs) */, ctx.Stop)
+		return nil, true, nil
+	}
+	return nil, false, nil
+}
+
 func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) {
 	go endpointcontroller.NewEndpointController(
 		ctx.InformerFactory.Core().V1().Pods(),
@@ -539,6 +556,7 @@ func startPVCProtectionController(ctx ControllerContext) (http.Handler, bool, er
 		ctx.InformerFactory.Core().V1().Pods(),
 		ctx.ClientBuilder.ClientOrDie("pvc-protection-controller"),
 		utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection),
+		utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume),
 	)
 	if err != nil {
 		return nil, true, fmt.Errorf("failed to start the pvc protection controller: %v", err)
diff --git a/pkg/controller/BUILD b/pkg/controller/BUILD
index b2adf479bad..c34e6c677cd 100644
--- a/pkg/controller/BUILD
+++ b/pkg/controller/BUILD
@@ -136,6 +136,7 @@ filegroup(
        "//pkg/controller/util/node:all-srcs",
        "//pkg/controller/volume/attachdetach:all-srcs",
        "//pkg/controller/volume/common:all-srcs",
+       "//pkg/controller/volume/ephemeral:all-srcs",
        "//pkg/controller/volume/events:all-srcs",
        "//pkg/controller/volume/expand:all-srcs",
        "//pkg/controller/volume/persistentvolume:all-srcs",
diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go
index 2d85b283023..dfcfe7b50d9 100644
--- a/pkg/controller/volume/attachdetach/attach_detach_controller.go
+++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go
@@ -203,7 +203,7 @@ func NewAttachDetachController(
 	// This custom indexer will index pods by its PVC keys. Then we don't need
 	// to iterate all pods every time to find pods which reference given PVC.
-	if err := common.AddIndexerIfNotPresent(adc.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc); err != nil {
+	if err := common.AddPodPVCIndexerIfNotPresent(adc.podIndexer); err != nil {
 		return nil, fmt.Errorf("Could not initialize attach detach controller: %v", err)
 	}

@@ -425,7 +425,7 @@ func (adc *attachDetachController) populateDesiredStateOfWorld() error {
 			// The volume specs present in the ActualStateOfWorld are nil, let's replace those
 			// with the correct ones found on pods. The present in the ASW with no corresponding
 			// pod will be detached and the spec is irrelevant.
-			volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, nodeName, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
+			volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd, nodeName, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
 			if err != nil {
 				klog.Errorf(
 					"Error creating spec for volume %q, pod %q/%q: %v",
diff --git a/pkg/controller/volume/attachdetach/metrics/metrics.go b/pkg/controller/volume/attachdetach/metrics/metrics.go
index ab6bb39348a..a2dbed738f2 100644
--- a/pkg/controller/volume/attachdetach/metrics/metrics.go
+++ b/pkg/controller/volume/attachdetach/metrics/metrics.go
@@ -168,7 +168,7 @@ func (collector *attachDetachStateCollector) getVolumeInUseCount() volumeCount {
 				continue
 			}
 			for _, podVolume := range pod.Spec.Volumes {
-				volumeSpec, err := util.CreateVolumeSpec(podVolume, pod.Namespace, types.NodeName(pod.Spec.NodeName), collector.volumePluginMgr, collector.pvcLister, collector.pvLister, collector.csiMigratedPluginManager, collector.intreeToCSITranslator)
+				volumeSpec, err := util.CreateVolumeSpec(podVolume, pod, types.NodeName(pod.Spec.NodeName), collector.volumePluginMgr, collector.pvcLister, collector.pvLister, collector.csiMigratedPluginManager, collector.intreeToCSITranslator)
 				if err != nil {
 					continue
 				}
diff --git a/pkg/controller/volume/attachdetach/util/util.go b/pkg/controller/volume/attachdetach/util/util.go
index 5453bf8de80..389fbec77f6 100644
--- a/pkg/controller/volume/attachdetach/util/util.go
+++ b/pkg/controller/volume/attachdetach/util/util.go
@@ -39,39 +39,49 @@ import (
 // A volume.Spec that refers to an in-tree plugin spec is translated to refer
 // to a migrated CSI plugin spec if all conditions for CSI migration on a node
 // for the in-tree plugin is satisfied.
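+// The whole pod is now passed in (rather than just its namespace) because
+// the name of the PVC backing a generic ephemeral volume is derived from
+// the pod name as pod.Name + "-" + podVolume.Name.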
-func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, nodeName types.NodeName, vpm *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator) (*volume.Spec, error) {
+func CreateVolumeSpec(podVolume v1.Volume, pod *v1.Pod, nodeName types.NodeName, vpm *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator) (*volume.Spec, error) {
+	claimName := ""
+	readOnly := false
 	if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
+		claimName = pvcSource.ClaimName
+		readOnly = pvcSource.ReadOnly
+	}
+	if ephemeralSource := podVolume.VolumeSource.Ephemeral; ephemeralSource != nil && utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
+		claimName = pod.Name + "-" + podVolume.Name
+		readOnly = ephemeralSource.ReadOnly
+	}
+	if claimName != "" {
 		klog.V(10).Infof(
 			"Found PVC, ClaimName: %q/%q",
-			podNamespace,
-			pvcSource.ClaimName)
+			pod.Namespace,
+			claimName)

 		// If podVolume is a PVC, fetch the real PV behind the claim
 		pvName, pvcUID, err := getPVCFromCacheExtractPV(
-			podNamespace, pvcSource.ClaimName, pvcLister)
+			pod.Namespace, claimName, pvcLister)
 		if err != nil {
 			return nil, fmt.Errorf(
 				"error processing PVC %q/%q: %v",
-				podNamespace,
-				pvcSource.ClaimName,
+				pod.Namespace,
+				claimName,
 				err)
 		}

 		klog.V(10).Infof(
 			"Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q",
-			podNamespace,
-			pvcSource.ClaimName,
+			pod.Namespace,
+			claimName,
 			pvcUID,
 			pvName)

 		// Fetch actual PV object
 		volumeSpec, err := getPVSpecFromCache(
-			pvName, pvcSource.ReadOnly, pvcUID, pvLister)
+			pvName, readOnly, pvcUID, pvLister)
 		if err != nil {
 			return nil, fmt.Errorf(
 				"error processing PVC %q/%q: %v",
-				podNamespace,
-				pvcSource.ClaimName,
+				pod.Namespace,
+				claimName,
 				err)
 		}

@@ -79,8 +89,8 @@ func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, nodeName types.N
 		if err != nil {
 			return nil, fmt.Errorf(
 				"error performing CSI migration checks and translation for PVC %q/%q: %v",
-				podNamespace,
-				pvcSource.ClaimName,
+				pod.Namespace,
+				claimName,
 				err)
 		}

@@ -88,8 +98,8 @@ func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, nodeName types.N
 			"Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
 			volumeSpec.Name(),
 			pvName,
-			podNamespace,
-			pvcSource.ClaimName,
+			pod.Namespace,
+			claimName,
 			pvcUID)

 		return volumeSpec, nil
@@ -219,7 +229,7 @@ func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.D
 	// Process volume spec for each volume defined in pod
 	for _, podVolume := range pod.Spec.Volumes {
-		volumeSpec, err := CreateVolumeSpec(podVolume, pod.Namespace, nodeName, volumePluginMgr, pvcLister, pvLister, csiMigratedPluginManager, csiTranslator)
+		volumeSpec, err := CreateVolumeSpec(podVolume, pod, nodeName, volumePluginMgr, pvcLister, pvLister, csiMigratedPluginManager, csiTranslator)
 		if err != nil {
 			klog.V(10).Infof(
 				"Error processing volume %q for pod %q/%q: %v",
diff --git a/pkg/controller/volume/common/BUILD b/pkg/controller/volume/common/BUILD
index 73f3d01bb62..478e53fce3a 100644
--- a/pkg/controller/volume/common/BUILD
+++ b/pkg/controller/volume/common/BUILD
@@ -6,7 +6,9 @@ go_library(
     importpath = "k8s.io/kubernetes/pkg/controller/volume/common",
visibility = ["//visibility:public"], deps = [ + "//pkg/features:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", ], ) diff --git a/pkg/controller/volume/common/common.go b/pkg/controller/volume/common/common.go index bdd74de5300..1c07496deec 100644 --- a/pkg/controller/volume/common/common.go +++ b/pkg/controller/volume/common/common.go @@ -20,7 +20,9 @@ import ( "fmt" v1 "k8s.io/api/core/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/features" ) const ( @@ -28,19 +30,36 @@ const ( PodPVCIndex = "pod-pvc-index" ) -// PodPVCIndexFunc returns PVC keys for given pod -func PodPVCIndexFunc(obj interface{}) ([]string, error) { - pod, ok := obj.(*v1.Pod) - if !ok { - return []string{}, nil - } - keys := []string{} - for _, podVolume := range pod.Spec.Volumes { - if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil { - keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName)) +// PodPVCIndexFunc creates an index function that returns PVC keys (= +// namespace/name) for given pod. If enabled, this includes the PVCs +// that might be created for generic ephemeral volumes. +func PodPVCIndexFunc(genericEphemeralVolumeFeatureEnabled bool) func(obj interface{}) ([]string, error) { + return func(obj interface{}) ([]string, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return []string{}, nil } + keys := []string{} + for _, podVolume := range pod.Spec.Volumes { + claimName := "" + if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil { + claimName = pvcSource.ClaimName + } + if ephemeralSource := podVolume.VolumeSource.Ephemeral; genericEphemeralVolumeFeatureEnabled && ephemeralSource != nil { + claimName = pod.Name + "-" + podVolume.Name + } + if claimName != "" { + keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, claimName)) + } + } + return keys, nil } - return keys, nil +} + +// AddPodPVCIndexerIfNotPresent adds the PodPVCIndexFunc with the current global setting for GenericEphemeralVolume. 
+func AddPodPVCIndexerIfNotPresent(indexer cache.Indexer) error {
+	return AddIndexerIfNotPresent(indexer, PodPVCIndex,
+		PodPVCIndexFunc(utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume)))
 }

 // AddIndexerIfNotPresent adds the index function with the name into the cache indexer if not present
diff --git a/pkg/controller/volume/ephemeral/BUILD b/pkg/controller/volume/ephemeral/BUILD
new file mode 100644
index 00000000000..5d72daa0a62
--- /dev/null
+++ b/pkg/controller/volume/ephemeral/BUILD
@@ -0,0 +1,62 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "controller.go",
+        "doc.go",
+    ],
+    importpath = "k8s.io/kubernetes/pkg/controller/volume/ephemeral",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/controller/volume/common:go_default_library",
+        "//pkg/controller/volume/events:go_default_library",
+        "//pkg/volume/util:go_default_library",
+        "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+        "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
+        "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/record:go_default_library",
+        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
+        "//vendor/k8s.io/klog/v2:go_default_library",
+    ],
+)
+
+go_test(
+    name = "go_default_test",
+    srcs = ["controller_test.go"],
+    embed = [":go_default_library"],
+    deps = [
+        "//pkg/controller:go_default_library",
+        "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//staging/src/k8s.io/client-go/informers:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
+        "//vendor/github.com/stretchr/testify/assert:go_default_library",
+        "//vendor/k8s.io/klog/v2:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+    visibility = ["//visibility:public"],
+)
diff --git a/pkg/controller/volume/ephemeral/OWNERS b/pkg/controller/volume/ephemeral/OWNERS
new file mode 100644
index 00000000000..8b52858362e
--- /dev/null
+++ b/pkg/controller/volume/ephemeral/OWNERS
@@ -0,0 +1,6 @@
+# See the OWNERS docs at https://go.k8s.io/owners
+
+approvers:
+- saad-ali
+- jsafrane
+- pohly
diff --git a/pkg/controller/volume/ephemeral/controller.go b/pkg/controller/volume/ephemeral/controller.go
new file mode 100644
index 00000000000..59d008a0fff
--- /dev/null
+++ b/pkg/controller/volume/ephemeral/controller.go
@@ -0,0 +1,286 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ephemeral
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"k8s.io/klog/v2"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	coreinformers "k8s.io/client-go/informers/core/v1"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
+	corelisters "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+	kcache "k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/kubernetes/pkg/controller/volume/common"
+	"k8s.io/kubernetes/pkg/controller/volume/events"
+	"k8s.io/kubernetes/pkg/volume/util"
+)
+
+// Controller creates PVCs for ephemeral inline volumes in a pod spec.
+type Controller interface {
+	Run(workers int, stopCh <-chan struct{})
+}
+
+type ephemeralController struct {
+	// kubeClient is the kube API client used by the controller to
+	// communicate with the API server.
+	kubeClient clientset.Interface
+
+	// pvcLister is the shared PVC lister used to fetch and store PVC
+	// objects from the API server. It is shared with other controllers and
+	// therefore the PVC objects in its store should be treated as immutable.
+	pvcLister  corelisters.PersistentVolumeClaimLister
+	pvcsSynced kcache.InformerSynced
+
+	// podLister is the shared Pod lister used to fetch Pod
+	// objects from the API server. It is shared with other controllers and
+	// therefore the Pod objects in its store should be treated as immutable.
+	podLister corelisters.PodLister
+	podSynced kcache.InformerSynced
+
+	// podIndexer has the common PodPVC indexer installed to
+	// limit iteration over pods to those of interest.
+	podIndexer cache.Indexer
+
+	// recorder is used to record events in the API server
+	recorder record.EventRecorder
+
+	queue workqueue.RateLimitingInterface
+}
+
+// NewController creates an ephemeral volume controller.
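+// The pod and PVC informers are expected to come from a shared informer
+// factory; startEphemeralVolumeController in
+// cmd/kube-controller-manager/app/core.go shows the intended wiring.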
+func NewController(
+	kubeClient clientset.Interface,
+	podInformer coreinformers.PodInformer,
+	pvcInformer coreinformers.PersistentVolumeClaimInformer) (Controller, error) {
+
+	ec := &ephemeralController{
+		kubeClient: kubeClient,
+		podLister:  podInformer.Lister(),
+		podIndexer: podInformer.Informer().GetIndexer(),
+		podSynced:  podInformer.Informer().HasSynced,
+		pvcLister:  pvcInformer.Lister(),
+		pvcsSynced: pvcInformer.Informer().HasSynced,
+		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ephemeral_volume"),
+	}
+
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(klog.Infof)
+	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
+	ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ephemeral_volume"})
+
+	podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
+		AddFunc: ec.enqueuePod,
+		// The pod spec is immutable. Therefore the controller can ignore pod updates
+		// because there cannot be any changes that have to be copied into the generated
+		// PVC.
+		// Deletion of the PVC is handled through the owner reference and garbage collection.
+		// Therefore pod deletions can also be ignored.
+	})
+	pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
+		DeleteFunc: ec.onPVCDelete,
+	})
+	if err := common.AddPodPVCIndexerIfNotPresent(ec.podIndexer); err != nil {
+		return nil, fmt.Errorf("could not initialize ephemeral volume controller: %v", err)
+	}
+
+	return ec, nil
+}
+
+func (ec *ephemeralController) enqueuePod(obj interface{}) {
+	pod, ok := obj.(*v1.Pod)
+	if !ok {
+		return
+	}
+
+	// Ignore pods which are already getting deleted.
+	if pod.DeletionTimestamp != nil {
+		return
+	}
+
+	for _, vol := range pod.Spec.Volumes {
+		if vol.Ephemeral != nil {
+			// It has at least one ephemeral inline volume, work on it.
+			key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pod)
+			if err != nil {
+				runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pod, err))
+				return
+			}
+			ec.queue.Add(key)
+			break
+		}
+	}
+}
+
+func (ec *ephemeralController) onPVCDelete(obj interface{}) {
+	pvc, ok := obj.(*v1.PersistentVolumeClaim)
+	if !ok {
+		return
+	}
+
+	// Someone deleted a PVC, either intentionally or
+	// accidentally. If there is a pod referencing it because of
+	// an ephemeral volume, then we should re-create the PVC.
+	// The common indexer does some prefiltering for us by
+	// limiting the list to those pods which reference
+	// the PVC.
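+	// For example, a deleted PVC "my-ns/my-pod-scratch" maps back to a pod
+	// "my-pod" with an ephemeral volume "scratch" in the same namespace,
+	// because the index key is "<namespace>/<pod name>-<volume name>".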
+	objs, err := ec.podIndexer.ByIndex(common.PodPVCIndex, fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name))
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("listing pods from cache: %v", err))
+		return
+	}
+	for _, obj := range objs {
+		ec.enqueuePod(obj)
+	}
+}
+
+func (ec *ephemeralController) Run(workers int, stopCh <-chan struct{}) {
+	defer runtime.HandleCrash()
+	defer ec.queue.ShutDown()
+
+	klog.Infof("Starting ephemeral volume controller")
+	defer klog.Infof("Shutting down ephemeral volume controller")
+
+	if !cache.WaitForNamedCacheSync("ephemeral", stopCh, ec.podSynced, ec.pvcsSynced) {
+		return
+	}
+
+	for i := 0; i < workers; i++ {
+		go wait.Until(ec.runWorker, time.Second, stopCh)
+	}
+
+	<-stopCh
+}
+
+func (ec *ephemeralController) runWorker() {
+	for ec.processNextWorkItem() {
+	}
+}
+
+func (ec *ephemeralController) processNextWorkItem() bool {
+	key, shutdown := ec.queue.Get()
+	if shutdown {
+		return false
+	}
+	defer ec.queue.Done(key)
+
+	err := ec.syncHandler(key.(string))
+	if err == nil {
+		ec.queue.Forget(key)
+		return true
+	}
+
+	runtime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
+	ec.queue.AddRateLimited(key)
+
+	return true
+}
+
+// syncHandler is invoked for each pod which might need to be processed.
+// If an error is returned from this function, the pod will be requeued.
+func (ec *ephemeralController) syncHandler(key string) error {
+	namespace, name, err := kcache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+	pod, err := ec.podLister.Pods(namespace).Get(name)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			klog.V(5).Infof("ephemeral: nothing to do for pod %s, it is gone", key)
+			return nil
+		}
+		klog.V(5).Infof("Error getting pod %s/%s from informer: %v", namespace, name, err)
+		return err
+	}
+
+	// Ignore pods which are already getting deleted.
+	if pod.DeletionTimestamp != nil {
+		klog.V(5).Infof("ephemeral: nothing to do for pod %s, it is marked for deletion", key)
+		return nil
+	}
+
+	for _, vol := range pod.Spec.Volumes {
+		if err := ec.handleVolume(pod, vol); err != nil {
+			ec.recorder.Event(pod, v1.EventTypeWarning, events.FailedBinding, fmt.Sprintf("ephemeral volume %s: %v", vol.Name, err))
+			return fmt.Errorf("pod %s, ephemeral volume %s: %v", key, vol.Name, err)
+		}
+	}
+
+	return nil
+}
+
+// handleVolume is invoked for each volume of a pod.
+func (ec *ephemeralController) handleVolume(pod *v1.Pod, vol v1.Volume) error {
+	klog.V(5).Infof("ephemeral: checking volume %s", vol.Name)
+	ephemeral := vol.Ephemeral
+	if ephemeral == nil {
+		return nil
+	}
+
+	pvcName := pod.Name + "-" + vol.Name
+	pvc, err := ec.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
+	if err != nil && !errors.IsNotFound(err) {
+		return err
+	}
+	if pvc != nil {
+		if metav1.IsControlledBy(pvc, pod) {
+			// Already created, nothing more to do.
+			klog.V(5).Infof("ephemeral: volume %s: PVC %s already created", vol.Name, pvcName)
+			return nil
+		}
+		return fmt.Errorf("PVC %q (uid: %q) was not created for the pod",
+			util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID)
+	}
+
+	// Create the PVC with pod as owner.
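+	// Making the pod the controller of the PVC (Controller and
+	// BlockOwnerDeletion set to true) has two effects: the PVC is
+	// garbage-collected once the pod is gone, and metav1.IsControlledBy
+	// can identify PVCs which were not created for this pod.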
+	isTrue := true
+	pvc = &v1.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: pvcName,
+			OwnerReferences: []metav1.OwnerReference{
+				{
+					APIVersion:         "v1",
+					Kind:               "Pod",
+					Name:               pod.Name,
+					UID:                pod.UID,
+					Controller:         &isTrue,
+					BlockOwnerDeletion: &isTrue,
+				},
+			},
+			Annotations: ephemeral.VolumeClaimTemplate.Annotations,
+			Labels:      ephemeral.VolumeClaimTemplate.Labels,
+		},
+		Spec: ephemeral.VolumeClaimTemplate.Spec,
+	}
+	_, err = ec.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
+	if err != nil {
+		return fmt.Errorf("create PVC %s: %v", pvcName, err)
+	}
+	return nil
+}
diff --git a/pkg/controller/volume/ephemeral/controller_test.go b/pkg/controller/volume/ephemeral/controller_test.go
new file mode 100644
index 00000000000..370c50d2187
--- /dev/null
+++ b/pkg/controller/volume/ephemeral/controller_test.go
@@ -0,0 +1,221 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ephemeral
+
+import (
+	"context"
+	"sort"
+	"testing"
+
+	"k8s.io/api/core/v1"
+	// storagev1 "k8s.io/api/storage/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	// "k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/tools/cache"
+	kcache "k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/controller"
+
+	"github.com/stretchr/testify/assert"
+)
+
+var (
+	testPodName         = "test-pod"
+	testNamespace       = "my-namespace"
+	testPodUID          = types.UID("uidpod1")
+	otherNamespace      = "not-my-namespace"
+	ephemeralVolumeName = "ephemeral-volume"
+
+	testPod               = makePod(testPodName, testNamespace, testPodUID)
+	testPodWithEphemeral  = makePod(testPodName, testNamespace, testPodUID, *makeEphemeralVolume(ephemeralVolumeName))
+	testPodEphemeralClaim = makePVC(testPodName+"-"+ephemeralVolumeName, testNamespace, makeOwnerReference(testPodWithEphemeral, true))
+	conflictingClaim      = makePVC(testPodName+"-"+ephemeralVolumeName, testNamespace, nil)
+	otherNamespaceClaim   = makePVC(testPodName+"-"+ephemeralVolumeName, otherNamespace, nil)
+)
+
+func init() {
+	klog.InitFlags(nil)
+}
+
+func TestSyncHandler(t *testing.T) {
+	tests := []struct {
+		name          string
+		podKey        string
+		pvcs          []*v1.PersistentVolumeClaim
+		pods          []*v1.Pod
+		expectedPVCs  []v1.PersistentVolumeClaim
+		expectedError bool
+	}{
+		{
+			name:         "create",
+			pods:         []*v1.Pod{testPodWithEphemeral},
+			podKey:       podKey(testPodWithEphemeral),
+			expectedPVCs: []v1.PersistentVolumeClaim{*testPodEphemeralClaim},
+		},
+		{
+			name:   "no-such-pod",
+			podKey: podKey(testPodWithEphemeral),
+		},
+		{
+			name: "pod-deleted",
+			pods: func() []*v1.Pod {
+				deleted := metav1.Now()
+				pods := []*v1.Pod{testPodWithEphemeral.DeepCopy()}
+				pods[0].DeletionTimestamp = &deleted
+				return pods
+			}(),
+			podKey: podKey(testPodWithEphemeral),
+		},
+		{
+			name:   "no-volumes",
+			pods:   []*v1.Pod{testPod},
+			podKey: podKey(testPod),
+		},
+		{
+			name:         "create-with-other-PVC",
"create-with-other-PVC", + pods: []*v1.Pod{testPodWithEphemeral}, + podKey: podKey(testPodWithEphemeral), + pvcs: []*v1.PersistentVolumeClaim{otherNamespaceClaim}, + expectedPVCs: []v1.PersistentVolumeClaim{*otherNamespaceClaim, *testPodEphemeralClaim}, + }, + { + name: "wrong-PVC-owner", + pods: []*v1.Pod{testPodWithEphemeral}, + podKey: podKey(testPodWithEphemeral), + pvcs: []*v1.PersistentVolumeClaim{conflictingClaim}, + expectedPVCs: []v1.PersistentVolumeClaim{*conflictingClaim}, + expectedError: true, + }, + } + + for _, tc := range tests { + // Run sequentially because of global logging. + t.Run(tc.name, func(t *testing.T) { + // There is no good way to shut down the informers. They spawn + // various goroutines and some of them (in particular shared informer) + // become very unhappy ("close on closed channel") when using a context + // that gets cancelled. Therefore we just keep everything running. + ctx := context.Background() + + var objects []runtime.Object + for _, pod := range tc.pods { + objects = append(objects, pod) + } + for _, pvc := range tc.pvcs { + objects = append(objects, pvc) + } + + fakeKubeClient := createTestClient(objects...) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + podInformer := informerFactory.Core().V1().Pods() + pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() + + c, err := NewController(fakeKubeClient, podInformer, pvcInformer) + if err != nil { + t.Fatalf("error creating ephemeral controller : %v", err) + } + ec, _ := c.(*ephemeralController) + + // Ensure informers are up-to-date. + go informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced, pvcInformer.Informer().HasSynced) + + err = ec.syncHandler(tc.podKey) + if err != nil && !tc.expectedError { + t.Fatalf("unexpected error while running handler: %v", err) + } + if err == nil && tc.expectedError { + t.Fatalf("unexpected success") + } + + pvcs, err := fakeKubeClient.CoreV1().PersistentVolumeClaims("").List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatalf("unexpected error while listing PVCs: %v", err) + } + assert.Equal(t, sortPVCs(tc.expectedPVCs), sortPVCs(pvcs.Items)) + }) + } +} + +func makePVC(name, namespace string, owner *metav1.OwnerReference) *v1.PersistentVolumeClaim { + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, + Spec: v1.PersistentVolumeClaimSpec{}, + } + if owner != nil { + pvc.OwnerReferences = []metav1.OwnerReference{*owner} + } + + return pvc +} + +func makeEphemeralVolume(name string) *v1.Volume { + return &v1.Volume{ + Name: name, + VolumeSource: v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{ + VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{}, + }, + }, + } +} + +func makePod(name, namespace string, uid types.UID, volumes ...v1.Volume) *v1.Pod { + pvc := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace, UID: uid}, + Spec: v1.PodSpec{ + Volumes: volumes, + }, + } + + return pvc +} + +func podKey(pod *v1.Pod) string { + key, _ := kcache.DeletionHandlingMetaNamespaceKeyFunc(testPodWithEphemeral) + return key +} + +func makeOwnerReference(pod *v1.Pod, isController bool) *metav1.OwnerReference { + isTrue := true + return &metav1.OwnerReference{ + APIVersion: "v1", + Kind: "Pod", + Name: pod.Name, + UID: pod.UID, + Controller: &isController, + BlockOwnerDeletion: &isTrue, + } +} + +func sortPVCs(pvcs 
+	sort.Slice(pvcs, func(i, j int) bool {
+		return pvcs[i].Namespace < pvcs[j].Namespace ||
+			(pvcs[i].Namespace == pvcs[j].Namespace && pvcs[i].Name < pvcs[j].Name)
+	})
+	return pvcs
+}
+
+func createTestClient(objects ...runtime.Object) *fake.Clientset {
+	fakeClient := fake.NewSimpleClientset(objects...)
+	return fakeClient
+}
diff --git a/pkg/controller/volume/ephemeral/doc.go b/pkg/controller/volume/ephemeral/doc.go
new file mode 100644
index 00000000000..ae45cbaad1d
--- /dev/null
+++ b/pkg/controller/volume/ephemeral/doc.go
@@ -0,0 +1,21 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package ephemeral implements the controller part of
+// https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1698-generic-ephemeral-volumes
+//
+// It was derived from the expand controller.
+package ephemeral
diff --git a/pkg/controller/volume/persistentvolume/pv_controller_base.go b/pkg/controller/volume/persistentvolume/pv_controller_base.go
index 0b2da23ed92..5747db35fd4 100644
--- a/pkg/controller/volume/persistentvolume/pv_controller_base.go
+++ b/pkg/controller/volume/persistentvolume/pv_controller_base.go
@@ -134,7 +134,7 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)

 	// This custom indexer will index pods by its PVC keys. Then we don't need
 	// to iterate all pods every time to find pods which reference given PVC.
-	if err := common.AddIndexerIfNotPresent(controller.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc); err != nil {
+	if err := common.AddPodPVCIndexerIfNotPresent(controller.podIndexer); err != nil {
 		return nil, fmt.Errorf("Could not initialize attach detach controller: %v", err)
 	}

diff --git a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go
index 1e6a2ee8a62..f3c9f2b4f72 100644
--- a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go
+++ b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go
@@ -55,14 +55,18 @@ type Controller struct {

 	// allows overriding of StorageObjectInUseProtection feature Enabled/Disabled for testing
 	storageObjectInUseProtectionEnabled bool
+
+	// allows overriding of GenericEphemeralVolume feature Enabled/Disabled for testing
+	genericEphemeralVolumeFeatureEnabled bool
 }

 // NewPVCProtectionController returns a new instance of PVCProtectionController.
-func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, cl clientset.Interface, storageObjectInUseProtectionFeatureEnabled bool) (*Controller, error) {
+func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, cl clientset.Interface, storageObjectInUseProtectionFeatureEnabled, genericEphemeralVolumeFeatureEnabled bool) (*Controller, error) {
 	e := &Controller{
-		client: cl,
-		queue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcprotection"),
-		storageObjectInUseProtectionEnabled: storageObjectInUseProtectionFeatureEnabled,
+		client: cl,
+		queue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcprotection"),
+		storageObjectInUseProtectionEnabled:  storageObjectInUseProtectionFeatureEnabled,
+		genericEphemeralVolumeFeatureEnabled: genericEphemeralVolumeFeatureEnabled,
 	}
 	if cl != nil && cl.CoreV1().RESTClient().GetRateLimiter() != nil {
 		ratelimiter.RegisterMetricAndTrackRateLimiterUsage("persistentvolumeclaim_protection_controller", cl.CoreV1().RESTClient().GetRateLimiter())
@@ -80,7 +84,7 @@ func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimI
 	e.podLister = podInformer.Lister()
 	e.podListerSynced = podInformer.Informer().HasSynced
 	e.podIndexer = podInformer.Informer().GetIndexer()
-	if err := common.AddIndexerIfNotPresent(e.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc); err != nil {
+	if err := common.AddIndexerIfNotPresent(e.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc(genericEphemeralVolumeFeatureEnabled)); err != nil {
 		return nil, fmt.Errorf("Could not initialize pvc protection controller: %v", err)
 	}
 	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -236,6 +240,7 @@ func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) {

 func (c *Controller) askInformer(pvc *v1.PersistentVolumeClaim) (bool, error) {
 	klog.V(4).Infof("Looking for Pods using PVC %s/%s in the Informer's cache", pvc.Namespace, pvc.Name)

+	// The indexer is used to find pods which might use the PVC.
 	objs, err := c.podIndexer.ByIndex(common.PodPVCIndex, fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name))
 	if err != nil {
 		return false, fmt.Errorf("cache-based list of pods failed while processing %s/%s: %s", pvc.Namespace, pvc.Name, err.Error())
@@ -245,6 +250,19 @@ func (c *Controller) askInformer(pvc *v1.PersistentVolumeClaim) (bool, error) {
 		if !ok {
 			continue
 		}
+
+		if c.genericEphemeralVolumeFeatureEnabled {
+			// We still need to look at each volume: that's redundant for volume.PersistentVolumeClaim,
+			// but for volume.Ephemeral we need to be sure that this particular PVC is the one
+			// created for the ephemeral volume.
+			if c.podUsesPVC(pod, pvc) {
+				return true, nil
+			}
+			continue
+		}
+
+		// This is the traditional behavior without GenericEphemeralVolume enabled.
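+		// It checks pod.Spec.NodeName directly; the feature-enabled branch
+		// relies on the equivalent check inside podUsesPVC.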
if pod.Spec.NodeName == "" { continue } @@ -265,7 +283,7 @@ func (c *Controller) askAPIServer(pvc *v1.PersistentVolumeClaim) (bool, error) { } for _, pod := range podsList.Items { - if podUsesPVC(&pod, pvc.Name) { + if c.podUsesPVC(&pod, pvc) { return true, nil } } @@ -274,13 +292,14 @@ func (c *Controller) askAPIServer(pvc *v1.PersistentVolumeClaim) (bool, error) { return false, nil } -func podUsesPVC(pod *v1.Pod, pvc string) bool { +func (c *Controller) podUsesPVC(pod *v1.Pod, pvc *v1.PersistentVolumeClaim) bool { // Check whether pvc is used by pod only if pod is scheduled, because // kubelet sees pods after they have been scheduled and it won't allow // starting a pod referencing a PVC with a non-nil deletionTimestamp. if pod.Spec.NodeName != "" { for _, volume := range pod.Spec.Volumes { - if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvc { + if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvc.Name || + c.genericEphemeralVolumeFeatureEnabled && !podIsShutDown(pod) && volume.Ephemeral != nil && pod.Name+"-"+volume.Name == pvc.Name && metav1.IsControlledBy(pvc, pod) { klog.V(2).Infof("Pod %s/%s uses PVC %s", pod.Namespace, pod.Name, pvc) return true } @@ -289,6 +308,43 @@ func podUsesPVC(pod *v1.Pod, pvc string) bool { return false } +// podIsShutDown returns true if kubelet is done with the pod or +// it was force-deleted. +func podIsShutDown(pod *v1.Pod) bool { + // The following text is based on how pod shutdown was + // initially described to me. During PR review, it was pointed out + // that this is not correct: "deleteGracePeriodSeconds tells + // kubelet when it can start force terminating the + // containers. Volume teardown only starts after containers + // are termianted. So there is an additional time period after + // the grace period where volume teardown is happening." + // + // TODO (https://github.com/kubernetes/enhancements/issues/1698#issuecomment-655344680): + // investigate what kubelet really does and if necessary, + // add some other signal for "kubelet is done". For now the check + // is used only for ephemeral volumes, because it + // is needed to avoid the deadlock. + // + // A pod that has a deletionTimestamp and a zero + // deletionGracePeriodSeconds + // a) has been processed by kubelet and is ready for deletion or + // b) was force-deleted. + // + // It's now just waiting for garbage collection. We could wait + // for it to actually get removed, but that may be blocked by + // finalizers for the pod and thus get delayed. + // + // Worse, it is possible that there is a cyclic dependency + // (pod finalizer waits for PVC to get removed, PVC protection + // controller waits for pod to get removed). By considering + // the PVC unused in this case, we allow the PVC to get + // removed and break such a cycle. + // + // Therefore it is better to proceed with PVC removal, + // which is safe (case a) and/or desirable (case b). 
+	return pod.DeletionTimestamp != nil && pod.DeletionGracePeriodSeconds != nil && *pod.DeletionGracePeriodSeconds == 0
+}
+
 // pvcAddedUpdated reacts to pvc added/updated events
 func (c *Controller) pvcAddedUpdated(obj interface{}) {
 	pvc, ok := obj.(*v1.PersistentVolumeClaim)
@@ -354,8 +410,11 @@ func (c *Controller) enqueuePVCs(pod *v1.Pod, deleted bool) {

 	// Enqueue all PVCs that the pod uses
 	for _, volume := range pod.Spec.Volumes {
-		if volume.PersistentVolumeClaim != nil {
+		switch {
+		case volume.PersistentVolumeClaim != nil:
 			c.queue.Add(pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName)
+		case c.genericEphemeralVolumeFeatureEnabled && volume.Ephemeral != nil:
+			c.queue.Add(pod.Namespace + "/" + pod.Name + "-" + volume.Name)
 		}
 	}
 }
diff --git a/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go b/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go
index 79ba11ed91a..e49c7417cb0 100644
--- a/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go
+++ b/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go
@@ -146,7 +146,7 @@ func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionF
 	}
 }

-func TestPVCProtectionController(t *testing.T) {
+func testPVCProtectionController(t *testing.T, genericEphemeralVolumeFeatureEnabled bool) {
 	pvcGVR := schema.GroupVersionResource{
 		Group:    v1.GroupName,
 		Version:  "v1",
@@ -430,7 +430,7 @@ func TestPVCProtectionController(t *testing.T) {
 		podInformer := informers.Core().V1().Pods()

 		// Create the controller
-		ctrl, err := NewPVCProtectionController(pvcInformer, podInformer, client, test.storageObjectInUseProtectionEnabled)
+		ctrl, err := NewPVCProtectionController(pvcInformer, podInformer, client, test.storageObjectInUseProtectionEnabled, genericEphemeralVolumeFeatureEnabled)
 		if err != nil {
 			t.Fatalf("unexpected error: %v", err)
 		}
@@ -518,3 +518,8 @@ func TestPVCProtectionController(t *testing.T) {
 	}

 }
+
+func TestPVCProtectionController(t *testing.T) {
+	t.Run("with-GenericEphemeralVolume", func(t *testing.T) { testPVCProtectionController(t, true) })
+	t.Run("without-GenericEphemeralVolume", func(t *testing.T) { testPVCProtectionController(t, false) })
+}
diff --git a/pkg/controller/volume/scheduling/scheduler_binder.go b/pkg/controller/volume/scheduling/scheduler_binder.go
index 5c3930614fd..10c4caccc13 100644
--- a/pkg/controller/volume/scheduling/scheduler_binder.go
+++ b/pkg/controller/volume/scheduling/scheduler_binder.go
@@ -661,13 +661,28 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claim
 	return true, nil
 }

-func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume) (bool, *v1.PersistentVolumeClaim, error) {
-	if vol.PersistentVolumeClaim == nil {
+func (b *volumeBinder) isVolumeBound(pod *v1.Pod, vol *v1.Volume) (bound bool, pvc *v1.PersistentVolumeClaim, err error) {
+	pvcName := ""
+	ephemeral := false
+	switch {
+	case vol.PersistentVolumeClaim != nil:
+		pvcName = vol.PersistentVolumeClaim.ClaimName
+	case vol.Ephemeral != nil &&
+		utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume):
+		// Generic ephemeral inline volumes also use a PVC,
+		// just with a computed name, and...
+		pvcName = pod.Name + "-" + vol.Name
+		ephemeral = true
+	default:
 		return true, nil, nil
 	}
-	pvcName := vol.PersistentVolumeClaim.ClaimName

-	return b.isPVCBound(namespace, pvcName)
+	bound, pvc, err = b.isPVCBound(pod.Namespace, pvcName)
+	// ... the PVC must be owned by the pod.
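+	// Without this check a pod could be bound against an unrelated, manually
+	// created PVC which merely happens to have the computed
+	// <pod name>-<volume name> name.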
+	if ephemeral && err == nil && pvc != nil && !metav1.IsControlledBy(pvc, pod) {
+		return false, nil, fmt.Errorf("PVC %s/%s is not owned by pod", pod.Namespace, pvcName)
+	}
+	return
 }

 func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.PersistentVolumeClaim, error) {
@@ -703,7 +718,7 @@ func (b *volumeBinder) isPVCFullyBound(pvc *v1.PersistentVolumeClaim) bool {

 // arePodVolumesBound returns true if all volumes are fully bound
 func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
 	for _, vol := range pod.Spec.Volumes {
-		if isBound, _, _ := b.isVolumeBound(pod.Namespace, &vol); !isBound {
+		if isBound, _, _ := b.isVolumeBound(pod, &vol); !isBound {
 			// Pod has at least one PVC that needs binding
 			return false
 		}
@@ -719,7 +734,7 @@ func (b *volumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV
 	unboundClaimsDelayBinding = []*v1.PersistentVolumeClaim{}

 	for _, vol := range pod.Spec.Volumes {
-		volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol)
+		volumeBound, pvc, err := b.isVolumeBound(pod, &vol)
 		if err != nil {
 			return nil, nil, nil, err
 		}
diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
index 30bbf34d148..9df3ea9cc0c 100644
--- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
+++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
@@ -316,7 +316,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
 		}

 		pvc, volumeSpec, volumeGidValue, err :=
-			dswp.createVolumeSpec(podVolume, pod.Name, pod.Namespace, mounts, devices)
+			dswp.createVolumeSpec(podVolume, pod, mounts, devices)
 		if err != nil {
 			klog.Errorf(
 				"Error processing volume %q for pod %q: %v",
@@ -491,29 +491,50 @@ func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod(
 // specified volume. It dereference any PVC to get PV objects, if needed.
 // Returns an error if unable to obtain the volume at this time.
 func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
-	podVolume v1.Volume, podName string, podNamespace string, mounts, devices sets.String) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) {
-	if pvcSource :=
-		podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
+	podVolume v1.Volume, pod *v1.Pod, mounts, devices sets.String) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) {
+	pvcSource := podVolume.VolumeSource.PersistentVolumeClaim
+	ephemeral := false
+	if pvcSource == nil &&
+		podVolume.VolumeSource.Ephemeral != nil &&
+		utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
+		// Generic ephemeral inline volumes are handled the
+		// same way as a PVC reference. The only additional
+		// constraint (checked below) is that the PVC must be
+		// owned by the pod.
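+		// The computed claim name matches what the ephemeral volume
+		// controller creates, e.g. "my-pod-scratch" for volume "scratch"
+		// in pod "my-pod".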
+		pvcSource = &v1.PersistentVolumeClaimVolumeSource{
+			ClaimName: pod.Name + "-" + podVolume.Name,
+			ReadOnly:  podVolume.VolumeSource.Ephemeral.ReadOnly,
+		}
+		ephemeral = true
+	}
+	if pvcSource != nil {
 		klog.V(5).Infof(
 			"Found PVC, ClaimName: %q/%q",
-			podNamespace,
+			pod.Namespace,
 			pvcSource.ClaimName)

 		// If podVolume is a PVC, fetch the real PV behind the claim
 		pvc, err := dswp.getPVCExtractPV(
-			podNamespace, pvcSource.ClaimName)
+			pod.Namespace, pvcSource.ClaimName)
 		if err != nil {
 			return nil, nil, "", fmt.Errorf(
 				"error processing PVC %s/%s: %v",
-				podNamespace,
+				pod.Namespace,
 				pvcSource.ClaimName,
 				err)
 		}
+		if ephemeral && !metav1.IsControlledBy(pvc, pod) {
+			return nil, nil, "", fmt.Errorf(
+				"error processing PVC %s/%s: not the ephemeral PVC for the pod",
+				pod.Namespace,
+				pvcSource.ClaimName,
+			)
+		}
 		pvName, pvcUID := pvc.Spec.VolumeName, pvc.UID
 		klog.V(5).Infof(
 			"Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q",
-			podNamespace,
+			pod.Namespace,
 			pvcSource.ClaimName,
 			pvcUID,
 			pvName)
@@ -524,7 +545,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
 		if err != nil {
 			return nil, nil, "", fmt.Errorf(
 				"error processing PVC %s/%s: %v",
-				podNamespace,
+				pod.Namespace,
 				pvcSource.ClaimName,
 				err)
 		}
@@ -533,7 +554,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
 			"Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
 			volumeSpec.Name(),
 			pvName,
-			podNamespace,
+			pod.Namespace,
 			pvcSource.ClaimName,
 			pvcUID)

diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go
index dad1ddbf81d..28d1b05068c 100644
--- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go
+++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go
@@ -518,7 +518,7 @@ func TestCreateVolumeSpec_Valid_File_VolumeMounts(t *testing.T) {
 	fakePodManager.AddPod(pod)

 	mountsMap, devicesMap := util.GetPodVolumeNames(pod)
 	_, volumeSpec, _, err :=
-		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap)
+		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)

 	// Assert
 	if volumeSpec == nil || err != nil {
@@ -564,7 +564,7 @@ func TestCreateVolumeSpec_Valid_Nil_VolumeMounts(t *testing.T) {
 	fakePodManager.AddPod(pod)

 	mountsMap, devicesMap := util.GetPodVolumeNames(pod)
 	_, volumeSpec, _, err :=
-		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap)
+		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)

 	// Assert
 	if volumeSpec == nil || err != nil {
@@ -610,7 +610,7 @@ func TestCreateVolumeSpec_Valid_Block_VolumeDevices(t *testing.T) {
 	fakePodManager.AddPod(pod)

 	mountsMap, devicesMap := util.GetPodVolumeNames(pod)
 	_, volumeSpec, _, err :=
-		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap)
+		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)

 	// Assert
 	if volumeSpec == nil || err != nil {
@@ -656,7 +656,7 @@ func TestCreateVolumeSpec_Invalid_File_VolumeDevices(t *testing.T) {
 	fakePodManager.AddPod(pod)

 	mountsMap, devicesMap := util.GetPodVolumeNames(pod)
 	_, volumeSpec, _, err :=
-		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap)
+		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)

 	// Assert
 	if volumeSpec != nil || err == nil {
@@ -702,7 +702,7 @@ func TestCreateVolumeSpec_Invalid_Block_VolumeMounts(t *testing.T) {
 	fakePodManager.AddPod(pod)

 	mountsMap, devicesMap := util.GetPodVolumeNames(pod)
 	_, volumeSpec, _, err :=
-		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap)
+		dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)

 	// Assert
 	if volumeSpec != nil || err == nil {
diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD
index f5d263232b0..738289a2d8a 100644
--- a/pkg/scheduler/core/BUILD
+++ b/pkg/scheduler/core/BUILD
@@ -10,6 +10,7 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/api/v1/pod:go_default_library",
+        "//pkg/features:go_default_library",
         "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
@@ -19,8 +20,10 @@ go_library(
         "//pkg/scheduler/profile:go_default_library",
         "//pkg/scheduler/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/rest:go_default_library",
         "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index e6c2bcb3956..65326a908ad 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -29,9 +29,12 @@ import (
 	"k8s.io/klog/v2"

 	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	extenderv1 "k8s.io/kube-scheduler/extender/v1"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -575,11 +578,19 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla
 	manifest := &(pod.Spec)
 	for i := range manifest.Volumes {
 		volume := &manifest.Volumes[i]
-		if volume.PersistentVolumeClaim == nil {
-			// Volume is not a PVC, ignore
+		var pvcName string
+		ephemeral := false
+		switch {
+		case volume.PersistentVolumeClaim != nil:
+			pvcName = volume.PersistentVolumeClaim.ClaimName
+		case volume.Ephemeral != nil &&
+			utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume):
+			pvcName = pod.Name + "-" + volume.Name
+			ephemeral = true
+		default:
+			// Volume is not using a PVC, ignore
 			continue
 		}
-		pvcName := volume.PersistentVolumeClaim.ClaimName
 		pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
 		if err != nil {
 			// The error has already enough context ("persistentvolumeclaim "myclaim" not found")
@@ -589,6 +600,11 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla
 		if pvc.DeletionTimestamp != nil {
 			return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
 		}
+
+		if ephemeral &&
+			!metav1.IsControlledBy(pvc, pod) {
+			return fmt.Errorf("persistentvolumeclaim %q was not created for the pod", pvc.Name)
+		}
 	}

 	return nil
diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
index 0fee9534f40..52c316de348 100644
--- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
+++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
@@ -61,7 +61,8 @@ func (d *stateData) Clone() framework.StateData {
 // In the Filter phase, pod binding cache is created for the pod and used in
 // Reserve and PreBind phases.
 type VolumeBinding struct {
-	Binder scheduling.SchedulerVolumeBinder
+	Binder                               scheduling.SchedulerVolumeBinder
+	GenericEphemeralVolumeFeatureEnabled bool
 }

 var _ framework.PreFilterPlugin = &VolumeBinding{}
@@ -77,9 +78,10 @@ func (pl *VolumeBinding) Name() string {
 	return Name
 }

-func podHasPVCs(pod *v1.Pod) bool {
+func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) bool {
 	for _, vol := range pod.Spec.Volumes {
-		if vol.PersistentVolumeClaim != nil {
+		if vol.PersistentVolumeClaim != nil ||
+			pl.GenericEphemeralVolumeFeatureEnabled && vol.Ephemeral != nil {
 			return true
 		}
 	}
@@ -91,7 +93,7 @@ func podHasPVCs(pod *v1.Pod) bool {
 // UnschedulableAndUnresolvable is returned.
 func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
 	// If pod does not reference any PVC, we don't need to do anything.
-	if !podHasPVCs(pod) {
+	if !pl.podHasPVCs(pod) {
 		state.Write(stateKey, &stateData{skip: true})
 		return nil
 	}
@@ -268,7 +270,8 @@ func New(plArgs runtime.Object, fh framework.FrameworkHandle) (framework.Plugin,
 	}
 	binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second)
 	return &VolumeBinding{
-		Binder: binder,
+		Binder:                               binder,
+		GenericEphemeralVolumeFeatureEnabled: utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume),
 	}, nil
 }

diff --git a/plugin/pkg/auth/authorizer/node/graph.go b/plugin/pkg/auth/authorizer/node/graph.go
index e0076feca65..2e0b95cc145 100644
--- a/plugin/pkg/auth/authorizer/node/graph.go
+++ b/plugin/pkg/auth/authorizer/node/graph.go
@@ -20,8 +20,10 @@ import (
 	"sync"

 	corev1 "k8s.io/api/core/v1"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	pvutil "k8s.io/kubernetes/pkg/api/v1/persistentvolume"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/third_party/forked/gonum/graph"
 	"k8s.io/kubernetes/third_party/forked/gonum/graph/simple"
 )
@@ -375,8 +377,14 @@ func (g *Graph) AddPod(pod *corev1.Pod) {
 	})

 	for _, v := range pod.Spec.Volumes {
+		claimName := ""
 		if v.PersistentVolumeClaim != nil {
-			pvcVertex := g.getOrCreateVertex_locked(pvcVertexType, pod.Namespace, v.PersistentVolumeClaim.ClaimName)
+			claimName = v.PersistentVolumeClaim.ClaimName
+		} else if v.Ephemeral != nil && utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
+			claimName = pod.Name + "-" + v.Name
+		}
+		if claimName != "" {
+			pvcVertex := g.getOrCreateVertex_locked(pvcVertexType, pod.Namespace, claimName)
 			e := newDestinationEdge(pvcVertex, podVertex, nodeVertex)
 			g.graph.SetEdge(e)
 			g.addEdgeToDestinationIndex_locked(e)
diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go
index e6d68801528..27ab11ccaca 100644
--- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go
+++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go
@@ -178,6 +178,17 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
 		})
 	}

+	if utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
+		addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
+			ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "ephemeral-volume-controller"},
+			Rules: []rbacv1.PolicyRule{
+				rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(),
+				rbacv1helpers.NewRule("get", "list", "watch", "create").Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie(),
+				eventsRule(),
+			},
+		})
+	}
+
 	addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
 		ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "generic-garbage-collector"},
 		Rules: []rbacv1.PolicyRule{