Implement QueueingHintFn for pod deleted event

Author: HirazawaUi, 2024-06-02 21:44:24 +08:00
parent 9fc0315ce8
commit f9693e0c0a
2 changed files with 214 additions and 1 deletion


@@ -25,10 +25,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/util"
)

// VolumeRestrictions is a plugin that checks volume restrictions.
@@ -321,8 +323,9 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHin
// Pods may fail to schedule because of volumes conflicting with other pods on same node.
// Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable.
// Due to immutable fields `spec.volumes`, pod update events are ignored.
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
// A new Node may make a pod schedulable.
// We intentionally don't set QueueingHint since all Node/Add events could make Pods schedulable.
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
// Pods may fail to schedule because the PVC it uses has not yet been created.
// This PVC is required to exist to check its access modes.
@@ -330,6 +333,27 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHin
}
}
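
// Background for the hint below (editor's note, not part of this commit): when a Pod that
// this plugin rejected is parked as unschedulable, the scheduler consults the QueueingHintFn
// registered above on every matching Pod/Delete event. Returning framework.Queue requeues
// the Pod for another scheduling attempt, while framework.QueueSkip leaves it parked,
// avoiding pointless retries for deletions that cannot have freed a conflicting volume.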
// isSchedulableAfterPodDeleted is invoked whenever a pod is deleted.
// It checks whether the volumes of the deleted pod conflict with the volumes of the target pod.
// TODO: if we observe good throughput, also check for conflicts between the deleted Pod and the ReadWriteOncePod PVCs of the target Pod.
func (pl *VolumeRestrictions) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
deletedPod, _, err := util.As[*v1.Pod](oldObj, newObj)
if err != nil {
return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPodDeleted: %w", err)
}

// Only pods in the same namespace as the target pod are considered.
if deletedPod.Namespace != pod.Namespace {
return framework.QueueSkip, nil
}

// If the target pod's volumes conflicted with the deleted pod's volumes,
// the deletion may have made the target pod schedulable, so requeue it.
nodeInfo := framework.NewNodeInfo(deletedPod)
if !satisfyVolumeConflicts(pod, nodeInfo) {
return framework.Queue, nil
}

// Otherwise the deleted pod cannot have been what blocked scheduling; skip.
return framework.QueueSkip, nil
}
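
// Editor's sketch (not part of this commit): satisfyVolumeConflicts, called above, is the
// plugin's existing Filter-time helper and sits outside this diff. The hypothetical helper
// below, volumesConflictSketch, shows in simplified form the kind of pairwise check it
// performs for the volume types exercised by the tests in this commit; it omits the
// ReadOnly handling the real helper applies to GCE PD, RBD, and iSCSI volumes, and it
// assumes this file's existing v1 "k8s.io/api/core/v1" import.
func volumesConflictSketch(a, b v1.Volume) bool {
	// Same GCE persistent disk name: at most one pod on a node may mount it read-write.
	if a.GCEPersistentDisk != nil && b.GCEPersistentDisk != nil {
		return a.GCEPersistentDisk.PDName == b.GCEPersistentDisk.PDName
	}
	// Same AWS EBS volume ID.
	if a.AWSElasticBlockStore != nil && b.AWSElasticBlockStore != nil {
		return a.AWSElasticBlockStore.VolumeID == b.AWSElasticBlockStore.VolumeID
	}
	// Same RBD pool and image reachable through an overlapping Ceph monitor set
	// (the "no conflict" RBD test below uses disjoint monitor sets).
	if a.RBD != nil && b.RBD != nil {
		if a.RBD.RBDPool == b.RBD.RBDPool && a.RBD.RBDImage == b.RBD.RBDImage {
			for _, m := range a.RBD.CephMonitors {
				for _, n := range b.RBD.CephMonitors {
					if m == n {
						return true
					}
				}
			}
		}
		return false
	}
	// Same iSCSI qualified name (the "no conflict" iSCSI test below uses a different IQN).
	if a.ISCSI != nil && b.ISCSI != nil {
		return a.ISCSI.IQN == b.ISCSI.IQN
	}
	return false
}

// The hint above deliberately reuses the Filter-time logic by wrapping the deleted pod in
// framework.NewNodeInfo and calling satisfyVolumeConflicts, so the event-time decision stays
// consistent with the check performed during scheduling.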
// New initializes a new plugin and returns it.
func New(_ context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
informerFactory := handle.SharedInformerFactory()


@@ -24,6 +24,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@@ -478,6 +479,194 @@ func TestAccessModeConflicts(t *testing.T) {
}
}
func Test_isSchedulableAfterPodDeleted(t *testing.T) {
GCEDiskVolState := v1.Volume{
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "foo",
},
},
}
GCEDiskVolState2 := v1.Volume{
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "bar",
},
},
}
AWSDiskVolState := v1.Volume{
VolumeSource: v1.VolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: "foo",
},
},
}
AWSDiskVolState2 := v1.Volume{
VolumeSource: v1.VolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: "bar",
},
},
}
RBDDiskVolState := v1.Volume{
VolumeSource: v1.VolumeSource{
RBD: &v1.RBDVolumeSource{
CephMonitors: []string{"a", "b"},
RBDPool: "foo",
RBDImage: "bar",
FSType: "ext4",
},
},
}
RBDDiskVolState2 := v1.Volume{
VolumeSource: v1.VolumeSource{
RBD: &v1.RBDVolumeSource{
CephMonitors: []string{"c", "d"},
RBDPool: "foo",
RBDImage: "bar",
FSType: "ext4",
},
},
}
ISCSIDiskVolState := v1.Volume{
VolumeSource: v1.VolumeSource{
ISCSI: &v1.ISCSIVolumeSource{
TargetPortal: "127.0.0.1:3260",
IQN: "iqn.2016-12.server:storage.target01",
FSType: "ext4",
Lun: 0,
},
},
}
ISCSIDiskVolState2 := v1.Volume{
VolumeSource: v1.VolumeSource{
ISCSI: &v1.ISCSIVolumeSource{
TargetPortal: "127.0.0.1:3260",
IQN: "iqn.2017-12.server:storage.target01",
FSType: "ext4",
Lun: 0,
},
},
}
podGCEDisk := st.MakePod().Volume(GCEDiskVolState).Obj()
podGCEDiskConflicts := st.MakePod().Volume(GCEDiskVolState).Obj()
podGCEDiskNoConflicts := st.MakePod().Volume(GCEDiskVolState2).Obj()
podAWSDisk := st.MakePod().Volume(AWSDiskVolState).Obj()
podAWSDiskConflicts := st.MakePod().Volume(AWSDiskVolState).Obj()
podAWSDiskNoConflicts := st.MakePod().Volume(AWSDiskVolState2).Obj()
podRBDDiskDisk := st.MakePod().Volume(RBDDiskVolState).Obj()
podRBDDiskDiskConflicts := st.MakePod().Volume(RBDDiskVolState).Obj()
podRBDDiskNoConflicts := st.MakePod().Volume(RBDDiskVolState2).Obj()
podISCSIDiskDisk := st.MakePod().Volume(ISCSIDiskVolState).Obj()
podISCSIDiskConflicts := st.MakePod().Volume(ISCSIDiskVolState).Obj()
podISCSIDiskNoConflicts := st.MakePod().Volume(ISCSIDiskVolState2).Obj()
testcases := map[string]struct {
pod *v1.Pod
oldObj, newObj interface{}
existingPods []*v1.Pod
existingPVC *v1.PersistentVolumeClaim
expectedHint framework.QueueingHint
expectedErr bool
}{
"queue-new-object-gcedisk-conflict": {
pod: podGCEDisk,
oldObj: podGCEDiskConflicts,
existingPods: []*v1.Pod{},
existingPVC: &v1.PersistentVolumeClaim{},
expectedHint: framework.Queue,
expectedErr: false,
},
"skip-new-object-gcedisk-no-conflict": {
pod: podGCEDisk,
oldObj: podGCEDiskNoConflicts,
existingPods: []*v1.Pod{},
existingPVC: &v1.PersistentVolumeClaim{},
expectedHint: framework.QueueSkip,
expectedErr: false,
},
"queue-new-object-awsdisk-conflict": {
pod: podAWSDisk,
oldObj: podAWSDiskConflicts,
existingPods: []*v1.Pod{},
existingPVC: &v1.PersistentVolumeClaim{},
expectedHint: framework.Queue,
expectedErr: false,
},
"skip-new-object-awsdisk-no-conflict": {
pod: podAWSDisk,
oldObj: podAWSDiskNoConflicts,
existingPods: []*v1.Pod{},
existingPVC: &v1.PersistentVolumeClaim{},
expectedHint: framework.QueueSkip,
expectedErr: false,
},
"queue-new-object-rbddisk-conflict": {
pod: podRBDDiskDisk,
oldObj: podRBDDiskDiskConflicts,
existingPods: []*v1.Pod{},
existingPVC: &v1.PersistentVolumeClaim{},
expectedHint: framework.Queue,
expectedErr: false,
},
"skip-new-object-rbddisk-no-conflict": {
pod: podRBDDiskDisk,
oldObj: podRBDDiskNoConflicts,
existingPods: []*v1.Pod{},
existingPVC: &v1.PersistentVolumeClaim{},
expectedHint: framework.QueueSkip,
expectedErr: false,
},
"queue-new-object-iscsidisk-conflict": {
pod: podISCSIDiskDisk,
oldObj: podISCSIDiskConflicts,
existingPods: []*v1.Pod{},
existingPVC: &v1.PersistentVolumeClaim{},
expectedHint: framework.Queue,
expectedErr: false,
},
"skip-new-object-iscsidisk-no-conflict": {
pod: podISCSIDiskDisk,
oldObj: podISCSIDiskNoConflicts,
existingPods: []*v1.Pod{},
existingPVC: &v1.PersistentVolumeClaim{},
expectedHint: framework.QueueSkip,
expectedErr: false,
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := newPluginWithListers(ctx, t, tc.existingPods, nil, []*v1.PersistentVolumeClaim{tc.existingPVC})
actualHint, err := p.(*VolumeRestrictions).isSchedulableAfterPodDeleted(logger, tc.pod, tc.oldObj, nil)
if tc.expectedErr {
if err == nil {
t.Error("Expect error, but got nil")
}
return
}
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if diff := cmp.Diff(tc.expectedHint, actualHint); diff != "" {
t.Errorf("Unexpected QueueingHint (-want, +got): %s", diff)
}
})
}
}
func newPlugin(ctx context.Context, t *testing.T) framework.Plugin {
return newPluginWithListers(ctx, t, nil, nil, nil)
}