Remove all references to v1.Endpoints from non-network e2e tests

kube-proxy does not look at Endpoints ever, so it is incorrect for a
test to assume that there is any correlation between whether Endpoints
exist and whether a Service is working. Tests should only be using the
v1.Endpoints API if they are explicitly testing the behavior of
v1.Endpoints, the Endpoints controller, or the EndpointSlice mirroring
controller. There is no reason for any non SIG Network tests to be
testing any of those things, so there should be no references to
v1.Endpoints in test/e2e outside of test/e2e/network.

Also, simplify some pointlessly complicated e2eservice code.
This commit is contained in:
Dan Winship 2024-11-20 14:48:16 -05:00
parent da5bf27bc5
commit 969ecabc0f
7 changed files with 57 additions and 191 deletions

View File

@ -28,6 +28,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -763,9 +764,9 @@ func validateErrorWithDebugInfo(ctx context.Context, f *framework.Framework, err
msg := fmt.Sprintf(msg, fields...)
msg += fmt.Sprintf(" but received unexpected error:\n%v", err)
client := f.ClientSet
ep, err := client.CoreV1().Endpoints(namespace).Get(ctx, "sample-api", metav1.GetOptions{})
slices, err := client.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, "sample-api")})
if err == nil {
msg += fmt.Sprintf("\nFound endpoints for sample-api:\n%v", ep)
msg += fmt.Sprintf("\nFound endpoint slices for sample-api:\n%v", slices)
}
pds, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
if err == nil {

View File

@ -1,12 +0,0 @@
# This E2E framework sub-package is currently allowed to use arbitrary
# dependencies except of k/k/pkg, therefore we need to override the
# restrictions from the parent .import-restrictions file.
#
# At some point it may become useful to also check this package's
# dependencies more careful.
rules:
- selectorRegexp: "^k8s[.]io/kubernetes/pkg"
allowedPrefixes: []
- selectorRegexp: ""
allowedPrefixes: [ "" ]

View File

@ -33,17 +33,13 @@ import (
policyv1 "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@ -404,42 +400,47 @@ func (j *TestJig) GetEndpointNodeNames(ctx context.Context) (sets.String, error)
if err != nil {
return nil, err
}
endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + j.Name})
if err != nil {
return nil, fmt.Errorf("get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
}
if len(endpoints.Subsets) == 0 {
return nil, fmt.Errorf("endpoint has no subsets, cannot determine node addresses")
return nil, fmt.Errorf("list endpointslices for service %s/%s failed (%w)", j.Namespace, j.Name, err)
}
epNodes := sets.NewString()
for _, ss := range endpoints.Subsets {
for _, e := range ss.Addresses {
if e.NodeName != nil {
epNodes.Insert(*e.NodeName)
for _, slice := range slices.Items {
for _, ep := range slice.Endpoints {
if ep.NodeName != nil {
epNodes.Insert(*ep.NodeName)
}
}
}
if len(epNodes) == 0 {
return nil, fmt.Errorf("EndpointSlice has no endpoints, cannot determine node addresses")
}
return epNodes, nil
}
// WaitForEndpointOnNode waits for a service endpoint on the given node.
// WaitForEndpointOnNode waits for a service endpoint on the given node (which must be the service's only endpoint).
func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) error {
return wait.PollUntilContextTimeout(ctx, framework.Poll, KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) {
endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name})
if err != nil {
framework.Logf("Get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
framework.Logf("List endpointslices for service %s/%s failed (%s)", j.Namespace, j.Name, err)
return false, nil
}
if len(endpoints.Subsets) == 0 {
framework.Logf("Expect endpoints with subsets, got none.")
if len(slices.Items) == 0 {
framework.Logf("Expected 1 EndpointSlice for service %s/%s, got 0", j.Namespace, j.Name)
return false, nil
}
// TODO: Handle multiple endpoints
if len(endpoints.Subsets[0].Addresses) == 0 {
slice := slices.Items[0]
if len(slice.Endpoints) == 0 {
framework.Logf("Expected EndpointSlice with Endpoints, got none.")
return false, nil
}
endpoint := slice.Endpoints[0]
if len(endpoint.Addresses) == 0 || (endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready) {
framework.Logf("Expected Ready endpoints - found none")
return false, nil
}
epHostName := *endpoints.Subsets[0].Addresses[0].NodeName
epHostName := *endpoint.NodeName
framework.Logf("Pod for service %s/%s is on node %s", j.Namespace, j.Name, epHostName)
if epHostName != nodeName {
framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
@ -451,91 +452,18 @@ func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) er
// waitForAvailableEndpoint waits for at least 1 endpoint to be available till timeout
func (j *TestJig) waitForAvailableEndpoint(ctx context.Context, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
//Wait for endpoints to be created, this may take longer time if service backing pods are taking longer time to run
endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name)
endpointAvailable := false
endpointSliceAvailable := false
var controller cache.Controller
_, controller = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = endpointSelector.String()
obj, err := j.Client.CoreV1().Endpoints(j.Namespace).List(ctx, options)
return runtime.Object(obj), err
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = endpointSelector.String()
return j.Client.CoreV1().Endpoints(j.Namespace).Watch(ctx, options)
},
},
&v1.Endpoints{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if e, ok := obj.(*v1.Endpoints); ok {
if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
endpointAvailable = true
}
}
},
UpdateFunc: func(old, cur interface{}) {
if e, ok := cur.(*v1.Endpoints); ok {
if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
endpointAvailable = true
}
}
},
},
)
go controller.Run(ctx.Done())
var esController cache.Controller
_, esController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = "kubernetes.io/service-name=" + j.Name
obj, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, options)
return runtime.Object(obj), err
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = "kubernetes.io/service-name=" + j.Name
return j.Client.DiscoveryV1().EndpointSlices(j.Namespace).Watch(ctx, options)
},
},
&discoveryv1.EndpointSlice{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if es, ok := obj.(*discoveryv1.EndpointSlice); ok {
// TODO: currently we only consider addresses in 1 slice, but services with
// a large number of endpoints (>1000) may have multiple slices. Some slices
// with only a few addresses. We should check the addresses in all slices.
if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
endpointSliceAvailable = true
}
}
},
UpdateFunc: func(old, cur interface{}) {
if es, ok := cur.(*discoveryv1.EndpointSlice); ok {
// TODO: currently we only consider addresses in 1 slice, but services with
// a large number of endpoints (>1000) may have multiple slices. Some slices
// with only a few addresses. We should check the addresses in all slices.
if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
endpointSliceAvailable = true
}
}
},
},
)
go esController.Run(ctx.Done())
err := wait.PollUntilContextCancel(ctx, 1*time.Second, false, func(ctx context.Context) (bool, error) {
return endpointAvailable && endpointSliceAvailable, nil
err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) {
slices, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + j.Name})
if err != nil || len(slices.Items) == 0 {
// Retry
return false, nil
}
for _, es := range slices.Items {
if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
return true, nil
}
}
return false, nil
})
if err != nil {
return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout)

