Merge pull request #28829 from gmarek/hooks

Automatic merge from submit-queue

Add hooks for cluster health detection

Separate out a function that decides whether a zone is healthy. This is the first real commit toward preventing massive pod evictions.
Ref. #28832

cc @davidopp
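
To make the intent concrete before diving into the diff, here is a minimal, self-contained sketch of the decision flow this PR introduces. It is plain Go with illustrative names (zoneToReady, a computeZoneState over []bool), not the controller's actual types: group node Ready signals by zone, classify each zone, and suppress evictions only in zones where no node is Ready.

package main

import "fmt"

type zoneState string

const (
    stateNormal           = zoneState("Normal")
    stateFullSegmentation = zoneState("FullSegmentation")
)

// computeZoneState is a toy stand-in for the PR's ComputeZoneState: a zone is
// Normal as soon as any node in it reports Ready; otherwise it is treated as
// fully segmented and the controller stops evicting pods there.
func computeZoneState(ready []bool) zoneState {
    for _, r := range ready {
        if r {
            return stateNormal
        }
    }
    return stateFullSegmentation
}

func main() {
    zoneToReady := map[string][]bool{
        "region1:zone-a": {true, false, true}, // at least one Ready node
        "region1:zone-b": {false, false},      // every node NotReady
    }
    for zone, ready := range zoneToReady {
        state := computeZoneState(ready)
        fmt.Printf("zone %s -> %s (evictions allowed: %v)\n", zone, state, state != stateFullSegmentation)
    }
}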
k8s-merge-robot authored 2016-07-12 08:12:04 -07:00, committed by GitHub
commit 5894dc4615
4 changed files with 132 additions and 99 deletions

@@ -22,7 +22,6 @@ import (
     "time"

     "k8s.io/kubernetes/pkg/api"
-    "k8s.io/kubernetes/pkg/api/unversioned"
     "k8s.io/kubernetes/pkg/client/cache"
     clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
     "k8s.io/kubernetes/pkg/client/record"
@@ -36,6 +35,20 @@ import (
     "github.com/golang/glog"
 )

+// This function is expected to get a slice of NodeReadyConditions for all Nodes in a given zone.
+func ComputeZoneState(nodeReadyConditions []*api.NodeCondition) zoneState {
+    seenReady := false
+    for i := range nodeReadyConditions {
+        if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == api.ConditionTrue {
+            seenReady = true
+        }
+    }
+    if seenReady {
+        return stateNormal
+    }
+    return stateFullSegmentation
+}
+
 // cleanupOrphanedPods deletes pods that are bound to nodes that don't
 // exist.
 func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
@@ -124,16 +137,6 @@ func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string, force
     return nil
 }

-// forceUpdateAllProbeTimes bumps all observed timestamps in saved nodeStatuses to now. This makes
-// all eviction timer to reset.
-func forceUpdateAllProbeTimes(now unversioned.Time, statusData map[string]nodeStatusData) {
-    for k, v := range statusData {
-        v.probeTimestamp = now
-        v.readyTransitionTimestamp = now
-        statusData[k] = v
-    }
-}
-
 // maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
 // that should not be gracefully terminated.
 func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
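
A table-driven test along these lines would pin down the new helper's behaviour. The test below is only a sketch, not part of this PR; it assumes it sits in the node controller's own package (shown here as package node, use whatever package name the controller files actually declare) so it can see the unexported zoneState values.

package node

import (
    "testing"

    "k8s.io/kubernetes/pkg/api"
)

func TestComputeZoneState(t *testing.T) {
    ready := &api.NodeCondition{Type: api.NodeReady, Status: api.ConditionTrue}
    notReady := &api.NodeCondition{Type: api.NodeReady, Status: api.ConditionFalse}

    cases := []struct {
        conditions []*api.NodeCondition
        want       zoneState
    }{
        {[]*api.NodeCondition{ready, notReady}, stateNormal},              // one Ready node keeps the zone Normal
        {[]*api.NodeCondition{notReady, notReady}, stateFullSegmentation}, // no Ready node: zone is fully segmented
        {[]*api.NodeCondition{nil, notReady}, stateFullSegmentation},      // nil entries are skipped, not counted as Ready
    }
    for i, c := range cases {
        if got := ComputeZoneState(c.conditions); got != c.want {
            t.Errorf("case %d: ComputeZoneState() = %v, want %v", i, got, c.want)
        }
    }
}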

@@ -38,8 +38,8 @@ import (
     "k8s.io/kubernetes/pkg/runtime"
     "k8s.io/kubernetes/pkg/util/flowcontrol"
     "k8s.io/kubernetes/pkg/util/metrics"
+    utilnode "k8s.io/kubernetes/pkg/util/node"
     utilruntime "k8s.io/kubernetes/pkg/util/runtime"
-    "k8s.io/kubernetes/pkg/util/sets"
     "k8s.io/kubernetes/pkg/util/system"
     "k8s.io/kubernetes/pkg/util/wait"
     "k8s.io/kubernetes/pkg/version"
@@ -58,6 +58,14 @@ const (
     nodeEvictionPeriod = 100 * time.Millisecond
 )

+type zoneState string
+
+const (
+    stateNormal              = zoneState("Normal")
+    stateFullSegmentation    = zoneState("FullSegmentation")
+    statePartialSegmentation = zoneState("PartialSegmentation")
+)
+
 type nodeStatusData struct {
     probeTimestamp           unversioned.Time
     readyTransitionTimestamp unversioned.Time
@@ -70,7 +78,7 @@ type NodeController struct {
     clusterCIDR             *net.IPNet
     serviceCIDR             *net.IPNet
     deletingPodsRateLimiter flowcontrol.RateLimiter
-    knownNodeSet            sets.String
+    knownNodeSet            map[string]*api.Node
     kubeClient              clientset.Interface
     // Method for easy mocking in unittest.
     lookupIP func(host string) ([]net.IP, error)
@@ -124,11 +132,9 @@ type NodeController struct {
     forcefullyDeletePod       func(*api.Pod) error
     nodeExistsInCloudProvider func(string) (bool, error)
-    // If in network segmentation mode NodeController won't evict Pods from unhealthy Nodes.
-    // It is enabled when all Nodes observed by the NodeController are NotReady and disabled
-    // when NC sees any healthy Node. This is a temporary fix for v1.3.
-    networkSegmentationMode bool
+    computeZoneStateFunc      func(nodeConditions []*api.NodeCondition) zoneState
+    zoneStates                map[string]zoneState
 }

 // NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -172,7 +178,7 @@ func NewNodeController(
     nc := &NodeController{
         cloud:                     cloud,
-        knownNodeSet:              make(sets.String),
+        knownNodeSet:              make(map[string]*api.Node),
         kubeClient:                kubeClient,
         recorder:                  recorder,
         podEvictionTimeout:        podEvictionTimeout,
@@ -191,6 +197,8 @@ func NewNodeController(
         allocateNodeCIDRs:         allocateNodeCIDRs,
         forcefullyDeletePod:       func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
         nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
+        computeZoneStateFunc:      ComputeZoneState,
+        zoneStates:                make(map[string]zoneState),
     }

     nc.podStore.Indexer, nc.podController = framework.NewIndexerInformer(
@@ -360,31 +368,22 @@ func (nc *NodeController) monitorNodeStatus() error {
     if err != nil {
         return err
     }
-    for _, node := range nodes.Items {
-        if !nc.knownNodeSet.Has(node.Name) {
-            glog.V(1).Infof("NodeController observed a new Node: %#v", node)
-            recordNodeEvent(nc.recorder, node.Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", node.Name))
-            nc.cancelPodEviction(node.Name)
-            nc.knownNodeSet.Insert(node.Name)
-        }
-    }
-    // If there's a difference between lengths of known Nodes and observed nodes
-    // we must have removed some Node.
-    if len(nc.knownNodeSet) != len(nodes.Items) {
-        observedSet := make(sets.String)
-        for _, node := range nodes.Items {
-            observedSet.Insert(node.Name)
-        }
-        deleted := nc.knownNodeSet.Difference(observedSet)
-        for nodeName := range deleted {
-            glog.V(1).Infof("NodeController observed a Node deletion: %v", nodeName)
-            recordNodeEvent(nc.recorder, nodeName, api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", nodeName))
-            nc.evictPods(nodeName)
-            nc.knownNodeSet.Delete(nodeName)
-        }
-    }
-    seenReady := false
+    added, deleted := nc.checkForNodeAddedDeleted(nodes)
+    for i := range added {
+        glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
+        recordNodeEvent(nc.recorder, added[i].Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
+        nc.cancelPodEviction(added[i])
+        nc.knownNodeSet[added[i].Name] = added[i]
+    }
+    for i := range deleted {
+        glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name)
+        recordNodeEvent(nc.recorder, deleted[i].Name, api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
+        nc.evictPods(deleted[i])
+        delete(nc.knownNodeSet, deleted[i].Name)
+    }
+
+    zoneToNodeConditions := map[string][]*api.NodeCondition{}
     for i := range nodes.Items {
         var gracePeriod time.Duration
         var observedReadyCondition api.NodeCondition
@@ -407,29 +406,28 @@ func (nc *NodeController) monitorNodeStatus() error {
                 "Skipping - no pods will be evicted.", node.Name)
             continue
         }
+        // We do not treat a master node as a part of the cluster for network segmentation checking.
+        if !system.IsMasterNode(node) {
+            zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
+        }
+
         decisionTimestamp := nc.now()
         if currentReadyCondition != nil {
             // Check eviction timeout against decisionTimestamp
             if observedReadyCondition.Status == api.ConditionFalse &&
                 decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
-                if nc.evictPods(node.Name) {
+                if nc.evictPods(node) {
                     glog.V(4).Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
                 }
             }
             if observedReadyCondition.Status == api.ConditionUnknown &&
                 decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
-                if nc.evictPods(node.Name) {
+                if nc.evictPods(node) {
                     glog.V(4).Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
                 }
             }
             if observedReadyCondition.Status == api.ConditionTrue {
-                // We do not treat a master node as a part of the cluster for network segmentation checking.
-                if !system.IsMasterNode(node) {
-                    seenReady = true
-                }
-                if nc.cancelPodEviction(node.Name) {
+                if nc.cancelPodEviction(node) {
                     glog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
                 }
             }
@@ -468,19 +466,35 @@ func (nc *NodeController) monitorNodeStatus() error {
         }
     }

-    // NC don't see any Ready Node. We assume that the network is segmented and Nodes cannot connect to API server and
-    // update their statuses. NC enteres network segmentation mode and cancels all evictions in progress.
-    if !seenReady {
-        nc.networkSegmentationMode = true
-        nc.stopAllPodEvictions()
-        glog.V(2).Info("NodeController is entering network segmentation mode.")
-    } else {
-        if nc.networkSegmentationMode {
-            forceUpdateAllProbeTimes(nc.now(), nc.nodeStatusMap)
-            nc.networkSegmentationMode = false
-            glog.V(2).Info("NodeController exited network segmentation mode.")
-        }
-    }
+    for k, v := range zoneToNodeConditions {
+        newState := nc.computeZoneStateFunc(v)
+        if newState == nc.zoneStates[k] {
+            continue
+        }
+        if newState == stateFullSegmentation {
+            glog.V(2).Infof("NodeController is entering network segmentation mode in zone %v.", k)
+        } else if newState == stateNormal {
+            glog.V(2).Infof("NodeController exited network segmentation mode in zone %v.", k)
+        }
+        for i := range nodes.Items {
+            if utilnode.GetZoneKey(&nodes.Items[i]) == k {
+                if newState == stateFullSegmentation {
+                    // When zone is fully segmented we stop the eviction all together.
+                    nc.cancelPodEviction(&nodes.Items[i])
+                }
+                if newState == stateNormal && nc.zoneStates[k] == stateFullSegmentation {
+                    // When exiting segmentation mode update probe timestamps on all Nodes.
+                    now := nc.now()
+                    v := nc.nodeStatusMap[nodes.Items[i].Name]
+                    v.probeTimestamp = now
+                    v.readyTransitionTimestamp = now
+                    nc.nodeStatusMap[nodes.Items[i].Name] = v
+                }
+            }
+        }
+        nc.zoneStates[k] = newState
+    }
+
     return nil
 }
@@ -649,15 +663,38 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
     return gracePeriod, observedReadyCondition, currentReadyCondition, err
 }

+func (nc *NodeController) checkForNodeAddedDeleted(nodes *api.NodeList) (added, deleted []*api.Node) {
+    for i := range nodes.Items {
+        if _, has := nc.knownNodeSet[nodes.Items[i].Name]; !has {
+            added = append(added, &nodes.Items[i])
+        }
+    }
+    // If there's a difference between lengths of known Nodes and observed nodes
+    // we must have removed some Node.
+    if len(nc.knownNodeSet)+len(added) != len(nodes.Items) {
+        knowSetCopy := map[string]*api.Node{}
+        for k, v := range nc.knownNodeSet {
+            knowSetCopy[k] = v
+        }
+        for i := range nodes.Items {
+            delete(knowSetCopy, nodes.Items[i].Name)
+        }
+        for i := range knowSetCopy {
+            deleted = append(deleted, knowSetCopy[i])
+        }
+    }
+    return
+}
+
 // cancelPodEviction removes any queued evictions, typically because the node is available again. It
 // returns true if an eviction was queued.
-func (nc *NodeController) cancelPodEviction(nodeName string) bool {
+func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
     nc.evictorLock.Lock()
     defer nc.evictorLock.Unlock()
-    wasDeleting := nc.podEvictor.Remove(nodeName)
-    wasTerminating := nc.terminationEvictor.Remove(nodeName)
+    wasDeleting := nc.podEvictor.Remove(node.Name)
+    wasTerminating := nc.terminationEvictor.Remove(node.Name)
     if wasDeleting || wasTerminating {
-        glog.V(2).Infof("Cancelling pod Eviction on Node: %v", nodeName)
+        glog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
         return true
     }
     return false
@@ -665,20 +702,11 @@ func (nc *NodeController) cancelPodEviction(nodeName string) bool {
 // evictPods queues an eviction for the provided node name, and returns false if the node is already
 // queued for eviction.
-func (nc *NodeController) evictPods(nodeName string) bool {
-    if nc.networkSegmentationMode {
+func (nc *NodeController) evictPods(node *api.Node) bool {
+    if nc.zoneStates[utilnode.GetZoneKey(node)] == stateFullSegmentation {
         return false
     }
     nc.evictorLock.Lock()
     defer nc.evictorLock.Unlock()
-    return nc.podEvictor.Add(nodeName)
-}
-
-// stopAllPodEvictions removes any queued evictions for all Nodes.
-func (nc *NodeController) stopAllPodEvictions() {
-    nc.evictorLock.Lock()
-    defer nc.evictorLock.Unlock()
-    glog.V(3).Infof("Cancelling all pod evictions.")
-    nc.podEvictor.Clear()
-    nc.terminationEvictor.Clear()
+    return nc.podEvictor.Add(node.Name)
 }
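
The reason the policy is wired through the computeZoneStateFunc field rather than called directly is that tests (and later, smarter policies) can swap it out. As an illustration only, something like the following could be assigned to nc.computeZoneStateFunc in a unit test; the name partialSegmentationPolicy and the half-ready threshold are hypothetical and not anything this PR ships (the PR only introduces statePartialSegmentation as an as-yet-unused constant).

package node

import "k8s.io/kubernetes/pkg/api"

// partialSegmentationPolicy is a hypothetical replacement hook for
// nc.computeZoneStateFunc. It must live in the controller's package so it can
// return the unexported zoneState values.
func partialSegmentationPolicy(conds []*api.NodeCondition) zoneState {
    ready := 0
    for i := range conds {
        if conds[i] != nil && conds[i].Status == api.ConditionTrue {
            ready++
        }
    }
    switch {
    case ready == 0:
        return stateFullSegmentation
    case ready*2 < len(conds): // fewer than half of the zone's nodes are Ready
        return statePartialSegmentation
    default:
        return stateNormal
    }
}

A test would then set nc.computeZoneStateFunc = partialSegmentationPolicy before driving monitorNodeStatus.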

@@ -24,6 +24,7 @@ import (
     "github.com/golang/glog"

     "k8s.io/kubernetes/pkg/api"
+    "k8s.io/kubernetes/pkg/api/unversioned"
 )

 func GetHostname(hostnameOverride string) string {
@@ -59,3 +60,24 @@ func GetNodeHostIP(node *api.Node) (net.IP, error) {
     }
     return nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
 }
+
+// Helper function that builds a string identifier that is unique per failure-zone
+// Returns empty-string for no zone
+func GetZoneKey(node *api.Node) string {
+    labels := node.Labels
+    if labels == nil {
+        return ""
+    }
+
+    region, _ := labels[unversioned.LabelZoneRegion]
+    failureDomain, _ := labels[unversioned.LabelZoneFailureDomain]
+
+    if region == "" && failureDomain == "" {
+        return ""
+    }
+
+    // We include the null character just in case region or failureDomain has a colon
+    // (We do assume there's no null characters in a region or failureDomain)
+    // As a nice side-benefit, the null character is not printed by fmt.Print or glog
+    return region + ":\x00:" + failureDomain
+}
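
The promoted helper keys zones by region plus failure domain, using the standard zone labels. A quick usage sketch follows; the label values are made up, and the import paths are the ones visible in this diff:

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/unversioned"
    utilnode "k8s.io/kubernetes/pkg/util/node"
)

func main() {
    node := &api.Node{
        ObjectMeta: api.ObjectMeta{
            Labels: map[string]string{
                unversioned.LabelZoneRegion:        "us-central1",
                unversioned.LabelZoneFailureDomain: "us-central1-a",
            },
        },
    }
    // Prints "us-central1:\x00:us-central1-a"; %q makes the null separator visible.
    fmt.Printf("%q\n", utilnode.GetZoneKey(node))

    // Nodes with no zone labels map to the empty key, which callers skip.
    fmt.Println(utilnode.GetZoneKey(&api.Node{}) == "")
}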

@@ -23,6 +23,7 @@ import (
     "k8s.io/kubernetes/pkg/api"
     "k8s.io/kubernetes/pkg/api/unversioned"
     "k8s.io/kubernetes/pkg/labels"
+    utilnode "k8s.io/kubernetes/pkg/util/node"
     utilruntime "k8s.io/kubernetes/pkg/util/runtime"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
     schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
@@ -54,27 +55,6 @@ func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algo
     return selectorSpread.CalculateSpreadPriority
 }

-// Helper function that builds a string identifier that is unique per failure-zone
-// Returns empty-string for no zone
-func getZoneKey(node *api.Node) string {
-    labels := node.Labels
-    if labels == nil {
-        return ""
-    }
-
-    region, _ := labels[unversioned.LabelZoneRegion]
-    failureDomain, _ := labels[unversioned.LabelZoneFailureDomain]
-
-    if region == "" && failureDomain == "" {
-        return ""
-    }
-
-    // We include the null character just in case region or failureDomain has a colon
-    // (We do assume there's no null characters in a region or failureDomain)
-    // As a nice side-benefit, the null character is not printed by fmt.Print or glog
-    return region + ":\x00:" + failureDomain
-}
-
 // CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller.
 // When a pod is scheduled, it looks for services or RCs that match the pod, then finds existing pods that match those selectors.
 // It favors nodes that have fewer existing matching pods.
@@ -188,7 +168,7 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
             continue
         }
-        zoneId := getZoneKey(node)
+        zoneId := utilnode.GetZoneKey(node)
         if zoneId == "" {
             continue
         }
@@ -218,7 +198,7 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
     // If there is zone information present, incorporate it
     if haveZones {
-        zoneId := getZoneKey(node)
+        zoneId := utilnode.GetZoneKey(node)
         if zoneId != "" {
             zoneScore := maxPriority * (float32(maxCountByZone-countsByZone[zoneId]) / float32(maxCountByZone))
             fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
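
For intuition on the zone-level blending that now runs off the shared key, here is a standalone arithmetic sketch of the two formulas visible in this hunk. The sample counts and the maxPriority/zoneWeighting values are illustrative stand-ins for the constants defined elsewhere in selector_spreading.go:

package main

import "fmt"

func main() {
    const (
        maxPriority   = float32(10)        // illustrative; the scheduler scores hosts on a 0-10 scale
        zoneWeighting = float32(2.0 / 3.0) // illustrative weight given to the zone-level score
    )
    // Suppose the node itself already earned this host-level score...
    fScore := float32(8)
    // ...and its zone holds 1 matching pod while the busiest zone holds 4.
    countsByZone := 1
    maxCountByZone := 4

    zoneScore := maxPriority * (float32(maxCountByZone-countsByZone) / float32(maxCountByZone))
    fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
    fmt.Printf("zoneScore=%.2f blended fScore=%.2f\n", zoneScore, fScore) // zoneScore=7.50 blended fScore=7.67
}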