From 059c2aa7991060890fbd68493be3acc0c4d6a181 Mon Sep 17 00:00:00 2001 From: "Madhusudan.C.S" Date: Mon, 4 Jan 2016 12:03:28 -0800 Subject: [PATCH] Mitigate node out of disk status oscillation by delaying it. Implement a flag that defines the frequency at which a node's out of disk condition can change its status. Use this flag to suspend out of disk status changes in the time period specified by the flag, after the status is changed once. Set the flag to 0 in e2e tests so that we can predictably test out of disk node condition. Also, use util.Clock interface for all time related functionality in the kubelet. Calling time functions in unversioned package or time package such as unversioned.Now() or time.Now() makes it really hard to test such code. It also makes the tests flaky and sometimes unnecessarily slow due to time.Sleep() calls used to simulate the time elapsed. So use util.Clock interface instead which can be faked in the tests. --- cluster/gce/config-test.sh | 2 +- cmd/integration/integration.go | 2 + cmd/kubelet/app/server.go | 9 +- docs/admin/kubelet.md | 3 +- .../extensions/v1beta1/definitions.html | 2 +- .../extensions/v1beta1/operations.html | 2 +- docs/api-reference/v1/definitions.html | 2 +- docs/api-reference/v1/operations.html | 2 +- hack/verify-flags/known-flags.txt | 1 + pkg/kubelet/kubelet.go | 65 ++-- pkg/kubelet/kubelet_test.go | 311 +++++++++++++++--- pkg/kubemark/hollow_kubelet.go | 1 + 12 files changed, 320 insertions(+), 82 deletions(-) diff --git a/cluster/gce/config-test.sh b/cluster/gce/config-test.sh index e915750b22b..6989c8e1c33 100755 --- a/cluster/gce/config-test.sh +++ b/cluster/gce/config-test.sh @@ -78,7 +78,7 @@ KUBEPROXY_TEST_LOG_LEVEL="${KUBEPROXY_TEST_LOG_LEVEL:-$TEST_CLUSTER_LOG_LEVEL}" TEST_CLUSTER_RESYNC_PERIOD="${TEST_CLUSTER_RESYNC_PERIOD:---min-resync-period=3m}" -KUBELET_TEST_ARGS="--max-pods=110 --serialize-image-pulls=false" +KUBELET_TEST_ARGS="--max-pods=110 --serialize-image-pulls=false --outofdisk-transition-frequency=0" APISERVER_TEST_ARGS="--runtime-config=extensions/v1beta1" CONTROLLER_MANAGER_TEST_ARGS="${TEST_CLUSTER_RESYNC_PERIOD}" SCHEDULER_TEST_ARGS="" diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 5af5033036d..ca5804a6751 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -222,6 +222,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string 10*time.Second, /* MinimumGCAge */ 3*time.Second, /* NodeStatusUpdateFrequency */ 10*time.Second, /* SyncFrequency */ + 10*time.Second, /* OutOfDiskTransitionFrequency */ 40, /* MaxPods */ cm, net.ParseIP("127.0.0.1")) @@ -254,6 +255,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string 10*time.Second, /* MinimumGCAge */ 3*time.Second, /* NodeStatusUpdateFrequency */ 10*time.Second, /* SyncFrequency */ + 10*time.Second, /* OutOfDiskTransitionFrequency */ 40, /* MaxPods */ cm, diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index e16e7316dc4..985c7f758af 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -143,6 +143,7 @@ type KubeletServer struct { TLSCertFile string TLSPrivateKeyFile string ReconcileCIDR bool + OutOfDiskTransitionFrequency time.Duration // Flags intended for testing // Is the kubelet containerized? 
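The new setting is an ordinary kubelet flag: the following hunks default it to 5m in NewKubeletServer and register it with pflag's DurationVar, which is why the e2e config above can simply pass --outofdisk-transition-frequency=0 to disable the damping during tests. Below is a minimal, self-contained sketch of that registration pattern; the flag-set name and options struct are illustrative, not the kubelet's own types.

```go
package main

import (
	"fmt"
	"time"

	"github.com/spf13/pflag"
)

// options mirrors the shape of the new KubeletServer field: a plain
// time.Duration that carries its default until a flag overrides it.
type options struct {
	OutOfDiskTransitionFrequency time.Duration
}

func main() {
	// Default matches NewKubeletServer in the patch: 5 minutes.
	opts := &options{OutOfDiskTransitionFrequency: 5 * time.Minute}

	fs := pflag.NewFlagSet("kubelet-sketch", pflag.ExitOnError)
	// DurationVar accepts values such as "30s" or "5m0s"; "0" (used by the
	// e2e config) makes transitions immediate again.
	fs.DurationVar(&opts.OutOfDiskTransitionFrequency, "outofdisk-transition-frequency",
		opts.OutOfDiskTransitionFrequency,
		"Duration for which the kubelet has to wait before transitioning out of out-of-disk node condition status.")

	fs.Parse([]string{"--outofdisk-transition-frequency=0"})
	fmt.Println(opts.OutOfDiskTransitionFrequency) // 0s
}
```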
@@ -233,6 +234,7 @@ func NewKubeletServer() *KubeletServer { ReconcileCIDR: true, KubeAPIQPS: 5.0, KubeAPIBurst: 10, + OutOfDiskTransitionFrequency: 5 * time.Minute, ExperimentalFlannelOverlay: experimentalFlannelOverlay, } } @@ -349,6 +351,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver") fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver") fs.BoolVar(&s.SerializeImagePulls, "serialize-image-pulls", s.SerializeImagePulls, "Pull images one at a time. We recommend *not* changing the default value on nodes that run docker daemon with version < 1.9 or an Aufs storage backend. Issue #10959 has more details. [default=true]") + fs.DurationVar(&s.OutOfDiskTransitionFrequency, "outofdisk-transition-frequency", s.OutOfDiskTransitionFrequency, "Duration for which the kubelet has to wait before transitioning out of out-of-disk node condition status. Default: 5m0s") fs.BoolVar(&s.ExperimentalFlannelOverlay, "experimental-flannel-overlay", s.ExperimentalFlannelOverlay, "Experimental support for starting the kubelet with the default overlay network (flannel). Assumes flanneld is already running in client mode. [default=false]") fs.IPVar(&s.NodeIP, "node-ip", s.NodeIP, "IP address of the node. If set, kubelet will use this IP address for the node") } @@ -488,6 +491,7 @@ func (s *KubeletServer) UnsecuredKubeletConfig() (*KubeletConfig, error) { TLSOptions: tlsOptions, Writer: writer, VolumePlugins: ProbeVolumePlugins(s.VolumePluginDir), + OutOfDiskTransitionFrequency: s.OutOfDiskTransitionFrequency, ExperimentalFlannelOverlay: s.ExperimentalFlannelOverlay, NodeIP: s.NodeIP, @@ -706,7 +710,7 @@ func SimpleKubelet(client *client.Client, configFilePath string, cloud cloudprovider.Interface, osInterface kubecontainer.OSInterface, - fileCheckFrequency, httpCheckFrequency, minimumGCAge, nodeStatusUpdateFrequency, syncFrequency time.Duration, + fileCheckFrequency, httpCheckFrequency, minimumGCAge, nodeStatusUpdateFrequency, syncFrequency, outOfDiskTransitionFrequency time.Duration, maxPods int, containerManager cm.ContainerManager, clusterDNS net.IP) *KubeletConfig { imageGCPolicy := kubelet.ImageGCPolicy{ @@ -768,6 +772,7 @@ func SimpleKubelet(client *client.Client, TLSOptions: tlsOptions, VolumePlugins: volumePlugins, Writer: &io.StdWriter{}, + OutOfDiskTransitionFrequency: outOfDiskTransitionFrequency, } return &kcfg } @@ -965,6 +970,7 @@ type KubeletConfig struct { TLSOptions *server.TLSOptions Writer io.Writer VolumePlugins []volume.VolumePlugin + OutOfDiskTransitionFrequency time.Duration ExperimentalFlannelOverlay bool NodeIP net.IP @@ -1050,6 +1056,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod kc.OOMAdjuster, kc.SerializeImagePulls, kc.ContainerManager, + kc.OutOfDiskTransitionFrequency, kc.ExperimentalFlannelOverlay, kc.NodeIP, ) diff --git a/docs/admin/kubelet.md b/docs/admin/kubelet.md index 0beebf21876..5ad25b3a78f 100644 --- a/docs/admin/kubelet.md +++ b/docs/admin/kubelet.md @@ -118,6 +118,7 @@ kubelet --node-labels-file="": the path to a yaml or json file containing a series of key pair labels to apply on node registration --node-status-update-frequency=10s: Specifies how often kubelet posts node status to master. Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod in nodecontroller. 
Default: 10s --oom-score-adj=-999: The oom-score-adj value for kubelet process. Values must be within the range [-1000, 1000] + --outofdisk-transition-frequency=5m0s: Duration for which the kubelet has to wait before transitioning out of out-of-disk node condition status. Default: 5m0s --pod-cidr="": The CIDR to use for pod IP addresses, only used in standalone mode. In cluster mode, this is obtained from the master. --pod-infra-container-image="gcr.io/google_containers/pause:2.0": The image whose network/ipc namespaces containers in each pod will use. --port=10250: The port for the Kubelet to serve on. @@ -143,7 +144,7 @@ kubelet --volume-plugin-dir="/usr/libexec/kubernetes/kubelet-plugins/volume/exec/": The full path of the directory in which to search for additional third party volume plugins ``` -###### Auto generated by spf13/cobra on 11-Dec-2015 +###### Auto generated by spf13/cobra on 29-Dec-2015 diff --git a/docs/api-reference/extensions/v1beta1/definitions.html b/docs/api-reference/extensions/v1beta1/definitions.html index fc2b7ed9b59..83d162c9aa4 100755 --- a/docs/api-reference/extensions/v1beta1/definitions.html +++ b/docs/api-reference/extensions/v1beta1/definitions.html @@ -4361,7 +4361,7 @@ Populated by the system when a graceful deletion is requested. Read-only. More i diff --git a/docs/api-reference/extensions/v1beta1/operations.html b/docs/api-reference/extensions/v1beta1/operations.html index b6191c446fd..9b0f95841e3 100755 --- a/docs/api-reference/extensions/v1beta1/operations.html +++ b/docs/api-reference/extensions/v1beta1/operations.html @@ -5578,7 +5578,7 @@ span.icon > [class^="icon-"], span.icon > [class*=" icon-"] { cursor: default; } diff --git a/docs/api-reference/v1/definitions.html b/docs/api-reference/v1/definitions.html index a1fbc1ae1ac..4cb332f96eb 100755 --- a/docs/api-reference/v1/definitions.html +++ b/docs/api-reference/v1/definitions.html @@ -7001,7 +7001,7 @@ The resulting set of endpoints can be viewed as:
diff --git a/docs/api-reference/v1/operations.html b/docs/api-reference/v1/operations.html index d5ae5f80232..532f1f224ab 100755 --- a/docs/api-reference/v1/operations.html +++ b/docs/api-reference/v1/operations.html @@ -25733,7 +25733,7 @@ span.icon > [class^="icon-"], span.icon > [class*=" icon-"] { cursor: default; } diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index bfbc0225cbb..aabf2f6fe26 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -238,6 +238,7 @@ oidc-client-id oidc-issuer-url oidc-username-claim oom-score-adj +outofdisk-transition-frequency output-base output-package output-print-type diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index d206e0409b3..ae2c7dd27b4 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -196,10 +196,10 @@ func NewMainKubelet( oomAdjuster *oom.OOMAdjuster, serializeImagePulls bool, containerManager cm.ContainerManager, + outOfDiskTransitionFrequency time.Duration, flannelExperimentalOverlay bool, nodeIP net.IP, ) (*Kubelet, error) { - if rootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", rootDirectory) } @@ -315,6 +315,8 @@ func NewMainKubelet( flannelExperimentalOverlay: flannelExperimentalOverlay, flannelHelper: NewFlannelHelper(), nodeIP: nodeIP, + clock: util.RealClock{}, + outOfDiskTransitionFrequency: outOfDiskTransitionFrequency, } if klet.flannelExperimentalOverlay { glog.Infof("Flannel is in charge of podCIDR and overlay networking.") @@ -419,7 +421,7 @@ func NewMainKubelet( SystemContainerName: systemContainer, KubeletContainerName: resourceContainer, } - klet.runtimeState.setRuntimeSync(time.Now()) + klet.runtimeState.setRuntimeSync(klet.clock.Now()) klet.runner = klet.containerRuntime klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient)) @@ -654,6 +656,16 @@ type Kubelet struct { // If non-nil, use this IP address for the node nodeIP net.IP + + // clock is an interface that provides time related functionality in a way that makes it + // easy to test the code. + clock util.Clock + + // outOfDiskTransitionFrequency specifies the amount of time the kubelet has to be actually + // not out of disk before it can transition the node condition status from out-of-disk to + // not-out-of-disk. This prevents a pod that causes out-of-disk condition from repeatedly + // getting rescheduled onto the node. + outOfDiskTransitionFrequency time.Duration } // Validate given node IP belongs to the current host @@ -1614,7 +1626,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error { } func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) (syncErr error) { - start := time.Now() + start := kl.clock.Now() var firstSeenTime time.Time if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; !ok { glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID) @@ -1945,7 +1957,6 @@ func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecon // pastActiveDeadline returns true if the pod has been active for more than // ActiveDeadlineSeconds. 
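The new clock field added above is what makes the rest of the diff's time.Now() to kl.clock.Now() substitutions testable: production wiring sets util.RealClock{}, while the tests later in the patch swap in a fake clock that can be stepped forward instantly. The snippet below is a self-contained illustration of that pattern with a minimal interface and fake of its own; it is not the real util.Clock/util.FakeClock API, whose method sets are larger.

```go
package main

import (
	"fmt"
	"time"
)

// Clock is a minimal stand-in for util.Clock, reduced to the two methods
// this patch actually calls on it: Now and Since.
type Clock interface {
	Now() time.Time
	Since(t time.Time) time.Duration
}

// realClock defers to the time package, as util.RealClock{} does.
type realClock struct{}

func (realClock) Now() time.Time                  { return time.Now() }
func (realClock) Since(t time.Time) time.Duration { return time.Since(t) }

// fakeClock is advanced by hand, so tests never need time.Sleep and never flake.
type fakeClock struct{ t time.Time }

func (f *fakeClock) Now() time.Time                  { return f.t }
func (f *fakeClock) Since(t time.Time) time.Duration { return f.t.Sub(t) }
func (f *fakeClock) Step(d time.Duration)            { f.t = f.t.Add(d) }

func main() {
	fc := &fakeClock{t: time.Unix(0, 0)}
	var c Clock = fc
	start := c.Now()

	fc.Step(10 * time.Second)   // "wait" ten seconds instantly
	fmt.Println(c.Since(start)) // 10s

	c = realClock{} // production wiring would hand the kubelet this instead
	fmt.Println(c.Now().IsZero()) // false
}
```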
func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool { - now := unversioned.Now() if pod.Spec.ActiveDeadlineSeconds != nil { podStatus, ok := kl.statusManager.GetPodStatus(pod.UID) if !ok { @@ -1953,7 +1964,7 @@ func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool { } if !podStatus.StartTime.IsZero() { startTime := podStatus.StartTime.Time - duration := now.Time.Sub(startTime) + duration := kl.clock.Since(startTime) allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second if duration >= allowedDuration { return true @@ -2315,7 +2326,7 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { - kl.syncLoopMonitor.Store(time.Now()) + kl.syncLoopMonitor.Store(kl.clock.Now()) select { case u, open := <-updates: if !open { @@ -2351,7 +2362,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler } glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e) // Force the container runtime cache to update. - if err := kl.runtimeCache.ForceUpdateIfOlder(time.Now()); err != nil { + if err := kl.runtimeCache.ForceUpdateIfOlder(kl.clock.Now()); err != nil { glog.Errorf("SyncLoop: unable to update runtime cache") // TODO (yujuhong): should we delay the sync until container // runtime can be updated? @@ -2382,7 +2393,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler } } } - kl.syncLoopMonitor.Store(time.Now()) + kl.syncLoopMonitor.Store(kl.clock.Now()) return true } @@ -2411,7 +2422,7 @@ func (kl *Kubelet) handleMirrorPod(mirrorPod *api.Pod, start time.Time) { } func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) { - start := time.Now() + start := kl.clock.Now() sort.Sort(podsByCreationTime(pods)) for _, pod := range pods { kl.podManager.AddPod(pod) @@ -2437,7 +2448,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) { } func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) { - start := time.Now() + start := kl.clock.Now() for _, pod := range pods { kl.podManager.UpdatePod(pod) if kubepod.IsMirrorPod(pod) { @@ -2452,7 +2463,7 @@ func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) { } func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) { - start := time.Now() + start := kl.clock.Now() for _, pod := range pods { kl.podManager.DeletePod(pod) if kubepod.IsMirrorPod(pod) { @@ -2469,7 +2480,7 @@ func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) { } func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) { - start := time.Now() + start := kl.clock.Now() for _, pod := range pods { mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) @@ -2617,7 +2628,7 @@ func (kl *Kubelet) updateRuntimeUp() { return } kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules) - kl.runtimeState.setRuntimeSync(time.Now()) + kl.runtimeState.setRuntimeSync(kl.clock.Now()) } func (kl *Kubelet) reconcileCBR0(podCIDR string) error { @@ -2821,7 +2832,7 @@ func (kl *Kubelet) setNodeReadyCondition(node *api.Node) { // NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions. // This is due to an issue with version skewed kubelet and master components. 
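pastActiveDeadline is one of the call sites moved from unversioned.Now()/time.Now() onto the injected clock. The sketch below shows the same check in isolation; the kubelet obtains the elapsed time from kl.clock.Since, whereas here the current time is passed in explicitly to keep the example small, and the pod status lookup is reduced to a start time.

```go
package main

import (
	"fmt"
	"time"
)

// pastActiveDeadline reports whether a pod that started at startTime has
// exceeded its ActiveDeadlineSeconds, evaluated at now. Passing now
// explicitly plays the role of kl.clock.Since in the patch.
func pastActiveDeadline(startTime time.Time, activeDeadlineSeconds *int64, now time.Time) bool {
	if activeDeadlineSeconds == nil || startTime.IsZero() {
		return false
	}
	allowed := time.Duration(*activeDeadlineSeconds) * time.Second
	return now.Sub(startTime) >= allowed
}

func main() {
	start := time.Unix(0, 0)
	deadline := int64(30)

	fmt.Println(pastActiveDeadline(start, &deadline, start.Add(10*time.Second))) // false
	fmt.Println(pastActiveDeadline(start, &deadline, start.Add(31*time.Second))) // true
}
```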
// ref: https://github.com/kubernetes/kubernetes/issues/16961 - currentTime := unversioned.Now() + currentTime := unversioned.NewTime(kl.clock.Now()) var newNodeReadyCondition api.NodeCondition if rs := kl.runtimeState.errors(); len(rs) == 0 { newNodeReadyCondition = api.NodeCondition{ @@ -2871,7 +2882,7 @@ func (kl *Kubelet) setNodeReadyCondition(node *api.Node) { // Set OODcondition for the node. func (kl *Kubelet) setNodeOODCondition(node *api.Node) { - currentTime := unversioned.Now() + currentTime := unversioned.NewTime(kl.clock.Now()) var nodeOODCondition *api.NodeCondition // Check if NodeOutOfDisk condition already exists and if it does, just pick it up for update. @@ -2885,9 +2896,8 @@ func (kl *Kubelet) setNodeOODCondition(node *api.Node) { // If the NodeOutOfDisk condition doesn't exist, create one. if nodeOODCondition == nil { nodeOODCondition = &api.NodeCondition{ - Type: api.NodeOutOfDisk, - Status: api.ConditionUnknown, - LastTransitionTime: currentTime, + Type: api.NodeOutOfDisk, + Status: api.ConditionUnknown, } // nodeOODCondition cannot be appended to node.Status.Conditions here because it gets // copied to the slice. So if we append nodeOODCondition to the slice here none of the @@ -2914,11 +2924,18 @@ func (kl *Kubelet) setNodeOODCondition(node *api.Node) { } } else { if nodeOODCondition.Status != api.ConditionFalse { - nodeOODCondition.Status = api.ConditionFalse - nodeOODCondition.Reason = "KubeletHasSufficientDisk" - nodeOODCondition.Message = "kubelet has sufficient disk space available" - nodeOODCondition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientDisk") + // Update the out of disk condition when the condition status is unknown even if we + // are within the outOfDiskTransitionFrequency duration. We do this to set the + // condition status correctly at kubelet startup. + if nodeOODCondition.Status == api.ConditionUnknown || kl.clock.Since(nodeOODCondition.LastTransitionTime.Time) >= kl.outOfDiskTransitionFrequency { + nodeOODCondition.Status = api.ConditionFalse + nodeOODCondition.Reason = "KubeletHasSufficientDisk" + nodeOODCondition.Message = "kubelet has sufficient disk space available" + nodeOODCondition.LastTransitionTime = currentTime + kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientDisk") + } else { + glog.Infof("Node condition status for OutOfDisk is false, but last transition time is less than %s", kl.outOfDiskTransitionFrequency) + } } } @@ -3092,7 +3109,7 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase { // after refactoring, modify them later. func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) { - start := time.Now() + start := kl.clock.Now() defer func() { metrics.PodStatusLatency.Observe(metrics.SinceInMicroseconds(start)) }() diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 327387473c6..4cf4ec15dd8 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -85,6 +85,7 @@ type TestKubelet struct { fakeCadvisor *cadvisor.Mock fakeKubeClient *testclient.Fake fakeMirrorClient *kubepod.FakeMirrorClient + fakeClock *util.FakeClock } func newTestKubelet(t *testing.T) *TestKubelet { @@ -151,7 +152,8 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.workQueue = queue.NewBasicWorkQueue() // Relist period does not affect the tests. 
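The setNodeOODCondition hunk above is the heart of the change: a transition of NodeOutOfDisk from true back to false is deferred until outOfDiskTransitionFrequency has elapsed since the last transition, while the Unknown case is let through so the condition is still set correctly at kubelet startup. The sketch below restates that gate with simplified, hypothetical types so the control flow is visible outside the surrounding kubelet code.

```go
package main

import (
	"fmt"
	"time"
)

type conditionStatus string

const (
	conditionTrue    conditionStatus = "True"
	conditionFalse   conditionStatus = "False"
	conditionUnknown conditionStatus = "Unknown"
)

// condition is a stripped-down stand-in for api.NodeCondition.
type condition struct {
	Status             conditionStatus
	Reason             string
	LastTransitionTime time.Time
}

// updateOutOfDisk mirrors the gating logic in setNodeOODCondition:
// transitions *into* out-of-disk apply immediately, but transitions *out*
// of it are suppressed until frequency has passed since the last
// transition, so a briefly recovered node is not rescheduled onto at once.
func updateOutOfDisk(c *condition, outOfDisk bool, now time.Time, frequency time.Duration) {
	if outOfDisk {
		if c.Status != conditionTrue {
			c.Status = conditionTrue
			c.Reason = "KubeletOutOfDisk"
			c.LastTransitionTime = now
		}
		return
	}
	if c.Status != conditionFalse {
		// Unknown is allowed through so the status is correct at startup.
		if c.Status == conditionUnknown || now.Sub(c.LastTransitionTime) >= frequency {
			c.Status = conditionFalse
			c.Reason = "KubeletHasSufficientDisk"
			c.LastTransitionTime = now
		}
		// Otherwise keep reporting out-of-disk until the window closes.
	}
}

func main() {
	start := time.Unix(0, 0)
	c := &condition{Status: conditionTrue, Reason: "KubeletOutOfDisk", LastTransitionTime: start}

	updateOutOfDisk(c, false, start.Add(2*time.Second), 5*time.Second)
	fmt.Println(c.Status) // True: still inside the 5s window

	updateOutOfDisk(c, false, start.Add(6*time.Second), 5*time.Second)
	fmt.Println(c.Status) // False: window elapsed, transition allowed
}
```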
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour) - return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient} + kubelet.clock = fakeClock + return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock} } func newTestPods(count int) []*api.Pod { @@ -2465,6 +2467,26 @@ func TestValidateContainerStatus(t *testing.T) { } } +// updateDiskSpacePolicy creates a new DiskSpaceManager with a new policy. This new manager along +// with the mock FsInfo values added to Cadvisor should make the kubelet report that it has +// sufficient disk space or it is out of disk, depending on the capacity, availability and +// threshold values. +func updateDiskSpacePolicy(kubelet *Kubelet, mockCadvisor *cadvisor.Mock, rootCap, dockerCap, rootAvail, dockerAvail uint64, rootThreshold, dockerThreshold int) error { + dockerimagesFsInfo := cadvisorapiv2.FsInfo{Capacity: rootCap * mb, Available: rootAvail * mb} + rootFsInfo := cadvisorapiv2.FsInfo{Capacity: dockerCap * mb, Available: dockerAvail * mb} + mockCadvisor.On("DockerImagesFsInfo").Return(dockerimagesFsInfo, nil) + mockCadvisor.On("RootFsInfo").Return(rootFsInfo, nil) + + dsp := DiskSpacePolicy{DockerFreeDiskMB: rootThreshold, RootFreeDiskMB: dockerThreshold} + diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, dsp) + if err != nil { + return err + } + diskSpaceManager.Unfreeze() + kubelet.diskSpaceManager = diskSpaceManager + return nil +} + func TestUpdateNewNodeStatus(t *testing.T) { testKubelet := newTestKubelet(t) kubelet := testKubelet.kubelet @@ -2489,21 +2511,10 @@ func TestUpdateNewNodeStatus(t *testing.T) { } mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - // Create a new DiskSpaceManager with a new policy. This new manager along with the mock - // FsInfo values added to Cadvisor should make the kubelet report that it has sufficient - // disk space. - dockerimagesFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb} - rootFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb} - mockCadvisor.On("DockerImagesFsInfo").Return(dockerimagesFsInfo, nil) - mockCadvisor.On("RootFsInfo").Return(rootFsInfo, nil) - - dsp := DiskSpacePolicy{DockerFreeDiskMB: 100, RootFreeDiskMB: 100} - diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, dsp) - if err != nil { + // Make kubelet report that it has sufficient disk space. 
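The updateDiskSpacePolicy helper above wires mock cadvisor FsInfo values and a DiskSpacePolicy into a fresh disk space manager, so each test can choose whether the kubelet considers itself out of disk. The decision the tests rely on, available space on either the root or the Docker image filesystem dropping below its configured free-MB threshold, can be sketched as follows; the function and field names here are illustrative, not the actual diskSpaceManager API.

```go
package main

import "fmt"

// diskSpacePolicy mirrors the shape of DiskSpacePolicy in the patch:
// free-space thresholds, in MB, for the Docker image and root filesystems.
type diskSpacePolicy struct {
	DockerFreeDiskMB uint64
	RootFreeDiskMB   uint64
}

// outOfDisk is an illustrative predicate: the node is treated as out of
// disk as soon as either filesystem falls below its threshold.
func outOfDisk(rootAvailMB, dockerAvailMB uint64, p diskSpacePolicy) bool {
	return rootAvailMB < p.RootFreeDiskMB || dockerAvailMB < p.DockerFreeDiskMB
}

func main() {
	p := diskSpacePolicy{DockerFreeDiskMB: 100, RootFreeDiskMB: 100}

	// Mirrors updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100):
	// 200 MB available on both filesystems against 100 MB thresholds -> sufficient disk.
	fmt.Println(outOfDisk(200, 200, p)) // false

	// Mirrors the out-of-disk calls, where availability drops to 50 MB.
	fmt.Println(outOfDisk(50, 200, p)) // true
}
```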
+ if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil { t.Fatalf("can't update disk space manager: %v", err) } - diskSpaceManager.Unfreeze() - kubelet.diskSpaceManager = diskSpaceManager expectedNode := &api.Node{ ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, @@ -2585,6 +2596,82 @@ func TestUpdateNewNodeStatus(t *testing.T) { } } +func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, + }}).ReactionChain + machineInfo := &cadvisorapi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + mockCadvisor := testKubelet.fakeCadvisor + mockCadvisor.On("Start").Return(nil) + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + versionInfo := &cadvisorapi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + mockCadvisor.On("VersionInfo").Return(versionInfo, nil) + + // Make Kubelet report that it has sufficient disk space. + if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil { + t.Fatalf("can't update disk space manager: %v", err) + } + + kubelet.outOfDiskTransitionFrequency = 10 * time.Second + + expectedNodeOutOfDiskCondition := api.NodeCondition{ + Type: api.NodeOutOfDisk, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientDisk", + Message: fmt.Sprintf("kubelet has sufficient disk space available"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + } + + kubelet.updateRuntimeUp() + if err := kubelet.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := kubeClient.Actions() + if len(actions) != 2 { + t.Fatalf("unexpected actions: %v", actions) + } + if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { + t.Fatalf("unexpected actions: %v", actions) + } + updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node) + if !ok { + t.Errorf("unexpected object type") + } + + var oodCondition api.NodeCondition + for i, cond := range updatedNode.Status.Conditions { + if cond.LastHeartbeatTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp for %v condition", cond.Type) + } + if cond.LastTransitionTime.IsZero() { + t.Errorf("unexpected zero last transition timestamp for %v condition", cond.Type) + } + updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} + updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} + if cond.Type == api.NodeOutOfDisk { + oodCondition = updatedNode.Status.Conditions[i] + } + } + + if !reflect.DeepEqual(expectedNodeOutOfDiskCondition, oodCondition) { + t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNodeOutOfDiskCondition, oodCondition)) + } +} + // FIXME: Enable me.. func testDockerRuntimeVersion(t *testing.T) { testKubelet := newTestKubelet(t) @@ -2611,20 +2698,11 @@ func testDockerRuntimeVersion(t *testing.T) { DockerVersion: "1.5.0", } mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - // Create a new DiskSpaceManager with a new policy. 
This new manager along with the mock - // FsInfo values added to Cadvisor should make the kubelet report that it has sufficient - // disk space. - dockerimagesFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb} - rootFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb} - mockCadvisor.On("DockerImagesFsInfo").Return(dockerimagesFsInfo, nil) - mockCadvisor.On("RootFsInfo").Return(rootFsInfo, nil) - dsp := DiskSpacePolicy{DockerFreeDiskMB: 100, RootFreeDiskMB: 100} - diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, dsp) - if err != nil { + + // Make kubelet report that it has sufficient disk space. + if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil { t.Fatalf("can't update disk space manager: %v", err) } - diskSpaceManager.Unfreeze() - kubelet.diskSpaceManager = diskSpaceManager expectedNode := &api.Node{ ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, @@ -2781,20 +2859,10 @@ func TestUpdateExistingNodeStatus(t *testing.T) { } mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - // Create a new DiskSpaceManager with a new policy. This new manager along with the mock FsInfo - // values added to Cadvisor should make the kubelet report that it is out of disk space. - dockerimagesFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 70 * mb} - rootFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 50 * mb} - mockCadvisor.On("DockerImagesFsInfo").Return(dockerimagesFsInfo, nil) - mockCadvisor.On("RootFsInfo").Return(rootFsInfo, nil) - - dsp := DiskSpacePolicy{DockerFreeDiskMB: 100, RootFreeDiskMB: 100} - diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, dsp) - if err != nil { + // Make kubelet report that it is out of disk space. 
+ if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 50, 50, 100, 100); err != nil { t.Fatalf("can't update disk space manager: %v", err) } - diskSpaceManager.Unfreeze() - kubelet.diskSpaceManager = diskSpaceManager expectedNode := &api.Node{ ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, @@ -2878,6 +2946,158 @@ func TestUpdateExistingNodeStatus(t *testing.T) { } } +func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + clock := testKubelet.fakeClock + kubeClient := testKubelet.fakeKubeClient + kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: unversioned.NewTime(clock.Now()), + LastTransitionTime: unversioned.NewTime(clock.Now()), + }, + { + + Type: api.NodeOutOfDisk, + Status: api.ConditionTrue, + Reason: "KubeletOutOfDisk", + Message: "out of disk space", + LastHeartbeatTime: unversioned.NewTime(clock.Now()), + LastTransitionTime: unversioned.NewTime(clock.Now()), + }, + }, + }, + }, + }}).ReactionChain + mockCadvisor := testKubelet.fakeCadvisor + machineInfo := &cadvisorapi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + mockCadvisor.On("Start").Return(nil) + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + versionInfo := &cadvisorapi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + mockCadvisor.On("VersionInfo").Return(versionInfo, nil) + + kubelet.outOfDiskTransitionFrequency = 5 * time.Second + + ood := api.NodeCondition{ + Type: api.NodeOutOfDisk, + Status: api.ConditionTrue, + Reason: "KubeletOutOfDisk", + Message: "out of disk space", + LastHeartbeatTime: unversioned.NewTime(clock.Now()), // placeholder + LastTransitionTime: unversioned.NewTime(clock.Now()), // placeholder + } + noOod := api.NodeCondition{ + Type: api.NodeOutOfDisk, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientDisk", + Message: fmt.Sprintf("kubelet has sufficient disk space available"), + LastHeartbeatTime: unversioned.NewTime(clock.Now()), // placeholder + LastTransitionTime: unversioned.NewTime(clock.Now()), // placeholder + } + + testCases := []struct { + rootFsAvail uint64 + dockerFsAvail uint64 + expected api.NodeCondition + }{ + { + // NodeOutOfDisk==false + rootFsAvail: 200, + dockerFsAvail: 200, + expected: ood, + }, + { + // NodeOutOfDisk==true + rootFsAvail: 50, + dockerFsAvail: 200, + expected: ood, + }, + { + // NodeOutOfDisk==false + rootFsAvail: 200, + dockerFsAvail: 200, + expected: ood, + }, + { + // NodeOutOfDisk==true + rootFsAvail: 200, + dockerFsAvail: 50, + expected: ood, + }, + { + // NodeOutOfDisk==false + rootFsAvail: 200, + dockerFsAvail: 200, + expected: noOod, + }, + } + + kubelet.updateRuntimeUp() + for tcIdx, tc := range testCases { + // Step by a second + clock.Step(1 * time.Second) + + // Setup expected times. 
+ tc.expected.LastHeartbeatTime = unversioned.NewTime(clock.Now()) + // In the last case, there should be a status transition for NodeOutOfDisk + if tcIdx == len(testCases)-1 { + tc.expected.LastTransitionTime = unversioned.NewTime(clock.Now()) + } + + // Make kubelet report that it has sufficient disk space + if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, tc.rootFsAvail, tc.dockerFsAvail, 100, 100); err != nil { + t.Fatalf("can't update disk space manager: %v", err) + } + + if err := kubelet.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := kubeClient.Actions() + if len(actions) != 2 { + t.Errorf("%d. unexpected actions: %v", tcIdx, actions) + } + updateAction, ok := actions[1].(testclient.UpdateAction) + if !ok { + t.Errorf("%d. unexpected action type. expected UpdateAction, got %#v", tcIdx, actions[1]) + } + updatedNode, ok := updateAction.GetObject().(*api.Node) + if !ok { + t.Errorf("%d. unexpected object type", tcIdx) + } + kubeClient.ClearActions() + + var oodCondition api.NodeCondition + for i, cond := range updatedNode.Status.Conditions { + if cond.Type == api.NodeOutOfDisk { + oodCondition = updatedNode.Status.Conditions[i] + } + } + + if !reflect.DeepEqual(tc.expected, oodCondition) { + t.Errorf("%d.\nwant \n%v\n, got \n%v", tcIdx, tc.expected, oodCondition) + } + } +} + func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { testKubelet := newTestKubelet(t) kubelet := testKubelet.kubelet @@ -2907,21 +3127,10 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { } mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - // Create a new DiskSpaceManager with a new policy. This new manager along with the - // mock FsInfo values assigned to Cadvisor should make the kubelet report that it has - // sufficient disk space. - dockerimagesFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb} - rootFsInfo := cadvisorapiv2.FsInfo{Capacity: 500 * mb, Available: 200 * mb} - mockCadvisor.On("DockerImagesFsInfo").Return(dockerimagesFsInfo, nil) - mockCadvisor.On("RootFsInfo").Return(rootFsInfo, nil) - - dsp := DiskSpacePolicy{DockerFreeDiskMB: 100, RootFreeDiskMB: 100} - diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, dsp) - if err != nil { + // Make kubelet report that it has sufficient disk space. + if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil { t.Fatalf("can't update disk space manager: %v", err) } - diskSpaceManager.Unfreeze() - kubelet.diskSpaceManager = diskSpaceManager expectedNode := &api.Node{ ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, diff --git a/pkg/kubemark/hollow_kubelet.go b/pkg/kubemark/hollow_kubelet.go index aa04c72f31b..21aa44a39ed 100644 --- a/pkg/kubemark/hollow_kubelet.go +++ b/pkg/kubemark/hollow_kubelet.go @@ -70,6 +70,7 @@ func NewHollowKubelet( 1*time.Minute, /* MinimumGCAge */ 10*time.Second, /* NodeStatusUpdateFrequency */ 10*time.Second, /* SyncFrequency */ + 5*time.Minute, /* OutOfDiskTransitionFrequency */ 40, /* MaxPods */ containerManager, nil,
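Pulling the pieces together: TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency steps the fake clock by one second per case against a 5-second transition frequency, so even though disk space is sufficient in the first, third, and fifth cases, the condition is only allowed to flip back to ConditionFalse in the last one, a full five seconds after the transition recorded in the seeded node. The test sketch below reproduces that schedule against a simplified gate; the types and helper are illustrative and it does not use the real util.FakeClock or testclient machinery.

```go
package kubeletsketch

import (
	"testing"
	"time"
)

// gate reports whether a true->false transition is allowed once elapsed
// time since the last transition reaches the configured frequency.
func gate(elapsed, frequency time.Duration) bool {
	return elapsed >= frequency
}

// TestTransitionDelay walks a hand-stepped "clock" through the same
// schedule as the kubelet test: 1s steps against a 5s window, starting
// from an out-of-disk transition at t=0.
func TestTransitionDelay(t *testing.T) {
	const frequency = 5 * time.Second
	start := time.Unix(0, 0)
	lastTransition := start

	cases := []struct {
		elapsed        time.Duration
		diskSufficient bool
		wantOutOfDisk  bool
	}{
		{1 * time.Second, true, true},  // recovered, but only 1s into the window
		{2 * time.Second, false, true}, // genuinely out of disk again
		{3 * time.Second, true, true},  // recovered, still inside the window
		{4 * time.Second, false, true}, // out of disk again
		{5 * time.Second, true, false}, // window elapsed: allowed to flip back
	}

	outOfDisk := true
	for i, c := range cases {
		now := start.Add(c.elapsed)
		if c.diskSufficient {
			if outOfDisk && gate(now.Sub(lastTransition), frequency) {
				outOfDisk = false
				lastTransition = now
			}
		} else if !outOfDisk {
			// Transitions into out-of-disk are never delayed.
			outOfDisk = true
			lastTransition = now
		}
		if outOfDisk != c.wantOutOfDisk {
			t.Errorf("case %d: outOfDisk = %v, want %v", i, outOfDisk, c.wantOutOfDisk)
		}
	}
}
```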