Remove direct accesses to cache's node map

Signed-off-by: Aldo Culquicondor <acondor@google.com>
Change-Id: Iebb22fc816926aaa1ddd1e4b2e52f335a275ffaa
Signed-off-by: Aldo Culquicondor <acondor@google.com>
This commit is contained in:
Aldo Culquicondor 2020-08-13 14:03:00 -04:00
parent eb8b5a9854
commit 16d7ecfa45
10 changed files with 28 additions and 49 deletions

View File

@ -84,7 +84,6 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",

View File

@ -16,7 +16,6 @@ go_library(
"//pkg/scheduler/metrics:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",

View File

@ -22,7 +22,6 @@ import (
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -315,7 +314,9 @@ func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot)
}
}
func (cache *schedulerCache) ListPods(selector labels.Selector) ([]*v1.Pod, error) {
// PodCount returns the number of pods in the cache (including those from deleted nodes).
// DO NOT use outside of tests.
func (cache *schedulerCache) PodCount() (int, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
// podFilter is expected to return true for most or all of the pods. We
@ -325,15 +326,11 @@ func (cache *schedulerCache) ListPods(selector labels.Selector) ([]*v1.Pod, erro
for _, n := range cache.nodes {
maxSize += len(n.info.Pods)
}
pods := make([]*v1.Pod, 0, maxSize)
count := 0
for _, n := range cache.nodes {
for _, p := range n.info.Pods {
if selector.Matches(labels.Set(p.Pod.Labels)) {
pods = append(pods, p.Pod)
count += len(n.info.Pods)
}
}
}
return pods, nil
return count, nil
}
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
@ -736,19 +733,6 @@ func (cache *schedulerCache) expirePod(key string, ps *podState) error {
return nil
}
// GetNodeInfo returns cached data for the node name.
func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
n, ok := cache.nodes[nodeName]
if !ok {
return nil, fmt.Errorf("node %q not found in cache", nodeName)
}
return n.info.Node(), nil
}
// updateMetrics updates cache size metric values for pods, assumed pods, and nodes
func (cache *schedulerCache) updateMetrics() {
metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))

View File

@ -1142,7 +1142,7 @@ func TestNodeOperators(t *testing.T) {
if err := cache.RemoveNode(node); err != nil {
t.Error(err)
}
if _, err := cache.GetNodeInfo(node.Name); err == nil {
if _, err := cache.getNodeInfo(node.Name); err == nil {
t.Errorf("The node %v should be removed.", node.Name)
}
// Check node is removed from nodeTree as well.
@ -1798,3 +1798,16 @@ func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
}
return nil
}
// getNodeInfo returns cached data for the node name.
func (cache *schedulerCache) getNodeInfo(nodeName string) (*v1.Node, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
n, ok := cache.nodes[nodeName]
if !ok {
return nil, fmt.Errorf("node %q not found in cache", nodeName)
}
return n.info.Node(), nil
}

View File

@ -8,7 +8,6 @@ go_library(
deps = [
"//pkg/scheduler/internal/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
],
)

View File

@ -18,7 +18,6 @@ package fake
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
@ -78,20 +77,10 @@ func (c *Cache) UpdateSnapshot(snapshot *internalcache.Snapshot) error {
return nil
}
// ListPods is a fake method for testing.
func (c *Cache) ListPods(s labels.Selector) ([]*v1.Pod, error) { return nil, nil }
// PodCount is a fake method for testing.
func (c *Cache) PodCount() (int, error) { return 0, nil }
// Dump is a fake method for testing.
func (c *Cache) Dump() *internalcache.Dump {
return &internalcache.Dump{}
}
// GetNodeInfo is a fake method for testing.
func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
return nil, nil
}
// ListNodes is a fake method for testing.
func (c *Cache) ListNodes() []*v1.Node {
return nil
}

View File

@ -18,7 +18,6 @@ package cache
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
@ -57,8 +56,8 @@ import (
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
type Cache interface {
// ListPods lists all pods in the cache.
ListPods(selector labels.Selector) ([]*v1.Pod, error)
// PodCount returns the number of pods in the cache (including those from deleted nodes).
PodCount() (int, error)
// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
// The implementation also decides the policy to expire pod before being confirmed (receiving Add event).

View File

@ -36,7 +36,6 @@ import (
eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
@ -527,12 +526,12 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
return
default:
}
pods, err := scache.ListPods(labels.Everything())
pods, err := scache.PodCount()
if err != nil {
errChan <- fmt.Errorf("cache.List failed: %v", err)
return
}
if len(pods) == 0 {
if pods == 0 {
close(waitPodExpireChan)
return
}

View File

@ -96,7 +96,6 @@ go_library(
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",

View File

@ -26,7 +26,6 @@ import (
policy "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
@ -401,11 +400,11 @@ func waitForPDBsStable(testCtx *testutils.TestContext, pdbs []*policy.PodDisrupt
// waitCachedPodsStable waits until scheduler cache has the given pods.
func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error {
return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
cachedPods, err := testCtx.Scheduler.SchedulerCache.ListPods(labels.Everything())
cachedPods, err := testCtx.Scheduler.SchedulerCache.PodCount()
if err != nil {
return false, err
}
if len(pods) != len(cachedPods) {
if len(pods) != cachedPods {
return false, nil
}
for _, p := range pods {