diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go b/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go index cc0a77bba6f..395dfdbd02d 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go @@ -284,3 +284,21 @@ func (e *encoderWithAllocator) Encode(obj Object, w io.Writer) error { func (e *encoderWithAllocator) Identifier() Identifier { return e.encoder.Identifier() } + +type nondeterministicEncoderToEncoderAdapter struct { + NondeterministicEncoder +} + +func (e nondeterministicEncoderToEncoderAdapter) Encode(obj Object, w io.Writer) error { + return e.EncodeNondeterministic(obj, w) +} + +// UseNondeterministicEncoding returns an Encoder that encodes objects using the provided Encoder's +// EncodeNondeterministic method if it implements NondeterministicEncoder, otherwise it returns the +// provided Encoder as-is. +func UseNondeterministicEncoding(encoder Encoder) Encoder { + if nondeterministic, ok := encoder.(NondeterministicEncoder); ok { + return nondeterministicEncoderToEncoderAdapter{nondeterministic} + } + return encoder +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go index 218d18efa03..9b9e10ad179 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go @@ -285,7 +285,12 @@ func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiat audit.LogResponseObject(req.Context(), object, gv, s) - encoder := s.EncoderForVersion(serializer.Serializer, gv) + var encoder runtime.Encoder + if utilfeature.TestOnlyFeatureGate.Enabled(features.TestOnlyCBORServingAndStorage) { + encoder = s.EncoderForVersion(runtime.UseNondeterministicEncoding(serializer.Serializer), gv) + } else { + encoder = s.EncoderForVersion(serializer.Serializer, gv) + } request.TrackSerializeResponseObjectLatency(req.Context(), func() { if listGVKInContentType { SerializeObject(generateMediaTypeWithGVK(serializer.MediaType, mediaType.Convert), encoder, w, req, statusCode, object) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index 7443e3d577a..938e8ef5b80 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -76,8 +76,12 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp return nil, err } framer := serializer.StreamSerializer.Framer - streamSerializer := serializer.StreamSerializer.Serializer - encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion()) + var encoder runtime.Encoder + if utilfeature.TestOnlyFeatureGate.Enabled(features.TestOnlyCBORServingAndStorage) { + encoder = scope.Serializer.EncoderForVersion(runtime.UseNondeterministicEncoding(serializer.StreamSerializer.Serializer), scope.Kind.GroupVersion()) + } else { + encoder = scope.Serializer.EncoderForVersion(serializer.StreamSerializer.Serializer, scope.Kind.GroupVersion()) + } useTextFraming := serializer.EncodesAsText if framer == nil { return nil, fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType) @@ -98,9 +102,17 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp if !ok { return nil, fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer) } - negotiatedEncoder = contentSerializer.EncoderForVersion(info.Serializer, contentKind.GroupVersion()) + if utilfeature.TestOnlyFeatureGate.Enabled(features.TestOnlyCBORServingAndStorage) { + negotiatedEncoder = contentSerializer.EncoderForVersion(runtime.UseNondeterministicEncoding(info.Serializer), contentKind.GroupVersion()) + } else { + negotiatedEncoder = contentSerializer.EncoderForVersion(info.Serializer, contentKind.GroupVersion()) + } } else { - negotiatedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion()) + if utilfeature.TestOnlyFeatureGate.Enabled(features.TestOnlyCBORServingAndStorage) { + negotiatedEncoder = scope.Serializer.EncoderForVersion(runtime.UseNondeterministicEncoding(serializer.Serializer), contentKind.GroupVersion()) + } else { + negotiatedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion()) + } } var memoryAllocator runtime.MemoryAllocator diff --git a/test/integration/apiserver/cbor_test.go b/test/integration/apiserver/cbor_test.go new file mode 100644 index 00000000000..9368bd36b52 --- /dev/null +++ b/test/integration/apiserver/cbor_test.go @@ -0,0 +1,254 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "bytes" + "context" + "fmt" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer/cbor" + "k8s.io/apimachinery/pkg/runtime/serializer/cbor/direct" + "k8s.io/apimachinery/pkg/runtime/serializer/streaming" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/test/integration/framework" + "k8s.io/utils/ptr" +) + +// TestNondeterministicResponseEncoding verifies that the encoding of response bodies to CBOR is not +// deterministic. Even in cases where encoding deterministically has no overhead, some randomness is +// introduced to prevent clients from inadvertently depending on deterministic encoding when it is +// not guaranteed. +func TestNondeterministicResponseEncoding(t *testing.T) { + // Nondeterministic map key order is not guaranteed to select each possible ordering with + // equal probability. In practice, since metav1.WatchEvent only has two fields, it does and + // the probability of either possible map key ordering is approximately 50%. The probability + // that this test flakes because a watch event was encoded the same way on every trial is + // about 2^-NTrials, so NTrials needs to be big enough to make sure that doesn't happen. + const NTrials = 40 + + framework.EnableCBORServingAndStorageForTest(t) + framework.SetTestOnlyCBORClientFeatureGatesForTest(t, true, true) + + server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) + t.Cleanup(server.TearDownFn) + + config := rest.CopyConfig(server.ClientConfig) + config.AcceptContentTypes = "application/cbor" + client, err := corev1client.NewForConfig(config) + if err != nil { + t.Fatal(err) + } + + namespace, err := client.Namespaces().Create(context.TODO(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nondeterministic-response-encoding", + Annotations: map[string]string{"hello": "world"}, + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + // Compare pairs of "get" requests at the same resource version for encoding differences. + responseDiff := false + for i := 0; i < NTrials && !responseDiff; i++ { + request := client.RESTClient().Get().Resource("namespaces").Name(namespace.GetName()) + + // get at latest resource version + result := request.Do(context.TODO()) + raw1, err := result.Raw() + if err != nil { + t.Fatal(err) + } + if err := result.Into(namespace); err != nil { + t.Fatal(err) + } + + // get again at same resource version + trial := request.VersionedParams(&metav1.GetOptions{ResourceVersion: namespace.ResourceVersion}, scheme.ParameterCodec).Do(context.TODO()) + var trialObject corev1.Namespace + if err := trial.Into(&trialObject); err != nil { + if errors.IsResourceExpired(err) { + t.Logf("retrying: %v", err) + continue + } + t.Fatal(err) + } + if !equality.Semantic.DeepEqual(namespace, &trialObject) { + t.Fatalf("objects differed semantically between runs:\n%s", cmp.Diff(namespace, trialObject)) + } + raw2, err := trial.Raw() + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(raw1, raw2) { + // Observed a response body that was not byte-for-byte the same as the first + // response body. + responseDiff = true + } + } + if !responseDiff { + t.Errorf("performed %d consecutive get requests to the same resource and observed identical response bodies each time", NTrials) + } + + // Induce a watch event, then compare the encoding of the induced event across pairs of + // watch requests. + eventDiff := false + objDiff := false + for i := 0; i < NTrials && (!eventDiff || !objDiff); i++ { + // Get latest to have a valid resource version to start watches from. + namespace, err := client.Namespaces().Get(context.TODO(), namespace.GetName(), metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + // Patch so that watchers will see a "modified" event. + patched, err := client.Namespaces().Patch(context.TODO(), namespace.GetName(), types.JSONPatchType, []byte(fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/foo","value":"%d"}]`, i)), metav1.PatchOptions{}) + if err != nil { + t.Fatal(err) + } + + request := client.RESTClient().Get().Resource("namespaces") + + // Get the raw bytes of the watch event induced by the patch plus the raw bytes of + // its embedded object. + getRawEventAndRawObject := func() ([]byte, []byte, error) { + ctx, cancel := context.WithTimeout(context.TODO(), 6*time.Second) + defer cancel() + rc, err := request. + VersionedParams(&metav1.ListOptions{ + ResourceVersion: namespace.ResourceVersion, + Watch: true, + TimeoutSeconds: ptr.To(int64(5)), + FieldSelector: fmt.Sprintf("metadata.name=%s", namespace.GetName()), + }, scheme.ParameterCodec). + Stream(ctx) + if err != nil { + return nil, nil, err + } + defer func() { + if err := rc.Close(); err != nil { + t.Error(err) + } + }() + d := &rawCapturingDecoder{delegate: cbor.NewSerializer(scheme.Scheme, scheme.Scheme, cbor.Transcode(false))} + sd := streaming.NewDecoder(rc, d) + for { + var event metav1.WatchEvent + got, _, err := sd.Decode(nil, &event) + if err != nil { + // Either the server timeout or client context timeout will + // cause an EOF here to terminate the loop if the expected + // event is never received. + t.Fatal(err) + } + if got != &event { + t.Fatalf("returned new object %#v (%T) instead of decoding into %T", got, got, &event) + } + var u map[string]interface{} + if err := direct.Unmarshal(event.Object.Raw, &u); err != nil { + t.Fatal(err) + } + rv, ok, err := unstructured.NestedString(u, "metadata", "resourceVersion") + if err != nil { + t.Fatalf("failed to get resourceVersion from watch event: %v", err) + } + if !ok { + t.Fatal("watch event missing resource version") + } + if rv == patched.ResourceVersion { + return d.raw, event.Object.Raw, nil + } + } + } + + event1, raw1, err := getRawEventAndRawObject() + if err != nil { + if errors.IsResourceExpired(err) { + t.Logf("retrying: %v", err) + continue + } + t.Fatal(err) + } + var obj1 map[string]interface{} + if err := direct.Unmarshal(raw1, &obj1); err != nil { + t.Fatal(err) + } + + event2, raw2, err := getRawEventAndRawObject() + if err != nil { + if errors.IsResourceExpired(err) { + t.Logf("retrying: %v", err) + continue + } + t.Fatal(err) + } + var obj2 map[string]interface{} + if err := direct.Unmarshal(raw2, &obj2); err != nil { + t.Fatal(err) + } + + if !equality.Semantic.DeepEqual(obj1, obj2) { + t.Fatalf("objects differed semantically between runs:\n%s", cmp.Diff(obj1, obj2)) + } + + if !bytes.Equal(raw1, raw2) { + objDiff = true + } + + // Cut out the embedded object so that we can observe that the watch event itself is + // encoded nondeterministically rather than simply embedding a nondeterministic + // object encoding. + event1 = bytes.Replace(event1, raw1, nil, 1) + event2 = bytes.Replace(event2, raw2, nil, 1) + if !bytes.Equal(event1, event2) { + eventDiff = true + } + } + if !eventDiff { + t.Errorf("watch event encoded identically over %d consecutive watch requests", NTrials) + } + if !objDiff { + t.Errorf("watch event embedded object encoded identically over %d consecutive watch requests", NTrials) + } +} + +type rawCapturingDecoder struct { + raw []byte + delegate runtime.Decoder +} + +func (d *rawCapturingDecoder) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { + d.raw = append([]byte(nil), data...) + return d.delegate.Decode(data, defaults, into) +}