diff --git a/pkg/kubelet/active_deadline.go b/pkg/kubelet/active_deadline.go
new file mode 100644
index 00000000000..4a613ef6aad
--- /dev/null
+++ b/pkg/kubelet/active_deadline.go
@@ -0,0 +1,98 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubelet
+
+import (
+	"fmt"
+	"time"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/record"
+	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
+	"k8s.io/kubernetes/pkg/kubelet/status"
+	"k8s.io/kubernetes/pkg/util"
+)
+
+const (
+	reason  = "DeadlineExceeded"
+	message = "Pod was active on the node longer than the specified deadline"
+)
+
+// activeDeadlineHandler knows how to enforce active deadlines on pods.
+type activeDeadlineHandler struct {
+	// the clock to use for deadline enforcement
+	clock util.Clock
+	// the provider of pod status
+	podStatusProvider status.PodStatusProvider
+	// the recorder to dispatch events when we identify a pod has exceeded active deadline
+	recorder record.EventRecorder
+}
+
+// newActiveDeadlineHandler returns an active deadline handler that can enforce pod active deadline
+func newActiveDeadlineHandler(
+	podStatusProvider status.PodStatusProvider,
+	recorder record.EventRecorder,
+	clock util.Clock,
+) (*activeDeadlineHandler, error) {
+
+	// check for all required fields
+	if clock == nil || podStatusProvider == nil || recorder == nil {
+		return nil, fmt.Errorf("required arguments must not be nil: %v, %v, %v", clock, podStatusProvider, recorder)
+	}
+	return &activeDeadlineHandler{
+		clock:             clock,
+		podStatusProvider: podStatusProvider,
+		recorder:          recorder,
+	}, nil
+}
+
+// ShouldSync returns true if the pod is past its active deadline.
+func (m *activeDeadlineHandler) ShouldSync(pod *api.Pod) bool {
+	return m.pastActiveDeadline(pod)
+}
+
+// ShouldEvict returns true if the pod is past its active deadline.
+// It dispatches an event that the pod should be evicted if it is past its deadline.
+func (m *activeDeadlineHandler) ShouldEvict(pod *api.Pod) lifecycle.ShouldEvictResponse {
+	if !m.pastActiveDeadline(pod) {
+		return lifecycle.ShouldEvictResponse{Evict: false}
+	}
+	m.recorder.Eventf(pod, api.EventTypeNormal, reason, message)
+	return lifecycle.ShouldEvictResponse{Evict: true, Reason: reason, Message: message}
+}
+
+// pastActiveDeadline returns true if the pod has been active for more than its ActiveDeadlineSeconds
+func (m *activeDeadlineHandler) pastActiveDeadline(pod *api.Pod) bool {
+	// no active deadline was specified
+	if pod.Spec.ActiveDeadlineSeconds == nil {
+		return false
+	}
+	// get the latest status to determine if it was started
+	podStatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
+	if !ok {
+		podStatus = pod.Status
+	}
+	// we have no start time so just return
+	if podStatus.StartTime.IsZero() {
+		return false
+	}
+	// determine if the deadline was exceeded
+	start := podStatus.StartTime.Time
+	duration := m.clock.Since(start)
+	allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second
+	return duration >= allowedDuration
+}
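
Reviewer note: the handler above plugs into the kubelet's pod lifecycle extension points. For orientation, the two interfaces it satisfies look roughly like the sketch below, paraphrased from pkg/kubelet/lifecycle in this era of the tree; treat the exact shape as an assumption rather than a quotation.

// Sketch (not part of this patch): the lifecycle extension points the
// handler satisfies, paraphrased from pkg/kubelet/lifecycle.
package lifecycle

import "k8s.io/kubernetes/pkg/api"

// PodSyncLoopHandler is consulted on each sync loop iteration so a module
// can request that a pod be resynchronized.
type PodSyncLoopHandler interface {
	// ShouldSync returns true if the pod needs to be synced.
	ShouldSync(pod *api.Pod) bool
}

// PodSyncHandler is consulted during each pod sync so a module can request
// that a pod be evicted.
type PodSyncHandler interface {
	// ShouldEvict is invoked during each sync pod operation.
	ShouldEvict(pod *api.Pod) ShouldEvictResponse
}

// ShouldEvictResponse is the response from a ShouldEvict call.
type ShouldEvictResponse struct {
	Evict   bool   // whether the pod should be evicted
	Reason  string // machine-readable reason, e.g. "DeadlineExceeded"
	Message string // human-readable message
}

Registering one object as both handler types, as NewMainKubelet does further down, is what lets the deadline check both trigger a sync and then fail the pod during that sync.
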
diff --git a/pkg/kubelet/active_deadline_test.go b/pkg/kubelet/active_deadline_test.go
new file mode 100644
index 00000000000..e343ab283d5
--- /dev/null
+++ b/pkg/kubelet/active_deadline_test.go
@@ -0,0 +1,95 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubelet
+
+import (
+	"testing"
+	"time"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/client/record"
+	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util"
+)
+
+// mockPodStatusProvider returns the status on the specified pod
+type mockPodStatusProvider struct {
+	pods []*api.Pod
+}
+
+// GetPodStatus returns the status on the associated pod with matching uid (if found)
+func (m *mockPodStatusProvider) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
+	for _, pod := range m.pods {
+		if pod.UID == uid {
+			return pod.Status, true
+		}
+	}
+	return api.PodStatus{}, false
+}
+
+// TestActiveDeadlineHandler verifies the active deadline handler functions as expected.
+func TestActiveDeadlineHandler(t *testing.T) {
+	pods := newTestPods(4)
+	fakeClock := util.NewFakeClock(time.Now())
+	podStatusProvider := &mockPodStatusProvider{pods: pods}
+	fakeRecorder := &record.FakeRecorder{}
+	handler, err := newActiveDeadlineHandler(podStatusProvider, fakeRecorder, fakeClock)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	now := unversioned.Now()
+	startTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute))
+
+	// this pod has exceeded its active deadline
+	exceededActiveDeadlineSeconds := int64(30)
+	pods[0].Status.StartTime = &startTime
+	pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds
+
+	// this pod has not exceeded its active deadline
+	notYetActiveDeadlineSeconds := int64(120)
+	pods[1].Status.StartTime = &startTime
+	pods[1].Spec.ActiveDeadlineSeconds = &notYetActiveDeadlineSeconds
+
+	// this pod has no deadline
+	pods[2].Status.StartTime = &startTime
+	pods[2].Spec.ActiveDeadlineSeconds = nil
+
+	testCases := []struct {
+		pod      *api.Pod
+		expected bool
+	}{{pods[0], true}, {pods[1], false}, {pods[2], false}, {pods[3], false}}
+
+	for i, testCase := range testCases {
+		if actual := handler.ShouldSync(testCase.pod); actual != testCase.expected {
+			t.Errorf("[%d] ShouldSync expected %#v, got %#v", i, testCase.expected, actual)
+		}
+		actual := handler.ShouldEvict(testCase.pod)
+		if actual.Evict != testCase.expected {
+			t.Errorf("[%d] ShouldEvict.Evict expected %#v, got %#v", i, testCase.expected, actual.Evict)
+		}
+		if testCase.expected {
+			if actual.Reason != reason {
+				t.Errorf("[%d] ShouldEvict.Reason expected %#v, got %#v", i, reason, actual.Reason)
+			}
+			if actual.Message != message {
+				t.Errorf("[%d] ShouldEvict.Message expected %#v, got %#v", i, message, actual.Message)
+			}
+		}
+	}
+}
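
Reviewer note: the test above pins pod start times one minute in the past and samples the handler once. Because the handler takes a util.Clock, a natural companion check is to drive a pod across its deadline by advancing the fake clock. A minimal sketch, not part of the patch; it assumes util.FakeClock exposes a Step method for advancing time, as the fake used above does in this era of the codebase:

// Sketch: verify the handler's answer flips as the fake clock advances.
func TestActiveDeadlineHandlerCrossesDeadline(t *testing.T) {
	pods := newTestPods(1)
	fakeClock := util.NewFakeClock(time.Now())
	podStatusProvider := &mockPodStatusProvider{pods: pods}
	handler, err := newActiveDeadlineHandler(podStatusProvider, &record.FakeRecorder{}, fakeClock)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	deadline := int64(30)
	startTime := unversioned.NewTime(fakeClock.Now())
	pods[0].Status.StartTime = &startTime
	pods[0].Spec.ActiveDeadlineSeconds = &deadline

	if handler.ShouldSync(pods[0]) {
		t.Errorf("pod should not yet be past its active deadline")
	}
	fakeClock.Step(31 * time.Second) // advance past the 30s deadline
	if !handler.ShouldSync(pods[0]) {
		t.Errorf("pod should be past its active deadline after the clock advances")
	}
}
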
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index d6f79a5d9c6..c8a5c6c8a22 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -530,6 +530,14 @@ func NewMainKubelet(
 	klet.evictionManager = evictionManager
 	klet.AddPodAdmitHandler(evictionAdmitHandler)
 
+	// enable active deadline handler
+	activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, klet.recorder, klet.clock)
+	if err != nil {
+		return nil, err
+	}
+	klet.AddPodSyncLoopHandler(activeDeadlineHandler)
+	klet.AddPodSyncHandler(activeDeadlineHandler)
+
 	// apply functional Option's
 	for _, opt := range kubeOptions {
 		opt(klet)
@@ -2076,29 +2084,8 @@ func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error {
 	return nil
 }
 
-// pastActiveDeadline returns true if the pod has been active for more than
-// ActiveDeadlineSeconds.
-func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
-	if pod.Spec.ActiveDeadlineSeconds != nil {
-		podStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
-		if !ok {
-			podStatus = pod.Status
-		}
-		if !podStatus.StartTime.IsZero() {
-			startTime := podStatus.StartTime.Time
-			duration := kl.clock.Since(startTime)
-			allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second
-			if duration >= allowedDuration {
-				return true
-			}
-		}
-	}
-	return false
-}
-
 // Get pods which should be resynchronized. Currently, the following pod should be resynchronized:
 // * pod whose work is ready.
-// * pod past the active deadline.
 // * internal modules that request sync of a pod.
 func (kl *Kubelet) getPodsToSync() []*api.Pod {
 	allPods := kl.podManager.GetPods()
@@ -2109,12 +2096,6 @@ func (kl *Kubelet) getPodsToSync() []*api.Pod {
 	}
 	var podsToSync []*api.Pod
 	for _, pod := range allPods {
-		// TODO: move active deadline code into a sync/evict pattern
-		if kl.pastActiveDeadline(pod) {
-			// The pod has passed the active deadline
-			podsToSync = append(podsToSync, pod)
-			continue
-		}
 		if podUIDSet.Has(string(pod.UID)) {
 			// The work of the pod is ready
 			podsToSync = append(podsToSync, pod)
@@ -3546,17 +3527,6 @@ func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.P
 		}
 	}
 
-	// TODO: Consider include the container information.
-	// TODO: move this into a sync/evictor
-	if kl.pastActiveDeadline(pod) {
-		reason := "DeadlineExceeded"
-		kl.recorder.Eventf(pod, api.EventTypeNormal, reason, "Pod was active on the node longer than specified deadline")
-		return api.PodStatus{
-			Phase:   api.PodFailed,
-			Reason:  reason,
-			Message: "Pod was active on the node longer than specified deadline"}
-	}
-
 	s := kl.convertStatusToAPIStatus(pod, podStatus)
 
 	// Assume info is ready to process
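
Reviewer note: with pastActiveDeadline deleted from kubelet.go, the deadline check reaches the sync loop only through the registered handlers, and a pod past its deadline is failed by the eviction machinery instead of being special-cased in generateAPIPodStatus. The surviving selection logic in getPodsToSync then reduces to something like the hypothetical, simplified stand-in below (the signature, the workReady map, and the handlers slice are illustrative assumptions, not the literal patched code):

// Sketch: work-ready pods are always synced; every other pod is offered to
// each registered PodSyncLoopHandler, the active deadline handler among them.
package kubelet

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/types"
)

func podsToSyncSketch(allPods []*api.Pod, workReady map[types.UID]bool, handlers []lifecycle.PodSyncLoopHandler) []*api.Pod {
	var podsToSync []*api.Pod
	for _, pod := range allPods {
		if workReady[pod.UID] {
			// the work of the pod is ready
			podsToSync = append(podsToSync, pod)
			continue
		}
		// otherwise ask each registered module whether the pod needs a sync
		for _, handler := range handlers {
			if handler.ShouldSync(pod) {
				podsToSync = append(podsToSync, pod)
				break
			}
		}
	}
	return podsToSync
}
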
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 27642a6b368..e9d6ec049e0 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -188,6 +188,7 @@ func newTestKubeletWithImageList(
 	fakeRecorder := &record.FakeRecorder{}
 	fakeKubeClient := &fake.Clientset{}
 	kubelet := &Kubelet{}
+	kubelet.recorder = fakeRecorder
 	kubelet.kubeClient = fakeKubeClient
 	kubelet.os = &containertest.FakeOS{}
@@ -305,6 +306,14 @@ func newTestKubeletWithImageList(
 		t.Fatalf("failed to initialize volume manager: %v", err)
 	}
 
+	// enable active deadline handler
+	activeDeadlineHandler, err := newActiveDeadlineHandler(kubelet.statusManager, kubelet.recorder, kubelet.clock)
+	if err != nil {
+		t.Fatalf("can't initialize active deadline handler: %v", err)
+	}
+	kubelet.AddPodSyncLoopHandler(activeDeadlineHandler)
+	kubelet.AddPodSyncHandler(activeDeadlineHandler)
+
 	return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock, nil, plug}
 }
 
@@ -3891,33 +3900,6 @@ func TestMakePortMappings(t *testing.T) {
 	}
 }
 
-func TestIsPodPastActiveDeadline(t *testing.T) {
-	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
-	kubelet := testKubelet.kubelet
-	pods := newTestPods(5)
-
-	exceededActiveDeadlineSeconds := int64(30)
-	notYetActiveDeadlineSeconds := int64(120)
-	now := unversioned.Now()
-	startTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute))
-	pods[0].Status.StartTime = &startTime
-	pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds
-	pods[1].Status.StartTime = &startTime
-	pods[1].Spec.ActiveDeadlineSeconds = &notYetActiveDeadlineSeconds
-	tests := []struct {
-		pod      *api.Pod
-		expected bool
-	}{{pods[0], true}, {pods[1], false}, {pods[2], false}, {pods[3], false}, {pods[4], false}}
-
-	kubelet.podManager.SetPods(pods)
-	for i, tt := range tests {
-		actual := kubelet.pastActiveDeadline(tt.pod)
-		if actual != tt.expected {
-			t.Errorf("[%d] expected %#v, got %#v", i, tt.expected, actual)
-		}
-	}
-}
-
 func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
 	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
 	fakeRuntime := testKubelet.fakeRuntime
@@ -3965,7 +3947,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
 		t.Errorf("expected to found status for pod %q", pods[0].UID)
 	}
 	if status.Phase != api.PodFailed {
-		t.Fatalf("expected pod status %q, ot %q.", api.PodFailed, status.Phase)
+		t.Fatalf("expected pod status %q, got %q.", api.PodFailed, status.Phase)
 	}
 }
diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go
index 704dc08ca9d..382f18d0035 100644
--- a/pkg/kubelet/status/status_manager.go
+++ b/pkg/kubelet/status/status_manager.go
@@ -66,15 +66,21 @@ type manager struct {
 	apiStatusVersions map[types.UID]uint64
 }
 
-// status.Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
-// the latest api.PodStatus. It also syncs updates back to the API server.
-type Manager interface {
-	// Start the API server status sync loop.
-	Start()
-
+// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
+// that need to introspect status.
+type PodStatusProvider interface {
 	// GetPodStatus returns the cached status for the provided pod UID, as well as whether it
 	// was a cache hit.
 	GetPodStatus(uid types.UID) (api.PodStatus, bool)
+}
+
+// Manager is the source of truth for kubelet pod status, and should be kept up-to-date with
+// the latest api.PodStatus. It also syncs updates back to the API server.
+type Manager interface {
+	PodStatusProvider
+
+	// Start the API server status sync loop.
+	Start()
 
 	// SetPodStatus caches updates the cached status for the given pod, and triggers a status update.
 	SetPodStatus(pod *api.Pod, status api.PodStatus)
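
Reviewer note: extracting PodStatusProvider out of Manager is what lets the active deadline handler depend on read-only status access instead of the full status manager, and is why the test file can satisfy it with a three-line mock. A minimal illustration of the pattern for any future consumer (the deadlineWatcher type is hypothetical, not part of the patch):

// Sketch: a read-only consumer depends on the narrow interface; tests can
// hand it a tiny fake such as mockPodStatusProvider above.
package example

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/kubelet/status"
)

type deadlineWatcher struct {
	provider status.PodStatusProvider
}

// started reports whether the pod has a recorded start time, falling back to
// the pod's own status when the provider has no cached entry, mirroring the
// fallback in pastActiveDeadline.
func (w *deadlineWatcher) started(pod *api.Pod) bool {
	podStatus, ok := w.provider.GetPodStatus(pod.UID)
	if !ok {
		podStatus = pod.Status
	}
	return !podStatus.StartTime.IsZero()
}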