mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 05:03:09 +00:00
Update provider and cache
Update equivalent class & remove priority Use controller ref Directly clear the cache
This commit is contained in:
parent
4dc418c5ee
commit
204dbe7fdd
@ -61,3 +61,5 @@ func EmptyMetadataProducer(pod *api.Pod) interface{} {
|
|||||||
type PredicateFailureReason interface {
|
type PredicateFailureReason interface {
|
||||||
GetReason() string
|
GetReason() string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type GetEquivalencePodFunc func(pod *api.Pod) interface{}
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
|
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
|
||||||
"k8s.io/kubernetes/pkg/util/sets"
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
"k8s.io/kubernetes/plugin/pkg/scheduler"
|
"k8s.io/kubernetes/plugin/pkg/scheduler"
|
||||||
@ -36,6 +37,7 @@ const (
|
|||||||
// GCE instances can have up to 16 PD volumes attached.
|
// GCE instances can have up to 16 PD volumes attached.
|
||||||
DefaultMaxGCEPDVolumes = 16
|
DefaultMaxGCEPDVolumes = 16
|
||||||
ClusterAutoscalerProvider = "ClusterAutoscalerProvider"
|
ClusterAutoscalerProvider = "ClusterAutoscalerProvider"
|
||||||
|
PetSetKind = "PetSet"
|
||||||
)
|
)
|
||||||
|
|
||||||
// getMaxVols checks the max PD volumes environment variable, otherwise returning a default value
|
// getMaxVols checks the max PD volumes environment variable, otherwise returning a default value
|
||||||
@ -105,6 +107,8 @@ func init() {
|
|||||||
factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodSelectorMatches)
|
factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodSelectorMatches)
|
||||||
// Optional, cluster-autoscaler friendly priority function - give used nodes higher priority.
|
// Optional, cluster-autoscaler friendly priority function - give used nodes higher priority.
|
||||||
factory.RegisterPriorityFunction2("MostRequestedPriority", priorities.MostRequestedPriorityMap, nil, 1)
|
factory.RegisterPriorityFunction2("MostRequestedPriority", priorities.MostRequestedPriorityMap, nil, 1)
|
||||||
|
// Use equivalence class to speed up predicates & priorities
|
||||||
|
factory.RegisterGetEquivalencePodFunction(GetEquivalencePod)
|
||||||
}
|
}
|
||||||
|
|
||||||
func replace(set sets.String, replaceWhat, replaceWith string) sets.String {
|
func replace(set sets.String, replaceWhat, replaceWith string) sets.String {
|
||||||
@ -205,3 +209,39 @@ func defaultPriorities() sets.String {
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused.
|
||||||
|
func GetEquivalencePod(pod *api.Pod) interface{} {
|
||||||
|
equivalencePod := EquivalencePod{}
|
||||||
|
// For now we only consider pods:
|
||||||
|
// 1. OwnerReferences is Controller
|
||||||
|
// 2. OwnerReferences kind is in valid controller kinds
|
||||||
|
// 3. with same OwnerReferences
|
||||||
|
// to be equivalent
|
||||||
|
if len(pod.OwnerReferences) != 0 {
|
||||||
|
for _, ref := range pod.OwnerReferences {
|
||||||
|
if *ref.Controller && isValidControllerKind(ref.Kind) {
|
||||||
|
equivalencePod.ControllerRef = ref
|
||||||
|
// a pod can only belongs to one controller
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &equivalencePod
|
||||||
|
}
|
||||||
|
|
||||||
|
// isValidControllerKind checks if a given controller's kind can be applied to equivalence pod algorithm.
|
||||||
|
func isValidControllerKind(kind string) bool {
|
||||||
|
switch kind {
|
||||||
|
// list of kinds that we cannot handle
|
||||||
|
case PetSetKind:
|
||||||
|
return false
|
||||||
|
default:
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.
|
||||||
|
type EquivalencePod struct {
|
||||||
|
ControllerRef api.OwnerReference
|
||||||
|
}
|
||||||
|
132
plugin/pkg/scheduler/equivalence_cache.go
Normal file
132
plugin/pkg/scheduler/equivalence_cache.go
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/golang/groupcache/lru"
|
||||||
|
"hash/adler32"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
hashutil "k8s.io/kubernetes/pkg/util/hash"
|
||||||
|
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO(harryz) figure out the right number for this, 4096 may be too big
|
||||||
|
const maxCacheEntries = 4096
|
||||||
|
|
||||||
|
type HostPredicate struct {
|
||||||
|
Fit bool
|
||||||
|
FailReasons []algorithm.PredicateFailureReason
|
||||||
|
}
|
||||||
|
|
||||||
|
type AlgorithmCache struct {
|
||||||
|
// Only consider predicates for now, priorities rely on: #31606
|
||||||
|
predicatesCache *lru.Cache
|
||||||
|
}
|
||||||
|
|
||||||
|
func newAlgorithmCache() AlgorithmCache {
|
||||||
|
return AlgorithmCache{
|
||||||
|
predicatesCache: lru.New(maxCacheEntries),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store a map of predicate cache with maxsize
|
||||||
|
type EquivalenceCache struct {
|
||||||
|
sync.RWMutex
|
||||||
|
getEquivalencePod algorithm.GetEquivalencePodFunc
|
||||||
|
algorithmCache map[string]AlgorithmCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc) *EquivalenceCache {
|
||||||
|
return &EquivalenceCache{
|
||||||
|
getEquivalencePod: getEquivalencePodFunc,
|
||||||
|
algorithmCache: make(map[string]AlgorithmCache),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// addPodPredicate adds pod predicate for equivalence class
|
||||||
|
func (ec *EquivalenceCache) addPodPredicate(podKey uint64, nodeName string, fit bool, failReasons []algorithm.PredicateFailureReason) {
|
||||||
|
if _, exist := ec.algorithmCache[nodeName]; !exist {
|
||||||
|
ec.algorithmCache[nodeName] = newAlgorithmCache()
|
||||||
|
}
|
||||||
|
ec.algorithmCache[nodeName].predicatesCache.Add(podKey, HostPredicate{Fit: fit, FailReasons: failReasons})
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddPodPredicatesCache cache pod predicate for equivalence class
|
||||||
|
func (ec *EquivalenceCache) AddPodPredicatesCache(pod *api.Pod, fitNodeList []*api.Node, failedPredicates *FailedPredicateMap) {
|
||||||
|
equivalenceHash := ec.hashEquivalencePod(pod)
|
||||||
|
|
||||||
|
for _, fitNode := range fitNodeList {
|
||||||
|
ec.addPodPredicate(equivalenceHash, fitNode.Name, true, nil)
|
||||||
|
}
|
||||||
|
for failNodeName, failReasons := range *failedPredicates {
|
||||||
|
ec.addPodPredicate(equivalenceHash, failNodeName, false, failReasons)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetCachedPredicates gets cached predicates for equivalence class
|
||||||
|
func (ec *EquivalenceCache) GetCachedPredicates(pod *api.Pod, nodes []*api.Node) ([]*api.Node, FailedPredicateMap, []*api.Node) {
|
||||||
|
fitNodeList := []*api.Node{}
|
||||||
|
failedPredicates := FailedPredicateMap{}
|
||||||
|
noCacheNodeList := []*api.Node{}
|
||||||
|
equivalenceHash := ec.hashEquivalencePod(pod)
|
||||||
|
for _, node := range nodes {
|
||||||
|
findCache := false
|
||||||
|
if algorithmCache, exist := ec.algorithmCache[node.Name]; exist {
|
||||||
|
if cachePredicate, exist := algorithmCache.predicatesCache.Get(equivalenceHash); exist {
|
||||||
|
hostPredicate := cachePredicate.(HostPredicate)
|
||||||
|
if hostPredicate.Fit {
|
||||||
|
fitNodeList = append(fitNodeList, node)
|
||||||
|
} else {
|
||||||
|
failedPredicates[node.Name] = hostPredicate.FailReasons
|
||||||
|
}
|
||||||
|
findCache = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !findCache {
|
||||||
|
noCacheNodeList = append(noCacheNodeList, node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fitNodeList, failedPredicates, noCacheNodeList
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendInvalidAlgorithmCacheReq marks AlgorithmCache item as invalid
|
||||||
|
func (ec *EquivalenceCache) SendInvalidAlgorithmCacheReq(nodeName string) {
|
||||||
|
ec.Lock()
|
||||||
|
defer ec.Unlock()
|
||||||
|
// clear the cache of this node
|
||||||
|
delete(ec.algorithmCache, nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendClearAllCacheReq marks all cached item as invalid
|
||||||
|
func (ec *EquivalenceCache) SendClearAllCacheReq() {
|
||||||
|
ec.Lock()
|
||||||
|
defer ec.Unlock()
|
||||||
|
// clear cache of all nodes
|
||||||
|
for nodeName := range ec.algorithmCache {
|
||||||
|
delete(ec.algorithmCache, nodeName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// hashEquivalencePod returns the hash of equivalence pod.
|
||||||
|
func (ec *EquivalenceCache) hashEquivalencePod(pod *api.Pod) uint64 {
|
||||||
|
equivalencePod := ec.getEquivalencePod(pod)
|
||||||
|
hash := adler32.New()
|
||||||
|
hashutil.DeepHashObject(hash, equivalencePod)
|
||||||
|
return uint64(hash.Sum32())
|
||||||
|
}
|
@ -76,8 +76,12 @@ var (
|
|||||||
fitPredicateMap = make(map[string]FitPredicateFactory)
|
fitPredicateMap = make(map[string]FitPredicateFactory)
|
||||||
priorityFunctionMap = make(map[string]PriorityConfigFactory)
|
priorityFunctionMap = make(map[string]PriorityConfigFactory)
|
||||||
algorithmProviderMap = make(map[string]AlgorithmProviderConfig)
|
algorithmProviderMap = make(map[string]AlgorithmProviderConfig)
|
||||||
|
|
||||||
// Registered metadata producers
|
// Registered metadata producers
|
||||||
priorityMetadataProducer MetadataProducerFactory
|
priorityMetadataProducer MetadataProducerFactory
|
||||||
|
|
||||||
|
// get equivalence pod function
|
||||||
|
getEquivalencePodFunc algorithm.GetEquivalencePodFunc = nil
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -244,6 +248,10 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
|
|||||||
return RegisterPriorityConfigFactory(policy.Name, *pcf)
|
return RegisterPriorityConfigFactory(policy.Name, *pcf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func RegisterGetEquivalencePodFunction(equivalenceFunc algorithm.GetEquivalencePodFunc) {
|
||||||
|
getEquivalencePodFunc = equivalenceFunc
|
||||||
|
}
|
||||||
|
|
||||||
// This check is useful for testing providers.
|
// This check is useful for testing providers.
|
||||||
func IsPriorityFunctionRegistered(name string) bool {
|
func IsPriorityFunctionRegistered(name string) bool {
|
||||||
schedulerFactoryMutex.Lock()
|
schedulerFactoryMutex.Lock()
|
||||||
|
Loading…
Reference in New Issue
Block a user