diff --git a/test/e2e/framework/service/BUILD b/test/e2e/framework/service/BUILD index 1be4648bb38..4a464c15aef 100644 --- a/test/e2e/framework/service/BUILD +++ b/test/e2e/framework/service/BUILD @@ -3,10 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", srcs = [ - "affinity_checker.go", "const.go", - "fixture.go", - "hostname.go", "jig.go", "resource.go", "util.go", @@ -32,14 +29,12 @@ go_library( "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", - "//staging/src/k8s.io/client-go/util/retry:go_default_library", "//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library", "//test/e2e/framework:go_default_library", "//test/e2e/framework/network:go_default_library", "//test/e2e/framework/node:go_default_library", "//test/e2e/framework/pod:go_default_library", "//test/e2e/framework/rc:go_default_library", - "//test/e2e/framework/ssh:go_default_library", "//test/utils:go_default_library", "//test/utils/image:go_default_library", "//vendor/github.com/onsi/ginkgo:go_default_library", diff --git a/test/e2e/framework/service/affinity_checker.go b/test/e2e/framework/service/affinity_checker.go deleted file mode 100644 index 94e200616d5..00000000000 --- a/test/e2e/framework/service/affinity_checker.go +++ /dev/null @@ -1,114 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package service - -import ( - "fmt" - "net" - "strconv" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/kubernetes/test/e2e/framework" -) - -// CheckAffinity function tests whether the service affinity works as expected. -// If affinity is expected, the test will return true once affinityConfirmCount -// number of same response observed in a row. If affinity is not expected, the -// test will keep observe until different responses observed. The function will -// return false only in case of unexpected errors. -func CheckAffinity(execPod *v1.Pod, serviceIP string, servicePort int, shouldHold bool) bool { - serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort)) - cmd := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, serviceIPPort) - timeout := AffinityTimeout - if execPod == nil { - timeout = LoadBalancerPollTimeout - } - var tracker affinityTracker - if pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { - if execPod != nil { - stdout, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd) - if err != nil { - framework.Logf("Failed to get response from %s. 
Retry until timeout", serviceIPPort) - return false, nil - } - tracker.recordHost(stdout) - } else { - rawResponse := GetHTTPContent(serviceIP, servicePort, timeout, "") - tracker.recordHost(rawResponse.String()) - } - trackerFulfilled, affinityHolds := tracker.checkHostTrace(AffinityConfirmCount) - if !shouldHold && !affinityHolds { - return true, nil - } - if shouldHold && trackerFulfilled && affinityHolds { - return true, nil - } - return false, nil - }); pollErr != nil { - trackerFulfilled, _ := tracker.checkHostTrace(AffinityConfirmCount) - if pollErr != wait.ErrWaitTimeout { - checkAffinityFailed(tracker, pollErr.Error()) - return false - } - if !trackerFulfilled { - checkAffinityFailed(tracker, fmt.Sprintf("Connection to %s timed out or not enough responses.", serviceIPPort)) - } - if shouldHold { - checkAffinityFailed(tracker, "Affinity should hold but didn't.") - } else { - checkAffinityFailed(tracker, "Affinity shouldn't hold but did.") - } - return true - } - return true -} - -// affinityTracker tracks the destination of a request for the affinity tests. -type affinityTracker struct { - hostTrace []string -} - -// Record the response going to a given host. -func (at *affinityTracker) recordHost(host string) { - at.hostTrace = append(at.hostTrace, host) - framework.Logf("Received response from host: %s", host) -} - -// Check that we got a constant count requests going to the same host. -func (at *affinityTracker) checkHostTrace(count int) (fulfilled, affinityHolds bool) { - fulfilled = (len(at.hostTrace) >= count) - if len(at.hostTrace) == 0 { - return fulfilled, true - } - last := at.hostTrace[0:] - if len(at.hostTrace)-count >= 0 { - last = at.hostTrace[len(at.hostTrace)-count:] - } - host := at.hostTrace[len(at.hostTrace)-1] - for _, h := range last { - if h != host { - return fulfilled, false - } - } - return fulfilled, true -} - -func checkAffinityFailed(tracker affinityTracker, err string) { - framework.Logf("%v", tracker.hostTrace) - framework.Failf(err) -} diff --git a/test/e2e/framework/service/const.go b/test/e2e/framework/service/const.go index bdf14f2035d..e7839a839e5 100644 --- a/test/e2e/framework/service/const.go +++ b/test/e2e/framework/service/const.go @@ -76,16 +76,6 @@ const ( // TestTimeout is used for most polling/waiting activities TestTimeout = 60 * time.Second - // AffinityTimeout is the maximum time that CheckAffinity is allowed to take; this - // needs to be more than long enough for AffinityConfirmCount HTTP requests to - // complete in a busy CI cluster, but shouldn't be too long since we will end up - // waiting the entire time in the tests where affinity is not expected. - AffinityTimeout = 2 * time.Minute - - // AffinityConfirmCount is the number of needed continuous requests to confirm that - // affinity is enabled. - AffinityConfirmCount = 15 - // ServiceEndpointsTimeout is the maximum time in which endpoints for the service should be created. ServiceEndpointsTimeout = 2 * time.Minute diff --git a/test/e2e/framework/service/hostname.go b/test/e2e/framework/service/hostname.go deleted file mode 100644 index 171ce462740..00000000000 --- a/test/e2e/framework/service/hostname.go +++ /dev/null @@ -1,202 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package service - -import ( - "fmt" - "net" - "sort" - "strconv" - "strings" - "time" - - "github.com/onsi/ginkgo" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/sets" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/kubernetes/test/e2e/framework" - e2epod "k8s.io/kubernetes/test/e2e/framework/pod" - e2erc "k8s.io/kubernetes/test/e2e/framework/rc" - e2essh "k8s.io/kubernetes/test/e2e/framework/ssh" - testutils "k8s.io/kubernetes/test/utils" -) - -// StartServeHostnameService creates a replication controller that serves its -// hostname and a service on top of it. -func StartServeHostnameService(c clientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) { - podNames := make([]string, replicas) - name := svc.ObjectMeta.Name - ginkgo.By("creating service " + name + " in namespace " + ns) - _, err := c.CoreV1().Services(ns).Create(svc) - if err != nil { - return podNames, "", err - } - - var createdPods []*v1.Pod - maxContainerFailures := 0 - config := testutils.RCConfig{ - Client: c, - Image: framework.ServeHostnameImage, - Command: []string{"/agnhost", "serve-hostname"}, - Name: name, - Namespace: ns, - PollInterval: 3 * time.Second, - Timeout: framework.PodReadyBeforeTimeout, - Replicas: replicas, - CreatedPods: &createdPods, - MaxContainerFailures: &maxContainerFailures, - } - err = e2erc.RunRC(config) - if err != nil { - return podNames, "", err - } - - if len(createdPods) != replicas { - return podNames, "", fmt.Errorf("incorrect number of running pods: %v", len(createdPods)) - } - - for i := range createdPods { - podNames[i] = createdPods[i].ObjectMeta.Name - } - sort.StringSlice(podNames).Sort() - - service, err := c.CoreV1().Services(ns).Get(name, metav1.GetOptions{}) - if err != nil { - return podNames, "", err - } - if service.Spec.ClusterIP == "" { - return podNames, "", fmt.Errorf("service IP is blank for %v", name) - } - serviceIP := service.Spec.ClusterIP - return podNames, serviceIP, nil -} - -// StopServeHostnameService stops the given service. -func StopServeHostnameService(clientset clientset.Interface, ns, name string) error { - if err := e2erc.DeleteRCAndWaitForGC(clientset, ns, name); err != nil { - return err - } - if err := clientset.CoreV1().Services(ns).Delete(name, nil); err != nil { - return err - } - return nil -} - -// VerifyServeHostnameServiceUp wgets the given serviceIP:servicePort from the -// given host and from within a pod. The host is expected to be an SSH-able node -// in the cluster. Each pod in the service is expected to echo its name. These -// names are compared with the given expectedPods list after a sort | uniq. -func VerifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expectedPods []string, serviceIP string, servicePort int) error { - execPod := e2epod.CreateExecPodOrFail(c, ns, "execpod-", nil) - defer func() { - e2epod.DeletePodOrFail(c, ns, execPod.Name) - }() - - // Loop a bunch of times - the proxy is randomized, so we want a good - // chance of hitting each backend at least once. 
- buildCommand := func(wget string) string { - serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort)) - return fmt.Sprintf("for i in $(seq 1 %d); do %s http://%s 2>&1 || true; echo; done", - 50*len(expectedPods), wget, serviceIPPort) - } - commands := []func() string{ - // verify service from node - func() string { - cmd := "set -e; " + buildCommand("wget -q --timeout=0.2 --tries=1 -O -") - framework.Logf("Executing cmd %q on host %v", cmd, host) - result, err := e2essh.SSH(cmd, host, framework.TestContext.Provider) - if err != nil || result.Code != 0 { - e2essh.LogResult(result) - framework.Logf("error while SSH-ing to node: %v", err) - } - return result.Stdout - }, - // verify service from pod - func() string { - cmd := buildCommand("wget -q -T 1 -O -") - framework.Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPod.Name) - // TODO: Use exec-over-http via the netexec pod instead of kubectl exec. - output, err := framework.RunHostCmd(ns, execPod.Name, cmd) - if err != nil { - framework.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPod.Name, err, output) - } - return output - }, - } - - expectedEndpoints := sets.NewString(expectedPods...) - ginkgo.By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods))) - for _, cmdFunc := range commands { - passed := false - gotEndpoints := sets.NewString() - - // Retry cmdFunc for a while - for start := time.Now(); time.Since(start) < KubeProxyLagTimeout; time.Sleep(5 * time.Second) { - for _, endpoint := range strings.Split(cmdFunc(), "\n") { - trimmedEp := strings.TrimSpace(endpoint) - if trimmedEp != "" { - gotEndpoints.Insert(trimmedEp) - } - } - // TODO: simply checking that the retrieved endpoints is a superset - // of the expected allows us to ignore intermitten network flakes that - // result in output like "wget timed out", but these should be rare - // and we need a better way to track how often it occurs. - if gotEndpoints.IsSuperset(expectedEndpoints) { - if !gotEndpoints.Equal(expectedEndpoints) { - framework.Logf("Ignoring unexpected output wgetting endpoints of service %s: %v", serviceIP, gotEndpoints.Difference(expectedEndpoints)) - } - passed = true - break - } - framework.Logf("Unable to reach the following endpoints of service %s: %v", serviceIP, expectedEndpoints.Difference(gotEndpoints)) - } - if !passed { - // Sort the lists so they're easier to visually diff. - exp := expectedEndpoints.List() - got := gotEndpoints.List() - sort.StringSlice(exp).Sort() - sort.StringSlice(got).Sort() - return fmt.Errorf("service verification failed for: %s\nexpected %v\nreceived %v", serviceIP, exp, got) - } - } - return nil -} - -// VerifyServeHostnameServiceDown verifies that the given service isn't served. -func VerifyServeHostnameServiceDown(c clientset.Interface, host string, serviceIP string, servicePort int) error { - ipPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort)) - // The current versions of curl included in CentOS and RHEL distros - // misinterpret square brackets around IPv6 as globbing, so use the -g - // argument to disable globbing to handle the IPv6 case. 
- command := fmt.Sprintf( - "curl -g -s --connect-timeout 2 http://%s && exit 99", ipPort) - - for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) { - result, err := e2essh.SSH(command, host, framework.TestContext.Provider) - if err != nil { - e2essh.LogResult(result) - framework.Logf("error while SSH-ing to node: %v", err) - } - if result.Code != 99 { - return nil - } - framework.Logf("service still alive - still waiting") - } - return fmt.Errorf("waiting for service to be down timed out") -} diff --git a/test/e2e/framework/service/resource.go b/test/e2e/framework/service/resource.go index 87c17ac678f..91697939ca5 100644 --- a/test/e2e/framework/service/resource.go +++ b/test/e2e/framework/service/resource.go @@ -17,7 +17,6 @@ limitations under the License. package service import ( - "fmt" "time" v1 "k8s.io/api/core/v1" @@ -101,14 +100,6 @@ func EnableAndDisableInternalLB() (enable func(svc *v1.Service), disable func(sv return framework.TestContext.CloudConfig.Provider.EnableAndDisableInternalLB() } -// DescribeSvc logs the output of kubectl describe svc for the given namespace -func DescribeSvc(ns string) { - framework.Logf("\nOutput of kubectl describe svc:\n") - desc, _ := framework.RunKubectl( - ns, "describe", "svc", fmt.Sprintf("--namespace=%v", ns)) - framework.Logf(desc) -} - // GetServiceLoadBalancerCreationTimeout returns a timeout value for creating a load balancer of a service. func GetServiceLoadBalancerCreationTimeout(cs clientset.Interface) time.Duration { nodes, err := e2enode.GetReadySchedulableNodes(cs) diff --git a/test/e2e/framework/service/util.go b/test/e2e/framework/service/util.go index 47d4063ca11..dcea55be35b 100644 --- a/test/e2e/framework/service/util.go +++ b/test/e2e/framework/service/util.go @@ -17,15 +17,8 @@ limitations under the License. package service import ( - "bytes" - "fmt" - "net" - "net/http" - "strconv" - "strings" "time" - utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/test/e2e/framework" e2enetwork "k8s.io/kubernetes/test/e2e/framework/network" @@ -58,288 +51,3 @@ func TestReachableHTTPWithRetriableErrorCodes(host string, port int, retriableEr } } } - -// TestNotReachableHTTP tests that a HTTP request doesn't connect to the given host and port. -func TestNotReachableHTTP(host string, port int, timeout time.Duration) { - pollfn := func() (bool, error) { - result := e2enetwork.PokeHTTP(host, port, "/", nil) - if result.Code == 0 { - return true, nil - } - return false, nil // caller can retry - } - - if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil { - framework.Failf("HTTP service %v:%v reachable after %v: %v", host, port, timeout, err) - } -} - -// TestRejectedHTTP tests that the given host rejects a HTTP request on the given port. -func TestRejectedHTTP(host string, port int, timeout time.Duration) { - pollfn := func() (bool, error) { - result := e2enetwork.PokeHTTP(host, port, "/", nil) - if result.Status == e2enetwork.HTTPRefused { - return true, nil - } - return false, nil // caller can retry - } - - if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil { - framework.Failf("HTTP service %v:%v not rejected: %v", host, port, err) - } -} - -// TestReachableUDP tests that the given host serves UDP on the given port. 
-func TestReachableUDP(host string, port int, timeout time.Duration) { - pollfn := func() (bool, error) { - result := pokeUDP(host, port, "echo hello", &UDPPokeParams{ - Timeout: 3 * time.Second, - Response: "hello", - }) - if result.Status == UDPSuccess { - return true, nil - } - return false, nil // caller can retry - } - - if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil { - framework.Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err) - } -} - -// TestNotReachableUDP tests that the given host doesn't serve UDP on the given port. -func TestNotReachableUDP(host string, port int, timeout time.Duration) { - pollfn := func() (bool, error) { - result := pokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second}) - if result.Status != UDPSuccess && result.Status != UDPError { - return true, nil - } - return false, nil // caller can retry - } - if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil { - framework.Failf("UDP service %v:%v reachable after %v: %v", host, port, timeout, err) - } -} - -// TestRejectedUDP tests that the given host rejects a UDP request on the given port. -func TestRejectedUDP(host string, port int, timeout time.Duration) { - pollfn := func() (bool, error) { - result := pokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second}) - if result.Status == UDPRefused { - return true, nil - } - return false, nil // caller can retry - } - if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil { - framework.Failf("UDP service %v:%v not rejected: %v", host, port, err) - } -} - -// TestHTTPHealthCheckNodePort tests a HTTP connection by the given request to the given host and port. -func TestHTTPHealthCheckNodePort(host string, port int, request string, timeout time.Duration, expectSucceed bool, threshold int) error { - count := 0 - condition := func() (bool, error) { - success, _ := testHTTPHealthCheckNodePort(host, port, request) - if success && expectSucceed || - !success && !expectSucceed { - count++ - } - if count >= threshold { - return true, nil - } - return false, nil - } - - if err := wait.PollImmediate(time.Second, timeout, condition); err != nil { - return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v%v, got %d", threshold, expectSucceed, host, port, count) - } - return nil -} - -func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) { - ipPort := net.JoinHostPort(ip, strconv.Itoa(port)) - url := fmt.Sprintf("http://%s%s", ipPort, request) - if ip == "" || port == 0 { - framework.Failf("Got empty IP for reachability check (%s)", url) - return false, fmt.Errorf("invalid input ip or port") - } - framework.Logf("Testing HTTP health check on %v", url) - resp, err := httpGetNoConnectionPoolTimeout(url, 5*time.Second) - if err != nil { - framework.Logf("Got error testing for reachability of %s: %v", url, err) - return false, err - } - defer resp.Body.Close() - if err != nil { - framework.Logf("Got error reading response from %s: %v", url, err) - return false, err - } - // HealthCheck responder returns 503 for no local endpoints - if resp.StatusCode == 503 { - return false, nil - } - // HealthCheck responder returns 200 for non-zero local endpoints - if resp.StatusCode == 200 { - return true, nil - } - return false, fmt.Errorf("unexpected HTTP response code %s from health check responder at %s", resp.Status, url) -} - -// Does an HTTP GET, but does not reuse TCP connections 
-// This masks problems where the iptables rule has changed, but we don't see it -func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) { - tr := utilnet.SetTransportDefaults(&http.Transport{ - DisableKeepAlives: true, - }) - client := &http.Client{ - Transport: tr, - Timeout: timeout, - } - return client.Get(url) -} - -// GetHTTPContent returns the content of the given url by HTTP. -func GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer { - var body bytes.Buffer - if pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { - result := e2enetwork.PokeHTTP(host, port, url, nil) - if result.Status == e2enetwork.HTTPSuccess { - body.Write(result.Body) - return true, nil - } - return false, nil - }); pollErr != nil { - framework.Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, pollErr) - } - return body -} - -// UDPPokeParams is a struct for UDP poke parameters. -type UDPPokeParams struct { - Timeout time.Duration - Response string -} - -// UDPPokeResult is a struct for UDP poke result. -type UDPPokeResult struct { - Status UDPPokeStatus - Error error // if there was any error - Response []byte // if code != 0 -} - -// UDPPokeStatus is string for representing UDP poke status. -type UDPPokeStatus string - -const ( - // UDPSuccess is UDP poke status which is success. - UDPSuccess UDPPokeStatus = "Success" - // UDPError is UDP poke status which is error. - UDPError UDPPokeStatus = "UnknownError" - // UDPTimeout is UDP poke status which is timeout. - UDPTimeout UDPPokeStatus = "TimedOut" - // UDPRefused is UDP poke status which is connection refused. - UDPRefused UDPPokeStatus = "ConnectionRefused" - // UDPBadResponse is UDP poke status which is bad response. - UDPBadResponse UDPPokeStatus = "BadResponse" - // Any time we add new errors, we should audit all callers of this. -) - -// pokeUDP tries to connect to a host on a port and send the given request. Callers -// can specify additional success parameters, if desired. -// -// The result status will be characterized as precisely as possible, given the -// known users of this. -// -// The result error will be populated for any status other than Success. -// -// The result response will be populated if the UDP transaction was completed, even -// if the other test params make this a failure). -func pokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPokeResult { - hostPort := net.JoinHostPort(host, strconv.Itoa(port)) - url := fmt.Sprintf("udp://%s", hostPort) - - ret := UDPPokeResult{} - - // Sanity check inputs, because it has happened. These are the only things - // that should hard fail the test - they are basically ASSERT()s. - if host == "" { - framework.Failf("Got empty host for UDP poke (%s)", url) - return ret - } - if port == 0 { - framework.Failf("Got port==0 for UDP poke (%s)", url) - return ret - } - - // Set default params. 
- if params == nil { - params = &UDPPokeParams{} - } - - framework.Logf("Poking %v", url) - - con, err := net.Dial("udp", hostPort) - if err != nil { - ret.Status = UDPError - ret.Error = err - framework.Logf("Poke(%q): %v", url, err) - return ret - } - - _, err = con.Write([]byte(fmt.Sprintf("%s\n", request))) - if err != nil { - ret.Error = err - neterr, ok := err.(net.Error) - if ok && neterr.Timeout() { - ret.Status = UDPTimeout - } else if strings.Contains(err.Error(), "connection refused") { - ret.Status = UDPRefused - } else { - ret.Status = UDPError - } - framework.Logf("Poke(%q): %v", url, err) - return ret - } - - if params.Timeout != 0 { - err = con.SetDeadline(time.Now().Add(params.Timeout)) - if err != nil { - ret.Status = UDPError - ret.Error = err - framework.Logf("Poke(%q): %v", url, err) - return ret - } - } - - bufsize := len(params.Response) + 1 - if bufsize == 0 { - bufsize = 4096 - } - var buf = make([]byte, bufsize) - n, err := con.Read(buf) - if err != nil { - ret.Error = err - neterr, ok := err.(net.Error) - if ok && neterr.Timeout() { - ret.Status = UDPTimeout - } else if strings.Contains(err.Error(), "connection refused") { - ret.Status = UDPRefused - } else { - ret.Status = UDPError - } - framework.Logf("Poke(%q): %v", url, err) - return ret - } - ret.Response = buf[0:n] - - if params.Response != "" && string(ret.Response) != params.Response { - ret.Status = UDPBadResponse - ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response)) - framework.Logf("Poke(%q): %v", url, ret.Error) - return ret - } - - ret.Status = UDPSuccess - framework.Logf("Poke(%q): success", url) - return ret -} diff --git a/test/e2e/framework/service/wait.go b/test/e2e/framework/service/wait.go index 738a008951e..67d1b1b6370 100644 --- a/test/e2e/framework/service/wait.go +++ b/test/e2e/framework/service/wait.go @@ -17,7 +17,6 @@ limitations under the License. package service import ( - "context" "fmt" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -30,43 +29,6 @@ import ( "github.com/onsi/ginkgo" ) -// WaitForServiceResponding waits for the service to be responding. -func WaitForServiceResponding(c clientset.Interface, ns, name string) error { - ginkgo.By(fmt.Sprintf("trying to dial the service %s.%s via the proxy", ns, name)) - - return wait.PollImmediate(framework.Poll, RespondingTimeout, func() (done bool, err error) { - proxyRequest, errProxy := GetServicesProxyRequest(c, c.CoreV1().RESTClient().Get()) - if errProxy != nil { - framework.Logf("Failed to get services proxy request: %v:", errProxy) - return false, nil - } - - ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout) - defer cancel() - - body, err := proxyRequest.Namespace(ns). - Context(ctx). - Name(name). - Do(). - Raw() - if err != nil { - if ctx.Err() != nil { - framework.Failf("Failed to GET from service %s: %v", name, err) - return true, err - } - framework.Logf("Failed to GET from service %s: %v:", name, err) - return false, nil - } - got := string(body) - if len(got) == 0 { - framework.Logf("Service %s: expected non-empty response", name) - return false, err // stop polling - } - framework.Logf("Service %s: found nonempty answer: %s", name, got) - return true, nil - }) -} - // WaitForServiceDeletedWithFinalizer waits for the service with finalizer to be deleted. 
func WaitForServiceDeletedWithFinalizer(cs clientset.Interface, namespace, name string) { ginkgo.By("Delete service with finalizer") diff --git a/test/e2e/network/BUILD b/test/e2e/network/BUILD index cd28328cf1a..81fcdb3dea6 100644 --- a/test/e2e/network/BUILD +++ b/test/e2e/network/BUILD @@ -17,6 +17,7 @@ go_library( "endpointslice.go", "example_cluster_dns.go", "firewall.go", + "fixture.go", "framework.go", "ingress.go", "ingress_scale.go", @@ -29,6 +30,7 @@ go_library( "proxy.go", "service.go", "service_latency.go", + "util.go", "util_iperf.go", ], importpath = "k8s.io/kubernetes/test/e2e/network", @@ -57,6 +59,7 @@ go_library( "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library", + "//staging/src/k8s.io/client-go/util/retry:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", "//staging/src/k8s.io/legacy-cloud-providers/gce:go_default_library", diff --git a/test/e2e/network/dual_stack.go b/test/e2e/network/dual_stack.go index 8413d94cc43..6c8a879b95b 100644 --- a/test/e2e/network/dual_stack.go +++ b/test/e2e/network/dual_stack.go @@ -217,7 +217,7 @@ var _ = SIGDescribe("[Feature:IPv6DualStackAlphaFeature] [LinuxOnly]", func() { defaultIPFamily = v1.IPv6Protocol } - t := e2eservice.NewServerTest(cs, ns, serviceName) + t := NewServerTest(cs, ns, serviceName) defer func() { defer ginkgo.GinkgoRecover() if errs := t.Cleanup(); len(errs) != 0 { @@ -254,7 +254,7 @@ var _ = SIGDescribe("[Feature:IPv6DualStackAlphaFeature] [LinuxOnly]", func() { jig := e2eservice.NewTestJig(cs, ns, serviceName) - t := e2eservice.NewServerTest(cs, ns, serviceName) + t := NewServerTest(cs, ns, serviceName) defer func() { defer ginkgo.GinkgoRecover() if errs := t.Cleanup(); len(errs) != 0 { @@ -291,7 +291,7 @@ var _ = SIGDescribe("[Feature:IPv6DualStackAlphaFeature] [LinuxOnly]", func() { jig := e2eservice.NewTestJig(cs, ns, serviceName) - t := e2eservice.NewServerTest(cs, ns, serviceName) + t := NewServerTest(cs, ns, serviceName) defer func() { defer ginkgo.GinkgoRecover() if errs := t.Cleanup(); len(errs) != 0 { diff --git a/test/e2e/network/example_cluster_dns.go b/test/e2e/network/example_cluster_dns.go index a9d3fb58c20..52b7bf58222 100644 --- a/test/e2e/network/example_cluster_dns.go +++ b/test/e2e/network/example_cluster_dns.go @@ -17,6 +17,7 @@ limitations under the License. package network import ( + "context" "fmt" "io/ioutil" "os" @@ -28,6 +29,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/test/e2e/framework" @@ -37,6 +39,9 @@ import ( const ( dnsReadyTimeout = time.Minute + + // RespondingTimeout is how long to wait for a service to be responding. 
+ RespondingTimeout = 2 * time.Minute ) const queryDNSPythonTemplate string = ` @@ -110,7 +115,7 @@ var _ = SIGDescribe("ClusterDns [Feature:Example]", func() { framework.ExpectNoError(err, "waiting for all pods to respond") framework.Logf("found %d backend pods responding in namespace %s", len(pods.Items), ns.Name) - err = e2eservice.WaitForServiceResponding(c, ns.Name, backendSvcName) + err = waitForServiceResponding(c, ns.Name, backendSvcName) framework.ExpectNoError(err, "waiting for the service to respond") } @@ -171,3 +176,40 @@ func prepareResourceWithReplacedString(inputFile, old, new string) string { podYaml := strings.Replace(string(data), old, new, 1) return podYaml } + +// waitForServiceResponding waits for the service to be responding. +func waitForServiceResponding(c clientset.Interface, ns, name string) error { + ginkgo.By(fmt.Sprintf("trying to dial the service %s.%s via the proxy", ns, name)) + + return wait.PollImmediate(framework.Poll, RespondingTimeout, func() (done bool, err error) { + proxyRequest, errProxy := e2eservice.GetServicesProxyRequest(c, c.CoreV1().RESTClient().Get()) + if errProxy != nil { + framework.Logf("Failed to get services proxy request: %v:", errProxy) + return false, nil + } + + ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout) + defer cancel() + + body, err := proxyRequest.Namespace(ns). + Context(ctx). + Name(name). + Do(). + Raw() + if err != nil { + if ctx.Err() != nil { + framework.Failf("Failed to GET from service %s: %v", name, err) + return true, err + } + framework.Logf("Failed to GET from service %s: %v:", name, err) + return false, nil + } + got := string(body) + if len(got) == 0 { + framework.Logf("Service %s: expected non-empty response", name) + return false, err // stop polling + } + framework.Logf("Service %s: found nonempty answer: %s", name, got) + return true, nil + }) +} diff --git a/test/e2e/framework/service/fixture.go b/test/e2e/network/fixture.go similarity index 99% rename from test/e2e/framework/service/fixture.go rename to test/e2e/network/fixture.go index fbf7e9fe6d5..b6f2521eb83 100644 --- a/test/e2e/framework/service/fixture.go +++ b/test/e2e/network/fixture.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package service +package network import ( v1 "k8s.io/api/core/v1" diff --git a/test/e2e/network/network_tiers.go b/test/e2e/network/network_tiers.go index 5edda5e5fa6..b495fa381ec 100644 --- a/test/e2e/network/network_tiers.go +++ b/test/e2e/network/network_tiers.go @@ -50,7 +50,7 @@ var _ = SIGDescribe("Services [Feature:GCEAlphaFeature][Slow]", func() { ginkgo.AfterEach(func() { if ginkgo.CurrentGinkgoTestDescription().Failed { - e2eservice.DescribeSvc(f.Namespace.Name) + DescribeSvc(f.Namespace.Name) } for _, lb := range serviceLBNames { framework.Logf("cleaning gce resource for %s", lb) diff --git a/test/e2e/network/networking.go b/test/e2e/network/networking.go index 7b06535b69b..275edb46c60 100644 --- a/test/e2e/network/networking.go +++ b/test/e2e/network/networking.go @@ -30,7 +30,6 @@ import ( "k8s.io/kubernetes/test/e2e/framework" e2enetwork "k8s.io/kubernetes/test/e2e/framework/network" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" - e2eservice "k8s.io/kubernetes/test/e2e/framework/service" e2essh "k8s.io/kubernetes/test/e2e/framework/ssh" "github.com/onsi/ginkgo" @@ -327,9 +326,9 @@ var _ = SIGDescribe("Networking", func() { svc := "iptables-flush-test" defer func() { - framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, svc)) + framework.ExpectNoError(StopServeHostnameService(f.ClientSet, ns, svc)) }() - podNames, svcIP, err := e2eservice.StartServeHostnameService(f.ClientSet, getServeHostnameService(svc), ns, numPods) + podNames, svcIP, err := StartServeHostnameService(f.ClientSet, getServeHostnameService(svc), ns, numPods) framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc, ns) // Ideally we want to reload the system firewall, but we don't necessarily @@ -377,7 +376,7 @@ var _ = SIGDescribe("Networking", func() { } ginkgo.By("verifying that kube-proxy rules are eventually recreated") - framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(f.ClientSet, ns, host, podNames, svcIP, servicePort)) + framework.ExpectNoError(verifyServeHostnameServiceUp(f.ClientSet, ns, host, podNames, svcIP, servicePort)) ginkgo.By("verifying that kubelet rules are eventually recreated") err = utilwait.PollImmediate(framework.Poll, framework.RestartNodeReadyAgainTimeout, func() (bool, error) { diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 2349bd22833..e1a4052003c 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -22,10 +22,14 @@ import ( "fmt" "math/rand" "net" + "net/http" + "sort" "strconv" "strings" "time" + utilnet "k8s.io/apimachinery/pkg/util/net" + compute "google.golang.org/api/compute/v1" appsv1 "k8s.io/api/apps/v1" @@ -33,6 +37,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" cloudprovider "k8s.io/cloud-provider" @@ -46,6 +51,7 @@ import ( e2erc "k8s.io/kubernetes/test/e2e/framework/rc" e2eservice "k8s.io/kubernetes/test/e2e/framework/service" e2essh "k8s.io/kubernetes/test/e2e/framework/ssh" + testutils "k8s.io/kubernetes/test/utils" imageutils "k8s.io/kubernetes/test/utils/image" gcecloud "k8s.io/legacy-cloud-providers/gce" @@ -56,6 +62,26 @@ import ( const ( defaultServeHostnameServicePort = 80 defaultServeHostnameServiceName = "svc-hostname" + + // KubeProxyLagTimeout is the maximum time a kube-proxy daemon on a node is allowed + 
// to not notice a Service update, such as type=NodePort. + // TODO: This timeout should be O(10s), observed values are O(1m), 5m is very + // liberal. Fix tracked in #20567. + KubeProxyLagTimeout = 5 * time.Minute + + // LoadBalancerPollTimeout is the time required by the loadbalancer to poll. + // On average it takes ~6 minutes for a single backend to come online in GCE. + LoadBalancerPollTimeout = 22 * time.Minute + + // AffinityTimeout is the maximum time that CheckAffinity is allowed to take; this + // needs to be more than long enough for AffinityConfirmCount HTTP requests to + // complete in a busy CI cluster, but shouldn't be too long since we will end up + // waiting the entire time in the tests where affinity is not expected. + AffinityTimeout = 2 * time.Minute + + // AffinityConfirmCount is the number of needed continuous requests to confirm that + // affinity is enabled. + AffinityConfirmCount = 15 ) var ( @@ -76,6 +102,527 @@ var ( } ) +// CheckAffinity function tests whether the service affinity works as expected. +// If affinity is expected, the test will return true once affinityConfirmCount +// number of same response observed in a row. If affinity is not expected, the +// test will keep observe until different responses observed. The function will +// return false only in case of unexpected errors. +func checkAffinity(execPod *v1.Pod, serviceIP string, servicePort int, shouldHold bool) bool { + serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort)) + cmd := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, serviceIPPort) + timeout := AffinityTimeout + if execPod == nil { + timeout = LoadBalancerPollTimeout + } + var tracker affinityTracker + if pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { + if execPod != nil { + stdout, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd) + if err != nil { + framework.Logf("Failed to get response from %s. Retry until timeout", serviceIPPort) + return false, nil + } + tracker.recordHost(stdout) + } else { + rawResponse := GetHTTPContent(serviceIP, servicePort, timeout, "") + tracker.recordHost(rawResponse.String()) + } + trackerFulfilled, affinityHolds := tracker.checkHostTrace(AffinityConfirmCount) + if !shouldHold && !affinityHolds { + return true, nil + } + if shouldHold && trackerFulfilled && affinityHolds { + return true, nil + } + return false, nil + }); pollErr != nil { + trackerFulfilled, _ := tracker.checkHostTrace(AffinityConfirmCount) + if pollErr != wait.ErrWaitTimeout { + checkAffinityFailed(tracker, pollErr.Error()) + return false + } + if !trackerFulfilled { + checkAffinityFailed(tracker, fmt.Sprintf("Connection to %s timed out or not enough responses.", serviceIPPort)) + } + if shouldHold { + checkAffinityFailed(tracker, "Affinity should hold but didn't.") + } else { + checkAffinityFailed(tracker, "Affinity shouldn't hold but did.") + } + return true + } + return true +} + +// affinityTracker tracks the destination of a request for the affinity tests. +type affinityTracker struct { + hostTrace []string +} + +// Record the response going to a given host. +func (at *affinityTracker) recordHost(host string) { + at.hostTrace = append(at.hostTrace, host) + framework.Logf("Received response from host: %s", host) +} + +// Check that we got a constant count requests going to the same host. 
+func (at *affinityTracker) checkHostTrace(count int) (fulfilled, affinityHolds bool) { + fulfilled = (len(at.hostTrace) >= count) + if len(at.hostTrace) == 0 { + return fulfilled, true + } + last := at.hostTrace[0:] + if len(at.hostTrace)-count >= 0 { + last = at.hostTrace[len(at.hostTrace)-count:] + } + host := at.hostTrace[len(at.hostTrace)-1] + for _, h := range last { + if h != host { + return fulfilled, false + } + } + return fulfilled, true +} + +func checkAffinityFailed(tracker affinityTracker, err string) { + framework.Logf("%v", tracker.hostTrace) + framework.Failf(err) +} + +// StartServeHostnameService creates a replication controller that serves its +// hostname and a service on top of it. +func StartServeHostnameService(c clientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) { + podNames := make([]string, replicas) + name := svc.ObjectMeta.Name + ginkgo.By("creating service " + name + " in namespace " + ns) + _, err := c.CoreV1().Services(ns).Create(svc) + if err != nil { + return podNames, "", err + } + + var createdPods []*v1.Pod + maxContainerFailures := 0 + config := testutils.RCConfig{ + Client: c, + Image: framework.ServeHostnameImage, + Command: []string{"/agnhost", "serve-hostname"}, + Name: name, + Namespace: ns, + PollInterval: 3 * time.Second, + Timeout: framework.PodReadyBeforeTimeout, + Replicas: replicas, + CreatedPods: &createdPods, + MaxContainerFailures: &maxContainerFailures, + } + err = e2erc.RunRC(config) + if err != nil { + return podNames, "", err + } + + if len(createdPods) != replicas { + return podNames, "", fmt.Errorf("incorrect number of running pods: %v", len(createdPods)) + } + + for i := range createdPods { + podNames[i] = createdPods[i].ObjectMeta.Name + } + sort.StringSlice(podNames).Sort() + + service, err := c.CoreV1().Services(ns).Get(name, metav1.GetOptions{}) + if err != nil { + return podNames, "", err + } + if service.Spec.ClusterIP == "" { + return podNames, "", fmt.Errorf("service IP is blank for %v", name) + } + serviceIP := service.Spec.ClusterIP + return podNames, serviceIP, nil +} + +// StopServeHostnameService stops the given service. +func StopServeHostnameService(clientset clientset.Interface, ns, name string) error { + if err := e2erc.DeleteRCAndWaitForGC(clientset, ns, name); err != nil { + return err + } + if err := clientset.CoreV1().Services(ns).Delete(name, nil); err != nil { + return err + } + return nil +} + +// verifyServeHostnameServiceUp wgets the given serviceIP:servicePort from the +// given host and from within a pod. The host is expected to be an SSH-able node +// in the cluster. Each pod in the service is expected to echo its name. These +// names are compared with the given expectedPods list after a sort | uniq. +func verifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expectedPods []string, serviceIP string, servicePort int) error { + execPod := e2epod.CreateExecPodOrFail(c, ns, "execpod-", nil) + defer func() { + e2epod.DeletePodOrFail(c, ns, execPod.Name) + }() + + // Loop a bunch of times - the proxy is randomized, so we want a good + // chance of hitting each backend at least once. 
+ buildCommand := func(wget string) string { + serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort)) + return fmt.Sprintf("for i in $(seq 1 %d); do %s http://%s 2>&1 || true; echo; done", + 50*len(expectedPods), wget, serviceIPPort) + } + commands := []func() string{ + // verify service from node + func() string { + cmd := "set -e; " + buildCommand("wget -q --timeout=0.2 --tries=1 -O -") + framework.Logf("Executing cmd %q on host %v", cmd, host) + result, err := e2essh.SSH(cmd, host, framework.TestContext.Provider) + if err != nil || result.Code != 0 { + e2essh.LogResult(result) + framework.Logf("error while SSH-ing to node: %v", err) + } + return result.Stdout + }, + // verify service from pod + func() string { + cmd := buildCommand("wget -q -T 1 -O -") + framework.Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPod.Name) + // TODO: Use exec-over-http via the netexec pod instead of kubectl exec. + output, err := framework.RunHostCmd(ns, execPod.Name, cmd) + if err != nil { + framework.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPod.Name, err, output) + } + return output + }, + } + + expectedEndpoints := sets.NewString(expectedPods...) + ginkgo.By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods))) + for _, cmdFunc := range commands { + passed := false + gotEndpoints := sets.NewString() + + // Retry cmdFunc for a while + for start := time.Now(); time.Since(start) < KubeProxyLagTimeout; time.Sleep(5 * time.Second) { + for _, endpoint := range strings.Split(cmdFunc(), "\n") { + trimmedEp := strings.TrimSpace(endpoint) + if trimmedEp != "" { + gotEndpoints.Insert(trimmedEp) + } + } + // TODO: simply checking that the retrieved endpoints is a superset + // of the expected allows us to ignore intermitten network flakes that + // result in output like "wget timed out", but these should be rare + // and we need a better way to track how often it occurs. + if gotEndpoints.IsSuperset(expectedEndpoints) { + if !gotEndpoints.Equal(expectedEndpoints) { + framework.Logf("Ignoring unexpected output wgetting endpoints of service %s: %v", serviceIP, gotEndpoints.Difference(expectedEndpoints)) + } + passed = true + break + } + framework.Logf("Unable to reach the following endpoints of service %s: %v", serviceIP, expectedEndpoints.Difference(gotEndpoints)) + } + if !passed { + // Sort the lists so they're easier to visually diff. + exp := expectedEndpoints.List() + got := gotEndpoints.List() + sort.StringSlice(exp).Sort() + sort.StringSlice(got).Sort() + return fmt.Errorf("service verification failed for: %s\nexpected %v\nreceived %v", serviceIP, exp, got) + } + } + return nil +} + +// verifyServeHostnameServiceDown verifies that the given service isn't served. +func verifyServeHostnameServiceDown(c clientset.Interface, host string, serviceIP string, servicePort int) error { + ipPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort)) + // The current versions of curl included in CentOS and RHEL distros + // misinterpret square brackets around IPv6 as globbing, so use the -g + // argument to disable globbing to handle the IPv6 case. 
+ command := fmt.Sprintf( + "curl -g -s --connect-timeout 2 http://%s && exit 99", ipPort) + + for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) { + result, err := e2essh.SSH(command, host, framework.TestContext.Provider) + if err != nil { + e2essh.LogResult(result) + framework.Logf("error while SSH-ing to node: %v", err) + } + if result.Code != 99 { + return nil + } + framework.Logf("service still alive - still waiting") + } + return fmt.Errorf("waiting for service to be down timed out") +} + +// testNotReachableHTTP tests that a HTTP request doesn't connect to the given host and port. +func testNotReachableHTTP(host string, port int, timeout time.Duration) { + pollfn := func() (bool, error) { + result := e2enetwork.PokeHTTP(host, port, "/", nil) + if result.Code == 0 { + return true, nil + } + return false, nil // caller can retry + } + + if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil { + framework.Failf("HTTP service %v:%v reachable after %v: %v", host, port, timeout, err) + } +} + +// testRejectedHTTP tests that the given host rejects a HTTP request on the given port. +func testRejectedHTTP(host string, port int, timeout time.Duration) { + pollfn := func() (bool, error) { + result := e2enetwork.PokeHTTP(host, port, "/", nil) + if result.Status == e2enetwork.HTTPRefused { + return true, nil + } + return false, nil // caller can retry + } + + if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil { + framework.Failf("HTTP service %v:%v not rejected: %v", host, port, err) + } +} + +// UDPPokeParams is a struct for UDP poke parameters. +type UDPPokeParams struct { + Timeout time.Duration + Response string +} + +// UDPPokeResult is a struct for UDP poke result. +type UDPPokeResult struct { + Status UDPPokeStatus + Error error // if there was any error + Response []byte // if code != 0 +} + +// UDPPokeStatus is string for representing UDP poke status. +type UDPPokeStatus string + +const ( + // UDPSuccess is UDP poke status which is success. + UDPSuccess UDPPokeStatus = "Success" + // UDPError is UDP poke status which is error. + UDPError UDPPokeStatus = "UnknownError" + // UDPTimeout is UDP poke status which is timeout. + UDPTimeout UDPPokeStatus = "TimedOut" + // UDPRefused is UDP poke status which is connection refused. + UDPRefused UDPPokeStatus = "ConnectionRefused" + // UDPBadResponse is UDP poke status which is bad response. + UDPBadResponse UDPPokeStatus = "BadResponse" + // Any time we add new errors, we should audit all callers of this. +) + +// pokeUDP tries to connect to a host on a port and send the given request. Callers +// can specify additional success parameters, if desired. +// +// The result status will be characterized as precisely as possible, given the +// known users of this. +// +// The result error will be populated for any status other than Success. +// +// The result response will be populated if the UDP transaction was completed, even +// if the other test params make this a failure). +func pokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPokeResult { + hostPort := net.JoinHostPort(host, strconv.Itoa(port)) + url := fmt.Sprintf("udp://%s", hostPort) + + ret := UDPPokeResult{} + + // Sanity check inputs, because it has happened. These are the only things + // that should hard fail the test - they are basically ASSERT()s. 
+ if host == "" { + framework.Failf("Got empty host for UDP poke (%s)", url) + return ret + } + if port == 0 { + framework.Failf("Got port==0 for UDP poke (%s)", url) + return ret + } + + // Set default params. + if params == nil { + params = &UDPPokeParams{} + } + + framework.Logf("Poking %v", url) + + con, err := net.Dial("udp", hostPort) + if err != nil { + ret.Status = UDPError + ret.Error = err + framework.Logf("Poke(%q): %v", url, err) + return ret + } + + _, err = con.Write([]byte(fmt.Sprintf("%s\n", request))) + if err != nil { + ret.Error = err + neterr, ok := err.(net.Error) + if ok && neterr.Timeout() { + ret.Status = UDPTimeout + } else if strings.Contains(err.Error(), "connection refused") { + ret.Status = UDPRefused + } else { + ret.Status = UDPError + } + framework.Logf("Poke(%q): %v", url, err) + return ret + } + + if params.Timeout != 0 { + err = con.SetDeadline(time.Now().Add(params.Timeout)) + if err != nil { + ret.Status = UDPError + ret.Error = err + framework.Logf("Poke(%q): %v", url, err) + return ret + } + } + + bufsize := len(params.Response) + 1 + if bufsize == 0 { + bufsize = 4096 + } + var buf = make([]byte, bufsize) + n, err := con.Read(buf) + if err != nil { + ret.Error = err + neterr, ok := err.(net.Error) + if ok && neterr.Timeout() { + ret.Status = UDPTimeout + } else if strings.Contains(err.Error(), "connection refused") { + ret.Status = UDPRefused + } else { + ret.Status = UDPError + } + framework.Logf("Poke(%q): %v", url, err) + return ret + } + ret.Response = buf[0:n] + + if params.Response != "" && string(ret.Response) != params.Response { + ret.Status = UDPBadResponse + ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response)) + framework.Logf("Poke(%q): %v", url, ret.Error) + return ret + } + + ret.Status = UDPSuccess + framework.Logf("Poke(%q): success", url) + return ret +} + +// testReachableUDP tests that the given host serves UDP on the given port. +func testReachableUDP(host string, port int, timeout time.Duration) { + pollfn := func() (bool, error) { + result := pokeUDP(host, port, "echo hello", &UDPPokeParams{ + Timeout: 3 * time.Second, + Response: "hello", + }) + if result.Status == UDPSuccess { + return true, nil + } + return false, nil // caller can retry + } + + if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil { + framework.Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err) + } +} + +// testNotReachableUDP tests that the given host doesn't serve UDP on the given port. +func testNotReachableUDP(host string, port int, timeout time.Duration) { + pollfn := func() (bool, error) { + result := pokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second}) + if result.Status != UDPSuccess && result.Status != UDPError { + return true, nil + } + return false, nil // caller can retry + } + if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil { + framework.Failf("UDP service %v:%v reachable after %v: %v", host, port, timeout, err) + } +} + +// testRejectedUDP tests that the given host rejects a UDP request on the given port. 
+func testRejectedUDP(host string, port int, timeout time.Duration) { + pollfn := func() (bool, error) { + result := pokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second}) + if result.Status == UDPRefused { + return true, nil + } + return false, nil // caller can retry + } + if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil { + framework.Failf("UDP service %v:%v not rejected: %v", host, port, err) + } +} + +// TestHTTPHealthCheckNodePort tests a HTTP connection by the given request to the given host and port. +func TestHTTPHealthCheckNodePort(host string, port int, request string, timeout time.Duration, expectSucceed bool, threshold int) error { + count := 0 + condition := func() (bool, error) { + success, _ := testHTTPHealthCheckNodePort(host, port, request) + if success && expectSucceed || + !success && !expectSucceed { + count++ + } + if count >= threshold { + return true, nil + } + return false, nil + } + + if err := wait.PollImmediate(time.Second, timeout, condition); err != nil { + return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v%v, got %d", threshold, expectSucceed, host, port, count) + } + return nil +} + +func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) { + ipPort := net.JoinHostPort(ip, strconv.Itoa(port)) + url := fmt.Sprintf("http://%s%s", ipPort, request) + if ip == "" || port == 0 { + framework.Failf("Got empty IP for reachability check (%s)", url) + return false, fmt.Errorf("invalid input ip or port") + } + framework.Logf("Testing HTTP health check on %v", url) + resp, err := httpGetNoConnectionPoolTimeout(url, 5*time.Second) + if err != nil { + framework.Logf("Got error testing for reachability of %s: %v", url, err) + return false, err + } + defer resp.Body.Close() + if err != nil { + framework.Logf("Got error reading response from %s: %v", url, err) + return false, err + } + // HealthCheck responder returns 503 for no local endpoints + if resp.StatusCode == 503 { + return false, nil + } + // HealthCheck responder returns 200 for non-zero local endpoints + if resp.StatusCode == 200 { + return true, nil + } + return false, fmt.Errorf("unexpected HTTP response code %s from health check responder at %s", resp.Status, url) +} + +// Does an HTTP GET, but does not reuse TCP connections +// This masks problems where the iptables rule has changed, but we don't see it +func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) { + tr := utilnet.SetTransportDefaults(&http.Transport{ + DisableKeepAlives: true, + }) + client := &http.Client{ + Transport: tr, + Timeout: timeout, + } + return client.Get(url) +} + func getServeHostnameService(name string) *v1.Service { svc := defaultServeHostnameService.DeepCopy() svc.ObjectMeta.Name = name @@ -143,7 +690,7 @@ var _ = SIGDescribe("Services", func() { ginkgo.AfterEach(func() { if ginkgo.CurrentGinkgoTestDescription().Failed { - e2eservice.DescribeSvc(f.Namespace.Name) + DescribeSvc(f.Namespace.Name) } for _, lb := range serviceLBNames { framework.Logf("cleaning load balancer resource for %s", lb) @@ -432,10 +979,10 @@ var _ = SIGDescribe("Services", func() { svc3 := "up-down-3" ginkgo.By("creating " + svc1 + " in namespace " + ns) - podNames1, svc1IP, err := e2eservice.StartServeHostnameService(cs, getServeHostnameService(svc1), ns, numPods) + podNames1, svc1IP, err := StartServeHostnameService(cs, getServeHostnameService(svc1), ns, numPods) framework.ExpectNoError(err, "failed 
@@ -432,10 +979,10 @@ var _ = SIGDescribe("Services", func() {
 		svc3 := "up-down-3"

 		ginkgo.By("creating " + svc1 + " in namespace " + ns)
-		podNames1, svc1IP, err := e2eservice.StartServeHostnameService(cs, getServeHostnameService(svc1), ns, numPods)
+		podNames1, svc1IP, err := StartServeHostnameService(cs, getServeHostnameService(svc1), ns, numPods)
 		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc1, ns)
 		ginkgo.By("creating " + svc2 + " in namespace " + ns)
-		podNames2, svc2IP, err := e2eservice.StartServeHostnameService(cs, getServeHostnameService(svc2), ns, numPods)
+		podNames2, svc2IP, err := StartServeHostnameService(cs, getServeHostnameService(svc2), ns, numPods)
 		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc2, ns)

 		hosts, err := e2essh.NodeSSHHosts(cs)
@@ -446,23 +993,23 @@ var _ = SIGDescribe("Services", func() {
 		host := hosts[0]

 		ginkgo.By("verifying service " + svc1 + " is up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))

 		ginkgo.By("verifying service " + svc2 + " is up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))

 		// Stop service 1 and make sure it is gone.
 		ginkgo.By("stopping service " + svc1)
-		framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, svc1))
+		framework.ExpectNoError(StopServeHostnameService(f.ClientSet, ns, svc1))

 		ginkgo.By("verifying service " + svc1 + " is not up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceDown(cs, host, svc1IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceDown(cs, host, svc1IP, servicePort))
 		ginkgo.By("verifying service " + svc2 + " is still up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))

 		// Start another service and verify both are up.
 		ginkgo.By("creating service " + svc3 + " in namespace " + ns)
-		podNames3, svc3IP, err := e2eservice.StartServeHostnameService(cs, getServeHostnameService(svc3), ns, numPods)
+		podNames3, svc3IP, err := StartServeHostnameService(cs, getServeHostnameService(svc3), ns, numPods)
 		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc3, ns)

 		if svc2IP == svc3IP {
@@ -470,10 +1017,10 @@ var _ = SIGDescribe("Services", func() {
 		}

 		ginkgo.By("verifying service " + svc2 + " is still up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))

 		ginkgo.By("verifying service " + svc3 + " is up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames3, svc3IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames3, svc3IP, servicePort))
 	})

 	ginkgo.It("should work after restarting kube-proxy [Disruptive]", func() {
@@ -488,15 +1035,15 @@ var _ = SIGDescribe("Services", func() {
 		svc2 := "restart-proxy-2"

 		defer func() {
-			framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, svc1))
+			framework.ExpectNoError(StopServeHostnameService(f.ClientSet, ns, svc1))
 		}()
-		podNames1, svc1IP, err := e2eservice.StartServeHostnameService(cs, getServeHostnameService(svc1), ns, numPods)
+		podNames1, svc1IP, err := StartServeHostnameService(cs, getServeHostnameService(svc1), ns, numPods)
 		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc1, ns)

 		defer func() {
-			framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, svc2))
+			framework.ExpectNoError(StopServeHostnameService(f.ClientSet, ns, svc2))
 		}()
-		podNames2, svc2IP, err := e2eservice.StartServeHostnameService(cs, getServeHostnameService(svc2), ns, numPods)
+		podNames2, svc2IP, err := StartServeHostnameService(cs, getServeHostnameService(svc2), ns, numPods)
 		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc2, ns)

 		if svc1IP == svc2IP {
@@ -510,15 +1057,15 @@ var _ = SIGDescribe("Services", func() {
 		}
 		host := hosts[0]

-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))

 		ginkgo.By(fmt.Sprintf("Restarting kube-proxy on %v", host))
 		if err := restartKubeProxy(host); err != nil {
 			framework.Failf("error restarting kube-proxy: %v", err)
 		}
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
 	})

 	ginkgo.It("should work after restarting apiserver [Disruptive]", func() {
@@ -533,9 +1080,9 @@ var _ = SIGDescribe("Services", func() {
 		svc2 := "restart-apiserver-2"

 		defer func() {
-			framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, svc1))
+			framework.ExpectNoError(StopServeHostnameService(f.ClientSet, ns, svc1))
 		}()
-		podNames1, svc1IP, err := e2eservice.StartServeHostnameService(cs, getServeHostnameService(svc1), ns, numPods)
+		podNames1, svc1IP, err := StartServeHostnameService(cs, getServeHostnameService(svc1), ns, numPods)
 		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc1, ns)

 		hosts, err := e2essh.NodeSSHHosts(cs)
@@ -545,7 +1092,7 @@ var _ = SIGDescribe("Services", func() {
 		}
 		host := hosts[0]

-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))

 		// Restart apiserver
 		ginkgo.By("Restarting apiserver")
@@ -556,20 +1103,20 @@ var _ = SIGDescribe("Services", func() {
 		if err := waitForApiserverUp(cs); err != nil {
 			framework.Failf("error while waiting for apiserver up: %v", err)
 		}
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))

 		// Create a new service and check if it's not reusing IP.
 		defer func() {
-			framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, svc2))
+			framework.ExpectNoError(StopServeHostnameService(f.ClientSet, ns, svc2))
 		}()
-		podNames2, svc2IP, err := e2eservice.StartServeHostnameService(cs, getServeHostnameService(svc2), ns, numPods)
+		podNames2, svc2IP, err := StartServeHostnameService(cs, getServeHostnameService(svc2), ns, numPods)
 		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc2, ns)

 		if svc1IP == svc2IP {
 			framework.Failf("VIPs conflict: %v", svc1IP)
 		}
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames1, svc1IP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
 	})

 	/*
@@ -679,7 +1226,7 @@ var _ = SIGDescribe("Services", func() {
 		e2eservice.TestReachableHTTP(nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("hitting the UDP service's NodePort")
-		e2eservice.TestReachableUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)
+		testReachableUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)

 		// Change the services to LoadBalancer.
@@ -782,14 +1329,14 @@ var _ = SIGDescribe("Services", func() {
 		e2eservice.TestReachableHTTP(nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("hitting the UDP service's NodePort")
-		e2eservice.TestReachableUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)
+		testReachableUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("hitting the TCP service's LoadBalancer")
 		e2eservice.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)

 		if loadBalancerSupportsUDP {
 			ginkgo.By("hitting the UDP service's LoadBalancer")
-			e2eservice.TestReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
+			testReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
 		}

 		// Change the services' node ports.
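Illustrative sketch (not part of the patch): testReachableUDP, used above, is one of
the helpers moved into this file earlier in the patch; assuming it mirrors the
testRejectedUDP shown above but polls for a successful echo, it would look roughly like:

	func testReachableUDP(host string, port int, timeout time.Duration) {
		pollfn := func() (bool, error) {
			result := pokeUDP(host, port, "echo hello", &UDPPokeParams{
				Timeout:  3 * time.Second,
				Response: "hello",
			})
			if result.Status == UDPSuccess {
				return true, nil
			}
			return false, nil // caller can retry
		}
		if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
			framework.Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
		}
	}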
@@ -824,20 +1371,20 @@ var _ = SIGDescribe("Services", func() {
 		e2eservice.TestReachableHTTP(nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("hitting the UDP service's new NodePort")
-		e2eservice.TestReachableUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)
+		testReachableUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("checking the old TCP NodePort is closed")
-		e2eservice.TestNotReachableHTTP(nodeIP, tcpNodePortOld, e2eservice.KubeProxyLagTimeout)
+		testNotReachableHTTP(nodeIP, tcpNodePortOld, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("checking the old UDP NodePort is closed")
-		e2eservice.TestNotReachableUDP(nodeIP, udpNodePortOld, e2eservice.KubeProxyLagTimeout)
+		testNotReachableUDP(nodeIP, udpNodePortOld, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("hitting the TCP service's LoadBalancer")
 		e2eservice.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)

 		if loadBalancerSupportsUDP {
 			ginkgo.By("hitting the UDP service's LoadBalancer")
-			e2eservice.TestReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
+			testReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
 		}

 		// Change the services' main ports.
@@ -880,14 +1427,14 @@ var _ = SIGDescribe("Services", func() {
 		e2eservice.TestReachableHTTP(nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("hitting the UDP service's NodePort")
-		e2eservice.TestReachableUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)
+		testReachableUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("hitting the TCP service's LoadBalancer")
 		e2eservice.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout)

 		if loadBalancerSupportsUDP {
 			ginkgo.By("hitting the UDP service's LoadBalancer")
-			e2eservice.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
+			testReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
 		}

 		ginkgo.By("Scaling the pods to 0")
@@ -897,17 +1444,17 @@ var _ = SIGDescribe("Services", func() {
 		framework.ExpectNoError(err)

 		ginkgo.By("looking for ICMP REJECT on the TCP service's NodePort")
-		e2eservice.TestRejectedHTTP(nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout)
+		testRejectedHTTP(nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("looking for ICMP REJECT on the UDP service's NodePort")
-		e2eservice.TestRejectedUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)
+		testRejectedUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("looking for ICMP REJECT on the TCP service's LoadBalancer")
-		e2eservice.TestRejectedHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout)
+		testRejectedHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout)

 		if loadBalancerSupportsUDP {
 			ginkgo.By("looking for ICMP REJECT on the UDP service's LoadBalancer")
-			e2eservice.TestRejectedUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
+			testRejectedUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
 		}

 		ginkgo.By("Scaling the pods to 1")
@@ -920,14 +1467,14 @@ var _ = SIGDescribe("Services", func() {
 		e2eservice.TestReachableHTTP(nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("hitting the UDP service's NodePort")
-		e2eservice.TestReachableUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)
+		testReachableUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("hitting the TCP service's LoadBalancer")
 		e2eservice.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout)

 		if loadBalancerSupportsUDP {
 			ginkgo.By("hitting the UDP service's LoadBalancer")
-			e2eservice.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
+			testReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
 		}

 		// Change the services back to ClusterIP.
@@ -955,17 +1502,17 @@ var _ = SIGDescribe("Services", func() {
 		}

 		ginkgo.By("checking the TCP NodePort is closed")
-		e2eservice.TestNotReachableHTTP(nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout)
+		testNotReachableHTTP(nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("checking the UDP NodePort is closed")
-		e2eservice.TestNotReachableUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)
+		testNotReachableUDP(nodeIP, udpNodePort, e2eservice.KubeProxyLagTimeout)

 		ginkgo.By("checking the TCP LoadBalancer is closed")
-		e2eservice.TestNotReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)
+		testNotReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)

 		if loadBalancerSupportsUDP {
 			ginkgo.By("checking the UDP LoadBalancer is closed")
-			e2eservice.TestNotReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
+			testNotReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
 		}
 	})
@@ -1143,7 +1690,7 @@ var _ = SIGDescribe("Services", func() {
 		externalServiceName := "externalsvc"
 		externalServiceFQDN := createAndGetExternalServiceFQDN(cs, ns, externalServiceName)
 		defer func() {
-			framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, externalServiceName))
+			framework.ExpectNoError(StopServeHostnameService(f.ClientSet, ns, externalServiceName))
 		}()

 		ginkgo.By("changing the ClusterIP service to type=ExternalName")
@@ -1185,7 +1732,7 @@ var _ = SIGDescribe("Services", func() {
 		externalServiceName := "externalsvc"
 		externalServiceFQDN := createAndGetExternalServiceFQDN(cs, ns, externalServiceName)
 		defer func() {
-			framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, externalServiceName))
+			framework.ExpectNoError(StopServeHostnameService(f.ClientSet, ns, externalServiceName))
 		}()

 		ginkgo.By("changing the NodePort service to type=ExternalName")
@@ -1208,7 +1755,7 @@ var _ = SIGDescribe("Services", func() {
 		serviceName2 := baseName + "2"
 		ns := f.Namespace.Name

-		t := e2eservice.NewServerTest(cs, ns, serviceName1)
+		t := NewServerTest(cs, ns, serviceName1)
 		defer func() {
 			defer ginkgo.GinkgoRecover()
 			errs := t.Cleanup()
@@ -1260,7 +1807,7 @@ var _ = SIGDescribe("Services", func() {
 		serviceName := "nodeport-range-test"
 		ns := f.Namespace.Name

-		t := e2eservice.NewServerTest(cs, ns, serviceName)
+		t := NewServerTest(cs, ns, serviceName)
 		defer func() {
 			defer ginkgo.GinkgoRecover()
 			errs := t.Cleanup()
@@ -1327,7 +1874,7 @@ var _ = SIGDescribe("Services", func() {
 		serviceName := "nodeport-reuse"
 		ns := f.Namespace.Name

-		t := e2eservice.NewServerTest(cs, ns, serviceName)
+		t := NewServerTest(cs, ns, serviceName)
 		defer func() {
 			defer ginkgo.GinkgoRecover()
 			errs := t.Cleanup()
@@ -1389,7 +1936,7 @@ var _ = SIGDescribe("Services", func() {
 		serviceName := "tolerate-unready"
 		ns := f.Namespace.Name

-		t := e2eservice.NewServerTest(cs, ns, serviceName)
+		t := NewServerTest(cs, ns, serviceName)
 		defer func() {
 			defer ginkgo.GinkgoRecover()
 			errs := t.Cleanup()
@@ -1898,12 +2445,12 @@ var _ = SIGDescribe("Services", func() {
 		ginkgo.By("creating service-disabled in namespace " + ns)
 		svcDisabled := getServeHostnameService("service-proxy-disabled")
 		svcDisabled.ObjectMeta.Labels = serviceProxyNameLabels
-		_, svcDisabledIP, err := e2eservice.StartServeHostnameService(cs, svcDisabled, ns, numPods)
+		_, svcDisabledIP, err := StartServeHostnameService(cs, svcDisabled, ns, numPods)
 		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svcDisabledIP, ns)

 		ginkgo.By("creating service in namespace " + ns)
 		svcToggled := getServeHostnameService("service-proxy-toggled")
-		podToggledNames, svcToggledIP, err := e2eservice.StartServeHostnameService(cs, svcToggled, ns, numPods)
+		podToggledNames, svcToggledIP, err := StartServeHostnameService(cs, svcToggled, ns, numPods)
 		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svcToggledIP, ns)

 		jig := e2eservice.NewTestJig(cs, ns, svcToggled.ObjectMeta.Name)
@@ -1916,10 +2463,10 @@ var _ = SIGDescribe("Services", func() {
 		host := hosts[0]

 		ginkgo.By("verifying service is up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podToggledNames, svcToggledIP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podToggledNames, svcToggledIP, servicePort))

 		ginkgo.By("verifying service-disabled is not up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceDown(cs, host, svcDisabledIP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceDown(cs, host, svcDisabledIP, servicePort))

 		ginkgo.By("adding service-proxy-name label")
 		_, err = jig.UpdateService(func(svc *v1.Service) {
@@ -1928,7 +2475,7 @@ var _ = SIGDescribe("Services", func() {
 		framework.ExpectNoError(err)

 		ginkgo.By("verifying service is not up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceDown(cs, host, svcToggledIP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceDown(cs, host, svcToggledIP, servicePort))

 		ginkgo.By("removing service-proxy-name annotation")
 		_, err = jig.UpdateService(func(svc *v1.Service) {
@@ -1937,10 +2484,10 @@ var _ = SIGDescribe("Services", func() {
 		framework.ExpectNoError(err)

 		ginkgo.By("verifying service is up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podToggledNames, svcToggledIP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podToggledNames, svcToggledIP, servicePort))

 		ginkgo.By("verifying service-disabled is still not up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceDown(cs, host, svcDisabledIP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceDown(cs, host, svcDisabledIP, servicePort))
 	})

 	ginkgo.It("should implement service.kubernetes.io/headless", func() {
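Illustrative sketch (not part of the patch): the jig.UpdateService mutators referenced
above are elided by the hunk context; the label toggle these tests depend on is
roughly the following, with the exact label map assumed.

	// Add the service-proxy-name label so kube-proxy stops serving the service...
	_, err = jig.UpdateService(func(svc *v1.Service) {
		svc.ObjectMeta.Labels = serviceProxyNameLabels
	})
	framework.ExpectNoError(err)

	// ...then clear the labels and expect traffic to the service to be restored.
	_, err = jig.UpdateService(func(svc *v1.Service) {
		svc.ObjectMeta.Labels = nil
	})
	framework.ExpectNoError(err)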
@@ -1962,12 +2509,12 @@ var _ = SIGDescribe("Services", func() {
 		svcHeadless := getServeHostnameService("service-headless")
 		svcHeadless.ObjectMeta.Labels = serviceHeadlessLabels
 		// This should be improved, as we do not want a Headlesss Service to contain an IP...
-		_, svcHeadlessIP, err := e2eservice.StartServeHostnameService(cs, svcHeadless, ns, numPods)
+		_, svcHeadlessIP, err := StartServeHostnameService(cs, svcHeadless, ns, numPods)
 		framework.ExpectNoError(err, "failed to create replication controller with headless service: %s in the namespace: %s", svcHeadlessIP, ns)

 		ginkgo.By("creating service in namespace " + ns)
 		svcHeadlessToggled := getServeHostnameService("service-headless-toggled")
-		podHeadlessToggledNames, svcHeadlessToggledIP, err := e2eservice.StartServeHostnameService(cs, svcHeadlessToggled, ns, numPods)
+		podHeadlessToggledNames, svcHeadlessToggledIP, err := StartServeHostnameService(cs, svcHeadlessToggled, ns, numPods)
 		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svcHeadlessToggledIP, ns)

 		jig := e2eservice.NewTestJig(cs, ns, svcHeadlessToggled.ObjectMeta.Name)
@@ -1980,10 +2527,10 @@ var _ = SIGDescribe("Services", func() {
 		host := hosts[0]

 		ginkgo.By("verifying service is up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podHeadlessToggledNames, svcHeadlessToggledIP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podHeadlessToggledNames, svcHeadlessToggledIP, servicePort))

 		ginkgo.By("verifying service-headless is not up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceDown(cs, host, svcHeadlessIP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceDown(cs, host, svcHeadlessIP, servicePort))

 		ginkgo.By("adding service.kubernetes.io/headless label")
 		_, err = jig.UpdateService(func(svc *v1.Service) {
@@ -1992,7 +2539,7 @@ var _ = SIGDescribe("Services", func() {
 		framework.ExpectNoError(err)

 		ginkgo.By("verifying service is not up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceDown(cs, host, svcHeadlessToggledIP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceDown(cs, host, svcHeadlessToggledIP, servicePort))

 		ginkgo.By("removing service.kubernetes.io/headless annotation")
 		_, err = jig.UpdateService(func(svc *v1.Service) {
@@ -2001,10 +2548,10 @@ var _ = SIGDescribe("Services", func() {
 		framework.ExpectNoError(err)

 		ginkgo.By("verifying service is up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(cs, ns, host, podHeadlessToggledNames, svcHeadlessToggledIP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podHeadlessToggledNames, svcHeadlessToggledIP, servicePort))

 		ginkgo.By("verifying service-headless is still not up")
-		framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceDown(cs, host, svcHeadlessIP, servicePort))
+		framework.ExpectNoError(verifyServeHostnameServiceDown(cs, host, svcHeadlessIP, servicePort))
 	})

 	ginkgo.It("should be rejected when no endpoints exist", func() {
@@ -2107,7 +2654,7 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
 	ginkgo.AfterEach(func() {
 		if ginkgo.CurrentGinkgoTestDescription().Failed {
-			e2eservice.DescribeSvc(f.Namespace.Name)
+			DescribeSvc(f.Namespace.Name)
 		}
 		for _, lb := range serviceLBNames {
 			framework.Logf("cleaning load balancer resource for %s", lb)
@@ -2138,7 +2685,7 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
 		nodes, err := jig.GetEndpointNodes()
 		framework.ExpectNoError(err)
 		for _, ips := range nodes {
-			err := e2eservice.TestHTTPHealthCheckNodePort(ips[0], healthCheckNodePort, "/healthz", e2eservice.KubeProxyEndpointLagTimeout, false, threshold)
+			err := TestHTTPHealthCheckNodePort(ips[0], healthCheckNodePort, "/healthz", e2eservice.KubeProxyEndpointLagTimeout, false, threshold)
 			framework.ExpectNoError(err)
 		}
 		err = cs.CoreV1().Services(svc.Namespace).Delete(svc.Name, nil)
@@ -2149,7 +2696,7 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
 		ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])

 		ginkgo.By("reading clientIP using the TCP service's service port via its external VIP")
-		content := e2eservice.GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, "/clientip")
+		content := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, "/clientip")
 		clientIP := content.String()
 		framework.Logf("ClientIP detected by target pod using VIP:SvcPort is %s", clientIP)
@@ -2179,7 +2726,7 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
 		for nodeName, nodeIPs := range endpointsNodeMap {
 			nodeIP := nodeIPs[0]
 			ginkgo.By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v%v%v", nodeName, nodeIP, tcpNodePort, path))
-			content := e2eservice.GetHTTPContent(nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout, path)
+			content := GetHTTPContent(nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout, path)
 			clientIP := content.String()
 			framework.Logf("ClientIP detected by target pod using NodePort is %s", clientIP)
 			if strings.HasPrefix(clientIP, "10.") {
@@ -2252,7 +2799,7 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
 			port := strconv.Itoa(healthCheckNodePort)
 			ipPort := net.JoinHostPort(publicIP, port)
 			framework.Logf("Health checking %s, http://%s%s, expectedSuccess %v", nodes.Items[n].Name, ipPort, path, expectedSuccess)
-			err := e2eservice.TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, path, e2eservice.KubeProxyEndpointLagTimeout, expectedSuccess, threshold)
+			err := TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, path, e2eservice.KubeProxyEndpointLagTimeout, expectedSuccess, threshold)
 			framework.ExpectNoError(err)
 		}
 		framework.ExpectNoError(e2erc.DeleteRCAndWaitForGC(f.ClientSet, namespace, serviceName))
@@ -2368,7 +2915,7 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
 		ginkgo.By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap))
 		for nodeName, nodeIPs := range noEndpointNodeMap {
 			ginkgo.By(fmt.Sprintf("Checking %v (%v:%v%v) proxies to endpoints on another node", nodeName, nodeIPs[0], svcNodePort, path))
-			e2eservice.GetHTTPContent(nodeIPs[0], svcNodePort, e2eservice.KubeProxyLagTimeout, path)
+			GetHTTPContent(nodeIPs[0], svcNodePort, e2eservice.KubeProxyLagTimeout, path)
 		}

 		for nodeName, nodeIPs := range endpointNodeMap {
@@ -2393,7 +2940,7 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
 		ginkgo.By(fmt.Sprintf("checking source ip is NOT preserved through loadbalancer %v", ingressIP))
 		var clientIP string
 		pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
-			content := e2eservice.GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, "/clientip")
+			content := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, "/clientip")
 			clientIP = content.String()
 			if strings.HasPrefix(clientIP, "10.") {
 				return true, nil
@@ -2418,7 +2965,7 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
 		})
 		framework.ExpectNoError(err)
 		pollErr = wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
-			content := e2eservice.GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
+			content := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
 			clientIP = content.String()
 			ginkgo.By(fmt.Sprintf("Endpoint %v:%v%v returned client ip %v", ingressIP, svcTCPPort, path, clientIP))
 			if !strings.HasPrefix(clientIP, "10.") {
@@ -2483,10 +3030,10 @@ func execAffinityTestForNonLBServiceWithOptionalTransition(f *framework.Framewor
 	ginkgo.By("creating service in namespace " + ns)
 	serviceType := svc.Spec.Type
 	svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
-	_, _, err := e2eservice.StartServeHostnameService(cs, svc, ns, numPods)
+	_, _, err := StartServeHostnameService(cs, svc, ns, numPods)
 	framework.ExpectNoError(err, "failed to create replication controller with service in the namespace: %s", ns)
 	defer func() {
-		e2eservice.StopServeHostnameService(cs, ns, serviceName)
+		StopServeHostnameService(cs, ns, serviceName)
 	}()
 	jig := e2eservice.NewTestJig(cs, ns, serviceName)
 	svc, err = jig.Client.CoreV1().Services(ns).Get(serviceName, metav1.GetOptions{})
@@ -2513,19 +3060,19 @@ func execAffinityTestForNonLBServiceWithOptionalTransition(f *framework.Framewor
 	framework.ExpectNoError(err)

 	if !isTransitionTest {
-		gomega.Expect(e2eservice.CheckAffinity(execPod, svcIP, servicePort, true)).To(gomega.BeTrue())
+		gomega.Expect(checkAffinity(execPod, svcIP, servicePort, true)).To(gomega.BeTrue())
 	}
 	if isTransitionTest {
 		_, err = jig.UpdateService(func(svc *v1.Service) {
 			svc.Spec.SessionAffinity = v1.ServiceAffinityNone
 		})
 		framework.ExpectNoError(err)
-		gomega.Expect(e2eservice.CheckAffinity(execPod, svcIP, servicePort, false)).To(gomega.BeTrue())
+		gomega.Expect(checkAffinity(execPod, svcIP, servicePort, false)).To(gomega.BeTrue())
 		_, err = jig.UpdateService(func(svc *v1.Service) {
 			svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
 		})
 		framework.ExpectNoError(err)
-		gomega.Expect(e2eservice.CheckAffinity(execPod, svcIP, servicePort, true)).To(gomega.BeTrue())
+		gomega.Expect(checkAffinity(execPod, svcIP, servicePort, true)).To(gomega.BeTrue())
 	}
 }
@@ -2545,7 +3092,7 @@ func execAffinityTestForLBServiceWithOptionalTransition(f *framework.Framework,
 	ginkgo.By("creating service in namespace " + ns)
 	svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
-	_, _, err := e2eservice.StartServeHostnameService(cs, svc, ns, numPods)
+	_, _, err := StartServeHostnameService(cs, svc, ns, numPods)
 	framework.ExpectNoError(err, "failed to create replication controller with service in the namespace: %s", ns)
 	jig := e2eservice.NewTestJig(cs, ns, serviceName)
 	ginkgo.By("waiting for loadbalancer for service " + ns + "/" + serviceName)
@@ -2554,7 +3101,7 @@ func execAffinityTestForLBServiceWithOptionalTransition(f *framework.Framework,
 	defer func() {
 		podNodePairs, err := e2enode.PodNodePairs(cs, ns)
 		framework.Logf("[pod,node] pairs: %+v; err: %v", podNodePairs, err)
-		e2eservice.StopServeHostnameService(cs, ns, serviceName)
+		StopServeHostnameService(cs, ns, serviceName)
 		lb := cloudprovider.DefaultLoadBalancerName(svc)
 		framework.Logf("cleaning load balancer resource for %s", lb)
 		e2eservice.CleanupServiceResources(cs, lb, framework.TestContext.CloudConfig.Region, framework.TestContext.CloudConfig.Zone)
@@ -2563,24 +3110,24 @@ func execAffinityTestForLBServiceWithOptionalTransition(f *framework.Framework,
 	port := int(svc.Spec.Ports[0].Port)

 	if !isTransitionTest {
-		gomega.Expect(e2eservice.CheckAffinity(nil, ingressIP, port, true)).To(gomega.BeTrue())
+		gomega.Expect(checkAffinity(nil, ingressIP, port, true)).To(gomega.BeTrue())
 	}
 	if isTransitionTest {
 		svc, err = jig.UpdateService(func(svc *v1.Service) {
 			svc.Spec.SessionAffinity = v1.ServiceAffinityNone
 		})
 		framework.ExpectNoError(err)
-		gomega.Expect(e2eservice.CheckAffinity(nil, ingressIP, port, false)).To(gomega.BeTrue())
+		gomega.Expect(checkAffinity(nil, ingressIP, port, false)).To(gomega.BeTrue())
 		svc, err = jig.UpdateService(func(svc *v1.Service) {
 			svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
 		})
 		framework.ExpectNoError(err)
-		gomega.Expect(e2eservice.CheckAffinity(nil, ingressIP, port, true)).To(gomega.BeTrue())
+		gomega.Expect(checkAffinity(nil, ingressIP, port, true)).To(gomega.BeTrue())
 	}
 }

 func createAndGetExternalServiceFQDN(cs clientset.Interface, ns, serviceName string) string {
-	_, _, err := e2eservice.StartServeHostnameService(cs, getServeHostnameService(serviceName), ns, 2)
+	_, _, err := StartServeHostnameService(cs, getServeHostnameService(serviceName), ns, 2)
 	framework.ExpectNoError(err, "Expected Service %s to be running", serviceName)
 	return fmt.Sprintf("%s.%s.svc.%s", serviceName, ns, framework.TestContext.ClusterDNSDomain)
 }
diff --git a/test/e2e/network/util.go b/test/e2e/network/util.go
new file mode 100644
index 00000000000..37d4142ea90
--- /dev/null
+++ b/test/e2e/network/util.go
@@ -0,0 +1,51 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package network
+
+import (
+	"bytes"
+	"fmt"
+	"time"
+
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/kubernetes/test/e2e/framework"
+	e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
+)
+
+// GetHTTPContent returns the content of the given url by HTTP.
+func GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
+	var body bytes.Buffer
+	if pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
+		result := e2enetwork.PokeHTTP(host, port, url, nil)
+		if result.Status == e2enetwork.HTTPSuccess {
+			body.Write(result.Body)
+			return true, nil
+		}
+		return false, nil
+	}); pollErr != nil {
+		framework.Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, pollErr)
+	}
+	return body
+}
+
+// DescribeSvc logs the output of kubectl describe svc for the given namespace
+func DescribeSvc(ns string) {
+	framework.Logf("\nOutput of kubectl describe svc:\n")
+	desc, _ := framework.RunKubectl(
+		ns, "describe", "svc", fmt.Sprintf("--namespace=%v", ns))
+	framework.Logf(desc)
+}
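Illustrative example (not part of the patch): typical call sites for the two helpers in
the new util.go, using names that appear elsewhere in this patch.

	// Poll the load balancer VIP until the backend answers, then inspect the body.
	content := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, "/clientip")
	framework.Logf("backend reported client address %q", content.String())

	// On failure, dump `kubectl describe svc` for the test namespace into the logs.
	if ginkgo.CurrentGinkgoTestDescription().Failed {
		DescribeSvc(f.Namespace.Name)
	}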