diff --git a/pkg/kubelet/container/cache.go b/pkg/kubelet/container/cache.go
new file mode 100644
index 00000000000..55722ec5799
--- /dev/null
+++ b/pkg/kubelet/container/cache.go
@@ -0,0 +1,220 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package container
+
+import (
+	"sync"
+	"time"
+
+	"k8s.io/kubernetes/pkg/types"
+)
+
+// Cache stores the PodStatus for the pods. It represents *all* the visible
+// pods/containers in the container runtime. All cache entries are at least as
+// new or newer than the global timestamp (set by UpdateTime()), while
+// individual entries may be slightly newer than the global timestamp. If a pod
+// has no states known by the runtime, Cache returns an empty PodStatus object
+// with ID populated.
+//
+// Cache provides two methods to retrieve the PodStatus: the non-blocking Get()
+// and the blocking GetNewerThan() method. The component responsible for
+// populating the cache is expected to call Delete() to explicitly free the
+// cache entries.
+type Cache interface {
+	// Get returns the cached status and error for the pod without blocking.
+	Get(types.UID) (*PodStatus, error)
+	// Set stores the status and error for the pod, modified at the given time.
+	Set(types.UID, *PodStatus, error, time.Time)
+	// GetNewerThan is a blocking call that only returns the status
+	// when it is newer than the given time.
+	GetNewerThan(types.UID, time.Time) (*PodStatus, error)
+	// Delete removes the cache entry of the pod.
+	Delete(types.UID)
+	// UpdateTime sets the global timestamp of the cache.
+	UpdateTime(time.Time)
+}
+
+// data is a single cache entry.
+type data struct {
+	// Status of the pod.
+	status *PodStatus
+	// Error got when trying to inspect the pod.
+	err error
+	// Time when the data was last modified.
+	modified time.Time
+}
+
+// subRecord is a pending subscription for the entry of one pod.
+type subRecord struct {
+	// The subscriber is notified once the cache is at least this new.
+	time time.Time
+	// Channel (buffered, size 1) on which the entry is delivered exactly once.
+	ch chan *data
+}
+
+// cache implements Cache.
+type cache struct {
+	// Lock which guards all internal data structures.
+	lock sync.RWMutex
+	// Map that stores the pod statuses.
+	pods map[types.UID]*data
+	// A global timestamp represents how fresh the cached data is. All
+	// cache content is at least as new as this timestamp. Note that the
+	// timestamp is nil after initialization, and will only become non-nil when
+	// it is ready to serve the cached statuses.
+	timestamp *time.Time
+	// Map that stores the subscriber records.
+	subscribers map[types.UID][]*subRecord
+}
+
+// NewCache creates a pod cache.
+func NewCache() Cache {
+	return &cache{pods: map[types.UID]*data{}, subscribers: map[types.UID][]*subRecord{}}
+}
+
+// Get returns the PodStatus for the pod; callers are expected not to
+// modify the objects returned.
+func (c *cache) Get(id types.UID) (*PodStatus, error) {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	d := c.get(id)
+	return d.status, d.err
+}
+
+// GetNewerThan blocks until the entry of the pod, or the global timestamp,
+// has caught up with minTime, and then returns the status and error of the pod.
+func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) {
+	ch := c.subscribe(id, minTime)
+	d := <-ch
+	return d.status, d.err
+}
+
+// Set sets the PodStatus for the pod.
+func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	// Deferred calls run last-in-first-out, so subscribers are notified after
+	// the entry below is stored and before the lock is released.
+	defer c.notify(id, timestamp)
+	c.pods[id] = &data{status: status, err: err, modified: timestamp}
+}
+
+// Delete removes the entry of the pod.
+func (c *cache) Delete(id types.UID) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	delete(c.pods, id)
+}
+
+// UpdateTime modifies the global timestamp of the cache and notifies
+// subscribers if needed.
+func (c *cache) UpdateTime(timestamp time.Time) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	c.timestamp = &timestamp
+	// Notify all the subscribers if the condition is met.
+	for id := range c.subscribers {
+		c.notify(id, *c.timestamp)
+	}
+}
+
+// makeDefaultData returns the entry reported for a pod that is not cached:
+// a PodStatus with only the ID populated, and no error.
+func makeDefaultData(id types.UID) *data {
+	return &data{status: &PodStatus{ID: id}, err: nil}
+}
+
+// get returns the cached entry of the pod, or a default entry if the pod is
+// not cached. The caller should acquire the lock.
+func (c *cache) get(id types.UID) *data {
+	d, ok := c.pods[id]
+	if !ok {
+		// Cache should store *all* pod/container information known by the
+		// container runtime. A cache miss indicates that there are no states
+		// regarding the pod last time we queried the container runtime.
+		// What this *really* means is that there are no visible pod/containers
+		// associated with this pod. Simply return a default (mostly empty)
+		// PodStatus to reflect this.
+		return makeDefaultData(id)
+	}
+	return d
+}
+
+// getIfNewerThan returns the data if it is newer than the given time.
+// Otherwise, it returns nil. The caller should acquire the lock.
+func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
+	d, ok := c.pods[id]
+	globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
+	if !ok && globalTimestampIsNewer {
+		// Status is not cached, but the global timestamp is newer than
+		// minTime, return the default status.
+		return makeDefaultData(id)
+	}
+	if ok && (d.modified.After(minTime) || globalTimestampIsNewer) {
+		// Status is cached, return status if either of the following is true.
+		//   * status was modified after minTime
+		//   * the global timestamp of the cache is newer than minTime.
+		return d
+	}
+	// The pod status is not ready.
+	return nil
+}
+
+// notify sends notifications for pod with the given id, if the requirements
+// are met. Note that the caller should acquire the lock.
+func (c *cache) notify(id types.UID, timestamp time.Time) {
+	list, ok := c.subscribers[id]
+	if !ok {
+		// No one to notify.
+		return
+	}
+	var newList []*subRecord
+	for _, r := range list {
+		if timestamp.Before(r.time) {
+			// Doesn't meet the time requirement; keep the record.
+			newList = append(newList, r)
+			continue
+		}
+		// The channel is buffered and receives exactly one value, so this send
+		// never blocks while the lock is held.
+		r.ch <- c.get(id)
+		close(r.ch)
+	}
+	if len(newList) == 0 {
+		delete(c.subscribers, id)
+	} else {
+		c.subscribers[id] = newList
+	}
+}
+
+// subscribe returns a channel on which the entry of the pod is delivered once
+// the cache has caught up with the given timestamp. The entry is sent right
+// away if getIfNewerThan already considers it ready.
+func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data {
+	ch := make(chan *data, 1)
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	d := c.getIfNewerThan(id, timestamp)
+	if d != nil {
+		// If the cache entry is ready, send the data and return immediately.
+		ch <- d
+		return ch
+	}
+	// Add the subscription record.
+	c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch})
+	return ch
+}
diff --git a/pkg/kubelet/container/cache_test.go b/pkg/kubelet/container/cache_test.go
new file mode 100644
index 00000000000..5755005d841
--- /dev/null
+++ b/pkg/kubelet/container/cache_test.go
@@ -0,0 +1,217 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package container
+
+import (
+	"fmt"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"k8s.io/kubernetes/pkg/types"
+)
+
+// newTestCache returns a new cache as the concrete type, so that tests can
+// call its unexported methods.
+func newTestCache() *cache {
+	c := NewCache()
+	return c.(*cache)
+}
+
+func TestCacheNotInitialized(t *testing.T) {
+	cache := newTestCache()
+	// If the global timestamp is not set, always return nil.
+	d := cache.getIfNewerThan(types.UID("1234"), time.Time{})
+	assert.True(t, d == nil, "should return nil since cache is not initialized")
+}
+
+// getTestPodIDAndStatus returns a pod ID derived from the current time and a
+// PodStatus for it with numContainers container statuses. Name and Namespace
+// are only set if numContainers is positive.
+func getTestPodIDAndStatus(numContainers int) (types.UID, *PodStatus) {
+	id := types.UID(strconv.FormatInt(time.Now().UnixNano(), 10))
+	name := fmt.Sprintf("cache-foo-%s", string(id))
+	namespace := "ns"
+	var status *PodStatus
+	if numContainers > 0 {
+		status = &PodStatus{ID: id, Name: name, Namespace: namespace}
+	} else {
+		status = &PodStatus{ID: id}
+	}
+	for i := 0; i < numContainers; i++ {
+		status.ContainerStatuses = append(status.ContainerStatuses, &ContainerStatus{Name: strconv.Itoa(i)})
+	}
+	return id, status
+}
+
+func TestGetIfNewerThanWhenPodExists(t *testing.T) {
+	cache := newTestCache()
+	timestamp := time.Now()
+
+	cases := []struct {
+		cacheTime time.Time
+		modified  time.Time
+		expected  bool
+	}{
+		{
+			// The global cache timestamp is newer than the given timestamp; the
+			// modified time is equal to it, which alone would not be enough.
+			cacheTime: timestamp.Add(time.Second),
+			modified:  timestamp,
+			expected:  true,
+		},
+		{
+			// Global cache timestamp is newer, but the pod entry modified
+			// time is older than the given timestamp. This means that the
+			// entry is up-to-date even though it hasn't changed for a while.
+			cacheTime: timestamp.Add(time.Second),
+			modified:  timestamp.Add(-time.Second * 10),
+			expected:  true,
+		},
+		{
+			// Global cache timestamp is older, but the pod entry modified
+			// time is newer than the given timestamp. This means that the
+			// entry is up-to-date but the rest of the cache are still being
+			// updated.
+			cacheTime: timestamp.Add(-time.Second),
+			modified:  timestamp.Add(time.Second * 3),
+			expected:  true,
+		},
+		{
+			// Both the global cache timestamp and the modified time are older
+			// than the given timestamp.
+			cacheTime: timestamp.Add(-time.Second),
+			modified:  timestamp.Add(-time.Second),
+			expected:  false,
+		},
+	}
+	for i, c := range cases {
+		podID, status := getTestPodIDAndStatus(2)
+		cache.UpdateTime(c.cacheTime)
+		cache.Set(podID, status, nil, c.modified)
+		d := cache.getIfNewerThan(podID, timestamp)
+		assert.Equal(t, c.expected, d != nil, "test[%d]", i)
+	}
+}
+
+func TestGetPodNewerThanWhenPodDoesNotExist(t *testing.T) {
+	cache := newTestCache()
+	cacheTime := time.Now()
+	cache.UpdateTime(cacheTime)
+	podID := types.UID("1234")
+
+	cases := []struct {
+		timestamp time.Time
+		expected  bool
+	}{
+		{
+			timestamp: cacheTime.Add(-time.Second),
+			expected:  true,
+		},
+		{
+			timestamp: cacheTime.Add(time.Second),
+			expected:  false,
+		},
+	}
+	for i, c := range cases {
+		d := cache.getIfNewerThan(podID, c.timestamp)
+		assert.Equal(t, c.expected, d != nil, "test[%d]", i)
+	}
+}
+
+func TestCacheSetAndGet(t *testing.T) {
+	cache := NewCache()
+	cases := []struct {
+		numContainers int
+		error         error
+	}{
+		{numContainers: 3, error: nil},
+		{numContainers: 2, error: fmt.Errorf("unable to get status")},
+		{numContainers: 0, error: nil},
+	}
+	for i, c := range cases {
+		podID, status := getTestPodIDAndStatus(c.numContainers)
+		cache.Set(podID, status, c.error, time.Time{})
+		// Read back the status and error stored in cache and make sure they
+		// match the original ones.
+		actualStatus, actualErr := cache.Get(podID)
+		assert.Equal(t, status, actualStatus, "test[%d]", i)
+		assert.Equal(t, c.error, actualErr, "test[%d]", i)
+	}
+}
+
+func TestCacheGetPodDoesNotExist(t *testing.T) {
+	cache := NewCache()
+	podID, status := getTestPodIDAndStatus(0)
+	// If the pod does not exist in cache, cache should return a status
+	// object with the ID filled.
+	actualStatus, actualErr := cache.Get(podID)
+	assert.Equal(t, status, actualStatus)
+	assert.Equal(t, nil, actualErr)
+}
+
+func TestDelete(t *testing.T) {
+	cache := newTestCache()
+	// Write a new pod status into the cache.
+	podID, status := getTestPodIDAndStatus(3)
+	cache.Set(podID, status, nil, time.Time{})
+	actualStatus, actualErr := cache.Get(podID)
+	assert.Equal(t, status, actualStatus)
+	assert.Equal(t, nil, actualErr)
+	// Delete the pod from cache, and verify that we get an empty status.
+	cache.Delete(podID)
+	expectedStatus := &PodStatus{ID: podID}
+	actualStatus, actualErr = cache.Get(podID)
+	assert.Equal(t, expectedStatus, actualStatus)
+	assert.Equal(t, nil, actualErr)
+}
+
+// verifyNotification checks whether a notification is pending on ch, and
+// then drains the channel.
+func verifyNotification(t *testing.T, ch chan *data, expectNotification bool) {
+	if expectNotification {
+		assert.True(t, len(ch) > 0, "Did not receive notification")
+	} else {
+		assert.True(t, len(ch) < 1, "Should not have triggered the notification")
+	}
+	// Drain the channel.
+	for len(ch) > 0 {
+		<-ch
+	}
+}
+
+func TestRegisterNotification(t *testing.T) {
+	cache := newTestCache()
+	cacheTime := time.Now()
+	cache.UpdateTime(cacheTime)
+
+	podID, status := getTestPodIDAndStatus(1)
+	ch := cache.subscribe(podID, cacheTime.Add(time.Second))
+	verifyNotification(t, ch, false)
+	cache.Set(podID, status, nil, cacheTime.Add(time.Second))
+	// The Set operation should've triggered the notification.
+	verifyNotification(t, ch, true)
+
+	podID, _ = getTestPodIDAndStatus(1)
+
+	ch = cache.subscribe(podID, cacheTime.Add(time.Second))
+	verifyNotification(t, ch, false)
+	cache.UpdateTime(cacheTime.Add(time.Second * 2))
+	// The advance of cache timestamp should've triggered the notification.
+	verifyNotification(t, ch, true)
+}