e2e services wait for endpoint and endpoint slices

Since 1.19 endpoint slices is enabled by default, so all the e2e
tests should consider them.

The e2e networking tests for services use the jig object for
all the tests, but was not taking into account endpoint slices.

This considers endpoints slices for the method waitForAvailableEndpoint()

 Date:      Sun Aug 9 12:34:06 2020 +0200
This commit is contained in:
Antonio Ojea 2020-08-09 12:34:06 +02:00
parent 92e51c1901
commit f31839acea
2 changed files with 51 additions and 1 deletions

View File

@ -13,6 +13,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -28,6 +28,7 @@ import (
"github.com/onsi/ginkgo"
v1 "k8s.io/api/core/v1"
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -352,6 +353,8 @@ func (j *TestJig) waitForAvailableEndpoint(timeout time.Duration) error {
endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name)
stopCh := make(chan struct{})
endpointAvailable := false
endpointSliceAvailable := false
var controller cache.Controller
_, controller = cache.NewInformer(
&cache.ListWatch{
@ -390,8 +393,54 @@ func (j *TestJig) waitForAvailableEndpoint(timeout time.Duration) error {
go controller.Run(stopCh)
// If EndpointSlice API is enabled, then validate if appropriate EndpointSlice objects were also create/updated/deleted.
if _, err := j.Client.Discovery().ServerResourcesForGroupVersion(discoveryv1beta1.SchemeGroupVersion.String()); err == nil {
var esController cache.Controller
_, esController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = "kubernetes.io/service-name=" + j.Name
obj, err := j.Client.DiscoveryV1beta1().EndpointSlices(j.Namespace).List(context.TODO(), options)
return runtime.Object(obj), err
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = "kubernetes.io/service-name=" + j.Name
return j.Client.DiscoveryV1beta1().EndpointSlices(j.Namespace).Watch(context.TODO(), options)
},
},
&discoveryv1beta1.EndpointSlice{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if es, ok := obj.(*discoveryv1beta1.EndpointSlice); ok {
// TODO: currently we only consider addreses in 1 slice, but services with
// a large number of endpoints (>1000) may have multiple slices. Some slices
// with only a few addresses. We should check the addresses in all slices.
if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
endpointSliceAvailable = true
}
}
},
UpdateFunc: func(old, cur interface{}) {
if es, ok := cur.(*discoveryv1beta1.EndpointSlice); ok {
// TODO: currently we only consider addreses in 1 slice, but services with
// a large number of endpoints (>1000) may have multiple slices. Some slices
// with only a few addresses. We should check the addresses in all slices.
if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
endpointSliceAvailable = true
}
}
},
},
)
go esController.Run(stopCh)
} else {
endpointSliceAvailable = true
}
err := wait.Poll(1*time.Second, timeout, func() (bool, error) {
return endpointAvailable, nil
return endpointAvailable && endpointSliceAvailable, nil
})
if err != nil {
return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout)