Merge pull request #93938 from alculquicondor/revert-node-delete

Keep track of remaining pods when a node is deleted
This commit is contained in:
Kubernetes Prow Robot 2020-08-13 13:25:41 -07:00 committed by GitHub
commit 3647766cbc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 72 additions and 78 deletions

View File

@ -84,7 +84,6 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",

View File

@ -625,6 +625,12 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
return nil return nil
} }
// RemoveNode removes the node object, leaving all other tracking information.
func (n *NodeInfo) RemoveNode() {
n.node = nil
n.Generation = nextGeneration()
}
// FilterOutPods receives a list of pods and filters out those whose node names // FilterOutPods receives a list of pods and filters out those whose node names
// are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo. // are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
// //

View File

@ -16,7 +16,6 @@ go_library(
"//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/metrics:go_default_library",
"//pkg/util/node:go_default_library", "//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",

View File

@ -22,7 +22,6 @@ import (
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -315,7 +314,9 @@ func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot)
} }
} }
func (cache *schedulerCache) ListPods(selector labels.Selector) ([]*v1.Pod, error) { // PodCount returns the number of pods in the cache (including those from deleted nodes).
// DO NOT use outside of tests.
func (cache *schedulerCache) PodCount() (int, error) {
cache.mu.RLock() cache.mu.RLock()
defer cache.mu.RUnlock() defer cache.mu.RUnlock()
// podFilter is expected to return true for most or all of the pods. We // podFilter is expected to return true for most or all of the pods. We
@ -325,15 +326,11 @@ func (cache *schedulerCache) ListPods(selector labels.Selector) ([]*v1.Pod, erro
for _, n := range cache.nodes { for _, n := range cache.nodes {
maxSize += len(n.info.Pods) maxSize += len(n.info.Pods)
} }
pods := make([]*v1.Pod, 0, maxSize) count := 0
for _, n := range cache.nodes { for _, n := range cache.nodes {
for _, p := range n.info.Pods { count += len(n.info.Pods)
if selector.Matches(labels.Set(p.Pod.Labels)) {
pods = append(pods, p.Pod)
}
}
} }
return pods, nil return count, nil
} }
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error { func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
@ -423,13 +420,6 @@ func (cache *schedulerCache) addPod(pod *v1.Pod) {
// Assumes that lock is already acquired. // Assumes that lock is already acquired.
func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error { func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
if _, ok := cache.nodes[newPod.Spec.NodeName]; !ok {
// The node might have been deleted already.
// This is not a problem in the case where a pod update arrives before the
// node creation, because we will always have a create pod event before
// that, which will create the placeholder node item.
return nil
}
if err := cache.removePod(oldPod); err != nil { if err := cache.removePod(oldPod); err != nil {
return err return err
} }
@ -438,18 +428,23 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
} }
// Assumes that lock is already acquired. // Assumes that lock is already acquired.
// Removes a pod from the cached node info. When a node is removed, some pod // Removes a pod from the cached node info. If the node information was already
// deletion events might arrive later. This is not a problem, as the pods in // removed and there are no more pods left in the node, cleans up the node from
// the node are assumed to be removed already. // the cache.
func (cache *schedulerCache) removePod(pod *v1.Pod) error { func (cache *schedulerCache) removePod(pod *v1.Pod) error {
n, ok := cache.nodes[pod.Spec.NodeName] n, ok := cache.nodes[pod.Spec.NodeName]
if !ok { if !ok {
klog.Errorf("node %v not found when trying to remove pod %v", pod.Spec.NodeName, pod.Name)
return nil return nil
} }
if err := n.info.RemovePod(pod); err != nil { if err := n.info.RemovePod(pod); err != nil {
return err return err
} }
cache.moveNodeInfoToHead(pod.Spec.NodeName) if len(n.info.Pods) == 0 && n.info.Node() == nil {
cache.removeNodeInfoFromList(pod.Spec.NodeName)
} else {
cache.moveNodeInfoToHead(pod.Spec.NodeName)
}
return nil return nil
} }
@ -619,21 +614,30 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
return n.info.SetNode(newNode) return n.info.SetNode(newNode)
} }
// RemoveNode removes a node from the cache. // RemoveNode removes a node from the cache's tree.
// Some nodes might still have pods because their deletion events didn't arrive // The node might still have pods because their deletion events didn't arrive
// yet. For most intents and purposes, those pods are removed from the cache, // yet. Those pods are considered removed from the cache, being the node tree
// having it's source of truth in the cached nodes. // the source of truth.
// However, some information on pods (assumedPods, podStates) persist. These // However, we keep a ghost node with the list of pods until all pod deletion
// caches will be eventually consistent as pod deletion events arrive. // events have arrived. A ghost node is skipped from snapshots.
func (cache *schedulerCache) RemoveNode(node *v1.Node) error { func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
_, ok := cache.nodes[node.Name] n, ok := cache.nodes[node.Name]
if !ok { if !ok {
return fmt.Errorf("node %v is not found", node.Name) return fmt.Errorf("node %v is not found", node.Name)
} }
cache.removeNodeInfoFromList(node.Name) n.info.RemoveNode()
// We remove NodeInfo for this node only if there aren't any pods on this node.
// We can't do it unconditionally, because notifications about pods are delivered
// in a different watch, and thus can potentially be observed later, even though
// they happened before node removal.
if len(n.info.Pods) == 0 {
cache.removeNodeInfoFromList(node.Name)
} else {
cache.moveNodeInfoToHead(node.Name)
}
if err := cache.nodeTree.removeNode(node); err != nil { if err := cache.nodeTree.removeNode(node); err != nil {
return err return err
} }
@ -736,19 +740,6 @@ func (cache *schedulerCache) expirePod(key string, ps *podState) error {
return nil return nil
} }
// GetNodeInfo returns cached data for the node name.
func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
n, ok := cache.nodes[nodeName]
if !ok {
return nil, fmt.Errorf("node %q not found in cache", nodeName)
}
return n.info.Node(), nil
}
// updateMetrics updates cache size metric values for pods, assumed pods, and nodes // updateMetrics updates cache size metric values for pods, assumed pods, and nodes
func (cache *schedulerCache) updateMetrics() { func (cache *schedulerCache) updateMetrics() {
metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods))) metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))

