From da5bf27bc5780333ca8d87df7779462a075a4047 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 20 Nov 2024 19:09:59 -0500 Subject: [PATCH 1/2] Remove no-longer-used TestUnderTemporaryNetworkFailure() This was previously used by some autoscaling tests that have since been removed. It is somewhat sketchy (and inherently [Disruptive]), and depends on iptables (so would need to be updated to use nftables at some point if we were keeping it). Given that it's now unused, just remove it (as well as some helper functions that are no longer used by anyone else as well). --- test/e2e/framework/network/utils.go | 96 ----------------------------- test/e2e/framework/util.go | 61 ------------------ 2 files changed, 157 deletions(-) diff --git a/test/e2e/framework/network/utils.go b/test/e2e/framework/network/utils.go index a2febeab5f8..2b7778e7d0e 100644 --- a/test/e2e/framework/network/utils.go +++ b/test/e2e/framework/network/utils.go @@ -45,7 +45,6 @@ import ( e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output" e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" - e2essh "k8s.io/kubernetes/test/e2e/framework/ssh" imageutils "k8s.io/kubernetes/test/utils/image" netutils "k8s.io/utils/net" ) @@ -1099,101 +1098,6 @@ func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Re return client.Get(url) } -// TestUnderTemporaryNetworkFailure blocks outgoing network traffic on 'node'. Then runs testFunc and returns its status. -// At the end (even in case of errors), the network traffic is brought back to normal. -// This function executes commands on a node so it will work only for some -// environments. -func TestUnderTemporaryNetworkFailure(ctx context.Context, c clientset.Interface, ns string, node *v1.Node, testFunc func(ctx context.Context)) { - host, err := e2enode.GetSSHExternalIP(node) - if err != nil { - framework.Failf("Error getting node external ip : %v", err) - } - controlPlaneAddresses := framework.GetControlPlaneAddresses(ctx, c) - ginkgo.By(fmt.Sprintf("block network traffic from node %s to the control plane", node.Name)) - defer func() { - // This code will execute even if setting the iptables rule failed. - // It is on purpose because we may have an error even if the new rule - // had been inserted. (yes, we could look at the error code and ssh error - // separately, but I prefer to stay on the safe side). - ginkgo.By(fmt.Sprintf("Unblock network traffic from node %s to the control plane", node.Name)) - for _, instanceAddress := range controlPlaneAddresses { - UnblockNetwork(ctx, host, instanceAddress) - } - }() - - framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name) - if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) { - framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout) - } - for _, instanceAddress := range controlPlaneAddresses { - BlockNetwork(ctx, host, instanceAddress) - } - - framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name) - if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) { - framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout) - } - - testFunc(ctx) - // network traffic is unblocked in a deferred function -} - -// BlockNetwork blocks network between the given from value and the given to value. -// The following helper functions can block/unblock network from source -// host to destination host by manipulating iptable rules. -// This function assumes it can ssh to the source host. -// -// Caution: -// Recommend to input IP instead of hostnames. Using hostnames will cause iptables to -// do a DNS lookup to resolve the name to an IP address, which will -// slow down the test and cause it to fail if DNS is absent or broken. -// -// Suggested usage pattern: -// -// func foo() { -// ... -// defer UnblockNetwork(from, to) -// BlockNetwork(from, to) -// ... -// } -func BlockNetwork(ctx context.Context, from string, to string) { - framework.Logf("block network traffic from %s to %s", from, to) - iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to) - dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule) - if result, err := e2essh.SSH(ctx, dropCmd, from, framework.TestContext.Provider); result.Code != 0 || err != nil { - e2essh.LogResult(result) - framework.Failf("Unexpected error: %v", err) - } -} - -// UnblockNetwork unblocks network between the given from value and the given to value. -func UnblockNetwork(ctx context.Context, from string, to string) { - framework.Logf("Unblock network traffic from %s to %s", from, to) - iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to) - undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule) - // Undrop command may fail if the rule has never been created. - // In such case we just lose 30 seconds, but the cluster is healthy. - // But if the rule had been created and removing it failed, the node is broken and - // not coming back. Subsequent tests will run or fewer nodes (some of the tests - // may fail). Manual intervention is required in such case (recreating the - // cluster solves the problem too). - err := wait.PollUntilContextTimeout(ctx, time.Millisecond*100, time.Second*30, false, func(ctx context.Context) (bool, error) { - result, err := e2essh.SSH(ctx, undropCmd, from, framework.TestContext.Provider) - if result.Code == 0 && err == nil { - return true, nil - } - e2essh.LogResult(result) - if err != nil { - framework.Logf("Unexpected error: %v", err) - } - return false, nil - }) - if err != nil { - framework.Failf("Failed to remove the iptable REJECT rule. Manual intervention is "+ - "required on host %s: remove rule %s, if exists", from, iptablesRule) - } -} - // WaitForService waits until the service appears (exist == true), or disappears (exist == false) func WaitForService(ctx context.Context, c clientset.Interface, namespace, name string, exist bool, interval, timeout time.Duration) error { err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) { diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index b727fbe7322..04f6ecaccd6 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -56,11 +56,6 @@ import ( netutils "k8s.io/utils/net" ) -const ( - // TODO(justinsb): Avoid hardcoding this. - awsMasterIP = "172.20.0.9" -) - // DEPRECATED constants. Use the timeouts in framework.Framework instead. const ( // PodListTimeout is how long to wait for the pod to be listable. @@ -678,38 +673,6 @@ func GetNodeExternalIPs(node *v1.Node) (ips []string) { return } -// getControlPlaneAddresses returns the externalIP, internalIP and hostname fields of control plane nodes. -// If any of these is unavailable, empty slices are returned. -func getControlPlaneAddresses(ctx context.Context, c clientset.Interface) ([]string, []string, []string) { - var externalIPs, internalIPs, hostnames []string - - // Populate the internal IPs. - eps, err := c.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{}) - if err != nil { - Failf("Failed to get kubernetes endpoints: %v", err) - } - for _, subset := range eps.Subsets { - for _, address := range subset.Addresses { - if address.IP != "" { - internalIPs = append(internalIPs, address.IP) - } - } - } - - // Populate the external IP/hostname. - hostURL, err := url.Parse(TestContext.Host) - if err != nil { - Failf("Failed to parse hostname: %v", err) - } - if netutils.ParseIPSloppy(hostURL.Host) != nil { - externalIPs = append(externalIPs, hostURL.Host) - } else { - hostnames = append(hostnames, hostURL.Host) - } - - return externalIPs, internalIPs, hostnames -} - // GetControlPlaneNodes returns a list of control plane nodes func GetControlPlaneNodes(ctx context.Context, c clientset.Interface) *v1.NodeList { allNodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) @@ -737,30 +700,6 @@ func GetControlPlaneNodes(ctx context.Context, c clientset.Interface) *v1.NodeLi return &cpNodes } -// GetControlPlaneAddresses returns all IP addresses on which the kubelet can reach the control plane. -// It may return internal and external IPs, even if we expect for -// e.g. internal IPs to be used (issue #56787), so that we can be -// sure to block the control plane fully during tests. -func GetControlPlaneAddresses(ctx context.Context, c clientset.Interface) []string { - externalIPs, internalIPs, _ := getControlPlaneAddresses(ctx, c) - - ips := sets.NewString() - switch TestContext.Provider { - case "gce": - for _, ip := range externalIPs { - ips.Insert(ip) - } - for _, ip := range internalIPs { - ips.Insert(ip) - } - case "aws": - ips.Insert(awsMasterIP) - default: - Failf("This test is not supported for provider %s and should be disabled", TestContext.Provider) - } - return ips.List() -} - // PrettyPrintJSON converts metrics to JSON format. func PrettyPrintJSON(metrics interface{}) string { output := &bytes.Buffer{} From 969ecabc0f25dc9fbaccb0741104f52f2fc09f91 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 20 Nov 2024 14:48:16 -0500 Subject: [PATCH 2/2] Remove all references to v1.Endpoints from non-network e2e tests kube-proxy does not look at Endpoints ever, so it is incorrect for a test to assume that there is any correlation between whether Endpoints exist and whether a Service is working. Tests should only be using the v1.Endpoints API if they are explicitly testing the behavior of v1.Endpoints, the Endpoints controller, or the EndpointSlice mirroring controller. There is no reason for any non SIG Network tests to be testing any of those things, so there should be no references to v1.Endpoints in test/e2e outside of test/e2e/network. Also, simplify some pointlessly complicated e2eservice code. --- test/e2e/apimachinery/aggregator.go | 5 +- .../framework/endpoints/.import-restrictions | 12 -- test/e2e/framework/service/jig.go | 140 +++++------------- test/e2e/framework/util.go | 56 +------ test/e2e/kubectl/kubectl.go | 9 +- .../ports.go => network/endpoints.go} | 21 +-- test/e2e/network/service.go | 5 +- 7 files changed, 57 insertions(+), 191 deletions(-) delete mode 100644 test/e2e/framework/endpoints/.import-restrictions rename test/e2e/{framework/endpoints/ports.go => network/endpoints.go} (71%) diff --git a/test/e2e/apimachinery/aggregator.go b/test/e2e/apimachinery/aggregator.go index 596950ac178..a6448c106ab 100644 --- a/test/e2e/apimachinery/aggregator.go +++ b/test/e2e/apimachinery/aggregator.go @@ -28,6 +28,7 @@ import ( appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -763,9 +764,9 @@ func validateErrorWithDebugInfo(ctx context.Context, f *framework.Framework, err msg := fmt.Sprintf(msg, fields...) msg += fmt.Sprintf(" but received unexpected error:\n%v", err) client := f.ClientSet - ep, err := client.CoreV1().Endpoints(namespace).Get(ctx, "sample-api", metav1.GetOptions{}) + slices, err := client.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, "sample-api")}) if err == nil { - msg += fmt.Sprintf("\nFound endpoints for sample-api:\n%v", ep) + msg += fmt.Sprintf("\nFound endpoint slices for sample-api:\n%v", slices) } pds, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) if err == nil { diff --git a/test/e2e/framework/endpoints/.import-restrictions b/test/e2e/framework/endpoints/.import-restrictions deleted file mode 100644 index 03b5ee5ec2c..00000000000 --- a/test/e2e/framework/endpoints/.import-restrictions +++ /dev/null @@ -1,12 +0,0 @@ -# This E2E framework sub-package is currently allowed to use arbitrary -# dependencies except of k/k/pkg, therefore we need to override the -# restrictions from the parent .import-restrictions file. -# -# At some point it may become useful to also check this package's -# dependencies more careful. -rules: - - selectorRegexp: "^k8s[.]io/kubernetes/pkg" - allowedPrefixes: [] - - - selectorRegexp: "" - allowedPrefixes: [ "" ] diff --git a/test/e2e/framework/service/jig.go b/test/e2e/framework/service/jig.go index ef8ac4a1421..2685ead4f92 100644 --- a/test/e2e/framework/service/jig.go +++ b/test/e2e/framework/service/jig.go @@ -33,17 +33,13 @@ import ( policyv1 "k8s.io/api/policy/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -404,42 +400,47 @@ func (j *TestJig) GetEndpointNodeNames(ctx context.Context) (sets.String, error) if err != nil { return nil, err } - endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{}) + slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + j.Name}) if err != nil { - return nil, fmt.Errorf("get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err) - } - if len(endpoints.Subsets) == 0 { - return nil, fmt.Errorf("endpoint has no subsets, cannot determine node addresses") + return nil, fmt.Errorf("list endpointslices for service %s/%s failed (%w)", j.Namespace, j.Name, err) } epNodes := sets.NewString() - for _, ss := range endpoints.Subsets { - for _, e := range ss.Addresses { - if e.NodeName != nil { - epNodes.Insert(*e.NodeName) + for _, slice := range slices.Items { + for _, ep := range slice.Endpoints { + if ep.NodeName != nil { + epNodes.Insert(*ep.NodeName) } } } + if len(epNodes) == 0 { + return nil, fmt.Errorf("EndpointSlice has no endpoints, cannot determine node addresses") + } return epNodes, nil } -// WaitForEndpointOnNode waits for a service endpoint on the given node. +// WaitForEndpointOnNode waits for a service endpoint on the given node (which must be the service's only endpoint). func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) error { return wait.PollUntilContextTimeout(ctx, framework.Poll, KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) { - endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{}) + slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name}) if err != nil { - framework.Logf("Get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err) + framework.Logf("List endpointslices for service %s/%s failed (%s)", j.Namespace, j.Name, err) return false, nil } - if len(endpoints.Subsets) == 0 { - framework.Logf("Expect endpoints with subsets, got none.") + if len(slices.Items) == 0 { + framework.Logf("Expected 1 EndpointSlice for service %s/%s, got 0", j.Namespace, j.Name) return false, nil } - // TODO: Handle multiple endpoints - if len(endpoints.Subsets[0].Addresses) == 0 { + slice := slices.Items[0] + if len(slice.Endpoints) == 0 { + framework.Logf("Expected EndpointSlice with Endpoints, got none.") + return false, nil + } + endpoint := slice.Endpoints[0] + if len(endpoint.Addresses) == 0 || (endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready) { framework.Logf("Expected Ready endpoints - found none") return false, nil } - epHostName := *endpoints.Subsets[0].Addresses[0].NodeName + epHostName := *endpoint.NodeName framework.Logf("Pod for service %s/%s is on node %s", j.Namespace, j.Name, epHostName) if epHostName != nodeName { framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName) @@ -451,91 +452,18 @@ func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) er // waitForAvailableEndpoint waits for at least 1 endpoint to be available till timeout func (j *TestJig) waitForAvailableEndpoint(ctx context.Context, timeout time.Duration) error { - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - //Wait for endpoints to be created, this may take longer time if service backing pods are taking longer time to run - endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name) - endpointAvailable := false - endpointSliceAvailable := false - - var controller cache.Controller - _, controller = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.FieldSelector = endpointSelector.String() - obj, err := j.Client.CoreV1().Endpoints(j.Namespace).List(ctx, options) - return runtime.Object(obj), err - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.FieldSelector = endpointSelector.String() - return j.Client.CoreV1().Endpoints(j.Namespace).Watch(ctx, options) - }, - }, - &v1.Endpoints{}, - 0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - if e, ok := obj.(*v1.Endpoints); ok { - if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { - endpointAvailable = true - } - } - }, - UpdateFunc: func(old, cur interface{}) { - if e, ok := cur.(*v1.Endpoints); ok { - if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { - endpointAvailable = true - } - } - }, - }, - ) - - go controller.Run(ctx.Done()) - - var esController cache.Controller - _, esController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.LabelSelector = "kubernetes.io/service-name=" + j.Name - obj, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, options) - return runtime.Object(obj), err - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.LabelSelector = "kubernetes.io/service-name=" + j.Name - return j.Client.DiscoveryV1().EndpointSlices(j.Namespace).Watch(ctx, options) - }, - }, - &discoveryv1.EndpointSlice{}, - 0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - if es, ok := obj.(*discoveryv1.EndpointSlice); ok { - // TODO: currently we only consider addresses in 1 slice, but services with - // a large number of endpoints (>1000) may have multiple slices. Some slices - // with only a few addresses. We should check the addresses in all slices. - if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { - endpointSliceAvailable = true - } - } - }, - UpdateFunc: func(old, cur interface{}) { - if es, ok := cur.(*discoveryv1.EndpointSlice); ok { - // TODO: currently we only consider addresses in 1 slice, but services with - // a large number of endpoints (>1000) may have multiple slices. Some slices - // with only a few addresses. We should check the addresses in all slices. - if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { - endpointSliceAvailable = true - } - } - }, - }, - ) - - go esController.Run(ctx.Done()) - - err := wait.PollUntilContextCancel(ctx, 1*time.Second, false, func(ctx context.Context) (bool, error) { - return endpointAvailable && endpointSliceAvailable, nil + err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) { + slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name}) + if err != nil || len(slices.Items) == 0 { + // Retry + return false, nil + } + for _, es := range slices.Items { + if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { + return true, nil + } + } + return false, nil }) if err != nil { return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout) diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 04f6ecaccd6..7eed8e36751 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -53,7 +53,6 @@ import ( "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" watchtools "k8s.io/client-go/tools/watch" - netutils "k8s.io/utils/net" ) // DEPRECATED constants. Use the timeouts in framework.Framework instead. @@ -411,29 +410,12 @@ func CheckTestingNSDeletedExcept(ctx context.Context, c clientset.Interface, ski return fmt.Errorf("Waiting for terminating namespaces to be deleted timed out") } -// WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum. -// Some components use EndpointSlices other Endpoints, we must verify that both objects meet the requirements. +// WaitForServiceEndpointsNum waits until there are EndpointSlices for serviceName +// containing a total of expectNum endpoints. (If the service is dual-stack, expectNum +// must count the endpoints of both IP families.) func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error { return wait.PollUntilContextTimeout(ctx, interval, timeout, false, func(ctx context.Context) (bool, error) { Logf("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum) - endpoint, err := c.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{}) - if err != nil { - Logf("Unexpected error trying to get Endpoints for %s : %v", serviceName, err) - return false, nil - } - - if countEndpointsNum(endpoint) != expectNum { - Logf("Unexpected number of Endpoints, got %d, expected %d", countEndpointsNum(endpoint), expectNum) - return false, nil - } - - // Endpoints are single family but EndpointSlices can have dual stack addresses, - // so we verify the number of addresses that matches the same family on both. - addressType := discoveryv1.AddressTypeIPv4 - if isIPv6Endpoint(endpoint) { - addressType = discoveryv1.AddressTypeIPv6 - } - esList, err := c.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)}) if err != nil { Logf("Unexpected error trying to get EndpointSlices for %s : %v", serviceName, err) @@ -445,44 +427,18 @@ func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, name return false, nil } - if countEndpointsSlicesNum(esList, addressType) != expectNum { - Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList, addressType), expectNum) + if countEndpointsSlicesNum(esList) != expectNum { + Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList), expectNum) return false, nil } return true, nil }) } -func countEndpointsNum(e *v1.Endpoints) int { - num := 0 - for _, sub := range e.Subsets { - num += len(sub.Addresses) - } - return num -} - -// isIPv6Endpoint returns true if the Endpoint uses IPv6 addresses -func isIPv6Endpoint(e *v1.Endpoints) bool { - for _, sub := range e.Subsets { - for _, addr := range sub.Addresses { - if len(addr.IP) == 0 { - continue - } - // Endpoints are single family, so it is enough to check only one address - return netutils.IsIPv6String(addr.IP) - } - } - // default to IPv4 an Endpoint without IP addresses - return false -} - -func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList, addressType discoveryv1.AddressType) int { +func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList) int { // EndpointSlices can contain the same address on multiple Slices addresses := sets.Set[string]{} for _, epSlice := range epList.Items { - if epSlice.AddressType != addressType { - continue - } for _, ep := range epSlice.Endpoints { if len(ep.Addresses) > 0 { addresses.Insert(ep.Addresses[0]) diff --git a/test/e2e/kubectl/kubectl.go b/test/e2e/kubectl/kubectl.go index 25c90a67444..3994771a021 100644 --- a/test/e2e/kubectl/kubectl.go +++ b/test/e2e/kubectl/kubectl.go @@ -43,6 +43,7 @@ import ( "sigs.k8s.io/yaml" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -66,7 +67,7 @@ import ( "k8s.io/kubernetes/test/e2e/framework" e2eauth "k8s.io/kubernetes/test/e2e/framework/auth" e2edebug "k8s.io/kubernetes/test/e2e/framework/debug" - e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints" + e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice" e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl" e2enetwork "k8s.io/kubernetes/test/e2e/framework/network" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -1538,10 +1539,10 @@ metadata: }) validateService := func(name string, servicePort int, timeout time.Duration) { err := wait.Poll(framework.Poll, timeout, func() (bool, error) { - ep, err := c.CoreV1().Endpoints(ns).Get(ctx, name, metav1.GetOptions{}) + slices, err := c.DiscoveryV1().EndpointSlices(ns).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, name)}) if err != nil { // log the real error - framework.Logf("Get endpoints failed (interval %v): %v", framework.Poll, err) + framework.Logf("List endpointslices failed (interval %v): %v", framework.Poll, err) // if the error is API not found or could not find default credentials or TLS handshake timeout, try again if apierrors.IsNotFound(err) || @@ -1552,7 +1553,7 @@ metadata: return false, err } - uidToPort := e2eendpoints.GetContainerPortsByPodUID(ep) + uidToPort := e2eendpointslice.GetContainerPortsByPodUID(slices.Items) if len(uidToPort) == 0 { framework.Logf("No endpoint found, retrying") return false, nil diff --git a/test/e2e/framework/endpoints/ports.go b/test/e2e/network/endpoints.go similarity index 71% rename from test/e2e/framework/endpoints/ports.go rename to test/e2e/network/endpoints.go index 8c116b031a5..29eb72afea1 100644 --- a/test/e2e/framework/endpoints/ports.go +++ b/test/e2e/network/endpoints.go @@ -14,22 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package endpoints +package network import ( v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" ) -// PortsByPodUID is a map that maps pod UID to container ports. -type PortsByPodUID map[types.UID][]int - -// FullPortsByPodUID is a map that maps pod UID to container ports. -type FullPortsByPodUID map[types.UID][]v1.ContainerPort - -// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints. -func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID { - m := PortsByPodUID{} +// getContainerPortsByPodUID returns a portsByPodUID map on the given endpoints. +func getContainerPortsByPodUID(ep *v1.Endpoints) portsByPodUID { + m := portsByPodUID{} for _, ss := range ep.Subsets { for _, port := range ss.Ports { for _, addr := range ss.Addresses { @@ -44,9 +37,9 @@ func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID { return m } -// GetFullContainerPortsByPodUID returns a FullPortsByPodUID map on the given endpoints with all the port data. -func GetFullContainerPortsByPodUID(ep *v1.Endpoints) FullPortsByPodUID { - m := FullPortsByPodUID{} +// getFullContainerPortsByPodUID returns a fullPortsByPodUID map on the given endpoints with all the port data. +func getFullContainerPortsByPodUID(ep *v1.Endpoints) fullPortsByPodUID { + m := fullPortsByPodUID{} for _, ss := range ep.Subsets { for _, port := range ss.Ports { containerPort := v1.ContainerPort{ diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 8935b0e3b68..37f401b4b0a 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -59,7 +59,6 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/test/e2e/framework" e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment" - e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints" e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice" e2enetwork "k8s.io/kubernetes/test/e2e/framework/network" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -4591,7 +4590,7 @@ func validateEndpointsPortsOrFail(ctx context.Context, c clientset.Interface, na // Retry the error return false, nil } - portsByUID := portsByPodUID(e2eendpoints.GetContainerPortsByPodUID(ep)) + portsByUID := getContainerPortsByPodUID(ep) if err := validatePorts(portsByUID, expectedPortsByPodUID); err != nil { if i%5 == 0 { framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints) @@ -4675,7 +4674,7 @@ func validateEndpointsPortsWithProtocolsOrFail(c clientset.Interface, namespace, // Retry the error return false, nil } - portsByUID := fullPortsByPodUID(e2eendpoints.GetFullContainerPortsByPodUID(ep)) + portsByUID := getFullContainerPortsByPodUID(ep) if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil { if i%5 == 0 { framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)