Enforce ReadWriteOncePod access mode during scheduling

Check the PVC reference counts in the scheduler's node info cache to
determine whether a pod's PVCs are already in use. If a PVC is in use and
carries the ReadWriteOncePod access mode, fail the scheduling request.
Chris Henzie 2021-06-16 13:21:49 -07:00
parent ebc3fdb293
commit 7ad44d04fc
11 changed files with 271 additions and 9 deletions
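For orientation, a minimal, self-contained sketch of the check this commit adds, assuming a cache snapshot that tracks per-node PVC reference counts keyed by "namespace/claim-name". The nodeInfo type and all names below are illustrative stand-ins, not the scheduler's real API:

// Illustrative sketch only; types and names are simplified stand-ins for
// the scheduler's NodeInfo cache.
package main

import "fmt"

// nodeInfo mirrors the slice of scheduler cache state this commit consults.
type nodeInfo struct {
	name string
	// pvcRefCounts maps "namespace/claim-name" to the number of scheduled
	// pods on this node that reference the PVC.
	pvcRefCounts map[string]int
}

// hasReadWriteOncePodConflict reports whether any node already has a pod
// referencing one of the given ReadWriteOncePod PVC keys. Since such a PVC
// may be used by at most one pod, any nonzero ref count is a conflict.
func hasReadWriteOncePodConflict(nodes []nodeInfo, rwopPVCKeys []string) bool {
	for _, n := range nodes {
		for _, key := range rwopPVCKeys {
			if n.pvcRefCounts[key] > 0 {
				return true
			}
		}
	}
	return false
}

func main() {
	nodes := []nodeInfo{
		{name: "node-1", pvcRefCounts: map[string]int{"default/claim-with-rwop": 1}},
		{name: "node-2", pvcRefCounts: map[string]int{}},
	}
	// A second pod requesting the same ReadWriteOncePod claim must be rejected.
	fmt.Println(hasReadWriteOncePodConflict(nodes, []string{"default/claim-with-rwop"})) // true
}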

View File

@@ -32,6 +32,7 @@ var PluginsV1beta1 = &config.Plugins{
Enabled: []config.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
@@ -113,6 +114,7 @@ var PluginsV1beta2 = &config.Plugins{
Enabled: []config.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},

View File

@@ -38,6 +38,7 @@ func getDefaultPlugins() *v1beta1.Plugins {
Enabled: []v1beta1.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},

View File

@@ -47,6 +47,7 @@ func TestApplyFeatureGates(t *testing.T) {
Enabled: []v1beta1.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
@@ -129,6 +130,7 @@ func TestApplyFeatureGates(t *testing.T) {
Enabled: []v1beta1.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},

View File

@@ -329,6 +329,7 @@ func TestSchedulerDefaults(t *testing.T) {
Enabled: []v1beta1.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},

View File

@@ -38,6 +38,7 @@ func getDefaultPlugins() *v1beta2.Plugins {
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},

View File

@@ -47,6 +47,7 @@ func TestApplyFeatureGates(t *testing.T) {
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
@@ -128,6 +129,7 @@ func TestApplyFeatureGates(t *testing.T) {
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},

View File

@@ -332,6 +332,7 @@ func TestSchedulerDefaults(t *testing.T) {
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},

View File

@@ -23,4 +23,5 @@ type Features struct {
EnablePodAffinityNamespaceSelector bool
EnablePodDisruptionBudget bool
EnablePodOverhead bool
EnableReadWriteOncePod bool
}

View File

@@ -53,6 +53,7 @@ func NewInTreeRegistry() runtime.Registry {
EnablePodAffinityNamespaceSelector: feature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector),
EnablePodDisruptionBudget: feature.DefaultFeatureGate.Enabled(features.PodDisruptionBudget),
EnablePodOverhead: feature.DefaultFeatureGate.Enabled(features.PodOverhead),
EnableReadWriteOncePod: feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
}
return runtime.Registry{
@@ -80,8 +81,10 @@ func NewInTreeRegistry() runtime.Registry {
noderesources.RequestedToCapacityRatioName: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
return noderesources.NewRequestedToCapacityRatio(plArgs, fh, fts)
},
-volumebinding.Name: volumebinding.New,
-volumerestrictions.Name: volumerestrictions.New,
+volumebinding.Name: volumebinding.New,
+volumerestrictions.Name: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
+return volumerestrictions.New(plArgs, fh, fts)
+},
volumezone.Name: volumezone.New,
nodevolumelimits.CSIName: nodevolumelimits.NewCSI,
nodevolumelimits.EBSName: nodevolumelimits.NewEBS,
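Note how the registry hunk above threads the feature gate into the plugin: volumerestrictions.New now takes a feature.Features argument, so its entry is wrapped in a closure that captures fts while preserving the registry's uniform factory signature. A reduced sketch of that pattern, with simplified stand-in types (Features, Plugin, factory, and all names below are illustrative):

// Reduced sketch of the closure-wrapping pattern used by the registry;
// types are simplified stand-ins, not the scheduler framework's own.
package main

import "fmt"

type Features struct{ EnableReadWriteOncePod bool }

type Plugin interface{ Name() string }

// factory is the uniform constructor signature the registry stores.
type factory func(args any) (Plugin, error)

type volumeRestrictions struct{ enableReadWriteOncePod bool }

func (p *volumeRestrictions) Name() string { return "VolumeRestrictions" }

// newVolumeRestrictions takes an extra Features argument, so it cannot be
// stored in the registry directly.
func newVolumeRestrictions(_ any, fts Features) (Plugin, error) {
	return &volumeRestrictions{enableReadWriteOncePod: fts.EnableReadWriteOncePod}, nil
}

func main() {
	fts := Features{EnableReadWriteOncePod: true}
	registry := map[string]factory{
		// The closure captures fts while matching the factory signature.
		"VolumeRestrictions": func(args any) (Plugin, error) {
			return newVolumeRestrictions(args, fts)
		},
	}
	p, _ := registry["VolumeRestrictions"](nil)
	fmt.Println(p.Name())
}

Reading the gate once at registry construction keeps the per-scheduling-cycle path free of feature-gate lookups.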

View File

@@ -18,17 +18,29 @@ package volumerestrictions
import (
"context"
"sync/atomic"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)
// VolumeRestrictions is a plugin that checks volume restrictions.
-type VolumeRestrictions struct{}
+type VolumeRestrictions struct {
+parallelizer parallelize.Parallelizer
+pvcLister corelisters.PersistentVolumeClaimLister
+nodeInfoLister framework.SharedLister
+enableReadWriteOncePod bool
+}
var _ framework.PreFilterPlugin = &VolumeRestrictions{}
var _ framework.FilterPlugin = &VolumeRestrictions{}
var _ framework.EnqueueExtensions = &VolumeRestrictions{}
@@ -38,6 +50,8 @@ const Name = names.VolumeRestrictions
const (
// ErrReasonDiskConflict is used for NoDiskConflict predicate error.
ErrReasonDiskConflict = "node(s) had no available disk"
// ErrReasonReadWriteOncePodConflict is used when a pod is found using the same PVC with the ReadWriteOncePod access mode.
ErrReasonReadWriteOncePodConflict = "node has pod using PersistentVolumeClaim with the same name and ReadWriteOncePod access mode"
)
// Name returns name of the plugin. It is used in logs, etc.
@@ -106,6 +120,72 @@ func haveOverlap(a1, a2 []string) bool {
return false
}
func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
if pl.enableReadWriteOncePod {
return pl.isReadWriteOncePodAccessModeConflict(pod)
}
return framework.NewStatus(framework.Success)
}
// isReadWriteOncePodAccessModeConflict checks if a pod uses a PVC with the ReadWriteOncePod access mode.
// This access mode restricts volume access to a single pod on a single node. Since only a single pod can
// use a ReadWriteOncePod PVC, mark any other pods attempting to use this PVC as UnschedulableAndUnresolvable.
// TODO(#103132): Mark pod as Unschedulable and add preemption logic.
func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(pod *v1.Pod) *framework.Status {
nodeInfos, err := pl.nodeInfoLister.NodeInfos().List()
if err != nil {
return framework.NewStatus(framework.Error, "error while getting node info")
}
var pvcKeys []string
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}
pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
if err != nil {
if apierrors.IsNotFound(err) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
return framework.AsStatus(err)
}
if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) {
continue
}
key := pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName
pvcKeys = append(pvcKeys, key)
}
ctx, cancel := context.WithCancel(context.Background())
// Ensure the context is released even when no conflict is found.
defer cancel()
var conflicts uint32
processNode := func(i int) {
nodeInfo := nodeInfos[i]
for _, key := range pvcKeys {
refCount := nodeInfo.PVCRefCounts[key]
if refCount > 0 {
atomic.AddUint32(&conflicts, 1)
cancel()
}
}
}
pl.parallelizer.Until(ctx, len(nodeInfos), processNode)
// Enforce ReadWriteOncePod access mode. This is also enforced during volume mount in kubelet.
if conflicts > 0 {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict)
}
return nil
}
func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
// Filter invoked at the filter extension point.
// It evaluates if a pod can fit due to the volumes it requests, and those that
// are already mounted. If there is already a volume mounted on that node, another pod that uses the same volume
@@ -142,10 +222,22 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEvent {
{Resource: framework.Pod, ActionType: framework.Delete},
// A new Node may make a pod schedulable.
{Resource: framework.Node, ActionType: framework.Add},
// Pods may fail to schedule because a PVC they use has not yet been created.
// The PVC must exist so that its access modes can be checked.
{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update},
}
}
// New initializes a new plugin and returns it.
-func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
-return &VolumeRestrictions{}, nil
+func New(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
+informerFactory := handle.SharedInformerFactory()
+pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
+nodeInfoLister := handle.SnapshotSharedLister()
+return &VolumeRestrictions{
+parallelizer: handle.Parallelizer(),
+pvcLister: pvcLister,
+nodeInfoLister: nodeInfoLister,
+enableReadWriteOncePod: fts.EnableReadWriteOncePod,
+}, nil
}
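The PreFilter above fans the per-node scan out through the framework's parallelizer and cancels the shared context on the first hit so remaining workers can stop early. A standalone sketch of that early-exit pattern, with plain goroutines standing in for the scheduler's parallelize.Until (the anyConflict helper and its names are illustrative, not scheduler API):

// Standalone sketch of the early-exit parallel scan used in PreFilter.
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// anyConflict scans items in parallel; the first worker that finds a hit
// bumps the counter and cancels the context so the rest can bail out.
func anyConflict(items []bool) bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // release the context even when no conflict is found

	var conflicts uint32
	var wg sync.WaitGroup
	for i := range items {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			select {
			case <-ctx.Done():
				return // another worker already found a conflict
			default:
			}
			if items[i] {
				atomic.AddUint32(&conflicts, 1)
				cancel()
			}
		}(i)
	}
	wg.Wait()
	return atomic.LoadUint32(&conflicts) > 0
}

func main() {
	fmt.Println(anyConflict([]bool{false, true, false})) // true
}

Calling cancel more than once is safe, and counting hits with an atomic avoids a data race between workers racing on the same result.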

View File

@@ -22,7 +22,16 @@ import (
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
func TestGCEDiskConflicts(t *testing.T) {
@@ -64,7 +73,7 @@ func TestGCEDiskConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
-p, _ := New(nil, nil)
+p := newPlugin(t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
@@ -112,7 +121,7 @@ func TestAWSDiskConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
-p, _ := New(nil, nil)
+p := newPlugin(t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
@@ -166,7 +175,7 @@ func TestRBDDiskConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
-p, _ := New(nil, nil)
+p := newPlugin(t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
@@ -220,7 +229,7 @@ func TestISCSIDiskConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
-p, _ := New(nil, nil)
+p := newPlugin(t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
@@ -228,3 +237,150 @@ func TestISCSIDiskConflicts(t *testing.T) {
})
}
}
func TestAccessModeConflicts(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ReadWriteOncePod, true)()
podWithReadWriteOncePodPVC := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
// Required for querying lister for PVCs in the same namespace.
Namespace: "default",
Name: "pod-with-rwop",
},
Spec: v1.PodSpec{
NodeName: "node-1",
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "claim-with-rwop",
},
},
},
},
},
}
podWithReadWriteManyPVC := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
// Required for querying lister for PVCs in the same namespace.
Namespace: "default",
Name: "pod-with-rwx",
},
Spec: v1.PodSpec{
NodeName: "node-1",
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "claim-with-rwx",
},
},
},
},
},
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "node-1",
},
}
readWriteOncePodPVC := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "claim-with-rwop",
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod},
},
}
readWriteManyPVC := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "claim-with-rwx",
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteMany},
},
}
tests := []struct {
name string
pod *v1.Pod
existingPods []*v1.Pod
existingNodes []*v1.Node
existingPVCs []*v1.PersistentVolumeClaim
enableReadWriteOncePod bool
wantStatus *framework.Status
}{
{
name: "nothing",
pod: &v1.Pod{},
existingPods: []*v1.Pod{},
existingNodes: []*v1.Node{},
existingPVCs: []*v1.PersistentVolumeClaim{},
enableReadWriteOncePod: true,
wantStatus: nil,
},
{
name: "failed to get PVC",
pod: podWithReadWriteOncePodPVC,
existingPods: []*v1.Pod{},
existingNodes: []*v1.Node{},
existingPVCs: []*v1.PersistentVolumeClaim{},
enableReadWriteOncePod: true,
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "persistentvolumeclaim \"claim-with-rwop\" not found"),
},
{
name: "no access mode conflict",
pod: podWithReadWriteOncePodPVC,
existingPods: []*v1.Pod{podWithReadWriteManyPVC},
existingNodes: []*v1.Node{node},
existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC, readWriteManyPVC},
enableReadWriteOncePod: true,
wantStatus: nil,
},
{
name: "access mode conflict",
pod: podWithReadWriteOncePodPVC,
existingPods: []*v1.Pod{podWithReadWriteOncePodPVC, podWithReadWriteManyPVC},
existingNodes: []*v1.Node{node},
existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC, readWriteManyPVC},
enableReadWriteOncePod: true,
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := newPluginWithListers(t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod)
gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), nil, test.pod)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %+v, want: %+v", gotStatus, test.wantStatus)
}
})
}
}
func newPlugin(t *testing.T) framework.Plugin {
return newPluginWithListers(t, nil, nil, nil, true)
}
func newPluginWithListers(t *testing.T, pods []*v1.Pod, nodes []*v1.Node, pvcs []*v1.PersistentVolumeClaim, enableReadWriteOncePod bool) framework.Plugin {
ctx := context.Background()
pluginFactory := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return New(plArgs, fh, feature.Features{
EnableReadWriteOncePod: enableReadWriteOncePod,
})
}
snapshot := cache.NewSnapshot(pods, nodes)
objects := make([]runtime.Object, 0, len(pvcs))
for _, pvc := range pvcs {
objects = append(objects, pvc)
}
return plugintesting.SetupPluginWithInformers(ctx, t, pluginFactory, &config.InterPodAffinityArgs{}, snapshot, objects)
}