mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Merge pull request #88440 from smarterclayton/container_success_fix
Ensure Kubelet always reports terminating pod container status
This commit is contained in:
commit
7a513b575a
@ -61,6 +61,10 @@ type RuntimeHelper interface {
|
|||||||
// ShouldContainerBeRestarted checks whether a container needs to be restarted.
|
// ShouldContainerBeRestarted checks whether a container needs to be restarted.
|
||||||
// TODO(yifan): Think about how to refactor this.
|
// TODO(yifan): Think about how to refactor this.
|
||||||
func ShouldContainerBeRestarted(container *v1.Container, pod *v1.Pod, podStatus *PodStatus) bool {
|
func ShouldContainerBeRestarted(container *v1.Container, pod *v1.Pod, podStatus *PodStatus) bool {
|
||||||
|
// Once a pod has been marked deleted, it should not be restarted
|
||||||
|
if pod.DeletionTimestamp != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
// Get latest container status.
|
// Get latest container status.
|
||||||
status := podStatus.FindContainerStatusByName(container.Name)
|
status := podStatus.FindContainerStatusByName(container.Name)
|
||||||
// If the container was never started before, we should start it.
|
// If the container was never started before, we should start it.
|
||||||
|
@ -19,6 +19,7 @@ package container
|
|||||||
import (
|
import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@ -449,6 +450,8 @@ func TestShouldContainerBeRestarted(t *testing.T) {
|
|||||||
v1.RestartPolicyOnFailure,
|
v1.RestartPolicyOnFailure,
|
||||||
v1.RestartPolicyAlways,
|
v1.RestartPolicyAlways,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// test policies
|
||||||
expected := map[string][]bool{
|
expected := map[string][]bool{
|
||||||
"no-history": {true, true, true},
|
"no-history": {true, true, true},
|
||||||
"alive": {false, false, false},
|
"alive": {false, false, false},
|
||||||
@ -467,6 +470,27 @@ func TestShouldContainerBeRestarted(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// test deleted pod
|
||||||
|
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
|
||||||
|
expected = map[string][]bool{
|
||||||
|
"no-history": {false, false, false},
|
||||||
|
"alive": {false, false, false},
|
||||||
|
"succeed": {false, false, false},
|
||||||
|
"failed": {false, false, false},
|
||||||
|
"unknown": {false, false, false},
|
||||||
|
}
|
||||||
|
for _, c := range pod.Spec.Containers {
|
||||||
|
for i, policy := range policies {
|
||||||
|
pod.Spec.RestartPolicy = policy
|
||||||
|
e := expected[c.Name][i]
|
||||||
|
r := ShouldContainerBeRestarted(&c, pod, podStatus)
|
||||||
|
if r != e {
|
||||||
|
t.Errorf("Restart for container %q with restart policy %q expected %t, got %t",
|
||||||
|
c.Name, policy, e, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHasPrivilegedContainer(t *testing.T) {
|
func TestHasPrivilegedContainer(t *testing.T) {
|
||||||
|
@ -2001,18 +2001,22 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
|
|||||||
}
|
}
|
||||||
|
|
||||||
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
|
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
|
||||||
// If the pod is terminated, dispatchWork
|
// If the pod has completed termination, dispatchWork will perform no action.
|
||||||
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
|
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
|
||||||
if kl.podIsTerminated(pod) {
|
// check whether we are ready to delete the pod from the API server (all status up to date)
|
||||||
if pod.DeletionTimestamp != nil {
|
containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
|
||||||
// If the pod is in a terminated state, there is no pod worker to
|
if pod.DeletionTimestamp != nil && containersTerminal {
|
||||||
// handle the work item. Check if the DeletionTimestamp has been
|
klog.V(4).Infof("Pod %q has completed execution and should be deleted from the API server: %s", format.Pod(pod), syncType)
|
||||||
// set, and force a status update to trigger a pod deletion request
|
kl.statusManager.TerminatePod(pod)
|
||||||
// to the apiserver.
|
|
||||||
kl.statusManager.TerminatePod(pod)
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// optimization: avoid invoking the pod worker if no further changes are possible to the pod definition
|
||||||
|
if podWorkerTerminal {
|
||||||
|
klog.V(4).Infof("Pod %q has completed, ignoring remaining sync work: %s", format.Pod(pod), syncType)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Run the sync in an async worker.
|
// Run the sync in an async worker.
|
||||||
kl.podWorkers.UpdatePod(&UpdatePodOptions{
|
kl.podWorkers.UpdatePod(&UpdatePodOptions{
|
||||||
Pod: pod,
|
Pod: pod,
|
||||||
|
@ -865,8 +865,9 @@ func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret {
|
|||||||
return pullSecrets
|
return pullSecrets
|
||||||
}
|
}
|
||||||
|
|
||||||
// podIsTerminated returns true if pod is in the terminated state ("Failed" or "Succeeded").
|
// podStatusIsTerminal reports when the specified pod has no running containers or is no longer accepting
|
||||||
func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
|
// spec changes.
|
||||||
|
func (kl *Kubelet) podAndContainersAreTerminal(pod *v1.Pod) (containersTerminal, podWorkerTerminal bool) {
|
||||||
// Check the cached pod status which was set after the last sync.
|
// Check the cached pod status which was set after the last sync.
|
||||||
status, ok := kl.statusManager.GetPodStatus(pod.UID)
|
status, ok := kl.statusManager.GetPodStatus(pod.UID)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -875,11 +876,28 @@ func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
|
|||||||
// restarted.
|
// restarted.
|
||||||
status = pod.Status
|
status = pod.Status
|
||||||
}
|
}
|
||||||
return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
|
// A pod transitions into failed or succeeded from either container lifecycle (RestartNever container
|
||||||
|
// fails) or due to external events like deletion or eviction. A terminal pod *should* have no running
|
||||||
|
// containers, but to know that the pod has completed its lifecycle you must wait for containers to also
|
||||||
|
// be terminal.
|
||||||
|
containersTerminal = notRunning(status.ContainerStatuses)
|
||||||
|
// The kubelet must accept config changes from the pod spec until it has reached a point where changes would
|
||||||
|
// have no effect on any running container.
|
||||||
|
podWorkerTerminal = status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && containersTerminal)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsPodTerminated returns true if the pod with the provided UID is in a terminated state ("Failed" or "Succeeded")
|
// podIsTerminated returns true if the provided pod is in a terminal phase ("Failed", "Succeeded") or
|
||||||
// or if the pod has been deleted or removed
|
// has been deleted and has no running containers. This corresponds to when a pod must accept changes to
|
||||||
|
// its pod spec (e.g. terminating containers allow grace period to be shortened).
|
||||||
|
func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
|
||||||
|
_, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
|
||||||
|
return podWorkerTerminal
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsPodTerminated returns true if the pod with the provided UID is in a terminal phase ("Failed",
|
||||||
|
// "Succeeded") or has been deleted and has no running containers. This corresponds to when a pod must
|
||||||
|
// accept changes to its pod spec (e.g. terminating containers allow grace period to be shortened)
|
||||||
func (kl *Kubelet) IsPodTerminated(uid types.UID) bool {
|
func (kl *Kubelet) IsPodTerminated(uid types.UID) bool {
|
||||||
pod, podFound := kl.podManager.GetPodByUID(uid)
|
pod, podFound := kl.podManager.GetPodByUID(uid)
|
||||||
if !podFound {
|
if !podFound {
|
||||||
|
@ -54,6 +54,7 @@ go_test(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/testing:go_default_library",
|
"//staging/src/k8s.io/client-go/testing:go_default_library",
|
||||||
|
@ -160,13 +160,20 @@ func (m *manager) Start() {
|
|||||||
syncTicker := time.Tick(syncPeriod)
|
syncTicker := time.Tick(syncPeriod)
|
||||||
// syncPod and syncBatch share the same go routine to avoid sync races.
|
// syncPod and syncBatch share the same go routine to avoid sync races.
|
||||||
go wait.Forever(func() {
|
go wait.Forever(func() {
|
||||||
select {
|
for {
|
||||||
case syncRequest := <-m.podStatusChannel:
|
select {
|
||||||
klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
|
case syncRequest := <-m.podStatusChannel:
|
||||||
syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
|
klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
|
||||||
m.syncPod(syncRequest.podUID, syncRequest.status)
|
syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
|
||||||
case <-syncTicker:
|
m.syncPod(syncRequest.podUID, syncRequest.status)
|
||||||
m.syncBatch()
|
case <-syncTicker:
|
||||||
|
klog.V(5).Infof("Status Manager: syncing batch")
|
||||||
|
// remove any entries in the status channel since the batch will handle them
|
||||||
|
for i := len(m.podStatusChannel); i > 0; i-- {
|
||||||
|
<-m.podStatusChannel
|
||||||
|
}
|
||||||
|
m.syncBatch()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}, 0)
|
}, 0)
|
||||||
}
|
}
|
||||||
@ -314,21 +321,39 @@ func findContainerStatus(status *v1.PodStatus, containerID string) (containerSta
|
|||||||
func (m *manager) TerminatePod(pod *v1.Pod) {
|
func (m *manager) TerminatePod(pod *v1.Pod) {
|
||||||
m.podStatusesLock.Lock()
|
m.podStatusesLock.Lock()
|
||||||
defer m.podStatusesLock.Unlock()
|
defer m.podStatusesLock.Unlock()
|
||||||
|
|
||||||
|
// ensure that all containers have a terminated state - because we do not know whether the container
|
||||||
|
// was successful, always report an error
|
||||||
oldStatus := &pod.Status
|
oldStatus := &pod.Status
|
||||||
if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
|
if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
|
||||||
oldStatus = &cachedStatus.status
|
oldStatus = &cachedStatus.status
|
||||||
}
|
}
|
||||||
status := *oldStatus.DeepCopy()
|
status := *oldStatus.DeepCopy()
|
||||||
for i := range status.ContainerStatuses {
|
for i := range status.ContainerStatuses {
|
||||||
|
if status.ContainerStatuses[i].State.Terminated != nil || status.ContainerStatuses[i].State.Waiting != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
status.ContainerStatuses[i].State = v1.ContainerState{
|
status.ContainerStatuses[i].State = v1.ContainerState{
|
||||||
Terminated: &v1.ContainerStateTerminated{},
|
Terminated: &v1.ContainerStateTerminated{
|
||||||
|
Reason: "ContainerStatusUnknown",
|
||||||
|
Message: "The container could not be located when the pod was terminated",
|
||||||
|
ExitCode: 137,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := range status.InitContainerStatuses {
|
for i := range status.InitContainerStatuses {
|
||||||
|
if status.InitContainerStatuses[i].State.Terminated != nil || status.InitContainerStatuses[i].State.Waiting != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
status.InitContainerStatuses[i].State = v1.ContainerState{
|
status.InitContainerStatuses[i].State = v1.ContainerState{
|
||||||
Terminated: &v1.ContainerStateTerminated{},
|
Terminated: &v1.ContainerStateTerminated{
|
||||||
|
Reason: "ContainerStatusUnknown",
|
||||||
|
Message: "The container could not be located when the pod was terminated",
|
||||||
|
ExitCode: 137,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
m.updateStatusInternal(pod, status, true)
|
m.updateStatusInternal(pod, status, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,11 +27,12 @@ import (
|
|||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/util/diff"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
@ -569,6 +570,16 @@ func TestTerminatePod(t *testing.T) {
|
|||||||
t.Logf("update the pod's status to Failed. TerminatePod should preserve this status update.")
|
t.Logf("update the pod's status to Failed. TerminatePod should preserve this status update.")
|
||||||
firstStatus := getRandomPodStatus()
|
firstStatus := getRandomPodStatus()
|
||||||
firstStatus.Phase = v1.PodFailed
|
firstStatus.Phase = v1.PodFailed
|
||||||
|
firstStatus.InitContainerStatuses = []v1.ContainerStatus{
|
||||||
|
{Name: "init-test-1"},
|
||||||
|
{Name: "init-test-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "InitTest", ExitCode: 0}}},
|
||||||
|
{Name: "init-test-3", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "InitTest", ExitCode: 3}}},
|
||||||
|
}
|
||||||
|
firstStatus.ContainerStatuses = []v1.ContainerStatus{
|
||||||
|
{Name: "test-1"},
|
||||||
|
{Name: "test-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}},
|
||||||
|
{Name: "test-3", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 0}}},
|
||||||
|
}
|
||||||
syncer.SetPodStatus(testPod, firstStatus)
|
syncer.SetPodStatus(testPod, firstStatus)
|
||||||
|
|
||||||
t.Logf("set the testPod to a pod with Phase running, to simulate a stale pod")
|
t.Logf("set the testPod to a pod with Phase running, to simulate a stale pod")
|
||||||
@ -586,6 +597,26 @@ func TestTerminatePod(t *testing.T) {
|
|||||||
assert.False(t, newStatus.InitContainerStatuses[i].State.Terminated == nil, "expected init containers to be terminated")
|
assert.False(t, newStatus.InitContainerStatuses[i].State.Terminated == nil, "expected init containers to be terminated")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
expectUnknownState := v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "ContainerStatusUnknown", Message: "The container could not be located when the pod was terminated", ExitCode: 137}}
|
||||||
|
if !reflect.DeepEqual(newStatus.InitContainerStatuses[0].State, expectUnknownState) {
|
||||||
|
t.Errorf("terminated container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.InitContainerStatuses[0].State, expectUnknownState))
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(newStatus.InitContainerStatuses[1].State, firstStatus.InitContainerStatuses[1].State) {
|
||||||
|
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State) {
|
||||||
|
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(newStatus.ContainerStatuses[0].State, expectUnknownState) {
|
||||||
|
t.Errorf("terminated container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.ContainerStatuses[0].State, expectUnknownState))
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(newStatus.ContainerStatuses[1].State, firstStatus.ContainerStatuses[1].State) {
|
||||||
|
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(newStatus.ContainerStatuses[2].State, firstStatus.ContainerStatuses[2].State) {
|
||||||
|
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
|
||||||
|
}
|
||||||
|
|
||||||
t.Logf("we expect the previous status update to be preserved.")
|
t.Logf("we expect the previous status update to be preserved.")
|
||||||
assert.Equal(t, newStatus.Phase, firstStatus.Phase)
|
assert.Equal(t, newStatus.Phase, firstStatus.Phase)
|
||||||
assert.Equal(t, newStatus.Message, firstStatus.Message)
|
assert.Equal(t, newStatus.Message, firstStatus.Message)
|
||||||
|
@ -37,6 +37,7 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
"//test/e2e/framework:go_default_library",
|
"//test/e2e/framework:go_default_library",
|
||||||
"//test/e2e/framework/job:go_default_library",
|
"//test/e2e/framework/job:go_default_library",
|
||||||
|
@ -19,8 +19,13 @@ package node
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
@ -29,6 +34,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/util/uuid"
|
"k8s.io/apimachinery/pkg/util/uuid"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/kubernetes/test/e2e/framework"
|
"k8s.io/kubernetes/test/e2e/framework"
|
||||||
e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
|
e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
|
||||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||||
@ -197,4 +203,227 @@ var _ = SIGDescribe("Pods Extended", func() {
|
|||||||
framework.ExpectEqual(pod.Status.QOSClass, v1.PodQOSGuaranteed)
|
framework.ExpectEqual(pod.Status.QOSClass, v1.PodQOSGuaranteed)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
framework.KubeDescribe("Pod Container Status", func() {
|
||||||
|
var podClient *framework.PodClient
|
||||||
|
ginkgo.BeforeEach(func() {
|
||||||
|
podClient = f.PodClient()
|
||||||
|
})
|
||||||
|
|
||||||
|
ginkgo.It("should never report success for a pending container", func() {
|
||||||
|
ginkgo.By("creating pods that should always exit 1 and terminating the pod after a random delay")
|
||||||
|
|
||||||
|
var reBug88766 = regexp.MustCompile(`ContainerCannotRun.*rootfs_linux\.go.*kubernetes\.io~secret.*no such file or directory`)
|
||||||
|
|
||||||
|
var (
|
||||||
|
lock sync.Mutex
|
||||||
|
errs []error
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
)
|
||||||
|
|
||||||
|
const delay = 2000
|
||||||
|
const workers = 3
|
||||||
|
const pods = 15
|
||||||
|
var min, max time.Duration
|
||||||
|
for i := 0; i < workers; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
|
for retries := 0; retries < pods; retries++ {
|
||||||
|
name := fmt.Sprintf("pod-submit-status-%d-%d", i, retries)
|
||||||
|
value := strconv.Itoa(time.Now().Nanosecond())
|
||||||
|
one := int64(1)
|
||||||
|
pod := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
Labels: map[string]string{
|
||||||
|
"name": "foo",
|
||||||
|
"time": value,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
RestartPolicy: v1.RestartPolicyNever,
|
||||||
|
TerminationGracePeriodSeconds: &one,
|
||||||
|
Containers: []v1.Container{
|
||||||
|
{
|
||||||
|
Name: "busybox",
|
||||||
|
Image: imageutils.GetE2EImage(imageutils.BusyBox),
|
||||||
|
Command: []string{
|
||||||
|
"/bin/false",
|
||||||
|
},
|
||||||
|
Resources: v1.ResourceRequirements{
|
||||||
|
Requests: v1.ResourceList{
|
||||||
|
v1.ResourceCPU: resource.MustParse("5m"),
|
||||||
|
v1.ResourceMemory: resource.MustParse("10Mi"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// create the pod, capture the change events, then delete the pod
|
||||||
|
start := time.Now()
|
||||||
|
created := podClient.Create(pod)
|
||||||
|
ch := make(chan []watch.Event)
|
||||||
|
go func() {
|
||||||
|
defer close(ch)
|
||||||
|
w, err := podClient.Watch(context.TODO(), metav1.ListOptions{
|
||||||
|
ResourceVersion: created.ResourceVersion,
|
||||||
|
FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
framework.Logf("Unable to watch pod %s: %v", pod.Name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer w.Stop()
|
||||||
|
events := []watch.Event{
|
||||||
|
{Type: watch.Added, Object: created},
|
||||||
|
}
|
||||||
|
for event := range w.ResultChan() {
|
||||||
|
events = append(events, event)
|
||||||
|
if event.Type == watch.Deleted {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ch <- events
|
||||||
|
}()
|
||||||
|
|
||||||
|
t := time.Duration(rand.Intn(delay)) * time.Millisecond
|
||||||
|
time.Sleep(t)
|
||||||
|
err := podClient.Delete(context.TODO(), pod.Name, nil)
|
||||||
|
framework.ExpectNoError(err, "failed to delete pod")
|
||||||
|
|
||||||
|
events, ok := <-ch
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(events) < 2 {
|
||||||
|
framework.Fail("only got a single event")
|
||||||
|
}
|
||||||
|
|
||||||
|
end := time.Now()
|
||||||
|
|
||||||
|
// check the returned events for consistency
|
||||||
|
var duration, completeDuration time.Duration
|
||||||
|
var hasContainers, hasTerminated, hasTerminalPhase, hasRunningContainers bool
|
||||||
|
verifyFn := func(event watch.Event) error {
|
||||||
|
var ok bool
|
||||||
|
pod, ok = event.Object.(*v1.Pod)
|
||||||
|
if !ok {
|
||||||
|
framework.Logf("Unexpected event object: %s %#v", event.Type, event.Object)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pod.Status.InitContainerStatuses) != 0 {
|
||||||
|
return fmt.Errorf("pod %s on node %s had incorrect init containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.InitContainerStatuses)
|
||||||
|
}
|
||||||
|
if len(pod.Status.ContainerStatuses) == 0 {
|
||||||
|
if hasContainers {
|
||||||
|
return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
hasContainers = true
|
||||||
|
if len(pod.Status.ContainerStatuses) != 1 {
|
||||||
|
return fmt.Errorf("pod %s on node %s had incorrect containers: %#v", pod.Name, pod.Spec.NodeName, pod.Status.ContainerStatuses)
|
||||||
|
}
|
||||||
|
status := pod.Status.ContainerStatuses[0]
|
||||||
|
t := status.State.Terminated
|
||||||
|
if hasTerminated {
|
||||||
|
if status.State.Waiting != nil || status.State.Running != nil {
|
||||||
|
return fmt.Errorf("pod %s on node %s was terminated and then changed state: %#v", pod.Name, pod.Spec.NodeName, status)
|
||||||
|
}
|
||||||
|
if t == nil {
|
||||||
|
return fmt.Errorf("pod %s on node %s was terminated and then had termination cleared: %#v", pod.Name, pod.Spec.NodeName, status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
hasRunningContainers = status.State.Waiting == nil && status.State.Terminated == nil
|
||||||
|
if t != nil {
|
||||||
|
if !t.FinishedAt.Time.IsZero() {
|
||||||
|
duration = t.FinishedAt.Sub(t.StartedAt.Time)
|
||||||
|
completeDuration = t.FinishedAt.Sub(pod.CreationTimestamp.Time)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() { hasTerminated = true }()
|
||||||
|
switch {
|
||||||
|
case t.ExitCode == 1:
|
||||||
|
// expected
|
||||||
|
case t.ExitCode == 128 && reBug88766.MatchString(t.Message):
|
||||||
|
// pod volume teardown races with container start in CRI, which reports a failure
|
||||||
|
framework.Logf("pod %s on node %s failed with the symptoms of https://github.com/kubernetes/kubernetes/issues/88766")
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("pod %s on node %s container unexpected exit code %d: start=%s end=%s reason=%s message=%s", pod.Name, pod.Spec.NodeName, t.ExitCode, t.StartedAt, t.FinishedAt, t.Reason, t.Message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded {
|
||||||
|
hasTerminalPhase = true
|
||||||
|
} else {
|
||||||
|
if hasTerminalPhase {
|
||||||
|
return fmt.Errorf("pod %s on node %s was in a terminal phase and then reverted: %#v", pod.Name, pod.Spec.NodeName, pod.Status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var eventErr error
|
||||||
|
for _, event := range events[1:] {
|
||||||
|
if err := verifyFn(event); err != nil {
|
||||||
|
eventErr = err
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func() {
|
||||||
|
defer lock.Unlock()
|
||||||
|
lock.Lock()
|
||||||
|
|
||||||
|
if eventErr != nil {
|
||||||
|
errs = append(errs, eventErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !hasTerminalPhase {
|
||||||
|
var names []string
|
||||||
|
for _, status := range pod.Status.ContainerStatuses {
|
||||||
|
if status.State.Terminated != nil || status.State.Running != nil {
|
||||||
|
names = append(names, status.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
case len(names) > 0:
|
||||||
|
errs = append(errs, fmt.Errorf("pod %s on node %s did not reach a terminal phase before being deleted but had running containers: phase=%s, running-containers=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, strings.Join(names, ",")))
|
||||||
|
case pod.Status.Phase != v1.PodPending:
|
||||||
|
errs = append(errs, fmt.Errorf("pod %s on node %s was not Pending but has no running containers: phase=%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasRunningContainers {
|
||||||
|
data, _ := json.MarshalIndent(pod.Status.ContainerStatuses, "", " ")
|
||||||
|
errs = append(errs, fmt.Errorf("pod %s on node %s had running or unknown container status before being deleted:\n%s", pod.Name, pod.Spec.NodeName, string(data)))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if duration < min {
|
||||||
|
min = duration
|
||||||
|
}
|
||||||
|
if duration > max || max == 0 {
|
||||||
|
max = duration
|
||||||
|
}
|
||||||
|
framework.Logf("Pod %s on node %s timings total=%s t=%s run=%s execute=%s", pod.Name, pod.Spec.NodeName, end.Sub(start), t, completeDuration, duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if len(errs) > 0 {
|
||||||
|
var messages []string
|
||||||
|
for _, err := range errs {
|
||||||
|
messages = append(messages, err.Error())
|
||||||
|
}
|
||||||
|
framework.Failf("%d errors:\n%v", len(errs), strings.Join(messages, "\n"))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user