From 4846c1e1b22f4e7557a1c22b10cf77a3f5fd284c Mon Sep 17 00:00:00 2001
From: Yu-Ju Hong
Date: Tue, 1 Mar 2016 17:46:11 -0800
Subject: [PATCH] pleg: add an internal clock for testability

Also add tests for the health check.
---
 pkg/kubelet/kubelet.go           |  2 +-
 pkg/kubelet/kubelet_test.go      |  2 +-
 pkg/kubelet/pleg/generic.go      | 12 ++++++++----
 pkg/kubelet/pleg/generic_test.go | 25 ++++++++++++++++++++++++-
 4 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 5a45ad676ec..d0d27690f90 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -447,7 +447,7 @@ func NewMainKubelet(
 		return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
 	}
 
-	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache)
+	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, util.RealClock{})
 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, klet.isContainerRuntimeVersionCompatible)
 	klet.updatePodCIDR(podCIDR)
 
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 2ca80900e6a..db0dbc04946 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -194,7 +194,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	}
 	kubelet.workQueue = queue.NewBasicWorkQueue()
 	// Relist period does not affect the tests.
-	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil)
+	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil, util.RealClock{})
 	kubelet.clock = fakeClock
 	kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
 	return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock, nil}
diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
index e397a4568b5..7c06b296068 100644
--- a/pkg/kubelet/pleg/generic.go
+++ b/pkg/kubelet/pleg/generic.go
@@ -24,6 +24,7 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/atomic"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
@@ -57,6 +58,8 @@ type GenericPLEG struct {
 	relistTime atomic.Value
 	// Cache for storing the runtime states required for syncing pods.
 	cache kubecontainer.Cache
+	// For testability.
+	clock util.Clock
 }
 
 // plegContainerState has a one-to-one mapping to the
@@ -92,13 +95,14 @@ type podRecord struct {
 type podRecords map[types.UID]*podRecord
 
 func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
-	relistPeriod time.Duration, cache kubecontainer.Cache) PodLifecycleEventGenerator {
+	relistPeriod time.Duration, cache kubecontainer.Cache, clock util.Clock) PodLifecycleEventGenerator {
 	return &GenericPLEG{
 		relistPeriod: relistPeriod,
 		runtime:      runtime,
 		eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
 		podRecords:   make(podRecords),
 		cache:        cache,
+		clock:        clock,
 	}
 }
 
@@ -121,7 +125,7 @@ func (g *GenericPLEG) Healthy() (bool, error) {
 	// relisting time, which can vary significantly. Set a conservative
 	// threshold so that we don't cause kubelet to be restarted unnecessarily.
 	threshold := 2 * time.Minute
-	if time.Since(relistTime) > threshold {
+	if g.clock.Since(relistTime) > threshold {
 		return false, fmt.Errorf("pleg was last seen active at %v", relistTime)
 	}
 	return true, nil
@@ -178,7 +182,7 @@ func (g *GenericPLEG) relist() {
 		metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(lastRelistTime))
 	}
 
-	timestamp := time.Now()
+	timestamp := g.clock.Now()
 	// Update the relist time.
 	g.updateRelisTime(timestamp)
 	defer func() {
@@ -287,7 +291,7 @@ func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
 		g.cache.Delete(pid)
 		return nil
 	}
-	timestamp := time.Now()
+	timestamp := g.clock.Now()
 	// TODO: Consider adding a new runtime method
 	// GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
 	// all containers again.
diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go
index 7174f2ff0b6..d92ba87d21e 100644
--- a/pkg/kubelet/pleg/generic_test.go
+++ b/pkg/kubelet/pleg/generic_test.go
@@ -37,10 +37,12 @@ const (
 type TestGenericPLEG struct {
 	pleg    *GenericPLEG
 	runtime *containertest.FakeRuntime
+	clock   *util.FakeClock
 }
 
 func newTestGenericPLEG() *TestGenericPLEG {
 	fakeRuntime := &containertest.FakeRuntime{}
+	clock := util.NewFakeClock(time.Time{})
 	// The channel capacity should be large enough to hold all events in a
 	// single test.
 	pleg := &GenericPLEG{
@@ -48,8 +50,9 @@ func newTestGenericPLEG() *TestGenericPLEG {
 		runtime:      fakeRuntime,
 		eventChannel: make(chan *PodLifecycleEvent, 100),
 		podRecords:   make(podRecords),
+		clock:        clock,
 	}
-	return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime}
+	return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime, clock: clock}
 }
 
 func getEventsFromChannel(ch <-chan *PodLifecycleEvent) []*PodLifecycleEvent {
@@ -222,6 +225,7 @@ func newTestGenericPLEGWithRuntimeMock() (*GenericPLEG, *containertest.Mock) {
 		eventChannel: make(chan *PodLifecycleEvent, 100),
 		podRecords:   make(podRecords),
 		cache:        kubecontainer.NewCache(),
+		clock:        util.RealClock{},
 	}
 	return pleg, runtimeMock
 }
@@ -318,3 +322,22 @@ func TestRemoveCacheEntry(t *testing.T) {
 	assert.Equal(t, &kubecontainer.PodStatus{ID: pods[0].ID}, actualStatus)
 	assert.Equal(t, nil, actualErr)
 }
+
+func TestHealthy(t *testing.T) {
+	testPleg := newTestGenericPLEG()
+	pleg, _, clock := testPleg.pleg, testPleg.runtime, testPleg.clock
+	ok, _ := pleg.Healthy()
+	assert.True(t, ok, "pleg should be healthy")
+
+	// Advance the clock without any relisting.
+	clock.Step(time.Minute * 10)
+	ok, _ = pleg.Healthy()
+	assert.False(t, ok, "pleg should be unhealthy")
+
+	// Relist and then advance the time by 1 minute. pleg should be healthy
+	// because this is within the allowed limit.
+	pleg.relist()
+	clock.Step(time.Minute * 1)
+	ok, _ = pleg.Healthy()
+	assert.True(t, ok, "pleg should be healthy")
+}
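Note: the patch injects a clock through k8s.io/kubernetes/pkg/util, but the Clock types themselves are outside the diff. The standalone sketch below is an assumption-based illustration of the pattern, not the actual util package code; it mirrors only the methods the patch relies on (Now, Since, NewFakeClock, Step) to show why injecting a clock makes the two-minute health threshold testable.

// Illustrative sketch only; the real util.Clock/RealClock/FakeClock have a
// larger interface than shown here.
package main

import (
	"fmt"
	"time"
)

// Clock lets production code read real time while tests substitute a fake.
type Clock interface {
	Now() time.Time
	Since(t time.Time) time.Duration
}

// RealClock delegates to the time package (production behavior).
type RealClock struct{}

func (RealClock) Now() time.Time                  { return time.Now() }
func (RealClock) Since(t time.Time) time.Duration { return time.Since(t) }

// FakeClock only advances when the test calls Step, so elapsed time is
// fully deterministic.
type FakeClock struct {
	now time.Time
}

func NewFakeClock(t time.Time) *FakeClock { return &FakeClock{now: t} }

func (f *FakeClock) Now() time.Time                  { return f.now }
func (f *FakeClock) Since(t time.Time) time.Duration { return f.now.Sub(t) }
func (f *FakeClock) Step(d time.Duration)            { f.now = f.now.Add(d) }

func main() {
	clock := NewFakeClock(time.Time{})
	lastRelist := clock.Now()

	// Mirrors the Healthy() check: unhealthy once no relist has happened
	// within the 2-minute threshold.
	threshold := 2 * time.Minute
	clock.Step(10 * time.Minute)
	fmt.Println("healthy:", clock.Since(lastRelist) <= threshold) // prints "healthy: false"
}

Passing the clock through NewGenericPLEG keeps production behavior unchanged (util.RealClock{} wraps time.Now), while TestHealthy can drive the threshold logic with FakeClock.Step instead of sleeping.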