mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
apiextensions-apiserver: add establishing controller to avoid race between established and CRs actually served
This commit is contained in:
parent
07e6410cf7
commit
2bf66c377d
@ -35,6 +35,7 @@ func createAPIExtensionsConfig(
|
|||||||
externalInformers kubeexternalinformers.SharedInformerFactory,
|
externalInformers kubeexternalinformers.SharedInformerFactory,
|
||||||
pluginInitializers []admission.PluginInitializer,
|
pluginInitializers []admission.PluginInitializer,
|
||||||
commandOptions *options.ServerRunOptions,
|
commandOptions *options.ServerRunOptions,
|
||||||
|
masterCount int,
|
||||||
) (*apiextensionsapiserver.Config, error) {
|
) (*apiextensionsapiserver.Config, error) {
|
||||||
// make a shallow copy to let us twiddle a few things
|
// make a shallow copy to let us twiddle a few things
|
||||||
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the apiextensions
|
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the apiextensions
|
||||||
@ -69,6 +70,7 @@ func createAPIExtensionsConfig(
|
|||||||
},
|
},
|
||||||
ExtraConfig: apiextensionsapiserver.ExtraConfig{
|
ExtraConfig: apiextensionsapiserver.ExtraConfig{
|
||||||
CRDRESTOptionsGetter: apiextensionscmd.NewCRDRESTOptionsGetter(etcdOptions),
|
CRDRESTOptionsGetter: apiextensionscmd.NewCRDRESTOptionsGetter(etcdOptions),
|
||||||
|
MasterCount: masterCount,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,7 +165,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
|
|||||||
}
|
}
|
||||||
|
|
||||||
// If additional API servers are added, they should be gated.
|
// If additional API servers are added, they should be gated.
|
||||||
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, versionedInformers, pluginInitializer, completedOptions.ServerRunOptions)
|
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, versionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -44,6 +44,7 @@ filegroup(
|
|||||||
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion:all-srcs",
|
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion:all-srcs",
|
||||||
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1beta1:all-srcs",
|
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1beta1:all-srcs",
|
||||||
"//staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server:all-srcs",
|
"//staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server:all-srcs",
|
||||||
|
"//staging/src/k8s.io/apiextensions-apiserver/pkg/controller/establish:all-srcs",
|
||||||
"//staging/src/k8s.io/apiextensions-apiserver/pkg/controller/finalizer:all-srcs",
|
"//staging/src/k8s.io/apiextensions-apiserver/pkg/controller/finalizer:all-srcs",
|
||||||
"//staging/src/k8s.io/apiextensions-apiserver/pkg/controller/status:all-srcs",
|
"//staging/src/k8s.io/apiextensions-apiserver/pkg/controller/status:all-srcs",
|
||||||
"//staging/src/k8s.io/apiextensions-apiserver/pkg/features:all-srcs",
|
"//staging/src/k8s.io/apiextensions-apiserver/pkg/features:all-srcs",
|
||||||
|
@ -32,6 +32,7 @@ go_library(
|
|||||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion:go_default_library",
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion:go_default_library",
|
||||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion:go_default_library",
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion:go_default_library",
|
||||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion:go_default_library",
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion:go_default_library",
|
||||||
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/controller/establish:go_default_library",
|
||||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/controller/finalizer:go_default_library",
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/controller/finalizer:go_default_library",
|
||||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/controller/status:go_default_library",
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/controller/status:go_default_library",
|
||||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/features:go_default_library",
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/features:go_default_library",
|
||||||
|
@ -37,11 +37,11 @@ import (
|
|||||||
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
|
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
|
||||||
"k8s.io/apiextensions-apiserver/pkg/client/clientset/internalclientset"
|
"k8s.io/apiextensions-apiserver/pkg/client/clientset/internalclientset"
|
||||||
internalinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion"
|
internalinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion"
|
||||||
|
"k8s.io/apiextensions-apiserver/pkg/controller/establish"
|
||||||
"k8s.io/apiextensions-apiserver/pkg/controller/finalizer"
|
"k8s.io/apiextensions-apiserver/pkg/controller/finalizer"
|
||||||
"k8s.io/apiextensions-apiserver/pkg/controller/status"
|
"k8s.io/apiextensions-apiserver/pkg/controller/status"
|
||||||
"k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition"
|
"k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition"
|
||||||
|
|
||||||
// make sure the generated client works
|
|
||||||
_ "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
_ "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||||
_ "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
|
_ "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
|
||||||
_ "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion"
|
_ "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion"
|
||||||
@ -74,6 +74,10 @@ func init() {
|
|||||||
|
|
||||||
type ExtraConfig struct {
|
type ExtraConfig struct {
|
||||||
CRDRESTOptionsGetter genericregistry.RESTOptionsGetter
|
CRDRESTOptionsGetter genericregistry.RESTOptionsGetter
|
||||||
|
|
||||||
|
// MasterCount is used to detect whether cluster is HA, and if it is
|
||||||
|
// the CRD Establishing will be hold by 5 seconds.
|
||||||
|
MasterCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
@ -162,6 +166,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
|
|||||||
discovery: map[string]*discovery.APIGroupHandler{},
|
discovery: map[string]*discovery.APIGroupHandler{},
|
||||||
delegate: delegateHandler,
|
delegate: delegateHandler,
|
||||||
}
|
}
|
||||||
|
establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
|
||||||
crdHandler := NewCustomResourceDefinitionHandler(
|
crdHandler := NewCustomResourceDefinitionHandler(
|
||||||
versionDiscoveryHandler,
|
versionDiscoveryHandler,
|
||||||
groupDiscoveryHandler,
|
groupDiscoveryHandler,
|
||||||
@ -169,6 +174,8 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
|
|||||||
delegateHandler,
|
delegateHandler,
|
||||||
c.ExtraConfig.CRDRESTOptionsGetter,
|
c.ExtraConfig.CRDRESTOptionsGetter,
|
||||||
c.GenericConfig.AdmissionControl,
|
c.GenericConfig.AdmissionControl,
|
||||||
|
establishingController,
|
||||||
|
c.ExtraConfig.MasterCount,
|
||||||
)
|
)
|
||||||
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
|
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
|
||||||
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
|
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
|
||||||
@ -188,6 +195,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
|
|||||||
s.GenericAPIServer.AddPostStartHook("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
|
s.GenericAPIServer.AddPostStartHook("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
|
||||||
go crdController.Run(context.StopCh)
|
go crdController.Run(context.StopCh)
|
||||||
go namingController.Run(context.StopCh)
|
go namingController.Run(context.StopCh)
|
||||||
|
go establishingController.Run(context.StopCh)
|
||||||
go finalizingController.Run(5, context.StopCh)
|
go finalizingController.Run(5, context.StopCh)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -62,6 +62,7 @@ import (
|
|||||||
apiservervalidation "k8s.io/apiextensions-apiserver/pkg/apiserver/validation"
|
apiservervalidation "k8s.io/apiextensions-apiserver/pkg/apiserver/validation"
|
||||||
informers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion"
|
informers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion"
|
||||||
listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion"
|
listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion"
|
||||||
|
"k8s.io/apiextensions-apiserver/pkg/controller/establish"
|
||||||
"k8s.io/apiextensions-apiserver/pkg/controller/finalizer"
|
"k8s.io/apiextensions-apiserver/pkg/controller/finalizer"
|
||||||
apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features"
|
apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features"
|
||||||
"k8s.io/apiextensions-apiserver/pkg/registry/customresource"
|
"k8s.io/apiextensions-apiserver/pkg/registry/customresource"
|
||||||
@ -86,6 +87,12 @@ type crdHandler struct {
|
|||||||
delegate http.Handler
|
delegate http.Handler
|
||||||
restOptionsGetter generic.RESTOptionsGetter
|
restOptionsGetter generic.RESTOptionsGetter
|
||||||
admission admission.Interface
|
admission admission.Interface
|
||||||
|
|
||||||
|
establishingController *establish.EstablishingController
|
||||||
|
|
||||||
|
// MasterCount is used to implement sleep to improve
|
||||||
|
// CRD establishing process for HA clusters.
|
||||||
|
masterCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
// crdInfo stores enough information to serve the storage for the custom resource
|
// crdInfo stores enough information to serve the storage for the custom resource
|
||||||
@ -120,7 +127,9 @@ func NewCustomResourceDefinitionHandler(
|
|||||||
crdInformer informers.CustomResourceDefinitionInformer,
|
crdInformer informers.CustomResourceDefinitionInformer,
|
||||||
delegate http.Handler,
|
delegate http.Handler,
|
||||||
restOptionsGetter generic.RESTOptionsGetter,
|
restOptionsGetter generic.RESTOptionsGetter,
|
||||||
admission admission.Interface) *crdHandler {
|
admission admission.Interface,
|
||||||
|
establishingController *establish.EstablishingController,
|
||||||
|
masterCount int) *crdHandler {
|
||||||
ret := &crdHandler{
|
ret := &crdHandler{
|
||||||
versionDiscoveryHandler: versionDiscoveryHandler,
|
versionDiscoveryHandler: versionDiscoveryHandler,
|
||||||
groupDiscoveryHandler: groupDiscoveryHandler,
|
groupDiscoveryHandler: groupDiscoveryHandler,
|
||||||
@ -129,6 +138,8 @@ func NewCustomResourceDefinitionHandler(
|
|||||||
delegate: delegate,
|
delegate: delegate,
|
||||||
restOptionsGetter: restOptionsGetter,
|
restOptionsGetter: restOptionsGetter,
|
||||||
admission: admission,
|
admission: admission,
|
||||||
|
establishingController: establishingController,
|
||||||
|
masterCount: masterCount,
|
||||||
}
|
}
|
||||||
crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
UpdateFunc: ret.updateCustomResourceDefinition,
|
UpdateFunc: ret.updateCustomResourceDefinition,
|
||||||
@ -181,7 +192,12 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
r.delegate.ServeHTTP(w, req)
|
r.delegate.ServeHTTP(w, req)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !apiextensions.IsCRDConditionTrue(crd, apiextensions.Established) {
|
// There is a small chance that a CRD is being served because NamesAccepted condition is true,
|
||||||
|
// but it becomes "unserved" because another names update leads to a conflict
|
||||||
|
// and EstablishingController wasn't fast enough to put the CRD into the Established condition.
|
||||||
|
// We accept this as the problem is small and self-healing.
|
||||||
|
if !apiextensions.IsCRDConditionTrue(crd, apiextensions.NamesAccepted) &&
|
||||||
|
!apiextensions.IsCRDConditionTrue(crd, apiextensions.Established) {
|
||||||
r.delegate.ServeHTTP(w, req)
|
r.delegate.ServeHTTP(w, req)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -299,6 +315,19 @@ func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{})
|
|||||||
r.customStorageLock.Lock()
|
r.customStorageLock.Lock()
|
||||||
defer r.customStorageLock.Unlock()
|
defer r.customStorageLock.Unlock()
|
||||||
|
|
||||||
|
// Add CRD to the establishing controller queue.
|
||||||
|
// For HA clusters, we want to prevent race conditions when changing status to Established,
|
||||||
|
// so we want to be sure that CRD is Installing at least for 5 seconds before Establishing it.
|
||||||
|
// TODO: find a real HA safe checkpointing mechanism instead of an arbitrary wait.
|
||||||
|
if !apiextensions.IsCRDConditionTrue(newCRD, apiextensions.Established) &&
|
||||||
|
apiextensions.IsCRDConditionTrue(newCRD, apiextensions.NamesAccepted) {
|
||||||
|
if r.masterCount > 1 {
|
||||||
|
r.establishingController.QueueCRD(newCRD.Name, 5*time.Second)
|
||||||
|
} else {
|
||||||
|
r.establishingController.QueueCRD(newCRD.Name, 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
storageMap := r.customStorage.Load().(crdStorageMap)
|
storageMap := r.customStorage.Load().(crdStorageMap)
|
||||||
oldInfo, found := storageMap[newCRD.UID]
|
oldInfo, found := storageMap[newCRD.UID]
|
||||||
if !found {
|
if !found {
|
||||||
|
@ -0,0 +1,34 @@
|
|||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = ["establishing_controller.go"],
|
||||||
|
importpath = "k8s.io/apiextensions-apiserver/pkg/controller/establish",
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
deps = [
|
||||||
|
"//vendor/github.com/golang/glog:go_default_library",
|
||||||
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions:go_default_library",
|
||||||
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/clientset/internalclientset/typed/apiextensions/internalversion:go_default_library",
|
||||||
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion:go_default_library",
|
||||||
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
@ -0,0 +1,142 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package establish
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
|
"k8s.io/client-go/util/workqueue"
|
||||||
|
|
||||||
|
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
|
||||||
|
client "k8s.io/apiextensions-apiserver/pkg/client/clientset/internalclientset/typed/apiextensions/internalversion"
|
||||||
|
informers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion"
|
||||||
|
listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EstablishingController controls how and when CRD is established.
|
||||||
|
type EstablishingController struct {
|
||||||
|
crdClient client.CustomResourceDefinitionsGetter
|
||||||
|
crdLister listers.CustomResourceDefinitionLister
|
||||||
|
crdSynced cache.InformerSynced
|
||||||
|
|
||||||
|
// To allow injection for testing.
|
||||||
|
syncFn func(key string) error
|
||||||
|
|
||||||
|
queue workqueue.RateLimitingInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEstablishingController creates new EstablishingController.
|
||||||
|
func NewEstablishingController(crdInformer informers.CustomResourceDefinitionInformer,
|
||||||
|
crdClient client.CustomResourceDefinitionsGetter) *EstablishingController {
|
||||||
|
ec := &EstablishingController{
|
||||||
|
crdClient: crdClient,
|
||||||
|
crdLister: crdInformer.Lister(),
|
||||||
|
crdSynced: crdInformer.Informer().HasSynced,
|
||||||
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crdEstablishing"),
|
||||||
|
}
|
||||||
|
|
||||||
|
ec.syncFn = ec.sync
|
||||||
|
|
||||||
|
return ec
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueCRD adds CRD into the establishing queue.
|
||||||
|
func (ec *EstablishingController) QueueCRD(key string, timeout time.Duration) {
|
||||||
|
ec.queue.AddAfter(key, timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts the EstablishingController.
|
||||||
|
func (ec *EstablishingController) Run(stopCh <-chan struct{}) {
|
||||||
|
defer utilruntime.HandleCrash()
|
||||||
|
defer ec.queue.ShutDown()
|
||||||
|
|
||||||
|
glog.Infof("Starting EstablishingController")
|
||||||
|
defer glog.Infof("Shutting down EstablishingController")
|
||||||
|
|
||||||
|
if !cache.WaitForCacheSync(stopCh, ec.crdSynced) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// only start one worker thread since its a slow moving API
|
||||||
|
go wait.Until(ec.runWorker, time.Second, stopCh)
|
||||||
|
|
||||||
|
<-stopCh
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ec *EstablishingController) runWorker() {
|
||||||
|
for ec.processNextWorkItem() {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// processNextWorkItem deals with one key off the queue.
|
||||||
|
// It returns false when it's time to quit.
|
||||||
|
func (ec *EstablishingController) processNextWorkItem() bool {
|
||||||
|
key, quit := ec.queue.Get()
|
||||||
|
if quit {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
defer ec.queue.Done(key)
|
||||||
|
|
||||||
|
err := ec.syncFn(key.(string))
|
||||||
|
if err == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
|
||||||
|
ec.queue.AddRateLimited(key)
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// sync is used to turn CRDs into the Established state.
|
||||||
|
func (ec *EstablishingController) sync(key string) error {
|
||||||
|
cachedCRD, err := ec.crdLister.Get(key)
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !apiextensions.IsCRDConditionTrue(cachedCRD, apiextensions.NamesAccepted) ||
|
||||||
|
apiextensions.IsCRDConditionTrue(cachedCRD, apiextensions.Established) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
crd := cachedCRD.DeepCopy()
|
||||||
|
establishedCondition := apiextensions.CustomResourceDefinitionCondition{
|
||||||
|
Type: apiextensions.Established,
|
||||||
|
Status: apiextensions.ConditionTrue,
|
||||||
|
Reason: "InitialNamesAccepted",
|
||||||
|
Message: "the initial names have been accepted",
|
||||||
|
}
|
||||||
|
apiextensions.SetCRDCondition(crd, establishedCondition)
|
||||||
|
|
||||||
|
// Update server with new CRD condition.
|
||||||
|
_, err = ec.crdClient.CustomResourceDefinitions().UpdateStatus(crd)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -28,6 +28,7 @@ go_library(
|
|||||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/clientset/internalclientset/typed/apiextensions/internalversion:go_default_library",
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/clientset/internalclientset/typed/apiextensions/internalversion:go_default_library",
|
||||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion:go_default_library",
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion:go_default_library",
|
||||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion:go_default_library",
|
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/api/equality"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||||
@ -191,7 +192,10 @@ func (c *NamingConditionController) calculateNamesAndConditions(in *apiextension
|
|||||||
namesAcceptedCondition.Message = "no conflicts found"
|
namesAcceptedCondition.Message = "no conflicts found"
|
||||||
}
|
}
|
||||||
|
|
||||||
// set EstablishedCondition to true if all names are accepted. Never set it back to false.
|
// set EstablishedCondition initially to false, then set it to true in establishing controller.
|
||||||
|
// The Establishing Controller will see the NamesAccepted condition when it arrives through the shared informer.
|
||||||
|
// At that time the API endpoint handler will serve the endpoint, avoiding a race
|
||||||
|
// which we had if we set Established to true here.
|
||||||
establishedCondition := apiextensions.CustomResourceDefinitionCondition{
|
establishedCondition := apiextensions.CustomResourceDefinitionCondition{
|
||||||
Type: apiextensions.Established,
|
Type: apiextensions.Established,
|
||||||
Status: apiextensions.ConditionFalse,
|
Status: apiextensions.ConditionFalse,
|
||||||
@ -204,8 +208,8 @@ func (c *NamingConditionController) calculateNamesAndConditions(in *apiextension
|
|||||||
if establishedCondition.Status != apiextensions.ConditionTrue && namesAcceptedCondition.Status == apiextensions.ConditionTrue {
|
if establishedCondition.Status != apiextensions.ConditionTrue && namesAcceptedCondition.Status == apiextensions.ConditionTrue {
|
||||||
establishedCondition = apiextensions.CustomResourceDefinitionCondition{
|
establishedCondition = apiextensions.CustomResourceDefinitionCondition{
|
||||||
Type: apiextensions.Established,
|
Type: apiextensions.Established,
|
||||||
Status: apiextensions.ConditionTrue,
|
Status: apiextensions.ConditionFalse,
|
||||||
Reason: "InitialNamesAccepted",
|
Reason: "Installing",
|
||||||
Message: "the initial names have been accepted",
|
Message: "the initial names have been accepted",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -238,12 +242,16 @@ func (c *NamingConditionController) sync(key string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Skip checking names if Spec and Status names are same.
|
||||||
|
if equality.Semantic.DeepEqual(inCustomResourceDefinition.Spec.Names, inCustomResourceDefinition.Status.AcceptedNames) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
acceptedNames, namingCondition, establishedCondition := c.calculateNamesAndConditions(inCustomResourceDefinition)
|
acceptedNames, namingCondition, establishedCondition := c.calculateNamesAndConditions(inCustomResourceDefinition)
|
||||||
|
|
||||||
// nothing to do if accepted names and NamesAccepted condition didn't change
|
// nothing to do if accepted names and NamesAccepted condition didn't change
|
||||||
if reflect.DeepEqual(inCustomResourceDefinition.Status.AcceptedNames, acceptedNames) &&
|
if reflect.DeepEqual(inCustomResourceDefinition.Status.AcceptedNames, acceptedNames) &&
|
||||||
apiextensions.IsCRDConditionEquivalent(&namingCondition, apiextensions.FindCRDCondition(inCustomResourceDefinition, apiextensions.NamesAccepted)) &&
|
apiextensions.IsCRDConditionEquivalent(&namingCondition, apiextensions.FindCRDCondition(inCustomResourceDefinition, apiextensions.NamesAccepted)) {
|
||||||
apiextensions.IsCRDConditionEquivalent(&establishedCondition, apiextensions.FindCRDCondition(inCustomResourceDefinition, apiextensions.Established)) {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,19 +95,17 @@ var acceptedCondition = apiextensions.CustomResourceDefinitionCondition{
|
|||||||
Message: "no conflicts found",
|
Message: "no conflicts found",
|
||||||
}
|
}
|
||||||
|
|
||||||
func nameConflictCondition(reason, message string) apiextensions.CustomResourceDefinitionCondition {
|
var notAcceptedCondition = apiextensions.CustomResourceDefinitionCondition{
|
||||||
return apiextensions.CustomResourceDefinitionCondition{
|
Type: apiextensions.NamesAccepted,
|
||||||
Type: apiextensions.NamesAccepted,
|
Status: apiextensions.ConditionFalse,
|
||||||
Status: apiextensions.ConditionFalse,
|
Reason: "NotAccepted",
|
||||||
Reason: reason,
|
Message: "not all names are accepted",
|
||||||
Message: message,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var establishedCondition = apiextensions.CustomResourceDefinitionCondition{
|
var installingCondition = apiextensions.CustomResourceDefinitionCondition{
|
||||||
Type: apiextensions.Established,
|
Type: apiextensions.Established,
|
||||||
Status: apiextensions.ConditionTrue,
|
Status: apiextensions.ConditionFalse,
|
||||||
Reason: "InitialNamesAccepted",
|
Reason: "Installing",
|
||||||
Message: "the initial names have been accepted",
|
Message: "the initial names have been accepted",
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -118,6 +116,15 @@ var notEstablishedCondition = apiextensions.CustomResourceDefinitionCondition{
|
|||||||
Message: "not all names are accepted",
|
Message: "not all names are accepted",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func nameConflictCondition(reason, message string) apiextensions.CustomResourceDefinitionCondition {
|
||||||
|
return apiextensions.CustomResourceDefinitionCondition{
|
||||||
|
Type: apiextensions.NamesAccepted,
|
||||||
|
Status: apiextensions.ConditionFalse,
|
||||||
|
Reason: reason,
|
||||||
|
Message: message,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSync(t *testing.T) {
|
func TestSync(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
@ -136,7 +143,7 @@ func TestSync(t *testing.T) {
|
|||||||
Plural: "alfa",
|
Plural: "alfa",
|
||||||
},
|
},
|
||||||
expectedNameConflictCondition: acceptedCondition,
|
expectedNameConflictCondition: acceptedCondition,
|
||||||
expectedEstablishedCondition: establishedCondition,
|
expectedEstablishedCondition: installingCondition,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "different groups",
|
name: "different groups",
|
||||||
@ -146,7 +153,7 @@ func TestSync(t *testing.T) {
|
|||||||
},
|
},
|
||||||
expectedNames: names("alfa", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1", "hotel-shortname-2"),
|
expectedNames: names("alfa", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1", "hotel-shortname-2"),
|
||||||
expectedNameConflictCondition: acceptedCondition,
|
expectedNameConflictCondition: acceptedCondition,
|
||||||
expectedEstablishedCondition: establishedCondition,
|
expectedEstablishedCondition: installingCondition,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "conflict plural to singular",
|
name: "conflict plural to singular",
|
||||||
@ -206,7 +213,7 @@ func TestSync(t *testing.T) {
|
|||||||
},
|
},
|
||||||
expectedNames: names("alfa", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1", "hotel-shortname-2"),
|
expectedNames: names("alfa", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1", "hotel-shortname-2"),
|
||||||
expectedNameConflictCondition: acceptedCondition,
|
expectedNameConflictCondition: acceptedCondition,
|
||||||
expectedEstablishedCondition: establishedCondition,
|
expectedEstablishedCondition: installingCondition,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "merge on conflicts",
|
name: "merge on conflicts",
|
||||||
@ -248,7 +255,7 @@ func TestSync(t *testing.T) {
|
|||||||
},
|
},
|
||||||
expectedNames: names("alfa", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1", "hotel-shortname-2"),
|
expectedNames: names("alfa", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1", "hotel-shortname-2"),
|
||||||
expectedNameConflictCondition: acceptedCondition,
|
expectedNameConflictCondition: acceptedCondition,
|
||||||
expectedEstablishedCondition: establishedCondition,
|
expectedEstablishedCondition: installingCondition,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "no conflicts on self, remove shortname",
|
name: "no conflicts on self, remove shortname",
|
||||||
@ -264,44 +271,44 @@ func TestSync(t *testing.T) {
|
|||||||
},
|
},
|
||||||
expectedNames: names("alfa", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1"),
|
expectedNames: names("alfa", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1"),
|
||||||
expectedNameConflictCondition: acceptedCondition,
|
expectedNameConflictCondition: acceptedCondition,
|
||||||
expectedEstablishedCondition: establishedCondition,
|
expectedEstablishedCondition: installingCondition,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "established before with true condition",
|
name: "installing before with true condition",
|
||||||
in: newCRD("alfa.bravo.com").Condition(establishedCondition).NewOrDie(),
|
in: newCRD("alfa.bravo.com").Condition(acceptedCondition).NewOrDie(),
|
||||||
existing: []*apiextensions.CustomResourceDefinition{},
|
existing: []*apiextensions.CustomResourceDefinition{},
|
||||||
expectedNames: apiextensions.CustomResourceDefinitionNames{
|
expectedNames: apiextensions.CustomResourceDefinitionNames{
|
||||||
Plural: "alfa",
|
Plural: "alfa",
|
||||||
},
|
},
|
||||||
expectedNameConflictCondition: acceptedCondition,
|
expectedNameConflictCondition: acceptedCondition,
|
||||||
expectedEstablishedCondition: establishedCondition,
|
expectedEstablishedCondition: installingCondition,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "not established before with false condition",
|
name: "not installing before with false condition",
|
||||||
in: newCRD("alfa.bravo.com").Condition(notEstablishedCondition).NewOrDie(),
|
in: newCRD("alfa.bravo.com").Condition(notAcceptedCondition).NewOrDie(),
|
||||||
existing: []*apiextensions.CustomResourceDefinition{},
|
existing: []*apiextensions.CustomResourceDefinition{},
|
||||||
expectedNames: apiextensions.CustomResourceDefinitionNames{
|
expectedNames: apiextensions.CustomResourceDefinitionNames{
|
||||||
Plural: "alfa",
|
Plural: "alfa",
|
||||||
},
|
},
|
||||||
expectedNameConflictCondition: acceptedCondition,
|
expectedNameConflictCondition: acceptedCondition,
|
||||||
expectedEstablishedCondition: establishedCondition,
|
expectedEstablishedCondition: installingCondition,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "conflicting, established before with true condition",
|
name: "conflicting, installing before with true condition",
|
||||||
in: newCRD("alfa.bravo.com").SpecNames("alfa", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1", "hotel-shortname-2").
|
in: newCRD("alfa.bravo.com").SpecNames("alfa", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1", "hotel-shortname-2").
|
||||||
Condition(establishedCondition).
|
Condition(acceptedCondition).
|
||||||
NewOrDie(),
|
NewOrDie(),
|
||||||
existing: []*apiextensions.CustomResourceDefinition{
|
existing: []*apiextensions.CustomResourceDefinition{
|
||||||
newCRD("india.bravo.com").StatusNames("india", "alfa", "", "").NewOrDie(),
|
newCRD("india.bravo.com").StatusNames("india", "alfa", "", "").NewOrDie(),
|
||||||
},
|
},
|
||||||
expectedNames: names("", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1", "hotel-shortname-2"),
|
expectedNames: names("", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1", "hotel-shortname-2"),
|
||||||
expectedNameConflictCondition: nameConflictCondition("PluralConflict", `"alfa" is already in use`),
|
expectedNameConflictCondition: nameConflictCondition("PluralConflict", `"alfa" is already in use`),
|
||||||
expectedEstablishedCondition: establishedCondition,
|
expectedEstablishedCondition: notEstablishedCondition,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "conflicting, not established before with false condition",
|
name: "conflicting, not installing before with false condition",
|
||||||
in: newCRD("alfa.bravo.com").SpecNames("alfa", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1", "hotel-shortname-2").
|
in: newCRD("alfa.bravo.com").SpecNames("alfa", "delta-singular", "echo-kind", "foxtrot-listkind", "golf-shortname-1", "hotel-shortname-2").
|
||||||
Condition(notEstablishedCondition).
|
Condition(notAcceptedCondition).
|
||||||
NewOrDie(),
|
NewOrDie(),
|
||||||
existing: []*apiextensions.CustomResourceDefinition{
|
existing: []*apiextensions.CustomResourceDefinition{
|
||||||
newCRD("india.bravo.com").StatusNames("india", "alfa", "", "").NewOrDie(),
|
newCRD("india.bravo.com").StatusNames("india", "alfa", "", "").NewOrDie(),
|
||||||
@ -322,7 +329,7 @@ func TestSync(t *testing.T) {
|
|||||||
crdLister: listers.NewCustomResourceDefinitionLister(crdIndexer),
|
crdLister: listers.NewCustomResourceDefinitionLister(crdIndexer),
|
||||||
crdMutationCache: cache.NewIntegerResourceVersionMutationCache(crdIndexer, crdIndexer, 60*time.Second, false),
|
crdMutationCache: cache.NewIntegerResourceVersionMutationCache(crdIndexer, crdIndexer, 60*time.Second, false),
|
||||||
}
|
}
|
||||||
actualNames, actualNameConflictCondition, actualEstablishedCondition := c.calculateNamesAndConditions(tc.in)
|
actualNames, actualNameConflictCondition, establishedCondition := c.calculateNamesAndConditions(tc.in)
|
||||||
|
|
||||||
if e, a := tc.expectedNames, actualNames; !reflect.DeepEqual(e, a) {
|
if e, a := tc.expectedNames, actualNames; !reflect.DeepEqual(e, a) {
|
||||||
t.Errorf("%v expected %v, got %#v", tc.name, e, a)
|
t.Errorf("%v expected %v, got %#v", tc.name, e, a)
|
||||||
@ -330,7 +337,7 @@ func TestSync(t *testing.T) {
|
|||||||
if e, a := tc.expectedNameConflictCondition, actualNameConflictCondition; !apiextensions.IsCRDConditionEquivalent(&e, &a) {
|
if e, a := tc.expectedNameConflictCondition, actualNameConflictCondition; !apiextensions.IsCRDConditionEquivalent(&e, &a) {
|
||||||
t.Errorf("%v expected %v, got %v", tc.name, e, a)
|
t.Errorf("%v expected %v, got %v", tc.name, e, a)
|
||||||
}
|
}
|
||||||
if e, a := tc.expectedEstablishedCondition, actualEstablishedCondition; !apiextensions.IsCRDConditionEquivalent(&e, &a) {
|
if e, a := tc.expectedEstablishedCondition, establishedCondition; !apiextensions.IsCRDConditionEquivalent(&e, &a) {
|
||||||
t.Errorf("%v expected %v, got %v", tc.name, e, a)
|
t.Errorf("%v expected %v, got %v", tc.name, e, a)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user