From 881be6c9529b4ebe3b08be157fa97187473283ca Mon Sep 17 00:00:00 2001
From: Deyuan Deng
Date: Sun, 25 Jan 2015 11:01:53 -0500
Subject: [PATCH] fix pod cache for node status semantic change

Nodes now report status conditions instead of bare existence, so cache
api.NodeStatus per node and mark pods assigned to a missing or unhealthy
node as PodUnknown rather than PodFailed.
---
 pkg/master/pod_cache.go      | 56 +++++++++++++++-----------
 pkg/master/pod_cache_test.go | 76 ++++++++++++++++++++++++++++--------
 2 files changed, 93 insertions(+), 39 deletions(-)

diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go
index 7b2ba52a0ba..50a293417c7 100644
--- a/pkg/master/pod_cache.go
+++ b/pkg/master/pod_cache.go
@@ -20,7 +20,6 @@ import (
 	"sync"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
@@ -47,7 +46,7 @@ type PodCache struct {
 	podStatus map[objKey]api.PodStatus
-	// nodes that we know exist. Cleared at the beginning of each
-	// UpdateAllPods call.
-	currentNodes map[objKey]bool
+	// status of nodes that we know about. Cleared at the beginning of
+	// each UpdateAllContainers call.
+	currentNodes map[objKey]api.NodeStatus
 }
 
 type objKey struct {
@@ -63,7 +62,7 @@ func NewPodCache(ipCache IPGetter, info client.PodInfoGetter, nodes client.NodeI
 		containerInfo: info,
 		pods:          pods,
 		nodes:         nodes,
-		currentNodes:  map[objKey]bool{},
+		currentNodes:  map[objKey]api.NodeStatus{},
 		podStatus:     map[objKey]api.PodStatus{},
 	}
 }
@@ -80,37 +79,34 @@ func (p *PodCache) GetPodStatus(namespace, name string) (*api.PodStatus, error)
 	return &value, nil
 }
 
-func (p *PodCache) nodeExistsInCache(name string) (exists, cacheHit bool) {
+func (p *PodCache) getNodeStatusInCache(name string) (*api.NodeStatus, bool) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	exists, cacheHit = p.currentNodes[objKey{"", name}]
-	return exists, cacheHit
+	nodeStatus, cacheHit := p.currentNodes[objKey{"", name}]
+	return &nodeStatus, cacheHit
 }
 
 // lock must *not* be held
-func (p *PodCache) nodeExists(name string) bool {
-	exists, cacheHit := p.nodeExistsInCache(name)
+func (p *PodCache) getNodeStatus(name string) (*api.NodeStatus, error) {
+	nodeStatus, cacheHit := p.getNodeStatusInCache(name)
 	if cacheHit {
-		return exists
+		return nodeStatus, nil
 	}
 	// TODO: suppose there's N concurrent requests for node "foo"; in that case
 	// it might be useful to block all of them and only look up "foo" once.
 	// (This code will make up to N lookups.) One way of doing that would be to
 	// have a pool of M mutexes and require that before looking up "foo" you must
 	// lock mutex hash("foo") % M.
-	_, err := p.nodes.Get(name)
-	exists = true
+	node, err := p.nodes.Get(name)
 	if err != nil {
-		exists = false
-		if !errors.IsNotFound(err) {
-			glog.Errorf("Unexpected error type verifying minion existence: %+v", err)
-		}
+		glog.Errorf("Unexpected error getting node status: %+v", err)
+		return nil, err
 	}
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	p.currentNodes[objKey{"", name}] = exists
-	return exists
+	p.currentNodes[objKey{"", name}] = node.Status
+	return &node.Status, nil
 }
 
 // TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an
 // entire pod?
@@ -138,12 +134,28 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
 		return newStatus, nil
 	}
 
-	if !p.nodeExists(pod.Status.Host) {
-		// Assigned to non-existing node.
-		newStatus.Phase = api.PodFailed
+	nodeStatus, err := p.getNodeStatus(pod.Status.Host)
+
+	// Assigned to non-existing node.
+	if err != nil || len(nodeStatus.Conditions) == 0 {
+		newStatus.Phase = api.PodUnknown
 		return newStatus, nil
 	}
 
+	// Assigned to an unhealthy node.
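+	// An explicit ConditionNone on either NodeReady or NodeReachable means
+	// the node cannot vouch for its pods, so report their phase as unknown.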
+	for _, condition := range nodeStatus.Conditions {
+		if condition.Kind == api.NodeReady && condition.Status == api.ConditionNone {
+			newStatus.Phase = api.PodUnknown
+			return newStatus, nil
+		}
+		if condition.Kind == api.NodeReachable && condition.Status == api.ConditionNone {
+			newStatus.Phase = api.PodUnknown
+			return newStatus, nil
+		}
+	}
+
 	result, err := p.containerInfo.GetPodStatus(pod.Status.Host, pod.Namespace, pod.Name)
 
 	newStatus.HostIP = p.ipCache.GetInstanceIP(pod.Status.Host)
@@ -161,10 +171,10 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
 	return newStatus, err
 }
 
-func (p *PodCache) resetNodeExistenceCache() {
+func (p *PodCache) resetNodeStatusCache() {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	p.currentNodes = map[objKey]bool{}
+	p.currentNodes = map[objKey]api.NodeStatus{}
 }
 
 // UpdateAllContainers updates information about all containers.
@@ -172,7 +182,7 @@ func (p *PodCache) resetNodeExistenceCache() {
 // calling again, or risk having new info getting clobbered by delayed
 // old info.
 func (p *PodCache) UpdateAllContainers() {
-	p.resetNodeExistenceCache()
+	p.resetNodeStatusCache()
 	ctx := api.NewContext()
 	pods, err := p.pods.ListPods(ctx, labels.Everything())
 	if err != nil {
diff --git a/pkg/master/pod_cache_test.go b/pkg/master/pod_cache_test.go
index 37f7257ad9f..b3532ed91a5 100644
--- a/pkg/master/pod_cache_test.go
+++ b/pkg/master/pod_cache_test.go
@@ -186,21 +186,31 @@ func makePod(namespace, name, host string, containers ...string) *api.Pod {
 		Status: api.PodStatus{Host: host},
 	}
 	for _, c := range containers {
-		pod.Spec.Containers = append(pod.Spec.Containers, api.Container{
-			Name: c,
-		})
+		pod.Spec.Containers = append(pod.Spec.Containers, api.Container{Name: c})
 	}
 	return pod
 }
 
-func makeNode(name string) *api.Node {
+func makeHealthyNode(name string) *api.Node {
 	return &api.Node{
 		ObjectMeta: api.ObjectMeta{Name: name},
+		Status: api.NodeStatus{Conditions: []api.NodeCondition{
+			{Kind: api.NodeReady, Status: api.ConditionFull},
+		}},
+	}
+}
+
+func makeUnhealthyNode(name string) *api.Node {
+	return &api.Node{
+		ObjectMeta: api.ObjectMeta{Name: name},
+		Status: api.NodeStatus{Conditions: []api.NodeCondition{
+			{Kind: api.NodeReady, Status: api.ConditionNone},
+		}},
 	}
 }
 
 func TestPodUpdateAllContainers(t *testing.T) {
-	pod := makePod(api.NamespaceDefault, "foo", "machine", "bar")
+	pod1 := makePod(api.NamespaceDefault, "foo", "machine", "bar")
 	pod2 := makePod(api.NamespaceDefault, "baz", "machine", "qux")
 	config := podCacheTestConfig{
 		ipFunc: func(host string) string {
@@ -211,8 +221,8 @@ func TestPodUpdateAllContainers(t *testing.T) {
 		},
 		kubeletContainerInfo: api.PodStatus{
 			Info: api.PodInfo{"bar": api.ContainerStatus{}}},
-		nodes: []api.Node{*makeNode("machine")},
-		pods:  []api.Pod{*pod, *pod2},
+		nodes: []api.Node{*makeHealthyNode("machine")},
+		pods:  []api.Pod{*pod1, *pod2},
 	}
 
 	cache := config.Construct()
@@ -254,7 +264,7 @@ func TestFillPodStatusNoHost(t *testing.T) {
 	pod := makePod(api.NamespaceDefault, "foo", "", "bar")
 	config := podCacheTestConfig{
 		kubeletContainerInfo: api.PodStatus{},
-		nodes: []api.Node{*makeNode("machine")},
+		nodes: []api.Node{*makeHealthyNode("machine")},
 		pods:  []api.Pod{*pod},
 	}
 	cache := config.Construct()
@@ -283,7 +293,7 @@ func TestFillPodStatusMissingMachine(t *testing.T) {
 	}
 
 	status, err := cache.GetPodStatus(pod.Namespace, pod.Name)
-	if e, a := api.PodFailed, status.Phase; e != a {
+	if e, a := api.PodUnknown, status.Phase; e != a {
 		t.Errorf("Expected: %+v, Got %+v", e, a)
 	}
 }
@@ -310,7 +320,7 @@ func TestFillPodStatus(t *testing.T) {
 				},
 			},
 		},
-		nodes: []api.Node{*makeNode("machine")},
+		nodes: []api.Node{*makeHealthyNode("machine")},
 		pods:  []api.Pod{*pod},
 	}
 	cache := config.Construct()
@@ -337,7 +347,7 @@ func TestFillPodInfoNoData(t *testing.T) {
 				"net": {},
 			},
 		},
-		nodes: []api.Node{*makeNode("machine")},
+		nodes: []api.Node{*makeHealthyNode("machine")},
 		pods:  []api.Pod{*pod},
 	}
 	cache := config.Construct()
@@ -376,6 +386,7 @@ func TestPodPhaseWithBadNode(t *testing.T) {
 	tests := []struct {
 		pod *api.Pod
+		nodes []api.Node
 		status api.PodPhase
 		test string
 	}{
 		{
 			&api.Pod{
 				Spec: desiredState,
 				Status: api.PodStatus{
-					Host: "machine-2",
+					Host: "machine-two",
 				},
 			},
+			[]api.Node{},
-			api.PodFailed,
+			api.PodUnknown,
 			"no info, but bad machine",
 		},
 		{
 			&api.Pod{
 				Spec: desiredState,
 				Status: api.PodStatus{
 					Info: map[string]api.ContainerStatus{
 						"containerA": runningState,
 						"containerB": runningState,
 					},
 					Host: "machine-two",
 				},
 			},
+			[]api.Node{},
-			api.PodFailed,
+			api.PodUnknown,
 			"all running but minion is missing",
 		},
 		{
 			&api.Pod{
 				Spec: desiredState,
 				Status: api.PodStatus{
 					Info: map[string]api.ContainerStatus{
 						"containerA": stoppedState,
 						"containerB": stoppedState,
 					},
 					Host: "machine-two",
 				},
 			},
+			[]api.Node{},
-			api.PodFailed,
+			api.PodUnknown,
 			"all stopped but minion missing",
 		},
+		{
+			&api.Pod{
+				Spec: desiredState,
+				Status: api.PodStatus{
+					Info: map[string]api.ContainerStatus{
+						"containerA": runningState,
+						"containerB": runningState,
+					},
+					Host: "machine-two",
+				},
+			},
+			[]api.Node{*makeUnhealthyNode("machine-two")},
+			api.PodUnknown,
+			"all running but minion is unhealthy",
+		},
+		{
+			&api.Pod{
+				Spec: desiredState,
+				Status: api.PodStatus{
+					Info: map[string]api.ContainerStatus{
+						"containerA": stoppedState,
+						"containerB": stoppedState,
+					},
+					Host: "machine-two",
+				},
+			},
+			[]api.Node{*makeUnhealthyNode("machine-two")},
+			api.PodUnknown,
+			"all stopped but minion is unhealthy",
+		},
 	}
 	for _, test := range tests {
 		config := podCacheTestConfig{
 			kubeletContainerInfo: test.pod.Status,
-			nodes: []api.Node{},
+			nodes: test.nodes,
 			pods:  []api.Pod{*test.pod},
 		}
 		cache := config.Construct()
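
Note, not part of the patch: the sketch below restates the phase decision the
hunks above implement, under the assumption that the v1beta-era types they use
(api.NodeStatus, api.NodeCondition with Kind/Status fields, api.ConditionNone)
are in scope. The helper name nodeLooksUnhealthy is hypothetical; nothing in
the tree defines it.

	package master

	import "github.com/GoogleCloudPlatform/kubernetes/pkg/api"

	// nodeLooksUnhealthy mirrors the checks computePodStatus now performs:
	// a node that reports no conditions, or that reports NodeReady or
	// NodeReachable as ConditionNone, cannot vouch for the state of its pods.
	func nodeLooksUnhealthy(status *api.NodeStatus) bool {
		if len(status.Conditions) == 0 {
			return true
		}
		for _, condition := range status.Conditions {
			if (condition.Kind == api.NodeReady || condition.Kind == api.NodeReachable) &&
				condition.Status == api.ConditionNone {
				return true
			}
		}
		return false
	}

With a helper like this, the two early returns in computePodStatus would
collapse into one: if the node lookup errored or nodeLooksUnhealthy reports
true, set newStatus.Phase = api.PodUnknown and stop; otherwise fall through to
the kubelet query.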