Merge pull request #46055 from deads2k/crd-01-embed

Automatic merge from submit-queue (batch tested with PRs 46022, 46055, 45308, 46209, 43590)

embed kube-apiextensions inside of kube-apiserver

To reduce operation complexity, we decided to include the kube-apiextensions-server inside of kube-apiserver (https://github.com/kubernetes/community/blob/master/sig-api-machinery/api-extensions-position-statement.md#q-should-kube-aggregator-be-a-separate-binaryprocess-than-kube-apiserver).  With the API reasonably well established and a finalizer about merge, I think its time to add ourselves.

This pull wires kube-apiextensions-server ahead of the TPRs so that one will replace the other if both are added by accident (CRDs should have priority) and wires a controller for automatic aggregation.

WIP because I still need tests: unit test for controller, test-cmd test to mirror the TPR test.


```release-note
Adds the `CustomResourceDefinition` (crd) types to the `kube-apiserver`.  These are the successors to `ThirdPartyResource`.  See https://github.com/kubernetes/community/blob/master/contributors/design-proposals/thirdpartyresources.md for more details.
```
This commit is contained in:
Kubernetes Submit Queue 2017-05-22 19:59:57 -07:00 committed by GitHub
commit bb56937b92
18 changed files with 370 additions and 73 deletions

View File

@ -11,6 +11,7 @@ go_library(
name = "go_default_library",
srcs = [
"aggregator.go",
"apiextensions.go",
"plugins.go",
"server.go",
],
@ -91,6 +92,9 @@ go_library(
"//vendor/k8s.io/kube-aggregator/pkg/apiserver:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/controllers/autoregister:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/apiserver:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/cmd/server:go_default_library",
],
)

View File

@ -36,6 +36,7 @@ import (
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion"
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
apiextensionsinformers "k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
"k8s.io/kubernetes/pkg/master/thirdparty"
@ -89,7 +90,7 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command
}
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, kubeInformers informers.SharedInformerFactory, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
if err != nil {
return nil, err
@ -102,7 +103,10 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega
}
autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices(), apiRegistrationClient)
apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
tprRegistrationController := thirdparty.NewAutoRegistrationController(sharedInformers.Extensions().InternalVersion().ThirdPartyResources(), autoRegistrationController)
tprRegistrationController := thirdparty.NewAutoRegistrationController(
kubeInformers.Extensions().InternalVersion().ThirdPartyResources(),
apiExtensionInformers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
autoRegistrationController)
aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go autoRegistrationController.Run(5, context.StopCh)

View File

@ -0,0 +1,70 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app does all of the work necessary to create a Kubernetes
// APIServer by binding together the API, master and APIServer infrastructure.
// It can be configured and called directly or via the hyperkube framework.
package app
import (
"k8s.io/apimachinery/pkg/runtime/schema"
genericapiserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
apiextensionsapiserver "k8s.io/kube-apiextensions-server/pkg/apiserver"
apiextensionscmd "k8s.io/kube-apiextensions-server/pkg/cmd/server"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
)
func createAPIExtensionsConfig(kubeAPIServerConfig genericapiserver.Config, commandOptions *options.ServerRunOptions) (*apiextensionsapiserver.Config, error) {
// make a shallow copy to let us twiddle a few things
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the apiextensions
genericConfig := kubeAPIServerConfig
// the apiextensions doesn't wire these up. It just delegates them to the kubeapiserver
genericConfig.EnableSwaggerUI = false
// TODO these need to be sorted out. There's an issue open
genericConfig.OpenAPIConfig = nil
genericConfig.SwaggerConfig = nil
// copy the loopbackclientconfig. We're going to change the contenttype back to json until we get protobuf serializations for it
t := *kubeAPIServerConfig.LoopbackClientConfig
genericConfig.LoopbackClientConfig = &t
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = ""
// copy the etcd options so we don't mutate originals.
etcdOptions := *commandOptions.Etcd
etcdOptions.StorageConfig.Codec = apiextensionsapiserver.Codecs.LegacyCodec(schema.GroupVersion{Group: "apiextensions.k8s.io", Version: "v1alpha1"})
etcdOptions.StorageConfig.Copier = apiextensionsapiserver.Scheme
genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}
apiextensionsConfig := &apiextensionsapiserver.Config{
GenericConfig: &genericConfig,
CRDRESTOptionsGetter: apiextensionscmd.NewCRDRESTOptionsGetter(etcdOptions),
}
return apiextensionsConfig, nil
}
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
apiextensionsServer, err := apiextensionsConfig.Complete().New(delegateAPIServer)
if err != nil {
return nil, err
}
return apiextensionsServer, nil
}

