mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Clean up names in equivalence package.
Remove stutter from names and provide more idiomatic patterns. This makes call sites that use equivalence cache easier to read.
This commit is contained in:
parent
31c746d960
commit
b571065bc4
@ -31,19 +31,51 @@ import (
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// EquivalenceCache saves and reuses the output of predicate functions. Use
|
||||
// RunPredicate to get or update the cached results. An appropriate Invalidate*
|
||||
// function should be called when some predicate results are no longer valid.
|
||||
// Cache saves and reuses the output of predicate functions. Use RunPredicate to
|
||||
// get or update the cached results. An appropriate Invalidate* function should
|
||||
// be called when some predicate results are no longer valid.
|
||||
//
|
||||
// Internally, results are keyed by node name, predicate name, and "equivalence
|
||||
// class". (Equivalence class is defined in the `EquivalenceClassInfo` type.)
|
||||
// Saved results will be reused until an appropriate invalidation function is
|
||||
// called.
|
||||
type EquivalenceCache struct {
|
||||
// class". (Equivalence class is defined in the `Class` type.) Saved results
|
||||
// will be reused until an appropriate invalidation function is called.
|
||||
type Cache struct {
|
||||
mu sync.RWMutex
|
||||
cache nodeMap
|
||||
}
|
||||
|
||||
// NewCache returns an empty Cache.
|
||||
func NewCache() *Cache {
|
||||
return &Cache{
|
||||
cache: make(nodeMap),
|
||||
}
|
||||
}
|
||||
|
||||
// Class represents a set of pods which are equivalent from the perspective of
|
||||
// the scheduler. i.e. the scheduler would make the same decision for any pod
|
||||
// from the same class.
|
||||
type Class struct {
|
||||
// Equivalence hash
|
||||
hash uint64
|
||||
}
|
||||
|
||||
// NewClass returns the equivalence class for a given Pod. The returned Class
|
||||
// objects will be equal for two Pods in the same class. nil values should not
|
||||
// be considered equal to each other.
|
||||
//
|
||||
// NOTE: Make sure to compare types of Class and not *Class.
|
||||
// TODO(misterikkit): Return error instead of nil *Class.
|
||||
func NewClass(pod *v1.Pod) *Class {
|
||||
equivalencePod := getEquivalencePod(pod)
|
||||
if equivalencePod != nil {
|
||||
hash := fnv.New32a()
|
||||
hashutil.DeepHashObject(hash, equivalencePod)
|
||||
return &Class{
|
||||
hash: uint64(hash.Sum32()),
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// nodeMap stores PredicateCaches with node name as the key.
|
||||
type nodeMap map[string]predicateMap
|
||||
|
||||
@ -59,25 +91,17 @@ type predicateResult struct {
|
||||
FailReasons []algorithm.PredicateFailureReason
|
||||
}
|
||||
|
||||
// NewEquivalenceCache returns EquivalenceCache to speed up predicates by caching
|
||||
// result from previous scheduling.
|
||||
func NewEquivalenceCache() *EquivalenceCache {
|
||||
return &EquivalenceCache{
|
||||
cache: make(nodeMap),
|
||||
}
|
||||
}
|
||||
|
||||
// RunPredicate returns a cached predicate result. In case of a cache miss, the predicate will be
|
||||
// run and its results cached for the next call.
|
||||
//
|
||||
// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale.
|
||||
func (ec *EquivalenceCache) RunPredicate(
|
||||
func (c *Cache) RunPredicate(
|
||||
pred algorithm.FitPredicate,
|
||||
predicateKey string,
|
||||
pod *v1.Pod,
|
||||
meta algorithm.PredicateMetadata,
|
||||
nodeInfo *schedulercache.NodeInfo,
|
||||
equivClassInfo *EquivalenceClassInfo,
|
||||
equivClass *Class,
|
||||
cache schedulercache.Cache,
|
||||
) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
if nodeInfo == nil || nodeInfo.Node() == nil {
|
||||
@ -85,7 +109,7 @@ func (ec *EquivalenceCache) RunPredicate(
|
||||
return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
|
||||
}
|
||||
|
||||
result, ok := ec.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClassInfo.hash)
|
||||
result, ok := c.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClass.hash)
|
||||
if ok {
|
||||
return result.Fit, result.FailReasons, nil
|
||||
}
|
||||
@ -94,13 +118,13 @@ func (ec *EquivalenceCache) RunPredicate(
|
||||
return fit, reasons, err
|
||||
}
|
||||
if cache != nil {
|
||||
ec.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClassInfo.hash, cache, nodeInfo)
|
||||
c.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClass.hash, cache, nodeInfo)
|
||||
}
|
||||
return fit, reasons, nil
|
||||
}
|
||||
|
||||
// updateResult updates the cached result of a predicate.
|
||||
func (ec *EquivalenceCache) updateResult(
|
||||
func (c *Cache) updateResult(
|
||||
podName, predicateKey string,
|
||||
fit bool,
|
||||
reasons []algorithm.PredicateFailureReason,
|
||||
@ -108,8 +132,8 @@ func (ec *EquivalenceCache) updateResult(
|
||||
cache schedulercache.Cache,
|
||||
nodeInfo *schedulercache.NodeInfo,
|
||||
) {
|
||||
ec.mu.Lock()
|
||||
defer ec.mu.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if nodeInfo == nil || nodeInfo.Node() == nil {
|
||||
// This may happen during tests.
|
||||
return
|
||||
@ -119,19 +143,19 @@ func (ec *EquivalenceCache) updateResult(
|
||||
return
|
||||
}
|
||||
nodeName := nodeInfo.Node().GetName()
|
||||
if _, exist := ec.cache[nodeName]; !exist {
|
||||
ec.cache[nodeName] = make(predicateMap)
|
||||
if _, exist := c.cache[nodeName]; !exist {
|
||||
c.cache[nodeName] = make(predicateMap)
|
||||
}
|
||||
predicateItem := predicateResult{
|
||||
Fit: fit,
|
||||
FailReasons: reasons,
|
||||
}
|
||||
// if cached predicate map already exists, just update the predicate by key
|
||||
if predicates, ok := ec.cache[nodeName][predicateKey]; ok {
|
||||
if predicates, ok := c.cache[nodeName][predicateKey]; ok {
|
||||
// maps in golang are references, no need to add them back
|
||||
predicates[equivalenceHash] = predicateItem
|
||||
} else {
|
||||
ec.cache[nodeName][predicateKey] =
|
||||
c.cache[nodeName][predicateKey] =
|
||||
resultMap{
|
||||
equivalenceHash: predicateItem,
|
||||
}
|
||||
@ -141,27 +165,27 @@ func (ec *EquivalenceCache) updateResult(
|
||||
|
||||
// lookupResult returns cached predicate results and a bool saying whether a
|
||||
// cache entry was found.
|
||||
func (ec *EquivalenceCache) lookupResult(
|
||||
func (c *Cache) lookupResult(
|
||||
podName, nodeName, predicateKey string,
|
||||
equivalenceHash uint64,
|
||||
) (value predicateResult, ok bool) {
|
||||
ec.mu.RLock()
|
||||
defer ec.mu.RUnlock()
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache",
|
||||
predicateKey, podName, nodeName)
|
||||
value, ok = ec.cache[nodeName][predicateKey][equivalenceHash]
|
||||
value, ok = c.cache[nodeName][predicateKey][equivalenceHash]
|
||||
return value, ok
|
||||
}
|
||||
|
||||
// InvalidatePredicates clears all cached results for the given predicates.
|
||||
func (ec *EquivalenceCache) InvalidatePredicates(predicateKeys sets.String) {
|
||||
func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
|
||||
if len(predicateKeys) == 0 {
|
||||
return
|
||||
}
|
||||
ec.mu.Lock()
|
||||
defer ec.mu.Unlock()
|
||||
// ec.cache uses nodeName as key, so we just iterate it and invalid given predicates
|
||||
for _, predicates := range ec.cache {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
// c.cache uses nodeName as key, so we just iterate it and invalid given predicates
|
||||
for _, predicates := range c.cache {
|
||||
for predicateKey := range predicateKeys {
|
||||
delete(predicates, predicateKey)
|
||||
}
|
||||
@ -170,30 +194,30 @@ func (ec *EquivalenceCache) InvalidatePredicates(predicateKeys sets.String) {
|
||||
}
|
||||
|
||||
// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
|
||||
func (ec *EquivalenceCache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
|
||||
func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
|
||||
if len(predicateKeys) == 0 {
|
||||
return
|
||||
}
|
||||
ec.mu.Lock()
|
||||
defer ec.mu.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
for predicateKey := range predicateKeys {
|
||||
delete(ec.cache[nodeName], predicateKey)
|
||||
delete(c.cache[nodeName], predicateKey)
|
||||
}
|
||||
glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName)
|
||||
}
|
||||
|
||||
// InvalidateAllPredicatesOnNode clears all cached results for one node.
|
||||
func (ec *EquivalenceCache) InvalidateAllPredicatesOnNode(nodeName string) {
|
||||
ec.mu.Lock()
|
||||
defer ec.mu.Unlock()
|
||||
delete(ec.cache, nodeName)
|
||||
func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
delete(c.cache, nodeName)
|
||||
glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
|
||||
}
|
||||
|
||||
// InvalidateCachedPredicateItemForPodAdd is a wrapper of
|
||||
// InvalidateCachedPredicateItem for pod add case
|
||||
// TODO: This logic does not belong with the equivalence cache implementation.
|
||||
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
|
||||
// TODO: This does not belong with the equivalence cache implementation.
|
||||
func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
|
||||
// MatchInterPodAffinity: we assume scheduler can make sure newly bound pod
|
||||
// will not break the existing inter pod affinity. So we does not need to
|
||||
// invalidate MatchInterPodAffinity when pod added.
|
||||
@ -226,30 +250,7 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod,
|
||||
}
|
||||
}
|
||||
}
|
||||
ec.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
|
||||
}
|
||||
|
||||
// EquivalenceClassInfo holds equivalence hash which is used for checking
|
||||
// equivalence cache. We will pass this to podFitsOnNode to ensure equivalence
|
||||
// hash is only calculated per schedule.
|
||||
type EquivalenceClassInfo struct {
|
||||
// Equivalence hash.
|
||||
hash uint64
|
||||
}
|
||||
|
||||
// GetEquivalenceClassInfo returns a hash of the given pod. The hashing function
|
||||
// returns the same value for any two pods that are equivalent from the
|
||||
// perspective of scheduling.
|
||||
func (ec *EquivalenceCache) GetEquivalenceClassInfo(pod *v1.Pod) *EquivalenceClassInfo {
|
||||
equivalencePod := getEquivalencePod(pod)
|
||||
if equivalencePod != nil {
|
||||
hash := fnv.New32a()
|
||||
hashutil.DeepHashObject(hash, equivalencePod)
|
||||
return &EquivalenceClassInfo{
|
||||
hash: uint64(hash.Sum32()),
|
||||
}
|
||||
}
|
||||
return nil
|
||||
c.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
|
||||
}
|
||||
|
||||
// equivalencePod is the set of pod attributes which must match for two pods to
|
||||
|
@ -247,8 +247,8 @@ func TestRunPredicate(t *testing.T) {
|
||||
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}}
|
||||
meta := algorithm.EmptyPredicateMetadataProducer(nil, nil)
|
||||
|
||||
ecache := NewEquivalenceCache()
|
||||
equivClass := ecache.GetEquivalenceClassInfo(pod)
|
||||
ecache := NewCache()
|
||||
equivClass := NewClass(pod)
|
||||
if test.expectCacheHit {
|
||||
ecache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node)
|
||||
}
|
||||
@ -339,7 +339,7 @@ func TestUpdateResult(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
ecache := NewEquivalenceCache()
|
||||
ecache := NewCache()
|
||||
if test.expectPredicateMap {
|
||||
ecache.cache[test.nodeName] = make(predicateMap)
|
||||
predicateItem := predicateResult{
|
||||
@ -473,7 +473,7 @@ func TestLookupResult(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
ecache := NewEquivalenceCache()
|
||||
ecache := NewCache()
|
||||
node := schedulercache.NewNodeInfo()
|
||||
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
|
||||
// set cached item to equivalence cache
|
||||
@ -527,9 +527,6 @@ func TestLookupResult(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGetEquivalenceHash(t *testing.T) {
|
||||
|
||||
ecache := NewEquivalenceCache()
|
||||
|
||||
pod1 := makeBasicPod("pod1")
|
||||
pod2 := makeBasicPod("pod2")
|
||||
|
||||
@ -620,7 +617,7 @@ func TestGetEquivalenceHash(t *testing.T) {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
for i, podInfo := range test.podInfoList {
|
||||
testPod := podInfo.pod
|
||||
eclassInfo := ecache.GetEquivalenceClassInfo(testPod)
|
||||
eclassInfo := NewClass(testPod)
|
||||
if eclassInfo == nil && podInfo.hashIsValid {
|
||||
t.Errorf("Failed: pod %v is expected to have valid hash", testPod)
|
||||
}
|
||||
@ -688,7 +685,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
}
|
||||
ecache := NewEquivalenceCache()
|
||||
ecache := NewCache()
|
||||
|
||||
for _, test := range tests {
|
||||
node := schedulercache.NewNodeInfo()
|
||||
@ -760,7 +757,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
}
|
||||
ecache := NewEquivalenceCache()
|
||||
ecache := NewCache()
|
||||
|
||||
for _, test := range tests {
|
||||
node := schedulercache.NewNodeInfo()
|
||||
|
@ -86,7 +86,7 @@ func (f *FitError) Error() string {
|
||||
|
||||
type genericScheduler struct {
|
||||
cache schedulercache.Cache
|
||||
equivalenceCache *equivalence.EquivalenceCache
|
||||
equivalenceCache *equivalence.Cache
|
||||
schedulingQueue SchedulingQueue
|
||||
predicates map[string]algorithm.FitPredicate
|
||||
priorityMetaProducer algorithm.PriorityMetadataProducer
|
||||
@ -343,10 +343,10 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
|
||||
// We can use the same metadata producer for all nodes.
|
||||
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
|
||||
|
||||
var equivCacheInfo *equivalence.EquivalenceClassInfo
|
||||
var equivClass *equivalence.Class
|
||||
if g.equivalenceCache != nil {
|
||||
// getEquivalenceClassInfo will return immediately if no equivalence pod found
|
||||
equivCacheInfo = g.equivalenceCache.GetEquivalenceClassInfo(pod)
|
||||
equivClass = equivalence.NewClass(pod)
|
||||
}
|
||||
|
||||
checkNode := func(i int) {
|
||||
@ -360,7 +360,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
|
||||
g.equivalenceCache,
|
||||
g.schedulingQueue,
|
||||
g.alwaysCheckAllPredicates,
|
||||
equivCacheInfo,
|
||||
equivClass,
|
||||
)
|
||||
if err != nil {
|
||||
predicateResultLock.Lock()
|
||||
@ -460,10 +460,10 @@ func podFitsOnNode(
|
||||
info *schedulercache.NodeInfo,
|
||||
predicateFuncs map[string]algorithm.FitPredicate,
|
||||
cache schedulercache.Cache,
|
||||
ecache *equivalence.EquivalenceCache,
|
||||
ecache *equivalence.Cache,
|
||||
queue SchedulingQueue,
|
||||
alwaysCheckAllPredicates bool,
|
||||
equivCacheInfo *equivalence.EquivalenceClassInfo,
|
||||
equivClass *equivalence.Class,
|
||||
) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
var (
|
||||
eCacheAvailable bool
|
||||
@ -500,7 +500,7 @@ func podFitsOnNode(
|
||||
// Bypass eCache if node has any nominated pods.
|
||||
// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
|
||||
// when pods are nominated or their nominations change.
|
||||
eCacheAvailable = equivCacheInfo != nil && !podsAdded
|
||||
eCacheAvailable = equivClass != nil && !podsAdded
|
||||
for _, predicateKey := range predicates.Ordering() {
|
||||
var (
|
||||
fit bool
|
||||
@ -510,7 +510,7 @@ func podFitsOnNode(
|
||||
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
|
||||
if predicate, exist := predicateFuncs[predicateKey]; exist {
|
||||
if eCacheAvailable {
|
||||
fit, reasons, err = ecache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivCacheInfo, cache)
|
||||
fit, reasons, err = ecache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
|
||||
} else {
|
||||
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
|
||||
}
|
||||
@ -1057,7 +1057,7 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla
|
||||
// NewGenericScheduler creates a genericScheduler object.
|
||||
func NewGenericScheduler(
|
||||
cache schedulercache.Cache,
|
||||
eCache *equivalence.EquivalenceCache,
|
||||
eCache *equivalence.Cache,
|
||||
podQueue SchedulingQueue,
|
||||
predicates map[string]algorithm.FitPredicate,
|
||||
predicateMetaProducer algorithm.PredicateMetadataProducer,
|
||||
|
@ -1367,10 +1367,10 @@ func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*scheduler
|
||||
return err
|
||||
}
|
||||
|
||||
// TestEquivalenceCacheInvalidationRace tests that equivalence cache invalidation is correctly
|
||||
// TestCacheInvalidationRace tests that equivalence cache invalidation is correctly
|
||||
// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event
|
||||
// occurs after schedulercache is snapshotted and before equivalence cache lock is acquired.
|
||||
func TestEquivalenceCacheInvalidationRace(t *testing.T) {
|
||||
func TestCacheInvalidationRace(t *testing.T) {
|
||||
// Create a predicate that returns false the first time and true on subsequent calls.
|
||||
podWillFit := false
|
||||
var callCount int
|
||||
@ -1394,7 +1394,7 @@ func TestEquivalenceCacheInvalidationRace(t *testing.T) {
|
||||
cacheInvalidated: make(chan struct{}),
|
||||
}
|
||||
|
||||
eCache := equivalence.NewEquivalenceCache()
|
||||
eCache := equivalence.NewCache()
|
||||
// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
|
||||
// the equivalence cache would be updated.
|
||||
go func() {
|
||||
|
@ -124,7 +124,7 @@ type configFactory struct {
|
||||
hardPodAffinitySymmetricWeight int32
|
||||
|
||||
// Equivalence class cache
|
||||
equivalencePodCache *equivalence.EquivalenceCache
|
||||
equivalencePodCache *equivalence.Cache
|
||||
|
||||
// Enable equivalence class cache
|
||||
enableEquivalenceClassCache bool
|
||||
@ -1075,7 +1075,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
||||
|
||||
// Init equivalence class cache
|
||||
if c.enableEquivalenceClassCache {
|
||||
c.equivalencePodCache = equivalence.NewEquivalenceCache()
|
||||
c.equivalencePodCache = equivalence.NewCache()
|
||||
glog.Info("Created equivalence class cache")
|
||||
}
|
||||
|
||||
|
@ -105,7 +105,7 @@ type Config struct {
|
||||
SchedulerCache schedulercache.Cache
|
||||
// Ecache is used for optimistically invalid affected cache items after
|
||||
// successfully binding a pod
|
||||
Ecache *equivalence.EquivalenceCache
|
||||
Ecache *equivalence.Cache
|
||||
NodeLister algorithm.NodeLister
|
||||
Algorithm algorithm.ScheduleAlgorithm
|
||||
GetBinder func(pod *v1.Pod) Binder
|
||||
|
Loading…
Reference in New Issue
Block a user