From 16d7ecfa45b191d85592c93ee8a7dd62228f5274 Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Thu, 13 Aug 2020 14:03:00 -0400
Subject: [PATCH 1/2] Remove direct accesses to cache's node map

Signed-off-by: Aldo Culquicondor
Change-Id: Iebb22fc816926aaa1ddd1e4b2e52f335a275ffaa
Signed-off-by: Aldo Culquicondor
---
 pkg/scheduler/BUILD                        |  1 -
 pkg/scheduler/internal/cache/BUILD         |  1 -
 pkg/scheduler/internal/cache/cache.go      | 28 ++++---------
 pkg/scheduler/internal/cache/cache_test.go | 15 +++++++++-
 pkg/scheduler/internal/cache/fake/BUILD    |  1 -
 .../internal/cache/fake/fake_cache.go      | 15 ++--------
 pkg/scheduler/internal/cache/interface.go  |  5 ++--
 pkg/scheduler/scheduler_test.go            |  5 ++--
 test/integration/scheduler/BUILD           |  1 -
 test/integration/scheduler/util.go         |  5 ++--
 10 files changed, 28 insertions(+), 49 deletions(-)

diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD
index 230865207e8..f6f77557d85 100644
--- a/pkg/scheduler/BUILD
+++ b/pkg/scheduler/BUILD
@@ -84,7 +84,6 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
diff --git a/pkg/scheduler/internal/cache/BUILD b/pkg/scheduler/internal/cache/BUILD
index 67a128db343..a359f89db2c 100644
--- a/pkg/scheduler/internal/cache/BUILD
+++ b/pkg/scheduler/internal/cache/BUILD
@@ -16,7 +16,6 @@ go_library(
         "//pkg/scheduler/metrics:go_default_library",
         "//pkg/util/node:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go
index 9f40b620825..d7dca478988 100644
--- a/pkg/scheduler/internal/cache/cache.go
+++ b/pkg/scheduler/internal/cache/cache.go
@@ -22,7 +22,6 @@ import (
 	"time"
 
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -315,7 +314,9 @@ func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot)
 	}
 }
 
-func (cache *schedulerCache) ListPods(selector labels.Selector) ([]*v1.Pod, error) {
+// PodCount returns the number of pods in the cache (including those from deleted nodes).
+// DO NOT use outside of tests.
+func (cache *schedulerCache) PodCount() (int, error) {
 	cache.mu.RLock()
 	defer cache.mu.RUnlock()
 	// podFilter is expected to return true for most or all of the pods. We
@@ -325,15 +326,11 @@ func (cache *schedulerCache) ListPods(selector labels.Selector) ([]*v1.Pod, erro
 	for _, n := range cache.nodes {
 		maxSize += len(n.info.Pods)
 	}
-	pods := make([]*v1.Pod, 0, maxSize)
+	count := 0
 	for _, n := range cache.nodes {
-		for _, p := range n.info.Pods {
-			if selector.Matches(labels.Set(p.Pod.Labels)) {
-				pods = append(pods, p.Pod)
-			}
-		}
+		count += len(n.info.Pods)
 	}
-	return pods, nil
+	return count, nil
 }
 
 func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
@@ -736,19 +733,6 @@ func (cache *schedulerCache) expirePod(key string, ps *podState) error {
 	return nil
 }
 
-// GetNodeInfo returns cached data for the node name.
-func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
-	cache.mu.RLock()
-	defer cache.mu.RUnlock()
-
-	n, ok := cache.nodes[nodeName]
-	if !ok {
-		return nil, fmt.Errorf("node %q not found in cache", nodeName)
-	}
-
-	return n.info.Node(), nil
-}
-
 // updateMetrics updates cache size metric values for pods, assumed pods, and nodes
 func (cache *schedulerCache) updateMetrics() {
 	metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))
diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go
index 5a8d3fb6f54..aea35689b7c 100644
--- a/pkg/scheduler/internal/cache/cache_test.go
+++ b/pkg/scheduler/internal/cache/cache_test.go
@@ -1142,7 +1142,7 @@ func TestNodeOperators(t *testing.T) {
 		if err := cache.RemoveNode(node); err != nil {
 			t.Error(err)
 		}
-		if _, err := cache.GetNodeInfo(node.Name); err == nil {
+		if _, err := cache.getNodeInfo(node.Name); err == nil {
 			t.Errorf("The node %v should be removed.", node.Name)
 		}
 		// Check node is removed from nodeTree as well.
@@ -1798,3 +1798,16 @@ func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
 	}
 	return nil
 }
+
+// getNodeInfo returns cached data for the node name.
+func (cache *schedulerCache) getNodeInfo(nodeName string) (*v1.Node, error) {
+	cache.mu.RLock()
+	defer cache.mu.RUnlock()
+
+	n, ok := cache.nodes[nodeName]
+	if !ok {
+		return nil, fmt.Errorf("node %q not found in cache", nodeName)
+	}
+
+	return n.info.Node(), nil
+}
diff --git a/pkg/scheduler/internal/cache/fake/BUILD b/pkg/scheduler/internal/cache/fake/BUILD
index a3363de0ec9..acdceb80fba 100644
--- a/pkg/scheduler/internal/cache/fake/BUILD
+++ b/pkg/scheduler/internal/cache/fake/BUILD
@@ -8,7 +8,6 @@ go_library(
     deps = [
         "//pkg/scheduler/internal/cache:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
     ],
 )
 
diff --git a/pkg/scheduler/internal/cache/fake/fake_cache.go b/pkg/scheduler/internal/cache/fake/fake_cache.go
index 80a775988ea..b0b5013ab16 100644
--- a/pkg/scheduler/internal/cache/fake/fake_cache.go
+++ b/pkg/scheduler/internal/cache/fake/fake_cache.go
@@ -18,7 +18,6 @@ package fake
 
 import (
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )
 
@@ -78,20 +77,10 @@ func (c *Cache) UpdateSnapshot(snapshot *internalcache.Snapshot) error {
 	return nil
 }
 
-// ListPods is a fake method for testing.
-func (c *Cache) ListPods(s labels.Selector) ([]*v1.Pod, error) { return nil, nil }
+// PodCount is a fake method for testing.
+func (c *Cache) PodCount() (int, error) { return 0, nil }
 
 // Dump is a fake method for testing.
 func (c *Cache) Dump() *internalcache.Dump {
 	return &internalcache.Dump{}
 }
-
-// GetNodeInfo is a fake method for testing.
-func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
-	return nil, nil
-}
-
-// ListNodes is a fake method for testing.
-func (c *Cache) ListNodes() []*v1.Node {
-	return nil
-}
diff --git a/pkg/scheduler/internal/cache/interface.go b/pkg/scheduler/internal/cache/interface.go
index 17124e3e9f5..e9ab9db8fac 100644
--- a/pkg/scheduler/internal/cache/interface.go
+++ b/pkg/scheduler/internal/cache/interface.go
@@ -18,7 +18,6 @@ package cache
 
 import (
 	"k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 )
 
@@ -57,8 +56,8 @@
 // - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
 //   a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
 type Cache interface {
-	// ListPods lists all pods in the cache.
-	ListPods(selector labels.Selector) ([]*v1.Pod, error)
+	// PodCount returns the number of pods in the cache (including those from deleted nodes).
+	PodCount() (int, error)
 
 	// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
 	// The implementation also decides the policy to expire pod before being confirmed (receiving Add event).
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 914185bd879..5446f12657d 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -36,7 +36,6 @@ import (
 	eventsv1 "k8s.io/api/events/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -527,12 +526,12 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
 				return
 			default:
 			}
-			pods, err := scache.ListPods(labels.Everything())
+			pods, err := scache.PodCount()
 			if err != nil {
 				errChan <- fmt.Errorf("cache.List failed: %v", err)
 				return
 			}
-			if len(pods) == 0 {
+			if pods == 0 {
 				close(waitPodExpireChan)
 				return
 			}
diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD
index fcc6d0d7802..41afd3119ef 100644
--- a/test/integration/scheduler/BUILD
+++ b/test/integration/scheduler/BUILD
@@ -96,7 +96,6 @@ go_library(
         "//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library",
         "//staging/src/k8s.io/client-go/dynamic:go_default_library",
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go
index b0fcec47fc2..e752a81726f 100644
--- a/test/integration/scheduler/util.go
+++ b/test/integration/scheduler/util.go
@@ -26,7 +26,6 @@ import (
 	policy "k8s.io/api/policy/v1beta1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/wait"
 	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
 	"k8s.io/client-go/dynamic"
@@ -401,11 +400,11 @@ func waitForPDBsStable(testCtx *testutils.TestContext, pdbs []*policy.PodDisrupt
 // waitCachedPodsStable waits until scheduler cache has the given pods.
 func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error {
 	return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
-		cachedPods, err := testCtx.Scheduler.SchedulerCache.ListPods(labels.Everything())
+		cachedPods, err := testCtx.Scheduler.SchedulerCache.PodCount()
 		if err != nil {
 			return false, err
 		}
-		if len(pods) != len(cachedPods) {
+		if len(pods) != cachedPods {
 			return false, nil
 		}
 		for _, p := range pods {

From dfe9e413d9fab9037a4c8b62ce3bdddd7b0d58da Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Wed, 12 Aug 2020 13:24:37 -0400
Subject: [PATCH 2/2] Keep track of remaining pods when a node is deleted.

The apiserver is expected to send pod deletion events, which might arrive
at a different time than the node deletion. However, a node could sometimes
be recreated without its pods being deleted.

Partial revert of https://github.com/kubernetes/kubernetes/pull/86964

Signed-off-by: Aldo Culquicondor
Change-Id: I51f683e5f05689b711c81ebff34e7118b5337571
---
 pkg/scheduler/framework/v1alpha1/types.go  |  6 +++
 pkg/scheduler/internal/cache/cache.go      | 45 +++++++++++--------
 pkg/scheduler/internal/cache/cache_test.go | 12 ++---
 .../internal/cache/debugger/dumper.go      | 12 ++---
 4 files changed, 45 insertions(+), 30 deletions(-)

diff --git a/pkg/scheduler/framework/v1alpha1/types.go b/pkg/scheduler/framework/v1alpha1/types.go
index 915c5d1ec25..c878a7c9ffa 100644
--- a/pkg/scheduler/framework/v1alpha1/types.go
+++ b/pkg/scheduler/framework/v1alpha1/types.go
@@ -625,6 +625,12 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
 	return nil
 }
 
+// RemoveNode removes the node object, leaving all other tracking information.
+func (n *NodeInfo) RemoveNode() {
+	n.node = nil
+	n.Generation = nextGeneration()
+}
+
 // FilterOutPods receives a list of pods and filters out those whose node names
 // are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
 //
diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go
index d7dca478988..e0f44e78d59 100644
--- a/pkg/scheduler/internal/cache/cache.go
+++ b/pkg/scheduler/internal/cache/cache.go
@@ -420,13 +420,6 @@ func (cache *schedulerCache) addPod(pod *v1.Pod) {
 
 // Assumes that lock is already acquired.
 func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
-	if _, ok := cache.nodes[newPod.Spec.NodeName]; !ok {
-		// The node might have been deleted already.
-		// This is not a problem in the case where a pod update arrives before the
-		// node creation, because we will always have a create pod event before
-		// that, which will create the placeholder node item.
-		return nil
-	}
 	if err := cache.removePod(oldPod); err != nil {
 		return err
 	}
@@ -435,18 +428,23 @@
 }
 
 // Assumes that lock is already acquired.
-// Removes a pod from the cached node info. When a node is removed, some pod
-// deletion events might arrive later. This is not a problem, as the pods in
-// the node are assumed to be removed already.
+// Removes a pod from the cached node info. If the node information was already
+// removed and there are no more pods left in the node, cleans up the node from
+// the cache.
 func (cache *schedulerCache) removePod(pod *v1.Pod) error {
 	n, ok := cache.nodes[pod.Spec.NodeName]
 	if !ok {
+		klog.Errorf("node %v not found when trying to remove pod %v", pod.Spec.NodeName, pod.Name)
 		return nil
 	}
 	if err := n.info.RemovePod(pod); err != nil {
 		return err
 	}
-	cache.moveNodeInfoToHead(pod.Spec.NodeName)
+	if len(n.info.Pods) == 0 && n.info.Node() == nil {
+		cache.removeNodeInfoFromList(pod.Spec.NodeName)
+	} else {
+		cache.moveNodeInfoToHead(pod.Spec.NodeName)
+	}
 	return nil
 }
 
@@ -616,21 +614,30 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
 	return n.info.SetNode(newNode)
 }
 
-// RemoveNode removes a node from the cache.
-// Some nodes might still have pods because their deletion events didn't arrive
-// yet. For most intents and purposes, those pods are removed from the cache,
-// having it's source of truth in the cached nodes.
-// However, some information on pods (assumedPods, podStates) persist. These
-// caches will be eventually consistent as pod deletion events arrive.
+// RemoveNode removes a node from the cache's tree.
+// The node might still have pods because their deletion events didn't arrive
+// yet. Those pods are considered removed from the cache, with the node tree
+// as the source of truth.
+// However, we keep a ghost node with the list of pods until all pod deletion
+// events have arrived. A ghost node is skipped from snapshots.
 func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 
-	_, ok := cache.nodes[node.Name]
+	n, ok := cache.nodes[node.Name]
 	if !ok {
 		return fmt.Errorf("node %v is not found", node.Name)
 	}
-	cache.removeNodeInfoFromList(node.Name)
+	n.info.RemoveNode()
+	// We remove NodeInfo for this node only if there aren't any pods on this node.
+	// We can't do it unconditionally, because notifications about pods are delivered
+	// in a different watch, and thus can potentially be observed later, even though
+	// they happened before node removal.
+	if len(n.info.Pods) == 0 {
+		cache.removeNodeInfoFromList(node.Name)
+	} else {
+		cache.moveNodeInfoToHead(node.Name)
+	}
 	if err := cache.nodeTree.removeNode(node); err != nil {
 		return err
 	}
diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go
index aea35689b7c..1a5e86a4c9b 100644
--- a/pkg/scheduler/internal/cache/cache_test.go
+++ b/pkg/scheduler/internal/cache/cache_test.go
@@ -268,7 +268,7 @@ func TestExpirePod(t *testing.T) {
 			{pod: testPods[0], finishBind: true, assumedTime: now},
 		},
 		cleanupTime: now.Add(2 * ttl),
-		wNodeInfo:   framework.NewNodeInfo(),
+		wNodeInfo:   nil,
 	}, { // first one would expire, second and third would not.
 		pods: []*testExpirePodStruct{
 			{pod: testPods[0], finishBind: true, assumedTime: now},
@@ -1142,10 +1142,12 @@ func TestNodeOperators(t *testing.T) {
 		if err := cache.RemoveNode(node); err != nil {
 			t.Error(err)
 		}
-		if _, err := cache.getNodeInfo(node.Name); err == nil {
-			t.Errorf("The node %v should be removed.", node.Name)
+		if n, err := cache.getNodeInfo(node.Name); err != nil {
+			t.Errorf("The node %v should still have a ghost entry: %v", node.Name, err)
+		} else if n != nil {
+			t.Errorf("The node object for %v should be nil", node.Name)
 		}
-		// Check node is removed from nodeTree as well.
+		// Check node is removed from nodeTree.
 		if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
 			t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
 		}
@@ -1466,7 +1468,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
 			var i int
 			// Check that cache is in the expected state.
 			for node := cache.headNode; node != nil; node = node.next {
-				if node.info.Node().Name != test.expected[i].Name {
+				if node.info.Node() != nil && node.info.Node().Name != test.expected[i].Name {
 					t.Errorf("unexpected node. Expected: %v, got: %v, index: %v", test.expected[i].Name, node.info.Node().Name, i)
 				}
 				i++
diff --git a/pkg/scheduler/internal/cache/debugger/dumper.go b/pkg/scheduler/internal/cache/debugger/dumper.go
index 835be5a3d2f..63792166088 100644
--- a/pkg/scheduler/internal/cache/debugger/dumper.go
+++ b/pkg/scheduler/internal/cache/debugger/dumper.go
@@ -45,8 +45,8 @@ func (d *CacheDumper) DumpAll() {
 func (d *CacheDumper) dumpNodes() {
 	dump := d.cache.Dump()
 	klog.Info("Dump of cached NodeInfo")
-	for _, nodeInfo := range dump.Nodes {
-		klog.Info(d.printNodeInfo(nodeInfo))
+	for name, nodeInfo := range dump.Nodes {
+		klog.Info(d.printNodeInfo(name, nodeInfo))
 	}
 }
 
@@ -61,16 +61,16 @@ func (d *CacheDumper) dumpSchedulingQueue() {
 }
 
 // printNodeInfo writes parts of NodeInfo to a string.
-func (d *CacheDumper) printNodeInfo(n *framework.NodeInfo) string {
+func (d *CacheDumper) printNodeInfo(name string, n *framework.NodeInfo) string {
 	var nodeData strings.Builder
-	nodeData.WriteString(fmt.Sprintf("\nNode name: %+v\nRequested Resources: %+v\nAllocatable Resources:%+v\nScheduled Pods(number: %v):\n",
-		n.Node().Name, n.Requested, n.Allocatable, len(n.Pods)))
+	nodeData.WriteString(fmt.Sprintf("\nNode name: %s\nDeleted: %t\nRequested Resources: %+v\nAllocatable Resources:%+v\nScheduled Pods(number: %v):\n",
+		name, n.Node() == nil, n.Requested, n.Allocatable, len(n.Pods)))
 	// Dumping Pod Info
 	for _, p := range n.Pods {
 		nodeData.WriteString(printPod(p.Pod))
 	}
 	// Dumping nominated pods info on the node
-	nominatedPods := d.podQueue.NominatedPodsForNode(n.Node().Name)
+	nominatedPods := d.podQueue.NominatedPodsForNode(name)
 	if len(nominatedPods) != 0 {
 		nodeData.WriteString(fmt.Sprintf("Nominated Pods(number: %v):\n", len(nominatedPods)))
 		for _, p := range nominatedPods {
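A minimal sketch of the "ghost node" bookkeeping that the second commit introduces, for readers who want the invariant in isolation. The toyCache and nodeEntry types below are hypothetical stand-ins, not the scheduler's real schedulerCache, nodeInfoListItem, or NodeInfo structures; the sketch only shows the rule the patch enforces: a deleted node's cache entry survives while pods are still charged to it, and whichever event is observed last (the node deletion or the final pod deletion) cleans the entry up.

// ghost node sketch: illustrative only, hypothetical types.
package main

import "fmt"

// nodeEntry loosely mirrors what the cache keeps per node: whether the Node
// object is still present, plus the pods still charged to that node.
type nodeEntry struct {
	hasNode bool
	pods    map[string]struct{}
}

type toyCache struct {
	nodes map[string]*nodeEntry
}

// RemoveNode clears the node object but keeps a ghost entry while pods remain,
// because pod deletion events can arrive after the node deletion.
func (c *toyCache) RemoveNode(name string) {
	n, ok := c.nodes[name]
	if !ok {
		return
	}
	n.hasNode = false
	if len(n.pods) == 0 {
		delete(c.nodes, name)
	}
}

// RemovePod drops a pod and cleans up the ghost entry once the last pod is
// gone and the node object itself was already removed.
func (c *toyCache) RemovePod(nodeName, podName string) {
	n, ok := c.nodes[nodeName]
	if !ok {
		return
	}
	delete(n.pods, podName)
	if len(n.pods) == 0 && !n.hasNode {
		delete(c.nodes, nodeName)
	}
}

func main() {
	c := &toyCache{nodes: map[string]*nodeEntry{
		"node-a": {hasNode: true, pods: map[string]struct{}{"pod-1": {}}},
	}}
	c.RemoveNode("node-a")
	fmt.Println(len(c.nodes)) // 1: ghost entry kept, pod-1 still tracked
	c.RemovePod("node-a", "pod-1")
	fmt.Println(len(c.nodes)) // 0: last pod deletion cleaned up the entry
}

In the patch itself the same rule appears in removePod (remove the NodeInfo from the list once the last pod is gone and the node object is already nil) and in RemoveNode (clear the node object but keep the entry while pods remain), while snapshots skip entries whose node object is nil.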