Merge pull request #86530 from zouyee/vbb

Move volumebinding predicate to its filter plugin
This commit is contained in:
Kubernetes Prow Robot 2019-12-23 22:05:29 -08:00 committed by GitHub
commit 39d0710a4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 187 additions and 78 deletions

View File

@ -22,7 +22,6 @@ go_library(
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",

View File

@ -41,7 +41,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
@ -1051,74 +1050,6 @@ func podToleratesNodeTaints(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, f
return false, []PredicateFailureReason{ErrTaintsTolerationsNotMatch}, nil
}
// VolumeBindingChecker contains information to check a volume binding.
type VolumeBindingChecker struct {
binder *volumebinder.VolumeBinder
}
// NewVolumeBindingPredicate evaluates if a pod can fit due to the volumes it requests,
// for both bound and unbound PVCs.
//
// For PVCs that are bound, then it checks that the corresponding PV's node affinity is
// satisfied by the given node.
//
// For PVCs that are unbound, it tries to find available PVs that can satisfy the PVC requirements
// and that the PV node affinity is satisfied by the given node.
//
// The predicate returns true if all bound PVCs have compatible PVs with the node, and if all unbound
// PVCs can be matched with an available and node-compatible PV.
func NewVolumeBindingPredicate(binder *volumebinder.VolumeBinder) FitPredicate {
c := &VolumeBindingChecker{
binder: binder,
}
return c.predicate
}
func podHasPVCs(pod *v1.Pod) bool {
for _, vol := range pod.Spec.Volumes {
if vol.PersistentVolumeClaim != nil {
return true
}
}
return false
}
func (c *VolumeBindingChecker) predicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
// If pod does not request any PVC, we don't need to do anything.
if !podHasPVCs(pod) {
return true, nil, nil
}
node := nodeInfo.Node()
if node == nil {
return false, nil, fmt.Errorf("node not found")
}
unboundSatisfied, boundSatisfied, err := c.binder.Binder.FindPodVolumes(pod, node)
if err != nil {
return false, nil, err
}
failReasons := []PredicateFailureReason{}
if !boundSatisfied {
klog.V(5).Infof("Bound PVs not satisfied for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
failReasons = append(failReasons, ErrVolumeNodeConflict)
}
if !unboundSatisfied {
klog.V(5).Infof("Couldn't find matching PVs for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
failReasons = append(failReasons, ErrVolumeBindConflict)
}
if len(failReasons) > 0 {
return false, failReasons, nil
}
// All volumes bound or matching PVs found for all unbound PVCs
klog.V(5).Infof("All PVCs found matches for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
return true, nil, nil
}
// EvenPodsSpreadPredicate is the legacy function using old path of metadata.
// DEPRECATED
func EvenPodsSpreadPredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {

View File

@ -133,7 +133,7 @@ func init() {
scheduler.RegisterFitPredicateFactory(
predicates.CheckVolumeBindingPred,
func(args scheduler.AlgorithmFactoryArgs) predicates.FitPredicate {
return predicates.NewVolumeBindingPredicate(args.VolumeBinder)
return nil
},
)
}

View File

@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
@ -28,3 +27,17 @@ filegroup(
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["volume_binding_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/controller/volume/scheduling:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
],
)

View File

@ -21,7 +21,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
@ -29,7 +28,7 @@ import (
// VolumeBinding is a plugin that binds pod volumes in scheduling.
type VolumeBinding struct {
predicate predicates.FitPredicate
binder *volumebinder.VolumeBinder
}
var _ framework.FilterPlugin = &VolumeBinding{}
@ -42,15 +41,59 @@ func (pl *VolumeBinding) Name() string {
return Name
}
func podHasPVCs(pod *v1.Pod) bool {
for _, vol := range pod.Spec.Volumes {
if vol.PersistentVolumeClaim != nil {
return true
}
}
return false
}
// Filter invoked at the filter extension point.
// It evaluates if a pod can fit due to the volumes it requests,
// for both bound and unbound PVCs.
//
// For PVCs that are bound, then it checks that the corresponding PV's node affinity is
// satisfied by the given node.
//
// For PVCs that are unbound, it tries to find available PVs that can satisfy the PVC requirements
// and that the PV node affinity is satisfied by the given node.
//
// The predicate returns true if all bound PVCs have compatible PVs with the node, and if all unbound
// PVCs can be matched with an available and node-compatible PV.
func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
_, reasons, err := pl.predicate(pod, nil, nodeInfo)
return migration.PredicateResultToFrameworkStatus(reasons, err)
node := nodeInfo.Node()
if node == nil {
return framework.NewStatus(framework.Error, "node not found")
}
// If pod does not request any PVC, we don't need to do anything.
if !podHasPVCs(pod) {
return nil
}
unboundSatisfied, boundSatisfied, err := pl.binder.Binder.FindPodVolumes(pod, node)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
if !boundSatisfied || !unboundSatisfied {
status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
if !boundSatisfied {
status.AppendReason(predicates.ErrVolumeNodeConflict.GetReason())
}
if !unboundSatisfied {
status.AppendReason(predicates.ErrVolumeBindConflict.GetReason())
}
return status
}
return nil
}
// NewFromVolumeBinder initializes a new plugin with volume binder and returns it.
func NewFromVolumeBinder(volumeBinder *volumebinder.VolumeBinder) framework.Plugin {
return &VolumeBinding{
predicate: predicates.NewVolumeBindingPredicate(volumeBinder),
binder: volumeBinder,
}
}

View File

@ -0,0 +1,123 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumebinding
import (
"context"
"fmt"
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
func TestVolumeBinding(t *testing.T) {
findErr := fmt.Errorf("find err")
volState := v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{},
},
},
},
}
table := []struct {
name string
pod *v1.Pod
node *v1.Node
volumeBinderConfig *volumescheduling.FakeVolumeBinderConfig
wantStatus *framework.Status
}{
{
name: "nothing",
pod: &v1.Pod{},
node: &v1.Node{},
wantStatus: nil,
},
{
name: "all bound",
pod: &v1.Pod{Spec: volState},
node: &v1.Node{},
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
AllBound: true,
FindUnboundSatsified: true,
FindBoundSatsified: true,
},
wantStatus: nil,
},
{
name: "unbound/no matches",
pod: &v1.Pod{Spec: volState},
node: &v1.Node{},
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
FindUnboundSatsified: false,
FindBoundSatsified: true,
},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, predicates.ErrVolumeBindConflict.GetReason()),
},
{
name: "bound and unbound unsatisfied",
pod: &v1.Pod{Spec: volState},
node: &v1.Node{},
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
FindUnboundSatsified: false,
FindBoundSatsified: false,
},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, predicates.ErrVolumeNodeConflict.GetReason(),
predicates.ErrVolumeBindConflict.GetReason()),
},
{
name: "unbound/found matches/bind succeeds",
pod: &v1.Pod{Spec: volState},
node: &v1.Node{},
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
FindUnboundSatsified: true,
FindBoundSatsified: true,
},
wantStatus: nil,
},
{
name: "predicate error",
pod: &v1.Pod{Spec: volState},
node: &v1.Node{},
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
FindErr: findErr,
},
wantStatus: framework.NewStatus(framework.Error, findErr.Error()),
},
}
for _, item := range table {
t.Run(item.name, func(t *testing.T) {
nodeInfo := schedulernodeinfo.NewNodeInfo()
nodeInfo.SetNode(item.node)
fakeVolumeBinder := volumebinder.NewFakeVolumeBinder(item.volumeBinderConfig)
p := NewFromVolumeBinder(fakeVolumeBinder)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, item.pod, nodeInfo)
if !reflect.DeepEqual(gotStatus, item.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, item.wantStatus)
}
})
}
}