diff --git a/cluster/saltbase/salt/monit/kubelet b/cluster/saltbase/salt/monit/kubelet index 88fa236618e..eb3211b06a3 100644 --- a/cluster/saltbase/salt/monit/kubelet +++ b/cluster/saltbase/salt/monit/kubelet @@ -5,7 +5,7 @@ stop program = "/etc/init.d/kubelet stop" if does not exist then restart if failed host 127.0.0.1 - port 10248 + port 10255 protocol HTTP request "/healthz" then restart diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 5aa2dec1ca4..2553c6bbaeb 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -262,6 +262,7 @@ func NewMainKubelet( mounter: mounter, configureCBR0: configureCBR0, pods: pods, + syncLoopMonitor: util.AtomicValue{}, } if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil { @@ -491,6 +492,9 @@ type Kubelet struct { // Number of Pods which can be run by this Kubelet pods int + + // Monitor Kubelet's sync loop + syncLoopMonitor util.AtomicValue } // getRootDir returns the full path to the directory under which kubelet can @@ -1690,41 +1694,66 @@ func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]Sync func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { glog.Info("Starting kubelet main sync loop.") for { - if !kl.containerRuntimeUp() { - time.Sleep(5 * time.Second) - glog.Infof("Skipping pod synchronization, container runtime is not up.") - continue + if !kl.syncLoopIteration(updates, handler) { + return + } + } +} + +// syncLoopIteration runs a single pass of the sync loop, refreshing syncLoopMonitor as it progresses. +// It returns false once the updates channel has been closed, which tells syncLoop to stop instead of +// spinning on the closed channel, and true in every other case. +func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandler) bool { + kl.syncLoopMonitor.Store(time.Now()) + if !kl.containerRuntimeUp() { + time.Sleep(5 * time.Second) + glog.Infof("Skipping pod synchronization, container runtime is not up.") + return true + } + unsyncedPod := false + podSyncTypes := make(map[types.UID]SyncPodType) + select { + case u, ok := <-updates: + if !ok { + glog.Errorf("Update channel is closed. 
Exiting the sync loop.") + return false } - unsyncedPod := false - podSyncTypes := make(map[types.UID]SyncPodType) + kl.podManager.UpdatePods(u, podSyncTypes) + unsyncedPod = true + kl.syncLoopMonitor.Store(time.Now()) + case <-time.After(kl.resyncInterval): + glog.V(4).Infof("Periodic sync") + } + start := time.Now() + // If we already caught some update, try to wait for some short time + // to possibly batch it with other incoming updates. + for unsyncedPod { select { - case u, ok := <-updates: - if !ok { - glog.Errorf("Update channel is closed. Exiting the sync loop.") - return - } + case u := <-updates: kl.podManager.UpdatePods(u, podSyncTypes) - unsyncedPod = true - case <-time.After(kl.resyncInterval): - glog.V(4).Infof("Periodic sync") - } - start := time.Now() - // If we already caught some update, try to wait for some short time - // to possibly batch it with other incoming updates. - for unsyncedPod { - select { - case u := <-updates: - kl.podManager.UpdatePods(u, podSyncTypes) - case <-time.After(5 * time.Millisecond): - // Break the for loop. - unsyncedPod = false - } - } - pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap() - if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil { - glog.Errorf("Couldn't sync containers: %v", err) + kl.syncLoopMonitor.Store(time.Now()) + case <-time.After(5 * time.Millisecond): + // Break the for loop. + unsyncedPod = false } } + pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap() + kl.syncLoopMonitor.Store(time.Now()) + if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil { + glog.Errorf("Couldn't sync containers: %v", err) + } + kl.syncLoopMonitor.Store(time.Now()) + return true +} + +// LatestLoopEntryTime returns the timestamp most recently stored in syncLoopMonitor, +// or the zero time.Time when nothing has been stored yet. +func (kl *Kubelet) LatestLoopEntryTime() time.Time { + val := kl.syncLoopMonitor.Load() + if val == nil { + return time.Time{} + } + return val.(time.Time) } // Returns the container runtime version for this Kubelet. 
@@ -2274,6 +2295,10 @@ func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration { return kl.streamingConnectionIdleTimeout } +func (kl *Kubelet) ResyncInterval() time.Duration { + return kl.resyncInterval +} + // GetContainerInfo returns stats (from Cadvisor) for a container. func (kl *Kubelet) GetContainerInfo(podFullName string, podUID types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 9f575939d3c..96183eda521 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -299,6 +299,28 @@ func TestKubeletDirsCompat(t *testing.T) { var emptyPodUIDs map[types.UID]SyncPodType +func TestSyncLoopTimeUpdate(t *testing.T) { + testKubelet := newTestKubelet(t) + testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) + kubelet := testKubelet.kubelet + + loopTime1 := kubelet.LatestLoopEntryTime() + if !loopTime1.IsZero() { + t.Errorf("Unexpected sync loop time: %s, expected 0", loopTime1) + } + + kubelet.syncLoopIteration(make(chan PodUpdate), kubelet) + loopTime2 := kubelet.LatestLoopEntryTime() + if loopTime2.IsZero() { + t.Errorf("Unexpected sync loop time: 0, expected non-zero value.") + } + kubelet.syncLoopIteration(make(chan PodUpdate), kubelet) + loopTime3 := kubelet.LatestLoopEntryTime() + if !loopTime3.After(loopTime1) { + t.Errorf("Sync Loop Time was not updated correctly. 
Second update timestamp should be greater than first update timestamp") + } +} + func TestSyncPodsStartPod(t *testing.T) { testKubelet := newTestKubelet(t) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index 35d709645bb..32d0c3a7dd3 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -80,14 +80,12 @@ func ListenAndServeKubeletServer(host HostInterface, address net.IP, port uint, // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet. func ListenAndServeKubeletReadOnlyServer(host HostInterface, address net.IP, port uint) { glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port) - s := &Server{host, http.NewServeMux()} - healthz.InstallHandler(s.mux) - s.mux.HandleFunc("/stats/", s.handleStats) + s := NewServer(host, false) s.mux.Handle("/metrics", prometheus.Handler()) server := &http.Server{ Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), - Handler: s, + Handler: &s, ReadTimeout: 5 * time.Minute, WriteTimeout: 5 * time.Minute, MaxHeaderBytes: 1 << 20, @@ -110,7 +108,9 @@ type HostInterface interface { ServeLogs(w http.ResponseWriter, req *http.Request) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error StreamingConnectionIdleTimeout() time.Duration + ResyncInterval() time.Duration GetHostname() string + LatestLoopEntryTime() time.Time } // NewServer initializes and configures a kubelet.Server object to handle HTTP requests. 
@@ -132,6 +132,7 @@ func (s *Server) InstallDefaultHandlers() { healthz.PingHealthz, healthz.NamedCheck("docker", s.dockerHealthCheck), healthz.NamedCheck("hostname", s.hostnameHealthCheck), + healthz.NamedCheck("syncloop", s.syncLoopHealthCheck), ) s.mux.HandleFunc("/pods", s.handlePods) s.mux.HandleFunc("/stats/", s.handleStats) @@ -195,6 +196,20 @@ func (s *Server) hostnameHealthCheck(req *http.Request) error { return nil } +// Checks if kubelet's sync loop that updates containers is working. +func (s *Server) syncLoopHealthCheck(req *http.Request) error { + duration := s.host.ResyncInterval() * 2 + minDuration := time.Minute * 5 + if duration < minDuration { + duration = minDuration + } + enterLoopTime := s.host.LatestLoopEntryTime() + if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) { + return fmt.Errorf("Sync Loop took longer than expected.") + } + return nil +} + // handleContainerLogs handles containerLogs request against the Kubelet func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) { defer req.Body.Close() diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go index c0c7814ee49..51de9625a75 100644 --- a/pkg/kubelet/server_test.go +++ b/pkg/kubelet/server_test.go @@ -55,6 +55,16 @@ type fakeKubelet struct { containerLogsFunc func(podFullName, containerName, tail string, follow, pervious bool, stdout, stderr io.Writer) error streamingConnectionIdleTimeoutFunc func() time.Duration hostnameFunc func() string + resyncInterval time.Duration + loopEntryTime time.Time +} + +func (fk *fakeKubelet) ResyncInterval() time.Duration { + return fk.resyncInterval +} + +func (fk *fakeKubelet) LatestLoopEntryTime() time.Time { + return fk.loopEntryTime } func (fk *fakeKubelet) GetPodByName(namespace, name string) (*api.Pod, bool) { @@ -453,51 +463,71 @@ func TestHealthCheck(t *testing.T) { } // Test with correct hostname, Docker version - resp, err := http.Get(fw.testHTTPServer.URL + "/healthz") - if err 
!= nil { - t.Fatalf("Got error GETing: %v", err) - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - t.Errorf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) - } - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - // copying the response body did not work - t.Fatalf("Cannot copy resp: %#v", err) - } - result := string(body) - if !strings.Contains(result, "ok") { - t.Errorf("expected body contains ok, got %s", result) - } + assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz") //Test with incorrect hostname fw.fakeKubelet.hostnameFunc = func() string { return "fake" } - resp, err = http.Get(fw.testHTTPServer.URL + "/healthz") - if err != nil { - t.Fatalf("Got error GETing: %v", err) - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - t.Errorf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) - } + assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz") //Test with old container runtime version fw.fakeKubelet.containerVersionFunc = func() (kubecontainer.Version, error) { return dockertools.NewVersion("1.1") } - resp, err = http.Get(fw.testHTTPServer.URL + "/healthz") + assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError) +} + +func assertHealthFails(t *testing.T, httpURL string, expectedErrorCode int) { + resp, err := http.Get(httpURL) if err != nil { t.Fatalf("Got error GETing: %v", err) } defer resp.Body.Close() - if resp.StatusCode != http.StatusInternalServerError { - t.Errorf("expected status code %d, got %d", http.StatusInternalServerError, resp.StatusCode) + if resp.StatusCode != expectedErrorCode { + t.Errorf("expected status code %d, got %d", expectedErrorCode, resp.StatusCode) + } +} + +func TestSyncLoopCheck(t *testing.T) { + fw := newServerTest() + fw.fakeKubelet.containerVersionFunc = func() (kubecontainer.Version, error) { + return dockertools.NewVersion("1.15") + } + fw.fakeKubelet.hostnameFunc = func() string { + return "127.0.0.1" } + 
fw.fakeKubelet.resyncInterval = time.Minute + fw.fakeKubelet.loopEntryTime = time.Now() + + // Test with correct hostname, Docker version + assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz") + + fw.fakeKubelet.loopEntryTime = time.Now().Add(time.Minute * -10) + assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError) +} + +// assertHealthIsOk GETs httpURL and reports a test failure unless the response is 200 OK with a body containing "ok". +func assertHealthIsOk(t *testing.T, httpURL string) { + resp, err := http.Get(httpURL) + if err != nil { + t.Fatalf("Got error GETing: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Errorf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) + } + body, readErr := ioutil.ReadAll(resp.Body) + if readErr != nil { + // copying the response body did not work + t.Fatalf("Cannot copy resp: %#v", readErr) + } + result := string(body) + if !strings.Contains(result, "ok") { + t.Errorf("expected body contains ok, got %s", result) + } } func setPodByNameFunc(fw *serverTestFramework, namespace, pod, container string) { diff --git a/pkg/util/atomic_value.go b/pkg/util/atomic_value.go new file mode 100644 index 00000000000..3bb1a317601 --- /dev/null +++ b/pkg/util/atomic_value.go @@ -0,0 +1,42 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package util + +import ( + "sync" +) + +// TODO(ArtfulCoder) +// sync/atomic/Value was added in golang 1.4 +// Once support is dropped for go 1.3, this type must be deprecated in favor of sync/atomic/Value. +// The functions are named Load/Store to match sync/atomic/Value function names. +type AtomicValue struct { + value interface{} + valueMutex sync.RWMutex +} + +func (at *AtomicValue) Store(val interface{}) { + at.valueMutex.Lock() + defer at.valueMutex.Unlock() + at.value = val +} + +func (at *AtomicValue) Load() interface{} { + at.valueMutex.RLock() + defer at.valueMutex.RUnlock() + return at.value +} diff --git a/pkg/util/atomic_value_test.go b/pkg/util/atomic_value_test.go new file mode 100644 index 00000000000..29b8bd076b6 --- /dev/null +++ b/pkg/util/atomic_value_test.go @@ -0,0 +1,50 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package util + +import ( + "testing" + "time" +) + +func ExpectValue(t *testing.T, atomicValue *AtomicValue, expectedValue interface{}) { + actualValue := atomicValue.Load() + if actualValue != expectedValue { + t.Errorf("Expected to find %v, found %v", expectedValue, actualValue) + } + ch := make(chan interface{}) + go func() { + ch <- atomicValue.Load() + }() + select { + case actualValue = <-ch: + if actualValue != expectedValue { + t.Errorf("Expected to find %v, found %v", expectedValue, actualValue) + return + } + case <-time.After(time.Second * 5): + t.Error("Value could not be read") + return + } +} + +func TestAtomicValue(t *testing.T) { + atomicValue := &AtomicValue{} + ExpectValue(t, atomicValue, nil) + atomicValue.Store(10) + ExpectValue(t, atomicValue, 10) +}