diff --git a/test/integration/apiserver/apiserver_test.go b/test/integration/apiserver/apiserver_test.go index 13a46da2cb5..816220ee3e9 100644 --- a/test/integration/apiserver/apiserver_test.go +++ b/test/integration/apiserver/apiserver_test.go @@ -37,6 +37,7 @@ import ( apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apiextensions-apiserver/test/integration/fixtures" + apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" @@ -2307,6 +2308,142 @@ func TestTransform(t *testing.T) { } } +func TestWatchTransformCaching(t *testing.T) { + ctx, clientSet, _, tearDownFn := setup(t) + defer tearDownFn() + + ns := framework.CreateNamespaceOrDie(clientSet, "watch-transform", t) + defer framework.DeleteNamespaceOrDie(clientSet, ns, t) + + list, err := clientSet.CoreV1().ConfigMaps(ns.Name).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatalf("Failed to list objects: %v", err) + } + + timeout := 30 * time.Second + listOptions := &metav1.ListOptions{ + ResourceVersion: list.ResourceVersion, + Watch: true, + } + + wMeta, err := clientSet.CoreV1().RESTClient().Get(). + AbsPath("/api/v1/namespaces/watch-transform/configmaps"). + SetHeader("Accept", "application/vnd.kubernetes.protobuf;as=PartialObjectMetadata;g=meta.k8s.io;v=v1beta1"). + VersionedParams(listOptions, metav1.ParameterCodec). + Timeout(timeout). + Stream(ctx) + if err != nil { + t.Fatalf("Failed to start meta watch: %v", err) + } + defer wMeta.Close() + + wTableIncludeMeta, err := clientSet.CoreV1().RESTClient().Get(). + AbsPath("/api/v1/namespaces/watch-transform/configmaps"). + SetHeader("Accept", "application/json;as=Table;g=meta.k8s.io;v=v1beta1"). + VersionedParams(listOptions, metav1.ParameterCodec). + Param("includeObject", string(metav1.IncludeMetadata)). + Timeout(timeout). + Stream(ctx) + if err != nil { + t.Fatalf("Failed to start table meta watch: %v", err) + } + defer wTableIncludeMeta.Close() + + wTableIncludeObject, err := clientSet.CoreV1().RESTClient().Get(). + AbsPath("/api/v1/namespaces/watch-transform/configmaps"). + SetHeader("Accept", "application/json;as=Table;g=meta.k8s.io;v=v1beta1"). + VersionedParams(listOptions, metav1.ParameterCodec). + Param("includeObject", string(metav1.IncludeObject)). + Timeout(timeout). + Stream(ctx) + if err != nil { + t.Fatalf("Failed to start table object watch: %v", err) + } + defer wTableIncludeObject.Close() + + configMap, err := clientSet.CoreV1().ConfigMaps("watch-transform").Create(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "test1"}, + Data: map[string]string{ + "foo": "bar", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create a configMap: %v", err) + } + + listOptionsDelayed := &metav1.ListOptions{ + ResourceVersion: configMap.ResourceVersion, + Watch: true, + } + wTableIncludeObjectDelayed, err := clientSet.CoreV1().RESTClient().Get(). + AbsPath("/api/v1/namespaces/watch-transform/configmaps"). + SetHeader("Accept", "application/json;as=Table;g=meta.k8s.io;v=v1beta1"). + VersionedParams(listOptionsDelayed, metav1.ParameterCodec). + Param("includeObject", string(metav1.IncludeObject)). + Timeout(timeout). + Stream(ctx) + if err != nil { + t.Fatalf("Failed to start table object watch: %v", err) + } + defer wTableIncludeObjectDelayed.Close() + + configMap2, err := clientSet.CoreV1().ConfigMaps("watch-transform").Create(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "test2"}, + Data: map[string]string{ + "foo": "bar", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create a second configMap: %v", err) + } + + metaChecks := []partialObjectMetadataCheck{ + func(res *metav1beta1.PartialObjectMetadata) { + if !apiequality.Semantic.DeepEqual(configMap.ObjectMeta, res.ObjectMeta) { + t.Errorf("expected object: %#v, got: %#v", configMap.ObjectMeta, res.ObjectMeta) + } + }, + func(res *metav1beta1.PartialObjectMetadata) { + if !apiequality.Semantic.DeepEqual(configMap2.ObjectMeta, res.ObjectMeta) { + t.Errorf("expected object: %#v, got: %#v", configMap2.ObjectMeta, res.ObjectMeta) + } + }, + } + expectPartialObjectMetaEventsProtobufChecks(t, wMeta, metaChecks) + + tableMetaCheck := func(expected *v1.ConfigMap, got []byte) { + var obj metav1.PartialObjectMetadata + if err := json.Unmarshal(got, &obj); err != nil { + t.Fatal(err) + } + if !apiequality.Semantic.DeepEqual(expected.ObjectMeta, obj.ObjectMeta) { + t.Errorf("expected object: %#v, got: %#v", expected, obj) + } + } + + objectMetas := expectTableWatchEvents(t, 2, 3, metav1.IncludeMetadata, json.NewDecoder(wTableIncludeMeta)) + tableMetaCheck(configMap, objectMetas[0]) + tableMetaCheck(configMap2, objectMetas[1]) + + tableObjectCheck := func(expected *v1.ConfigMap, got []byte) { + var obj *v1.ConfigMap + if err := json.Unmarshal(got, &obj); err != nil { + t.Fatal(err) + } + obj.TypeMeta = metav1.TypeMeta{} + if !apiequality.Semantic.DeepEqual(expected, obj) { + t.Errorf("expected object: %#v, got: %#v", expected, obj) + } + } + + objects := expectTableWatchEvents(t, 2, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObject)) + tableObjectCheck(configMap, objects[0]) + tableObjectCheck(configMap2, objects[1]) + + delayedObjects := expectTableWatchEvents(t, 1, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObjectDelayed)) + tableObjectCheck(configMap2, delayedObjects[0]) +} + func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.IncludeObjectPolicy, d *json.Decoder) [][]byte { t.Helper() @@ -2347,6 +2484,7 @@ func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.Incl if meta.TypeMeta != partialObj { t.Fatalf("expected partial object: %#v", meta) } + objects = append(objects, row.Object.Raw) case metav1.IncludeNone: if len(row.Object.Raw) != 0 { t.Fatalf("Expected no object: %s", string(row.Object.Raw)) @@ -2383,7 +2521,22 @@ func expectPartialObjectMetaEvents(t *testing.T, d *json.Decoder, values ...stri } } +type partialObjectMetadataCheck func(*metav1beta1.PartialObjectMetadata) + func expectPartialObjectMetaEventsProtobuf(t *testing.T, r io.Reader, values ...string) { + checks := []partialObjectMetadataCheck{} + for i, value := range values { + i, value := i, value + checks = append(checks, func(meta *metav1beta1.PartialObjectMetadata) { + if meta.Annotations["test"] != value { + t.Fatalf("expected event %d to have value %q instead of %q", i+1, value, meta.Annotations["test"]) + } + }) + } + expectPartialObjectMetaEventsProtobufChecks(t, r, checks) +} + +func expectPartialObjectMetaEventsProtobufChecks(t *testing.T, r io.Reader, checks []partialObjectMetadataCheck) { scheme := runtime.NewScheme() metav1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"}) rs := protobuf.NewRawSerializer(scheme, scheme) @@ -2393,7 +2546,7 @@ func expectPartialObjectMetaEventsProtobuf(t *testing.T, r io.Reader, values ... ) ds := metainternalversionscheme.Codecs.UniversalDeserializer() - for i, value := range values { + for _, check := range checks { var evt metav1.WatchEvent if _, _, err := d.Decode(nil, &evt); err != nil { t.Fatal(err) @@ -2410,9 +2563,7 @@ func expectPartialObjectMetaEventsProtobuf(t *testing.T, r io.Reader, values ... if !reflect.DeepEqual(expected, gvk) { t.Fatalf("expected partial object: %#v", meta) } - if meta.Annotations["test"] != value { - t.Fatalf("expected event %d to have value %q instead of %q", i+1, value, meta.Annotations["test"]) - } + check(meta) } }