From 691623551324825d9043c849bfd46295e61115b6 Mon Sep 17 00:00:00 2001
From: Daniel Smith
Date: Mon, 22 Dec 2014 14:11:41 -0800
Subject: [PATCH] Make locking simpler

add test for node existence cache behavior
---
 pkg/master/pod_cache.go      | 90 ++++++++++++++++++++++--------------
 pkg/master/pod_cache_test.go | 72 +++++++++++++++++++++++------
 2 files changed, 115 insertions(+), 47 deletions(-)

diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go
index 819bd8d708a..4da89469813 100644
--- a/pkg/master/pod_cache.go
+++ b/pkg/master/pod_cache.go
@@ -80,28 +80,35 @@ func (p *PodCache) GetPodStatus(namespace, name string) (*api.PodStatus, error)
 	return &value, nil
 }
 
-// lock must *not* be held
-func (p *PodCache) nodeExists(name string) bool {
+func (p *PodCache) nodeExistsInCache(name string) (exists, cacheHit bool) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	exists, cacheHit := p.currentNodes[objKey{"", name}]
+	exists, cacheHit = p.currentNodes[objKey{"", name}]
+	return exists, cacheHit
+}
+
+// lock must *not* be held
+func (p *PodCache) nodeExists(name string) bool {
+	exists, cacheHit := p.nodeExistsInCache(name)
 	if cacheHit {
 		return exists
 	}
-	// Don't block everyone while looking up this minion.
-	// Because this may require an RPC to our storage (e.g. etcd).
-	func() {
-		p.lock.Unlock()
-		defer p.lock.Lock()
-		_, err := p.nodes.Get(name)
-		exists = true
-		if err != nil {
-			exists = false
-			if !errors.IsNotFound(err) {
-				glog.Errorf("Unexpected error type verifying minion existence: %+v", err)
-			}
+	// TODO: suppose there's N concurrent requests for node "foo"; in that case
+	// it might be useful to block all of them and only look up "foo" once.
+	// (This code will make up to N lookups.) One way of doing that would be to
+	// have a pool of M mutexes and require that before looking up "foo" you must
+	// lock mutex hash("foo") % M.
+	_, err := p.nodes.Get(name)
+	exists = true
+	if err != nil {
+		exists = false
+		if !errors.IsNotFound(err) {
+			glog.Errorf("Unexpected error type verifying minion existence: %+v", err)
 		}
-	}()
+	}
+
+	p.lock.Lock()
+	defer p.lock.Unlock()
 	p.currentNodes[objKey{"", name}] = exists
 	return exists
 }
@@ -109,23 +116,32 @@ func (p *PodCache) nodeExists(name string) bool {
 // TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an
 // entire pod?
 func (p *PodCache) updatePodStatus(pod *api.Pod) error {
+	newStatus, err := p.computePodStatus(pod)
+
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	// Map accesses must be locked.
+	p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus
+
+	return err
+}
+
+// computePodStatus always returns a new status, even if it also returns a non-nil error.
+// TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an
+// entire pod?
+func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
 	newStatus := pod.Status
+
 	if pod.Status.Host == "" {
-		p.lock.Lock()
-		defer p.lock.Unlock()
 		// Not assigned.
 		newStatus.Phase = api.PodPending
-		p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus
-		return nil
+		return newStatus, nil
 	}
 
 	if !p.nodeExists(pod.Status.Host) {
-		p.lock.Lock()
-		defer p.lock.Unlock()
 		// Assigned to non-existing node.
 		newStatus.Phase = api.PodFailed
-		p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus
-		return nil
+		return newStatus, nil
 	}
 
 	info, err := p.containerInfo.GetPodInfo(pod.Status.Host, pod.Namespace, pod.Name)
@@ -142,27 +158,33 @@ func (p *PodCache) updatePodStatus(pod *api.Pod) error {
 			}
 		}
 	}
+	return newStatus, err
+}
+
+func (p *PodCache) resetNodeExistenceCache() {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus
-	return err
+	p.currentNodes = map[objKey]bool{}
 }
 
 // UpdateAllContainers updates information about all containers.
+// Callers should let one call to UpdateAllContainers finish before
+// calling again, or risk having new info getting clobbered by delayed
+// old info.
 func (p *PodCache) UpdateAllContainers() {
-	func() {
-		// Reset which nodes we think exist
-		p.lock.Lock()
-		defer p.lock.Unlock()
-		p.currentNodes = map[objKey]bool{}
-	}()
+	p.resetNodeExistenceCache()
 	ctx := api.NewContext()
 	pods, err := p.pods.ListPods(ctx, labels.Everything())
 	if err != nil {
-		glog.Errorf("Error synchronizing container list: %v", err)
+		glog.Errorf("Error getting pod list: %v", err)
 		return
 	}
+
+	// TODO: this algorithm is 1 goroutine & RPC per pod. With a little work,
+	// it should be possible to make it 1 per *node*, which will be important
+	// at very large scales. (To be clear, the goroutines shouldn't matter--
+	// it's the RPCs that need to be minimized.)
 	var wg sync.WaitGroup
 	for i := range pods.Items {
 		pod := &pods.Items[i]
@@ -171,7 +193,7 @@ func (p *PodCache) UpdateAllContainers() {
 			defer wg.Done()
 			err := p.updatePodStatus(pod)
 			if err != nil && err != client.ErrPodInfoNotAvailable {
-				glog.Errorf("Error synchronizing container: %v", err)
+				glog.Errorf("Error getting info for pod %v/%v: %v", pod.Namespace, pod.Name, err)
 			}
 		}()
 	}
diff --git a/pkg/master/pod_cache_test.go b/pkg/master/pod_cache_test.go
index 341c3aece81..09c09a9e397 100644
--- a/pkg/master/pod_cache_test.go
+++ b/pkg/master/pod_cache_test.go
@@ -18,6 +18,7 @@ package master
 
 import (
 	"reflect"
+	"sync"
 	"testing"
 	"time"
 
@@ -27,19 +28,48 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 )
 
-type FakePodInfoGetter struct {
+type podInfoCall struct {
 	host      string
-	id        string
 	namespace string
-	data      api.PodContainerInfo
-	err       error
+	name      string
 }
 
-func (f *FakePodInfoGetter) GetPodInfo(host, namespace, id string) (api.PodContainerInfo, error) {
-	f.host = host
-	f.id = id
-	f.namespace = namespace
-	return f.data, f.err
+type podInfoResponse struct {
+	useCount int
+	data     api.PodContainerInfo
+	err      error
+}
+
+type podInfoCalls map[podInfoCall]*podInfoResponse
+
+type FakePodInfoGetter struct {
+	calls podInfoCalls
+	lock  sync.Mutex
+
+	// default data/error to return, or you can add
+	// responses to specific calls-- that will take precedence.
+	data api.PodContainerInfo
+	err  error
+}
+
+func (f *FakePodInfoGetter) GetPodInfo(host, namespace, name string) (api.PodContainerInfo, error) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	if f.calls == nil {
+		f.calls = podInfoCalls{}
+	}
+
+	key := podInfoCall{host, namespace, name}
+	call, ok := f.calls[key]
+	if !ok {
+		f.calls[key] = &podInfoResponse{
+			0, f.data, f.err,
+		}
+		call = f.calls[key]
+	}
+	call.useCount++
+	return call.data, call.err
 }
 
 func TestPodCacheGetDifferentNamespace(t *testing.T) {
@@ -171,6 +201,7 @@ func makeNode(name string) *api.Node {
 
 func TestPodUpdateAllContainers(t *testing.T) {
 	pod := makePod(api.NamespaceDefault, "foo", "machine", "bar")
+	pod2 := makePod(api.NamespaceDefault, "baz", "machine", "qux")
 	config := podCacheTestConfig{
 		ipFunc: func(host string) string {
 			if host == "machine" {
@@ -180,15 +211,22 @@ func TestPodUpdateAllContainers(t *testing.T) {
 		},
 		kubeletContainerInfo: api.PodInfo{"bar": api.ContainerStatus{}},
 		nodes:                []api.Node{*makeNode("machine")},
-		pods:                 []api.Pod{*pod},
+		pods:                 []api.Pod{*pod, *pod2},
 	}
 	cache := config.Construct()
 
 	cache.UpdateAllContainers()
 
-	fake := config.fakePodInfo
-	if fake.host != "machine" || fake.id != "foo" || fake.namespace != api.NamespaceDefault {
-		t.Errorf("Unexpected access: %+v", fake)
+	call1 := config.fakePodInfo.calls[podInfoCall{"machine", api.NamespaceDefault, "foo"}]
+	call2 := config.fakePodInfo.calls[podInfoCall{"machine", api.NamespaceDefault, "baz"}]
+	if call1 == nil || call1.useCount != 1 {
+		t.Errorf("Expected 1 call for 'foo': %+v", config.fakePodInfo.calls)
+	}
+	if call2 == nil || call2.useCount != 1 {
+		t.Errorf("Expected 1 call for 'baz': %+v", config.fakePodInfo.calls)
+	}
+	if len(config.fakePodInfo.calls) != 2 {
+		t.Errorf("Expected 2 calls: %+v", config.fakePodInfo.calls)
 	}
 
 	status, err := cache.GetPodStatus(api.NamespaceDefault, "foo")
@@ -201,6 +239,14 @@ func TestPodUpdateAllContainers(t *testing.T) {
 	if e, a := "1.2.3.5", status.HostIP; e != a {
 		t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", e, a)
 	}
+
+	if e, a := 1, len(config.fakeNodes.Actions); e != a {
+		t.Errorf("Expected: %v, Got: %v; %+v", e, a, config.fakeNodes.Actions)
+	} else {
+		if e, a := "get-minion", config.fakeNodes.Actions[0].Action; e != a {
+			t.Errorf("Expected: %v, Got: %v; %+v", e, a, config.fakeNodes.Actions)
+		}
+	}
 }
 
 func TestFillPodStatusNoHost(t *testing.T) {
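
Note (reviewer sketch, not part of the patch): the TODO added in nodeExists suggests collapsing N concurrent lookups of the same node onto a single lookup by taking a mutex chosen as hash("foo") % M from a pool of M mutexes. The standalone Go sketch below only illustrates that idea; the names (keyedMutexPool, lockFor) and the choice of an FNV hash are assumptions, not anything this patch defines.

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// keyedMutexPool is a fixed pool of M mutexes. All callers interested in the
// same key contend on the same mutex, so only one of them pays for the
// expensive lookup while the rest wait and then hit the cache.
type keyedMutexPool struct {
	mutexes []sync.Mutex
}

func newKeyedMutexPool(m int) *keyedMutexPool {
	return &keyedMutexPool{mutexes: make([]sync.Mutex, m)}
}

// lockFor picks the mutex for a key as hash(key) % M.
func (p *keyedMutexPool) lockFor(key string) *sync.Mutex {
	h := fnv.New32a()
	h.Write([]byte(key))
	return &p.mutexes[h.Sum32()%uint32(len(p.mutexes))]
}

func main() {
	pool := newKeyedMutexPool(16)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			mu := pool.lockFor("foo") // all four goroutines map to the same mutex
			mu.Lock()
			defer mu.Unlock()
			// In the real cache this is where the code would re-check
			// currentNodes and, on a miss, do the single nodes.Get("foo") RPC.
			fmt.Println("goroutine", i, "holds the lock for node foo")
		}(i)
	}
	wg.Wait()
}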
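
Note (reviewer sketch, not part of the patch): the TODO added in UpdateAllContainers asks for one goroutine and one RPC per *node* rather than per pod. The usual shape of that change is to bucket the pod list by its host before fanning out; the simplified sketch below shows only the bucketing and fan-out, with Pod, podsByHost, and the printed "lookup" standing in for the real API types and the per-node info RPC.

package main

import (
	"fmt"
	"sync"
)

// Pod is a stand-in for the real pod type; only the fields the grouping needs.
type Pod struct {
	Name string
	Host string
}

// podsByHost buckets pods by the node they are assigned to, so each node can
// be queried once for all of its pods.
func podsByHost(pods []Pod) map[string][]Pod {
	byHost := map[string][]Pod{}
	for _, pod := range pods {
		byHost[pod.Host] = append(byHost[pod.Host], pod)
	}
	return byHost
}

func main() {
	pods := []Pod{{"foo", "machine"}, {"baz", "machine"}, {"qux", "other"}}

	var wg sync.WaitGroup
	for host, podsOnHost := range podsByHost(pods) {
		wg.Add(1)
		go func(host string, podsOnHost []Pod) {
			defer wg.Done()
			// One info RPC per host would go here; each pod's status is then
			// computed from that single response.
			fmt.Printf("host %s: %d pods covered by one lookup\n", host, len(podsOnHost))
		}(host, podsOnHost)
	}
	wg.Wait()
}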