diff --git a/test/e2e/apimachinery/aggregator.go b/test/e2e/apimachinery/aggregator.go index 596950ac178..a6448c106ab 100644 --- a/test/e2e/apimachinery/aggregator.go +++ b/test/e2e/apimachinery/aggregator.go @@ -28,6 +28,7 @@ import ( appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -763,9 +764,9 @@ func validateErrorWithDebugInfo(ctx context.Context, f *framework.Framework, err msg := fmt.Sprintf(msg, fields...) msg += fmt.Sprintf(" but received unexpected error:\n%v", err) client := f.ClientSet - ep, err := client.CoreV1().Endpoints(namespace).Get(ctx, "sample-api", metav1.GetOptions{}) + slices, err := client.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, "sample-api")}) if err == nil { - msg += fmt.Sprintf("\nFound endpoints for sample-api:\n%v", ep) + msg += fmt.Sprintf("\nFound endpoint slices for sample-api:\n%v", slices) } pds, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) if err == nil { diff --git a/test/e2e/framework/endpoints/.import-restrictions b/test/e2e/framework/endpoints/.import-restrictions deleted file mode 100644 index 03b5ee5ec2c..00000000000 --- a/test/e2e/framework/endpoints/.import-restrictions +++ /dev/null @@ -1,12 +0,0 @@ -# This E2E framework sub-package is currently allowed to use arbitrary -# dependencies except of k/k/pkg, therefore we need to override the -# restrictions from the parent .import-restrictions file. -# -# At some point it may become useful to also check this package's -# dependencies more careful. -rules: - - selectorRegexp: "^k8s[.]io/kubernetes/pkg" - allowedPrefixes: [] - - - selectorRegexp: "" - allowedPrefixes: [ "" ] diff --git a/test/e2e/framework/service/jig.go b/test/e2e/framework/service/jig.go index ef8ac4a1421..2685ead4f92 100644 --- a/test/e2e/framework/service/jig.go +++ b/test/e2e/framework/service/jig.go @@ -33,17 +33,13 @@ import ( policyv1 "k8s.io/api/policy/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -404,42 +400,47 @@ func (j *TestJig) GetEndpointNodeNames(ctx context.Context) (sets.String, error) if err != nil { return nil, err } - endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{}) + slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + j.Name}) if err != nil { - return nil, fmt.Errorf("get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err) - } - if len(endpoints.Subsets) == 0 { - return nil, fmt.Errorf("endpoint has no subsets, cannot determine node addresses") + return nil, fmt.Errorf("list endpointslices for service %s/%s failed (%w)", j.Namespace, j.Name, err) } epNodes := sets.NewString() - for _, ss := range endpoints.Subsets { - for _, e := range ss.Addresses { - if e.NodeName != nil { - epNodes.Insert(*e.NodeName) + for _, slice := range slices.Items { + for _, ep := range slice.Endpoints { + if ep.NodeName != nil { + epNodes.Insert(*ep.NodeName) } } } + if len(epNodes) == 0 { + return nil, fmt.Errorf("EndpointSlice has no endpoints, cannot determine node addresses") + } return epNodes, nil } -// WaitForEndpointOnNode waits for a service endpoint on the given node. +// WaitForEndpointOnNode waits for a service endpoint on the given node (which must be the service's only endpoint). func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) error { return wait.PollUntilContextTimeout(ctx, framework.Poll, KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) { - endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{}) + slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name}) if err != nil { - framework.Logf("Get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err) + framework.Logf("List endpointslices for service %s/%s failed (%s)", j.Namespace, j.Name, err) return false, nil } - if len(endpoints.Subsets) == 0 { - framework.Logf("Expect endpoints with subsets, got none.") + if len(slices.Items) == 0 { + framework.Logf("Expected 1 EndpointSlice for service %s/%s, got 0", j.Namespace, j.Name) return false, nil } - // TODO: Handle multiple endpoints - if len(endpoints.Subsets[0].Addresses) == 0 { + slice := slices.Items[0] + if len(slice.Endpoints) == 0 { + framework.Logf("Expected EndpointSlice with Endpoints, got none.") + return false, nil + } + endpoint := slice.Endpoints[0] + if len(endpoint.Addresses) == 0 || (endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready) { framework.Logf("Expected Ready endpoints - found none") return false, nil } - epHostName := *endpoints.Subsets[0].Addresses[0].NodeName + epHostName := *endpoint.NodeName framework.Logf("Pod for service %s/%s is on node %s", j.Namespace, j.Name, epHostName) if epHostName != nodeName { framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName) @@ -451,91 +452,18 @@ func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) er // waitForAvailableEndpoint waits for at least 1 endpoint to be available till timeout func (j *TestJig) waitForAvailableEndpoint(ctx context.Context, timeout time.Duration) error { - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - //Wait for endpoints to be created, this may take longer time if service backing pods are taking longer time to run - endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name) - endpointAvailable := false - endpointSliceAvailable := false - - var controller cache.Controller - _, controller = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.FieldSelector = endpointSelector.String() - obj, err := j.Client.CoreV1().Endpoints(j.Namespace).List(ctx, options) - return runtime.Object(obj), err - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.FieldSelector = endpointSelector.String() - return j.Client.CoreV1().Endpoints(j.Namespace).Watch(ctx, options) - }, - }, - &v1.Endpoints{}, - 0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - if e, ok := obj.(*v1.Endpoints); ok { - if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { - endpointAvailable = true - } - } - }, - UpdateFunc: func(old, cur interface{}) { - if e, ok := cur.(*v1.Endpoints); ok { - if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { - endpointAvailable = true - } - } - }, - }, - ) - - go controller.Run(ctx.Done()) - - var esController cache.Controller - _, esController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.LabelSelector = "kubernetes.io/service-name=" + j.Name - obj, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, options) - return runtime.Object(obj), err - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.LabelSelector = "kubernetes.io/service-name=" + j.Name - return j.Client.DiscoveryV1().EndpointSlices(j.Namespace).Watch(ctx, options) - }, - }, - &discoveryv1.EndpointSlice{}, - 0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - if es, ok := obj.(*discoveryv1.EndpointSlice); ok { - // TODO: currently we only consider addresses in 1 slice, but services with - // a large number of endpoints (>1000) may have multiple slices. Some slices - // with only a few addresses. We should check the addresses in all slices. - if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { - endpointSliceAvailable = true - } - } - }, - UpdateFunc: func(old, cur interface{}) { - if es, ok := cur.(*discoveryv1.EndpointSlice); ok { - // TODO: currently we only consider addresses in 1 slice, but services with - // a large number of endpoints (>1000) may have multiple slices. Some slices - // with only a few addresses. We should check the addresses in all slices. - if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { - endpointSliceAvailable = true - } - } - }, - }, - ) - - go esController.Run(ctx.Done()) - - err := wait.PollUntilContextCancel(ctx, 1*time.Second, false, func(ctx context.Context) (bool, error) { - return endpointAvailable && endpointSliceAvailable, nil + err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) { + slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name}) + if err != nil || len(slices.Items) == 0 { + // Retry + return false, nil + } + for _, es := range slices.Items { + if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { + return true, nil + } + } + return false, nil }) if err != nil { return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout) diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 04f6ecaccd6..7eed8e36751 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -53,7 +53,6 @@ import ( "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" watchtools "k8s.io/client-go/tools/watch" - netutils "k8s.io/utils/net" ) // DEPRECATED constants. Use the timeouts in framework.Framework instead. @@ -411,29 +410,12 @@ func CheckTestingNSDeletedExcept(ctx context.Context, c clientset.Interface, ski return fmt.Errorf("Waiting for terminating namespaces to be deleted timed out") } -// WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum. -// Some components use EndpointSlices other Endpoints, we must verify that both objects meet the requirements. +// WaitForServiceEndpointsNum waits until there are EndpointSlices for serviceName +// containing a total of expectNum endpoints. (If the service is dual-stack, expectNum +// must count the endpoints of both IP families.) func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error { return wait.PollUntilContextTimeout(ctx, interval, timeout, false, func(ctx context.Context) (bool, error) { Logf("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum) - endpoint, err := c.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{}) - if err != nil { - Logf("Unexpected error trying to get Endpoints for %s : %v", serviceName, err) - return false, nil - } - - if countEndpointsNum(endpoint) != expectNum { - Logf("Unexpected number of Endpoints, got %d, expected %d", countEndpointsNum(endpoint), expectNum) - return false, nil - } - - // Endpoints are single family but EndpointSlices can have dual stack addresses, - // so we verify the number of addresses that matches the same family on both. - addressType := discoveryv1.AddressTypeIPv4 - if isIPv6Endpoint(endpoint) { - addressType = discoveryv1.AddressTypeIPv6 - } - esList, err := c.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)}) if err != nil { Logf("Unexpected error trying to get EndpointSlices for %s : %v", serviceName, err) @@ -445,44 +427,18 @@ func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, name return false, nil } - if countEndpointsSlicesNum(esList, addressType) != expectNum { - Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList, addressType), expectNum) + if countEndpointsSlicesNum(esList) != expectNum { + Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList), expectNum) return false, nil } return true, nil }) } -func countEndpointsNum(e *v1.Endpoints) int { - num := 0 - for _, sub := range e.Subsets { - num += len(sub.Addresses) - } - return num -} - -// isIPv6Endpoint returns true if the Endpoint uses IPv6 addresses -func isIPv6Endpoint(e *v1.Endpoints) bool { - for _, sub := range e.Subsets { - for _, addr := range sub.Addresses { - if len(addr.IP) == 0 { - continue - } - // Endpoints are single family, so it is enough to check only one address - return netutils.IsIPv6String(addr.IP) - } - } - // default to IPv4 an Endpoint without IP addresses - return false -} - -func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList, addressType discoveryv1.AddressType) int { +func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList) int { // EndpointSlices can contain the same address on multiple Slices addresses := sets.Set[string]{} for _, epSlice := range epList.Items { - if epSlice.AddressType != addressType { - continue - } for _, ep := range epSlice.Endpoints { if len(ep.Addresses) > 0 { addresses.Insert(ep.Addresses[0]) diff --git a/test/e2e/kubectl/kubectl.go b/test/e2e/kubectl/kubectl.go index 25c90a67444..3994771a021 100644 --- a/test/e2e/kubectl/kubectl.go +++ b/test/e2e/kubectl/kubectl.go @@ -43,6 +43,7 @@ import ( "sigs.k8s.io/yaml" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -66,7 +67,7 @@ import ( "k8s.io/kubernetes/test/e2e/framework" e2eauth "k8s.io/kubernetes/test/e2e/framework/auth" e2edebug "k8s.io/kubernetes/test/e2e/framework/debug" - e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints" + e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice" e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl" e2enetwork "k8s.io/kubernetes/test/e2e/framework/network" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -1538,10 +1539,10 @@ metadata: }) validateService := func(name string, servicePort int, timeout time.Duration) { err := wait.Poll(framework.Poll, timeout, func() (bool, error) { - ep, err := c.CoreV1().Endpoints(ns).Get(ctx, name, metav1.GetOptions{}) + slices, err := c.DiscoveryV1().EndpointSlices(ns).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, name)}) if err != nil { // log the real error - framework.Logf("Get endpoints failed (interval %v): %v", framework.Poll, err) + framework.Logf("List endpointslices failed (interval %v): %v", framework.Poll, err) // if the error is API not found or could not find default credentials or TLS handshake timeout, try again if apierrors.IsNotFound(err) || @@ -1552,7 +1553,7 @@ metadata: return false, err } - uidToPort := e2eendpoints.GetContainerPortsByPodUID(ep) + uidToPort := e2eendpointslice.GetContainerPortsByPodUID(slices.Items) if len(uidToPort) == 0 { framework.Logf("No endpoint found, retrying") return false, nil diff --git a/test/e2e/framework/endpoints/ports.go b/test/e2e/network/endpoints.go similarity index 71% rename from test/e2e/framework/endpoints/ports.go rename to test/e2e/network/endpoints.go index 8c116b031a5..29eb72afea1 100644 --- a/test/e2e/framework/endpoints/ports.go +++ b/test/e2e/network/endpoints.go @@ -14,22 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package endpoints +package network import ( v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" ) -// PortsByPodUID is a map that maps pod UID to container ports. -type PortsByPodUID map[types.UID][]int - -// FullPortsByPodUID is a map that maps pod UID to container ports. -type FullPortsByPodUID map[types.UID][]v1.ContainerPort - -// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints. -func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID { - m := PortsByPodUID{} +// getContainerPortsByPodUID returns a portsByPodUID map on the given endpoints. +func getContainerPortsByPodUID(ep *v1.Endpoints) portsByPodUID { + m := portsByPodUID{} for _, ss := range ep.Subsets { for _, port := range ss.Ports { for _, addr := range ss.Addresses { @@ -44,9 +37,9 @@ func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID { return m } -// GetFullContainerPortsByPodUID returns a FullPortsByPodUID map on the given endpoints with all the port data. -func GetFullContainerPortsByPodUID(ep *v1.Endpoints) FullPortsByPodUID { - m := FullPortsByPodUID{} +// getFullContainerPortsByPodUID returns a fullPortsByPodUID map on the given endpoints with all the port data. +func getFullContainerPortsByPodUID(ep *v1.Endpoints) fullPortsByPodUID { + m := fullPortsByPodUID{} for _, ss := range ep.Subsets { for _, port := range ss.Ports { containerPort := v1.ContainerPort{ diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 8935b0e3b68..37f401b4b0a 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -59,7 +59,6 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/test/e2e/framework" e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment" - e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints" e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice" e2enetwork "k8s.io/kubernetes/test/e2e/framework/network" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -4591,7 +4590,7 @@ func validateEndpointsPortsOrFail(ctx context.Context, c clientset.Interface, na // Retry the error return false, nil } - portsByUID := portsByPodUID(e2eendpoints.GetContainerPortsByPodUID(ep)) + portsByUID := getContainerPortsByPodUID(ep) if err := validatePorts(portsByUID, expectedPortsByPodUID); err != nil { if i%5 == 0 { framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints) @@ -4675,7 +4674,7 @@ func validateEndpointsPortsWithProtocolsOrFail(c clientset.Interface, namespace, // Retry the error return false, nil } - portsByUID := fullPortsByPodUID(e2eendpoints.GetFullContainerPortsByPodUID(ep)) + portsByUID := getFullContainerPortsByPodUID(ep) if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil { if i%5 == 0 { framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)