Merge pull request #8510 from GoogleCloudPlatform/revert-8399-improve_e2e_retry_logic

Revert "improve e2e retry logic with standard wait.Poll()"
This commit is contained in:
Quinton Hoole 2015-05-19 11:18:07 -07:00
commit 441f69f34e
10 changed files with 119 additions and 123 deletions

View File

@ -27,7 +27,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
@ -73,14 +72,12 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
// being run as the first e2e test just after the e2e cluster has been created. // being run as the first e2e test just after the e2e cluster has been created.
var err error var err error
const graceTime = 10 * time.Minute const graceTime = 10 * time.Minute
start := time.Now() for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
expectNoError(wait.Poll(5*time.Second, graceTime, func() (bool, error) {
if _, err = s.Get("elasticsearch-logging"); err == nil { if _, err = s.Get("elasticsearch-logging"); err == nil {
return true, nil break
} }
Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start)) Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
return false, nil }
}))
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
// Wait for the Elasticsearch pods to enter the running state. // Wait for the Elasticsearch pods to enter the running state.
@ -98,8 +95,7 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
var statusCode float64 var statusCode float64
var esResponse map[string]interface{} var esResponse map[string]interface{}
err = nil err = nil
start = time.Now() for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
expectNoError(wait.Poll(5*time.Second, graceTime, func() (bool, error) {
// Query against the root URL for Elasticsearch. // Query against the root URL for Elasticsearch.
body, err := c.Get(). body, err := c.Get().
Namespace(api.NamespaceDefault). Namespace(api.NamespaceDefault).
@ -109,26 +105,26 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
DoRaw() DoRaw()
if err != nil { if err != nil {
Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err) Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
return false, nil continue
} }
esResponse, err = bodyToJSON(body) esResponse, err = bodyToJSON(body)
if err != nil { if err != nil {
Logf("After %v failed to convert Elasticsearch JSON response %v to map[string]interface{}: %v", time.Since(start), string(body), err) Logf("After %v failed to convert Elasticsearch JSON response %v to map[string]interface{}: %v", time.Since(start), string(body), err)
return false, nil continue
} }
statusIntf, ok := esResponse["status"] statusIntf, ok := esResponse["status"]
if !ok { if !ok {
Logf("After %v Elasticsearch response has no status field: %v", time.Since(start), esResponse) Logf("After %v Elasticsearch response has no status field: %v", time.Since(start), esResponse)
return false, nil continue
} }
statusCode, ok = statusIntf.(float64) statusCode, ok = statusIntf.(float64)
if !ok { if !ok {
// Assume this is a string returning Failure. Retry. // Assume this is a string returning Failure. Retry.
Logf("After %v expected status to be a float64 but got %v of type %T", time.Since(start), statusIntf, statusIntf) Logf("After %v expected status to be a float64 but got %v of type %T", time.Since(start), statusIntf, statusIntf)
return false, nil continue
}
break
} }
return true, nil
}))
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if int(statusCode) != 200 { if int(statusCode) != 200 {
Failf("Elasticsearch cluster has a bad status: %v", statusCode) Failf("Elasticsearch cluster has a bad status: %v", statusCode)
@ -237,8 +233,7 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
By("Checking all the log lines were ingested into Elasticsearch") By("Checking all the log lines were ingested into Elasticsearch")
missing := 0 missing := 0
expected := nodeCount * countTo expected := nodeCount * countTo
start = time.Now() for start := time.Now(); time.Since(start) < graceTime; time.Sleep(10 * time.Second) {
expectNoError(wait.Poll(10*time.Second, graceTime, func() (bool, error) {
// Ask Elasticsearch to return all the log lines that were tagged with the underscore // Ask Elasticsearch to return all the log lines that were tagged with the underscore
// verison of the name. Ask for twice as many log lines as we expect to check for // verison of the name. Ask for twice as many log lines as we expect to check for
// duplication bugs. // duplication bugs.
@ -253,13 +248,13 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
DoRaw() DoRaw()
if err != nil { if err != nil {
Logf("After %v failed to make proxy call to elasticsearch-logging: %v", time.Since(start), err) Logf("After %v failed to make proxy call to elasticsearch-logging: %v", time.Since(start), err)
return false, nil continue
} }
response, err := bodyToJSON(body) response, err := bodyToJSON(body)
if err != nil { if err != nil {
Logf("After %v failed to unmarshal response: %v", time.Since(start), err) Logf("After %v failed to unmarshal response: %v", time.Since(start), err)
return false, nil continue
} }
hits, ok := response["hits"].(map[string]interface{}) hits, ok := response["hits"].(map[string]interface{})
if !ok { if !ok {
@ -268,17 +263,17 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
totalF, ok := hits["total"].(float64) totalF, ok := hits["total"].(float64)
if !ok { if !ok {
Logf("After %v hits[total] not of the expected type: %T", time.Since(start), hits["total"]) Logf("After %v hits[total] not of the expected type: %T", time.Since(start), hits["total"])
return false, nil continue
} }
total := int(totalF) total := int(totalF)
if total < expected { if total < expected {
Logf("After %v expecting to find %d log lines but saw only %d", time.Since(start), expected, total) Logf("After %v expecting to find %d log lines but saw only %d", time.Since(start), expected, total)
return false, nil continue
} }
h, ok := hits["hits"].([]interface{}) h, ok := hits["hits"].([]interface{})
if !ok { if !ok {
Logf("After %v hits not of the expected type: %T", time.Since(start), hits["hits"]) Logf("After %v hits not of the expected type: %T", time.Since(start), hits["hits"])
return false, nil continue
} }
// Initialize data-structure for observing counts. // Initialize data-structure for observing counts.
observed := make([][]int, nodeCount) observed := make([][]int, nodeCount)
@ -334,10 +329,10 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
} }
if missing != 0 { if missing != 0 {
Logf("After %v still missing %d log lines", time.Since(start), missing) Logf("After %v still missing %d log lines", time.Since(start), missing)
return false, nil continue
} }
Logf("After %s found all %d log lines", time.Since(start), expected) Logf("After %s found all %d log lines", time.Since(start), expected)
return true, nil return
})) }
Failf("Failed to find all %d log lines", expected) Failf("Failed to find all %d log lines", expected)
} }

