diff --git a/test/e2e/common/BUILD b/test/e2e/common/BUILD index 3c642b6e247..5d8f37709a0 100644 --- a/test/e2e/common/BUILD +++ b/test/e2e/common/BUILD @@ -58,6 +58,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", @@ -66,6 +67,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/watch:go_default_library", diff --git a/test/e2e/common/configmap.go b/test/e2e/common/configmap.go index 106bd4fc40d..93311c5ee3d 100644 --- a/test/e2e/common/configmap.go +++ b/test/e2e/common/configmap.go @@ -20,11 +20,16 @@ import ( "context" "encoding/json" "fmt" + "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" + watch "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/test/e2e/framework" imageutils "k8s.io/kubernetes/test/utils/image" @@ -34,6 +39,12 @@ import ( var _ = ginkgo.Describe("[sig-node] ConfigMap", func() { f := framework.NewDefaultFramework("configmap") + var dc dynamic.Interface + + ginkgo.BeforeEach(func() { + dc = f.DynamicClient + }) + /* Release : v1.9 Testname: ConfigMap, from environment field @@ -161,7 +172,13 @@ var _ = ginkgo.Describe("[sig-node] ConfigMap", func() { testNamespaceName := f.Namespace.Name testConfigMapName := "test-configmap" + string(uuid.NewUUID()) - _, err := f.ClientSet.CoreV1().ConfigMaps(testNamespaceName).Create(context.TODO(), &v1.ConfigMap{ + configMapResource := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"} + expectedWatchEvents := []watch.Event{ + {Type: watch.Added}, + {Type: watch.Modified}, + {Type: watch.Deleted}, + } + testConfigMap := v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: testConfigMapName, Labels: map[string]string{ @@ -171,51 +188,105 @@ var _ = ginkgo.Describe("[sig-node] ConfigMap", func() { Data: map[string]string{ "valueName": "value", }, - }, metav1.CreateOptions{}) - framework.ExpectNoError(err, "failed to create ConfigMap") - - configMapPatchPayload, err := json.Marshal(v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "test-configmap": "patched", - }, - }, - Data: map[string]string{ - "valueName": "value1", - }, - }) - framework.ExpectNoError(err, "failed to marshal patch data") - - _, err = f.ClientSet.CoreV1().ConfigMaps(testNamespaceName).Patch(context.TODO(), testConfigMapName, types.StrategicMergePatchType, []byte(configMapPatchPayload), metav1.PatchOptions{}) - framework.ExpectNoError(err, "failed to patch ConfigMap") - - configMap, err := f.ClientSet.CoreV1().ConfigMaps(testNamespaceName).Get(context.TODO(), testConfigMapName, metav1.GetOptions{}) - framework.ExpectNoError(err, "failed to get ConfigMap") - framework.ExpectEqual(configMap.Data["valueName"], "value1", "failed to patch ConfigMap") - framework.ExpectEqual(configMap.Labels["test-configmap"], "patched", "failed to patch ConfigMap") - - // listing in all namespaces to hit the endpoint - configMapList, err := f.ClientSet.CoreV1().ConfigMaps("").List(context.TODO(), metav1.ListOptions{ - LabelSelector: "test-configmap-static=true", - }) - framework.ExpectNoError(err, "failed to list ConfigMaps with LabelSelector") - framework.ExpectNotEqual(len(configMapList.Items), 0, "no ConfigMaps found in ConfigMap list") - testConfigMapFound := false - for _, cm := range configMapList.Items { - if cm.ObjectMeta.Name == testConfigMapName && - cm.ObjectMeta.Namespace == testNamespaceName && - cm.ObjectMeta.Labels["test-configmap-static"] == "true" && - cm.Data["valueName"] == "value1" { - testConfigMapFound = true - break - } } - framework.ExpectEqual(testConfigMapFound, true, "failed to find ConfigMap in list") - err = f.ClientSet.CoreV1().ConfigMaps(testNamespaceName).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{ - LabelSelector: "test-configmap-static=true", + framework.WatchEventSequenceVerifier(context.TODO(), dc, configMapResource, testNamespaceName, testConfigMapName, metav1.ListOptions{LabelSelector: "test-configmap-static=true"}, expectedWatchEvents, func(retryWatcher *watchtools.RetryWatcher) (actualWatchEvents []watch.Event) { + ginkgo.By("creating a ConfigMap") + _, err := f.ClientSet.CoreV1().ConfigMaps(testNamespaceName).Create(context.TODO(), &testConfigMap, metav1.CreateOptions{}) + framework.ExpectNoError(err, "failed to create ConfigMap") + eventFound := false + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { + if watchEvent.Type != watch.Added { + return false, nil + } + actualWatchEvents = append(actualWatchEvents, watchEvent) + eventFound = true + return true, nil + }) + framework.ExpectNoError(err, "Wait until condition with watch events should not return an error") + framework.ExpectEqual(eventFound, true, "failed to find ConfigMap %v event", watch.Added) + + configMapPatchPayload, err := json.Marshal(v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "test-configmap": "patched", + }, + }, + Data: map[string]string{ + "valueName": "value1", + }, + }) + framework.ExpectNoError(err, "failed to marshal patch data") + + ginkgo.By("patching the ConfigMap") + _, err = f.ClientSet.CoreV1().ConfigMaps(testNamespaceName).Patch(context.TODO(), testConfigMapName, types.StrategicMergePatchType, []byte(configMapPatchPayload), metav1.PatchOptions{}) + framework.ExpectNoError(err, "failed to patch ConfigMap") + ginkgo.By("waiting for the ConfigMap to be modified") + eventFound = false + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { + if watchEvent.Type != watch.Modified { + return false, nil + } + actualWatchEvents = append(actualWatchEvents, watchEvent) + eventFound = true + return true, nil + }) + framework.ExpectNoError(err, "Wait until condition with watch events should not return an error") + framework.ExpectEqual(eventFound, true, "failed to find ConfigMap %v event", watch.Modified) + + ginkgo.By("fetching the ConfigMap") + configMap, err := f.ClientSet.CoreV1().ConfigMaps(testNamespaceName).Get(context.TODO(), testConfigMapName, metav1.GetOptions{}) + framework.ExpectNoError(err, "failed to get ConfigMap") + framework.ExpectEqual(configMap.Data["valueName"], "value1", "failed to patch ConfigMap") + framework.ExpectEqual(configMap.Labels["test-configmap"], "patched", "failed to patch ConfigMap") + + ginkgo.By("listing all ConfigMaps in all namespaces") + configMapList, err := f.ClientSet.CoreV1().ConfigMaps("").List(context.TODO(), metav1.ListOptions{ + LabelSelector: "test-configmap-static=true", + }) + framework.ExpectNoError(err, "failed to list ConfigMaps with LabelSelector") + framework.ExpectNotEqual(len(configMapList.Items), 0, "no ConfigMaps found in ConfigMap list") + testConfigMapFound := false + for _, cm := range configMapList.Items { + if cm.ObjectMeta.Name == testConfigMapName && + cm.ObjectMeta.Namespace == testNamespaceName && + cm.ObjectMeta.Labels["test-configmap-static"] == "true" && + cm.Data["valueName"] == "value1" { + testConfigMapFound = true + break + } + } + framework.ExpectEqual(testConfigMapFound, true, "failed to find ConfigMap in list") + + ginkgo.By("deleting the ConfigMap by a collection") + err = f.ClientSet.CoreV1().ConfigMaps(testNamespaceName).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{ + LabelSelector: "test-configmap-static=true", + }) + framework.ExpectNoError(err, "failed to delete ConfigMap collection with LabelSelector") + ginkgo.By("waiting for the ConfigMap to be deleted") + eventFound = false + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) { + if watchEvent.Type != watch.Deleted { + return false, nil + } + actualWatchEvents = append(actualWatchEvents, watchEvent) + eventFound = true + return true, nil + }) + framework.ExpectNoError(err, "Wait until condition with watch events should not return an error") + framework.ExpectEqual(eventFound, true, "failed to find ConfigMap %v event", watch.Deleted) + + return actualWatchEvents + }, func() (err error) { + _ = f.ClientSet.CoreV1().ConfigMaps(testNamespaceName).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "test-configmap-static=true"}) + return err }) - framework.ExpectNoError(err, "failed to delete ConfigMap collection with LabelSelector") }) }) diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 2f21428ce73..1ec1382d5da 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -51,6 +51,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest" @@ -1302,3 +1303,119 @@ func taintExists(taints []v1.Taint, taintToFind *v1.Taint) bool { } return false } + +// WatchEventSequenceVerifier ... +// manages a watch for a given resource, ensures that events take place in a given order, retries the test on failure +// testContext cancelation signal across API boundries, e.g: context.TODO() +// dc sets up a client to the API +// resourceType specify the type of resource +// namespace select a namespace +// resourceName the name of the given resource +// listOptions options used to find the resource, recommended to use listOptions.labelSelector +// expectedWatchEvents array of events which are expected to occur +// scenario the test itself +// retryCleanup a function to run which ensures that there are no dangling resources upon test failure +// this tooling relies on the test to return the events as they occur +// the entire scenario must be run to ensure that the desired watch events arrive in order (allowing for interweaving of watch events) +// if an expected watch event is missing we elect to clean up and run the entire scenario again +// we try the scenario three times to allow the sequencing to fail a couple of times +func WatchEventSequenceVerifier(ctx context.Context, dc dynamic.Interface, resourceType schema.GroupVersionResource, namespace string, resourceName string, listOptions metav1.ListOptions, expectedWatchEvents []watch.Event, scenario func(*watchtools.RetryWatcher) []watch.Event, retryCleanup func() error) { + listWatcher := &cache.ListWatch{ + WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) { + return dc.Resource(resourceType).Namespace(namespace).Watch(ctx, listOptions) + }, + } + + retries := 3 +retriesLoop: + for try := 1; try <= retries; try++ { + initResource, err := dc.Resource(resourceType).Namespace(namespace).List(ctx, listOptions) + ExpectNoError(err, "Failed to fetch initial resource") + + resourceWatch, err := watchtools.NewRetryWatcher(initResource.GetResourceVersion(), listWatcher) + ExpectNoError(err, "Failed to create a resource watch of %v in namespace %v", resourceType.Resource, namespace) + + // NOTE the test may need access to the events to see what's going on, such as a change in status + actualWatchEvents := scenario(resourceWatch) + errs := sets.NewString() + ExpectEqual(len(expectedWatchEvents) <= len(actualWatchEvents), true, "Error: actual watch events amount (%d) must be greater than or equal to expected watch events amount (%d)", len(actualWatchEvents), len(expectedWatchEvents)) + + totalValidWatchEvents := 0 + foundEventIndexes := map[int]*int{} + + for watchEventIndex, expectedWatchEvent := range expectedWatchEvents { + foundExpectedWatchEvent := false + actualWatchEventsLoop: + for actualWatchEventIndex, actualWatchEvent := range actualWatchEvents { + if foundEventIndexes[actualWatchEventIndex] != nil { + continue actualWatchEventsLoop + } + if actualWatchEvent.Type == expectedWatchEvent.Type { + foundExpectedWatchEvent = true + foundEventIndexes[actualWatchEventIndex] = &watchEventIndex + break actualWatchEventsLoop + } + } + if foundExpectedWatchEvent == false { + errs.Insert(fmt.Sprintf("Watch event %v not found", expectedWatchEvent.Type)) + } + totalValidWatchEvents++ + } + err = retryCleanup() + ExpectNoError(err, "Error occurred when cleaning up resources") + if errs.Len() > 0 && try < retries { + fmt.Println("invariants violated:\n", strings.Join(errs.List(), "\n - ")) + continue retriesLoop + } + ExpectEqual(errs.Len() > 0, false, strings.Join(errs.List(), "\n - ")) + ExpectEqual(totalValidWatchEvents, len(expectedWatchEvents), "Error: there must be an equal amount of total valid watch events (%d) and expected watch events (%d)", totalValidWatchEvents, len(expectedWatchEvents)) + break retriesLoop + } +} + +// WatchUntilWithoutRetry ... +// reads items from the watch until each provided condition succeeds, and then returns the last watch +// encountered. The first condition that returns an error terminates the watch (and the event is also returned). +// If no event has been received, the returned event will be nil. +// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition. +// Waits until context deadline or until context is canceled. +// +// the same as watchtools.UntilWithoutRetry, just without the closing of the watch - as for the purpose of being paired with WatchEventSequenceVerifier, the watch is needed for continual watch event collection +func WatchUntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...watchtools.ConditionFunc) (*watch.Event, error) { + ch := watcher.ResultChan() + var lastEvent *watch.Event + for _, condition := range conditions { + // check the next condition against the previous event and short circuit waiting for the next watch + if lastEvent != nil { + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + continue + } + } + ConditionSucceeded: + for { + select { + case event, ok := <-ch: + if !ok { + return lastEvent, watchtools.ErrWatchClosed + } + lastEvent = &event + + done, err := condition(event) + if err != nil { + return lastEvent, err + } + if done { + break ConditionSucceeded + } + + case <-ctx.Done(): + return lastEvent, wait.ErrWaitTimeout + } + } + } + return lastEvent, nil +}