Merge pull request #16900 from madhusudancs/avoid-kubelet-oscillation

Auto commit by PR queue bot
k8s-merge-robot 2016-01-05 17:14:13 -08:00
commit 66d3cbf889
12 changed files with 320 additions and 82 deletions

View File

@@ -78,7 +78,7 @@ KUBEPROXY_TEST_LOG_LEVEL="${KUBEPROXY_TEST_LOG_LEVEL:-$TEST_CLUSTER_LOG_LEVEL}"
TEST_CLUSTER_RESYNC_PERIOD="${TEST_CLUSTER_RESYNC_PERIOD:---min-resync-period=3m}"
-KUBELET_TEST_ARGS="--max-pods=110 --serialize-image-pulls=false"
+KUBELET_TEST_ARGS="--max-pods=110 --serialize-image-pulls=false --outofdisk-transition-frequency=0"
APISERVER_TEST_ARGS="--runtime-config=extensions/v1beta1"
CONTROLLER_MANAGER_TEST_ARGS="${TEST_CLUSTER_RESYNC_PERIOD}"
SCHEDULER_TEST_ARGS=""

View File

@@ -222,6 +222,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
10*time.Second, /* MinimumGCAge */
3*time.Second, /* NodeStatusUpdateFrequency */
10*time.Second, /* SyncFrequency */
+10*time.Second, /* OutOfDiskTransitionFrequency */
40, /* MaxPods */
cm, net.ParseIP("127.0.0.1"))
@@ -254,6 +255,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
10*time.Second, /* MinimumGCAge */
3*time.Second, /* NodeStatusUpdateFrequency */
10*time.Second, /* SyncFrequency */
+10*time.Second, /* OutOfDiskTransitionFrequency */
40, /* MaxPods */
cm,

View File

@@ -143,6 +143,7 @@ type KubeletServer struct {
TLSCertFile string
TLSPrivateKeyFile string
ReconcileCIDR bool
+OutOfDiskTransitionFrequency time.Duration
// Flags intended for testing
// Is the kubelet containerized?
@@ -233,6 +234,7 @@ func NewKubeletServer() *KubeletServer {
ReconcileCIDR: true,
KubeAPIQPS: 5.0,
KubeAPIBurst: 10,
+OutOfDiskTransitionFrequency: 5 * time.Minute,
ExperimentalFlannelOverlay: experimentalFlannelOverlay,
}
}
@@ -349,6 +351,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
fs.BoolVar(&s.SerializeImagePulls, "serialize-image-pulls", s.SerializeImagePulls, "Pull images one at a time. We recommend *not* changing the default value on nodes that run docker daemon with version < 1.9 or an Aufs storage backend. Issue #10959 has more details. [default=true]")
+fs.DurationVar(&s.OutOfDiskTransitionFrequency, "outofdisk-transition-frequency", s.OutOfDiskTransitionFrequency, "Duration for which the kubelet has to wait before transitioning out of out-of-disk node condition status. Default: 5m0s")
fs.BoolVar(&s.ExperimentalFlannelOverlay, "experimental-flannel-overlay", s.ExperimentalFlannelOverlay, "Experimental support for starting the kubelet with the default overlay network (flannel). Assumes flanneld is already running in client mode. [default=false]")
fs.IPVar(&s.NodeIP, "node-ip", s.NodeIP, "IP address of the node. If set, kubelet will use this IP address for the node")
}
@@ -488,6 +491,7 @@ func (s *KubeletServer) UnsecuredKubeletConfig() (*KubeletConfig, error) {
TLSOptions: tlsOptions,
Writer: writer,
VolumePlugins: ProbeVolumePlugins(s.VolumePluginDir),
+OutOfDiskTransitionFrequency: s.OutOfDiskTransitionFrequency,
ExperimentalFlannelOverlay: s.ExperimentalFlannelOverlay,
NodeIP: s.NodeIP,
@@ -706,7 +710,7 @@ func SimpleKubelet(client *client.Client,
configFilePath string,
cloud cloudprovider.Interface,
osInterface kubecontainer.OSInterface,
-fileCheckFrequency, httpCheckFrequency, minimumGCAge, nodeStatusUpdateFrequency, syncFrequency time.Duration,
+fileCheckFrequency, httpCheckFrequency, minimumGCAge, nodeStatusUpdateFrequency, syncFrequency, outOfDiskTransitionFrequency time.Duration,
maxPods int,
containerManager cm.ContainerManager, clusterDNS net.IP) *KubeletConfig {
imageGCPolicy := kubelet.ImageGCPolicy{
@@ -768,6 +772,7 @@ func SimpleKubelet(client *client.Client,
TLSOptions: tlsOptions,
VolumePlugins: volumePlugins,
Writer: &io.StdWriter{},
+OutOfDiskTransitionFrequency: outOfDiskTransitionFrequency,
}
return &kcfg
}
@@ -965,6 +970,7 @@ type KubeletConfig struct {
TLSOptions *server.TLSOptions
Writer io.Writer
VolumePlugins []volume.VolumePlugin
+OutOfDiskTransitionFrequency time.Duration
ExperimentalFlannelOverlay bool
NodeIP net.IP
@@ -1050,6 +1056,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.OOMAdjuster,
kc.SerializeImagePulls,
kc.ContainerManager,
+kc.OutOfDiskTransitionFrequency,
kc.ExperimentalFlannelOverlay,
kc.NodeIP,
)

View File

@@ -118,6 +118,7 @@ kubelet
--node-labels-file="": the path to a yaml or json file containing a series of key pair labels to apply on node registration
--node-status-update-frequency=10s: Specifies how often kubelet posts node status to master. Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod in nodecontroller. Default: 10s
--oom-score-adj=-999: The oom-score-adj value for kubelet process. Values must be within the range [-1000, 1000]
+--outofdisk-transition-frequency=5m0s: Duration for which the kubelet has to wait before transitioning out of out-of-disk node condition status. Default: 5m0s
--pod-cidr="": The CIDR to use for pod IP addresses, only used in standalone mode. In cluster mode, this is obtained from the master.
--pod-infra-container-image="gcr.io/google_containers/pause:2.0": The image whose network/ipc namespaces containers in each pod will use.
--port=10250: The port for the Kubelet to serve on.
@@ -143,7 +144,7 @@ kubelet
--volume-plugin-dir="/usr/libexec/kubernetes/kubelet-plugins/volume/exec/": <Warning: Alpha feature> The full path of the directory in which to search for additional third party volume plugins
```
-###### Auto generated by spf13/cobra on 11-Dec-2015
+###### Auto generated by spf13/cobra on 29-Dec-2015
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->

View File

@@ -4361,7 +4361,7 @@ Populated by the system when a graceful deletion is requested. Read-only. More i
</div>
<div id="footer">
<div id="footer-text">
-Last updated 2015-12-18 08:02:33 UTC
+Last updated 2015-12-29 09:07:13 UTC
</div>
</div>
</body>

View File

@@ -5578,7 +5578,7 @@ span.icon > [class^="icon-"], span.icon > [class*=" icon-"] { cursor: default; }
</div>
<div id="footer">
<div id="footer-text">
-Last updated 2015-12-15 06:44:37 UTC
+Last updated 2015-12-22 07:40:13 UTC
</div>
</div>
</body>

View File

@@ -7001,7 +7001,7 @@ The resulting set of endpoints can be viewed as:<br>
</div>
<div id="footer">
<div id="footer-text">
-Last updated 2015-12-18 08:02:17 UTC
+Last updated 2015-12-29 09:07:07 UTC
</div>
</div>
</body>

View File

@@ -25733,7 +25733,7 @@ span.icon > [class^="icon-"], span.icon > [class*=" icon-"] { cursor: default; }
</div>
<div id="footer">
<div id="footer-text">
-Last updated 2015-12-22 14:29:57 UTC
+Last updated 2015-12-22 07:40:07 UTC
</div>
</div>
</body>

View File

@@ -238,6 +238,7 @@ oidc-client-id
oidc-issuer-url
oidc-username-claim
oom-score-adj
+outofdisk-transition-frequency
output-base
output-package
output-print-type

View File

@@ -196,10 +196,10 @@ func NewMainKubelet(
oomAdjuster *oom.OOMAdjuster,
serializeImagePulls bool,
containerManager cm.ContainerManager,
+outOfDiskTransitionFrequency time.Duration,
flannelExperimentalOverlay bool,
nodeIP net.IP,
) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@@ -315,6 +315,8 @@ func NewMainKubelet(
flannelExperimentalOverlay: flannelExperimentalOverlay,
flannelHelper: NewFlannelHelper(),
nodeIP: nodeIP,
+clock: util.RealClock{},
+outOfDiskTransitionFrequency: outOfDiskTransitionFrequency,
}
if klet.flannelExperimentalOverlay {
glog.Infof("Flannel is in charge of podCIDR and overlay networking.")
@@ -419,7 +421,7 @@ func NewMainKubelet(
SystemContainerName: systemContainer,
KubeletContainerName: resourceContainer,
}
-klet.runtimeState.setRuntimeSync(time.Now())
+klet.runtimeState.setRuntimeSync(klet.clock.Now())
klet.runner = klet.containerRuntime
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
@@ -654,6 +656,16 @@ type Kubelet struct {
// If non-nil, use this IP address for the node
nodeIP net.IP
+// clock is an interface that provides time related functionality in a way that makes it
+// easy to test the code.
+clock util.Clock
+// outOfDiskTransitionFrequency specifies the amount of time the kubelet has to be actually
+// not out of disk before it can transition the node condition status from out-of-disk to
+// not-out-of-disk. This prevents a pod that causes out-of-disk condition from repeatedly
+// getting rescheduled onto the node.
+outOfDiskTransitionFrequency time.Duration
}
// Validate given node IP belongs to the current host
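The hunk above injects a clock into the Kubelet so that time-dependent behaviour can be driven deterministically in tests (util.RealClock in production, a fake clock that tests step manually). Below is a minimal, self-contained sketch of that pattern; the names Clock, realClock and fakeClock are simplified stand-ins, not the actual util.Clock API.

// Sketch of clock injection for testability (illustrative, not from the PR).
package main

import (
    "fmt"
    "time"
)

// Clock abstracts the time calls the code under test needs.
type Clock interface {
    Now() time.Time
    Since(t time.Time) time.Duration
}

// realClock delegates to the time package; used in production wiring.
type realClock struct{}

func (realClock) Now() time.Time                  { return time.Now() }
func (realClock) Since(t time.Time) time.Duration { return time.Since(t) }

// fakeClock holds a settable time; tests advance it explicitly with Step.
type fakeClock struct{ t time.Time }

func (f *fakeClock) Now() time.Time                  { return f.t }
func (f *fakeClock) Since(t time.Time) time.Duration { return f.t.Sub(t) }
func (f *fakeClock) Step(d time.Duration)            { f.t = f.t.Add(d) }

func main() {
    c := &fakeClock{t: time.Unix(0, 0)}
    start := c.Now()
    c.Step(7 * time.Second)
    fmt.Println(c.Since(start)) // 7s, fully deterministic
    _ = Clock(realClock{})      // production code would receive this instead
}

With this split, the tests later in the diff can call Step on the fake clock and assert on condition transition times without sleeping.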
@@ -1614,7 +1626,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
}
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) (syncErr error) {
-start := time.Now()
+start := kl.clock.Now()
var firstSeenTime time.Time
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; !ok {
glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
@@ -1943,7 +1955,6 @@ func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecon
// pastActiveDeadline returns true if the pod has been active for more than
// ActiveDeadlineSeconds.
func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
-now := unversioned.Now()
if pod.Spec.ActiveDeadlineSeconds != nil {
podStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok {
@@ -1951,7 +1962,7 @@ func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
}
if !podStatus.StartTime.IsZero() {
startTime := podStatus.StartTime.Time
-duration := now.Time.Sub(startTime)
+duration := kl.clock.Since(startTime)
allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second
if duration >= allowedDuration {
return true
@@ -2313,7 +2324,7 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
-kl.syncLoopMonitor.Store(time.Now())
+kl.syncLoopMonitor.Store(kl.clock.Now())
select {
case u, open := <-updates:
if !open {
@@ -2349,7 +2360,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
}
glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
// Force the container runtime cache to update.
-if err := kl.runtimeCache.ForceUpdateIfOlder(time.Now()); err != nil {
+if err := kl.runtimeCache.ForceUpdateIfOlder(kl.clock.Now()); err != nil {
glog.Errorf("SyncLoop: unable to update runtime cache")
// TODO (yujuhong): should we delay the sync until container
// runtime can be updated?
@@ -2380,7 +2391,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
}
}
}
-kl.syncLoopMonitor.Store(time.Now())
+kl.syncLoopMonitor.Store(kl.clock.Now())
return true
}
@@ -2409,7 +2420,7 @@ func (kl *Kubelet) handleMirrorPod(mirrorPod *api.Pod, start time.Time) {
}
func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
-start := time.Now()
+start := kl.clock.Now()
sort.Sort(podsByCreationTime(pods))
for _, pod := range pods {
kl.podManager.AddPod(pod)
@@ -2435,7 +2446,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
}
func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
-start := time.Now()
+start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.UpdatePod(pod)
if kubepod.IsMirrorPod(pod) {
@@ -2450,7 +2461,7 @@ func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
}
func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
-start := time.Now()
+start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.DeletePod(pod)
if kubepod.IsMirrorPod(pod) {
@@ -2467,7 +2478,7 @@ func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
}
func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) {
-start := time.Now()
+start := kl.clock.Now()
for _, pod := range pods {
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
@@ -2615,7 +2626,7 @@ func (kl *Kubelet) updateRuntimeUp() {
return
}
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
-kl.runtimeState.setRuntimeSync(time.Now())
+kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
@@ -2819,7 +2830,7 @@ func (kl *Kubelet) setNodeReadyCondition(node *api.Node) {
// NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions.
// This is due to an issue with version skewed kubelet and master components.
// ref: https://github.com/kubernetes/kubernetes/issues/16961
-currentTime := unversioned.Now()
+currentTime := unversioned.NewTime(kl.clock.Now())
var newNodeReadyCondition api.NodeCondition
if rs := kl.runtimeState.errors(); len(rs) == 0 {
newNodeReadyCondition = api.NodeCondition{
@@ -2869,7 +2880,7 @@ func (kl *Kubelet) setNodeReadyCondition(node *api.Node) {
// Set OODcondition for the node.
func (kl *Kubelet) setNodeOODCondition(node *api.Node) {
-currentTime := unversioned.Now()
+currentTime := unversioned.NewTime(kl.clock.Now())
var nodeOODCondition *api.NodeCondition
// Check if NodeOutOfDisk condition already exists and if it does, just pick it up for update.
@@ -2883,9 +2894,8 @@ func (kl *Kubelet) setNodeOODCondition(node *api.Node) {
// If the NodeOutOfDisk condition doesn't exist, create one.
if nodeOODCondition == nil {
nodeOODCondition = &api.NodeCondition{
Type: api.NodeOutOfDisk,
Status: api.ConditionUnknown,
-LastTransitionTime: currentTime,
}
// nodeOODCondition cannot be appended to node.Status.Conditions here because it gets
// copied to the slice. So if we append nodeOODCondition to the slice here none of the
@@ -2912,11 +2922,18 @@ func (kl *Kubelet) setNodeOODCondition(node *api.Node) {
}
} else {
if nodeOODCondition.Status != api.ConditionFalse {
-nodeOODCondition.Status = api.ConditionFalse
-nodeOODCondition.Reason = "KubeletHasSufficientDisk"
-nodeOODCondition.Message = "kubelet has sufficient disk space available"
-nodeOODCondition.LastTransitionTime = currentTime
-kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientDisk")
+// Update the out of disk condition when the condition status is unknown even if we
+// are within the outOfDiskTransitionFrequency duration. We do this to set the
+// condition status correctly at kubelet startup.
+if nodeOODCondition.Status == api.ConditionUnknown || kl.clock.Since(nodeOODCondition.LastTransitionTime.Time) >= kl.outOfDiskTransitionFrequency {
+nodeOODCondition.Status = api.ConditionFalse
+nodeOODCondition.Reason = "KubeletHasSufficientDisk"
+nodeOODCondition.Message = "kubelet has sufficient disk space available"
+nodeOODCondition.LastTransitionTime = currentTime
+kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientDisk")
+} else {
+glog.Infof("Node condition status for OutOfDisk is false, but last transition time is less than %s", kl.outOfDiskTransitionFrequency)
+}
}
}
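The hunk above is the core of the oscillation fix: when free disk recovers, the kubelet flips NodeOutOfDisk back to False only if the condition has been in its current state for at least outOfDiskTransitionFrequency (or is still Unknown at startup); otherwise it keeps reporting OutOfDisk so the scheduler does not immediately place the offending pod back onto the node. A minimal sketch of that gating rule, using simplified stand-in types rather than the kubelet's api.NodeCondition:

// Sketch of the damped out-of-disk transition (illustrative, simplified types).
package main

import (
    "fmt"
    "time"
)

type condStatus string

const (
    condTrue    condStatus = "True"
    condFalse   condStatus = "False"
    condUnknown condStatus = "Unknown"
)

type condition struct {
    status         condStatus
    lastTransition time.Time
}

// markHasSufficientDisk reports whether the condition was actually flipped to
// False at time now, applying the transition-frequency damping window.
func markHasSufficientDisk(c *condition, now time.Time, transitionFrequency time.Duration) bool {
    if c.status == condFalse {
        return false // already not out of disk; nothing to do
    }
    if c.status == condUnknown || now.Sub(c.lastTransition) >= transitionFrequency {
        c.status = condFalse
        c.lastTransition = now
        return true
    }
    // Still inside the damping window; keep reporting OutOfDisk.
    return false
}

func main() {
    start := time.Unix(1000, 0)
    c := &condition{status: condTrue, lastTransition: start}
    fmt.Println(markHasSufficientDisk(c, start.Add(2*time.Second), 5*time.Second))  // false: too soon
    fmt.Println(markHasSufficientDisk(c, start.Add(10*time.Second), 5*time.Second)) // true: window elapsed
}

The tests added later in this diff exercise exactly this behaviour by stepping a fake clock one second at a time and asserting when the condition transition is recorded.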
@@ -3090,7 +3107,7 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
// after refactoring, modify them later.
func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
-start := time.Now()
+start := kl.clock.Now()
defer func() {
metrics.PodStatusLatency.Observe(metrics.SinceInMicroseconds(start))
}()

View File

@@ -85,6 +85,7 @@ type TestKubelet struct {
fakeCadvisor *cadvisor.Mock
fakeKubeClient *testclient.Fake
fakeMirrorClient *kubepod.FakeMirrorClient
+fakeClock *util.FakeClock
}
func newTestKubelet(t *testing.T) *TestKubelet {
@@ -151,7 +152,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.workQueue = queue.NewBasicWorkQueue()
// Relist period does not affect the tests.
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour)
-return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
+kubelet.clock = fakeClock
+return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock}
}
func newTestPods(count int) []*api.Pod {
@@ -2465,6 +2467,26 @@ func TestValidateContainerStatus(t *testing.T) {
}
}
// updateDiskSpacePolicy creates a new DiskSpaceManager with a new policy. This new manager along
// with the mock FsInfo values added to Cadvisor should make the kubelet report that it has
// sufficient disk space or it is out of disk, depending on the capacity, availability and
// threshold values.
func updateDiskSpacePolicy(kubelet *Kubelet, mockCadvisor *cadvisor.Mock, rootCap, dockerCap, rootAvail, dockerAvail uint64, rootThreshold, dockerThreshold int) error {
dockerimagesFsInfo := cadvisorapiv2.FsInfo{Capacity: rootCap * mb, Available: rootAvail * mb}
rootFsInfo := cadvisorapiv2.FsInfo{Capacity: dockerCap * mb, Available: dockerAvail * mb}
mockCadvisor.On("DockerImagesFsInfo").Return(dockerimagesFsInfo, nil)
mockCadvisor.On("RootFsInfo").Return(rootFsInfo, nil)
dsp := DiskSpacePolicy{DockerFreeDiskMB: rootThreshold, RootFreeDiskMB: dockerThreshold}
diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, dsp)
if err != nil {
return err
}
diskSpaceManager.Unfreeze()
kubelet.diskSpaceManager = diskSpaceManager
return nil
}
func TestUpdateNewNodeStatus(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
@@ -2489,21 +2511,10 @@ func TestUpdateNewNodeStatus(t *testing.T) {
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
-// Create a new DiskSpaceManager with a new policy. This new manager along with the mock
-// FsInfo values added to Cadvisor should make the kubelet report that it has sufficient
-// disk space.
-dockerimagesFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb}
-rootFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb}
-mockCadvisor.On("DockerImagesFsInfo").Return(dockerimagesFsInfo, nil)
-mockCadvisor.On("RootFsInfo").Return(rootFsInfo, nil)
-dsp := DiskSpacePolicy{DockerFreeDiskMB: 100, RootFreeDiskMB: 100}
-diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, dsp)
-if err != nil {
+// Make kubelet report that it has sufficient disk space.
+if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
-diskSpaceManager.Unfreeze()
-kubelet.diskSpaceManager = diskSpaceManager
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
@@ -2585,6 +2596,82 @@ func TestUpdateNewNodeStatus(t *testing.T) {
}
}
func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
// Make Kubelet report that it has sufficient disk space.
if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
kubelet.outOfDiskTransitionFrequency = 10 * time.Second
expectedNodeOutOfDiskCondition := api.NodeCondition{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: fmt.Sprintf("kubelet has sufficient disk space available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
}
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
var oodCondition api.NodeCondition
for i, cond := range updatedNode.Status.Conditions {
if cond.LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp for %v condition", cond.Type)
}
if cond.LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp for %v condition", cond.Type)
}
updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{}
if cond.Type == api.NodeOutOfDisk {
oodCondition = updatedNode.Status.Conditions[i]
}
}
if !reflect.DeepEqual(expectedNodeOutOfDiskCondition, oodCondition) {
t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNodeOutOfDiskCondition, oodCondition))
}
}
// FIXME: Enable me..
func testDockerRuntimeVersion(t *testing.T) {
testKubelet := newTestKubelet(t)
@@ -2611,20 +2698,11 @@ func testDockerRuntimeVersion(t *testing.T) {
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
-// Create a new DiskSpaceManager with a new policy. This new manager along with the mock
-// FsInfo values added to Cadvisor should make the kubelet report that it has sufficient
-// disk space.
-dockerimagesFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb}
-rootFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb}
-mockCadvisor.On("DockerImagesFsInfo").Return(dockerimagesFsInfo, nil)
-mockCadvisor.On("RootFsInfo").Return(rootFsInfo, nil)
-dsp := DiskSpacePolicy{DockerFreeDiskMB: 100, RootFreeDiskMB: 100}
-diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, dsp)
-if err != nil {
+// Make kubelet report that it has sufficient disk space.
+if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
-diskSpaceManager.Unfreeze()
-kubelet.diskSpaceManager = diskSpaceManager
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
@@ -2781,20 +2859,10 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
-// Create a new DiskSpaceManager with a new policy. This new manager along with the mock FsInfo
-// values added to Cadvisor should make the kubelet report that it is out of disk space.
-dockerimagesFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 70 * mb}
-rootFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 50 * mb}
-mockCadvisor.On("DockerImagesFsInfo").Return(dockerimagesFsInfo, nil)
-mockCadvisor.On("RootFsInfo").Return(rootFsInfo, nil)
-dsp := DiskSpacePolicy{DockerFreeDiskMB: 100, RootFreeDiskMB: 100}
-diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, dsp)
-if err != nil {
+// Make kubelet report that it is out of disk space.
+if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 50, 50, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
-diskSpaceManager.Unfreeze()
-kubelet.diskSpaceManager = diskSpaceManager
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
@@ -2878,6 +2946,158 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
}
}
func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
clock := testKubelet.fakeClock
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: unversioned.NewTime(clock.Now()),
LastTransitionTime: unversioned.NewTime(clock.Now()),
},
{
Type: api.NodeOutOfDisk,
Status: api.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: unversioned.NewTime(clock.Now()),
LastTransitionTime: unversioned.NewTime(clock.Now()),
},
},
},
},
}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor.On("Start").Return(nil)
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
kubelet.outOfDiskTransitionFrequency = 5 * time.Second
ood := api.NodeCondition{
Type: api.NodeOutOfDisk,
Status: api.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: unversioned.NewTime(clock.Now()), // placeholder
LastTransitionTime: unversioned.NewTime(clock.Now()), // placeholder
}
noOod := api.NodeCondition{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: fmt.Sprintf("kubelet has sufficient disk space available"),
LastHeartbeatTime: unversioned.NewTime(clock.Now()), // placeholder
LastTransitionTime: unversioned.NewTime(clock.Now()), // placeholder
}
testCases := []struct {
rootFsAvail uint64
dockerFsAvail uint64
expected api.NodeCondition
}{
{
// NodeOutOfDisk==false
rootFsAvail: 200,
dockerFsAvail: 200,
expected: ood,
},
{
// NodeOutOfDisk==true
rootFsAvail: 50,
dockerFsAvail: 200,
expected: ood,
},
{
// NodeOutOfDisk==false
rootFsAvail: 200,
dockerFsAvail: 200,
expected: ood,
},
{
// NodeOutOfDisk==true
rootFsAvail: 200,
dockerFsAvail: 50,
expected: ood,
},
{
// NodeOutOfDisk==false
rootFsAvail: 200,
dockerFsAvail: 200,
expected: noOod,
},
}
kubelet.updateRuntimeUp()
for tcIdx, tc := range testCases {
// Step by a second
clock.Step(1 * time.Second)
// Setup expected times.
tc.expected.LastHeartbeatTime = unversioned.NewTime(clock.Now())
// In the last case, there should be a status transition for NodeOutOfDisk
if tcIdx == len(testCases)-1 {
tc.expected.LastTransitionTime = unversioned.NewTime(clock.Now())
}
// Make kubelet report that it has sufficient disk space
if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, tc.rootFsAvail, tc.dockerFsAvail, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Errorf("%d. unexpected actions: %v", tcIdx, actions)
}
updateAction, ok := actions[1].(testclient.UpdateAction)
if !ok {
t.Errorf("%d. unexpected action type. expected UpdateAction, got %#v", tcIdx, actions[1])
}
updatedNode, ok := updateAction.GetObject().(*api.Node)
if !ok {
t.Errorf("%d. unexpected object type", tcIdx)
}
kubeClient.ClearActions()
var oodCondition api.NodeCondition
for i, cond := range updatedNode.Status.Conditions {
if cond.Type == api.NodeOutOfDisk {
oodCondition = updatedNode.Status.Conditions[i]
}
}
if !reflect.DeepEqual(tc.expected, oodCondition) {
t.Errorf("%d.\nwant \n%v\n, got \n%v", tcIdx, tc.expected, oodCondition)
}
}
}
func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
@@ -2907,21 +3127,10 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
-// Create a new DiskSpaceManager with a new policy. This new manager along with the
-// mock FsInfo values assigned to Cadvisor should make the kubelet report that it has
-// sufficient disk space.
-dockerimagesFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb}
-rootFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb}
-mockCadvisor.On("DockerImagesFsInfo").Return(dockerimagesFsInfo, nil)
-mockCadvisor.On("RootFsInfo").Return(rootFsInfo, nil)
-dsp := DiskSpacePolicy{DockerFreeDiskMB: 100, RootFreeDiskMB: 100}
-diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, dsp)
-if err != nil {
+// Make kubelet report that it has sufficient disk space.
+if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
-diskSpaceManager.Unfreeze()
-kubelet.diskSpaceManager = diskSpaceManager
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},

View File

@@ -70,6 +70,7 @@ func NewHollowKubelet(
1*time.Minute, /* MinimumGCAge */
10*time.Second, /* NodeStatusUpdateFrequency */
10*time.Second, /* SyncFrequency */
+5*time.Minute, /* OutOfDiskTransitionFrequency */
40, /* MaxPods */
containerManager,
nil,