From b56ed1a8c22263a1760aaeecee8c0768ac165dce Mon Sep 17 00:00:00 2001
From: Yu-Ju Hong
Date: Fri, 8 Jan 2016 12:04:40 -0800
Subject: [PATCH] Support populating the runtime cache in PLEG

This change does not turn on this feature (cache) for the kubelet.
---
 pkg/kubelet/kubelet.go           |   4 +-
 pkg/kubelet/kubelet_test.go      |   2 +-
 pkg/kubelet/pleg/generic.go      | 171 ++++++++++++++++++++-----------
 pkg/kubelet/pleg/generic_test.go | 108 +++++++++++++++++++
 4 files changed, 223 insertions(+), 62 deletions(-)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 84823f984ea..5a068015d15 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -367,7 +367,7 @@ func NewMainKubelet(
 			serializeImagePulls,
 		)
 
-		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod)
+		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
 	case "rkt":
 		conf := &rkt.Config{
 			Path: rktPath,
@@ -388,7 +388,7 @@ func NewMainKubelet(
 			return nil, err
 		}
 		klet.containerRuntime = rktRuntime
-		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod)
+		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
 
 		// No Docker daemon to put in a container.
 		dockerDaemonContainer = ""
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 282a8f83c1e..9b5ec2bf09c 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -174,7 +174,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	kubelet.resyncInterval = 10 * time.Second
 	kubelet.workQueue = queue.NewBasicWorkQueue()
 	// Relist period does not affect the tests.
-	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour)
+	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil)
 	kubelet.clock = fakeClock
 	return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock}
 }
diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
index fa07d297ce5..07ecf71ed1b 100644
--- a/pkg/kubelet/pleg/generic.go
+++ b/pkg/kubelet/pleg/generic.go
@@ -25,6 +25,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/sets"
 )
 
 // GenericPLEG is an extremely simple generic PLEG that relies solely on
@@ -53,10 +54,12 @@ type GenericPLEG struct {
 	podRecords podRecords
 	// Time of the last relisting.
 	lastRelistTime time.Time
+	// Cache for storing the runtime states required for syncing pods.
+	cache kubecontainer.Cache
 }
 
-// plegContainerState has an one-to-one mapping to the
-// kubecontainer.ContainerState except for the Non-existent state. This state
+// plegContainerState has a one-to-one mapping to the
+// kubecontainer.ContainerState except for the non-existent state. This state
 // is introduced here to complete the state transition scenarios.
 type plegContainerState string
@@ -88,12 +91,13 @@ type podRecord struct {
 type podRecords map[types.UID]*podRecord
 
 func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
-	relistPeriod time.Duration) PodLifecycleEventGenerator {
+	relistPeriod time.Duration, cache kubecontainer.Cache) PodLifecycleEventGenerator {
 	return &GenericPLEG{
 		relistPeriod: relistPeriod,
 		runtime:      runtime,
 		eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
 		podRecords:   make(podRecords),
+		cache:        cache,
 	}
 }
 
@@ -161,64 +165,107 @@ func (g *GenericPLEG) relist() {
 		return
 	}
 	pods := kubecontainer.Pods(podList)
-
-	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
-	// Process all currently visible pods.
 	for _, pod := range pods {
 		g.podRecords.setCurrent(pod)
-		// Locate the old pod.
-		oldPod := g.podRecords.getOld(pod.ID)
+	}
 
-		// Process all currently visible containers in the pod.
-		for _, container := range pod.Containers {
-			cid := container.ID
-			oldState := getContainerState(oldPod, cid)
-			newState := convertState(container.State)
-			e := generateEvent(pod.ID, cid.ID, oldState, newState)
+	// Compare the old and the current pods, and generate events.
+	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
+	for pid := range g.podRecords {
+		oldPod := g.podRecords.getOld(pid)
+		pod := g.podRecords.getCurrent(pid)
+		// Get all containers in the old and the new pod.
+		allContainers := getContainersFromPods(oldPod, pod)
+		for _, container := range allContainers {
+			e := computeEvent(oldPod, pod, &container.ID)
 			updateEvents(eventsByPodID, e)
 		}
+	}
 
-		if oldPod == nil {
-			continue
-		}
-		// Process all containers in the old pod, but no longer in the new pod.
-		for _, oldContainer := range oldPod.Containers {
-			cid := oldContainer.ID
-			oldState := convertState(oldContainer.State)
-			newState := getContainerState(pod, cid)
-			if newState != plegContainerNonExistent {
-				// We already processed the container.
+	// If there are events associated with a pod, we should update the
+	// podCache.
+	for pid, events := range eventsByPodID {
+		pod := g.podRecords.getCurrent(pid)
+		if g.cacheEnabled() {
+			// updateCache() will inspect the pod and update the cache. If an
+			// error occurs during the inspection, we want PLEG to retry in
+			// the next relist. To achieve this, we do not update the
+			// associated podRecord of the pod, so that the change will be
+			// detected again in the next relist.
+			// TODO: If many pods changed during the same relist period,
+			// inspecting the pod and getting the PodStatus to update the cache
+			// serially may take a while. We should be aware of this and
+			// parallelize if needed.
+			if err := g.updateCache(pod, pid); err != nil {
+				glog.Errorf("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
 				continue
 			}
-			// Container no longer visible, generate an event.
-			e := generateEvent(pod.ID, cid.ID, oldState, plegContainerNonExistent)
-			updateEvents(eventsByPodID, e)
 		}
-	}
-
-	// Process all pods that are no longer visible.
-	for pid := range g.podRecords {
-		if pod := g.podRecords.getCurrent(pid); pod != nil {
-			continue
-		}
-		oldPod := g.podRecords.getOld(pid)
-		for _, oldContainer := range oldPod.Containers {
-			cid := oldContainer.ID
-			oldState := convertState(oldContainer.State)
-			e := generateEvent(oldPod.ID, cid.ID, oldState, plegContainerNonExistent)
-			updateEvents(eventsByPodID, e)
-		}
-	}
-
-	// Update the internal storage.
-	g.podRecords.updateAll()
-
-	// Send out the events.
-	for _, events := range eventsByPodID {
+		// Update the internal storage and send out the events.
+		g.podRecords.update(pid)
 		for i := range events {
 			g.eventChannel <- events[i]
 		}
 	}
+
+	if g.cacheEnabled() {
+		// Update the cache timestamp. This needs to happen *after*
+		// all pods have been properly updated in the cache.
+		g.cache.UpdateTime(timestamp)
+	}
+}
+
+func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Container {
+	cidSet := sets.NewString()
+	var containers []*kubecontainer.Container
+	for _, p := range pods {
+		if p == nil {
+			continue
+		}
+		for _, c := range p.Containers {
+			cid := string(c.ID.ID)
+			if cidSet.Has(cid) {
+				continue
+			}
+			cidSet.Insert(cid)
+			containers = append(containers, c)
+		}
+	}
+	return containers
+}
+
+func computeEvent(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) *PodLifecycleEvent {
+	var pid types.UID
+	if oldPod != nil {
+		pid = oldPod.ID
+	} else if newPod != nil {
+		pid = newPod.ID
+	}
+	oldState := getContainerState(oldPod, cid)
+	newState := getContainerState(newPod, cid)
+	return generateEvent(pid, cid.ID, oldState, newState)
+}
+
+func (g *GenericPLEG) cacheEnabled() bool {
+	return g.cache != nil
+}
+
+func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
+	if pod == nil {
+		// The pod is missing in the current relist. This means that
+		// the pod has no visible (active or inactive) containers.
+		glog.V(8).Infof("PLEG: Delete status for pod %q", string(pid))
+		g.cache.Delete(pid)
+		return nil
+	}
+	timestamp := time.Now()
+	// TODO: Consider adding a new runtime method
+	// GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
+	// all containers again.
+	status, err := g.runtime.GetPodStatus(pod.ID, pod.Name, pod.Namespace)
+	glog.V(8).Infof("PLEG: Write status for %s/%s: %+v (err: %v)", pod.Name, pod.Namespace, status, err)
+	g.cache.Set(pod.ID, status, err, timestamp)
+	return err
 }
 
 func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
@@ -228,13 +275,13 @@ func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecy
 	eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e)
 }
 
-func getContainerState(pod *kubecontainer.Pod, cid kubecontainer.ContainerID) plegContainerState {
+func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {
 	// Default to the non-existent state.
 	state := plegContainerNonExistent
 	if pod == nil {
 		return state
 	}
-	container := pod.FindContainerByID(cid)
+	container := pod.FindContainerByID(*cid)
 	if container == nil {
 		return state
 	}
@@ -265,14 +312,20 @@ func (pr podRecords) setCurrent(pod *kubecontainer.Pod) {
 	pr[pod.ID] = &podRecord{current: pod}
 }
 
-func (pr podRecords) updateAll() {
-	for k, r := range pr {
-		if r.current == nil {
-			// Pod no longer exists; delete the entry.
-			delete(pr, k)
-			continue
-		}
-		r.old = r.current
-		r.current = nil
+func (pr podRecords) update(id types.UID) {
+	r, ok := pr[id]
+	if !ok {
+		return
 	}
+	pr.updateInternal(id, r)
+}
+
+func (pr podRecords) updateInternal(id types.UID, r *podRecord) {
+	if r.current == nil {
+		// Pod no longer exists; delete the entry.
+		delete(pr, id)
+		return
+	}
+	r.old = r.current
+	r.current = nil
 }
diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go
index ff7b414b490..c702cf1a0ba 100644
--- a/pkg/kubelet/pleg/generic_test.go
+++ b/pkg/kubelet/pleg/generic_test.go
@@ -17,12 +17,15 @@ limitations under the License.
 package pleg
 
 import (
+	"fmt"
 	"reflect"
 	"sort"
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
 )
 
@@ -209,3 +212,108 @@ func TestReportMissingPods(t *testing.T) {
 	actual := getEventsFromChannel(ch)
 	verifyEvents(t, expected, actual)
 }
+
+func newTestGenericPLEGWithRuntimeMock() (*GenericPLEG, *kubecontainer.Mock) {
+	runtimeMock := &kubecontainer.Mock{}
+	pleg := &GenericPLEG{
+		relistPeriod: time.Hour,
+		runtime:      runtimeMock,
+		eventChannel: make(chan *PodLifecycleEvent, 100),
+		podRecords:   make(podRecords),
+		cache:        kubecontainer.NewCache(),
+	}
+	return pleg, runtimeMock
+}
+
+func createTestPodsStatusesAndEvents(num int) ([]*kubecontainer.Pod, []*kubecontainer.PodStatus, []*PodLifecycleEvent) {
+	var pods []*kubecontainer.Pod
+	var statuses []*kubecontainer.PodStatus
+	var events []*PodLifecycleEvent
+	for i := 0; i < num; i++ {
+		id := types.UID(fmt.Sprintf("test-pod-%d", i))
+		cState := kubecontainer.ContainerStateRunning
+		container := createTestContainer(fmt.Sprintf("c%d", i), cState)
+		pod := &kubecontainer.Pod{
+			ID:         id,
+			Containers: []*kubecontainer.Container{container},
+		}
+		status := &kubecontainer.PodStatus{
+			ID:                id,
+			ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: cState}},
+		}
+		event := &PodLifecycleEvent{ID: pod.ID, Type: ContainerStarted, Data: container.ID.ID}
+		pods = append(pods, pod)
+		statuses = append(statuses, status)
+		events = append(events, event)
+	}
+	return pods, statuses, events
+}
+
+func TestRelistWithCache(t *testing.T) {
+	pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
+	ch := pleg.Watch()
+
+	pods, statuses, events := createTestPodsStatusesAndEvents(2)
+	runtimeMock.On("GetPods", true).Return(pods, nil)
+	runtimeMock.On("GetPodStatus", pods[0].ID, "", "").Return(statuses[0], nil).Once()
+	// Inject an error when querying the runtime for the status of pods[1].
+	statusErr := fmt.Errorf("unable to get status")
+	runtimeMock.On("GetPodStatus", pods[1].ID, "", "").Return(&kubecontainer.PodStatus{}, statusErr).Once()
+
+	pleg.relist()
+	actualEvents := getEventsFromChannel(ch)
+	cases := []struct {
+		pod    *kubecontainer.Pod
+		status *kubecontainer.PodStatus
+		error  error
+	}{
+		{pod: pods[0], status: statuses[0], error: nil},
+		{pod: pods[1], status: &kubecontainer.PodStatus{}, error: statusErr},
+	}
+	for i, c := range cases {
+		testStr := fmt.Sprintf("test[%d]", i)
+		actualStatus, actualErr := pleg.cache.Get(c.pod.ID)
+		assert.Equal(t, c.status, actualStatus, testStr)
+		assert.Equal(t, c.error, actualErr, testStr)
+	}
+	// pleg should not generate any event for pods[1] because of the error.
+	assert.Exactly(t, []*PodLifecycleEvent{events[0]}, actualEvents)
+
+	// Return a normal status for pods[1].
+	runtimeMock.On("GetPodStatus", pods[1].ID, "", "").Return(statuses[1], nil).Once()
+	pleg.relist()
+	actualEvents = getEventsFromChannel(ch)
+	cases = []struct {
+		pod    *kubecontainer.Pod
+		status *kubecontainer.PodStatus
+		error  error
+	}{
+		{pod: pods[0], status: statuses[0], error: nil},
+		{pod: pods[1], status: statuses[1], error: nil},
+	}
+	for i, c := range cases {
+		testStr := fmt.Sprintf("test[%d]", i)
+		actualStatus, actualErr := pleg.cache.Get(c.pod.ID)
+		assert.Equal(t, c.status, actualStatus, testStr)
+		assert.Equal(t, c.error, actualErr, testStr)
+	}
+	// Now that we are able to query the status for pods[1], pleg should
+	// generate an event.
+	assert.Exactly(t, []*PodLifecycleEvent{events[1]}, actualEvents)
+}
+
+func TestRemoveCacheEntry(t *testing.T) {
+	pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
+	pods, statuses, _ := createTestPodsStatusesAndEvents(1)
+	runtimeMock.On("GetPods", true).Return(pods, nil).Once()
+	runtimeMock.On("GetPodStatus", pods[0].ID, "", "").Return(statuses[0], nil).Once()
+	// Do a relist to populate the cache.
+	pleg.relist()
+	// Delete the pod from the runtime. Verify that the cache entry has been
+	// removed after relisting.
+	runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{}, nil).Once()
+	pleg.relist()
+	actualStatus, actualErr := pleg.cache.Get(pods[0].ID)
+	assert.Equal(t, &kubecontainer.PodStatus{ID: pods[0].ID}, actualStatus)
+	assert.Equal(t, nil, actualErr)
+}
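
Note (reviewer sketch, not part of the patch): both kubelet call sites above still pass nil, so the cache stays disabled. A minimal sketch of how a caller might wire it up once the feature is turned on, assuming a hypothetical podCache field on the Kubelet; NewGenericPLEG, kubecontainer.NewCache, and Cache.Get are taken from this patch:

	// Sketch only: the podCache field and podID variable are hypothetical;
	// this patch deliberately keeps passing nil at both call sites.
	klet.podCache = kubecontainer.NewCache()
	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity,
		plegRelistPeriod, klet.podCache)

	// Pod workers could then read the status written by relist() instead of
	// querying the runtime directly; Get returns the cached PodStatus along
	// with the error recorded by the last inspection.
	status, err := klet.podCache.Get(podID)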