Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-24 04:06:03 +00:00)

Merge pull request #17700 from yujuhong/status_cache

Auto commit by PR queue bot

Commit 5462a37dfe

pkg/kubelet/container/cache.go  (new file, 199 lines)
@@ -0,0 +1,199 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package container

import (
	"sync"
	"time"

	"k8s.io/kubernetes/pkg/types"
)

// Cache stores the PodStatus for the pods. It represents *all* the visible
// pods/containers in the container runtime. All cache entries are at least as
// new as the global timestamp (set by UpdateTime()), while individual entries
// may be slightly newer than the global timestamp. If a pod has no states
// known by the runtime, Cache returns an empty PodStatus object with the ID
// populated.
//
// Cache provides two methods to retrieve the PodStatus: the non-blocking Get()
// and the blocking GetNewerThan() method. The component responsible for
// populating the cache is expected to call Delete() to explicitly free the
// cache entries.
type Cache interface {
	Get(types.UID) (*PodStatus, error)
	Set(types.UID, *PodStatus, error, time.Time)
	// GetNewerThan is a blocking call that only returns the status
	// when it is newer than the given time.
	GetNewerThan(types.UID, time.Time) (*PodStatus, error)
	Delete(types.UID)
	UpdateTime(time.Time)
}

type data struct {
	// Status of the pod.
	status *PodStatus
	// Error got when trying to inspect the pod.
	err error
	// Time when the data was last modified.
	modified time.Time
}

type subRecord struct {
	time time.Time
	ch   chan *data
}

// cache implements Cache.
type cache struct {
	// Lock which guards all internal data structures.
	lock sync.RWMutex
	// Map that stores the pod statuses.
	pods map[types.UID]*data
	// A global timestamp represents how fresh the cached data is. All
	// cache content is at least as new as this timestamp. Note that the
	// timestamp is nil after initialization, and will only become non-nil when
	// it is ready to serve the cached statuses.
	timestamp *time.Time
	// Map that stores the subscriber records.
	subscribers map[types.UID][]*subRecord
}

// NewCache creates a pod cache.
func NewCache() Cache {
	return &cache{pods: map[types.UID]*data{}, subscribers: map[types.UID][]*subRecord{}}
}

// Get returns the PodStatus for the pod; callers are expected not to
// modify the objects returned.
func (c *cache) Get(id types.UID) (*PodStatus, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	d := c.get(id)
	return d.status, d.err
}

func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) {
	ch := c.subscribe(id, minTime)
	d := <-ch
	return d.status, d.err
}

// Set sets the PodStatus for the pod.
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()
	defer c.notify(id, timestamp)
	c.pods[id] = &data{status: status, err: err, modified: timestamp}
}

// Delete removes the entry of the pod.
func (c *cache) Delete(id types.UID) {
	c.lock.Lock()
	defer c.lock.Unlock()
	delete(c.pods, id)
}

// UpdateTime modifies the global timestamp of the cache and notifies
// subscribers if needed.
func (c *cache) UpdateTime(timestamp time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.timestamp = &timestamp
	// Notify all the subscribers if the condition is met.
	for id := range c.subscribers {
		c.notify(id, *c.timestamp)
	}
}

func makeDefaultData(id types.UID) *data {
	return &data{status: &PodStatus{ID: id}, err: nil}
}

func (c *cache) get(id types.UID) *data {
	d, ok := c.pods[id]
	if !ok {
		// Cache should store *all* pod/container information known by the
		// container runtime. A cache miss indicates that there are no states
		// regarding the pod last time we queried the container runtime.
		// What this *really* means is that there are no visible pod/containers
		// associated with this pod. Simply return a default (mostly empty)
		// PodStatus to reflect this.
		return makeDefaultData(id)
	}
	return d
}

// getIfNewerThan returns the data if it is newer than the given time.
// Otherwise, it returns nil. The caller should acquire the lock.
func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
	d, ok := c.pods[id]
	globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
	if !ok && globalTimestampIsNewer {
		// Status is not cached, but the global timestamp is newer than
		// minTime, return the default status.
		return makeDefaultData(id)
	}
	if ok && (d.modified.After(minTime) || globalTimestampIsNewer) {
		// Status is cached, return status if either of the following is true.
		// * status was modified after minTime
		// * the global timestamp of the cache is newer than minTime.
		return d
	}
	// The pod status is not ready.
	return nil
}

// notify sends notifications for the pod with the given id, if the requirements
// are met. Note that the caller should acquire the lock.
func (c *cache) notify(id types.UID, timestamp time.Time) {
	list, ok := c.subscribers[id]
	if !ok {
		// No one to notify.
		return
	}
	newList := []*subRecord{}
	for i, r := range list {
		if timestamp.Before(r.time) {
			// Doesn't meet the time requirement; keep the record.
			newList = append(newList, list[i])
			continue
		}
		r.ch <- c.get(id)
		close(r.ch)
	}
	if len(newList) == 0 {
		delete(c.subscribers, id)
	} else {
		c.subscribers[id] = newList
	}
}

func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data {
	ch := make(chan *data, 1)
	c.lock.Lock()
	defer c.lock.Unlock()
	d := c.getIfNewerThan(id, timestamp)
	if d != nil {
		// If the cache entry is ready, send the data and return immediately.
		ch <- d
		return ch
	}
	// Add the subscription record.
	c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch})
	return ch
}
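Not part of the commit above: a minimal usage sketch of this Cache, assuming only the types added in this file (the pod UID and PodStatus values are made up). The producer writes one Set() per pod and then publishes the global timestamp with UpdateTime() once a full pass is done; a consumer blocks in GetNewerThan() until the cached status is at least as fresh as the time it cares about.

// Sketch only; illustrates the intended Set/UpdateTime/GetNewerThan flow.
package main

import (
	"fmt"
	"time"

	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/types"
)

func main() {
	c := kubecontainer.NewCache()
	podID := types.UID("example-pod") // hypothetical pod ID

	// Producer (e.g. a relist loop): write the per-pod status, then publish
	// the global timestamp once every pod seen in this pass is in the cache.
	start := time.Now()
	c.Set(podID, &kubecontainer.PodStatus{ID: podID}, nil, start)
	c.UpdateTime(start)

	// Consumer: block until the cached status is at least as new as the
	// point in time it is reacting to, then use it.
	status, err := c.GetNewerThan(podID, start.Add(-time.Millisecond))
	fmt.Println(status.ID, err)
}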
pkg/kubelet/container/cache_test.go  (new file, 210 lines)
@@ -0,0 +1,210 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package container

import (
	"fmt"
	"strconv"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"k8s.io/kubernetes/pkg/types"
)

func newTestCache() *cache {
	c := NewCache()
	return c.(*cache)
}

func TestCacheNotInitialized(t *testing.T) {
	cache := newTestCache()
	// If the global timestamp is not set, always return nil.
	d := cache.getIfNewerThan(types.UID("1234"), time.Time{})
	assert.True(t, d == nil, "should return nil since cache is not initialized")
}

func getTestPodIDAndStatus(numContainers int) (types.UID, *PodStatus) {
	id := types.UID(strconv.FormatInt(time.Now().UnixNano(), 10))
	name := fmt.Sprintf("cache-foo-%s", string(id))
	namespace := "ns"
	var status *PodStatus
	if numContainers > 0 {
		status = &PodStatus{ID: id, Name: name, Namespace: namespace}
	} else {
		status = &PodStatus{ID: id}
	}
	for i := 0; i < numContainers; i++ {
		status.ContainerStatuses = append(status.ContainerStatuses, &ContainerStatus{Name: string(i)})
	}
	return id, status
}

func TestGetIfNewerThanWhenPodExists(t *testing.T) {
	cache := newTestCache()
	timestamp := time.Now()

	cases := []struct {
		cacheTime time.Time
		modified  time.Time
		expected  bool
	}{
		{
			// Both the global cache timestamp and the modified time are newer
			// than the timestamp.
			cacheTime: timestamp.Add(time.Second),
			modified:  timestamp,
			expected:  true,
		},
		{
			// Global cache timestamp is newer, but the pod entry modified
			// time is older than the given timestamp. This means that the
			// entry is up-to-date even though it hasn't changed for a while.
			cacheTime: timestamp.Add(time.Second),
			modified:  timestamp.Add(-time.Second * 10),
			expected:  true,
		},
		{
			// Global cache timestamp is older, but the pod entry modified
			// time is newer than the given timestamp. This means that the
			// entry is up-to-date but the rest of the cache is still being
			// updated.
			cacheTime: timestamp.Add(-time.Second),
			modified:  timestamp.Add(time.Second * 3),
			expected:  true,
		},
		{
			// Both the global cache timestamp and the modified time are older
			// than the given timestamp.
			cacheTime: timestamp.Add(-time.Second),
			modified:  timestamp.Add(-time.Second),
			expected:  false,
		},
	}
	for i, c := range cases {
		podID, status := getTestPodIDAndStatus(2)
		cache.UpdateTime(c.cacheTime)
		cache.Set(podID, status, nil, c.modified)
		d := cache.getIfNewerThan(podID, timestamp)
		assert.Equal(t, c.expected, d != nil, "test[%d]", i)
	}
}

func TestGetPodNewerThanWhenPodDoesNotExist(t *testing.T) {
	cache := newTestCache()
	cacheTime := time.Now()
	cache.UpdateTime(cacheTime)
	podID := types.UID("1234")

	cases := []struct {
		timestamp time.Time
		expected  bool
	}{
		{
			timestamp: cacheTime.Add(-time.Second),
			expected:  true,
		},
		{
			timestamp: cacheTime.Add(time.Second),
			expected:  false,
		},
	}
	for i, c := range cases {
		d := cache.getIfNewerThan(podID, c.timestamp)
		assert.Equal(t, c.expected, d != nil, "test[%d]", i)
	}
}

func TestCacheSetAndGet(t *testing.T) {
	cache := NewCache()
	cases := []struct {
		numContainers int
		error         error
	}{
		{numContainers: 3, error: nil},
		{numContainers: 2, error: fmt.Errorf("unable to get status")},
		{numContainers: 0, error: nil},
	}
	for i, c := range cases {
		podID, status := getTestPodIDAndStatus(c.numContainers)
		cache.Set(podID, status, c.error, time.Time{})
		// Read back the status and error stored in cache and make sure they
		// match the original ones.
		actualStatus, actualErr := cache.Get(podID)
		assert.Equal(t, status, actualStatus, "test[%d]", i)
		assert.Equal(t, c.error, actualErr, "test[%d]", i)
	}
}

func TestCacheGetPodDoesNotExist(t *testing.T) {
	cache := NewCache()
	podID, status := getTestPodIDAndStatus(0)
	// If the pod does not exist in the cache, cache should return a status
	// object with the ID filled in.
	actualStatus, actualErr := cache.Get(podID)
	assert.Equal(t, status, actualStatus)
	assert.Equal(t, nil, actualErr)
}

func TestDelete(t *testing.T) {
	cache := &cache{pods: map[types.UID]*data{}}
	// Write a new pod status into the cache.
	podID, status := getTestPodIDAndStatus(3)
	cache.Set(podID, status, nil, time.Time{})
	actualStatus, actualErr := cache.Get(podID)
	assert.Equal(t, status, actualStatus)
	assert.Equal(t, nil, actualErr)
	// Delete the pod from cache, and verify that we get an empty status.
	cache.Delete(podID)
	expectedStatus := &PodStatus{ID: podID}
	actualStatus, actualErr = cache.Get(podID)
	assert.Equal(t, expectedStatus, actualStatus)
	assert.Equal(t, nil, actualErr)
}

func verifyNotification(t *testing.T, ch chan *data, expectNotification bool) {
	if expectNotification {
		assert.True(t, len(ch) > 0, "Did not receive notification")
	} else {
		assert.True(t, len(ch) < 1, "Should not have triggered the notification")
	}
	// Drain the channel.
	for i := 0; i < len(ch); i++ {
		<-ch
	}
}

func TestRegisterNotification(t *testing.T) {
	cache := newTestCache()
	cacheTime := time.Now()
	cache.UpdateTime(cacheTime)

	podID, status := getTestPodIDAndStatus(1)
	ch := cache.subscribe(podID, cacheTime.Add(time.Second))
	verifyNotification(t, ch, false)
	cache.Set(podID, status, nil, cacheTime.Add(time.Second))
	// The Set operation should've triggered the notification.
	verifyNotification(t, ch, true)

	podID, _ = getTestPodIDAndStatus(1)

	ch = cache.subscribe(podID, cacheTime.Add(time.Second))
	verifyNotification(t, ch, false)
	cache.UpdateTime(cacheTime.Add(time.Second * 2))
	// The advance of the cache timestamp should've triggered the notification.
	verifyNotification(t, ch, true)
}
@@ -442,6 +442,15 @@ func (p *Pod) FindContainerByName(containerName string) *Container {
 	return nil
 }
 
+func (p *Pod) FindContainerByID(id ContainerID) *Container {
+	for _, c := range p.Containers {
+		if c.ID == id {
+			return c
+		}
+	}
+	return nil
+}
+
 // ToAPIPod converts Pod to api.Pod. Note that if a field in api.Pod has no
 // corresponding field in Pod, the field would not be populated.
 func (p *Pod) ToAPIPod() *api.Pod {
@@ -367,7 +367,7 @@ func NewMainKubelet(
 			serializeImagePulls,
 		)
 
-		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod)
+		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
 	case "rkt":
 		conf := &rkt.Config{
 			Path: rktPath,
@@ -388,7 +388,7 @@ func NewMainKubelet(
 			return nil, err
 		}
 		klet.containerRuntime = rktRuntime
-		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod)
+		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
 
 		// No Docker daemon to put in a container.
 		dockerDaemonContainer = ""
@@ -183,7 +183,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	}
 	kubelet.workQueue = queue.NewBasicWorkQueue()
 	// Relist period does not affect the tests.
-	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour)
+	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil)
 	kubelet.clock = fakeClock
 	return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock}
 }
@@ -25,6 +25,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/sets"
 )
 
 // GenericPLEG is an extremely simple generic PLEG that relies solely on
@@ -49,24 +50,54 @@ type GenericPLEG struct {
 	runtime kubecontainer.Runtime
 	// The channel from which the subscriber listens events.
 	eventChannel chan *PodLifecycleEvent
-	// The internal cache for container information.
-	containers map[string]containerInfo
+	// The internal cache for pod/container information.
+	podRecords podRecords
 	// Time of the last relisting.
 	lastRelistTime time.Time
+	// Cache for storing the runtime states required for syncing pods.
+	cache kubecontainer.Cache
 }
 
-type containerInfo struct {
-	podID types.UID
-	state kubecontainer.ContainerState
-}
+// plegContainerState has a one-to-one mapping to the
+// kubecontainer.ContainerState except for the non-existent state. This state
+// is introduced here to complete the state transition scenarios.
+type plegContainerState string
+
+const (
+	plegContainerRunning     plegContainerState = "running"
+	plegContainerExited      plegContainerState = "exited"
+	plegContainerUnknown     plegContainerState = "unknown"
+	plegContainerNonExistent plegContainerState = "non-existent"
+)
+
+func convertState(state kubecontainer.ContainerState) plegContainerState {
+	switch state {
+	case kubecontainer.ContainerStateRunning:
+		return plegContainerRunning
+	case kubecontainer.ContainerStateExited:
+		return plegContainerExited
+	case kubecontainer.ContainerStateUnknown:
+		return plegContainerUnknown
+	default:
+		panic(fmt.Sprintf("unrecognized container state: %v", state))
+	}
+}
+
+type podRecord struct {
+	old     *kubecontainer.Pod
+	current *kubecontainer.Pod
+}
+
+type podRecords map[types.UID]*podRecord
 
 func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
-	relistPeriod time.Duration) PodLifecycleEventGenerator {
+	relistPeriod time.Duration, cache kubecontainer.Cache) PodLifecycleEventGenerator {
 	return &GenericPLEG{
 		relistPeriod: relistPeriod,
 		runtime:      runtime,
 		eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
-		containers:   make(map[string]containerInfo),
+		podRecords:   make(podRecords),
+		cache:        cache,
 	}
 }
 
@@ -82,18 +113,30 @@ func (g *GenericPLEG) Start() {
 	go util.Until(g.relist, g.relistPeriod, util.NeverStop)
 }
 
-func generateEvent(podID types.UID, cid string, oldState, newState kubecontainer.ContainerState) *PodLifecycleEvent {
+func generateEvent(podID types.UID, cid string, oldState, newState plegContainerState) *PodLifecycleEvent {
+	glog.V(7).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState)
 	if newState == oldState {
 		return nil
 	}
 	switch newState {
-	case kubecontainer.ContainerStateRunning:
+	case plegContainerRunning:
 		return &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: cid}
-	case kubecontainer.ContainerStateExited:
+	case plegContainerExited:
 		return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid}
-	case kubecontainer.ContainerStateUnknown:
+	case plegContainerUnknown:
 		// Don't generate any event if the status is unknown.
 		return nil
+	case plegContainerNonExistent:
+		// We report "ContainerDied" when the container was stopped OR removed.
+		// We may want to distinguish the two cases in the future.
+		switch oldState {
+		case plegContainerExited:
+			// We already reported that the container died before. There is no
+			// need to do it again.
+			return nil
+		default:
+			return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid}
+		}
 	default:
 		panic(fmt.Sprintf("unrecognized container state: %v", newState))
 	}
@@ -116,40 +159,173 @@ func (g *GenericPLEG) relist() {
 	}()
 
 	// Get all the pods.
-	pods, err := g.runtime.GetPods(true)
+	podList, err := g.runtime.GetPods(true)
 	if err != nil {
 		glog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err)
 		return
 	}
+	pods := kubecontainer.Pods(podList)
+	for _, pod := range pods {
+		g.podRecords.setCurrent(pod)
+	}
 
-	events := []*PodLifecycleEvent{}
-	containers := make(map[string]containerInfo, len(g.containers))
-	// Create a new containers map, compare container statuses, and generate
-	// corresponding events.
-	for _, p := range pods {
-		for _, c := range p.Containers {
-			cid := c.ID.ID
-			// Get the existing container info. Defaults to state unknown.
-			oldState := kubecontainer.ContainerStateUnknown
-			if info, ok := g.containers[cid]; ok {
-				oldState = info.state
-			}
-			// Generate an event if required.
-			glog.V(7).Infof("GenericPLEG: %v/%v: %v -> %v", p.ID, cid, oldState, c.State)
-			if e := generateEvent(p.ID, cid, oldState, c.State); e != nil {
-				events = append(events, e)
-			}
-			// Write to the new cache.
-			containers[cid] = containerInfo{podID: p.ID, state: c.State}
+	// Compare the old and the current pods, and generate events.
+	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
+	for pid := range g.podRecords {
+		oldPod := g.podRecords.getOld(pid)
+		pod := g.podRecords.getCurrent(pid)
+		// Get all containers in the old and the new pod.
+		allContainers := getContainersFromPods(oldPod, pod)
+		for _, container := range allContainers {
+			e := computeEvent(oldPod, pod, &container.ID)
+			updateEvents(eventsByPodID, e)
 		}
 	}
 
-	// Swap the container info cache. This is purely to avoid the need of
-	// garbage collection.
-	g.containers = containers
+	// If there are events associated with a pod, we should update the
+	// podCache.
+	for pid, events := range eventsByPodID {
+		pod := g.podRecords.getCurrent(pid)
+		if g.cacheEnabled() {
+			// updateCache() will inspect the pod and update the cache. If an
+			// error occurs during the inspection, we want PLEG to retry again
+			// in the next relist. To achieve this, we do not update the
+			// associated podRecord of the pod, so that the change will be
+			// detected again in the next relist.
+			// TODO: If many pods changed during the same relist period,
+			// inspecting the pod and getting the PodStatus to update the cache
+			// serially may take a while. We should be aware of this and
+			// parallelize if needed.
+			if err := g.updateCache(pod, pid); err != nil {
+				glog.Errorf("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
+				continue
+			}
+		}
+		// Update the internal storage and send out the events.
+		g.podRecords.update(pid)
+		for i := range events {
+			g.eventChannel <- events[i]
+		}
+	}
 
-	// Send out the events.
-	for i := range events {
-		g.eventChannel <- events[i]
+	if g.cacheEnabled() {
+		// Update the cache timestamp. This needs to happen *after*
+		// all pods have been properly updated in the cache.
+		g.cache.UpdateTime(timestamp)
 	}
 }
+
+func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Container {
+	cidSet := sets.NewString()
+	var containers []*kubecontainer.Container
+	for _, p := range pods {
+		if p == nil {
+			continue
+		}
+		for _, c := range p.Containers {
+			cid := string(c.ID.ID)
+			if cidSet.Has(cid) {
+				continue
+			}
+			cidSet.Insert(cid)
+			containers = append(containers, c)
+		}
+	}
+	return containers
+}
+
+func computeEvent(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) *PodLifecycleEvent {
+	var pid types.UID
+	if oldPod != nil {
+		pid = oldPod.ID
+	} else if newPod != nil {
+		pid = newPod.ID
+	}
+	oldState := getContainerState(oldPod, cid)
+	newState := getContainerState(newPod, cid)
+	return generateEvent(pid, cid.ID, oldState, newState)
+}
+
+func (g *GenericPLEG) cacheEnabled() bool {
+	return g.cache != nil
+}
+
+func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
+	if pod == nil {
+		// The pod is missing in the current relist. This means that
+		// the pod has no visible (active or inactive) containers.
+		glog.V(8).Infof("PLEG: Delete status for pod %q", string(pid))
+		g.cache.Delete(pid)
+		return nil
+	}
+	timestamp := time.Now()
+	// TODO: Consider adding a new runtime method
+	// GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
+	// all containers again.
+	status, err := g.runtime.GetPodStatus(pod.ID, pod.Name, pod.Namespace)
+	glog.V(8).Infof("PLEG: Write status for %s/%s: %+v (err: %v)", pod.Name, pod.Namespace, status, err)
+	g.cache.Set(pod.ID, status, err, timestamp)
+	return err
+}
+
+func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
+	if e == nil {
+		return
+	}
+	eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e)
+}
+
+func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {
+	// Default to the non-existent state.
+	state := plegContainerNonExistent
+	if pod == nil {
+		return state
+	}
+	container := pod.FindContainerByID(*cid)
+	if container == nil {
+		return state
+	}
+	return convertState(container.State)
+}
+
+func (pr podRecords) getOld(id types.UID) *kubecontainer.Pod {
+	r, ok := pr[id]
+	if !ok {
+		return nil
+	}
+	return r.old
+}
+
+func (pr podRecords) getCurrent(id types.UID) *kubecontainer.Pod {
+	r, ok := pr[id]
+	if !ok {
+		return nil
+	}
+	return r.current
+}
+
+func (pr podRecords) setCurrent(pod *kubecontainer.Pod) {
+	if r, ok := pr[pod.ID]; ok {
+		r.current = pod
+		return
+	}
+	pr[pod.ID] = &podRecord{current: pod}
+}
+
+func (pr podRecords) update(id types.UID) {
+	r, ok := pr[id]
+	if !ok {
+		return
+	}
+	pr.updateInternal(id, r)
+}
+
+func (pr podRecords) updateInternal(id types.UID, r *podRecord) {
+	if r.current == nil {
+		// Pod no longer exists; delete the entry.
+		delete(pr, id)
+		return
+	}
+	r.old = r.current
+	r.current = nil
+}
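Not part of this diff: the intended consumer of the cache (the kubelet sync path) is wired up in later changes. The sketch below only illustrates the pattern that the relist loop above enables; the helper name waitForFreshStatus and its placement in package pleg are made up for illustration.

// Sketch only, assuming the pleg package from this PR: a subscriber pairs a
// lifecycle event with the status cache by blocking until the cache holds a
// status for the pod that is at least as fresh as the event.
package pleg

import (
	"time"

	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

// waitForFreshStatus is a hypothetical helper, not defined in this PR.
func waitForFreshStatus(cache kubecontainer.Cache, e *PodLifecycleEvent, eventTime time.Time) (*kubecontainer.PodStatus, error) {
	// GetNewerThan blocks until Set()/UpdateTime() from a later relist makes
	// the entry for e.ID newer than eventTime.
	return cache.GetNewerThan(e.ID, eventTime)
}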
@@ -17,12 +17,15 @@ limitations under the License.
 package pleg
 
 import (
+	"fmt"
 	"reflect"
 	"sort"
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
 )
 
@@ -43,7 +46,7 @@ func newTestGenericPLEG() *TestGenericPLEG {
 		relistPeriod: time.Hour,
 		runtime:      fakeRuntime,
 		eventChannel: make(chan *PodLifecycleEvent, 100),
-		containers:   make(map[string]containerInfo),
+		podRecords:   make(podRecords),
 	}
 	return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime}
 }
@@ -79,7 +82,7 @@ func verifyEvents(t *testing.T, expected, actual []*PodLifecycleEvent) {
 	sort.Sort(sortableEvents(expected))
 	sort.Sort(sortableEvents(actual))
 	if !reflect.DeepEqual(expected, actual) {
-		t.Errorf("Actual events differ from the expected; diff: %v", util.ObjectDiff(expected, actual))
+		t.Errorf("Actual events differ from the expected; diff:\n %v", util.ObjectDiff(expected, actual))
 	}
 }
 
@@ -87,7 +90,6 @@ func TestRelisting(t *testing.T) {
 	testPleg := newTestGenericPLEG()
 	pleg, runtime := testPleg.pleg, testPleg.runtime
 	ch := pleg.Watch()
-
 	// The first relist should send a PodSync event to each pod.
 	runtime.AllPodList = []*kubecontainer.Pod{
 		{
@@ -146,3 +148,172 @@
 	actual = getEventsFromChannel(ch)
 	verifyEvents(t, expected, actual)
 }
+
+func TestReportMissingContainers(t *testing.T) {
+	testPleg := newTestGenericPLEG()
+	pleg, runtime := testPleg.pleg, testPleg.runtime
+	ch := pleg.Watch()
+	runtime.AllPodList = []*kubecontainer.Pod{
+		{
+			ID: "1234",
+			Containers: []*kubecontainer.Container{
+				createTestContainer("c1", kubecontainer.ContainerStateRunning),
+				createTestContainer("c2", kubecontainer.ContainerStateRunning),
+				createTestContainer("c3", kubecontainer.ContainerStateExited),
+			},
+		},
+	}
+	// Drain the events from the channel.
+	pleg.relist()
+	getEventsFromChannel(ch)
+
+	// Container c2 was stopped and removed between relists. We should report
+	// the event. The exited container c3 was garbage collected (i.e., removed)
+	// between relists. We should ignore that event.
+	runtime.AllPodList = []*kubecontainer.Pod{
+		{
+			ID: "1234",
+			Containers: []*kubecontainer.Container{
+				createTestContainer("c1", kubecontainer.ContainerStateRunning),
+			},
+		},
+	}
+	pleg.relist()
+	expected := []*PodLifecycleEvent{
+		{ID: "1234", Type: ContainerDied, Data: "c2"},
+	}
+	actual := getEventsFromChannel(ch)
+	verifyEvents(t, expected, actual)
+}
+
+func TestReportMissingPods(t *testing.T) {
+	testPleg := newTestGenericPLEG()
+	pleg, runtime := testPleg.pleg, testPleg.runtime
+	ch := pleg.Watch()
+	runtime.AllPodList = []*kubecontainer.Pod{
+		{
+			ID: "1234",
+			Containers: []*kubecontainer.Container{
+				createTestContainer("c2", kubecontainer.ContainerStateRunning),
+			},
+		},
+	}
+	// Drain the events from the channel.
+	pleg.relist()
+	getEventsFromChannel(ch)
+
+	// Container c2 was stopped and removed between relists. We should report
+	// the event.
+	runtime.AllPodList = []*kubecontainer.Pod{}
+	pleg.relist()
+	expected := []*PodLifecycleEvent{
+		{ID: "1234", Type: ContainerDied, Data: "c2"},
+	}
+	actual := getEventsFromChannel(ch)
+	verifyEvents(t, expected, actual)
+}
+
+func newTestGenericPLEGWithRuntimeMock() (*GenericPLEG, *kubecontainer.Mock) {
+	runtimeMock := &kubecontainer.Mock{}
+	pleg := &GenericPLEG{
+		relistPeriod: time.Hour,
+		runtime:      runtimeMock,
+		eventChannel: make(chan *PodLifecycleEvent, 100),
+		podRecords:   make(podRecords),
+		cache:        kubecontainer.NewCache(),
+	}
+	return pleg, runtimeMock
+}
+
+func createTestPodsStatusesAndEvents(num int) ([]*kubecontainer.Pod, []*kubecontainer.PodStatus, []*PodLifecycleEvent) {
+	var pods []*kubecontainer.Pod
+	var statuses []*kubecontainer.PodStatus
+	var events []*PodLifecycleEvent
+	for i := 0; i < num; i++ {
+		id := types.UID(fmt.Sprintf("test-pod-%d", i))
+		cState := kubecontainer.ContainerStateRunning
+		container := createTestContainer(fmt.Sprintf("c%d", i), cState)
+		pod := &kubecontainer.Pod{
+			ID:         id,
+			Containers: []*kubecontainer.Container{container},
+		}
+		status := &kubecontainer.PodStatus{
+			ID:                id,
+			ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: cState}},
+		}
+		event := &PodLifecycleEvent{ID: pod.ID, Type: ContainerStarted, Data: container.ID.ID}
+		pods = append(pods, pod)
+		statuses = append(statuses, status)
+		events = append(events, event)
+	}
+	return pods, statuses, events
+}
+
+func TestRelistWithCache(t *testing.T) {
+	pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
+	ch := pleg.Watch()
+
+	pods, statuses, events := createTestPodsStatusesAndEvents(2)
+	runtimeMock.On("GetPods", true).Return(pods, nil)
+	runtimeMock.On("GetPodStatus", pods[0].ID, "", "").Return(statuses[0], nil).Once()
+	// Inject an error when querying the runtime for the pod status of pods[1].
+	statusErr := fmt.Errorf("unable to get status")
+	runtimeMock.On("GetPodStatus", pods[1].ID, "", "").Return(&kubecontainer.PodStatus{}, statusErr).Once()
+
+	pleg.relist()
+	actualEvents := getEventsFromChannel(ch)
+	cases := []struct {
+		pod    *kubecontainer.Pod
+		status *kubecontainer.PodStatus
+		error  error
+	}{
+		{pod: pods[0], status: statuses[0], error: nil},
+		{pod: pods[1], status: &kubecontainer.PodStatus{}, error: statusErr},
+	}
+	for i, c := range cases {
+		testStr := fmt.Sprintf("test[%d]", i)
+		actualStatus, actualErr := pleg.cache.Get(c.pod.ID)
+		assert.Equal(t, c.status, actualStatus, testStr)
+		assert.Equal(t, c.error, actualErr, testStr)
+	}
+	// pleg should not generate any event for pods[1] because of the error.
+	assert.Exactly(t, []*PodLifecycleEvent{events[0]}, actualEvents)
+
+	// Return a normal status for pods[1].
+	runtimeMock.On("GetPodStatus", pods[1].ID, "", "").Return(statuses[1], nil).Once()
+	pleg.relist()
+	actualEvents = getEventsFromChannel(ch)
+	cases = []struct {
+		pod    *kubecontainer.Pod
+		status *kubecontainer.PodStatus
+		error  error
+	}{
+		{pod: pods[0], status: statuses[0], error: nil},
+		{pod: pods[1], status: statuses[1], error: nil},
+	}
+	for i, c := range cases {
+		testStr := fmt.Sprintf("test[%d]", i)
+		actualStatus, actualErr := pleg.cache.Get(c.pod.ID)
+		assert.Equal(t, c.status, actualStatus, testStr)
+		assert.Equal(t, c.error, actualErr, testStr)
+	}
+	// Now that we are able to query status for pods[1], pleg should generate an event.
+	assert.Exactly(t, []*PodLifecycleEvent{events[1]}, actualEvents)
+}
+
+func TestRemoveCacheEntry(t *testing.T) {
+	pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
+	pods, statuses, _ := createTestPodsStatusesAndEvents(1)
+	runtimeMock.On("GetPods", true).Return(pods, nil).Once()
+	runtimeMock.On("GetPodStatus", pods[0].ID, "", "").Return(statuses[0], nil).Once()
+	// Do a relist to populate the cache.
+	pleg.relist()
+	// Delete the pod from the runtime. Verify that the cache entry has been
+	// removed after relisting.
+	runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{}, nil).Once()
+	pleg.relist()
+	actualStatus, actualErr := pleg.cache.Get(pods[0].ID)
+	assert.Equal(t, &kubecontainer.PodStatus{ID: pods[0].ID}, actualStatus)
+	assert.Equal(t, nil, actualErr)
+}
@@ -29,8 +29,7 @@ import (
 type WorkQueue interface {
 	// GetWork dequeues and returns all ready items.
 	GetWork() []types.UID
-	// Enqueue inserts a new item or overwrites an existing item with the
-	// new timestamp (time.Now() + delay) if it is greater.
+	// Enqueue inserts a new item or overwrites an existing item.
 	Enqueue(item types.UID, delay time.Duration)
 }
 
@@ -64,10 +63,5 @@ func (q *basicWorkQueue) GetWork() []types.UID {
 func (q *basicWorkQueue) Enqueue(item types.UID, delay time.Duration) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
-	now := q.clock.Now()
-	timestamp := now.Add(delay)
-	existing, ok := q.queue[item]
-	if !ok || (ok && existing.Before(timestamp)) {
-		q.queue[item] = timestamp
-	}
+	q.queue[item] = q.clock.Now().Add(delay)
 }
@@ -63,15 +63,3 @@ func TestGetWork(t *testing.T) {
 	compareResults(t, expected, q.GetWork())
 	compareResults(t, []types.UID{}, q.GetWork())
 }
-
-func TestEnqueueKeepGreaterTimestamp(t *testing.T) {
-	q, _ := newTestBasicWorkQueue()
-	item := types.UID("foo")
-	q.Enqueue(item, -7*time.Hour)
-	q.Enqueue(item, 3*time.Hour)
-	compareResults(t, []types.UID{}, q.GetWork())
-
-	q.Enqueue(item, 3*time.Hour)
-	q.Enqueue(item, -7*time.Hour)
-	compareResults(t, []types.UID{}, q.GetWork())
-}
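The two hunks above change Enqueue from keeping the later of the existing and the new ready-time to always overwriting it, and drop TestEnqueueKeepGreaterTimestamp accordingly. The standalone sketch below (not kubelet code; fakeQueue and its methods are made up) contrasts the two behaviors using a plain map instead of basicWorkQueue and its clock.

// Sketch only: old vs. new Enqueue semantics shown in the hunks above.
package main

import (
	"fmt"
	"time"
)

type fakeQueue struct {
	queue map[string]time.Time
}

// enqueueKeepGreater mirrors the removed behavior: a later ready-time wins,
// so re-enqueueing with a shorter delay is ignored.
func (q *fakeQueue) enqueueKeepGreater(item string, delay time.Duration) {
	t := time.Now().Add(delay)
	if existing, ok := q.queue[item]; !ok || existing.Before(t) {
		q.queue[item] = t
	}
}

// enqueueOverwrite mirrors the new behavior: the last Enqueue always wins,
// so re-enqueueing with a shorter delay makes the item ready sooner.
func (q *fakeQueue) enqueueOverwrite(item string, delay time.Duration) {
	q.queue[item] = time.Now().Add(delay)
}

func main() {
	q := &fakeQueue{queue: map[string]time.Time{}}
	q.enqueueKeepGreater("foo", 3*time.Hour)
	q.enqueueKeepGreater("foo", -7*time.Hour) // ignored: earlier than existing
	fmt.Println("keep-greater:", q.queue["foo"])

	q.enqueueOverwrite("foo", -7*time.Hour) // takes effect immediately
	fmt.Println("overwrite:   ", q.queue["foo"])
}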