From 94368df91a1bb395413f1e7c062f6ec5d5e44f8d Mon Sep 17 00:00:00 2001
From: Yu-Ju Hong
Date: Mon, 29 Feb 2016 16:11:48 -0800
Subject: [PATCH 1/2] kubelet: monitor the health of pleg

PLEG is responsible for listing the pods running on the node. If it's hung
due to a non-responsive container runtime or internal bugs, we should restart
kubelet.
---
 pkg/kubelet/kubelet.go            |  4 ++++
 pkg/kubelet/pleg/generic.go       | 39 ++++++++++++++++++++++++++-----
 pkg/kubelet/pleg/pleg.go          |  1 +
 pkg/kubelet/server/server.go      | 10 ++++++++
 pkg/kubelet/server/server_test.go | 17 +++++++++++++-
 5 files changed, 64 insertions(+), 7 deletions(-)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index a03b8f0855c..5a45ad676ec 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -2511,6 +2511,10 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time {
 	return val.(time.Time)
 }
 
+func (kl *Kubelet) PLEGHealthCheck() (bool, error) {
+	return kl.pleg.Healthy()
+}
+
 // validateContainerLogStatus returns the container ID for the desired container to retrieve logs for, based on the state
 // of the container. The previous flag will only return the logs for the last terminated container, otherwise, the current
 // running container is preferred over a previous termination. If info about the container is not available then a specific
diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
index 1f091e49723..e397a4568b5 100644
--- a/pkg/kubelet/pleg/generic.go
+++ b/pkg/kubelet/pleg/generic.go
@@ -24,6 +24,7 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util/atomic"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
@@ -53,7 +54,7 @@ type GenericPLEG struct {
 	// The internal cache for pod/container information.
 	podRecords podRecords
 	// Time of the last relisting.
-	lastRelistTime time.Time
+	relistTime atomic.Value
 	// Cache for storing the runtime states required for syncing pods.
 	cache kubecontainer.Cache
 }
@@ -113,6 +114,19 @@ func (g *GenericPLEG) Start() {
 	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
 }
 
+func (g *GenericPLEG) Healthy() (bool, error) {
+	relistTime := g.getRelistTime()
+	// TODO: Evaluate if we can reduce this threshold.
+	// The threshold needs to be greater than the relisting period + the
+	// relisting time, which can vary significantly. Set a conservative
+	// threshold so that we don't cause kubelet to be restarted unnecessarily.
+	threshold := 2 * time.Minute
+	if time.Since(relistTime) > threshold {
+		return false, fmt.Errorf("pleg was last seen active at %v", relistTime)
+	}
+	return true, nil
+}
+
 func generateEvent(podID types.UID, cid string, oldState, newState plegContainerState) *PodLifecycleEvent {
 	if newState == oldState {
 		return nil
@@ -143,18 +157,31 @@ func generateEvent(podID types.UID, cid string, oldState, newState plegContainer
 	return nil
 }
 
+func (g *GenericPLEG) getRelistTime() time.Time {
+	val := g.relistTime.Load()
+	if val == nil {
+		return time.Time{}
+	}
+	return val.(time.Time)
+}
+
+func (g *GenericPLEG) updateRelisTime(timestamp time.Time) {
+	g.relistTime.Store(timestamp)
+}
+
 // relist queries the container runtime for list of pods/containers, compares
 // with the internal pods/containers, and generates events accordingly.
 func (g *GenericPLEG) relist() {
 	glog.V(5).Infof("GenericPLEG: Relisting")
 
-	timestamp := time.Now()
-	if !g.lastRelistTime.IsZero() {
-		metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(g.lastRelistTime))
+	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
+		metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(lastRelistTime))
 	}
+
+	timestamp := time.Now()
+	// Update the relist time.
+	g.updateRelisTime(timestamp)
 	defer func() {
-		// Update the relist time.
-		g.lastRelistTime = timestamp
 		metrics.PLEGRelistLatency.Observe(metrics.SinceInMicroseconds(timestamp))
 	}()
 
diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go
index c23472ca925..01798237286 100644
--- a/pkg/kubelet/pleg/pleg.go
+++ b/pkg/kubelet/pleg/pleg.go
@@ -48,4 +48,5 @@ type PodLifecycleEvent struct {
 type PodLifecycleEventGenerator interface {
 	Start()
 	Watch() chan *PodLifecycleEvent
+	Healthy() (bool, error)
 }
diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go
index 8246417d87d..0ff3284e935 100644
--- a/pkg/kubelet/server/server.go
+++ b/pkg/kubelet/server/server.go
@@ -166,6 +166,7 @@ type HostInterface interface {
 	DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error)
 	RootFsInfo() (cadvisorapiv2.FsInfo, error)
 	ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool)
+	PLEGHealthCheck() (bool, error)
 }
 
 // NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
@@ -223,6 +224,7 @@ func (s *Server) InstallDefaultHandlers() {
 	healthz.InstallHandler(s.restfulCont,
 		healthz.PingHealthz,
 		healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
+		healthz.NamedCheck("pleg", s.plegHealthCheck),
 	)
 	var ws *restful.WebService
 	ws = new(restful.WebService)
@@ -385,6 +387,14 @@ func (s *Server) syncLoopHealthCheck(req *http.Request) error {
 	return nil
 }
 
+// Checks if pleg, which lists pods periodically, is healthy.
+func (s *Server) plegHealthCheck(req *http.Request) error {
+	if ok, err := s.host.PLEGHealthCheck(); !ok {
+		return fmt.Errorf("PLEG took longer than expected: %v", err)
+	}
+	return nil
+}
+
 // getContainerLogs handles containerLogs request against the Kubelet
 func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
 	podNamespace := request.PathParameter("podNamespace")
diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go
index cf5dd1ea7ee..7b5b4fc1e74 100644
--- a/pkg/kubelet/server/server_test.go
+++ b/pkg/kubelet/server/server_test.go
@@ -67,6 +67,7 @@ type fakeKubelet struct {
 	hostnameFunc   func() string
 	resyncInterval time.Duration
 	loopEntryTime  time.Time
+	plegHealth     bool
 }
 
 func (fk *fakeKubelet) ResyncInterval() time.Duration {
@@ -133,6 +134,8 @@ func (fk *fakeKubelet) StreamingConnectionIdleTimeout() time.Duration {
 	return fk.streamingConnectionIdleTimeoutFunc()
 }
 
+func (fk *fakeKubelet) PLEGHealthCheck() (bool, error) { return fk.plegHealth, nil }
+
 // Unused functions
 func (_ *fakeKubelet) GetContainerInfoV2(_ string, _ cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
 	return nil, nil
 }
@@ -190,6 +193,7 @@ func newServerTest() *serverTestFramework {
 				},
 			}, true
 		},
+		plegHealth: true,
 	}
 	fw.fakeAuth = &fakeAuth{
 		authenticateFunc: func(req *http.Request) (user.Info, bool, error) {
@@ -532,7 +536,7 @@ func TestHealthCheck(t *testing.T) {
 	// Test with correct hostname, Docker version
 	assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
 
-	//Test with incorrect hostname
+	// Test with incorrect hostname
 	fw.fakeKubelet.hostnameFunc = func() string {
 		return "fake"
 	}
@@ -738,6 +742,17 @@ func TestSyncLoopCheck(t *testing.T) {
 	assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
 }
 
+func TestPLEGHealthCheck(t *testing.T) {
+	fw := newServerTest()
+	fw.fakeKubelet.hostnameFunc = func() string {
+		return "127.0.0.1"
+	}
+
+	// Test with failed pleg health check.
+	fw.fakeKubelet.plegHealth = false
+	assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
+}
+
 // returns http response status code from the HTTP GET
 func assertHealthIsOk(t *testing.T, httpURL string) {
 	resp, err := http.Get(httpURL)

From 4846c1e1b22f4e7557a1c22b10cf77a3f5fd284c Mon Sep 17 00:00:00 2001
From: Yu-Ju Hong
Date: Tue, 1 Mar 2016 17:46:11 -0800
Subject: [PATCH 2/2] pleg: add an internal clock for testability

Also add tests for the health check.
---
 pkg/kubelet/kubelet.go           |  2 +-
 pkg/kubelet/kubelet_test.go      |  2 +-
 pkg/kubelet/pleg/generic.go      | 12 ++++++++----
 pkg/kubelet/pleg/generic_test.go | 25 ++++++++++++++++++++++++-
 4 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 5a45ad676ec..d0d27690f90 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -447,7 +447,7 @@ func NewMainKubelet(
 		return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
 	}
 
-	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache)
+	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, util.RealClock{})
 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, klet.isContainerRuntimeVersionCompatible)
 	klet.updatePodCIDR(podCIDR)
 
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 2ca80900e6a..db0dbc04946 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -194,7 +194,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	}
 	kubelet.workQueue = queue.NewBasicWorkQueue()
 	// Relist period does not affect the tests.
-	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil)
+	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil, util.RealClock{})
 	kubelet.clock = fakeClock
 	kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
 	return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock, nil}
 }
diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
index e397a4568b5..7c06b296068 100644
--- a/pkg/kubelet/pleg/generic.go
+++ b/pkg/kubelet/pleg/generic.go
@@ -24,6 +24,7 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/atomic"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
@@ -57,6 +58,8 @@ type GenericPLEG struct {
 	relistTime atomic.Value
 	// Cache for storing the runtime states required for syncing pods.
 	cache kubecontainer.Cache
+	// For testability.
+	clock util.Clock
 }
 
 // plegContainerState has a one-to-one mapping to the
@@ -92,13 +95,14 @@ type podRecord struct {
 type podRecords map[types.UID]*podRecord
 
 func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
-	relistPeriod time.Duration, cache kubecontainer.Cache) PodLifecycleEventGenerator {
+	relistPeriod time.Duration, cache kubecontainer.Cache, clock util.Clock) PodLifecycleEventGenerator {
 	return &GenericPLEG{
 		relistPeriod: relistPeriod,
 		runtime:      runtime,
 		eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
 		podRecords:   make(podRecords),
 		cache:        cache,
+		clock:        clock,
 	}
 }
@@ -121,7 +125,7 @@ func (g *GenericPLEG) Healthy() (bool, error) {
 	// relisting time, which can vary significantly. Set a conservative
 	// threshold so that we don't cause kubelet to be restarted unnecessarily.
 	threshold := 2 * time.Minute
-	if time.Since(relistTime) > threshold {
+	if g.clock.Since(relistTime) > threshold {
 		return false, fmt.Errorf("pleg was last seen active at %v", relistTime)
 	}
 	return true, nil
@@ -178,7 +182,7 @@ func (g *GenericPLEG) relist() {
 		metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(lastRelistTime))
 	}
 
-	timestamp := time.Now()
+	timestamp := g.clock.Now()
 	// Update the relist time.
 	g.updateRelisTime(timestamp)
 	defer func() {
@@ -287,7 +291,7 @@ func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
 		g.cache.Delete(pid)
 		return nil
 	}
-	timestamp := time.Now()
+	timestamp := g.clock.Now()
 	// TODO: Consider adding a new runtime method
 	// GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
 	// all containers again.
diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go
index 7174f2ff0b6..d92ba87d21e 100644
--- a/pkg/kubelet/pleg/generic_test.go
+++ b/pkg/kubelet/pleg/generic_test.go
@@ -37,10 +37,12 @@ const (
 type TestGenericPLEG struct {
 	pleg    *GenericPLEG
 	runtime *containertest.FakeRuntime
+	clock   *util.FakeClock
 }
 
 func newTestGenericPLEG() *TestGenericPLEG {
 	fakeRuntime := &containertest.FakeRuntime{}
+	clock := util.NewFakeClock(time.Time{})
 	// The channel capacity should be large enough to hold all events in a
 	// single test.
 	pleg := &GenericPLEG{
@@ -48,8 +50,9 @@ func newTestGenericPLEG() *TestGenericPLEG {
 		runtime:      fakeRuntime,
 		eventChannel: make(chan *PodLifecycleEvent, 100),
 		podRecords:   make(podRecords),
+		clock:        clock,
 	}
-	return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime}
+	return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime, clock: clock}
 }
 
 func getEventsFromChannel(ch <-chan *PodLifecycleEvent) []*PodLifecycleEvent {
@@ -222,6 +225,7 @@ func newTestGenericPLEGWithRuntimeMock() (*GenericPLEG, *containertest.Mock) {
 		eventChannel: make(chan *PodLifecycleEvent, 100),
 		podRecords:   make(podRecords),
 		cache:        kubecontainer.NewCache(),
+		clock:        util.RealClock{},
 	}
 	return pleg, runtimeMock
 }
@@ -318,3 +322,22 @@ func TestRemoveCacheEntry(t *testing.T) {
 	assert.Equal(t, &kubecontainer.PodStatus{ID: pods[0].ID}, actualStatus)
 	assert.Equal(t, nil, actualErr)
 }
+
+func TestHealthy(t *testing.T) {
+	testPleg := newTestGenericPLEG()
+	pleg, _, clock := testPleg.pleg, testPleg.runtime, testPleg.clock
+	ok, _ := pleg.Healthy()
+	assert.True(t, ok, "pleg should be healthy")
+
+	// Advance the clock without any relisting.
+	clock.Step(time.Minute * 10)
+	ok, _ = pleg.Healthy()
+	assert.False(t, ok, "pleg should be unhealthy")
+
+	// Relist and then advance the time by 1 minute. pleg should be healthy
+	// because this is within the allowed limit.
+	pleg.relist()
+	clock.Step(time.Minute * 1)
+	ok, _ = pleg.Healthy()
+	assert.True(t, ok, "pleg should be healthy")
+}
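
Note on how the new "pleg" check surfaces to users: the kubelet's /healthz handler aggregates named checks and returns HTTP 500 when any of them fails (the TestPLEGHealthCheck and TestSyncLoopCheck tests above assert exactly that), so a stuck PLEG makes the whole endpoint unhealthy and an external supervisor can restart the kubelet, as the first commit message suggests. Below is a minimal, self-contained Go sketch of that aggregation pattern; the checker type, the reuse of the 2-minute threshold, and the port are illustrative assumptions for this sketch, not the actual k8s.io/kubernetes healthz package or kubelet wiring.

package main

import (
	"fmt"
	"log"
	"net/http"
	"sync/atomic"
	"time"
)

// checker mimics the shape of a named healthz check: a name plus a function
// that returns nil when healthy. (Hypothetical type for this sketch; the real
// kubelet uses healthz.NamedCheck from k8s.io/kubernetes.)
type checker struct {
	name  string
	check func(*http.Request) error
}

// installHealthz registers a /healthz handler that runs every check and
// returns 500 with the name of the first failing check, mirroring how a
// failed "pleg" check makes the kubelet's /healthz fail.
func installHealthz(mux *http.ServeMux, checks ...checker) {
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		for _, c := range checks {
			if err := c.check(r); err != nil {
				http.Error(w, fmt.Sprintf("%s check failed: %v", c.name, err), http.StatusInternalServerError)
				return
			}
		}
		fmt.Fprint(w, "ok")
	})
}

func main() {
	// lastRelist plays the role of GenericPLEG.relistTime: the relist loop
	// stores a timestamp, the health check reads it.
	var lastRelist atomic.Value
	lastRelist.Store(time.Now())

	// Simulated relist loop; a hung container runtime would stop these updates.
	go func() {
		for range time.Tick(10 * time.Second) {
			lastRelist.Store(time.Now())
		}
	}()

	plegCheck := checker{
		name: "pleg",
		check: func(*http.Request) error {
			last := lastRelist.Load().(time.Time)
			// 2 * time.Minute matches the threshold chosen in the patch.
			if time.Since(last) > 2*time.Minute {
				return fmt.Errorf("pleg was last seen active at %v", last)
			}
			return nil
		},
	}

	mux := http.NewServeMux()
	installHealthz(mux, plegCheck)
	// Port 10248 is the kubelet's default healthz port; here it is only an
	// example value for this sketch.
	log.Fatal(http.ListenAndServe(":10248", mux))
}

With this pattern, GET /healthz returns "ok" while relisting keeps happening and a 500 carrying the failing check's name once the simulated PLEG stops updating its timestamp; a process manager that restarts the kubelet on a non-200 response completes the loop described in the first commit message.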