View File

@ -26,7 +26,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
@ -150,14 +149,13 @@ func validateGuestbookApp(c *client.Client, ns string) {
// Returns whether received expected response from guestbook on time. // Returns whether received expected response from guestbook on time.
func waitForGuestbookResponse(c *client.Client, cmd, arg, expectedResponse string, timeout time.Duration, ns string) bool { func waitForGuestbookResponse(c *client.Client, cmd, arg, expectedResponse string, timeout time.Duration, ns string) bool {
expectNoError(wait.Poll(5*time.Second, timeout, func() (bool, error) { for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) {
res, err := makeRequestToGuestbook(c, cmd, arg, ns) res, err := makeRequestToGuestbook(c, cmd, arg, ns)
if err == nil && res == expectedResponse { if err == nil && res == expectedResponse {
return true, nil
}
return false, nil
}))
return true return true
}
}
return false
} }
func makeRequestToGuestbook(c *client.Client, cmd, value string, ns string) (string, error) { func makeRequestToGuestbook(c *client.Client, cmd, value string, ns string) (string, error) {

View File

@ -26,7 +26,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
@ -119,7 +118,7 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
defer wg.Done() defer wg.Done()
rcExist := false rcExist := false
// Once every 1-2 minutes perform resize of RC. // Once every 1-2 minutes perform resize of RC.
expectNoError(wait.Poll(time.Duration(60+rand.Intn(60))*time.Second, simulationTime, func() (bool, error) { for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) {
if !rcExist { if !rcExist {
expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s", name, ns)) expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
rcExist = true rcExist = true
@ -132,8 +131,7 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
expectNoError(DeleteRC(c, ns, name), fmt.Sprintf("deleting rc %s in namespace %s", name, ns)) expectNoError(DeleteRC(c, ns, name), fmt.Sprintf("deleting rc %s in namespace %s", name, ns))
rcExist = false rcExist = false
} }
return false, nil }
}))
if rcExist { if rcExist {
expectNoError(DeleteRC(c, ns, name), fmt.Sprintf("deleting rc %s in namespace %s after test completion", name, ns)) expectNoError(DeleteRC(c, ns, name), fmt.Sprintf("deleting rc %s in namespace %s after test completion", name, ns))
} }

View File

@ -26,7 +26,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
influxdb "github.com/influxdb/influxdb/client" influxdb "github.com/influxdb/influxdb/client"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -224,12 +223,15 @@ func testMonitoringUsingHeapsterInfluxdb(c *client.Client) {
expectedNodes, err := getAllNodesInCluster(c) expectedNodes, err := getAllNodesInCluster(c)
expectNoError(err) expectNoError(err)
startTime := time.Now()
expectNoError(wait.Poll(sleepBetweenAttempts, testTimeout, func() (bool, error) { for {
if validatePodsAndNodes(influxdbClient, expectedPods, expectedNodes) { if validatePodsAndNodes(influxdbClient, expectedPods, expectedNodes) {
return true, nil return
}
if time.Since(startTime) >= testTimeout {
break
}
time.Sleep(sleepBetweenAttempts)
} }
return false, nil
}))
Failf("monitoring using heapster and influxdb test failed") Failf("monitoring using heapster and influxdb test failed")
} }

