From 0fd2385e0bd8f639d59e1bd7992f415d6192a794 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Wed, 24 Aug 2016 14:12:23 -0700 Subject: [PATCH 1/2] Add "Sandboxes" to the kubecontainer.Pod This field will only be used by the kuberuntime package and should be ignored by other type of runtimes. --- pkg/kubelet/container/runtime.go | 5 ++++ pkg/kubelet/kuberuntime/helpers.go | 30 +++++++++++++++++++ .../kuberuntime/kuberuntime_manager.go | 17 ++++++++--- .../kuberuntime/kuberuntime_manager_test.go | 15 +++++++++- 4 files changed, 62 insertions(+), 5 deletions(-) diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 1c86fa14b46..98d58b3aac4 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -153,6 +153,11 @@ type Pod struct { // List of containers that belongs to this pod. It may contain only // running containers, or mixed with dead ones (when GetPods(true)). Containers []*Container + // List of sandboxes associated with this pod. The sandboxes are converted + // to Container temporariliy to avoid substantial changes to other + // components. This is only populated by kuberuntime. + // TODO: use the runtimeApi.PodSandbox type directly. + Sandboxes []*Container } // PodPair contains both runtime#Pod and api#Pod diff --git a/pkg/kubelet/kuberuntime/helpers.go b/pkg/kubelet/kuberuntime/helpers.go index 7cc24dedc74..d0838573b99 100644 --- a/pkg/kubelet/kuberuntime/helpers.go +++ b/pkg/kubelet/kuberuntime/helpers.go @@ -64,6 +64,21 @@ func toKubeContainerState(state runtimeApi.ContainerState) kubecontainer.Contain return kubecontainer.ContainerStateUnknown } +// sandboxToKubeContainerState converts runtimeApi.PodSandboxState to +// kubecontainer.ContainerState. +// This is only needed because we need to return sandboxes as if they were +// kubecontainer.Containers to avoid substantial changes to PLEG. +// TODO: Remove this once it becomes obsolete. +func sandboxToKubeContainerState(state runtimeApi.PodSandBoxState) kubecontainer.ContainerState { + switch state { + case runtimeApi.PodSandBoxState_READY: + return kubecontainer.ContainerStateRunning + case runtimeApi.PodSandBoxState_NOTREADY: + return kubecontainer.ContainerStateExited + } + return kubecontainer.ContainerStateUnknown +} + // toRuntimeProtocol converts api.Protocol to runtimeApi.Protocol. func toRuntimeProtocol(protocol api.Protocol) runtimeApi.Protocol { switch protocol { @@ -94,6 +109,21 @@ func (m *kubeGenericRuntimeManager) toKubeContainer(c *runtimeApi.Container) (*k }, nil } +// sandboxToKubeContainer converts runtimeApi.PodSandbox to kubecontainer.Container. +// This is only needed because we need to return sandboxes as if they were +// kubecontainer.Containers to avoid substantial changes to PLEG. +// TODO: Remove this once it becomes obsolete. +func (m *kubeGenericRuntimeManager) sandboxToKubeContainer(s *runtimeApi.PodSandbox) (*kubecontainer.Container, error) { + if s == nil || s.Id == nil || s.State == nil { + return nil, fmt.Errorf("unable to convert a nil pointer to a runtime container") + } + + return &kubecontainer.Container{ + ID: kubecontainer.ContainerID{Type: m.runtimeName, ID: s.GetId()}, + State: sandboxToKubeContainerState(s.GetState()), + }, nil +} + // milliCPUToShares converts milliCPU to CPU shares func milliCPUToShares(milliCPU int64) int64 { if milliCPU == 0 { diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 77d83b90f07..ca5d322cb3f 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -231,11 +231,20 @@ func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, err } for _, s := range sandboxes { podUID := kubetypes.UID(s.Metadata.GetUid()) - pods[podUID] = &kubecontainer.Pod{ - ID: podUID, - Name: s.Metadata.GetName(), - Namespace: s.Metadata.GetNamespace(), + if _, ok := pods[podUID]; !ok { + pods[podUID] = &kubecontainer.Pod{ + ID: podUID, + Name: s.Metadata.GetName(), + Namespace: s.Metadata.GetNamespace(), + } } + p := pods[podUID] + converted, err := m.sandboxToKubeContainer(s) + if err != nil { + glog.Warningf("Convert %q sandbox %v of pod %q failed: %v", m.runtimeName, s, podUID, err) + continue + } + p.Sandboxes = append(p.Sandboxes, converted) } containers, err := m.getKubeletContainers(all) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go index 0f63d6eb6de..dae589df529 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go @@ -200,7 +200,7 @@ func TestGetPods(t *testing.T) { } // Set fake sandbox and fake containers to fakeRuntime. - _, fakeContainers, err := makeAndSetFakePod(m, fakeRuntime, pod) + fakeSandbox, fakeContainers, err := makeAndSetFakePod(m, fakeRuntime, pod) assert.NoError(t, err) // Convert the fakeContainers to kubecontainer.Container @@ -220,12 +220,25 @@ func TestGetPods(t *testing.T) { } containers[i] = c } + // Convert fakeSandbox to kubecontainer.Container + sandbox, err := m.sandboxToKubeContainer(&runtimeApi.PodSandbox{ + Id: fakeSandbox.Id, + Metadata: fakeSandbox.Metadata, + State: fakeSandbox.State, + CreatedAt: fakeSandbox.CreatedAt, + Labels: fakeSandbox.Labels, + }) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + expected := []*kubecontainer.Pod{ { ID: kubetypes.UID("12345678"), Name: "foo", Namespace: "new", Containers: []*kubecontainer.Container{containers[0], containers[1]}, + Sandboxes: []*kubecontainer.Container{sandbox}, }, } From a49d28710a33a25e310c9264ff9fc58fd447a96f Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Wed, 24 Aug 2016 17:08:37 -0700 Subject: [PATCH 2/2] Extend PLEG to handle pod sandboxes PLEG will treat them as if they are regular containers and detect changes the same manner. Note that this makes an assumption that container IDs will not collide with the podsandbox IDs. --- pkg/kubelet/container/runtime.go | 9 +++ .../kuberuntime/kuberuntime_manager.go | 19 ++++-- pkg/kubelet/pleg/generic.go | 26 +++++-- pkg/kubelet/pleg/generic_test.go | 68 ++++++++++++++++++- 4 files changed, 113 insertions(+), 9 deletions(-) diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 98d58b3aac4..8a3a412157a 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -465,6 +465,15 @@ func (p *Pod) FindContainerByID(id ContainerID) *Container { return nil } +func (p *Pod) FindSandboxByID(id ContainerID) *Container { + for _, c := range p.Sandboxes { + if c.ID == id { + return c + } + } + return nil +} + // ToAPIPod converts Pod to api.Pod. Note that if a field in api.Pod has no // corresponding field in Pod, the field would not be populated. func (p *Pod) ToAPIPod() *api.Pod { diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index ca5d322cb3f..afa8283f139 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -229,7 +229,12 @@ func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, err if err != nil { return nil, err } - for _, s := range sandboxes { + for i := range sandboxes { + s := sandboxes[i] + if s.Metadata == nil { + glog.V(4).Infof("Sandbox does not have metadata: %+v", s) + continue + } podUID := kubetypes.UID(s.Metadata.GetUid()) if _, ok := pods[podUID]; !ok { pods[podUID] = &kubecontainer.Pod{ @@ -241,7 +246,7 @@ func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, err p := pods[podUID] converted, err := m.sandboxToKubeContainer(s) if err != nil { - glog.Warningf("Convert %q sandbox %v of pod %q failed: %v", m.runtimeName, s, podUID, err) + glog.V(4).Infof("Convert %q sandbox %v of pod %q failed: %v", m.runtimeName, s, podUID, err) continue } p.Sandboxes = append(p.Sandboxes, converted) @@ -251,7 +256,13 @@ func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, err if err != nil { return nil, err } - for _, c := range containers { + for i := range containers { + c := containers[i] + if c.Metadata == nil { + glog.V(4).Infof("Container does not have metadata: %+v", c) + continue + } + labelledInfo := getContainerInfoFromLabels(c.Labels) pod, found := pods[labelledInfo.PodUID] if !found { @@ -265,7 +276,7 @@ func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, err converted, err := m.toKubeContainer(c) if err != nil { - glog.Warningf("Convert %s container %v of pod %q failed: %v", m.runtimeName, c, labelledInfo.PodUID, err) + glog.V(4).Infof("Convert %s container %v of pod %q failed: %v", m.runtimeName, c, labelledInfo.PodUID, err) continue } diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 375de16cf1b..64ebade8bb3 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -138,6 +138,7 @@ func generateEvents(podID types.UID, cid string, oldState, newState plegContaine if newState == oldState { return nil } + glog.V(4).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState) switch newState { case plegContainerRunning: @@ -291,6 +292,17 @@ func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Containe cidSet.Insert(cid) containers = append(containers, c) } + // Update sandboxes as containers + // TODO: keep track of sandboxes explicitly. + for _, c := range p.Sandboxes { + cid := string(c.ID.ID) + if cidSet.Has(cid) { + continue + } + cidSet.Insert(cid) + containers = append(containers, c) + } + } return containers } @@ -342,11 +354,17 @@ func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) p if pod == nil { return state } - container := pod.FindContainerByID(*cid) - if container == nil { - return state + c := pod.FindContainerByID(*cid) + if c != nil { + return convertState(c.State) } - return convertState(container.State) + // Search through sandboxes too. + c = pod.FindSandboxByID(*cid) + if c != nil { + return convertState(c.State) + } + + return state } func (pr podRecords) getOld(id types.UID) *kubecontainer.Pod { diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index 6b5cce5110c..3536e9f48cc 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -123,7 +123,7 @@ func TestRelisting(t *testing.T) { actual := getEventsFromChannel(ch) verifyEvents(t, expected, actual) - // The second relist should not send out any event because no container + // The second relist should not send out any event because no container has // changed. pleg.relist() verifyEvents(t, expected, actual) @@ -430,3 +430,69 @@ func TestRelistWithReinspection(t *testing.T) { // containers was the same as relist #1, nothing "changed", so there are no new events. assert.Exactly(t, []*PodLifecycleEvent{}, actualEvents) } + +// Test detecting sandbox state changes. +func TestRelistingWithSandboxes(t *testing.T) { + testPleg := newTestGenericPLEG() + pleg, runtime := testPleg.pleg, testPleg.runtime + ch := pleg.Watch() + // The first relist should send a PodSync event to each pod. + runtime.AllPodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ + ID: "1234", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("c1", kubecontainer.ContainerStateExited), + createTestContainer("c2", kubecontainer.ContainerStateRunning), + createTestContainer("c3", kubecontainer.ContainerStateUnknown), + }, + }}, + {Pod: &kubecontainer.Pod{ + ID: "4567", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("c1", kubecontainer.ContainerStateExited), + }, + }}, + } + pleg.relist() + // Report every running/exited container if we see them for the first time. + expected := []*PodLifecycleEvent{ + {ID: "1234", Type: ContainerStarted, Data: "c2"}, + {ID: "4567", Type: ContainerDied, Data: "c1"}, + {ID: "1234", Type: ContainerDied, Data: "c1"}, + } + actual := getEventsFromChannel(ch) + verifyEvents(t, expected, actual) + + // The second relist should not send out any event because no container has + // changed. + pleg.relist() + verifyEvents(t, expected, actual) + + runtime.AllPodList = []*containertest.FakePod{ + {Pod: &kubecontainer.Pod{ + ID: "1234", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("c2", kubecontainer.ContainerStateExited), + createTestContainer("c3", kubecontainer.ContainerStateRunning), + }, + }}, + {Pod: &kubecontainer.Pod{ + ID: "4567", + Sandboxes: []*kubecontainer.Container{ + createTestContainer("c4", kubecontainer.ContainerStateRunning), + }, + }}, + } + pleg.relist() + // Only report containers that transitioned to running or exited status. + expected = []*PodLifecycleEvent{ + {ID: "1234", Type: ContainerRemoved, Data: "c1"}, + {ID: "1234", Type: ContainerDied, Data: "c2"}, + {ID: "1234", Type: ContainerStarted, Data: "c3"}, + {ID: "4567", Type: ContainerRemoved, Data: "c1"}, + {ID: "4567", Type: ContainerStarted, Data: "c4"}, + } + + actual = getEventsFromChannel(ch) + verifyEvents(t, expected, actual) +}