Merge pull request #3083 from lavalamp/optimize

Make list pods constant time
Commit 474cf20810 by Brendan Burns, 2014-12-22 15:38:54 -08:00
9 changed files with 1182 additions and 883 deletions

@@ -63,6 +63,7 @@ var (
type fakeKubeletClient struct{}
func (fakeKubeletClient) GetPodInfo(host, podNamespace, podID string) (api.PodContainerInfo, error) {
glog.V(3).Infof("Trying to get container info for %v/%v/%v", host, podNamespace, podID)
// This is a horrible hack to get around the fact that we can't provide
// different port numbers per kubelet...
var c client.PodInfoGetter

pkg/master/ip_cache.go (new file, 90 lines)

@@ -0,0 +1,90 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package master
import (
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
type ipCacheEntry struct {
ip string
lastUpdate time.Time
}
type ipCache struct {
clock util.Clock
cloudProvider cloudprovider.Interface
cache map[string]ipCacheEntry
lock sync.Mutex
ttl time.Duration
}
// NewIPCache makes a new ip caching layer, which will get IP addresses from cp,
// and use clock for deciding when to re-get an IP address.
// Thread-safe.
//
// TODO: when we switch to go1.4, this class would be a good candidate for something
// that could be produced from a template and a type via `go generate`.
func NewIPCache(cp cloudprovider.Interface, clock util.Clock, ttl time.Duration) *ipCache {
return &ipCache{
clock: clock,
cloudProvider: cp,
cache: map[string]ipCacheEntry{},
ttl: ttl,
}
}
// GetInstanceIP returns the IP address of host, from the cache
// if possible, otherwise it asks the cloud provider.
func (c *ipCache) GetInstanceIP(host string) string {
c.lock.Lock()
defer c.lock.Unlock()
data, ok := c.cache[host]
now := c.clock.Now()
if !ok || now.Sub(data.lastUpdate) > c.ttl {
ip := getInstanceIPFromCloud(c.cloudProvider, host)
data = ipCacheEntry{
ip: ip,
lastUpdate: now,
}
c.cache[host] = data
}
return data.ip
}
func getInstanceIPFromCloud(cloud cloudprovider.Interface, host string) string {
if cloud == nil {
return ""
}
instances, ok := cloud.Instances()
if instances == nil || !ok {
return ""
}
addr, err := instances.IPAddress(host)
if err != nil {
glog.Errorf("Error getting instance IP for %q: %v", host, err)
return ""
}
return addr.String()
}
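
For orientation, a minimal usage sketch of the new cache (myCloud stands in for whatever cloudprovider.Interface the master is configured with; it is not part of this change):

func exampleIPCacheUsage(myCloud cloudprovider.Interface) {
	// One cache per master; safe for concurrent use. Each host's IP is
	// fetched from the cloud provider at most once per TTL window.
	cache := NewIPCache(myCloud, util.RealClock{}, 30*time.Second)
	_ = cache.GetInstanceIP("node-1") // a second call within 30s hits the cache
}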

pkg/master/ip_cache_test.go (new file)

@@ -0,0 +1,59 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package master
import (
"testing"
"time"
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestCacheExpire(t *testing.T) {
fakeCloud := &fake_cloud.FakeCloud{}
clock := &util.FakeClock{time.Now()}
c := NewIPCache(fakeCloud, clock, 60*time.Second)
_ = c.GetInstanceIP("foo")
// This call should hit the cache, so we expect no additional calls to the cloud
_ = c.GetInstanceIP("foo")
// Advance the clock, this call should miss the cache, so expect one more call.
clock.Time = clock.Time.Add(61 * time.Second)
_ = c.GetInstanceIP("foo")
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[1] != "ip-address" || fakeCloud.Calls[0] != "ip-address" {
t.Errorf("Unexpected calls: %+v", fakeCloud.Calls)
}
}
func TestCacheNotExpire(t *testing.T) {
fakeCloud := &fake_cloud.FakeCloud{}
clock := &util.FakeClock{time.Now()}
c := NewIPCache(fakeCloud, clock, 60*time.Second)
_ = c.GetInstanceIP("foo")
// This call should hit the cache, so we expect no additional calls to the cloud
clock.Time = clock.Time.Add(60 * time.Second)
_ = c.GetInstanceIP("foo")
if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "ip-address" {
t.Errorf("Unexpected calls: %+v", fakeCloud.Calls)
}
}

pkg/master/master.go

@@ -319,28 +319,24 @@ func makeMinionRegistry(c *Config) minion.Registry {
// init initializes master.
func (m *Master) init(c *Config) {
podCache := NewPodCache(c.KubeletClient, m.podRegistry)
go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30)
var userContexts = handlers.NewUserRequestContext()
var authenticator = c.Authenticator
nodeRESTStorage := minion.NewREST(m.minionRegistry)
ipCache := NewIPCache(c.Cloud, util.RealClock{}, 30*time.Second)
podCache := NewPodCache(
ipCache,
c.KubeletClient,
RESTStorageToNodes(nodeRESTStorage).Nodes(),
m.podRegistry,
)
go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30)
// TODO: Factor out the core API registration
m.storage = map[string]apiserver.RESTStorage{
"pods": pod.NewREST(&pod.RESTConfig{
CloudProvider: c.Cloud,
PodCache: podCache,
PodInfoGetter: c.KubeletClient,
Registry: m.podRegistry,
// Note: this allows the pod rest object to directly call
// the node rest object without going through the network &
// apiserver. This arrangement should be temporary, nodes
// shouldn't really need this at all. Once we add more auth in,
// we need to consider carefully if this sort of shortcut is a
// good idea.
Nodes: RESTStorageToNodes(nodeRESTStorage).Nodes(),
PodCache: podCache,
Registry: m.podRegistry,
}),
"replicationControllers": controller.NewREST(m.controllerRegistry, m.podRegistry),
"services": service.NewREST(m.serviceRegistry, c.Cloud, m.minionRegistry, m.portalNet),

pkg/master/pod_cache.go

@@ -20,6 +20,7 @@ import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
@@ -27,69 +28,238 @@ import (
"github.com/golang/glog"
)
type IPGetter interface {
GetInstanceIP(host string) (ip string)
}
// PodCache contains a cache of container information, as well as the mechanism for keeping
// that cache up to date.
type PodCache struct {
ipCache IPGetter
containerInfo client.PodInfoGetter
pods pod.Registry
// This is a map of pod id to a map of container name to the
podInfo map[string]api.PodContainerInfo
podLock sync.Mutex
// For confirming existence of a node
nodes client.NodeInterface
// lock protects access to all fields below
lock sync.Mutex
// cached pod statuses.
podStatus map[objKey]api.PodStatus
// nodes that we know exist. Cleared at the beginning of each
// UpdateAllContainers call.
currentNodes map[objKey]bool
}
// NewPodCache returns a new PodCache which watches container information registered in the given PodRegistry.
func NewPodCache(info client.PodInfoGetter, pods pod.Registry) *PodCache {
type objKey struct {
namespace, name string
}
// NewPodCache returns a new PodCache which watches container information
// registered in the given PodRegistry.
// TODO(lavalamp): pods should be a client.PodInterface.
func NewPodCache(ipCache IPGetter, info client.PodInfoGetter, nodes client.NodeInterface, pods pod.Registry) *PodCache {
return &PodCache{
ipCache: ipCache,
containerInfo: info,
pods: pods,
podInfo: map[string]api.PodContainerInfo{},
nodes: nodes,
currentNodes: map[objKey]bool{},
podStatus: map[objKey]api.PodStatus{},
}
}
// makePodCacheKey constructs a key for use in a map to address a pod with specified namespace and id
func makePodCacheKey(podNamespace, podID string) string {
return podNamespace + "." + podID
}
// GetPodInfo implements the PodInfoGetter.GetPodInfo.
// The returned value should be treated as read-only.
// TODO: Remove the host from this call, it's totally unnecessary.
func (p *PodCache) GetPodInfo(host, podNamespace, podID string) (api.PodContainerInfo, error) {
p.podLock.Lock()
defer p.podLock.Unlock()
value, ok := p.podInfo[makePodCacheKey(podNamespace, podID)]
// GetPodStatus gets the stored pod status.
func (p *PodCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) {
p.lock.Lock()
defer p.lock.Unlock()
value, ok := p.podStatus[objKey{namespace, name}]
if !ok {
return api.PodContainerInfo{}, client.ErrPodInfoNotAvailable
return nil, client.ErrPodInfoNotAvailable
}
return value, nil
// Make a copy
return &value, nil
}
func (p *PodCache) updatePodInfo(host, podNamespace, podID string) error {
info, err := p.containerInfo.GetPodInfo(host, podNamespace, podID)
func (p *PodCache) nodeExistsInCache(name string) (exists, cacheHit bool) {
p.lock.Lock()
defer p.lock.Unlock()
exists, cacheHit = p.currentNodes[objKey{"", name}]
return exists, cacheHit
}
// lock must *not* be held
func (p *PodCache) nodeExists(name string) bool {
exists, cacheHit := p.nodeExistsInCache(name)
if cacheHit {
return exists
}
// TODO: suppose there's N concurrent requests for node "foo"; in that case
// it might be useful to block all of them and only look up "foo" once.
// (This code will make up to N lookups.) One way of doing that would be to
// have a pool of M mutexes and require that before looking up "foo" you must
// lock mutex hash("foo") % M.
_, err := p.nodes.Get(name)
exists = true
if err != nil {
return err
exists = false
if !errors.IsNotFound(err) {
glog.Errorf("Unexpected error type verifying minion existence: %+v", err)
}
}
p.podLock.Lock()
defer p.podLock.Unlock()
p.podInfo[makePodCacheKey(podNamespace, podID)] = info
return nil
p.lock.Lock()
defer p.lock.Unlock()
p.currentNodes[objKey{"", name}] = exists
return exists
}
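
The striped-lock idea in the TODO above could look roughly like this (a hypothetical sketch, not part of this change; it would also need "hash/fnv" imported): lookups for the same node serialize on one of M mutexes, while lookups for different nodes usually proceed in parallel.

const lockStripes = 32

type stripedLocks [lockStripes]sync.Mutex

// forKey maps a node name onto one of the stripes.
func (s *stripedLocks) forKey(key string) *sync.Mutex {
	h := fnv.New32a()
	h.Write([]byte(key))
	return &s[h.Sum32()%lockStripes]
}

// A caller would then hold the stripe around the p.nodes.Get(name) lookup:
//	mu := stripes.forKey(name)
//	mu.Lock()
//	defer mu.Unlock()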
// UpdateAllContainers updates information about all containers. Either called by Loop() below, or one-off.
// TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an
// entire pod?
func (p *PodCache) updatePodStatus(pod *api.Pod) error {
newStatus, err := p.computePodStatus(pod)
p.lock.Lock()
defer p.lock.Unlock()
// Map accesses must be locked.
p.podStatus[objKey{pod.Namespace, pod.Name}] = newStatus
return err
}
// computePodStatus always returns a new status, even if it also returns a non-nil error.
// TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an
// entire pod?
func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
newStatus := pod.Status
if pod.Status.Host == "" {
// Not assigned.
newStatus.Phase = api.PodPending
return newStatus, nil
}
if !p.nodeExists(pod.Status.Host) {
// Assigned to non-existing node.
newStatus.Phase = api.PodFailed
return newStatus, nil
}
info, err := p.containerInfo.GetPodInfo(pod.Status.Host, pod.Namespace, pod.Name)
newStatus.HostIP = p.ipCache.GetInstanceIP(pod.Status.Host)
if err != nil {
newStatus.Phase = api.PodUnknown
} else {
newStatus.Info = info.ContainerInfo
newStatus.Phase = getPhase(&pod.Spec, newStatus.Info)
if netContainerInfo, ok := newStatus.Info["net"]; ok {
if netContainerInfo.PodIP != "" {
newStatus.PodIP = netContainerInfo.PodIP
}
}
}
return newStatus, err
}
func (p *PodCache) resetNodeExistenceCache() {
p.lock.Lock()
defer p.lock.Unlock()
p.currentNodes = map[objKey]bool{}
}
// UpdateAllContainers updates information about all containers.
// Callers should let one call to UpdateAllContainers finish before
// calling again, or risk having new info getting clobbered by delayed
// old info.
func (p *PodCache) UpdateAllContainers() {
p.resetNodeExistenceCache()
ctx := api.NewContext()
pods, err := p.pods.ListPods(ctx, labels.Everything())
if err != nil {
glog.Errorf("Error synchronizing container list: %v", err)
glog.Errorf("Error getting pod list: %v", err)
return
}
for _, pod := range pods.Items {
if pod.Status.Host == "" {
continue
}
err := p.updatePodInfo(pod.Status.Host, pod.Namespace, pod.Name)
if err != nil && err != client.ErrPodInfoNotAvailable {
glog.Errorf("Error synchronizing container: %v", err)
// TODO: this algorithm is 1 goroutine & RPC per pod. With a little work,
// it should be possible to make it 1 per *node*, which will be important
// at very large scales. (To be clear, the goroutines shouldn't matter--
// it's the RPCs that need to be minimized.)
var wg sync.WaitGroup
for i := range pods.Items {
pod := &pods.Items[i]
wg.Add(1)
go func() {
defer wg.Done()
err := p.updatePodStatus(pod)
if err != nil && err != client.ErrPodInfoNotAvailable {
glog.Errorf("Error getting info for pod %v/%v: %v", pod.Namespace, pod.Name, err)
}
}()
}
wg.Wait()
}
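
The per-node optimization sketched in the TODO above might begin with a grouping pass like this (illustrative only; the batched kubelet RPC it assumes does not exist yet):

	byHost := map[string][]*api.Pod{}
	for i := range pods.Items {
		pod := &pods.Items[i]
		byHost[pod.Status.Host] = append(byHost[pod.Status.Host], pod)
	}
	// Then spawn one goroutine per host rather than one per pod; each
	// would issue a single (hypothetical) batched info request and fan
	// the results back out to its pods.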
// getPhase returns the phase of a pod given its container info.
// TODO(dchen1107): push this all the way down into kubelet.
func getPhase(spec *api.PodSpec, info api.PodInfo) api.PodPhase {
if info == nil {
return api.PodPending
}
running := 0
waiting := 0
stopped := 0
failed := 0
succeeded := 0
unknown := 0
for _, container := range spec.Containers {
if containerStatus, ok := info[container.Name]; ok {
if containerStatus.State.Running != nil {
running++
} else if containerStatus.State.Termination != nil {
stopped++
if containerStatus.State.Termination.ExitCode == 0 {
succeeded++
} else {
failed++
}
} else if containerStatus.State.Waiting != nil {
waiting++
} else {
unknown++
}
} else {
unknown++
}
}
switch {
case waiting > 0:
// One or more containers have not been started
return api.PodPending
case running > 0 && unknown == 0:
// All containers have been started, and at least
// one container is running
return api.PodRunning
case running == 0 && stopped > 0 && unknown == 0:
// All containers are terminated
if spec.RestartPolicy.Always != nil {
// All containers are in the process of restarting
return api.PodRunning
}
if stopped == succeeded {
// RestartPolicy is not Always, and all
// containers are terminated in success
return api.PodSucceeded
}
if spec.RestartPolicy.Never != nil {
// RestartPolicy is Never, and all containers are
// terminated with at least one in failure
return api.PodFailed
}
// RestartPolicy is OnFailure, and at least one in failure
// and in the process of restarting
return api.PodRunning
default:
return api.PodPending
}
}
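
One concrete consequence of the counting rules above: a container that is missing from info counts as unknown, which keeps the pod Pending even when every reported container is running. A sketch (using the api types already imported here):

	spec := &api.PodSpec{
		Containers:    []api.Container{{Name: "a"}, {Name: "b"}},
		RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
	}
	info := api.PodInfo{
		"a": api.ContainerStatus{State: api.ContainerState{Running: &api.ContainerStateRunning{}}},
		// "b" has not reported yet, so it counts as unknown.
	}
	// getPhase(spec, info) == api.PodPending: running > 0 but unknown > 0,
	// so neither the Running nor the all-terminated cases match.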

pkg/master/pod_cache_test.go

@@ -18,150 +18,731 @@ package master
import (
"reflect"
"sync"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
type FakePodInfoGetter struct {
type podInfoCall struct {
host string
id string
namespace string
data api.PodContainerInfo
err error
name string
}
func (f *FakePodInfoGetter) GetPodInfo(host, namespace, id string) (api.PodContainerInfo, error) {
f.host = host
f.id = id
f.namespace = namespace
return f.data, f.err
type podInfoResponse struct {
useCount int
data api.PodContainerInfo
err error
}
type podInfoCalls map[podInfoCall]*podInfoResponse
type FakePodInfoGetter struct {
calls podInfoCalls
lock sync.Mutex
// default data/error to return, or you can add
// responses to specific calls-- that will take precedence.
data api.PodContainerInfo
err error
}
func (f *FakePodInfoGetter) GetPodInfo(host, namespace, name string) (api.PodContainerInfo, error) {
f.lock.Lock()
defer f.lock.Unlock()
if f.calls == nil {
f.calls = podInfoCalls{}
}
key := podInfoCall{host, namespace, name}
call, ok := f.calls[key]
if !ok {
f.calls[key] = &podInfoResponse{
0, f.data, f.err,
}
call = f.calls[key]
}
call.useCount++
return call.data, call.err
}
func TestPodCacheGetDifferentNamespace(t *testing.T) {
cache := NewPodCache(nil, nil)
cache := NewPodCache(nil, nil, nil, nil)
expectedDefault := api.PodContainerInfo{
ContainerInfo: api.PodInfo{
expectedDefault := api.PodStatus{
Info: api.PodInfo{
"foo": api.ContainerStatus{},
},
}
expectedOther := api.PodContainerInfo{
ContainerInfo: api.PodInfo{
expectedOther := api.PodStatus{
Info: api.PodInfo{
"bar": api.ContainerStatus{},
},
}
cache.podInfo[makePodCacheKey(api.NamespaceDefault, "foo")] = expectedDefault
cache.podInfo[makePodCacheKey("other", "foo")] = expectedOther
cache.podStatus[objKey{api.NamespaceDefault, "foo"}] = expectedDefault
cache.podStatus[objKey{"other", "foo"}] = expectedOther
info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo")
info, err := cache.GetPodStatus(api.NamespaceDefault, "foo")
if err != nil {
t.Errorf("Unexpected error: %#v", err)
t.Errorf("Unexpected error: %+v", err)
}
if !reflect.DeepEqual(info, expectedDefault) {
t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expectedOther, info)
if !reflect.DeepEqual(info, &expectedDefault) {
t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", &expectedOther, info)
}
info, err = cache.GetPodInfo("host", "other", "foo")
info, err = cache.GetPodStatus("other", "foo")
if err != nil {
t.Errorf("Unexpected error: %#v", err)
t.Errorf("Unexpected error: %+v", err)
}
if !reflect.DeepEqual(info, expectedOther) {
t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expectedOther, info)
if !reflect.DeepEqual(info, &expectedOther) {
t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", &expectedOther, info)
}
}
func TestPodCacheGet(t *testing.T) {
cache := NewPodCache(nil, nil)
cache := NewPodCache(nil, nil, nil, nil)
expected := api.PodContainerInfo{
ContainerInfo: api.PodInfo{
expected := api.PodStatus{
Info: api.PodInfo{
"foo": api.ContainerStatus{},
},
}
cache.podInfo[makePodCacheKey(api.NamespaceDefault, "foo")] = expected
cache.podStatus[objKey{api.NamespaceDefault, "foo"}] = expected
info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo")
info, err := cache.GetPodStatus(api.NamespaceDefault, "foo")
if err != nil {
t.Errorf("Unexpected error: %#v", err)
t.Errorf("Unexpected error: %+v", err)
}
if !reflect.DeepEqual(info, expected) {
t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expected, info)
if !reflect.DeepEqual(info, &expected) {
t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", &expected, info)
}
}
func TestPodCacheGetMissing(t *testing.T) {
cache := NewPodCache(nil, nil)
cache := NewPodCache(nil, nil, nil, nil)
info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo")
status, err := cache.GetPodStatus(api.NamespaceDefault, "foo")
if err == nil {
t.Errorf("Unexpected non-error: %#v", err)
t.Errorf("Unexpected non-error: %+v", err)
}
if !reflect.DeepEqual(info, api.PodContainerInfo{}) {
t.Errorf("Unexpected info: %#v", info)
if status != nil {
t.Errorf("Unexpected status: %+v", status)
}
}
func TestPodGetPodInfoGetter(t *testing.T) {
expected := api.PodContainerInfo{
ContainerInfo: api.PodInfo{
"foo": api.ContainerStatus{},
type fakeIPCache func(string) string
func (f fakeIPCache) GetInstanceIP(host string) (ip string) {
return f(host)
}
type podCacheTestConfig struct {
ipFunc func(string) string // Construct will set a default if nil
nodes []api.Node
pods []api.Pod
kubeletContainerInfo api.PodInfo
// Construct will fill in these fields
fakePodInfo *FakePodInfoGetter
fakeNodes *client.Fake
fakePods *registrytest.PodRegistry
}
func (c *podCacheTestConfig) Construct() *PodCache {
if c.ipFunc == nil {
c.ipFunc = func(host string) string {
return "ip of " + host
}
}
c.fakePodInfo = &FakePodInfoGetter{
data: api.PodContainerInfo{
ContainerInfo: c.kubeletContainerInfo,
},
}
fake := FakePodInfoGetter{
data: expected,
c.fakeNodes = &client.Fake{
MinionsList: api.NodeList{
Items: c.nodes,
},
}
cache := NewPodCache(&fake, nil)
c.fakePods = registrytest.NewPodRegistry(&api.PodList{Items: c.pods})
return NewPodCache(
fakeIPCache(c.ipFunc),
c.fakePodInfo,
c.fakeNodes.Nodes(),
c.fakePods,
)
}
cache.updatePodInfo("host", api.NamespaceDefault, "foo")
if fake.host != "host" || fake.id != "foo" || fake.namespace != api.NamespaceDefault {
t.Errorf("Unexpected access: %#v", fake)
func makePod(namespace, name, host string, containers ...string) *api.Pod {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: name},
Status: api.PodStatus{Host: host},
}
info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo")
if err != nil {
t.Errorf("Unexpected error: %#v", err)
for _, c := range containers {
pod.Spec.Containers = append(pod.Spec.Containers, api.Container{
Name: c,
})
}
if !reflect.DeepEqual(info, expected) {
t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expected, info)
return pod
}
func makeNode(name string) *api.Node {
return &api.Node{
ObjectMeta: api.ObjectMeta{Name: name},
}
}
func TestPodUpdateAllContainers(t *testing.T) {
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{
Host: "machine",
pod := makePod(api.NamespaceDefault, "foo", "machine", "bar")
pod2 := makePod(api.NamespaceDefault, "baz", "machine", "qux")
config := podCacheTestConfig{
ipFunc: func(host string) string {
if host == "machine" {
return "1.2.3.5"
}
return ""
},
kubeletContainerInfo: api.PodInfo{"bar": api.ContainerStatus{}},
nodes: []api.Node{*makeNode("machine")},
pods: []api.Pod{*pod, *pod2},
}
pods := []api.Pod{pod}
mockRegistry := registrytest.NewPodRegistry(&api.PodList{Items: pods})
expected := api.PodContainerInfo{
ContainerInfo: api.PodInfo{
"foo": api.ContainerStatus{},
},
}
fake := FakePodInfoGetter{
data: expected,
}
cache := NewPodCache(&fake, mockRegistry)
cache := config.Construct()
cache.UpdateAllContainers()
if fake.host != "machine" || fake.id != "foo" || fake.namespace != api.NamespaceDefault {
t.Errorf("Unexpected access: %#v", fake)
call1 := config.fakePodInfo.calls[podInfoCall{"machine", api.NamespaceDefault, "foo"}]
call2 := config.fakePodInfo.calls[podInfoCall{"machine", api.NamespaceDefault, "baz"}]
if call1 == nil || call1.useCount != 1 {
t.Errorf("Expected 1 call for 'foo': %+v", config.fakePodInfo.calls)
}
if call2 == nil || call2.useCount != 1 {
t.Errorf("Expected 1 call for 'baz': %+v", config.fakePodInfo.calls)
}
if len(config.fakePodInfo.calls) != 2 {
t.Errorf("Expected 2 calls: %+v", config.fakePodInfo.calls)
}
info, err := cache.GetPodInfo("machine", api.NamespaceDefault, "foo")
status, err := cache.GetPodStatus(api.NamespaceDefault, "foo")
if err != nil {
t.Errorf("Unexpected error: %#v", err)
t.Fatalf("Unexpected error: %+v", err)
}
if !reflect.DeepEqual(info, expected) {
t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expected, info)
if e, a := config.kubeletContainerInfo, status.Info; !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", e, a)
}
if e, a := "1.2.3.5", status.HostIP; e != a {
t.Errorf("Unexpected mismatch. Expected: %+v, Got: %+v", e, a)
}
if e, a := 1, len(config.fakeNodes.Actions); e != a {
t.Errorf("Expected: %v, Got: %v; %+v", e, a, config.fakeNodes.Actions)
} else {
if e, a := "get-minion", config.fakeNodes.Actions[0].Action; e != a {
t.Errorf("Expected: %v, Got: %v; %+v", e, a, config.fakeNodes.Actions)
}
}
}
func TestFillPodStatusNoHost(t *testing.T) {
pod := makePod(api.NamespaceDefault, "foo", "", "bar")
config := podCacheTestConfig{
kubeletContainerInfo: api.PodInfo{},
nodes: []api.Node{*makeNode("machine")},
pods: []api.Pod{*pod},
}
cache := config.Construct()
err := cache.updatePodStatus(&config.pods[0])
if err != nil {
t.Fatalf("Unexpected error: %+v", err)
}
status, err := cache.GetPodStatus(pod.Namespace, pod.Name)
if e, a := api.PodPending, status.Phase; e != a {
t.Errorf("Expected: %+v, Got %+v", e, a)
}
}
func TestFillPodStatusMissingMachine(t *testing.T) {
pod := makePod(api.NamespaceDefault, "foo", "machine", "bar")
config := podCacheTestConfig{
kubeletContainerInfo: api.PodInfo{},
nodes: []api.Node{},
pods: []api.Pod{*pod},
}
cache := config.Construct()
err := cache.updatePodStatus(&config.pods[0])
if err != nil {
t.Fatalf("Unexpected error: %+v", err)
}
status, err := cache.GetPodStatus(pod.Namespace, pod.Name)
if e, a := api.PodFailed, status.Phase; e != a {
t.Errorf("Expected: %+v, Got %+v", e, a)
}
}
func TestFillPodStatus(t *testing.T) {
pod := makePod(api.NamespaceDefault, "foo", "machine", "bar")
expectedIP := "1.2.3.4"
expectedTime, _ := time.Parse("2013-Feb-03", "2013-Feb-03")
config := podCacheTestConfig{
kubeletContainerInfo: api.PodInfo{
"net": {
State: api.ContainerState{
Running: &api.ContainerStateRunning{
StartedAt: util.NewTime(expectedTime),
},
},
RestartCount: 1,
PodIP: expectedIP,
},
},
nodes: []api.Node{*makeNode("machine")},
pods: []api.Pod{*pod},
}
cache := config.Construct()
err := cache.updatePodStatus(&config.pods[0])
if err != nil {
t.Fatalf("Unexpected error: %+v", err)
}
status, err := cache.GetPodStatus(pod.Namespace, pod.Name)
if e, a := config.kubeletContainerInfo, status.Info; !reflect.DeepEqual(e, a) {
t.Errorf("Expected: %+v, Got %+v", e, a)
}
if status.PodIP != expectedIP {
t.Errorf("Expected %s, Got %s\n%+v", expectedIP, status.PodIP, status)
}
}
func TestFillPodInfoNoData(t *testing.T) {
pod := makePod(api.NamespaceDefault, "foo", "machine", "bar")
expectedIP := ""
config := podCacheTestConfig{
kubeletContainerInfo: api.PodInfo{
"net": {},
},
nodes: []api.Node{*makeNode("machine")},
pods: []api.Pod{*pod},
}
cache := config.Construct()
err := cache.updatePodStatus(&config.pods[0])
if err != nil {
t.Fatalf("Unexpected error: %+v", err)
}
status, err := cache.GetPodStatus(pod.Namespace, pod.Name)
if e, a := config.kubeletContainerInfo, status.Info; !reflect.DeepEqual(e, a) {
t.Errorf("Expected: %+v, Got %+v", e, a)
}
if status.PodIP != expectedIP {
t.Errorf("Expected %s, Got %s", expectedIP, status.PodIP)
}
}
func TestPodPhaseWithBadNode(t *testing.T) {
desiredState := api.PodSpec{
Containers: []api.Container{
{Name: "containerA"},
{Name: "containerB"},
},
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
}
runningState := api.ContainerStatus{
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
stoppedState := api.ContainerStatus{
State: api.ContainerState{
Termination: &api.ContainerStateTerminated{},
},
}
tests := []struct {
pod *api.Pod
status api.PodPhase
test string
}{
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Host: "machine-2",
},
},
api.PodFailed,
"no info, but bad machine",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": runningState,
},
Host: "machine-two",
},
},
api.PodFailed,
"all running but minion is missing",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": stoppedState,
"containerB": stoppedState,
},
Host: "machine-two",
},
},
api.PodFailed,
"all stopped but minion missing",
},
}
for _, test := range tests {
config := podCacheTestConfig{
kubeletContainerInfo: test.pod.Status.Info,
nodes: []api.Node{},
pods: []api.Pod{*test.pod},
}
cache := config.Construct()
cache.UpdateAllContainers()
status, err := cache.GetPodStatus(test.pod.Namespace, test.pod.Name)
if err != nil {
t.Errorf("%v: Unexpected error %v", test.test, err)
continue
}
if e, a := test.status, status.Phase; e != a {
t.Errorf("In test %s, expected %v, got %v", test.test, e, a)
}
}
}
func TestPodPhaseWithRestartAlways(t *testing.T) {
desiredState := api.PodSpec{
Containers: []api.Container{
{Name: "containerA"},
{Name: "containerB"},
},
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
}
currentState := api.PodStatus{
Host: "machine",
}
runningState := api.ContainerStatus{
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
stoppedState := api.ContainerStatus{
State: api.ContainerState{
Termination: &api.ContainerStateTerminated{},
},
}
tests := []struct {
pod *api.Pod
status api.PodPhase
test string
}{
{&api.Pod{Spec: desiredState, Status: currentState}, api.PodPending, "waiting"},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": runningState,
},
Host: "machine",
},
},
api.PodRunning,
"all running",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": stoppedState,
"containerB": stoppedState,
},
Host: "machine",
},
},
api.PodRunning,
"all stopped with restart always",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": stoppedState,
},
Host: "machine",
},
},
api.PodRunning,
"mixed state #1 with restart always",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
},
Host: "machine",
},
},
api.PodPending,
"mixed state #2 with restart always",
},
}
for _, test := range tests {
if status := getPhase(&test.pod.Spec, test.pod.Status.Info); status != test.status {
t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status)
}
}
}
func TestPodPhaseWithRestartNever(t *testing.T) {
desiredState := api.PodSpec{
Containers: []api.Container{
{Name: "containerA"},
{Name: "containerB"},
},
RestartPolicy: api.RestartPolicy{Never: &api.RestartPolicyNever{}},
}
currentState := api.PodStatus{
Host: "machine",
}
runningState := api.ContainerStatus{
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
succeededState := api.ContainerStatus{
State: api.ContainerState{
Termination: &api.ContainerStateTerminated{
ExitCode: 0,
},
},
}
failedState := api.ContainerStatus{
State: api.ContainerState{
Termination: &api.ContainerStateTerminated{
ExitCode: -1,
},
},
}
tests := []struct {
pod *api.Pod
status api.PodPhase
test string
}{
{&api.Pod{Spec: desiredState, Status: currentState}, api.PodPending, "waiting"},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": runningState,
},
Host: "machine",
},
},
api.PodRunning,
"all running with restart never",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": succeededState,
"containerB": succeededState,
},
Host: "machine",
},
},
api.PodSucceeded,
"all succeeded with restart never",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": failedState,
"containerB": failedState,
},
Host: "machine",
},
},
api.PodFailed,
"all failed with restart never",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": succeededState,
},
Host: "machine",
},
},
api.PodRunning,
"mixed state #1 with restart never",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
},
Host: "machine",
},
},
api.PodPending,
"mixed state #2 with restart never",
},
}
for _, test := range tests {
if status := getPhase(&test.pod.Spec, test.pod.Status.Info); status != test.status {
t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status)
}
}
}
func TestPodPhaseWithRestartOnFailure(t *testing.T) {
desiredState := api.PodSpec{
Containers: []api.Container{
{Name: "containerA"},
{Name: "containerB"},
},
RestartPolicy: api.RestartPolicy{OnFailure: &api.RestartPolicyOnFailure{}},
}
currentState := api.PodStatus{
Host: "machine",
}
runningState := api.ContainerStatus{
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
succeededState := api.ContainerStatus{
State: api.ContainerState{
Termination: &api.ContainerStateTerminated{
ExitCode: 0,
},
},
}
failedState := api.ContainerStatus{
State: api.ContainerState{
Termination: &api.ContainerStateTerminated{
ExitCode: -1,
},
},
}
tests := []struct {
pod *api.Pod
status api.PodPhase
test string
}{
{&api.Pod{Spec: desiredState, Status: currentState}, api.PodPending, "waiting"},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": runningState,
},
Host: "machine",
},
},
api.PodRunning,
"all running with restart onfailure",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": succeededState,
"containerB": succeededState,
},
Host: "machine",
},
},
api.PodSucceeded,
"all succeeded with restart onfailure",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": failedState,
"containerB": failedState,
},
Host: "machine",
},
},
api.PodRunning,
"all failed with restart never",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": succeededState,
},
Host: "machine",
},
},
api.PodRunning,
"mixed state #1 with restart onfailure",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
},
Host: "machine",
},
},
api.PodPending,
"mixed state #2 with restart onfailure",
},
}
for _, test := range tests {
if status := getPhase(&test.pod.Spec, test.pod.Status.Info); status != test.status {
t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status)
}
}
}

pkg/registry/pod/rest.go

@@ -18,72 +18,37 @@ package pod
import (
"fmt"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
type ipCacheEntry struct {
ip string
lastUpdate time.Time
}
type ipCache map[string]ipCacheEntry
type clock interface {
Now() time.Time
}
type realClock struct{}
func (r realClock) Now() time.Time {
return time.Now()
type PodStatusGetter interface {
GetPodStatus(namespace, name string) (*api.PodStatus, error)
}
// REST implements the RESTStorage interface in terms of a PodRegistry.
type REST struct {
cloudProvider cloudprovider.Interface
mu sync.Mutex
podCache client.PodInfoGetter
podInfoGetter client.PodInfoGetter
podPollPeriod time.Duration
registry Registry
nodes client.NodeInterface
ipCache ipCache
clock clock
podCache PodStatusGetter
registry Registry
}
type RESTConfig struct {
CloudProvider cloudprovider.Interface
PodCache client.PodInfoGetter
PodInfoGetter client.PodInfoGetter
Registry Registry
Nodes client.NodeInterface
PodCache PodStatusGetter
Registry Registry
}
// NewREST returns a new REST.
func NewREST(config *RESTConfig) *REST {
return &REST{
cloudProvider: config.CloudProvider,
podCache: config.PodCache,
podInfoGetter: config.PodInfoGetter,
podPollPeriod: time.Second * 10,
registry: config.Registry,
nodes: config.Nodes,
ipCache: ipCache{},
clock: realClock{},
podCache: config.PodCache,
registry: config.Registry,
}
}
@@ -123,17 +88,17 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
if pod == nil {
return pod, nil
}
if rs.podCache != nil || rs.podInfoGetter != nil {
rs.fillPodInfo(pod)
status, err := getPodStatus(pod, rs.nodes)
if err != nil {
return pod, err
host := pod.Status.Host
if status, err := rs.podCache.GetPodStatus(pod.Namespace, pod.Name); err != nil {
pod.Status = api.PodStatus{
Phase: api.PodUnknown,
}
pod.Status.Phase = status
}
if pod.Status.Host != "" {
pod.Status.HostIP = rs.getInstanceIP(pod.Status.Host)
} else {
pod.Status = *status
}
// Make sure not to hide a recent host with an old one from the cache:
// the scheduler may have assigned a host after the cached status was
// computed, and that assignment must win.
// TODO: move host to spec
pod.Status.Host = host
return pod, err
}
@@ -168,15 +133,18 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
if err == nil {
for i := range pods.Items {
pod := &pods.Items[i]
rs.fillPodInfo(pod)
status, err := getPodStatus(pod, rs.nodes)
if err != nil {
status = api.PodUnknown
}
pod.Status.Phase = status
if pod.Status.Host != "" {
pod.Status.HostIP = rs.getInstanceIP(pod.Status.Host)
host := pod.Status.Host
if status, err := rs.podCache.GetPodStatus(pod.Namespace, pod.Name); err != nil {
pod.Status = api.PodStatus{
Phase: api.PodUnknown,
}
} else {
pod.Status = *status
}
// Make sure not to hide a recent host with an old one from the cache.
// This is tested by the integration test.
// TODO: move host to spec
pod.Status.Host = host
}
}
return pods, err
@@ -207,148 +175,3 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE
return rs.registry.GetPod(ctx, pod.Name)
}), nil
}
func (rs *REST) fillPodInfo(pod *api.Pod) {
if pod.Status.Host == "" {
return
}
// Get cached info for the list currently.
// TODO: Optionally use fresh info
if rs.podCache != nil {
info, err := rs.podCache.GetPodInfo(pod.Status.Host, pod.Namespace, pod.Name)
if err != nil {
if err != client.ErrPodInfoNotAvailable {
glog.Errorf("Error getting container info from cache: %v", err)
}
if rs.podInfoGetter != nil {
info, err = rs.podInfoGetter.GetPodInfo(pod.Status.Host, pod.Namespace, pod.Name)
}
if err != nil {
if err != client.ErrPodInfoNotAvailable {
glog.Errorf("Error getting fresh container info: %v", err)
}
return
}
}
pod.Status.Info = info.ContainerInfo
netContainerInfo, ok := pod.Status.Info["net"]
if ok {
if netContainerInfo.PodIP != "" {
pod.Status.PodIP = netContainerInfo.PodIP
} else if netContainerInfo.State.Running != nil {
glog.Warningf("No network settings: %#v", netContainerInfo)
}
} else {
glog.Warningf("Couldn't find network container for %s in %v", pod.Name, info)
}
}
}
func (rs *REST) getInstanceIP(host string) string {
data, ok := rs.ipCache[host]
now := rs.clock.Now()
if !ok || now.Sub(data.lastUpdate) > (30*time.Second) {
ip := getInstanceIPFromCloud(rs.cloudProvider, host)
data = ipCacheEntry{
ip: ip,
lastUpdate: now,
}
rs.ipCache[host] = data
}
return data.ip
}
func getInstanceIPFromCloud(cloud cloudprovider.Interface, host string) string {
if cloud == nil {
return ""
}
instances, ok := cloud.Instances()
if instances == nil || !ok {
return ""
}
addr, err := instances.IPAddress(host)
if err != nil {
glog.Errorf("Error getting instance IP for %q: %v", host, err)
return ""
}
return addr.String()
}
func getPodStatus(pod *api.Pod, nodes client.NodeInterface) (api.PodPhase, error) {
if pod.Status.Host == "" {
return api.PodPending, nil
}
if nodes != nil {
_, err := nodes.Get(pod.Status.Host)
if err != nil {
if errors.IsNotFound(err) {
return api.PodFailed, nil
}
glog.Errorf("Error getting pod info: %v", err)
return api.PodUnknown, nil
}
} else {
glog.Errorf("Unexpected missing minion interface, status may be in-accurate")
}
if pod.Status.Info == nil {
return api.PodPending, nil
}
// TODO(dchen1107): move the entire logic to kubelet?
running := 0
waiting := 0
stopped := 0
failed := 0
succeeded := 0
unknown := 0
for _, container := range pod.Spec.Containers {
if containerStatus, ok := pod.Status.Info[container.Name]; ok {
if containerStatus.State.Running != nil {
running++
} else if containerStatus.State.Termination != nil {
stopped++
if containerStatus.State.Termination.ExitCode == 0 {
succeeded++
} else {
failed++
}
} else if containerStatus.State.Waiting != nil {
waiting++
} else {
unknown++
}
} else {
unknown++
}
}
switch {
case waiting > 0:
// One or more containers have not been started
return api.PodPending, nil
case running > 0 && unknown == 0:
// All containers have been started, and at least
// one container is running
return api.PodRunning, nil
case running == 0 && stopped > 0 && unknown == 0:
// All containers are terminated
if pod.Spec.RestartPolicy.Always != nil {
// All containers are in the process of restarting
return api.PodRunning, nil
}
if stopped == succeeded {
// RestartPolicy is not Always, and all
// containers are terminated in success
return api.PodSucceeded, nil
}
if pod.Spec.RestartPolicy.Never != nil {
// RestartPolicy is Never, and all containers are
// terminated with at least one in failure
return api.PodFailed, nil
}
// RestartPolicy is OnFailure, and at least one in failure
// and in the process of restarting
return api.PodRunning, nil
default:
return api.PodPending, nil
}
}

pkg/registry/pod/rest_test.go

@@ -28,12 +28,25 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
type fakeCache struct {
requestedNamespace string
requestedName string
statusToReturn *api.PodStatus
errorToReturn error
}
func (f *fakeCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) {
f.requestedNamespace = namespace
f.requestedName = name
return f.statusToReturn, f.errorToReturn
}
func expectApiStatusError(t *testing.T, ch <-chan apiserver.RESTResult, msg string) {
out := <-ch
status, ok := out.Object.(*api.Status)
@@ -61,6 +74,7 @@ func TestCreatePodRegistryError(t *testing.T) {
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
pod := &api.Pod{}
ctx := api.NewDefaultContext()
@@ -76,6 +90,7 @@ func TestCreatePodSetsIds(t *testing.T) {
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
pod := &api.Pod{}
ctx := api.NewDefaultContext()
@@ -98,6 +113,7 @@ func TestCreatePodSetsUID(t *testing.T) {
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
pod := &api.Pod{}
ctx := api.NewDefaultContext()
@@ -117,6 +133,7 @@ func TestListPodsError(t *testing.T) {
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
ctx := api.NewContext()
pods, err := storage.List(ctx, labels.Everything(), labels.Everything())
@@ -128,10 +145,40 @@ }
}
}
func TestListPodsCacheError(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pods = &api.PodList{
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
},
},
}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable},
}
ctx := api.NewContext()
pods, err := storage.List(ctx, labels.Everything(), labels.Everything())
if err != nil {
t.Fatalf("Expected no error, got %#v", err)
}
pl := pods.(*api.PodList)
if len(pl.Items) != 1 {
t.Fatalf("Unexpected 0-len pod list: %+v", pl)
}
if e, a := api.PodUnknown, pl.Items[0].Status.Phase; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
func TestListEmptyPodList(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(&api.PodList{ListMeta: api.ListMeta{ResourceVersion: "1"}})
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
ctx := api.NewContext()
pods, err := storage.List(ctx, labels.Everything(), labels.Everything())
@@ -147,14 +194,6 @@ func TestListEmptyPodList(t *testing.T) {
}
}
type fakeClock struct {
t time.Time
}
func (f *fakeClock) Now() time.Time {
return f.t
}
func TestListPodList(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pods = &api.PodList{
@@ -173,8 +212,7 @@ }
}
storage := REST{
registry: podRegistry,
ipCache: ipCache{},
clock: &fakeClock{},
podCache: &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}},
}
ctx := api.NewContext()
podsObj, err := storage.List(ctx, labels.Everything(), labels.Everything())
@ -186,7 +224,7 @@ func TestListPodList(t *testing.T) {
if len(pods.Items) != 2 {
t.Errorf("Unexpected pod list: %#v", pods)
}
if pods.Items[0].Name != "foo" {
if pods.Items[0].Name != "foo" || pods.Items[0].Status.Phase != api.PodRunning {
t.Errorf("Unexpected pod: %#v", pods.Items[0])
}
if pods.Items[1].Name != "bar" {
@@ -218,8 +256,7 @@ func TestListPodListSelection(t *testing.T) {
}
storage := REST{
registry: podRegistry,
ipCache: ipCache{},
clock: &fakeClock{},
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
ctx := api.NewContext()
@@ -283,6 +320,7 @@ func TestPodDecode(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
expected := &api.Pod{
ObjectMeta: api.ObjectMeta{
@@ -305,12 +343,37 @@ }
}
func TestGetPod(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pod = &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"},
}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}},
}
ctx := api.NewContext()
obj, err := storage.Get(ctx, "foo")
pod := obj.(*api.Pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
expect := *podRegistry.Pod
expect.Status.Phase = api.PodRunning
// TODO: when host is moved to spec, remove this line.
expect.Status.Host = "machine"
if e, a := &expect, pod; !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a)
}
}
func TestGetPodCacheError(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pod = &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
storage := REST{
registry: podRegistry,
ipCache: ipCache{},
clock: &fakeClock{},
podCache: &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable},
}
ctx := api.NewContext()
obj, err := storage.Get(ctx, "foo")
@@ -319,497 +382,19 @@ func TestGetPod(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
if e, a := podRegistry.Pod, pod; !reflect.DeepEqual(e, a) {
expect := *podRegistry.Pod
expect.Status.Phase = api.PodUnknown
if e, a := &expect, pod; !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a)
}
}
func TestGetPodCloud(t *testing.T) {
fakeCloud := &fake_cloud.FakeCloud{}
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pod = &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}, Status: api.PodStatus{Host: "machine"}}
clock := &fakeClock{t: time.Now()}
storage := REST{
registry: podRegistry,
cloudProvider: fakeCloud,
ipCache: ipCache{},
clock: clock,
}
ctx := api.NewContext()
obj, err := storage.Get(ctx, "foo")
pod := obj.(*api.Pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if e, a := podRegistry.Pod, pod; !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a)
}
// This call should hit the cache, so we expect no additional calls to the cloud
obj, err = storage.Get(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "ip-address" {
t.Errorf("Unexpected calls: %#v", fakeCloud.Calls)
}
// Advance the clock, this call should miss the cache, so expect one more call.
clock.t = clock.t.Add(60 * time.Second)
obj, err = storage.Get(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[1] != "ip-address" {
t.Errorf("Unexpected calls: %#v", fakeCloud.Calls)
}
}
func TestPodStatusWithBadNode(t *testing.T) {
fakeClient := client.Fake{
MinionsList: api.NodeList{
Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "machine"},
},
},
},
}
desiredState := api.PodSpec{
Containers: []api.Container{
{Name: "containerA"},
{Name: "containerB"},
},
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
}
runningState := api.ContainerStatus{
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
stoppedState := api.ContainerStatus{
State: api.ContainerState{
Termination: &api.ContainerStateTerminated{},
},
}
tests := []struct {
pod *api.Pod
status api.PodPhase
test string
}{
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Host: "machine-2",
},
},
api.PodFailed,
"no info, but bad machine",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": runningState,
},
Host: "machine-two",
},
},
api.PodFailed,
"all running but minion is missing",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": stoppedState,
"containerB": stoppedState,
},
Host: "machine-two",
},
},
api.PodFailed,
"all stopped but minion missing",
},
}
for _, test := range tests {
if status, err := getPodStatus(test.pod, fakeClient.Nodes()); status != test.status {
t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status)
if err != nil {
t.Errorf("In test %s, unexpected error: %v", test.test, err)
}
}
}
}
func TestPodStatusWithRestartAlways(t *testing.T) {
fakeClient := client.Fake{
MinionsList: api.NodeList{
Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "machine"},
},
},
},
}
desiredState := api.PodSpec{
Containers: []api.Container{
{Name: "containerA"},
{Name: "containerB"},
},
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
}
currentState := api.PodStatus{
Host: "machine",
}
runningState := api.ContainerStatus{
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
stoppedState := api.ContainerStatus{
State: api.ContainerState{
Termination: &api.ContainerStateTerminated{},
},
}
tests := []struct {
pod *api.Pod
status api.PodPhase
test string
}{
{&api.Pod{Spec: desiredState, Status: currentState}, api.PodPending, "waiting"},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": runningState,
},
Host: "machine",
},
},
api.PodRunning,
"all running",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": stoppedState,
"containerB": stoppedState,
},
Host: "machine",
},
},
api.PodRunning,
"all stopped with restart always",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": stoppedState,
},
Host: "machine",
},
},
api.PodRunning,
"mixed state #1 with restart always",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
},
Host: "machine",
},
},
api.PodPending,
"mixed state #2 with restart always",
},
}
for _, test := range tests {
if status, err := getPodStatus(test.pod, fakeClient.Nodes()); status != test.status {
t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status)
if err != nil {
t.Errorf("In test %s, unexpected error: %v", test.test, err)
}
}
}
}
func TestPodStatusWithRestartNever(t *testing.T) {
fakeClient := client.Fake{
MinionsList: api.NodeList{
Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "machine"},
},
},
},
}
desiredState := api.PodSpec{
Containers: []api.Container{
{Name: "containerA"},
{Name: "containerB"},
},
RestartPolicy: api.RestartPolicy{Never: &api.RestartPolicyNever{}},
}
currentState := api.PodStatus{
Host: "machine",
}
runningState := api.ContainerStatus{
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
succeededState := api.ContainerStatus{
State: api.ContainerState{
Termination: &api.ContainerStateTerminated{
ExitCode: 0,
},
},
}
failedState := api.ContainerStatus{
State: api.ContainerState{
Termination: &api.ContainerStateTerminated{
ExitCode: -1,
},
},
}
tests := []struct {
pod *api.Pod
status api.PodPhase
test string
}{
{&api.Pod{Spec: desiredState, Status: currentState}, api.PodPending, "waiting"},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": runningState,
},
Host: "machine",
},
},
api.PodRunning,
"all running with restart never",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": succeededState,
"containerB": succeededState,
},
Host: "machine",
},
},
api.PodSucceeded,
"all succeeded with restart never",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": failedState,
"containerB": failedState,
},
Host: "machine",
},
},
api.PodFailed,
"all failed with restart never",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": succeededState,
},
Host: "machine",
},
},
api.PodRunning,
"mixed state #1 with restart never",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
},
Host: "machine",
},
},
api.PodPending,
"mixed state #2 with restart never",
},
}
for _, test := range tests {
if status, err := getPodStatus(test.pod, fakeClient.Nodes()); status != test.status {
t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status)
if err != nil {
t.Errorf("In test %s, unexpected error: %v", test.test, err)
}
}
}
}
func TestPodStatusWithRestartOnFailure(t *testing.T) {
fakeClient := client.Fake{
MinionsList: api.NodeList{
Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "machine"},
},
},
},
}
desiredState := api.PodSpec{
Containers: []api.Container{
{Name: "containerA"},
{Name: "containerB"},
},
RestartPolicy: api.RestartPolicy{OnFailure: &api.RestartPolicyOnFailure{}},
}
currentState := api.PodStatus{
Host: "machine",
}
runningState := api.ContainerStatus{
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
succeededState := api.ContainerStatus{
State: api.ContainerState{
Termination: &api.ContainerStateTerminated{
ExitCode: 0,
},
},
}
failedState := api.ContainerStatus{
State: api.ContainerState{
Termination: &api.ContainerStateTerminated{
ExitCode: -1,
},
},
}
tests := []struct {
pod *api.Pod
status api.PodPhase
test string
}{
{&api.Pod{Spec: desiredState, Status: currentState}, api.PodPending, "waiting"},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": runningState,
},
Host: "machine",
},
},
api.PodRunning,
"all running with restart onfailure",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": succeededState,
"containerB": succeededState,
},
Host: "machine",
},
},
api.PodSucceeded,
"all succeeded with restart onfailure",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": failedState,
"containerB": failedState,
},
Host: "machine",
},
},
api.PodRunning,
"all failed with restart never",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": succeededState,
},
Host: "machine",
},
},
api.PodRunning,
"mixed state #1 with restart onfailure",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
},
Host: "machine",
},
},
api.PodPending,
"mixed state #2 with restart onfailure",
},
}
for _, test := range tests {
if status, err := getPodStatus(test.pod, fakeClient.Nodes()); status != test.status {
t.Errorf("In test %s, expected %v, got %v", test.test, test.status, status)
if err != nil {
t.Errorf("In test %s, unexpected error: %v", test.test, err)
}
}
}
}
func TestPodStorageValidatesCreate(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
ctx := api.NewDefaultContext()
pod := &api.Pod{
@@ -837,8 +422,8 @@ func TestCreatePod(t *testing.T) {
},
}
storage := REST{
registry: podRegistry,
podPollPeriod: time.Millisecond * 100,
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
pod := &api.Pod{}
pod.Name = "foo"
@@ -867,57 +452,6 @@ func (f *FakePodInfoGetter) GetPodInfo(host, podNamespace string, podID string)
return api.PodContainerInfo{ContainerInfo: f.info}, f.err
}
func TestFillPodInfo(t *testing.T) {
expectedIP := "1.2.3.4"
expectedTime, _ := time.Parse("2013-Feb-03", "2013-Feb-03")
fakeGetter := FakePodInfoGetter{
info: map[string]api.ContainerStatus{
"net": {
State: api.ContainerState{
Running: &api.ContainerStateRunning{
StartedAt: util.NewTime(expectedTime),
},
},
RestartCount: 1,
PodIP: expectedIP,
},
},
}
storage := REST{
podCache: &fakeGetter,
}
pod := api.Pod{Status: api.PodStatus{Host: "foo"}}
storage.fillPodInfo(&pod)
if !reflect.DeepEqual(fakeGetter.info, pod.Status.Info) {
t.Errorf("Expected: %#v, Got %#v", fakeGetter.info, pod.Status.Info)
}
if pod.Status.PodIP != expectedIP {
t.Errorf("Expected %s, Got %s", expectedIP, pod.Status.PodIP)
}
}
func TestFillPodInfoNoData(t *testing.T) {
expectedIP := ""
fakeGetter := FakePodInfoGetter{
info: map[string]api.ContainerStatus{
"net": {
State: api.ContainerState{},
},
},
}
storage := REST{
podCache: &fakeGetter,
}
pod := api.Pod{Status: api.PodStatus{Host: "foo"}}
storage.fillPodInfo(&pod)
if !reflect.DeepEqual(fakeGetter.info, pod.Status.Info) {
t.Errorf("Expected %#v, Got %#v", fakeGetter.info, pod.Status.Info)
}
if pod.Status.PodIP != expectedIP {
t.Errorf("Expected %s, Got %s", expectedIP, pod.Status.PodIP)
}
}
func TestCreatePodWithConflictingNamespace(t *testing.T) {
storage := REST{}
pod := &api.Pod{

pkg/util/clock.go (new file, 45 lines)

@@ -0,0 +1,45 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"time"
)
// Clock allows for injecting fake or real clocks into code that
// needs to do arbitrary things based on time.
type Clock interface {
Now() time.Time
}
// RealClock really calls time.Now()
type RealClock struct{}
// Now returns the current time.
func (r RealClock) Now() time.Time {
return time.Now()
}
// FakeClock implements Clock, but returns an arbitrary time.
type FakeClock struct {
Time time.Time
}
// Now returns f's time.
func (f *FakeClock) Now() time.Time {
return f.Time
}
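
A typical injection pattern for this interface (a sketch; isExpired is an illustrative helper, not part of this change):

// isExpired reports whether lastUpdate is older than ttl according to c.
func isExpired(c Clock, lastUpdate time.Time, ttl time.Duration) bool {
	return c.Now().Sub(lastUpdate) > ttl
}

// Production code passes RealClock{}; tests pass a *FakeClock and advance
// it by hand:
//	fc := &FakeClock{Time: time.Now()}
//	start := fc.Now()
//	fc.Time = fc.Time.Add(61 * time.Second)
//	isExpired(fc, start, 60*time.Second) // true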