View File

@ -29,8 +29,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
) )
@ -110,14 +108,14 @@ var _ = Describe("PD", func() {
expectNoError(podClient.Delete(host1Pod.Name, nil), "Failed to delete host1Pod") expectNoError(podClient.Delete(host1Pod.Name, nil), "Failed to delete host1Pod")
By(fmt.Sprintf("deleting PD %q", diskName)) By(fmt.Sprintf("deleting PD %q", diskName))
expectNoError(wait.Poll(5*time.Second, 180*time.Second, func() (bool, error) { for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
if err = deletePD(diskName); err != nil { if err = deletePD(diskName); err != nil {
Logf("Couldn't delete PD. Sleeping 5 seconds (%v)", err) Logf("Couldn't delete PD. Sleeping 5 seconds (%v)", err)
return false, nil continue
} }
Logf("Deleted PD %v", diskName) Logf("Deleted PD %v", diskName)
return true, nil break
})) }
expectNoError(err, "Error deleting PD") expectNoError(err, "Error deleting PD")
return return
@ -178,14 +176,13 @@ var _ = Describe("PD", func() {
expectNoError(podClient.Delete(host1ROPod.Name, nil), "Failed to delete host1ROPod") expectNoError(podClient.Delete(host1ROPod.Name, nil), "Failed to delete host1ROPod")
By(fmt.Sprintf("deleting PD %q", diskName)) By(fmt.Sprintf("deleting PD %q", diskName))
for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
expectNoError(wait.Poll(5*time.Second, 180*time.Second, func() (bool, error) {
if err = deletePD(diskName); err != nil { if err = deletePD(diskName); err != nil {
Logf("Couldn't delete PD. Sleeping 5 seconds") Logf("Couldn't delete PD. Sleeping 5 seconds")
return false, nil continue
}
break
} }
return true, nil
}))
expectNoError(err, "Error deleting PD") expectNoError(err, "Error deleting PD")
}) })
}) })

View File

