diff --git a/federation/cmd/federation-controller-manager/app/BUILD b/federation/cmd/federation-controller-manager/app/BUILD index 38402d62baa..a942168e54b 100644 --- a/federation/cmd/federation-controller-manager/app/BUILD +++ b/federation/cmd/federation-controller-manager/app/BUILD @@ -24,7 +24,6 @@ go_library( "//federation/pkg/dnsprovider/providers/google/clouddns:go_default_library", "//federation/pkg/federatedtypes:go_default_library", "//federation/pkg/federation-controller/cluster:go_default_library", - "//federation/pkg/federation-controller/configmap:go_default_library", "//federation/pkg/federation-controller/daemonset:go_default_library", "//federation/pkg/federation-controller/deployment:go_default_library", "//federation/pkg/federation-controller/ingress:go_default_library", diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 2ac834f4db6..cf22253c929 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -38,7 +38,6 @@ import ( "k8s.io/kubernetes/federation/pkg/dnsprovider" "k8s.io/kubernetes/federation/pkg/federatedtypes" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" - configmapcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/configmap" daemonsetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/daemonset" deploymentcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/deployment" ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress" @@ -166,12 +165,6 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err } } - if controllerEnabled(s.Controllers, serverResources, configmapcontroller.ControllerName, configmapcontroller.RequiredResources, true) { - configmapcontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "configmap-controller")) - configmapcontroller := configmapcontroller.NewConfigMapController(configmapcontrollerClientset) - configmapcontroller.Run(wait.NeverStop) - } - if controllerEnabled(s.Controllers, serverResources, daemonsetcontroller.ControllerName, daemonsetcontroller.RequiredResources, true) { daemonsetcontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "daemonset-controller")) daemonsetcontroller := daemonsetcontroller.NewDaemonSetController(daemonsetcontrollerClientset) diff --git a/federation/pkg/federatedtypes/BUILD b/federation/pkg/federatedtypes/BUILD index 977d6384360..3fe24120af5 100644 --- a/federation/pkg/federatedtypes/BUILD +++ b/federation/pkg/federatedtypes/BUILD @@ -11,6 +11,7 @@ go_library( name = "go_default_library", srcs = [ "adapter.go", + "configmap.go", "registry.go", "secret.go", ], diff --git a/federation/pkg/federatedtypes/configmap.go b/federation/pkg/federatedtypes/configmap.go new file mode 100644 index 00000000000..b60cea286df --- /dev/null +++ b/federation/pkg/federatedtypes/configmap.go @@ -0,0 +1,146 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package federatedtypes + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pkgruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" +) + +const ( + ConfigMapKind = "configmap" + ConfigMapControllerName = "configmaps" +) + +func init() { + RegisterFederatedType(ConfigMapKind, ConfigMapControllerName, []schema.GroupVersionResource{apiv1.SchemeGroupVersion.WithResource(ConfigMapControllerName)}, NewConfigMapAdapter) +} + +type ConfigMapAdapter struct { + client federationclientset.Interface +} + +func NewConfigMapAdapter(client federationclientset.Interface) FederatedTypeAdapter { + return &ConfigMapAdapter{client: client} +} + +func (a *ConfigMapAdapter) Kind() string { + return ConfigMapKind +} + +func (a *ConfigMapAdapter) ObjectType() pkgruntime.Object { + return &apiv1.ConfigMap{} +} + +func (a *ConfigMapAdapter) IsExpectedType(obj interface{}) bool { + _, ok := obj.(*apiv1.ConfigMap) + return ok +} + +func (a *ConfigMapAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object { + configmap := obj.(*apiv1.ConfigMap) + return &apiv1.ConfigMap{ + ObjectMeta: util.DeepCopyRelevantObjectMeta(configmap.ObjectMeta), + Data: configmap.Data, + } +} + +func (a *ConfigMapAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { + configmap1 := obj1.(*apiv1.ConfigMap) + configmap2 := obj2.(*apiv1.ConfigMap) + return util.ConfigMapEquivalent(configmap1, configmap2) +} + +func (a *ConfigMapAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { + configmap := obj.(*apiv1.ConfigMap) + return types.NamespacedName{Namespace: configmap.Namespace, Name: configmap.Name} +} + +func (a *ConfigMapAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { + return &obj.(*apiv1.ConfigMap).ObjectMeta +} + +func (a *ConfigMapAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) { + configmap := obj.(*apiv1.ConfigMap) + return a.client.CoreV1().ConfigMaps(configmap.Namespace).Create(configmap) +} + +func (a *ConfigMapAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { + return a.client.CoreV1().ConfigMaps(namespacedName.Namespace).Delete(namespacedName.Name, options) +} + +func (a *ConfigMapAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { + return a.client.CoreV1().ConfigMaps(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +} + +func (a *ConfigMapAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return a.client.CoreV1().ConfigMaps(namespace).List(options) +} + +func (a *ConfigMapAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) { + configmap := obj.(*apiv1.ConfigMap) + return a.client.CoreV1().ConfigMaps(configmap.Namespace).Update(configmap) +} + +func (a *ConfigMapAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) { + return a.client.CoreV1().ConfigMaps(namespace).Watch(options) +} + +func (a *ConfigMapAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + configmap := obj.(*apiv1.ConfigMap) + return client.CoreV1().ConfigMaps(configmap.Namespace).Create(configmap) +} + +func (a *ConfigMapAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { + return client.CoreV1().ConfigMaps(nsName.Namespace).Delete(nsName.Name, options) +} + +func (a *ConfigMapAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { + return client.CoreV1().ConfigMaps(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +} + +func (a *ConfigMapAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return client.CoreV1().ConfigMaps(namespace).List(options) +} + +func (a *ConfigMapAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + configmap := obj.(*apiv1.ConfigMap) + return client.CoreV1().ConfigMaps(configmap.Namespace).Update(configmap) +} + +func (a *ConfigMapAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) { + return client.CoreV1().ConfigMaps(namespace).Watch(options) +} + +func (a *ConfigMapAdapter) NewTestObject(namespace string) pkgruntime.Object { + return &apiv1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-configmap-", + Namespace: namespace, + }, + Data: map[string]string{ + "A": "ala ma kota", + }, + } +} diff --git a/federation/pkg/federation-controller/BUILD b/federation/pkg/federation-controller/BUILD index 85c0acda9c5..9e557def121 100644 --- a/federation/pkg/federation-controller/BUILD +++ b/federation/pkg/federation-controller/BUILD @@ -25,7 +25,6 @@ filegroup( srcs = [ ":package-srcs", "//federation/pkg/federation-controller/cluster:all-srcs", - "//federation/pkg/federation-controller/configmap:all-srcs", "//federation/pkg/federation-controller/daemonset:all-srcs", "//federation/pkg/federation-controller/deployment:all-srcs", "//federation/pkg/federation-controller/ingress:all-srcs", diff --git a/federation/pkg/federation-controller/configmap/BUILD b/federation/pkg/federation-controller/configmap/BUILD deleted file mode 100644 index f26c6a88f6a..00000000000 --- a/federation/pkg/federation-controller/configmap/BUILD +++ /dev/null @@ -1,73 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_library( - name = "go_default_library", - srcs = ["configmap_controller.go"], - tags = ["automanaged"], - deps = [ - "//federation/apis/federation/v1beta1:go_default_library", - "//federation/client/clientset_generated/federation_clientset:go_default_library", - "//federation/pkg/federation-controller/util:go_default_library", - "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", - "//federation/pkg/federation-controller/util/eventsink:go_default_library", - "//pkg/api:go_default_library", - "//pkg/api/v1:go_default_library", - "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/controller:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", - "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", - "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = ["configmap_controller_test.go"], - library = ":go_default_library", - tags = ["automanaged"], - deps = [ - "//federation/apis/federation/v1beta1:go_default_library", - "//federation/client/clientset_generated/federation_clientset/fake:go_default_library", - "//federation/pkg/federation-controller/util:go_default_library", - "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", - "//federation/pkg/federation-controller/util/test:go_default_library", - "//pkg/api/v1:go_default_library", - "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/clientset_generated/clientset/fake:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/federation/pkg/federation-controller/configmap/configmap_controller.go b/federation/pkg/federation-controller/configmap/configmap_controller.go deleted file mode 100644 index ca0c0955a18..00000000000 --- a/federation/pkg/federation-controller/configmap/configmap_controller.go +++ /dev/null @@ -1,401 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package configmap - -import ( - "fmt" - "time" - - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - pkgruntime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" - clientv1 "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" - federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" - federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" - "k8s.io/kubernetes/federation/pkg/federation-controller/util" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" - "k8s.io/kubernetes/pkg/api" - apiv1 "k8s.io/kubernetes/pkg/api/v1" - kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/controller" - - "github.com/golang/glog" -) - -const ( - allClustersKey = "ALL_CLUSTERS" - ControllerName = "configmaps" -) - -var ( - RequiredResources = []schema.GroupVersionResource{apiv1.SchemeGroupVersion.WithResource("configmaps")} -) - -type ConfigMapController struct { - // For triggering single configmap reconciliation. This is used when there is an - // add/update/delete operation on a configmap in either federated API server or - // in some member of the federation. - configmapDeliverer *util.DelayingDeliverer - - // For triggering all configmaps reconciliation. This is used when - // a new cluster becomes available. - clusterDeliverer *util.DelayingDeliverer - - // Contains configmaps present in members of federation. - configmapFederatedInformer util.FederatedInformer - // For updating members of federation. - federatedUpdater util.FederatedUpdater - // Definitions of configmaps that should be federated. - configmapInformerStore cache.Store - // Informer controller for configmaps that should be federated. - configmapInformerController cache.Controller - - // Client to federated api server. - federatedApiClient federationclientset.Interface - - // Backoff manager for configmaps - configmapBackoff *flowcontrol.Backoff - - // For events - eventRecorder record.EventRecorder - - // Finalizers - deletionHelper *deletionhelper.DeletionHelper - - configmapReviewDelay time.Duration - clusterAvailableDelay time.Duration - smallDelay time.Duration - updateTimeout time.Duration -} - -// NewConfigMapController returns a new configmap controller -func NewConfigMapController(client federationclientset.Interface) *ConfigMapController { - broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client)) - recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "federated-configmaps-controller"}) - - configmapcontroller := &ConfigMapController{ - federatedApiClient: client, - configmapReviewDelay: time.Second * 10, - clusterAvailableDelay: time.Second * 20, - smallDelay: time.Second * 3, - updateTimeout: time.Second * 30, - configmapBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), - eventRecorder: recorder, - } - - // Build delivereres for triggering reconciliations. - configmapcontroller.configmapDeliverer = util.NewDelayingDeliverer() - configmapcontroller.clusterDeliverer = util.NewDelayingDeliverer() - - // Start informer on federated API servers on configmaps that should be federated. - configmapcontroller.configmapInformerStore, configmapcontroller.configmapInformerController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { - return client.Core().ConfigMaps(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return client.Core().ConfigMaps(metav1.NamespaceAll).Watch(options) - }, - }, - &apiv1.ConfigMap{}, - controller.NoResyncPeriodFunc(), - util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) { configmapcontroller.deliverConfigMapObj(obj, 0, false) })) - - // Federated informer on configmaps in members of federation. - configmapcontroller.configmapFederatedInformer = util.NewFederatedInformer( - client, - func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) { - return cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { - return targetClient.Core().ConfigMaps(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return targetClient.Core().ConfigMaps(metav1.NamespaceAll).Watch(options) - }, - }, - &apiv1.ConfigMap{}, - controller.NoResyncPeriodFunc(), - // Trigger reconciliation whenever something in federated cluster is changed. In most cases it - // would be just confirmation that some configmap operation succeeded. - util.NewTriggerOnAllChanges( - func(obj pkgruntime.Object) { - configmapcontroller.deliverConfigMapObj(obj, configmapcontroller.configmapReviewDelay, false) - }, - )) - }, - - &util.ClusterLifecycleHandlerFuncs{ - ClusterAvailable: func(cluster *federationapi.Cluster) { - // When new cluster becomes available process all the configmaps again. - configmapcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(configmapcontroller.clusterAvailableDelay)) - }, - }, - ) - - // Federated updater along with Create/Update/Delete operations. - configmapcontroller.federatedUpdater = util.NewFederatedUpdater(configmapcontroller.configmapFederatedInformer, - func(client kubeclientset.Interface, obj pkgruntime.Object) error { - configmap := obj.(*apiv1.ConfigMap) - _, err := client.Core().ConfigMaps(configmap.Namespace).Create(configmap) - return err - }, - func(client kubeclientset.Interface, obj pkgruntime.Object) error { - configmap := obj.(*apiv1.ConfigMap) - _, err := client.Core().ConfigMaps(configmap.Namespace).Update(configmap) - return err - }, - func(client kubeclientset.Interface, obj pkgruntime.Object) error { - configmap := obj.(*apiv1.ConfigMap) - orphanDependents := false - err := client.Core().ConfigMaps(configmap.Namespace).Delete(configmap.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) - return err - }) - - configmapcontroller.deletionHelper = deletionhelper.NewDeletionHelper( - configmapcontroller.updateConfigMap, - // objNameFunc - func(obj pkgruntime.Object) string { - configmap := obj.(*apiv1.ConfigMap) - return configmap.Name - }, - configmapcontroller.updateTimeout, - configmapcontroller.eventRecorder, - configmapcontroller.configmapFederatedInformer, - configmapcontroller.federatedUpdater, - ) - - return configmapcontroller -} - -// Sends the given updated object to apiserver. -// Assumes that the given object is a configmap. -func (configmapcontroller *ConfigMapController) updateConfigMap(obj pkgruntime.Object) (pkgruntime.Object, error) { - configmap := obj.(*apiv1.ConfigMap) - return configmapcontroller.federatedApiClient.Core().ConfigMaps(configmap.Namespace).Update(configmap) -} - -func (configmapcontroller *ConfigMapController) Run(stopChan <-chan struct{}) { - go configmapcontroller.configmapInformerController.Run(stopChan) - configmapcontroller.configmapFederatedInformer.Start() - go func() { - <-stopChan - configmapcontroller.configmapFederatedInformer.Stop() - }() - configmapcontroller.configmapDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { - configmap := item.Value.(*types.NamespacedName) - configmapcontroller.reconcileConfigMap(*configmap) - }) - configmapcontroller.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { - configmapcontroller.reconcileConfigMapsOnClusterChange() - }) - util.StartBackoffGC(configmapcontroller.configmapBackoff, stopChan) -} - -func (configmapcontroller *ConfigMapController) deliverConfigMapObj(obj interface{}, delay time.Duration, failed bool) { - configmap := obj.(*apiv1.ConfigMap) - configmapcontroller.deliverConfigMap(types.NamespacedName{Namespace: configmap.Namespace, Name: configmap.Name}, delay, failed) -} - -// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure. -func (configmapcontroller *ConfigMapController) deliverConfigMap(configmap types.NamespacedName, delay time.Duration, failed bool) { - key := configmap.String() - if failed { - configmapcontroller.configmapBackoff.Next(key, time.Now()) - delay = delay + configmapcontroller.configmapBackoff.Get(key) - } else { - configmapcontroller.configmapBackoff.Reset(key) - } - configmapcontroller.configmapDeliverer.DeliverAfter(key, &configmap, delay) -} - -// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet -// synced with the corresponding api server. -func (configmapcontroller *ConfigMapController) isSynced() bool { - if !configmapcontroller.configmapFederatedInformer.ClustersSynced() { - glog.V(2).Infof("Cluster list not synced") - return false - } - clusters, err := configmapcontroller.configmapFederatedInformer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get ready clusters: %v", err) - return false - } - if !configmapcontroller.configmapFederatedInformer.GetTargetStore().ClustersSynced(clusters) { - return false - } - return true -} - -// The function triggers reconciliation of all federated configmaps. -func (configmapcontroller *ConfigMapController) reconcileConfigMapsOnClusterChange() { - if !configmapcontroller.isSynced() { - glog.V(4).Infof("Configmap controller not synced") - configmapcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(configmapcontroller.clusterAvailableDelay)) - } - for _, obj := range configmapcontroller.configmapInformerStore.List() { - configmap := obj.(*apiv1.ConfigMap) - configmapcontroller.deliverConfigMap(types.NamespacedName{Namespace: configmap.Namespace, Name: configmap.Name}, - configmapcontroller.smallDelay, false) - } -} - -func (configmapcontroller *ConfigMapController) reconcileConfigMap(configmap types.NamespacedName) { - - if !configmapcontroller.isSynced() { - glog.V(4).Infof("Configmap controller not synced") - configmapcontroller.deliverConfigMap(configmap, configmapcontroller.clusterAvailableDelay, false) - return - } - - key := configmap.String() - baseConfigMapObj, exist, err := configmapcontroller.configmapInformerStore.GetByKey(key) - if err != nil { - glog.Errorf("Failed to query main configmap store for %v: %v", key, err) - configmapcontroller.deliverConfigMap(configmap, 0, true) - return - } - - if !exist { - // Not federated configmap, ignoring. - glog.V(8).Infof("Skipping not federated config map: %s", key) - return - } - obj, err := api.Scheme.DeepCopy(baseConfigMapObj) - configMap, ok := obj.(*apiv1.ConfigMap) - if err != nil || !ok { - glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err) - return - } - - // Check if deletion has been requested. - if configMap.DeletionTimestamp != nil { - if err := configmapcontroller.delete(configMap); err != nil { - glog.Errorf("Failed to delete %s: %v", configmap, err) - configmapcontroller.eventRecorder.Eventf(configMap, api.EventTypeWarning, "DeleteFailed", - "ConfigMap delete failed: %v", err) - configmapcontroller.deliverConfigMap(configmap, 0, true) - } - return - } - - glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for configmap: %s", - configMap.Name) - // Add the required finalizers before creating a configmap in underlying clusters. - updatedConfigMapObj, err := configmapcontroller.deletionHelper.EnsureFinalizers(configMap) - if err != nil { - glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in configmap %s: %v", - configMap.Name, err) - configmapcontroller.deliverConfigMap(configmap, 0, false) - return - } - configMap = updatedConfigMapObj.(*apiv1.ConfigMap) - - glog.V(3).Infof("Syncing configmap %s in underlying clusters", configMap.Name) - - clusters, err := configmapcontroller.configmapFederatedInformer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get cluster list: %v, retrying shortly", err) - configmapcontroller.deliverConfigMap(configmap, configmapcontroller.clusterAvailableDelay, false) - return - } - - operations := make([]util.FederatedOperation, 0) - for _, cluster := range clusters { - clusterConfigMapObj, found, err := configmapcontroller.configmapFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key) - if err != nil { - glog.Errorf("Failed to get %s from %s: %v, retrying shortly", key, cluster.Name, err) - configmapcontroller.deliverConfigMap(configmap, 0, true) - return - } - - // Do not modify data. - desiredConfigMap := &apiv1.ConfigMap{ - ObjectMeta: util.DeepCopyRelevantObjectMeta(configMap.ObjectMeta), - Data: configMap.Data, - } - - if !found { - configmapcontroller.eventRecorder.Eventf(configMap, api.EventTypeNormal, "CreateInCluster", - "Creating configmap in cluster %s", cluster.Name) - - operations = append(operations, util.FederatedOperation{ - Type: util.OperationTypeAdd, - Obj: desiredConfigMap, - ClusterName: cluster.Name, - }) - } else { - clusterConfigMap := clusterConfigMapObj.(*apiv1.ConfigMap) - - // Update existing configmap, if needed. - if !util.ConfigMapEquivalent(desiredConfigMap, clusterConfigMap) { - configmapcontroller.eventRecorder.Eventf(configMap, api.EventTypeNormal, "UpdateInCluster", - "Updating configmap in cluster %s", cluster.Name) - operations = append(operations, util.FederatedOperation{ - Type: util.OperationTypeUpdate, - Obj: desiredConfigMap, - ClusterName: cluster.Name, - }) - } - } - } - - if len(operations) == 0 { - // Everything is in order - glog.V(8).Infof("No operations needed for %s", key) - return - } - err = configmapcontroller.federatedUpdater.UpdateWithOnError(operations, configmapcontroller.updateTimeout, - func(op util.FederatedOperation, operror error) { - configmapcontroller.eventRecorder.Eventf(configMap, api.EventTypeWarning, "UpdateInClusterFailed", - "ConfigMap update in cluster %s failed: %v", op.ClusterName, operror) - }) - - if err != nil { - glog.Errorf("Failed to execute updates for %s: %v, retrying shortly", key, err) - configmapcontroller.deliverConfigMap(configmap, 0, true) - return - } -} - -// delete deletes the given configmap or returns error if the deletion was not complete. -func (configmapcontroller *ConfigMapController) delete(configmap *apiv1.ConfigMap) error { - glog.V(3).Infof("Handling deletion of configmap: %v", *configmap) - _, err := configmapcontroller.deletionHelper.HandleObjectInUnderlyingClusters(configmap) - if err != nil { - return err - } - - err = configmapcontroller.federatedApiClient.Core().ConfigMaps(configmap.Namespace).Delete(configmap.Name, nil) - if err != nil { - // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. - // This is expected when we are processing an update as a result of configmap finalizer deletion. - // The process that deleted the last finalizer is also going to delete the configmap and we do not have to do anything. - if !errors.IsNotFound(err) { - return fmt.Errorf("failed to delete configmap: %v", err) - } - } - return nil -} diff --git a/federation/pkg/federation-controller/sync/BUILD b/federation/pkg/federation-controller/sync/BUILD index 58f9918827c..eb226c3b0f4 100644 --- a/federation/pkg/federation-controller/sync/BUILD +++ b/federation/pkg/federation-controller/sync/BUILD @@ -51,7 +51,10 @@ filegroup( go_test( name = "go_default_test", - srcs = ["secret_controller_test.go"], + srcs = [ + "configmap_controller_test.go", + "secret_controller_test.go", + ], library = ":go_default_library", tags = ["automanaged"], deps = [ diff --git a/federation/pkg/federation-controller/configmap/configmap_controller_test.go b/federation/pkg/federation-controller/sync/configmap_controller_test.go similarity index 91% rename from federation/pkg/federation-controller/configmap/configmap_controller_test.go rename to federation/pkg/federation-controller/sync/configmap_controller_test.go index fdb6642ad81..c0ffe56ea8b 100644 --- a/federation/pkg/federation-controller/configmap/configmap_controller_test.go +++ b/federation/pkg/federation-controller/sync/configmap_controller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package configmap +package sync import ( "fmt" @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" + "k8s.io/kubernetes/federation/pkg/federatedtypes" "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" @@ -38,13 +39,11 @@ import ( "github.com/stretchr/testify/assert" ) -const ( - configmaps string = "configmaps" - clusters string = "clusters" - informerStoreErr string = "configmap should have appeared in the informer store" -) - func TestConfigMapController(t *testing.T) { + configmaps := "configmaps" + clusters := "clusters" + informerStoreErr := "configmap should have appeared in the informer store" + cluster1 := NewCluster("cluster1", apiv1.ConditionTrue) cluster2 := NewCluster("cluster2", apiv1.ConditionTrue) @@ -66,8 +65,8 @@ func TestConfigMapController(t *testing.T) { RegisterFakeList(configmaps, &cluster2Client.Fake, &apiv1.ConfigMapList{Items: []apiv1.ConfigMap{}}) cluster2CreateChan := RegisterFakeCopyOnCreate(configmaps, &cluster2Client.Fake, cluster2Watch) - configmapController := NewConfigMapController(fakeClient) - informer := ToFederatedInformerForTestOnly(configmapController.configmapFederatedInformer) + configmapController := newFederationSyncController(fakeClient, federatedtypes.NewConfigMapAdapter(fakeClient)) + informer := ToFederatedInformerForTestOnly(configmapController.informer) informer.SetClientFactory(func(cluster *federationapi.Cluster) (kubeclientset.Interface, error) { switch cluster.Name { case cluster1.Name: @@ -79,10 +78,7 @@ func TestConfigMapController(t *testing.T) { } }) - configmapController.clusterAvailableDelay = time.Second - configmapController.configmapReviewDelay = 50 * time.Millisecond - configmapController.smallDelay = 20 * time.Millisecond - configmapController.updateTimeout = 5 * time.Second + configmapController.minimizeLatency() stop := make(chan struct{}) configmapController.Run(stop) @@ -115,7 +111,7 @@ func TestConfigMapController(t *testing.T) { // Wait for the configmap to appear in the informer store err := WaitForStoreUpdate( - configmapController.configmapFederatedInformer.GetTargetStore(), + configmapController.informer.GetTargetStore(), cluster1.Name, types.NamespacedName{Namespace: configmap1.Namespace, Name: configmap1.Name}.String(), wait.ForeverTestTimeout) assert.Nil(t, err, informerStoreErr) @@ -132,7 +128,7 @@ func TestConfigMapController(t *testing.T) { // Wait for the configmap to appear in the informer store err = WaitForConfigMapStoreUpdate( - configmapController.configmapFederatedInformer.GetTargetStore(), + configmapController.informer.GetTargetStore(), cluster1.Name, types.NamespacedName{Namespace: configmap1.Namespace, Name: configmap1.Name}.String(), configmap1, wait.ForeverTestTimeout) assert.Nil(t, err, informerStoreErr) diff --git a/federation/pkg/federation-controller/sync/secret_controller_test.go b/federation/pkg/federation-controller/sync/secret_controller_test.go index 7499070cc26..870fa1e36fe 100644 --- a/federation/pkg/federation-controller/sync/secret_controller_test.go +++ b/federation/pkg/federation-controller/sync/secret_controller_test.go @@ -40,12 +40,10 @@ import ( "github.com/stretchr/testify/assert" ) -const ( - clusters string = "clusters" - secrets string = "secrets" -) - func TestSecretController(t *testing.T) { + clusters := "clusters" + secrets := "secrets" + cluster1 := NewCluster("cluster1", apiv1.ConditionTrue) cluster2 := NewCluster("cluster2", apiv1.ConditionTrue) diff --git a/test/test_owners.csv b/test/test_owners.csv index f42046f7976..cc34b3096d4 100644 --- a/test/test_owners.csv +++ b/test/test_owners.csv @@ -600,7 +600,6 @@ k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53,cjcullen,1, k8s.io/kubernetes/federation/pkg/dnsprovider/providers/coredns,brendandburns,0, k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns,madhusudancs,1, k8s.io/kubernetes/federation/pkg/federation-controller/cluster,nikhiljindal,0, -k8s.io/kubernetes/federation/pkg/federation-controller/configmap,mwielgus,0, k8s.io/kubernetes/federation/pkg/federation-controller/daemonset,childsb,1, k8s.io/kubernetes/federation/pkg/federation-controller/deployment,zmerlynn,1, k8s.io/kubernetes/federation/pkg/federation-controller/ingress,vishh,1,