View File

@ -104,43 +104,66 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return err
}
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, sharedInformers)
// kubeAPIServer is at the base for now. This ensures that CustomResourceDefinitions trump TPRs
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers)
if err != nil {
return err
}
// run the insecure server now, don't block. It doesn't have any aggregator goodies since authentication wouldn't work
if insecureServingOptions != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(kubeAPIServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil {
return err
}
}
// if we're starting up a hacked up version of this API server for a weird test case,
// just start the API server as is because clients don't get built correctly when you do this
if len(os.Getenv("KUBE_API_VERSIONS")) > 0 {
if insecureServingOptions != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(kubeAPIServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil {
return err
}
}
return kubeAPIServer.GenericAPIServer.PrepareRun().Run(stopCh)
}
// otherwise go down the normal path of standing the aggregator up in front of the API server
// this wires up openapi
kubeAPIServer.GenericAPIServer.PrepareRun()
// TPRs are enabled and not yet beta, since this these are the successor, they fall under the same enablement rule
// Subsequent API servers in between here and kube-apiserver will need to be gated.
// These come first so that if someone registers both a TPR and a CRD, the CRD is preferred.
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, runOptions)
if err != nil {
return err
}
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, kubeAPIServer.GenericAPIServer)
if err != nil {
return err
}
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions)
if err != nil {
return err
}
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers)
aggregatorServer, err := createAggregatorServer(aggregatorConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return err
}
if insecureServingOptions != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil {
return err
}
}
return aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh)
}
// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, sharedInformers informers.SharedInformerFactory) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(genericapiserver.EmptyDelegate)
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
if err != nil {
return nil, err
}

View File