@ -98,21 +98,22 @@ func testHostIP(c *client.Client, pod *api.Pod) {
err = waitForPodRunningInNamespace(c, pod.Name, ns) err = waitForPodRunningInNamespace(c, pod.Name, ns)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
// Try to make sure we get a hostIP for each pod. // Try to make sure we get a hostIP for each pod.
hostIPTimeout := 2 * time.Minute
var ( t := time.Now()
hostIPTimeout = 2 * time.Minute for {
pods *api.Pod p, err := podClient.Get(pod.Name)
)
expectNoError(wait.Poll(5*time.Second, hostIPTimeout, func() (bool, error) {
pods, err = podClient.Get(pod.Name)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if pods.Status.HostIP != "" { if p.Status.HostIP != "" {
Logf("Pod %s has hostIP: %s", pods.Name, pods.Status.HostIP) Logf("Pod %s has hostIP: %s", p.Name, p.Status.HostIP)
return true, nil break
}
if time.Since(t) >= hostIPTimeout {
Failf("Gave up waiting for hostIP of pod %s after %v seconds",
p.Name, time.Since(t).Seconds())
}
Logf("Retrying to get the hostIP of pod %s", p.Name)
time.Sleep(5 * time.Second)
} }
Logf("Retrying to get the hostIP of pod %s", pods.Name)
return false, nil
}))
} }
var _ = Describe("Pods", func() { var _ = Describe("Pods", func() {

View File

@ -111,16 +111,22 @@ func ServeImageOrFail(c *client.Client, test string, image string) {
// List the pods, making sure we observe all the replicas. // List the pods, making sure we observe all the replicas.
listTimeout := time.Minute listTimeout := time.Minute
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name})) label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
pods, err := c.Pods(ns).List(label, fields.Everything())
var pods *api.PodList Expect(err).NotTo(HaveOccurred())
expectNoError(wait.Poll(5*time.Second, listTimeout, func() (bool, error) { t := time.Now()
pods, err = c.Pods(ns).List(label, fields.Everything()) for {
Logf("Controller %s: Found %d pods out of %d", name, len(pods.Items), replicas) Logf("Controller %s: Found %d pods out of %d", name, len(pods.Items), replicas)
if len(pods.Items) == replicas { if len(pods.Items) == replicas {
return true, nil break
}
if time.Since(t) > listTimeout {
Failf("Controller %s: Gave up waiting for %d pods to come up after seeing only %d pods after %v seconds",
name, replicas, len(pods.Items), time.Since(t).Seconds())
}
time.Sleep(5 * time.Second)
pods, err = c.Pods(ns).List(label, fields.Everything())
Expect(err).NotTo(HaveOccurred())
} }
return false, nil
}))
By("Ensuring each pod is running") By("Ensuring each pod is running")

View File

@ -24,7 +24,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
@ -256,11 +255,11 @@ func waitForNodeToBeNotReady(c *client.Client, name string, timeout time.Duratio
// ready or unknown). // ready or unknown).
func waitForNodeToBe(c *client.Client, name string, wantReady bool, timeout time.Duration) bool { func waitForNodeToBe(c *client.Client, name string, wantReady bool, timeout time.Duration) bool {
Logf("Waiting up to %v for node %s readiness to be %t", timeout, name, wantReady) Logf("Waiting up to %v for node %s readiness to be %t", timeout, name, wantReady)
expectNoError(wait.Poll(poll, timeout, func() (bool, error) { for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
node, err := c.Nodes().Get(name) node, err := c.Nodes().Get(name)
if err != nil { if err != nil {
Logf("Couldn't get node %s", name) Logf("Couldn't get node %s", name)
return false, nil continue
} }
// Check the node readiness condition (logging all). // Check the node readiness condition (logging all).
@ -271,11 +270,10 @@ func waitForNodeToBe(c *client.Client, name string, wantReady bool, timeout time
// matches as desired. // matches as desired.
if cond.Type == api.NodeReady && (cond.Status == api.ConditionTrue) == wantReady { if cond.Type == api.NodeReady && (cond.Status == api.ConditionTrue) == wantReady {
Logf("Successfully found node %s readiness to be %t", name, wantReady) Logf("Successfully found node %s readiness to be %t", name, wantReady)
return true, nil
}
}
return false, nil
}))
return true return true
}
}
}
Logf("Node %s didn't reach desired readiness (%t) within %v", name, wantReady, timeout)
return false
} }

View File

@ -28,7 +28,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
@ -83,7 +82,7 @@ var _ = Describe("Services", func() {
Expect(foundRO).To(Equal(true)) Expect(foundRO).To(Equal(true))
}) })
It("should serve a basic endpoint from pods", func() { It("should serve a basic endpoint from pods", func(done Done) {
serviceName := "endpoint-test2" serviceName := "endpoint-test2"
ns := namespaces[0] ns := namespaces[0]
labels := map[string]string{ labels := map[string]string{
@ -145,9 +144,13 @@ var _ = Describe("Services", func() {
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{}) validateEndpointsOrFail(c, ns, serviceName, map[string][]int{})
}) // We deferred Gingko pieces that may Fail, we aren't done.
defer func() {
close(done)
}()
}, 240.0)
It("should serve multiport endpoints from pods", func() { It("should serve multiport endpoints from pods", func(done Done) {
// repacking functionality is intentionally not tested here - it's better to test it in an integration test. // repacking functionality is intentionally not tested here - it's better to test it in an integration test.
serviceName := "multi-endpoint-test" serviceName := "multi-endpoint-test"
ns := namespaces[0] ns := namespaces[0]
@ -242,7 +245,11 @@ var _ = Describe("Services", func() {
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{}) validateEndpointsOrFail(c, ns, serviceName, map[string][]int{})
}) // We deferred Gingko pieces that may Fail, we aren't done.
defer func() {
close(done)
}()
}, 240.0)
It("should be able to create a functioning external load balancer", func() { It("should be able to create a functioning external load balancer", func() {
if !providerIs("gce", "gke") { if !providerIs("gce", "gke") {
@ -321,13 +328,13 @@ var _ = Describe("Services", func() {
By("hitting the pod through the service's external load balancer") By("hitting the pod through the service's external load balancer")
var resp *http.Response var resp *http.Response
expectNoError(wait.Poll(5*time.Second, podStartTimeout, func() (bool, error) { for t := time.Now(); time.Since(t) < podStartTimeout; time.Sleep(5 * time.Second) {
resp, err = http.Get(fmt.Sprintf("http://%s:%d", ip, port)) resp, err = http.Get(fmt.Sprintf("http://%s:%d", ip, port))
if err == nil { if err == nil {
return true, nil break
} }
return false, nil }
})) Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close() defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
@ -393,22 +400,17 @@ func waitForPublicIPs(c *client.Client, serviceName, namespace string) (*api.Ser
const timeout = 4 * time.Minute const timeout = 4 * time.Minute
var service *api.Service var service *api.Service
By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to have a public IP", timeout, serviceName, namespace)) By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to have a public IP", timeout, serviceName, namespace))
start := time.Now() for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) {
expectNoError(wait.Poll(5*time.Second, timeout, func() (bool, error) {
service, err := c.Services(namespace).Get(serviceName) service, err := c.Services(namespace).Get(serviceName)
if err != nil { if err != nil {
Logf("Get service failed, ignoring for 5s: %v", err) Logf("Get service failed, ignoring for 5s: %v", err)
return false, nil continue
} }
if len(service.Spec.PublicIPs) > 0 {
return true, nil
}
Logf("Waiting for service %s in namespace %s to have a public IP (%v)", serviceName, namespace, time.Since(start))
return false, nil
}))
if len(service.Spec.PublicIPs) > 0 { if len(service.Spec.PublicIPs) > 0 {
return service, nil return service, nil
} }
Logf("Waiting for service %s in namespace %s to have a public IP (%v)", serviceName, namespace, time.Since(start))
}
return service, fmt.Errorf("service %s in namespace %s doesn't have a public IP after %.2f seconds", serviceName, namespace, timeout.Seconds()) return service, fmt.Errorf("service %s in namespace %s doesn't have a public IP after %.2f seconds", serviceName, namespace, timeout.Seconds())
} }
@ -479,28 +481,27 @@ func validatePortsOrFail(endpoints map[string][]int, expectedEndpoints map[strin
func validateEndpointsOrFail(c *client.Client, ns, serviceName string, expectedEndpoints map[string][]int) { func validateEndpointsOrFail(c *client.Client, ns, serviceName string, expectedEndpoints map[string][]int) {
By(fmt.Sprintf("Validating endpoints %v with on service %s/%s", expectedEndpoints, ns, serviceName)) By(fmt.Sprintf("Validating endpoints %v with on service %s/%s", expectedEndpoints, ns, serviceName))
for {
expectNoError(wait.Poll(time.Second, 120*time.Second, func() (bool, error) {
endpoints, err := c.Endpoints(ns).Get(serviceName) endpoints, err := c.Endpoints(ns).Get(serviceName)
if err == nil { if err == nil {
Logf("Found endpoints %v", endpoints) By(fmt.Sprintf("Found endpoints %v", endpoints))
portsByIp := getPortsByIp(endpoints.Subsets) portsByIp := getPortsByIp(endpoints.Subsets)
Logf("Found ports by ip %v", portsByIp) By(fmt.Sprintf("Found ports by ip %v", portsByIp))
if len(portsByIp) == len(expectedEndpoints) { if len(portsByIp) == len(expectedEndpoints) {
expectedPortsByIp := translatePodNameToIpOrFail(c, ns, expectedEndpoints) expectedPortsByIp := translatePodNameToIpOrFail(c, ns, expectedEndpoints)
validatePortsOrFail(portsByIp, expectedPortsByIp) validatePortsOrFail(portsByIp, expectedPortsByIp)
return true, nil break
} else { } else {
Logf("Unexpected number of endpoints: found %v, expected %v (ignoring for 1 second)", portsByIp, expectedEndpoints) By(fmt.Sprintf("Unexpected number of endpoints: found %v, expected %v (ignoring for 1 second)", portsByIp, expectedEndpoints))
} }
} else { } else {
Logf("Failed to get endpoints: %v (ignoring for 1 second)", err) By(fmt.Sprintf("Failed to get endpoints: %v (ignoring for 1 second)", err))
} }
return false, nil time.Sleep(time.Second)
})) }
Logf("successfully validated endpoints %v with on service %s/%s", expectedEndpoints, ns, serviceName) By(fmt.Sprintf("successfully validated endpoints %v with on service %s/%s", expectedEndpoints, ns, serviceName))
} }
func addEndpointPodOrFail(c *client.Client, ns, name string, labels map[string]string, containerPorts []api.ContainerPort) { func addEndpointPodOrFail(c *client.Client, ns, name string, labels map[string]string, containerPorts []api.ContainerPort) {

View File

@ -39,7 +39,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
@ -272,33 +271,32 @@ func validateController(c *client.Client, containerImage string, replicas int, c
getImageTemplate := fmt.Sprintf(`--template={{if (exists . "status" "containerStatuses")}}{{range .status.containerStatuses}}{{if eq .name "%s"}}{{.image}}{{end}}{{end}}{{end}}`, containername) getImageTemplate := fmt.Sprintf(`--template={{if (exists . "status" "containerStatuses")}}{{range .status.containerStatuses}}{{if eq .name "%s"}}{{.image}}{{end}}{{end}}{{end}}`, containername)
By(fmt.Sprintf("waiting for all containers in %s pods to come up.", testname)) //testname should be selector By(fmt.Sprintf("waiting for all containers in %s pods to come up.", testname)) //testname should be selector
for start := time.Now(); time.Since(start) < podStartTimeout; time.Sleep(5 * time.Second) {
expectNoError(wait.Poll(5*time.Second, podStartTimeout, func() (bool, error) {
getPodsOutput := runKubectl("get", "pods", "-o", "template", getPodsTemplate, "--api-version=v1beta3", "-l", testname, fmt.Sprintf("--namespace=%v", ns)) getPodsOutput := runKubectl("get", "pods", "-o", "template", getPodsTemplate, "--api-version=v1beta3", "-l", testname, fmt.Sprintf("--namespace=%v", ns))
pods := strings.Fields(getPodsOutput) pods := strings.Fields(getPodsOutput)
if numPods := len(pods); numPods != replicas { if numPods := len(pods); numPods != replicas {
By(fmt.Sprintf("Replicas for %s: expected=%d actual=%d", testname, replicas, numPods)) By(fmt.Sprintf("Replicas for %s: expected=%d actual=%d", testname, replicas, numPods))
return false, nil continue
} }
var runningPods []string var runningPods []string
for _, podID := range pods { for _, podID := range pods {
running := runKubectl("get", "pods", podID, "-o", "template", getContainerStateTemplate, "--api-version=v1beta3", fmt.Sprintf("--namespace=%v", ns)) running := runKubectl("get", "pods", podID, "-o", "template", getContainerStateTemplate, "--api-version=v1beta3", fmt.Sprintf("--namespace=%v", ns))
if running != "true" { if running != "true" {
Logf("%s is created but not running", podID) Logf("%s is created but not running", podID)
return false, nil continue
} }
currentImage := runKubectl("get", "pods", podID, "-o", "template", getImageTemplate, "--api-version=v1beta3", fmt.Sprintf("--namespace=%v", ns)) currentImage := runKubectl("get", "pods", podID, "-o", "template", getImageTemplate, "--api-version=v1beta3", fmt.Sprintf("--namespace=%v", ns))
if currentImage != containerImage { if currentImage != containerImage {
Logf("%s is created but running wrong image; expected: %s, actual: %s", podID, containerImage, currentImage) Logf("%s is created but running wrong image; expected: %s, actual: %s", podID, containerImage, currentImage)
return false, nil continue
} }
// Call the generic validator function here. // Call the generic validator function here.
// This might validate for example, that (1) getting a url works and (2) url is serving correct content. // This might validate for example, that (1) getting a url works and (2) url is serving correct content.
if err := validator(c, podID); err != nil { if err := validator(c, podID); err != nil {
Logf("%s is running right image but validator function failed: %v", podID, err) Logf("%s is running right image but validator function failed: %v", podID, err)
return false, nil continue
} }
Logf("%s is verified up and running", podID) Logf("%s is verified up and running", podID)
@ -306,10 +304,9 @@ func validateController(c *client.Client, containerImage string, replicas int, c
} }
// If we reach here, then all our checks passed. // If we reach here, then all our checks passed.
if len(runningPods) == replicas { if len(runningPods) == replicas {
return true, nil return
}
} }
return false, nil
}))
// Reaching here means that one of more checks failed multiple times. Assuming its not a race condition, something is broken. // Reaching here means that one of more checks failed multiple times. Assuming its not a race condition, something is broken.
Failf("Timed out after %v seconds waiting for %s pods to reach valid state", podStartTimeout.Seconds(), testname) Failf("Timed out after %v seconds waiting for %s pods to reach valid state", podStartTimeout.Seconds(), testname)
} }
@ -390,9 +387,10 @@ func testContainerOutputInNamespace(scenarioName string, c *client.Client, pod *
By(fmt.Sprintf("Trying to get logs from host %s pod %s container %s: %v", By(fmt.Sprintf("Trying to get logs from host %s pod %s container %s: %v",
podStatus.Spec.Host, podStatus.Name, containerName, err)) podStatus.Spec.Host, podStatus.Name, containerName, err))
var logs []byte var logs []byte
start := time.Now()
// Sometimes the actual containers take a second to get started, try to get logs for 60s // Sometimes the actual containers take a second to get started, try to get logs for 60s
expectNoError(wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) { for time.Now().Sub(start) < (60 * time.Second) {
logs, err = c.Get(). logs, err = c.Get().
Prefix("proxy"). Prefix("proxy").
Resource("nodes"). Resource("nodes").
@ -400,14 +398,16 @@ func testContainerOutputInNamespace(scenarioName string, c *client.Client, pod *
Suffix("containerLogs", ns, podStatus.Name, containerName). Suffix("containerLogs", ns, podStatus.Name, containerName).
Do(). Do().
Raw() Raw()
Logf("pod logs:%v\n", string(logs)) fmt.Sprintf("pod logs:%v\n", string(logs))
By(fmt.Sprintf("pod logs:%v\n", string(logs)))
if strings.Contains(string(logs), "Internal Error") { if strings.Contains(string(logs), "Internal Error") {
Logf("Failed to get logs from host %q pod %q container %q: %v", By(fmt.Sprintf("Failed to get logs from host %q pod %q container %q: %v",
podStatus.Spec.Host, podStatus.Name, containerName, string(logs)) podStatus.Spec.Host, podStatus.Name, containerName, string(logs)))
return false, nil time.Sleep(5 * time.Second)
continue
}
break
} }
return true, nil
}))
for _, m := range expectedOutput { for _, m := range expectedOutput {
Expect(string(logs)).To(ContainSubstring(m), "%q in container output", m) Expect(string(logs)).To(ContainSubstring(m), "%q in container output", m)