Keep track of remaining pods when a node is deleted.

The apiserver is expected to send pod deletion events, which might arrive
at a different time than the node deletion. Moreover, sometimes a node
could be recreated without its pods being deleted.

Partial revert of https://github.com/kubernetes/kubernetes/pull/86964

Signed-off-by: Aldo Culquicondor <acondor@google.com>
Change-Id: I51f683e5f05689b711c81ebff34e7118b5337571
parent 16d7ecfa45
commit dfe9e413d9
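The race described in the commit message is that node and pod events arrive on separate watches, so a pod deletion can trail its node's deletion, or a node can be recreated while its old pods are still cached. Below is a minimal sketch of the resulting "ghost node" bookkeeping, using made-up types rather than the scheduler's actual structs:

package main

import "fmt"

// nodeEntry is a hypothetical stand-in for the scheduler cache's per-node
// record: the node object plus the pods still charged to it.
type nodeEntry struct {
	node string          // "" models a deleted node object (a ghost)
	pods map[string]bool // pods whose deletion events haven't arrived
}

type cache struct {
	nodes map[string]*nodeEntry
}

// removeNode drops the node object but keeps a ghost entry while pods remain.
func (c *cache) removeNode(name string) {
	e, ok := c.nodes[name]
	if !ok {
		return
	}
	e.node = ""
	if len(e.pods) == 0 {
		delete(c.nodes, name)
	}
}

// removePod handles a (possibly late) pod deletion event and garbage-collects
// the ghost once its last pod is gone.
func (c *cache) removePod(nodeName, podName string) {
	e, ok := c.nodes[nodeName]
	if !ok {
		return
	}
	delete(e.pods, podName)
	if e.node == "" && len(e.pods) == 0 {
		delete(c.nodes, nodeName)
	}
}

func main() {
	c := &cache{nodes: map[string]*nodeEntry{
		"node-1": {node: "node-1", pods: map[string]bool{"pod-a": true}},
	}}
	c.removeNode("node-1")         // pod-a's deletion event hasn't arrived yet
	fmt.Println(len(c.nodes))      // 1: ghost entry kept
	c.removePod("node-1", "pod-a") // the straggling event arrives
	fmt.Println(len(c.nodes))      // 0: ghost cleaned up
}

The real cache tracks far more per-node state, but the lifecycle in the diffs below is the same: the entry outlives the node object and is garbage-collected by the last straggling pod deletion.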
@@ -625,6 +625,12 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
 	return nil
 }
 
+// RemoveNode removes the node object, leaving all other tracking information.
+func (n *NodeInfo) RemoveNode() {
+	n.node = nil
+	n.Generation = nextGeneration()
+}
+
 // FilterOutPods receives a list of pods and filters out those whose node names
 // are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
 //
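RemoveNode clears only the node object and bumps the generation; pods and aggregate tracking stay behind so the entry can serve as a ghost. A rough sketch of that pattern with simplified types, assuming a plain atomic counter for nextGeneration:

package main

import (
	"fmt"
	"sync/atomic"
)

var generation int64

// nextGeneration models a monotonic counter: every mutation of a nodeInfo
// gets a fresh generation so incremental snapshots can detect changes.
func nextGeneration() int64 {
	return atomic.AddInt64(&generation, 1)
}

// nodeInfo is a simplified stand-in for framework.NodeInfo.
type nodeInfo struct {
	node       *string
	pods       []string
	generation int64
}

// removeNode mirrors the hunk above: nil out the node object, keep the pods,
// and bump the generation so the change is visible to snapshotting.
func (n *nodeInfo) removeNode() {
	n.node = nil
	n.generation = nextGeneration()
}

func main() {
	name := "node-1"
	n := &nodeInfo{node: &name, pods: []string{"pod-a"}, generation: nextGeneration()}
	before := n.generation
	n.removeNode()
	fmt.Println(n.node == nil, len(n.pods), n.generation > before) // true 1 true
}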
pkg/scheduler/internal/cache/cache.go (vendored, 45 lines changed)
@@ -420,13 +420,6 @@ func (cache *schedulerCache) addPod(pod *v1.Pod) {
 
 // Assumes that lock is already acquired.
 func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
-	if _, ok := cache.nodes[newPod.Spec.NodeName]; !ok {
-		// The node might have been deleted already.
-		// This is not a problem in the case where a pod update arrives before the
-		// node creation, because we will always have a create pod event before
-		// that, which will create the placeholder node item.
-		return nil
-	}
 	if err := cache.removePod(oldPod); err != nil {
 		return err
 	}
@@ -435,18 +428,23 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
 }
 
 // Assumes that lock is already acquired.
-// Removes a pod from the cached node info. When a node is removed, some pod
-// deletion events might arrive later. This is not a problem, as the pods in
-// the node are assumed to be removed already.
+// Removes a pod from the cached node info. If the node information was already
+// removed and there are no more pods left in the node, cleans up the node from
+// the cache.
 func (cache *schedulerCache) removePod(pod *v1.Pod) error {
 	n, ok := cache.nodes[pod.Spec.NodeName]
 	if !ok {
 		klog.Errorf("node %v not found when trying to remove pod %v", pod.Spec.NodeName, pod.Name)
 		return nil
 	}
 	if err := n.info.RemovePod(pod); err != nil {
 		return err
 	}
-	cache.moveNodeInfoToHead(pod.Spec.NodeName)
+	if len(n.info.Pods) == 0 && n.info.Node() == nil {
+		cache.removeNodeInfoFromList(pod.Spec.NodeName)
+	} else {
+		cache.moveNodeInfoToHead(pod.Spec.NodeName)
+	}
 	return nil
 }
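With this branch, removePod is now what finally frees a ghost entry: if the node object is gone and this was the last pod, the entry leaves the cache's list; otherwise it just moves to the head as the most recently updated node. A simplified stand-in for the two list helpers (not the cache's actual nodeInfoListItem code):

package main

import "fmt"

// item is a simplified stand-in for the cache's per-node list item; the cache
// keeps node entries in a doubly linked list ordered by last update.
type item struct {
	name string
	next *item
	prev *item
}

type nodeList struct {
	head  *item
	items map[string]*item
}

func (l *nodeList) unlink(it *item) {
	if it.prev != nil {
		it.prev.next = it.next
	}
	if it.next != nil {
		it.next.prev = it.prev
	}
	if l.head == it {
		l.head = it.next
	}
	it.prev, it.next = nil, nil
}

// moveToHead mimics moveNodeInfoToHead: relink the entry at the front so the
// most recently updated node is visited first.
func (l *nodeList) moveToHead(name string) {
	it, ok := l.items[name]
	if !ok {
		return
	}
	l.unlink(it)
	it.next = l.head
	if l.head != nil {
		l.head.prev = it
	}
	l.head = it
}

// removeFromList mimics removeNodeInfoFromList: drop the entry entirely, as
// removePod now does when a ghost node loses its last pod.
func (l *nodeList) removeFromList(name string) {
	if it, ok := l.items[name]; ok {
		l.unlink(it)
		delete(l.items, name)
	}
}

func main() {
	a := &item{name: "a"}
	b := &item{name: "b", prev: a}
	a.next = b
	l := &nodeList{head: a, items: map[string]*item{"a": a, "b": b}}
	l.moveToHead("b")
	fmt.Println(l.head.name) // b
	l.removeFromList("b")
	fmt.Println(l.head.name) // a
}

The update-ordered list matters because the incremental snapshot walk starts at the head and can stop early once it reaches entries unchanged since the last snapshot.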
@@ -616,21 +614,30 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
 	return n.info.SetNode(newNode)
 }
 
-// RemoveNode removes a node from the cache.
-// Some nodes might still have pods because their deletion events didn't arrive
-// yet. For most intents and purposes, those pods are removed from the cache,
-// having it's source of truth in the cached nodes.
-// However, some information on pods (assumedPods, podStates) persist. These
-// caches will be eventually consistent as pod deletion events arrive.
+// RemoveNode removes a node from the cache's tree.
+// The node might still have pods because their deletion events didn't arrive
+// yet. Those pods are considered removed from the cache, being the node tree
+// the source of truth.
+// However, we keep a ghost node with the list of pods until all pod deletion
+// events have arrived. A ghost node is skipped from snapshots.
 func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 
-	_, ok := cache.nodes[node.Name]
+	n, ok := cache.nodes[node.Name]
 	if !ok {
 		return fmt.Errorf("node %v is not found", node.Name)
 	}
-	cache.removeNodeInfoFromList(node.Name)
+	n.info.RemoveNode()
+	// We remove NodeInfo for this node only if there aren't any pods on this node.
+	// We can't do it unconditionally, because notifications about pods are delivered
+	// in a different watch, and thus can potentially be observed later, even though
+	// they happened before node removal.
+	if len(n.info.Pods) == 0 {
+		cache.removeNodeInfoFromList(node.Name)
+	} else {
+		cache.moveNodeInfoToHead(node.Name)
+	}
 	if err := cache.nodeTree.removeNode(node); err != nil {
 		return err
 	}
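Note the asymmetry the new comment calls out: the node always leaves the nodeTree immediately, so nothing new schedules onto it, while the NodeInfo entry may linger as a ghost, and ghosts are skipped when snapshots are built. A toy sketch of that snapshot rule, with hypothetical types:

package main

import "fmt"

// nodeObject stands in for *v1.Node; nil means the node was deleted.
type nodeObject struct{ name string }

// nodeInfo is a minimal stand-in for framework.NodeInfo.
type nodeInfo struct {
	name string
	obj  *nodeObject
	pods []string
}

func (n *nodeInfo) Node() *nodeObject { return n.obj }

// buildSnapshot sketches the rule from the new comment: a ghost node
// (Node() == nil) still holds pods but is skipped from the snapshot,
// so the scheduler never places new pods on it.
func buildSnapshot(cached []*nodeInfo) []string {
	var snapshot []string
	for _, n := range cached {
		if n.Node() == nil {
			continue // ghost: waiting for straggling pod deletion events
		}
		snapshot = append(snapshot, n.name)
	}
	return snapshot
}

func main() {
	ghost := &nodeInfo{name: "node-1", pods: []string{"pod-a"}} // RemoveNode already ran
	live := &nodeInfo{name: "node-2", obj: &nodeObject{name: "node-2"}}
	fmt.Println(buildSnapshot([]*nodeInfo{ghost, live})) // [node-2]
}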
pkg/scheduler/internal/cache/cache_test.go (vendored, 12 lines changed)
@@ -268,7 +268,7 @@ func TestExpirePod(t *testing.T) {
 			{pod: testPods[0], finishBind: true, assumedTime: now},
 		},
 		cleanupTime: now.Add(2 * ttl),
-		wNodeInfo:   framework.NewNodeInfo(),
+		wNodeInfo:   nil,
 	}, { // first one would expire, second and third would not.
 		pods: []*testExpirePodStruct{
 			{pod: testPods[0], finishBind: true, assumedTime: now},
@@ -1142,10 +1142,12 @@ func TestNodeOperators(t *testing.T) {
 	if err := cache.RemoveNode(node); err != nil {
 		t.Error(err)
 	}
-	if _, err := cache.getNodeInfo(node.Name); err == nil {
-		t.Errorf("The node %v should be removed.", node.Name)
+	if n, err := cache.getNodeInfo(node.Name); err != nil {
+		t.Errorf("The node %v should still have a ghost entry: %v", node.Name, err)
+	} else if n != nil {
+		t.Errorf("The node object for %v should be nil", node.Name)
 	}
-	// Check node is removed from nodeTree as well.
+	// Check node is removed from nodeTree.
 	if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
 		t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
 	}
@@ -1466,7 +1468,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
 	var i int
 	// Check that cache is in the expected state.
 	for node := cache.headNode; node != nil; node = node.next {
-		if node.info.Node().Name != test.expected[i].Name {
+		if node.info.Node() != nil && node.info.Node().Name != test.expected[i].Name {
 			t.Errorf("unexpected node. Expected: %v, got: %v, index: %v", test.expected[i].Name, node.info.Node().Name, i)
 		}
 		i++
pkg/scheduler/internal/cache/debugger/dumper.go (vendored, 12 lines changed)
@@ -45,8 +45,8 @@ func (d *CacheDumper) DumpAll() {
 func (d *CacheDumper) dumpNodes() {
 	dump := d.cache.Dump()
 	klog.Info("Dump of cached NodeInfo")
-	for _, nodeInfo := range dump.Nodes {
-		klog.Info(d.printNodeInfo(nodeInfo))
+	for name, nodeInfo := range dump.Nodes {
+		klog.Info(d.printNodeInfo(name, nodeInfo))
 	}
 }
 
@@ -61,16 +61,16 @@ func (d *CacheDumper) dumpSchedulingQueue() {
 }
 
 // printNodeInfo writes parts of NodeInfo to a string.
-func (d *CacheDumper) printNodeInfo(n *framework.NodeInfo) string {
+func (d *CacheDumper) printNodeInfo(name string, n *framework.NodeInfo) string {
 	var nodeData strings.Builder
-	nodeData.WriteString(fmt.Sprintf("\nNode name: %+v\nRequested Resources: %+v\nAllocatable Resources:%+v\nScheduled Pods(number: %v):\n",
-		n.Node().Name, n.Requested, n.Allocatable, len(n.Pods)))
+	nodeData.WriteString(fmt.Sprintf("\nNode name: %s\nDeleted: %t\nRequested Resources: %+v\nAllocatable Resources:%+v\nScheduled Pods(number: %v):\n",
+		name, n.Node() == nil, n.Requested, n.Allocatable, len(n.Pods)))
 	// Dumping Pod Info
 	for _, p := range n.Pods {
 		nodeData.WriteString(printPod(p.Pod))
 	}
 	// Dumping nominated pods info on the node
-	nominatedPods := d.podQueue.NominatedPodsForNode(n.Node().Name)
+	nominatedPods := d.podQueue.NominatedPodsForNode(name)
 	if len(nominatedPods) != 0 {
 		nodeData.WriteString(fmt.Sprintf("Nominated Pods(number: %v):\n", len(nominatedPods)))
 		for _, p := range nominatedPods {
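Since a ghost entry has n.Node() == nil, the old n.Node().Name expressions would panic while dumping; keying everything off the Dump map's node name avoids the dereference and lets the dumper report deleted nodes explicitly. A compressed sketch of the pattern, with hypothetical types rather than the debugger's real ones:

package main

import "fmt"

type nodeObject struct{ Name string }

type nodeInfo struct {
	node *nodeObject
}

func (n *nodeInfo) Node() *nodeObject { return n.node }

// printNodeInfo sketches the dumper fix: take the name from the map key so a
// ghost entry (Node() == nil) can be printed without a nil dereference.
func printNodeInfo(name string, n *nodeInfo) string {
	return fmt.Sprintf("Node name: %s, Deleted: %t", name, n.Node() == nil)
}

func main() {
	nodes := map[string]*nodeInfo{
		"node-1": {},                           // ghost: node object removed
		"node-2": {node: &nodeObject{"node-2"}}, // live node
	}
	for name, info := range nodes {
		fmt.Println(printNodeInfo(name, info))
	}
}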