diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
index bf0d44d37a6..3685be30dfb 100644
--- a/pkg/kubelet/pleg/generic.go
+++ b/pkg/kubelet/pleg/generic.go
@@ -60,6 +60,9 @@ type GenericPLEG struct {
 	cache kubecontainer.Cache
 	// For testability.
 	clock util.Clock
+	// Pods that failed to have their status retrieved during a relist. These pods will be
+	// retried during the next relisting.
+	podsToReinspect map[types.UID]*kubecontainer.Pod
 }
 
 // plegContainerState has a one-to-one mapping to the
@@ -210,6 +213,11 @@ func (g *GenericPLEG) relist() {
 		}
 	}
 
+	var needsReinspection map[types.UID]*kubecontainer.Pod
+	if g.cacheEnabled() {
+		needsReinspection = make(map[types.UID]*kubecontainer.Pod)
+	}
+
 	// If there are events associated with a pod, we should update the
 	// podCache.
 	for pid, events := range eventsByPodID {
@@ -226,7 +234,16 @@ func (g *GenericPLEG) relist() {
 			// parallelize if needed.
 			if err := g.updateCache(pod, pid); err != nil {
 				glog.Errorf("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
+
+				// make sure we try to reinspect the pod during the next relisting
+				needsReinspection[pid] = pod
+
 				continue
+			} else if _, found := g.podsToReinspect[pid]; found {
+				// this pod was in the list to reinspect and we did so because it had events, so remove it
+				// from the list (we don't want the reinspection code below to inspect it a second time in
+				// this relist execution)
+				delete(g.podsToReinspect, pid)
 			}
 		}
 		// Update the internal storage and send out the events.
@@ -241,10 +258,24 @@ func (g *GenericPLEG) relist() {
 	}
 
 	if g.cacheEnabled() {
+		// reinspect any pods that failed inspection during the previous relist
+		if len(g.podsToReinspect) > 0 {
+			glog.V(5).Infof("GenericPLEG: Reinspecting pods that previously failed inspection")
+			for pid, pod := range g.podsToReinspect {
+				if err := g.updateCache(pod, pid); err != nil {
+					glog.Errorf("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err)
+					needsReinspection[pid] = pod
+				}
+			}
+		}
+
 		// Update the cache timestamp. This needs to happen *after*
 		// all pods have been properly updated in the cache.
 		g.cache.UpdateTime(timestamp)
 	}
+
+	// make sure we retain the list of pods that need reinspecting the next time relist is called
+	g.podsToReinspect = needsReinspection
 }
 
 func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Container {
diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go
index 3e3c0f24d43..460547d85a1 100644
--- a/pkg/kubelet/pleg/generic_test.go
+++ b/pkg/kubelet/pleg/generic_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package pleg
 
 import (
+	"errors"
 	"fmt"
 	"reflect"
 	"sort"
@@ -356,3 +357,71 @@ func TestHealthy(t *testing.T) {
 	ok, _ = pleg.Healthy()
 	assert.True(t, ok, "pleg should be healthy")
 }
+
+func TestRelistWithReinspection(t *testing.T) {
+	pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
+	ch := pleg.Watch()
+
+	infraContainer := createTestContainer("infra", kubecontainer.ContainerStateRunning)
+
+	podID := types.UID("test-pod")
+	pods := []*kubecontainer.Pod{{
+		ID:         podID,
+		Containers: []*kubecontainer.Container{infraContainer},
+	}}
+	runtimeMock.On("GetPods", true).Return(pods, nil).Once()
+
+	goodStatus := &kubecontainer.PodStatus{
+		ID:                podID,
+		ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: infraContainer.ID, State: infraContainer.State}},
+	}
+	runtimeMock.On("GetPodStatus", podID, "", "").Return(goodStatus, nil).Once()
+
+	goodEvent := &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: infraContainer.ID.ID}
+
+	// listing 1 - everything ok, infra container set up for pod
+	pleg.relist()
+	actualEvents := getEventsFromChannel(ch)
+	actualStatus, actualErr := pleg.cache.Get(podID)
+	assert.Equal(t, goodStatus, actualStatus)
+	assert.Equal(t, nil, actualErr)
+	assert.Exactly(t, []*PodLifecycleEvent{goodEvent}, actualEvents)
+
+	// listing 2 - pretend runtime was in the middle of creating the non-infra container for the pod
+	// and return an error during inspection
+	transientContainer := createTestContainer("transient", kubecontainer.ContainerStateUnknown)
+	podsWithTransientContainer := []*kubecontainer.Pod{{
+		ID:         podID,
+		Containers: []*kubecontainer.Container{infraContainer, transientContainer},
+	}}
+	runtimeMock.On("GetPods", true).Return(podsWithTransientContainer, nil).Once()
+
+	badStatus := &kubecontainer.PodStatus{
+		ID:                podID,
+		ContainerStatuses: []*kubecontainer.ContainerStatus{},
+	}
+	runtimeMock.On("GetPodStatus", podID, "", "").Return(badStatus, errors.New("inspection error")).Once()
+
+	pleg.relist()
+	actualEvents = getEventsFromChannel(ch)
+	actualStatus, actualErr = pleg.cache.Get(podID)
+	assert.Equal(t, badStatus, actualStatus)
+	assert.Equal(t, errors.New("inspection error"), actualErr)
+	assert.Exactly(t, []*PodLifecycleEvent{}, actualEvents)
+
+	// listing 3 - pretend the transient container has now disappeared, leaving just the infra
+	// container. Make sure the pod is reinspected for its status and the cache is updated.
+	runtimeMock.On("GetPods", true).Return(pods, nil).Once()
+	runtimeMock.On("GetPodStatus", podID, "", "").Return(goodStatus, nil).Once()
+
+	pleg.relist()
+	actualEvents = getEventsFromChannel(ch)
+	actualStatus, actualErr = pleg.cache.Get(podID)
+	assert.Equal(t, goodStatus, actualStatus)
+	assert.Equal(t, nil, actualErr)
+	// no events are expected because relist #1 set the old pod record which has the infra container
+	// running. relist #2 had the inspection error and therefore didn't modify either old or new.
+	// relist #3 forced the reinspection of the pod to retrieve its status, but because the list of
+	// containers was the same as relist #1, nothing "changed", so there are no new events.
+	assert.Exactly(t, []*PodLifecycleEvent{}, actualEvents)
+}