Merge pull request #119798 from aojea/endpoints_and_slices

Fix flaky test depending on EndpointSlices to be ready
This commit is contained in:
Kubernetes Prow Robot 2023-08-08 04:52:14 -07:00 committed by GitHub
commit 68d79b0d69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -36,6 +36,7 @@ import (
"github.com/onsi/gomega" "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
@ -420,20 +421,37 @@ func CheckTestingNSDeletedExcept(ctx context.Context, c clientset.Interface, ski
} }
// WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum. // WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum.
// Some components use EndpointSlices other Endpoints, we must verify that both objects meet the requirements.
func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error { func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error {
return wait.PollWithContext(ctx, interval, timeout, func(ctx context.Context) (bool, error) { return wait.PollWithContext(ctx, interval, timeout, func(ctx context.Context) (bool, error) {
Logf("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum) Logf("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum)
list, err := c.CoreV1().Endpoints(namespace).List(ctx, metav1.ListOptions{}) endpoint, err := c.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
if err != nil { if err != nil {
return false, err Logf("Unexpected error trying to get Endpoints for %s : %v", serviceName, err)
return false, nil
} }
for _, e := range list.Items { if countEndpointsNum(endpoint) != expectNum {
if e.Name == serviceName && countEndpointsNum(&e) == expectNum { Logf("Unexpected number of Endpoints, got %d, expected %d", countEndpointsNum(endpoint), expectNum)
return true, nil return false, nil
}
} }
return false, nil
esList, err := c.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)})
if err != nil {
Logf("Unexpected error trying to get EndpointSlices for %s : %v", serviceName, err)
return false, nil
}
if len(esList.Items) == 0 {
Logf("Waiting for at least 1 EndpointSlice to exist")
return false, nil
}
if countEndpointsSlicesNum(esList) != expectNum {
Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList), expectNum)
return false, nil
}
return true, nil
}) })
} }
@ -445,6 +463,19 @@ func countEndpointsNum(e *v1.Endpoints) int {
return num return num
} }
func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList) int {
// EndpointSlices can contain the same address on multiple Slices
addresses := sets.Set[string]{}
for _, epSlice := range epList.Items {
for _, ep := range epSlice.Endpoints {
if len(ep.Addresses) > 0 {
addresses.Insert(ep.Addresses[0])
}
}
}
return addresses.Len()
}
// restclientConfig returns a config holds the information needed to build connection to kubernetes clusters. // restclientConfig returns a config holds the information needed to build connection to kubernetes clusters.
func restclientConfig(kubeContext string) (*clientcmdapi.Config, error) { func restclientConfig(kubeContext string) (*clientcmdapi.Config, error) {
Logf(">>> kubeConfig: %s", TestContext.KubeConfig) Logf(">>> kubeConfig: %s", TestContext.KubeConfig)