View File

@ -268,7 +268,7 @@ func TestExpirePod(t *testing.T) {
{pod: testPods[0], finishBind: true, assumedTime: now}, {pod: testPods[0], finishBind: true, assumedTime: now},
}, },
cleanupTime: now.Add(2 * ttl), cleanupTime: now.Add(2 * ttl),
wNodeInfo: framework.NewNodeInfo(), wNodeInfo: nil,
}, { // first one would expire, second and third would not. }, { // first one would expire, second and third would not.
pods: []*testExpirePodStruct{ pods: []*testExpirePodStruct{
{pod: testPods[0], finishBind: true, assumedTime: now}, {pod: testPods[0], finishBind: true, assumedTime: now},
@ -1142,10 +1142,12 @@ func TestNodeOperators(t *testing.T) {
if err := cache.RemoveNode(node); err != nil { if err := cache.RemoveNode(node); err != nil {
t.Error(err) t.Error(err)
} }
if _, err := cache.GetNodeInfo(node.Name); err == nil { if n, err := cache.getNodeInfo(node.Name); err != nil {
t.Errorf("The node %v should be removed.", node.Name) t.Errorf("The node %v should still have a ghost entry: %v", node.Name, err)
} else if n != nil {
t.Errorf("The node object for %v should be nil", node.Name)
} }
// Check node is removed from nodeTree as well. // Check node is removed from nodeTree.
if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" { if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name) t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
} }
@ -1466,7 +1468,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
var i int var i int
// Check that cache is in the expected state. // Check that cache is in the expected state.
for node := cache.headNode; node != nil; node = node.next { for node := cache.headNode; node != nil; node = node.next {
if node.info.Node().Name != test.expected[i].Name { if node.info.Node() != nil && node.info.Node().Name != test.expected[i].Name {
t.Errorf("unexpected node. Expected: %v, got: %v, index: %v", test.expected[i].Name, node.info.Node().Name, i) t.Errorf("unexpected node. Expected: %v, got: %v, index: %v", test.expected[i].Name, node.info.Node().Name, i)
} }
i++ i++
@ -1798,3 +1800,16 @@ func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
} }
return nil return nil
} }
// getNodeInfo returns cached data for the node name.
func (cache *schedulerCache) getNodeInfo(nodeName string) (*v1.Node, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
n, ok := cache.nodes[nodeName]
if !ok {
return nil, fmt.Errorf("node %q not found in cache", nodeName)
}
return n.info.Node(), nil
}

View File

@ -45,8 +45,8 @@ func (d *CacheDumper) DumpAll() {
func (d *CacheDumper) dumpNodes() { func (d *CacheDumper) dumpNodes() {
dump := d.cache.Dump() dump := d.cache.Dump()
klog.Info("Dump of cached NodeInfo") klog.Info("Dump of cached NodeInfo")
for _, nodeInfo := range dump.Nodes { for name, nodeInfo := range dump.Nodes {
klog.Info(d.printNodeInfo(nodeInfo)) klog.Info(d.printNodeInfo(name, nodeInfo))
} }
} }
@ -61,16 +61,16 @@ func (d *CacheDumper) dumpSchedulingQueue() {
} }
// printNodeInfo writes parts of NodeInfo to a string. // printNodeInfo writes parts of NodeInfo to a string.
func (d *CacheDumper) printNodeInfo(n *framework.NodeInfo) string { func (d *CacheDumper) printNodeInfo(name string, n *framework.NodeInfo) string {
var nodeData strings.Builder var nodeData strings.Builder
nodeData.WriteString(fmt.Sprintf("\nNode name: %+v\nRequested Resources: %+v\nAllocatable Resources:%+v\nScheduled Pods(number: %v):\n", nodeData.WriteString(fmt.Sprintf("\nNode name: %s\nDeleted: %t\nRequested Resources: %+v\nAllocatable Resources:%+v\nScheduled Pods(number: %v):\n",
n.Node().Name, n.Requested, n.Allocatable, len(n.Pods))) name, n.Node() == nil, n.Requested, n.Allocatable, len(n.Pods)))
// Dumping Pod Info // Dumping Pod Info
for _, p := range n.Pods { for _, p := range n.Pods {
nodeData.WriteString(printPod(p.Pod)) nodeData.WriteString(printPod(p.Pod))
} }
// Dumping nominated pods info on the node // Dumping nominated pods info on the node
nominatedPods := d.podQueue.NominatedPodsForNode(n.Node().Name) nominatedPods := d.podQueue.NominatedPodsForNode(name)
if len(nominatedPods) != 0 { if len(nominatedPods) != 0 {
nodeData.WriteString(fmt.Sprintf("Nominated Pods(number: %v):\n", len(nominatedPods))) nodeData.WriteString(fmt.Sprintf("Nominated Pods(number: %v):\n", len(nominatedPods)))
for _, p := range nominatedPods { for _, p := range nominatedPods {

View File

@ -8,7 +8,6 @@ go_library(
deps = [ deps = [
"//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
], ],
) )

View File

@ -18,7 +18,6 @@ package fake
import ( import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
) )
@ -78,20 +77,10 @@ func (c *Cache) UpdateSnapshot(snapshot *internalcache.Snapshot) error {
return nil return nil
} }
// ListPods is a fake method for testing. // PodCount is a fake method for testing.
func (c *Cache) ListPods(s labels.Selector) ([]*v1.Pod, error) { return nil, nil } func (c *Cache) PodCount() (int, error) { return 0, nil }
// Dump is a fake method for testing. // Dump is a fake method for testing.
func (c *Cache) Dump() *internalcache.Dump { func (c *Cache) Dump() *internalcache.Dump {
return &internalcache.Dump{} return &internalcache.Dump{}
} }
// GetNodeInfo is a fake method for testing.
func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
return nil, nil
}
// ListNodes is a fake method for testing.
func (c *Cache) ListNodes() []*v1.Node {
return nil
}

View File

@ -18,7 +18,6 @@ package cache
import ( import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
) )
@ -57,8 +56,8 @@ import (
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue, // - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache. // a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
type Cache interface { type Cache interface {
// ListPods lists all pods in the cache. // PodCount returns the number of pods in the cache (including those from deleted nodes).
ListPods(selector labels.Selector) ([]*v1.Pod, error) PodCount() (int, error)
// AssumePod assumes a pod scheduled and aggregates the pod's information into its node. // AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
// The implementation also decides the policy to expire pod before being confirmed (receiving Add event). // The implementation also decides the policy to expire pod before being confirmed (receiving Add event).

View File

@ -36,7 +36,6 @@ import (
eventsv1 "k8s.io/api/events/v1" eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
@ -570,12 +569,12 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
return return
default: default:
} }
pods, err := scache.ListPods(labels.Everything()) pods, err := scache.PodCount()
if err != nil { if err != nil {
errChan <- fmt.Errorf("cache.List failed: %v", err) errChan <- fmt.Errorf("cache.List failed: %v", err)
return return
} }
if len(pods) == 0 { if pods == 0 {
close(waitPodExpireChan) close(waitPodExpireChan)
return return
} }

View File

@ -96,7 +96,6 @@ go_library(
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library", "//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library",

View File

@ -26,7 +26,6 @@ import (
policy "k8s.io/api/policy/v1beta1" policy "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory" cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
@ -401,11 +400,11 @@ func waitForPDBsStable(testCtx *testutils.TestContext, pdbs []*policy.PodDisrupt
// waitCachedPodsStable waits until scheduler cache has the given pods. // waitCachedPodsStable waits until scheduler cache has the given pods.
func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error { func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error {
return wait.Poll(time.Second, 30*time.Second, func() (bool, error) { return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
cachedPods, err := testCtx.Scheduler.SchedulerCache.ListPods(labels.Everything()) cachedPods, err := testCtx.Scheduler.SchedulerCache.PodCount()
if err != nil { if err != nil {
return false, err return false, err
} }
if len(pods) != len(cachedPods) { if len(pods) != cachedPods {
return false, nil return false, nil
} }
for _, p := range pods { for _, p := range pods {