mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Merge pull request #25077 from ncdc/pleg-retry
Automatic merge from submit-queue PLEG: reinspect pods that failed prior inspections Fix the following sequence of events: 1. relist call 1 successfully inspects a pod (just has infra container) 1. relist call 2 gets an error inspecting the same pod (has infra container and a transient container that failed to create) and doesn't update the old/new pod records 1. relist calls 3+ don't inspect the pod any more (just has infra container so it doesn't look like anything changed) This change adds a new list that keeps track of pods that failed inspection and retries them the next time relist is called. Without this change, a pod in this state would never be inspected again, its entry in the status cache would never be updated, and the pod worker would never call syncPod again because the most recent entry in the status cache has an error associated with it. Without this change, pods in this state would be stuck Terminating forever, unless the user issued a deletion with a grace period value of 0. Fixes #24819 cc @kubernetes/rh-cluster-infra @kubernetes/sig-node
This commit is contained in:
commit
660050631e
@ -60,6 +60,9 @@ type GenericPLEG struct {
|
||||
cache kubecontainer.Cache
|
||||
// For testability.
|
||||
clock util.Clock
|
||||
// Pods that failed to have their status retrieved during a relist. These pods will be
|
||||
// retried during the next relisting.
|
||||
podsToReinspect map[types.UID]*kubecontainer.Pod
|
||||
}
|
||||
|
||||
// plegContainerState has a one-to-one mapping to the
|
||||
@ -210,6 +213,11 @@ func (g *GenericPLEG) relist() {
|
||||
}
|
||||
}
|
||||
|
||||
var needsReinspection map[types.UID]*kubecontainer.Pod
|
||||
if g.cacheEnabled() {
|
||||
needsReinspection = make(map[types.UID]*kubecontainer.Pod)
|
||||
}
|
||||
|
||||
// If there are events associated with a pod, we should update the
|
||||
// podCache.
|
||||
for pid, events := range eventsByPodID {
|
||||
@ -226,7 +234,16 @@ func (g *GenericPLEG) relist() {
|
||||
// parallelize if needed.
|
||||
if err := g.updateCache(pod, pid); err != nil {
|
||||
glog.Errorf("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
|
||||
|
||||
// make sure we try to reinspect the pod during the next relisting
|
||||
needsReinspection[pid] = pod
|
||||
|
||||
continue
|
||||
} else if _, found := g.podsToReinspect[pid]; found {
|
||||
// this pod was in the list to reinspect and we did so because it had events, so remove it
|
||||
// from the list (we don't want the reinspection code below to inspect it a second time in
|
||||
// this relist execution)
|
||||
delete(g.podsToReinspect, pid)
|
||||
}
|
||||
}
|
||||
// Update the internal storage and send out the events.
|
||||
@ -241,10 +258,24 @@ func (g *GenericPLEG) relist() {
|
||||
}
|
||||
|
||||
if g.cacheEnabled() {
|
||||
// reinspect any pods that failed inspection during the previous relist
|
||||
if len(g.podsToReinspect) > 0 {
|
||||
glog.V(5).Infof("GenericPLEG: Reinspecting pods that previously failed inspection")
|
||||
for pid, pod := range g.podsToReinspect {
|
||||
if err := g.updateCache(pod, pid); err != nil {
|
||||
glog.Errorf("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err)
|
||||
needsReinspection[pid] = pod
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update the cache timestamp. This needs to happen *after*
|
||||
// all pods have been properly updated in the cache.
|
||||
g.cache.UpdateTime(timestamp)
|
||||
}
|
||||
|
||||
// make sure we retain the list of pods that need reinspecting the next time relist is called
|
||||
g.podsToReinspect = needsReinspection
|
||||
}
|
||||
|
||||
func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Container {
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package pleg
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
@ -356,3 +357,71 @@ func TestHealthy(t *testing.T) {
|
||||
ok, _ = pleg.Healthy()
|
||||
assert.True(t, ok, "pleg should be healthy")
|
||||
}
|
||||
|
||||
func TestRelistWithReinspection(t *testing.T) {
|
||||
pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
|
||||
ch := pleg.Watch()
|
||||
|
||||
infraContainer := createTestContainer("infra", kubecontainer.ContainerStateRunning)
|
||||
|
||||
podID := types.UID("test-pod")
|
||||
pods := []*kubecontainer.Pod{{
|
||||
ID: podID,
|
||||
Containers: []*kubecontainer.Container{infraContainer},
|
||||
}}
|
||||
runtimeMock.On("GetPods", true).Return(pods, nil).Once()
|
||||
|
||||
goodStatus := &kubecontainer.PodStatus{
|
||||
ID: podID,
|
||||
ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: infraContainer.ID, State: infraContainer.State}},
|
||||
}
|
||||
runtimeMock.On("GetPodStatus", podID, "", "").Return(goodStatus, nil).Once()
|
||||
|
||||
goodEvent := &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: infraContainer.ID.ID}
|
||||
|
||||
// listing 1 - everything ok, infra container set up for pod
|
||||
pleg.relist()
|
||||
actualEvents := getEventsFromChannel(ch)
|
||||
actualStatus, actualErr := pleg.cache.Get(podID)
|
||||
assert.Equal(t, goodStatus, actualStatus)
|
||||
assert.Equal(t, nil, actualErr)
|
||||
assert.Exactly(t, []*PodLifecycleEvent{goodEvent}, actualEvents)
|
||||
|
||||
// listing 2 - pretend runtime was in the middle of creating the non-infra container for the pod
|
||||
// and return an error during inspection
|
||||
transientContainer := createTestContainer("transient", kubecontainer.ContainerStateUnknown)
|
||||
podsWithTransientContainer := []*kubecontainer.Pod{{
|
||||
ID: podID,
|
||||
Containers: []*kubecontainer.Container{infraContainer, transientContainer},
|
||||
}}
|
||||
runtimeMock.On("GetPods", true).Return(podsWithTransientContainer, nil).Once()
|
||||
|
||||
badStatus := &kubecontainer.PodStatus{
|
||||
ID: podID,
|
||||
ContainerStatuses: []*kubecontainer.ContainerStatus{},
|
||||
}
|
||||
runtimeMock.On("GetPodStatus", podID, "", "").Return(badStatus, errors.New("inspection error")).Once()
|
||||
|
||||
pleg.relist()
|
||||
actualEvents = getEventsFromChannel(ch)
|
||||
actualStatus, actualErr = pleg.cache.Get(podID)
|
||||
assert.Equal(t, badStatus, actualStatus)
|
||||
assert.Equal(t, errors.New("inspection error"), actualErr)
|
||||
assert.Exactly(t, []*PodLifecycleEvent{}, actualEvents)
|
||||
|
||||
// listing 3 - pretend the transient container has now disappeared, leaving just the infra
|
||||
// container. Make sure the pod is reinspected for its status and the cache is updated.
|
||||
runtimeMock.On("GetPods", true).Return(pods, nil).Once()
|
||||
runtimeMock.On("GetPodStatus", podID, "", "").Return(goodStatus, nil).Once()
|
||||
|
||||
pleg.relist()
|
||||
actualEvents = getEventsFromChannel(ch)
|
||||
actualStatus, actualErr = pleg.cache.Get(podID)
|
||||
assert.Equal(t, goodStatus, actualStatus)
|
||||
assert.Equal(t, nil, actualErr)
|
||||
// no events are expected because relist #1 set the old pod record which has the infra container
|
||||
// running. relist #2 had the inspection error and therefore didn't modify either old or new.
|
||||
// relist #3 forced the reinspection of the pod to retrieve its status, but because the list of
|
||||
// containers was the same as relist #1, nothing "changed", so there are no new events.
|
||||
assert.Exactly(t, []*PodLifecycleEvent{}, actualEvents)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user