GenericEphemeralVolume: initial implementation

The implementation consists of
- identifying all places where VolumeSource.PersistentVolumeClaim has
  a special meaning and then ensuring that the same code path is taken
  for an ephemeral volume, with the ownership check
- adding a controller that produces the PVCs for each embedded
  VolumeSource.Ephemeral (see the sketch after this list)
- relaxing the PVC protection controller such that it removes
  the finalizer already before the pod is deleted (only
  if the GenericEphemeralVolume feature is enabled): this is
  needed to break a cycle where foreground deletion of the pod
  blocks on removing the PVC, which waits for deletion of the pod
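
To make the first two points concrete, here is a minimal sketch (not part of the commit; the pod and volume names are invented) of the naming and ownership scheme, using the core/v1 Go types:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A pod with one generic ephemeral inline volume.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "my-app", Namespace: "default", UID: "uid-1234"},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{{
				Name: "scratch",
				VolumeSource: v1.VolumeSource{
					Ephemeral: &v1.EphemeralVolumeSource{
						VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{},
					},
				},
			}},
		},
	}

	// For each such volume the new controller creates a PVC named
	// "<pod name>-<volume name>" ("my-app-scratch" here), copying the
	// template's labels, annotations and spec and making the pod the
	// controlling owner, so the garbage collector deletes the PVC
	// together with the pod.
	for _, vol := range pod.Spec.Volumes {
		if vol.Ephemeral != nil {
			fmt.Printf("PVC %s/%s-%s, owned by pod UID %s\n",
				pod.Namespace, pod.Name, vol.Name, pod.UID)
		}
	}
}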

The controller was derived from the endpointslices controller.
Author: Patrick Ohly, 2020-06-08 10:31:38 +02:00
commit ff3e5e06a7, parent af91e76df8
25 changed files with 861 additions and 72 deletions

View File

@ -74,6 +74,7 @@ go_library(
"//pkg/controller/ttl:go_default_library",
"//pkg/controller/ttlafterfinished:go_default_library",
"//pkg/controller/volume/attachdetach:go_default_library",
"//pkg/controller/volume/ephemeral:go_default_library",
"//pkg/controller/volume/expand:go_default_library",
"//pkg/controller/volume/persistentvolume:go_default_library",
"//pkg/controller/volume/persistentvolume/config:go_default_library",

View File

@ -423,6 +423,7 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
controllers["pv-protection"] = startPVProtectionController
controllers["ttl-after-finished"] = startTTLAfterFinishedController
controllers["root-ca-cert-publisher"] = startRootCACertPublisher
controllers["ephemeral-volume"] = startEphemeralVolumeController
return controllers
}

View File

@ -57,6 +57,7 @@ import (
ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl"
"k8s.io/kubernetes/pkg/controller/ttlafterfinished"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
"k8s.io/kubernetes/pkg/controller/volume/ephemeral"
"k8s.io/kubernetes/pkg/controller/volume/expand"
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
"k8s.io/kubernetes/pkg/controller/volume/pvcprotection"
@ -373,6 +374,22 @@ func startVolumeExpandController(ctx ControllerContext) (http.Handler, bool, err
return nil, false, nil
}
func startEphemeralVolumeController(ctx ControllerContext) (http.Handler, bool, error) {
if utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
ephemeralController, err := ephemeral.NewController(
ctx.ClientBuilder.ClientOrDie("ephemeral-volume-controller"),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims())
if err != nil {
return nil, true, fmt.Errorf("failed to start ephemeral volume controller: %v", err)
}
// TODO (before beta at the latest): make this configurable similar to the EndpointController
go ephemeralController.Run(1 /* int(ctx.ComponentConfig.EphemeralController.ConcurrentEphemeralVolumeSyncs) */, ctx.Stop)
return nil, true, nil
}
return nil, false, nil
}
func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) {
go endpointcontroller.NewEndpointController(
ctx.InformerFactory.Core().V1().Pods(),
@ -539,6 +556,7 @@ func startPVCProtectionController(ctx ControllerContext) (http.Handler, bool, er
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("pvc-protection-controller"),
utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection),
utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume),
)
if err != nil {
return nil, true, fmt.Errorf("failed to start the pvc protection controller: %v", err)

View File

@ -136,6 +136,7 @@ filegroup(
"//pkg/controller/util/node:all-srcs",
"//pkg/controller/volume/attachdetach:all-srcs",
"//pkg/controller/volume/common:all-srcs",
"//pkg/controller/volume/ephemeral:all-srcs",
"//pkg/controller/volume/events:all-srcs",
"//pkg/controller/volume/expand:all-srcs",
"//pkg/controller/volume/persistentvolume:all-srcs",

View File

@ -203,7 +203,7 @@ func NewAttachDetachController(
// This custom indexer will index pods by their PVC keys, so we don't
// need to iterate over all pods to find those which reference a given PVC.
if err := common.AddIndexerIfNotPresent(adc.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc); err != nil {
if err := common.AddPodPVCIndexerIfNotPresent(adc.podIndexer); err != nil {
return nil, fmt.Errorf("Could not initialize attach detach controller: %v", err)
}
@ -425,7 +425,7 @@ func (adc *attachDetachController) populateDesiredStateOfWorld() error {
// The volume specs present in the ActualStateOfWorld are nil, let's replace those
// with the correct ones found on pods. The volumes present in the ASW with no
// corresponding pod will be detached, so their spec is irrelevant.
volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, nodeName, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd, nodeName, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
if err != nil {
klog.Errorf(
"Error creating spec for volume %q, pod %q/%q: %v",

View File

@ -168,7 +168,7 @@ func (collector *attachDetachStateCollector) getVolumeInUseCount() volumeCount {
continue
}
for _, podVolume := range pod.Spec.Volumes {
volumeSpec, err := util.CreateVolumeSpec(podVolume, pod.Namespace, types.NodeName(pod.Spec.NodeName), collector.volumePluginMgr, collector.pvcLister, collector.pvLister, collector.csiMigratedPluginManager, collector.intreeToCSITranslator)
volumeSpec, err := util.CreateVolumeSpec(podVolume, pod, types.NodeName(pod.Spec.NodeName), collector.volumePluginMgr, collector.pvcLister, collector.pvLister, collector.csiMigratedPluginManager, collector.intreeToCSITranslator)
if err != nil {
continue
}

View File

@ -39,39 +39,49 @@ import (
// A volume.Spec that refers to an in-tree plugin spec is translated to refer
// to a migrated CSI plugin spec if all conditions for CSI migration on a node
// for the in-tree plugin are satisfied.
func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, nodeName types.NodeName, vpm *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator) (*volume.Spec, error) {
func CreateVolumeSpec(podVolume v1.Volume, pod *v1.Pod, nodeName types.NodeName, vpm *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator) (*volume.Spec, error) {
claimName := ""
readOnly := false
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
claimName = pvcSource.ClaimName
readOnly = pvcSource.ReadOnly
}
if ephemeralSource := podVolume.VolumeSource.Ephemeral; ephemeralSource != nil && utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
claimName = pod.Name + "-" + podVolume.Name
readOnly = ephemeralSource.ReadOnly
}
if claimName != "" {
klog.V(10).Infof(
"Found PVC, ClaimName: %q/%q",
podNamespace,
pvcSource.ClaimName)
pod.Namespace,
claimName)
// If podVolume is a PVC, fetch the real PV behind the claim
pvName, pvcUID, err := getPVCFromCacheExtractPV(
podNamespace, pvcSource.ClaimName, pvcLister)
pod.Namespace, claimName, pvcLister)
if err != nil {
return nil, fmt.Errorf(
"error processing PVC %q/%q: %v",
podNamespace,
pvcSource.ClaimName,
pod.Namespace,
claimName,
err)
}
klog.V(10).Infof(
"Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q",
podNamespace,
pvcSource.ClaimName,
pod.Namespace,
claimName,
pvcUID,
pvName)
// Fetch actual PV object
volumeSpec, err := getPVSpecFromCache(
pvName, pvcSource.ReadOnly, pvcUID, pvLister)
pvName, readOnly, pvcUID, pvLister)
if err != nil {
return nil, fmt.Errorf(
"error processing PVC %q/%q: %v",
podNamespace,
pvcSource.ClaimName,
pod.Namespace,
claimName,
err)
}
@ -79,8 +89,8 @@ func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, nodeName types.N
if err != nil {
return nil, fmt.Errorf(
"error performing CSI migration checks and translation for PVC %q/%q: %v",
podNamespace,
pvcSource.ClaimName,
pod.Namespace,
claimName,
err)
}
@ -88,8 +98,8 @@ func CreateVolumeSpec(podVolume v1.Volume, podNamespace string, nodeName types.N
"Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
volumeSpec.Name(),
pvName,
podNamespace,
pvcSource.ClaimName,
pod.Namespace,
claimName,
pvcUID)
return volumeSpec, nil
@ -219,7 +229,7 @@ func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.D
// Process volume spec for each volume defined in pod
for _, podVolume := range pod.Spec.Volumes {
volumeSpec, err := CreateVolumeSpec(podVolume, pod.Namespace, nodeName, volumePluginMgr, pvcLister, pvLister, csiMigratedPluginManager, csiTranslator)
volumeSpec, err := CreateVolumeSpec(podVolume, pod, nodeName, volumePluginMgr, pvcLister, pvLister, csiMigratedPluginManager, csiTranslator)
if err != nil {
klog.V(10).Infof(
"Error processing volume %q for pod %q/%q: %v",

View File

@ -6,7 +6,9 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/volume/common",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@ -20,7 +20,9 @@ import (
"fmt"
v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/features"
)
const (
@ -28,20 +30,37 @@ const (
PodPVCIndex = "pod-pvc-index"
)
// PodPVCIndexFunc returns PVC keys for given pod
func PodPVCIndexFunc(obj interface{}) ([]string, error) {
// PodPVCIndexFunc creates an index function that returns PVC keys (=
// namespace/name) for given pod. If enabled, this includes the PVCs
// that might be created for generic ephemeral volumes.
func PodPVCIndexFunc(genericEphemeralVolumeFeatureEnabled bool) func(obj interface{}) ([]string, error) {
return func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
keys := []string{}
for _, podVolume := range pod.Spec.Volumes {
claimName := ""
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
claimName = pvcSource.ClaimName
}
if ephemeralSource := podVolume.VolumeSource.Ephemeral; genericEphemeralVolumeFeatureEnabled && ephemeralSource != nil {
claimName = pod.Name + "-" + podVolume.Name
}
if claimName != "" {
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, claimName))
}
}
return keys, nil
}
}
// AddPodPVCIndexerIfNotPresent adds the PodPVCIndexFunc with the current global setting for GenericEphemeralVolume.
func AddPodPVCIndexerIfNotPresent(indexer cache.Indexer) error {
return AddIndexerIfNotPresent(indexer, PodPVCIndex,
PodPVCIndexFunc(utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume)))
}
// AddIndexerIfNotPresent adds the index function with the name into the cache indexer if not present
func AddIndexerIfNotPresent(indexer cache.Indexer, indexName string, indexFunc cache.IndexFunc) error {

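As a usage note, a sketch (a hypothetical helper, not part of the commit) of how a caller retrieves the pods referencing a PVC once AddPodPVCIndexerIfNotPresent has been applied to an informer-backed indexer; the ephemeral and pvc-protection controllers below use exactly this ByIndex pattern:

// podsForPVC returns the cached pods that reference the PVC namespace/name.
// Assumes imports of k8s.io/api/core/v1 (as v1), k8s.io/client-go/tools/cache
// and this common package.
func podsForPVC(podIndexer cache.Indexer, namespace, name string) ([]*v1.Pod, error) {
	objs, err := podIndexer.ByIndex(common.PodPVCIndex, namespace+"/"+name)
	if err != nil {
		return nil, err
	}
	var pods []*v1.Pod
	for _, obj := range objs {
		if pod, ok := obj.(*v1.Pod); ok {
			pods = append(pods, pod)
		}
	}
	return pods, nil
}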
View File

@ -0,0 +1,62 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"controller.go",
"doc.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/volume/ephemeral",
visibility = ["//visibility:public"],
deps = [
"//pkg/controller/volume/common:go_default_library",
"//pkg/controller/volume/events:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["controller_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/controller:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,6 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- saad-ali
- jsafrane
- pohly

View File

@ -0,0 +1,286 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ephemeral
import (
"context"
"fmt"
"time"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller/volume/common"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/volume/util"
)
// Controller creates PVCs for ephemeral inline volumes in a pod spec.
type Controller interface {
Run(workers int, stopCh <-chan struct{})
}
type ephemeralController struct {
// kubeClient is the kube API client used to communicate with
// the API server.
kubeClient clientset.Interface
// pvcLister is the shared PVC lister used to fetch and store PVC
// objects from the API server. It is shared with other controllers and
// therefore the PVC objects in its store should be treated as immutable.
pvcLister corelisters.PersistentVolumeClaimLister
pvcsSynced kcache.InformerSynced
// podLister is the shared Pod lister used to fetch Pod
// objects from the API server. It is shared with other controllers and
// therefore the Pod objects in its store should be treated as immutable.
podLister corelisters.PodLister
podSynced kcache.InformerSynced
// podIndexer has the common PodPVC indexer installed, to
// limit iteration over pods to those of interest.
podIndexer cache.Indexer
// recorder is used to record events in the API server
recorder record.EventRecorder
queue workqueue.RateLimitingInterface
}
// NewController creates an ephemeral volume controller.
func NewController(
kubeClient clientset.Interface,
podInformer coreinformers.PodInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer) (Controller, error) {
ec := &ephemeralController{
kubeClient: kubeClient,
podLister: podInformer.Lister(),
podIndexer: podInformer.Informer().GetIndexer(),
podSynced: podInformer.Informer().HasSynced,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ephemeral_volume"),
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ephemeral_volume"})
podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
AddFunc: ec.enqueuePod,
// The pod spec is immutable. Therefore the controller can ignore pod updates
// because there cannot be any changes that have to be copied into the generated
// PVC.
// Deletion of the PVC is handled through the owner reference and garbage collection.
// Therefore pod deletions also can be ignored.
})
pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
DeleteFunc: ec.onPVCDelete,
})
if err := common.AddPodPVCIndexerIfNotPresent(ec.podIndexer); err != nil {
return nil, fmt.Errorf("Could not initialize pvc protection controller: %v", err)
}
return ec, nil
}
func (ec *ephemeralController) enqueuePod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
return
}
// Ignore pods which are already getting deleted.
if pod.DeletionTimestamp != nil {
return
}
for _, vol := range pod.Spec.Volumes {
if vol.Ephemeral != nil {
// It has at least one ephemeral inline volume, work on it.
key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pod)
if err != nil {
runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pod, err))
return
}
ec.queue.Add(key)
break
}
}
}
func (ec *ephemeralController) onPVCDelete(obj interface{}) {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return
}
// Someone deleted a PVC, either intentionally or
// accidentally. If there is a pod referencing it because of
// an ephemeral volume, then we should re-create the PVC.
// The common indexer does some prefiltering for us by
// limiting the list to those pods which reference
// the PVC.
objs, err := ec.podIndexer.ByIndex(common.PodPVCIndex, fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name))
if err != nil {
runtime.HandleError(fmt.Errorf("listing pods from cache: %v", err))
return
}
for _, obj := range objs {
ec.enqueuePod(obj)
}
}
func (ec *ephemeralController) Run(workers int, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer ec.queue.ShutDown()
klog.Infof("Starting ephemeral volume controller")
defer klog.Infof("Shutting down ephemeral volume controller")
if !cache.WaitForNamedCacheSync("ephemeral", stopCh, ec.podSynced, ec.pvcsSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(ec.runWorker, time.Second, stopCh)
}
<-stopCh
}
func (ec *ephemeralController) runWorker() {
for ec.processNextWorkItem() {
}
}
func (ec *ephemeralController) processNextWorkItem() bool {
key, shutdown := ec.queue.Get()
if shutdown {
return false
}
defer ec.queue.Done(key)
err := ec.syncHandler(key.(string))
if err == nil {
ec.queue.Forget(key)
return true
}
runtime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
ec.queue.AddRateLimited(key)
return true
}
// syncHandler is invoked for each pod which might need to be processed.
// If an error is returned from this function, the pod will be requeued.
func (ec *ephemeralController) syncHandler(key string) error {
namespace, name, err := kcache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
pod, err := ec.podLister.Pods(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
klog.V(5).Infof("ephemeral: nothing to do for pod %s, it is gone", key)
return nil
}
klog.V(5).Infof("Error getting pod %s/%s (uid: %q) from informer : %v", pod.Namespace, pod.Name, pod.UID, err)
return err
}
// Ignore pods which are already getting deleted.
if pod.DeletionTimestamp != nil {
klog.V(5).Infof("ephemeral: nothing to do for pod %s, it is marked for deletion", key)
return nil
}
for _, vol := range pod.Spec.Volumes {
if err := ec.handleVolume(pod, vol); err != nil {
ec.recorder.Event(pod, v1.EventTypeWarning, events.FailedBinding, fmt.Sprintf("ephemeral volume %s: %v", vol.Name, err))
return fmt.Errorf("pod %s, ephemeral volume %s: %v", key, vol.Name, err)
}
}
return nil
}
// handleVolume is invoked for each volume of a pod.
func (ec *ephemeralController) handleVolume(pod *v1.Pod, vol v1.Volume) error {
klog.V(5).Infof("ephemeral: checking volume %s", vol.Name)
ephemeral := vol.Ephemeral
if ephemeral == nil {
return nil
}
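// The generated PVC's name is deterministic: <pod name>-<volume name>.
// The same derivation is used by every other component touched in this
// commit when mapping an ephemeral volume to its PVC.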
pvcName := pod.Name + "-" + vol.Name
pvc, err := ec.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
if err != nil && !errors.IsNotFound(err) {
return err
}
if pvc != nil {
if metav1.IsControlledBy(pvc, pod) {
// Already created, nothing more to do.
klog.V(5).Infof("ephemeral: volume %s: PVC %s already created", vol.Name, pvcName)
return nil
}
return fmt.Errorf("PVC %q (uid: %q) was not created for the pod",
util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID)
}
// Create the PVC with pod as owner.
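// The controlling owner reference has two effects: the garbage collector
// deletes the PVC once the pod is gone, and metav1.IsControlledBy (above
// and in the other components) can verify that an existing PVC with this
// name was really created for this pod.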
isTrue := true
pvc = &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
UID: pod.UID,
Controller: &isTrue,
BlockOwnerDeletion: &isTrue,
},
},
Annotations: ephemeral.VolumeClaimTemplate.Annotations,
Labels: ephemeral.VolumeClaimTemplate.Labels,
},
Spec: ephemeral.VolumeClaimTemplate.Spec,
}
_, err = ec.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("create PVC %s: %v", pvcName, err)
}
return nil
}

View File

@ -0,0 +1,221 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ephemeral
import (
"context"
"sort"
"testing"
"k8s.io/api/core/v1"
// storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
"github.com/stretchr/testify/assert"
)
var (
testPodName = "test-pod"
testNamespace = "my-namespace"
testPodUID = types.UID("uidpod1")
otherNamespace = "not-my-namespace"
ephemeralVolumeName = "ephemeral-volume"
testPod = makePod(testPodName, testNamespace, testPodUID)
testPodWithEphemeral = makePod(testPodName, testNamespace, testPodUID, *makeEphemeralVolume(ephemeralVolumeName))
testPodEphemeralClaim = makePVC(testPodName+"-"+ephemeralVolumeName, testNamespace, makeOwnerReference(testPodWithEphemeral, true))
conflictingClaim = makePVC(testPodName+"-"+ephemeralVolumeName, testNamespace, nil)
otherNamespaceClaim = makePVC(testPodName+"-"+ephemeralVolumeName, otherNamespace, nil)
)
func init() {
klog.InitFlags(nil)
}
func TestSyncHandler(t *testing.T) {
tests := []struct {
name string
podKey string
pvcs []*v1.PersistentVolumeClaim
pods []*v1.Pod
expectedPVCs []v1.PersistentVolumeClaim
expectedError bool
}{
{
name: "create",
pods: []*v1.Pod{testPodWithEphemeral},
podKey: podKey(testPodWithEphemeral),
expectedPVCs: []v1.PersistentVolumeClaim{*testPodEphemeralClaim},
},
{
name: "no-such-pod",
podKey: podKey(testPodWithEphemeral),
},
{
name: "pod-deleted",
pods: func() []*v1.Pod {
deleted := metav1.Now()
pods := []*v1.Pod{testPodWithEphemeral.DeepCopy()}
pods[0].DeletionTimestamp = &deleted
return pods
}(),
podKey: podKey(testPodWithEphemeral),
},
{
name: "no-volumes",
pods: []*v1.Pod{testPod},
podKey: podKey(testPod),
},
{
name: "create-with-other-PVC",
pods: []*v1.Pod{testPodWithEphemeral},
podKey: podKey(testPodWithEphemeral),
pvcs: []*v1.PersistentVolumeClaim{otherNamespaceClaim},
expectedPVCs: []v1.PersistentVolumeClaim{*otherNamespaceClaim, *testPodEphemeralClaim},
},
{
name: "wrong-PVC-owner",
pods: []*v1.Pod{testPodWithEphemeral},
podKey: podKey(testPodWithEphemeral),
pvcs: []*v1.PersistentVolumeClaim{conflictingClaim},
expectedPVCs: []v1.PersistentVolumeClaim{*conflictingClaim},
expectedError: true,
},
}
for _, tc := range tests {
// Run sequentially because of global logging.
t.Run(tc.name, func(t *testing.T) {
// There is no good way to shut down the informers. They spawn
// various goroutines and some of them (in particular shared informer)
// become very unhappy ("close on closed channel") when using a context
// that gets cancelled. Therefore we just keep everything running.
ctx := context.Background()
var objects []runtime.Object
for _, pod := range tc.pods {
objects = append(objects, pod)
}
for _, pvc := range tc.pvcs {
objects = append(objects, pvc)
}
fakeKubeClient := createTestClient(objects...)
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
podInformer := informerFactory.Core().V1().Pods()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
c, err := NewController(fakeKubeClient, podInformer, pvcInformer)
if err != nil {
t.Fatalf("error creating ephemeral controller : %v", err)
}
ec, _ := c.(*ephemeralController)
// Ensure informers are up-to-date.
go informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced, pvcInformer.Informer().HasSynced)
err = ec.syncHandler(tc.podKey)
if err != nil && !tc.expectedError {
t.Fatalf("unexpected error while running handler: %v", err)
}
if err == nil && tc.expectedError {
t.Fatalf("unexpected success")
}
pvcs, err := fakeKubeClient.CoreV1().PersistentVolumeClaims("").List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("unexpected error while listing PVCs: %v", err)
}
assert.Equal(t, sortPVCs(tc.expectedPVCs), sortPVCs(pvcs.Items))
})
}
}
func makePVC(name, namespace string, owner *metav1.OwnerReference) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
Spec: v1.PersistentVolumeClaimSpec{},
}
if owner != nil {
pvc.OwnerReferences = []metav1.OwnerReference{*owner}
}
return pvc
}
func makeEphemeralVolume(name string) *v1.Volume {
return &v1.Volume{
Name: name,
VolumeSource: v1.VolumeSource{
Ephemeral: &v1.EphemeralVolumeSource{
VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{},
},
},
}
}
func makePod(name, namespace string, uid types.UID, volumes ...v1.Volume) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace, UID: uid},
Spec: v1.PodSpec{
Volumes: volumes,
},
}
}
func podKey(pod *v1.Pod) string {
key, _ := kcache.DeletionHandlingMetaNamespaceKeyFunc(pod)
return key
}
func makeOwnerReference(pod *v1.Pod, isController bool) *metav1.OwnerReference {
isTrue := true
return &metav1.OwnerReference{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
UID: pod.UID,
Controller: &isController,
BlockOwnerDeletion: &isTrue,
}
}
func sortPVCs(pvcs []v1.PersistentVolumeClaim) []v1.PersistentVolumeClaim {
sort.Slice(pvcs, func(i, j int) bool {
if pvcs[i].Namespace != pvcs[j].Namespace {
return pvcs[i].Namespace < pvcs[j].Namespace
}
return pvcs[i].Name < pvcs[j].Name
})
return pvcs
}
func createTestClient(objects ...runtime.Object) *fake.Clientset {
fakeClient := fake.NewSimpleClientset(objects...)
return fakeClient
}

View File

@ -0,0 +1,21 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package ephemeral implements the controller part of
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1698-generic-ephemeral-volumes
//
// It was derived from the expand controller.
package ephemeral

View File

@ -134,7 +134,7 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
// This custom indexer will index pods by their PVC keys, so we don't
// need to iterate over all pods to find those which reference a given PVC.
if err := common.AddIndexerIfNotPresent(controller.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc); err != nil {
if err := common.AddPodPVCIndexerIfNotPresent(controller.podIndexer); err != nil {
return nil, fmt.Errorf("Could not initialize attach detach controller: %v", err)
}

View File

@ -55,14 +55,18 @@ type Controller struct {
// allows overriding of StorageObjectInUseProtection feature Enabled/Disabled for testing
storageObjectInUseProtectionEnabled bool
// allows overriding of GenericEphemeralVolume feature Enabled/Disabled for testing
genericEphemeralVolumeFeatureEnabled bool
}
// NewPVCProtectionController returns a new instance of PVCProtectionController.
func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, cl clientset.Interface, storageObjectInUseProtectionFeatureEnabled bool) (*Controller, error) {
func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, cl clientset.Interface, storageObjectInUseProtectionFeatureEnabled, genericEphemeralVolumeFeatureEnabled bool) (*Controller, error) {
e := &Controller{
client: cl,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcprotection"),
storageObjectInUseProtectionEnabled: storageObjectInUseProtectionFeatureEnabled,
genericEphemeralVolumeFeatureEnabled: genericEphemeralVolumeFeatureEnabled,
}
if cl != nil && cl.CoreV1().RESTClient().GetRateLimiter() != nil {
ratelimiter.RegisterMetricAndTrackRateLimiterUsage("persistentvolumeclaim_protection_controller", cl.CoreV1().RESTClient().GetRateLimiter())
@ -80,7 +84,7 @@ func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimI
e.podLister = podInformer.Lister()
e.podListerSynced = podInformer.Informer().HasSynced
e.podIndexer = podInformer.Informer().GetIndexer()
if err := common.AddIndexerIfNotPresent(e.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc); err != nil {
if err := common.AddIndexerIfNotPresent(e.podIndexer, common.PodPVCIndex, common.PodPVCIndexFunc(genericEphemeralVolumeFeatureEnabled)); err != nil {
return nil, fmt.Errorf("Could not initialize pvc protection controller: %v", err)
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -236,6 +240,7 @@ func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) {
func (c *Controller) askInformer(pvc *v1.PersistentVolumeClaim) (bool, error) {
klog.V(4).Infof("Looking for Pods using PVC %s/%s in the Informer's cache", pvc.Namespace, pvc.Name)
// The indexer is used to find pods which might use the PVC.
objs, err := c.podIndexer.ByIndex(common.PodPVCIndex, fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name))
if err != nil {
return false, fmt.Errorf("cache-based list of pods failed while processing %s/%s: %s", pvc.Namespace, pvc.Name, err.Error())
@ -245,6 +250,19 @@ func (c *Controller) askInformer(pvc *v1.PersistentVolumeClaim) (bool, error) {
if !ok {
continue
}
if c.genericEphemeralVolumeFeatureEnabled {
// We still need to look at each volume: that's redundant for volume.PersistentVolumeClaim,
// but for volume.Ephemeral we need to be sure that this particular PVC is the one
// created for the ephemeral volume.
if c.podUsesPVC(pod, pvc) {
return true, nil
}
continue
}
// This is the traditional behavior without GenericEphemeralVolume enabled.
if pod.Spec.NodeName == "" {
continue
}
@ -265,7 +283,7 @@ func (c *Controller) askAPIServer(pvc *v1.PersistentVolumeClaim) (bool, error) {
}
for _, pod := range podsList.Items {
if podUsesPVC(&pod, pvc.Name) {
if c.podUsesPVC(&pod, pvc) {
return true, nil
}
}
@ -274,13 +292,14 @@ func (c *Controller) askAPIServer(pvc *v1.PersistentVolumeClaim) (bool, error) {
return false, nil
}
func podUsesPVC(pod *v1.Pod, pvc string) bool {
func (c *Controller) podUsesPVC(pod *v1.Pod, pvc *v1.PersistentVolumeClaim) bool {
// Check whether pvc is used by pod only if pod is scheduled, because
// kubelet sees pods after they have been scheduled and it won't allow
// starting a pod referencing a PVC with a non-nil deletionTimestamp.
if pod.Spec.NodeName != "" {
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvc {
if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvc.Name ||
c.genericEphemeralVolumeFeatureEnabled && !podIsShutDown(pod) && volume.Ephemeral != nil && pod.Name+"-"+volume.Name == pvc.Name && metav1.IsControlledBy(pvc, pod) {
klog.V(2).Infof("Pod %s/%s uses PVC %s", pod.Namespace, pod.Name, pvc.Name)
return true
}
@ -289,6 +308,43 @@ func podUsesPVC(pod *v1.Pod, pvc string) bool {
return false
}
// podIsShutDown returns true if kubelet is done with the pod or
// it was force-deleted.
func podIsShutDown(pod *v1.Pod) bool {
// The following text is based on how pod shutdown was
// initially described to me. During PR review, it was pointed out
// that this is not correct: "deleteGracePeriodSeconds tells
// kubelet when it can start force terminating the
// containers. Volume teardown only starts after containers
// are terminated. So there is an additional time period after
// the grace period where volume teardown is happening."
//
// TODO (https://github.com/kubernetes/enhancements/issues/1698#issuecomment-655344680):
// investigate what kubelet really does and if necessary,
// add some other signal for "kubelet is done". For now the check
// is used only for ephemeral volumes, because it
// is needed to avoid the deadlock.
//
// A pod that has a deletionTimestamp and a zero
// deletionGracePeriodSeconds
// a) has been processed by kubelet and is ready for deletion or
// b) was force-deleted.
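// For example, a pod deleted with `kubectl delete --force --grace-period=0`
// that is kept around by a finalizer has its deletionTimestamp set and
// deletionGracePeriodSeconds == 0, which is exactly case b).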
//
// It's now just waiting for garbage collection. We could wait
// for it to actually get removed, but that may be blocked by
// finalizers for the pod and thus get delayed.
//
// Worse, it is possible that there is a cyclic dependency
// (pod finalizer waits for PVC to get removed, PVC protection
// controller waits for pod to get removed). By considering
// the PVC unused in this case, we allow the PVC to get
// removed and break such a cycle.
//
// Therefore it is better to proceed with PVC removal,
// which is safe (case a) and/or desirable (case b).
return pod.DeletionTimestamp != nil && pod.DeletionGracePeriodSeconds != nil && *pod.DeletionGracePeriodSeconds == 0
}
// pvcAddedUpdated reacts to pvc added/updated events
func (c *Controller) pvcAddedUpdated(obj interface{}) {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
@ -354,8 +410,11 @@ func (c *Controller) enqueuePVCs(pod *v1.Pod, deleted bool) {
// Enqueue all PVCs that the pod uses
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim != nil {
switch {
case volume.PersistentVolumeClaim != nil:
c.queue.Add(pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName)
case c.genericEphemeralVolumeFeatureEnabled && volume.Ephemeral != nil:
c.queue.Add(pod.Namespace + "/" + pod.Name + "-" + volume.Name)
}
}
}

View File

@ -146,7 +146,7 @@ func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionF
}
}
func TestPVCProtectionController(t *testing.T) {
func testPVCProtectionController(t *testing.T, genericEphemeralVolumeFeatureEnabled bool) {
pvcGVR := schema.GroupVersionResource{
Group: v1.GroupName,
Version: "v1",
@ -430,7 +430,7 @@ func TestPVCProtectionController(t *testing.T) {
podInformer := informers.Core().V1().Pods()
// Create the controller
ctrl, err := NewPVCProtectionController(pvcInformer, podInformer, client, test.storageObjectInUseProtectionEnabled)
ctrl, err := NewPVCProtectionController(pvcInformer, podInformer, client, test.storageObjectInUseProtectionEnabled, genericEphemeralVolumeFeatureEnabled)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -518,3 +518,8 @@ func TestPVCProtectionController(t *testing.T) {
}
}
func TestPVCProtectionController(t *testing.T) {
t.Run("with-GenericEphemeralVolume", func(t *testing.T) { testPVCProtectionController(t, true) })
t.Run("without-GenericEphemeralVolume", func(t *testing.T) { testPVCProtectionController(t, false) })
}

View File

@ -661,13 +661,28 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claim
return true, nil
}
func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume) (bool, *v1.PersistentVolumeClaim, error) {
if vol.PersistentVolumeClaim == nil {
func (b *volumeBinder) isVolumeBound(pod *v1.Pod, vol *v1.Volume) (bound bool, pvc *v1.PersistentVolumeClaim, err error) {
pvcName := ""
ephemeral := false
switch {
case vol.PersistentVolumeClaim != nil:
pvcName = vol.PersistentVolumeClaim.ClaimName
case vol.Ephemeral != nil &&
utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume):
// Generic ephemeral inline volumes also use a PVC,
// just with a computed name, and...
pvcName = pod.Name + "-" + vol.Name
ephemeral = true
default:
return true, nil, nil
}
pvcName := vol.PersistentVolumeClaim.ClaimName
return b.isPVCBound(namespace, pvcName)
bound, pvc, err = b.isPVCBound(pod.Namespace, pvcName)
// ... the PVC must be owned by the pod.
if ephemeral && err == nil && pvc != nil && !metav1.IsControlledBy(pvc, pod) {
return false, nil, fmt.Errorf("PVC %s/%s is not owned by pod", pod.Namespace, pvcName)
}
return
}
func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.PersistentVolumeClaim, error) {
@ -703,7 +718,7 @@ func (b *volumeBinder) isPVCFullyBound(pvc *v1.PersistentVolumeClaim) bool {
// arePodVolumesBound returns true if all volumes are fully bound
func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
for _, vol := range pod.Spec.Volumes {
if isBound, _, _ := b.isVolumeBound(pod.Namespace, &vol); !isBound {
if isBound, _, _ := b.isVolumeBound(pod, &vol); !isBound {
// Pod has at least one PVC that needs binding
return false
}
@ -719,7 +734,7 @@ func (b *volumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV
unboundClaimsDelayBinding = []*v1.PersistentVolumeClaim{}
for _, vol := range pod.Spec.Volumes {
volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol)
volumeBound, pvc, err := b.isVolumeBound(pod, &vol)
if err != nil {
return nil, nil, nil, err
}

View File

@ -316,7 +316,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
}
pvc, volumeSpec, volumeGidValue, err :=
dswp.createVolumeSpec(podVolume, pod.Name, pod.Namespace, mounts, devices)
dswp.createVolumeSpec(podVolume, pod, mounts, devices)
if err != nil {
klog.Errorf(
"Error processing volume %q for pod %q: %v",
@ -491,29 +491,50 @@ func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod(
// specified volume. It dereference any PVC to get PV objects, if needed.
// Returns an error if unable to obtain the volume at this time.
func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
podVolume v1.Volume, podName string, podNamespace string, mounts, devices sets.String) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) {
if pvcSource :=
podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
podVolume v1.Volume, pod *v1.Pod, mounts, devices sets.String) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) {
pvcSource := podVolume.VolumeSource.PersistentVolumeClaim
ephemeral := false
if pvcSource == nil &&
podVolume.VolumeSource.Ephemeral != nil &&
utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
// Generic ephemeral inline volumes are handled the
// same way as a PVC reference. The only additional
// constraint (checked below) is that the PVC must be
// owned by the pod.
pvcSource = &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pod.Name + "-" + podVolume.Name,
ReadOnly: podVolume.VolumeSource.Ephemeral.ReadOnly,
}
ephemeral = true
}
if pvcSource != nil {
klog.V(5).Infof(
"Found PVC, ClaimName: %q/%q",
podNamespace,
pod.Namespace,
pvcSource.ClaimName)
// If podVolume is a PVC, fetch the real PV behind the claim
pvc, err := dswp.getPVCExtractPV(
podNamespace, pvcSource.ClaimName)
pod.Namespace, pvcSource.ClaimName)
if err != nil {
return nil, nil, "", fmt.Errorf(
"error processing PVC %s/%s: %v",
podNamespace,
pod.Namespace,
pvcSource.ClaimName,
err)
}
if ephemeral && !metav1.IsControlledBy(pvc, pod) {
return nil, nil, "", fmt.Errorf(
"error processing PVC %s/%s: not the ephemeral PVC for the pod",
pod.Namespace,
pvcSource.ClaimName,
)
}
pvName, pvcUID := pvc.Spec.VolumeName, pvc.UID
klog.V(5).Infof(
"Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q",
podNamespace,
pod.Namespace,
pvcSource.ClaimName,
pvcUID,
pvName)
@ -524,7 +545,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
if err != nil {
return nil, nil, "", fmt.Errorf(
"error processing PVC %s/%s: %v",
podNamespace,
pod.Namespace,
pvcSource.ClaimName,
err)
}
@ -533,7 +554,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
"Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
volumeSpec.Name(),
pvName,
podNamespace,
pod.Namespace,
pvcSource.ClaimName,
pvcUID)

View File

@ -518,7 +518,7 @@ func TestCreateVolumeSpec_Valid_File_VolumeMounts(t *testing.T) {
fakePodManager.AddPod(pod)
mountsMap, devicesMap := util.GetPodVolumeNames(pod)
_, volumeSpec, _, err :=
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap)
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
// Assert
if volumeSpec == nil || err != nil {
@ -564,7 +564,7 @@ func TestCreateVolumeSpec_Valid_Nil_VolumeMounts(t *testing.T) {
fakePodManager.AddPod(pod)
mountsMap, devicesMap := util.GetPodVolumeNames(pod)
_, volumeSpec, _, err :=
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap)
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
// Assert
if volumeSpec == nil || err != nil {
@ -610,7 +610,7 @@ func TestCreateVolumeSpec_Valid_Block_VolumeDevices(t *testing.T) {
fakePodManager.AddPod(pod)
mountsMap, devicesMap := util.GetPodVolumeNames(pod)
_, volumeSpec, _, err :=
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap)
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
// Assert
if volumeSpec == nil || err != nil {
@ -656,7 +656,7 @@ func TestCreateVolumeSpec_Invalid_File_VolumeDevices(t *testing.T) {
fakePodManager.AddPod(pod)
mountsMap, devicesMap := util.GetPodVolumeNames(pod)
_, volumeSpec, _, err :=
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap)
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
// Assert
if volumeSpec != nil || err == nil {
@ -702,7 +702,7 @@ func TestCreateVolumeSpec_Invalid_Block_VolumeMounts(t *testing.T) {
fakePodManager.AddPod(pod)
mountsMap, devicesMap := util.GetPodVolumeNames(pod)
_, volumeSpec, _, err :=
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap)
dswp.createVolumeSpec(pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
// Assert
if volumeSpec != nil || err == nil {

View File

@ -10,6 +10,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/runtime:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
@ -19,8 +20,10 @@ go_library(
"//pkg/scheduler/profile:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",

View File

@ -29,9 +29,12 @@ import (
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
corelisters "k8s.io/client-go/listers/core/v1"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@ -575,11 +578,19 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla
manifest := &(pod.Spec)
for i := range manifest.Volumes {
volume := &manifest.Volumes[i]
if volume.PersistentVolumeClaim == nil {
// Volume is not a PVC, ignore
var pvcName string
ephemeral := false
switch {
case volume.PersistentVolumeClaim != nil:
pvcName = volume.PersistentVolumeClaim.ClaimName
case volume.Ephemeral != nil &&
utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume):
pvcName = pod.Name + "-" + volume.Name
ephemeral = true
default:
// Volume is not using a PVC, ignore
continue
}
pvcName := volume.PersistentVolumeClaim.ClaimName
pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
if err != nil {
// The error already has enough context ("persistentvolumeclaim "myclaim" not found")
@ -589,6 +600,11 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla
if pvc.DeletionTimestamp != nil {
return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
}
if ephemeral &&
!metav1.IsControlledBy(pvc, pod) {
return fmt.Errorf("persistentvolumeclaim %q was not created for the pod", pvc.Name)
}
}
return nil

View File

@ -62,6 +62,7 @@ func (d *stateData) Clone() framework.StateData {
// Reserve and PreBind phases.
type VolumeBinding struct {
Binder scheduling.SchedulerVolumeBinder
GenericEphemeralVolumeFeatureEnabled bool
}
var _ framework.PreFilterPlugin = &VolumeBinding{}
@ -77,9 +78,10 @@ func (pl *VolumeBinding) Name() string {
return Name
}
func podHasPVCs(pod *v1.Pod) bool {
func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) bool {
for _, vol := range pod.Spec.Volumes {
if vol.PersistentVolumeClaim != nil {
if vol.PersistentVolumeClaim != nil ||
pl.GenericEphemeralVolumeFeatureEnabled && vol.Ephemeral != nil {
return true
}
}
@ -91,7 +93,7 @@ func podHasPVCs(pod *v1.Pod) bool {
// UnschedulableAndUnresolvable is returned.
func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
// If pod does not reference any PVC, we don't need to do anything.
if !podHasPVCs(pod) {
if !pl.podHasPVCs(pod) {
state.Write(stateKey, &stateData{skip: true})
return nil
}
@ -269,6 +271,7 @@ func New(plArgs runtime.Object, fh framework.FrameworkHandle) (framework.Plugin,
binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second)
return &VolumeBinding{
Binder: binder,
GenericEphemeralVolumeFeatureEnabled: utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume),
}, nil
}

View File

@ -20,8 +20,10 @@ import (
"sync"
corev1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
pvutil "k8s.io/kubernetes/pkg/api/v1/persistentvolume"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/third_party/forked/gonum/graph"
"k8s.io/kubernetes/third_party/forked/gonum/graph/simple"
)
@ -375,8 +377,14 @@ func (g *Graph) AddPod(pod *corev1.Pod) {
})
for _, v := range pod.Spec.Volumes {
claimName := ""
if v.PersistentVolumeClaim != nil {
pvcVertex := g.getOrCreateVertex_locked(pvcVertexType, pod.Namespace, v.PersistentVolumeClaim.ClaimName)
claimName = v.PersistentVolumeClaim.ClaimName
} else if v.Ephemeral != nil && utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
claimName = pod.Name + "-" + v.Name
}
if claimName != "" {
pvcVertex := g.getOrCreateVertex_locked(pvcVertexType, pod.Namespace, claimName)
e := newDestinationEdge(pvcVertex, podVertex, nodeVertex)
g.graph.SetEdge(e)
g.addEdgeToDestinationIndex_locked(e)

View File

@ -178,6 +178,17 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
})
}
if utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "ephemeral-volume-controller"},
Rules: []rbacv1.PolicyRule{
rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("pods").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch", "create").Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie(),
eventsRule(),
},
})
}
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "generic-garbage-collector"},
Rules: []rbacv1.PolicyRule{