diff --git a/test/e2e/network/BUILD b/test/e2e/network/BUILD index 979b274c153..042d4654817 100644 --- a/test/e2e/network/BUILD +++ b/test/e2e/network/BUILD @@ -62,6 +62,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//staging/src/k8s.io/client-go/tools/watch:go_default_library", "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library", "//staging/src/k8s.io/client-go/util/retry:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index cce449b5e8e..2d48e37c47b 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -34,6 +34,8 @@ import ( compute "google.golang.org/api/compute/v1" + "k8s.io/client-go/tools/cache" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -42,7 +44,9 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + watch "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" + watchtools "k8s.io/client-go/tools/watch" cloudprovider "k8s.io/cloud-provider" "k8s.io/kubernetes/test/e2e/framework" e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment" @@ -2767,16 +2771,13 @@ var _ = SIGDescribe("Services", func() { }) ginkgo.It("should test the lifecycle of an Endpoint", func() { - ns := f.Namespace.Name + testNamespaceName := f.Namespace.Name testEndpointName := "testservice" - - ginkgo.By("creating an Endpoint") - _, err := f.ClientSet.CoreV1().Endpoints(ns).Create(context.TODO(), &v1.Endpoints{ + testEndpoints := v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ - Name: testEndpointName, - Namespace: ns, + Name: testEndpointName, Labels: map[string]string{ - "testendpoint-static": "true", + "test-endpoint-static": "true", }, }, Subsets: []v1.EndpointSubset{{ @@ -2789,50 +2790,82 @@ var _ = SIGDescribe("Services", func() { Protocol: v1.ProtocolTCP, }}, }}, - }, metav1.CreateOptions{}) - framework.ExpectNoError(err, "failed to create Endpoint") - - // set up a watch for the Endpoint - // this timeout was chosen as there was timeout failure from the CI - endpointWatchTimeoutSeconds := int64(180) - endpointWatch, err := f.ClientSet.CoreV1().Endpoints(ns).Watch(context.TODO(), metav1.ListOptions{LabelSelector: "testendpoint-static=true", TimeoutSeconds: &endpointWatchTimeoutSeconds}) - framework.ExpectNoError(err, "failed to setup watch on newly created Endpoint") - endpointWatchChan := endpointWatch.ResultChan() - ginkgo.By("waiting for available Endpoint") - for watchEvent := range endpointWatchChan { - if watchEvent.Type == "ADDED" { - break - } } + w := &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.LabelSelector = "test-endpoint-static=true" + return f.ClientSet.CoreV1().Endpoints(testNamespaceName).Watch(context.TODO(), options) + }, + } + endpointsList, err := f.ClientSet.CoreV1().Endpoints("").List(context.TODO(), metav1.ListOptions{LabelSelector: "test-endpoint-static=true"}) + framework.ExpectNoError(err, "failed to list Endpoints") + + ginkgo.By("creating an Endpoint") + _, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Create(context.TODO(), &testEndpoints, metav1.CreateOptions{}) + framework.ExpectNoError(err, "failed to create Endpoint") + ginkgo.By("waiting for available Endpoint") + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err = watchtools.Until(ctx, endpointsList.ResourceVersion, w, func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Added: + if endpoints, ok := event.Object.(*v1.Endpoints); ok { + found := endpoints.ObjectMeta.Name == endpoints.Name && + endpoints.Labels["test-endpoint-static"] == "true" + return found, nil + } + default: + framework.Logf("observed event type %v", event.Type) + } + return false, nil + }) + framework.ExpectNoError(err, "failed to see %v event", watch.Added) ginkgo.By("listing all Endpoints") - endpointsList, err := f.ClientSet.CoreV1().Endpoints("").List(context.TODO(), metav1.ListOptions{LabelSelector: "testendpoint-static=true"}) + endpointsList, err = f.ClientSet.CoreV1().Endpoints("").List(context.TODO(), metav1.ListOptions{LabelSelector: "test-endpoint-static=true"}) framework.ExpectNoError(err, "failed to list Endpoints") - foundEndpointService := false + eventFound := false var foundEndpoint v1.Endpoints for _, endpoint := range endpointsList.Items { - if endpoint.ObjectMeta.Name == testEndpointName && endpoint.ObjectMeta.Namespace == ns { - foundEndpointService = true + if endpoint.ObjectMeta.Name == testEndpointName && endpoint.ObjectMeta.Namespace == testNamespaceName { + eventFound = true foundEndpoint = endpoint break } } - framework.ExpectEqual(foundEndpointService, true, "unable to find Endpoint Service in list of Endpoints") + framework.ExpectEqual(eventFound, true, "unable to find Endpoint Service in list of Endpoints") ginkgo.By("updating the Endpoint") - foundEndpoint.ObjectMeta.Labels["testservice"] = "first-modification" - _, err = f.ClientSet.CoreV1().Endpoints(ns).Update(context.TODO(), &foundEndpoint, metav1.UpdateOptions{}) + foundEndpoint.ObjectMeta.Labels["test-service"] = "updated" + _, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Update(context.TODO(), &foundEndpoint, metav1.UpdateOptions{}) framework.ExpectNoError(err, "failed to update Endpoint with new label") + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err = watchtools.Until(ctx, endpointsList.ResourceVersion, w, func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Modified: + if endpoints, ok := event.Object.(*v1.Endpoints); ok { + found := endpoints.ObjectMeta.Name == endpoints.Name && + endpoints.Labels["test-endpoint-static"] == "true" + return found, nil + } + default: + framework.Logf("observed event type %v", event.Type) + } + return false, nil + }) + framework.ExpectNoError(err, "failed to see %v event", watch.Modified) + ginkgo.By("fetching the Endpoint") - _, err = f.ClientSet.CoreV1().Endpoints(ns).Get(context.TODO(), testEndpointName, metav1.GetOptions{}) + endpoints, err := f.ClientSet.CoreV1().Endpoints(testNamespaceName).Get(context.TODO(), testEndpointName, metav1.GetOptions{}) framework.ExpectNoError(err, "failed to fetch Endpoint") - framework.ExpectEqual(foundEndpoint.ObjectMeta.Labels["testservice"], "first-modification", "label not patched") + framework.ExpectEqual(foundEndpoint.ObjectMeta.Labels["test-service"], "updated", "failed to update Endpoint %v in namespace %v label not updated", testEndpointName, testNamespaceName) endpointPatch, err := json.Marshal(map[string]interface{}{ "metadata": map[string]interface{}{ "labels": map[string]string{ - "testservice": "second-modification", + "test-service": "patched", }, }, "subsets": []map[string]interface{}{ @@ -2853,14 +2886,30 @@ var _ = SIGDescribe("Services", func() { }) framework.ExpectNoError(err, "failed to marshal JSON for WatchEvent patch") ginkgo.By("patching the Endpoint") - _, err = f.ClientSet.CoreV1().Endpoints(ns).Patch(context.TODO(), testEndpointName, types.StrategicMergePatchType, []byte(endpointPatch), metav1.PatchOptions{}) + _, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Patch(context.TODO(), testEndpointName, types.StrategicMergePatchType, []byte(endpointPatch), metav1.PatchOptions{}) framework.ExpectNoError(err, "failed to patch Endpoint") + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err = watchtools.Until(ctx, endpoints.ResourceVersion, w, func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Modified: + if endpoints, ok := event.Object.(*v1.Endpoints); ok { + found := endpoints.ObjectMeta.Name == endpoints.Name && + endpoints.Labels["test-endpoint-static"] == "true" + return found, nil + } + default: + framework.Logf("observed event type %v", event.Type) + } + return false, nil + }) + framework.ExpectNoError(err, "failed to see %v event", watch.Modified) ginkgo.By("fetching the Endpoint") - endpoint, err := f.ClientSet.CoreV1().Endpoints(ns).Get(context.TODO(), testEndpointName, metav1.GetOptions{}) + endpoints, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Get(context.TODO(), testEndpointName, metav1.GetOptions{}) framework.ExpectNoError(err, "failed to fetch Endpoint") - framework.ExpectEqual(endpoint.ObjectMeta.Labels["testservice"], "second-modification", "failed to patch Endpoint with Label") - endpointSubsetOne := endpoint.Subsets[0] + framework.ExpectEqual(endpoints.ObjectMeta.Labels["test-service"], "patched", "failed to patch Endpoint with Label") + endpointSubsetOne := endpoints.Subsets[0] endpointSubsetOneAddresses := endpointSubsetOne.Addresses[0] endpointSubsetOnePorts := endpointSubsetOne.Ports[0] framework.ExpectEqual(endpointSubsetOneAddresses.IP, "10.0.0.25", "failed to patch Endpoint") @@ -2868,15 +2917,30 @@ var _ = SIGDescribe("Services", func() { framework.ExpectEqual(endpointSubsetOnePorts.Port, int32(8080), "failed to patch Endpoint") ginkgo.By("deleting the Endpoint by Collection") - err = f.ClientSet.CoreV1().Endpoints(ns).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "testendpoint-static=true"}) + err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "test-endpoint-static=true"}) framework.ExpectNoError(err, "failed to delete Endpoint by Collection") ginkgo.By("waiting for Endpoint deletion") - for watchEvent := range endpointWatchChan { - if watchEvent.Type == "DELETED" { - break + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err = watchtools.Until(ctx, endpoints.ResourceVersion, w, func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + if endpoints, ok := event.Object.(*v1.Endpoints); ok { + found := endpoints.ObjectMeta.Name == endpoints.Name && + endpoints.Labels["test-endpoint-static"] == "true" + return found, nil + } + default: + framework.Logf("observed event type %v", event.Type) } - } + return false, nil + }) + framework.ExpectNoError(err, "failed to see %v event", watch.Deleted) + + ginkgo.By("fetching the Endpoint") + _, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Get(context.TODO(), testEndpointName, metav1.GetOptions{}) + framework.ExpectError(err, "should not be able to fetch Endpoint") }) })