diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 0fc991e4e0f..b18b106e8bd 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -120,6 +120,12 @@ const ( // // Add priority to pods. Priority affects scheduling and preemption of pods. PodPriority utilfeature.Feature = "PodPriority" + + // owner: @resouer + // alpha: v1.8 + // + // Enable equivalence class cache for scheduler. + EnableEquivalenceClassCache utilfeature.Feature = "EnableEquivalenceClassCache" ) func init() { @@ -144,6 +150,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS LocalStorageCapacityIsolation: {Default: false, PreRelease: utilfeature.Alpha}, DebugContainers: {Default: false, PreRelease: utilfeature.Alpha}, PodPriority: {Default: false, PreRelease: utilfeature.Alpha}, + EnableEquivalenceClassCache: {Default: false, PreRelease: utilfeature.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/plugin/cmd/kube-scheduler/app/BUILD b/plugin/cmd/kube-scheduler/app/BUILD index fd3f5c8f521..b1a5d104a33 100644 --- a/plugin/cmd/kube-scheduler/app/BUILD +++ b/plugin/cmd/kube-scheduler/app/BUILD @@ -18,6 +18,7 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/controller:go_default_library", + "//pkg/features:go_default_library", "//pkg/util/configz:go_default_library", "//plugin/cmd/kube-scheduler/app/options:go_default_library", "//plugin/pkg/scheduler:go_default_library", @@ -33,6 +34,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/informers/apps/v1beta1:go_default_library", "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", diff --git a/plugin/cmd/kube-scheduler/app/configurator.go b/plugin/cmd/kube-scheduler/app/configurator.go index a5b425fa316..3191a6a97f5 100644 --- a/plugin/cmd/kube-scheduler/app/configurator.go +++ b/plugin/cmd/kube-scheduler/app/configurator.go @@ -28,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -39,6 +40,7 @@ import ( clientv1 "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/plugin/pkg/scheduler" _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" @@ -104,6 +106,7 @@ func CreateScheduler( statefulSetInformer, serviceInformer, s.HardPodAffinitySymmetricWeight, + utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache), ) // Rebuild the configurator with a default Create(...) method. 
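Taken together, the hunks above follow the usual pattern for an alpha feature: register a gate in pkg/features, default it to off, and resolve it once at component start-up before threading the boolean into NewConfigFactory. The short sketch below is not part of the patch; it only illustrates how that resolution reads in isolation, and it assumes the utilfeature.DefaultFeatureGate API of this release (Set for flag-style input, Enabled for the lookup).

// Minimal sketch of consuming the new gate; in practice the value comes from
// the kube-scheduler flag --feature-gates=EnableEquivalenceClassCache=true.
package main

import (
	"fmt"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/pkg/features"
)

func main() {
	// Simulate the command-line flag --feature-gates=EnableEquivalenceClassCache=true.
	if err := utilfeature.DefaultFeatureGate.Set("EnableEquivalenceClassCache=true"); err != nil {
		panic(err)
	}

	// Mirrors the check added in CreateScheduler: the boolean is resolved once
	// and passed down into NewConfigFactory.
	enabled := utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache)
	fmt.Println("equivalence class cache enabled:", enabled)
}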
diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go index 5390a940940..fce476b15bc 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go @@ -37,6 +37,8 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/factory" ) +const enableEquivalenceCache = true + func TestCompatibility_v1_Scheduler(t *testing.T) { // Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases schedulerFiles := map[string]struct { @@ -432,6 +434,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ).CreateFromConfig(policy); err != nil { t.Errorf("%s: Error constructing: %v", v, err) continue diff --git a/plugin/pkg/scheduler/core/equivalence_cache.go b/plugin/pkg/scheduler/core/equivalence_cache.go index a9075677133..7b192502b23 100644 --- a/plugin/pkg/scheduler/core/equivalence_cache.go +++ b/plugin/pkg/scheduler/core/equivalence_cache.go @@ -158,7 +158,8 @@ func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName stri glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName) } -// InvalidateCachedPredicateItemForPod marks item of given predicateKeys, of given pod, on the given node as invalid +// InvalidateCachedPredicateItemForPod marks item of given predicateKeys, of given pod (i.e. equivalenceHash), +// on the given node as invalid func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPod(nodeName string, predicateKeys sets.String, pod *v1.Pod) { if len(predicateKeys) == 0 { return diff --git a/plugin/pkg/scheduler/factory/BUILD b/plugin/pkg/scheduler/factory/BUILD index 65c060cdc67..b225411c687 100644 --- a/plugin/pkg/scheduler/factory/BUILD +++ b/plugin/pkg/scheduler/factory/BUILD @@ -16,7 +16,9 @@ go_library( ], tags = ["automanaged"], deps = [ + "//pkg/api/helper:go_default_library", "//pkg/api/v1/pod:go_default_library", + "//pkg/kubelet/apis:go_default_library", "//plugin/pkg/scheduler:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library", "//plugin/pkg/scheduler/algorithm/predicates:go_default_library", diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index a498955e6ab..03348f3ee6c 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -20,6 +20,7 @@ package factory import ( "fmt" + "reflect" "time" "github.com/golang/glog" @@ -40,7 +41,9 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" extensionslisters "k8s.io/client-go/listers/extensions/v1beta1" "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/api/helper" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" @@ -56,6 +59,14 @@ const ( maximalGetBackoff = time.Minute ) +var ( + serviceAffinitySet = sets.NewString("ServiceAffinity") + maxPDVolumeCountPredicateSet = sets.NewString("MaxPDVolumeCountPredicate") + matchInterPodAffinitySet = sets.NewString("MatchInterPodAffinity") + generalPredicatesSets = sets.NewString("GeneralPredicates") + noDiskConflictSet = 
sets.NewString("NoDiskConflict") +) + // ConfigFactory is the default implementation of the scheduler.Configurator interface. // TODO make this private if possible, so that only its interface is externally used. type ConfigFactory struct { @@ -99,6 +110,9 @@ type ConfigFactory struct { // Equivalence class cache equivalencePodCache *core.EquivalenceCache + + // Enable equivalence class cache + enableEquivalenceClassCache bool } // NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only @@ -115,6 +129,7 @@ func NewConfigFactory( statefulSetInformer appsinformers.StatefulSetInformer, serviceInformer coreinformers.ServiceInformer, hardPodAffinitySymmetricWeight int, + enableEquivalenceClassCache bool, ) scheduler.Configurator { stopEverything := make(chan struct{}) schedulerCache := schedulercache.New(30*time.Second, stopEverything) @@ -133,6 +148,7 @@ func NewConfigFactory( StopEverything: stopEverything, schedulerName: schedulerName, hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight, + enableEquivalenceClassCache: enableEquivalenceClassCache, } c.scheduledPodsHasSynced = podInformer.Informer().HasSynced @@ -201,11 +217,154 @@ func NewConfigFactory( ) c.nodeLister = nodeInformer.Lister() - // TODO(harryz) need to fill all the handlers here and below for equivalence cache + // On add and delete of PVs, it will affect equivalence cache items + // related to persistent volume + pvInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + // MaxPDVolumeCountPredicate: since it relies on the counts of PV. + AddFunc: c.onPvAdd, + DeleteFunc: c.onPvDelete, + }, + 0, + ) + c.pVLister = pvInformer.Lister() + + // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound. + pvcInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.onPvcAdd, + DeleteFunc: c.onPvcDelete, + }, + 0, + ) + c.pVCLister = pvcInformer.Lister() + + // This is for ServiceAffinity: affected by the selector of the service is updated. + // Also, if new service is added, equivalence cache will also become invalid since + // existing pods may be "captured" by this service and change this predicate result. 
+ serviceInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.onServiceAdd, + UpdateFunc: c.onServiceUpdate, + DeleteFunc: c.onServiceDelete, + }, + 0, + ) + c.serviceLister = serviceInformer.Lister() + + // Existing equivalence cache should not be affected by add/delete RC/Deployment etc, + // it only make sense when pod is scheduled or deleted return c } +func (c *ConfigFactory) onPvAdd(obj interface{}) { + if c.enableEquivalenceClassCache { + pv, ok := obj.(*v1.PersistentVolume) + if !ok { + glog.Errorf("cannot convert to *v1.PersistentVolume: %v", obj) + return + } + c.invalidatePredicatesForPv(pv) + } +} + +func (c *ConfigFactory) onPvDelete(obj interface{}) { + if c.enableEquivalenceClassCache { + var pv *v1.PersistentVolume + switch t := obj.(type) { + case *v1.PersistentVolume: + pv = t + case cache.DeletedFinalStateUnknown: + var ok bool + pv, ok = t.Obj.(*v1.PersistentVolume) + if !ok { + glog.Errorf("cannot convert to *v1.PersistentVolume: %v", t.Obj) + return + } + default: + glog.Errorf("cannot convert to *v1.PersistentVolume: %v", t) + return + } + c.invalidatePredicatesForPv(pv) + } +} + +func (c *ConfigFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { + invalidPredicates := sets.NewString("MaxPDVolumeCountPredicate") + if pv.Spec.AWSElasticBlockStore != nil { + invalidPredicates.Insert("MaxEBSVolumeCount") + } + if pv.Spec.GCEPersistentDisk != nil { + invalidPredicates.Insert("MaxGCEPDVolumeCount") + } + if pv.Spec.AzureDisk != nil { + invalidPredicates.Insert("MaxAzureDiskVolumeCount") + } + c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) +} + +func (c *ConfigFactory) onPvcAdd(obj interface{}) { + if c.enableEquivalenceClassCache { + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + glog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", obj) + return + } + c.invalidatePredicatesForPvc(pvc) + } +} + +func (c *ConfigFactory) onPvcDelete(obj interface{}) { + if c.enableEquivalenceClassCache { + var pvc *v1.PersistentVolumeClaim + switch t := obj.(type) { + case *v1.PersistentVolumeClaim: + pvc = t + case cache.DeletedFinalStateUnknown: + var ok bool + pvc, ok = t.Obj.(*v1.PersistentVolumeClaim) + if !ok { + glog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t.Obj) + return + } + default: + glog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t) + return + } + c.invalidatePredicatesForPvc(pvc) + } +} + +func (c *ConfigFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) { + if pvc.Spec.VolumeName != "" { + c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(maxPDVolumeCountPredicateSet) + } +} + +func (c *ConfigFactory) onServiceAdd(obj interface{}) { + if c.enableEquivalenceClassCache { + c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) + } +} + +func (c *ConfigFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) { + if c.enableEquivalenceClassCache { + // TODO(resouer) We may need to invalidate this for specified group of pods only + oldService := oldObj.(*v1.Service) + newService := newObj.(*v1.Service) + if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) { + c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) + } + } +} + +func (c *ConfigFactory) onServiceDelete(obj interface{}) { + if c.enableEquivalenceClassCache { + c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) + } +} + // GetNodeStore 
provides the cache to the nodes, mostly internal use, but may also be called by mock-tests. func (c *ConfigFactory) GetNodeLister() corelisters.NodeLister { return c.nodeLister @@ -229,7 +388,6 @@ func (c *ConfigFactory) GetScheduledPodLister() corelisters.PodLister { return c.scheduledPodLister } -// TODO(resouer) need to update all the handlers here and below for equivalence cache func (c *ConfigFactory) addPodToCache(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { @@ -240,6 +398,8 @@ func (c *ConfigFactory) addPodToCache(obj interface{}) { if err := c.schedulerCache.AddPod(pod); err != nil { glog.Errorf("scheduler cache AddPod failed: %v", err) } + // NOTE: Updating the equivalence cache for addPodToCache is + // handled optimistically in InvalidateCachedPredicateItemForPodAdd. } func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) { @@ -257,6 +417,29 @@ func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) { if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil { glog.Errorf("scheduler cache UpdatePod failed: %v", err) } + + c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod) +} + +func (c *ConfigFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) { + if c.enableEquivalenceClassCache { + // if the pod does not have a bound node, updating the equivalence cache is meaningless; + // if the pod's bound node has changed, that case should be handled by pod add & delete. + if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName { + if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) { + // MatchInterPodAffinity needs to be reconsidered for this node, + // as well as all nodes in its same failure domain. + c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes( + matchInterPodAffinitySet) + } + // if the requested container resources changed, invalidate GeneralPredicates of this node + if !reflect.DeepEqual(predicates.GetResourceRequest(newPod), + predicates.GetResourceRequest(oldPod)) { + c.equivalencePodCache.InvalidateCachedPredicateItem( + newPod.Spec.NodeName, generalPredicatesSets) + } + } + } } func (c *ConfigFactory) deletePodFromCache(obj interface{}) { @@ -278,6 +461,29 @@ func (c *ConfigFactory) deletePodFromCache(obj interface{}) { if err := c.schedulerCache.RemovePod(pod); err != nil { glog.Errorf("scheduler cache RemovePod failed: %v", err) } + + c.invalidateCachedPredicatesOnDeletePod(pod) +} + +func (c *ConfigFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) { + if c.enableEquivalenceClassCache { + // part of this case is the same as pod add. + c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName) + // MatchInterPodAffinity needs to be reconsidered for this node, + // as well as all nodes in its same failure domain. + // TODO(resouer) can we just do this for nodes in the same failure domain + c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes( + matchInterPodAffinitySet) + + // if this pod has such PVs, the cached result of disk conflict will become invalid.
+ for _, volume := range pod.Spec.Volumes { + if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil || + volume.RBD != nil || volume.ISCSI != nil { + c.equivalencePodCache.InvalidateCachedPredicateItem( + pod.Spec.NodeName, noDiskConflictSet) + } + } + } } func (c *ConfigFactory) addNodeToCache(obj interface{}) { @@ -290,6 +496,8 @@ func (c *ConfigFactory) addNodeToCache(obj interface{}) { if err := c.schedulerCache.AddNode(node); err != nil { glog.Errorf("scheduler cache AddNode failed: %v", err) } + + // NOTE: add a new node does not affect existing predicates in equivalence cache } func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) { @@ -307,6 +515,64 @@ func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) { if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil { glog.Errorf("scheduler cache UpdateNode failed: %v", err) } + + c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode) +} + +func (c *ConfigFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) { + if c.enableEquivalenceClassCache { + // Begin to update equivalence cache based on node update + // TODO(resouer): think about lazily initialize this set + invalidPredicates := sets.NewString() + + oldTaints, oldErr := helper.GetTaintsFromNodeAnnotations(oldNode.GetAnnotations()) + if oldErr != nil { + glog.Errorf("Failed to get taints from old node annotation for equivalence cache") + } + newTaints, newErr := helper.GetTaintsFromNodeAnnotations(newNode.GetAnnotations()) + if newErr != nil { + glog.Errorf("Failed to get taints from new node annotation for equivalence cache") + } + + if !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) { + invalidPredicates.Insert("GeneralPredicates") // "PodFitsResources" + } + if !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) { + invalidPredicates.Insert("GeneralPredicates", "ServiceAffinity") // "PodSelectorMatches" + for k, v := range oldNode.GetLabels() { + // any label can be topology key of pod, we have to invalidate in all cases + if v != newNode.GetLabels()[k] { + invalidPredicates.Insert("MatchInterPodAffinity") + } + // NoVolumeZoneConflict will only be affected by zone related label change + if k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion { + if v != newNode.GetLabels()[k] { + invalidPredicates.Insert("NoVolumeZoneConflict") + } + } + } + } + if !reflect.DeepEqual(oldTaints, newTaints) { + invalidPredicates.Insert("PodToleratesNodeTaints") + } + if !reflect.DeepEqual(oldNode.Status.Conditions, newNode.Status.Conditions) { + oldConditions := make(map[v1.NodeConditionType]v1.ConditionStatus) + newConditions := make(map[v1.NodeConditionType]v1.ConditionStatus) + for _, cond := range oldNode.Status.Conditions { + oldConditions[cond.Type] = cond.Status + } + for _, cond := range newNode.Status.Conditions { + newConditions[cond.Type] = cond.Status + } + if oldConditions[v1.NodeMemoryPressure] != newConditions[v1.NodeMemoryPressure] { + invalidPredicates.Insert("CheckNodeMemoryPressure") + } + if oldConditions[v1.NodeDiskPressure] != newConditions[v1.NodeDiskPressure] { + invalidPredicates.Insert("CheckNodeDiskPressure") + } + } + c.equivalencePodCache.InvalidateCachedPredicateItem(newNode.GetName(), invalidPredicates) + } } func (c *ConfigFactory) deleteNodeFromCache(obj interface{}) { @@ -328,6 +594,9 @@ func (c *ConfigFactory) deleteNodeFromCache(obj interface{}) { if err := c.schedulerCache.RemoveNode(node); err != nil { 
glog.Errorf("scheduler cache RemoveNode failed: %v", err) } + if c.enableEquivalenceClassCache { + c.equivalencePodCache.InvalidateAllCachedPredicateItemOfNode(node.GetName()) + } } // Create creates a scheduler with the default algorithm provider. @@ -424,11 +693,17 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, return nil, err } - // TODO(resouer) use equivalence cache instead of nil here when #36238 get merged - algo := core.NewGenericScheduler(f.schedulerCache, nil, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders) + // Init equivalence class cache + if f.enableEquivalenceClassCache && getEquivalencePodFunc != nil { + f.equivalencePodCache = core.NewEquivalenceCache(getEquivalencePodFunc) + glog.Info("Created equivalence class cache") + } + algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders) + podBackoff := util.CreateDefaultPodBackoff() return &scheduler.Config{ SchedulerCache: f.schedulerCache, + Ecache: f.equivalencePodCache, // The scheduler only needs to consider schedulable nodes. NodeLister: &nodePredicateLister{f.nodeLister}, Algorithm: algo, diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index 31758e6afd0..730e1bb035a 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -40,6 +40,8 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/util" ) +const enableEquivalenceCache = true + func TestCreate(t *testing.T) { handler := utiltesting.FakeHandler{ StatusCode: 500, @@ -62,6 +64,7 @@ func TestCreate(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) factory.Create() } @@ -93,6 +96,7 @@ func TestCreateFromConfig(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) // Pre-register some predicate and priority functions @@ -151,6 +155,7 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) // Pre-register some predicate and priority functions @@ -210,6 +215,7 @@ func TestCreateFromEmptyConfig(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) configData = []byte(`{}`) @@ -266,6 +272,7 @@ func TestDefaultErrorFunc(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second) @@ -378,6 +385,7 @@ func TestResponsibleForPod(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) // factory of "foo-scheduler" factoryFooScheduler := NewConfigFactory( @@ -392,6 +400,7 @@ func TestResponsibleForPod(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), 
v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) // scheduler annotations to be tested schedulerFitsDefault := "default-scheduler" @@ -461,6 +470,7 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), -1, + enableEquivalenceCache, ) _, err := factory.Create() if err == nil { @@ -506,6 +516,7 @@ func TestInvalidFactoryArgs(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), test.hardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) _, err := factory.Create() if err == nil { diff --git a/test/e2e/autoscaling/cluster_size_autoscaling.go b/test/e2e/autoscaling/cluster_size_autoscaling.go index 565c9ea92f4..f1d8f775e8d 100644 --- a/test/e2e/autoscaling/cluster_size_autoscaling.go +++ b/test/e2e/autoscaling/cluster_size_autoscaling.go @@ -411,7 +411,7 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() { framework.AddOrUpdateLabelOnNode(c, node, labelKey, labelValue) } - CreateNodeSelectorPods(f, "node-selector", minSize+1, map[string]string{labelKey: labelValue}, false) + scheduling.CreateNodeSelectorPods(f, "node-selector", minSize+1, map[string]string{labelKey: labelValue}, false) By("Waiting for new node to appear and annotating it") framework.WaitForGroupSize(minMig, int32(minSize+1)) @@ -907,26 +907,6 @@ func doPut(url, content string) (string, error) { return strBody, nil } -func CreateNodeSelectorPods(f *framework.Framework, id string, replicas int, nodeSelector map[string]string, expectRunning bool) { - By(fmt.Sprintf("Running RC which reserves host port and defines node selector")) - - config := &testutils.RCConfig{ - Client: f.ClientSet, - InternalClient: f.InternalClientset, - Name: id, - Namespace: f.Namespace.Name, - Timeout: defaultTimeout, - Image: framework.GetPauseImageName(f.ClientSet), - Replicas: replicas, - HostPorts: map[string]int{"port1": 4321}, - NodeSelector: nodeSelector, - } - err := framework.RunRC(*config) - if expectRunning { - framework.ExpectNoError(err) - } -} - func ReserveMemory(f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration) { By(fmt.Sprintf("Running RC which reserves %v MB of memory", megabytes)) request := int64(1024 * 1024 * megabytes / replicas) diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index f8b76005dac..93d788c1a9c 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -2462,6 +2462,15 @@ func AddOrUpdateLabelOnNode(c clientset.Interface, nodeName string, labelKey, la ExpectNoError(testutil.AddLabelsToNode(c, nodeName, map[string]string{labelKey: labelValue})) } +func AddOrUpdateLabelOnNodeAndReturnOldValue(c clientset.Interface, nodeName string, labelKey, labelValue string) string { + var oldValue string + node, err := c.Core().Nodes().Get(nodeName, metav1.GetOptions{}) + ExpectNoError(err) + oldValue = node.Labels[labelKey] + ExpectNoError(testutil.AddLabelsToNode(c, nodeName, map[string]string{labelKey: labelValue})) + return oldValue +} + func ExpectNodeHasLabel(c clientset.Interface, nodeName string, labelKey string, labelValue string) { By("verifying the node has the label " + labelKey + " " + labelValue) node, err := c.Core().Nodes().Get(nodeName, metav1.GetOptions{}) diff --git a/test/e2e/scheduling/BUILD b/test/e2e/scheduling/BUILD index b06a1b9e9d2..a5e83f66d5d 100644 --- a/test/e2e/scheduling/BUILD +++ b/test/e2e/scheduling/BUILD @@ -10,6 
+10,7 @@ load( go_library( name = "go_default_library", srcs = [ + "equivalence_cache_predicates.go", "events.go", "framework.go", "nvidia-gpus.go", diff --git a/test/e2e/scheduling/equivalence_cache_predicates.go b/test/e2e/scheduling/equivalence_cache_predicates.go new file mode 100644 index 00000000000..1b65de56f4b --- /dev/null +++ b/test/e2e/scheduling/equivalence_cache_predicates.go @@ -0,0 +1,286 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduling + +import ( + "fmt" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/uuid" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + _ "github.com/stretchr/testify/assert" +) + +var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() { + var cs clientset.Interface + var nodeList *v1.NodeList + var masterNodes sets.String + var systemPodsNo int + var ns string + f := framework.NewDefaultFramework("equivalence-cache") + ignoreLabels := framework.ImagePullerLabels + + BeforeEach(func() { + cs = f.ClientSet + ns = f.Namespace.Name + + framework.WaitForAllNodesHealthy(cs, time.Minute) + masterNodes, nodeList = framework.GetMasterAndWorkerNodesOrDie(cs) + + framework.ExpectNoError(framework.CheckTestingNSDeletedExcept(cs, ns)) + + // Every test case in this suite assumes that cluster add-on pods stay stable and + // cannot be run in parallel with any other test that touches Nodes or Pods. + // It is so because we need to have precise control on what's running in the cluster. + systemPods, err := framework.GetPodsInNamespace(cs, ns, ignoreLabels) + Expect(err).NotTo(HaveOccurred()) + systemPodsNo = 0 + for _, pod := range systemPods { + if !masterNodes.Has(pod.Spec.NodeName) && pod.DeletionTimestamp == nil { + systemPodsNo++ + } + } + + err = framework.WaitForPodsRunningReady(cs, api.NamespaceSystem, int32(systemPodsNo), int32(systemPodsNo), framework.PodReadyBeforeTimeout, ignoreLabels) + Expect(err).NotTo(HaveOccurred()) + + for _, node := range nodeList.Items { + framework.Logf("\nLogging pods the kubelet thinks is on node %v before test", node.Name) + framework.PrintAllKubeletPods(cs, node.Name) + } + + }) + + // This test verifies that GeneralPredicates works as expected: + // When a replica pod (with HostPorts) is scheduled to a node, it will invalidate GeneralPredicates cache on this node, + // so that subsequent replica pods with same host port claim will be rejected. + // We enforce all replica pods bind to the same node so there will always be conflicts. 
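For readers unfamiliar with how the RC helper manufactures that conflict, the sketch below (not part of the patch; the helper name, image, and port value are placeholders) shows the pod shape involved: every replica is pinned to one node by a nodeSelector and claims the same HostPort, so only the first replica can pass GeneralPredicates/PodFitsHostPorts, and the test then relies on the cached GeneralPredicates entry for that node being invalidated rather than reused.

package sketch

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// hostPortPausePod builds a pause pod that is pinned to one node and claims a
// host port; two such pods on the same node必 conflict on PodFitsHostPorts.
func hostPortPausePod(name, nodeName string, hostPort int32) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Spec: v1.PodSpec{
			// Pin every replica to the same node so the host-port conflict is guaranteed.
			NodeSelector: map[string]string{"kubernetes.io/hostname": nodeName},
			Containers: []v1.Container{{
				Name:  "pause",
				Image: "gcr.io/google_containers/pause-amd64:3.0",
				// Once the first pod lands on the node, a second pod requesting the
				// same HostPort must fail, which only happens if the cached
				// GeneralPredicates entry for the node was invalidated.
				Ports: []v1.ContainerPort{{ContainerPort: 80, HostPort: hostPort}},
			}},
		},
	}
}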
+ It("validates GeneralPredicates is properly invalidated when a pod is scheduled [Slow]", func() { + By("Launching a RC with two replica pods with HostPorts") + nodeName := getNodeThatCanRunPodWithoutToleration(f) + rcName := "host-port" + + // bind all replicas to same node + nodeSelector := map[string]string{"kubernetes.io/hostname": nodeName} + + By("One pod should be scheduled, the other should be rejected") + // CreateNodeSelectorPods creates RC with host port 4312 + WaitForSchedulerAfterAction(f, func() error { + err := CreateNodeSelectorPods(f, rcName, 2, nodeSelector, false) + return err + }, rcName, false) + defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, rcName) + // the first replica pod is scheduled, and the second pod will be rejected. + verifyResult(cs, 1, 1, ns) + }) + + // This test verifies that MatchInterPodAffinity works as expected. + // In equivalence cache, it does not handle inter pod affinity (anti-affinity) specially (unless node label changed), + // because current predicates algorithm will ensure newly scheduled pod does not break existing affinity in cluster. + It("validates pod affinity works properly when new replica pod is scheduled", func() { + // create a pod running with label {security: S1}, and choose this node + nodeName, _ := runAndKeepPodWithLabelAndGetNodeName(f) + + By("Trying to apply a random label on the found node.") + // we need to use real failure domains, since scheduler only know them + k := "failure-domain.beta.kubernetes.io/zone" + v := "equivalence-e2e-test" + oldValue := framework.AddOrUpdateLabelOnNodeAndReturnOldValue(cs, nodeName, k, v) + framework.ExpectNodeHasLabel(cs, nodeName, k, v) + // restore the node label + defer framework.AddOrUpdateLabelOnNode(cs, nodeName, k, oldValue) + + By("Trying to schedule RC with Pod Affinity should success.") + framework.WaitForStableCluster(cs, masterNodes) + affinityRCName := "with-pod-affinity-" + string(uuid.NewUUID()) + replica := 2 + labelsMap := map[string]string{ + "name": affinityRCName, + } + affinity := &v1.Affinity{ + PodAffinity: &v1.PodAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "security", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"S1"}, + }, + }, + }, + TopologyKey: k, + Namespaces: []string{ns}, + }, + }, + }, + } + rc := getRCWithInterPodAffinity(affinityRCName, labelsMap, replica, affinity, framework.GetPauseImageName(f.ClientSet)) + defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, affinityRCName) + + // RC should be running successfully + // TODO: WaitForSchedulerAfterAction() can on be used to wait for failure event, + // not for successful RC, since no specific pod name can be provided. 
+ _, err := cs.Core().ReplicationControllers(ns).Create(rc) + framework.ExpectNoError(err) + framework.ExpectNoError(framework.WaitForControlledPodsRunning(cs, ns, affinityRCName, api.Kind("ReplicationController"))) + + By("Remove node failure domain label") + framework.RemoveLabelOffNode(cs, nodeName, k) + + By("Trying to schedule another equivalent Pod should fail because the node label has been removed.") + // use scale to create another equivalent pod and wait for the failure event + WaitForSchedulerAfterAction(f, func() error { + err := framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, affinityRCName, uint(replica+1), false) + return err + }, affinityRCName, false) + // and this new pod should be rejected since the node label has been removed + verifyReplicasResult(cs, replica, 1, ns, affinityRCName) + }) + + // This test verifies that MatchInterPodAffinity (anti-affinity) is respected as expected. + It("validates pod anti-affinity works properly when new replica pod is scheduled", func() { + By("Launching two pods on two distinct nodes to get two node names") + CreateHostPortPods(f, "host-port", 2, true) + defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, "host-port") + podList, err := cs.Core().Pods(ns).List(metav1.ListOptions{}) + framework.ExpectNoError(err) + Expect(len(podList.Items)).To(Equal(2)) + nodeNames := []string{podList.Items[0].Spec.NodeName, podList.Items[1].Spec.NodeName} + Expect(nodeNames[0]).ToNot(Equal(nodeNames[1])) + + By("Applying a random label to both nodes.") + k := "e2e.inter-pod-affinity.kubernetes.io/zone" + v := "equivalence-e2etest" + for _, nodeName := range nodeNames { + framework.AddOrUpdateLabelOnNode(cs, nodeName, k, v) + framework.ExpectNodeHasLabel(cs, nodeName, k, v) + defer framework.RemoveLabelOffNode(cs, nodeName, k) + } + + By("Trying to launch a pod with the service label on the selected nodes.") + // run a pod with label {"service": "S1"} and expect it to be running + runPausePod(f, pausePodConfig{ + Name: "with-label-" + string(uuid.NewUUID()), + Labels: map[string]string{"service": "S1"}, + NodeSelector: map[string]string{k: v}, // only launch on our two nodes + }) + + By("Trying to launch an RC with podAntiAffinity on these two nodes; it should be rejected.") + labelRCName := "with-podantiaffinity-" + string(uuid.NewUUID()) + replica := 2 + labelsMap := map[string]string{ + "name": labelRCName, + } + affinity := &v1.Affinity{ + PodAntiAffinity: &v1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"S1"}, + }, + }, + }, + TopologyKey: k, + Namespaces: []string{ns}, + }, + }, + }, + } + rc := getRCWithInterPodAffinityNodeSelector(labelRCName, labelsMap, replica, affinity, + framework.GetPauseImageName(f.ClientSet), map[string]string{k: v}) + defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, labelRCName) + + WaitForSchedulerAfterAction(f, func() error { + _, err := cs.Core().ReplicationControllers(ns).Create(rc) + return err + }, labelRCName, false) + + // these two replicas should both be rejected since podAntiAffinity says they have anti-affinity with pod {"service": "S1"} + verifyReplicasResult(cs, 0, replica, ns, labelRCName) + }) +}) + +// getRCWithInterPodAffinity returns RC with given affinity rules.
+func getRCWithInterPodAffinity(name string, labelsMap map[string]string, replica int, affinity *v1.Affinity, image string) *v1.ReplicationController { + return getRCWithInterPodAffinityNodeSelector(name, labelsMap, replica, affinity, image, map[string]string{}) +} + +// getRCWithInterPodAffinity returns RC with given affinity rules and node selector. +func getRCWithInterPodAffinityNodeSelector(name string, labelsMap map[string]string, replica int, affinity *v1.Affinity, image string, nodeSelector map[string]string) *v1.ReplicationController { + replicaInt32 := int32(replica) + return &v1.ReplicationController{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1.ReplicationControllerSpec{ + Replicas: &replicaInt32, + Selector: labelsMap, + Template: &v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labelsMap, + }, + Spec: v1.PodSpec{ + Affinity: affinity, + Containers: []v1.Container{ + { + Name: name, + Image: image, + }, + }, + DNSPolicy: v1.DNSDefault, + NodeSelector: nodeSelector, + }, + }, + }, + } +} + +func CreateNodeSelectorPods(f *framework.Framework, id string, replicas int, nodeSelector map[string]string, expectRunning bool) error { + By(fmt.Sprintf("Running RC which reserves host port and defines node selector")) + + config := &testutils.RCConfig{ + Client: f.ClientSet, + InternalClient: f.InternalClientset, + Name: id, + Namespace: f.Namespace.Name, + Timeout: defaultTimeout, + Image: framework.GetPauseImageName(f.ClientSet), + Replicas: replicas, + HostPorts: map[string]int{"port1": 4321}, + NodeSelector: nodeSelector, + } + err := framework.RunRC(*config) + if expectRunning { + return err + } + return nil +} diff --git a/test/e2e/scheduling/predicates.go b/test/e2e/scheduling/predicates.go index b792d3b3258..70e9d00f881 100644 --- a/test/e2e/scheduling/predicates.go +++ b/test/e2e/scheduling/predicates.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" clientset "k8s.io/client-go/kubernetes" @@ -51,6 +52,8 @@ type pausePodConfig struct { Resources *v1.ResourceRequirements Tolerations []v1.Toleration NodeName string + Ports []v1.ContainerPort + OwnerReferences []metav1.OwnerReference } var _ = SIGDescribe("SchedulerPredicates [Serial]", func() { @@ -725,9 +728,10 @@ var _ = SIGDescribe("SchedulerPredicates [Serial]", func() { func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod { pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: conf.Name, - Labels: conf.Labels, - Annotations: conf.Annotations, + Name: conf.Name, + Labels: conf.Labels, + Annotations: conf.Annotations, + OwnerReferences: conf.OwnerReferences, }, Spec: v1.PodSpec{ NodeSelector: conf.NodeSelector, @@ -736,6 +740,7 @@ func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod { { Name: conf.Name, Image: framework.GetPauseImageName(f.ClientSet), + Ports: conf.Ports, }, }, Tolerations: conf.Tolerations, @@ -924,6 +929,32 @@ func verifyResult(c clientset.Interface, expectedScheduled int, expectedNotSched Expect(len(scheduledPods)).To(Equal(expectedScheduled), printOnce(fmt.Sprintf("Scheduled Pods: %#v", scheduledPods))) } +// verifyReplicasResult is wrapper of verifyResult for a group pods with same "name: labelName" label, which means they belong to same RC +func verifyReplicasResult(c clientset.Interface, expectedScheduled int, expectedNotScheduled 
int, ns string, labelName string) { + allPods := getPodsByLabels(c, ns, map[string]string{"name": labelName}) + scheduledPods, notScheduledPods := framework.GetPodsScheduled(masterNodes, allPods) + + printed := false + printOnce := func(msg string) string { + if !printed { + printed = true + return msg + } else { + return "" + } + } + + Expect(len(notScheduledPods)).To(Equal(expectedNotScheduled), printOnce(fmt.Sprintf("Not scheduled Pods: %#v", notScheduledPods))) + Expect(len(scheduledPods)).To(Equal(expectedScheduled), printOnce(fmt.Sprintf("Scheduled Pods: %#v", scheduledPods))) +} + +func getPodsByLabels(c clientset.Interface, ns string, labelsMap map[string]string) *v1.PodList { + selector := labels.SelectorFromSet(labels.Set(labelsMap)) + allPods, err := c.Core().Pods(ns).List(metav1.ListOptions{LabelSelector: selector.String()}) + framework.ExpectNoError(err) + return allPods +} + func runAndKeepPodWithLabelAndGetNodeName(f *framework.Framework) (string, string) { // launch a pod to find a node which can launch a pod. We intentionally do // not just take the node list and choose the first of them. Depending on the diff --git a/test/integration/scheduler/extender_test.go b/test/integration/scheduler/extender_test.go index 93482f03618..2d7a0d5e82b 100644 --- a/test/integration/scheduler/extender_test.go +++ b/test/integration/scheduler/extender_test.go @@ -370,6 +370,7 @@ func TestSchedulerExtender(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) schedulerConfig, err := schedulerConfigFactory.CreateFromConfig(policy) if err != nil { diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index aa64c08530b..414cf45b4e0 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -50,6 +50,8 @@ import ( "k8s.io/kubernetes/test/integration/framework" ) +const enableEquivalenceCache = true + type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) type nodeStateManager struct { @@ -257,6 +259,7 @@ func TestUnschedulableNodes(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { @@ -540,6 +543,7 @@ func TestMultiScheduler(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { @@ -626,6 +630,7 @@ func TestMultiScheduler(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) schedulerConfig2, err := schedulerConfigFactory2.Create() if err != nil { @@ -736,6 +741,7 @@ func TestAllocatable(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index 5278defc6ef..e022c4851f5 100644 --- a/test/integration/scheduler_perf/util.go +++ 
b/test/integration/scheduler_perf/util.go @@ -36,6 +36,8 @@ import ( "k8s.io/kubernetes/test/integration/framework" ) +const enableEquivalenceCache = true + // mustSetupScheduler starts the following components: // - k8s api server (a.k.a. master) // - scheduler @@ -74,6 +76,7 @@ func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroy informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, + enableEquivalenceCache, ) eventBroadcaster := record.NewBroadcaster()
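The data structure all of the InvalidateCachedPredicateItem* call sites above operate on lives in plugin/pkg/scheduler/core/equivalence_cache.go and is only touched lightly by this patch. As a mental model for reviewing those call sites, the following simplified sketch (invented names, not the real core.EquivalenceCache API) captures the shape: cached fit results keyed by node, predicate name, and the pod's equivalence hash, with invalidation at per-node, per-predicate granularity.

package sketch

import (
	"sync"

	"k8s.io/apimachinery/pkg/util/sets"
)

// predicateResult is a cached fit decision for one
// (pod equivalence class, predicate, node) triple.
type predicateResult struct {
	Fit bool
}

// equivCache is a simplified model of the scheduler's equivalence class cache:
// node name -> predicate name -> equivalence hash -> cached result.
type equivCache struct {
	mu    sync.RWMutex
	cache map[string]map[string]map[uint64]predicateResult
}

func newEquivCache() *equivCache {
	return &equivCache{cache: map[string]map[string]map[uint64]predicateResult{}}
}

// lookup returns the cached result for an equivalence class, if any; on a miss
// the generic scheduler runs the real predicate and stores the outcome.
func (c *equivCache) lookup(node, predicate string, equivHash uint64) (predicateResult, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	res, ok := c.cache[node][predicate][equivHash]
	return res, ok
}

// store records a freshly computed predicate result for the equivalence class.
func (c *equivCache) store(node, predicate string, equivHash uint64, fit bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.cache[node] == nil {
		c.cache[node] = map[string]map[uint64]predicateResult{}
	}
	if c.cache[node][predicate] == nil {
		c.cache[node][predicate] = map[uint64]predicateResult{}
	}
	c.cache[node][predicate][equivHash] = predicateResult{Fit: fit}
}

// invalidate drops the named predicates for one node, which is the granularity
// the ConfigFactory handlers use (e.g. GeneralPredicates when a bound pod's
// resource requests change, NoDiskConflict when a pod with GCE PD/EBS/RBD/iSCSI
// volumes is deleted).
func (c *equivCache) invalidate(node string, predicateKeys sets.String) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for key := range predicateKeys {
		delete(c.cache[node], key)
	}
}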