diff --git a/test/e2e/framework.go b/test/e2e/framework.go
index 7a9749c2c95..c56e5a0c8ed 100644
--- a/test/e2e/framework.go
+++ b/test/e2e/framework.go
@@ -272,6 +272,11 @@ func (f *Framework) WaitForPodRunning(podName string) error {
 	return waitForPodRunningInNamespace(f.Client, podName, f.Namespace.Name)
 }
 
+// WaitForPodReady waits for the pod to flip to ready in the namespace.
+func (f *Framework) WaitForPodReady(podName string) error {
+	return waitTimeoutForPodReadyInNamespace(f.Client, podName, f.Namespace.Name, podStartTimeout)
+}
+
 // WaitForPodRunningSlow waits for the pod to run in the namespace.
 // It has a longer timeout than WaitForPodRunning (util.slowPodStartTimeout).
 func (f *Framework) WaitForPodRunningSlow(podName string) error {
diff --git a/test/e2e/kubeproxy.go b/test/e2e/kubeproxy.go
index 288bec9d59d..47b21c8e5cb 100644
--- a/test/e2e/kubeproxy.go
+++ b/test/e2e/kubeproxy.go
@@ -46,12 +46,16 @@ const (
 	nodeHttpPort            = 32080
 	nodeUdpPort             = 32081
 	loadBalancerHttpPort    = 100
-	netexecImageName        = "gcr.io/google_containers/netexec:1.4"
+	netexecImageName        = "gcr.io/google_containers/netexec:1.5"
 	testPodName             = "test-container-pod"
 	hostTestPodName         = "host-test-container-pod"
 	nodePortServiceName     = "node-port-service"
 	loadBalancerServiceName = "load-balancer-service"
 	enableLoadBalancerTest  = false
+	hitEndpointRetryDelay   = 1 * time.Second
+	// Number of retries to hit a given set of endpoints. Needs to be high
+	// because we verify iptables statistical rr loadbalancing.
+	testTries = 30
 )
 
 type KubeProxyTestConfig struct {
@@ -150,7 +154,7 @@ func createHTTPClient(transport *http.Transport) *http.Client {
 
 func (config *KubeProxyTestConfig) hitClusterIP(epCount int) {
 	clusterIP := config.nodePortService.Spec.ClusterIP
-	tries := epCount*epCount + 15 // if epCount == 0
+	tries := epCount*epCount + testTries // if epCount == 0
 	By("dialing(udp) node1 --> clusterIP:clusterUdpPort")
 	config.dialFromNode("udp", clusterIP, clusterUdpPort, tries, epCount)
 	By("dialing(http) node1 --> clusterIP:clusterHttpPort")
@@ -169,7 +173,7 @@ func (config *KubeProxyTestConfig) hitClusterIP(epCount int) {
 
 func (config *KubeProxyTestConfig) hitNodePort(epCount int) {
 	node1_IP := config.externalAddrs[0]
-	tries := epCount*epCount + 15 // if epCount == 0
+	tries := epCount*epCount + testTries // if epCount == 0
 	By("dialing(udp) node1 --> node1:nodeUdpPort")
 	config.dialFromNode("udp", node1_IP, nodeUdpPort, tries, epCount)
 	By("dialing(http) node1 --> node1:nodeHttpPort")
@@ -248,7 +252,10 @@ func (config *KubeProxyTestConfig) dialFromNode(protocol, targetIP string, targe
 	} else {
 		cmd = fmt.Sprintf("curl -s --connect-timeout 1 http://%s:%d/hostName", targetIP, targetPort)
 	}
-	forLoop := fmt.Sprintf("for i in $(seq 1 %d); do %s; echo; done | grep -v '^\\s*$' |sort | uniq -c | wc -l", tries, cmd)
+	// TODO: This simply tells us that we can reach the endpoints. Check that
+	// the probability of hitting a specific endpoint is roughly the same as
+	// hitting any other.
+	forLoop := fmt.Sprintf("for i in $(seq 1 %d); do %s; echo; sleep %v; done | grep -v '^\\s*$' |sort | uniq -c | wc -l", tries, cmd, hitEndpointRetryDelay)
 	By(fmt.Sprintf("Dialing from node. command:%s", forLoop))
 	stdout := RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, forLoop)
 	Expect(strconv.Atoi(strings.TrimSpace(stdout))).To(BeNumerically("==", expectedCount))
@@ -262,6 +269,19 @@ func (config *KubeProxyTestConfig) getSelfURL(path string, expected string) {
 }
 
 func (config *KubeProxyTestConfig) createNetShellPodSpec(podName string, node string) *api.Pod {
+	probe := &api.Probe{
+		InitialDelaySeconds: 10,
+		TimeoutSeconds:      30,
+		PeriodSeconds:       10,
+		SuccessThreshold:    1,
+		FailureThreshold:    3,
+		Handler: api.Handler{
+			HTTPGet: &api.HTTPGetAction{
+				Path: "/healthz",
+				Port: intstr.IntOrString{IntVal: endpointHttpPort},
+			},
+		},
+	}
 	pod := &api.Pod{
 		TypeMeta: unversioned.TypeMeta{
 			Kind: "Pod",
@@ -293,6 +313,8 @@ func (config *KubeProxyTestConfig) createNetShellPodSpec(podName string, node st
 						Protocol: api.ProtocolUDP,
 					},
 				},
+				LivenessProbe:  probe,
+				ReadinessProbe: probe,
 			},
 		},
 		NodeName: node,
@@ -492,7 +514,7 @@ func (config *KubeProxyTestConfig) createNetProxyPods(podName string, selector m
 	// wait that all of them are up
 	runningPods := make([]*api.Pod, 0, len(nodes.Items))
 	for _, p := range createdPods {
-		expectNoError(config.f.WaitForPodRunning(p.Name))
+		expectNoError(config.f.WaitForPodReady(p.Name))
 		rp, err := config.getPodClient().Get(p.Name)
 		expectNoError(err)
 		runningPods = append(runningPods, rp)
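The dialFromNode change above keeps the reachability check as a shell one-liner: curl the target `tries` times, drop blank lines, and count the distinct host names with `sort | uniq -c | wc -l`. `tries` is `epCount*epCount + testTries`, so statistically unlucky iptables round-robin runs (and the `epCount == 0` case) still get enough samples. For reference, a minimal standalone Go sketch of the same counting idea; the target URL below is a placeholder, not a value from this change:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"
)

// distinctResponses dials url `tries` times with a delay between attempts and
// returns how many different non-blank response bodies were seen -- the same
// count the shell pipeline produces with `sort | uniq -c | wc -l`.
func distinctResponses(url string, tries int, delay time.Duration) int {
	seen := map[string]bool{}
	client := &http.Client{Timeout: time.Second} // mirrors `curl --connect-timeout 1`
	for i := 0; i < tries; i++ {
		if resp, err := client.Get(url); err == nil {
			body, _ := io.ReadAll(resp.Body)
			resp.Body.Close()
			if s := strings.TrimSpace(string(body)); s != "" {
				seen[s] = true
			}
		}
		time.Sleep(delay) // hitEndpointRetryDelay in the test
	}
	return len(seen)
}

func main() {
	// Placeholder cluster IP and port; substitute any service that returns its hostname.
	fmt.Println(distinctResponses("http://10.0.0.10:8080/hostName", 30, time.Second))
}
```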
diff --git a/test/e2e/util.go b/test/e2e/util.go
index 761f1db671b..9b07f919e75 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -930,6 +930,19 @@ func waitTimeoutForPodNoLongerRunningInNamespace(c *client.Client, podName strin
 	})
 }
 
+func waitTimeoutForPodReadyInNamespace(c *client.Client, podName string, namespace string, timeout time.Duration) error {
+	return waitForPodCondition(c, namespace, podName, "running", timeout, func(pod *api.Pod) (bool, error) {
+		if pod.Status.Phase == api.PodRunning {
+			Logf("Found pod '%s' on node '%s'", podName, pod.Spec.NodeName)
+			return true, nil
+		}
+		if pod.Status.Phase == api.PodFailed {
+			return true, fmt.Errorf("Giving up; pod went into failed status: \n%s", spew.Sprintf("%#v", pod))
+		}
+		return podReady(pod), nil
+	})
+}
+
 // waitForPodNotPending returns an error if it took too long for the pod to go out of pending state.
 func waitForPodNotPending(c *client.Client, ns, podName string) error {
 	return waitForPodCondition(c, ns, podName, "!pending", podStartTimeout, func(pod *api.Pod) (bool, error) {
@@ -2806,6 +2819,7 @@ func RunHostCmd(ns, name, cmd string) (string, error) {
 // RunHostCmdOrDie calls RunHostCmd and dies on error.
 func RunHostCmdOrDie(ns, name, cmd string) string {
 	stdout, err := RunHostCmd(ns, name, cmd)
+	Logf("stdout: %v", stdout)
 	expectNoError(err)
 	return stdout
 }
diff --git a/test/images/netexec/Makefile b/test/images/netexec/Makefile
index c82990d7bc3..92ce0967958 100644
--- a/test/images/netexec/Makefile
+++ b/test/images/netexec/Makefile
@@ -14,7 +14,7 @@
 
 .PHONY: all netexec image push clean
 
-TAG = 1.4
+TAG = 1.5
 
 PREFIX = gcr.io/google_containers
 
diff --git a/test/images/netexec/netexec.go b/test/images/netexec/netexec.go
index d761b3b147f..4289e6da19b 100644
--- a/test/images/netexec/netexec.go
+++ b/test/images/netexec/netexec.go
@@ -30,15 +30,36 @@ import (
 	"os/exec"
 	"strconv"
 	"strings"
+	"sync/atomic"
 	"time"
 )
 
 var (
-	httpPort  = 8080
-	udpPort   = 8081
-	shellPath = "/bin/sh"
+	httpPort    = 8080
+	udpPort     = 8081
+	shellPath   = "/bin/sh"
+	serverReady = &atomicBool{0}
 )
 
+// atomicBool uses load/store operations on an int32 to simulate an atomic boolean.
+type atomicBool struct {
+	v int32
+}
+
+// set sets the int32 to the given boolean.
+func (a *atomicBool) set(value bool) {
+	if value {
+		atomic.StoreInt32(&a.v, 1)
+		return
+	}
+	atomic.StoreInt32(&a.v, 0)
+}
+
+// get returns true if the int32 == 1
+func (a *atomicBool) get() bool {
+	return atomic.LoadInt32(&a.v) == 1
+}
+
 type output struct {
 	responses []string
 	errors    []string
@@ -91,6 +112,18 @@ func exitHandler(w http.ResponseWriter, r *http.Request) {
 func hostnameHandler(w http.ResponseWriter, r *http.Request) {
 	log.Printf("GET /hostname")
 	fmt.Fprintf(w, getHostName())
+	http.HandleFunc("/healthz", healthzHandler)
+	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", httpPort), nil))
+}
+
+// healthzHandler responds with a 200 if the UDP server is ready. It also serves
+// as a health check of the HTTP server by virtue of being an HTTP handler.
+func healthzHandler(w http.ResponseWriter, r *http.Request) {
+	if serverReady.get() {
+		w.WriteHeader(200)
+		return
+	}
+	w.WriteHeader(http.StatusPreconditionFailed)
 }
 
 func shutdownHandler(w http.ResponseWriter, r *http.Request) {
@@ -318,6 +351,13 @@ func startUDPServer(udpPort int) {
 	defer serverConn.Close()
 	buf := make([]byte, 1024)
 
+	log.Printf("Started UDP server")
+	// Start responding to readiness probes.
+	serverReady.set(true)
+	defer func() {
+		log.Printf("UDP server exited")
+		serverReady.set(false)
+	}()
 	for {
 		n, clientAddress, err := serverConn.ReadFromUDP(buf)
 		assertNoError(err)
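The netexec.go changes above gate /healthz on the UDP server actually listening: the handler returns 200 only while the atomicBool is set, so a single probe endpoint covers both the HTTP and UDP servers. A condensed, standalone sketch of that readiness-gate pattern, with simplified names and wiring rather than the exact netexec layout:

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/http"
	"sync/atomic"
)

var ready int32 // 1 once the UDP server is accepting traffic

// healthz reports 200 only after the UDP listener is up, and by responding at
// all it also proves the HTTP server is alive.
func healthz(w http.ResponseWriter, r *http.Request) {
	if atomic.LoadInt32(&ready) == 1 {
		w.WriteHeader(http.StatusOK)
		return
	}
	w.WriteHeader(http.StatusPreconditionFailed)
}

func serveUDP(port int) {
	addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", port))
	if err != nil {
		log.Fatal(err)
	}
	conn, err := net.ListenUDP("udp", addr)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	atomic.StoreInt32(&ready, 1) // flip readiness only after the socket is open
	defer atomic.StoreInt32(&ready, 0)
	buf := make([]byte, 1024)
	for {
		n, client, err := conn.ReadFromUDP(buf)
		if err != nil {
			log.Printf("read: %v", err)
			continue
		}
		conn.WriteToUDP(buf[:n], client) // echo back to the sender
	}
}

func main() {
	go serveUDP(8081)
	http.HandleFunc("/healthz", healthz)
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```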
diff --git a/test/images/netexec/pod.yaml b/test/images/netexec/pod.yaml
index f53bfd4d99c..9aef4017147 100644
--- a/test/images/netexec/pod.yaml
+++ b/test/images/netexec/pod.yaml
@@ -7,9 +7,33 @@ metadata:
 spec:
   containers:
   - name: netexec
-    image: gcr.io/google_containers/netexec:1.4
+    image: gcr.io/google_containers/netexec:1.5
     ports:
     - containerPort: 8080
       protocol: TCP
     - containerPort: 8081
       protocol: UDP
+    # give this pod the same liveness and readiness probe because
+    # we always want the kubelet to restart it if it becomes
+    # unready, and at the same time we want to observe readiness
+    # as a signal to start testing.
+    livenessProbe:
+      httpGet:
+        path: /healthz
+        port: 8080
+        scheme: HTTP
+      initialDelaySeconds: 10
+      timeoutSeconds: 5
+      failureThreshold: 3
+      periodSeconds: 10
+      successThreshold: 1
+    readinessProbe:
+      httpGet:
+        path: /healthz
+        port: 8080
+        scheme: HTTP
+      initialDelaySeconds: 10
+      timeoutSeconds: 5
+      failureThreshold: 3
+      periodSeconds: 10
+      successThreshold: 1
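Both probes in pod.yaml use identical settings: GET /healthz every 10s after a 10s initial delay, a 5s timeout, and three consecutive failures before the container is treated as unhealthy (liveness: restart) or unready (readiness: removed from service endpoints). For illustration only, a small Go poller that applies the same parameters against an assumed locally reachable netexec instance; this is not kubelet code and the URL is a placeholder:

```go
package main

import (
	"log"
	"net/http"
	"time"
)

type probe struct {
	url              string
	initialDelay     time.Duration
	period           time.Duration
	timeout          time.Duration
	failureThreshold int
	successThreshold int
}

// run blocks until successThreshold consecutive checks pass (returns true) or
// failureThreshold consecutive checks fail (returns false).
func (p probe) run() bool {
	time.Sleep(p.initialDelay)
	client := &http.Client{Timeout: p.timeout}
	failures, successes := 0, 0
	for {
		resp, err := client.Get(p.url)
		healthy := err == nil && resp.StatusCode >= 200 && resp.StatusCode < 400
		if resp != nil {
			resp.Body.Close()
		}
		if healthy {
			successes++
			failures = 0
			if successes >= p.successThreshold {
				return true
			}
		} else {
			failures++
			successes = 0
			if failures >= p.failureThreshold {
				return false
			}
		}
		time.Sleep(p.period)
	}
}

func main() {
	p := probe{
		url:              "http://127.0.0.1:8080/healthz", // assumed local netexec
		initialDelay:     10 * time.Second,
		period:           10 * time.Second,
		timeout:          5 * time.Second,
		failureThreshold: 3,
		successThreshold: 1,
	}
	log.Printf("ready: %v", p.run())
}
```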