diff --git a/test/e2e/framework/service/BUILD b/test/e2e/framework/service/BUILD index 43be89af1c3..3af1bb6cf31 100644 --- a/test/e2e/framework/service/BUILD +++ b/test/e2e/framework/service/BUILD @@ -13,6 +13,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1beta1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/test/e2e/framework/service/jig.go b/test/e2e/framework/service/jig.go index b1c48b1376f..5eaf83b7523 100644 --- a/test/e2e/framework/service/jig.go +++ b/test/e2e/framework/service/jig.go @@ -28,6 +28,7 @@ import ( "github.com/onsi/ginkgo" v1 "k8s.io/api/core/v1" + discoveryv1beta1 "k8s.io/api/discovery/v1beta1" policyv1beta1 "k8s.io/api/policy/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -352,6 +353,8 @@ func (j *TestJig) waitForAvailableEndpoint(timeout time.Duration) error { endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name) stopCh := make(chan struct{}) endpointAvailable := false + endpointSliceAvailable := false + var controller cache.Controller _, controller = cache.NewInformer( &cache.ListWatch{ @@ -390,8 +393,54 @@ func (j *TestJig) waitForAvailableEndpoint(timeout time.Duration) error { go controller.Run(stopCh) + // If EndpointSlice API is enabled, then validate if appropriate EndpointSlice objects were also create/updated/deleted. + if _, err := j.Client.Discovery().ServerResourcesForGroupVersion(discoveryv1beta1.SchemeGroupVersion.String()); err == nil { + var esController cache.Controller + _, esController = cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.LabelSelector = "kubernetes.io/service-name=" + j.Name + obj, err := j.Client.DiscoveryV1beta1().EndpointSlices(j.Namespace).List(context.TODO(), options) + return runtime.Object(obj), err + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.LabelSelector = "kubernetes.io/service-name=" + j.Name + return j.Client.DiscoveryV1beta1().EndpointSlices(j.Namespace).Watch(context.TODO(), options) + }, + }, + &discoveryv1beta1.EndpointSlice{}, + 0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if es, ok := obj.(*discoveryv1beta1.EndpointSlice); ok { + // TODO: currently we only consider addreses in 1 slice, but services with + // a large number of endpoints (>1000) may have multiple slices. Some slices + // with only a few addresses. We should check the addresses in all slices. + if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { + endpointSliceAvailable = true + } + } + }, + UpdateFunc: func(old, cur interface{}) { + if es, ok := cur.(*discoveryv1beta1.EndpointSlice); ok { + // TODO: currently we only consider addreses in 1 slice, but services with + // a large number of endpoints (>1000) may have multiple slices. Some slices + // with only a few addresses. We should check the addresses in all slices. + if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 { + endpointSliceAvailable = true + } + } + }, + }, + ) + + go esController.Run(stopCh) + + } else { + endpointSliceAvailable = true + } err := wait.Poll(1*time.Second, timeout, func() (bool, error) { - return endpointAvailable, nil + return endpointAvailable && endpointSliceAvailable, nil }) if err != nil { return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout)