From 5b8e91595a472839e1a8cf209eecfb1f48911c59 Mon Sep 17 00:00:00 2001
From: Daniel Smith
Date: Sat, 20 Dec 2014 18:49:10 -0800
Subject: [PATCH 1/4] Make pod listing constant-time

* move ip cache out of registry/pod
* combine, rationalize, and move pod status logic
* Fix unit and integration tests
---
 cmd/integration/integration.go |   1 +
 pkg/master/ip_cache.go         |  98 +++++
 pkg/master/ip_cache_test.go    |  50 +++
 pkg/master/master.go           |  24 +-
 pkg/master/pod_cache.go        | 218 +++++++++--
 pkg/master/pod_cache_test.go   | 669 +++++++++++++++++++++++++++++----
 pkg/registry/pod/rest.go       | 233 ++----------
 pkg/registry/pod/rest_test.go  | 636 +++++--------------------------
 8 files changed, 1057 insertions(+), 872 deletions(-)
 create mode 100644 pkg/master/ip_cache.go
 create mode 100644 pkg/master/ip_cache_test.go

diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index c539c73c649..39744e3dd17 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -63,6 +63,7 @@ var (
 type fakeKubeletClient struct{}
 
 func (fakeKubeletClient) GetPodInfo(host, podNamespace, podID string) (api.PodContainerInfo, error) {
+	glog.V(3).Infof("Trying to get container info for %v/%v/%v", host, podNamespace, podID)
 	// This is a horrible hack to get around the fact that we can't provide
 	// different port numbers per kubelet...
 	var c client.PodInfoGetter
diff --git a/pkg/master/ip_cache.go b/pkg/master/ip_cache.go
new file mode 100644
index 00000000000..02d7be71d10
--- /dev/null
+++ b/pkg/master/ip_cache.go
@@ -0,0 +1,98 @@
+/*
+Copyright 2014 Google Inc. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package master
+
+import (
+	"sync"
+	"time"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
+
+	"github.com/golang/glog"
+)
+
+type ipCacheEntry struct {
+	ip         string
+	lastUpdate time.Time
+}
+
+type ipCache struct {
+	clock         Clock
+	cloudProvider cloudprovider.Interface
+	cache         map[string]ipCacheEntry
+	lock          sync.Mutex
+}
+
+// NewIPCache makes a new ip caching layer, which will get IP addresses from cp,
+// and use clock for deciding when to re-get an IP address.
+// Thread-safe.
+func NewIPCache(cp cloudprovider.Interface, clock Clock) *ipCache {
+	return &ipCache{
+		clock:         clock,
+		cloudProvider: cp,
+		cache:         map[string]ipCacheEntry{},
+	}
+}
+
+// Clock allows for injecting fake or real clocks into
+// the cache.
+type Clock interface {
+	Now() time.Time
+}
+
+// RealClock really calls time.Now()
+type RealClock struct{}
+
+// Now returns the current time.
+func (r RealClock) Now() time.Time {
+	return time.Now()
+}
+
+// GetInstanceIP returns the IP address of host, from the cache
+// if possible, otherwise it asks the cloud provider.
+func (c *ipCache) GetInstanceIP(host string) string {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	data, ok := c.cache[host]
+	now := c.clock.Now()
+
+	if !ok || now.Sub(data.lastUpdate) > (30*time.Second) {
+		ip := getInstanceIPFromCloud(c.cloudProvider, host)
+		data = ipCacheEntry{
+			ip:         ip,
+			lastUpdate: now,
+		}
+		c.cache[host] = data
+	}
+	return data.ip
+}
+
+func getInstanceIPFromCloud(cloud cloudprovider.Interface, host string) string {
+	if cloud == nil {
+		return ""
+	}
+	instances, ok := cloud.Instances()
+	if instances == nil || !ok {
+		return ""
+	}
+	addr, err := instances.IPAddress(host)
+	if err != nil {
+		glog.Errorf("Error getting instance IP for %q: %v", host, err)
+		return ""
+	}
+	return addr.String()
+}
diff --git a/pkg/master/ip_cache_test.go b/pkg/master/ip_cache_test.go
new file mode 100644
index 00000000000..dac9c33d86e
--- /dev/null
+++ b/pkg/master/ip_cache_test.go
@@ -0,0 +1,50 @@
+/*
+Copyright 2014 Google Inc. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package master
+
+import (
+	"testing"
+	"time"
+
+	fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
+)
+
+type fakeClock struct {
+	t time.Time
+}
+
+func (f *fakeClock) Now() time.Time {
+	return f.t
+}
+
+func TestCacheExpire(t *testing.T) {
+	fakeCloud := &fake_cloud.FakeCloud{}
+	clock := &fakeClock{t: time.Now()}
+
+	c := NewIPCache(fakeCloud, clock)
+
+	_ = c.GetInstanceIP("foo")
+	// This call should hit the cache, so we expect no additional calls to the cloud
+	_ = c.GetInstanceIP("foo")
+	// Advance the clock, this call should miss the cache, so expect one more call.
+	clock.t = clock.t.Add(60 * time.Second)
+	_ = c.GetInstanceIP("foo")
+
+	if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[1] != "ip-address" || fakeCloud.Calls[0] != "ip-address" {
+		t.Errorf("Unexpected calls: %+v", fakeCloud.Calls)
+	}
+}
diff --git a/pkg/master/master.go b/pkg/master/master.go
index 814d4a6933f..1b18d12f7ad 100644
--- a/pkg/master/master.go
+++ b/pkg/master/master.go
@@ -319,28 +319,24 @@ func makeMinionRegistry(c *Config) minion.Registry {
 
 // init initializes master.
 func (m *Master) init(c *Config) {
-	podCache := NewPodCache(c.KubeletClient, m.podRegistry)
-	go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30)
-
 	var userContexts = handlers.NewUserRequestContext()
 	var authenticator = c.Authenticator
 	nodeRESTStorage := minion.NewREST(m.minionRegistry)
+	ipCache := NewIPCache(c.Cloud, RealClock{})
+	podCache := NewPodCache(
+		ipCache,
+		c.KubeletClient,
+		RESTStorageToNodes(nodeRESTStorage).Nodes(),
+		m.podRegistry,
+	)
+	go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30)
 
 	// TODO: Factor out the core API registration
 	m.storage = map[string]apiserver.RESTStorage{
 		"pods": pod.NewREST(&pod.RESTConfig{
-			CloudProvider: c.Cloud,
-			PodCache:      podCache,
-			PodInfoGetter: c.KubeletClient,
-			Registry:      m.podRegistry,
-			// Note: this allows the pod rest object to directly call
-			// the node rest object without going through the network &
-			// apiserver. This arrangement should be temporary, nodes
-			// shouldn't really need this at all. Once we add more auth in,
-			// we need to consider carefully if this sort of shortcut is a
-			// good idea.
-			Nodes: RESTStorageToNodes(nodeRESTStorage).Nodes(),
+			PodCache: podCache,
+			Registry: m.podRegistry,
 		}),
 		"replicationControllers": controller.NewREST(m.controllerRegistry, m.podRegistry),
 		"services":               service.NewREST(m.serviceRegistry, c.Cloud, m.minionRegistry, m.portalNet),
diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go
index c592a4ae5ab..819bd8d708a 100644
--- a/pkg/master/pod_cache.go
+++ b/pkg/master/pod_cache.go
@@ -20,6 +20,7 @@ import (
 	"sync"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
@@ -27,69 +28,216 @@ import (
 	"github.com/golang/glog"
 )
 
+type IPGetter interface {
+	GetInstanceIP(host string) (ip string)
+}
+
 // PodCache contains both a cache of container information, as well as the mechanism for keeping
 // that cache up to date.
 type PodCache struct {
+	ipCache       IPGetter
 	containerInfo client.PodInfoGetter
 	pods          pod.Registry
-	// This is a map of pod id to a map of container name to the
-	podInfo map[string]api.PodContainerInfo
-	podLock sync.Mutex
+	// For confirming existence of a node
+	nodes client.NodeInterface
+
+	// lock protects access to all fields below
+	lock sync.Mutex
+	// cached pod statuses.
+	podStatus map[objKey]api.PodStatus
+	// nodes that we know exist. Cleared at the beginning of each
+	// UpdateAllPods call.
+	currentNodes map[objKey]bool
 }
 
-// NewPodCache returns a new PodCache which watches container information registered in the given PodRegistry.
-func NewPodCache(info client.PodInfoGetter, pods pod.Registry) *PodCache {
+type objKey struct {
+	namespace, name string
+}
+
+// NewPodCache returns a new PodCache which watches container information
+// registered in the given PodRegistry.
+// TODO(lavalamp): pods should be a client.PodInterface.
+func NewPodCache(ipCache IPGetter, info client.PodInfoGetter, nodes client.NodeInterface, pods pod.Registry) *PodCache {
 	return &PodCache{
+		ipCache:       ipCache,
 		containerInfo: info,
 		pods:          pods,
-		podInfo:       map[string]api.PodContainerInfo{},
+		nodes:         nodes,
+		currentNodes:  map[objKey]bool{},
+		podStatus:     map[objKey]api.PodStatus{},
 	}
 }
 
-// makePodCacheKey constructs a key for use in a map to address a pod with specified namespace and id
-func makePodCacheKey(podNamespace, podID string) string {
-	return podNamespace + "." + podID
-}
-
-// GetPodInfo implements the PodInfoGetter.GetPodInfo.
-// The returned value should be treated as read-only.
-// TODO: Remove the host from this call, it's totally unnecessary.
-func (p *PodCache) GetPodInfo(host, podNamespace, podID string) (api.PodContainerInfo, error) {
-	p.podLock.Lock()
-	defer p.podLock.Unlock()
-	value, ok := p.podInfo[makePodCacheKey(podNamespace, podID)]
+// GetPodStatus gets the stored pod status.
+func (p *PodCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	value, ok := p.podStatus[objKey{namespace, name}]
 	if !ok {
-		return api.PodContainerInfo{}, client.ErrPodInfoNotAvailable
+		return nil, client.ErrPodInfoNotAvailable
 	}
-	return value, nil
+	// Make a copy
+	return &value, nil
 }
 
-func (p *PodCache) updatePodInfo(host, podNamespace, podID string) error {
-	info, err := p.containerInfo.GetPodInfo(host, podNamespace, podID)
+// lock must *not* be held
+func (p *PodCache) nodeExists(name string) bool {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	exists, cacheHit := p.currentNodes[objKey{"", name}]
+	if cacheHit {
+		return exists
+	}
+	// Don't block everyone while looking up this minion.
+	// Because this may require an RPC to our storage (e.g. etcd).
+	func() {
+		p.lock.Unlock()
+		defer p.lock.Lock()
+		_, err := p.nodes.Get(name)
+		exists = true
+		if err != nil {
+			exists = false
+			if !errors.IsNotFound(err) {
+				glog.Errorf("Unexpected error type verifying minion existence: %+v", err)
+			}
+		}
+	}()
+	p.currentNodes[objKey{"", name}] = exists
+	return exists
+}
+
+// TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an
+// entire pod?
+func (p *PodCache) updatePodStatus(pod *api.Pod) error {
+	newStatus := pod.Status
+	if pod.Status.Host == "" {
+		p.lock.Lock()
+		defer p.lock.Unlock()
+		// Not assigned.
+		newStatus.Phase = api.PodPending
+		p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus
+		return nil
+	}
+
+	if !p.nodeExists(pod.Status.Host) {
+		p.lock.Lock()
+		defer p.lock.Unlock()
+		// Assigned to non-existing node.
+		newStatus.Phase = api.PodFailed
+		p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus
+		return nil
+	}
+
+	info, err := p.containerInfo.GetPodInfo(pod.Status.Host, pod.Namespace, pod.Name)
+	newStatus.HostIP = p.ipCache.GetInstanceIP(pod.Status.Host)
+
 	if err != nil {
-		return err
+		newStatus.Phase = api.PodUnknown
+	} else {
+		newStatus.Info = info.ContainerInfo
+		newStatus.Phase = getPhase(&pod.Spec, newStatus.Info)
+		if netContainerInfo, ok := newStatus.Info["net"]; ok {
+			if netContainerInfo.PodIP != "" {
+				newStatus.PodIP = netContainerInfo.PodIP
+			}
+		}
 	}
-	p.podLock.Lock()
-	defer p.podLock.Unlock()
-	p.podInfo[makePodCacheKey(podNamespace, podID)] = info
-	return nil
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus
+	return err
 }
 
-// UpdateAllContainers updates information about all containers. Either called by Loop() below, or one-off.
+// UpdateAllContainers updates information about all containers. func (p *PodCache) UpdateAllContainers() { + func() { + // Reset which nodes we think exist + p.lock.Lock() + defer p.lock.Unlock() + p.currentNodes = map[objKey]bool{} + }() + ctx := api.NewContext() pods, err := p.pods.ListPods(ctx, labels.Everything()) if err != nil { glog.Errorf("Error synchronizing container list: %v", err) return } - for _, pod := range pods.Items { - if pod.Status.Host == "" { - continue - } - err := p.updatePodInfo(pod.Status.Host, pod.Namespace, pod.Name) - if err != nil && err != client.ErrPodInfoNotAvailable { - glog.Errorf("Error synchronizing container: %v", err) + var wg sync.WaitGroup + for i := range pods.Items { + pod := &pods.Items[i] + wg.Add(1) + go func() { + defer wg.Done() + err := p.updatePodStatus(pod) + if err != nil && err != client.ErrPodInfoNotAvailable { + glog.Errorf("Error synchronizing container: %v", err) + } + }() + } + wg.Wait() +} + +// getPhase returns the phase of a pod given its container info. +// TODO(dchen1107): push this all the way down into kubelet. +func getPhase(spec *api.PodSpec, info api.PodInfo) api.PodPhase { + if info == nil { + return api.PodPending + } + running := 0 + waiting := 0 + stopped := 0 + failed := 0 + succeeded := 0 + unknown := 0 + for _, container := range spec.Containers { + if containerStatus, ok := info[container.Name]; ok { + if containerStatus.State.Running != nil { + running++ + } else if containerStatus.State.Termination != nil { + stopped++ + if containerStatus.State.Termination.ExitCode == 0 { + succeeded++ + } else { + failed++ + } + } else if containerStatus.State.Waiting != nil { + waiting++ + } else { + unknown++ + } + } else { + unknown++ } } + switch { + case waiting > 0: + // One or more containers has not been started + return api.PodPending + case running > 0 && unknown == 0: + // All containers have been started, and at least + // one container is running + return api.PodRunning + case running == 0 && stopped > 0 && unknown == 0: + // All containers are terminated + if spec.RestartPolicy.Always != nil { + // All containers are in the process of restarting + return api.PodRunning + } + if stopped == succeeded { + // RestartPolicy is not Always, and all + // containers are terminated in success + return api.PodSucceeded + } + if spec.RestartPolicy.Never != nil { + // RestartPolicy is Never, and all containers are + // terminated with at least one in failure + return api.PodFailed + } + // RestartPolicy is OnFailure, and at least one in failure + // and in the process of restarting + return api.PodRunning + default: + return api.PodPending + } } diff --git a/pkg/master/pod_cache_test.go b/pkg/master/pod_cache_test.go index 74dd57672e0..341c3aece81 100644 --- a/pkg/master/pod_cache_test.go +++ b/pkg/master/pod_cache_test.go @@ -19,9 +19,12 @@ package master import ( "reflect" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) type FakePodInfoGetter struct { @@ -40,128 +43,660 @@ func (f *FakePodInfoGetter) GetPodInfo(host, namespace, id string) (api.PodConta } func TestPodCacheGetDifferentNamespace(t *testing.T) { - cache := NewPodCache(nil, nil) + cache := NewPodCache(nil, nil, nil, nil) - expectedDefault := api.PodContainerInfo{ - ContainerInfo: api.PodInfo{ + expectedDefault := api.PodStatus{ + Info: api.PodInfo{ "foo": 
api.ContainerStatus{}, }, } - expectedOther := api.PodContainerInfo{ - ContainerInfo: api.PodInfo{ + expectedOther := api.PodStatus{ + Info: api.PodInfo{ "bar": api.ContainerStatus{}, }, } - cache.podInfo[makePodCacheKey(api.NamespaceDefault, "foo")] = expectedDefault - cache.podInfo[makePodCacheKey("other", "foo")] = expectedOther + cache.podStatus[objKey{api.NamespaceDefault, "foo"}] = expectedDefault + cache.podStatus[objKey{"other", "foo"}] = expectedOther - info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo") + info, err := cache.GetPodStatus(api.NamespaceDefault, "foo") if err != nil { - t.Errorf("Unexpected error: %#v", err) + t.Errorf("Unexpected error: %+v", err) } - if !reflect.DeepEqual(info, expectedDefault) { - t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expectedOther, info) + if !reflect.DeepEqual(info, &expectedDefault) { + t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", &expectedOther, info) } - info, err = cache.GetPodInfo("host", "other", "foo") + info, err = cache.GetPodStatus("other", "foo") if err != nil { - t.Errorf("Unexpected error: %#v", err) + t.Errorf("Unexpected error: %+v", err) } - if !reflect.DeepEqual(info, expectedOther) { - t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expectedOther, info) + if !reflect.DeepEqual(info, &expectedOther) { + t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", &expectedOther, info) } } func TestPodCacheGet(t *testing.T) { - cache := NewPodCache(nil, nil) + cache := NewPodCache(nil, nil, nil, nil) - expected := api.PodContainerInfo{ - ContainerInfo: api.PodInfo{ + expected := api.PodStatus{ + Info: api.PodInfo{ "foo": api.ContainerStatus{}, }, } - cache.podInfo[makePodCacheKey(api.NamespaceDefault, "foo")] = expected + cache.podStatus[objKey{api.NamespaceDefault, "foo"}] = expected - info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo") + info, err := cache.GetPodStatus(api.NamespaceDefault, "foo") if err != nil { - t.Errorf("Unexpected error: %#v", err) + t.Errorf("Unexpected error: %+v", err) } - if !reflect.DeepEqual(info, expected) { - t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expected, info) + if !reflect.DeepEqual(info, &expected) { + t.Errorf("Unexpected mismatch. 
Expected: %+v, Got: %+v", &expected, info) } } func TestPodCacheGetMissing(t *testing.T) { - cache := NewPodCache(nil, nil) + cache := NewPodCache(nil, nil, nil, nil) - info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo") + status, err := cache.GetPodStatus(api.NamespaceDefault, "foo") if err == nil { - t.Errorf("Unexpected non-error: %#v", err) + t.Errorf("Unexpected non-error: %+v", err) } - if !reflect.DeepEqual(info, api.PodContainerInfo{}) { - t.Errorf("Unexpected info: %#v", info) + if status != nil { + t.Errorf("Unexpected status: %+v", status) } } -func TestPodGetPodInfoGetter(t *testing.T) { - expected := api.PodContainerInfo{ - ContainerInfo: api.PodInfo{ - "foo": api.ContainerStatus{}, +type fakeIPCache func(string) string + +func (f fakeIPCache) GetInstanceIP(host string) (ip string) { + return f(host) +} + +type podCacheTestConfig struct { + ipFunc func(string) string // Construct will set a default if nil + nodes []api.Node + pods []api.Pod + kubeletContainerInfo api.PodInfo + + // Construct will fill in these fields + fakePodInfo *FakePodInfoGetter + fakeNodes *client.Fake + fakePods *registrytest.PodRegistry +} + +func (c *podCacheTestConfig) Construct() *PodCache { + if c.ipFunc == nil { + c.ipFunc = func(host string) string { + return "ip of " + host + } + } + c.fakePodInfo = &FakePodInfoGetter{ + data: api.PodContainerInfo{ + ContainerInfo: c.kubeletContainerInfo, }, } - fake := FakePodInfoGetter{ - data: expected, + c.fakeNodes = &client.Fake{ + MinionsList: api.NodeList{ + Items: c.nodes, + }, } - cache := NewPodCache(&fake, nil) + c.fakePods = registrytest.NewPodRegistry(&api.PodList{Items: c.pods}) + return NewPodCache( + fakeIPCache(c.ipFunc), + c.fakePodInfo, + c.fakeNodes.Nodes(), + c.fakePods, + ) +} - cache.updatePodInfo("host", api.NamespaceDefault, "foo") - - if fake.host != "host" || fake.id != "foo" || fake.namespace != api.NamespaceDefault { - t.Errorf("Unexpected access: %#v", fake) +func makePod(namespace, name, host string, containers ...string) *api.Pod { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: name}, + Status: api.PodStatus{Host: host}, } - - info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo") - if err != nil { - t.Errorf("Unexpected error: %#v", err) + for _, c := range containers { + pod.Spec.Containers = append(pod.Spec.Containers, api.Container{ + Name: c, + }) } - if !reflect.DeepEqual(info, expected) { - t.Errorf("Unexpected mismatch. 
Expected: %#v, Got: #%v", &expected, info) + return pod +} + +func makeNode(name string) *api.Node { + return &api.Node{ + ObjectMeta: api.ObjectMeta{Name: name}, } } func TestPodUpdateAllContainers(t *testing.T) { - pod := api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, - Status: api.PodStatus{ - Host: "machine", + pod := makePod(api.NamespaceDefault, "foo", "machine", "bar") + config := podCacheTestConfig{ + ipFunc: func(host string) string { + if host == "machine" { + return "1.2.3.5" + } + return "" }, + kubeletContainerInfo: api.PodInfo{"bar": api.ContainerStatus{}}, + nodes: []api.Node{*makeNode("machine")}, + pods: []api.Pod{*pod}, } - - pods := []api.Pod{pod} - mockRegistry := registrytest.NewPodRegistry(&api.PodList{Items: pods}) - - expected := api.PodContainerInfo{ - ContainerInfo: api.PodInfo{ - "foo": api.ContainerStatus{}, - }, - } - fake := FakePodInfoGetter{ - data: expected, - } - cache := NewPodCache(&fake, mockRegistry) + cache := config.Construct() cache.UpdateAllContainers() + fake := config.fakePodInfo if fake.host != "machine" || fake.id != "foo" || fake.namespace != api.NamespaceDefault { - t.Errorf("Unexpected access: %#v", fake) + t.Errorf("Unexpected access: %+v", fake) } - info, err := cache.GetPodInfo("machine", api.NamespaceDefault, "foo") + status, err := cache.GetPodStatus(api.NamespaceDefault, "foo") if err != nil { - t.Errorf("Unexpected error: %#v", err) + t.Fatalf("Unexpected error: %+v", err) } - if !reflect.DeepEqual(info, expected) { - t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expected, info) + if e, a := config.kubeletContainerInfo, status.Info; !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", e, a) + } + if e, a := "1.2.3.5", status.HostIP; e != a { + t.Errorf("Unexpected mismatch. 
Expected: %+v, Got: %+v", e, a) + } +} + +func TestFillPodStatusNoHost(t *testing.T) { + pod := makePod(api.NamespaceDefault, "foo", "", "bar") + config := podCacheTestConfig{ + kubeletContainerInfo: api.PodInfo{}, + nodes: []api.Node{*makeNode("machine")}, + pods: []api.Pod{*pod}, + } + cache := config.Construct() + err := cache.updatePodStatus(&config.pods[0]) + if err != nil { + t.Fatalf("Unexpected error: %+v", err) + } + + status, err := cache.GetPodStatus(pod.Namespace, pod.Name) + if e, a := api.PodPending, status.Phase; e != a { + t.Errorf("Expected: %+v, Got %+v", e, a) + } +} + +func TestFillPodStatusMissingMachine(t *testing.T) { + pod := makePod(api.NamespaceDefault, "foo", "machine", "bar") + config := podCacheTestConfig{ + kubeletContainerInfo: api.PodInfo{}, + nodes: []api.Node{}, + pods: []api.Pod{*pod}, + } + cache := config.Construct() + err := cache.updatePodStatus(&config.pods[0]) + if err != nil { + t.Fatalf("Unexpected error: %+v", err) + } + + status, err := cache.GetPodStatus(pod.Namespace, pod.Name) + if e, a := api.PodFailed, status.Phase; e != a { + t.Errorf("Expected: %+v, Got %+v", e, a) + } +} + +func TestFillPodStatus(t *testing.T) { + pod := makePod(api.NamespaceDefault, "foo", "machine", "bar") + expectedIP := "1.2.3.4" + expectedTime, _ := time.Parse("2013-Feb-03", "2013-Feb-03") + config := podCacheTestConfig{ + kubeletContainerInfo: api.PodInfo{ + "net": { + State: api.ContainerState{ + Running: &api.ContainerStateRunning{ + StartedAt: util.NewTime(expectedTime), + }, + }, + RestartCount: 1, + PodIP: expectedIP, + }, + }, + nodes: []api.Node{*makeNode("machine")}, + pods: []api.Pod{*pod}, + } + cache := config.Construct() + err := cache.updatePodStatus(&config.pods[0]) + if err != nil { + t.Fatalf("Unexpected error: %+v", err) + } + + status, err := cache.GetPodStatus(pod.Namespace, pod.Name) + if e, a := config.kubeletContainerInfo, status.Info; !reflect.DeepEqual(e, a) { + t.Errorf("Expected: %+v, Got %+v", e, a) + } + if status.PodIP != expectedIP { + t.Errorf("Expected %s, Got %s\n%+v", expectedIP, status.PodIP, status) + } +} + +func TestFillPodInfoNoData(t *testing.T) { + pod := makePod(api.NamespaceDefault, "foo", "machine", "bar") + expectedIP := "" + config := podCacheTestConfig{ + kubeletContainerInfo: api.PodInfo{ + "net": {}, + }, + nodes: []api.Node{*makeNode("machine")}, + pods: []api.Pod{*pod}, + } + cache := config.Construct() + err := cache.updatePodStatus(&config.pods[0]) + if err != nil { + t.Fatalf("Unexpected error: %+v", err) + } + + status, err := cache.GetPodStatus(pod.Namespace, pod.Name) + if e, a := config.kubeletContainerInfo, status.Info; !reflect.DeepEqual(e, a) { + t.Errorf("Expected: %+v, Got %+v", e, a) + } + if status.PodIP != expectedIP { + t.Errorf("Expected %s, Got %s", expectedIP, status.PodIP) + } +} + +func TestPodPhaseWithBadNode(t *testing.T) { + desiredState := api.PodSpec{ + Containers: []api.Container{ + {Name: "containerA"}, + {Name: "containerB"}, + }, + RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, + } + runningState := api.ContainerStatus{ + State: api.ContainerState{ + Running: &api.ContainerStateRunning{}, + }, + } + stoppedState := api.ContainerStatus{ + State: api.ContainerState{ + Termination: &api.ContainerStateTerminated{}, + }, + } + + tests := []struct { + pod *api.Pod + status api.PodPhase + test string + }{ + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Host: "machine-2", + }, + }, + api.PodFailed, + "no info, but bad machine", + }, + { + &api.Pod{ + 
Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": runningState, + "containerB": runningState, + }, + Host: "machine-two", + }, + }, + api.PodFailed, + "all running but minion is missing", + }, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": stoppedState, + "containerB": stoppedState, + }, + Host: "machine-two", + }, + }, + api.PodFailed, + "all stopped but minion missing", + }, + } + for _, test := range tests { + config := podCacheTestConfig{ + kubeletContainerInfo: test.pod.Status.Info, + nodes: []api.Node{}, + pods: []api.Pod{*test.pod}, + } + cache := config.Construct() + cache.UpdateAllContainers() + status, err := cache.GetPodStatus(test.pod.Namespace, test.pod.Name) + if err != nil { + t.Errorf("%v: Unexpected error %v", test.test, err) + continue + } + if e, a := test.status, status.Phase; e != a { + t.Errorf("In test %s, expected %v, got %v", test.test, e, a) + } + } +} + +func TestPodPhaseWithRestartAlways(t *testing.T) { + desiredState := api.PodSpec{ + Containers: []api.Container{ + {Name: "containerA"}, + {Name: "containerB"}, + }, + RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, + } + currentState := api.PodStatus{ + Host: "machine", + } + runningState := api.ContainerStatus{ + State: api.ContainerState{ + Running: &api.ContainerStateRunning{}, + }, + } + stoppedState := api.ContainerStatus{ + State: api.ContainerState{ + Termination: &api.ContainerStateTerminated{}, + }, + } + + tests := []struct { + pod *api.Pod + status api.PodPhase + test string + }{ + {&api.Pod{Spec: desiredState, Status: currentState}, api.PodPending, "waiting"}, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": runningState, + "containerB": runningState, + }, + Host: "machine", + }, + }, + api.PodRunning, + "all running", + }, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": stoppedState, + "containerB": stoppedState, + }, + Host: "machine", + }, + }, + api.PodRunning, + "all stopped with restart always", + }, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": runningState, + "containerB": stoppedState, + }, + Host: "machine", + }, + }, + api.PodRunning, + "mixed state #1 with restart always", + }, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": runningState, + }, + Host: "machine", + }, + }, + api.PodPending, + "mixed state #2 with restart always", + }, + } + for _, test := range tests { + if status := getPhase(&test.pod.Spec, test.pod.Status.Info); status != test.status { + t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status) + } + } +} + +func TestPodPhaseWithRestartNever(t *testing.T) { + desiredState := api.PodSpec{ + Containers: []api.Container{ + {Name: "containerA"}, + {Name: "containerB"}, + }, + RestartPolicy: api.RestartPolicy{Never: &api.RestartPolicyNever{}}, + } + currentState := api.PodStatus{ + Host: "machine", + } + runningState := api.ContainerStatus{ + State: api.ContainerState{ + Running: &api.ContainerStateRunning{}, + }, + } + succeededState := api.ContainerStatus{ + State: api.ContainerState{ + Termination: &api.ContainerStateTerminated{ + ExitCode: 0, + }, + }, + } + failedState := api.ContainerStatus{ + State: api.ContainerState{ + 
Termination: &api.ContainerStateTerminated{ + ExitCode: -1, + }, + }, + } + + tests := []struct { + pod *api.Pod + status api.PodPhase + test string + }{ + {&api.Pod{Spec: desiredState, Status: currentState}, api.PodPending, "waiting"}, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": runningState, + "containerB": runningState, + }, + Host: "machine", + }, + }, + api.PodRunning, + "all running with restart never", + }, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": succeededState, + "containerB": succeededState, + }, + Host: "machine", + }, + }, + api.PodSucceeded, + "all succeeded with restart never", + }, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": failedState, + "containerB": failedState, + }, + Host: "machine", + }, + }, + api.PodFailed, + "all failed with restart never", + }, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": runningState, + "containerB": succeededState, + }, + Host: "machine", + }, + }, + api.PodRunning, + "mixed state #1 with restart never", + }, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": runningState, + }, + Host: "machine", + }, + }, + api.PodPending, + "mixed state #2 with restart never", + }, + } + for _, test := range tests { + if status := getPhase(&test.pod.Spec, test.pod.Status.Info); status != test.status { + t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status) + } + } +} + +func TestPodPhaseWithRestartOnFailure(t *testing.T) { + desiredState := api.PodSpec{ + Containers: []api.Container{ + {Name: "containerA"}, + {Name: "containerB"}, + }, + RestartPolicy: api.RestartPolicy{OnFailure: &api.RestartPolicyOnFailure{}}, + } + currentState := api.PodStatus{ + Host: "machine", + } + runningState := api.ContainerStatus{ + State: api.ContainerState{ + Running: &api.ContainerStateRunning{}, + }, + } + succeededState := api.ContainerStatus{ + State: api.ContainerState{ + Termination: &api.ContainerStateTerminated{ + ExitCode: 0, + }, + }, + } + failedState := api.ContainerStatus{ + State: api.ContainerState{ + Termination: &api.ContainerStateTerminated{ + ExitCode: -1, + }, + }, + } + + tests := []struct { + pod *api.Pod + status api.PodPhase + test string + }{ + {&api.Pod{Spec: desiredState, Status: currentState}, api.PodPending, "waiting"}, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": runningState, + "containerB": runningState, + }, + Host: "machine", + }, + }, + api.PodRunning, + "all running with restart onfailure", + }, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": succeededState, + "containerB": succeededState, + }, + Host: "machine", + }, + }, + api.PodSucceeded, + "all succeeded with restart onfailure", + }, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": failedState, + "containerB": failedState, + }, + Host: "machine", + }, + }, + api.PodRunning, + "all failed with restart never", + }, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": runningState, + "containerB": succeededState, + }, + Host: 
"machine", + }, + }, + api.PodRunning, + "mixed state #1 with restart onfailure", + }, + { + &api.Pod{ + Spec: desiredState, + Status: api.PodStatus{ + Info: map[string]api.ContainerStatus{ + "containerA": runningState, + }, + Host: "machine", + }, + }, + api.PodPending, + "mixed state #2 with restart onfailure", + }, + } + for _, test := range tests { + if status := getPhase(&test.pod.Spec, test.pod.Status.Info); status != test.status { + t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status) + } } } diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index a89a1917b07..270f1aaa8ec 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -18,72 +18,37 @@ package pod import ( "fmt" - "sync" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - - "github.com/golang/glog" ) -type ipCacheEntry struct { - ip string - lastUpdate time.Time -} - -type ipCache map[string]ipCacheEntry - -type clock interface { - Now() time.Time -} - -type realClock struct{} - -func (r realClock) Now() time.Time { - return time.Now() +type PodStatusGetter interface { + GetPodStatus(namespace, name string) (*api.PodStatus, error) } // REST implements the RESTStorage interface in terms of a PodRegistry. type REST struct { - cloudProvider cloudprovider.Interface - mu sync.Mutex - podCache client.PodInfoGetter - podInfoGetter client.PodInfoGetter - podPollPeriod time.Duration - registry Registry - nodes client.NodeInterface - ipCache ipCache - clock clock + podCache PodStatusGetter + registry Registry } type RESTConfig struct { - CloudProvider cloudprovider.Interface - PodCache client.PodInfoGetter - PodInfoGetter client.PodInfoGetter - Registry Registry - Nodes client.NodeInterface + PodCache PodStatusGetter + Registry Registry } // NewREST returns a new REST. func NewREST(config *RESTConfig) *REST { return &REST{ - cloudProvider: config.CloudProvider, - podCache: config.PodCache, - podInfoGetter: config.PodInfoGetter, - podPollPeriod: time.Second * 10, - registry: config.Registry, - nodes: config.Nodes, - ipCache: ipCache{}, - clock: realClock{}, + podCache: config.PodCache, + registry: config.Registry, } } @@ -123,17 +88,17 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { if pod == nil { return pod, nil } - if rs.podCache != nil || rs.podInfoGetter != nil { - rs.fillPodInfo(pod) - status, err := getPodStatus(pod, rs.nodes) - if err != nil { - return pod, err + host := pod.Status.Host + if status, err := rs.podCache.GetPodStatus(pod.Namespace, pod.Name); err != nil { + pod.Status = api.PodStatus{ + Phase: api.PodUnknown, } - pod.Status.Phase = status - } - if pod.Status.Host != "" { - pod.Status.HostIP = rs.getInstanceIP(pod.Status.Host) + } else { + pod.Status = *status } + // Make sure not to hide a recent host with an old one from the cache. 
+ // TODO: move host to spec + pod.Status.Host = host return pod, err } @@ -168,15 +133,18 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj if err == nil { for i := range pods.Items { pod := &pods.Items[i] - rs.fillPodInfo(pod) - status, err := getPodStatus(pod, rs.nodes) - if err != nil { - status = api.PodUnknown - } - pod.Status.Phase = status - if pod.Status.Host != "" { - pod.Status.HostIP = rs.getInstanceIP(pod.Status.Host) + host := pod.Status.Host + if status, err := rs.podCache.GetPodStatus(pod.Namespace, pod.Name); err != nil { + pod.Status = api.PodStatus{ + Phase: api.PodUnknown, + } + } else { + pod.Status = *status } + // Make sure not to hide a recent host with an old one from the cache. + // This is tested by the integration test. + // TODO: move host to spec + pod.Status.Host = host } } return pods, err @@ -207,148 +175,3 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE return rs.registry.GetPod(ctx, pod.Name) }), nil } - -func (rs *REST) fillPodInfo(pod *api.Pod) { - if pod.Status.Host == "" { - return - } - // Get cached info for the list currently. - // TODO: Optionally use fresh info - if rs.podCache != nil { - info, err := rs.podCache.GetPodInfo(pod.Status.Host, pod.Namespace, pod.Name) - if err != nil { - if err != client.ErrPodInfoNotAvailable { - glog.Errorf("Error getting container info from cache: %v", err) - } - if rs.podInfoGetter != nil { - info, err = rs.podInfoGetter.GetPodInfo(pod.Status.Host, pod.Namespace, pod.Name) - } - if err != nil { - if err != client.ErrPodInfoNotAvailable { - glog.Errorf("Error getting fresh container info: %v", err) - } - return - } - } - pod.Status.Info = info.ContainerInfo - netContainerInfo, ok := pod.Status.Info["net"] - if ok { - if netContainerInfo.PodIP != "" { - pod.Status.PodIP = netContainerInfo.PodIP - } else if netContainerInfo.State.Running != nil { - glog.Warningf("No network settings: %#v", netContainerInfo) - } - } else { - glog.Warningf("Couldn't find network container for %s in %v", pod.Name, info) - } - } -} - -func (rs *REST) getInstanceIP(host string) string { - data, ok := rs.ipCache[host] - now := rs.clock.Now() - - if !ok || now.Sub(data.lastUpdate) > (30*time.Second) { - ip := getInstanceIPFromCloud(rs.cloudProvider, host) - data = ipCacheEntry{ - ip: ip, - lastUpdate: now, - } - rs.ipCache[host] = data - } - return data.ip -} - -func getInstanceIPFromCloud(cloud cloudprovider.Interface, host string) string { - if cloud == nil { - return "" - } - instances, ok := cloud.Instances() - if instances == nil || !ok { - return "" - } - addr, err := instances.IPAddress(host) - if err != nil { - glog.Errorf("Error getting instance IP for %q: %v", host, err) - return "" - } - return addr.String() -} - -func getPodStatus(pod *api.Pod, nodes client.NodeInterface) (api.PodPhase, error) { - if pod.Status.Host == "" { - return api.PodPending, nil - } - if nodes != nil { - _, err := nodes.Get(pod.Status.Host) - if err != nil { - if errors.IsNotFound(err) { - return api.PodFailed, nil - } - glog.Errorf("Error getting pod info: %v", err) - return api.PodUnknown, nil - } - } else { - glog.Errorf("Unexpected missing minion interface, status may be in-accurate") - } - if pod.Status.Info == nil { - return api.PodPending, nil - } - // TODO(dchen1107): move the entire logic to kubelet? 
- running := 0 - waiting := 0 - stopped := 0 - failed := 0 - succeeded := 0 - unknown := 0 - for _, container := range pod.Spec.Containers { - if containerStatus, ok := pod.Status.Info[container.Name]; ok { - if containerStatus.State.Running != nil { - running++ - } else if containerStatus.State.Termination != nil { - stopped++ - if containerStatus.State.Termination.ExitCode == 0 { - succeeded++ - } else { - failed++ - } - } else if containerStatus.State.Waiting != nil { - waiting++ - } else { - unknown++ - } - } else { - unknown++ - } - } - switch { - case waiting > 0: - // One or more containers has not been started - return api.PodPending, nil - case running > 0 && unknown == 0: - // All containers have been started, and at least - // one container is running - return api.PodRunning, nil - case running == 0 && stopped > 0 && unknown == 0: - // All containers are terminated - if pod.Spec.RestartPolicy.Always != nil { - // All containers are in the process of restarting - return api.PodRunning, nil - } - if stopped == succeeded { - // RestartPolicy is not Always, and all - // containers are terminated in success - return api.PodSucceeded, nil - } - if pod.Spec.RestartPolicy.Never != nil { - // RestartPolicy is Never, and all containers are - // terminated with at least one in failure - return api.PodFailed, nil - } - // RestartPolicy is OnFailure, and at least one in failure - // and in the process of restarting - return api.PodRunning, nil - default: - return api.PodPending, nil - } -} diff --git a/pkg/registry/pod/rest_test.go b/pkg/registry/pod/rest_test.go index 177af00c515..e93853c69f6 100644 --- a/pkg/registry/pod/rest_test.go +++ b/pkg/registry/pod/rest_test.go @@ -28,12 +28,25 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) +type fakeCache struct { + requestedNamespace string + requestedName string + + statusToReturn *api.PodStatus + errorToReturn error +} + +func (f *fakeCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) { + f.requestedNamespace = namespace + f.requestedName = name + return f.statusToReturn, f.errorToReturn +} + func expectApiStatusError(t *testing.T, ch <-chan apiserver.RESTResult, msg string) { out := <-ch status, ok := out.Object.(*api.Status) @@ -61,6 +74,7 @@ func TestCreatePodRegistryError(t *testing.T) { podRegistry.Err = fmt.Errorf("test error") storage := REST{ registry: podRegistry, + podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, } pod := &api.Pod{} ctx := api.NewDefaultContext() @@ -76,6 +90,7 @@ func TestCreatePodSetsIds(t *testing.T) { podRegistry.Err = fmt.Errorf("test error") storage := REST{ registry: podRegistry, + podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, } pod := &api.Pod{} ctx := api.NewDefaultContext() @@ -98,6 +113,7 @@ func TestCreatePodSetsUID(t *testing.T) { podRegistry.Err = fmt.Errorf("test error") storage := REST{ registry: podRegistry, + podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, } pod := &api.Pod{} ctx := api.NewDefaultContext() @@ -117,6 +133,7 @@ func TestListPodsError(t *testing.T) { podRegistry.Err = fmt.Errorf("test error") storage := REST{ registry: podRegistry, + podCache: 
&fakeCache{statusToReturn: &api.PodStatus{}}, } ctx := api.NewContext() pods, err := storage.List(ctx, labels.Everything(), labels.Everything()) @@ -128,10 +145,40 @@ func TestListPodsError(t *testing.T) { } } +func TestListPodsCacheError(t *testing.T) { + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Pods = &api.PodList{ + Items: []api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + }, + }, + }, + } + storage := REST{ + registry: podRegistry, + podCache: &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable}, + } + ctx := api.NewContext() + pods, err := storage.List(ctx, labels.Everything(), labels.Everything()) + if err != nil { + t.Fatalf("Expected no error, got %#v", err) + } + pl := pods.(*api.PodList) + if len(pl.Items) != 1 { + t.Fatalf("Unexpected 0-len pod list: %+v", pl) + } + if e, a := api.PodUnknown, pl.Items[0].Status.Phase; e != a { + t.Errorf("Expected %v, got %v", e, a) + } +} + func TestListEmptyPodList(t *testing.T) { podRegistry := registrytest.NewPodRegistry(&api.PodList{ListMeta: api.ListMeta{ResourceVersion: "1"}}) storage := REST{ registry: podRegistry, + podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, } ctx := api.NewContext() pods, err := storage.List(ctx, labels.Everything(), labels.Everything()) @@ -147,14 +194,6 @@ func TestListEmptyPodList(t *testing.T) { } } -type fakeClock struct { - t time.Time -} - -func (f *fakeClock) Now() time.Time { - return f.t -} - func TestListPodList(t *testing.T) { podRegistry := registrytest.NewPodRegistry(nil) podRegistry.Pods = &api.PodList{ @@ -173,8 +212,7 @@ func TestListPodList(t *testing.T) { } storage := REST{ registry: podRegistry, - ipCache: ipCache{}, - clock: &fakeClock{}, + podCache: &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}, } ctx := api.NewContext() podsObj, err := storage.List(ctx, labels.Everything(), labels.Everything()) @@ -186,7 +224,7 @@ func TestListPodList(t *testing.T) { if len(pods.Items) != 2 { t.Errorf("Unexpected pod list: %#v", pods) } - if pods.Items[0].Name != "foo" { + if pods.Items[0].Name != "foo" || pods.Items[0].Status.Phase != api.PodRunning { t.Errorf("Unexpected pod: %#v", pods.Items[0]) } if pods.Items[1].Name != "bar" { @@ -218,8 +256,7 @@ func TestListPodListSelection(t *testing.T) { } storage := REST{ registry: podRegistry, - ipCache: ipCache{}, - clock: &fakeClock{}, + podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, } ctx := api.NewContext() @@ -283,6 +320,7 @@ func TestPodDecode(t *testing.T) { podRegistry := registrytest.NewPodRegistry(nil) storage := REST{ registry: podRegistry, + podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, } expected := &api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -305,12 +343,37 @@ func TestPodDecode(t *testing.T) { } func TestGetPod(t *testing.T) { + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Pod = &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Status: api.PodStatus{Host: "machine"}, + } + storage := REST{ + registry: podRegistry, + podCache: &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}, + } + ctx := api.NewContext() + obj, err := storage.Get(ctx, "foo") + pod := obj.(*api.Pod) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + expect := *podRegistry.Pod + expect.Status.Phase = api.PodRunning + // TODO: when host is moved to spec, remove this line. + expect.Status.Host = "machine" + if e, a := &expect, pod; !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected pod. 
Expected %#v, Got %#v", e, a) + } +} + +func TestGetPodCacheError(t *testing.T) { podRegistry := registrytest.NewPodRegistry(nil) podRegistry.Pod = &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} storage := REST{ registry: podRegistry, - ipCache: ipCache{}, - clock: &fakeClock{}, + podCache: &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable}, } ctx := api.NewContext() obj, err := storage.Get(ctx, "foo") @@ -319,497 +382,19 @@ func TestGetPod(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if e, a := podRegistry.Pod, pod; !reflect.DeepEqual(e, a) { + expect := *podRegistry.Pod + expect.Status.Phase = api.PodUnknown + if e, a := &expect, pod; !reflect.DeepEqual(e, a) { t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a) } } -func TestGetPodCloud(t *testing.T) { - fakeCloud := &fake_cloud.FakeCloud{} - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Pod = &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}, Status: api.PodStatus{Host: "machine"}} - - clock := &fakeClock{t: time.Now()} - - storage := REST{ - registry: podRegistry, - cloudProvider: fakeCloud, - ipCache: ipCache{}, - clock: clock, - } - ctx := api.NewContext() - obj, err := storage.Get(ctx, "foo") - pod := obj.(*api.Pod) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if e, a := podRegistry.Pod, pod; !reflect.DeepEqual(e, a) { - t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a) - } - - // This call should hit the cache, so we expect no additional calls to the cloud - obj, err = storage.Get(ctx, "foo") - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "ip-address" { - t.Errorf("Unexpected calls: %#v", fakeCloud.Calls) - } - - // Advance the clock, this call should miss the cache, so expect one more call. 
- clock.t = clock.t.Add(60 * time.Second) - obj, err = storage.Get(ctx, "foo") - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[1] != "ip-address" { - t.Errorf("Unexpected calls: %#v", fakeCloud.Calls) - } -} - -func TestPodStatusWithBadNode(t *testing.T) { - fakeClient := client.Fake{ - MinionsList: api.NodeList{ - Items: []api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: "machine"}, - }, - }, - }, - } - desiredState := api.PodSpec{ - Containers: []api.Container{ - {Name: "containerA"}, - {Name: "containerB"}, - }, - RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, - } - runningState := api.ContainerStatus{ - State: api.ContainerState{ - Running: &api.ContainerStateRunning{}, - }, - } - stoppedState := api.ContainerStatus{ - State: api.ContainerState{ - Termination: &api.ContainerStateTerminated{}, - }, - } - - tests := []struct { - pod *api.Pod - status api.PodPhase - test string - }{ - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Host: "machine-2", - }, - }, - api.PodFailed, - "no info, but bad machine", - }, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": runningState, - "containerB": runningState, - }, - Host: "machine-two", - }, - }, - api.PodFailed, - "all running but minion is missing", - }, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": stoppedState, - "containerB": stoppedState, - }, - Host: "machine-two", - }, - }, - api.PodFailed, - "all stopped but minion missing", - }, - } - for _, test := range tests { - if status, err := getPodStatus(test.pod, fakeClient.Nodes()); status != test.status { - t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status) - if err != nil { - t.Errorf("In test %s, unexpected error: %v", test.test, err) - } - } - } -} - -func TestPodStatusWithRestartAlways(t *testing.T) { - fakeClient := client.Fake{ - MinionsList: api.NodeList{ - Items: []api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: "machine"}, - }, - }, - }, - } - desiredState := api.PodSpec{ - Containers: []api.Container{ - {Name: "containerA"}, - {Name: "containerB"}, - }, - RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, - } - currentState := api.PodStatus{ - Host: "machine", - } - runningState := api.ContainerStatus{ - State: api.ContainerState{ - Running: &api.ContainerStateRunning{}, - }, - } - stoppedState := api.ContainerStatus{ - State: api.ContainerState{ - Termination: &api.ContainerStateTerminated{}, - }, - } - - tests := []struct { - pod *api.Pod - status api.PodPhase - test string - }{ - {&api.Pod{Spec: desiredState, Status: currentState}, api.PodPending, "waiting"}, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": runningState, - "containerB": runningState, - }, - Host: "machine", - }, - }, - api.PodRunning, - "all running", - }, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": stoppedState, - "containerB": stoppedState, - }, - Host: "machine", - }, - }, - api.PodRunning, - "all stopped with restart always", - }, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": runningState, - "containerB": stoppedState, - }, - Host: "machine", - }, - }, - api.PodRunning, - "mixed state #1 with restart always", 
- }, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": runningState, - }, - Host: "machine", - }, - }, - api.PodPending, - "mixed state #2 with restart always", - }, - } - for _, test := range tests { - if status, err := getPodStatus(test.pod, fakeClient.Nodes()); status != test.status { - t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status) - if err != nil { - t.Errorf("In test %s, unexpected error: %v", test.test, err) - } - } - } -} - -func TestPodStatusWithRestartNever(t *testing.T) { - fakeClient := client.Fake{ - MinionsList: api.NodeList{ - Items: []api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: "machine"}, - }, - }, - }, - } - desiredState := api.PodSpec{ - Containers: []api.Container{ - {Name: "containerA"}, - {Name: "containerB"}, - }, - RestartPolicy: api.RestartPolicy{Never: &api.RestartPolicyNever{}}, - } - currentState := api.PodStatus{ - Host: "machine", - } - runningState := api.ContainerStatus{ - State: api.ContainerState{ - Running: &api.ContainerStateRunning{}, - }, - } - succeededState := api.ContainerStatus{ - State: api.ContainerState{ - Termination: &api.ContainerStateTerminated{ - ExitCode: 0, - }, - }, - } - failedState := api.ContainerStatus{ - State: api.ContainerState{ - Termination: &api.ContainerStateTerminated{ - ExitCode: -1, - }, - }, - } - - tests := []struct { - pod *api.Pod - status api.PodPhase - test string - }{ - {&api.Pod{Spec: desiredState, Status: currentState}, api.PodPending, "waiting"}, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": runningState, - "containerB": runningState, - }, - Host: "machine", - }, - }, - api.PodRunning, - "all running with restart never", - }, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": succeededState, - "containerB": succeededState, - }, - Host: "machine", - }, - }, - api.PodSucceeded, - "all succeeded with restart never", - }, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": failedState, - "containerB": failedState, - }, - Host: "machine", - }, - }, - api.PodFailed, - "all failed with restart never", - }, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": runningState, - "containerB": succeededState, - }, - Host: "machine", - }, - }, - api.PodRunning, - "mixed state #1 with restart never", - }, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": runningState, - }, - Host: "machine", - }, - }, - api.PodPending, - "mixed state #2 with restart never", - }, - } - for _, test := range tests { - if status, err := getPodStatus(test.pod, fakeClient.Nodes()); status != test.status { - t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status) - if err != nil { - t.Errorf("In test %s, unexpected error: %v", test.test, err) - } - } - } -} - -func TestPodStatusWithRestartOnFailure(t *testing.T) { - fakeClient := client.Fake{ - MinionsList: api.NodeList{ - Items: []api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: "machine"}, - }, - }, - }, - } - desiredState := api.PodSpec{ - Containers: []api.Container{ - {Name: "containerA"}, - {Name: "containerB"}, - }, - RestartPolicy: api.RestartPolicy{OnFailure: &api.RestartPolicyOnFailure{}}, - } - currentState := api.PodStatus{ - Host: 
"machine", - } - runningState := api.ContainerStatus{ - State: api.ContainerState{ - Running: &api.ContainerStateRunning{}, - }, - } - succeededState := api.ContainerStatus{ - State: api.ContainerState{ - Termination: &api.ContainerStateTerminated{ - ExitCode: 0, - }, - }, - } - failedState := api.ContainerStatus{ - State: api.ContainerState{ - Termination: &api.ContainerStateTerminated{ - ExitCode: -1, - }, - }, - } - - tests := []struct { - pod *api.Pod - status api.PodPhase - test string - }{ - {&api.Pod{Spec: desiredState, Status: currentState}, api.PodPending, "waiting"}, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": runningState, - "containerB": runningState, - }, - Host: "machine", - }, - }, - api.PodRunning, - "all running with restart onfailure", - }, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": succeededState, - "containerB": succeededState, - }, - Host: "machine", - }, - }, - api.PodSucceeded, - "all succeeded with restart onfailure", - }, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": failedState, - "containerB": failedState, - }, - Host: "machine", - }, - }, - api.PodRunning, - "all failed with restart never", - }, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": runningState, - "containerB": succeededState, - }, - Host: "machine", - }, - }, - api.PodRunning, - "mixed state #1 with restart onfailure", - }, - { - &api.Pod{ - Spec: desiredState, - Status: api.PodStatus{ - Info: map[string]api.ContainerStatus{ - "containerA": runningState, - }, - Host: "machine", - }, - }, - api.PodPending, - "mixed state #2 with restart onfailure", - }, - } - for _, test := range tests { - if status, err := getPodStatus(test.pod, fakeClient.Nodes()); status != test.status { - t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status) - if err != nil { - t.Errorf("In test %s, unexpected error: %v", test.test, err) - } - } - } -} - func TestPodStorageValidatesCreate(t *testing.T) { podRegistry := registrytest.NewPodRegistry(nil) podRegistry.Err = fmt.Errorf("test error") storage := REST{ registry: podRegistry, + podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, } ctx := api.NewDefaultContext() pod := &api.Pod{ @@ -837,8 +422,8 @@ func TestCreatePod(t *testing.T) { }, } storage := REST{ - registry: podRegistry, - podPollPeriod: time.Millisecond * 100, + registry: podRegistry, + podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, } pod := &api.Pod{} pod.Name = "foo" @@ -867,57 +452,6 @@ func (f *FakePodInfoGetter) GetPodInfo(host, podNamespace string, podID string) return api.PodContainerInfo{ContainerInfo: f.info}, f.err } -func TestFillPodInfo(t *testing.T) { - expectedIP := "1.2.3.4" - expectedTime, _ := time.Parse("2013-Feb-03", "2013-Feb-03") - fakeGetter := FakePodInfoGetter{ - info: map[string]api.ContainerStatus{ - "net": { - State: api.ContainerState{ - Running: &api.ContainerStateRunning{ - StartedAt: util.NewTime(expectedTime), - }, - }, - RestartCount: 1, - PodIP: expectedIP, - }, - }, - } - storage := REST{ - podCache: &fakeGetter, - } - pod := api.Pod{Status: api.PodStatus{Host: "foo"}} - storage.fillPodInfo(&pod) - if !reflect.DeepEqual(fakeGetter.info, pod.Status.Info) { - t.Errorf("Expected: %#v, Got %#v", fakeGetter.info, pod.Status.Info) - } - if pod.Status.PodIP != 
expectedIP { - t.Errorf("Expected %s, Got %s", expectedIP, pod.Status.PodIP) - } -} - -func TestFillPodInfoNoData(t *testing.T) { - expectedIP := "" - fakeGetter := FakePodInfoGetter{ - info: map[string]api.ContainerStatus{ - "net": { - State: api.ContainerState{}, - }, - }, - } - storage := REST{ - podCache: &fakeGetter, - } - pod := api.Pod{Status: api.PodStatus{Host: "foo"}} - storage.fillPodInfo(&pod) - if !reflect.DeepEqual(fakeGetter.info, pod.Status.Info) { - t.Errorf("Expected %#v, Got %#v", fakeGetter.info, pod.Status.Info) - } - if pod.Status.PodIP != expectedIP { - t.Errorf("Expected %s, Got %s", expectedIP, pod.Status.PodIP) - } -} - func TestCreatePodWithConflictingNamespace(t *testing.T) { storage := REST{} pod := &api.Pod{ From 545d87d554e293b6bc11096ccaca0b4535146930 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 22 Dec 2014 11:54:32 -0800 Subject: [PATCH 2/4] Move clock to util --- pkg/master/ip_cache.go | 19 +++------------- pkg/master/ip_cache_test.go | 13 +++-------- pkg/master/master.go | 2 +- pkg/util/clock.go | 45 +++++++++++++++++++++++++++++++++++++ 4 files changed, 52 insertions(+), 27 deletions(-) create mode 100644 pkg/util/clock.go diff --git a/pkg/master/ip_cache.go b/pkg/master/ip_cache.go index 02d7be71d10..35621384d71 100644 --- a/pkg/master/ip_cache.go +++ b/pkg/master/ip_cache.go @@ -21,6 +21,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) @@ -31,7 +32,7 @@ type ipCacheEntry struct { } type ipCache struct { - clock Clock + clock util.Clock cloudProvider cloudprovider.Interface cache map[string]ipCacheEntry lock sync.Mutex @@ -40,7 +41,7 @@ type ipCache struct { // NewIPCache makes a new ip caching layer, which will get IP addresses from cp, // and use clock for deciding when to re-get an IP address. // Thread-safe. -func NewIPCache(cp cloudprovider.Interface, clock Clock) *ipCache { +func NewIPCache(cp cloudprovider.Interface, clock util.Clock) *ipCache { return &ipCache{ clock: clock, cloudProvider: cp, @@ -48,20 +49,6 @@ func NewIPCache(cp cloudprovider.Interface, clock Clock) *ipCache { } } -// Clock allows for injecting fake or real clocks into -// the cache. -type Clock interface { - Now() time.Time -} - -// RealClock really calls time.Now() -type RealClock struct{} - -// Now returns the current time. -func (r RealClock) Now() time.Time { - return time.Now() -} - // GetInstanceIP returns the IP address of host, from the cache // if possible, otherwise it asks the cloud provider. func (c *ipCache) GetInstanceIP(host string) string { diff --git a/pkg/master/ip_cache_test.go b/pkg/master/ip_cache_test.go index dac9c33d86e..9b3cd487707 100644 --- a/pkg/master/ip_cache_test.go +++ b/pkg/master/ip_cache_test.go @@ -21,19 +21,12 @@ import ( "time" fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -type fakeClock struct { - t time.Time -} - -func (f *fakeClock) Now() time.Time { - return f.t -} - func TestCacheExpire(t *testing.T) { fakeCloud := &fake_cloud.FakeCloud{} - clock := &fakeClock{t: time.Now()} + clock := &util.FakeClock{time.Now()} c := NewIPCache(fakeCloud, clock) @@ -41,7 +34,7 @@ func TestCacheExpire(t *testing.T) { // This call should hit the cache, so we expect no additional calls to the cloud _ = c.GetInstanceIP("foo") // Advance the clock, this call should miss the cache, so expect one more call. 
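
The cache-expiry test above works because the clock is injected rather than read from time.Now() directly: a test can advance a fake clock past the TTL instead of sleeping. A minimal, self-contained sketch of that pattern, using a hypothetical ttlCache type (the Clock and FakeClock shapes mirror the ones this series adds to pkg/util):

package main

import (
	"fmt"
	"time"
)

// Clock is anything that can report "now"; real code uses time.Now, tests
// substitute a fake.
type Clock interface {
	Now() time.Time
}

// FakeClock returns a fixed time that tests advance by hand.
type FakeClock struct {
	Time time.Time
}

func (f *FakeClock) Now() time.Time { return f.Time }

// ttlCache is a hypothetical cache keyed by string, illustrating the expiry
// check used by ipCache: refetch when the entry is older than ttl.
type ttlCache struct {
	clock   Clock
	ttl     time.Duration
	fetched map[string]time.Time
	lookups int // counts cache misses, like FakeCloud.Calls in the test
}

func (c *ttlCache) Get(key string) {
	now := c.clock.Now()
	last, ok := c.fetched[key]
	if !ok || now.Sub(last) > c.ttl {
		c.lookups++ // a real cache would call the cloud provider here
		c.fetched[key] = now
	}
}

func main() {
	clock := &FakeClock{Time: time.Now()}
	c := &ttlCache{clock: clock, ttl: 60 * time.Second, fetched: map[string]time.Time{}}

	c.Get("foo") // miss: 1 lookup
	c.Get("foo") // hit: still 1 lookup
	clock.Time = clock.Time.Add(61 * time.Second)
	c.Get("foo") // expired: 2 lookups

	fmt.Println(c.lookups) // prints 2
}
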
- clock.t = clock.t.Add(60 * time.Second) + clock.Time = clock.Time.Add(60 * time.Second) _ = c.GetInstanceIP("foo") if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[1] != "ip-address" || fakeCloud.Calls[0] != "ip-address" { diff --git a/pkg/master/master.go b/pkg/master/master.go index 1b18d12f7ad..b8c55b57248 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -323,7 +323,7 @@ func (m *Master) init(c *Config) { var authenticator = c.Authenticator nodeRESTStorage := minion.NewREST(m.minionRegistry) - ipCache := NewIPCache(c.Cloud, RealClock{}) + ipCache := NewIPCache(c.Cloud, util.RealClock{}) podCache := NewPodCache( ipCache, c.KubeletClient, diff --git a/pkg/util/clock.go b/pkg/util/clock.go new file mode 100644 index 00000000000..0ef99f99daf --- /dev/null +++ b/pkg/util/clock.go @@ -0,0 +1,45 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "time" +) + +// Clock allows for injecting fake or real clocks into code that +// needs to do arbitrary things based on time. +type Clock interface { + Now() time.Time +} + +// RealClock really calls time.Now() +type RealClock struct{} + +// Now returns the current time. +func (r RealClock) Now() time.Time { + return time.Now() +} + +// FakeClock implements Clock, but returns an arbitary time. +type FakeClock struct { + Time time.Time +} + +// Now returns f's time. +func (f *FakeClock) Now() time.Time { + return f.Time +} From dc5383dcf8d03bef536104bc920058fd063f9338 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 22 Dec 2014 12:00:44 -0800 Subject: [PATCH 3/4] add ttl as param to ip cache --- pkg/master/ip_cache.go | 9 +++++++-- pkg/master/ip_cache_test.go | 20 ++++++++++++++++++-- pkg/master/master.go | 2 +- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/pkg/master/ip_cache.go b/pkg/master/ip_cache.go index 35621384d71..05a59c647c2 100644 --- a/pkg/master/ip_cache.go +++ b/pkg/master/ip_cache.go @@ -36,16 +36,21 @@ type ipCache struct { cloudProvider cloudprovider.Interface cache map[string]ipCacheEntry lock sync.Mutex + ttl time.Duration } // NewIPCache makes a new ip caching layer, which will get IP addresses from cp, // and use clock for deciding when to re-get an IP address. // Thread-safe. -func NewIPCache(cp cloudprovider.Interface, clock util.Clock) *ipCache { +// +// TODO: when we switch to go1.4, this class would be a good candidate for something +// that could be produced from a template and a type via `go generate`. 
+func NewIPCache(cp cloudprovider.Interface, clock util.Clock, ttl time.Duration) *ipCache { return &ipCache{ clock: clock, cloudProvider: cp, cache: map[string]ipCacheEntry{}, + ttl: ttl, } } @@ -57,7 +62,7 @@ func (c *ipCache) GetInstanceIP(host string) string { data, ok := c.cache[host] now := c.clock.Now() - if !ok || now.Sub(data.lastUpdate) > (30*time.Second) { + if !ok || now.Sub(data.lastUpdate) > c.ttl { ip := getInstanceIPFromCloud(c.cloudProvider, host) data = ipCacheEntry{ ip: ip, diff --git a/pkg/master/ip_cache_test.go b/pkg/master/ip_cache_test.go index 9b3cd487707..6e8139b88e8 100644 --- a/pkg/master/ip_cache_test.go +++ b/pkg/master/ip_cache_test.go @@ -28,16 +28,32 @@ func TestCacheExpire(t *testing.T) { fakeCloud := &fake_cloud.FakeCloud{} clock := &util.FakeClock{time.Now()} - c := NewIPCache(fakeCloud, clock) + c := NewIPCache(fakeCloud, clock, 60*time.Second) _ = c.GetInstanceIP("foo") // This call should hit the cache, so we expect no additional calls to the cloud _ = c.GetInstanceIP("foo") // Advance the clock, this call should miss the cache, so expect one more call. - clock.Time = clock.Time.Add(60 * time.Second) + clock.Time = clock.Time.Add(61 * time.Second) _ = c.GetInstanceIP("foo") if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[1] != "ip-address" || fakeCloud.Calls[0] != "ip-address" { t.Errorf("Unexpected calls: %+v", fakeCloud.Calls) } } + +func TestCacheNotExpire(t *testing.T) { + fakeCloud := &fake_cloud.FakeCloud{} + clock := &util.FakeClock{time.Now()} + + c := NewIPCache(fakeCloud, clock, 60*time.Second) + + _ = c.GetInstanceIP("foo") + // This call should hit the cache, so we expect no additional calls to the cloud + clock.Time = clock.Time.Add(60 * time.Second) + _ = c.GetInstanceIP("foo") + + if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "ip-address" { + t.Errorf("Unexpected calls: %+v", fakeCloud.Calls) + } +} diff --git a/pkg/master/master.go b/pkg/master/master.go index b8c55b57248..ca489f9a84d 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -323,7 +323,7 @@ func (m *Master) init(c *Config) { var authenticator = c.Authenticator nodeRESTStorage := minion.NewREST(m.minionRegistry) - ipCache := NewIPCache(c.Cloud, util.RealClock{}) + ipCache := NewIPCache(c.Cloud, util.RealClock{}, 30*time.Second) podCache := NewPodCache( ipCache, c.KubeletClient, From 691623551324825d9043c849bfd46295e61115b6 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 22 Dec 2014 14:11:41 -0800 Subject: [PATCH 4/4] Make locking simpler add test for node existence cache behavior --- pkg/master/pod_cache.go | 90 ++++++++++++++++++++++-------------- pkg/master/pod_cache_test.go | 72 +++++++++++++++++++++++------ 2 files changed, 115 insertions(+), 47 deletions(-) diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go index 819bd8d708a..4da89469813 100644 --- a/pkg/master/pod_cache.go +++ b/pkg/master/pod_cache.go @@ -80,28 +80,35 @@ func (p *PodCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) return &value, nil } -// lock must *not* be held -func (p *PodCache) nodeExists(name string) bool { +func (p *PodCache) nodeExistsInCache(name string) (exists, cacheHit bool) { p.lock.Lock() defer p.lock.Unlock() - exists, cacheHit := p.currentNodes[objKey{"", name}] + exists, cacheHit = p.currentNodes[objKey{"", name}] + return exists, cacheHit +} + +// lock must *not* be held +func (p *PodCache) nodeExists(name string) bool { + exists, cacheHit := p.nodeExistsInCache(name) if cacheHit { return exists } - // Don't block everyone 
while looking up this minion. - // Because this may require an RPC to our storage (e.g. etcd). - func() { - p.lock.Unlock() - defer p.lock.Lock() - _, err := p.nodes.Get(name) - exists = true - if err != nil { - exists = false - if !errors.IsNotFound(err) { - glog.Errorf("Unexpected error type verifying minion existence: %+v", err) - } + // TODO: suppose there's N concurrent requests for node "foo"; in that case + // it might be useful to block all of them and only look up "foo" once. + // (This code will make up to N lookups.) One way of doing that would be to + // have a pool of M mutexes and require that before looking up "foo" you must + // lock mutex hash("foo") % M. + _, err := p.nodes.Get(name) + exists = true + if err != nil { + exists = false + if !errors.IsNotFound(err) { + glog.Errorf("Unexpected error type verifying minion existence: %+v", err) } - }() + } + + p.lock.Lock() + defer p.lock.Unlock() p.currentNodes[objKey{"", name}] = exists return exists } @@ -109,23 +116,32 @@ func (p *PodCache) nodeExists(name string) bool { // TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an // entire pod? func (p *PodCache) updatePodStatus(pod *api.Pod) error { + newStatus, err := p.computePodStatus(pod) + + p.lock.Lock() + defer p.lock.Unlock() + // Map accesses must be locked. + p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus + + return err +} + +// computePodStatus always returns a new status, even if it also returns a non-nil error. +// TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an +// entire pod? +func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) { newStatus := pod.Status + if pod.Status.Host == "" { - p.lock.Lock() - defer p.lock.Unlock() // Not assigned. newStatus.Phase = api.PodPending - p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus - return nil + return newStatus, nil } if !p.nodeExists(pod.Status.Host) { - p.lock.Lock() - defer p.lock.Unlock() // Assigned to non-existing node. newStatus.Phase = api.PodFailed - p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus - return nil + return newStatus, nil } info, err := p.containerInfo.GetPodInfo(pod.Status.Host, pod.Namespace, pod.Name) @@ -142,27 +158,33 @@ func (p *PodCache) updatePodStatus(pod *api.Pod) error { } } } + return newStatus, err +} + +func (p *PodCache) resetNodeExistenceCache() { p.lock.Lock() defer p.lock.Unlock() - p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus - return err + p.currentNodes = map[objKey]bool{} } // UpdateAllContainers updates information about all containers. +// Callers should let one call to UpdateAllContainers finish before +// calling again, or risk having new info getting clobbered by delayed +// old info. func (p *PodCache) UpdateAllContainers() { - func() { - // Reset which nodes we think exist - p.lock.Lock() - defer p.lock.Unlock() - p.currentNodes = map[objKey]bool{} - }() + p.resetNodeExistenceCache() ctx := api.NewContext() pods, err := p.pods.ListPods(ctx, labels.Everything()) if err != nil { - glog.Errorf("Error synchronizing container list: %v", err) + glog.Errorf("Error getting pod list: %v", err) return } + + // TODO: this algorithm is 1 goroutine & RPC per pod. With a little work, + // it should be possible to make it 1 per *node*, which will be important + // at very large scales. (To be clear, the goroutines shouldn't matter-- + // it's the RPCs that need to be minimized.) 
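
One way the earlier TODO in nodeExists could be realized is with a small pool of striped mutexes, so that N concurrent requests for the same node serialize on one lock and at most one of them actually hits storage. A rough sketch of that idea (the type and function names here are illustrative, not part of the patch):

package main

import (
	"hash/fnv"
	"sync"
)

// stripedLocks holds a fixed pool of M mutexes; all lookups for the same name
// contend on mutex hash(name) % M, as suggested in the TODO.
type stripedLocks struct {
	locks []sync.Mutex
}

func newStripedLocks(m int) *stripedLocks {
	return &stripedLocks{locks: make([]sync.Mutex, m)}
}

// forKey returns the mutex guarding lookups for key.
func (s *stripedLocks) forKey(key string) *sync.Mutex {
	h := fnv.New32a()
	h.Write([]byte(key))
	return &s.locks[h.Sum32()%uint32(len(s.locks))]
}

func main() {
	stripes := newStripedLocks(16)
	var wg sync.WaitGroup

	// Many goroutines asking about the same node take the same stripe, so the
	// slow existence check runs one at a time; re-checking the node cache
	// inside the critical section would let all but the first return early.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu := stripes.forKey("foo")
			mu.Lock()
			defer mu.Unlock()
			// ... consult the node-existence cache; only on a miss do the RPC ...
		}()
	}
	wg.Wait()
}
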
var wg sync.WaitGroup for i := range pods.Items { pod := &pods.Items[i] @@ -171,7 +193,7 @@ func (p *PodCache) UpdateAllContainers() { defer wg.Done() err := p.updatePodStatus(pod) if err != nil && err != client.ErrPodInfoNotAvailable { - glog.Errorf("Error synchronizing container: %v", err) + glog.Errorf("Error getting info for pod %v/%v: %v", pod.Namespace, pod.Name, err) } }() } diff --git a/pkg/master/pod_cache_test.go b/pkg/master/pod_cache_test.go index 341c3aece81..09c09a9e397 100644 --- a/pkg/master/pod_cache_test.go +++ b/pkg/master/pod_cache_test.go @@ -18,6 +18,7 @@ package master import ( "reflect" + "sync" "testing" "time" @@ -27,19 +28,48 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -type FakePodInfoGetter struct { +type podInfoCall struct { host string - id string namespace string - data api.PodContainerInfo - err error + name string } -func (f *FakePodInfoGetter) GetPodInfo(host, namespace, id string) (api.PodContainerInfo, error) { - f.host = host - f.id = id - f.namespace = namespace - return f.data, f.err +type podInfoResponse struct { + useCount int + data api.PodContainerInfo + err error +} + +type podInfoCalls map[podInfoCall]*podInfoResponse + +type FakePodInfoGetter struct { + calls podInfoCalls + lock sync.Mutex + + // default data/error to return, or you can add + // responses to specific calls-- that will take precedence. + data api.PodContainerInfo + err error +} + +func (f *FakePodInfoGetter) GetPodInfo(host, namespace, name string) (api.PodContainerInfo, error) { + f.lock.Lock() + defer f.lock.Unlock() + + if f.calls == nil { + f.calls = podInfoCalls{} + } + + key := podInfoCall{host, namespace, name} + call, ok := f.calls[key] + if !ok { + f.calls[key] = &podInfoResponse{ + 0, f.data, f.err, + } + call = f.calls[key] + } + call.useCount++ + return call.data, call.err } func TestPodCacheGetDifferentNamespace(t *testing.T) { @@ -171,6 +201,7 @@ func makeNode(name string) *api.Node { func TestPodUpdateAllContainers(t *testing.T) { pod := makePod(api.NamespaceDefault, "foo", "machine", "bar") + pod2 := makePod(api.NamespaceDefault, "baz", "machine", "qux") config := podCacheTestConfig{ ipFunc: func(host string) string { if host == "machine" { @@ -180,15 +211,22 @@ func TestPodUpdateAllContainers(t *testing.T) { }, kubeletContainerInfo: api.PodInfo{"bar": api.ContainerStatus{}}, nodes: []api.Node{*makeNode("machine")}, - pods: []api.Pod{*pod}, + pods: []api.Pod{*pod, *pod2}, } cache := config.Construct() cache.UpdateAllContainers() - fake := config.fakePodInfo - if fake.host != "machine" || fake.id != "foo" || fake.namespace != api.NamespaceDefault { - t.Errorf("Unexpected access: %+v", fake) + call1 := config.fakePodInfo.calls[podInfoCall{"machine", api.NamespaceDefault, "foo"}] + call2 := config.fakePodInfo.calls[podInfoCall{"machine", api.NamespaceDefault, "baz"}] + if call1 == nil || call1.useCount != 1 { + t.Errorf("Expected 1 call for 'foo': %+v", config.fakePodInfo.calls) + } + if call2 == nil || call2.useCount != 1 { + t.Errorf("Expected 1 call for 'baz': %+v", config.fakePodInfo.calls) + } + if len(config.fakePodInfo.calls) != 2 { + t.Errorf("Expected 2 calls: %+v", config.fakePodInfo.calls) } status, err := cache.GetPodStatus(api.NamespaceDefault, "foo") @@ -201,6 +239,14 @@ func TestPodUpdateAllContainers(t *testing.T) { if e, a := "1.2.3.5", status.HostIP; e != a { t.Errorf("Unexpected mismatch. 
Expected: %+v, Got: %+v", e, a) } + + if e, a := 1, len(config.fakeNodes.Actions); e != a { + t.Errorf("Expected: %v, Got: %v; %+v", e, a, config.fakeNodes.Actions) + } else { + if e, a := "get-minion", config.fakeNodes.Actions[0].Action; e != a { + t.Errorf("Expected: %v, Got: %v; %+v", e, a, config.fakeNodes.Actions) + } + } } func TestFillPodStatusNoHost(t *testing.T) {