Move ActiveDeadlineSeconds to use lifecycle observers
parent 3895cede49
commit c162fec94d

pkg/kubelet/active_deadline.go (new file, 98 lines)
@@ -0,0 +1,98 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/kubelet/status"
	"k8s.io/kubernetes/pkg/util"
)

const (
	reason  = "DeadlineExceeded"
	message = "Pod was active on the node longer than the specified deadline"
)

// activeDeadlineHandler knows how to enforce active deadlines on pods.
type activeDeadlineHandler struct {
	// the clock to use for deadline enforcement
	clock util.Clock
	// the provider of pod status
	podStatusProvider status.PodStatusProvider
	// the recorder to dispatch events when we identify a pod has exceeded its active deadline
	recorder record.EventRecorder
}

// newActiveDeadlineHandler returns an active deadline handler that can enforce pod active deadline
func newActiveDeadlineHandler(
	podStatusProvider status.PodStatusProvider,
	recorder record.EventRecorder,
	clock util.Clock,
) (*activeDeadlineHandler, error) {
	// check for all required fields
	if clock == nil || podStatusProvider == nil || recorder == nil {
		return nil, fmt.Errorf("Required arguments must not be nil: %v, %v, %v", clock, podStatusProvider, recorder)
	}
	return &activeDeadlineHandler{
		clock:             clock,
		podStatusProvider: podStatusProvider,
		recorder:          recorder,
	}, nil
}

// ShouldSync returns true if the pod is past its active deadline.
func (m *activeDeadlineHandler) ShouldSync(pod *api.Pod) bool {
	return m.pastActiveDeadline(pod)
}

// ShouldEvict returns true if the pod is past its active deadline.
// It dispatches an event that the pod should be evicted if it is past its deadline.
func (m *activeDeadlineHandler) ShouldEvict(pod *api.Pod) lifecycle.ShouldEvictResponse {
	if !m.pastActiveDeadline(pod) {
		return lifecycle.ShouldEvictResponse{Evict: false}
	}
	m.recorder.Eventf(pod, api.EventTypeNormal, reason, message)
	return lifecycle.ShouldEvictResponse{Evict: true, Reason: reason, Message: message}
}

// pastActiveDeadline returns true if the pod has been active for more than its ActiveDeadlineSeconds
func (m *activeDeadlineHandler) pastActiveDeadline(pod *api.Pod) bool {
	// no active deadline was specified
	if pod.Spec.ActiveDeadlineSeconds == nil {
		return false
	}
	// get the latest status to determine if it was started
	podStatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
	if !ok {
		podStatus = pod.Status
	}
	// we have no start time so just return
	if podStatus.StartTime.IsZero() {
		return false
	}
	// determine if the deadline was exceeded
	start := podStatus.StartTime.Time
	duration := m.clock.Since(start)
	allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second
	return duration >= allowedDuration
}
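The handler plugs into the kubelet through two small observer interfaces in pkg/kubelet/lifecycle. That package is not shown in this diff; reconstructed from the method signatures above and the AddPodSyncLoopHandler / AddPodSyncHandler registration calls below, the relevant pieces look roughly like this (interface names are inferred, not confirmed by this diff):

package lifecycle // sketch only; see pkg/kubelet/lifecycle for the real definitions

import "k8s.io/kubernetes/pkg/api"

// PodSyncLoopHandler lets a module request that a pod be synced on the next loop iteration.
type PodSyncLoopHandler interface {
	// ShouldSync returns true if the pod needs to be synced.
	ShouldSync(pod *api.Pod) bool
}

// ShouldEvictResponse is the result of a ShouldEvict query.
type ShouldEvictResponse struct {
	Evict   bool   // whether the pod should be evicted
	Reason  string // machine-readable reason, e.g. "DeadlineExceeded"
	Message string // human-readable message for the pod status
}

// PodSyncHandler lets a module ask for a pod's eviction during sync.
type PodSyncHandler interface {
	ShouldEvict(pod *api.Pod) ShouldEvictResponse
}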
pkg/kubelet/active_deadline_test.go (new file, 95 lines)

@@ -0,0 +1,95 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"testing"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/types"
	"k8s.io/kubernetes/pkg/util"
)

// mockPodStatusProvider returns the status on the specified pod
type mockPodStatusProvider struct {
	pods []*api.Pod
}

// GetPodStatus returns the status on the associated pod with matching uid (if found)
func (m *mockPodStatusProvider) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
	for _, pod := range m.pods {
		if pod.UID == uid {
			return pod.Status, true
		}
	}
	return api.PodStatus{}, false
}

// TestActiveDeadlineHandler verifies the active deadline handler functions as expected.
func TestActiveDeadlineHandler(t *testing.T) {
	pods := newTestPods(4)
	fakeClock := util.NewFakeClock(time.Now())
	podStatusProvider := &mockPodStatusProvider{pods: pods}
	fakeRecorder := &record.FakeRecorder{}
	handler, err := newActiveDeadlineHandler(podStatusProvider, fakeRecorder, fakeClock)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	now := unversioned.Now()
	startTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute))

	// this pod has exceeded its active deadline
	exceededActiveDeadlineSeconds := int64(30)
	pods[0].Status.StartTime = &startTime
	pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds

	// this pod has not exceeded its active deadline
	notYetActiveDeadlineSeconds := int64(120)
	pods[1].Status.StartTime = &startTime
	pods[1].Spec.ActiveDeadlineSeconds = &notYetActiveDeadlineSeconds

	// this pod has no deadline
	pods[2].Status.StartTime = &startTime
	pods[2].Spec.ActiveDeadlineSeconds = nil

	testCases := []struct {
		pod      *api.Pod
		expected bool
	}{{pods[0], true}, {pods[1], false}, {pods[2], false}, {pods[3], false}}

	for i, testCase := range testCases {
		if actual := handler.ShouldSync(testCase.pod); actual != testCase.expected {
			t.Errorf("[%d] ShouldSync expected %#v, got %#v", i, testCase.expected, actual)
		}
		actual := handler.ShouldEvict(testCase.pod)
		if actual.Evict != testCase.expected {
			t.Errorf("[%d] ShouldEvict.Evict expected %#v, got %#v", i, testCase.expected, actual.Evict)
		}
		if testCase.expected {
			if actual.Reason != reason {
				t.Errorf("[%d] ShouldEvict.Reason expected %#v, got %#v", i, reason, actual.Reason)
			}
			if actual.Message != message {
				t.Errorf("[%d] ShouldEvict.Message expected %#v, got %#v", i, message, actual.Message)
			}
		}
	}
}
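Why pods[0] and pods[1] diverge: both share a StartTime one minute before the fake clock's "now", so the measured active duration is 60s when the handler runs. A minimal standalone sketch of the same arithmetic:

package main

import (
	"fmt"
	"time"
)

func main() {
	duration := time.Minute                  // clock.Since(startTime) for both pods
	fmt.Println(duration >= 30*time.Second)  // pods[0], 30s deadline: true
	fmt.Println(duration >= 120*time.Second) // pods[1], 120s deadline: false
}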
pkg/kubelet/kubelet.go

@@ -530,6 +530,14 @@ func NewMainKubelet(
 	klet.evictionManager = evictionManager
 	klet.AddPodAdmitHandler(evictionAdmitHandler)
+
+	// enable active deadline handler
+	activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, klet.recorder, klet.clock)
+	if err != nil {
+		return nil, err
+	}
+	klet.AddPodSyncLoopHandler(activeDeadlineHandler)
+	klet.AddPodSyncHandler(activeDeadlineHandler)

 	// apply functional Option's
 	for _, opt := range kubeOptions {
 		opt(klet)
@@ -2076,29 +2084,8 @@ func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error {
 	return nil
 }
-
-// pastActiveDeadline returns true if the pod has been active for more than
-// ActiveDeadlineSeconds.
-func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
-	if pod.Spec.ActiveDeadlineSeconds != nil {
-		podStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
-		if !ok {
-			podStatus = pod.Status
-		}
-		if !podStatus.StartTime.IsZero() {
-			startTime := podStatus.StartTime.Time
-			duration := kl.clock.Since(startTime)
-			allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second
-			if duration >= allowedDuration {
-				return true
-			}
-		}
-	}
-	return false
-}

 // Get pods which should be resynchronized. Currently, the following pod should be resynchronized:
 // * pod whose work is ready.
 // * pod past the active deadline.
 // * internal modules that request sync of a pod.
 func (kl *Kubelet) getPodsToSync() []*api.Pod {
 	allPods := kl.podManager.GetPods()
@@ -2109,12 +2096,6 @@ func (kl *Kubelet) getPodsToSync() []*api.Pod {
 	}
 	var podsToSync []*api.Pod
 	for _, pod := range allPods {
-		// TODO: move active deadline code into a sync/evict pattern
-		if kl.pastActiveDeadline(pod) {
-			// The pod has passed the active deadline
-			podsToSync = append(podsToSync, pod)
-			continue
-		}
 		if podUIDSet.Has(string(pod.UID)) {
 			// The work of the pod is ready
 			podsToSync = append(podsToSync, pod)
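With the inline check gone, getPodsToSync instead consults the registered sync-loop handlers, which is how the active deadline handler keeps requesting syncs. That replacement code is not part of this hunk; a sketch of the shape it presumably takes, with the handler-collection field name assumed:

// sketch only; kl.PodSyncLoopHandlers is an assumed field name, not shown in this diff
for _, pod := range allPods {
	if podUIDSet.Has(string(pod.UID)) {
		podsToSync = append(podsToSync, pod) // the work of the pod is ready
		continue
	}
	for _, handler := range kl.PodSyncLoopHandlers {
		if handler.ShouldSync(pod) { // e.g. the active deadline handler
			podsToSync = append(podsToSync, pod)
			break
		}
	}
}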
@@ -3546,17 +3527,6 @@ func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus)
 		}
 	}

-	// TODO: Consider include the container information.
-	// TODO: move this into a sync/evictor
-	if kl.pastActiveDeadline(pod) {
-		reason := "DeadlineExceeded"
-		kl.recorder.Eventf(pod, api.EventTypeNormal, reason, "Pod was active on the node longer than specified deadline")
-		return api.PodStatus{
-			Phase:   api.PodFailed,
-			Reason:  reason,
-			Message: "Pod was active on the node longer than specified deadline"}
-	}
-
 	s := kl.convertStatusToAPIStatus(pod, podStatus)

 	// Assume info is ready to process
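The failed-status generation removed here moves behind the PodSyncHandler interface: when any registered handler's ShouldEvict returns Evict, generateAPIPodStatus can fail the pod with the handler's reason and message. The replacement is not shown in this diff; a sketch under the assumption of a kl.PodSyncHandlers slice:

// sketch only; kl.PodSyncHandlers is an assumed field name, not shown in this diff
for _, handler := range kl.PodSyncHandlers {
	if result := handler.ShouldEvict(pod); result.Evict {
		return api.PodStatus{
			Phase:   api.PodFailed,
			Reason:  result.Reason,
			Message: result.Message,
		}
	}
}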
pkg/kubelet/kubelet_test.go

@@ -188,6 +188,7 @@ func newTestKubeletWithImageList(
+	fakeRecorder := &record.FakeRecorder{}
 	fakeKubeClient := &fake.Clientset{}
 	kubelet := &Kubelet{}
 	kubelet.recorder = fakeRecorder
 	kubelet.kubeClient = fakeKubeClient
 	kubelet.os = &containertest.FakeOS{}

@@ -305,6 +306,14 @@ func newTestKubeletWithImageList(
 		t.Fatalf("failed to initialize volume manager: %v", err)
 	}

+	// enable active deadline handler
+	activeDeadlineHandler, err := newActiveDeadlineHandler(kubelet.statusManager, kubelet.recorder, kubelet.clock)
+	if err != nil {
+		t.Fatalf("can't initialize active deadline handler: %v", err)
+	}
+	kubelet.AddPodSyncLoopHandler(activeDeadlineHandler)
+	kubelet.AddPodSyncHandler(activeDeadlineHandler)
+
 	return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock, nil, plug}
 }

@@ -3891,33 +3900,6 @@ func TestMakePortMappings(t *testing.T) {
 	}
 }

-func TestIsPodPastActiveDeadline(t *testing.T) {
-	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
-	kubelet := testKubelet.kubelet
-	pods := newTestPods(5)
-
-	exceededActiveDeadlineSeconds := int64(30)
-	notYetActiveDeadlineSeconds := int64(120)
-	now := unversioned.Now()
-	startTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute))
-	pods[0].Status.StartTime = &startTime
-	pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds
-	pods[1].Status.StartTime = &startTime
-	pods[1].Spec.ActiveDeadlineSeconds = &notYetActiveDeadlineSeconds
-	tests := []struct {
-		pod      *api.Pod
-		expected bool
-	}{{pods[0], true}, {pods[1], false}, {pods[2], false}, {pods[3], false}, {pods[4], false}}
-
-	kubelet.podManager.SetPods(pods)
-	for i, tt := range tests {
-		actual := kubelet.pastActiveDeadline(tt.pod)
-		if actual != tt.expected {
-			t.Errorf("[%d] expected %#v, got %#v", i, tt.expected, actual)
-		}
-	}
-}
-
 func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
 	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
 	fakeRuntime := testKubelet.fakeRuntime

@@ -3965,7 +3947,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
 		t.Errorf("expected to found status for pod %q", pods[0].UID)
 	}
 	if status.Phase != api.PodFailed {
-		t.Fatalf("expected pod status %q, ot %q.", api.PodFailed, status.Phase)
+		t.Fatalf("expected pod status %q, got %q.", api.PodFailed, status.Phase)
 	}
 }
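The deadline scenarios deleted here are carried forward by TestActiveDeadlineHandler in the new active_deadline_test.go above, which exercises the same exceeded / not-yet / no-deadline cases against the extracted handler instead of the kubelet method.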
pkg/kubelet/status/status_manager.go

@@ -66,15 +66,21 @@ type manager struct {
 	apiStatusVersions map[types.UID]uint64
 }

-// status.Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
-// the latest api.PodStatus. It also syncs updates back to the API server.
-type Manager interface {
-	// Start the API server status sync loop.
-	Start()
-
+// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
+// that need to introspect status.
+type PodStatusProvider interface {
+	// GetPodStatus returns the cached status for the provided pod UID, as well as whether it
+	// was a cache hit.
+	GetPodStatus(uid types.UID) (api.PodStatus, bool)
+}
+
+// Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
+// the latest api.PodStatus. It also syncs updates back to the API server.
+type Manager interface {
+	PodStatusProvider
+
+	// Start the API server status sync loop.
+	Start()
+
 	// SetPodStatus caches updates the cached status for the given pod, and triggers a status update.
 	SetPodStatus(pod *api.Pod, status api.PodStatus)
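Splitting GetPodStatus into its own PodStatusProvider interface is what lets the active deadline handler depend on read-only status access rather than the full Manager; the test file's mockPodStatusProvider satisfies it without touching the sync loop. Any minimal stub works, for example (illustrative only, not part of this commit):

// staticStatusProvider is a hypothetical stub satisfying status.PodStatusProvider.
type staticStatusProvider struct{ status api.PodStatus }

func (s staticStatusProvider) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
	return s.status, true // always a cache hit, always the same status
}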