Merge pull request #8421 from yujuhong/active_deadline

Kubelet: move active deadline check to per pod worker

Commit: c5da035d51
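The change, in brief: SyncPods used to filter deadline-exceeded pods out of the desired set centrally; after this commit each per-pod worker checks the deadline while generating status and marks the pod Failed. A toy, self-contained sketch of the new shape of the flow (illustrative names only, not actual kubelet code):

package main

import (
	"fmt"
	"time"
)

// Illustrative stand-ins; the real types live in the kubelet's api package.
type pod struct {
	name                  string
	startTime             time.Time
	activeDeadlineSeconds *int64
}

// pastActiveDeadline mirrors the arithmetic the new kubelet helper performs:
// a pod is past its deadline once now - startTime >= ActiveDeadlineSeconds.
func pastActiveDeadline(p pod, now time.Time) bool {
	if p.activeDeadlineSeconds == nil {
		return false // no deadline configured
	}
	allowed := time.Duration(*p.activeDeadlineSeconds) * time.Second
	return now.Sub(p.startTime) >= allowed
}

// generateStatus is roughly what a per-pod worker would run on each sync.
func generateStatus(p pod, now time.Time) string {
	if pastActiveDeadline(p, now) {
		return "Failed: Pod was active on the node longer than specified deadline"
	}
	return "Running"
}

func main() {
	deadline := int64(30)
	now := time.Now()
	p := pod{name: "p0", startTime: now.Add(-60 * time.Second), activeDeadlineSeconds: &deadline}
	fmt.Println(p.name, "->", generateStatus(p, now))
}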
@@ -1155,37 +1155,33 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
 	return nil
 }
 
-// filterOutPodsPastActiveDeadline filters pods with an ActiveDeadlineSeconds value that has been exceeded.
-// It records an event that the pod has been active longer than the allocated time, and updates the pod status as failed.
-// By filtering the pod from the result set, the Kubelet will kill the pod's containers as part of normal SyncPods workflow.
-func (kl *Kubelet) filterOutPodsPastActiveDeadline(allPods []*api.Pod) (pods []*api.Pod) {
-	now := util.Now()
-	for _, pod := range allPods {
-		keepPod := true
-		if pod.Spec.ActiveDeadlineSeconds != nil {
-			podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
-			if !ok {
-				podStatus = pod.Status
-			}
-			if !podStatus.StartTime.IsZero() {
-				startTime := podStatus.StartTime.Time
-				duration := now.Time.Sub(startTime)
-				allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second
-				if duration >= allowedDuration {
-					keepPod = false
-				}
-			}
-		}
-		if keepPod {
-			pods = append(pods, pod)
-		} else {
-			kl.recorder.Eventf(pod, "deadline", "Pod was active on the node longer than specified deadline")
-			kl.statusManager.SetPodStatus(pod, api.PodStatus{
-				Phase:   api.PodFailed,
-				Message: "Pod was active on the node longer than specified deadline"})
-		}
-	}
-	return pods
+// pastActiveDeadline returns true if the pod has been active for more than
+// ActiveDeadlineSeconds.
+func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
+	now := util.Now()
+	if pod.Spec.ActiveDeadlineSeconds != nil {
+		podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
+		if !ok {
+			podStatus = pod.Status
+		}
+		if !podStatus.StartTime.IsZero() {
+			startTime := podStatus.StartTime.Time
+			duration := now.Time.Sub(startTime)
+			allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second
+			if duration >= allowedDuration {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+//podIsTerminated returns true if status is in one of the terminated state.
+func podIsTerminated(status *api.PodStatus) bool {
+	if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
+		return true
+	}
+	return false
 }
 
 // Filter out pods in the terminated state ("Failed" or "Succeeded").
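One detail worth noting in pastActiveDeadline above: the check is skipped while podStatus.StartTime is still zero. A quick standalone demonstration of why (assumed values, plain time package):

package main

import (
	"fmt"
	"time"
)

func main() {
	// An unset StartTime is the zero time.Time; measuring elapsed time from
	// it yields centuries, so every deadline would look exceeded.
	var zero time.Time
	fmt.Println(zero.IsZero()) // true

	allowed := 30 * time.Second
	fmt.Println(time.Since(zero) >= allowed) // true: a spurious "past deadline"

	// With a real start time, the comparison behaves as intended.
	start := time.Now().Add(-10 * time.Second)
	fmt.Println(time.Since(start) >= allowed) // false: still within the deadline
}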
@@ -1201,8 +1197,7 @@ func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
 			// restarted.
 			status = pod.Status
 		}
-		if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
-			// Pod has reached the final state; ignore it.
+		if podIsTerminated(&status) {
 			continue
 		}
 		pods = append(pods, pod)
@@ -1497,8 +1492,6 @@ func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]metr
 	// These two conditions could be alleviated by checkpointing kubelet.
 	pods := kl.filterOutTerminatedPods(allPods)
 
-	pods = kl.filterOutPodsPastActiveDeadline(pods)
-
 	// Respect the pod creation order when resolving conflicts.
 	sort.Sort(podsByCreationTime(pods))
 
@@ -1597,9 +1590,12 @@ func (kl *Kubelet) validateContainerStatus(podStatus *api.PodStatus, containerNa
 // or all of them.
 func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow, previous bool, stdout, stderr io.Writer) error {
 	// TODO(vmarmol): Refactor to not need the pod status and verification.
-	podStatus, err := kl.getPodStatus(podFullName)
-	if err != nil {
-		return fmt.Errorf("failed to get status for pod %q - %v", podFullName, err)
+	// Pod workers periodically write status to statusManager. If status is not
+	// cached there, something is wrong (or kubelet just restarted and hasn't
+	// caught up yet). Just assume the pod is not ready yet.
+	podStatus, found := kl.statusManager.GetPodStatus(podFullName)
+	if !found {
+		return fmt.Errorf("failed to get status for pod %q", podFullName)
 	}
 	if err := kl.validatePodPhase(&podStatus); err != nil {
 		// No log is available if pod is not in a "known" phase (e.g. Unknown).
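The statusManager that GetKubeletContainerLogs now consults is, at heart, a lock-guarded map that pod workers write and other paths read. A minimal sketch of such a cache (simplified, hypothetical types; the real manager stores api.PodStatus):

package main

import (
	"fmt"
	"sync"
)

// statusCache is a simplified stand-in for the kubelet's statusManager:
// pod workers write the latest status, readers get the value plus a found flag.
type statusCache struct {
	mu       sync.RWMutex
	statuses map[string]string // pod full name -> phase (simplified)
}

func newStatusCache() *statusCache {
	return &statusCache{statuses: make(map[string]string)}
}

func (c *statusCache) SetPodStatus(podFullName, phase string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.statuses[podFullName] = phase
}

func (c *statusCache) GetPodStatus(podFullName string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	phase, found := c.statuses[podFullName]
	return phase, found
}

func main() {
	c := newStatusCache()
	c.SetPodStatus("mypod_default", "Failed")
	if phase, found := c.GetPodStatus("mypod_default"); found {
		fmt.Println(phase)
	}
}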
@@ -1925,30 +1921,21 @@ func getPodReadyCondition(spec *api.PodSpec, statuses []api.ContainerStatus) []a
 	return ready
 }
 
-// getPodStatus returns information of the containers in the pod from the
-// container runtime.
-func (kl *Kubelet) getPodStatus(podFullName string) (api.PodStatus, error) {
-	// Check to see if we have a cached version of the status.
-	cachedPodStatus, found := kl.statusManager.GetPodStatus(podFullName)
-	if found {
-		glog.V(3).Infof("Returning cached status for %q", podFullName)
-		return cachedPodStatus, nil
-	}
-	pod, found := kl.GetPodByFullName(podFullName)
-	if !found {
-		return api.PodStatus{}, fmt.Errorf("couldn't find pod %q", podFullName)
-	}
-	return kl.generatePodStatus(pod)
-}
-
 // By passing the pod directly, this method avoids pod lookup, which requires
 // grabbing a lock.
 func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
 	podFullName := kubecontainer.GetPodFullName(pod)
 	glog.V(3).Infof("Generating status for %q", podFullName)
 
-	spec := &pod.Spec
 	// TODO: Consider include the container information.
+	if kl.pastActiveDeadline(pod) {
+		kl.recorder.Eventf(pod, "deadline", "Pod was active on the node longer than specified deadline")
+		return api.PodStatus{
+			Phase:   api.PodFailed,
+			Message: "Pod was active on the node longer than specified deadline"}, nil
+	}
+
+	spec := &pod.Spec
 	podStatus, err := kl.containerRuntime.GetPodStatus(pod)
 
 	if err != nil {
@@ -3010,19 +3010,9 @@ func TestHandlePortConflicts(t *testing.T) {
 
 	kl.handleNotFittingPods(pods)
 	// Check pod status stored in the status map.
-	status, err := kl.getPodStatus(conflictedPodName)
-	if err != nil {
-		t.Fatalf("status of pod %q is not found in the status map: %#v", conflictedPodName, err)
-	}
-	if status.Phase != api.PodFailed {
-		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
-	}
-
-	// Check if we can retrieve the pod status from GetPodStatus().
-	kl.podManager.SetPods(pods)
-	status, err = kl.getPodStatus(conflictedPodName)
-	if err != nil {
-		t.Fatalf("unable to retrieve pod status for pod %q: %#v.", conflictedPodName, err)
+	status, found := kl.statusManager.GetPodStatus(conflictedPodName)
+	if !found {
+		t.Fatalf("status of pod %q is not found in the status map", conflictedPodName)
 	}
 	if status.Phase != api.PodFailed {
 		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@@ -3062,19 +3052,9 @@ func TestHandleNodeSelector(t *testing.T) {
 
 	kl.handleNotFittingPods(pods)
 	// Check pod status stored in the status map.
-	status, err := kl.getPodStatus(notfittingPodName)
-	if err != nil {
-		t.Fatalf("status of pod %q is not found in the status map: %#v", notfittingPodName, err)
-	}
-	if status.Phase != api.PodFailed {
-		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
-	}
-
-	// Check if we can retrieve the pod status from GetPodStatus().
-	kl.podManager.SetPods(pods)
-	status, err = kl.getPodStatus(notfittingPodName)
-	if err != nil {
-		t.Fatalf("unable to retrieve pod status for pod %q: %#v.", notfittingPodName, err)
+	status, found := kl.statusManager.GetPodStatus(notfittingPodName)
+	if !found {
+		t.Fatalf("status of pod %q is not found in the status map", notfittingPodName)
 	}
 	if status.Phase != api.PodFailed {
 		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@@ -3120,19 +3100,9 @@ func TestHandleMemExceeded(t *testing.T) {
 
 	kl.handleNotFittingPods(pods)
 	// Check pod status stored in the status map.
-	status, err := kl.getPodStatus(notfittingPodName)
-	if err != nil {
-		t.Fatalf("status of pod %q is not found in the status map: %#v", notfittingPodName, err)
-	}
-	if status.Phase != api.PodFailed {
-		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
-	}
-
-	// Check if we can retrieve the pod status from GetPodStatus().
-	kl.podManager.SetPods(pods)
-	status, err = kl.getPodStatus(notfittingPodName)
-	if err != nil {
-		t.Fatalf("unable to retrieve pod status for pod %q: %#v.", notfittingPodName, err)
+	status, found := kl.statusManager.GetPodStatus(notfittingPodName)
+	if !found {
+		t.Fatalf("status of pod %q is not found in the status map", notfittingPodName)
 	}
 	if status.Phase != api.PodFailed {
 		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
@@ -3153,13 +3123,13 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
 	}
 	// Run once to populate the status map.
 	kl.handleNotFittingPods(pods)
-	if _, err := kl.getPodStatus(kubecontainer.BuildPodFullName("pod2", "")); err != nil {
-		t.Fatalf("expected to have status cached for %q: %v", "pod2", err)
+	if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); !found {
+		t.Fatalf("expected to have status cached for pod2")
 	}
 	// Sync with empty pods so that the entry in status map will be removed.
 	kl.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
-	if _, err := kl.getPodStatus(kubecontainer.BuildPodFullName("pod2", "")); err == nil {
-		t.Fatalf("expected to not have status cached for %q: %v", "pod2", err)
+	if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); found {
+		t.Fatalf("expected to not have status cached for pod2")
 	}
 }
@@ -4169,11 +4139,11 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
 			t.Errorf("%d: unexpected error: %v", i, err)
 		}
 
-		// Check if we can retrieve the pod status from GetPodStatus().
+		// Check if we can retrieve the pod status.
 		podName := kubecontainer.GetPodFullName(pods[0])
-		status, err := kubelet.getPodStatus(podName)
-		if err != nil {
-			t.Fatalf("unable to retrieve pod status for pod %q: %#v.", podName, err)
+		status, found := kubelet.statusManager.GetPodStatus(podName)
+		if !found {
+			t.Fatalf("unable to retrieve pod status for pod %q.", podName)
 		} else {
 			terminatedContainers := []string{}
 			for _, cs := range status.ContainerStatuses {
@@ -4244,9 +4214,9 @@ func TestGetPodCreationFailureReason(t *testing.T) {
 		t.Errorf("unexpected error: %v", err)
 	}
 
-	status, err := kubelet.getPodStatus(kubecontainer.GetPodFullName(pod))
-	if err != nil {
-		t.Errorf("unexpected error %v", err)
+	status, found := kubelet.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
+	if !found {
+		t.Fatalf("unexpected error %v", err)
 	}
 	if len(status.ContainerStatuses) < 1 {
 		t.Errorf("expected 1 container status, got %d", len(status.ContainerStatuses))
@@ -4310,9 +4280,9 @@ func TestGetPodPullImageFailureReason(t *testing.T) {
 		t.Errorf("unexpected error: %v", err)
 	}
 
-	status, err := kubelet.getPodStatus(kubecontainer.GetPodFullName(pod))
-	if err != nil {
-		t.Errorf("unexpected error %v", err)
+	status, found := kubelet.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
+	if !found {
+		t.Errorf("expected status of pod %q to be found", kubecontainer.GetPodFullName(pod))
 	}
 	if len(status.ContainerStatuses) < 1 {
 		t.Errorf("expected 1 container status, got %d", len(status.ContainerStatuses))
@@ -4506,7 +4476,7 @@ func TestMakePortMappings(t *testing.T) {
 	}
 }
 
-func TestFilterOutPodsPastActiveDeadline(t *testing.T) {
+func TestIsPodPastActiveDeadline(t *testing.T) {
 	testKubelet := newTestKubelet(t)
 	kubelet := testKubelet.kubelet
 	pods := newTestPods(5)
@@ -4519,23 +4489,21 @@ func TestFilterOutPodsPastActiveDeadline(t *testing.T) {
 	pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds
 	pods[1].Status.StartTime = &startTime
 	pods[1].Spec.ActiveDeadlineSeconds = &notYetActiveDeadlineSeconds
-	expected := []*api.Pod{pods[1], pods[2], pods[3], pods[4]}
+	tests := []struct {
+		pod      *api.Pod
+		expected bool
+	}{{pods[0], true}, {pods[1], false}, {pods[2], false}, {pods[3], false}, {pods[4], false}}
 
 	kubelet.podManager.SetPods(pods)
-	actual := kubelet.filterOutPodsPastActiveDeadline(pods)
-	if !reflect.DeepEqual(expected, actual) {
-		expectedNames := ""
-		for _, pod := range expected {
-			expectedNames = expectedNames + pod.Name + " "
+	for i, tt := range tests {
+		actual := kubelet.pastActiveDeadline(tt.pod)
+		if actual != tt.expected {
+			t.Errorf("[%d] expected %#v, got %#v", i, tt.expected, actual)
 		}
-		actualNames := ""
-		for _, pod := range actual {
-			actualNames = actualNames + pod.Name + " "
-		}
-		t.Errorf("expected %#v, got %#v", expectedNames, actualNames)
 	}
 }
 
-func TestSyncPodsDeletesPodsThatRunTooLong(t *testing.T) {
+func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
 	testKubelet := newTestKubelet(t)
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
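The rewrite above also converts the test from a reflect.DeepEqual comparison over a filtered slice to the table-driven style. A minimal self-contained version of that pattern (hypothetical isPast helper, runnable with go test):

package main

import (
	"testing"
	"time"
)

// isPast is a hypothetical helper standing in for kubelet.pastActiveDeadline.
func isPast(elapsed, allowed time.Duration) bool { return elapsed >= allowed }

func TestIsPast(t *testing.T) {
	// Cases as data, one loop, indexed failure messages: the same shape as
	// TestIsPodPastActiveDeadline above.
	tests := []struct {
		elapsed, allowed time.Duration
		expected         bool
	}{
		{45 * time.Second, 30 * time.Second, true},
		{10 * time.Second, 30 * time.Second, false},
		{30 * time.Second, 30 * time.Second, true},
	}
	for i, tt := range tests {
		if actual := isPast(tt.elapsed, tt.allowed); actual != tt.expected {
			t.Errorf("[%d] expected %v, got %v", i, tt.expected, actual)
		}
	}
}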
@@ -4593,27 +4561,22 @@ func TestSyncPodsDeletesPodsThatRunTooLong(t *testing.T) {
 		},
 	}
 
+	// Let the pod worker sets the status to fail after this sync.
 	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
 
 	verifyCalls(t, fakeDocker, []string{"list", "inspect_container", "stop", "inspect_container", "stop", "list"})
 
-	// A map iteration is used to delete containers, so must not depend on
-	// order here.
-	expectedToStop := map[string]bool{
-		"1234": true,
-		"9876": true,
+	podFullName := kubecontainer.GetPodFullName(pods[0])
+	status, found := kubelet.statusManager.GetPodStatus(podFullName)
+	if !found {
+		t.Errorf("expected to found status for pod %q", status)
 	}
-	if len(fakeDocker.Stopped) != 2 ||
-		!expectedToStop[fakeDocker.Stopped[0]] ||
-		!expectedToStop[fakeDocker.Stopped[1]] {
-		t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
+	if status.Phase != api.PodFailed {
+		t.Fatalf("expected pod status %q, ot %q.", api.PodFailed, status.Phase)
 	}
 }
 
-func TestSyncPodsDoesNotDeletePodsThatRunTooLong(t *testing.T) {
+func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
 	testKubelet := newTestKubelet(t)
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 	kubelet := testKubelet.kubelet
@@ -4676,14 +4639,14 @@ func TestSyncPodsDoesNotDeletePodsThatRunTooLong(t *testing.T) {
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	verifyCalls(t, fakeDocker, []string{
-		"list", "list", "list",
-		// Get pod status.
-		"inspect_container", "inspect_container",
-		// Check the pod infra container.
-		"inspect_container",
-		// Get pod status.
-		"list", "inspect_container", "inspect_container", "list"})
+	podFullName := kubecontainer.GetPodFullName(pods[0])
+	status, found := kubelet.statusManager.GetPodStatus(podFullName)
+	if !found {
+		t.Errorf("expected to found status for pod %q", status)
+	}
+	if status.Phase == api.PodFailed {
+		t.Fatalf("expected pod status to not be %q", status.Phase)
+	}
 }
 
 func TestDeletePodDirsForDeletedPods(t *testing.T) {
@@ -81,6 +81,9 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	}
 
 	// if the status has no start time, we need to set an initial time
+	// TODO(yujuhong): Consider setting StartTime when generating the pod
+	// status instead, which would allow statusManager to become a simple cache
+	// again.
 	if status.StartTime.IsZero() {
 		if pod.Status.StartTime.IsZero() {
 			// the pod did not have a previously recorded value so set to now
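The hunk above is cut off by the page, but the SetPodStatus logic it comments on defaults a missing StartTime so the deadline check has a reference point. A standalone sketch of that defaulting pattern (hypothetical, simplified types; not the actual statusManager code):

package main

import (
	"fmt"
	"time"
)

type PodStatus struct {
	StartTime *time.Time
}

// ensureStartTime mirrors the defaulting pattern the TODO comments refer to:
// if neither the incoming status nor the previously recorded status carries a
// start time, record "now" so later deadline checks have a reference point.
func ensureStartTime(prev, next *PodStatus) {
	if next.StartTime != nil {
		return // caller already set it
	}
	if prev.StartTime != nil {
		next.StartTime = prev.StartTime // keep the previously recorded value
		return
	}
	now := time.Now()
	next.StartTime = &now // no previous value, so set to now
}

func main() {
	var prev, next PodStatus
	ensureStartTime(&prev, &next)
	fmt.Println(next.StartTime.Format(time.RFC3339))
}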