Add timeouts to proxy calls in e2e tests

This commit is contained in:
Kris 2016-12-11 16:14:50 -08:00
parent 8070548ebe
commit 128af75b93
13 changed files with 159 additions and 8 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e package e2e
import ( import (
"context"
"fmt" "fmt"
"strconv" "strconv"
"time" "time"
@ -202,7 +203,12 @@ func (rc *ResourceConsumer) makeConsumeCustomMetric() {
func (rc *ResourceConsumer) sendConsumeCPURequest(millicores int) { func (rc *ResourceConsumer) sendConsumeCPURequest(millicores int) {
proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.ClientSet, rc.framework.ClientSet.Core().RESTClient().Post()) proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.ClientSet, rc.framework.ClientSet.Core().RESTClient().Post())
framework.ExpectNoError(err) framework.ExpectNoError(err)
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
req := proxyRequest.Namespace(rc.framework.Namespace.Name). req := proxyRequest.Namespace(rc.framework.Namespace.Name).
Context(ctx).
Name(rc.controllerName). Name(rc.controllerName).
Suffix("ConsumeCPU"). Suffix("ConsumeCPU").
Param("millicores", strconv.Itoa(millicores)). Param("millicores", strconv.Itoa(millicores)).
@ -217,7 +223,12 @@ func (rc *ResourceConsumer) sendConsumeCPURequest(millicores int) {
func (rc *ResourceConsumer) sendConsumeMemRequest(megabytes int) { func (rc *ResourceConsumer) sendConsumeMemRequest(megabytes int) {
proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.ClientSet, rc.framework.ClientSet.Core().RESTClient().Post()) proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.ClientSet, rc.framework.ClientSet.Core().RESTClient().Post())
framework.ExpectNoError(err) framework.ExpectNoError(err)
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
req := proxyRequest.Namespace(rc.framework.Namespace.Name). req := proxyRequest.Namespace(rc.framework.Namespace.Name).
Context(ctx).
Name(rc.controllerName). Name(rc.controllerName).
Suffix("ConsumeMem"). Suffix("ConsumeMem").
Param("megabytes", strconv.Itoa(megabytes)). Param("megabytes", strconv.Itoa(megabytes)).
@ -232,7 +243,12 @@ func (rc *ResourceConsumer) sendConsumeMemRequest(megabytes int) {
func (rc *ResourceConsumer) sendConsumeCustomMetric(delta int) { func (rc *ResourceConsumer) sendConsumeCustomMetric(delta int) {
proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.ClientSet, rc.framework.ClientSet.Core().RESTClient().Post()) proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.ClientSet, rc.framework.ClientSet.Core().RESTClient().Post())
framework.ExpectNoError(err) framework.ExpectNoError(err)
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
req := proxyRequest.Namespace(rc.framework.Namespace.Name). req := proxyRequest.Namespace(rc.framework.Namespace.Name).
Context(ctx).
Name(rc.controllerName). Name(rc.controllerName).
Suffix("BumpMetric"). Suffix("BumpMetric").
Param("metric", customMetricName). Param("metric", customMetricName).

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e package e2e
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strconv" "strconv"
@ -121,14 +122,23 @@ func checkElasticsearchReadiness(f *framework.Framework) error {
framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy) framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
continue continue
} }
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
// Query against the root URL for Elasticsearch. // Query against the root URL for Elasticsearch.
response := proxyRequest.Namespace(api.NamespaceSystem). response := proxyRequest.Namespace(api.NamespaceSystem).
Context(ctx).
Name("elasticsearch-logging"). Name("elasticsearch-logging").
Do() Do()
err = response.Error() err = response.Error()
response.StatusCode(&statusCode) response.StatusCode(&statusCode)
if err != nil { if err != nil {
if ctx.Err() != nil {
framework.Failf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
continue
}
framework.Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err) framework.Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
continue continue
} }
@ -153,12 +163,20 @@ func checkElasticsearchReadiness(f *framework.Framework) error {
framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy) framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
continue continue
} }
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
body, err = proxyRequest.Namespace(api.NamespaceSystem). body, err = proxyRequest.Namespace(api.NamespaceSystem).
Context(ctx).
Name("elasticsearch-logging"). Name("elasticsearch-logging").
Suffix("_cluster/health"). Suffix("_cluster/health").
Param("level", "indices"). Param("level", "indices").
DoRaw() DoRaw()
if err != nil { if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to get cluster health from elasticsearch: %v", err)
}
continue continue
} }
health := make(map[string]interface{}) health := make(map[string]interface{})
@ -195,9 +213,13 @@ func getMissingLinesCountElasticsearch(f *framework.Framework, expectedCount int
return 0, fmt.Errorf("Failed to get services proxy request: %v", errProxy) return 0, fmt.Errorf("Failed to get services proxy request: %v", errProxy)
} }
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
// Ask Elasticsearch to return all the log lines that were tagged with the // Ask Elasticsearch to return all the log lines that were tagged with the
// pod name. Ask for ten times as many log lines because duplication is possible. // pod name. Ask for ten times as many log lines because duplication is possible.
body, err := proxyRequest.Namespace(api.NamespaceSystem). body, err := proxyRequest.Namespace(api.NamespaceSystem).
Context(ctx).
Name("elasticsearch-logging"). Name("elasticsearch-logging").
Suffix("_search"). Suffix("_search").
// TODO: Change filter to only match records from current test run // TODO: Change filter to only match records from current test run
@ -207,6 +229,9 @@ func getMissingLinesCountElasticsearch(f *framework.Framework, expectedCount int
Param("size", strconv.Itoa(expectedCount*10)). Param("size", strconv.Itoa(expectedCount*10)).
DoRaw() DoRaw()
if err != nil { if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to make proxy call to elasticsearch-logging: %v", err)
}
return 0, fmt.Errorf("Failed to make proxy call to elasticsearch-logging: %v", err) return 0, fmt.Errorf("Failed to make proxy call to elasticsearch-logging: %v", err)
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e package e2e
import ( import (
"context"
"net/http" "net/http"
"time" "time"
@ -58,17 +59,26 @@ var _ = framework.KubeDescribe("Kubernetes Dashboard", func() {
if errProxy != nil { if errProxy != nil {
framework.Logf("Get services proxy request failed: %v", errProxy) framework.Logf("Get services proxy request failed: %v", errProxy)
} }
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
// Query against the proxy URL for the kube-ui service. // Query against the proxy URL for the kube-ui service.
err := proxyRequest.Namespace(uiNamespace). err := proxyRequest.Namespace(uiNamespace).
Context(ctx).
Name(uiServiceName). Name(uiServiceName).
Timeout(framework.SingleCallTimeout). Timeout(framework.SingleCallTimeout).
Do(). Do().
StatusCode(&status). StatusCode(&status).
Error() Error()
if status != http.StatusOK { if err != nil {
framework.Logf("Unexpected status from kubernetes-dashboard: %v", status) if ctx.Err() != nil {
} else if err != nil { framework.Failf("Request to kube-ui failed: %v", err)
return true, err
}
framework.Logf("Request to kube-ui failed: %v", err) framework.Logf("Request to kube-ui failed: %v", err)
} else if status != http.StatusOK {
framework.Logf("Unexpected status from kubernetes-dashboard: %v", status)
} }
// Don't return err here as it aborts polling. // Don't return err here as it aborts polling.
return status == http.StatusOK, nil return status == http.StatusOK, nil

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e package e2e
import ( import (
"context"
"fmt" "fmt"
"strings" "strings"
"time" "time"
@ -185,10 +186,15 @@ func assertFilesContain(fileNames []string, fileDir string, pod *v1.Pod, client
if err != nil { if err != nil {
return false, err return false, err
} }
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
var contents []byte var contents []byte
for _, fileName := range fileNames { for _, fileName := range fileNames {
if subResourceProxyAvailable { if subResourceProxyAvailable {
contents, err = client.Core().RESTClient().Get(). contents, err = client.Core().RESTClient().Get().
Context(ctx).
Namespace(pod.Namespace). Namespace(pod.Namespace).
Resource("pods"). Resource("pods").
SubResource("proxy"). SubResource("proxy").
@ -197,6 +203,7 @@ func assertFilesContain(fileNames []string, fileDir string, pod *v1.Pod, client
Do().Raw() Do().Raw()
} else { } else {
contents, err = client.Core().RESTClient().Get(). contents, err = client.Core().RESTClient().Get().
Context(ctx).
Prefix("proxy"). Prefix("proxy").
Resource("pods"). Resource("pods").
Namespace(pod.Namespace). Namespace(pod.Namespace).
@ -205,7 +212,11 @@ func assertFilesContain(fileNames []string, fileDir string, pod *v1.Pod, client
Do().Raw() Do().Raw()
} }
if err != nil { if err != nil {
framework.Logf("Unable to read %s from pod %s: %v", fileName, pod.Name, err) if ctx.Err() != nil {
framework.Failf("Unable to read %s from pod %s: %v", fileName, pod.Name, err)
} else {
framework.Logf("Unable to read %s from pod %s: %v", fileName, pod.Name, err)
}
failed = append(failed, fileName) failed = append(failed, fileName)
} else if check && strings.TrimSpace(string(contents)) != expected { } else if check && strings.TrimSpace(string(contents)) != expected {
framework.Logf("File %s from pod %s contains '%s' instead of '%s'", fileName, pod.Name, string(contents), expected) framework.Logf("File %s from pod %s contains '%s' instead of '%s'", fileName, pod.Name, string(contents), expected)

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e package e2e
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"os/exec" "os/exec"
@ -52,16 +53,25 @@ func readTransactions(c clientset.Interface, ns string) (error, int) {
if errProxy != nil { if errProxy != nil {
return errProxy, -1 return errProxy, -1
} }
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
body, err := proxyRequest.Namespace(ns). body, err := proxyRequest.Namespace(ns).
Context(ctx).
Name("frontend"). Name("frontend").
Suffix("llen"). Suffix("llen").
DoRaw() DoRaw()
if err != nil { if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to read petstore transactions: %v", err)
}
return err, -1 return err, -1
} else {
totalTrans, err := strconv.Atoi(string(body))
return err, totalTrans
} }
totalTrans, err := strconv.Atoi(string(body))
return err, totalTrans
} }
// runK8petstore runs the k8petstore application, bound to external nodeport, and // runK8petstore runs the k8petstore application, bound to external nodeport, and
@ -150,7 +160,7 @@ T:
// We should have exceeded the finalTransactionsExpected num of transactions. // We should have exceeded the finalTransactionsExpected num of transactions.
// If this fails, but there are transactions being created, we may need to recalibrate // If this fails, but there are transactions being created, we may need to recalibrate
// the finalTransactionsExpected value - or else - your cluster is broken/slow ! // the finalTransactionsExpected value - or else - your cluster is broken/slow !
Ω(totalTransactions).Should(BeNumerically(">", finalTransactionsExpected)) Expect(totalTransactions).To(BeNumerically(">", finalTransactionsExpected))
} }
var _ = framework.KubeDescribe("Pet Store [Feature:Example]", func() { var _ = framework.KubeDescribe("Pet Store [Feature:Example]", func() {

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e package e2e
import ( import (
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
@ -566,7 +567,12 @@ func makeHttpRequestToService(c clientset.Interface, ns, service, path string, t
if errProxy != nil { if errProxy != nil {
break break
} }
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
result, err = proxyRequest.Namespace(ns). result, err = proxyRequest.Namespace(ns).
Context(ctx).
Name(service). Name(service).
Suffix(path). Suffix(path).
Do(). Do().

View File

@ -18,6 +18,7 @@ package framework
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sort" "sort"
@ -288,9 +289,13 @@ func getContainerInfo(c clientset.Interface, nodeName string, req *kubeletstats.
return nil, err return nil, err
} }
ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
defer cancel()
var data []byte var data []byte
if subResourceProxyAvailable { if subResourceProxyAvailable {
data, err = c.Core().RESTClient().Post(). data, err = c.Core().RESTClient().Post().
Context(ctx).
Resource("nodes"). Resource("nodes").
SubResource("proxy"). SubResource("proxy").
Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)). Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
@ -301,6 +306,7 @@ func getContainerInfo(c clientset.Interface, nodeName string, req *kubeletstats.
} else { } else {
data, err = c.Core().RESTClient().Post(). data, err = c.Core().RESTClient().Post().
Context(ctx).
Prefix("proxy"). Prefix("proxy").
Resource("nodes"). Resource("nodes").
Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)). Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).

View File

@ -18,6 +18,7 @@ package framework
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -335,7 +336,11 @@ func getSchedulingLatency(c clientset.Interface) (SchedulingLatency, error) {
} }
} }
if masterRegistered { if masterRegistered {
ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
defer cancel()
rawData, err := c.Core().RESTClient().Get(). rawData, err := c.Core().RESTClient().Get().
Context(ctx).
Prefix("proxy"). Prefix("proxy").
Namespace(api.NamespaceSystem). Namespace(api.NamespaceSystem).
Resource("pods"). Resource("pods").

View File

@ -18,6 +18,7 @@ package framework
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -1568,9 +1569,14 @@ func (r podProxyResponseChecker) CheckAllResponses() (done bool, err error) {
if err != nil { if err != nil {
return false, err return false, err
} }
ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
defer cancel()
var body []byte var body []byte
if subResourceProxyAvailable { if subResourceProxyAvailable {
body, err = r.c.Core().RESTClient().Get(). body, err = r.c.Core().RESTClient().Get().
Context(ctx).
Namespace(r.ns). Namespace(r.ns).
Resource("pods"). Resource("pods").
SubResource("proxy"). SubResource("proxy").
@ -1579,6 +1585,7 @@ func (r podProxyResponseChecker) CheckAllResponses() (done bool, err error) {
Raw() Raw()
} else { } else {
body, err = r.c.Core().RESTClient().Get(). body, err = r.c.Core().RESTClient().Get().
Context(ctx).
Prefix("proxy"). Prefix("proxy").
Namespace(r.ns). Namespace(r.ns).
Resource("pods"). Resource("pods").
@ -1587,6 +1594,10 @@ func (r podProxyResponseChecker) CheckAllResponses() (done bool, err error) {
Raw() Raw()
} }
if err != nil { if err != nil {
if ctx.Err() != nil {
Failf("Controller %s: Failed to Get from replica %d [%s]: %v\n pod status: %#v", r.controllerName, i+1, pod.Name, err, pod.Status)
return false, err
}
Logf("Controller %s: Failed to GET from replica %d [%s]: %v\npod status: %#v", r.controllerName, i+1, pod.Name, err, pod.Status) Logf("Controller %s: Failed to GET from replica %d [%s]: %v\npod status: %#v", r.controllerName, i+1, pod.Name, err, pod.Status)
continue continue
} }
@ -1756,11 +1767,20 @@ func ServiceResponding(c clientset.Interface, ns, name string) error {
Logf("Failed to get services proxy request: %v:", errProxy) Logf("Failed to get services proxy request: %v:", errProxy)
return false, nil return false, nil
} }
ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
defer cancel()
body, err := proxyRequest.Namespace(ns). body, err := proxyRequest.Namespace(ns).
Context(ctx).
Name(name). Name(name).
Do(). Do().
Raw() Raw()
if err != nil { if err != nil {
if ctx.Err() != nil {
Failf("Failed to GET from service %s: %v", name, err)
return true, err
}
Logf("Failed to GET from service %s: %v:", name, err) Logf("Failed to GET from service %s: %v:", name, err)
return false, nil return false, nil
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e package e2e
import ( import (
"context"
"time" "time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
@ -87,11 +88,20 @@ func ClusterLevelLoggingWithKibana(f *framework.Framework) {
err = errProxy err = errProxy
continue continue
} }
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
// Query against the root URL for Kibana. // Query against the root URL for Kibana.
_, err = proxyRequest.Namespace(api.NamespaceSystem). _, err = proxyRequest.Namespace(api.NamespaceSystem).
Context(ctx).
Name("kibana-logging"). Name("kibana-logging").
DoRaw() DoRaw()
if err != nil { if err != nil {
if ctx.Err() != nil {
framework.Failf("After %v proxy call to kibana-logging failed: %v", time.Since(start), err)
break
}
framework.Logf("After %v proxy call to kibana-logging failed: %v", time.Since(start), err) framework.Logf("After %v proxy call to kibana-logging failed: %v", time.Since(start), err)
continue continue
} }

View File

@ -18,6 +18,7 @@ package e2e
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -1610,7 +1611,12 @@ func makeRequestToGuestbook(c clientset.Interface, cmd, value string, ns string)
if errProxy != nil { if errProxy != nil {
return "", errProxy return "", errProxy
} }
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
result, err := proxyRequest.Namespace(ns). result, err := proxyRequest.Namespace(ns).
Context(ctx).
Name("frontend"). Name("frontend").
Suffix("/guestbook.php"). Suffix("/guestbook.php").
Param("cmd", cmd). Param("cmd", cmd).
@ -1710,6 +1716,10 @@ func getUDData(jpgExpected string, ns string) func(clientset.Interface, string)
if err != nil { if err != nil {
return err return err
} }
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
var body []byte var body []byte
if subResourceProxyAvailable { if subResourceProxyAvailable {
body, err = c.Core().RESTClient().Get(). body, err = c.Core().RESTClient().Get().
@ -1731,6 +1741,9 @@ func getUDData(jpgExpected string, ns string) func(clientset.Interface, string)
Raw() Raw()
} }
if err != nil { if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to retrieve data from container: %v", err)
}
return err return err
} }
framework.Logf("got data: %s", body) framework.Logf("got data: %s", body)

View File

@ -18,6 +18,7 @@ package e2e
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"time" "time"
@ -63,6 +64,10 @@ var (
// Query sends a command to the server and returns the Response // Query sends a command to the server and returns the Response
func Query(c clientset.Interface, query string) (*influxdb.Response, error) { func Query(c clientset.Interface, query string) (*influxdb.Response, error) {
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
result, err := c.Core().RESTClient().Get(). result, err := c.Core().RESTClient().Get().
Prefix("proxy"). Prefix("proxy").
Namespace("kube-system"). Namespace("kube-system").
@ -76,6 +81,9 @@ func Query(c clientset.Interface, query string) (*influxdb.Response, error) {
Raw() Raw()
if err != nil { if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to query influx db: %v", err)
}
return nil, err return nil, err
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e package e2e
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"time" "time"
@ -122,9 +123,14 @@ func testPreStop(c clientset.Interface, ns string) {
if err != nil { if err != nil {
return false, err return false, err
} }
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
var body []byte var body []byte
if subResourceProxyAvailable { if subResourceProxyAvailable {
body, err = c.Core().RESTClient().Get(). body, err = c.Core().RESTClient().Get().
Context(ctx).
Namespace(ns). Namespace(ns).
Resource("pods"). Resource("pods").
SubResource("proxy"). SubResource("proxy").
@ -133,6 +139,7 @@ func testPreStop(c clientset.Interface, ns string) {
DoRaw() DoRaw()
} else { } else {
body, err = c.Core().RESTClient().Get(). body, err = c.Core().RESTClient().Get().
Context(ctx).
Prefix("proxy"). Prefix("proxy").
Namespace(ns). Namespace(ns).
Resource("pods"). Resource("pods").
@ -141,6 +148,10 @@ func testPreStop(c clientset.Interface, ns string) {
DoRaw() DoRaw()
} }
if err != nil { if err != nil {
if ctx.Err() != nil {
framework.Failf("Error validating prestop: %v", err)
return true, err
}
By(fmt.Sprintf("Error validating prestop: %v", err)) By(fmt.Sprintf("Error validating prestop: %v", err))
} else { } else {
framework.Logf("Saw: %s", string(body)) framework.Logf("Saw: %s", string(body))