Merge pull request #128896 from danwinship/e2e-endpoints

Remove all references to v1.Endpoints from non-network e2e tests
Kubernetes Prow Robot 2025-01-21 00:08:38 -08:00 committed by GitHub
commit eed4930b31
8 changed files with 57 additions and 348 deletions
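The change follows one pattern throughout: wherever a test read the legacy v1.Endpoints object named after a service, it now lists the discovery.k8s.io/v1 EndpointSlices carrying the service's name label. A minimal sketch of that replacement pattern (the package and function names here are illustrative, not part of the PR):

```go
package e2eutil

import (
	"context"
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// listServiceEndpointSlices lists the EndpointSlices backing a service.
// Unlike the single v1.Endpoints object, a service may own several slices,
// so callers must iterate over all of them.
func listServiceEndpointSlices(ctx context.Context, c clientset.Interface, ns, svc string) ([]discoveryv1.EndpointSlice, error) {
	slices, err := c.DiscoveryV1().EndpointSlices(ns).List(ctx, metav1.ListOptions{
		// discoveryv1.LabelServiceName is "kubernetes.io/service-name".
		LabelSelector: discoveryv1.LabelServiceName + "=" + svc,
	})
	if err != nil {
		return nil, fmt.Errorf("list endpointslices for service %s/%s failed (%w)", ns, svc, err)
	}
	return slices.Items, nil
}
```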

View File

@@ -28,6 +28,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -763,9 +764,9 @@ func validateErrorWithDebugInfo(ctx context.Context, f *framework.Framework, err
msg := fmt.Sprintf(msg, fields...)
msg += fmt.Sprintf(" but received unexpected error:\n%v", err)
client := f.ClientSet
ep, err := client.CoreV1().Endpoints(namespace).Get(ctx, "sample-api", metav1.GetOptions{})
slices, err := client.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, "sample-api")})
if err == nil {
msg += fmt.Sprintf("\nFound endpoints for sample-api:\n%v", ep)
msg += fmt.Sprintf("\nFound endpoint slices for sample-api:\n%v", slices)
}
pds, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
if err == nil {

View File

@@ -1,12 +0,0 @@
# This E2E framework sub-package is currently allowed to use arbitrary
# dependencies except for k/k/pkg, therefore we need to override the
# restrictions from the parent .import-restrictions file.
#
# At some point it may become useful to also check this package's
# dependencies more carefully.
rules:
- selectorRegexp: "^k8s[.]io/kubernetes/pkg"
allowedPrefixes: []
- selectorRegexp: ""
allowedPrefixes: [ "" ]

View File

@@ -45,7 +45,6 @@ import (
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
imageutils "k8s.io/kubernetes/test/utils/image"
netutils "k8s.io/utils/net"
)
@@ -1099,101 +1098,6 @@ func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Re
return client.Get(url)
}
// TestUnderTemporaryNetworkFailure blocks outgoing network traffic on 'node', then runs testFunc.
// At the end (even in case of errors), the network traffic is brought back to normal.
// This function executes commands on a node over SSH, so it will work only in
// environments that allow that.
func TestUnderTemporaryNetworkFailure(ctx context.Context, c clientset.Interface, ns string, node *v1.Node, testFunc func(ctx context.Context)) {
host, err := e2enode.GetSSHExternalIP(node)
if err != nil {
framework.Failf("Error getting node external ip : %v", err)
}
controlPlaneAddresses := framework.GetControlPlaneAddresses(ctx, c)
ginkgo.By(fmt.Sprintf("block network traffic from node %s to the control plane", node.Name))
defer func() {
// This code will execute even if setting the iptables rule failed.
// That is deliberate: we may get an error back even though the new rule
// was inserted. (We could inspect the exit code and the SSH error
// separately, but it is safer not to assume.)
ginkgo.By(fmt.Sprintf("Unblock network traffic from node %s to the control plane", node.Name))
for _, instanceAddress := range controlPlaneAddresses {
UnblockNetwork(ctx, host, instanceAddress)
}
}()
framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}
for _, instanceAddress := range controlPlaneAddresses {
BlockNetwork(ctx, host, instanceAddress)
}
framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
}
testFunc(ctx)
// network traffic is unblocked in a deferred function
}
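Before its removal, callers wrapped their assertions in the testFunc closure; a minimal usage sketch (the namespace name and the body of the closure are hypothetical):

```go
// Hypothetical caller: run checks while 'node' is cut off from the
// control plane. Traffic is restored by the deferred unblock above,
// even if the closure fails.
TestUnderTemporaryNetworkFailure(ctx, c, "e2e-net-partition", node, func(ctx context.Context) {
	framework.Logf("node %s is partitioned from the control plane; running checks", node.Name)
	// e.g. assert here that the node's pods are marked for eviction.
})
```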
// BlockNetwork blocks network traffic from the given 'from' host to the given 'to' host.
// BlockNetwork and UnblockNetwork block/unblock traffic between a source host
// and a destination host by manipulating iptables rules.
// These functions assume they can ssh to the source host.
//
// Caution:
// Pass IP addresses rather than hostnames. With hostnames, iptables does a
// DNS lookup to resolve each name to an IP address, which slows down the
// test and causes it to fail if DNS is absent or broken.
//
// Suggested usage pattern:
//
// func foo() {
// ...
// defer UnblockNetwork(from, to)
// BlockNetwork(from, to)
// ...
// }
func BlockNetwork(ctx context.Context, from string, to string) {
framework.Logf("block network traffic from %s to %s", from, to)
iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule)
if result, err := e2essh.SSH(ctx, dropCmd, from, framework.TestContext.Provider); result.Code != 0 || err != nil {
e2essh.LogResult(result)
framework.Failf("Unexpected error: %v", err)
}
}
// UnblockNetwork unblocks network between the given from value and the given to value.
func UnblockNetwork(ctx context.Context, from string, to string) {
framework.Logf("Unblock network traffic from %s to %s", from, to)
iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule)
// The undrop command may fail if the rule was never created.
// In that case we just lose 30 seconds, but the cluster is healthy.
// But if the rule was created and removing it failed, the node is broken and
// not coming back. Subsequent tests will run on fewer nodes (and some of them
// may fail). Manual intervention is required in that case (recreating the
// cluster solves the problem too).
err := wait.PollUntilContextTimeout(ctx, time.Millisecond*100, time.Second*30, false, func(ctx context.Context) (bool, error) {
result, err := e2essh.SSH(ctx, undropCmd, from, framework.TestContext.Provider)
if result.Code == 0 && err == nil {
return true, nil
}
e2essh.LogResult(result)
if err != nil {
framework.Logf("Unexpected error: %v", err)
}
return false, nil
})
if err != nil {
framework.Failf("Failed to remove the iptable REJECT rule. Manual intervention is "+
"required on host %s: remove rule %s, if exists", from, iptablesRule)
}
}
// WaitForService waits until the service appears (exist == true), or disappears (exist == false)
func WaitForService(ctx context.Context, c clientset.Interface, namespace, name string, exist bool, interval, timeout time.Duration) error {
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {

View File

@@ -33,17 +33,13 @@ import (
policyv1 "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -404,42 +400,47 @@ func (j *TestJig) GetEndpointNodeNames(ctx context.Context) (sets.String, error)
if err != nil {
return nil, err
}
endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + j.Name})
if err != nil {
return nil, fmt.Errorf("get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
}
if len(endpoints.Subsets) == 0 {
return nil, fmt.Errorf("endpoint has no subsets, cannot determine node addresses")
return nil, fmt.Errorf("list endpointslices for service %s/%s failed (%w)", j.Namespace, j.Name, err)
}
epNodes := sets.NewString()
for _, ss := range endpoints.Subsets {
for _, e := range ss.Addresses {
if e.NodeName != nil {
epNodes.Insert(*e.NodeName)
for _, slice := range slices.Items {
for _, ep := range slice.Endpoints {
if ep.NodeName != nil {
epNodes.Insert(*ep.NodeName)
}
}
}
if len(epNodes) == 0 {
return nil, fmt.Errorf("EndpointSlice has no endpoints, cannot determine node addresses")
}
return epNodes, nil
}
// WaitForEndpointOnNode waits for a service endpoint on the given node.
// WaitForEndpointOnNode waits for a service endpoint on the given node (which must be the service's only endpoint).
func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) error {
return wait.PollUntilContextTimeout(ctx, framework.Poll, KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) {
endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name})
if err != nil {
framework.Logf("Get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
framework.Logf("List endpointslices for service %s/%s failed (%s)", j.Namespace, j.Name, err)
return false, nil
}
if len(endpoints.Subsets) == 0 {
framework.Logf("Expect endpoints with subsets, got none.")
if len(slices.Items) == 0 {
framework.Logf("Expected 1 EndpointSlice for service %s/%s, got 0", j.Namespace, j.Name)
return false, nil
}
// TODO: Handle multiple endpoints
if len(endpoints.Subsets[0].Addresses) == 0 {
slice := slices.Items[0]
if len(slice.Endpoints) == 0 {
framework.Logf("Expected EndpointSlice with Endpoints, got none.")
return false, nil
}
endpoint := slice.Endpoints[0]
if len(endpoint.Addresses) == 0 || (endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready) {
framework.Logf("Expected Ready endpoints - found none")
return false, nil
}
epHostName := *endpoints.Subsets[0].Addresses[0].NodeName
epHostName := *endpoint.NodeName
framework.Logf("Pod for service %s/%s is on node %s", j.Namespace, j.Name, epHostName)
if epHostName != nodeName {
framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
@@ -451,91 +452,18 @@ func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) er
// waitForAvailableEndpoint waits until at least one endpoint is available, or until timeout
func (j *TestJig) waitForAvailableEndpoint(ctx context.Context, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
// Wait for endpoints to be created; this may take longer if the service's backing pods take a long time to start
endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name)
endpointAvailable := false
endpointSliceAvailable := false
var controller cache.Controller
_, controller = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = endpointSelector.String()
obj, err := j.Client.CoreV1().Endpoints(j.Namespace).List(ctx, options)
return runtime.Object(obj), err
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = endpointSelector.String()
return j.Client.CoreV1().Endpoints(j.Namespace).Watch(ctx, options)
},
},
&v1.Endpoints{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if e, ok := obj.(*v1.Endpoints); ok {
if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
endpointAvailable = true
err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) {
slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name})
if err != nil || len(slices.Items) == 0 {
// Retry
return false, nil
}
}
},
UpdateFunc: func(old, cur interface{}) {
if e, ok := cur.(*v1.Endpoints); ok {
if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
endpointAvailable = true
}
}
},
},
)
go controller.Run(ctx.Done())
var esController cache.Controller
_, esController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = "kubernetes.io/service-name=" + j.Name
obj, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, options)
return runtime.Object(obj), err
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = "kubernetes.io/service-name=" + j.Name
return j.Client.DiscoveryV1().EndpointSlices(j.Namespace).Watch(ctx, options)
},
},
&discoveryv1.EndpointSlice{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if es, ok := obj.(*discoveryv1.EndpointSlice); ok {
// TODO: currently we only consider addresses in 1 slice, but services with
// a large number of endpoints (>1000) may have multiple slices. Some slices
// with only a few addresses. We should check the addresses in all slices.
for _, es := range slices.Items {
if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
endpointSliceAvailable = true
return true, nil
}
}
},
UpdateFunc: func(old, cur interface{}) {
if es, ok := cur.(*discoveryv1.EndpointSlice); ok {
// TODO: currently we only consider addresses in 1 slice, but services with
// a large number of endpoints (>1000) may have multiple slices. Some slices
// with only a few addresses. We should check the addresses in all slices.
if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
endpointSliceAvailable = true
}
}
},
},
)
go esController.Run(ctx.Done())
err := wait.PollUntilContextCancel(ctx, 1*time.Second, false, func(ctx context.Context) (bool, error) {
return endpointAvailable && endpointSliceAvailable, nil
return false, nil
})
if err != nil {
return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout)
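Because the removed informer code and the added polling code are interleaved above, the new body of waitForAvailableEndpoint is easier to read assembled from the added lines alone; approximately:

```go
// Poll until some EndpointSlice for the service has at least one endpoint
// with at least one address. This replaces the two informer controllers
// (one for v1.Endpoints, one for EndpointSlices) that used to run here.
err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) {
	slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name})
	if err != nil || len(slices.Items) == 0 {
		return false, nil // retry
	}
	for _, es := range slices.Items {
		if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
			return true, nil
		}
	}
	return false, nil
})
```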

View File

@@ -53,12 +53,6 @@ import (
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
watchtools "k8s.io/client-go/tools/watch"
netutils "k8s.io/utils/net"
)
const (
// TODO(justinsb): Avoid hardcoding this.
awsMasterIP = "172.20.0.9"
)
// DEPRECATED constants. Use the timeouts in framework.Framework instead.
@@ -416,29 +410,12 @@ func CheckTestingNSDeletedExcept(ctx context.Context, c clientset.Interface, ski
return fmt.Errorf("Waiting for terminating namespaces to be deleted timed out")
}
// WaitForServiceEndpointsNum waits until the number of endpoints that implement the service equals expectNum.
// Some components use EndpointSlices, others Endpoints; we must verify that both objects meet the requirements.
// WaitForServiceEndpointsNum waits until there are EndpointSlices for serviceName
// containing a total of expectNum endpoints. (If the service is dual-stack, expectNum
// must count the endpoints of both IP families.)
func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error {
return wait.PollUntilContextTimeout(ctx, interval, timeout, false, func(ctx context.Context) (bool, error) {
Logf("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum)
endpoint, err := c.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
if err != nil {
Logf("Unexpected error trying to get Endpoints for %s : %v", serviceName, err)
return false, nil
}
if countEndpointsNum(endpoint) != expectNum {
Logf("Unexpected number of Endpoints, got %d, expected %d", countEndpointsNum(endpoint), expectNum)
return false, nil
}
// Endpoints are single-family but EndpointSlices can have dual-stack addresses,
// so we verify the number of addresses of the matching family on both.
addressType := discoveryv1.AddressTypeIPv4
if isIPv6Endpoint(endpoint) {
addressType = discoveryv1.AddressTypeIPv6
}
esList, err := c.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)})
if err != nil {
Logf("Unexpected error trying to get EndpointSlices for %s : %v", serviceName, err)
@@ -450,44 +427,18 @@ func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, name
return false, nil
}
if countEndpointsSlicesNum(esList, addressType) != expectNum {
Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList, addressType), expectNum)
if countEndpointsSlicesNum(esList) != expectNum {
Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList), expectNum)
return false, nil
}
return true, nil
})
}
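Per the updated doc comment, expectNum counts endpoints across all of a service's slices and across both IP families; a hedged usage sketch for a hypothetical three-replica service:

```go
// On a single-stack cluster each pod contributes one endpoint; on a
// dual-stack cluster each pod appears once per IP family, so the
// expected total doubles. 'dualStack' is a hypothetical flag describing
// the cluster under test.
replicas := 3
expectNum := replicas
if dualStack {
	expectNum = replicas * 2
}
if err := WaitForServiceEndpointsNum(ctx, c, ns, "my-service", expectNum, 2*time.Second, 2*time.Minute); err != nil {
	Failf("service my-service never reached %d endpoints: %v", expectNum, err)
}
```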
func countEndpointsNum(e *v1.Endpoints) int {
num := 0
for _, sub := range e.Subsets {
num += len(sub.Addresses)
}
return num
}
// isIPv6Endpoint returns true if the Endpoint uses IPv6 addresses
func isIPv6Endpoint(e *v1.Endpoints) bool {
for _, sub := range e.Subsets {
for _, addr := range sub.Addresses {
if len(addr.IP) == 0 {
continue
}
// Endpoints are single family, so it is enough to check only one address
return netutils.IsIPv6String(addr.IP)
}
}
// default to IPv4 for an Endpoint without IP addresses
return false
}
func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList, addressType discoveryv1.AddressType) int {
func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList) int {
// EndpointSlices can contain the same address on multiple Slices
addresses := sets.Set[string]{}
for _, epSlice := range epList.Items {
if epSlice.AddressType != addressType {
continue
}
for _, ep := range epSlice.Endpoints {
if len(ep.Addresses) > 0 {
addresses.Insert(ep.Addresses[0])
@@ -678,38 +629,6 @@ func GetNodeExternalIPs(node *v1.Node) (ips []string) {
return
}
// getControlPlaneAddresses returns the externalIP, internalIP and hostname fields of control plane nodes.
// If any of these is unavailable, empty slices are returned.
func getControlPlaneAddresses(ctx context.Context, c clientset.Interface) ([]string, []string, []string) {
var externalIPs, internalIPs, hostnames []string
// Populate the internal IPs.
eps, err := c.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{})
if err != nil {
Failf("Failed to get kubernetes endpoints: %v", err)
}
for _, subset := range eps.Subsets {
for _, address := range subset.Addresses {
if address.IP != "" {
internalIPs = append(internalIPs, address.IP)
}
}
}
// Populate the external IP/hostname.
hostURL, err := url.Parse(TestContext.Host)
if err != nil {
Failf("Failed to parse hostname: %v", err)
}
if netutils.ParseIPSloppy(hostURL.Host) != nil {
externalIPs = append(externalIPs, hostURL.Host)
} else {
hostnames = append(hostnames, hostURL.Host)
}
return externalIPs, internalIPs, hostnames
}
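The deleted helper derived the control plane's internal IPs from the legacy default/kubernetes Endpoints object. This PR simply removes it along with its callers, but for comparison, a hedged sketch of the equivalent EndpointSlice lookup (not code from this PR; assumes the discoveryv1 and metav1 imports used elsewhere in this file):

```go
// kubernetesServiceIPs collects the apiserver addresses advertised by the
// EndpointSlices backing the "kubernetes" service in the default namespace.
func kubernetesServiceIPs(ctx context.Context, c clientset.Interface) ([]string, error) {
	slices, err := c.DiscoveryV1().EndpointSlices(metav1.NamespaceDefault).List(ctx, metav1.ListOptions{
		LabelSelector: discoveryv1.LabelServiceName + "=kubernetes",
	})
	if err != nil {
		return nil, err
	}
	var ips []string
	for _, slice := range slices.Items {
		for _, ep := range slice.Endpoints {
			ips = append(ips, ep.Addresses...)
		}
	}
	return ips, nil
}
```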
// GetControlPlaneNodes returns a list of control plane nodes
func GetControlPlaneNodes(ctx context.Context, c clientset.Interface) *v1.NodeList {
allNodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
@@ -737,30 +656,6 @@ func GetControlPlaneNodes(ctx context.Context, c clientset.Interface) *v1.NodeLi
return &cpNodes
}
// GetControlPlaneAddresses returns all IP addresses on which the kubelet can reach the control plane.
// It may return both internal and external IPs, even if we expect only
// e.g. the internal IPs to be used (issue #56787), so that we can be
// sure to block the control plane fully during tests.
func GetControlPlaneAddresses(ctx context.Context, c clientset.Interface) []string {
externalIPs, internalIPs, _ := getControlPlaneAddresses(ctx, c)
ips := sets.NewString()
switch TestContext.Provider {
case "gce":
for _, ip := range externalIPs {
ips.Insert(ip)
}
for _, ip := range internalIPs {
ips.Insert(ip)
}
case "aws":
ips.Insert(awsMasterIP)
default:
Failf("This test is not supported for provider %s and should be disabled", TestContext.Provider)
}
return ips.List()
}
// PrettyPrintJSON converts metrics to JSON format.
func PrettyPrintJSON(metrics interface{}) string {
output := &bytes.Buffer{}

View File

@@ -43,6 +43,7 @@ import (
"sigs.k8s.io/yaml"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -66,7 +67,7 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
e2eauth "k8s.io/kubernetes/test/e2e/framework/auth"
e2edebug "k8s.io/kubernetes/test/e2e/framework/debug"
e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints"
e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice"
e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@@ -1538,10 +1539,10 @@ metadata:
})
validateService := func(name string, servicePort int, timeout time.Duration) {
err := wait.Poll(framework.Poll, timeout, func() (bool, error) {
ep, err := c.CoreV1().Endpoints(ns).Get(ctx, name, metav1.GetOptions{})
slices, err := c.DiscoveryV1().EndpointSlices(ns).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, name)})
if err != nil {
// log the real error
framework.Logf("Get endpoints failed (interval %v): %v", framework.Poll, err)
framework.Logf("List endpointslices failed (interval %v): %v", framework.Poll, err)
// if the error is API not found or could not find default credentials or TLS handshake timeout, try again
if apierrors.IsNotFound(err) ||
@@ -1552,7 +1553,7 @@ metadata:
return false, err
}
uidToPort := e2eendpoints.GetContainerPortsByPodUID(ep)
uidToPort := e2eendpointslice.GetContainerPortsByPodUID(slices.Items)
if len(uidToPort) == 0 {
framework.Logf("No endpoint found, retrying")
return false, nil

View File

@@ -14,22 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoints
package network
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)
// PortsByPodUID is a map that maps pod UID to container ports.
type PortsByPodUID map[types.UID][]int
// FullPortsByPodUID is a map that maps pod UID to container ports.
type FullPortsByPodUID map[types.UID][]v1.ContainerPort
// GetContainerPortsByPodUID returns a PortsByPodUID map for the given endpoints.
func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID {
m := PortsByPodUID{}
// getContainerPortsByPodUID returns a portsByPodUID map for the given endpoints.
func getContainerPortsByPodUID(ep *v1.Endpoints) portsByPodUID {
m := portsByPodUID{}
for _, ss := range ep.Subsets {
for _, port := range ss.Ports {
for _, addr := range ss.Addresses {
@@ -44,9 +37,9 @@ func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID {
return m
}
// GetFullContainerPortsByPodUID returns a FullPortsByPodUID map for the given endpoints with all the port data.
func GetFullContainerPortsByPodUID(ep *v1.Endpoints) FullPortsByPodUID {
m := FullPortsByPodUID{}
// getFullContainerPortsByPodUID returns a fullPortsByPodUID map for the given endpoints with all the port data.
func getFullContainerPortsByPodUID(ep *v1.Endpoints) fullPortsByPodUID {
m := fullPortsByPodUID{}
for _, ss := range ep.Subsets {
for _, port := range ss.Ports {
containerPort := v1.ContainerPort{
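The kubectl test above now calls e2eendpointslice.GetContainerPortsByPodUID, whose implementation is not shown in this diff. Built against EndpointSlices, the same pod-UID-to-ports mapping can be derived from each endpoint's TargetRef and the slice's port list; a hedged sketch (function name hypothetical):

```go
// portsByPodUIDFromSlices maps each backing pod's UID to the container
// ports exposed through the given EndpointSlices. Ports are declared once
// per slice and apply to every endpoint in it; TargetRef identifies the pod.
func portsByPodUIDFromSlices(slices []discoveryv1.EndpointSlice) map[types.UID][]int {
	m := map[types.UID][]int{}
	for _, slice := range slices {
		for _, ep := range slice.Endpoints {
			if ep.TargetRef == nil {
				continue
			}
			for _, port := range slice.Ports {
				if port.Port != nil {
					m[ep.TargetRef.UID] = append(m[ep.TargetRef.UID], int(*port.Port))
				}
			}
		}
	}
	return m
}
```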

View File

@@ -59,7 +59,6 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/test/e2e/framework"
e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints"
e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice"
e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@@ -4591,7 +4590,7 @@ func validateEndpointsPortsOrFail(ctx context.Context, c clientset.Interface, na
// Retry the error
return false, nil
}
portsByUID := portsByPodUID(e2eendpoints.GetContainerPortsByPodUID(ep))
portsByUID := getContainerPortsByPodUID(ep)
if err := validatePorts(portsByUID, expectedPortsByPodUID); err != nil {
if i%5 == 0 {
framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
@@ -4675,7 +4674,7 @@ func validateEndpointsPortsWithProtocolsOrFail(c clientset.Interface, namespace,
// Retry the error
return false, nil
}
portsByUID := fullPortsByPodUID(e2eendpoints.GetFullContainerPortsByPodUID(ep))
portsByUID := getFullContainerPortsByPodUID(ep)
if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil {
if i%5 == 0 {
framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)