e2e: move funs of framework/service to e2e/network

Signed-off-by: clarklee92 <clarklee1992@hotmail.com>
This commit is contained in:
clarklee92 2019-12-11 20:13:34 +08:00
parent eef4c00ae9
commit c4ad07b0b1
15 changed files with 737 additions and 765 deletions

View File

@ -3,10 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"affinity_checker.go",
"const.go",
"fixture.go",
"hostname.go",
"jig.go",
"resource.go",
"util.go",
@ -32,14 +29,12 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/network:go_default_library",
"//test/e2e/framework/node:go_default_library",
"//test/e2e/framework/pod:go_default_library",
"//test/e2e/framework/rc:go_default_library",
"//test/e2e/framework/ssh:go_default_library",
"//test/utils:go_default_library",
"//test/utils/image:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",

View File

@ -1,114 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"fmt"
"net"
"strconv"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework"
)
// CheckAffinity function tests whether the service affinity works as expected.
// If affinity is expected, the test will return true once affinityConfirmCount
// number of same response observed in a row. If affinity is not expected, the
// test will keep observe until different responses observed. The function will
// return false only in case of unexpected errors.
func CheckAffinity(execPod *v1.Pod, serviceIP string, servicePort int, shouldHold bool) bool {
serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
cmd := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, serviceIPPort)
timeout := AffinityTimeout
if execPod == nil {
timeout = LoadBalancerPollTimeout
}
var tracker affinityTracker
if pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
if execPod != nil {
stdout, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
if err != nil {
framework.Logf("Failed to get response from %s. Retry until timeout", serviceIPPort)
return false, nil
}
tracker.recordHost(stdout)
} else {
rawResponse := GetHTTPContent(serviceIP, servicePort, timeout, "")
tracker.recordHost(rawResponse.String())
}
trackerFulfilled, affinityHolds := tracker.checkHostTrace(AffinityConfirmCount)
if !shouldHold && !affinityHolds {
return true, nil
}
if shouldHold && trackerFulfilled && affinityHolds {
return true, nil
}
return false, nil
}); pollErr != nil {
trackerFulfilled, _ := tracker.checkHostTrace(AffinityConfirmCount)
if pollErr != wait.ErrWaitTimeout {
checkAffinityFailed(tracker, pollErr.Error())
return false
}
if !trackerFulfilled {
checkAffinityFailed(tracker, fmt.Sprintf("Connection to %s timed out or not enough responses.", serviceIPPort))
}
if shouldHold {
checkAffinityFailed(tracker, "Affinity should hold but didn't.")
} else {
checkAffinityFailed(tracker, "Affinity shouldn't hold but did.")
}
return true
}
return true
}
// affinityTracker tracks the destination of a request for the affinity tests.
type affinityTracker struct {
hostTrace []string
}
// Record the response going to a given host.
func (at *affinityTracker) recordHost(host string) {
at.hostTrace = append(at.hostTrace, host)
framework.Logf("Received response from host: %s", host)
}
// Check that we got a constant count requests going to the same host.
func (at *affinityTracker) checkHostTrace(count int) (fulfilled, affinityHolds bool) {
fulfilled = (len(at.hostTrace) >= count)
if len(at.hostTrace) == 0 {
return fulfilled, true
}
last := at.hostTrace[0:]
if len(at.hostTrace)-count >= 0 {
last = at.hostTrace[len(at.hostTrace)-count:]
}
host := at.hostTrace[len(at.hostTrace)-1]
for _, h := range last {
if h != host {
return fulfilled, false
}
}
return fulfilled, true
}
func checkAffinityFailed(tracker affinityTracker, err string) {
framework.Logf("%v", tracker.hostTrace)
framework.Failf(err)
}

View File

