diff --git a/staging/src/k8s.io/client-go/rest/client.go b/staging/src/k8s.io/client-go/rest/client.go index c864d1ec79c..b98d0276479 100644 --- a/staging/src/k8s.io/client-go/rest/client.go +++ b/staging/src/k8s.io/client-go/rest/client.go @@ -17,6 +17,8 @@ limitations under the License. package rest import ( + "fmt" + "mime" "net/http" "net/url" "os" @@ -24,9 +26,11 @@ import ( "strings" "time" + "github.com/munnerz/goautoneg" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + clientfeatures "k8s.io/client-go/features" "k8s.io/client-go/util/flowcontrol" ) @@ -115,7 +119,7 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientConte return &RESTClient{ base: &base, versionedAPIPath: versionedAPIPath, - content: config, + content: scrubCBORContentConfigIfDisabled(config), createBackoffMgr: readExpBackoffConfig, rateLimiter: rateLimiter, @@ -123,6 +127,45 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientConte }, nil } +func scrubCBORContentConfigIfDisabled(content ClientContentConfig) ClientContentConfig { + if clientfeatures.TestOnlyFeatureGates.Enabled(clientfeatures.TestOnlyClientAllowsCBOR) { + return content + } + + if mediatype, _, err := mime.ParseMediaType(content.ContentType); err == nil && mediatype == "application/cbor" { + content.ContentType = "application/json" + } + + clauses := goautoneg.ParseAccept(content.AcceptContentTypes) + scrubbed := false + for i, clause := range clauses { + if clause.Type == "application" && clause.SubType == "cbor" { + scrubbed = true + clauses[i].SubType = "json" + } + } + if !scrubbed { + // No application/cbor in AcceptContentTypes, nothing more to do. + return content + } + + parts := make([]string, 0, len(clauses)) + for _, clause := range clauses { + // ParseAccept does not store the parameter "q" in Params. + params := clause.Params + if clause.Q < 1 { // omit q=1, it's the default + if params == nil { + params = make(map[string]string, 1) + } + params["q"] = strconv.FormatFloat(clause.Q, 'g', 3, 32) + } + parts = append(parts, mime.FormatMediaType(fmt.Sprintf("%s/%s", clause.Type, clause.SubType), params)) + } + content.AcceptContentTypes = strings.Join(parts, ",") + + return content +} + // GetRateLimiter returns rate limiter for a given client, or nil if it's called on a nil client func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter { if c == nil { diff --git a/staging/src/k8s.io/client-go/rest/request.go b/staging/src/k8s.io/client-go/rest/request.go index 2f325ecd628..765b897d88f 100644 --- a/staging/src/k8s.io/client-go/rest/request.go +++ b/staging/src/k8s.io/client-go/rest/request.go @@ -160,6 +160,9 @@ func NewRequest(c *RESTClient) *Request { contentTypeNotSet := len(contentConfig.ContentType) == 0 if contentTypeNotSet { contentConfig.ContentType = "application/json" + if clientfeatures.TestOnlyFeatureGates.Enabled(clientfeatures.TestOnlyClientAllowsCBOR) && clientfeatures.TestOnlyFeatureGates.Enabled(clientfeatures.TestOnlyClientPrefersCBOR) { + contentConfig.ContentType = "application/cbor" + } } r := &Request{ diff --git a/test/integration/client/client_test.go b/test/integration/client/client_test.go index 27786c49a81..04e47ddd74b 100644 --- a/test/integration/client/client_test.go +++ b/test/integration/client/client_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "log" + "net/http" "reflect" rt "runtime" "strings" @@ -28,9 +29,12 @@ import ( "time" "github.com/google/go-cmp/cmp" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" eventsv1 "k8s.io/api/events/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsv1client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -45,14 +49,22 @@ import ( appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1" corev1ac "k8s.io/client-go/applyconfigurations/core/v1" metav1ac "k8s.io/client-go/applyconfigurations/meta/v1" + "k8s.io/client-go/discovery" + "k8s.io/client-go/gentype" clientset "k8s.io/client-go/kubernetes" - "k8s.io/utils/pointer" - + clientscheme "k8s.io/client-go/kubernetes/scheme" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/util/retry" utilversion "k8s.io/component-base/version" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/test/integration/framework" imageutils "k8s.io/kubernetes/test/utils/image" + "k8s.io/kubernetes/test/utils/ktesting" + wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1" + wardlev1alpha1client "k8s.io/sample-apiserver/pkg/generated/clientset/versioned/typed/wardle/v1alpha1" + "k8s.io/utils/ptr" ) func TestClient(t *testing.T) { @@ -147,7 +159,7 @@ func TestAtomicPut(t *testing.T) { }, }, Spec: v1.ReplicationControllerSpec{ - Replicas: pointer.Int32(0), + Replicas: ptr.To(int32(0)), Selector: map[string]string{ "foo": "bar", }, @@ -1347,3 +1359,494 @@ func TestExtractModifyApply_ForceOwnership(t *testing.T) { t.Errorf("createMgrExtracted apply configuration did not match expected, got:\n%s\n", cmp.Diff(expectedCreateExtracted, createMgrExtracted)) } } + +func TestGeneratedClientCBOREnablement(t *testing.T) { + // Generated clients for built-in types force Protobuf by default. They are tested here to + // ensure that the CBOR client feature gates do not interfere with this. + DoRequestWithProtobufPreferredGeneratedClient := func(t *testing.T, config *rest.Config) error { + clientset, err := clientset.NewForConfig(config) + if err != nil { + t.Fatal(err) + } + + _, err = clientset.CoreV1().Namespaces().Create( + context.TODO(), + &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-generated-client-cbor-enablement", + }, + }, + metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}}, + ) + return err + } + + DoRequestWithGeneratedClient := func(t *testing.T, config *rest.Config) error { + // This is using a generated client from sample-apiserver because it is generated + // without --prefer-protobuf. For convenience, the test serves the API as a CRD with + // a permissive schema instead of running a real aggregated sample-apiserver. + wardleClient, err := wardlev1alpha1client.NewForConfig(config) + if err != nil { + t.Fatal(err) + } + + _, err = wardleClient.Fischers().Create( + context.TODO(), + &wardlev1alpha1.Fischer{ + ObjectMeta: metav1.ObjectMeta{Name: "test-generated-client-cbor-enablement"}, + }, + metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}}, + ) + return err + } + + type testCase struct { + name string + served bool + allowed bool + preferred bool + configuredContentType string + configuredAccept string + wantRequestContentType string + wantRequestAccept string + wantResponseContentType string + wantResponseStatus int + wantStatusError bool + doRequest func(t *testing.T, config *rest.Config) error + } + + testCases := []testCase{ + { + name: "cbor allowed and preferred client forces protobuf", + served: true, + allowed: true, + preferred: true, + wantRequestContentType: "application/vnd.kubernetes.protobuf", + wantRequestAccept: "application/vnd.kubernetes.protobuf,application/json", + wantResponseContentType: "application/vnd.kubernetes.protobuf", + wantResponseStatus: http.StatusCreated, + wantStatusError: false, + doRequest: DoRequestWithProtobufPreferredGeneratedClient, + }, + { + name: "cbor allowed and not preferred client forces protobuf", + served: true, + allowed: true, + preferred: false, + wantRequestContentType: "application/vnd.kubernetes.protobuf", + wantRequestAccept: "application/vnd.kubernetes.protobuf,application/json", + wantResponseContentType: "application/vnd.kubernetes.protobuf", + wantResponseStatus: http.StatusCreated, + wantStatusError: false, + doRequest: DoRequestWithProtobufPreferredGeneratedClient, + }, + { + name: "cbor not allowed and not preferred client forces protobuf", + served: true, + allowed: false, + preferred: false, + wantRequestContentType: "application/vnd.kubernetes.protobuf", + wantRequestAccept: "application/vnd.kubernetes.protobuf,application/json", + wantResponseContentType: "application/vnd.kubernetes.protobuf", + wantResponseStatus: http.StatusCreated, + wantStatusError: false, + doRequest: DoRequestWithProtobufPreferredGeneratedClient, + }, + { + name: "cbor not allowed and preferred client forces protobuf", + served: true, + allowed: false, + preferred: true, + wantRequestContentType: "application/vnd.kubernetes.protobuf", + wantRequestAccept: "application/vnd.kubernetes.protobuf,application/json", + wantResponseContentType: "application/vnd.kubernetes.protobuf", + wantResponseStatus: http.StatusCreated, + wantStatusError: false, + doRequest: DoRequestWithProtobufPreferredGeneratedClient, + }, + { + name: "fully disabled", + served: true, + allowed: false, + preferred: false, + wantRequestContentType: "application/json", + wantRequestAccept: "application/json, */*", + wantResponseContentType: "application/json", + wantResponseStatus: http.StatusCreated, + wantStatusError: false, + doRequest: DoRequestWithGeneratedClient, + }, + { + name: "send json accept both get json", + served: true, + allowed: true, + preferred: false, + wantRequestContentType: "application/json", + wantRequestAccept: "application/json, */*", + wantResponseContentType: "application/json", + wantResponseStatus: http.StatusCreated, + wantStatusError: false, + doRequest: DoRequestWithGeneratedClient, + }, + { + name: "send json accept both get json", + served: false, + allowed: true, + preferred: false, + wantRequestContentType: "application/json", + wantRequestAccept: "application/json, */*", + wantResponseContentType: "application/json", + wantResponseStatus: http.StatusCreated, + wantStatusError: false, + doRequest: DoRequestWithGeneratedClient, + }, + { + name: "send cbor accept both get cbor", + served: true, + allowed: true, + preferred: true, + wantRequestContentType: "application/cbor", + wantRequestAccept: "application/cbor, */*", + wantResponseContentType: "application/cbor", + wantResponseStatus: http.StatusCreated, + wantStatusError: false, + doRequest: DoRequestWithGeneratedClient, + }, + { + name: "send cbor accept both get 415", + served: false, + allowed: true, + preferred: true, + wantRequestContentType: "application/cbor", + wantRequestAccept: "application/cbor, */*", + wantResponseContentType: "application/json", + wantResponseStatus: http.StatusUnsupportedMediaType, + wantStatusError: true, + doRequest: DoRequestWithGeneratedClient, + }, + { + name: "both gates required to send cbor", + served: true, + allowed: false, + preferred: true, + wantRequestContentType: "application/json", + wantRequestAccept: "application/json, */*", + wantResponseContentType: "application/json", + wantResponseStatus: http.StatusCreated, + wantStatusError: false, + doRequest: DoRequestWithGeneratedClient, + }, + { + name: "actively configured cbor", + served: true, + allowed: true, + preferred: false, + configuredContentType: "application/cbor", + configuredAccept: "application/cbor", + wantRequestContentType: "application/cbor", + wantRequestAccept: "application/cbor", + wantResponseContentType: "application/cbor", + wantResponseStatus: http.StatusCreated, + wantStatusError: false, + doRequest: DoRequestWithGeneratedClient, + }, + { + name: "force disable actively configured cbor", + served: true, + allowed: false, + preferred: false, + configuredContentType: "application/cbor", + configuredAccept: "application/cbor", + wantRequestContentType: "application/json", + wantRequestAccept: "application/json", + wantResponseContentType: "application/json", + wantResponseStatus: http.StatusCreated, + wantStatusError: false, + doRequest: DoRequestWithGeneratedClient, + }, + { + name: "actively configured cbor with two accepted media types", + served: true, + allowed: true, + preferred: false, + configuredContentType: "application/cbor", + configuredAccept: "application/cbor;q=0.9,example/foo;q=0.8", + wantRequestContentType: "application/cbor", + wantRequestAccept: "application/cbor;q=0.9,example/foo;q=0.8", + wantResponseContentType: "application/cbor", + wantResponseStatus: http.StatusCreated, + wantStatusError: false, + doRequest: DoRequestWithGeneratedClient, + }, + { + name: "force disable actively configured cbor with two accepted media types", + served: true, + allowed: false, + preferred: false, + configuredContentType: "application/cbor", + configuredAccept: "application/cbor;q=0.9,example/foo;q=0.8", + wantRequestContentType: "application/json", + wantRequestAccept: "application/json; q=0.9,example/foo; q=0.8", + wantResponseContentType: "application/json", + wantResponseStatus: http.StatusCreated, + wantStatusError: false, + doRequest: DoRequestWithGeneratedClient, + }, + } + + for _, served := range []bool{true, false} { + t.Run(fmt.Sprintf("served=%t", served), func(t *testing.T) { + // Batch test cases with their server configuration instead of starting and stopping + // a new apiserver for each test case. + if served { + framework.EnableCBORServingAndStorageForTest(t) + } + + server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) + defer server.TearDownFn() + + apiextensionsClient, err := apiextensionsv1client.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatal(err) + } + crd, err := apiextensionsClient.CustomResourceDefinitions().Create( + context.TODO(), + &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("fischers.%s", wardlev1alpha1.SchemeGroupVersion.Group)}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: wardlev1alpha1.SchemeGroupVersion.Group, + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: wardlev1alpha1.SchemeGroupVersion.Version, + Served: true, + Storage: true, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + XPreserveUnknownFields: ptr.To(true), + Type: "object", + }, + }, + }}, + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Plural: "fischers", + Singular: "fischer", + Kind: "Fischer", + ListKind: "FischerList", + }, + Scope: apiextensionsv1.ClusterScoped, + }, + }, + metav1.CreateOptions{}, + ) + if err != nil { + t.Fatal(err) + } + + // wait to see cr in discovery + discoveryClient, err := discovery.NewDiscoveryClientForConfig(server.ClientConfig) + if err != nil { + t.Fatal(err) + } + if err := wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 5*time.Second, true, func(context.Context) (done bool, err error) { + resources, err := discoveryClient.ServerResourcesForGroupVersion(wardlev1alpha1.SchemeGroupVersion.String()) + if err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, err + } + for _, resource := range resources.APIResources { + if resource.Name == crd.Spec.Names.Plural { + return true, nil + } + } + return false, nil + }); err != nil { + t.Fatal(err) + } + + for _, tc := range testCases { + if tc.served != served { + continue + } + + t.Run(tc.name, func(t *testing.T) { + framework.SetTestOnlyCBORClientFeatureGatesForTest(t, tc.allowed, tc.preferred) + + config := rest.CopyConfig(server.ClientConfig) + config.ContentType = tc.configuredContentType + config.AcceptContentTypes = tc.configuredAccept + config.Wrap(func(rt http.RoundTripper) http.RoundTripper { + return roundTripperFunc(func(request *http.Request) (*http.Response, error) { + response, err := rt.RoundTrip(request) + if got := response.Request.Header.Get("Content-Type"); got != tc.wantRequestContentType { + t.Errorf("want request content type %q, got %q", tc.wantRequestContentType, got) + } + if got := response.Request.Header.Get("Accept"); got != tc.wantRequestAccept { + t.Errorf("want request accept %q, got %q", tc.wantRequestAccept, got) + } + if got := response.Header.Get("Content-Type"); got != tc.wantResponseContentType { + t.Errorf("want response content type %q, got %q", tc.wantResponseContentType, got) + } + if got := response.StatusCode; got != tc.wantResponseStatus { + t.Errorf("want response status %d, got %d", tc.wantResponseStatus, got) + } + return response, err + }) + }) + + err := tc.doRequest(t, config) + switch { + case tc.wantStatusError && apierrors.IsUnsupportedMediaType(err): + // ok + case !tc.wantStatusError && err == nil: + // ok + default: + t.Errorf("unexpected error: %v", err) + } + }) + } + }) + } +} + +func TestCBORWithTypedClient(t *testing.T) { + ktesting.SetDefaultVerbosity(10) // todo + + framework.EnableCBORServingAndStorageForTest(t) + framework.SetTestOnlyCBORClientFeatureGatesForTest(t, true, true) + + server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) + t.Cleanup(server.TearDownFn) + + const TestNamespace = "test-cbor-typed-client" + + { + // Setup using client with default config. + clientset, err := clientset.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + if err := clientset.CoreV1().Namespaces().Delete(context.TODO(), TestNamespace, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + t.Fatal(err) + } + }) + if _, err := clientset.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: TestNamespace}}, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + } + + config := rest.CopyConfig(server.ClientConfig) + // Content negotiation controlled by client feature gates. + config.ContentType = "" + config.AcceptContentTypes = "" + config.Wrap(framework.AssertRequestResponseAsCBOR(t)) + clientset, err := clientset.NewForConfig(config) + if err != nil { + t.Fatal(err) + } + + // Should be identical to + // https://github.com/kubernetes/kubernetes/blob/9ec52fc06395e6ac2fd7a947d6b9fbd3f1bbacb3/staging/src/k8s.io/client-go/kubernetes/typed/core/v1/namespace.go#L64-L72 + // minus the PrefersProtobuf option, which overrides content negotiation to Protobuf on a + // per-request basis. + var secretClient corev1client.SecretInterface = gentype.NewClientWithListAndApply[*v1.Secret, *v1.SecretList, *corev1ac.SecretApplyConfiguration]( + "secrets", + clientset.CoreV1().RESTClient(), + clientscheme.ParameterCodec, + TestNamespace, + func() *v1.Secret { return &v1.Secret{} }, + func() *v1.SecretList { return &v1.SecretList{} }, + ) + + secret, err := secretClient.Create(context.TODO(), &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-secret", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + w, err := secretClient.Watch(context.TODO(), metav1.ListOptions{ResourceVersion: secret.ResourceVersion, FieldSelector: fmt.Sprintf("metadata.name=%s", secret.GetName())}) + if err != nil { + t.Fatal(err) + } + defer w.Stop() + + // do a real update to observe a watch event + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + s, err := secretClient.Get(context.TODO(), secret.GetName(), metav1.GetOptions{}) + if err != nil { + return err + } + if s.Annotations == nil { + s.Annotations = map[string]string{} + } + s.Annotations["foo"] = "bar" + _, err = secretClient.Update(context.TODO(), s, metav1.UpdateOptions{}) + return err + }); err != nil { + t.Fatal(err) + } + + var seen bool + timeout := time.After(5 * time.Second) + for !seen { + select { + case e, ok := <-w.ResultChan(): + if !ok { + t.Fatal("watch closed without receiving expected event") + } + + if e.Type == watch.Error { + t.Fatalf("watch received unexpected error event: %v", apierrors.FromObject(e.Object)) + } + + if ns, ok := e.Object.(*v1.Secret); ok && ns.GetAnnotations()["foo"] == "bar" { + // observed update + seen = true + break + } + case <-timeout: + t.Fatal("timed out waiting for event") + } + } + + if err := secretClient.Delete(context.TODO(), secret.GetName(), metav1.DeleteOptions{DryRun: []string{metav1.DryRunAll}}); err != nil { + t.Fatal(err) + } + + if err := secretClient.DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "a,!a"}); err != nil { + t.Fatal(err) + } + + if _, err := secretClient.Get(context.TODO(), secret.GetName(), metav1.GetOptions{}); err != nil { + t.Fatal(err) + } + + if _, err := secretClient.List(context.TODO(), metav1.ListOptions{}); err != nil { + t.Fatal(err) + } + + // for UpdateStatus + nsClient := gentype.NewClientWithListAndApply[*v1.Namespace, *v1.NamespaceList, *corev1ac.NamespaceApplyConfiguration]( + "namespaces", + clientset.CoreV1().RESTClient(), + clientscheme.ParameterCodec, + "", + func() *v1.Namespace { return &v1.Namespace{} }, + func() *v1.NamespaceList { return &v1.NamespaceList{} }, + ) + + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + ns, err := nsClient.Get(context.TODO(), TestNamespace, metav1.GetOptions{}) + if err != nil { + return err + } + _, err = nsClient.UpdateStatus(context.TODO(), ns, metav1.UpdateOptions{DryRun: []string{metav1.DryRunAll}}) + return err + }); err != nil { + t.Fatal(err) + } +}