mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
volumezone: scheduler queueing hints: pv (#125001)
* volumezone: scheduler queueing hints * add_comment
This commit is contained in:
parent
a39f42582f
commit
52a622ad6d
@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
@ -158,21 +159,7 @@ func (pl *VolumeZone) getPVbyPod(logger klog.Logger, pod *v1.Pod) ([]pvTopology,
|
||||
if s := getErrorAsStatus(err); !s.IsSuccess() {
|
||||
return nil, s
|
||||
}
|
||||
|
||||
for _, key := range topologyLabels {
|
||||
if value, ok := pv.ObjectMeta.Labels[key]; ok {
|
||||
volumeVSet, err := volumehelpers.LabelZonesToSet(value)
|
||||
if err != nil {
|
||||
logger.Info("Failed to parse label, ignoring the label", "label", fmt.Sprintf("%s:%s", key, value), "err", err)
|
||||
continue
|
||||
}
|
||||
podPVTopologies = append(podPVTopologies, pvTopology{
|
||||
pvName: pv.Name,
|
||||
key: key,
|
||||
values: sets.Set[string](volumeVSet),
|
||||
})
|
||||
}
|
||||
}
|
||||
podPVTopologies = append(podPVTopologies, pl.getPVTopologies(logger, pv)...)
|
||||
}
|
||||
return podPVTopologies, nil
|
||||
}
|
||||
@ -292,7 +279,7 @@ func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
// Also, if pvc's VolumeName is filled, that also could make a pod schedulable.
|
||||
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange},
|
||||
// A new pv or updating a pv's volume zone labels may make a pod schedulable.
|
||||
{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}},
|
||||
{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeChange},
|
||||
}
|
||||
}
|
||||
|
||||
@ -359,6 +346,52 @@ func (pl *VolumeZone) isSchedulableAfterStorageClassAdded(logger klog.Logger, po
|
||||
return framework.Queue, nil
|
||||
}
|
||||
|
||||
// isSchedulableAfterPersistentVolumeChange is invoked whenever a PersistentVolume added or updated.
|
||||
// It checks whether the change of PV has made a previously unschedulable pod schedulable.
|
||||
// Changing the PV topology labels could cause the pod to become schedulable.
|
||||
func (pl *VolumeZone) isSchedulableAfterPersistentVolumeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
|
||||
originalPV, modifiedPV, err := util.As[*v1.PersistentVolume](oldObj, newObj)
|
||||
if err != nil {
|
||||
return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPersistentVolumeChange: %w", err)
|
||||
}
|
||||
if originalPV == nil {
|
||||
logger.V(5).Info("PV is newly created, which might make the pod schedulable")
|
||||
return framework.Queue, nil
|
||||
}
|
||||
originalPVTopologies := pl.getPVTopologies(logger, originalPV)
|
||||
modifiedPVTopologies := pl.getPVTopologies(logger, modifiedPV)
|
||||
if !reflect.DeepEqual(originalPVTopologies, modifiedPVTopologies) {
|
||||
logger.V(5).Info("PV's topology was updated, which might make the pod schedulable.", "pod", klog.KObj(pod), "PV", klog.KObj(modifiedPV))
|
||||
return framework.Queue, nil
|
||||
}
|
||||
|
||||
logger.V(5).Info("PV was updated, but the topology is unchanged, which it doesn't make the pod schedulable", "pod", klog.KObj(pod), "PV", klog.KObj(modifiedPV))
|
||||
return framework.QueueSkip, nil
|
||||
}
|
||||
|
||||
// getPVTopologies retrieves pvTopology from a given PV and returns the array
|
||||
// This function doesn't check spec.nodeAffinity
|
||||
// because it's read-only after creation and thus cannot be updated
|
||||
// and nodeAffinity is being handled in node affinity plugin
|
||||
func (pl *VolumeZone) getPVTopologies(logger klog.Logger, pv *v1.PersistentVolume) []pvTopology {
|
||||
podPVTopologies := make([]pvTopology, 0)
|
||||
for _, key := range topologyLabels {
|
||||
if value, ok := pv.ObjectMeta.Labels[key]; ok {
|
||||
labelZonesSet, err := volumehelpers.LabelZonesToSet(value)
|
||||
if err != nil {
|
||||
logger.V(5).Info("failed to parse PV's topology label, ignoring the label", "label", fmt.Sprintf("%s:%s", key, value), "err", err)
|
||||
continue
|
||||
}
|
||||
podPVTopologies = append(podPVTopologies, pvTopology{
|
||||
pvName: pv.Name,
|
||||
key: key,
|
||||
values: sets.Set[string](labelZonesSet),
|
||||
})
|
||||
}
|
||||
}
|
||||
return podPVTopologies
|
||||
}
|
||||
|
||||
// New initializes a new plugin and returns it.
|
||||
func New(_ context.Context, _ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
|
||||
informerFactory := handle.SharedInformerFactory()
|
||||
|
@ -675,6 +675,105 @@ func TestIsSchedulableAfterStorageClassAdded(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsSchedulableAfterPersistentVolumeChange(t *testing.T) {
|
||||
testcases := map[string]struct {
|
||||
pod *v1.Pod
|
||||
oldObj, newObj interface{}
|
||||
expectedHint framework.QueueingHint
|
||||
expectedErr bool
|
||||
}{
|
||||
"error-wrong-new-object": {
|
||||
pod: createPodWithVolume("pod_1", "PVC_1"),
|
||||
newObj: "not-a-pv",
|
||||
expectedHint: framework.Queue,
|
||||
expectedErr: true,
|
||||
},
|
||||
"error-wrong-old-object": {
|
||||
pod: createPodWithVolume("pod_1", "PVC_1"),
|
||||
oldObj: "not-a-pv",
|
||||
newObj: st.MakePersistentVolume().Name("Vol_1").Obj(),
|
||||
expectedHint: framework.Queue,
|
||||
expectedErr: true,
|
||||
},
|
||||
"new-pv-was-added": {
|
||||
pod: createPodWithVolume("pod_1", "PVC_1"),
|
||||
newObj: &v1.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "Vol_1",
|
||||
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-b"},
|
||||
},
|
||||
},
|
||||
expectedHint: framework.Queue,
|
||||
},
|
||||
"pv-was-updated-and-changed-topology": {
|
||||
pod: createPodWithVolume("pod_1", "PVC_1"),
|
||||
oldObj: &v1.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "Vol_1",
|
||||
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a"},
|
||||
},
|
||||
},
|
||||
newObj: &v1.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "Vol_1",
|
||||
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-b"},
|
||||
},
|
||||
},
|
||||
expectedHint: framework.Queue,
|
||||
},
|
||||
"pv-was-updated-and-added-topology-label": {
|
||||
pod: createPodWithVolume("pod_1", "PVC_1"),
|
||||
oldObj: &v1.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "Vol_1",
|
||||
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a"},
|
||||
},
|
||||
},
|
||||
newObj: &v1.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "Vol_1",
|
||||
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a",
|
||||
v1.LabelTopologyZone: "zone"},
|
||||
},
|
||||
},
|
||||
expectedHint: framework.Queue,
|
||||
},
|
||||
"pv-was-updated-but-no-topology-is-changed": {
|
||||
pod: createPodWithVolume("pod_1", "PVC_1"),
|
||||
oldObj: &v1.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "Vol_1",
|
||||
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a",
|
||||
v1.LabelTopologyZone: "zone"},
|
||||
},
|
||||
},
|
||||
newObj: &v1.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "Vol_1",
|
||||
Labels: map[string]string{v1.LabelFailureDomainBetaZone: "us-west1-a",
|
||||
v1.LabelTopologyZone: "zone"},
|
||||
},
|
||||
},
|
||||
expectedHint: framework.QueueSkip,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testcases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
p := &VolumeZone{}
|
||||
|
||||
got, err := p.isSchedulableAfterPersistentVolumeChange(logger, tc.pod, tc.oldObj, tc.newObj)
|
||||
if err != nil && !tc.expectedErr {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if got != tc.expectedHint {
|
||||
t.Errorf("isSchedulableAfterPersistentVolumeChange() = %v, want %v", got, tc.expectedHint)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkVolumeZone(b *testing.B) {
|
||||
tests := []struct {
|
||||
Name string
|
||||
|
Loading…
Reference in New Issue
Block a user