View File

@ -53,7 +53,6 @@ import (
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
watchtools "k8s.io/client-go/tools/watch"
netutils "k8s.io/utils/net"
)
// DEPRECATED constants. Use the timeouts in framework.Framework instead.
@ -411,29 +410,12 @@ func CheckTestingNSDeletedExcept(ctx context.Context, c clientset.Interface, ski
return fmt.Errorf("Waiting for terminating namespaces to be deleted timed out")
}
// WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum.
// Some components use EndpointSlices other Endpoints, we must verify that both objects meet the requirements.
// WaitForServiceEndpointsNum waits until there are EndpointSlices for serviceName
// containing a total of expectNum endpoints. (If the service is dual-stack, expectNum
// must count the endpoints of both IP families.)
func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error {
return wait.PollUntilContextTimeout(ctx, interval, timeout, false, func(ctx context.Context) (bool, error) {
Logf("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum)
endpoint, err := c.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
if err != nil {
Logf("Unexpected error trying to get Endpoints for %s : %v", serviceName, err)
return false, nil
}
if countEndpointsNum(endpoint) != expectNum {
Logf("Unexpected number of Endpoints, got %d, expected %d", countEndpointsNum(endpoint), expectNum)
return false, nil
}
// Endpoints are single family but EndpointSlices can have dual stack addresses,
// so we verify the number of addresses that matches the same family on both.
addressType := discoveryv1.AddressTypeIPv4
if isIPv6Endpoint(endpoint) {
addressType = discoveryv1.AddressTypeIPv6
}
esList, err := c.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)})
if err != nil {
Logf("Unexpected error trying to get EndpointSlices for %s : %v", serviceName, err)
@ -445,44 +427,18 @@ func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, name
return false, nil
}
if countEndpointsSlicesNum(esList, addressType) != expectNum {
Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList, addressType), expectNum)
if countEndpointsSlicesNum(esList) != expectNum {
Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList), expectNum)
return false, nil
}
return true, nil
})
}
func countEndpointsNum(e *v1.Endpoints) int {
num := 0
for _, sub := range e.Subsets {
num += len(sub.Addresses)
}
return num
}
// isIPv6Endpoint returns true if the Endpoint uses IPv6 addresses
func isIPv6Endpoint(e *v1.Endpoints) bool {
for _, sub := range e.Subsets {
for _, addr := range sub.Addresses {
if len(addr.IP) == 0 {
continue
}
// Endpoints are single family, so it is enough to check only one address
return netutils.IsIPv6String(addr.IP)
}
}
// default to IPv4 an Endpoint without IP addresses
return false
}
func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList, addressType discoveryv1.AddressType) int {
func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList) int {
// EndpointSlices can contain the same address on multiple Slices
addresses := sets.Set[string]{}
for _, epSlice := range epList.Items {
if epSlice.AddressType != addressType {
continue
}
for _, ep := range epSlice.Endpoints {
if len(ep.Addresses) > 0 {
addresses.Insert(ep.Addresses[0])

View File

@ -43,6 +43,7 @@ import (
"sigs.k8s.io/yaml"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -66,7 +67,7 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
e2eauth "k8s.io/kubernetes/test/e2e/framework/auth"
e2edebug "k8s.io/kubernetes/test/e2e/framework/debug"
e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints"
e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice"
e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@ -1538,10 +1539,10 @@ metadata:
})
validateService := func(name string, servicePort int, timeout time.Duration) {
err := wait.Poll(framework.Poll, timeout, func() (bool, error) {
ep, err := c.CoreV1().Endpoints(ns).Get(ctx, name, metav1.GetOptions{})
slices, err := c.DiscoveryV1().EndpointSlices(ns).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, name)})
if err != nil {
// log the real error
framework.Logf("Get endpoints failed (interval %v): %v", framework.Poll, err)
framework.Logf("List endpointslices failed (interval %v): %v", framework.Poll, err)
// if the error is API not found or could not find default credentials or TLS handshake timeout, try again
if apierrors.IsNotFound(err) ||
@ -1552,7 +1553,7 @@ metadata:
return false, err
}
uidToPort := e2eendpoints.GetContainerPortsByPodUID(ep)
uidToPort := e2eendpointslice.GetContainerPortsByPodUID(slices.Items)
if len(uidToPort) == 0 {
framework.Logf("No endpoint found, retrying")
return false, nil

View File

@ -14,22 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoints
package network
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)
// PortsByPodUID is a map that maps pod UID to container ports.
type PortsByPodUID map[types.UID][]int
// FullPortsByPodUID is a map that maps pod UID to container ports.
type FullPortsByPodUID map[types.UID][]v1.ContainerPort
// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints.
func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID {
m := PortsByPodUID{}
// getContainerPortsByPodUID returns a portsByPodUID map on the given endpoints.
func getContainerPortsByPodUID(ep *v1.Endpoints) portsByPodUID {
m := portsByPodUID{}
for _, ss := range ep.Subsets {
for _, port := range ss.Ports {
for _, addr := range ss.Addresses {
@ -44,9 +37,9 @@ func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID {
return m
}
// GetFullContainerPortsByPodUID returns a FullPortsByPodUID map on the given endpoints with all the port data.
func GetFullContainerPortsByPodUID(ep *v1.Endpoints) FullPortsByPodUID {
m := FullPortsByPodUID{}
// getFullContainerPortsByPodUID returns a fullPortsByPodUID map on the given endpoints with all the port data.
func getFullContainerPortsByPodUID(ep *v1.Endpoints) fullPortsByPodUID {
m := fullPortsByPodUID{}
for _, ss := range ep.Subsets {
for _, port := range ss.Ports {
containerPort := v1.ContainerPort{

View File

@ -59,7 +59,6 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/test/e2e/framework"
e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints"
e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice"
e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@ -4591,7 +4590,7 @@ func validateEndpointsPortsOrFail(ctx context.Context, c clientset.Interface, na
// Retry the error
return false, nil
}
portsByUID := portsByPodUID(e2eendpoints.GetContainerPortsByPodUID(ep))
portsByUID := getContainerPortsByPodUID(ep)
if err := validatePorts(portsByUID, expectedPortsByPodUID); err != nil {
if i%5 == 0 {
framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
@ -4675,7 +4674,7 @@ func validateEndpointsPortsWithProtocolsOrFail(c clientset.Interface, namespace,
// Retry the error
return false, nil
}
portsByUID := fullPortsByPodUID(e2eendpoints.GetFullContainerPortsByPodUID(ep))
portsByUID := getFullContainerPortsByPodUID(ep)
if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil {
if i%5 == 0 {
framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)