kubelet: monitor the health of pleg
PLEG is responsible for listing the pods running on the node. If it is hung due to a non-responsive container runtime or internal bugs, we should restart the kubelet.
parent 719158d2c8
commit 94368df91a
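
The change below implements this as a simple watchdog: relist() stamps a timestamp (stored in an atomic value) on every pass, Healthy() fails once that stamp is older than a fixed threshold, and the kubelet's /healthz handler surfaces the result as a named "pleg" check. Here is a minimal, self-contained Go sketch of that pattern for illustration; it uses the standard library's sync/atomic.Value, and all names plus the one-second ticker are hypothetical stand-ins, with only the 2-minute threshold taken from the change itself.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// watchdog records when a background loop last made progress and reports
// unhealthy once that timestamp is older than a fixed threshold.
type watchdog struct {
	lastActive atomic.Value // holds a time.Time
	threshold  time.Duration
}

func (w *watchdog) markActive(t time.Time) { w.lastActive.Store(t) }

func (w *watchdog) healthy() (bool, error) {
	v := w.lastActive.Load()
	last, _ := v.(time.Time) // zero time if the loop never stamped, i.e. reported as stale
	if time.Since(last) > w.threshold {
		return false, fmt.Errorf("last seen active at %v", last)
	}
	return true, nil
}

func main() {
	// 2 minutes mirrors the threshold chosen in GenericPLEG.Healthy below.
	w := &watchdog{threshold: 2 * time.Minute}

	// Stand-in for the relist loop: stamp the watchdog after each pass.
	go func() {
		for range time.Tick(time.Second) {
			w.markActive(time.Now())
		}
	}()

	time.Sleep(2 * time.Second)
	ok, err := w.healthy() // what a healthz-style handler would call
	fmt.Println(ok, err)
}

Because the check only compares timestamps, a healthz-style handler never blocks on the container runtime itself; it merely inspects when the loop last made progress.
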
@@ -2511,6 +2511,10 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time {
 	return val.(time.Time)
 }
 
+func (kl *Kubelet) PLEGHealthCheck() (bool, error) {
+	return kl.pleg.Healthy()
+}
+
 // validateContainerLogStatus returns the container ID for the desired container to retrieve logs for, based on the state
 // of the container. The previous flag will only return the logs for the last terminated container, otherwise, the current
 // running container is preferred over a previous termination. If info about the container is not available then a specific
@@ -24,6 +24,7 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util/atomic"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
@@ -53,7 +54,7 @@ type GenericPLEG struct {
 	// The internal cache for pod/container information.
 	podRecords podRecords
 	// Time of the last relisting.
-	lastRelistTime time.Time
+	relistTime atomic.Value
 	// Cache for storing the runtime states required for syncing pods.
 	cache kubecontainer.Cache
 }
@@ -113,6 +114,19 @@ func (g *GenericPLEG) Start() {
 	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
 }
 
+func (g *GenericPLEG) Healthy() (bool, error) {
+	relistTime := g.getRelistTime()
+	// TODO: Evaluate if we can reduce this threshold.
+	// The threshold needs to be greater than the relisting period + the
+	// relisting time, which can vary significantly. Set a conservative
+	// threshold so that we don't cause kubelet to be restarted unnecessarily.
+	threshold := 2 * time.Minute
+	if time.Since(relistTime) > threshold {
+		return false, fmt.Errorf("pleg was last seen active at %v", relistTime)
+	}
+	return true, nil
+}
+
 func generateEvent(podID types.UID, cid string, oldState, newState plegContainerState) *PodLifecycleEvent {
 	if newState == oldState {
 		return nil
@@ -143,18 +157,31 @@ func generateEvent(podID types.UID, cid string, oldState, newState plegContainer
 	return nil
 }
 
+func (g *GenericPLEG) getRelistTime() time.Time {
+	val := g.relistTime.Load()
+	if val == nil {
+		return time.Time{}
+	}
+	return val.(time.Time)
+}
+
+func (g *GenericPLEG) updateRelisTime(timestamp time.Time) {
+	g.relistTime.Store(timestamp)
+}
+
 // relist queries the container runtime for a list of pods/containers, compares
 // with the internal pods/containers, and generates events accordingly.
 func (g *GenericPLEG) relist() {
 	glog.V(5).Infof("GenericPLEG: Relisting")
-	timestamp := time.Now()
 
-	if !g.lastRelistTime.IsZero() {
-		metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(g.lastRelistTime))
+	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
+		metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(lastRelistTime))
 	}
 
+	timestamp := time.Now()
+	// Update the relist time.
+	g.updateRelisTime(timestamp)
 	defer func() {
-		// Update the relist time.
-		g.lastRelistTime = timestamp
 		metrics.PLEGRelistLatency.Observe(metrics.SinceInMicroseconds(timestamp))
 	}()
@@ -48,4 +48,5 @@ type PodLifecycleEvent struct {
 type PodLifecycleEventGenerator interface {
 	Start()
 	Watch() chan *PodLifecycleEvent
+	Healthy() (bool, error)
 }
@@ -166,6 +166,7 @@ type HostInterface interface {
 	DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error)
 	RootFsInfo() (cadvisorapiv2.FsInfo, error)
 	ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool)
+	PLEGHealthCheck() (bool, error)
 }
 
 // NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
@@ -223,6 +224,7 @@ func (s *Server) InstallDefaultHandlers() {
 	healthz.InstallHandler(s.restfulCont,
 		healthz.PingHealthz,
 		healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
+		healthz.NamedCheck("pleg", s.plegHealthCheck),
 	)
 	var ws *restful.WebService
 	ws = new(restful.WebService)
@@ -385,6 +387,14 @@ func (s *Server) syncLoopHealthCheck(req *http.Request) error {
 	return nil
 }
 
+// Checks if pleg, which lists pods periodically, is healthy.
+func (s *Server) plegHealthCheck(req *http.Request) error {
+	if ok, err := s.host.PLEGHealthCheck(); !ok {
+		return fmt.Errorf("PLEG took longer than expected: %v", err)
+	}
+	return nil
+}
+
 // getContainerLogs handles containerLogs request against the Kubelet
 func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
 	podNamespace := request.PathParameter("podNamespace")
@@ -67,6 +67,7 @@ type fakeKubelet struct {
 	hostnameFunc func() string
 	resyncInterval time.Duration
 	loopEntryTime time.Time
+	plegHealth bool
 }
 
 func (fk *fakeKubelet) ResyncInterval() time.Duration {
@@ -133,6 +134,8 @@ func (fk *fakeKubelet) StreamingConnectionIdleTimeout() time.Duration {
 	return fk.streamingConnectionIdleTimeoutFunc()
 }
 
+func (fk *fakeKubelet) PLEGHealthCheck() (bool, error) { return fk.plegHealth, nil }
+
 // Unused functions
 func (_ *fakeKubelet) GetContainerInfoV2(_ string, _ cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
 	return nil, nil
@@ -190,6 +193,7 @@ func newServerTest() *serverTestFramework {
 			},
 		}, true
 	},
+	plegHealth: true,
 }
 fw.fakeAuth = &fakeAuth{
 	authenticateFunc: func(req *http.Request) (user.Info, bool, error) {
@@ -532,7 +536,7 @@ func TestHealthCheck(t *testing.T) {
 	// Test with correct hostname, Docker version
 	assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
 
-	//Test with incorrect hostname
+	// Test with incorrect hostname
 	fw.fakeKubelet.hostnameFunc = func() string {
 		return "fake"
 	}
@@ -738,6 +742,17 @@ func TestSyncLoopCheck(t *testing.T) {
 	assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
 }
 
+func TestPLEGHealthCheck(t *testing.T) {
+	fw := newServerTest()
+	fw.fakeKubelet.hostnameFunc = func() string {
+		return "127.0.0.1"
+	}
+
+	// Test with failed pleg health check.
+	fw.fakeKubelet.plegHealth = false
+	assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
+}
+
 // returns http response status code from the HTTP GET
 func assertHealthIsOk(t *testing.T, httpURL string) {
 	resp, err := http.Get(httpURL)