@ -67,6 +67,7 @@ static="static"
storageclass="storageclass"
subjectaccessreviews="subjectaccessreviews"
thirdpartyresources="thirdpartyresources"
customresourcedefinitions="customresourcedefinitions"
daemonsets="daemonsets"
@ -1286,6 +1287,57 @@ run_kubectl_request_timeout_tests() {
kubectl delete pods valid-pod "${kube_flags[@]}"
}
run_crd_tests() {
create_and_use_new_namespace
kubectl "${kube_flags_with_token[@]}" create -f - << __EOF__
{
"kind": "CustomResourceDefinition",
"apiVersion": "apiextensions.k8s.io/v1alpha1",
"metadata": {
"name": "foos.company.com"
},
"spec": {
"group": "company.com",
"version": "v1",
"names": {
"plural": "foos",
"kind": "Foo"
}
}
}
__EOF__
# Post-Condition: assertion object exist
kube::test::get_object_assert customresourcedefinitions "{{range.items}}{{$id_field}}:{{end}}" 'foos.company.com:'
kubectl "${kube_flags_with_token[@]}" create -f - << __EOF__
{
"kind": "CustomResourceDefinition",
"apiVersion": "apiextensions.k8s.io/v1alpha1",
"metadata": {
"name": "bars.company.com"
},
"spec": {
"group": "company.com",
"version": "v1",
"names": {
"plural": "bars",
"kind": "Bar"
}
}
}
__EOF__
# Post-Condition: assertion object exist
kube::test::get_object_assert customresourcedefinitions "{{range.items}}{{$id_field}}:{{end}}" 'bars.company.com:foos.company.com:'
run_non_native_resource_tests
# teardown
kubectl delete customresourcedefinitions/foos.company.com "${kube_flags_with_token[@]}"
kubectl delete customresourcedefinitions/bars.company.com "${kube_flags_with_token[@]}"
}
run_tpr_tests() {
create_and_use_new_namespace
kubectl "${kube_flags[@]}" create -f - "${kube_flags[@]}" << __EOF__
@ -1324,11 +1376,39 @@ __EOF__
# Post-Condition: assertion object exist
kube::test::get_object_assert thirdpartyresources "{{range.items}}{{$id_field}}:{{end}}" 'bar.company.com:foo.company.com:'
kube::util::wait_for_url "http://127.0.0.1:${API_PORT}/apis/company.com/v1" "third party api"
run_non_native_resource_tests
kube::util::wait_for_url "http://127.0.0.1:${API_PORT}/apis/company.com/v1/foos" "third party api Foo"
# teardown
kubectl delete thirdpartyresources/foo.company.com "${kube_flags[@]}"
kubectl delete thirdpartyresources/bar.company.com "${kube_flags[@]}"
}
kube::util::wait_for_url "http://127.0.0.1:${API_PORT}/apis/company.com/v1/bars" "third party api Bar"
kube::util::non_native_resources() {
local times
local wait
local failed
times=30
wait=10
local i
for i in $(seq 1 $times); do
failed=""
kubectl "${kube_flags[@]}" get --raw '/apis/company.com/v1' || failed=true
kubectl "${kube_flags[@]}" get --raw '/apis/company.com/v1/foos' || failed=true
kubectl "${kube_flags[@]}" get --raw '/apis/company.com/v1/bars' || failed=true
if [ -z "${failed}" ]; then
return 0
fi
sleep ${wait}
done
kube::log::error "Timed out waiting for non-native-resources; tried ${times} waiting ${wait}s between each"
return 1
}
run_non_native_resource_tests() {
kube::util::non_native_resources
# Test that we can list this new third party resource (foos)
kube::test::get_object_assert foos "{{range.items}}{{$id_field}}:{{end}}" ''
@ -1363,7 +1443,7 @@ __EOF__
kubectl "${kube_flags[@]}" get foos/test -o "jsonpath={.someField}" --allow-missing-template-keys=false
kubectl "${kube_flags[@]}" get foos -o "go-template={{range .items}}{{.someField}}{{end}}" --allow-missing-template-keys=false
kubectl "${kube_flags[@]}" get foos/test -o "go-template={{.someField}}" --allow-missing-template-keys=false
output_message=$(kubectl get foos/test -o name)
output_message=$(kubectl "${kube_flags[@]}" get foos/test -o name)
kube::test::if_has_string "${output_message}" 'foos/test'
# Test patching
@ -1558,10 +1638,6 @@ __EOF__
# Make sure it's gone
kube::test::get_object_assert foos "{{range.items}}{{$id_field}}:{{end}}" ''
kube::test::get_object_assert bars "{{range.items}}{{$id_field}}:{{end}}" ''
# teardown
kubectl delete thirdpartyresources foo.company.com "${kube_flags[@]}"
kubectl delete thirdpartyresources bar.company.com "${kube_flags[@]}"
}
run_recursive_resources_tests() {
@ -3172,6 +3248,11 @@ runTests() {
# Third Party Resources #
#####################################
# customresourcedefinitions cleanup after themselves. Run these first, then TPRs
if kube::test::if_supports_resource "${customresourcedefinitions}" ; then
run_crd_tests
fi
if kube::test::if_supports_resource "${thirdpartyresources}" ; then
run_tpr_tests
fi

View File

@ -43,6 +43,9 @@ go_library(
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/apis/apiextensions:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion/apiextensions/internalversion:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/listers/apiextensions/internalversion:go_default_library",
],
)
@ -72,5 +75,7 @@ go_test(
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/apis/apiextensions:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/listers/apiextensions/internalversion:go_default_library",
],
)

View File

