Merge pull request #93829 from aojea/e2eSvcSlice

e2e services wait for endpoint and endpoint slices
This commit is contained in:
Kubernetes Prow Robot 2020-08-12 06:49:45 -07:00 committed by GitHub
commit c780554a64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 1 deletions

View File

@ -13,6 +13,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -28,6 +28,7 @@ import (
"github.com/onsi/ginkgo"
v1 "k8s.io/api/core/v1"
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -352,6 +353,8 @@ func (j *TestJig) waitForAvailableEndpoint(timeout time.Duration) error {
endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name)
stopCh := make(chan struct{})
endpointAvailable := false
endpointSliceAvailable := false
var controller cache.Controller
_, controller = cache.NewInformer(
&cache.ListWatch{
@ -390,8 +393,54 @@ func (j *TestJig) waitForAvailableEndpoint(timeout time.Duration) error {
go controller.Run(stopCh)
// If EndpointSlice API is enabled, then validate if appropriate EndpointSlice objects were also create/updated/deleted.
if _, err := j.Client.Discovery().ServerResourcesForGroupVersion(discoveryv1beta1.SchemeGroupVersion.String()); err == nil {
var esController cache.Controller
_, esController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = "kubernetes.io/service-name=" + j.Name
obj, err := j.Client.DiscoveryV1beta1().EndpointSlices(j.Namespace).List(context.TODO(), options)
return runtime.Object(obj), err
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = "kubernetes.io/service-name=" + j.Name
return j.Client.DiscoveryV1beta1().EndpointSlices(j.Namespace).Watch(context.TODO(), options)
},
},
&discoveryv1beta1.EndpointSlice{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if es, ok := obj.(*discoveryv1beta1.EndpointSlice); ok {
// TODO: currently we only consider addreses in 1 slice, but services with
// a large number of endpoints (>1000) may have multiple slices. Some slices
// with only a few addresses. We should check the addresses in all slices.
if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
endpointSliceAvailable = true
}
}
},
UpdateFunc: func(old, cur interface{}) {
if es, ok := cur.(*discoveryv1beta1.EndpointSlice); ok {
// TODO: currently we only consider addreses in 1 slice, but services with
// a large number of endpoints (>1000) may have multiple slices. Some slices
// with only a few addresses. We should check the addresses in all slices.
if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
endpointSliceAvailable = true
}
}
},
},
)
go esController.Run(stopCh)
} else {
endpointSliceAvailable = true
}
err := wait.Poll(1*time.Second, timeout, func() (bool, error) {
return endpointAvailable, nil
return endpointAvailable && endpointSliceAvailable, nil
})
if err != nil {
return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout)