mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Merge pull request #56577 from resouer/fix-eclass-pvc
Automatic merge from submit-queue (batch tested with PRs 56688, 56577). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Add pvc as part of equivalence hash **What this PR does / why we need it**: PVC should be added as part of the equivalence hash so that `StatefulSet` and `Operator` pods will always run the volume predicate, while `ReplicaSet` pods can still re-use cached ones. **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #56265 **Special notes for your reviewer**: **Release note**: ```release-note Add pvc as part of equivalence hash ```
This commit is contained in:
commit
305d644363
@ -12,6 +12,7 @@ go_library(
|
|||||||
"error.go",
|
"error.go",
|
||||||
"metadata.go",
|
"metadata.go",
|
||||||
"predicates.go",
|
"predicates.go",
|
||||||
|
"testing_helper.go",
|
||||||
"utils.go",
|
"utils.go",
|
||||||
],
|
],
|
||||||
importpath = "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates",
|
importpath = "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates",
|
||||||
@ -34,6 +35,7 @@ go_library(
|
|||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
|
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/listers/storage/v1:go_default_library",
|
"//vendor/k8s.io/client-go/listers/storage/v1:go_default_library",
|
||||||
|
@ -17,7 +17,6 @@ limitations under the License.
|
|||||||
package predicates
|
package predicates
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -36,57 +35,6 @@ import (
|
|||||||
schedutil "k8s.io/kubernetes/plugin/pkg/scheduler/util"
|
schedutil "k8s.io/kubernetes/plugin/pkg/scheduler/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FakeNodeInfo v1.Node
|
|
||||||
|
|
||||||
func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*v1.Node, error) {
|
|
||||||
node := v1.Node(n)
|
|
||||||
return &node, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type FakeNodeListInfo []v1.Node
|
|
||||||
|
|
||||||
func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*v1.Node, error) {
|
|
||||||
for _, node := range nodes {
|
|
||||||
if node.Name == nodeName {
|
|
||||||
return &node, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("Unable to find node: %s", nodeName)
|
|
||||||
}
|
|
||||||
|
|
||||||
type FakePersistentVolumeClaimInfo []v1.PersistentVolumeClaim
|
|
||||||
|
|
||||||
func (pvcs FakePersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace string, pvcID string) (*v1.PersistentVolumeClaim, error) {
|
|
||||||
for _, pvc := range pvcs {
|
|
||||||
if pvc.Name == pvcID && pvc.Namespace == namespace {
|
|
||||||
return &pvc, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("Unable to find persistent volume claim: %s/%s", namespace, pvcID)
|
|
||||||
}
|
|
||||||
|
|
||||||
type FakePersistentVolumeInfo []v1.PersistentVolume
|
|
||||||
|
|
||||||
func (pvs FakePersistentVolumeInfo) GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error) {
|
|
||||||
for _, pv := range pvs {
|
|
||||||
if pv.Name == pvID {
|
|
||||||
return &pv, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("Unable to find persistent volume: %s", pvID)
|
|
||||||
}
|
|
||||||
|
|
||||||
type FakeStorageClassInfo []storagev1.StorageClass
|
|
||||||
|
|
||||||
func (classes FakeStorageClassInfo) GetStorageClassInfo(name string) (*storagev1.StorageClass, error) {
|
|
||||||
for _, sc := range classes {
|
|
||||||
if sc.Name == name {
|
|
||||||
return &sc, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("Unable to find storage class: %s", name)
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
extendedResourceA = v1.ResourceName("example.com/aaa")
|
extendedResourceA = v1.ResourceName("example.com/aaa")
|
||||||
extendedResourceB = v1.ResourceName("example.com/bbb")
|
extendedResourceB = v1.ResourceName("example.com/bbb")
|
||||||
|
75
plugin/pkg/scheduler/algorithm/predicates/testing_helper.go
Normal file
75
plugin/pkg/scheduler/algorithm/predicates/testing_helper.go
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package predicates
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
storagev1 "k8s.io/api/storage/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FakePersistentVolumeClaimInfo []v1.PersistentVolumeClaim
|
||||||
|
|
||||||
|
func (pvcs FakePersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace string, pvcID string) (*v1.PersistentVolumeClaim, error) {
|
||||||
|
for _, pvc := range pvcs {
|
||||||
|
if pvc.Name == pvcID && pvc.Namespace == namespace {
|
||||||
|
return &pvc, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("Unable to find persistent volume claim: %s/%s", namespace, pvcID)
|
||||||
|
}
|
||||||
|
|
||||||
|
type FakeNodeInfo v1.Node
|
||||||
|
|
||||||
|
func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*v1.Node, error) {
|
||||||
|
node := v1.Node(n)
|
||||||
|
return &node, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type FakeNodeListInfo []v1.Node
|
||||||
|
|
||||||
|
func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*v1.Node, error) {
|
||||||
|
for _, node := range nodes {
|
||||||
|
if node.Name == nodeName {
|
||||||
|
return &node, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("Unable to find node: %s", nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
type FakePersistentVolumeInfo []v1.PersistentVolume
|
||||||
|
|
||||||
|
func (pvs FakePersistentVolumeInfo) GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error) {
|
||||||
|
for _, pv := range pvs {
|
||||||
|
if pv.Name == pvID {
|
||||||
|
return &pv, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("Unable to find persistent volume: %s", pvID)
|
||||||
|
}
|
||||||
|
|
||||||
|
type FakeStorageClassInfo []storagev1.StorageClass
|
||||||
|
|
||||||
|
func (classes FakeStorageClassInfo) GetStorageClassInfo(name string) (*storagev1.StorageClass, error) {
|
||||||
|
for _, sc := range classes {
|
||||||
|
if sc.Name == name {
|
||||||
|
return &sc, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("Unable to find storage class: %s", name)
|
||||||
|
}
|
@ -19,9 +19,13 @@ package predicates
|
|||||||
import (
|
import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
|
||||||
schedutil "k8s.io/kubernetes/plugin/pkg/scheduler/util"
|
schedutil "k8s.io/kubernetes/plugin/pkg/scheduler/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -69,28 +73,66 @@ func CreateSelectorFromLabels(aL map[string]string) labels.Selector {
|
|||||||
return labels.Set(aL).AsSelector()
|
return labels.Set(aL).AsSelector()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EquivalencePodGenerator is a generator of equivalence class for pod with consideration of PVC info.
|
||||||
|
type EquivalencePodGenerator struct {
|
||||||
|
pvcInfo PersistentVolumeClaimInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEquivalencePodGenerator returns a getEquivalencePod method with consideration of PVC info.
|
||||||
|
func NewEquivalencePodGenerator(pvcInfo PersistentVolumeClaimInfo) algorithm.GetEquivalencePodFunc {
|
||||||
|
g := &EquivalencePodGenerator{
|
||||||
|
pvcInfo: pvcInfo,
|
||||||
|
}
|
||||||
|
return g.getEquivalencePod
|
||||||
|
}
|
||||||
|
|
||||||
// GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused.
|
// GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused.
|
||||||
func GetEquivalencePod(pod *v1.Pod) interface{} {
|
func (e *EquivalencePodGenerator) getEquivalencePod(pod *v1.Pod) interface{} {
|
||||||
// For now we only consider pods:
|
// For now we only consider pods:
|
||||||
// 1. OwnerReferences is Controller
|
// 1. OwnerReferences is Controller
|
||||||
// 2. with same OwnerReferences
|
// 2. with same OwnerReferences
|
||||||
|
// 3. with same PVC claim
|
||||||
// to be equivalent
|
// to be equivalent
|
||||||
if len(pod.OwnerReferences) != 0 {
|
for _, ref := range pod.OwnerReferences {
|
||||||
for _, ref := range pod.OwnerReferences {
|
if ref.Controller != nil && *ref.Controller {
|
||||||
if *ref.Controller {
|
if pvcSet, err := e.getPVCSet(pod); err == nil {
|
||||||
// a pod can only belongs to one controller
|
// A pod can only belongs to one controller, so let's return.
|
||||||
return &EquivalencePod{
|
return &EquivalencePod{
|
||||||
ControllerRef: ref,
|
ControllerRef: ref,
|
||||||
|
PVCSet: pvcSet,
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// If error encountered, log warning and return nil (i.e. no equivalent pod found)
|
||||||
|
glog.Warningf("[EquivalencePodGenerator] for pod: %v failed due to: %v", pod.GetName(), err)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getPVCSet returns a set of PVC UIDs of given pod.
|
||||||
|
func (e *EquivalencePodGenerator) getPVCSet(pod *v1.Pod) (sets.String, error) {
|
||||||
|
result := sets.NewString()
|
||||||
|
for _, volume := range pod.Spec.Volumes {
|
||||||
|
if volume.PersistentVolumeClaim == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
pvcName := volume.PersistentVolumeClaim.ClaimName
|
||||||
|
pvc, err := e.pvcInfo.GetPersistentVolumeClaimInfo(pod.GetNamespace(), pvcName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result.Insert(string(pvc.UID))
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.
|
// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.
|
||||||
type EquivalencePod struct {
|
type EquivalencePod struct {
|
||||||
ControllerRef metav1.OwnerReference
|
ControllerRef metav1.OwnerReference
|
||||||
|
PVCSet sets.String
|
||||||
}
|
}
|
||||||
|
|
||||||
type hostPortInfo struct {
|
type hostPortInfo struct {
|
||||||
|
@ -80,8 +80,12 @@ func init() {
|
|||||||
// Fit is determined by node selector query.
|
// Fit is determined by node selector query.
|
||||||
factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodMatchNodeSelector)
|
factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodMatchNodeSelector)
|
||||||
|
|
||||||
// Use equivalence class to speed up predicates & priorities
|
// Use equivalence class to speed up heavy predicates phase.
|
||||||
factory.RegisterGetEquivalencePodFunction(predicates.GetEquivalencePod)
|
factory.RegisterGetEquivalencePodFunction(
|
||||||
|
func(args factory.PluginFactoryArgs) algorithm.GetEquivalencePodFunc {
|
||||||
|
return predicates.NewEquivalencePodGenerator(args.PVCInfo)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
// ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing
|
// ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing
|
||||||
// the number of pods (belonging to the same service) on the same node.
|
// the number of pods (belonging to the same service) on the same node.
|
||||||
|
@ -173,7 +173,7 @@ func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName stri
|
|||||||
|
|
||||||
// InvalidateCachedPredicateItemForPodAdd is a wrapper of InvalidateCachedPredicateItem for pod add case
|
// InvalidateCachedPredicateItemForPodAdd is a wrapper of InvalidateCachedPredicateItem for pod add case
|
||||||
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
|
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
|
||||||
// MatchInterPodAffinity: we assume scheduler can make sure newly binded pod
|
// MatchInterPodAffinity: we assume scheduler can make sure newly bound pod
|
||||||
// will not break the existing inter pod affinity. So we does not need to invalidate
|
// will not break the existing inter pod affinity. So we does not need to invalidate
|
||||||
// MatchInterPodAffinity when pod added.
|
// MatchInterPodAffinity when pod added.
|
||||||
//
|
//
|
||||||
@ -188,12 +188,29 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod,
|
|||||||
|
|
||||||
// GeneralPredicates: will always be affected by adding a new pod
|
// GeneralPredicates: will always be affected by adding a new pod
|
||||||
invalidPredicates := sets.NewString("GeneralPredicates")
|
invalidPredicates := sets.NewString("GeneralPredicates")
|
||||||
|
|
||||||
|
// MaxPDVolumeCountPredicate: we check the volumes of pod to make decision.
|
||||||
|
for _, vol := range pod.Spec.Volumes {
|
||||||
|
if vol.PersistentVolumeClaim != nil {
|
||||||
|
invalidPredicates.Insert("MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount")
|
||||||
|
} else {
|
||||||
|
if vol.AWSElasticBlockStore != nil {
|
||||||
|
invalidPredicates.Insert("MaxEBSVolumeCount")
|
||||||
|
}
|
||||||
|
if vol.GCEPersistentDisk != nil {
|
||||||
|
invalidPredicates.Insert("MaxGCEPDVolumeCount")
|
||||||
|
}
|
||||||
|
if vol.AzureDisk != nil {
|
||||||
|
invalidPredicates.Insert("MaxAzureDiskVolumeCount")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates)
|
ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getHashEquivalencePod returns the hash of equivalence pod.
|
// getHashEquivalencePod returns the hash of equivalence pod.
|
||||||
// 1. equivalenceHash
|
// 1. equivalenceHash
|
||||||
// 2. if equivalence pod is found
|
// 2. if equivalence hash is valid
|
||||||
func (ec *EquivalenceCache) getHashEquivalencePod(pod *v1.Pod) (uint64, bool) {
|
func (ec *EquivalenceCache) getHashEquivalencePod(pod *v1.Pod) (uint64, bool) {
|
||||||
equivalencePod := ec.getEquivalencePod(pod)
|
equivalencePod := ec.getEquivalencePod(pod)
|
||||||
if equivalencePod != nil {
|
if equivalencePod != nil {
|
||||||
|
@ -238,13 +238,37 @@ func TestPredicateWithECache(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestGetHashEquivalencePod(t *testing.T) {
|
func TestGetHashEquivalencePod(t *testing.T) {
|
||||||
// use default equivalence class calculator
|
|
||||||
ecache := NewEquivalenceCache(predicates.GetEquivalencePod)
|
testNamespace := "test"
|
||||||
|
|
||||||
|
pvcInfo := predicates.FakePersistentVolumeClaimInfo{
|
||||||
|
{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{UID: "someEBSVol1", Name: "someEBSVol1", Namespace: testNamespace},
|
||||||
|
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "someEBSVol1"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{UID: "someEBSVol2", Name: "someEBSVol2", Namespace: testNamespace},
|
||||||
|
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "someNonEBSVol"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{UID: "someEBSVol3-0", Name: "someEBSVol3-0", Namespace: testNamespace},
|
||||||
|
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "pvcWithDeletedPV"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{UID: "someEBSVol3-1", Name: "someEBSVol3-1", Namespace: testNamespace},
|
||||||
|
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "anotherPVCWithDeletedPV"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// use default equivalence class generator
|
||||||
|
ecache := NewEquivalenceCache(predicates.NewEquivalencePodGenerator(pvcInfo))
|
||||||
|
|
||||||
isController := true
|
isController := true
|
||||||
|
|
||||||
pod1 := &v1.Pod{
|
pod1 := &v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: "pod1",
|
Name: "pod1",
|
||||||
|
Namespace: testNamespace,
|
||||||
OwnerReferences: []metav1.OwnerReference{
|
OwnerReferences: []metav1.OwnerReference{
|
||||||
{
|
{
|
||||||
APIVersion: "v1",
|
APIVersion: "v1",
|
||||||
@ -255,11 +279,30 @@ func TestGetHashEquivalencePod(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Volumes: []v1.Volume{
|
||||||
|
{
|
||||||
|
VolumeSource: v1.VolumeSource{
|
||||||
|
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||||
|
ClaimName: "someEBSVol1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
VolumeSource: v1.VolumeSource{
|
||||||
|
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||||
|
ClaimName: "someEBSVol2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
pod2 := &v1.Pod{
|
pod2 := &v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: "pod2",
|
Name: "pod2",
|
||||||
|
Namespace: testNamespace,
|
||||||
OwnerReferences: []metav1.OwnerReference{
|
OwnerReferences: []metav1.OwnerReference{
|
||||||
{
|
{
|
||||||
APIVersion: "v1",
|
APIVersion: "v1",
|
||||||
@ -270,11 +313,118 @@ func TestGetHashEquivalencePod(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Volumes: []v1.Volume{
|
||||||
|
{
|
||||||
|
VolumeSource: v1.VolumeSource{
|
||||||
|
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||||
|
ClaimName: "someEBSVol2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
VolumeSource: v1.VolumeSource{
|
||||||
|
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||||
|
ClaimName: "someEBSVol1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
pod3 := &v1.Pod{
|
pod3 := &v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: "pod3",
|
Name: "pod3",
|
||||||
|
Namespace: testNamespace,
|
||||||
|
OwnerReferences: []metav1.OwnerReference{
|
||||||
|
{
|
||||||
|
APIVersion: "v1",
|
||||||
|
Kind: "ReplicationController",
|
||||||
|
Name: "rc",
|
||||||
|
UID: "567",
|
||||||
|
Controller: &isController,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Volumes: []v1.Volume{
|
||||||
|
{
|
||||||
|
VolumeSource: v1.VolumeSource{
|
||||||
|
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||||
|
ClaimName: "someEBSVol3-1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pod4 := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "pod4",
|
||||||
|
Namespace: testNamespace,
|
||||||
|
OwnerReferences: []metav1.OwnerReference{
|
||||||
|
{
|
||||||
|
APIVersion: "v1",
|
||||||
|
Kind: "ReplicationController",
|
||||||
|
Name: "rc",
|
||||||
|
UID: "567",
|
||||||
|
Controller: &isController,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Volumes: []v1.Volume{
|
||||||
|
{
|
||||||
|
VolumeSource: v1.VolumeSource{
|
||||||
|
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||||
|
ClaimName: "someEBSVol3-0",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pod5 := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "pod5",
|
||||||
|
Namespace: testNamespace,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pod6 := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "pod6",
|
||||||
|
Namespace: testNamespace,
|
||||||
|
OwnerReferences: []metav1.OwnerReference{
|
||||||
|
{
|
||||||
|
APIVersion: "v1",
|
||||||
|
Kind: "ReplicationController",
|
||||||
|
Name: "rc",
|
||||||
|
UID: "567",
|
||||||
|
Controller: &isController,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Volumes: []v1.Volume{
|
||||||
|
{
|
||||||
|
VolumeSource: v1.VolumeSource{
|
||||||
|
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||||
|
ClaimName: "no-exists-pvc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pod7 := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "pod7",
|
||||||
|
Namespace: testNamespace,
|
||||||
OwnerReferences: []metav1.OwnerReference{
|
OwnerReferences: []metav1.OwnerReference{
|
||||||
{
|
{
|
||||||
APIVersion: "v1",
|
APIVersion: "v1",
|
||||||
@ -287,28 +437,73 @@ func TestGetHashEquivalencePod(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
hash1, _ := ecache.getHashEquivalencePod(pod1)
|
type podInfo struct {
|
||||||
hash2, _ := ecache.getHashEquivalencePod(pod2)
|
pod *v1.Pod
|
||||||
hash3, _ := ecache.getHashEquivalencePod(pod3)
|
hashIsValid bool
|
||||||
|
|
||||||
if hash1 != hash2 {
|
|
||||||
t.Errorf("Failed: pod %v and %v is expected to be equivalent", pod1.Name, pod2.Name)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if hash2 == hash3 {
|
tests := []struct {
|
||||||
t.Errorf("Failed: pod %v and %v is not expected to be equivalent", pod2.Name, pod3.Name)
|
podInfoList []podInfo
|
||||||
}
|
isEquivalent bool
|
||||||
|
}{
|
||||||
// pod4 is a pod without controller ref
|
// pods with same controllerRef and same pvc claim
|
||||||
pod4 := &v1.Pod{
|
{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
podInfoList: []podInfo{
|
||||||
Name: "pod4",
|
{pod: pod1, hashIsValid: true},
|
||||||
|
{pod: pod2, hashIsValid: true},
|
||||||
|
},
|
||||||
|
isEquivalent: true,
|
||||||
|
},
|
||||||
|
// pods with same controllerRef but different pvc claim
|
||||||
|
{
|
||||||
|
podInfoList: []podInfo{
|
||||||
|
{pod: pod3, hashIsValid: true},
|
||||||
|
{pod: pod4, hashIsValid: true},
|
||||||
|
},
|
||||||
|
isEquivalent: false,
|
||||||
|
},
|
||||||
|
// pod without controllerRef
|
||||||
|
{
|
||||||
|
podInfoList: []podInfo{
|
||||||
|
{pod: pod5, hashIsValid: false},
|
||||||
|
},
|
||||||
|
isEquivalent: false,
|
||||||
|
},
|
||||||
|
// pods with same controllerRef but one has non-exists pvc claim
|
||||||
|
{
|
||||||
|
podInfoList: []podInfo{
|
||||||
|
{pod: pod6, hashIsValid: false},
|
||||||
|
{pod: pod7, hashIsValid: true},
|
||||||
|
},
|
||||||
|
isEquivalent: false,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
_, found := ecache.getHashEquivalencePod(pod4)
|
|
||||||
if found {
|
var (
|
||||||
t.Errorf("Failed: equivalence hash of pod %v is not expected to be found, but got: %v",
|
targetPodInfo podInfo
|
||||||
pod4.Name, found)
|
targetHash uint64
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
for i, podInfo := range test.podInfoList {
|
||||||
|
testPod := podInfo.pod
|
||||||
|
hash, isValid := ecache.getHashEquivalencePod(testPod)
|
||||||
|
if isValid != podInfo.hashIsValid {
|
||||||
|
t.Errorf("Failed: pod %v is expected to have valid hash", testPod)
|
||||||
|
}
|
||||||
|
// NOTE(harry): the first element will be used as target so
|
||||||
|
// this logic can't verify more than two inequivalent pods
|
||||||
|
if i == 0 {
|
||||||
|
targetHash = hash
|
||||||
|
targetPodInfo = podInfo
|
||||||
|
} else {
|
||||||
|
if targetHash != hash {
|
||||||
|
if test.isEquivalent {
|
||||||
|
t.Errorf("Failed: pod: %v is expected to be equivalent to: %v", testPod, targetPodInfo.pod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,11 +71,11 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
serviceAffinitySet = sets.NewString("ServiceAffinity")
|
serviceAffinitySet = sets.NewString("ServiceAffinity")
|
||||||
maxPDVolumeCountPredicateSet = sets.NewString("MaxPDVolumeCountPredicate")
|
matchInterPodAffinitySet = sets.NewString("MatchInterPodAffinity")
|
||||||
matchInterPodAffinitySet = sets.NewString("MatchInterPodAffinity")
|
generalPredicatesSets = sets.NewString("GeneralPredicates")
|
||||||
generalPredicatesSets = sets.NewString("GeneralPredicates")
|
noDiskConflictSet = sets.NewString("NoDiskConflict")
|
||||||
noDiskConflictSet = sets.NewString("NoDiskConflict")
|
maxPDVolumeCountPredicateKeys = []string{"MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount", "MaxEBSVolumeCount"}
|
||||||
)
|
)
|
||||||
|
|
||||||
// configFactory is the default implementation of the scheduler.Configurator interface.
|
// configFactory is the default implementation of the scheduler.Configurator interface.
|
||||||
@ -384,7 +384,11 @@ func (c *configFactory) onPvDelete(obj interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
|
func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
|
||||||
invalidPredicates := sets.NewString("MaxPDVolumeCountPredicate")
|
// You could have a PVC that points to a PV, but the PV object doesn't exist.
|
||||||
|
// So when the PV object gets added, we can recount.
|
||||||
|
invalidPredicates := sets.NewString()
|
||||||
|
|
||||||
|
// PV types which impact MaxPDVolumeCountPredicate
|
||||||
if pv.Spec.AWSElasticBlockStore != nil {
|
if pv.Spec.AWSElasticBlockStore != nil {
|
||||||
invalidPredicates.Insert("MaxEBSVolumeCount")
|
invalidPredicates.Insert("MaxEBSVolumeCount")
|
||||||
}
|
}
|
||||||
@ -395,6 +399,14 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
|
|||||||
invalidPredicates.Insert("MaxAzureDiskVolumeCount")
|
invalidPredicates.Insert("MaxAzureDiskVolumeCount")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If PV contains zone related label, it may impact cached NoVolumeZoneConflict
|
||||||
|
for k := range pv.ObjectMeta.Labels {
|
||||||
|
if k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion {
|
||||||
|
invalidPredicates.Insert("NoVolumeZoneConflict")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
|
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
|
||||||
// Add/delete impacts the available PVs to choose from
|
// Add/delete impacts the available PVs to choose from
|
||||||
invalidPredicates.Insert(predicates.CheckVolumeBinding)
|
invalidPredicates.Insert(predicates.CheckVolumeBinding)
|
||||||
@ -458,24 +470,36 @@ func (c *configFactory) onPvcDelete(obj interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
|
func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
|
||||||
if pvc.Spec.VolumeName != "" {
|
// We need to do this here because the ecache uses PVC uid as part of equivalence hash of pod
|
||||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(maxPDVolumeCountPredicateSet)
|
|
||||||
|
// The bound volume type may change
|
||||||
|
invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...)
|
||||||
|
|
||||||
|
// The bound volume's label may change
|
||||||
|
invalidPredicates.Insert("NoVolumeZoneConflict")
|
||||||
|
|
||||||
|
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
|
||||||
|
// Add/delete impacts the available PVs to choose from
|
||||||
|
invalidPredicates.Insert(predicates.CheckVolumeBinding)
|
||||||
}
|
}
|
||||||
|
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) {
|
func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) {
|
||||||
invalidPredicates := sets.NewString()
|
invalidPredicates := sets.NewString()
|
||||||
|
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
|
if old.Spec.VolumeName != new.Spec.VolumeName {
|
||||||
if old.Spec.VolumeName != new.Spec.VolumeName {
|
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
|
||||||
// PVC volume binding has changed
|
// PVC volume binding has changed
|
||||||
invalidPredicates.Insert(predicates.CheckVolumeBinding)
|
invalidPredicates.Insert(predicates.CheckVolumeBinding)
|
||||||
}
|
}
|
||||||
|
// The bound volume type may change
|
||||||
|
invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
|
||||||
|
// The bound volume's label may change
|
||||||
|
invalidPredicates.Insert("NoVolumeZoneConflict")
|
||||||
}
|
}
|
||||||
|
|
||||||
if invalidPredicates.Len() > 0 {
|
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
|
||||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) onServiceAdd(obj interface{}) {
|
func (c *configFactory) onServiceAdd(obj interface{}) {
|
||||||
@ -541,7 +565,7 @@ func (c *configFactory) addPodToCache(obj interface{}) {
|
|||||||
c.podQueue.AssignedPodAdded(pod)
|
c.podQueue.AssignedPodAdded(pod)
|
||||||
|
|
||||||
// NOTE: Updating equivalence cache of addPodToCache has been
|
// NOTE: Updating equivalence cache of addPodToCache has been
|
||||||
// handled optimistically in InvalidateCachedPredicateItemForPodAdd.
|
// handled optimistically in: plugin/pkg/scheduler/scheduler.go#assume()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
|
func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
|
||||||
@ -566,8 +590,8 @@ func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
|
|||||||
|
|
||||||
func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
|
func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
|
||||||
if c.enableEquivalenceClassCache {
|
if c.enableEquivalenceClassCache {
|
||||||
// if the pod does not have binded node, updating equivalence cache is meaningless;
|
// if the pod does not have bound node, updating equivalence cache is meaningless;
|
||||||
// if pod's binded node has been changed, that case should be handled by pod add & delete.
|
// if pod's bound node has been changed, that case should be handled by pod add & delete.
|
||||||
if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName {
|
if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName {
|
||||||
if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) {
|
if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) {
|
||||||
// MatchInterPodAffinity need to be reconsidered for this node,
|
// MatchInterPodAffinity need to be reconsidered for this node,
|
||||||
@ -898,8 +922,14 @@ func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Init equivalence class cache
|
// Init equivalence class cache
|
||||||
if f.enableEquivalenceClassCache && getEquivalencePodFunc != nil {
|
if f.enableEquivalenceClassCache && getEquivalencePodFuncFactory != nil {
|
||||||
f.equivalencePodCache = core.NewEquivalenceCache(getEquivalencePodFunc)
|
pluginArgs, err := f.getPluginArgs()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
f.equivalencePodCache = core.NewEquivalenceCache(
|
||||||
|
getEquivalencePodFuncFactory(*pluginArgs),
|
||||||
|
)
|
||||||
glog.Info("Created equivalence class cache")
|
glog.Info("Created equivalence class cache")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,6 +76,9 @@ type PriorityConfigFactory struct {
|
|||||||
Weight int
|
Weight int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EquivalencePodFuncFactory produces a function to get equivalence class for given pod.
|
||||||
|
type EquivalencePodFuncFactory func(PluginFactoryArgs) algorithm.GetEquivalencePodFunc
|
||||||
|
|
||||||
var (
|
var (
|
||||||
schedulerFactoryMutex sync.Mutex
|
schedulerFactoryMutex sync.Mutex
|
||||||
|
|
||||||
@ -90,7 +93,7 @@ var (
|
|||||||
predicateMetadataProducer PredicateMetadataProducerFactory
|
predicateMetadataProducer PredicateMetadataProducerFactory
|
||||||
|
|
||||||
// get equivalence pod function
|
// get equivalence pod function
|
||||||
getEquivalencePodFunc algorithm.GetEquivalencePodFunc
|
getEquivalencePodFuncFactory EquivalencePodFuncFactory
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -339,8 +342,9 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
|
|||||||
return RegisterPriorityConfigFactory(policy.Name, *pcf)
|
return RegisterPriorityConfigFactory(policy.Name, *pcf)
|
||||||
}
|
}
|
||||||
|
|
||||||
func RegisterGetEquivalencePodFunction(equivalenceFunc algorithm.GetEquivalencePodFunc) {
|
// RegisterGetEquivalencePodFunction registers equivalenceFuncFactory to produce equivalence class for given pod.
|
||||||
getEquivalencePodFunc = equivalenceFunc
|
func RegisterGetEquivalencePodFunction(equivalenceFuncFactory EquivalencePodFuncFactory) {
|
||||||
|
getEquivalencePodFuncFactory = equivalenceFuncFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsPriorityFunctionRegistered is useful for testing providers.
|
// IsPriorityFunctionRegistered is useful for testing providers.
|
||||||
|
Loading…
Reference in New Issue
Block a user