From 3624b65f1cd5d89a14293321365ecb5439d87b08 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Wed, 28 Jan 2015 22:15:23 -0800 Subject: [PATCH] Transform the podCache into a write-through cache. Don't always clear podInfo, instead occasionally garbage collect. --- pkg/master/master.go | 1 + pkg/master/pod_cache.go | 47 +++++++++++++++++++++---- pkg/master/pod_cache_test.go | 66 ++++++++++++++++++++++++++++++++---- 3 files changed, 102 insertions(+), 12 deletions(-) diff --git a/pkg/master/master.go b/pkg/master/master.go index cd2169c6e02..c086a118315 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -352,6 +352,7 @@ func (m *Master) init(c *Config) { m.podRegistry, ) go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30) + go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30) // TODO: Factor out the core API registration m.storage = map[string]apiserver.RESTStorage{ diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go index 073dcb03e0d..fdab6d3c6b0 100644 --- a/pkg/master/pod_cache.go +++ b/pkg/master/pod_cache.go @@ -69,14 +69,38 @@ func NewPodCache(ipCache IPGetter, info client.PodInfoGetter, nodes client.NodeI // GetPodStatus gets the stored pod status. func (p *PodCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) { + status := p.getPodStatusInternal(namespace, name) + if status != nil { + return status, nil + } + return p.updateCacheAndReturn(namespace, name) +} + +func (p *PodCache) updateCacheAndReturn(namespace, name string) (*api.PodStatus, error) { + pod, err := p.pods.GetPod(api.WithNamespace(api.NewContext(), namespace), name) + if err != nil { + return nil, err + } + if err := p.updatePodStatus(pod); err != nil { + return nil, err + } + status := p.getPodStatusInternal(namespace, name) + if status == nil { + glog.Warningf("nil status after successful update. that's odd... (%s %s)", namespace, name) + return nil, client.ErrPodInfoNotAvailable + } + return status, nil +} + +func (p *PodCache) getPodStatusInternal(namespace, name string) *api.PodStatus { p.lock.Lock() defer p.lock.Unlock() value, ok := p.podStatus[objKey{namespace, name}] if !ok { - return nil, client.ErrPodInfoNotAvailable + return nil } // Make a copy - return &value, nil + return &value } func (p *PodCache) ClearPodStatus(namespace, name string) { @@ -178,10 +202,23 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) { return newStatus, err } -func (p *PodCache) resetNodeStatusCache() { +func (p *PodCache) GarbageCollectPodStatus() { + pods, err := p.pods.ListPods(api.NewContext(), labels.Everything()) + if err != nil { + glog.Errorf("Error getting pod list: %v", err) + } + keys := map[objKey]bool{} + for _, pod := range pods.Items { + keys[objKey{pod.Namespace, pod.Name}] = true + } p.lock.Lock() defer p.lock.Unlock() - p.currentNodes = map[objKey]api.NodeStatus{} + for key := range p.podStatus { + if _, found := keys[key]; !found { + glog.Infof("Deleting orphaned cache entry: %v", key) + delete(p.podStatus, key) + } + } } // UpdateAllContainers updates information about all containers. @@ -189,8 +226,6 @@ func (p *PodCache) resetNodeStatusCache() { // calling again, or risk having new info getting clobbered by delayed // old info. func (p *PodCache) UpdateAllContainers() { - p.resetNodeStatusCache() - ctx := api.NewContext() pods, err := p.pods.ListPods(ctx, labels.Everything()) if err != nil { diff --git a/pkg/master/pod_cache_test.go b/pkg/master/pod_cache_test.go index f6d84af4633..cbaf855179f 100644 --- a/pkg/master/pod_cache_test.go +++ b/pkg/master/pod_cache_test.go @@ -126,7 +126,10 @@ func TestPodCacheGet(t *testing.T) { } func TestPodCacheDelete(t *testing.T) { - cache := NewPodCache(nil, nil, nil, nil) + config := podCacheTestConfig{ + err: client.ErrPodInfoNotAvailable, + } + cache := config.Construct() expected := api.PodStatus{ Info: api.PodInfo{ @@ -156,14 +159,38 @@ func TestPodCacheDelete(t *testing.T) { } func TestPodCacheGetMissing(t *testing.T) { - cache := NewPodCache(nil, nil, nil, nil) + pod1 := makePod(api.NamespaceDefault, "foo", "machine", "bar") + config := podCacheTestConfig{ + ipFunc: func(host string) string { + if host == "machine" { + return "1.2.3.5" + } + return "" + }, + kubeletContainerInfo: api.PodStatus{ + Info: api.PodInfo{"bar": api.ContainerStatus{}}}, + nodes: []api.Node{*makeHealthyNode("machine")}, + pod: pod1, + } + cache := config.Construct() status, err := cache.GetPodStatus(api.NamespaceDefault, "foo") - if err == nil { - t.Errorf("Unexpected non-error: %+v", err) + if err != nil { + t.Errorf("Unexpected error: %+v", err) } - if status != nil { - t.Errorf("Unexpected status: %+v", status) + if status == nil { + t.Errorf("Unexpected non-status.") + } + expected := &api.PodStatus{ + Phase: "Pending", + Host: "machine", + HostIP: "1.2.3.5", + Info: api.PodInfo{ + "bar": api.ContainerStatus{}, + }, + } + if !reflect.DeepEqual(status, expected) { + t.Errorf("expected:\n%#v\ngot:\n%#v\n", expected, status) } } @@ -177,6 +204,8 @@ type podCacheTestConfig struct { ipFunc func(string) string // Construct will set a default if nil nodes []api.Node pods []api.Pod + pod *api.Pod + err error kubeletContainerInfo api.PodStatus // Construct will fill in these fields @@ -202,6 +231,8 @@ func (c *podCacheTestConfig) Construct() *PodCache { }, } c.fakePods = registrytest.NewPodRegistry(&api.PodList{Items: c.pods}) + c.fakePods.Pod = c.pod + c.fakePods.Err = c.err return NewPodCache( fakeIPCache(c.ipFunc), c.fakePodInfo, @@ -829,3 +860,26 @@ func TestPodPhaseWithRestartOnFailure(t *testing.T) { } } } + +func TestGarbageCollection(t *testing.T) { + pod1 := makePod(api.NamespaceDefault, "foo", "machine", "bar") + pod2 := makePod(api.NamespaceDefault, "baz", "machine", "qux") + config := podCacheTestConfig{ + pods: []api.Pod{*pod1, *pod2}, + } + cache := config.Construct() + + expected := api.PodStatus{ + Info: api.PodInfo{ + "extra": api.ContainerStatus{}, + }, + } + cache.podStatus[objKey{api.NamespaceDefault, "extra"}] = expected + + cache.GarbageCollectPodStatus() + + status, found := cache.podStatus[objKey{api.NamespaceDefault, "extra"}] + if found { + t.Errorf("unexpectedly found: %v for key %v", status, objKey{api.NamespaceDefault, "extra"}) + } +}