Merge pull request #22336 from cjcullen/evict

Auto commit by PR queue bot

commit dc46ae031d
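In short: when a node's Ready condition is not true and the cloud provider reports that the underlying instance no longer exists, the controller now force-deletes the node and its pods right away instead of queueing a rate-limited pod eviction. A condensed sketch of the new path in monitorNodeStatus, paraphrased from the hunks below (not a verbatim excerpt; error handling and logging are trimmed):

	// Sketch only, condensed from the diff that follows.
	if readyCondition.Status != api.ConditionTrue && nc.cloud != nil {
		exists, err := nc.nodeExistsInCloudProvider(node.Name) // pluggable check, stubbed in tests
		if err == nil && !exists {
			// Kubelet is not reporting and the cloud provider says the node is gone:
			// delete the pods and the node immediately, bypassing the eviction queue.
			go func(nodeName string) {
				defer utilruntime.HandleCrash()
				if err := nc.forcefullyDeleteNode(nodeName); err != nil {
					glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
				}
			}(node.Name)
			continue
		}
	}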
@@ -119,7 +119,8 @@ type NodeController struct {
 	daemonSetController *framework.Controller
 	daemonSetStore      cache.StoreToDaemonSetLister
 
 	forcefullyDeletePod func(*api.Pod) error
+	nodeExistsInCloudProvider func(string) (bool, error)
 }
 
 // NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -149,24 +150,25 @@ func NewNodeController(
 	evictorLock := sync.Mutex{}
 
 	nc := &NodeController{
 		cloud:                  cloud,
 		knownNodeSet:           make(sets.String),
 		kubeClient:             kubeClient,
 		recorder:               recorder,
 		podEvictionTimeout:     podEvictionTimeout,
 		maximumGracePeriod:     5 * time.Minute,
 		evictorLock:            &evictorLock,
 		podEvictor:             NewRateLimitedTimedQueue(deletionEvictionLimiter),
 		terminationEvictor:     NewRateLimitedTimedQueue(terminationEvictionLimiter),
 		nodeStatusMap:          make(map[string]nodeStatusData),
 		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
 		nodeMonitorPeriod:      nodeMonitorPeriod,
 		nodeStartupGracePeriod: nodeStartupGracePeriod,
 		lookupIP:               net.LookupIP,
 		now:                    unversioned.Now,
 		clusterCIDR:            clusterCIDR,
 		allocateNodeCIDRs:      allocateNodeCIDRs,
 		forcefullyDeletePod:    func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
+		nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
 	}
 
 	nc.podStore.Store, nc.podController = framework.NewInformer(
@@ -466,33 +468,26 @@ func (nc *NodeController) monitorNodeStatus() error {
 		}
 
 		// Check with the cloud provider to see if the node still exists. If it
-		// doesn't, delete the node and all pods scheduled on the node.
+		// doesn't, delete the node immediately.
 		if readyCondition.Status != api.ConditionTrue && nc.cloud != nil {
-			instances, ok := nc.cloud.Instances()
-			if !ok {
-				glog.Errorf("%v", ErrCloudInstance)
+			exists, err := nc.nodeExistsInCloudProvider(node.Name)
+			if err != nil {
+				glog.Errorf("Error determining if node %v exists in cloud: %v", node.Name, err)
 				continue
 			}
-			if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound {
+			if !exists {
 				glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
 				nc.recordNodeEvent(node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
-
-				remaining, err := nc.hasPods(node.Name)
-				if err != nil {
-					glog.Errorf("Unable to determine whether node %s has pods, will retry: %v", node.Name, err)
-					continue
-				}
-				if remaining {
-					// queue eviction of the pods on the node
-					glog.V(2).Infof("Deleting node %s is delayed while pods are evicted", node.Name)
-					nc.evictPods(node.Name)
-					continue
-				}
-
-				if err := nc.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil {
-					glog.Errorf("Unable to delete node %s: %v", node.Name, err)
-					continue
-				}
+				go func(nodeName string) {
+					defer utilruntime.HandleCrash()
+					// Kubelet is not reporting and Cloud Provider says node
+					// is gone. Delete it without worrying about grace
+					// periods.
+					if err := nc.forcefullyDeleteNode(nodeName); err != nil {
+						glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
+					}
+				}(node.Name)
+				continue
 			}
 		}
 	}
@@ -500,6 +495,43 @@ func (nc *NodeController) monitorNodeStatus() error {
 	return nil
 }
 
+func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) (bool, error) {
+	instances, ok := cloud.Instances()
+	if !ok {
+		return false, fmt.Errorf("%v", ErrCloudInstance)
+	}
+	if _, err := instances.ExternalID(nodeName); err != nil {
+		if err == cloudprovider.InstanceNotFound {
+			return false, nil
+		}
+		return false, err
+	}
+	return true, nil
+}
+
+// forcefullyDeleteNode immediately deletes all pods on the node, and then
+// deletes the node itself.
+func (nc *NodeController) forcefullyDeleteNode(nodeName string) error {
+	selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
+	options := api.ListOptions{FieldSelector: selector}
+	pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
+	if err != nil {
+		return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err)
+	}
+	for _, pod := range pods.Items {
+		if pod.Spec.NodeName != nodeName {
+			continue
+		}
+		if err := nc.forcefullyDeletePod(&pod); err != nil {
+			return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err)
+		}
+	}
+	if err := nc.kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
+		return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
+	}
+	return nil
+}
+
 // reconcileNodeCIDRs looks at each node and assigns it a valid CIDR
 // if it doesn't currently have one.
 func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) {
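Note that forcefullyDeleteNode delegates the per-pod deletion to the controller's existing forcefullyDeletePod hook, which this diff does not show. As a rough illustration only (the helper's actual signature, client type, and error handling in the tree may differ), a force delete amounts to deleting the pod with a zero grace period:

	// Hypothetical sketch of a force-delete helper; not part of this diff.
	func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
		var zero int64
		// A zero GracePeriodSeconds asks the API server to remove the pod immediately.
		return c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
	}

The remaining hunks below touch the node controller's unit tests: FakeNodeHandler gains a deleteWaitChan so a test can block until Delete is called, and TestCloudProviderNoRateLimit exercises the new immediate-deletion path.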
@@ -30,7 +30,9 @@ import (
 	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
 	unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
+	fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/watch"
 )
 
@@ -59,7 +61,8 @@ type FakeNodeHandler struct {
 	RequestCount int
 
 	// Synchronization
 	createLock sync.Mutex
+	deleteWaitChan chan struct{}
 }
 
 type FakeLegacyHandler struct {
@@ -125,6 +128,11 @@ func (m *FakeNodeHandler) List(opts api.ListOptions) (*api.NodeList, error) {
 }
 
 func (m *FakeNodeHandler) Delete(id string, opt *api.DeleteOptions) error {
+	defer func() {
+		if m.deleteWaitChan != nil {
+			m.deleteWaitChan <- struct{}{}
+		}
+	}()
 	m.DeletedNodes = append(m.DeletedNodes, newNode(id))
 	m.RequestCount++
 	return nil
@@ -451,6 +459,58 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 	}
 }
 
+// TestCloudProviderNoRateLimit tests that monitorNodes() immediately deletes
+// pods and the node when kubelet has not reported, and the cloudprovider says
+// the node is gone.
+func TestCloudProviderNoRateLimit(t *testing.T) {
+	fnh := &FakeNodeHandler{
+		Existing: []*api.Node{
+			{
+				ObjectMeta: api.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				},
+				Status: api.NodeStatus{
+					Conditions: []api.NodeCondition{
+						{
+							Type:               api.NodeReady,
+							Status:             api.ConditionUnknown,
+							LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+						},
+					},
+				},
+			},
+		},
+		Clientset:      fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}),
+		deleteWaitChan: make(chan struct{}),
+	}
+	nodeController := NewNodeController(nil, fnh, 10*time.Minute,
+		util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(),
+		testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
+		testNodeMonitorPeriod, nil, false)
+	nodeController.cloud = &fakecloud.FakeCloud{}
+	nodeController.now = func() unversioned.Time { return unversioned.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) }
+	nodeController.nodeExistsInCloudProvider = func(nodeName string) (bool, error) {
+		return false, nil
+	}
+	// monitorNodeStatus should allow this node to be immediately deleted
+	if err := nodeController.monitorNodeStatus(); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	select {
+	case <-fnh.deleteWaitChan:
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Errorf("Timed out waiting %v for node to be deleted", wait.ForeverTestTimeout)
+	}
+	if len(fnh.DeletedNodes) != 1 || fnh.DeletedNodes[0].Name != "node0" {
+		t.Errorf("Node was not deleted")
+	}
+	if nodeOnQueue := nodeController.podEvictor.Remove("node0"); nodeOnQueue {
+		t.Errorf("Node was queued for eviction. Should have been immediately deleted.")
+	}
+}
+
 func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 	fakeNow := unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
 	table := []struct {
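To run just the new test locally, something like the following should work, assuming the node controller package sits at pkg/controller/node in this tree (adjust the path if it has moved):

	go test ./pkg/controller/node/ -run TestCloudProviderNoRateLimit -v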