diff --git a/pkg/kubelet/container/cache.go b/pkg/kubelet/container/cache.go new file mode 100644 index 00000000000..55722ec5799 --- /dev/null +++ b/pkg/kubelet/container/cache.go @@ -0,0 +1,199 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "sync" + "time" + + "k8s.io/kubernetes/pkg/types" +) + +// Cache stores the PodStatus for the pods. It represents *all* the visible +// pods/containers in the container runtime. All cache entries are at least as +// new or newer than the global timestamp (set by UpdateTime()), while +// individual entries may be slightly newer than the global timestamp. If a pod +// has no states known by the runtime, Cache returns an empty PodStatus object +// with ID populated. +// +// Cache provides two methods to retrieve the PodStatus: the non-blocking Get() +// and the blocking GetNewerThan(). The component responsible for +// populating the cache is expected to call Delete() to explicitly free the +// cache entries. +type Cache interface { + Get(types.UID) (*PodStatus, error) + Set(types.UID, *PodStatus, error, time.Time) + // GetNewerThan is a blocking call that only returns the status + // when it is newer than the given time. + GetNewerThan(types.UID, time.Time) (*PodStatus, error) + Delete(types.UID) + UpdateTime(time.Time) +} + +type data struct { + // Status of the pod. + status *PodStatus + // Error encountered when trying to inspect the pod. + err error + // Time when the data was last modified. + modified time.Time +} + +type subRecord struct { + time time.Time + ch chan *data +} + +// cache implements Cache. +type cache struct { + // Lock which guards all internal data structures. + lock sync.RWMutex + // Map that stores the pod statuses. + pods map[types.UID]*data + // A global timestamp represents how fresh the cached data is. All + // cache content is at least as new as this timestamp. Note that the + // timestamp is nil after initialization, and will only become non-nil when + // it is ready to serve the cached statuses. + timestamp *time.Time + // Map that stores the subscriber records. + subscribers map[types.UID][]*subRecord +} + +// NewCache creates a pod cache. +func NewCache() Cache { + return &cache{pods: map[types.UID]*data{}, subscribers: map[types.UID][]*subRecord{}} +} + +// Get returns the PodStatus for the pod; callers are expected not to +// modify the objects returned. +func (c *cache) Get(id types.UID) (*PodStatus, error) { + c.lock.RLock() + defer c.lock.RUnlock() + d := c.get(id) + return d.status, d.err +} + +func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) { + ch := c.subscribe(id, minTime) + d := <-ch + return d.status, d.err +} + +// Set sets the PodStatus for the pod.
+func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) { + c.lock.Lock() + defer c.lock.Unlock() + defer c.notify(id, timestamp) + c.pods[id] = &data{status: status, err: err, modified: timestamp} +} + +// Delete removes the entry of the pod. +func (c *cache) Delete(id types.UID) { + c.lock.Lock() + defer c.lock.Unlock() + delete(c.pods, id) +} + +// UpdateTime modifies the global timestamp of the cache and notifies +// subscribers if needed. +func (c *cache) UpdateTime(timestamp time.Time) { + c.lock.Lock() + defer c.lock.Unlock() + c.timestamp = &timestamp + // Notify all the subscribers if the condition is met. + for id := range c.subscribers { + c.notify(id, *c.timestamp) + } +} + +func makeDefaultData(id types.UID) *data { + return &data{status: &PodStatus{ID: id}, err: nil} +} + +func (c *cache) get(id types.UID) *data { + d, ok := c.pods[id] + if !ok { + // Cache should store *all* pod/container information known by the + // container runtime. A cache miss indicates that there are no states + // regarding the pod last time we queried the container runtime. + // What this *really* means is that there are no visible pod/containers + // associated with this pod. Simply return a default (mostly empty) + // PodStatus to reflect this. + return makeDefaultData(id) + } + return d +} + +// getIfNewerThan returns the data if it is newer than the given time. +// Otherwise, it returns nil. The caller should acquire the lock. +func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data { + d, ok := c.pods[id] + globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime)) + if !ok && globalTimestampIsNewer { + // Status is not cached, but the global timestamp is newer than + // minTime, return the default status. + return makeDefaultData(id) + } + if ok && (d.modified.After(minTime) || globalTimestampIsNewer) { + // Status is cached, return status if either of the following is true. + // * status was modified after minTime + // * the global timestamp of the cache is newer than minTime. + return d + } + // The pod status is not ready. + return nil +} + +// notify sends notifications for pod with the given id, if the requirements +// are met. Note that the caller should acquire the lock. +func (c *cache) notify(id types.UID, timestamp time.Time) { + list, ok := c.subscribers[id] + if !ok { + // No one to notify. + return + } + newList := []*subRecord{} + for i, r := range list { + if timestamp.Before(r.time) { + // Doesn't meet the time requirement; keep the record. + newList = append(newList, list[i]) + continue + } + r.ch <- c.get(id) + close(r.ch) + } + if len(newList) == 0 { + delete(c.subscribers, id) + } else { + c.subscribers[id] = newList + } +} + +func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data { + ch := make(chan *data, 1) + c.lock.Lock() + defer c.lock.Unlock() + d := c.getIfNewerThan(id, timestamp) + if d != nil { + // If the cache entry is ready, send the data and return immediately. + ch <- d + return ch + } + // Add the subscription record. + c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch}) + return ch +} diff --git a/pkg/kubelet/container/cache_test.go b/pkg/kubelet/container/cache_test.go new file mode 100644 index 00000000000..5755005d841 --- /dev/null +++ b/pkg/kubelet/container/cache_test.go @@ -0,0 +1,210 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved.
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "fmt" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/types" +) + +func newTestCache() *cache { + c := NewCache() + return c.(*cache) +} + +func TestCacheNotInitialized(t *testing.T) { + cache := newTestCache() + // If the global timestamp is not set, always return nil. + d := cache.getIfNewerThan(types.UID("1234"), time.Time{}) + assert.True(t, d == nil, "should return nil since cache is not initialized") +} + +func getTestPodIDAndStatus(numContainers int) (types.UID, *PodStatus) { + id := types.UID(strconv.FormatInt(time.Now().UnixNano(), 10)) + name := fmt.Sprintf("cache-foo-%s", string(id)) + namespace := "ns" + var status *PodStatus + if numContainers > 0 { + status = &PodStatus{ID: id, Name: name, Namespace: namespace} + } else { + status = &PodStatus{ID: id} + } + for i := 0; i < numContainers; i++ { + status.ContainerStatuses = append(status.ContainerStatuses, &ContainerStatus{Name: string(i)}) + } + return id, status +} + +func TestGetIfNewerThanWhenPodExists(t *testing.T) { + cache := newTestCache() + timestamp := time.Now() + + cases := []struct { + cacheTime time.Time + modified time.Time + expected bool + }{ + { + // Both the global cache timestamp and the modified time are newer + // than the timestamp. + cacheTime: timestamp.Add(time.Second), + modified: timestamp, + expected: true, + }, + { + // Global cache timestamp is newer, but the pod entry modified + // time is older than the given timestamp. This means that the + // entry is up-to-date even though it hasn't changed for a while. + cacheTime: timestamp.Add(time.Second), + modified: timestamp.Add(-time.Second * 10), + expected: true, + }, + { + // Global cache timestamp is older, but the pod entry modified + // time is newer than the given timestamp. This means that the + // entry is up-to-date but the rest of the cache are still being + // updated. + cacheTime: timestamp.Add(-time.Second), + modified: timestamp.Add(time.Second * 3), + expected: true, + }, + { + // Both the global cache timestamp and the modified time are older + // than the given timestamp. 
+ cacheTime: timestamp.Add(-time.Second), + modified: timestamp.Add(-time.Second), + expected: false, + }, + } + for i, c := range cases { + podID, status := getTestPodIDAndStatus(2) + cache.UpdateTime(c.cacheTime) + cache.Set(podID, status, nil, c.modified) + d := cache.getIfNewerThan(podID, timestamp) + assert.Equal(t, c.expected, d != nil, "test[%d]", i) + } +} + +func TestGetPodNewerThanWhenPodDoesNotExist(t *testing.T) { + cache := newTestCache() + cacheTime := time.Now() + cache.UpdateTime(cacheTime) + podID := types.UID("1234") + + cases := []struct { + timestamp time.Time + expected bool + }{ + { + timestamp: cacheTime.Add(-time.Second), + expected: true, + }, + { + timestamp: cacheTime.Add(time.Second), + expected: false, + }, + } + for i, c := range cases { + d := cache.getIfNewerThan(podID, c.timestamp) + assert.Equal(t, c.expected, d != nil, "test[%d]", i) + } +} + +func TestCacheSetAndGet(t *testing.T) { + cache := NewCache() + cases := []struct { + numContainers int + error error + }{ + {numContainers: 3, error: nil}, + {numContainers: 2, error: fmt.Errorf("unable to get status")}, + {numContainers: 0, error: nil}, + } + for i, c := range cases { + podID, status := getTestPodIDAndStatus(c.numContainers) + cache.Set(podID, status, c.error, time.Time{}) + // Read back the status and error stored in cache and make sure they + // match the original ones. + actualStatus, actualErr := cache.Get(podID) + assert.Equal(t, status, actualStatus, "test[%d]", i) + assert.Equal(t, c.error, actualErr, "test[%d]", i) + } +} + +func TestCacheGetPodDoesNotExist(t *testing.T) { + cache := NewCache() + podID, status := getTestPodIDAndStatus(0) + // If the pod does not exist in cache, cache should return an status + // object with id filled. + actualStatus, actualErr := cache.Get(podID) + assert.Equal(t, status, actualStatus) + assert.Equal(t, nil, actualErr) +} + +func TestDelete(t *testing.T) { + cache := &cache{pods: map[types.UID]*data{}} + // Write a new pod status into the cache. + podID, status := getTestPodIDAndStatus(3) + cache.Set(podID, status, nil, time.Time{}) + actualStatus, actualErr := cache.Get(podID) + assert.Equal(t, status, actualStatus) + assert.Equal(t, nil, actualErr) + // Delete the pod from cache, and verify that we get an empty status. + cache.Delete(podID) + expectedStatus := &PodStatus{ID: podID} + actualStatus, actualErr = cache.Get(podID) + assert.Equal(t, expectedStatus, actualStatus) + assert.Equal(t, nil, actualErr) +} + +func verifyNotification(t *testing.T, ch chan *data, expectNotification bool) { + if expectNotification { + assert.True(t, len(ch) > 0, "Did not receive notification") + } else { + assert.True(t, len(ch) < 1, "Should not have triggered the notification") + } + // Drain the channel. + for i := 0; i < len(ch); i++ { + <-ch + } +} + +func TestRegisterNotification(t *testing.T) { + cache := newTestCache() + cacheTime := time.Now() + cache.UpdateTime(cacheTime) + + podID, status := getTestPodIDAndStatus(1) + ch := cache.subscribe(podID, cacheTime.Add(time.Second)) + verifyNotification(t, ch, false) + cache.Set(podID, status, nil, cacheTime.Add(time.Second)) + // The Set operation should've triggered the notification. + verifyNotification(t, ch, true) + + podID, _ = getTestPodIDAndStatus(1) + + ch = cache.subscribe(podID, cacheTime.Add(time.Second)) + verifyNotification(t, ch, false) + cache.UpdateTime(cacheTime.Add(time.Second * 2)) + // The advance of cache timestamp should've triggered the notification. 
+ verifyNotification(t, ch, true) +} diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 2173a6c55ea..ec215654d19 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -442,6 +442,15 @@ func (p *Pod) FindContainerByName(containerName string) *Container { return nil } +func (p *Pod) FindContainerByID(id ContainerID) *Container { + for _, c := range p.Containers { + if c.ID == id { + return c + } + } + return nil +} + // ToAPIPod converts Pod to api.Pod. Note that if a field in api.Pod has no // corresponding field in Pod, the field would not be populated. func (p *Pod) ToAPIPod() *api.Pod { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 713748a91ad..5a7fc0b1817 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -367,7 +367,7 @@ func NewMainKubelet( serializeImagePulls, ) - klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod) + klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil) case "rkt": conf := &rkt.Config{ Path: rktPath, @@ -388,7 +388,7 @@ func NewMainKubelet( return nil, err } klet.containerRuntime = rktRuntime - klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod) + klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil) // No Docker daemon to put in a container. dockerDaemonContainer = "" diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 4698371ebc1..00e8d09b4b7 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -183,7 +183,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { } kubelet.workQueue = queue.NewBasicWorkQueue() // Relist period does not affect the tests. - kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour) + kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil) kubelet.clock = fakeClock return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock} } diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 7816062cf89..07ecf71ed1b 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/sets" ) // GenericPLEG is an extremely simple generic PLEG that relies solely on @@ -49,24 +50,54 @@ type GenericPLEG struct { runtime kubecontainer.Runtime // The channel from which the subscriber listens events. eventChannel chan *PodLifecycleEvent - // The internal cache for container information. - containers map[string]containerInfo + // The internal cache for pod/container information. + podRecords podRecords // Time of the last relisting. lastRelistTime time.Time + // Cache for storing the runtime states required for syncing pods. + cache kubecontainer.Cache } -type containerInfo struct { - podID types.UID - state kubecontainer.ContainerState +// plegContainerState has a one-to-one mapping to the +// kubecontainer.ContainerState except for the non-existent state. This state +// is introduced here to complete the state transition scenarios. 
+type plegContainerState string + +const ( + plegContainerRunning plegContainerState = "running" + plegContainerExited plegContainerState = "exited" + plegContainerUnknown plegContainerState = "unknown" + plegContainerNonExistent plegContainerState = "non-existent" +) + +func convertState(state kubecontainer.ContainerState) plegContainerState { + switch state { + case kubecontainer.ContainerStateRunning: + return plegContainerRunning + case kubecontainer.ContainerStateExited: + return plegContainerExited + case kubecontainer.ContainerStateUnknown: + return plegContainerUnknown + default: + panic(fmt.Sprintf("unrecognized container state: %v", state)) + } } +type podRecord struct { + old *kubecontainer.Pod + current *kubecontainer.Pod +} + +type podRecords map[types.UID]*podRecord + func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int, - relistPeriod time.Duration) PodLifecycleEventGenerator { + relistPeriod time.Duration, cache kubecontainer.Cache) PodLifecycleEventGenerator { return &GenericPLEG{ relistPeriod: relistPeriod, runtime: runtime, eventChannel: make(chan *PodLifecycleEvent, channelCapacity), - containers: make(map[string]containerInfo), + podRecords: make(podRecords), + cache: cache, } } @@ -82,18 +113,30 @@ func (g *GenericPLEG) Start() { go util.Until(g.relist, g.relistPeriod, util.NeverStop) } -func generateEvent(podID types.UID, cid string, oldState, newState kubecontainer.ContainerState) *PodLifecycleEvent { +func generateEvent(podID types.UID, cid string, oldState, newState plegContainerState) *PodLifecycleEvent { + glog.V(7).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState) if newState == oldState { return nil } switch newState { - case kubecontainer.ContainerStateRunning: + case plegContainerRunning: return &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: cid} - case kubecontainer.ContainerStateExited: + case plegContainerExited: return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid} - case kubecontainer.ContainerStateUnknown: + case plegContainerUnknown: // Don't generate any event if the status is unknown. return nil + case plegContainerNonExistent: + // We report "ContainerDied" when container was stopped OR removed. We + // may want to distinguish the two cases in the future. + switch oldState { + case plegContainerExited: + // We already reported that the container died before. There is no + // need to do it again. + return nil + default: + return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid} + } default: panic(fmt.Sprintf("unrecognized container state: %v", newState)) } @@ -116,40 +159,173 @@ func (g *GenericPLEG) relist() { }() // Get all the pods. - pods, err := g.runtime.GetPods(true) + podList, err := g.runtime.GetPods(true) if err != nil { glog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err) return } + pods := kubecontainer.Pods(podList) + for _, pod := range pods { + g.podRecords.setCurrent(pod) + } - events := []*PodLifecycleEvent{} - containers := make(map[string]containerInfo, len(g.containers)) - // Create a new containers map, compares container statuses, and generates - // correspoinding events. - for _, p := range pods { - for _, c := range p.Containers { - cid := c.ID.ID - // Get the of existing container info. Defaults to state unknown. - oldState := kubecontainer.ContainerStateUnknown - if info, ok := g.containers[cid]; ok { - oldState = info.state - } - // Generate an event if required. 
- glog.V(7).Infof("GenericPLEG: %v/%v: %v -> %v", p.ID, cid, oldState, c.State) - if e := generateEvent(p.ID, cid, oldState, c.State); e != nil { - events = append(events, e) - } - // Write to the new cache. - containers[cid] = containerInfo{podID: p.ID, state: c.State} + // Compare the old and the current pods, and generate events. + eventsByPodID := map[types.UID][]*PodLifecycleEvent{} + for pid := range g.podRecords { + oldPod := g.podRecords.getOld(pid) + pod := g.podRecords.getCurrent(pid) + // Get all containers in the old and the new pod. + allContainers := getContainersFromPods(oldPod, pod) + for _, container := range allContainers { + e := computeEvent(oldPod, pod, &container.ID) + updateEvents(eventsByPodID, e) } } - // Swap the container info cache. This is purely to avoid the need of - // garbage collection. - g.containers = containers + // If there are events associated with a pod, we should update the + // podCache. + for pid, events := range eventsByPodID { + pod := g.podRecords.getCurrent(pid) + if g.cacheEnabled() { + // updateCache() will inspect the pod and update the cache. If an + // error occurs during the inspection, we want PLEG to retry again + // in the next relist. To achieve this, we do not update the + // associated podRecord of the pod, so that the change will be + // detect again in the next relist. + // TODO: If many pods changed during the same relist period, + // inspecting the pod and getting the PodStatus to update the cache + // serially may take a while. We should be aware of this and + // parallelize if needed. + if err := g.updateCache(pod, pid); err != nil { + glog.Errorf("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err) + continue + } + } + // Update the internal storage and send out the events. + g.podRecords.update(pid) + for i := range events { + g.eventChannel <- events[i] + } + } - // Send out the events. - for i := range events { - g.eventChannel <- events[i] + if g.cacheEnabled() { + // Update the cache timestamp. This needs to happen *after* + // all pods have been properly updated in the cache. + g.cache.UpdateTime(timestamp) } } + +func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Container { + cidSet := sets.NewString() + var containers []*kubecontainer.Container + for _, p := range pods { + if p == nil { + continue + } + for _, c := range p.Containers { + cid := string(c.ID.ID) + if cidSet.Has(cid) { + continue + } + cidSet.Insert(cid) + containers = append(containers, c) + } + } + return containers +} + +func computeEvent(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) *PodLifecycleEvent { + var pid types.UID + if oldPod != nil { + pid = oldPod.ID + } else if newPod != nil { + pid = newPod.ID + } + oldState := getContainerState(oldPod, cid) + newState := getContainerState(newPod, cid) + return generateEvent(pid, cid.ID, oldState, newState) +} + +func (g *GenericPLEG) cacheEnabled() bool { + return g.cache != nil +} + +func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error { + if pod == nil { + // The pod is missing in the current relist. This means that + // the pod has no visible (active or inactive) containers. + glog.V(8).Infof("PLEG: Delete status for pod %q", string(pid)) + g.cache.Delete(pid) + return nil + } + timestamp := time.Now() + // TODO: Consider adding a new runtime method + // GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing + // all containers again. 
+ status, err := g.runtime.GetPodStatus(pod.ID, pod.Name, pod.Namespace) + glog.V(8).Infof("PLEG: Write status for %s/%s: %+v (err: %v)", pod.Name, pod.Namespace, status, err) + g.cache.Set(pod.ID, status, err, timestamp) + return err +} + +func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) { + if e == nil { + return + } + eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e) +} + +func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState { + // Default to the non-existent state. + state := plegContainerNonExistent + if pod == nil { + return state + } + container := pod.FindContainerByID(*cid) + if container == nil { + return state + } + return convertState(container.State) +} + +func (pr podRecords) getOld(id types.UID) *kubecontainer.Pod { + r, ok := pr[id] + if !ok { + return nil + } + return r.old +} + +func (pr podRecords) getCurrent(id types.UID) *kubecontainer.Pod { + r, ok := pr[id] + if !ok { + return nil + } + return r.current +} + +func (pr podRecords) setCurrent(pod *kubecontainer.Pod) { + if r, ok := pr[pod.ID]; ok { + r.current = pod + return + } + pr[pod.ID] = &podRecord{current: pod} +} + +func (pr podRecords) update(id types.UID) { + r, ok := pr[id] + if !ok { + return + } + pr.updateInternal(id, r) +} + +func (pr podRecords) updateInternal(id types.UID, r *podRecord) { + if r.current == nil { + // Pod no longer exists; delete the entry. + delete(pr, id) + return + } + r.old = r.current + r.current = nil +} diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index 7f9562a4f53..c702cf1a0ba 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -17,12 +17,15 @@ limitations under the License. package pleg import ( + "fmt" "reflect" "sort" "testing" "time" + "github.com/stretchr/testify/assert" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" ) @@ -43,7 +46,7 @@ func newTestGenericPLEG() *TestGenericPLEG { relistPeriod: time.Hour, runtime: fakeRuntime, eventChannel: make(chan *PodLifecycleEvent, 100), - containers: make(map[string]containerInfo), + podRecords: make(podRecords), } return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime} } @@ -79,7 +82,7 @@ func verifyEvents(t *testing.T, expected, actual []*PodLifecycleEvent) { sort.Sort(sortableEvents(expected)) sort.Sort(sortableEvents(actual)) if !reflect.DeepEqual(expected, actual) { - t.Errorf("Actual events differ from the expected; diff: %v", util.ObjectDiff(expected, actual)) + t.Errorf("Actual events differ from the expected; diff:\n %v", util.ObjectDiff(expected, actual)) } } @@ -87,7 +90,6 @@ func TestRelisting(t *testing.T) { testPleg := newTestGenericPLEG() pleg, runtime := testPleg.pleg, testPleg.runtime ch := pleg.Watch() - // The first relist should send a PodSync event to each pod. 
runtime.AllPodList = []*kubecontainer.Pod{ { @@ -146,3 +148,172 @@ func TestRelisting(t *testing.T) { actual = getEventsFromChannel(ch) verifyEvents(t, expected, actual) } + +func TestReportMissingContainers(t *testing.T) { + testPleg := newTestGenericPLEG() + pleg, runtime := testPleg.pleg, testPleg.runtime + ch := pleg.Watch() + runtime.AllPodList = []*kubecontainer.Pod{ + { + ID: "1234", + Containers: []*kubecontainer.Container{ + createTestContainer("c1", kubecontainer.ContainerStateRunning), + createTestContainer("c2", kubecontainer.ContainerStateRunning), + createTestContainer("c3", kubecontainer.ContainerStateExited), + }, + }, + } + // Drain the events from the channel + pleg.relist() + getEventsFromChannel(ch) + + // Container c2 was stopped and removed between relists. We should report + // the event. The exited container c3 was garbage collected (i.e., removed) + // between relists. We should ignore that event. + runtime.AllPodList = []*kubecontainer.Pod{ + { + ID: "1234", + Containers: []*kubecontainer.Container{ + createTestContainer("c1", kubecontainer.ContainerStateRunning), + }, + }, + } + pleg.relist() + expected := []*PodLifecycleEvent{ + {ID: "1234", Type: ContainerDied, Data: "c2"}, + } + actual := getEventsFromChannel(ch) + verifyEvents(t, expected, actual) +} + +func TestReportMissingPods(t *testing.T) { + testPleg := newTestGenericPLEG() + pleg, runtime := testPleg.pleg, testPleg.runtime + ch := pleg.Watch() + runtime.AllPodList = []*kubecontainer.Pod{ + { + ID: "1234", + Containers: []*kubecontainer.Container{ + createTestContainer("c2", kubecontainer.ContainerStateRunning), + }, + }, + } + // Drain the events from the channel + pleg.relist() + getEventsFromChannel(ch) + + // Container c2 was stopped and removed between relists. We should report + // the event. 
+ runtime.AllPodList = []*kubecontainer.Pod{} + pleg.relist() + expected := []*PodLifecycleEvent{ + {ID: "1234", Type: ContainerDied, Data: "c2"}, + } + actual := getEventsFromChannel(ch) + verifyEvents(t, expected, actual) +} + +func newTestGenericPLEGWithRuntimeMock() (*GenericPLEG, *kubecontainer.Mock) { + runtimeMock := &kubecontainer.Mock{} + pleg := &GenericPLEG{ + relistPeriod: time.Hour, + runtime: runtimeMock, + eventChannel: make(chan *PodLifecycleEvent, 100), + podRecords: make(podRecords), + cache: kubecontainer.NewCache(), + } + return pleg, runtimeMock +} + +func createTestPodsStatusesAndEvents(num int) ([]*kubecontainer.Pod, []*kubecontainer.PodStatus, []*PodLifecycleEvent) { + var pods []*kubecontainer.Pod + var statuses []*kubecontainer.PodStatus + var events []*PodLifecycleEvent + for i := 0; i < num; i++ { + id := types.UID(fmt.Sprintf("test-pod-%d", i)) + cState := kubecontainer.ContainerStateRunning + container := createTestContainer(fmt.Sprintf("c%d", i), cState) + pod := &kubecontainer.Pod{ + ID: id, + Containers: []*kubecontainer.Container{container}, + } + status := &kubecontainer.PodStatus{ + ID: id, + ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: cState}}, + } + event := &PodLifecycleEvent{ID: pod.ID, Type: ContainerStarted, Data: container.ID.ID} + pods = append(pods, pod) + statuses = append(statuses, status) + events = append(events, event) + + } + return pods, statuses, events +} + +func TestRelistWithCache(t *testing.T) { + pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock() + ch := pleg.Watch() + + pods, statuses, events := createTestPodsStatusesAndEvents(2) + runtimeMock.On("GetPods", true).Return(pods, nil) + runtimeMock.On("GetPodStatus", pods[0].ID, "", "").Return(statuses[0], nil).Once() + // Inject an error when querying runtime for the pod status for pods[1]. + statusErr := fmt.Errorf("unable to get status") + runtimeMock.On("GetPodStatus", pods[1].ID, "", "").Return(&kubecontainer.PodStatus{}, statusErr).Once() + + pleg.relist() + actualEvents := getEventsFromChannel(ch) + cases := []struct { + pod *kubecontainer.Pod + status *kubecontainer.PodStatus + error error + }{ + {pod: pods[0], status: statuses[0], error: nil}, + {pod: pods[1], status: &kubecontainer.PodStatus{}, error: statusErr}, + } + for i, c := range cases { + testStr := fmt.Sprintf("test[%d]", i) + actualStatus, actualErr := pleg.cache.Get(c.pod.ID) + assert.Equal(t, c.status, actualStatus, testStr) + assert.Equal(t, c.error, actualErr, testStr) + } + // pleg should not generate any event for pods[1] because of the error. + assert.Exactly(t, []*PodLifecycleEvent{events[0]}, actualEvents) + + // Return normal status for pods[1]. + runtimeMock.On("GetPodStatus", pods[1].ID, "", "").Return(statuses[1], nil).Once() + pleg.relist() + actualEvents = getEventsFromChannel(ch) + cases = []struct { + pod *kubecontainer.Pod + status *kubecontainer.PodStatus + error error + }{ + {pod: pods[0], status: statuses[0], error: nil}, + {pod: pods[1], status: statuses[1], error: nil}, + } + for i, c := range cases { + testStr := fmt.Sprintf("test[%d]", i) + actualStatus, actualErr := pleg.cache.Get(c.pod.ID) + assert.Equal(t, c.status, actualStatus, testStr) + assert.Equal(t, c.error, actualErr, testStr) + } + // Now that we are able to query status for pods[1], pleg should generate an event. 
+ assert.Exactly(t, []*PodLifecycleEvent{events[1]}, actualEvents) +} + +func TestRemoveCacheEntry(t *testing.T) { + pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock() + pods, statuses, _ := createTestPodsStatusesAndEvents(1) + runtimeMock.On("GetPods", true).Return(pods, nil).Once() + runtimeMock.On("GetPodStatus", pods[0].ID, "", "").Return(statuses[0], nil).Once() + // Does a relist to populate the cache. + pleg.relist() + // Delete the pod from runtime. Verify that the cache entry has been + // removed after relisting. + runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{}, nil).Once() + pleg.relist() + actualStatus, actualErr := pleg.cache.Get(pods[0].ID) + assert.Equal(t, &kubecontainer.PodStatus{ID: pods[0].ID}, actualStatus) + assert.Equal(t, nil, actualErr) +} diff --git a/pkg/kubelet/util/queue/work_queue.go b/pkg/kubelet/util/queue/work_queue.go index 97ab440b79c..6870c18ace2 100644 --- a/pkg/kubelet/util/queue/work_queue.go +++ b/pkg/kubelet/util/queue/work_queue.go @@ -29,8 +29,7 @@ import ( type WorkQueue interface { // GetWork dequeues and returns all ready items. GetWork() []types.UID - // Enqueue inserts a new item or overwrites an existing item with the - // new timestamp (time.Now() + delay) if it is greater. + // Enqueue inserts a new item or overwrites an existing item. Enqueue(item types.UID, delay time.Duration) } @@ -64,10 +63,5 @@ func (q *basicWorkQueue) GetWork() []types.UID { func (q *basicWorkQueue) Enqueue(item types.UID, delay time.Duration) { q.lock.Lock() defer q.lock.Unlock() - now := q.clock.Now() - timestamp := now.Add(delay) - existing, ok := q.queue[item] - if !ok || (ok && existing.Before(timestamp)) { - q.queue[item] = timestamp - } + q.queue[item] = q.clock.Now().Add(delay) } diff --git a/pkg/kubelet/util/queue/work_queue_test.go b/pkg/kubelet/util/queue/work_queue_test.go index f105b037321..6dd452a9112 100644 --- a/pkg/kubelet/util/queue/work_queue_test.go +++ b/pkg/kubelet/util/queue/work_queue_test.go @@ -63,15 +63,3 @@ func TestGetWork(t *testing.T) { compareResults(t, expected, q.GetWork()) compareResults(t, []types.UID{}, q.GetWork()) } - -func TestEnqueueKeepGreaterTimestamp(t *testing.T) { - q, _ := newTestBasicWorkQueue() - item := types.UID("foo") - q.Enqueue(item, -7*time.Hour) - q.Enqueue(item, 3*time.Hour) - compareResults(t, []types.UID{}, q.GetWork()) - - q.Enqueue(item, 3*time.Hour) - q.Enqueue(item, -7*time.Hour) - compareResults(t, []types.UID{}, q.GetWork()) -}
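
For readers tracing how the new kubecontainer.Cache is meant to be consumed, here is a minimal, self-contained sketch of the producer/consumer contract documented in cache.go: the relisting component writes per-pod entries with Set() and then advances the global timestamp with UpdateTime(), while a consumer blocks in GetNewerThan() until the cache is at least as fresh as a given time. This is not part of the patch; the pod UID, the timestamps, and the use of package main are illustrative only.

```go
package main

import (
	"fmt"
	"time"

	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/types"
)

func main() {
	cache := kubecontainer.NewCache()
	podID := types.UID("example-pod-uid") // made-up UID for illustration

	// Consumer side: block until the cache holds a status at least as new
	// as eventTime. In the kubelet this would be a pod worker reacting to a
	// PLEG event; here it is just a goroutine.
	eventTime := time.Now()
	done := make(chan struct{})
	go func() {
		defer close(done)
		status, err := cache.GetNewerThan(podID, eventTime)
		fmt.Printf("status: %+v, err: %v\n", status, err)
	}()

	// Producer side: a relist writes the per-pod entry, then advances the
	// global timestamp so that pods with no entry are also considered fresh.
	relistTime := time.Now()
	cache.Set(podID, &kubecontainer.PodStatus{ID: podID}, nil, relistTime)
	cache.UpdateTime(relistTime)

	<-done
}
```

Either ordering works here: if the consumer subscribes first, the Set/UpdateTime calls trigger the notification; if the producer runs first, GetNewerThan returns immediately because the entry and the global timestamp are already newer than eventTime.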
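Within this patch the kubelet still constructs the generic PLEG with a nil cache (see the kubelet.go hunks above), so the cache-update path is effectively disabled outside the tests. The sketch below is an assumption about how a caller could eventually wire things up, not something the patch itself does: the helper name, the capacity and relist period values are made up, and it assumes the PodLifecycleEventGenerator interface exposes Start() and Watch() as used elsewhere in the kubelet.

```go
package example

import (
	"time"

	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/pleg"
)

// startCacheBackedPLEG wires the generic PLEG to a pod status cache instead
// of passing nil. The returned cache is what pod workers would query (for
// example via GetNewerThan) when reacting to events from the PLEG channel.
func startCacheBackedPLEG(runtime kubecontainer.Runtime) (pleg.PodLifecycleEventGenerator, kubecontainer.Cache) {
	cache := kubecontainer.NewCache()
	// 100 and 10*time.Second are placeholder values for the event channel
	// capacity and the relist period; the kubelet defines its own constants.
	p := pleg.NewGenericPLEG(runtime, 100, 10*time.Second, cache)
	p.Start()
	return p, cache
}
```

A worker draining events from the PLEG's Watch() channel would then call cache.GetNewerThan(event.ID, t) with its own notion of t (for example, the time it observed the event) so that it syncs against a status no older than the relist that generated the event.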