Merge pull request #61025 from ayushpateria/crd_watch_e2e

Automatic merge from submit-queue (batch tested with PRs 61404, 61025). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Add e2e test for CRD Watch

**What this PR does / why we need it**:
This adds an e2e test to watch for custom resources. 
**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #55538

**Special notes for your reviewer**:
This PR depends on some functions from #60331, and shouldn't be merged before that one gets merged.
**Release note**:

```release-note
Add e2e test for CRD Watch
```
This commit is contained in:
Kubernetes Submit Queue 2018-04-03 07:31:08 -07:00 committed by GitHub
commit 6c96dfd81e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 173 additions and 5 deletions

View File

@ -11,6 +11,7 @@ go_library(
"aggregator.go",
"certs.go",
"chunking.go",
"crd_watch.go",
"custom_resource_definition.go",
"etcd_failure.go",
"framework.go",
@ -50,6 +51,7 @@ go_library(
"//vendor/k8s.io/apiextensions-apiserver/test/integration/testserver:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1beta1:go_default_library",

View File

@ -0,0 +1,161 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apimachinery
import (
"fmt"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apiextensions-apiserver/test/integration/testserver"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = SIGDescribe("CustomResourceDefinition Watch", func() {
f := framework.NewDefaultFramework("crd-watch")
Context("CustomResourceDefinition Watch", func() {
/*
Testname: crd-watch
Description: Create a Custom Resource Definition and make sure
watches observe events on create/delete.
*/
It("watch on custom resource definition objects", func() {
framework.SkipUnlessServerVersionGTE(crdVersion, f.ClientSet.Discovery())
const (
watchCRNameA = "name1"
watchCRNameB = "name2"
)
config, err := framework.LoadConfig()
if err != nil {
framework.Failf("failed to load config: %v", err)
}
apiExtensionClient, err := clientset.NewForConfig(config)
if err != nil {
framework.Failf("failed to initialize apiExtensionClient: %v", err)
}
noxuDefinition := testserver.NewNoxuCustomResourceDefinition(apiextensionsv1beta1.ClusterScoped)
noxuVersionClient, err := testserver.CreateNewCustomResourceDefinition(noxuDefinition, apiExtensionClient, f.ClientPool)
if err != nil {
framework.Failf("failed to create CustomResourceDefinition: %v", err)
}
defer func() {
err = testserver.DeleteCustomResourceDefinition(noxuDefinition, apiExtensionClient)
if err != nil {
framework.Failf("failed to delete CustomResourceDefinition: %v", err)
}
}()
ns := ""
noxuResourceClient := newNamespacedCustomResourceClient(ns, noxuVersionClient, noxuDefinition)
watchA, err := watchCRWithName(noxuResourceClient, watchCRNameA)
Expect(err).NotTo(HaveOccurred())
watchB, err := watchCRWithName(noxuResourceClient, watchCRNameB)
Expect(err).NotTo(HaveOccurred())
testCrA := testserver.NewNoxuInstance(ns, watchCRNameA)
testCrB := testserver.NewNoxuInstance(ns, watchCRNameB)
By("Creating first CR ")
testCrA, err = instantiateCustomResource(testCrA, noxuResourceClient, noxuDefinition)
Expect(err).NotTo(HaveOccurred())
expectEvent(watchA, watch.Added, testCrA)
expectNoEvent(watchB, watch.Added, testCrA)
By("Creating second CR")
testCrB, err = instantiateCustomResource(testCrB, noxuResourceClient, noxuDefinition)
Expect(err).NotTo(HaveOccurred())
expectEvent(watchB, watch.Added, testCrB)
expectNoEvent(watchA, watch.Added, testCrB)
By("Deleting first CR")
err = deleteCustomResource(noxuResourceClient, watchCRNameA)
Expect(err).NotTo(HaveOccurred())
expectEvent(watchA, watch.Deleted, nil)
expectNoEvent(watchB, watch.Deleted, nil)
By("Deleting second CR")
err = deleteCustomResource(noxuResourceClient, watchCRNameB)
Expect(err).NotTo(HaveOccurred())
expectEvent(watchB, watch.Deleted, nil)
expectNoEvent(watchA, watch.Deleted, nil)
})
})
})
func watchCRWithName(crdResourceClient dynamic.ResourceInterface, name string) (watch.Interface, error) {
return crdResourceClient.Watch(
metav1.ListOptions{
FieldSelector: "metadata.name=" + name,
TimeoutSeconds: int64ptr(600),
},
)
}
func instantiateCustomResource(instanceToCreate *unstructured.Unstructured, client dynamic.ResourceInterface, definition *apiextensionsv1beta1.CustomResourceDefinition) (*unstructured.Unstructured, error) {
createdInstance, err := client.Create(instanceToCreate)
if err != nil {
return nil, err
}
createdObjectMeta, err := meta.Accessor(createdInstance)
if err != nil {
return nil, err
}
// it should have a UUID
if len(createdObjectMeta.GetUID()) == 0 {
return nil, fmt.Errorf("missing uuid: %#v", createdInstance)
}
createdTypeMeta, err := meta.TypeAccessor(createdInstance)
if err != nil {
return nil, err
}
if e, a := definition.Spec.Group+"/"+definition.Spec.Version, createdTypeMeta.GetAPIVersion(); e != a {
return nil, fmt.Errorf("expected %v, got %v", e, a)
}
if e, a := definition.Spec.Names.Kind, createdTypeMeta.GetKind(); e != a {
return nil, fmt.Errorf("expected %v, got %v", e, a)
}
return createdInstance, nil
}
func deleteCustomResource(client dynamic.ResourceInterface, name string) error {
return client.Delete(name, &metav1.DeleteOptions{})
}
func newNamespacedCustomResourceClient(ns string, client dynamic.Interface, definition *apiextensionsv1beta1.CustomResourceDefinition) dynamic.ResourceInterface {
return client.Resource(&metav1.APIResource{
Name: definition.Spec.Names.Plural,
Namespaced: definition.Spec.Scope == apiextensionsv1beta1.NamespaceScoped,
}, ns)
}

View File

@ -179,23 +179,28 @@ func updatePod(f *framework.Framework, name string, update updatePodFunc) (*v1.P
}
func expectEvent(w watch.Interface, eventType watch.EventType, object runtime.Object) {
if event, ok := waitForEvent(w, eventType, object); !ok {
if event, ok := waitForEvent(w, eventType, object, 1*time.Minute); !ok {
framework.Failf("Timed out waiting for expected event: %v", event)
}
}
func expectNoEvent(w watch.Interface, eventType watch.EventType, object runtime.Object) {
if event, ok := waitForEvent(w, eventType, object); ok {
if event, ok := waitForEvent(w, eventType, object, 10*time.Second); ok {
framework.Failf("Unexpected event occurred: %v", event)
}
}
func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject runtime.Object) (watch.Event, bool) {
stopTimer := time.NewTimer(1 * time.Minute)
func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject runtime.Object, duration time.Duration) (watch.Event, bool) {
stopTimer := time.NewTimer(duration)
defer stopTimer.Stop()
for {
select {
case actual := <-w.ResultChan():
case actual, ok := <-w.ResultChan():
if ok {
framework.Logf("Got : %v %v", actual.Type, actual.Object)
} else {
framework.Failf("Watch closed unexpectedly")
}
if expectType == actual.Type && (expectObject == nil || apiequality.Semantic.DeepEqual(expectObject, actual.Object)) {
return actual, true
}