Wire client feature gates affecting RESTClient content config.

This commit is contained in:
Ben Luddy 2024-10-25 11:50:01 -04:00
parent 8fb9622b88
commit 67b9dc1f3e
No known key found for this signature in database
GPG Key ID: A6551E73A5974C30
3 changed files with 553 additions and 4 deletions

View File

@ -17,6 +17,8 @@ limitations under the License.
package rest
import (
"fmt"
"mime"
"net/http"
"net/url"
"os"
@ -24,9 +26,11 @@ import (
"strings"
"time"
"github.com/munnerz/goautoneg"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
clientfeatures "k8s.io/client-go/features"
"k8s.io/client-go/util/flowcontrol"
)
@ -115,7 +119,7 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientConte
return &RESTClient{
base: &base,
versionedAPIPath: versionedAPIPath,
content: config,
content: scrubCBORContentConfigIfDisabled(config),
createBackoffMgr: readExpBackoffConfig,
rateLimiter: rateLimiter,
@ -123,6 +127,45 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientConte
}, nil
}
func scrubCBORContentConfigIfDisabled(content ClientContentConfig) ClientContentConfig {
if clientfeatures.TestOnlyFeatureGates.Enabled(clientfeatures.TestOnlyClientAllowsCBOR) {
return content
}
if mediatype, _, err := mime.ParseMediaType(content.ContentType); err == nil && mediatype == "application/cbor" {
content.ContentType = "application/json"
}
clauses := goautoneg.ParseAccept(content.AcceptContentTypes)
scrubbed := false
for i, clause := range clauses {
if clause.Type == "application" && clause.SubType == "cbor" {
scrubbed = true
clauses[i].SubType = "json"
}
}
if !scrubbed {
// No application/cbor in AcceptContentTypes, nothing more to do.
return content
}
parts := make([]string, 0, len(clauses))
for _, clause := range clauses {
// ParseAccept does not store the parameter "q" in Params.
params := clause.Params
if clause.Q < 1 { // omit q=1, it's the default
if params == nil {
params = make(map[string]string, 1)
}
params["q"] = strconv.FormatFloat(clause.Q, 'g', 3, 32)
}
parts = append(parts, mime.FormatMediaType(fmt.Sprintf("%s/%s", clause.Type, clause.SubType), params))
}
content.AcceptContentTypes = strings.Join(parts, ",")
return content
}
// GetRateLimiter returns rate limiter for a given client, or nil if it's called on a nil client
func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter {
if c == nil {

View File

@ -160,6 +160,9 @@ func NewRequest(c *RESTClient) *Request {
contentTypeNotSet := len(contentConfig.ContentType) == 0
if contentTypeNotSet {
contentConfig.ContentType = "application/json"
if clientfeatures.TestOnlyFeatureGates.Enabled(clientfeatures.TestOnlyClientAllowsCBOR) && clientfeatures.TestOnlyFeatureGates.Enabled(clientfeatures.TestOnlyClientPrefersCBOR) {
contentConfig.ContentType = "application/cbor"
}
}
r := &Request{

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"log"
"net/http"
"reflect"
rt "runtime"
"strings"
@ -28,9 +29,12 @@ import (
"time"
"github.com/google/go-cmp/cmp"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@ -45,14 +49,22 @@ import (
appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1"
corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
metav1ac "k8s.io/client-go/applyconfigurations/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/gentype"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/utils/pointer"
clientscheme "k8s.io/client-go/kubernetes/scheme"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/retry"
utilversion "k8s.io/component-base/version"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/test/integration/framework"
imageutils "k8s.io/kubernetes/test/utils/image"
"k8s.io/kubernetes/test/utils/ktesting"
wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
wardlev1alpha1client "k8s.io/sample-apiserver/pkg/generated/clientset/versioned/typed/wardle/v1alpha1"
"k8s.io/utils/ptr"
)
func TestClient(t *testing.T) {
@ -147,7 +159,7 @@ func TestAtomicPut(t *testing.T) {
},
},
Spec: v1.ReplicationControllerSpec{
Replicas: pointer.Int32(0),
Replicas: ptr.To(int32(0)),
Selector: map[string]string{
"foo": "bar",
},
@ -1347,3 +1359,494 @@ func TestExtractModifyApply_ForceOwnership(t *testing.T) {
t.Errorf("createMgrExtracted apply configuration did not match expected, got:\n%s\n", cmp.Diff(expectedCreateExtracted, createMgrExtracted))
}
}
func TestGeneratedClientCBOREnablement(t *testing.T) {
// Generated clients for built-in types force Protobuf by default. They are tested here to
// ensure that the CBOR client feature gates do not interfere with this.
DoRequestWithProtobufPreferredGeneratedClient := func(t *testing.T, config *rest.Config) error {
clientset, err := clientset.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
_, err = clientset.CoreV1().Namespaces().Create(
context.TODO(),
&v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "test-generated-client-cbor-enablement",
},
},
metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}},
)
return err
}
DoRequestWithGeneratedClient := func(t *testing.T, config *rest.Config) error {
// This is using a generated client from sample-apiserver because it is generated
// without --prefer-protobuf. For convenience, the test serves the API as a CRD with
// a permissive schema instead of running a real aggregated sample-apiserver.
wardleClient, err := wardlev1alpha1client.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
_, err = wardleClient.Fischers().Create(
context.TODO(),
&wardlev1alpha1.Fischer{
ObjectMeta: metav1.ObjectMeta{Name: "test-generated-client-cbor-enablement"},
},
metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}},
)
return err
}
type testCase struct {
name string
served bool
allowed bool
preferred bool
configuredContentType string
configuredAccept string
wantRequestContentType string
wantRequestAccept string
wantResponseContentType string
wantResponseStatus int
wantStatusError bool
doRequest func(t *testing.T, config *rest.Config) error
}
testCases := []testCase{
{
name: "cbor allowed and preferred client forces protobuf",
served: true,
allowed: true,
preferred: true,
wantRequestContentType: "application/vnd.kubernetes.protobuf",
wantRequestAccept: "application/vnd.kubernetes.protobuf,application/json",
wantResponseContentType: "application/vnd.kubernetes.protobuf",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoRequestWithProtobufPreferredGeneratedClient,
},
{
name: "cbor allowed and not preferred client forces protobuf",
served: true,
allowed: true,
preferred: false,
wantRequestContentType: "application/vnd.kubernetes.protobuf",
wantRequestAccept: "application/vnd.kubernetes.protobuf,application/json",
wantResponseContentType: "application/vnd.kubernetes.protobuf",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoRequestWithProtobufPreferredGeneratedClient,
},
{
name: "cbor not allowed and not preferred client forces protobuf",
served: true,
allowed: false,
preferred: false,
wantRequestContentType: "application/vnd.kubernetes.protobuf",
wantRequestAccept: "application/vnd.kubernetes.protobuf,application/json",
wantResponseContentType: "application/vnd.kubernetes.protobuf",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoRequestWithProtobufPreferredGeneratedClient,
},
{
name: "cbor not allowed and preferred client forces protobuf",
served: true,
allowed: false,
preferred: true,
wantRequestContentType: "application/vnd.kubernetes.protobuf",
wantRequestAccept: "application/vnd.kubernetes.protobuf,application/json",
wantResponseContentType: "application/vnd.kubernetes.protobuf",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoRequestWithProtobufPreferredGeneratedClient,
},
{
name: "fully disabled",
served: true,
allowed: false,
preferred: false,
wantRequestContentType: "application/json",
wantRequestAccept: "application/json, */*",
wantResponseContentType: "application/json",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoRequestWithGeneratedClient,
},
{
name: "send json accept both get json",
served: true,
allowed: true,
preferred: false,
wantRequestContentType: "application/json",
wantRequestAccept: "application/json, */*",
wantResponseContentType: "application/json",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoRequestWithGeneratedClient,
},
{
name: "send json accept both get json",
served: false,
allowed: true,
preferred: false,
wantRequestContentType: "application/json",
wantRequestAccept: "application/json, */*",
wantResponseContentType: "application/json",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoRequestWithGeneratedClient,
},
{
name: "send cbor accept both get cbor",
served: true,
allowed: true,
preferred: true,
wantRequestContentType: "application/cbor",
wantRequestAccept: "application/cbor, */*",
wantResponseContentType: "application/cbor",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoRequestWithGeneratedClient,
},
{
name: "send cbor accept both get 415",
served: false,
allowed: true,
preferred: true,
wantRequestContentType: "application/cbor",
wantRequestAccept: "application/cbor, */*",
wantResponseContentType: "application/json",
wantResponseStatus: http.StatusUnsupportedMediaType,
wantStatusError: true,
doRequest: DoRequestWithGeneratedClient,
},
{
name: "both gates required to send cbor",
served: true,
allowed: false,
preferred: true,
wantRequestContentType: "application/json",
wantRequestAccept: "application/json, */*",
wantResponseContentType: "application/json",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoRequestWithGeneratedClient,
},
{
name: "actively configured cbor",
served: true,
allowed: true,
preferred: false,
configuredContentType: "application/cbor",
configuredAccept: "application/cbor",
wantRequestContentType: "application/cbor",
wantRequestAccept: "application/cbor",
wantResponseContentType: "application/cbor",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoRequestWithGeneratedClient,
},
{
name: "force disable actively configured cbor",
served: true,
allowed: false,
preferred: false,
configuredContentType: "application/cbor",
configuredAccept: "application/cbor",
wantRequestContentType: "application/json",
wantRequestAccept: "application/json",
wantResponseContentType: "application/json",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoRequestWithGeneratedClient,
},
{
name: "actively configured cbor with two accepted media types",
served: true,
allowed: true,
preferred: false,
configuredContentType: "application/cbor",
configuredAccept: "application/cbor;q=0.9,example/foo;q=0.8",
wantRequestContentType: "application/cbor",
wantRequestAccept: "application/cbor;q=0.9,example/foo;q=0.8",
wantResponseContentType: "application/cbor",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoRequestWithGeneratedClient,
},
{
name: "force disable actively configured cbor with two accepted media types",
served: true,
allowed: false,
preferred: false,
configuredContentType: "application/cbor",
configuredAccept: "application/cbor;q=0.9,example/foo;q=0.8",
wantRequestContentType: "application/json",
wantRequestAccept: "application/json; q=0.9,example/foo; q=0.8",
wantResponseContentType: "application/json",
wantResponseStatus: http.StatusCreated,
wantStatusError: false,
doRequest: DoRequestWithGeneratedClient,
},
}
for _, served := range []bool{true, false} {
t.Run(fmt.Sprintf("served=%t", served), func(t *testing.T) {
// Batch test cases with their server configuration instead of starting and stopping
// a new apiserver for each test case.
if served {
framework.EnableCBORServingAndStorageForTest(t)
}
server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer server.TearDownFn()
apiextensionsClient, err := apiextensionsv1client.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatal(err)
}
crd, err := apiextensionsClient.CustomResourceDefinitions().Create(
context.TODO(),
&apiextensionsv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("fischers.%s", wardlev1alpha1.SchemeGroupVersion.Group)},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: wardlev1alpha1.SchemeGroupVersion.Group,
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{{
Name: wardlev1alpha1.SchemeGroupVersion.Version,
Served: true,
Storage: true,
Schema: &apiextensionsv1.CustomResourceValidation{
OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
XPreserveUnknownFields: ptr.To(true),
Type: "object",
},
},
}},
Names: apiextensionsv1.CustomResourceDefinitionNames{
Plural: "fischers",
Singular: "fischer",
Kind: "Fischer",
ListKind: "FischerList",
},
Scope: apiextensionsv1.ClusterScoped,
},
},
metav1.CreateOptions{},
)
if err != nil {
t.Fatal(err)
}
// wait to see cr in discovery
discoveryClient, err := discovery.NewDiscoveryClientForConfig(server.ClientConfig)
if err != nil {
t.Fatal(err)
}
if err := wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 5*time.Second, true, func(context.Context) (done bool, err error) {
resources, err := discoveryClient.ServerResourcesForGroupVersion(wardlev1alpha1.SchemeGroupVersion.String())
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}
for _, resource := range resources.APIResources {
if resource.Name == crd.Spec.Names.Plural {
return true, nil
}
}
return false, nil
}); err != nil {
t.Fatal(err)
}
for _, tc := range testCases {
if tc.served != served {
continue
}
t.Run(tc.name, func(t *testing.T) {
framework.SetTestOnlyCBORClientFeatureGatesForTest(t, tc.allowed, tc.preferred)
config := rest.CopyConfig(server.ClientConfig)
config.ContentType = tc.configuredContentType
config.AcceptContentTypes = tc.configuredAccept
config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return roundTripperFunc(func(request *http.Request) (*http.Response, error) {
response, err := rt.RoundTrip(request)
if got := response.Request.Header.Get("Content-Type"); got != tc.wantRequestContentType {
t.Errorf("want request content type %q, got %q", tc.wantRequestContentType, got)
}
if got := response.Request.Header.Get("Accept"); got != tc.wantRequestAccept {
t.Errorf("want request accept %q, got %q", tc.wantRequestAccept, got)
}
if got := response.Header.Get("Content-Type"); got != tc.wantResponseContentType {
t.Errorf("want response content type %q, got %q", tc.wantResponseContentType, got)
}
if got := response.StatusCode; got != tc.wantResponseStatus {
t.Errorf("want response status %d, got %d", tc.wantResponseStatus, got)
}
return response, err
})
})
err := tc.doRequest(t, config)
switch {
case tc.wantStatusError && apierrors.IsUnsupportedMediaType(err):
// ok
case !tc.wantStatusError && err == nil:
// ok
default:
t.Errorf("unexpected error: %v", err)
}
})
}
})
}
}
func TestCBORWithTypedClient(t *testing.T) {
ktesting.SetDefaultVerbosity(10) // todo
framework.EnableCBORServingAndStorageForTest(t)
framework.SetTestOnlyCBORClientFeatureGatesForTest(t, true, true)
server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
t.Cleanup(server.TearDownFn)
const TestNamespace = "test-cbor-typed-client"
{
// Setup using client with default config.
clientset, err := clientset.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := clientset.CoreV1().Namespaces().Delete(context.TODO(), TestNamespace, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
t.Fatal(err)
}
})
if _, err := clientset.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: TestNamespace}}, metav1.CreateOptions{}); err != nil {
t.Fatal(err)
}
}
config := rest.CopyConfig(server.ClientConfig)
// Content negotiation controlled by client feature gates.
config.ContentType = ""
config.AcceptContentTypes = ""
config.Wrap(framework.AssertRequestResponseAsCBOR(t))
clientset, err := clientset.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
// Should be identical to
// https://github.com/kubernetes/kubernetes/blob/9ec52fc06395e6ac2fd7a947d6b9fbd3f1bbacb3/staging/src/k8s.io/client-go/kubernetes/typed/core/v1/namespace.go#L64-L72
// minus the PrefersProtobuf option, which overrides content negotiation to Protobuf on a
// per-request basis.
var secretClient corev1client.SecretInterface = gentype.NewClientWithListAndApply[*v1.Secret, *v1.SecretList, *corev1ac.SecretApplyConfiguration](
"secrets",
clientset.CoreV1().RESTClient(),
clientscheme.ParameterCodec,
TestNamespace,
func() *v1.Secret { return &v1.Secret{} },
func() *v1.SecretList { return &v1.SecretList{} },
)
secret, err := secretClient.Create(context.TODO(), &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test-secret",
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
w, err := secretClient.Watch(context.TODO(), metav1.ListOptions{ResourceVersion: secret.ResourceVersion, FieldSelector: fmt.Sprintf("metadata.name=%s", secret.GetName())})
if err != nil {
t.Fatal(err)
}
defer w.Stop()
// do a real update to observe a watch event
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
s, err := secretClient.Get(context.TODO(), secret.GetName(), metav1.GetOptions{})
if err != nil {
return err
}
if s.Annotations == nil {
s.Annotations = map[string]string{}
}
s.Annotations["foo"] = "bar"
_, err = secretClient.Update(context.TODO(), s, metav1.UpdateOptions{})
return err
}); err != nil {
t.Fatal(err)
}
var seen bool
timeout := time.After(5 * time.Second)
for !seen {
select {
case e, ok := <-w.ResultChan():
if !ok {
t.Fatal("watch closed without receiving expected event")
}
if e.Type == watch.Error {
t.Fatalf("watch received unexpected error event: %v", apierrors.FromObject(e.Object))
}
if ns, ok := e.Object.(*v1.Secret); ok && ns.GetAnnotations()["foo"] == "bar" {
// observed update
seen = true
break
}
case <-timeout:
t.Fatal("timed out waiting for event")
}
}
if err := secretClient.Delete(context.TODO(), secret.GetName(), metav1.DeleteOptions{DryRun: []string{metav1.DryRunAll}}); err != nil {
t.Fatal(err)
}
if err := secretClient.DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "a,!a"}); err != nil {
t.Fatal(err)
}
if _, err := secretClient.Get(context.TODO(), secret.GetName(), metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
if _, err := secretClient.List(context.TODO(), metav1.ListOptions{}); err != nil {
t.Fatal(err)
}
// for UpdateStatus
nsClient := gentype.NewClientWithListAndApply[*v1.Namespace, *v1.NamespaceList, *corev1ac.NamespaceApplyConfiguration](
"namespaces",
clientset.CoreV1().RESTClient(),
clientscheme.ParameterCodec,
"",
func() *v1.Namespace { return &v1.Namespace{} },
func() *v1.NamespaceList { return &v1.NamespaceList{} },
)
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
ns, err := nsClient.Get(context.TODO(), TestNamespace, metav1.GetOptions{})
if err != nil {
return err
}
_, err = nsClient.UpdateStatus(context.TODO(), ns, metav1.UpdateOptions{DryRun: []string{metav1.DryRunAll}})
return err
}); err != nil {
t.Fatal(err)
}
}