diff --git a/plugin/pkg/scheduler/algorithm/types.go b/plugin/pkg/scheduler/algorithm/types.go index 954c96f07c0..f1982fe27de 100644 --- a/plugin/pkg/scheduler/algorithm/types.go +++ b/plugin/pkg/scheduler/algorithm/types.go @@ -61,3 +61,5 @@ func EmptyMetadataProducer(pod *api.Pod) interface{} { type PredicateFailureReason interface { GetReason() string } + +type GetEquivalencePodFunc func(pod *api.Pod) interface{} diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index 24bd419f4cb..3c9fcf1517e 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -21,6 +21,7 @@ import ( "os" "strconv" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/plugin/pkg/scheduler" @@ -36,6 +37,7 @@ const ( // GCE instances can have up to 16 PD volumes attached. DefaultMaxGCEPDVolumes = 16 ClusterAutoscalerProvider = "ClusterAutoscalerProvider" + PetSetKind = "PetSet" ) // getMaxVols checks the max PD volumes environment variable, otherwise returning a default value @@ -105,6 +107,8 @@ func init() { factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodSelectorMatches) // Optional, cluster-autoscaler friendly priority function - give used nodes higher priority. factory.RegisterPriorityFunction2("MostRequestedPriority", priorities.MostRequestedPriorityMap, nil, 1) + // Use equivalence class to speed up predicates & priorities + factory.RegisterGetEquivalencePodFunction(GetEquivalencePod) } func replace(set sets.String, replaceWhat, replaceWith string) sets.String { @@ -205,3 +209,39 @@ func defaultPriorities() sets.String { ), ) } + +// GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused. +func GetEquivalencePod(pod *api.Pod) interface{} { + equivalencePod := EquivalencePod{} + // For now we only consider pods: + // 1. OwnerReferences is Controller + // 2. OwnerReferences kind is in valid controller kinds + // 3. with same OwnerReferences + // to be equivalent + if len(pod.OwnerReferences) != 0 { + for _, ref := range pod.OwnerReferences { + if *ref.Controller && isValidControllerKind(ref.Kind) { + equivalencePod.ControllerRef = ref + // a pod can only belongs to one controller + break + } + } + } + return &equivalencePod +} + +// isValidControllerKind checks if a given controller's kind can be applied to equivalence pod algorithm. +func isValidControllerKind(kind string) bool { + switch kind { + // list of kinds that we cannot handle + case PetSetKind: + return false + default: + return true + } +} + +// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods. +type EquivalencePod struct { + ControllerRef api.OwnerReference +} diff --git a/plugin/pkg/scheduler/equivalence_cache.go b/plugin/pkg/scheduler/equivalence_cache.go new file mode 100644 index 00000000000..cd13b250aea --- /dev/null +++ b/plugin/pkg/scheduler/equivalence_cache.go @@ -0,0 +1,132 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "github.com/golang/groupcache/lru" + "hash/adler32" + + "k8s.io/kubernetes/pkg/api" + hashutil "k8s.io/kubernetes/pkg/util/hash" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + "sync" +) + +// TODO(harryz) figure out the right number for this, 4096 may be too big +const maxCacheEntries = 4096 + +type HostPredicate struct { + Fit bool + FailReasons []algorithm.PredicateFailureReason +} + +type AlgorithmCache struct { + // Only consider predicates for now, priorities rely on: #31606 + predicatesCache *lru.Cache +} + +func newAlgorithmCache() AlgorithmCache { + return AlgorithmCache{ + predicatesCache: lru.New(maxCacheEntries), + } +} + +// Store a map of predicate cache with maxsize +type EquivalenceCache struct { + sync.RWMutex + getEquivalencePod algorithm.GetEquivalencePodFunc + algorithmCache map[string]AlgorithmCache +} + +func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc) *EquivalenceCache { + return &EquivalenceCache{ + getEquivalencePod: getEquivalencePodFunc, + algorithmCache: make(map[string]AlgorithmCache), + } +} + +// addPodPredicate adds pod predicate for equivalence class +func (ec *EquivalenceCache) addPodPredicate(podKey uint64, nodeName string, fit bool, failReasons []algorithm.PredicateFailureReason) { + if _, exist := ec.algorithmCache[nodeName]; !exist { + ec.algorithmCache[nodeName] = newAlgorithmCache() + } + ec.algorithmCache[nodeName].predicatesCache.Add(podKey, HostPredicate{Fit: fit, FailReasons: failReasons}) +} + +// AddPodPredicatesCache cache pod predicate for equivalence class +func (ec *EquivalenceCache) AddPodPredicatesCache(pod *api.Pod, fitNodeList []*api.Node, failedPredicates *FailedPredicateMap) { + equivalenceHash := ec.hashEquivalencePod(pod) + + for _, fitNode := range fitNodeList { + ec.addPodPredicate(equivalenceHash, fitNode.Name, true, nil) + } + for failNodeName, failReasons := range *failedPredicates { + ec.addPodPredicate(equivalenceHash, failNodeName, false, failReasons) + } +} + +// GetCachedPredicates gets cached predicates for equivalence class +func (ec *EquivalenceCache) GetCachedPredicates(pod *api.Pod, nodes []*api.Node) ([]*api.Node, FailedPredicateMap, []*api.Node) { + fitNodeList := []*api.Node{} + failedPredicates := FailedPredicateMap{} + noCacheNodeList := []*api.Node{} + equivalenceHash := ec.hashEquivalencePod(pod) + for _, node := range nodes { + findCache := false + if algorithmCache, exist := ec.algorithmCache[node.Name]; exist { + if cachePredicate, exist := algorithmCache.predicatesCache.Get(equivalenceHash); exist { + hostPredicate := cachePredicate.(HostPredicate) + if hostPredicate.Fit { + fitNodeList = append(fitNodeList, node) + } else { + failedPredicates[node.Name] = hostPredicate.FailReasons + } + findCache = true + } + } + if !findCache { + noCacheNodeList = append(noCacheNodeList, node) + } + } + return fitNodeList, failedPredicates, noCacheNodeList +} + +// SendInvalidAlgorithmCacheReq marks AlgorithmCache item as invalid +func (ec *EquivalenceCache) SendInvalidAlgorithmCacheReq(nodeName string) { + ec.Lock() + defer ec.Unlock() + // clear the cache of this node + delete(ec.algorithmCache, nodeName) +} + +// SendClearAllCacheReq marks all cached item as invalid +func (ec *EquivalenceCache) SendClearAllCacheReq() { + ec.Lock() + defer ec.Unlock() + // clear cache of all nodes + for nodeName := range ec.algorithmCache { + delete(ec.algorithmCache, nodeName) + } +} + +// hashEquivalencePod returns the hash of equivalence pod. +func (ec *EquivalenceCache) hashEquivalencePod(pod *api.Pod) uint64 { + equivalencePod := ec.getEquivalencePod(pod) + hash := adler32.New() + hashutil.DeepHashObject(hash, equivalencePod) + return uint64(hash.Sum32()) +} diff --git a/plugin/pkg/scheduler/factory/plugins.go b/plugin/pkg/scheduler/factory/plugins.go index 4f00bba3a0b..9d5952764c7 100644 --- a/plugin/pkg/scheduler/factory/plugins.go +++ b/plugin/pkg/scheduler/factory/plugins.go @@ -76,8 +76,12 @@ var ( fitPredicateMap = make(map[string]FitPredicateFactory) priorityFunctionMap = make(map[string]PriorityConfigFactory) algorithmProviderMap = make(map[string]AlgorithmProviderConfig) + // Registered metadata producers priorityMetadataProducer MetadataProducerFactory + + // get equivalence pod function + getEquivalencePodFunc algorithm.GetEquivalencePodFunc = nil ) const ( @@ -244,6 +248,10 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string { return RegisterPriorityConfigFactory(policy.Name, *pcf) } +func RegisterGetEquivalencePodFunction(equivalenceFunc algorithm.GetEquivalencePodFunc) { + getEquivalencePodFunc = equivalenceFunc +} + // This check is useful for testing providers. func IsPriorityFunctionRegistered(name string) bool { schedulerFactoryMutex.Lock()