Wire test-only feature gate for CBOR serving.

To mitigate the risk of introducing a new protocol, integration tests for CBOR will be written using
a test-only feature gate instance that is not wired to runtime options. On alpha graduation, the
test-only feature gate instance will be replaced by a normal feature gate in the existing apiserver
feature gate instance.
This commit is contained in:
Ben Luddy 2024-10-23 16:36:25 -04:00
parent d638d64572
commit 0cad1a89b6
No known key found for this signature in database
GPG Key ID: A6551E73A5974C30
8 changed files with 397 additions and 2 deletions

View File

@ -54,6 +54,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/cbor"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/protobuf"
"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
@ -69,8 +70,10 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/generic"
genericfilters "k8s.io/apiserver/pkg/server/filters"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/apiserver/pkg/warning"
"k8s.io/client-go/scale"
@ -600,6 +603,20 @@ func (r *crdHandler) GetCustomResourceListerCollectionDeleter(crd *apiextensions
return info.storages[info.storageVersion].CustomResource, nil
}
func newCBORSerializerInfo(creater runtime.ObjectCreater, typer runtime.ObjectTyper) runtime.SerializerInfo {
return runtime.SerializerInfo{
MediaType: "application/cbor",
MediaTypeType: "application",
MediaTypeSubType: "cbor",
Serializer: cbor.NewSerializer(creater, typer),
StrictSerializer: cbor.NewSerializer(creater, typer, cbor.Strict(true)),
StreamSerializer: &runtime.StreamSerializerInfo{
Framer: cbor.NewFramer(),
Serializer: cbor.NewSerializer(creater, typer, cbor.Transcode(false)),
},
}
}
// getOrCreateServingInfoFor gets the CRD serving info for the given CRD UID if the key exists in the storage map.
// Otherwise the function fetches the up-to-date CRD using the given CRD name and creates CRD serving info.
func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crdInfo, error) {
@ -892,6 +909,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
},
},
}
if utilfeature.TestOnlyFeatureGate.Enabled(features.TestOnlyCBORServingAndStorage) {
negotiatedSerializer.supportedMediaTypes = append(negotiatedSerializer.supportedMediaTypes, newCBORSerializerInfo(creator, typer))
}
var standardSerializers []runtime.SerializerInfo
for _, s := range negotiatedSerializer.SupportedMediaTypes() {
if s.MediaType == runtime.ContentTypeProtobuf {
@ -955,7 +977,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
scaleScope := *requestScopes[v.Name]
scaleConverter := scale.NewScaleConverter()
scaleScope.Subresource = "scale"
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme())
var opts []serializer.CodecFactoryOptionsMutator
if utilfeature.TestOnlyFeatureGate.Enabled(features.TestOnlyCBORServingAndStorage) {
opts = append(opts, serializer.WithSerializer(newCBORSerializerInfo))
}
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...)
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
scaleScope.Namer = handlers.ContextBasedNaming{
Namer: meta.NewAccessor(),

View File

@ -0,0 +1,236 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
import (
"context"
"fmt"
"testing"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apiextensions-apiserver/test/integration/fixtures"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
cbor "k8s.io/apimachinery/pkg/runtime/serializer/cbor/direct"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/retry"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
func TestCBORServingEnablement(t *testing.T) {
for _, tc := range []struct {
name string
enabled bool
}{
{name: "enabled", enabled: true},
{name: "disabled", enabled: false},
} {
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.TestOnlyFeatureGate, features.TestOnlyCBORServingAndStorage, tc.enabled)
tearDown, config, _, err := fixtures.StartDefaultServer(t)
if err != nil {
t.Fatal(err)
}
defer tearDown()
apiExtensionsClientset, err := apiextensionsclientset.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
crd := &apiextensionsv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{Name: "foos.mygroup.example.com"},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: "mygroup.example.com",
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{{
Name: "v1beta1",
Served: true,
Storage: true,
Schema: fixtures.AllowAllSchema(),
Subresources: &apiextensionsv1.CustomResourceSubresources{
Status: &apiextensionsv1.CustomResourceSubresourceStatus{},
Scale: &apiextensionsv1.CustomResourceSubresourceScale{
SpecReplicasPath: ".spec.replicas",
StatusReplicasPath: ".status.replicas",
},
},
}},
Names: apiextensionsv1.CustomResourceDefinitionNames{
Plural: "foos",
Singular: "foo",
Kind: "Foo",
ListKind: "FooList",
},
Scope: apiextensionsv1.ClusterScoped,
},
}
if _, err = fixtures.CreateNewV1CustomResourceDefinition(crd, apiExtensionsClientset, dynamicClient); err != nil {
t.Fatal(err)
}
cr, err := dynamicClient.Resource(schema.GroupVersionResource{Group: "mygroup.example.com", Version: "v1beta1", Resource: "foos"}).Create(
context.TODO(),
&unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "mygroup.example.com/v1beta1",
"kind": "Foo",
"metadata": map[string]interface{}{
"name": fmt.Sprintf("test-cbor-%s", tc.name),
},
"spec": map[string]interface{}{
"replicas": int64(0),
},
"status": map[string]interface{}{
"replicas": int64(0),
},
}},
metav1.CreateOptions{},
)
if err != nil {
t.Fatal(err)
}
config = rest.CopyConfig(config)
config.NegotiatedSerializer = serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion()
config.APIPath = "/apis"
config.GroupVersion = &schema.GroupVersion{Group: "mygroup.example.com", Version: "v1beta1"}
restClient, err := rest.RESTClientFor(config)
if err != nil {
t.Fatal(err)
}
for _, subresource := range []string{"", "status", "scale"} {
err = restClient.Get().
Resource(crd.Spec.Names.Plural).
SubResource(subresource).
Name(cr.GetName()).
SetHeader("Accept", "application/cbor").
Do(context.TODO()).Error()
switch {
case tc.enabled && err == nil:
// ok
case !tc.enabled && errors.IsNotAcceptable(err):
// ok
default:
t.Errorf("unexpected error on read (subresource %q): %v", subresource, err)
}
}
createBody, err := cbor.Marshal(map[string]interface{}{
"apiVersion": "mygroup.example.com/v1beta1",
"kind": "Foo",
"metadata": map[string]interface{}{
"name": fmt.Sprintf("test-cbor-%s-2", tc.name),
},
"spec": map[string]interface{}{
"replicas": int64(0),
},
"status": map[string]interface{}{
"replicas": int64(0),
},
})
if err != nil {
t.Fatal(err)
}
err = restClient.Post().
Resource(crd.Spec.Names.Plural).
SetHeader("Content-Type", "application/cbor").
Body(createBody).
Do(context.TODO()).Error()
switch {
case tc.enabled && err == nil:
// ok
case !tc.enabled && errors.IsUnsupportedMediaType(err):
// ok
default:
t.Errorf("unexpected error on write: %v", err)
}
scaleBody, err := cbor.Marshal(map[string]interface{}{
"apiVersion": "autoscaling/v1",
"kind": "Scale",
"metadata": map[string]interface{}{
"name": cr.GetName(),
},
"spec": map[string]interface{}{
"replicas": int64(0),
},
"status": map[string]interface{}{
"replicas": int64(0),
},
})
if err != nil {
t.Fatal(err)
}
err = restClient.Put().
Resource(crd.Spec.Names.Plural).
SubResource("scale").
Name(cr.GetName()).
SetHeader("Content-Type", "application/cbor").
Body(scaleBody).
Do(context.TODO()).Error()
switch {
case tc.enabled && err == nil:
// ok
case !tc.enabled && errors.IsUnsupportedMediaType(err):
// ok
default:
t.Errorf("unexpected error on scale write: %v", err)
}
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
latest, err := dynamicClient.Resource(schema.GroupVersionResource{Group: "mygroup.example.com", Version: "v1beta1", Resource: "foos"}).Get(context.TODO(), cr.GetName(), metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
statusBody, err := cbor.Marshal(latest.Object)
if err != nil {
t.Fatal(err)
}
return restClient.Put().
Resource(crd.Spec.Names.Plural).
SubResource("status").
Name(cr.GetName()).
SetHeader("Content-Type", "application/cbor").
Body(statusBody).
Do(context.TODO()).Error()
})
switch {
case tc.enabled && err == nil:
// ok
case !tc.enabled && errors.IsUnsupportedMediaType(err):
// ok
default:
t.Fatalf("unexpected error on status write: %v", err)
}
})
}
}

