From a58c774c08b8ae92b6409197b13087e1af81e169 Mon Sep 17 00:00:00 2001 From: Ron Lai Date: Wed, 13 Jul 2016 16:48:43 -0700 Subject: [PATCH] Including ContainerRemoved in PLEG event reporting --- pkg/kubelet/kubelet.go | 24 ++++++++++++++++-------- pkg/kubelet/pleg/generic.go | 28 +++++++++++++--------------- pkg/kubelet/pleg/generic_test.go | 5 +++++ pkg/kubelet/pleg/pleg.go | 2 +- 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 530d518d180..56ae2982430 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2335,15 +2335,17 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle } case e := <-plegCh: - // PLEG event for a pod; sync it. - pod, ok := kl.podManager.GetPodByUID(e.ID) - if !ok { - // If the pod no longer exists, ignore the event. - glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e) - break + if isSyncPodWorthy(e) { + // PLEG event for a pod; sync it. + pod, ok := kl.podManager.GetPodByUID(e.ID) + if !ok { + // If the pod no longer exists, ignore the event. + glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e) + break + } + glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e) + handler.HandlePodSyncs([]*api.Pod{pod}) } - glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e) - handler.HandlePodSyncs([]*api.Pod{pod}) case <-syncCh: // Sync pods waiting for sync podsToSync := kl.getPodsToSync() @@ -3563,3 +3565,9 @@ func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server. func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) { server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime) } + +// Filter out events that are not worthy of pod syncing +func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool { + // ContatnerRemoved doesn't affect pod state + return event.Type != pleg.ContainerRemoved +} diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 2841425a37c..c359273472b 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -134,29 +134,25 @@ func (g *GenericPLEG) Healthy() (bool, error) { return true, nil } -func generateEvent(podID types.UID, cid string, oldState, newState plegContainerState) *PodLifecycleEvent { +func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent { if newState == oldState { return nil } glog.V(4).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState) switch newState { case plegContainerRunning: - return &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: cid} + return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}} case plegContainerExited: - return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid} + return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}} case plegContainerUnknown: - return &PodLifecycleEvent{ID: podID, Type: ContainerChanged, Data: cid} + return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}} case plegContainerNonExistent: - // We report "ContainerDied" when container was stopped OR removed. We - // may want to distinguish the two cases in the future. switch oldState { case plegContainerExited: // We already reported that the container died before. - return &PodLifecycleEvent{ID: podID, Type: ContainerRemoved, Data: cid} + return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}} default: - // TODO: We may want to generate a ContainerRemoved event as well. - // It's ok now because no one relies on the ContainerRemoved event. - return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid} + return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}} } default: panic(fmt.Sprintf("unrecognized container state: %v", newState)) @@ -208,8 +204,10 @@ func (g *GenericPLEG) relist() { // Get all containers in the old and the new pod. allContainers := getContainersFromPods(oldPod, pod) for _, container := range allContainers { - e := computeEvent(oldPod, pod, &container.ID) - updateEvents(eventsByPodID, e) + events := computeEvents(oldPod, pod, &container.ID) + for _, e := range events { + updateEvents(eventsByPodID, e) + } } } @@ -250,7 +248,7 @@ func (g *GenericPLEG) relist() { g.podRecords.update(pid) for i := range events { // Filter out events that are not reliable and no other components use yet. - if events[i].Type == ContainerChanged || events[i].Type == ContainerRemoved { + if events[i].Type == ContainerChanged { continue } g.eventChannel <- events[i] @@ -297,7 +295,7 @@ func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Containe return containers } -func computeEvent(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) *PodLifecycleEvent { +func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent { var pid types.UID if oldPod != nil { pid = oldPod.ID @@ -306,7 +304,7 @@ func computeEvent(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.Containe } oldState := getContainerState(oldPod, cid) newState := getContainerState(newPod, cid) - return generateEvent(pid, cid.ID, oldState, newState) + return generateEvents(pid, cid.ID, oldState, newState) } func (g *GenericPLEG) cacheEnabled() bool { diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index db004ba247d..70792ab7ce9 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -146,8 +146,10 @@ func TestRelisting(t *testing.T) { pleg.relist() // Only report containers that transitioned to running or exited status. expected = []*PodLifecycleEvent{ + {ID: "1234", Type: ContainerRemoved, Data: "c1"}, {ID: "1234", Type: ContainerDied, Data: "c2"}, {ID: "1234", Type: ContainerStarted, Data: "c3"}, + {ID: "4567", Type: ContainerRemoved, Data: "c1"}, {ID: "4567", Type: ContainerStarted, Data: "c4"}, } @@ -199,6 +201,8 @@ func testReportMissingContainers(t *testing.T, numRelists int) { pleg.relist() expected := []*PodLifecycleEvent{ {ID: "1234", Type: ContainerDied, Data: "c2"}, + {ID: "1234", Type: ContainerRemoved, Data: "c2"}, + {ID: "1234", Type: ContainerRemoved, Data: "c3"}, } actual := getEventsFromChannel(ch) verifyEvents(t, expected, actual) @@ -228,6 +232,7 @@ func testReportMissingPods(t *testing.T, numRelists int) { pleg.relist() expected := []*PodLifecycleEvent{ {ID: "1234", Type: ContainerDied, Data: "c2"}, + {ID: "1234", Type: ContainerRemoved, Data: "c2"}, } actual := getEventsFromChannel(ch) verifyEvents(t, expected, actual) diff --git a/pkg/kubelet/pleg/pleg.go b/pkg/kubelet/pleg/pleg.go index 49f839d9c63..78900e4e4b0 100644 --- a/pkg/kubelet/pleg/pleg.go +++ b/pkg/kubelet/pleg/pleg.go @@ -25,11 +25,11 @@ type PodLifeCycleEventType string const ( ContainerStarted PodLifeCycleEventType = "ContainerStarted" ContainerDied PodLifeCycleEventType = "ContainerDied" + ContainerRemoved PodLifeCycleEventType = "ContainerRemoved" // PodSync is used to trigger syncing of a pod when the observed change of // the state of the pod cannot be captured by any single event above. PodSync PodLifeCycleEventType = "PodSync" // Do not use the events below because they are disabled in GenericPLEG. - ContainerRemoved PodLifeCycleEventType = "ContainerRemoved" ContainerChanged PodLifeCycleEventType = "ContainerChanged" )