diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index a03b8f0855c..5a45ad676ec 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -2511,6 +2511,11 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time {
 	return val.(time.Time)
 }
 
+// PLEGHealthCheck reports the health of the PLEG as returned by its Healthy method.
+func (kl *Kubelet) PLEGHealthCheck() (bool, error) {
+	return kl.pleg.Healthy()
+}
+
 // validateContainerLogStatus returns the container ID for the desired container to retrieve logs for, based on the state
 // of the container. The previous flag will only return the logs for the the last terminated container, otherwise, the current
 // running container is preferred over a previous termination. If info about the container is not available then a specific
diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
index 1f091e49723..e397a4568b5 100644
--- a/pkg/kubelet/pleg/generic.go
+++ b/pkg/kubelet/pleg/generic.go
@@ -24,6 +24,7 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util/atomic"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
@@ -53,7 +54,7 @@ type GenericPLEG struct {
 	// The internal cache for pod/container information.
 	podRecords podRecords
 	// Time of the last relisting.
-	lastRelistTime time.Time
+	relistTime atomic.Value
 	// Cache for storing the runtime states required for syncing pods.
 	cache kubecontainer.Cache
 }
@@ -113,6 +114,25 @@ func (g *GenericPLEG) Start() {
 	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
 }
 
+// Healthy reports whether a relist has started within the threshold below.
+// It returns false and a descriptive error when the last recorded relist is
+// older than the threshold or when no relist has been recorded yet.
+func (g *GenericPLEG) Healthy() (bool, error) {
+	relistTime := g.getRelistTime()
+	if relistTime.IsZero() {
+		return false, fmt.Errorf("pleg has yet to relist")
+	}
+	// TODO: Evaluate if we can reduce this threshold.
+	// The threshold needs to be greater than the relisting period + the
+	// relisting time, which can vary significantly. Set a conservative
+	// threshold so that we don't cause kubelet to be restarted unnecessarily.
+	threshold := 2 * time.Minute
+	if time.Since(relistTime) > threshold {
+		return false, fmt.Errorf("pleg was last seen active at %v", relistTime)
+	}
+	return true, nil
+}
+
 func generateEvent(podID types.UID, cid string, oldState, newState plegContainerState) *PodLifecycleEvent {
 	if newState == oldState {
 		return nil
@@ -143,18 +163,33 @@ func generateEvent(podID types.UID, cid string, oldState, newState plegContainer
 	return nil
 }
 
+// getRelistTime returns the stored relist time, or the zero time if none has been stored.
+func (g *GenericPLEG) getRelistTime() time.Time {
+	val := g.relistTime.Load()
+	if val == nil {
+		return time.Time{}
+	}
+	return val.(time.Time)
+}
+
+// updateRelistTime stores timestamp as the relist time.
+func (g *GenericPLEG) updateRelistTime(timestamp time.Time) {
+	g.relistTime.Store(timestamp)
+}
+
 // relist queries the container runtime for list of pods/containers, compare
 // with the internal pods/containers, and generats events accordingly.
 func (g *GenericPLEG) relist() {
 	glog.V(5).Infof("GenericPLEG: Relisting")
-	timestamp := time.Now()
 
-	if !g.lastRelistTime.IsZero() {
-		metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(g.lastRelistTime))
+	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
+		metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(lastRelistTime))
 	}
+
+	timestamp := time.Now()
+	// Update the relist time.
+	g.updateRelistTime(timestamp)
 	defer func() {
-		// Update the relist time.
-		g.lastRelistTime = timestamp
 		metrics.PLEGRelistLatency.Observe(metrics.SinceInMicroseconds(timestamp))
 	}()
 
diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go
index c23472ca925..01798237286 100644
--- a/pkg/kubelet/pleg/pleg.go
+++ b/pkg/kubelet/pleg/pleg.go
@@ -48,4 +48,5 @@ type PodLifecycleEvent struct {
 type PodLifecycleEventGenerator interface {
 	Start()
 	Watch() chan *PodLifecycleEvent
+	Healthy() (bool, error)
 }
diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go
index 8246417d87d..0ff3284e935 100644
--- a/pkg/kubelet/server/server.go
+++ b/pkg/kubelet/server/server.go
@@ -166,6 +166,7 @@ type HostInterface interface {
 	DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error)
 	RootFsInfo() (cadvisorapiv2.FsInfo, error)
 	ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool)
+	PLEGHealthCheck() (bool, error)
 }
 
 // NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
@@ -223,6 +224,7 @@ func (s *Server) InstallDefaultHandlers() {
 	healthz.InstallHandler(s.restfulCont,
 		healthz.PingHealthz,
 		healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
+		healthz.NamedCheck("pleg", s.plegHealthCheck),
 	)
 	var ws *restful.WebService
 	ws = new(restful.WebService)
@@ -385,6 +387,14 @@ func (s *Server) syncLoopHealthCheck(req *http.Request) error {
 	return nil
 }
 
+// plegHealthCheck checks if pleg, which lists pods periodically, is healthy.
+func (s *Server) plegHealthCheck(req *http.Request) error {
+	if ok, err := s.host.PLEGHealthCheck(); !ok {
+		return fmt.Errorf("PLEG took longer than expected: %v", err)
+	}
+	return nil
+}
+
 // getContainerLogs handles containerLogs request against the Kubelet
 func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
 	podNamespace := request.PathParameter("podNamespace")
diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go
index cf5dd1ea7ee..7b5b4fc1e74 100644
--- a/pkg/kubelet/server/server_test.go
+++ b/pkg/kubelet/server/server_test.go
@@ -67,6 +67,7 @@ type fakeKubelet struct {
 	hostnameFunc func() string
 	resyncInterval time.Duration
 	loopEntryTime time.Time
+	plegHealth bool
 }
 
 func (fk *fakeKubelet) ResyncInterval() time.Duration {
@@ -133,6 +134,8 @@ func (fk *fakeKubelet) StreamingConnectionIdleTimeout() time.Duration {
 	return fk.streamingConnectionIdleTimeoutFunc()
 }
 
+func (fk *fakeKubelet) PLEGHealthCheck() (bool, error) { return fk.plegHealth, nil }
+
 // Unused functions
 func (_ *fakeKubelet) GetContainerInfoV2(_ string, _ cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
 	return nil, nil
@@ -190,6 +193,7 @@ func newServerTest() *serverTestFramework {
 				},
 			}, true
 		},
+		plegHealth: true,
 	}
 	fw.fakeAuth = &fakeAuth{
 		authenticateFunc: func(req *http.Request) (user.Info, bool, error) {
@@ -532,7 +536,7 @@ func TestHealthCheck(t *testing.T) {
 	// Test with correct hostname, Docker version
 	assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
 
-	//Test with incorrect hostname
+	// Test with incorrect hostname
 	fw.fakeKubelet.hostnameFunc = func() string {
 		return "fake"
 	}
@@ -738,6 +742,20 @@ func TestSyncLoopCheck(t *testing.T) {
 	assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
 }
 
+func TestPLEGHealthCheck(t *testing.T) {
+	fw := newServerTest()
+	fw.fakeKubelet.hostnameFunc = func() string {
+		return "127.0.0.1"
+	}
+
+	// Test with healthy pleg (the default from newServerTest) to establish a baseline.
+	assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
+
+	// Test with failed pleg health check.
+	fw.fakeKubelet.plegHealth = false
+	assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
+}
+
 // returns http response status code from the HTTP GET
 func assertHealthIsOk(t *testing.T, httpURL string) {
 	resp, err := http.Get(httpURL)