View File

@ -87,6 +87,15 @@ const (
// Allows authorization to use field and label selectors.
AuthorizeWithSelectors featuregate.Feature = "AuthorizeWithSelectors"
// owner: @benluddy
// kep: https://kep.k8s.io/4222
//
// Enables CBOR as a supported encoding for requests and responses, and as the
// preferred storage encoding for custom resources.
//
// This feature is currently PRE-ALPHA and MUST NOT be enabled outside of integration tests.
TestOnlyCBORServingAndStorage featuregate.Feature = "TestOnlyCBORServingAndStorage"
// owner: @serathius
// Enables concurrent watch object decoding to avoid starving watch cache when conversion webhook is installed.
ConcurrentWatchObjectDecode featuregate.Feature = "ConcurrentWatchObjectDecode"
@ -238,6 +247,7 @@ const (
func init() {
runtime.Must(utilfeature.DefaultMutableFeatureGate.Add(defaultKubernetesFeatureGates))
runtime.Must(utilfeature.DefaultMutableFeatureGate.AddVersioned(defaultVersionedKubernetesFeatureGates))
runtime.Must(utilfeature.TestOnlyMutableFeatureGate.AddVersioned(testOnlyVersionedKubernetesFeatureGates))
}
// defaultVersionedKubernetesFeatureGates consists of all known Kubernetes-specific feature keys with VersionedSpecs.
@ -410,3 +420,12 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
// defaultKubernetesFeatureGates consists of legacy unversioned Kubernetes-specific feature keys.
// Please do not add to this struct and use defaultVersionedKubernetesFeatureGates instead.
var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{}
// testOnlyVersionedKubernetesFeatureGates consists of features that require programmatic enablement
// for integration testing, but have not yet graduated to alpha in a release and must not be enabled
// by a runtime option.
var testOnlyVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate.VersionedSpecs{
TestOnlyCBORServingAndStorage: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
},
}

View File

@ -742,7 +742,7 @@ func (c *RecommendedConfig) Complete() CompletedConfig {
return c.Config.Complete(c.SharedInformerFactory)
}
var allowedMediaTypes = []string{
var defaultAllowedMediaTypes = []string{
runtime.ContentTypeJSON,
runtime.ContentTypeYAML,
runtime.ContentTypeProtobuf,
@ -755,6 +755,10 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
if c.Serializer == nil {
return nil, fmt.Errorf("Genericapiserver.New() called with config.Serializer == nil")
}
allowedMediaTypes := defaultAllowedMediaTypes
if utilfeature.TestOnlyFeatureGate.Enabled(genericfeatures.TestOnlyCBORServingAndStorage) {
allowedMediaTypes = append(allowedMediaTypes, runtime.ContentTypeCBOR)
}
for _, info := range c.Serializer.SupportedMediaTypes() {
var ok bool
for _, mt := range allowedMediaTypes {

View File

@ -29,6 +29,7 @@ import (
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@ -40,12 +41,14 @@ import (
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server/healthz"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilversion "k8s.io/apiserver/pkg/util/version"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2/ktesting"
netutils "k8s.io/utils/net"
@ -419,3 +422,22 @@ func TestNewErrorForbiddenSerializer(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
}
func TestNewFeatureGatedSerializer(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.TestOnlyFeatureGate, features.TestOnlyCBORServingAndStorage, true)
config := NewConfig(serializer.NewCodecFactory(scheme, serializer.WithSerializer(func(creater runtime.ObjectCreater, typer runtime.ObjectTyper) runtime.SerializerInfo {
return runtime.SerializerInfo{
MediaType: "application/cbor",
MediaTypeType: "application",
MediaTypeSubType: "cbor",
}
})))
config.ExternalAddress = "192.168.10.4:443"
config.EffectiveVersion = utilversion.NewEffectiveVersion("")
config.LoopbackClientConfig = &rest.Config{}
if _, err := config.Complete(nil).New("test", NewEmptyDelegate()); err != nil {
t.Errorf("unexpected error: %v", err)
}
}

View File

@ -31,3 +31,15 @@ var (
// Top-level commands/options setup that needs to modify this feature gate should use DefaultMutableFeatureGate.
DefaultFeatureGate featuregate.FeatureGate = DefaultMutableFeatureGate
)
var (
// TestOnlyMutableFeatureGate is a mutable version of TestOnlyFeatureGate. Only top-level
// commands/options setup and the k8s.io/component-base/featuregate/testing package should
// make use of this.
TestOnlyMutableFeatureGate featuregate.MutableVersionedFeatureGate = featuregate.NewFeatureGate()
// TestOnlyFeatureGate is a shared global FeatureGate for features that have not yet
// graduated to alpha and require programmatic feature enablement for pre-alpha integration
// testing without exposing the feature as a runtime option.
TestOnlyFeatureGate featuregate.FeatureGate = TestOnlyMutableFeatureGate
)

View File

@ -1228,6 +1228,12 @@
lockToDefault: false
preRelease: Beta
version: "1.32"
- name: TestOnlyCBORServingAndStorage
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.32"
- name: TopologyAwareHints
versionedSpecs:
- default: false

View File

@ -0,0 +1,70 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"testing"
apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
metainternalscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/cbor"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
"k8s.io/kubernetes/pkg/api/legacyscheme"
)
// EnableCBORForTest patches global state to enable the CBOR serializer and reverses those changes
// at the end of the test. As a risk mitigation, integration tests are initially written this way so
// that integration tests can be implemented fully and incrementally before exposing options
// (including feature gates) that can enable CBOR at runtime. After integration test coverage is
// complete, feature gates will be introduced to completely supersede this mechanism.
func EnableCBORServingAndStorageForTest(tb testing.TB) {
featuregatetesting.SetFeatureGateDuringTest(tb, utilfeature.TestOnlyFeatureGate, features.TestOnlyCBORServingAndStorage, true)
newCBORSerializerInfo := func(creater runtime.ObjectCreater, typer runtime.ObjectTyper) runtime.SerializerInfo {
return runtime.SerializerInfo{
MediaType: "application/cbor",
MediaTypeType: "application",
MediaTypeSubType: "cbor",
Serializer: cbor.NewSerializer(creater, typer),
StrictSerializer: cbor.NewSerializer(creater, typer, cbor.Strict(true)),
StreamSerializer: &runtime.StreamSerializerInfo{
Framer: cbor.NewFramer(),
Serializer: cbor.NewSerializer(creater, typer, cbor.Transcode(false)),
},
}
}
// Codecs for built-in types are constructed at package initialization time and read by
// value from REST storage providers.
codecs := map[*runtime.Scheme]*serializer.CodecFactory{
legacyscheme.Scheme: &legacyscheme.Codecs,
metainternalscheme.Scheme: &metainternalscheme.Codecs,
aggregatorscheme.Scheme: &aggregatorscheme.Codecs,
apiextensionsapiserver.Scheme: &apiextensionsapiserver.Codecs,
}
for scheme, factory := range codecs {
original := *factory // shallow copy of original value
tb.Cleanup(func() { *codecs[scheme] = original })
*codecs[scheme] = serializer.NewCodecFactory(scheme, serializer.WithSerializer(newCBORSerializerInfo))
}
}