Augmented kubelet healthz with syncLoop check.

Monit uses read-only port for health monitoring.
This commit is contained in:
Abhishek Shah
2015-06-17 15:31:46 -07:00
parent 29ffee51a8
commit 3556993179
7 changed files with 247 additions and 63 deletions

View File

@@ -262,6 +262,7 @@ func NewMainKubelet(
mounter: mounter,
configureCBR0: configureCBR0,
pods: pods,
syncLoopMonitor: util.AtomicValue{},
}
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
@@ -491,6 +492,9 @@ type Kubelet struct {
// Number of Pods which can be run by this Kubelet
pods int
// Monitor Kubelet's sync loop
syncLoopMonitor util.AtomicValue
}
// getRootDir returns the full path to the directory under which kubelet can
@@ -1682,41 +1686,58 @@ func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]Sync
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
glog.Info("Starting kubelet main sync loop.")
for {
if !kl.containerRuntimeUp() {
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, container runtime is not up.")
continue
kl.syncLoopIteration(updates, handler)
}
}
func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandler) {
kl.syncLoopMonitor.Store(time.Now())
if !kl.containerRuntimeUp() {
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, container runtime is not up.")
return
}
unsyncedPod := false
podSyncTypes := make(map[types.UID]SyncPodType)
select {
case u, ok := <-updates:
if !ok {
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return
}
unsyncedPod := false
podSyncTypes := make(map[types.UID]SyncPodType)
kl.podManager.UpdatePods(u, podSyncTypes)
unsyncedPod = true
kl.syncLoopMonitor.Store(time.Now())
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
}
start := time.Now()
// If we already caught some update, try to wait for some short time
// to possibly batch it with other incoming updates.
for unsyncedPod {
select {
case u, ok := <-updates:
if !ok {
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return
}
case u := <-updates:
kl.podManager.UpdatePods(u, podSyncTypes)
unsyncedPod = true
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
}
start := time.Now()
// If we already caught some update, try to wait for some short time
// to possibly batch it with other incoming updates.
for unsyncedPod {
select {
case u := <-updates:
kl.podManager.UpdatePods(u, podSyncTypes)
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
}
}
pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap()
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
kl.syncLoopMonitor.Store(time.Now())
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
}
}
pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap()
kl.syncLoopMonitor.Store(time.Now())
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
}
kl.syncLoopMonitor.Store(time.Now())
}
func (kl *Kubelet) LatestLoopEntryTime() time.Time {
val := kl.syncLoopMonitor.Load()
if val == nil {
return time.Time{}
}
return val.(time.Time)
}
// Returns the container runtime version for this Kubelet.
@@ -2263,6 +2284,10 @@ func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}
func (kl *Kubelet) ResyncInterval() time.Duration {
return kl.resyncInterval
}
// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName string, podUID types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {

View File

