diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 2173a6c55ea..ec215654d19 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -442,6 +442,15 @@ func (p *Pod) FindContainerByName(containerName string) *Container { return nil } +func (p *Pod) FindContainerByID(id ContainerID) *Container { + for _, c := range p.Containers { + if c.ID == id { + return c + } + } + return nil +} + // ToAPIPod converts Pod to api.Pod. Note that if a field in api.Pod has no // corresponding field in Pod, the field would not be populated. func (p *Pod) ToAPIPod() *api.Pod { diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 7816062cf89..fa07d297ce5 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -49,24 +49,51 @@ type GenericPLEG struct { runtime kubecontainer.Runtime // The channel from which the subscriber listens events. eventChannel chan *PodLifecycleEvent - // The internal cache for container information. - containers map[string]containerInfo + // The internal cache for pod/container information. + podRecords podRecords // Time of the last relisting. lastRelistTime time.Time } -type containerInfo struct { - podID types.UID - state kubecontainer.ContainerState +// plegContainerState has an one-to-one mapping to the +// kubecontainer.ContainerState except for the Non-existent state. This state +// is introduced here to complete the state transition scenarios. +type plegContainerState string + +const ( + plegContainerRunning plegContainerState = "running" + plegContainerExited plegContainerState = "exited" + plegContainerUnknown plegContainerState = "unknown" + plegContainerNonExistent plegContainerState = "non-existent" +) + +func convertState(state kubecontainer.ContainerState) plegContainerState { + switch state { + case kubecontainer.ContainerStateRunning: + return plegContainerRunning + case kubecontainer.ContainerStateExited: + return plegContainerExited + case kubecontainer.ContainerStateUnknown: + return plegContainerUnknown + default: + panic(fmt.Sprintf("unrecognized container state: %v", state)) + } } +type podRecord struct { + old *kubecontainer.Pod + current *kubecontainer.Pod +} + +type podRecords map[types.UID]*podRecord + func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int, relistPeriod time.Duration) PodLifecycleEventGenerator { return &GenericPLEG{ relistPeriod: relistPeriod, runtime: runtime, eventChannel: make(chan *PodLifecycleEvent, channelCapacity), - containers: make(map[string]containerInfo), + podRecords: make(podRecords), } } @@ -82,18 +109,30 @@ func (g *GenericPLEG) Start() { go util.Until(g.relist, g.relistPeriod, util.NeverStop) } -func generateEvent(podID types.UID, cid string, oldState, newState kubecontainer.ContainerState) *PodLifecycleEvent { +func generateEvent(podID types.UID, cid string, oldState, newState plegContainerState) *PodLifecycleEvent { + glog.V(7).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState) if newState == oldState { return nil } switch newState { - case kubecontainer.ContainerStateRunning: + case plegContainerRunning: return &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: cid} - case kubecontainer.ContainerStateExited: + case plegContainerExited: return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid} - case kubecontainer.ContainerStateUnknown: + case plegContainerUnknown: // Don't generate any event if the status is unknown. return nil + case plegContainerNonExistent: + // We report "ContainerDied" when container was stopped OR removed. We + // may want to distinguish the two cases in the future. + switch oldState { + case plegContainerExited: + // We already reported that the container died before. There is no + // need to do it again. + return nil + default: + return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid} + } default: panic(fmt.Sprintf("unrecognized container state: %v", newState)) } @@ -116,40 +155,124 @@ func (g *GenericPLEG) relist() { }() // Get all the pods. - pods, err := g.runtime.GetPods(true) + podList, err := g.runtime.GetPods(true) if err != nil { glog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err) return } + pods := kubecontainer.Pods(podList) - events := []*PodLifecycleEvent{} - containers := make(map[string]containerInfo, len(g.containers)) - // Create a new containers map, compares container statuses, and generates - // correspoinding events. - for _, p := range pods { - for _, c := range p.Containers { - cid := c.ID.ID - // Get the of existing container info. Defaults to state unknown. - oldState := kubecontainer.ContainerStateUnknown - if info, ok := g.containers[cid]; ok { - oldState = info.state + eventsByPodID := map[types.UID][]*PodLifecycleEvent{} + // Process all currently visible pods. + for _, pod := range pods { + g.podRecords.setCurrent(pod) + // Locate the old pod. + oldPod := g.podRecords.getOld(pod.ID) + + // Process all currently visible containers in the pod. + for _, container := range pod.Containers { + cid := container.ID + oldState := getContainerState(oldPod, cid) + newState := convertState(container.State) + e := generateEvent(pod.ID, cid.ID, oldState, newState) + updateEvents(eventsByPodID, e) + } + + if oldPod == nil { + continue + } + // Process all containers in the old pod, but no longer in the new pod. + for _, oldContainer := range oldPod.Containers { + cid := oldContainer.ID + oldState := convertState(oldContainer.State) + newState := getContainerState(pod, cid) + if newState != plegContainerNonExistent { + // We already processed the container. + continue } - // Generate an event if required. - glog.V(7).Infof("GenericPLEG: %v/%v: %v -> %v", p.ID, cid, oldState, c.State) - if e := generateEvent(p.ID, cid, oldState, c.State); e != nil { - events = append(events, e) - } - // Write to the new cache. - containers[cid] = containerInfo{podID: p.ID, state: c.State} + // Container no longer visible, generate an event. + e := generateEvent(pod.ID, cid.ID, oldState, plegContainerNonExistent) + updateEvents(eventsByPodID, e) } } - // Swap the container info cache. This is purely to avoid the need of - // garbage collection. - g.containers = containers + // Process all pods that are no longer visible. + for pid := range g.podRecords { + if pod := g.podRecords.getCurrent(pid); pod != nil { + continue + } + oldPod := g.podRecords.getOld(pid) + for _, oldContainer := range oldPod.Containers { + cid := oldContainer.ID + oldState := convertState(oldContainer.State) + e := generateEvent(oldPod.ID, cid.ID, oldState, plegContainerNonExistent) + updateEvents(eventsByPodID, e) + } + } + + // Update the internal storage. + g.podRecords.updateAll() // Send out the events. - for i := range events { - g.eventChannel <- events[i] + for _, events := range eventsByPodID { + for i := range events { + g.eventChannel <- events[i] + } + } +} + +func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) { + if e == nil { + return + } + eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e) +} + +func getContainerState(pod *kubecontainer.Pod, cid kubecontainer.ContainerID) plegContainerState { + // Default to the non-existent state. + state := plegContainerNonExistent + if pod == nil { + return state + } + container := pod.FindContainerByID(cid) + if container == nil { + return state + } + return convertState(container.State) +} + +func (pr podRecords) getOld(id types.UID) *kubecontainer.Pod { + r, ok := pr[id] + if !ok { + return nil + } + return r.old +} + +func (pr podRecords) getCurrent(id types.UID) *kubecontainer.Pod { + r, ok := pr[id] + if !ok { + return nil + } + return r.current +} + +func (pr podRecords) setCurrent(pod *kubecontainer.Pod) { + if r, ok := pr[pod.ID]; ok { + r.current = pod + return + } + pr[pod.ID] = &podRecord{current: pod} +} + +func (pr podRecords) updateAll() { + for k, r := range pr { + if r.current == nil { + // Pod no longer exists; delete the entry. + delete(pr, k) + continue + } + r.old = r.current + r.current = nil } } diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index 7f9562a4f53..ff7b414b490 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -43,7 +43,7 @@ func newTestGenericPLEG() *TestGenericPLEG { relistPeriod: time.Hour, runtime: fakeRuntime, eventChannel: make(chan *PodLifecycleEvent, 100), - containers: make(map[string]containerInfo), + podRecords: make(podRecords), } return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime} } @@ -79,7 +79,7 @@ func verifyEvents(t *testing.T, expected, actual []*PodLifecycleEvent) { sort.Sort(sortableEvents(expected)) sort.Sort(sortableEvents(actual)) if !reflect.DeepEqual(expected, actual) { - t.Errorf("Actual events differ from the expected; diff: %v", util.ObjectDiff(expected, actual)) + t.Errorf("Actual events differ from the expected; diff:\n %v", util.ObjectDiff(expected, actual)) } } @@ -87,7 +87,6 @@ func TestRelisting(t *testing.T) { testPleg := newTestGenericPLEG() pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() - // The first relist should send a PodSync event to each pod. runtime.AllPodList = []*kubecontainer.Pod{ { @@ -146,3 +145,67 @@ func TestRelisting(t *testing.T) { actual = getEventsFromChannel(ch) verifyEvents(t, expected, actual) } + +func TestReportMissingContainers(t *testing.T) { + testPleg := newTestGenericPLEG() + pleg, runtime := testPleg.pleg, testPleg.runtime + ch := pleg.Watch() + runtime.AllPodList = []*kubecontainer.Pod{ + { + ID: "1234", + Containers: []*kubecontainer.Container{ + createTestContainer("c1", kubecontainer.ContainerStateRunning), + createTestContainer("c2", kubecontainer.ContainerStateRunning), + createTestContainer("c3", kubecontainer.ContainerStateExited), + }, + }, + } + // Drain the events from the channel + pleg.relist() + getEventsFromChannel(ch) + + // Container c2 was stopped and removed between relists. We should report + // the event. The exited container c3 was garbage collected (i.e., removed) + // between relists. We should ignore that event. + runtime.AllPodList = []*kubecontainer.Pod{ + { + ID: "1234", + Containers: []*kubecontainer.Container{ + createTestContainer("c1", kubecontainer.ContainerStateRunning), + }, + }, + } + pleg.relist() + expected := []*PodLifecycleEvent{ + {ID: "1234", Type: ContainerDied, Data: "c2"}, + } + actual := getEventsFromChannel(ch) + verifyEvents(t, expected, actual) +} + +func TestReportMissingPods(t *testing.T) { + testPleg := newTestGenericPLEG() + pleg, runtime := testPleg.pleg, testPleg.runtime + ch := pleg.Watch() + runtime.AllPodList = []*kubecontainer.Pod{ + { + ID: "1234", + Containers: []*kubecontainer.Container{ + createTestContainer("c2", kubecontainer.ContainerStateRunning), + }, + }, + } + // Drain the events from the channel + pleg.relist() + getEventsFromChannel(ch) + + // Container c2 was stopped and removed between relists. We should report + // the event. + runtime.AllPodList = []*kubecontainer.Pod{} + pleg.relist() + expected := []*PodLifecycleEvent{ + {ID: "1234", Type: ContainerDied, Data: "c2"}, + } + actual := getEventsFromChannel(ch) + verifyEvents(t, expected, actual) +}