e2e WaitForServiceEndpointsNum should watch EndpointSlices

EndpointSlices is the evolution of the Endpoint object and most of the
components are using it for implementing Services, this menas that
despite the Endpoint object is up to date, the EndpointSlices may
lag behind, so test must ensure that both objects are in sync to
avoid race conditions.

Change-Id: I5d9bc7774c68f321537379d1f20b2a1fe0b39e6e
This commit is contained in:
Antonio Ojea 2023-08-07 17:44:18 +00:00
parent 1620473a9a
commit 2ceca1c78d

View File

@ -36,6 +36,7 @@ import (
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@ -420,20 +421,37 @@ func CheckTestingNSDeletedExcept(ctx context.Context, c clientset.Interface, ski
}
// WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum.
// Some components use EndpointSlices other Endpoints, we must verify that both objects meet the requirements.
func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error {
return wait.PollWithContext(ctx, interval, timeout, func(ctx context.Context) (bool, error) {
Logf("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum)
list, err := c.CoreV1().Endpoints(namespace).List(ctx, metav1.ListOptions{})
endpoint, err := c.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
if err != nil {
return false, err
Logf("Unexpected error trying to get Endpoints for %s : %v", serviceName, err)
return false, nil
}
for _, e := range list.Items {
if e.Name == serviceName && countEndpointsNum(&e) == expectNum {
return true, nil
}
if countEndpointsNum(endpoint) != expectNum {
Logf("Unexpected number of Endpoints, got %d, expected %d", countEndpointsNum(endpoint), expectNum)
return false, nil
}
return false, nil
esList, err := c.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)})
if err != nil {
Logf("Unexpected error trying to get EndpointSlices for %s : %v", serviceName, err)
return false, nil
}
if len(esList.Items) == 0 {
Logf("Waiting for at least 1 EndpointSlice to exist")
return false, nil
}
if countEndpointsSlicesNum(esList) != expectNum {
Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList), expectNum)
return false, nil
}
return true, nil
})
}
@ -445,6 +463,19 @@ func countEndpointsNum(e *v1.Endpoints) int {
return num
}
func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList) int {
// EndpointSlices can contain the same address on multiple Slices
addresses := sets.Set[string]{}
for _, epSlice := range epList.Items {
for _, ep := range epSlice.Endpoints {
if len(ep.Addresses) > 0 {
addresses.Insert(ep.Addresses[0])
}
}
}
return addresses.Len()
}
// restclientConfig returns a config holds the information needed to build connection to kubernetes clusters.
func restclientConfig(kubeContext string) (*clientcmdapi.Config, error) {
Logf(">>> kubeConfig: %s", TestContext.KubeConfig)