Merge pull request #3795 from ddysher/pod-cache-semantic

fix pod-cache with node semantic change
Dawn Chen 2015-01-26 16:04:48 -08:00
commit c65f83f424
2 changed files with 93 additions and 39 deletions
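
This change replaces the pod cache's boolean node-existence map with a cache of each node's api.NodeStatus, so computePodStatus can react to node health rather than mere existence: a pod bound to a node that is missing, reports no conditions, or reports NodeReady/NodeReachable as ConditionNone is now surfaced as PodUnknown instead of PodFailed. A condensed, self-contained sketch of that decision logic is below; the trimmed-down types and the phaseForNode helper are illustrative stand-ins, not part of the patch itself.

```go
package main

import "fmt"

// Simplified stand-ins for the api types of this era (NodeCondition with
// Kind/Status, ConditionNone, PodUnknown); trimmed down so the sketch
// compiles on its own.
type (
	ConditionStatus   string
	NodeConditionKind string
	PodPhase          string
)

const (
	ConditionFull ConditionStatus   = "Full"
	ConditionNone ConditionStatus   = "None"
	NodeReady     NodeConditionKind = "Ready"
	NodeReachable NodeConditionKind = "Reachable"
	PodUnknown    PodPhase          = "Unknown"
)

type NodeCondition struct {
	Kind   NodeConditionKind
	Status ConditionStatus
}

// phaseForNode (a hypothetical helper) mirrors the new check in
// computePodStatus: if the node lookup failed, the node reported no
// conditions, or Ready/Reachable is ConditionNone, the pod is reported as
// PodUnknown; otherwise the caller falls through to the kubelet-reported
// status.
func phaseForNode(conditions []NodeCondition, lookupErr error) (PodPhase, bool) {
	if lookupErr != nil || len(conditions) == 0 {
		return PodUnknown, true
	}
	for _, c := range conditions {
		if (c.Kind == NodeReady || c.Kind == NodeReachable) && c.Status == ConditionNone {
			return PodUnknown, true
		}
	}
	return "", false
}

func main() {
	unhealthy := []NodeCondition{{Kind: NodeReady, Status: ConditionNone}}
	if phase, override := phaseForNode(unhealthy, nil); override {
		fmt.Println("pod phase:", phase) // pod phase: Unknown
	}
}
```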

View File

@@ -20,7 +20,6 @@ import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
@@ -47,7 +46,7 @@ type PodCache struct {
podStatus map[objKey]api.PodStatus
// nodes that we know exist. Cleared at the beginning of each
// UpdateAllPods call.
currentNodes map[objKey]bool
currentNodes map[objKey]api.NodeStatus
}
type objKey struct {
@@ -63,7 +62,7 @@ func NewPodCache(ipCache IPGetter, info client.PodInfoGetter, nodes client.NodeI
containerInfo: info,
pods: pods,
nodes: nodes,
currentNodes: map[objKey]bool{},
currentNodes: map[objKey]api.NodeStatus{},
podStatus: map[objKey]api.PodStatus{},
}
}
@@ -80,37 +79,34 @@ func (p *PodCache) GetPodStatus(namespace, name string) (*api.PodStatus, error)
return &value, nil
}
func (p *PodCache) nodeExistsInCache(name string) (exists, cacheHit bool) {
func (p *PodCache) getNodeStatusInCache(name string) (*api.NodeStatus, bool) {
p.lock.Lock()
defer p.lock.Unlock()
exists, cacheHit = p.currentNodes[objKey{"", name}]
return exists, cacheHit
nodeStatus, cacheHit := p.currentNodes[objKey{"", name}]
return &nodeStatus, cacheHit
}
// lock must *not* be held
func (p *PodCache) nodeExists(name string) bool {
exists, cacheHit := p.nodeExistsInCache(name)
func (p *PodCache) getNodeStatus(name string) (*api.NodeStatus, error) {
nodeStatus, cacheHit := p.getNodeStatusInCache(name)
if cacheHit {
return exists
return nodeStatus, nil
}
// TODO: suppose there's N concurrent requests for node "foo"; in that case
// it might be useful to block all of them and only look up "foo" once.
// (This code will make up to N lookups.) One way of doing that would be to
// have a pool of M mutexes and require that before looking up "foo" you must
// lock mutex hash("foo") % M.
_, err := p.nodes.Get(name)
exists = true
node, err := p.nodes.Get(name)
if err != nil {
exists = false
if !errors.IsNotFound(err) {
glog.Errorf("Unexpected error type verifying minion existence: %+v", err)
}
glog.Errorf("Unexpected error verifying node existence: %+v", err)
return nil, err
}
p.lock.Lock()
defer p.lock.Unlock()
p.currentNodes[objKey{"", name}] = exists
return exists
p.currentNodes[objKey{"", name}] = node.Status
return &node.Status, nil
}
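
The TODO in getNodeStatus above describes collapsing N concurrent lookups for the same node behind a pool of M mutexes, requiring that a lookup for "foo" first lock mutex hash("foo") % M. That idea is not implemented in this change; a minimal, self-contained sketch of such a striped lock (hypothetical names) might look like:

```go
package main

import (
	"hash/fnv"
	"sync"
)

// lockPool sketches the TODO's idea: a fixed pool of M mutexes, where a
// lookup for node "foo" must hold pool[hash("foo") % M]. Concurrent requests
// for the same node then serialize, so the registry is asked about "foo" at
// most once at a time.
type lockPool struct {
	locks []sync.Mutex
}

func newLockPool(m int) *lockPool {
	return &lockPool{locks: make([]sync.Mutex, m)}
}

// forKey returns the mutex guarding lookups for the given node name.
func (p *lockPool) forKey(key string) *sync.Mutex {
	h := fnv.New32a()
	h.Write([]byte(key))
	return &p.locks[h.Sum32()%uint32(len(p.locks))]
}

func main() {
	pool := newLockPool(16)
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu := pool.forKey("foo")
			mu.Lock()
			// ...check the cache here, and only on a miss call p.nodes.Get("foo")...
			mu.Unlock()
		}()
	}
	wg.Wait()
}
```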
// TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an
@@ -138,12 +134,26 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
return newStatus, nil
}
if !p.nodeExists(pod.Status.Host) {
nodeStatus, err := p.getNodeStatus(pod.Status.Host)
// Assigned to non-existing node.
newStatus.Phase = api.PodFailed
if err != nil || len(nodeStatus.Conditions) == 0 {
newStatus.Phase = api.PodUnknown
return newStatus, nil
}
// Assigned to an unhealthy node.
for _, condition := range nodeStatus.Conditions {
if condition.Kind == api.NodeReady && condition.Status == api.ConditionNone {
newStatus.Phase = api.PodUnknown
return newStatus, nil
}
if condition.Kind == api.NodeReachable && condition.Status == api.ConditionNone {
newStatus.Phase = api.PodUnknown
return newStatus, nil
}
}
result, err := p.containerInfo.GetPodStatus(pod.Status.Host, pod.Namespace, pod.Name)
newStatus.HostIP = p.ipCache.GetInstanceIP(pod.Status.Host)
@@ -161,10 +171,10 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
return newStatus, err
}
func (p *PodCache) resetNodeExistenceCache() {
func (p *PodCache) resetNodeStatusCache() {
p.lock.Lock()
defer p.lock.Unlock()
p.currentNodes = map[objKey]bool{}
p.currentNodes = map[objKey]api.NodeStatus{}
}
// UpdateAllContainers updates information about all containers.
@@ -172,7 +182,7 @@ func (p *PodCache) resetNodeExistenceCache() {
// calling again, or risk having new info getting clobbered by delayed
// old info.
func (p *PodCache) UpdateAllContainers() {
p.resetNodeExistenceCache()
p.resetNodeStatusCache()
ctx := api.NewContext()
pods, err := p.pods.ListPods(ctx, labels.Everything())
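
Because UpdateAllContainers calls resetNodeStatusCache at the start of every pass, each node's status is looked up from the registry at most once per pass and then reused for all pods on that node. A rough, self-contained illustration of that cache lifecycle follows; nodeStatusCache and its methods are hypothetical names, not the real PodCache API beyond what the diff shows.

```go
package main

import (
	"fmt"
	"sync"
)

// NodeStatus stands in for api.NodeStatus in this sketch.
type NodeStatus struct{ Ready bool }

// nodeStatusCache mirrors PodCache.currentNodes: cleared at the start of each
// update pass and filled lazily as pods on each node are processed.
type nodeStatusCache struct {
	mu      sync.Mutex
	entries map[string]NodeStatus
}

func (c *nodeStatusCache) reset() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries = map[string]NodeStatus{}
}

func (c *nodeStatusCache) get(name string, fetch func(string) (NodeStatus, error)) (NodeStatus, error) {
	c.mu.Lock()
	if s, ok := c.entries[name]; ok {
		c.mu.Unlock()
		return s, nil // cache hit: no registry call for this node this pass
	}
	c.mu.Unlock()

	s, err := fetch(name) // miss: ask the registry, as getNodeStatus does
	if err != nil {
		return NodeStatus{}, err
	}
	c.mu.Lock()
	c.entries[name] = s
	c.mu.Unlock()
	return s, nil
}

func main() {
	cache := &nodeStatusCache{}
	cache.reset() // start of an update pass, like resetNodeStatusCache
	fetch := func(name string) (NodeStatus, error) {
		fmt.Println("fetching", name)
		return NodeStatus{Ready: true}, nil
	}
	cache.get("machine", fetch) // fetches from the registry
	cache.get("machine", fetch) // served from the cache
}
```

As in the real code, two concurrent misses for the same node can still trigger duplicate fetches; the striped-lock idea in the TODO above is one way to close that gap.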

View File

@@ -186,21 +186,31 @@ func makePod(namespace, name, host string, containers ...string) *api.Pod {
Status: api.PodStatus{Host: host},
}
for _, c := range containers {
pod.Spec.Containers = append(pod.Spec.Containers, api.Container{
Name: c,
})
pod.Spec.Containers = append(pod.Spec.Containers, api.Container{Name: c})
}
return pod
}
func makeNode(name string) *api.Node {
func makeHealthyNode(name string) *api.Node {
return &api.Node{
ObjectMeta: api.ObjectMeta{Name: name},
Status: api.NodeStatus{Conditions: []api.NodeCondition{
{Kind: api.NodeReady, Status: api.ConditionFull},
}},
}
}
func makeUnhealthyNode(name string) *api.Node {
return &api.Node{
ObjectMeta: api.ObjectMeta{Name: name},
Status: api.NodeStatus{Conditions: []api.NodeCondition{
{Kind: api.NodeReady, Status: api.ConditionNone},
}},
}
}
func TestPodUpdateAllContainers(t *testing.T) {
pod := makePod(api.NamespaceDefault, "foo", "machine", "bar")
pod1 := makePod(api.NamespaceDefault, "foo", "machine", "bar")
pod2 := makePod(api.NamespaceDefault, "baz", "machine", "qux")
config := podCacheTestConfig{
ipFunc: func(host string) string {
@@ -211,8 +221,8 @@ func TestPodUpdateAllContainers(t *testing.T) {
},
kubeletContainerInfo: api.PodStatus{
Info: api.PodInfo{"bar": api.ContainerStatus{}}},
nodes: []api.Node{*makeNode("machine")},
pods: []api.Pod{*pod, *pod2},
nodes: []api.Node{*makeHealthyNode("machine")},
pods: []api.Pod{*pod1, *pod2},
}
cache := config.Construct()
@@ -254,7 +264,7 @@ func TestFillPodStatusNoHost(t *testing.T) {
pod := makePod(api.NamespaceDefault, "foo", "", "bar")
config := podCacheTestConfig{
kubeletContainerInfo: api.PodStatus{},
nodes: []api.Node{*makeNode("machine")},
nodes: []api.Node{*makeHealthyNode("machine")},
pods: []api.Pod{*pod},
}
cache := config.Construct()
@@ -283,7 +293,7 @@ func TestFillPodStatusMissingMachine(t *testing.T) {
}
status, err := cache.GetPodStatus(pod.Namespace, pod.Name)
if e, a := api.PodFailed, status.Phase; e != a {
if e, a := api.PodUnknown, status.Phase; e != a {
t.Errorf("Expected: %+v, Got %+v", e, a)
}
}
@@ -310,7 +320,7 @@ func TestFillPodStatus(t *testing.T) {
},
},
},
nodes: []api.Node{*makeNode("machine")},
nodes: []api.Node{*makeHealthyNode("machine")},
pods: []api.Pod{*pod},
}
cache := config.Construct()
@@ -337,7 +347,7 @@ func TestFillPodInfoNoData(t *testing.T) {
"net": {},
},
},
nodes: []api.Node{*makeNode("machine")},
nodes: []api.Node{*makeHealthyNode("machine")},
pods: []api.Pod{*pod},
}
cache := config.Construct()
@@ -376,6 +386,7 @@ func TestPodPhaseWithBadNode(t *testing.T) {
tests := []struct {
pod *api.Pod
nodes []api.Node
status api.PodPhase
test string
}{
@@ -383,10 +394,11 @@ func TestPodPhaseWithBadNode(t *testing.T) {
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Host: "machine-2",
Host: "machine-two",
},
},
api.PodFailed,
[]api.Node{},
api.PodUnknown,
"no info, but bad machine",
},
{
@@ -400,7 +412,8 @@ func TestPodPhaseWithBadNode(t *testing.T) {
Host: "machine-two",
},
},
api.PodFailed,
[]api.Node{},
api.PodUnknown,
"all running but minion is missing",
},
{
@@ -414,14 +427,45 @@ func TestPodPhaseWithBadNode(t *testing.T) {
Host: "machine-two",
},
},
api.PodFailed,
[]api.Node{},
api.PodUnknown,
"all stopped but minion missing",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": runningState,
"containerB": runningState,
},
Host: "machine-two",
},
},
[]api.Node{*makeUnhealthyNode("machine-two")},
api.PodUnknown,
"all running but minion is unhealthy",
},
{
&api.Pod{
Spec: desiredState,
Status: api.PodStatus{
Info: map[string]api.ContainerStatus{
"containerA": stoppedState,
"containerB": stoppedState,
},
Host: "machine-two",
},
},
[]api.Node{*makeUnhealthyNode("machine-two")},
api.PodUnknown,
"all stopped but minion is unhealthy",
},
}
for _, test := range tests {
config := podCacheTestConfig{
kubeletContainerInfo: test.pod.Status,
nodes: []api.Node{},
nodes: test.nodes,
pods: []api.Pod{*test.pod},
}
cache := config.Construct()