diff --git a/test/e2e/density.go b/test/e2e/density.go
index 30639cacca3..585387a50ea 100644
--- a/test/e2e/density.go
+++ b/test/e2e/density.go
@@ -97,17 +97,17 @@ var _ = Describe("Density", func() {
 		skip bool
 		podsPerMinion int
 		/* Controls how often the apiserver is polled for pods */
-		interval int
+		interval time.Duration
 	}
 	densityTests := []Density{
 		// This test should always run, even if larger densities are skipped.
-		{podsPerMinion: 3, skip: false, interval: 10},
-		{podsPerMinion: 30, skip: false, interval: 10},
+		{podsPerMinion: 3, skip: false, interval: 10 * time.Second},
+		{podsPerMinion: 30, skip: false, interval: 10 * time.Second},
 		// More than 30 pods per node is outside our v1.0 goals.
 		// We might want to enable those tests in the future.
-		{podsPerMinion: 50, skip: true, interval: 10},
-		{podsPerMinion: 100, skip: true, interval: 1},
+		{podsPerMinion: 50, skip: true, interval: 10 * time.Second},
+		{podsPerMinion: 100, skip: true, interval: 1 * time.Second},
 	}
 
 	for _, testArg := range densityTests {
diff --git a/test/e2e/fifo_queue.go b/test/e2e/fifo_queue.go
deleted file mode 100644
index 0942c37df72..00000000000
--- a/test/e2e/fifo_queue.go
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
-Copyright 2014 The Kubernetes Authors All rights reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package e2e
-
-import (
-	"sync"
-	"time"
-)
-
-type QueueItem struct {
-	createTime string
-	value interface{}
-}
-
-type QueueItems struct {
-	pos int
-	mutex *sync.Mutex
-	list []QueueItem
-}
-
-type FifoQueue QueueItems
-
-func (fq *FifoQueue) Push(elem interface{}) {
-	fq.mutex.Lock()
-	fq.list = append(fq.list, QueueItem{time.Now().String(), elem})
-	fq.mutex.Unlock()
-}
-
-func (fq *FifoQueue) Pop() QueueItem {
-	fq.mutex.Lock()
-	var val QueueItem
-	if len(fq.list)-1 >= fq.pos {
-		val = fq.list[fq.pos]
-		fq.pos++
-	}
-	fq.mutex.Unlock()
-	return val
-}
-
-func (fq FifoQueue) Len() int {
-	return len(fq.list[fq.pos:])
-}
-
-func (fq *FifoQueue) First() QueueItem {
-	return fq.list[fq.pos]
-}
-
-func (fq *FifoQueue) Last() QueueItem {
-	return fq.list[len(fq.list)-1]
-}
-
-func (fq *FifoQueue) Reset() {
-	fq.pos = 0
-}
-
-func newFifoQueue() *FifoQueue {
-	tmp := new(FifoQueue)
-	tmp.mutex = &sync.Mutex{}
-	tmp.pos = 0
-	return tmp
-}
diff --git a/test/e2e/util.go b/test/e2e/util.go
index 07e8dfa4e8b..64283497cc9 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -155,7 +155,7 @@ type RCConfig struct {
 	Image string
 	Name string
 	Namespace string
-	PollInterval int
+	PollInterval time.Duration
 	PodStatusFile *os.File
 	Replicas int
 }
@@ -812,43 +812,28 @@ func Diff(oldPods []*api.Pod, curPods []*api.Pod) PodDiff {
 // It's the caller's responsibility to clean up externally (i.e. use the
 // namespace lifecycle for handling cleanup).
 func RunRC(config RCConfig) error {
-	var last int
-	c := config.Client
-	name := config.Name
-	ns := config.Namespace
-	image := config.Image
-	replicas := config.Replicas
-	interval := config.PollInterval
-	maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01))
-	current := 0
-	same := 0
-	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
-	podLists := newFifoQueue()
+	maxContainerFailures := int(math.Max(1.0, float64(config.Replicas)*.01))
+	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
 
-	// Default to 10 second polling/check interval
-	if interval <= 0 {
-		interval = 10
-	}
-
-	By(fmt.Sprintf("%v Creating replication controller %s", time.Now(), name))
+	By(fmt.Sprintf("%v Creating replication controller %s", time.Now(), config.Name))
 	rc := &api.ReplicationController{
 		ObjectMeta: api.ObjectMeta{
-			Name: name,
+			Name: config.Name,
 		},
 		Spec: api.ReplicationControllerSpec{
-			Replicas: replicas,
+			Replicas: config.Replicas,
 			Selector: map[string]string{
-				"name": name,
+				"name": config.Name,
 			},
 			Template: &api.PodTemplateSpec{
 				ObjectMeta: api.ObjectMeta{
-					Labels: map[string]string{"name": name},
+					Labels: map[string]string{"name": config.Name},
 				},
 				Spec: api.PodSpec{
 					Containers: []api.Container{
 						{
-							Name: name,
-							Image: image,
+							Name: config.Name,
+							Image: config.Image,
 							Ports: []api.ContainerPort{{ContainerPort: 80}},
 						},
 					},
@@ -856,179 +841,80 @@ func RunRC(config RCConfig) error {
 			},
 		},
 	}
-	_, err := c.ReplicationControllers(ns).Create(rc)
+	_, err := config.Client.ReplicationControllers(config.Namespace).Create(rc)
 	if err != nil {
 		return fmt.Errorf("Error creating replication controller: %v", err)
 	}
-	Logf("%v Created replication controller with name: %v, namespace: %v, replica count: %v", time.Now(), rc.Name, ns, rc.Spec.Replicas)
-	podStore := newPodStore(c, ns, label, fields.Everything())
+	Logf("%v Created replication controller with name: %v, namespace: %v, replica count: %v", time.Now(), rc.Name, config.Namespace, rc.Spec.Replicas)
+	podStore := newPodStore(config.Client, config.Namespace, label, fields.Everything())
 	defer podStore.Stop()
 
-	// Create a routine to query for the list of pods
-	stop := make(chan struct{})
-	go func(stop <-chan struct{}, ns string, label labels.Selector, interval int) {
-		for {
-			select {
-			case <-stop:
-				return
-			default:
-				podLists.Push(podStore.List())
-				time.Sleep(time.Duration(interval) * time.Second)
-			}
-		}
-	}(stop, ns, label, interval)
-	defer close(stop)
-
-	// Look for all the replicas to be created by the replication
-	// controller. Stop looking if all replicas are found or no new
-	// replicas are found for a continual number of times
-	By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
-
-	// There must be some amount of new pods created within 2 minutes, so
-	// determine the number of checks needed to ensure timeout within
-	// that time period. 2 minutes is generous amount of time to see
-	// a change new pods created in the system even if it is under load.
-	failCount := int(math.Max(1.0, 120.0/float64(interval)))
-	for same < failCount && current < replicas {
-		// Wait just longer than an interval to allow processing
-		// information in the queue quickly
-		time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
-
-		// Greedily read all existing entries in the queue until
-		// all pods are found submitted or the queue is empty.  If
-		// the queue is empty then we need to stop trying to process
-		// entries until there is something or process in the queue
-		for podLists.Len() > 0 && current < replicas {
-			item := podLists.Pop()
-			pods := item.value.([]*api.Pod)
-			current = len(pods)
-			Logf("%v Controller %s: Found %d pods out of %d", time.Now(), name, current, replicas)
-			if last < current {
-				same = 0
-			} else if last == current {
-				same++
-			} else if current < last {
-				return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
-			}
-
-			if same >= failCount {
-				return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
-			}
-			last = current
-		}
+	interval := config.PollInterval
+	if interval <= 0 {
+		interval = 10 * time.Second
 	}
-	if current != replicas {
-		return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas)
-	}
-	Logf("%v Controller %s in ns %s: Found %d pods out of %d", time.Now(), name, ns, current, replicas)
+	oldPods := make([]*api.Pod, 0)
+	oldRunning := 0
+	lastChange := time.Now()
+	for oldRunning != config.Replicas && time.Since(lastChange) < 5*time.Minute {
+		time.Sleep(interval)
 
-	// Look for all the replicas to be in a Running state. Stop looking
-	// if all replicas are found in a Running state or no new
-	// replicas are found Running for a continual number of times
-	By(fmt.Sprintf("%v Waiting for all %d replicas to be running with a max container failures of %d", time.Now(), replicas, maxContainerFailures))
-
-	// There must be some amount of pods that have newly transitioned to
-	// the Running state within 100 seconds, so determine the number of
-	// checks needed to ensure timeout within that time period.
-	// 100 seconds is generous amount of time to see a change in the
-	// system even if it is under load.
-	failCount = int(math.Max(1.0, 100.0/float64(interval)))
-
-	same = 0
-	last = 0
-	current = 0
-	var oldPods []*api.Pod
-	podLists.Reset()
-	foundAllPods := false
-	for same < failCount && current < replicas {
-		// Wait just longer than an interval to allow processing
-		// information in the queue quickly
-		time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
-
-		// Greedily read all existing entries in the queue until
-		// either all pods are running or the queue is empty.  If
-		// the queue is empty we need to stop looking for entries
-		// and wait for a new entry to process
-		for podLists.Len() > 0 && current < replicas {
-			item := podLists.Pop()
-			current = 0
-			waiting := 0
-			pending := 0
-			unknown := 0
-			inactive := 0
-			failedContainers := 0
-			currentPods := item.value.([]*api.Pod)
-			for _, p := range currentPods {
-				if p.Status.Phase == api.PodRunning {
-					current++
-					for _, v := range FailedContainers(p) {
-						failedContainers = failedContainers + v.restarts
-					}
-				} else if p.Status.Phase == api.PodPending {
-					if p.Spec.NodeName == "" {
-						waiting++
-					} else {
-						pending++
-					}
-				} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
-					inactive++
-				} else if p.Status.Phase == api.PodUnknown {
-					unknown++
+		running := 0
+		waiting := 0
+		pending := 0
+		unknown := 0
+		inactive := 0
+		failedContainers := 0
+		pods := podStore.List()
+		for _, p := range pods {
+			if p.Status.Phase == api.PodRunning {
+				running++
+				for _, v := range FailedContainers(p) {
+					failedContainers = failedContainers + v.restarts
 				}
-			}
-
-			Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
-			if config.PodStatusFile != nil {
-				fmt.Fprintf(config.PodStatusFile, "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n", item.createTime, current, pending, waiting, inactive, unknown)
-			}
-
-			if foundAllPods && len(currentPods) != len(oldPods) {
-
-				// This failure mode includes:
-				// kubelet is dead, so node controller deleted pods and rc creates more
-				// - diagnose by noting the pod diff below.
-				// pod is unhealthy, so replication controller creates another to take its place
-				// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
-				errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(oldPods))
-				Logf("%v, pods that changed since the last iteration:", errorStr)
-				Diff(oldPods, currentPods).Print(util.NewStringSet())
-				return fmt.Errorf(errorStr)
-			}
-			if last < current {
-				same = 0
-			} else if last == current {
-				same++
-			} else if current < last {
-
-				// The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
-				errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current)
-				Logf("%v, pods that changed since the last iteration:", errorStr)
-				Diff(oldPods, currentPods).Print(util.NewStringSet())
-				return fmt.Errorf(errorStr)
-			}
-			if same >= failCount {
-
-				// Most times this happens because a few nodes have kubelet problems, and their pods are
-				// stuck in pending.
- errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount) - Logf("%v, pods currently in pending:", errorStr) - Diff(currentPods, make([]*api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning))) - return fmt.Errorf(errorStr) - } - - if !foundAllPods { - foundAllPods = len(currentPods) == replicas - } - last = current - oldPods = currentPods - - if failedContainers > maxContainerFailures { - return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures) + } else if p.Status.Phase == api.PodPending { + if p.Spec.NodeName == "" { + waiting++ + } else { + pending++ + } + } else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed { + inactive++ + } else if p.Status.Phase == api.PodUnknown { + unknown++ } } + + Logf("%v Pods: %d out of %d created, %d running, %d pending, %d waiting, %d inactive, %d unknown ", + time.Now(), len(pods), config.Replicas, running, pending, waiting, inactive, unknown) + if config.PodStatusFile != nil { + fmt.Fprintf(config.PodStatusFile, "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n", time.Now(), running, pending, waiting, inactive, unknown) + } + + if failedContainers > maxContainerFailures { + return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures) + } + if len(pods) < len(oldPods) || len(pods) > config.Replicas { + // This failure mode includes: + // kubelet is dead, so node controller deleted pods and rc creates more + // - diagnose by noting the pod diff below. + // pod is unhealthy, so replication controller creates another to take its place + // - diagnose by comparing the previous "2 Pod states" lines for inactive pods + errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(pods), len(oldPods)) + Logf("%v, pods that changed since the last iteration:", errorStr) + Diff(oldPods, pods).Print(util.NewStringSet()) + return fmt.Errorf(errorStr) + } + + if len(pods) > len(oldPods) || running > oldRunning { + lastChange = time.Now() + } + oldPods = pods + oldRunning = running } - if current != replicas { - return fmt.Errorf("Only %d pods started out of %d", current, replicas) + + if oldRunning != config.Replicas { + return fmt.Errorf("Only %d pods started out of %d", oldRunning, config.Replicas) } return nil }