From 969ecabc0f25dc9fbaccb0741104f52f2fc09f91 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 20 Nov 2024 14:48:16 -0500 Subject: [PATCH] Remove all references to v1.Endpoints from non-network e2e tests kube-proxy does not look at Endpoints ever, so it is incorrect for a test to assume that there is any correlation between whether Endpoints exist and whether a Service is working. Tests should only be using the v1.Endpoints API if they are explicitly testing the behavior of v1.Endpoints, the Endpoints controller, or the EndpointSlice mirroring controller. There is no reason for any non SIG Network tests to be testing any of those things, so there should be no references to v1.Endpoints in test/e2e outside of test/e2e/network. Also, simplify some pointlessly complicated e2eservice code. --- test/e2e/apimachinery/aggregator.go | 5 +- .../framework/endpoints/.import-restrictions | 12 -- test/e2e/framework/service/jig.go | 140 +++++------------- test/e2e/framework/util.go | 56 +------ test/e2e/kubectl/kubectl.go | 9 +- .../ports.go => network/endpoints.go} | 21 +-- test/e2e/network/service.go | 5 +- 7 files changed, 57 insertions(+), 191 deletions(-) delete mode 100644 test/e2e/framework/endpoints/.import-restrictions rename test/e2e/{framework/endpoints/ports.go => network/endpoints.go} (71%) diff --git a/test/e2e/apimachinery/aggregator.go b/test/e2e/apimachinery/aggregator.go index 596950ac178..a6448c106ab 100644 --- a/test/e2e/apimachinery/aggregator.go +++ b/test/e2e/apimachinery/aggregator.go @@ -28,6 +28,7 @@ import ( appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -763,9 +764,9 @@ func validateErrorWithDebugInfo(ctx context.Context, f *framework.Framework, err msg := fmt.Sprintf(msg, fields...) msg += fmt.Sprintf(" but received unexpected error:\n%v", err) client := f.ClientSet - ep, err := client.CoreV1().Endpoints(namespace).Get(ctx, "sample-api", metav1.GetOptions{}) + slices, err := client.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, "sample-api")}) if err == nil { - msg += fmt.Sprintf("\nFound endpoints for sample-api:\n%v", ep) + msg += fmt.Sprintf("\nFound endpoint slices for sample-api:\n%v", slices) } pds, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) if err == nil { diff --git a/test/e2e/framework/endpoints/.import-restrictions b/test/e2e/framework/endpoints/.import-restrictions deleted file mode 100644 index 03b5ee5ec2c..00000000000 --- a/test/e2e/framework/endpoints/.import-restrictions +++ /dev/null @@ -1,12 +0,0 @@ -# This E2E framework sub-package is currently allowed to use arbitrary -# dependencies except of k/k/pkg, therefore we need to override the -# restrictions from the parent .import-restrictions file. -# -# At some point it may become useful to also check this package's -# dependencies more careful. 
-rules: - - selectorRegexp: "^k8s[.]io/kubernetes/pkg" - allowedPrefixes: [] - - - selectorRegexp: "" - allowedPrefixes: [ "" ] diff --git a/test/e2e/framework/service/jig.go b/test/e2e/framework/service/jig.go index ef8ac4a1421..2685ead4f92 100644 --- a/test/e2e/framework/service/jig.go +++ b/test/e2e/framework/service/jig.go @@ -33,17 +33,13 @@ import ( policyv1 "k8s.io/api/policy/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -404,42 +400,47 @@ func (j *TestJig) GetEndpointNodeNames(ctx context.Context) (sets.String, error) if err != nil { return nil, err } - endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{}) + slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + j.Name}) if err != nil { - return nil, fmt.Errorf("get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err) - } - if len(endpoints.Subsets) == 0 { - return nil, fmt.Errorf("endpoint has no subsets, cannot determine node addresses") + return nil, fmt.Errorf("list endpointslices for service %s/%s failed (%w)", j.Namespace, j.Name, err) } epNodes := sets.NewString() - for _, ss := range endpoints.Subsets { - for _, e := range ss.Addresses { - if e.NodeName != nil { - epNodes.Insert(*e.NodeName) + for _, slice := range slices.Items { + for _, ep := range slice.Endpoints { + if ep.NodeName != nil { + epNodes.Insert(*ep.NodeName) } } } + if len(epNodes) == 0 { + return nil, fmt.Errorf("EndpointSlice has no endpoints, cannot determine node addresses") + } return epNodes, nil } -// WaitForEndpointOnNode waits for a service endpoint on the given node. +// WaitForEndpointOnNode waits for a service endpoint on the given node (which must be the service's only endpoint). 
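+// It polls the EndpointSlices labeled with the service's name until the first endpoint of the
+// first slice has an address, is not reporting Ready=false, and is on the given node; only that
+// first slice and endpoint are inspected, so the service is assumed to have a single backing pod.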
func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) error { return wait.PollUntilContextTimeout(ctx, framework.Poll, KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) { - endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{}) + slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name}) if err != nil { - framework.Logf("Get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err) + framework.Logf("List endpointslices for service %s/%s failed (%s)", j.Namespace, j.Name, err) return false, nil } - if len(endpoints.Subsets) == 0 { - framework.Logf("Expect endpoints with subsets, got none.") + if len(slices.Items) == 0 { + framework.Logf("Expected 1 EndpointSlice for service %s/%s, got 0", j.Namespace, j.Name) return false, nil } - // TODO: Handle multiple endpoints - if len(endpoints.Subsets[0].Addresses) == 0 { + slice := slices.Items[0] + if len(slice.Endpoints) == 0 { + framework.Logf("Expected EndpointSlice with Endpoints, got none.") + return false, nil + } + endpoint := slice.Endpoints[0] + if len(endpoint.Addresses) == 0 || (endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready) { framework.Logf("Expected Ready endpoints - found none") return false, nil } - epHostName := *endpoints.Subsets[0].Addresses[0].NodeName + epHostName := *endpoint.NodeName framework.Logf("Pod for service %s/%s is on node %s", j.Namespace, j.Name, epHostName) if epHostName != nodeName { framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName) @@ -451,91 +452,18 @@ func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) er // waitForAvailableEndpoint waits for at least 1 endpoint to be available till timeout func (j *TestJig) waitForAvailableEndpoint(ctx context.Context, timeout time.Duration) error { - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - //Wait for endpoints to be created, this may take longer time if service backing pods are taking longer time to run - endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name) - endpointAvailable := false - endpointSliceAvailable := false - - var controller cache.Controller - _, controller = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.FieldSelector = endpointSelector.String() - obj, err := j.Client.CoreV1().Endpoints(j.Namespace).List(ctx, options) - return runtime.Object(obj), err - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.FieldSelector = endpointSelector.String() - return j.Client.CoreV1().Endpoints(j.Namespace).Watch(ctx, options) - }, - }, - &v1.Endpoints{}, - 0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - if e, ok := obj.(*v1.Endpoints); ok { - if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { - endpointAvailable = true - } - } - }, - UpdateFunc: func(old, cur interface{}) { - if e, ok := cur.(*v1.Endpoints); ok { - if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { - endpointAvailable = true - } - } - }, - }, - ) - - go controller.Run(ctx.Done()) - - var esController cache.Controller - _, esController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.LabelSelector = "kubernetes.io/service-name=" + j.Name - obj, err := 
j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, options) - return runtime.Object(obj), err - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.LabelSelector = "kubernetes.io/service-name=" + j.Name - return j.Client.DiscoveryV1().EndpointSlices(j.Namespace).Watch(ctx, options) - }, - }, - &discoveryv1.EndpointSlice{}, - 0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - if es, ok := obj.(*discoveryv1.EndpointSlice); ok { - // TODO: currently we only consider addresses in 1 slice, but services with - // a large number of endpoints (>1000) may have multiple slices. Some slices - // with only a few addresses. We should check the addresses in all slices. - if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { - endpointSliceAvailable = true - } - } - }, - UpdateFunc: func(old, cur interface{}) { - if es, ok := cur.(*discoveryv1.EndpointSlice); ok { - // TODO: currently we only consider addresses in 1 slice, but services with - // a large number of endpoints (>1000) may have multiple slices. Some slices - // with only a few addresses. We should check the addresses in all slices. - if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { - endpointSliceAvailable = true - } - } - }, - }, - ) - - go esController.Run(ctx.Done()) - - err := wait.PollUntilContextCancel(ctx, 1*time.Second, false, func(ctx context.Context) (bool, error) { - return endpointAvailable && endpointSliceAvailable, nil + err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) { + slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name}) + if err != nil || len(slices.Items) == 0 { + // Retry + return false, nil + } + for _, es := range slices.Items { + if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { + return true, nil + } + } + return false, nil }) if err != nil { return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout) diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 04f6ecaccd6..7eed8e36751 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -53,7 +53,6 @@ import ( "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" watchtools "k8s.io/client-go/tools/watch" - netutils "k8s.io/utils/net" ) // DEPRECATED constants. Use the timeouts in framework.Framework instead. @@ -411,29 +410,12 @@ func CheckTestingNSDeletedExcept(ctx context.Context, c clientset.Interface, ski return fmt.Errorf("Waiting for terminating namespaces to be deleted timed out") } -// WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum. -// Some components use EndpointSlices other Endpoints, we must verify that both objects meet the requirements. +// WaitForServiceEndpointsNum waits until there are EndpointSlices for serviceName +// containing a total of expectNum endpoints. (If the service is dual-stack, expectNum +// must count the endpoints of both IP families.) 
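+// Endpoints that appear in more than one slice are counted only once, and only the first
+// address of each endpoint is considered.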
func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error { return wait.PollUntilContextTimeout(ctx, interval, timeout, false, func(ctx context.Context) (bool, error) { Logf("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum) - endpoint, err := c.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{}) - if err != nil { - Logf("Unexpected error trying to get Endpoints for %s : %v", serviceName, err) - return false, nil - } - - if countEndpointsNum(endpoint) != expectNum { - Logf("Unexpected number of Endpoints, got %d, expected %d", countEndpointsNum(endpoint), expectNum) - return false, nil - } - - // Endpoints are single family but EndpointSlices can have dual stack addresses, - // so we verify the number of addresses that matches the same family on both. - addressType := discoveryv1.AddressTypeIPv4 - if isIPv6Endpoint(endpoint) { - addressType = discoveryv1.AddressTypeIPv6 - } - esList, err := c.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)}) if err != nil { Logf("Unexpected error trying to get EndpointSlices for %s : %v", serviceName, err) @@ -445,44 +427,18 @@ func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, name return false, nil } - if countEndpointsSlicesNum(esList, addressType) != expectNum { - Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList, addressType), expectNum) + if countEndpointsSlicesNum(esList) != expectNum { + Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList), expectNum) return false, nil } return true, nil }) } -func countEndpointsNum(e *v1.Endpoints) int { - num := 0 - for _, sub := range e.Subsets { - num += len(sub.Addresses) - } - return num -} - -// isIPv6Endpoint returns true if the Endpoint uses IPv6 addresses -func isIPv6Endpoint(e *v1.Endpoints) bool { - for _, sub := range e.Subsets { - for _, addr := range sub.Addresses { - if len(addr.IP) == 0 { - continue - } - // Endpoints are single family, so it is enough to check only one address - return netutils.IsIPv6String(addr.IP) - } - } - // default to IPv4 an Endpoint without IP addresses - return false -} - -func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList, addressType discoveryv1.AddressType) int { +func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList) int { // EndpointSlices can contain the same address on multiple Slices addresses := sets.Set[string]{} for _, epSlice := range epList.Items { - if epSlice.AddressType != addressType { - continue - } for _, ep := range epSlice.Endpoints { if len(ep.Addresses) > 0 { addresses.Insert(ep.Addresses[0]) diff --git a/test/e2e/kubectl/kubectl.go b/test/e2e/kubectl/kubectl.go index 25c90a67444..3994771a021 100644 --- a/test/e2e/kubectl/kubectl.go +++ b/test/e2e/kubectl/kubectl.go @@ -43,6 +43,7 @@ import ( "sigs.k8s.io/yaml" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -66,7 +67,7 @@ import ( "k8s.io/kubernetes/test/e2e/framework" e2eauth "k8s.io/kubernetes/test/e2e/framework/auth" e2edebug "k8s.io/kubernetes/test/e2e/framework/debug" - e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints" 
+ e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice" e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl" e2enetwork "k8s.io/kubernetes/test/e2e/framework/network" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -1538,10 +1539,10 @@ metadata: }) validateService := func(name string, servicePort int, timeout time.Duration) { err := wait.Poll(framework.Poll, timeout, func() (bool, error) { - ep, err := c.CoreV1().Endpoints(ns).Get(ctx, name, metav1.GetOptions{}) + slices, err := c.DiscoveryV1().EndpointSlices(ns).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, name)}) if err != nil { // log the real error - framework.Logf("Get endpoints failed (interval %v): %v", framework.Poll, err) + framework.Logf("List endpointslices failed (interval %v): %v", framework.Poll, err) // if the error is API not found or could not find default credentials or TLS handshake timeout, try again if apierrors.IsNotFound(err) || @@ -1552,7 +1553,7 @@ metadata: return false, err } - uidToPort := e2eendpoints.GetContainerPortsByPodUID(ep) + uidToPort := e2eendpointslice.GetContainerPortsByPodUID(slices.Items) if len(uidToPort) == 0 { framework.Logf("No endpoint found, retrying") return false, nil diff --git a/test/e2e/framework/endpoints/ports.go b/test/e2e/network/endpoints.go similarity index 71% rename from test/e2e/framework/endpoints/ports.go rename to test/e2e/network/endpoints.go index 8c116b031a5..29eb72afea1 100644 --- a/test/e2e/framework/endpoints/ports.go +++ b/test/e2e/network/endpoints.go @@ -14,22 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package endpoints +package network import ( v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" ) -// PortsByPodUID is a map that maps pod UID to container ports. -type PortsByPodUID map[types.UID][]int - -// FullPortsByPodUID is a map that maps pod UID to container ports. -type FullPortsByPodUID map[types.UID][]v1.ContainerPort - -// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints. -func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID { - m := PortsByPodUID{} +// getContainerPortsByPodUID returns a portsByPodUID map on the given endpoints. +func getContainerPortsByPodUID(ep *v1.Endpoints) portsByPodUID { + m := portsByPodUID{} for _, ss := range ep.Subsets { for _, port := range ss.Ports { for _, addr := range ss.Addresses { @@ -44,9 +37,9 @@ func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID { return m } -// GetFullContainerPortsByPodUID returns a FullPortsByPodUID map on the given endpoints with all the port data. -func GetFullContainerPortsByPodUID(ep *v1.Endpoints) FullPortsByPodUID { - m := FullPortsByPodUID{} +// getFullContainerPortsByPodUID returns a fullPortsByPodUID map on the given endpoints with all the port data. 
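+// Unlike getContainerPortsByPodUID, which records only the port numbers, this keeps the full
+// v1.ContainerPort for each backing pod, so callers can also validate port names and protocols.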
+func getFullContainerPortsByPodUID(ep *v1.Endpoints) fullPortsByPodUID { + m := fullPortsByPodUID{} for _, ss := range ep.Subsets { for _, port := range ss.Ports { containerPort := v1.ContainerPort{ diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 8935b0e3b68..37f401b4b0a 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -59,7 +59,6 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/test/e2e/framework" e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment" - e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints" e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice" e2enetwork "k8s.io/kubernetes/test/e2e/framework/network" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -4591,7 +4590,7 @@ func validateEndpointsPortsOrFail(ctx context.Context, c clientset.Interface, na // Retry the error return false, nil } - portsByUID := portsByPodUID(e2eendpoints.GetContainerPortsByPodUID(ep)) + portsByUID := getContainerPortsByPodUID(ep) if err := validatePorts(portsByUID, expectedPortsByPodUID); err != nil { if i%5 == 0 { framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints) @@ -4675,7 +4674,7 @@ func validateEndpointsPortsWithProtocolsOrFail(c clientset.Interface, namespace, // Retry the error return false, nil } - portsByUID := fullPortsByPodUID(e2eendpoints.GetFullContainerPortsByPodUID(ep)) + portsByUID := getFullContainerPortsByPodUID(ep) if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil { if i%5 == 0 { framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)