Merge pull request #9059 from fgrzadkowski/watch_density

Use watch-based cache in density e2e tests.
This commit is contained in:
Filip Grzadkowski 2015-06-02 02:00:26 -07:00
commit 7580006771

View File

@ -31,13 +31,16 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd"
clientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"code.google.com/p/go-uuid/uuid"
"github.com/davecgh/go-spew/spew"
@ -95,6 +98,40 @@ type ContainerFailures struct {
restarts int
}
// podStore is a convenience wrapper around cache.Store whose List method
// returns a slice of *api.Pod instead of a slice of interface{}.
type podStore struct {
// Embedded store; every cache.Store method other than List is exposed unchanged.
cache.Store
// stopCh is handed to the reflector's RunUntil in newPodStore and is closed by Stop.
stopCh chan struct{}
}
// newPodStore returns a podStore for the pods in namespace that match the
// given label and field selectors. It wires a cache.ListWatch (list + watch
// against c.Pods(namespace)) into a reflector that fills a store keyed by
// cache.MetaNamespaceKeyFunc, and starts that reflector with RunUntil on a
// fresh stop channel. Callers must call Stop on the result when done.
//
// NOTE(review): RunUntil appears to start the reflector in the background, so
// a List call made immediately after construction may see an empty store
// until the first list completes — confirm against the cache package.
func newPodStore(c *client.Client, namespace string, label labels.Selector, field fields.Selector) *podStore {
	listFn := func() (runtime.Object, error) {
		return c.Pods(namespace).List(label, field)
	}
	watchFn := func(resourceVersion string) (watch.Interface, error) {
		return c.Pods(namespace).Watch(label, field, resourceVersion)
	}
	source := &cache.ListWatch{ListFunc: listFn, WatchFunc: watchFn}

	backing := cache.NewStore(cache.MetaNamespaceKeyFunc)
	stop := make(chan struct{})

	// The last argument (0) is presumably the resync period — TODO confirm.
	reflector := cache.NewReflector(source, &api.Pod{}, backing, 0)
	reflector.RunUntil(stop)

	return &podStore{Store: backing, stopCh: stop}
}
// List returns every object currently held by the embedded store as a
// *api.Pod. The result is never nil; it is empty when the store is empty.
// It panics if the store holds anything other than *api.Pod, because the
// type assertion is unchecked.
func (s *podStore) List() []*api.Pod {
	items := s.Store.List()
	result := make([]*api.Pod, len(items))
	for i := range items {
		result[i] = items[i].(*api.Pod)
	}
	return result
}
// Stop closes the store's stop channel, the same channel that newPodStore
// passed to the reflector's RunUntil. It must be called at most once per
// podStore: closing an already-closed channel panics.
func (s *podStore) Stop() {
close(s.stopCh)
}
// Logf formats its arguments like fmt.Printf and writes the result to
// GinkgoWriter as a single line, prefixed with "INFO: " and terminated with a
// newline.
func Logf(format string, a ...interface{}) {
	// The prefix and newline are spliced into the format string itself so the
	// whole line goes out in one Fprintf call.
	lineFormat := "INFO: " + format + "\n"
	fmt.Fprintf(GinkgoWriter, lineFormat, a...)
}
@ -643,16 +680,16 @@ func (p PodDiff) Print(ignorePhases util.StringSet) {
}
// Diff computes a PodDiff given 2 lists of pods.
func Diff(oldPods *api.PodList, curPods *api.PodList) PodDiff {
func Diff(oldPods []*api.Pod, curPods []*api.Pod) PodDiff {
podInfoMap := PodDiff{}
// New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist.
for _, pod := range curPods.Items {
for _, pod := range curPods {
podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.NodeName, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
}
// Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist.
for _, pod := range oldPods.Items {
for _, pod := range oldPods {
if info, ok := podInfoMap[pod.Name]; ok {
info.oldHostname, info.oldPhase = pod.Spec.NodeName, string(pod.Status.Phase)
} else {
@ -707,11 +744,10 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
pods, err := listPods(c, ns, label, fields.Everything())
if err != nil {
return fmt.Errorf("Error listing pods: %v", err)
}
current = len(pods.Items)
podStore := newPodStore(c, ns, label, fields.Everything())
defer podStore.Stop()
pods := podStore.List()
current = len(pods)
failCount := 5
for same < failCount && current < replicas {
Logf("%v Controller %s: Found %d pods out of %d", time.Now(), name, current, replicas)
@ -729,11 +765,8 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
last = current
time.Sleep(5 * time.Second)
pods, err = listPods(c, ns, label, fields.Everything())
if err != nil {
return fmt.Errorf("Error listing pods: %v", err)
}
current = len(pods.Items)
pods = podStore.List()
current = len(pods)
}
if current != replicas {
return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas)
@ -745,7 +778,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
last = 0
failCount = 10
current = 0
oldPods := &api.PodList{}
oldPods := make([]*api.Pod, 0)
for same < failCount && current < replicas {
current = 0
waiting := 0
@ -753,18 +786,13 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
unknown := 0
inactive := 0
failedContainers := 0
time.Sleep(10 * time.Second)
time.Sleep(5 * time.Second)
// TODO: Use a reflector both to put less strain on the cluster and
// for more clarity.
currentPods, err := listPods(c, ns, label, fields.Everything())
if err != nil {
return fmt.Errorf("Error listing pods: %v", err)
}
for _, p := range currentPods.Items {
currentPods := podStore.List()
for _, p := range currentPods {
if p.Status.Phase == api.PodRunning {
current++
for _, v := range FailedContainers(p) {
for _, v := range FailedContainers(*p) {
failedContainers = failedContainers + v.restarts
}
} else if p.Status.Phase == api.PodPending {
@ -781,14 +809,14 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
}
Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
if len(currentPods.Items) != len(pods.Items) {
if len(currentPods) != len(pods) {
// This failure mode includes:
// kubelet is dead, so node controller deleted pods and rc creates more
// - diagnose by noting the pod diff below.
// pod is unhealthy, so replication controller creates another to take its place
// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods.Items), len(pods.Items))
errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(pods))
Logf("%v, pods that changed since the last iteration:", errorStr)
Diff(oldPods, currentPods).Print(util.NewStringSet())
return fmt.Errorf(errorStr)
@ -811,7 +839,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
// stuck in pending.
errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount)
Logf("%v, pods currently in pending:", errorStr)
Diff(currentPods, &api.PodList{}).Print(util.NewStringSet(string(api.PodRunning)))
Diff(currentPods, make([]*api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning)))
return fmt.Errorf(errorStr)
}
last = current
@ -844,13 +872,11 @@ func ScaleRC(c *client.Client, ns, name string, size uint) error {
func waitForRCPodsRunning(c *client.Client, ns, rcName string) error {
running := false
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": rcName}))
podStore := newPodStore(c, ns, label, fields.Everything())
defer podStore.Stop()
for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) {
pods, err := listPods(c, ns, label, fields.Everything())
if err != nil {
Logf("Error listing pods: %v", err)
continue
}
for _, p := range pods.Items {
pods := podStore.List()
for _, p := range pods {
if p.Status.Phase != api.PodRunning {
continue
}
@ -878,19 +904,6 @@ func DeleteRC(c *client.Client, ns, name string) error {
return err
}
// listPods lists the pods in namespace matching the label and field
// selectors, retrying on error. It makes one initial attempt plus up to
// maxRetries further attempts, with no delay between them, and returns the
// first successful result. If every attempt fails it returns the result and
// error of the final attempt.
func listPods(c *client.Client, namespace string, label labels.Selector, field fields.Selector) (*api.PodList, error) {
	const maxRetries = 4

	var (
		result  *api.PodList
		lastErr error
	)
	// attempt 0 is the initial call; attempts 1..maxRetries are the retries.
	for attempt := 0; attempt <= maxRetries; attempt++ {
		result, lastErr = c.Pods(namespace).List(label, field)
		if lastErr == nil {
			return result, nil
		}
	}
	return result, lastErr
}
// FailedContainers inspects all containers in a pod and returns failure
// information for containers that have failed or been restarted.
// A map is returned where the key is the containerID and the value is a