mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-02 16:29:21 +00:00
Add container.Cache for storing container.PodStatus
This cache will be used to stores the PodStatus of all pods/containers visible on the node. This will elimiate the need for pod workers to query the container runtime directly.
This commit is contained in:
parent
4ab505606b
commit
032c0a4074
199
pkg/kubelet/container/cache.go
Normal file
199
pkg/kubelet/container/cache.go
Normal file
@ -0,0 +1,199 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Cache stores the PodStatus for the pods. It represents *all* the visible
|
||||||
|
// pods/containers in the container runtime. All cache entries are at least as
|
||||||
|
// new or newer than the global timestamp (set by UpdateTime()), while
|
||||||
|
// individual entries may be slightly newer than the global timestamp. If a pod
|
||||||
|
// has no states known by the runtime, Cache returns an empty PodStatus object
|
||||||
|
// with ID populated.
|
||||||
|
//
|
||||||
|
// Cache provides two methods to retrive the PodStatus: the non-blocking Get()
|
||||||
|
// and the blocking GetNewerThan() method. The component responsible for
|
||||||
|
// populating the cache is expected to call Delete() to explicitly free the
|
||||||
|
// cache entries.
|
||||||
|
type Cache interface {
|
||||||
|
Get(types.UID) (*PodStatus, error)
|
||||||
|
Set(types.UID, *PodStatus, error, time.Time)
|
||||||
|
// GetNewerThan is a blocking call that only returns the status
|
||||||
|
// when it is newer than the given time.
|
||||||
|
GetNewerThan(types.UID, time.Time) (*PodStatus, error)
|
||||||
|
Delete(types.UID)
|
||||||
|
UpdateTime(time.Time)
|
||||||
|
}
|
||||||
|
|
||||||
|
type data struct {
|
||||||
|
// Status of the pod.
|
||||||
|
status *PodStatus
|
||||||
|
// Error got when trying to inspect the pod.
|
||||||
|
err error
|
||||||
|
// Time when the data was last modfied.
|
||||||
|
modified time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type subRecord struct {
|
||||||
|
time time.Time
|
||||||
|
ch chan *data
|
||||||
|
}
|
||||||
|
|
||||||
|
// cache implements Cache.
|
||||||
|
type cache struct {
|
||||||
|
// Lock which guards all internal data structures.
|
||||||
|
lock sync.RWMutex
|
||||||
|
// Map that stores the pod statuses.
|
||||||
|
pods map[types.UID]*data
|
||||||
|
// A global timestamp represents how fresh the cached data is. All
|
||||||
|
// cache content is at the least newer than this timestamp. Note that the
|
||||||
|
// timestamp is nil after initialization, and will only become non-nil when
|
||||||
|
// it is ready to serve the cached statuses.
|
||||||
|
timestamp *time.Time
|
||||||
|
// Map that stores the subscriber records.
|
||||||
|
subscribers map[types.UID][]*subRecord
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCache creates a pod cache.
|
||||||
|
func NewCache() Cache {
|
||||||
|
return &cache{pods: map[types.UID]*data{}, subscribers: map[types.UID][]*subRecord{}}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns the PodStatus for the pod; callers are expected not to
|
||||||
|
// modify the objects returned.
|
||||||
|
func (c *cache) Get(id types.UID) (*PodStatus, error) {
|
||||||
|
c.lock.RLock()
|
||||||
|
defer c.lock.RUnlock()
|
||||||
|
d := c.get(id)
|
||||||
|
return d.status, d.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) {
|
||||||
|
ch := c.subscribe(id, minTime)
|
||||||
|
d := <-ch
|
||||||
|
return d.status, d.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set sets the PodStatus for the pod.
|
||||||
|
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
defer c.notify(id, timestamp)
|
||||||
|
c.pods[id] = &data{status: status, err: err, modified: timestamp}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete removes the entry of the pod.
|
||||||
|
func (c *cache) Delete(id types.UID) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
delete(c.pods, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateTime modifies the global timestamp of the cache and notify
|
||||||
|
// subscribers if needed.
|
||||||
|
func (c *cache) UpdateTime(timestamp time.Time) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
c.timestamp = ×tamp
|
||||||
|
// Notify all the subscribers if the condition is met.
|
||||||
|
for id := range c.subscribers {
|
||||||
|
c.notify(id, *c.timestamp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeDefaultData(id types.UID) *data {
|
||||||
|
return &data{status: &PodStatus{ID: id}, err: nil}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) get(id types.UID) *data {
|
||||||
|
d, ok := c.pods[id]
|
||||||
|
if !ok {
|
||||||
|
// Cache should store *all* pod/container information known by the
|
||||||
|
// container runtime. A cache miss indicates that there are no states
|
||||||
|
// regarding the pod last time we queried the container runtime.
|
||||||
|
// What this *really* means is that there are no visible pod/containers
|
||||||
|
// associated with this pod. Simply return an default (mostly empty)
|
||||||
|
// PodStatus to reflect this.
|
||||||
|
return makeDefaultData(id)
|
||||||
|
}
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
|
||||||
|
// getIfNewerThan returns the data it is newer than the given time.
|
||||||
|
// Otherwise, it returns nil. The caller should acquire the lock.
|
||||||
|
func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
|
||||||
|
d, ok := c.pods[id]
|
||||||
|
globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
|
||||||
|
if !ok && globalTimestampIsNewer {
|
||||||
|
// Status is not cached, but the global timestamp is newer than
|
||||||
|
// minTime, return the default status.
|
||||||
|
return makeDefaultData(id)
|
||||||
|
}
|
||||||
|
if ok && (d.modified.After(minTime) || globalTimestampIsNewer) {
|
||||||
|
// Status is cached, return status if either of the following is true.
|
||||||
|
// * status was modified after minTime
|
||||||
|
// * the global timestamp of the cache is newer than minTime.
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
// The pod status is not ready.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// notify sends notifications for pod with the given id, if the requirements
|
||||||
|
// are met. Note that the caller should acquire the lock.
|
||||||
|
func (c *cache) notify(id types.UID, timestamp time.Time) {
|
||||||
|
list, ok := c.subscribers[id]
|
||||||
|
if !ok {
|
||||||
|
// No one to notify.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
newList := []*subRecord{}
|
||||||
|
for i, r := range list {
|
||||||
|
if timestamp.Before(r.time) {
|
||||||
|
// Doesn't meet the time requirement; keep the record.
|
||||||
|
newList = append(newList, list[i])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
r.ch <- c.get(id)
|
||||||
|
close(r.ch)
|
||||||
|
}
|
||||||
|
if len(newList) == 0 {
|
||||||
|
delete(c.subscribers, id)
|
||||||
|
} else {
|
||||||
|
c.subscribers[id] = newList
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data {
|
||||||
|
ch := make(chan *data, 1)
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
d := c.getIfNewerThan(id, timestamp)
|
||||||
|
if d != nil {
|
||||||
|
// If the cache entry is ready, send the data and return immediatly.
|
||||||
|
ch <- d
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
// Add the subscription record.
|
||||||
|
c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch})
|
||||||
|
return ch
|
||||||
|
}
|
210
pkg/kubelet/container/cache_test.go
Normal file
210
pkg/kubelet/container/cache_test.go
Normal file
@ -0,0 +1,210 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"k8s.io/kubernetes/pkg/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newTestCache() *cache {
|
||||||
|
c := NewCache()
|
||||||
|
return c.(*cache)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCacheNotInitialized(t *testing.T) {
|
||||||
|
cache := newTestCache()
|
||||||
|
// If the global timestamp is not set, always return nil.
|
||||||
|
d := cache.getIfNewerThan(types.UID("1234"), time.Time{})
|
||||||
|
assert.True(t, d == nil, "should return nil since cache is not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
func getTestPodIDAndStatus(numContainers int) (types.UID, *PodStatus) {
|
||||||
|
id := types.UID(strconv.FormatInt(time.Now().UnixNano(), 10))
|
||||||
|
name := fmt.Sprintf("cache-foo-%s", string(id))
|
||||||
|
namespace := "ns"
|
||||||
|
var status *PodStatus
|
||||||
|
if numContainers > 0 {
|
||||||
|
status = &PodStatus{ID: id, Name: name, Namespace: namespace}
|
||||||
|
} else {
|
||||||
|
status = &PodStatus{ID: id}
|
||||||
|
}
|
||||||
|
for i := 0; i < numContainers; i++ {
|
||||||
|
status.ContainerStatuses = append(status.ContainerStatuses, &ContainerStatus{Name: string(i)})
|
||||||
|
}
|
||||||
|
return id, status
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetIfNewerThanWhenPodExists(t *testing.T) {
|
||||||
|
cache := newTestCache()
|
||||||
|
timestamp := time.Now()
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
cacheTime time.Time
|
||||||
|
modified time.Time
|
||||||
|
expected bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
// Both the global cache timestamp and the modified time are newer
|
||||||
|
// than the timestamp.
|
||||||
|
cacheTime: timestamp.Add(time.Second),
|
||||||
|
modified: timestamp,
|
||||||
|
expected: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// Global cache timestamp is newer, but the pod entry modified
|
||||||
|
// time is older than the given timestamp. This means that the
|
||||||
|
// entry is up-to-date even though it hasn't changed for a while.
|
||||||
|
cacheTime: timestamp.Add(time.Second),
|
||||||
|
modified: timestamp.Add(-time.Second * 10),
|
||||||
|
expected: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// Global cache timestamp is older, but the pod entry modified
|
||||||
|
// time is newer than the given timestamp. This means that the
|
||||||
|
// entry is up-to-date but the rest of the cache are still being
|
||||||
|
// updated.
|
||||||
|
cacheTime: timestamp.Add(-time.Second),
|
||||||
|
modified: timestamp.Add(time.Second * 3),
|
||||||
|
expected: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// Both the global cache timestamp and the modified time are older
|
||||||
|
// than the given timestamp.
|
||||||
|
cacheTime: timestamp.Add(-time.Second),
|
||||||
|
modified: timestamp.Add(-time.Second),
|
||||||
|
expected: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for i, c := range cases {
|
||||||
|
podID, status := getTestPodIDAndStatus(2)
|
||||||
|
cache.UpdateTime(c.cacheTime)
|
||||||
|
cache.Set(podID, status, nil, c.modified)
|
||||||
|
d := cache.getIfNewerThan(podID, timestamp)
|
||||||
|
assert.Equal(t, c.expected, d != nil, "test[%d]", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetPodNewerThanWhenPodDoesNotExist(t *testing.T) {
|
||||||
|
cache := newTestCache()
|
||||||
|
cacheTime := time.Now()
|
||||||
|
cache.UpdateTime(cacheTime)
|
||||||
|
podID := types.UID("1234")
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
timestamp time.Time
|
||||||
|
expected bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
timestamp: cacheTime.Add(-time.Second),
|
||||||
|
expected: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
timestamp: cacheTime.Add(time.Second),
|
||||||
|
expected: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for i, c := range cases {
|
||||||
|
d := cache.getIfNewerThan(podID, c.timestamp)
|
||||||
|
assert.Equal(t, c.expected, d != nil, "test[%d]", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCacheSetAndGet(t *testing.T) {
|
||||||
|
cache := NewCache()
|
||||||
|
cases := []struct {
|
||||||
|
numContainers int
|
||||||
|
error error
|
||||||
|
}{
|
||||||
|
{numContainers: 3, error: nil},
|
||||||
|
{numContainers: 2, error: fmt.Errorf("unable to get status")},
|
||||||
|
{numContainers: 0, error: nil},
|
||||||
|
}
|
||||||
|
for i, c := range cases {
|
||||||
|
podID, status := getTestPodIDAndStatus(c.numContainers)
|
||||||
|
cache.Set(podID, status, c.error, time.Time{})
|
||||||
|
// Read back the status and error stored in cache and make sure they
|
||||||
|
// match the original ones.
|
||||||
|
actualStatus, actualErr := cache.Get(podID)
|
||||||
|
assert.Equal(t, status, actualStatus, "test[%d]", i)
|
||||||
|
assert.Equal(t, c.error, actualErr, "test[%d]", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCacheGetPodDoesNotExist(t *testing.T) {
|
||||||
|
cache := NewCache()
|
||||||
|
podID, status := getTestPodIDAndStatus(0)
|
||||||
|
// If the pod does not exist in cache, cache should return an status
|
||||||
|
// object with id filled.
|
||||||
|
actualStatus, actualErr := cache.Get(podID)
|
||||||
|
assert.Equal(t, status, actualStatus)
|
||||||
|
assert.Equal(t, nil, actualErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDelete(t *testing.T) {
|
||||||
|
cache := &cache{pods: map[types.UID]*data{}}
|
||||||
|
// Write a new pod status into the cache.
|
||||||
|
podID, status := getTestPodIDAndStatus(3)
|
||||||
|
cache.Set(podID, status, nil, time.Time{})
|
||||||
|
actualStatus, actualErr := cache.Get(podID)
|
||||||
|
assert.Equal(t, status, actualStatus)
|
||||||
|
assert.Equal(t, nil, actualErr)
|
||||||
|
// Delete the pod from cache, and verify that we get an empty status.
|
||||||
|
cache.Delete(podID)
|
||||||
|
expectedStatus := &PodStatus{ID: podID}
|
||||||
|
actualStatus, actualErr = cache.Get(podID)
|
||||||
|
assert.Equal(t, expectedStatus, actualStatus)
|
||||||
|
assert.Equal(t, nil, actualErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func verifyNotification(t *testing.T, ch chan *data, expectNotification bool) {
|
||||||
|
if expectNotification {
|
||||||
|
assert.True(t, len(ch) > 0, "Did not receive notification")
|
||||||
|
} else {
|
||||||
|
assert.True(t, len(ch) < 1, "Should not have triggered the notification")
|
||||||
|
}
|
||||||
|
// Drain the channel.
|
||||||
|
for i := 0; i < len(ch); i++ {
|
||||||
|
<-ch
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegisterNotification(t *testing.T) {
|
||||||
|
cache := newTestCache()
|
||||||
|
cacheTime := time.Now()
|
||||||
|
cache.UpdateTime(cacheTime)
|
||||||
|
|
||||||
|
podID, status := getTestPodIDAndStatus(1)
|
||||||
|
ch := cache.subscribe(podID, cacheTime.Add(time.Second))
|
||||||
|
verifyNotification(t, ch, false)
|
||||||
|
cache.Set(podID, status, nil, cacheTime.Add(time.Second))
|
||||||
|
// The Set operation should've triggered the notification.
|
||||||
|
verifyNotification(t, ch, true)
|
||||||
|
|
||||||
|
podID, _ = getTestPodIDAndStatus(1)
|
||||||
|
|
||||||
|
ch = cache.subscribe(podID, cacheTime.Add(time.Second))
|
||||||
|
verifyNotification(t, ch, false)
|
||||||
|
cache.UpdateTime(cacheTime.Add(time.Second * 2))
|
||||||
|
// The advance of cache timestamp should've triggered the notification.
|
||||||
|
verifyNotification(t, ch, true)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user