@ -76,16 +76,6 @@ const (
// TestTimeout is used for most polling/waiting activities
TestTimeout = 60 * time.Second
// AffinityTimeout is the maximum time that CheckAffinity is allowed to take; this
// needs to be more than long enough for AffinityConfirmCount HTTP requests to
// complete in a busy CI cluster, but shouldn't be too long since we will end up
// waiting the entire time in the tests where affinity is not expected.
AffinityTimeout = 2 * time.Minute
// AffinityConfirmCount is the number of needed continuous requests to confirm that
// affinity is enabled.
AffinityConfirmCount = 15
// ServiceEndpointsTimeout is the maximum time in which endpoints for the service should be created.
ServiceEndpointsTimeout = 2 * time.Minute

View File

@ -1,202 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"fmt"
"net"
"sort"
"strconv"
"strings"
"time"
"github.com/onsi/ginkgo"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
testutils "k8s.io/kubernetes/test/utils"
)
// StartServeHostnameService creates a replication controller that serves its
// hostname and a service on top of it.
func StartServeHostnameService(c clientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) {
podNames := make([]string, replicas)
name := svc.ObjectMeta.Name
ginkgo.By("creating service " + name + " in namespace " + ns)
_, err := c.CoreV1().Services(ns).Create(svc)
if err != nil {
return podNames, "", err
}
var createdPods []*v1.Pod
maxContainerFailures := 0
config := testutils.RCConfig{
Client: c,
Image: framework.ServeHostnameImage,
Command: []string{"/agnhost", "serve-hostname"},
Name: name,
Namespace: ns,
PollInterval: 3 * time.Second,
Timeout: framework.PodReadyBeforeTimeout,
Replicas: replicas,
CreatedPods: &createdPods,
MaxContainerFailures: &maxContainerFailures,
}
err = e2erc.RunRC(config)
if err != nil {
return podNames, "", err
}
if len(createdPods) != replicas {
return podNames, "", fmt.Errorf("incorrect number of running pods: %v", len(createdPods))
}
for i := range createdPods {
podNames[i] = createdPods[i].ObjectMeta.Name
}
sort.StringSlice(podNames).Sort()
service, err := c.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
if err != nil {
return podNames, "", err
}
if service.Spec.ClusterIP == "" {
return podNames, "", fmt.Errorf("service IP is blank for %v", name)
}
serviceIP := service.Spec.ClusterIP
return podNames, serviceIP, nil
}
// StopServeHostnameService stops the given service.
func StopServeHostnameService(clientset clientset.Interface, ns, name string) error {
if err := e2erc.DeleteRCAndWaitForGC(clientset, ns, name); err != nil {
return err
}
if err := clientset.CoreV1().Services(ns).Delete(name, nil); err != nil {
return err
}
return nil
}
// VerifyServeHostnameServiceUp wgets the given serviceIP:servicePort from the
// given host and from within a pod. The host is expected to be an SSH-able node
// in the cluster. Each pod in the service is expected to echo its name. These
// names are compared with the given expectedPods list after a sort | uniq.
func VerifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expectedPods []string, serviceIP string, servicePort int) error {
execPod := e2epod.CreateExecPodOrFail(c, ns, "execpod-", nil)
defer func() {
e2epod.DeletePodOrFail(c, ns, execPod.Name)
}()
// Loop a bunch of times - the proxy is randomized, so we want a good
// chance of hitting each backend at least once.
buildCommand := func(wget string) string {
serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
return fmt.Sprintf("for i in $(seq 1 %d); do %s http://%s 2>&1 || true; echo; done",
50*len(expectedPods), wget, serviceIPPort)
}
commands := []func() string{
// verify service from node
func() string {
cmd := "set -e; " + buildCommand("wget -q --timeout=0.2 --tries=1 -O -")
framework.Logf("Executing cmd %q on host %v", cmd, host)
result, err := e2essh.SSH(cmd, host, framework.TestContext.Provider)
if err != nil || result.Code != 0 {
e2essh.LogResult(result)
framework.Logf("error while SSH-ing to node: %v", err)
}
return result.Stdout
},
// verify service from pod
func() string {
cmd := buildCommand("wget -q -T 1 -O -")
framework.Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPod.Name)
// TODO: Use exec-over-http via the netexec pod instead of kubectl exec.
output, err := framework.RunHostCmd(ns, execPod.Name, cmd)
if err != nil {
framework.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPod.Name, err, output)
}
return output
},
}
expectedEndpoints := sets.NewString(expectedPods...)
ginkgo.By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))
for _, cmdFunc := range commands {
passed := false
gotEndpoints := sets.NewString()
// Retry cmdFunc for a while
for start := time.Now(); time.Since(start) < KubeProxyLagTimeout; time.Sleep(5 * time.Second) {
for _, endpoint := range strings.Split(cmdFunc(), "\n") {
trimmedEp := strings.TrimSpace(endpoint)
if trimmedEp != "" {
gotEndpoints.Insert(trimmedEp)
}
}
// TODO: simply checking that the retrieved endpoints is a superset
// of the expected allows us to ignore intermitten network flakes that
// result in output like "wget timed out", but these should be rare
// and we need a better way to track how often it occurs.
if gotEndpoints.IsSuperset(expectedEndpoints) {
if !gotEndpoints.Equal(expectedEndpoints) {
framework.Logf("Ignoring unexpected output wgetting endpoints of service %s: %v", serviceIP, gotEndpoints.Difference(expectedEndpoints))
}
passed = true
break
}
framework.Logf("Unable to reach the following endpoints of service %s: %v", serviceIP, expectedEndpoints.Difference(gotEndpoints))
}
if !passed {
// Sort the lists so they're easier to visually diff.
exp := expectedEndpoints.List()
got := gotEndpoints.List()
sort.StringSlice(exp).Sort()
sort.StringSlice(got).Sort()
return fmt.Errorf("service verification failed for: %s\nexpected %v\nreceived %v", serviceIP, exp, got)
}
}
return nil
}
// VerifyServeHostnameServiceDown verifies that the given service isn't served.
func VerifyServeHostnameServiceDown(c clientset.Interface, host string, serviceIP string, servicePort int) error {
ipPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
// The current versions of curl included in CentOS and RHEL distros
// misinterpret square brackets around IPv6 as globbing, so use the -g
// argument to disable globbing to handle the IPv6 case.
command := fmt.Sprintf(
"curl -g -s --connect-timeout 2 http://%s && exit 99", ipPort)
for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
result, err := e2essh.SSH(command, host, framework.TestContext.Provider)
if err != nil {
e2essh.LogResult(result)
framework.Logf("error while SSH-ing to node: %v", err)
}
if result.Code != 99 {
return nil
}
framework.Logf("service still alive - still waiting")
}
return fmt.Errorf("waiting for service to be down timed out")
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package service
import (
"fmt"
"time"
v1 "k8s.io/api/core/v1"
@ -101,14 +100,6 @@ func EnableAndDisableInternalLB() (enable func(svc *v1.Service), disable func(sv
return framework.TestContext.CloudConfig.Provider.EnableAndDisableInternalLB()
}
// DescribeSvc logs the output of kubectl describe svc for the given namespace
func DescribeSvc(ns string) {
framework.Logf("\nOutput of kubectl describe svc:\n")
desc, _ := framework.RunKubectl(
ns, "describe", "svc", fmt.Sprintf("--namespace=%v", ns))
framework.Logf(desc)
}
// GetServiceLoadBalancerCreationTimeout returns a timeout value for creating a load balancer of a service.
func GetServiceLoadBalancerCreationTimeout(cs clientset.Interface) time.Duration {
nodes, err := e2enode.GetReadySchedulableNodes(cs)

View File

@ -17,15 +17,8 @@ limitations under the License.
package service
import (
"bytes"
"fmt"
"net"
"net/http"
"strconv"
"strings"
"time"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework"
e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
@ -58,288 +51,3 @@ func TestReachableHTTPWithRetriableErrorCodes(host string, port int, retriableEr
}
}
}
// TestNotReachableHTTP tests that a HTTP request doesn't connect to the given host and port.
func TestNotReachableHTTP(host string, port int, timeout time.Duration) {
pollfn := func() (bool, error) {
result := e2enetwork.PokeHTTP(host, port, "/", nil)
if result.Code == 0 {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
framework.Failf("HTTP service %v:%v reachable after %v: %v", host, port, timeout, err)
}
}
// TestRejectedHTTP tests that the given host rejects a HTTP request on the given port.
func TestRejectedHTTP(host string, port int, timeout time.Duration) {
pollfn := func() (bool, error) {
result := e2enetwork.PokeHTTP(host, port, "/", nil)
if result.Status == e2enetwork.HTTPRefused {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
framework.Failf("HTTP service %v:%v not rejected: %v", host, port, err)
}
}
// TestReachableUDP tests that the given host serves UDP on the given port.
func TestReachableUDP(host string, port int, timeout time.Duration) {
pollfn := func() (bool, error) {
result := pokeUDP(host, port, "echo hello", &UDPPokeParams{
Timeout: 3 * time.Second,
Response: "hello",
})
if result.Status == UDPSuccess {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
framework.Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
}
}
// TestNotReachableUDP tests that the given host doesn't serve UDP on the given port.
func TestNotReachableUDP(host string, port int, timeout time.Duration) {
pollfn := func() (bool, error) {
result := pokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
if result.Status != UDPSuccess && result.Status != UDPError {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
framework.Failf("UDP service %v:%v reachable after %v: %v", host, port, timeout, err)
}
}
// TestRejectedUDP tests that the given host rejects a UDP request on the given port.
func TestRejectedUDP(host string, port int, timeout time.Duration) {
pollfn := func() (bool, error) {
result := pokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
if result.Status == UDPRefused {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
framework.Failf("UDP service %v:%v not rejected: %v", host, port, err)
}
}
// TestHTTPHealthCheckNodePort tests a HTTP connection by the given request to the given host and port.
func TestHTTPHealthCheckNodePort(host string, port int, request string, timeout time.Duration, expectSucceed bool, threshold int) error {
count := 0
condition := func() (bool, error) {
success, _ := testHTTPHealthCheckNodePort(host, port, request)
if success && expectSucceed ||
!success && !expectSucceed {
count++
}
if count >= threshold {
return true, nil
}
return false, nil
}
if err := wait.PollImmediate(time.Second, timeout, condition); err != nil {
return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v%v, got %d", threshold, expectSucceed, host, port, count)
}
return nil
}
func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) {
ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
url := fmt.Sprintf("http://%s%s", ipPort, request)
if ip == "" || port == 0 {
framework.Failf("Got empty IP for reachability check (%s)", url)
return false, fmt.Errorf("invalid input ip or port")
}
framework.Logf("Testing HTTP health check on %v", url)
resp, err := httpGetNoConnectionPoolTimeout(url, 5*time.Second)
if err != nil {
framework.Logf("Got error testing for reachability of %s: %v", url, err)
return false, err
}
defer resp.Body.Close()
if err != nil {
framework.Logf("Got error reading response from %s: %v", url, err)
return false, err
}
// HealthCheck responder returns 503 for no local endpoints
if resp.StatusCode == 503 {
return false, nil
}
// HealthCheck responder returns 200 for non-zero local endpoints
if resp.StatusCode == 200 {
return true, nil
}
return false, fmt.Errorf("unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
}
// Does an HTTP GET, but does not reuse TCP connections
// This masks problems where the iptables rule has changed, but we don't see it
func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
tr := utilnet.SetTransportDefaults(&http.Transport{
DisableKeepAlives: true,
})
client := &http.Client{
Transport: tr,
Timeout: timeout,
}
return client.Get(url)
}
// GetHTTPContent returns the content of the given url by HTTP.
func GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
var body bytes.Buffer
if pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
result := e2enetwork.PokeHTTP(host, port, url, nil)
if result.Status == e2enetwork.HTTPSuccess {
body.Write(result.Body)
return true, nil
}
return false, nil
}); pollErr != nil {
framework.Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, pollErr)
}
return body
}
// UDPPokeParams is a struct for UDP poke parameters.
type UDPPokeParams struct {
Timeout time.Duration
Response string
}
// UDPPokeResult is a struct for UDP poke result.
type UDPPokeResult struct {
Status UDPPokeStatus
Error error // if there was any error
Response []byte // if code != 0
}
// UDPPokeStatus is string for representing UDP poke status.
type UDPPokeStatus string
const (
// UDPSuccess is UDP poke status which is success.
UDPSuccess UDPPokeStatus = "Success"
// UDPError is UDP poke status which is error.
UDPError UDPPokeStatus = "UnknownError"
// UDPTimeout is UDP poke status which is timeout.
UDPTimeout UDPPokeStatus = "TimedOut"
// UDPRefused is UDP poke status which is connection refused.
UDPRefused UDPPokeStatus = "ConnectionRefused"
// UDPBadResponse is UDP poke status which is bad response.
UDPBadResponse UDPPokeStatus = "BadResponse"
// Any time we add new errors, we should audit all callers of this.
)
// pokeUDP tries to connect to a host on a port and send the given request. Callers
// can specify additional success parameters, if desired.
//
// The result status will be characterized as precisely as possible, given the
// known users of this.
//
// The result error will be populated for any status other than Success.
//
// The result response will be populated if the UDP transaction was completed, even
// if the other test params make this a failure).
func pokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPokeResult {
hostPort := net.JoinHostPort(host, strconv.Itoa(port))
url := fmt.Sprintf("udp://%s", hostPort)
ret := UDPPokeResult{}
// Sanity check inputs, because it has happened. These are the only things
// that should hard fail the test - they are basically ASSERT()s.
if host == "" {
framework.Failf("Got empty host for UDP poke (%s)", url)
return ret
}
if port == 0 {
framework.Failf("Got port==0 for UDP poke (%s)", url)
return ret
}
// Set default params.
if params == nil {
params = &UDPPokeParams{}
}
framework.Logf("Poking %v", url)
con, err := net.Dial("udp", hostPort)
if err != nil {
ret.Status = UDPError
ret.Error = err
framework.Logf("Poke(%q): %v", url, err)
return ret
}
_, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
if err != nil {
ret.Error = err
neterr, ok := err.(net.Error)
if ok && neterr.Timeout() {
ret.Status = UDPTimeout
} else if strings.Contains(err.Error(), "connection refused") {
ret.Status = UDPRefused
} else {
ret.Status = UDPError
}
framework.Logf("Poke(%q): %v", url, err)
return ret
}
if params.Timeout != 0 {
err = con.SetDeadline(time.Now().Add(params.Timeout))
if err != nil {
ret.Status = UDPError
ret.Error = err
framework.Logf("Poke(%q): %v", url, err)
return ret
}
}
bufsize := len(params.Response) + 1
if bufsize == 0 {
bufsize = 4096
}
var buf = make([]byte, bufsize)
n, err := con.Read(buf)
if err != nil {
ret.Error = err
neterr, ok := err.(net.Error)
if ok && neterr.Timeout() {
ret.Status = UDPTimeout
} else if strings.Contains(err.Error(), "connection refused") {
ret.Status = UDPRefused
} else {
ret.Status = UDPError
}
framework.Logf("Poke(%q): %v", url, err)
return ret
}
ret.Response = buf[0:n]
if params.Response != "" && string(ret.Response) != params.Response {
ret.Status = UDPBadResponse
ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response))
framework.Logf("Poke(%q): %v", url, ret.Error)
return ret
}
ret.Status = UDPSuccess
framework.Logf("Poke(%q): success", url)
return ret
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package service
import (
"context"
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -30,43 +29,6 @@ import (
"github.com/onsi/ginkgo"
)
// WaitForServiceResponding waits for the service to be responding.
func WaitForServiceResponding(c clientset.Interface, ns, name string) error {
ginkgo.By(fmt.Sprintf("trying to dial the service %s.%s via the proxy", ns, name))
return wait.PollImmediate(framework.Poll, RespondingTimeout, func() (done bool, err error) {
proxyRequest, errProxy := GetServicesProxyRequest(c, c.CoreV1().RESTClient().Get())
if errProxy != nil {
framework.Logf("Failed to get services proxy request: %v:", errProxy)
return false, nil
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
body, err := proxyRequest.Namespace(ns).
Context(ctx).
Name(name).
Do().
Raw()
if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to GET from service %s: %v", name, err)
return true, err
}
framework.Logf("Failed to GET from service %s: %v:", name, err)
return false, nil
}
got := string(body)
if len(got) == 0 {
framework.Logf("Service %s: expected non-empty response", name)
return false, err // stop polling
}
framework.Logf("Service %s: found nonempty answer: %s", name, got)
return true, nil
})
}
// WaitForServiceDeletedWithFinalizer waits for the service with finalizer to be deleted.
func WaitForServiceDeletedWithFinalizer(cs clientset.Interface, namespace, name string) {
ginkgo.By("Delete service with finalizer")

View File

@ -17,6 +17,7 @@ go_library(
"endpointslice.go",
"example_cluster_dns.go",
"firewall.go",
"fixture.go",
"framework.go",
"ingress.go",
"ingress_scale.go",
@ -29,6 +30,7 @@ go_library(
"proxy.go",
"service.go",
"service_latency.go",
"util.go",
"util_iperf.go",
],
importpath = "k8s.io/kubernetes/test/e2e/network",
@ -57,6 +59,7 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/gce:go_default_library",

View File

@ -217,7 +217,7 @@ var _ = SIGDescribe("[Feature:IPv6DualStackAlphaFeature] [LinuxOnly]", func() {
defaultIPFamily = v1.IPv6Protocol
}
t := e2eservice.NewServerTest(cs, ns, serviceName)
t := NewServerTest(cs, ns, serviceName)
defer func() {
defer ginkgo.GinkgoRecover()
if errs := t.Cleanup(); len(errs) != 0 {
@ -254,7 +254,7 @@ var _ = SIGDescribe("[Feature:IPv6DualStackAlphaFeature] [LinuxOnly]", func() {
jig := e2eservice.NewTestJig(cs, ns, serviceName)
t := e2eservice.NewServerTest(cs, ns, serviceName)
t := NewServerTest(cs, ns, serviceName)
defer func() {
defer ginkgo.GinkgoRecover()
if errs := t.Cleanup(); len(errs) != 0 {
@ -291,7 +291,7 @@ var _ = SIGDescribe("[Feature:IPv6DualStackAlphaFeature] [LinuxOnly]", func() {
jig := e2eservice.NewTestJig(cs, ns, serviceName)
t := e2eservice.NewServerTest(cs, ns, serviceName)
t := NewServerTest(cs, ns, serviceName)
defer func() {
defer ginkgo.GinkgoRecover()
if errs := t.Cleanup(); len(errs) != 0 {

View File

@ -17,6 +17,7 @@ limitations under the License.
package network
import (
"context"
"fmt"
"io/ioutil"
"os"
@ -28,6 +29,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/test/e2e/framework"
@ -37,6 +39,9 @@ import (
const (
dnsReadyTimeout = time.Minute
// RespondingTimeout is how long to wait for a service to be responding.
RespondingTimeout = 2 * time.Minute
)
const queryDNSPythonTemplate string = `
@ -110,7 +115,7 @@ var _ = SIGDescribe("ClusterDns [Feature:Example]", func() {
framework.ExpectNoError(err, "waiting for all pods to respond")
framework.Logf("found %d backend pods responding in namespace %s", len(pods.Items), ns.Name)
err = e2eservice.WaitForServiceResponding(c, ns.Name, backendSvcName)
err = waitForServiceResponding(c, ns.Name, backendSvcName)
framework.ExpectNoError(err, "waiting for the service to respond")
}
@ -171,3 +176,40 @@ func prepareResourceWithReplacedString(inputFile, old, new string) string {
podYaml := strings.Replace(string(data), old, new, 1)
return podYaml
}
// waitForServiceResponding waits for the service to be responding.
func waitForServiceResponding(c clientset.Interface, ns, name string) error {
ginkgo.By(fmt.Sprintf("trying to dial the service %s.%s via the proxy", ns, name))
return wait.PollImmediate(framework.Poll, RespondingTimeout, func() (done bool, err error) {
proxyRequest, errProxy := e2eservice.GetServicesProxyRequest(c, c.CoreV1().RESTClient().Get())
if errProxy != nil {
framework.Logf("Failed to get services proxy request: %v:", errProxy)
return false, nil
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
body, err := proxyRequest.Namespace(ns).
Context(ctx).
Name(name).
Do().
Raw()
if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to GET from service %s: %v", name, err)
return true, err
}
framework.Logf("Failed to GET from service %s: %v:", name, err)
return false, nil
}
got := string(body)
if len(got) == 0 {
framework.Logf("Service %s: expected non-empty response", name)
return false, err // stop polling
}
framework.Logf("Service %s: found nonempty answer: %s", name, got)
return true, nil
})
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package service
package network
import (
v1 "k8s.io/api/core/v1"

View File

@ -50,7 +50,7 @@ var _ = SIGDescribe("Services [Feature:GCEAlphaFeature][Slow]", func() {
ginkgo.AfterEach(func() {
if ginkgo.CurrentGinkgoTestDescription().Failed {
e2eservice.DescribeSvc(f.Namespace.Name)
DescribeSvc(f.Namespace.Name)
}
for _, lb := range serviceLBNames {
framework.Logf("cleaning gce resource for %s", lb)

View File

@ -30,7 +30,6 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
"github.com/onsi/ginkgo"
@ -327,9 +326,9 @@ var _ = SIGDescribe("Networking", func() {
svc := "iptables-flush-test"
defer func() {
framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, svc))
framework.ExpectNoError(StopServeHostnameService(f.ClientSet, ns, svc))
}()
podNames, svcIP, err := e2eservice.StartServeHostnameService(f.ClientSet, getServeHostnameService(svc), ns, numPods)
podNames, svcIP, err := StartServeHostnameService(f.ClientSet, getServeHostnameService(svc), ns, numPods)
framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc, ns)
// Ideally we want to reload the system firewall, but we don't necessarily
@ -377,7 +376,7 @@ var _ = SIGDescribe("Networking", func() {
}
ginkgo.By("verifying that kube-proxy rules are eventually recreated")
framework.ExpectNoError(e2eservice.VerifyServeHostnameServiceUp(f.ClientSet, ns, host, podNames, svcIP, servicePort))
framework.ExpectNoError(verifyServeHostnameServiceUp(f.ClientSet, ns, host, podNames, svcIP, servicePort))
ginkgo.By("verifying that kubelet rules are eventually recreated")
err = utilwait.PollImmediate(framework.Poll, framework.RestartNodeReadyAgainTimeout, func() (bool, error) {

File diff suppressed because it is too large Load Diff

51
test/e2e/network/util.go Normal file
View File

@ -0,0 +1,51 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package network
import (
"bytes"
"fmt"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework"
e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
)
// GetHTTPContent returns the content of the given url by HTTP.
func GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
var body bytes.Buffer
if pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
result := e2enetwork.PokeHTTP(host, port, url, nil)
if result.Status == e2enetwork.HTTPSuccess {
body.Write(result.Body)
return true, nil
}
return false, nil
}); pollErr != nil {
framework.Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, pollErr)
}
return body
}
// DescribeSvc logs the output of kubectl describe svc for the given namespace
func DescribeSvc(ns string) {
framework.Logf("\nOutput of kubectl describe svc:\n")
desc, _ := framework.RunKubectl(
ns, "describe", "svc", fmt.Sprintf("--namespace=%v", ns))
framework.Logf(desc)
}