Merge pull request #22233 from yujuhong/pleg_health
Auto commit by PR queue bot
Commit 3f16f5f2b8
@@ -449,7 +449,7 @@ func NewMainKubelet(
 		return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
 	}
 
-	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache)
+	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, util.RealClock{})
 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, klet.isContainerRuntimeVersionCompatible)
 	klet.updatePodCIDR(podCIDR)
 
@@ -2516,6 +2516,10 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time {
 	return val.(time.Time)
 }
 
+func (kl *Kubelet) PLEGHealthCheck() (bool, error) {
+	return kl.pleg.Healthy()
+}
+
 // validateContainerLogStatus returns the container ID for the desired container to retrieve logs for, based on the state
 // of the container. The previous flag will only return the logs for the the last terminated container, otherwise, the current
 // running container is preferred over a previous termination. If info about the container is not available then a specific
@@ -194,7 +194,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	}
 	kubelet.workQueue = queue.NewBasicWorkQueue()
 	// Relist period does not affect the tests.
-	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil)
+	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil, util.RealClock{})
 	kubelet.clock = fakeClock
 	kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
 	return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock, nil}
@@ -24,6 +24,8 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/atomic"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
@@ -53,9 +55,11 @@ type GenericPLEG struct {
 	// The internal cache for pod/container information.
 	podRecords podRecords
 	// Time of the last relisting.
-	lastRelistTime time.Time
+	relistTime atomic.Value
 	// Cache for storing the runtime states required for syncing pods.
 	cache kubecontainer.Cache
+	// For testability.
+	clock util.Clock
 }
 
 // plegContainerState has a one-to-one mapping to the
@@ -91,13 +95,14 @@ type podRecord struct {
 type podRecords map[types.UID]*podRecord
 
 func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
-	relistPeriod time.Duration, cache kubecontainer.Cache) PodLifecycleEventGenerator {
+	relistPeriod time.Duration, cache kubecontainer.Cache, clock util.Clock) PodLifecycleEventGenerator {
 	return &GenericPLEG{
 		relistPeriod: relistPeriod,
 		runtime:      runtime,
 		eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
 		podRecords:   make(podRecords),
 		cache:        cache,
+		clock:        clock,
 	}
 }
 
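The constructor change above threads a clock through GenericPLEG so that tests can control time while production code keeps wall-clock behavior. A minimal, standard-library-only sketch of the same injection pattern; the names Clock, RealClock, and poller below are illustrative stand-ins, not the project's util package:

package main

import (
	"fmt"
	"time"
)

// Clock abstracts time so callers can substitute a fake in tests,
// mirroring the role util.Clock plays in the patch above.
type Clock interface {
	Now() time.Time
	Since(t time.Time) time.Duration
}

// RealClock defers to the wall clock, analogous to util.RealClock{}.
type RealClock struct{}

func (RealClock) Now() time.Time                  { return time.Now() }
func (RealClock) Since(t time.Time) time.Duration { return time.Since(t) }

// poller is a stand-in for GenericPLEG: it records when it last ran.
type poller struct {
	clock   Clock
	lastRun time.Time
}

func newPoller(clock Clock) *poller { return &poller{clock: clock} }

func (p *poller) run() { p.lastRun = p.clock.Now() }

func main() {
	p := newPoller(RealClock{}) // production wiring; a test would pass a fake clock
	p.run()
	fmt.Println("last run:", p.lastRun)
}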
@@ -113,6 +118,19 @@ func (g *GenericPLEG) Start() {
 	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
 }
 
+func (g *GenericPLEG) Healthy() (bool, error) {
+	relistTime := g.getRelistTime()
+	// TODO: Evaluate if we can reduce this threshold.
+	// The threshold needs to be greater than the relisting period + the
+	// relisting time, which can vary significantly. Set a conservative
+	// threshold so that we don't cause kubelet to be restarted unnecessarily.
+	threshold := 2 * time.Minute
+	if g.clock.Since(relistTime) > threshold {
+		return false, fmt.Errorf("pleg was last seen active at %v", relistTime)
+	}
+	return true, nil
+}
+
 func generateEvent(podID types.UID, cid string, oldState, newState plegContainerState) *PodLifecycleEvent {
 	if newState == oldState {
 		return nil
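The core of the new Healthy() method is a simple staleness check against a fixed threshold. A runnable sketch of just that check, with the injected clock replaced by time.Since for brevity; the function name and values here are illustrative:

package main

import (
	"fmt"
	"time"
)

// healthy reports whether the last relist happened within the threshold,
// mirroring the logic of GenericPLEG.Healthy() above.
func healthy(lastRelist time.Time, threshold time.Duration) (bool, error) {
	if time.Since(lastRelist) > threshold {
		return false, fmt.Errorf("pleg was last seen active at %v", lastRelist)
	}
	return true, nil
}

func main() {
	// A relist three minutes ago against a two-minute threshold is unhealthy.
	ok, err := healthy(time.Now().Add(-3*time.Minute), 2*time.Minute)
	fmt.Println(ok, err)
}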
@@ -143,18 +161,31 @@ func generateEvent(podID types.UID, cid string, oldState, newState plegContainer
 	return nil
 }
 
+func (g *GenericPLEG) getRelistTime() time.Time {
+	val := g.relistTime.Load()
+	if val == nil {
+		return time.Time{}
+	}
+	return val.(time.Time)
+}
+
+func (g *GenericPLEG) updateRelisTime(timestamp time.Time) {
+	g.relistTime.Store(timestamp)
+}
+
 // relist queries the container runtime for list of pods/containers, compare
 // with the internal pods/containers, and generats events accordingly.
 func (g *GenericPLEG) relist() {
 	glog.V(5).Infof("GenericPLEG: Relisting")
-	timestamp := time.Now()
 
-	if !g.lastRelistTime.IsZero() {
-		metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(g.lastRelistTime))
+	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
+		metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(lastRelistTime))
 	}
 
+	timestamp := g.clock.Now()
+	// Update the relist time.
+	g.updateRelisTime(timestamp)
 	defer func() {
-		// Update the relist time.
-		g.lastRelistTime = timestamp
 		metrics.PLEGRelistLatency.Observe(metrics.SinceInMicroseconds(timestamp))
 	}()
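The relist timestamp moves from a plain time.Time field to an atomically updated value so Healthy() can read it from the health-check goroutine without racing the relist loop. The patch uses the project's pkg/util/atomic wrapper; the standard library's sync/atomic.Value supports the same Load/Store pattern, as in this sketch (relistTracker is an illustrative name, not part of the patch):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type relistTracker struct {
	relistTime atomic.Value // holds a time.Time
}

// getRelistTime returns the stored timestamp, or the zero time if no
// relist has been recorded yet — the same nil check as in the diff above.
func (t *relistTracker) getRelistTime() time.Time {
	val := t.relistTime.Load()
	if val == nil {
		return time.Time{}
	}
	return val.(time.Time)
}

func (t *relistTracker) updateRelistTime(ts time.Time) {
	t.relistTime.Store(ts)
}

func main() {
	var t relistTracker
	fmt.Println(t.getRelistTime().IsZero()) // true: nothing stored yet
	t.updateRelistTime(time.Now())
	fmt.Println(t.getRelistTime().IsZero()) // false
}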
@@ -260,7 +291,7 @@ func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
 		g.cache.Delete(pid)
 		return nil
 	}
-	timestamp := time.Now()
+	timestamp := g.clock.Now()
 	// TODO: Consider adding a new runtime method
 	// GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
 	// all containers again.
@@ -37,10 +37,12 @@ const (
 type TestGenericPLEG struct {
 	pleg    *GenericPLEG
 	runtime *containertest.FakeRuntime
+	clock   *util.FakeClock
 }
 
 func newTestGenericPLEG() *TestGenericPLEG {
 	fakeRuntime := &containertest.FakeRuntime{}
+	clock := util.NewFakeClock(time.Time{})
 	// The channel capacity should be large enough to hold all events in a
 	// single test.
 	pleg := &GenericPLEG{
@@ -48,8 +50,9 @@ func newTestGenericPLEG() *TestGenericPLEG {
 		runtime:      fakeRuntime,
 		eventChannel: make(chan *PodLifecycleEvent, 100),
 		podRecords:   make(podRecords),
+		clock:        clock,
 	}
-	return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime}
+	return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime, clock: clock}
 }
 
 func getEventsFromChannel(ch <-chan *PodLifecycleEvent) []*PodLifecycleEvent {
@@ -236,6 +239,7 @@ func newTestGenericPLEGWithRuntimeMock() (*GenericPLEG, *containertest.Mock) {
 		eventChannel: make(chan *PodLifecycleEvent, 100),
 		podRecords:   make(podRecords),
 		cache:        kubecontainer.NewCache(),
+		clock:        util.RealClock{},
 	}
 	return pleg, runtimeMock
 }
@@ -332,3 +336,22 @@ func TestRemoveCacheEntry(t *testing.T) {
 	assert.Equal(t, &kubecontainer.PodStatus{ID: pods[0].ID}, actualStatus)
 	assert.Equal(t, nil, actualErr)
 }
+
+func TestHealthy(t *testing.T) {
+	testPleg := newTestGenericPLEG()
+	pleg, _, clock := testPleg.pleg, testPleg.runtime, testPleg.clock
+	ok, _ := pleg.Healthy()
+	assert.True(t, ok, "pleg should be healthy")
+
+	// Advance the clock without any relisting.
+	clock.Step(time.Minute * 10)
+	ok, _ = pleg.Healthy()
+	assert.False(t, ok, "pleg should be unhealthy")
+
+	// Relist and then advance the time by 1 minute. pleg should be healthy
+	// because this is within the allowed limit.
+	pleg.relist()
+	clock.Step(time.Minute * 1)
+	ok, _ = pleg.Healthy()
+	assert.True(t, ok, "pleg should be healthy")
+}
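TestHealthy drives the check with util.FakeClock, whose Step method advances time deterministically. A minimal stand-in showing the pattern the test relies on; this is not the real util.FakeClock, only an illustration of Step and Since:

package main

import (
	"fmt"
	"time"
)

// fakeClock is a minimal stand-in for util.FakeClock: time only moves
// when the test calls Step, so staleness checks become deterministic.
type fakeClock struct{ now time.Time }

func (c *fakeClock) Now() time.Time                  { return c.now }
func (c *fakeClock) Since(t time.Time) time.Duration { return c.now.Sub(t) }
func (c *fakeClock) Step(d time.Duration)            { c.now = c.now.Add(d) }

func main() {
	c := &fakeClock{}
	last := c.Now() // pretend a relist just happened
	c.Step(10 * time.Minute)
	fmt.Println(c.Since(last) > 2*time.Minute) // true -> would report unhealthy
}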
@@ -48,4 +48,5 @@ type PodLifecycleEvent struct {
 type PodLifecycleEventGenerator interface {
 	Start()
 	Watch() chan *PodLifecycleEvent
+	Healthy() (bool, error)
 }
@@ -166,6 +166,7 @@ type HostInterface interface {
 	DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error)
 	RootFsInfo() (cadvisorapiv2.FsInfo, error)
 	ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool)
+	PLEGHealthCheck() (bool, error)
 }
 
 // NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
@@ -223,6 +224,7 @@ func (s *Server) InstallDefaultHandlers() {
 	healthz.InstallHandler(s.restfulCont,
 		healthz.PingHealthz,
 		healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
+		healthz.NamedCheck("pleg", s.plegHealthCheck),
 	)
 	var ws *restful.WebService
 	ws = new(restful.WebService)
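Registering healthz.NamedCheck("pleg", s.plegHealthCheck) adds the PLEG check to the set evaluated on the kubelet's /healthz endpoint. The sketch below is not the actual healthz package, only an illustration of what a named check buys: any failing check turns /healthz into a 500. The port and handler names are placeholders:

package main

import (
	"fmt"
	"log"
	"net/http"
)

// namedCheck pairs a name with a health-check function, loosely modeled
// on the role of healthz.NamedCheck in the diff above.
type namedCheck struct {
	name  string
	check func(*http.Request) error
}

// installHealthz registers a /healthz handler that runs every check and
// returns 500 with the failing check's name if any of them errors.
func installHealthz(mux *http.ServeMux, checks ...namedCheck) {
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		for _, c := range checks {
			if err := c.check(r); err != nil {
				http.Error(w, fmt.Sprintf("%s check failed: %v", c.name, err), http.StatusInternalServerError)
				return
			}
		}
		fmt.Fprint(w, "ok")
	})
}

func main() {
	mux := http.NewServeMux()
	installHealthz(mux, namedCheck{"pleg", func(*http.Request) error { return nil }})
	log.Fatal(http.ListenAndServe(":10255", mux)) // placeholder port for the sketch
}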
@@ -385,6 +387,14 @@ func (s *Server) syncLoopHealthCheck(req *http.Request) error {
 	return nil
 }
 
+// Checks if pleg, which lists pods periodically, is healthy.
+func (s *Server) plegHealthCheck(req *http.Request) error {
+	if ok, err := s.host.PLEGHealthCheck(); !ok {
+		return fmt.Errorf("PLEG took longer than expected: %v", err)
+	}
+	return nil
+}
+
 // getContainerLogs handles containerLogs request against the Kubelet
 func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
 	podNamespace := request.PathParameter("podNamespace")
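Once the check is installed, external callers see PLEG staleness the same way they see any other kubelet health failure: a non-200 response from /healthz. A small client-side probe for illustration; the address below is a placeholder and depends on how the kubelet's healthz server is configured:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Placeholder URL; substitute the kubelet's actual healthz address.
	resp, err := http.Get("http://127.0.0.1:10248/healthz")
	if err != nil {
		fmt.Println("healthz unreachable:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("all checks (including pleg) healthy:", resp.StatusCode == http.StatusOK)
}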
@@ -67,6 +67,7 @@ type fakeKubelet struct {
 	hostnameFunc   func() string
 	resyncInterval time.Duration
 	loopEntryTime  time.Time
+	plegHealth     bool
 }
 
 func (fk *fakeKubelet) ResyncInterval() time.Duration {
@@ -133,6 +134,8 @@ func (fk *fakeKubelet) StreamingConnectionIdleTimeout() time.Duration {
 	return fk.streamingConnectionIdleTimeoutFunc()
 }
 
+func (fk *fakeKubelet) PLEGHealthCheck() (bool, error) { return fk.plegHealth, nil }
+
 // Unused functions
 func (_ *fakeKubelet) GetContainerInfoV2(_ string, _ cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
 	return nil, nil
@@ -190,6 +193,7 @@ func newServerTest() *serverTestFramework {
 				},
 			}, true
 		},
+		plegHealth: true,
 	}
 	fw.fakeAuth = &fakeAuth{
 		authenticateFunc: func(req *http.Request) (user.Info, bool, error) {
@@ -532,7 +536,7 @@ func TestHealthCheck(t *testing.T) {
 	// Test with correct hostname, Docker version
 	assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
 
-	//Test with incorrect hostname
+	// Test with incorrect hostname
 	fw.fakeKubelet.hostnameFunc = func() string {
 		return "fake"
 	}
@@ -738,6 +742,17 @@ func TestSyncLoopCheck(t *testing.T) {
 	assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
 }
 
+func TestPLEGHealthCheck(t *testing.T) {
+	fw := newServerTest()
+	fw.fakeKubelet.hostnameFunc = func() string {
+		return "127.0.0.1"
+	}
+
+	// Test with failed pleg health check.
+	fw.fakeKubelet.plegHealth = false
+	assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
+}
+
 // returns http response status code from the HTTP GET
 func assertHealthIsOk(t *testing.T, httpURL string) {
 	resp, err := http.Get(httpURL)