diff --git a/test/e2e/apimachinery/aggregator.go b/test/e2e/apimachinery/aggregator.go index 596950ac178..a6448c106ab 100644 --- a/test/e2e/apimachinery/aggregator.go +++ b/test/e2e/apimachinery/aggregator.go @@ -28,6 +28,7 @@ import ( appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -763,9 +764,9 @@ func validateErrorWithDebugInfo(ctx context.Context, f *framework.Framework, err msg := fmt.Sprintf(msg, fields...) msg += fmt.Sprintf(" but received unexpected error:\n%v", err) client := f.ClientSet - ep, err := client.CoreV1().Endpoints(namespace).Get(ctx, "sample-api", metav1.GetOptions{}) + slices, err := client.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, "sample-api")}) if err == nil { - msg += fmt.Sprintf("\nFound endpoints for sample-api:\n%v", ep) + msg += fmt.Sprintf("\nFound endpoint slices for sample-api:\n%v", slices) } pds, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) if err == nil { diff --git a/test/e2e/framework/endpoints/.import-restrictions b/test/e2e/framework/endpoints/.import-restrictions deleted file mode 100644 index 03b5ee5ec2c..00000000000 --- a/test/e2e/framework/endpoints/.import-restrictions +++ /dev/null @@ -1,12 +0,0 @@ -# This E2E framework sub-package is currently allowed to use arbitrary -# dependencies except of k/k/pkg, therefore we need to override the -# restrictions from the parent .import-restrictions file. -# -# At some point it may become useful to also check this package's -# dependencies more careful. -rules: - - selectorRegexp: "^k8s[.]io/kubernetes/pkg" - allowedPrefixes: [] - - - selectorRegexp: "" - allowedPrefixes: [ "" ] diff --git a/test/e2e/framework/network/utils.go b/test/e2e/framework/network/utils.go index a2febeab5f8..2b7778e7d0e 100644 --- a/test/e2e/framework/network/utils.go +++ b/test/e2e/framework/network/utils.go @@ -45,7 +45,6 @@ import ( e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output" e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" - e2essh "k8s.io/kubernetes/test/e2e/framework/ssh" imageutils "k8s.io/kubernetes/test/utils/image" netutils "k8s.io/utils/net" ) @@ -1099,101 +1098,6 @@ func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Re return client.Get(url) } -// TestUnderTemporaryNetworkFailure blocks outgoing network traffic on 'node'. Then runs testFunc and returns its status. -// At the end (even in case of errors), the network traffic is brought back to normal. -// This function executes commands on a node so it will work only for some -// environments. -func TestUnderTemporaryNetworkFailure(ctx context.Context, c clientset.Interface, ns string, node *v1.Node, testFunc func(ctx context.Context)) { - host, err := e2enode.GetSSHExternalIP(node) - if err != nil { - framework.Failf("Error getting node external ip : %v", err) - } - controlPlaneAddresses := framework.GetControlPlaneAddresses(ctx, c) - ginkgo.By(fmt.Sprintf("block network traffic from node %s to the control plane", node.Name)) - defer func() { - // This code will execute even if setting the iptables rule failed. - // It is on purpose because we may have an error even if the new rule - // had been inserted. (yes, we could look at the error code and ssh error - // separately, but I prefer to stay on the safe side). - ginkgo.By(fmt.Sprintf("Unblock network traffic from node %s to the control plane", node.Name)) - for _, instanceAddress := range controlPlaneAddresses { - UnblockNetwork(ctx, host, instanceAddress) - } - }() - - framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name) - if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) { - framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout) - } - for _, instanceAddress := range controlPlaneAddresses { - BlockNetwork(ctx, host, instanceAddress) - } - - framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name) - if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) { - framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout) - } - - testFunc(ctx) - // network traffic is unblocked in a deferred function -} - -// BlockNetwork blocks network between the given from value and the given to value. -// The following helper functions can block/unblock network from source -// host to destination host by manipulating iptable rules. -// This function assumes it can ssh to the source host. -// -// Caution: -// Recommend to input IP instead of hostnames. Using hostnames will cause iptables to -// do a DNS lookup to resolve the name to an IP address, which will -// slow down the test and cause it to fail if DNS is absent or broken. -// -// Suggested usage pattern: -// -// func foo() { -// ... -// defer UnblockNetwork(from, to) -// BlockNetwork(from, to) -// ... -// } -func BlockNetwork(ctx context.Context, from string, to string) { - framework.Logf("block network traffic from %s to %s", from, to) - iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to) - dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule) - if result, err := e2essh.SSH(ctx, dropCmd, from, framework.TestContext.Provider); result.Code != 0 || err != nil { - e2essh.LogResult(result) - framework.Failf("Unexpected error: %v", err) - } -} - -// UnblockNetwork unblocks network between the given from value and the given to value. -func UnblockNetwork(ctx context.Context, from string, to string) { - framework.Logf("Unblock network traffic from %s to %s", from, to) - iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to) - undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule) - // Undrop command may fail if the rule has never been created. - // In such case we just lose 30 seconds, but the cluster is healthy. - // But if the rule had been created and removing it failed, the node is broken and - // not coming back. Subsequent tests will run or fewer nodes (some of the tests - // may fail). Manual intervention is required in such case (recreating the - // cluster solves the problem too). - err := wait.PollUntilContextTimeout(ctx, time.Millisecond*100, time.Second*30, false, func(ctx context.Context) (bool, error) { - result, err := e2essh.SSH(ctx, undropCmd, from, framework.TestContext.Provider) - if result.Code == 0 && err == nil { - return true, nil - } - e2essh.LogResult(result) - if err != nil { - framework.Logf("Unexpected error: %v", err) - } - return false, nil - }) - if err != nil { - framework.Failf("Failed to remove the iptable REJECT rule. Manual intervention is "+ - "required on host %s: remove rule %s, if exists", from, iptablesRule) - } -} - // WaitForService waits until the service appears (exist == true), or disappears (exist == false) func WaitForService(ctx context.Context, c clientset.Interface, namespace, name string, exist bool, interval, timeout time.Duration) error { err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) { diff --git a/test/e2e/framework/service/jig.go b/test/e2e/framework/service/jig.go index ef8ac4a1421..2685ead4f92 100644 --- a/test/e2e/framework/service/jig.go +++ b/test/e2e/framework/service/jig.go @@ -33,17 +33,13 @@ import ( policyv1 "k8s.io/api/policy/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -404,42 +400,47 @@ func (j *TestJig) GetEndpointNodeNames(ctx context.Context) (sets.String, error) if err != nil { return nil, err } - endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{}) + slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + j.Name}) if err != nil { - return nil, fmt.Errorf("get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err) - } - if len(endpoints.Subsets) == 0 { - return nil, fmt.Errorf("endpoint has no subsets, cannot determine node addresses") + return nil, fmt.Errorf("list endpointslices for service %s/%s failed (%w)", j.Namespace, j.Name, err) } epNodes := sets.NewString() - for _, ss := range endpoints.Subsets { - for _, e := range ss.Addresses { - if e.NodeName != nil { - epNodes.Insert(*e.NodeName) + for _, slice := range slices.Items { + for _, ep := range slice.Endpoints { + if ep.NodeName != nil { + epNodes.Insert(*ep.NodeName) } } } + if len(epNodes) == 0 { + return nil, fmt.Errorf("EndpointSlice has no endpoints, cannot determine node addresses") + } return epNodes, nil } -// WaitForEndpointOnNode waits for a service endpoint on the given node. +// WaitForEndpointOnNode waits for a service endpoint on the given node (which must be the service's only endpoint). func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) error { return wait.PollUntilContextTimeout(ctx, framework.Poll, KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) { - endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{}) + slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name}) if err != nil { - framework.Logf("Get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err) + framework.Logf("List endpointslices for service %s/%s failed (%s)", j.Namespace, j.Name, err) return false, nil } - if len(endpoints.Subsets) == 0 { - framework.Logf("Expect endpoints with subsets, got none.") + if len(slices.Items) == 0 { + framework.Logf("Expected 1 EndpointSlice for service %s/%s, got 0", j.Namespace, j.Name) return false, nil } - // TODO: Handle multiple endpoints - if len(endpoints.Subsets[0].Addresses) == 0 { + slice := slices.Items[0] + if len(slice.Endpoints) == 0 { + framework.Logf("Expected EndpointSlice with Endpoints, got none.") + return false, nil + } + endpoint := slice.Endpoints[0] + if len(endpoint.Addresses) == 0 || (endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready) { framework.Logf("Expected Ready endpoints - found none") return false, nil } - epHostName := *endpoints.Subsets[0].Addresses[0].NodeName + epHostName := *endpoint.NodeName framework.Logf("Pod for service %s/%s is on node %s", j.Namespace, j.Name, epHostName) if epHostName != nodeName { framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName) @@ -451,91 +452,18 @@ func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) er // waitForAvailableEndpoint waits for at least 1 endpoint to be available till timeout func (j *TestJig) waitForAvailableEndpoint(ctx context.Context, timeout time.Duration) error { - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - //Wait for endpoints to be created, this may take longer time if service backing pods are taking longer time to run - endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name) - endpointAvailable := false - endpointSliceAvailable := false - - var controller cache.Controller - _, controller = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.FieldSelector = endpointSelector.String() - obj, err := j.Client.CoreV1().Endpoints(j.Namespace).List(ctx, options) - return runtime.Object(obj), err - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.FieldSelector = endpointSelector.String() - return j.Client.CoreV1().Endpoints(j.Namespace).Watch(ctx, options) - }, - }, - &v1.Endpoints{}, - 0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - if e, ok := obj.(*v1.Endpoints); ok { - if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { - endpointAvailable = true - } - } - }, - UpdateFunc: func(old, cur interface{}) { - if e, ok := cur.(*v1.Endpoints); ok { - if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { - endpointAvailable = true - } - } - }, - }, - ) - - go controller.Run(ctx.Done()) - - var esController cache.Controller - _, esController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.LabelSelector = "kubernetes.io/service-name=" + j.Name - obj, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, options) - return runtime.Object(obj), err - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.LabelSelector = "kubernetes.io/service-name=" + j.Name - return j.Client.DiscoveryV1().EndpointSlices(j.Namespace).Watch(ctx, options) - }, - }, - &discoveryv1.EndpointSlice{}, - 0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - if es, ok := obj.(*discoveryv1.EndpointSlice); ok { - // TODO: currently we only consider addresses in 1 slice, but services with - // a large number of endpoints (>1000) may have multiple slices. Some slices - // with only a few addresses. We should check the addresses in all slices. - if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { - endpointSliceAvailable = true - } - } - }, - UpdateFunc: func(old, cur interface{}) { - if es, ok := cur.(*discoveryv1.EndpointSlice); ok { - // TODO: currently we only consider addresses in 1 slice, but services with - // a large number of endpoints (>1000) may have multiple slices. Some slices - // with only a few addresses. We should check the addresses in all slices. - if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { - endpointSliceAvailable = true - } - } - }, - }, - ) - - go esController.Run(ctx.Done()) - - err := wait.PollUntilContextCancel(ctx, 1*time.Second, false, func(ctx context.Context) (bool, error) { - return endpointAvailable && endpointSliceAvailable, nil + err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) { + slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name}) + if err != nil || len(slices.Items) == 0 { + // Retry + return false, nil + } + for _, es := range slices.Items { + if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { + return true, nil + } + } + return false, nil }) if err != nil { return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout) diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index b727fbe7322..7eed8e36751 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -53,12 +53,6 @@ import ( "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" watchtools "k8s.io/client-go/tools/watch" - netutils "k8s.io/utils/net" -) - -const ( - // TODO(justinsb): Avoid hardcoding this. - awsMasterIP = "172.20.0.9" ) // DEPRECATED constants. Use the timeouts in framework.Framework instead. @@ -416,29 +410,12 @@ func CheckTestingNSDeletedExcept(ctx context.Context, c clientset.Interface, ski return fmt.Errorf("Waiting for terminating namespaces to be deleted timed out") } -// WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum. -// Some components use EndpointSlices other Endpoints, we must verify that both objects meet the requirements. +// WaitForServiceEndpointsNum waits until there are EndpointSlices for serviceName +// containing a total of expectNum endpoints. (If the service is dual-stack, expectNum +// must count the endpoints of both IP families.) func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error { return wait.PollUntilContextTimeout(ctx, interval, timeout, false, func(ctx context.Context) (bool, error) { Logf("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum) - endpoint, err := c.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{}) - if err != nil { - Logf("Unexpected error trying to get Endpoints for %s : %v", serviceName, err) - return false, nil - } - - if countEndpointsNum(endpoint) != expectNum { - Logf("Unexpected number of Endpoints, got %d, expected %d", countEndpointsNum(endpoint), expectNum) - return false, nil - } - - // Endpoints are single family but EndpointSlices can have dual stack addresses, - // so we verify the number of addresses that matches the same family on both. - addressType := discoveryv1.AddressTypeIPv4 - if isIPv6Endpoint(endpoint) { - addressType = discoveryv1.AddressTypeIPv6 - } - esList, err := c.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)}) if err != nil { Logf("Unexpected error trying to get EndpointSlices for %s : %v", serviceName, err) @@ -450,44 +427,18 @@ func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, name return false, nil } - if countEndpointsSlicesNum(esList, addressType) != expectNum { - Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList, addressType), expectNum) + if countEndpointsSlicesNum(esList) != expectNum { + Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList), expectNum) return false, nil } return true, nil }) } -func countEndpointsNum(e *v1.Endpoints) int { - num := 0 - for _, sub := range e.Subsets { - num += len(sub.Addresses) - } - return num -} - -// isIPv6Endpoint returns true if the Endpoint uses IPv6 addresses -func isIPv6Endpoint(e *v1.Endpoints) bool { - for _, sub := range e.Subsets { - for _, addr := range sub.Addresses { - if len(addr.IP) == 0 { - continue - } - // Endpoints are single family, so it is enough to check only one address - return netutils.IsIPv6String(addr.IP) - } - } - // default to IPv4 an Endpoint without IP addresses - return false -} - -func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList, addressType discoveryv1.AddressType) int { +func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList) int { // EndpointSlices can contain the same address on multiple Slices addresses := sets.Set[string]{} for _, epSlice := range epList.Items { - if epSlice.AddressType != addressType { - continue - } for _, ep := range epSlice.Endpoints { if len(ep.Addresses) > 0 { addresses.Insert(ep.Addresses[0]) @@ -678,38 +629,6 @@ func GetNodeExternalIPs(node *v1.Node) (ips []string) { return } -// getControlPlaneAddresses returns the externalIP, internalIP and hostname fields of control plane nodes. -// If any of these is unavailable, empty slices are returned. -func getControlPlaneAddresses(ctx context.Context, c clientset.Interface) ([]string, []string, []string) { - var externalIPs, internalIPs, hostnames []string - - // Populate the internal IPs. - eps, err := c.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{}) - if err != nil { - Failf("Failed to get kubernetes endpoints: %v", err) - } - for _, subset := range eps.Subsets { - for _, address := range subset.Addresses { - if address.IP != "" { - internalIPs = append(internalIPs, address.IP) - } - } - } - - // Populate the external IP/hostname. - hostURL, err := url.Parse(TestContext.Host) - if err != nil { - Failf("Failed to parse hostname: %v", err) - } - if netutils.ParseIPSloppy(hostURL.Host) != nil { - externalIPs = append(externalIPs, hostURL.Host) - } else { - hostnames = append(hostnames, hostURL.Host) - } - - return externalIPs, internalIPs, hostnames -} - // GetControlPlaneNodes returns a list of control plane nodes func GetControlPlaneNodes(ctx context.Context, c clientset.Interface) *v1.NodeList { allNodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) @@ -737,30 +656,6 @@ func GetControlPlaneNodes(ctx context.Context, c clientset.Interface) *v1.NodeLi return &cpNodes } -// GetControlPlaneAddresses returns all IP addresses on which the kubelet can reach the control plane. -// It may return internal and external IPs, even if we expect for -// e.g. internal IPs to be used (issue #56787), so that we can be -// sure to block the control plane fully during tests. -func GetControlPlaneAddresses(ctx context.Context, c clientset.Interface) []string { - externalIPs, internalIPs, _ := getControlPlaneAddresses(ctx, c) - - ips := sets.NewString() - switch TestContext.Provider { - case "gce": - for _, ip := range externalIPs { - ips.Insert(ip) - } - for _, ip := range internalIPs { - ips.Insert(ip) - } - case "aws": - ips.Insert(awsMasterIP) - default: - Failf("This test is not supported for provider %s and should be disabled", TestContext.Provider) - } - return ips.List() -} - // PrettyPrintJSON converts metrics to JSON format. func PrettyPrintJSON(metrics interface{}) string { output := &bytes.Buffer{} diff --git a/test/e2e/kubectl/kubectl.go b/test/e2e/kubectl/kubectl.go index 25c90a67444..3994771a021 100644 --- a/test/e2e/kubectl/kubectl.go +++ b/test/e2e/kubectl/kubectl.go @@ -43,6 +43,7 @@ import ( "sigs.k8s.io/yaml" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -66,7 +67,7 @@ import ( "k8s.io/kubernetes/test/e2e/framework" e2eauth "k8s.io/kubernetes/test/e2e/framework/auth" e2edebug "k8s.io/kubernetes/test/e2e/framework/debug" - e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints" + e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice" e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl" e2enetwork "k8s.io/kubernetes/test/e2e/framework/network" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -1538,10 +1539,10 @@ metadata: }) validateService := func(name string, servicePort int, timeout time.Duration) { err := wait.Poll(framework.Poll, timeout, func() (bool, error) { - ep, err := c.CoreV1().Endpoints(ns).Get(ctx, name, metav1.GetOptions{}) + slices, err := c.DiscoveryV1().EndpointSlices(ns).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, name)}) if err != nil { // log the real error - framework.Logf("Get endpoints failed (interval %v): %v", framework.Poll, err) + framework.Logf("List endpointslices failed (interval %v): %v", framework.Poll, err) // if the error is API not found or could not find default credentials or TLS handshake timeout, try again if apierrors.IsNotFound(err) || @@ -1552,7 +1553,7 @@ metadata: return false, err } - uidToPort := e2eendpoints.GetContainerPortsByPodUID(ep) + uidToPort := e2eendpointslice.GetContainerPortsByPodUID(slices.Items) if len(uidToPort) == 0 { framework.Logf("No endpoint found, retrying") return false, nil diff --git a/test/e2e/framework/endpoints/ports.go b/test/e2e/network/endpoints.go similarity index 71% rename from test/e2e/framework/endpoints/ports.go rename to test/e2e/network/endpoints.go index 8c116b031a5..29eb72afea1 100644 --- a/test/e2e/framework/endpoints/ports.go +++ b/test/e2e/network/endpoints.go @@ -14,22 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package endpoints +package network import ( v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" ) -// PortsByPodUID is a map that maps pod UID to container ports. -type PortsByPodUID map[types.UID][]int - -// FullPortsByPodUID is a map that maps pod UID to container ports. -type FullPortsByPodUID map[types.UID][]v1.ContainerPort - -// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints. -func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID { - m := PortsByPodUID{} +// getContainerPortsByPodUID returns a portsByPodUID map on the given endpoints. +func getContainerPortsByPodUID(ep *v1.Endpoints) portsByPodUID { + m := portsByPodUID{} for _, ss := range ep.Subsets { for _, port := range ss.Ports { for _, addr := range ss.Addresses { @@ -44,9 +37,9 @@ func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID { return m } -// GetFullContainerPortsByPodUID returns a FullPortsByPodUID map on the given endpoints with all the port data. -func GetFullContainerPortsByPodUID(ep *v1.Endpoints) FullPortsByPodUID { - m := FullPortsByPodUID{} +// getFullContainerPortsByPodUID returns a fullPortsByPodUID map on the given endpoints with all the port data. +func getFullContainerPortsByPodUID(ep *v1.Endpoints) fullPortsByPodUID { + m := fullPortsByPodUID{} for _, ss := range ep.Subsets { for _, port := range ss.Ports { containerPort := v1.ContainerPort{ diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 8935b0e3b68..37f401b4b0a 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -59,7 +59,6 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/test/e2e/framework" e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment" - e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints" e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice" e2enetwork "k8s.io/kubernetes/test/e2e/framework/network" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -4591,7 +4590,7 @@ func validateEndpointsPortsOrFail(ctx context.Context, c clientset.Interface, na // Retry the error return false, nil } - portsByUID := portsByPodUID(e2eendpoints.GetContainerPortsByPodUID(ep)) + portsByUID := getContainerPortsByPodUID(ep) if err := validatePorts(portsByUID, expectedPortsByPodUID); err != nil { if i%5 == 0 { framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints) @@ -4675,7 +4674,7 @@ func validateEndpointsPortsWithProtocolsOrFail(c clientset.Interface, namespace, // Retry the error return false, nil } - portsByUID := fullPortsByPodUID(e2eendpoints.GetFullContainerPortsByPodUID(ep)) + portsByUID := getFullContainerPortsByPodUID(ep) if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil { if i%5 == 0 { framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)