diff --git a/pkg/scheduler/apis/config/testing/defaults/defaults.go b/pkg/scheduler/apis/config/testing/defaults/defaults.go index b6d4825a630..b3492b1289f 100644 --- a/pkg/scheduler/apis/config/testing/defaults/defaults.go +++ b/pkg/scheduler/apis/config/testing/defaults/defaults.go @@ -36,6 +36,7 @@ var PluginsV1beta2 = &config.Plugins{ {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, + {Name: names.VolumeZone}, {Name: names.NodeAffinity}, }, }, @@ -193,6 +194,7 @@ var ExpandedPluginsV1beta3 = &config.Plugins{ {Name: names.NodeResourcesFit}, {Name: names.VolumeRestrictions}, {Name: names.VolumeBinding}, + {Name: names.VolumeZone}, {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, }, @@ -363,6 +365,7 @@ var ExpandedPluginsV1 = &config.Plugins{ {Name: names.NodeResourcesFit}, {Name: names.VolumeRestrictions}, {Name: names.VolumeBinding}, + {Name: names.VolumeZone}, {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, }, diff --git a/pkg/scheduler/apis/config/v1beta2/default_plugins.go b/pkg/scheduler/apis/config/v1beta2/default_plugins.go index 37341a4233e..70758fab0b9 100644 --- a/pkg/scheduler/apis/config/v1beta2/default_plugins.go +++ b/pkg/scheduler/apis/config/v1beta2/default_plugins.go @@ -42,6 +42,7 @@ func getDefaultPlugins() *v1beta2.Plugins { {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, + {Name: names.VolumeZone}, {Name: names.NodeAffinity}, }, }, diff --git a/pkg/scheduler/apis/config/v1beta2/default_plugins_test.go b/pkg/scheduler/apis/config/v1beta2/default_plugins_test.go index aaa5d44d15a..81bcfac7b66 100644 --- a/pkg/scheduler/apis/config/v1beta2/default_plugins_test.go +++ b/pkg/scheduler/apis/config/v1beta2/default_plugins_test.go @@ -51,6 +51,7 @@ func TestApplyFeatureGates(t *testing.T) { {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, + {Name: names.VolumeZone}, {Name: 
names.NodeAffinity}, }, }, @@ -138,6 +139,7 @@ func TestApplyFeatureGates(t *testing.T) { {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, + {Name: names.VolumeZone}, {Name: names.NodeAffinity}, }, }, diff --git a/pkg/scheduler/apis/config/v1beta2/defaults_test.go b/pkg/scheduler/apis/config/v1beta2/defaults_test.go index e76ddea41d9..824959e0697 100644 --- a/pkg/scheduler/apis/config/v1beta2/defaults_test.go +++ b/pkg/scheduler/apis/config/v1beta2/defaults_test.go @@ -339,6 +339,7 @@ func TestSchedulerDefaults(t *testing.T) { {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, + {Name: names.VolumeZone}, {Name: names.NodeAffinity}, }, }, diff --git a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go index 1bfab85fc86..898441461b9 100644 --- a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go +++ b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go @@ -18,11 +18,11 @@ package volumezone import ( "context" + "errors" "fmt" - v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" - "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" corelisters "k8s.io/client-go/listers/core/v1" @@ -42,28 +42,133 @@ type VolumeZone struct { } var _ framework.FilterPlugin = &VolumeZone{} +var _ framework.PreFilterPlugin = &VolumeZone{} var _ framework.EnqueueExtensions = &VolumeZone{} const ( // Name is the name of the plugin used in the plugin registry and configurations. Name = names.VolumeZone + preFilterStateKey framework.StateKey = "PreFilter" + Name + // ErrReasonConflict is used for NoVolumeZoneConflict predicate error. 
ErrReasonConflict = "node(s) had no available volume zone"
)

type (
	// pvTopology describes one topology label found on a PersistentVolume:
	// the label key and the set of values parsed from the label.
	pvTopology struct {
		pvName string
		key    string
		values sets.String
	}

	// stateData is what PreFilter stores in the framework.CycleState. A pointer
	// is stored, so later phases read it back without calling Write again.
	stateData struct {
		// podPVTopologies lists the topology labels of the PersistentVolumes
		// used by the pod. It is populated in the PreFilter phase.
		podPVTopologies []pvTopology
	}
)

// Clone implements framework.StateData. It returns the receiver itself rather
// than a deep copy.
func (s *stateData) Clone() framework.StateData {
	return s
}

// topologyLabels enumerates the zone and region label keys that are looked up
// on PersistentVolumes and compared against node labels.
var topologyLabels = []string{
	v1.LabelFailureDomainBetaZone,
	v1.LabelFailureDomainBetaRegion,
	v1.LabelTopologyZone,
	v1.LabelTopologyRegion,
}

// Name returns name of the plugin. It is used in logs, etc.
func (pl *VolumeZone) Name() string {
	return Name
}

// PreFilter is invoked at the prefilter extension point.
//
// It finds the topology of the PersistentVolumes corresponding to the volumes
// a pod requests.
//
// Currently, this is only supported with PersistentVolumeClaims,
// and only looks for the bound PersistentVolume.
+func (pl *VolumeZone) PreFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { + podPVTopologies, status := pl.getPVbyPod(ctx, pod) + if !status.IsSuccess() { + return nil, status + } + cs.Write(preFilterStateKey, &stateData{podPVTopologies: podPVTopologies}) + return nil, nil +} + +func (pl *VolumeZone) getPVbyPod(ctx context.Context, pod *v1.Pod) ([]pvTopology, *framework.Status) { + podPVTopologies := make([]pvTopology, 0) + + for i := range pod.Spec.Volumes { + volume := pod.Spec.Volumes[i] + if volume.PersistentVolumeClaim == nil { + continue + } + pvcName := volume.PersistentVolumeClaim.ClaimName + if pvcName == "" { + return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no name") + } + pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName) + if s := getErrorAsStatus(err); !s.IsSuccess() { + return nil, s + } + + pvName := pvc.Spec.VolumeName + if pvName == "" { + scName := storagehelpers.GetPersistentVolumeClaimClass(pvc) + if len(scName) == 0 { + return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no pv name and storageClass name") + } + + class, err := pl.scLister.Get(scName) + if s := getErrorAsStatus(err); !s.IsSuccess() { + return nil, s + } + if class.VolumeBindingMode == nil { + return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("VolumeBindingMode not set for StorageClass %q", scName)) + } + if *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer { + // Skip unbound volumes + continue + } + + return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolume had no name") + } + + pv, err := pl.pvLister.Get(pvName) + if s := getErrorAsStatus(err); !s.IsSuccess() { + return nil, s + } + + for _, key := range topologyLabels { + if value, ok := pv.ObjectMeta.Labels[key]; ok { + volumeVSet, err := 
volumehelpers.LabelZonesToSet(value) + if err != nil { + klog.InfoS("Failed to parse label, ignoring the label", "label", fmt.Sprintf("%s:%s", key, value), "err", err) + continue + } + podPVTopologies = append(podPVTopologies, pvTopology{ + pvName: pv.Name, + key: key, + values: volumeVSet, + }) + } + } + } + return podPVTopologies, nil +} + +// PreFilterExtensions returns prefilter extensions, pod add and remove. +func (pl *VolumeZone) PreFilterExtensions() framework.PreFilterExtensions { + return nil +} + // Filter invoked at the filter extension point. // // It evaluates if a pod can fit due to the volumes it requests, given @@ -80,94 +185,66 @@ func (pl *VolumeZone) Name() string { // determining the zone of a volume during scheduling, and that is likely to // require calling out to the cloud provider. It seems that we are moving away // from inline volume declarations anyway. -func (pl *VolumeZone) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { +func (pl *VolumeZone) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { // If a pod doesn't have any volume attached to it, the predicate will always be true. // Thus we make a fast path for it, to avoid unnecessary computations in this case. 
if len(pod.Spec.Volumes) == 0 { return nil } - node := nodeInfo.Node() - if node == nil { - return framework.NewStatus(framework.Error, "node not found") - } - nodeConstraints := make(map[string]string) - for k, v := range node.ObjectMeta.Labels { - if !volumeZoneLabels.Has(k) { - continue + var podPVTopologies []pvTopology + state, err := getStateData(cs) + if err != nil { + // Fallback to calculate pv list here + var status *framework.Status + podPVTopologies, status = pl.getPVbyPod(ctx, pod) + if !status.IsSuccess() { + return status } - nodeConstraints[k] = v + } else { + podPVTopologies = state.podPVTopologies } - if len(nodeConstraints) == 0 { + + node := nodeInfo.Node() + hasAnyNodeConstraint := false + for _, pvTopology := range podPVTopologies { + if _, ok := node.Labels[pvTopology.key]; ok { + hasAnyNodeConstraint = true + break + } + } + + if !hasAnyNodeConstraint { // The node has no zone constraints, so we're OK to schedule. - // In practice, when using zones, all nodes must be labeled with zone labels. - // We want to fast-path this case though. + // This is to handle a single-zone cluster scenario where the node may not have any topology labels. 
return nil } - for i := range pod.Spec.Volumes { - volume := pod.Spec.Volumes[i] - if volume.PersistentVolumeClaim == nil { - continue - } - pvcName := volume.PersistentVolumeClaim.ClaimName - if pvcName == "" { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no name") - } - pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName) - if s := getErrorAsStatus(err); !s.IsSuccess() { - return s - } - - pvName := pvc.Spec.VolumeName - if pvName == "" { - scName := storagehelpers.GetPersistentVolumeClaimClass(pvc) - if len(scName) == 0 { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no pv name and storageClass name") - } - - class, err := pl.scLister.Get(scName) - if s := getErrorAsStatus(err); !s.IsSuccess() { - return s - } - if class.VolumeBindingMode == nil { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("VolumeBindingMode not set for StorageClass %q", scName)) - } - if *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer { - // Skip unbound volumes - continue - } - - return framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolume had no name") - } - - pv, err := pl.pvLister.Get(pvName) - if s := getErrorAsStatus(err); !s.IsSuccess() { - return s - } - - for k, v := range pv.ObjectMeta.Labels { - if !volumeZoneLabels.Has(k) { - continue - } - nodeV := nodeConstraints[k] - volumeVSet, err := volumehelpers.LabelZonesToSet(v) - if err != nil { - klog.InfoS("Failed to parse label, ignoring the label", "label", fmt.Sprintf("%s:%s", k, v), "err", err) - continue - } - - if !volumeVSet.Has(nodeV) { - klog.V(10).InfoS("Won't schedule pod onto node due to volume (mismatch on label key)", "pod", klog.KObj(pod), "node", klog.KObj(node), "PV", klog.KRef("", pvName), "PVLabelKey", k) - return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonConflict) - } + for _, pvTopology 
:= range podPVTopologies { + v, ok := node.Labels[pvTopology.key] + if !ok || !pvTopology.values.Has(v) { + klog.V(10).InfoS("Won't schedule pod onto node due to volume (mismatch on label key)", "pod", klog.KObj(pod), "node", klog.KObj(node), "PV", klog.KRef("", pvTopology.pvName), "PVLabelKey", pvTopology.key) + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonConflict) } } + return nil } +func getStateData(cs *framework.CycleState) (*stateData, error) { + state, err := cs.Read(preFilterStateKey) + if err != nil { + return nil, err + } + s, ok := state.(*stateData) + if !ok { + return nil, errors.New("unable to convert state into stateData") + } + return s, nil +} + func getErrorAsStatus(err error) *framework.Status { if err != nil { - if errors.IsNotFound(err) { + if apierrors.IsNotFound(err) { return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) } return framework.AsStatus(err) diff --git a/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go b/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go index ecc99896b08..3eae1eba6fe 100644 --- a/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go +++ b/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go @@ -18,14 +18,19 @@ package volumezone import ( "context" - "reflect" + "fmt" "testing" + "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" fakeframework "k8s.io/kubernetes/pkg/scheduler/framework/fake" + plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing" + "k8s.io/kubernetes/pkg/scheduler/internal/cache" st "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -50,6 +55,9 @@ func TestSingleZone(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{Name: "Vol_Stable_2", Labels: 
map[string]string{v1.LabelTopologyRegion: "us-west1", "uselessLabel": "none"}}, }, + { + ObjectMeta: metav1.ObjectMeta{Name: "Vol_Stable_3", Labels: map[string]string{v1.LabelTopologyZone: "us-west1-a", v1.LabelTopologyRegion: "us-west1-a"}}, + }, } pvcLister := fakeframework.PersistentVolumeClaimLister{ @@ -77,6 +85,10 @@ func TestSingleZone(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "PVC_Stable_2", Namespace: "default"}, Spec: v1.PersistentVolumeClaimSpec{VolumeName: "Vol_Stable_2"}, }, + { + ObjectMeta: metav1.ObjectMeta{Name: "PVC_Stable_3", Namespace: "default"}, + Spec: v1.PersistentVolumeClaimSpec{VolumeName: "Vol_Stable_3"}, + }, } tests := []struct { @@ -188,10 +200,27 @@ func TestSingleZone(t *testing.T) { }, wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonConflict), }, + { + name: "pv with zone and region, node with only zone", + Pod: createPodWithVolume("pod_1", "Vol_Stable_3", "PVC_Stable_3"), + Node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "host1", + Labels: map[string]string{ + v1.LabelTopologyZone: "us-west1-a", + }, + }, + }, + wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonConflict), + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + state := framework.NewCycleState() node := &framework.NodeInfo{} node.SetNode(test.Node) p := &VolumeZone{ @@ -199,9 +228,17 @@ func TestSingleZone(t *testing.T) { pvcLister, nil, } - gotStatus := p.Filter(context.Background(), nil, test.Pod, node) - if !reflect.DeepEqual(gotStatus, test.wantStatus) { - t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) + + _, gotPreFilterStatus := p.PreFilter(ctx, state, test.Pod) + if !gotPreFilterStatus.IsSuccess() { + if diff := cmp.Diff(gotPreFilterStatus, test.wantStatus); diff != "" { + t.Errorf("status does not match (-want,+got):\n%s", diff) + } + } else { + 
gotStatus := p.Filter(ctx, state, test.Pod, node) + if diff := cmp.Diff(gotStatus, test.wantStatus); diff != "" { + t.Errorf("status does not match (-want,+got):\n%s", diff) + } } }) } @@ -314,6 +351,10 @@ func TestMultiZone(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + state := framework.NewCycleState() node := &framework.NodeInfo{} node.SetNode(test.Node) p := &VolumeZone{ @@ -321,9 +362,16 @@ func TestMultiZone(t *testing.T) { pvcLister, nil, } - gotStatus := p.Filter(context.Background(), nil, test.Pod, node) - if !reflect.DeepEqual(gotStatus, test.wantStatus) { - t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) + _, gotPreFilterStatus := p.PreFilter(ctx, state, test.Pod) + if !gotPreFilterStatus.IsSuccess() { + if diff := cmp.Diff(gotPreFilterStatus, test.wantStatus); diff != "" { + t.Errorf("status does not match (-want,+got):\n%s", diff) + } + } else { + gotStatus := p.Filter(context.Background(), state, test.Pod, node) + if diff := cmp.Diff(gotStatus, test.wantStatus); diff != "" { + t.Errorf("status does not match (-want,+got):\n%s", diff) + } } }) } @@ -423,6 +471,10 @@ func TestWithBinding(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + state := framework.NewCycleState() node := &framework.NodeInfo{} node.SetNode(test.Node) p := &VolumeZone{ @@ -430,10 +482,126 @@ func TestWithBinding(t *testing.T) { pvcLister, scLister, } - gotStatus := p.Filter(context.Background(), nil, test.Pod, node) - if !reflect.DeepEqual(gotStatus, test.wantStatus) { - t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) + _, gotPreFilterStatus := p.PreFilter(ctx, state, test.Pod) + if !gotPreFilterStatus.IsSuccess() { + if diff := cmp.Diff(gotPreFilterStatus, test.wantStatus); diff != "" { + 
t.Errorf("status does not match (-want,+got):\n%s", diff) + } + } else { + gotStatus := p.Filter(ctx, state, test.Pod, node) + if diff := cmp.Diff(gotStatus, test.wantStatus); diff != "" { + t.Errorf("status does not match (-want,+got):\n%s", diff) + } } }) } } + +func BenchmarkVolumeZone(b *testing.B) { + tests := []struct { + Name string + Pod *v1.Pod + NumPV int + NumPVC int + NumNodes int + PreFilter bool + }{ + { + Name: "with prefilter", + Pod: createPodWithVolume("pod_0", "Vol_Stable_0", "PVC_Stable_0"), + NumPV: 1000, + NumPVC: 1000, + NumNodes: 1000, + PreFilter: true, + }, + { + Name: "without prefilter", + Pod: createPodWithVolume("pod_0", "Vol_Stable_0", "PVC_Stable_0"), + NumPV: 1000, + NumPVC: 1000, + NumNodes: 1000, + PreFilter: false, + }, + } + + for _, tt := range tests { + b.Run(tt.Name, func(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + nodes := makeNodesWithTopologyZone(tt.NumNodes) + pl := newPluginWithListers(ctx, b, []*v1.Pod{tt.Pod}, nodes, makePVCsWithPV(tt.NumPVC), makePVsWithZoneLabel(tt.NumPV)) + nodeInfos := make([]*framework.NodeInfo, len(nodes), len(nodes)) + for i := 0; i < len(nodes); i++ { + nodeInfo := &framework.NodeInfo{} + nodeInfo.SetNode(nodes[i]) + nodeInfos[i] = nodeInfo + } + p := pl.(*VolumeZone) + state := framework.NewCycleState() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if tt.PreFilter { + _, _ = p.PreFilter(ctx, state, tt.Pod) + } + for _, node := range nodeInfos { + _ = p.Filter(ctx, state, tt.Pod, node) + } + } + }) + } +} + +func newPluginWithListers(ctx context.Context, tb testing.TB, pods []*v1.Pod, nodes []*v1.Node, pvcs []*v1.PersistentVolumeClaim, pvs []*v1.PersistentVolume) framework.Plugin { + snapshot := cache.NewSnapshot(pods, nodes) + + objects := make([]runtime.Object, 0, len(pvcs)) + for _, pvc := range pvcs { + objects = append(objects, pvc) + } + for _, pv := range pvs { + objects = append(objects, pv) + } + return 
plugintesting.SetupPluginWithInformers(ctx, tb, New, &config.InterPodAffinityArgs{}, snapshot, objects) +} + +func makePVsWithZoneLabel(num int) []*v1.PersistentVolume { + pvList := make([]*v1.PersistentVolume, num, num) + for i := 0; i < len(pvList); i++ { + pvName := fmt.Sprintf("Vol_Stable_%d", i) + zone := fmt.Sprintf("us-west-%d", i) + pvList[i] = &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{Name: pvName, Labels: map[string]string{v1.LabelTopologyZone: zone}}, + } + } + return pvList +} + +func makePVCsWithPV(num int) []*v1.PersistentVolumeClaim { + pvcList := make([]*v1.PersistentVolumeClaim, num, num) + for i := 0; i < len(pvcList); i++ { + pvcName := fmt.Sprintf("PVC_Stable_%d", i) + pvName := fmt.Sprintf("Vol_Stable_%d", i) + pvcList[i] = &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: "default"}, + Spec: v1.PersistentVolumeClaimSpec{VolumeName: pvName}, + } + } + return pvcList +} + +func makeNodesWithTopologyZone(num int) []*v1.Node { + nodeList := make([]*v1.Node, num, num) + for i := 0; i < len(nodeList); i++ { + nodeName := fmt.Sprintf("host_%d", i) + zone := fmt.Sprintf("us-west-0") + nodeList[i] = &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + Labels: map[string]string{v1.LabelTopologyZone: zone, "uselessLabel": "none"}, + }, + } + } + return nodeList +}