Merge pull request #114149 from kidddddddddddddddddddddd/volume_zone

Implement PreFilter for VolumeZone plugin for performance
This commit is contained in:
Kubernetes Prow Robot 2022-12-10 06:04:48 -08:00 committed by GitHub
commit 1de943d9cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 336 additions and 84 deletions

View File

@ -36,6 +36,7 @@ var PluginsV1beta2 = &config.Plugins{
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.NodeAffinity},
},
},
@ -193,6 +194,7 @@ var ExpandedPluginsV1beta3 = &config.Plugins{
{Name: names.NodeResourcesFit},
{Name: names.VolumeRestrictions},
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
},
@ -363,6 +365,7 @@ var ExpandedPluginsV1 = &config.Plugins{
{Name: names.NodeResourcesFit},
{Name: names.VolumeRestrictions},
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
},

View File

@ -42,6 +42,7 @@ func getDefaultPlugins() *v1beta2.Plugins {
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.NodeAffinity},
},
},

View File

@ -51,6 +51,7 @@ func TestApplyFeatureGates(t *testing.T) {
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.NodeAffinity},
},
},
@ -138,6 +139,7 @@ func TestApplyFeatureGates(t *testing.T) {
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.NodeAffinity},
},
},

View File

@ -339,6 +339,7 @@ func TestSchedulerDefaults(t *testing.T) {
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.NodeAffinity},
},
},

View File

