Simplify e2e.RunRC method and wait up to 10 minutes for pods to start

Filip Grzadkowski 2015-06-10 13:59:30 +02:00
parent 93ffeb93b6
commit 03f161def2
3 changed files with 78 additions and 267 deletions
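In short, the old RunRC sampled the cluster through a background goroutine that pushed pod-list snapshots into a FIFO queue (that helper file is deleted entirely by this commit) and then ran two check loops driven by "same"/"failCount" counters; the new version polls a pod store directly and keeps waiting as long as the running count is still improving, giving up only once progress has stalled for a fixed window. Below is a minimal sketch of that waiting pattern, not the committed code: countRunning is a hypothetical stand-in for listing the store and counting pods in the Running phase, and the stall window is a parameter rather than the hard-coded value in the commit.

    package main

    import (
        "fmt"
        "time"
    )

    // countRunning is a placeholder for podStore.List() plus a count of pods
    // whose phase is Running; it is not part of the committed code.
    func countRunning() int { return 0 }

    // waitForReplicas polls until `replicas` pods are running. The deadline is
    // measured from the last observed progress, so a slow-but-moving rollout is
    // not cut off; only a stall of `stall` duration fails the wait.
    func waitForReplicas(replicas int, interval, stall time.Duration) error {
        running, lastChange := 0, time.Now()
        for running != replicas && time.Since(lastChange) < stall {
            time.Sleep(interval)
            now := countRunning()
            if now > running {
                lastChange = time.Now() // progress: reset the stall timer
            }
            running = now
        }
        if running != replicas {
            return fmt.Errorf("only %d pods started out of %d", running, replicas)
        }
        return nil
    }

    func main() {
        // With zero replicas requested the loop exits immediately; illustrative only.
        fmt.Println(waitForReplicas(0, time.Second, 5*time.Minute))
    }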


@@ -97,17 +97,17 @@ var _ = Describe("Density", func() {
skip bool
podsPerMinion int
/* Controls how often the apiserver is polled for pods */
interval int
interval time.Duration
}
densityTests := []Density{
// This test should always run, even if larger densities are skipped.
{podsPerMinion: 3, skip: false, interval: 10},
{podsPerMinion: 30, skip: false, interval: 10},
{podsPerMinion: 3, skip: false, interval: 10 * time.Second},
{podsPerMinion: 30, skip: false, interval: 10 * time.Second},
// More than 30 pods per node is outside our v1.0 goals.
// We might want to enable those tests in the future.
{podsPerMinion: 50, skip: true, interval: 10},
{podsPerMinion: 100, skip: true, interval: 1},
{podsPerMinion: 50, skip: true, interval: 10 * time.Second},
{podsPerMinion: 100, skip: true, interval: 1 * time.Second},
}
for _, testArg := range densityTests {
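The type change here mirrors the one made to RCConfig.PollInterval further down: with a bare int the unit lived only in the reader's head and every sleep site had to convert it, whereas a time.Duration literal carries its unit and can be slept on or compared directly. A tiny standalone illustration (not taken from the test itself):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Before: interval was an int of seconds, so the poll loop converted it
        // by hand: time.Sleep(time.Duration(interval) * time.Second).
        // After: the literal carries its unit and is usable as-is.
        interval := 10 * time.Second
        fmt.Println(interval)                        // "10s"
        fmt.Println(interval > 500*time.Millisecond) // durations compare naturally: true
    }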


@@ -1,75 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"sync"
"time"
)
type QueueItem struct {
createTime string
value interface{}
}
type QueueItems struct {
pos int
mutex *sync.Mutex
list []QueueItem
}
type FifoQueue QueueItems
func (fq *FifoQueue) Push(elem interface{}) {
fq.mutex.Lock()
fq.list = append(fq.list, QueueItem{time.Now().String(), elem})
fq.mutex.Unlock()
}
func (fq *FifoQueue) Pop() QueueItem {
fq.mutex.Lock()
var val QueueItem
if len(fq.list)-1 >= fq.pos {
val = fq.list[fq.pos]
fq.pos++
}
fq.mutex.Unlock()
return val
}
func (fq FifoQueue) Len() int {
return len(fq.list[fq.pos:])
}
func (fq *FifoQueue) First() QueueItem {
return fq.list[fq.pos]
}
func (fq *FifoQueue) Last() QueueItem {
return fq.list[len(fq.list)-1]
}
func (fq *FifoQueue) Reset() {
fq.pos = 0
}
func newFifoQueue() *FifoQueue {
tmp := new(FifoQueue)
tmp.mutex = &sync.Mutex{}
tmp.pos = 0
return tmp
}
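This deleted queue was only used by the old RunRC (visible further down in this commit): a sampling goroutine pushed podStore.List() snapshots with Push while the main loop drained them with Pop and Len. Below is a reduced sketch of that producer/consumer pattern, as a hypothetical helper in the same package as the queue above; the payload is a plain string where the real code pushed pod lists. Note that Len, First, and Last read the slice without taking the mutex, so the sketch stops the producer before draining.

    package e2e

    import (
        "fmt"
        "sync"
    )

    // consumeSnapshots is illustrative only and not part of the original file.
    func consumeSnapshots() {
        q := newFifoQueue()
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 3; i++ {
                q.Push(fmt.Sprintf("snapshot %d", i))
            }
        }()
        wg.Wait() // stop the producer before reading: Len takes no lock
        for q.Len() > 0 {
            item := q.Pop()
            fmt.Println(item.createTime, item.value)
        }
    }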


@@ -155,7 +155,7 @@ type RCConfig struct {
Image string
Name string
Namespace string
PollInterval int
PollInterval time.Duration
PodStatusFile *os.File
Replicas int
}
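With PollInterval now a time.Duration, a caller fills the config roughly like this. This is a hedged example: the field values are placeholders, and it assumes the package's RunRC and Logf helpers plus an already-initialized client c in scope. Leaving PollInterval at its zero value makes RunRC fall back to a 10-second default, as the diff below shows.

    cfg := RCConfig{
        Client:       c,                // client set up elsewhere by the test framework
        Image:        "example/image",  // placeholder image
        Name:         "load-test-rc",   // placeholder name
        Namespace:    "e2e-tests",      // placeholder namespace
        Replicas:     30,
        PollInterval: 10 * time.Second, // was a bare int of seconds before this change
    }
    if err := RunRC(cfg); err != nil {
        Logf("RC did not start: %v", err)
    }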
@@ -812,43 +812,28 @@ func Diff(oldPods []*api.Pod, curPods []*api.Pod) PodDiff {
// It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling cleanup).
func RunRC(config RCConfig) error {
var last int
c := config.Client
name := config.Name
ns := config.Namespace
image := config.Image
replicas := config.Replicas
interval := config.PollInterval
maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01))
current := 0
same := 0
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
podLists := newFifoQueue()
maxContainerFailures := int(math.Max(1.0, float64(config.Replicas)*.01))
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
// Default to 10 second polling/check interval
if interval <= 0 {
interval = 10
}
By(fmt.Sprintf("%v Creating replication controller %s", time.Now(), name))
By(fmt.Sprintf("%v Creating replication controller %s", time.Now(), config.Name))
rc := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: name,
Name: config.Name,
},
Spec: api.ReplicationControllerSpec{
Replicas: replicas,
Replicas: config.Replicas,
Selector: map[string]string{
"name": name,
"name": config.Name,
},
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"name": name},
Labels: map[string]string{"name": config.Name},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: name,
Image: image,
Name: config.Name,
Image: config.Image,
Ports: []api.ContainerPort{{ContainerPort: 80}},
},
},
@@ -856,179 +841,80 @@
},
},
}
_, err := c.ReplicationControllers(ns).Create(rc)
_, err := config.Client.ReplicationControllers(config.Namespace).Create(rc)
if err != nil {
return fmt.Errorf("Error creating replication controller: %v", err)
}
Logf("%v Created replication controller with name: %v, namespace: %v, replica count: %v", time.Now(), rc.Name, ns, rc.Spec.Replicas)
podStore := newPodStore(c, ns, label, fields.Everything())
Logf("%v Created replication controller with name: %v, namespace: %v, replica count: %v", time.Now(), rc.Name, config.Namespace, rc.Spec.Replicas)
podStore := newPodStore(config.Client, config.Namespace, label, fields.Everything())
defer podStore.Stop()
// Create a routine to query for the list of pods
stop := make(chan struct{})
go func(stop <-chan struct{}, ns string, label labels.Selector, interval int) {
for {
select {
case <-stop:
return
default:
podLists.Push(podStore.List())
time.Sleep(time.Duration(interval) * time.Second)
}
}
}(stop, ns, label, interval)
defer close(stop)
// Look for all the replicas to be created by the replication
// controller. Stop looking if all replicas are found or no new
// replicas are found for a continual number of times
By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
// There must be some amount of new pods created within 2 minutes, so
// determine the number of checks needed to ensure timeout within
// that time period. 2 minutes is generous amount of time to see
// a change new pods created in the system even if it is under load.
failCount := int(math.Max(1.0, 120.0/float64(interval)))
for same < failCount && current < replicas {
// Wait just longer than an interval to allow processing
// information in the queue quickly
time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
// Greedily read all existing entries in the queue until
// all pods are found submitted or the queue is empty. If
// the queue is empty then we need to stop trying to process
// entries until there is something or process in the queue
for podLists.Len() > 0 && current < replicas {
item := podLists.Pop()
pods := item.value.([]*api.Pod)
current = len(pods)
Logf("%v Controller %s: Found %d pods out of %d", time.Now(), name, current, replicas)
if last < current {
same = 0
} else if last == current {
same++
} else if current < last {
return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
}
if same >= failCount {
return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
}
last = current
}
interval := config.PollInterval
if interval <= 0 {
interval = 10 * time.Second
}
if current != replicas {
return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas)
}
Logf("%v Controller %s in ns %s: Found %d pods out of %d", time.Now(), name, ns, current, replicas)
oldPods := make([]*api.Pod, 0)
oldRunning := 0
lastChange := time.Now()
for oldRunning != config.Replicas && time.Since(lastChange) < 5*time.Minute {
time.Sleep(interval)
// Look for all the replicas to be in a Running state. Stop looking
// if all replicas are found in a Running state or no new
// replicas are found Running for a continual number of times
By(fmt.Sprintf("%v Waiting for all %d replicas to be running with a max container failures of %d", time.Now(), replicas, maxContainerFailures))
// There must be some amount of pods that have newly transitioned to
// the Running state within 100 seconds, so determine the number of
// checks needed to ensure timeout within that time period.
// 100 seconds is generous amount of time to see a change in the
// system even if it is under load.
failCount = int(math.Max(1.0, 100.0/float64(interval)))
same = 0
last = 0
current = 0
var oldPods []*api.Pod
podLists.Reset()
foundAllPods := false
for same < failCount && current < replicas {
// Wait just longer than an interval to allow processing
// information in the queue quickly
time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
// Greedily read all existing entries in the queue until
// either all pods are running or the queue is empty. If
// the queue is empty we need to stop looking for entries
// and wait for a new entry to process
for podLists.Len() > 0 && current < replicas {
item := podLists.Pop()
current = 0
waiting := 0
pending := 0
unknown := 0
inactive := 0
failedContainers := 0
currentPods := item.value.([]*api.Pod)
for _, p := range currentPods {
if p.Status.Phase == api.PodRunning {
current++
for _, v := range FailedContainers(p) {
failedContainers = failedContainers + v.restarts
}
} else if p.Status.Phase == api.PodPending {
if p.Spec.NodeName == "" {
waiting++
} else {
pending++
}
} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
inactive++
} else if p.Status.Phase == api.PodUnknown {
unknown++
running := 0
waiting := 0
pending := 0
unknown := 0
inactive := 0
failedContainers := 0
pods := podStore.List()
for _, p := range pods {
if p.Status.Phase == api.PodRunning {
running++
for _, v := range FailedContainers(p) {
failedContainers = failedContainers + v.restarts
}
}
Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
if config.PodStatusFile != nil {
fmt.Fprintf(config.PodStatusFile, "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n", item.createTime, current, pending, waiting, inactive, unknown)
}
if foundAllPods && len(currentPods) != len(oldPods) {
// This failure mode includes:
// kubelet is dead, so node controller deleted pods and rc creates more
// - diagnose by noting the pod diff below.
// pod is unhealthy, so replication controller creates another to take its place
// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(oldPods))
Logf("%v, pods that changed since the last iteration:", errorStr)
Diff(oldPods, currentPods).Print(util.NewStringSet())
return fmt.Errorf(errorStr)
}
if last < current {
same = 0
} else if last == current {
same++
} else if current < last {
// The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current)
Logf("%v, pods that changed since the last iteration:", errorStr)
Diff(oldPods, currentPods).Print(util.NewStringSet())
return fmt.Errorf(errorStr)
}
if same >= failCount {
// Most times this happens because a few nodes have kubelet problems, and their pods are
// stuck in pending.
errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount)
Logf("%v, pods currently in pending:", errorStr)
Diff(currentPods, make([]*api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning)))
return fmt.Errorf(errorStr)
}
if !foundAllPods {
foundAllPods = len(currentPods) == replicas
}
last = current
oldPods = currentPods
if failedContainers > maxContainerFailures {
return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
} else if p.Status.Phase == api.PodPending {
if p.Spec.NodeName == "" {
waiting++
} else {
pending++
}
} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
inactive++
} else if p.Status.Phase == api.PodUnknown {
unknown++
}
}
Logf("%v Pods: %d out of %d created, %d running, %d pending, %d waiting, %d inactive, %d unknown ",
time.Now(), len(pods), config.Replicas, running, pending, waiting, inactive, unknown)
if config.PodStatusFile != nil {
fmt.Fprintf(config.PodStatusFile, "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n", time.Now(), running, pending, waiting, inactive, unknown)
}
if failedContainers > maxContainerFailures {
return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
}
if len(pods) < len(oldPods) || len(pods) > config.Replicas {
// This failure mode includes:
// kubelet is dead, so node controller deleted pods and rc creates more
// - diagnose by noting the pod diff below.
// pod is unhealthy, so replication controller creates another to take its place
// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(pods), len(oldPods))
Logf("%v, pods that changed since the last iteration:", errorStr)
Diff(oldPods, pods).Print(util.NewStringSet())
return fmt.Errorf(errorStr)
}
if len(pods) > len(oldPods) || running > oldRunning {
lastChange = time.Now()
}
oldPods = pods
oldRunning = running
}
if current != replicas {
return fmt.Errorf("Only %d pods started out of %d", current, replicas)
if oldRunning != config.Replicas {
return fmt.Errorf("Only %d pods started out of %d", oldRunning, config.Replicas)
}
return nil
}
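One detail both versions share: the tolerated number of failed container restarts, maxContainerFailures, is 1% of the replica count but never less than one, so small controllers still survive a single restart. A quick worked illustration of that formula:

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        for _, replicas := range []int{3, 30, 300, 3000} {
            // Same formula as in RunRC: int(math.Max(1.0, float64(replicas)*.01))
            maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01))
            fmt.Printf("replicas=%d -> up to %d container restarts tolerated\n",
                replicas, maxContainerFailures)
        }
    }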