@ -30,6 +30,9 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-apiextensions-server/pkg/apis/apiextensions"
crdinformers "k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion/apiextensions/internalversion"
crdlisters "k8s.io/kube-apiextensions-server/pkg/client/listers/apiextensions/internalversion"
"k8s.io/kubernetes/pkg/apis/extensions"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/extensions/internalversion"
listers "k8s.io/kubernetes/pkg/client/listers/extensions/internalversion"
@ -49,6 +52,8 @@ type AutoAPIServiceRegistration interface {
type tprRegistrationController struct {
tprLister listers.ThirdPartyResourceLister
tprSynced cache.InformerSynced
crdLister crdlisters.CustomResourceDefinitionLister
crdSynced cache.InformerSynced
apiServiceRegistration AutoAPIServiceRegistration
@ -61,14 +66,18 @@ type tprRegistrationController struct {
// NewAutoRegistrationController returns a controller which will register TPR GroupVersions with the auto APIService registration
// controller so they automatically stay in sync.
func NewAutoRegistrationController(tprInformer informers.ThirdPartyResourceInformer, apiServiceRegistration AutoAPIServiceRegistration) *tprRegistrationController {
// In order to stay sane with both TPR and CRD present, we have a single controller that manages both. When choosing whether to have an
// APIService, we simply iterate through both.
func NewAutoRegistrationController(tprInformer informers.ThirdPartyResourceInformer, crdinformer crdinformers.CustomResourceDefinitionInformer, apiServiceRegistration AutoAPIServiceRegistration) *tprRegistrationController {
c := &tprRegistrationController{
tprLister: tprInformer.Lister(),
tprSynced: tprInformer.Informer().HasSynced,
crdLister: crdinformer.Lister(),
crdSynced: crdinformer.Informer().HasSynced,
apiServiceRegistration: apiServiceRegistration,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "tpr-autoregister"),
}
c.syncHandler = c.handleTPR
c.syncHandler = c.handleVersionUpdate
tprInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@ -97,6 +106,33 @@ func NewAutoRegistrationController(tprInformer informers.ThirdPartyResourceInfor
},
})
crdinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cast := obj.(*apiextensions.CustomResourceDefinition)
c.enqueueCRD(cast)
},
UpdateFunc: func(_, obj interface{}) {
cast := obj.(*apiextensions.CustomResourceDefinition)
c.enqueueCRD(cast)
},
DeleteFunc: func(obj interface{}) {
cast, ok := obj.(*apiextensions.CustomResourceDefinition)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
return
}
cast, ok = tombstone.Obj.(*apiextensions.CustomResourceDefinition)
if !ok {
glog.V(2).Infof("Tombstone contained unexpected object: %#v", obj)
return
}
}
c.enqueueCRD(cast)
},
})
return c
}
@ -173,14 +209,19 @@ func (c *tprRegistrationController) enqueueTPR(tpr *extensions.ThirdPartyResourc
}
}
func (c *tprRegistrationController) handleTPR(groupVersion schema.GroupVersion) error {
func (c *tprRegistrationController) enqueueCRD(crd *apiextensions.CustomResourceDefinition) {
c.queue.Add(schema.GroupVersion{Group: crd.Spec.Group, Version: crd.Spec.Version})
}
func (c *tprRegistrationController) handleVersionUpdate(groupVersion schema.GroupVersion) error {
found := false
apiServiceName := groupVersion.Version + "." + groupVersion.Group
// check all TPRs. There shouldn't that many, but if we have problems later we can index them
tprs, err := c.tprLister.List(labels.Everything())
if err != nil {
return err
}
found := false
for _, tpr := range tprs {
_, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(tpr)
if err != nil {
@ -194,7 +235,17 @@ func (c *tprRegistrationController) handleTPR(groupVersion schema.GroupVersion)
}
}
apiServiceName := groupVersion.Version + "." + groupVersion.Group
// check all CRDs. There shouldn't that many, but if we have problems later we can index them
crds, err := c.crdLister.List(labels.Everything())
if err != nil {
return err
}
for _, crd := range crds {
if crd.Spec.Version == groupVersion.Version && crd.Spec.Group == groupVersion.Group {
found = true
break
}
}
if !found {
c.apiServiceRegistration.RemoveAPIServiceToSync(apiServiceName)

View File

@ -25,6 +25,8 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-apiextensions-server/pkg/apis/apiextensions"
crdlisters "k8s.io/kube-apiextensions-server/pkg/client/listers/apiextensions/internalversion"
"k8s.io/kubernetes/pkg/apis/extensions"
listers "k8s.io/kubernetes/pkg/client/listers/extensions/internalversion"
)
@ -56,17 +58,18 @@ func TestEnqueue(t *testing.T) {
}
}
func TestHandleTPR(t *testing.T) {
func TestHandleVersionUpdate(t *testing.T) {
tests := []struct {
name string
startingTPRs []*extensions.ThirdPartyResource
startingCRDs []*apiextensions.CustomResourceDefinition
version schema.GroupVersion
expectedAdded []*apiregistration.APIService
expectedRemoved []string
}{
{
name: "simple add",
name: "simple add tpr",
startingTPRs: []*extensions.ThirdPartyResource{
{
ObjectMeta: metav1.ObjectMeta{Name: "resource.group.com"},
@ -89,7 +92,7 @@ func TestHandleTPR(t *testing.T) {
},
},
{
name: "simple remove",
name: "simple remove tpr",
startingTPRs: []*extensions.ThirdPartyResource{
{
ObjectMeta: metav1.ObjectMeta{Name: "resource.group.com"},
@ -100,6 +103,43 @@ func TestHandleTPR(t *testing.T) {
},
version: schema.GroupVersion{Group: "group.com", Version: "v2"},
expectedRemoved: []string{"v2.group.com"},
},
{
name: "simple add crd",
startingCRDs: []*apiextensions.CustomResourceDefinition{
{
Spec: apiextensions.CustomResourceDefinitionSpec{
Group: "group.com",
Version: "v1",
},
},
},
version: schema.GroupVersion{Group: "group.com", Version: "v1"},
expectedAdded: []*apiregistration.APIService{
{
ObjectMeta: metav1.ObjectMeta{Name: "v1.group.com"},
Spec: apiregistration.APIServiceSpec{
Group: "group.com",
Version: "v1",
Priority: 500,
},
},
},
},
{
name: "simple remove crd",
startingCRDs: []*apiextensions.CustomResourceDefinition{
{
Spec: apiextensions.CustomResourceDefinitionSpec{
Group: "group.com",
Version: "v1",
},
},
},
version: schema.GroupVersion{Group: "group.com", Version: "v2"},
expectedRemoved: []string{"v2.group.com"},
},
}
@ -108,15 +148,21 @@ func TestHandleTPR(t *testing.T) {
registration := &fakeAPIServiceRegistration{}
tprCache := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
tprLister := listers.NewThirdPartyResourceLister(tprCache)
crdCache := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
crdLister := crdlisters.NewCustomResourceDefinitionLister(crdCache)
c := tprRegistrationController{
tprLister: tprLister,
crdLister: crdLister,
apiServiceRegistration: registration,
}
for i := range test.startingTPRs {
tprCache.Add(test.startingTPRs[i])
}
for i := range test.startingCRDs {
crdCache.Add(test.startingCRDs[i])
}
c.handleTPR(test.version)
c.handleVersionUpdate(test.version)
if !reflect.DeepEqual(test.expectedAdded, registration.added) {
t.Errorf("%s expected %v, got %v", test.name, test.expectedAdded, registration.added)

View File

@ -74,11 +74,14 @@ func init() {
type Config struct {
GenericConfig *genericapiserver.Config
CustomResourceDefinitionRESTOptionsGetter genericregistry.RESTOptionsGetter
CRDRESTOptionsGetter genericregistry.RESTOptionsGetter
}
type CustomResourceDefinitions struct {
GenericAPIServer *genericapiserver.GenericAPIServer
// provided for easier embedding
Informers internalinformers.SharedInformerFactory
}
type completedConfig struct {
@ -105,7 +108,7 @@ func (c *Config) SkipComplete() completedConfig {
// New returns a new instance of CustomResourceDefinitions from the given config.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
genericServer, err := c.Config.GenericConfig.SkipComplete().New(genericapiserver.EmptyDelegate) // completion is done in Complete, no need for a second time
genericServer, err := c.Config.GenericConfig.SkipComplete().New(delegationTarget) // completion is done in Complete, no need for a second time
if err != nil {
return nil, err
}
@ -126,11 +129,11 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil, err
}
customResourceDefinitionClient, err := internalclientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
crdClient, err := internalclientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
if err != nil {
return nil, err
}
customResourceDefinitionInformers := internalinformers.NewSharedInformerFactory(customResourceDefinitionClient, 5*time.Minute)
s.Informers = internalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
delegateHandler := delegationTarget.UnprotectedHandler()
if delegateHandler == nil {
@ -145,31 +148,31 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
discovery: map[string]*discovery.APIGroupHandler{},
delegate: delegateHandler,
}
customResourceDefinitionHandler := NewCustomResourceDefinitionHandler(
crdHandler := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
s.GenericAPIServer.RequestContextMapper(),
customResourceDefinitionInformers.Apiextensions().InternalVersion().CustomResourceDefinitions().Lister(),
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Lister(),
delegateHandler,
c.CustomResourceDefinitionRESTOptionsGetter,
c.CRDRESTOptionsGetter,
c.GenericConfig.AdmissionControl,
)
s.GenericAPIServer.Handler.PostGoRestfulMux.Handle("/apis", customResourceDefinitionHandler)
s.GenericAPIServer.Handler.PostGoRestfulMux.HandlePrefix("/apis/", customResourceDefinitionHandler)
s.GenericAPIServer.Handler.PostGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.PostGoRestfulMux.HandlePrefix("/apis/", crdHandler)
customResourceDefinitionController := NewDiscoveryController(customResourceDefinitionInformers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
namingController := status.NewNamingConditionController(customResourceDefinitionInformers.Apiextensions().InternalVersion().CustomResourceDefinitions(), customResourceDefinitionClient)
crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient)
finalizingController := finalizer.NewCRDFinalizer(
customResourceDefinitionInformers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
customResourceDefinitionClient,
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
crdClient,
dynamic.NewDynamicClientPool(s.GenericAPIServer.LoopbackClientConfig))
s.GenericAPIServer.AddPostStartHook("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
customResourceDefinitionInformers.Start(context.StopCh)
s.Informers.Start(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHook("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
go customResourceDefinitionController.Run(context.StopCh)
go crdController.Run(context.StopCh)
go namingController.Run(context.StopCh)
go finalizingController.Run(5, context.StopCh)
return nil

View File

@ -127,10 +127,6 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
r.delegate.ServeHTTP(w, req)
return
}
if len(requestInfo.Subresource) > 0 {
http.NotFound(w, req)
return
}
crdName := requestInfo.Resource + "." + requestInfo.APIGroup
crd, err := r.crdLister.Get(crdName)
@ -150,6 +146,10 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if !apiextensions.IsCRDConditionFalse(crd, apiextensions.NameConflict) {
r.delegate.ServeHTTP(w, req)
}
if len(requestInfo.Subresource) > 0 {
http.NotFound(w, req)
return
}
terminating := apiextensions.IsCRDConditionTrue(crd, apiextensions.Terminating)
@ -379,7 +379,7 @@ type UnstructuredDefaulter struct{}
func (UnstructuredDefaulter) Default(in runtime.Object) {}
type CustomResourceDefinitionRESTOptionsGetter struct {
type CRDRESTOptionsGetter struct {
StorageConfig storagebackend.Config
StoragePrefix string
EnableWatchCache bool
@ -388,7 +388,7 @@ type CustomResourceDefinitionRESTOptionsGetter struct {
DeleteCollectionWorkers int
}
func (t CustomResourceDefinitionRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
func (t CRDRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: &t.StorageConfig,
Decorator: generic.UndecoratedStorage,

View File

@ -14,6 +14,7 @@ go_library(
deps = [
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/options:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/apis/apiextensions/v1alpha1:go_default_library",

View File

@ -24,6 +24,7 @@ import (
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/kube-apiextensions-server/pkg/apis/apiextensions/v1alpha1"
@ -95,24 +96,28 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err
return nil, err
}
customResourceDefinitionRESTOptionsGetter := apiserver.CustomResourceDefinitionRESTOptionsGetter{
StorageConfig: o.RecommendedOptions.Etcd.StorageConfig,
StoragePrefix: o.RecommendedOptions.Etcd.StorageConfig.Prefix,
EnableWatchCache: o.RecommendedOptions.Etcd.EnableWatchCache,
DefaultWatchCacheSize: o.RecommendedOptions.Etcd.DefaultWatchCacheSize,
EnableGarbageCollection: o.RecommendedOptions.Etcd.EnableGarbageCollection,
DeleteCollectionWorkers: o.RecommendedOptions.Etcd.DeleteCollectionWorkers,
}
customResourceDefinitionRESTOptionsGetter.StorageConfig.Codec = unstructured.UnstructuredJSONScheme
customResourceDefinitionRESTOptionsGetter.StorageConfig.Copier = apiserver.UnstructuredCopier{}
config := &apiserver.Config{
GenericConfig: serverConfig,
CustomResourceDefinitionRESTOptionsGetter: customResourceDefinitionRESTOptionsGetter,
GenericConfig: serverConfig,
CRDRESTOptionsGetter: NewCRDRESTOptionsGetter(*o.RecommendedOptions.Etcd),
}
return config, nil
}
func NewCRDRESTOptionsGetter(etcdOptions genericoptions.EtcdOptions) genericregistry.RESTOptionsGetter {
ret := apiserver.CRDRESTOptionsGetter{
StorageConfig: etcdOptions.StorageConfig,
StoragePrefix: etcdOptions.StorageConfig.Prefix,
EnableWatchCache: etcdOptions.EnableWatchCache,
DefaultWatchCacheSize: etcdOptions.DefaultWatchCacheSize,
EnableGarbageCollection: etcdOptions.EnableGarbageCollection,
DeleteCollectionWorkers: etcdOptions.DeleteCollectionWorkers,
}
ret.StorageConfig.Codec = unstructured.UnstructuredJSONScheme
ret.StorageConfig.Copier = apiserver.UnstructuredCopier{}
return ret
}
func (o CustomResourceDefinitionsServerOptions) RunCustomResourceDefinitionsServer(stopCh <-chan struct{}) error {
config, err := o.Config()
if err != nil {

View File

@ -457,9 +457,9 @@ func TestEtcdStorage(t *testing.T) {
}
func getPrefixFromConfig(t *testing.T, config *extensionsapiserver.Config) string {
extensionsOptionsGetter, ok := config.CustomResourceDefinitionRESTOptionsGetter.(extensionsapiserver.CustomResourceDefinitionRESTOptionsGetter)
extensionsOptionsGetter, ok := config.CRDRESTOptionsGetter.(extensionsapiserver.CRDRESTOptionsGetter)
if !ok {
t.Fatal("can't obtain etcd prefix: unable to cast config.CustomResourceDefinitionRESTOptionsGetter to extensionsapiserver.CustomResourceDefinitionRESTOptionsGetter")
t.Fatal("can't obtain etcd prefix: unable to cast config.CRDRESTOptionsGetter to extensionsapiserver.CRDRESTOptionsGetter")
}
return extensionsOptionsGetter.StoragePrefix
}

View File

@ -76,7 +76,7 @@ func DefaultServerConfig() (*extensionsapiserver.Config, error) {
return nil, err
}
customResourceDefinitionRESTOptionsGetter := extensionsapiserver.CustomResourceDefinitionRESTOptionsGetter{
customResourceDefinitionRESTOptionsGetter := extensionsapiserver.CRDRESTOptionsGetter{
StorageConfig: options.RecommendedOptions.Etcd.StorageConfig,
StoragePrefix: options.RecommendedOptions.Etcd.StorageConfig.Prefix,
EnableWatchCache: options.RecommendedOptions.Etcd.EnableWatchCache,
@ -88,8 +88,8 @@ func DefaultServerConfig() (*extensionsapiserver.Config, error) {
customResourceDefinitionRESTOptionsGetter.StorageConfig.Copier = extensionsapiserver.UnstructuredCopier{}
config := &extensionsapiserver.Config{
GenericConfig: genericConfig,
CustomResourceDefinitionRESTOptionsGetter: customResourceDefinitionRESTOptionsGetter,
GenericConfig: genericConfig,
CRDRESTOptionsGetter: customResourceDefinitionRESTOptionsGetter,
}
return config, nil

View File

@ -34,6 +34,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",

View File

@ -39,6 +39,7 @@ import (
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/storage/storagebackend"
kclient "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
@ -559,7 +560,7 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV
kubeAPIServerConfig.APIResourceConfigSource = &allResourceSource{} // force enable all resources
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, sharedInformers)
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers)
if err != nil {
t.Fatal(err)
}

View File

@ -21,6 +21,7 @@ go_test(
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",

View File

@ -33,6 +33,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@ -117,7 +118,7 @@ func TestAggregatedAPIServer(t *testing.T) {
}
kubeClientConfigValue.Store(kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, sharedInformers)
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers)
if err != nil {
t.Fatal(err)
}