@ -18,11 +18,11 @@ package volumezone
import (
"context"
"errors"
"fmt"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
@ -42,28 +42,133 @@ type VolumeZone struct {
}
var _ framework.FilterPlugin = &VolumeZone{}
var _ framework.PreFilterPlugin = &VolumeZone{}
var _ framework.EnqueueExtensions = &VolumeZone{}
const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = names.VolumeZone
preFilterStateKey framework.StateKey = "PreFilter" + Name
// ErrReasonConflict is used for NoVolumeZoneConflict predicate error.
ErrReasonConflict = "node(s) had no available volume zone"
)
var volumeZoneLabels = sets.NewString(
// pvTopology holds the value of a pv's topologyLabel
type pvTopology struct {
pvName string
key string
values sets.String
}
// the state is initialized in PreFilter phase. because we save the pointer in
// framework.CycleState, in the later phases we don't need to call Write method
// to update the value
type stateData struct {
// podPVTopologies holds the pv information we need
// it's initialized in the PreFilter phase
podPVTopologies []pvTopology
}
func (d *stateData) Clone() framework.StateData {
return d
}
var topologyLabels = []string{
v1.LabelFailureDomainBetaZone,
v1.LabelFailureDomainBetaRegion,
v1.LabelTopologyZone,
v1.LabelTopologyRegion,
)
}
// Name returns name of the plugin. It is used in logs, etc.
func (pl *VolumeZone) Name() string {
return Name
}
// PreFilter invoked at the prefilter extension point
//
// # It finds the topology of the PersistentVolumes corresponding to the volumes a pod requests
//
// Currently, this is only supported with PersistentVolumeClaims,
// and only looks for the bound PersistentVolume.
func (pl *VolumeZone) PreFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
podPVTopologies, status := pl.getPVbyPod(ctx, pod)
if !status.IsSuccess() {
return nil, status
}
cs.Write(preFilterStateKey, &stateData{podPVTopologies: podPVTopologies})
return nil, nil
}
func (pl *VolumeZone) getPVbyPod(ctx context.Context, pod *v1.Pod) ([]pvTopology, *framework.Status) {
podPVTopologies := make([]pvTopology, 0)
for i := range pod.Spec.Volumes {
volume := pod.Spec.Volumes[i]
if volume.PersistentVolumeClaim == nil {
continue
}
pvcName := volume.PersistentVolumeClaim.ClaimName
if pvcName == "" {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no name")
}
pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return nil, s
}
pvName := pvc.Spec.VolumeName
if pvName == "" {
scName := storagehelpers.GetPersistentVolumeClaimClass(pvc)
if len(scName) == 0 {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no pv name and storageClass name")
}
class, err := pl.scLister.Get(scName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return nil, s
}
if class.VolumeBindingMode == nil {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("VolumeBindingMode not set for StorageClass %q", scName))
}
if *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
// Skip unbound volumes
continue
}
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolume had no name")
}
pv, err := pl.pvLister.Get(pvName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return nil, s
}
for _, key := range topologyLabels {
if value, ok := pv.ObjectMeta.Labels[key]; ok {
volumeVSet, err := volumehelpers.LabelZonesToSet(value)
if err != nil {
klog.InfoS("Failed to parse label, ignoring the label", "label", fmt.Sprintf("%s:%s", key, value), "err", err)
continue
}
podPVTopologies = append(podPVTopologies, pvTopology{
pvName: pv.Name,
key: key,
values: volumeVSet,
})
}
}
}
return podPVTopologies, nil
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.
func (pl *VolumeZone) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
// Filter invoked at the filter extension point.
//
// It evaluates if a pod can fit due to the volumes it requests, given
@ -80,94 +185,66 @@ func (pl *VolumeZone) Name() string {
// determining the zone of a volume during scheduling, and that is likely to
// require calling out to the cloud provider. It seems that we are moving away
// from inline volume declarations anyway.
func (pl *VolumeZone) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
func (pl *VolumeZone) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
// If a pod doesn't have any volume attached to it, the predicate will always be true.
// Thus we make a fast path for it, to avoid unnecessary computations in this case.
if len(pod.Spec.Volumes) == 0 {
return nil
}
node := nodeInfo.Node()
if node == nil {
return framework.NewStatus(framework.Error, "node not found")
}
nodeConstraints := make(map[string]string)
for k, v := range node.ObjectMeta.Labels {
if !volumeZoneLabels.Has(k) {
continue
var podPVTopologies []pvTopology
state, err := getStateData(cs)
if err != nil {
// Fallback to calculate pv list here
var status *framework.Status
podPVTopologies, status = pl.getPVbyPod(ctx, pod)
if !status.IsSuccess() {
return status
}
nodeConstraints[k] = v
} else {
podPVTopologies = state.podPVTopologies
}
if len(nodeConstraints) == 0 {
node := nodeInfo.Node()
hasAnyNodeConstraint := false
for _, pvTopology := range podPVTopologies {
if _, ok := node.Labels[pvTopology.key]; ok {
hasAnyNodeConstraint = true
break
}
}
if !hasAnyNodeConstraint {
// The node has no zone constraints, so we're OK to schedule.
// In practice, when using zones, all nodes must be labeled with zone labels.
// We want to fast-path this case though.
// This is to handle a single-zone cluster scenario where the node may not have any topology labels.
return nil
}
for i := range pod.Spec.Volumes {
volume := pod.Spec.Volumes[i]
if volume.PersistentVolumeClaim == nil {
continue
}
pvcName := volume.PersistentVolumeClaim.ClaimName
if pvcName == "" {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no name")
}
pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return s
}
pvName := pvc.Spec.VolumeName
if pvName == "" {
scName := storagehelpers.GetPersistentVolumeClaimClass(pvc)
if len(scName) == 0 {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no pv name and storageClass name")
}
class, err := pl.scLister.Get(scName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return s
}
if class.VolumeBindingMode == nil {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("VolumeBindingMode not set for StorageClass %q", scName))
}
if *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
// Skip unbound volumes
continue
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolume had no name")
}
pv, err := pl.pvLister.Get(pvName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return s
}
for k, v := range pv.ObjectMeta.Labels {
if !volumeZoneLabels.Has(k) {
continue
}
nodeV := nodeConstraints[k]
volumeVSet, err := volumehelpers.LabelZonesToSet(v)
if err != nil {
klog.InfoS("Failed to parse label, ignoring the label", "label", fmt.Sprintf("%s:%s", k, v), "err", err)
continue
}
if !volumeVSet.Has(nodeV) {
klog.V(10).InfoS("Won't schedule pod onto node due to volume (mismatch on label key)", "pod", klog.KObj(pod), "node", klog.KObj(node), "PV", klog.KRef("", pvName), "PVLabelKey", k)
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonConflict)
}
for _, pvTopology := range podPVTopologies {
v, ok := node.Labels[pvTopology.key]
if !ok || !pvTopology.values.Has(v) {
klog.V(10).InfoS("Won't schedule pod onto node due to volume (mismatch on label key)", "pod", klog.KObj(pod), "node", klog.KObj(node), "PV", klog.KRef("", pvTopology.pvName), "PVLabelKey", pvTopology.key)
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonConflict)
}
}
return nil
}
func getStateData(cs *framework.CycleState) (*stateData, error) {
state, err := cs.Read(preFilterStateKey)
if err != nil {
return nil, err
}
s, ok := state.(*stateData)
if !ok {
return nil, errors.New("unable to convert state into stateData")
}
return s, nil
}
func getErrorAsStatus(err error) *framework.Status {
if err != nil {
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
return framework.AsStatus(err)

View File

@ -18,14 +18,19 @@ package volumezone
import (
"context"
"reflect"
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
fakeframework "k8s.io/kubernetes/pkg/scheduler/framework/fake"
plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
@ -50,6 +55,9 @@ func TestSingleZone(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{Name: "Vol_Stable_2", Labels: map[string]string{v1.LabelTopologyRegion: "us-west1", "uselessLabel": "none"}},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "Vol_Stable_3", Labels: map[string]string{v1.LabelTopologyZone: "us-west1-a", v1.LabelTopologyRegion: "us-west1-a"}},
},
}
pvcLister := fakeframework.PersistentVolumeClaimLister{
@ -77,6 +85,10 @@ func TestSingleZone(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "PVC_Stable_2", Namespace: "default"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "Vol_Stable_2"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "PVC_Stable_3", Namespace: "default"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "Vol_Stable_3"},
},
}
tests := []struct {
@ -188,10 +200,27 @@ func TestSingleZone(t *testing.T) {
},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonConflict),
},
{
name: "pv with zone and region, node with only zone",
Pod: createPodWithVolume("pod_1", "Vol_Stable_3", "PVC_Stable_3"),
Node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "host1",
Labels: map[string]string{
v1.LabelTopologyZone: "us-west1-a",
},
},
},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonConflict),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
state := framework.NewCycleState()
node := &framework.NodeInfo{}
node.SetNode(test.Node)
p := &VolumeZone{
@ -199,9 +228,17 @@ func TestSingleZone(t *testing.T) {
pvcLister,
nil,
}
gotStatus := p.Filter(context.Background(), nil, test.Pod, node)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
_, gotPreFilterStatus := p.PreFilter(ctx, state, test.Pod)
if !gotPreFilterStatus.IsSuccess() {
if diff := cmp.Diff(gotPreFilterStatus, test.wantStatus); diff != "" {
t.Errorf("status does not match (-want,+got):\n%s", diff)
}
} else {
gotStatus := p.Filter(ctx, state, test.Pod, node)
if diff := cmp.Diff(gotStatus, test.wantStatus); diff != "" {
t.Errorf("status does not match (-want,+got):\n%s", diff)
}
}
})
}
@ -314,6 +351,10 @@ func TestMultiZone(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
state := framework.NewCycleState()
node := &framework.NodeInfo{}
node.SetNode(test.Node)
p := &VolumeZone{
@ -321,9 +362,16 @@ func TestMultiZone(t *testing.T) {
pvcLister,
nil,
}
gotStatus := p.Filter(context.Background(), nil, test.Pod, node)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
_, gotPreFilterStatus := p.PreFilter(ctx, state, test.Pod)
if !gotPreFilterStatus.IsSuccess() {
if diff := cmp.Diff(gotPreFilterStatus, test.wantStatus); diff != "" {
t.Errorf("status does not match (-want,+got):\n%s", diff)
}
} else {
gotStatus := p.Filter(context.Background(), state, test.Pod, node)
if diff := cmp.Diff(gotStatus, test.wantStatus); diff != "" {
t.Errorf("status does not match (-want,+got):\n%s", diff)
}
}
})
}
@ -423,6 +471,10 @@ func TestWithBinding(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
state := framework.NewCycleState()
node := &framework.NodeInfo{}
node.SetNode(test.Node)
p := &VolumeZone{
@ -430,10 +482,126 @@ func TestWithBinding(t *testing.T) {
pvcLister,
scLister,
}
gotStatus := p.Filter(context.Background(), nil, test.Pod, node)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
_, gotPreFilterStatus := p.PreFilter(ctx, state, test.Pod)
if !gotPreFilterStatus.IsSuccess() {
if diff := cmp.Diff(gotPreFilterStatus, test.wantStatus); diff != "" {
t.Errorf("status does not match (-want,+got):\n%s", diff)
}
} else {
gotStatus := p.Filter(ctx, state, test.Pod, node)
if diff := cmp.Diff(gotStatus, test.wantStatus); diff != "" {
t.Errorf("status does not match (-want,+got):\n%s", diff)
}
}
})
}
}
func BenchmarkVolumeZone(b *testing.B) {
tests := []struct {
Name string
Pod *v1.Pod
NumPV int
NumPVC int
NumNodes int
PreFilter bool
}{
{
Name: "with prefilter",
Pod: createPodWithVolume("pod_0", "Vol_Stable_0", "PVC_Stable_0"),
NumPV: 1000,
NumPVC: 1000,
NumNodes: 1000,
PreFilter: true,
},
{
Name: "without prefilter",
Pod: createPodWithVolume("pod_0", "Vol_Stable_0", "PVC_Stable_0"),
NumPV: 1000,
NumPVC: 1000,
NumNodes: 1000,
PreFilter: false,
},
}
for _, tt := range tests {
b.Run(tt.Name, func(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nodes := makeNodesWithTopologyZone(tt.NumNodes)
pl := newPluginWithListers(ctx, b, []*v1.Pod{tt.Pod}, nodes, makePVCsWithPV(tt.NumPVC), makePVsWithZoneLabel(tt.NumPV))
nodeInfos := make([]*framework.NodeInfo, len(nodes), len(nodes))
for i := 0; i < len(nodes); i++ {
nodeInfo := &framework.NodeInfo{}
nodeInfo.SetNode(nodes[i])
nodeInfos[i] = nodeInfo
}
p := pl.(*VolumeZone)
state := framework.NewCycleState()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if tt.PreFilter {
_, _ = p.PreFilter(ctx, state, tt.Pod)
}
for _, node := range nodeInfos {
_ = p.Filter(ctx, state, tt.Pod, node)
}
}
})
}
}
func newPluginWithListers(ctx context.Context, tb testing.TB, pods []*v1.Pod, nodes []*v1.Node, pvcs []*v1.PersistentVolumeClaim, pvs []*v1.PersistentVolume) framework.Plugin {
snapshot := cache.NewSnapshot(pods, nodes)
objects := make([]runtime.Object, 0, len(pvcs))
for _, pvc := range pvcs {
objects = append(objects, pvc)
}
for _, pv := range pvs {
objects = append(objects, pv)
}
return plugintesting.SetupPluginWithInformers(ctx, tb, New, &config.InterPodAffinityArgs{}, snapshot, objects)
}
func makePVsWithZoneLabel(num int) []*v1.PersistentVolume {
pvList := make([]*v1.PersistentVolume, num, num)
for i := 0; i < len(pvList); i++ {
pvName := fmt.Sprintf("Vol_Stable_%d", i)
zone := fmt.Sprintf("us-west-%d", i)
pvList[i] = &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{Name: pvName, Labels: map[string]string{v1.LabelTopologyZone: zone}},
}
}
return pvList
}
func makePVCsWithPV(num int) []*v1.PersistentVolumeClaim {
pvcList := make([]*v1.PersistentVolumeClaim, num, num)
for i := 0; i < len(pvcList); i++ {
pvcName := fmt.Sprintf("PVC_Stable_%d", i)
pvName := fmt.Sprintf("Vol_Stable_%d", i)
pvcList[i] = &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: "default"},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: pvName},
}
}
return pvcList
}
func makeNodesWithTopologyZone(num int) []*v1.Node {
nodeList := make([]*v1.Node, num, num)
for i := 0; i < len(nodeList); i++ {
nodeName := fmt.Sprintf("host_%d", i)
zone := fmt.Sprintf("us-west-0")
nodeList[i] = &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
Labels: map[string]string{v1.LabelTopologyZone: zone, "uselessLabel": "none"},
},
}
}
return nodeList
}