Merge pull request #124660 from danwinship/feature-loadbalancer

Fix LoadBalancer tests to be provider-agnostic
Kubernetes Prow Robot 2024-05-09 01:23:11 -07:00 committed by GitHub
commit d36b267023
3 changed files with 1871 additions and 6 deletions


@@ -147,9 +147,6 @@ var (
// Ingress.networking.k8s.io to be present.
Ingress = framework.WithFeature(framework.ValidFeatures.Add("Ingress"))
// TODO: document the feature (owning SIG, when to use this feature for a test)
IngressScale = framework.WithFeature(framework.ValidFeatures.Add("IngressScale"))
// TODO: document the feature (owning SIG, when to use this feature for a test)
InPlacePodVerticalScaling = framework.WithFeature(framework.ValidFeatures.Add("InPlacePodVerticalScaling"))
@@ -178,6 +175,10 @@ var (
// TODO: document the feature (owning SIG, when to use this feature for a test)
LabelSelector = framework.WithFeature(framework.ValidFeatures.Add("LabelSelector"))
// Owner: sig-network
// Marks tests that require a cloud provider that implements LoadBalancer Services
LoadBalancer = framework.WithFeature(framework.ValidFeatures.Add("LoadBalancer"))
// TODO: document the feature (owning SIG, when to use this feature for a test)
LocalStorageCapacityIsolation = framework.WithFeature(framework.ValidFeatures.Add("LocalStorageCapacityIsolation"))
@@ -190,9 +191,6 @@ var (
// TODO: document the feature (owning SIG, when to use this feature for a test)
MemoryManager = framework.WithFeature(framework.ValidFeatures.Add("MemoryManager"))
// TODO: document the feature (owning SIG, when to use this feature for a test)
NEG = framework.WithFeature(framework.ValidFeatures.Add("NEG"))
// Owner: sig-network
// Marks tests that require working external DNS.
NetworkingDNS = framework.WithFeature(framework.ValidFeatures.Add("Networking-DNS"))

File diff suppressed because it is too large


@@ -23,6 +23,7 @@ import (
"fmt"
"math/rand"
"net"
"net/http"
"sort"
"strconv"
"strings"
@@ -39,6 +40,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilrand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@@ -50,6 +52,7 @@ import (
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/retry"
cloudprovider "k8s.io/cloud-provider"
netutils "k8s.io/utils/net"
utilpointer "k8s.io/utils/pointer"
@@ -426,6 +429,21 @@ func verifyServeHostnameServiceDown(ctx context.Context, c clientset.Interface,
return fmt.Errorf("waiting for service to be down timed out")
}
// testNotReachableHTTP tests that an HTTP request cannot connect to the given host and port.
func testNotReachableHTTP(ctx context.Context, host string, port int, timeout time.Duration) {
pollfn := func(ctx context.Context) (bool, error) {
result := e2enetwork.PokeHTTP(host, port, "/", nil)
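// PokeHTTP reports Code 0 when no HTTP response was received at all
// (dial error or timeout), which is the "not reachable" outcome we want.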
if result.Code == 0 {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, pollfn); err != nil {
framework.Failf("HTTP service %v:%v reachable after %v: %v", host, port, timeout, err)
}
}
// UDPPokeParams is a struct for UDP poke parameters.
type UDPPokeParams struct {
Timeout time.Duration
@@ -456,6 +474,241 @@ const (
// Any time we add new errors, we should audit all callers of this.
)
// pokeUDP tries to connect to a host on a port and send the given request. Callers
// can specify additional success parameters, if desired.
//
// The result status will be characterized as precisely as possible, given the
// known users of this.
//
// The result error will be populated for any status other than Success.
//
// The result response will be populated if the UDP transaction was completed,
// even if other test params make this a failure.
func pokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPokeResult {
hostPort := net.JoinHostPort(host, strconv.Itoa(port))
url := fmt.Sprintf("udp://%s", hostPort)
ret := UDPPokeResult{}
// Sanity check inputs, because bad values have been passed in before. These are the only
// things that should hard fail the test - they are basically ASSERT()s.
if host == "" {
framework.Failf("Got empty host for UDP poke (%s)", url)
return ret
}
if port == 0 {
framework.Failf("Got port==0 for UDP poke (%s)", url)
return ret
}
// Set default params.
if params == nil {
params = &UDPPokeParams{}
}
framework.Logf("Poking %v", url)
con, err := net.Dial("udp", hostPort)
if err != nil {
ret.Status = UDPError
ret.Error = err
framework.Logf("Poke(%q): %v", url, err)
return ret
}
_, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
if err != nil {
ret.Error = err
var neterr net.Error
if errors.As(err, &neterr) && neterr.Timeout() {
ret.Status = UDPTimeout
} else if strings.Contains(err.Error(), "connection refused") {
ret.Status = UDPRefused
} else {
ret.Status = UDPError
}
framework.Logf("Poke(%q): %v", url, err)
return ret
}
if params.Timeout != 0 {
err = con.SetDeadline(time.Now().Add(params.Timeout))
if err != nil {
ret.Status = UDPError
ret.Error = err
framework.Logf("Poke(%q): %v", url, err)
return ret
}
}
bufsize := len(params.Response) + 1
if bufsize == 1 {
// No expected response was given; use a reasonably large read buffer.
bufsize = 4096
}
buf := make([]byte, bufsize)
n, err := con.Read(buf)
if err != nil {
ret.Error = err
var neterr net.Error
if errors.As(err, &neterr) && neterr.Timeout() {
ret.Status = UDPTimeout
} else if strings.Contains(err.Error(), "connection refused") {
ret.Status = UDPRefused
} else {
ret.Status = UDPError
}
framework.Logf("Poke(%q): %v", url, err)
return ret
}
ret.Response = buf[0:n]
if params.Response != "" && string(ret.Response) != params.Response {
ret.Status = UDPBadResponse
ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response))
framework.Logf("Poke(%q): %v", url, ret.Error)
return ret
}
ret.Status = UDPSuccess
framework.Logf("Poke(%q): success", url)
return ret
}
// testReachableUDP tests that the given host serves UDP on the given port.
func testReachableUDP(ctx context.Context, host string, port int, timeout time.Duration) {
pollfn := func(ctx context.Context) (bool, error) {
result := pokeUDP(host, port, "echo hello", &UDPPokeParams{
Timeout: 3 * time.Second,
Response: "hello",
})
if result.Status == UDPSuccess {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, pollfn); err != nil {
framework.Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
}
}
// testNotReachableUDP tests that the given host doesn't serve UDP on the given port.
func testNotReachableUDP(ctx context.Context, host string, port int, timeout time.Duration) {
pollfn := func(ctx context.Context) (bool, error) {
result := pokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
if result.Status != UDPSuccess && result.Status != UDPError {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, pollfn); err != nil {
framework.Failf("UDP service %v:%v reachable after %v: %v", host, port, timeout, err)
}
}
// testRejectedUDP tests that the given host rejects a UDP request on the given port.
func testRejectedUDP(ctx context.Context, host string, port int, timeout time.Duration) {
pollfn := func(ctx context.Context) (bool, error) {
result := pokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
if result.Status == UDPRefused {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, pollfn); err != nil {
framework.Failf("UDP service %v:%v not rejected: %v", host, port, err)
}
}
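
The UDP pokers above assume a backend that answers "echo hello" with "hello" (in the e2e suite this role is played by the agnhost image). As a hedged, self-contained stand-in for local experimentation, not part of the framework, a minimal echo peer might look like:

package main

import (
	"log"
	"net"
	"strings"
)

func main() {
	// Listen on an arbitrary UDP port; point the pokers here.
	conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: 9999})
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	buf := make([]byte, 4096)
	for {
		n, addr, err := conn.ReadFromUDP(buf)
		if err != nil {
			log.Printf("read: %v", err)
			continue
		}
		// pokeUDP sends "echo <word>\n" and expects "<word>" back.
		msg := strings.TrimSpace(string(buf[:n]))
		reply := strings.TrimPrefix(msg, "echo ")
		if _, err := conn.WriteToUDP([]byte(reply), addr); err != nil {
			log.Printf("write: %v", err)
		}
	}
}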
// TestHTTPHealthCheckNodePort tests an HTTP connection, using the given request, to the given host and port.
func TestHTTPHealthCheckNodePort(ctx context.Context, host string, port int, request string, timeout time.Duration, expectSucceed bool, threshold int) error {
count := 0
condition := func(ctx context.Context) (bool, error) {
success, _ := testHTTPHealthCheckNodePort(host, port, request)
if success && expectSucceed ||
!success && !expectSucceed {
count++
}
if count >= threshold {
return true, nil
}
return false, nil
}
if err := wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, condition); err != nil {
return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v%v, got %d", threshold, expectSucceed, host, port, count)
}
return nil
}
func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) {
ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
url := fmt.Sprintf("http://%s%s", ipPort, request)
if ip == "" || port == 0 {
framework.Failf("Got empty IP for reachability check (%s)", url)
return false, fmt.Errorf("invalid input ip or port")
}
framework.Logf("Testing HTTP health check on %v", url)
resp, err := httpGetNoConnectionPoolTimeout(url, 5*time.Second)
if err != nil {
framework.Logf("Got error testing for reachability of %s: %v", url, err)
return false, err
}
defer func() { _ = resp.Body.Close() }()
// HealthCheck responder returns 503 for no local endpoints
if resp.StatusCode == 503 {
return false, nil
}
// HealthCheck responder returns 200 for non-zero local endpoints
if resp.StatusCode == 200 {
return true, nil
}
return false, fmt.Errorf("unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
}
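
testHTTPHealthCheckNodePort encodes kube-proxy's healthCheckNodePort contract: 200 when the node has local endpoints for the Service, 503 when it has none. A toy responder illustrating that contract (a hypothetical sketch, not kube-proxy's implementation):

package main

import (
	"fmt"
	"log"
	"net/http"
	"sync/atomic"
)

// localEndpoints stands in for the number of Service endpoints local to this
// node; a real kube-proxy tracks this from EndpointSlice data.
var localEndpoints int64

func healthz(w http.ResponseWriter, r *http.Request) {
	if atomic.LoadInt64(&localEndpoints) == 0 {
		// 503: the node has no local endpoints for the Service.
		w.WriteHeader(http.StatusServiceUnavailable)
		return
	}
	// 200: the node has at least one local endpoint.
	w.WriteHeader(http.StatusOK)
	fmt.Fprintln(w, "ok")
}

func main() {
	http.HandleFunc("/healthz", healthz)
	// 32000 is just an example port in the usual NodePort range.
	log.Fatal(http.ListenAndServe(":32000", nil))
}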
func testHTTPHealthCheckNodePortFromTestContainer(ctx context.Context, config *e2enetwork.NetworkingTestConfig, host string, port int, timeout time.Duration, expectSucceed bool, threshold int) error {
count := 0
pollFn := func(ctx context.Context) (bool, error) {
statusCode, err := config.GetHTTPCodeFromTestContainer(ctx,
"/healthz",
host,
port)
if err != nil {
framework.Logf("Got error reading status code from http://%s:%d/healthz via test container: %v", host, port, err)
return false, nil
}
framework.Logf("Got status code from http://%s:%d/healthz via test container: %d", host, port, statusCode)
success := statusCode == 200
if (success && expectSucceed) ||
(!success && !expectSucceed) {
count++
}
return count >= threshold, nil
}
err := wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, pollFn)
if err != nil {
return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v:%v/healthz, got %d", threshold, expectSucceed, host, port, count)
}
return nil
}
// Does an HTTP GET, but does not reuse TCP connections. Reusing a connection
// can mask problems where an iptables rule has changed, because the established
// connection keeps working; disabling keep-alives forces each GET to dial fresh.
func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
tr := utilnet.SetTransportDefaults(&http.Transport{
DisableKeepAlives: true,
})
client := &http.Client{
Transport: tr,
Timeout: timeout,
}
return client.Get(url)
}
func getServeHostnameService(name string) *v1.Service {
svc := defaultServeHostnameService.DeepCopy()
svc.ObjectMeta.Name = name
@@ -474,6 +727,23 @@ func waitForAPIServerUp(ctx context.Context, c clientset.Interface) error {
return fmt.Errorf("waiting for apiserver timed out")
}
// getEndpointNodesWithInternalIP returns a map from node name to internal IP for
// the nodes on which the endpoints of the Service are running.
func getEndpointNodesWithInternalIP(ctx context.Context, jig *e2eservice.TestJig) (map[string]string, error) {
nodesWithIPs, err := jig.GetEndpointNodesWithIP(ctx, v1.NodeInternalIP)
if err != nil {
return nil, err
}
endpointsNodeMap := make(map[string]string, len(nodesWithIPs))
for nodeName, internalIPs := range nodesWithIPs {
if len(internalIPs) < 1 {
return nil, fmt.Errorf("no internal ip found for node %s", nodeName)
}
endpointsNodeMap[nodeName] = internalIPs[0]
}
return endpointsNodeMap, nil
}
var _ = common.SIGDescribe("Services", func() {
f := framework.NewDefaultFramework("services")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
@@ -3756,6 +4026,62 @@ func execAffinityTestForNonLBServiceWithOptionalTransition(ctx context.Context,
}
}
func execAffinityTestForLBServiceWithTransition(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
execAffinityTestForLBServiceWithOptionalTransition(ctx, f, cs, svc, true)
}
func execAffinityTestForLBService(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
execAffinityTestForLBServiceWithOptionalTransition(ctx, f, cs, svc, false)
}
// execAffinityTestForLBServiceWithOptionalTransition is a helper function that wraps
// the logic of the affinity test for load balancer services, similar to
// execAffinityTestForNonLBServiceWithOptionalTransition.
func execAffinityTestForLBServiceWithOptionalTransition(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service, isTransitionTest bool) {
numPods, ns, serviceName := 3, f.Namespace.Name, svc.ObjectMeta.Name
ginkgo.By("creating service in namespace " + ns)
svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
_, _, err := StartServeHostnameService(ctx, cs, svc, ns, numPods)
framework.ExpectNoError(err, "failed to create replication controller with service in the namespace: %s", ns)
jig := e2eservice.NewTestJig(cs, ns, serviceName)
ginkgo.By("waiting for loadbalancer for service " + ns + "/" + serviceName)
svc, err = jig.WaitForLoadBalancer(ctx, e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs))
framework.ExpectNoError(err)
ginkgo.DeferCleanup(func(ctx context.Context) {
podNodePairs, err := e2enode.PodNodePairs(ctx, cs, ns)
framework.Logf("[pod,node] pairs: %+v; err: %v", podNodePairs, err)
_ = StopServeHostnameService(ctx, cs, ns, serviceName)
lb := cloudprovider.DefaultLoadBalancerName(svc)
framework.Logf("cleaning load balancer resource for %s", lb)
e2eservice.CleanupServiceResources(ctx, cs, lb, framework.TestContext.CloudConfig.Region, framework.TestContext.CloudConfig.Zone)
})
ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
port := int(svc.Spec.Ports[0].Port)
if !isTransitionTest {
if !checkAffinity(ctx, cs, nil, ingressIP, port, true) {
framework.Failf("Failed to verify affinity for loadbalance service %s/%s", ns, serviceName)
}
}
if isTransitionTest {
svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
svc.Spec.SessionAffinity = v1.ServiceAffinityNone
})
framework.ExpectNoError(err)
if !checkAffinity(ctx, cs, nil, ingressIP, port, false) {
framework.Failf("Failed to verify affinity for loadbalance service %s/%s without session affinity ", ns, serviceName)
}
svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
})
framework.ExpectNoError(err)
if !checkAffinity(ctx, cs, nil, ingressIP, port, true) {
framework.Failf("Failed to verify affinity for loadbalance service %s/%s with session affinity ", ns, serviceName)
}
}
}
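
checkAffinity itself is outside this diff. As a rough illustration of what such a check does (a hypothetical simplification, not the framework's implementation), one can hit hostname-echoing backends repeatedly and count how many distinct backends respond:

package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
	"strconv"
	"strings"
	"time"
)

// affinityHolds is a hypothetical simplification of checkAffinity: send
// `tries` requests to backends that echo their pod hostname and report
// whether every response came from the same backend.
func affinityHolds(host string, port, tries int) (bool, error) {
	client := &http.Client{Timeout: 2 * time.Second}
	seen := map[string]bool{}
	for i := 0; i < tries; i++ {
		resp, err := client.Get(fmt.Sprintf("http://%s/", net.JoinHostPort(host, strconv.Itoa(port))))
		if err != nil {
			return false, err
		}
		body, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			return false, err
		}
		seen[strings.TrimSpace(string(body))] = true
	}
	// One distinct hostname means every request landed on the same backend.
	return len(seen) == 1, nil
}

func main() {
	holds, err := affinityHolds("203.0.113.10", 80, 15) // hypothetical ingress IP
	fmt.Println(holds, err)
}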
func createAndGetExternalServiceFQDN(ctx context.Context, cs clientset.Interface, ns, serviceName string) string {
_, _, err := StartServeHostnameService(ctx, cs, getServeHostnameService(serviceName), ns, 2)
framework.ExpectNoError(err, "Expected Service %s to be running", serviceName)
@@ -3807,6 +4133,25 @@ func launchHostExecPod(ctx context.Context, client clientset.Interface, ns, name
return pod
}
// checkReachabilityFromPod checks reachability from the specified pod.
func checkReachabilityFromPod(ctx context.Context, expectToBeReachable bool, timeout time.Duration, namespace, pod, target string) {
cmd := fmt.Sprintf("wget -T 5 -qO- %q", target)
err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) {
_, err := e2eoutput.RunHostCmd(namespace, pod, cmd)
if expectToBeReachable && err != nil {
framework.Logf("Expect target to be reachable. But got err: %v. Retry until timeout", err)
return false, nil
}
if !expectToBeReachable && err == nil {
framework.Logf("Expect target NOT to be reachable. But it is reachable. Retry until timeout")
return false, nil
}
return true, nil
})
framework.ExpectNoError(err)
}
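
A hedged usage sketch (pod name and target URLs are hypothetical; the pod comes from launchHostExecPod above):

// Hypothetical call sites inside a test body: assert a LoadBalancer ingress IP
// is reachable from the hostexec pod, then that a blocked port is not.
pod := launchHostExecPod(ctx, cs, ns, "hostexec")
checkReachabilityFromPod(ctx, true, 2*time.Minute, ns, pod.Name, "http://203.0.113.10:80/")
checkReachabilityFromPod(ctx, false, 2*time.Minute, ns, pod.Name, "http://203.0.113.10:8080/")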
func validatePorts(ep, expectedEndpoints portsByPodUID) error {
if len(ep) != len(expectedEndpoints) {
// should not happen because we check this condition before