@@ -299,6 +299,28 @@ func TestKubeletDirsCompat(t *testing.T) {
var emptyPodUIDs map[types.UID]SyncPodType
func TestSyncLoopTimeUpdate(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet
loopTime1 := kubelet.LatestLoopEntryTime()
if !loopTime1.IsZero() {
t.Errorf("Unexpected sync loop time: %s, expected 0", loopTime1)
}
kubelet.syncLoopIteration(make(chan PodUpdate), kubelet)
loopTime2 := kubelet.LatestLoopEntryTime()
if loopTime2.IsZero() {
t.Errorf("Unexpected sync loop time: 0, expected non-zero value.")
}
kubelet.syncLoopIteration(make(chan PodUpdate), kubelet)
loopTime3 := kubelet.LatestLoopEntryTime()
if !loopTime3.After(loopTime1) {
t.Errorf("Sync Loop Time was not updated correctly. Second update timestamp should be greater than first update timestamp")
}
}
func TestSyncPodsStartPod(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)

View File

@@ -80,14 +80,12 @@ func ListenAndServeKubeletServer(host HostInterface, address net.IP, port uint,
// ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletReadOnlyServer(host HostInterface, address net.IP, port uint) {
glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
s := &Server{host, http.NewServeMux()}
healthz.InstallHandler(s.mux)
s.mux.HandleFunc("/stats/", s.handleStats)
s := NewServer(host, false)
s.mux.Handle("/metrics", prometheus.Handler())
server := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
Handler: s,
Handler: &s,
ReadTimeout: 5 * time.Minute,
WriteTimeout: 5 * time.Minute,
MaxHeaderBytes: 1 << 20,
@@ -110,7 +108,9 @@ type HostInterface interface {
ServeLogs(w http.ResponseWriter, req *http.Request)
PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error
StreamingConnectionIdleTimeout() time.Duration
ResyncInterval() time.Duration
GetHostname() string
LatestLoopEntryTime() time.Time
}
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
@@ -132,6 +132,7 @@ func (s *Server) InstallDefaultHandlers() {
healthz.PingHealthz,
healthz.NamedCheck("docker", s.dockerHealthCheck),
healthz.NamedCheck("hostname", s.hostnameHealthCheck),
healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
)
s.mux.HandleFunc("/pods", s.handlePods)
s.mux.HandleFunc("/stats/", s.handleStats)
@@ -195,6 +196,20 @@ func (s *Server) hostnameHealthCheck(req *http.Request) error {
return nil
}
// Checks if kubelet's sync loop that updates containers is working.
func (s *Server) syncLoopHealthCheck(req *http.Request) error {
duration := s.host.ResyncInterval() * 2
minDuration := time.Minute * 5
if duration < minDuration {
duration = minDuration
}
enterLoopTime := s.host.LatestLoopEntryTime()
if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
return fmt.Errorf("Sync Loop took longer than expected.")
}
return nil
}
// handleContainerLogs handles containerLogs request against the Kubelet
func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()

View File

@@ -55,6 +55,16 @@ type fakeKubelet struct {
containerLogsFunc func(podFullName, containerName, tail string, follow, pervious bool, stdout, stderr io.Writer) error
streamingConnectionIdleTimeoutFunc func() time.Duration
hostnameFunc func() string
resyncInterval time.Duration
loopEntryTime time.Time
}
func (fk *fakeKubelet) ResyncInterval() time.Duration {
return fk.resyncInterval
}
func (fk *fakeKubelet) LatestLoopEntryTime() time.Time {
return fk.loopEntryTime
}
func (fk *fakeKubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
@@ -453,51 +463,71 @@ func TestHealthCheck(t *testing.T) {
}
// Test with correct hostname, Docker version
resp, err := http.Get(fw.testHTTPServer.URL + "/healthz")
if err != nil {
t.Fatalf("Got error GETing: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("expected status code %d, got %d", http.StatusOK, resp.StatusCode)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
// copying the response body did not work
t.Fatalf("Cannot copy resp: %#v", err)
}
result := string(body)
if !strings.Contains(result, "ok") {
t.Errorf("expected body contains ok, got %s", result)
}
assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
//Test with incorrect hostname
fw.fakeKubelet.hostnameFunc = func() string {
return "fake"
}
resp, err = http.Get(fw.testHTTPServer.URL + "/healthz")
if err != nil {
t.Fatalf("Got error GETing: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("expected status code %d, got %d", http.StatusOK, resp.StatusCode)
}
assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
//Test with old container runtime version
fw.fakeKubelet.containerVersionFunc = func() (kubecontainer.Version, error) {
return dockertools.NewVersion("1.1")
}
resp, err = http.Get(fw.testHTTPServer.URL + "/healthz")
assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
}
func assertHealthFails(t *testing.T, httpURL string, expectedErrorCode int) {
resp, err := http.Get(httpURL)
if err != nil {
t.Fatalf("Got error GETing: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusInternalServerError {
t.Errorf("expected status code %d, got %d", http.StatusInternalServerError, resp.StatusCode)
if resp.StatusCode != expectedErrorCode {
t.Errorf("expected status code %d, got %d", expectedErrorCode, resp.StatusCode)
}
}
func TestSyncLoopCheck(t *testing.T) {
fw := newServerTest()
fw.fakeKubelet.containerVersionFunc = func() (kubecontainer.Version, error) {
return dockertools.NewVersion("1.15")
}
fw.fakeKubelet.hostnameFunc = func() string {
return "127.0.0.1"
}
fw.fakeKubelet.resyncInterval = time.Minute
fw.fakeKubelet.loopEntryTime = time.Now()
// Test with correct hostname, Docker version
assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
fw.fakeKubelet.loopEntryTime = time.Now().Add(time.Minute * -10)
assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
}
// returns http response status code from the HTTP GET
func assertHealthIsOk(t *testing.T, httpURL string) {
resp, err := http.Get(httpURL)
if err != nil {
t.Fatalf("Got error GETing: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("expected status code %d, got %d", http.StatusOK, resp.StatusCode)
}
body, readErr := ioutil.ReadAll(resp.Body)
if readErr != nil {
// copying the response body did not work
t.Fatalf("Cannot copy resp: %#v", readErr)
}
result := string(body)
if !strings.Contains(result, "ok") {
t.Errorf("expected body contains ok, got %s", result)
}
}
func setPodByNameFunc(fw *serverTestFramework, namespace, pod, container string) {