mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
NodeController small cleanup
This commit is contained in:
parent
908b1e08f1
commit
2201e75666
@ -196,7 +196,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
|
|||||||
// TODO: Write an integration test for the replication controllers watch.
|
// TODO: Write an integration test for the replication controllers watch.
|
||||||
go controllerManager.Run(3, util.NeverStop)
|
go controllerManager.Run(3, util.NeverStop)
|
||||||
|
|
||||||
nodeController := nodecontroller.NewNodeController(nil, cl, 10, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewFakeRateLimiter()),
|
nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewFakeRateLimiter()),
|
||||||
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
|
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
|
||||||
nodeController.Run(5 * time.Second)
|
nodeController.Run(5 * time.Second)
|
||||||
cadvisorInterface := new(cadvisor.Fake)
|
cadvisorInterface := new(cadvisor.Fake)
|
||||||
|
@ -194,7 +194,7 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
glog.Fatalf("Cloud provider could not be initialized: %v", err)
|
glog.Fatalf("Cloud provider could not be initialized: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeController := nodecontroller.NewNodeController(cloud, kubeClient, s.RegisterRetryCount,
|
nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
|
||||||
s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
|
s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
|
||||||
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
|
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
|
||||||
nodeController.Run(s.NodeSyncPeriod)
|
nodeController.Run(s.NodeSyncPeriod)
|
||||||
|
@ -131,7 +131,7 @@ func runControllerManager(cl *client.Client) {
|
|||||||
const serviceSyncPeriod = 5 * time.Minute
|
const serviceSyncPeriod = 5 * time.Minute
|
||||||
const nodeSyncPeriod = 10 * time.Second
|
const nodeSyncPeriod = 10 * time.Second
|
||||||
nodeController := nodecontroller.NewNodeController(
|
nodeController := nodecontroller.NewNodeController(
|
||||||
nil, cl, 10, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst)),
|
nil, cl, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst)),
|
||||||
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
|
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
|
||||||
nodeController.Run(nodeSyncPeriod)
|
nodeController.Run(nodeSyncPeriod)
|
||||||
|
|
||||||
|
@ -122,7 +122,7 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
glog.Fatalf("Cloud provider could not be initialized: %v", err)
|
glog.Fatalf("Cloud provider could not be initialized: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeController := nodecontroller.NewNodeController(cloud, kubeClient, s.RegisterRetryCount,
|
nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
|
||||||
s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
|
s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
|
||||||
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
|
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
|
||||||
nodeController.Run(s.NodeSyncPeriod)
|
nodeController.Run(s.NodeSyncPeriod)
|
||||||
|
@ -73,7 +73,6 @@ controller, and serviceaccounts controller.
|
|||||||
--port=0: The port that the controller-manager's http service runs on
|
--port=0: The port that the controller-manager's http service runs on
|
||||||
--profiling=true: Enable profiling via web interface host:port/debug/pprof/
|
--profiling=true: Enable profiling via web interface host:port/debug/pprof/
|
||||||
--pvclaimbinder-sync-period=0: The period for syncing persistent volumes and persistent volume claims
|
--pvclaimbinder-sync-period=0: The period for syncing persistent volumes and persistent volume claims
|
||||||
--register-retry-count=0: The number of retries for initial node registration. Retry interval equals node-sync-period.
|
|
||||||
--resource-quota-sync-period=0: The period for syncing quota usage status in the system
|
--resource-quota-sync-period=0: The period for syncing quota usage status in the system
|
||||||
--root-ca-file="": If set, this root certificate authority will be included in service account's token secret. This must be a valid PEM-encoded CA bundle.
|
--root-ca-file="": If set, this root certificate authority will be included in service account's token secret. This must be a valid PEM-encoded CA bundle.
|
||||||
--service-account-private-key-file="": Filename containing a PEM-encoded private RSA key used to sign service account tokens.
|
--service-account-private-key-file="": Filename containing a PEM-encoded private RSA key used to sign service account tokens.
|
||||||
|
@ -53,19 +53,13 @@ type nodeStatusData struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type NodeController struct {
|
type NodeController struct {
|
||||||
|
allocateNodeCIDRs bool
|
||||||
cloud cloudprovider.Interface
|
cloud cloudprovider.Interface
|
||||||
kubeClient client.Interface
|
clusterCIDR *net.IPNet
|
||||||
recorder record.EventRecorder
|
|
||||||
registerRetryCount int
|
|
||||||
podEvictionTimeout time.Duration
|
|
||||||
deletingPodsRateLimiter util.RateLimiter
|
deletingPodsRateLimiter util.RateLimiter
|
||||||
// worker that evicts pods from unresponsive nodes.
|
kubeClient client.Interface
|
||||||
podEvictor *PodEvictor
|
// Method for easy mocking in unittest.
|
||||||
|
lookupIP func(host string) ([]net.IP, error)
|
||||||
// per Node map storing last observed Status together with a local time when it was observed.
|
|
||||||
// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
|
|
||||||
// to aviod the problem with time skew across the cluster.
|
|
||||||
nodeStatusMap map[string]nodeStatusData
|
|
||||||
// Value used if sync_nodes_status=False. NodeController will not proactively
|
// Value used if sync_nodes_status=False. NodeController will not proactively
|
||||||
// sync node status in this case, but will monitor node status updated from kubelet. If
|
// sync node status in this case, but will monitor node status updated from kubelet. If
|
||||||
// it doesn't receive update for this amount of time, it will start posting "NodeReady==
|
// it doesn't receive update for this amount of time, it will start posting "NodeReady==
|
||||||
@ -81,25 +75,28 @@ type NodeController struct {
|
|||||||
// 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes
|
// 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes
|
||||||
// longer for user to see up-to-date node status.
|
// longer for user to see up-to-date node status.
|
||||||
nodeMonitorGracePeriod time.Duration
|
nodeMonitorGracePeriod time.Duration
|
||||||
// Value used if sync_nodes_status=False, only for node startup. When node
|
|
||||||
// is just created, e.g. cluster bootstrap or node creation, we give a longer grace period.
|
|
||||||
nodeStartupGracePeriod time.Duration
|
|
||||||
// Value controlling NodeController monitoring period, i.e. how often does NodeController
|
// Value controlling NodeController monitoring period, i.e. how often does NodeController
|
||||||
// check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod.
|
// check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod.
|
||||||
// TODO: Change node status monitor to watch based.
|
// TODO: Change node status monitor to watch based.
|
||||||
nodeMonitorPeriod time.Duration
|
nodeMonitorPeriod time.Duration
|
||||||
clusterCIDR *net.IPNet
|
// Value used if sync_nodes_status=False, only for node startup. When node
|
||||||
allocateNodeCIDRs bool
|
// is just created, e.g. cluster bootstrap or node creation, we give a longer grace period.
|
||||||
// Method for easy mocking in unittest.
|
nodeStartupGracePeriod time.Duration
|
||||||
lookupIP func(host string) ([]net.IP, error)
|
// per Node map storing last observed Status together with a local time when it was observed.
|
||||||
|
// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
|
||||||
|
// to aviod the problem with time skew across the cluster.
|
||||||
|
nodeStatusMap map[string]nodeStatusData
|
||||||
now func() util.Time
|
now func() util.Time
|
||||||
|
// worker that evicts pods from unresponsive nodes.
|
||||||
|
podEvictor *PodEvictor
|
||||||
|
podEvictionTimeout time.Duration
|
||||||
|
recorder record.EventRecorder
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNodeController returns a new node controller to sync instances from cloudprovider.
|
// NewNodeController returns a new node controller to sync instances from cloudprovider.
|
||||||
func NewNodeController(
|
func NewNodeController(
|
||||||
cloud cloudprovider.Interface,
|
cloud cloudprovider.Interface,
|
||||||
kubeClient client.Interface,
|
kubeClient client.Interface,
|
||||||
registerRetryCount int,
|
|
||||||
podEvictionTimeout time.Duration,
|
podEvictionTimeout time.Duration,
|
||||||
podEvictor *PodEvictor,
|
podEvictor *PodEvictor,
|
||||||
nodeMonitorGracePeriod time.Duration,
|
nodeMonitorGracePeriod time.Duration,
|
||||||
@ -123,7 +120,6 @@ func NewNodeController(
|
|||||||
cloud: cloud,
|
cloud: cloud,
|
||||||
kubeClient: kubeClient,
|
kubeClient: kubeClient,
|
||||||
recorder: recorder,
|
recorder: recorder,
|
||||||
registerRetryCount: registerRetryCount,
|
|
||||||
podEvictionTimeout: podEvictionTimeout,
|
podEvictionTimeout: podEvictionTimeout,
|
||||||
podEvictor: podEvictor,
|
podEvictor: podEvictor,
|
||||||
nodeStatusMap: make(map[string]nodeStatusData),
|
nodeStatusMap: make(map[string]nodeStatusData),
|
||||||
@ -137,6 +133,44 @@ func NewNodeController(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run starts an asynchronous loop that monitors the status of cluster nodes.
|
||||||
|
func (nc *NodeController) Run(period time.Duration) {
|
||||||
|
// Incorporate the results of node status pushed from kubelet to master.
|
||||||
|
go util.Forever(func() {
|
||||||
|
if err := nc.monitorNodeStatus(); err != nil {
|
||||||
|
glog.Errorf("Error monitoring node status: %v", err)
|
||||||
|
}
|
||||||
|
}, nc.nodeMonitorPeriod)
|
||||||
|
|
||||||
|
go util.Forever(func() {
|
||||||
|
nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) })
|
||||||
|
}, nodeEvictionPeriod)
|
||||||
|
}
|
||||||
|
|
||||||
|
// deletePods will delete all pods from master running on given node.
|
||||||
|
func (nc *NodeController) deletePods(nodeID string) error {
|
||||||
|
glog.V(2).Infof("Delete all pods from %v", nodeID)
|
||||||
|
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
|
||||||
|
fields.OneTermEqualSelector(client.PodHost, nodeID))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
nc.recordNodeEvent(nodeID, fmt.Sprintf("Deleting all Pods from Node %v.", nodeID))
|
||||||
|
for _, pod := range pods.Items {
|
||||||
|
// Defensive check, also needed for tests.
|
||||||
|
if pod.Spec.NodeName != nodeID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
glog.V(2).Infof("Delete pod %v", pod.Name)
|
||||||
|
nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID)
|
||||||
|
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
|
||||||
|
glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Generates num pod CIDRs that could be assigned to nodes.
|
// Generates num pod CIDRs that could be assigned to nodes.
|
||||||
func generateCIDRs(clusterCIDR *net.IPNet, num int) util.StringSet {
|
func generateCIDRs(clusterCIDR *net.IPNet, num int) util.StringSet {
|
||||||
res := util.NewStringSet()
|
res := util.NewStringSet()
|
||||||
@ -150,6 +184,108 @@ func generateCIDRs(clusterCIDR *net.IPNet, num int) util.StringSet {
|
|||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getCondition returns a condition object for the specific condition
|
||||||
|
// type, nil if the condition is not set.
|
||||||
|
func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition {
|
||||||
|
if status == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for i := range status.Conditions {
|
||||||
|
if status.Conditions[i].Type == conditionType {
|
||||||
|
return &status.Conditions[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
|
||||||
|
// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
|
||||||
|
// not reachable for a long period of time.
|
||||||
|
func (nc *NodeController) monitorNodeStatus() error {
|
||||||
|
nodes, err := nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if nc.allocateNodeCIDRs {
|
||||||
|
// TODO (cjcullen): Use pkg/controller/framework to watch nodes and
|
||||||
|
// reduce lists/decouple this from monitoring status.
|
||||||
|
nc.reconcileNodeCIDRs(nodes)
|
||||||
|
}
|
||||||
|
for i := range nodes.Items {
|
||||||
|
var gracePeriod time.Duration
|
||||||
|
var lastReadyCondition api.NodeCondition
|
||||||
|
var readyCondition *api.NodeCondition
|
||||||
|
node := &nodes.Items[i]
|
||||||
|
for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
|
||||||
|
gracePeriod, lastReadyCondition, readyCondition, err = nc.tryUpdateNodeStatus(node)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
name := node.Name
|
||||||
|
node, err = nc.kubeClient.Nodes().Get(name)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Update status of Node %v from NodeController exceeds retry count."+
|
||||||
|
"Skipping - no pods will be evicted.", node.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
decisionTimestamp := nc.now()
|
||||||
|
|
||||||
|
if readyCondition != nil {
|
||||||
|
// Check eviction timeout against decisionTimestamp
|
||||||
|
if lastReadyCondition.Status == api.ConditionFalse &&
|
||||||
|
decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
|
||||||
|
if nc.podEvictor.AddNodeToEvict(node.Name) {
|
||||||
|
glog.Infof("Adding pods to evict: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if lastReadyCondition.Status == api.ConditionUnknown &&
|
||||||
|
decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) {
|
||||||
|
if nc.podEvictor.AddNodeToEvict(node.Name) {
|
||||||
|
glog.Infof("Adding pods to evict2: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if lastReadyCondition.Status == api.ConditionTrue {
|
||||||
|
if nc.podEvictor.RemoveNodeToEvict(node.Name) {
|
||||||
|
glog.Infof("Pods on %v won't be evicted", node.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Report node event.
|
||||||
|
if readyCondition.Status != api.ConditionTrue && lastReadyCondition.Status == api.ConditionTrue {
|
||||||
|
nc.recordNodeStatusChange(node, "NodeNotReady")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check with the cloud provider to see if the node still exists. If it
|
||||||
|
// doesn't, delete the node and all pods scheduled on the node.
|
||||||
|
if readyCondition.Status != api.ConditionTrue && nc.cloud != nil {
|
||||||
|
instances, ok := nc.cloud.Instances()
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("%v", ErrCloudInstance)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound {
|
||||||
|
glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
|
||||||
|
nc.recordNodeEvent(node.Name, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
|
||||||
|
if err := nc.kubeClient.Nodes().Delete(node.Name); err != nil {
|
||||||
|
glog.Errorf("Unable to delete node %s: %v", node.Name, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := nc.deletePods(node.Name); err != nil {
|
||||||
|
glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// reconcileNodeCIDRs looks at each node and assigns it a valid CIDR
|
// reconcileNodeCIDRs looks at each node and assigns it a valid CIDR
|
||||||
// if it doesn't currently have one.
|
// if it doesn't currently have one.
|
||||||
func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) {
|
func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) {
|
||||||
@ -179,18 +315,15 @@ func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts an asynchronous loop that monitors the status of cluster nodes.
|
func (nc *NodeController) recordNodeEvent(nodeName string, event string) {
|
||||||
func (nc *NodeController) Run(period time.Duration) {
|
ref := &api.ObjectReference{
|
||||||
// Incorporate the results of node status pushed from kubelet to master.
|
Kind: "Node",
|
||||||
go util.Forever(func() {
|
Name: nodeName,
|
||||||
if err := nc.monitorNodeStatus(); err != nil {
|
UID: types.UID(nodeName),
|
||||||
glog.Errorf("Error monitoring node status: %v", err)
|
Namespace: "",
|
||||||
}
|
}
|
||||||
}, nc.nodeMonitorPeriod)
|
glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
|
||||||
|
nc.recorder.Eventf(ref, event, "Node %s event: %s", nodeName, event)
|
||||||
go util.Forever(func() {
|
|
||||||
nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) })
|
|
||||||
}, nodeEvictionPeriod)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nc *NodeController) recordNodeStatusChange(node *api.Node, new_status string) {
|
func (nc *NodeController) recordNodeStatusChange(node *api.Node, new_status string) {
|
||||||
@ -206,17 +339,6 @@ func (nc *NodeController) recordNodeStatusChange(node *api.Node, new_status stri
|
|||||||
nc.recorder.Eventf(ref, new_status, "Node %s status is now: %s", node.Name, new_status)
|
nc.recorder.Eventf(ref, new_status, "Node %s status is now: %s", node.Name, new_status)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nc *NodeController) recordNodeEvent(nodeName string, event string) {
|
|
||||||
ref := &api.ObjectReference{
|
|
||||||
Kind: "Node",
|
|
||||||
Name: nodeName,
|
|
||||||
UID: types.UID(nodeName),
|
|
||||||
Namespace: "",
|
|
||||||
}
|
|
||||||
glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
|
|
||||||
nc.recorder.Eventf(ref, event, "Node %s event: %s", nodeName, event)
|
|
||||||
}
|
|
||||||
|
|
||||||
// For a given node checks its conditions and tries to update it. Returns grace period to which given node
|
// For a given node checks its conditions and tries to update it. Returns grace period to which given node
|
||||||
// is entitled, state of current and last observed Ready Condition, and an error if it ocured.
|
// is entitled, state of current and last observed Ready Condition, and an error if it ocured.
|
||||||
func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, api.NodeCondition, *api.NodeCondition, error) {
|
func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, api.NodeCondition, *api.NodeCondition, error) {
|
||||||
@ -348,129 +470,3 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
|
|||||||
|
|
||||||
return gracePeriod, lastReadyCondition, readyCondition, err
|
return gracePeriod, lastReadyCondition, readyCondition, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
|
|
||||||
// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
|
|
||||||
// not reachable for a long period of time.
|
|
||||||
func (nc *NodeController) monitorNodeStatus() error {
|
|
||||||
nodes, err := nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if nc.allocateNodeCIDRs {
|
|
||||||
// TODO (cjcullen): Use pkg/controller/framework to watch nodes and
|
|
||||||
// reduce lists/decouple this from monitoring status.
|
|
||||||
nc.reconcileNodeCIDRs(nodes)
|
|
||||||
}
|
|
||||||
for i := range nodes.Items {
|
|
||||||
var gracePeriod time.Duration
|
|
||||||
var lastReadyCondition api.NodeCondition
|
|
||||||
var readyCondition *api.NodeCondition
|
|
||||||
node := &nodes.Items[i]
|
|
||||||
for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
|
|
||||||
gracePeriod, lastReadyCondition, readyCondition, err = nc.tryUpdateNodeStatus(node)
|
|
||||||
if err == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
name := node.Name
|
|
||||||
node, err = nc.kubeClient.Nodes().Get(name)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Update status of Node %v from NodeController exceeds retry count."+
|
|
||||||
"Skipping - no pods will be evicted.", node.Name)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
decisionTimestamp := nc.now()
|
|
||||||
|
|
||||||
if readyCondition != nil {
|
|
||||||
// Check eviction timeout against decisionTimestamp
|
|
||||||
if lastReadyCondition.Status == api.ConditionFalse &&
|
|
||||||
decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
|
|
||||||
if nc.podEvictor.AddNodeToEvict(node.Name) {
|
|
||||||
glog.Infof("Adding pods to evict: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if lastReadyCondition.Status == api.ConditionUnknown &&
|
|
||||||
decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) {
|
|
||||||
if nc.podEvictor.AddNodeToEvict(node.Name) {
|
|
||||||
glog.Infof("Adding pods to evict2: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if lastReadyCondition.Status == api.ConditionTrue {
|
|
||||||
if nc.podEvictor.RemoveNodeToEvict(node.Name) {
|
|
||||||
glog.Infof("Pods on %v won't be evicted", node.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Report node event.
|
|
||||||
if readyCondition.Status != api.ConditionTrue && lastReadyCondition.Status == api.ConditionTrue {
|
|
||||||
nc.recordNodeStatusChange(node, "NodeNotReady")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check with the cloud provider to see if the node still exists. If it
|
|
||||||
// doesn't, delete the node and all pods scheduled on the node.
|
|
||||||
if readyCondition.Status != api.ConditionTrue && nc.cloud != nil {
|
|
||||||
instances, ok := nc.cloud.Instances()
|
|
||||||
if !ok {
|
|
||||||
glog.Errorf("%v", ErrCloudInstance)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound {
|
|
||||||
glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
|
|
||||||
nc.recordNodeEvent(node.Name, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
|
|
||||||
if err := nc.kubeClient.Nodes().Delete(node.Name); err != nil {
|
|
||||||
glog.Errorf("Unable to delete node %s: %v", node.Name, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err := nc.deletePods(node.Name); err != nil {
|
|
||||||
glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// deletePods will delete all pods from master running on given node.
|
|
||||||
func (nc *NodeController) deletePods(nodeID string) error {
|
|
||||||
glog.V(2).Infof("Delete all pods from %v", nodeID)
|
|
||||||
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
|
|
||||||
fields.OneTermEqualSelector(client.PodHost, nodeID))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
nc.recordNodeEvent(nodeID, fmt.Sprintf("Deleting all Pods from Node %v.", nodeID))
|
|
||||||
for _, pod := range pods.Items {
|
|
||||||
// Defensive check, also needed for tests.
|
|
||||||
if pod.Spec.NodeName != nodeID {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
glog.V(2).Infof("Delete pod %v", pod.Name)
|
|
||||||
nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID)
|
|
||||||
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
|
|
||||||
glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// getCondition returns a condition object for the specific condition
|
|
||||||
// type, nil if the condition is not set.
|
|
||||||
func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition {
|
|
||||||
if status == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
for i := range status.Conditions {
|
|
||||||
if status.Conditions[i].Type == conditionType {
|
|
||||||
return &status.Conditions[i]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
@ -325,7 +325,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
|
|||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
podEvictor := NewPodEvictor(util.NewFakeRateLimiter())
|
podEvictor := NewPodEvictor(util.NewFakeRateLimiter())
|
||||||
nodeController := NewNodeController(nil, item.fakeNodeHandler, 10,
|
nodeController := NewNodeController(nil, item.fakeNodeHandler,
|
||||||
evictionTimeout, podEvictor, testNodeMonitorGracePeriod,
|
evictionTimeout, podEvictor, testNodeMonitorGracePeriod,
|
||||||
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
|
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
|
||||||
nodeController.now = func() util.Time { return fakeNow }
|
nodeController.now = func() util.Time { return fakeNow }
|
||||||
@ -531,7 +531,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(nil, item.fakeNodeHandler, 10, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
|
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
|
||||||
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
|
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
|
||||||
nodeController.now = func() util.Time { return fakeNow }
|
nodeController.now = func() util.Time { return fakeNow }
|
||||||
if err := nodeController.monitorNodeStatus(); err != nil {
|
if err := nodeController.monitorNodeStatus(); err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user