From 5336f2de052ea107c5ff1b97af0216704cc1c094 Mon Sep 17 00:00:00 2001 From: Jonathan MacMillan Date: Mon, 24 Apr 2017 17:05:24 -0700 Subject: [PATCH 1/2] [Federation] Convert ConfigMaps to use the generic sync controller. --- .../federation-controller-manager/app/BUILD | 1 - .../app/controllermanager.go | 7 - federation/pkg/federatedtypes/BUILD | 1 + federation/pkg/federatedtypes/configmap.go | 146 +++++++ federation/pkg/federation-controller/BUILD | 1 - .../pkg/federation-controller/configmap/BUILD | 73 ---- .../configmap/configmap_controller.go | 401 ------------------ .../pkg/federation-controller/sync/BUILD | 5 +- .../configmap_controller_test.go | 26 +- .../sync/secret_controller_test.go | 8 +- test/test_owners.csv | 1 - 11 files changed, 165 insertions(+), 505 deletions(-) create mode 100644 federation/pkg/federatedtypes/configmap.go delete mode 100644 federation/pkg/federation-controller/configmap/BUILD delete mode 100644 federation/pkg/federation-controller/configmap/configmap_controller.go rename federation/pkg/federation-controller/{configmap => sync}/configmap_controller_test.go (91%) diff --git a/federation/cmd/federation-controller-manager/app/BUILD b/federation/cmd/federation-controller-manager/app/BUILD index 38402d62baa..a942168e54b 100644 --- a/federation/cmd/federation-controller-manager/app/BUILD +++ b/federation/cmd/federation-controller-manager/app/BUILD @@ -24,7 +24,6 @@ go_library( "//federation/pkg/dnsprovider/providers/google/clouddns:go_default_library", "//federation/pkg/federatedtypes:go_default_library", "//federation/pkg/federation-controller/cluster:go_default_library", - "//federation/pkg/federation-controller/configmap:go_default_library", "//federation/pkg/federation-controller/daemonset:go_default_library", "//federation/pkg/federation-controller/deployment:go_default_library", "//federation/pkg/federation-controller/ingress:go_default_library", diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 2ac834f4db6..cf22253c929 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -38,7 +38,6 @@ import ( "k8s.io/kubernetes/federation/pkg/dnsprovider" "k8s.io/kubernetes/federation/pkg/federatedtypes" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" - configmapcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/configmap" daemonsetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/daemonset" deploymentcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/deployment" ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress" @@ -166,12 +165,6 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err } } - if controllerEnabled(s.Controllers, serverResources, configmapcontroller.ControllerName, configmapcontroller.RequiredResources, true) { - configmapcontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "configmap-controller")) - configmapcontroller := configmapcontroller.NewConfigMapController(configmapcontrollerClientset) - configmapcontroller.Run(wait.NeverStop) - } - if controllerEnabled(s.Controllers, serverResources, daemonsetcontroller.ControllerName, daemonsetcontroller.RequiredResources, true) { daemonsetcontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "daemonset-controller")) daemonsetcontroller := daemonsetcontroller.NewDaemonSetController(daemonsetcontrollerClientset) diff --git a/federation/pkg/federatedtypes/BUILD b/federation/pkg/federatedtypes/BUILD index 977d6384360..3fe24120af5 100644 --- a/federation/pkg/federatedtypes/BUILD +++ b/federation/pkg/federatedtypes/BUILD @@ -11,6 +11,7 @@ go_library( name = "go_default_library", srcs = [ "adapter.go", + "configmap.go", "registry.go", "secret.go", ], diff --git a/federation/pkg/federatedtypes/configmap.go b/federation/pkg/federatedtypes/configmap.go new file mode 100644 index 00000000000..b60cea286df --- /dev/null +++ b/federation/pkg/federatedtypes/configmap.go @@ -0,0 +1,146 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package federatedtypes + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pkgruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" +) + +const ( + ConfigMapKind = "configmap" + ConfigMapControllerName = "configmaps" +) + +func init() { + RegisterFederatedType(ConfigMapKind, ConfigMapControllerName, []schema.GroupVersionResource{apiv1.SchemeGroupVersion.WithResource(ConfigMapControllerName)}, NewConfigMapAdapter) +} + +type ConfigMapAdapter struct { + client federationclientset.Interface +} + +func NewConfigMapAdapter(client federationclientset.Interface) FederatedTypeAdapter { + return &ConfigMapAdapter{client: client} +} + +func (a *ConfigMapAdapter) Kind() string { + return ConfigMapKind +} + +func (a *ConfigMapAdapter) ObjectType() pkgruntime.Object { + return &apiv1.ConfigMap{} +} + +func (a *ConfigMapAdapter) IsExpectedType(obj interface{}) bool { + _, ok := obj.(*apiv1.ConfigMap) + return ok +} + +func (a *ConfigMapAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object { + configmap := obj.(*apiv1.ConfigMap) + return &apiv1.ConfigMap{ + ObjectMeta: util.DeepCopyRelevantObjectMeta(configmap.ObjectMeta), + Data: configmap.Data, + } +} + +func (a *ConfigMapAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { + configmap1 := obj1.(*apiv1.ConfigMap) + configmap2 := obj2.(*apiv1.ConfigMap) + return util.ConfigMapEquivalent(configmap1, configmap2) +} + +func (a *ConfigMapAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { + configmap := obj.(*apiv1.ConfigMap) + return types.NamespacedName{Namespace: configmap.Namespace, Name: configmap.Name} +} + +func (a *ConfigMapAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { + return &obj.(*apiv1.ConfigMap).ObjectMeta +} + +func (a *ConfigMapAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) { + configmap := obj.(*apiv1.ConfigMap) + return a.client.CoreV1().ConfigMaps(configmap.Namespace).Create(configmap) +} + +func (a *ConfigMapAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { + return a.client.CoreV1().ConfigMaps(namespacedName.Namespace).Delete(namespacedName.Name, options) +} + +func (a *ConfigMapAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { + return a.client.CoreV1().ConfigMaps(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +} + +func (a *ConfigMapAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return a.client.CoreV1().ConfigMaps(namespace).List(options) +} + +func (a *ConfigMapAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) { + configmap := obj.(*apiv1.ConfigMap) + return a.client.CoreV1().ConfigMaps(configmap.Namespace).Update(configmap) +} + +func (a *ConfigMapAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) { + return a.client.CoreV1().ConfigMaps(namespace).Watch(options) +} + +func (a *ConfigMapAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + configmap := obj.(*apiv1.ConfigMap) + return client.CoreV1().ConfigMaps(configmap.Namespace).Create(configmap) +} + +func (a *ConfigMapAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { + return client.CoreV1().ConfigMaps(nsName.Namespace).Delete(nsName.Name, options) +} + +func (a *ConfigMapAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { + return client.CoreV1().ConfigMaps(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +} + +func (a *ConfigMapAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return client.CoreV1().ConfigMaps(namespace).List(options) +} + +func (a *ConfigMapAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + configmap := obj.(*apiv1.ConfigMap) + return client.CoreV1().ConfigMaps(configmap.Namespace).Update(configmap) +} + +func (a *ConfigMapAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) { + return client.CoreV1().ConfigMaps(namespace).Watch(options) +} + +func (a *ConfigMapAdapter) NewTestObject(namespace string) pkgruntime.Object { + return &apiv1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-configmap-", + Namespace: namespace, + }, + Data: map[string]string{ + "A": "ala ma kota", + }, + } +} diff --git a/federation/pkg/federation-controller/BUILD b/federation/pkg/federation-controller/BUILD index 85c0acda9c5..9e557def121 100644 --- a/federation/pkg/federation-controller/BUILD +++ b/federation/pkg/federation-controller/BUILD @@ -25,7 +25,6 @@ filegroup( srcs = [ ":package-srcs", "//federation/pkg/federation-controller/cluster:all-srcs", - "//federation/pkg/federation-controller/configmap:all-srcs", "//federation/pkg/federation-controller/daemonset:all-srcs", "//federation/pkg/federation-controller/deployment:all-srcs", "//federation/pkg/federation-controller/ingress:all-srcs", diff --git a/federation/pkg/federation-controller/configmap/BUILD b/federation/pkg/federation-controller/configmap/BUILD deleted file mode 100644 index f26c6a88f6a..00000000000 --- a/federation/pkg/federation-controller/configmap/BUILD +++ /dev/null @@ -1,73 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_library( - name = "go_default_library", - srcs = ["configmap_controller.go"], - tags = ["automanaged"], - deps = [ - "//federation/apis/federation/v1beta1:go_default_library", - "//federation/client/clientset_generated/federation_clientset:go_default_library", - "//federation/pkg/federation-controller/util:go_default_library", - "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", - "//federation/pkg/federation-controller/util/eventsink:go_default_library", - "//pkg/api:go_default_library", - "//pkg/api/v1:go_default_library", - "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/controller:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", - "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", - "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = ["configmap_controller_test.go"], - library = ":go_default_library", - tags = ["automanaged"], - deps = [ - "//federation/apis/federation/v1beta1:go_default_library", - "//federation/client/clientset_generated/federation_clientset/fake:go_default_library", - "//federation/pkg/federation-controller/util:go_default_library", - "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", - "//federation/pkg/federation-controller/util/test:go_default_library", - "//pkg/api/v1:go_default_library", - "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/clientset_generated/clientset/fake:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/federation/pkg/federation-controller/configmap/configmap_controller.go b/federation/pkg/federation-controller/configmap/configmap_controller.go deleted file mode 100644 index ca0c0955a18..00000000000 --- a/federation/pkg/federation-controller/configmap/configmap_controller.go +++ /dev/null @@ -1,401 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package configmap - -import ( - "fmt" - "time" - - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - pkgruntime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" - clientv1 "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" - federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" - federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" - "k8s.io/kubernetes/federation/pkg/federation-controller/util" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" - "k8s.io/kubernetes/pkg/api" - apiv1 "k8s.io/kubernetes/pkg/api/v1" - kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/controller" - - "github.com/golang/glog" -) - -const ( - allClustersKey = "ALL_CLUSTERS" - ControllerName = "configmaps" -) - -var ( - RequiredResources = []schema.GroupVersionResource{apiv1.SchemeGroupVersion.WithResource("configmaps")} -) - -type ConfigMapController struct { - // For triggering single configmap reconciliation. This is used when there is an - // add/update/delete operation on a configmap in either federated API server or - // in some member of the federation. - configmapDeliverer *util.DelayingDeliverer - - // For triggering all configmaps reconciliation. This is used when - // a new cluster becomes available. - clusterDeliverer *util.DelayingDeliverer - - // Contains configmaps present in members of federation. - configmapFederatedInformer util.FederatedInformer - // For updating members of federation. - federatedUpdater util.FederatedUpdater - // Definitions of configmaps that should be federated. - configmapInformerStore cache.Store - // Informer controller for configmaps that should be federated. - configmapInformerController cache.Controller - - // Client to federated api server. - federatedApiClient federationclientset.Interface - - // Backoff manager for configmaps - configmapBackoff *flowcontrol.Backoff - - // For events - eventRecorder record.EventRecorder - - // Finalizers - deletionHelper *deletionhelper.DeletionHelper - - configmapReviewDelay time.Duration - clusterAvailableDelay time.Duration - smallDelay time.Duration - updateTimeout time.Duration -} - -// NewConfigMapController returns a new configmap controller -func NewConfigMapController(client federationclientset.Interface) *ConfigMapController { - broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client)) - recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "federated-configmaps-controller"}) - - configmapcontroller := &ConfigMapController{ - federatedApiClient: client, - configmapReviewDelay: time.Second * 10, - clusterAvailableDelay: time.Second * 20, - smallDelay: time.Second * 3, - updateTimeout: time.Second * 30, - configmapBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), - eventRecorder: recorder, - } - - // Build delivereres for triggering reconciliations. - configmapcontroller.configmapDeliverer = util.NewDelayingDeliverer() - configmapcontroller.clusterDeliverer = util.NewDelayingDeliverer() - - // Start informer on federated API servers on configmaps that should be federated. - configmapcontroller.configmapInformerStore, configmapcontroller.configmapInformerController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { - return client.Core().ConfigMaps(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return client.Core().ConfigMaps(metav1.NamespaceAll).Watch(options) - }, - }, - &apiv1.ConfigMap{}, - controller.NoResyncPeriodFunc(), - util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) { configmapcontroller.deliverConfigMapObj(obj, 0, false) })) - - // Federated informer on configmaps in members of federation. - configmapcontroller.configmapFederatedInformer = util.NewFederatedInformer( - client, - func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) { - return cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { - return targetClient.Core().ConfigMaps(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return targetClient.Core().ConfigMaps(metav1.NamespaceAll).Watch(options) - }, - }, - &apiv1.ConfigMap{}, - controller.NoResyncPeriodFunc(), - // Trigger reconciliation whenever something in federated cluster is changed. In most cases it - // would be just confirmation that some configmap operation succeeded. - util.NewTriggerOnAllChanges( - func(obj pkgruntime.Object) { - configmapcontroller.deliverConfigMapObj(obj, configmapcontroller.configmapReviewDelay, false) - }, - )) - }, - - &util.ClusterLifecycleHandlerFuncs{ - ClusterAvailable: func(cluster *federationapi.Cluster) { - // When new cluster becomes available process all the configmaps again. - configmapcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(configmapcontroller.clusterAvailableDelay)) - }, - }, - ) - - // Federated updater along with Create/Update/Delete operations. - configmapcontroller.federatedUpdater = util.NewFederatedUpdater(configmapcontroller.configmapFederatedInformer, - func(client kubeclientset.Interface, obj pkgruntime.Object) error { - configmap := obj.(*apiv1.ConfigMap) - _, err := client.Core().ConfigMaps(configmap.Namespace).Create(configmap) - return err - }, - func(client kubeclientset.Interface, obj pkgruntime.Object) error { - configmap := obj.(*apiv1.ConfigMap) - _, err := client.Core().ConfigMaps(configmap.Namespace).Update(configmap) - return err - }, - func(client kubeclientset.Interface, obj pkgruntime.Object) error { - configmap := obj.(*apiv1.ConfigMap) - orphanDependents := false - err := client.Core().ConfigMaps(configmap.Namespace).Delete(configmap.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) - return err - }) - - configmapcontroller.deletionHelper = deletionhelper.NewDeletionHelper( - configmapcontroller.updateConfigMap, - // objNameFunc - func(obj pkgruntime.Object) string { - configmap := obj.(*apiv1.ConfigMap) - return configmap.Name - }, - configmapcontroller.updateTimeout, - configmapcontroller.eventRecorder, - configmapcontroller.configmapFederatedInformer, - configmapcontroller.federatedUpdater, - ) - - return configmapcontroller -} - -// Sends the given updated object to apiserver. -// Assumes that the given object is a configmap. -func (configmapcontroller *ConfigMapController) updateConfigMap(obj pkgruntime.Object) (pkgruntime.Object, error) { - configmap := obj.(*apiv1.ConfigMap) - return configmapcontroller.federatedApiClient.Core().ConfigMaps(configmap.Namespace).Update(configmap) -} - -func (configmapcontroller *ConfigMapController) Run(stopChan <-chan struct{}) { - go configmapcontroller.configmapInformerController.Run(stopChan) - configmapcontroller.configmapFederatedInformer.Start() - go func() { - <-stopChan - configmapcontroller.configmapFederatedInformer.Stop() - }() - configmapcontroller.configmapDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { - configmap := item.Value.(*types.NamespacedName) - configmapcontroller.reconcileConfigMap(*configmap) - }) - configmapcontroller.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { - configmapcontroller.reconcileConfigMapsOnClusterChange() - }) - util.StartBackoffGC(configmapcontroller.configmapBackoff, stopChan) -} - -func (configmapcontroller *ConfigMapController) deliverConfigMapObj(obj interface{}, delay time.Duration, failed bool) { - configmap := obj.(*apiv1.ConfigMap) - configmapcontroller.deliverConfigMap(types.NamespacedName{Namespace: configmap.Namespace, Name: configmap.Name}, delay, failed) -} - -// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure. -func (configmapcontroller *ConfigMapController) deliverConfigMap(configmap types.NamespacedName, delay time.Duration, failed bool) { - key := configmap.String() - if failed { - configmapcontroller.configmapBackoff.Next(key, time.Now()) - delay = delay + configmapcontroller.configmapBackoff.Get(key) - } else { - configmapcontroller.configmapBackoff.Reset(key) - } - configmapcontroller.configmapDeliverer.DeliverAfter(key, &configmap, delay) -} - -// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet -// synced with the corresponding api server. -func (configmapcontroller *ConfigMapController) isSynced() bool { - if !configmapcontroller.configmapFederatedInformer.ClustersSynced() { - glog.V(2).Infof("Cluster list not synced") - return false - } - clusters, err := configmapcontroller.configmapFederatedInformer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get ready clusters: %v", err) - return false - } - if !configmapcontroller.configmapFederatedInformer.GetTargetStore().ClustersSynced(clusters) { - return false - } - return true -} - -// The function triggers reconciliation of all federated configmaps. -func (configmapcontroller *ConfigMapController) reconcileConfigMapsOnClusterChange() { - if !configmapcontroller.isSynced() { - glog.V(4).Infof("Configmap controller not synced") - configmapcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(configmapcontroller.clusterAvailableDelay)) - } - for _, obj := range configmapcontroller.configmapInformerStore.List() { - configmap := obj.(*apiv1.ConfigMap) - configmapcontroller.deliverConfigMap(types.NamespacedName{Namespace: configmap.Namespace, Name: configmap.Name}, - configmapcontroller.smallDelay, false) - } -} - -func (configmapcontroller *ConfigMapController) reconcileConfigMap(configmap types.NamespacedName) { - - if !configmapcontroller.isSynced() { - glog.V(4).Infof("Configmap controller not synced") - configmapcontroller.deliverConfigMap(configmap, configmapcontroller.clusterAvailableDelay, false) - return - } - - key := configmap.String() - baseConfigMapObj, exist, err := configmapcontroller.configmapInformerStore.GetByKey(key) - if err != nil { - glog.Errorf("Failed to query main configmap store for %v: %v", key, err) - configmapcontroller.deliverConfigMap(configmap, 0, true) - return - } - - if !exist { - // Not federated configmap, ignoring. - glog.V(8).Infof("Skipping not federated config map: %s", key) - return - } - obj, err := api.Scheme.DeepCopy(baseConfigMapObj) - configMap, ok := obj.(*apiv1.ConfigMap) - if err != nil || !ok { - glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err) - return - } - - // Check if deletion has been requested. - if configMap.DeletionTimestamp != nil { - if err := configmapcontroller.delete(configMap); err != nil { - glog.Errorf("Failed to delete %s: %v", configmap, err) - configmapcontroller.eventRecorder.Eventf(configMap, api.EventTypeWarning, "DeleteFailed", - "ConfigMap delete failed: %v", err) - configmapcontroller.deliverConfigMap(configmap, 0, true) - } - return - } - - glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for configmap: %s", - configMap.Name) - // Add the required finalizers before creating a configmap in underlying clusters. - updatedConfigMapObj, err := configmapcontroller.deletionHelper.EnsureFinalizers(configMap) - if err != nil { - glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in configmap %s: %v", - configMap.Name, err) - configmapcontroller.deliverConfigMap(configmap, 0, false) - return - } - configMap = updatedConfigMapObj.(*apiv1.ConfigMap) - - glog.V(3).Infof("Syncing configmap %s in underlying clusters", configMap.Name) - - clusters, err := configmapcontroller.configmapFederatedInformer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get cluster list: %v, retrying shortly", err) - configmapcontroller.deliverConfigMap(configmap, configmapcontroller.clusterAvailableDelay, false) - return - } - - operations := make([]util.FederatedOperation, 0) - for _, cluster := range clusters { - clusterConfigMapObj, found, err := configmapcontroller.configmapFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key) - if err != nil { - glog.Errorf("Failed to get %s from %s: %v, retrying shortly", key, cluster.Name, err) - configmapcontroller.deliverConfigMap(configmap, 0, true) - return - } - - // Do not modify data. - desiredConfigMap := &apiv1.ConfigMap{ - ObjectMeta: util.DeepCopyRelevantObjectMeta(configMap.ObjectMeta), - Data: configMap.Data, - } - - if !found { - configmapcontroller.eventRecorder.Eventf(configMap, api.EventTypeNormal, "CreateInCluster", - "Creating configmap in cluster %s", cluster.Name) - - operations = append(operations, util.FederatedOperation{ - Type: util.OperationTypeAdd, - Obj: desiredConfigMap, - ClusterName: cluster.Name, - }) - } else { - clusterConfigMap := clusterConfigMapObj.(*apiv1.ConfigMap) - - // Update existing configmap, if needed. - if !util.ConfigMapEquivalent(desiredConfigMap, clusterConfigMap) { - configmapcontroller.eventRecorder.Eventf(configMap, api.EventTypeNormal, "UpdateInCluster", - "Updating configmap in cluster %s", cluster.Name) - operations = append(operations, util.FederatedOperation{ - Type: util.OperationTypeUpdate, - Obj: desiredConfigMap, - ClusterName: cluster.Name, - }) - } - } - } - - if len(operations) == 0 { - // Everything is in order - glog.V(8).Infof("No operations needed for %s", key) - return - } - err = configmapcontroller.federatedUpdater.UpdateWithOnError(operations, configmapcontroller.updateTimeout, - func(op util.FederatedOperation, operror error) { - configmapcontroller.eventRecorder.Eventf(configMap, api.EventTypeWarning, "UpdateInClusterFailed", - "ConfigMap update in cluster %s failed: %v", op.ClusterName, operror) - }) - - if err != nil { - glog.Errorf("Failed to execute updates for %s: %v, retrying shortly", key, err) - configmapcontroller.deliverConfigMap(configmap, 0, true) - return - } -} - -// delete deletes the given configmap or returns error if the deletion was not complete. -func (configmapcontroller *ConfigMapController) delete(configmap *apiv1.ConfigMap) error { - glog.V(3).Infof("Handling deletion of configmap: %v", *configmap) - _, err := configmapcontroller.deletionHelper.HandleObjectInUnderlyingClusters(configmap) - if err != nil { - return err - } - - err = configmapcontroller.federatedApiClient.Core().ConfigMaps(configmap.Namespace).Delete(configmap.Name, nil) - if err != nil { - // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. - // This is expected when we are processing an update as a result of configmap finalizer deletion. - // The process that deleted the last finalizer is also going to delete the configmap and we do not have to do anything. - if !errors.IsNotFound(err) { - return fmt.Errorf("failed to delete configmap: %v", err) - } - } - return nil -} diff --git a/federation/pkg/federation-controller/sync/BUILD b/federation/pkg/federation-controller/sync/BUILD index 58f9918827c..eb226c3b0f4 100644 --- a/federation/pkg/federation-controller/sync/BUILD +++ b/federation/pkg/federation-controller/sync/BUILD @@ -51,7 +51,10 @@ filegroup( go_test( name = "go_default_test", - srcs = ["secret_controller_test.go"], + srcs = [ + "configmap_controller_test.go", + "secret_controller_test.go", + ], library = ":go_default_library", tags = ["automanaged"], deps = [ diff --git a/federation/pkg/federation-controller/configmap/configmap_controller_test.go b/federation/pkg/federation-controller/sync/configmap_controller_test.go similarity index 91% rename from federation/pkg/federation-controller/configmap/configmap_controller_test.go rename to federation/pkg/federation-controller/sync/configmap_controller_test.go index fdb6642ad81..c0ffe56ea8b 100644 --- a/federation/pkg/federation-controller/configmap/configmap_controller_test.go +++ b/federation/pkg/federation-controller/sync/configmap_controller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package configmap +package sync import ( "fmt" @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" + "k8s.io/kubernetes/federation/pkg/federatedtypes" "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" @@ -38,13 +39,11 @@ import ( "github.com/stretchr/testify/assert" ) -const ( - configmaps string = "configmaps" - clusters string = "clusters" - informerStoreErr string = "configmap should have appeared in the informer store" -) - func TestConfigMapController(t *testing.T) { + configmaps := "configmaps" + clusters := "clusters" + informerStoreErr := "configmap should have appeared in the informer store" + cluster1 := NewCluster("cluster1", apiv1.ConditionTrue) cluster2 := NewCluster("cluster2", apiv1.ConditionTrue) @@ -66,8 +65,8 @@ func TestConfigMapController(t *testing.T) { RegisterFakeList(configmaps, &cluster2Client.Fake, &apiv1.ConfigMapList{Items: []apiv1.ConfigMap{}}) cluster2CreateChan := RegisterFakeCopyOnCreate(configmaps, &cluster2Client.Fake, cluster2Watch) - configmapController := NewConfigMapController(fakeClient) - informer := ToFederatedInformerForTestOnly(configmapController.configmapFederatedInformer) + configmapController := newFederationSyncController(fakeClient, federatedtypes.NewConfigMapAdapter(fakeClient)) + informer := ToFederatedInformerForTestOnly(configmapController.informer) informer.SetClientFactory(func(cluster *federationapi.Cluster) (kubeclientset.Interface, error) { switch cluster.Name { case cluster1.Name: @@ -79,10 +78,7 @@ func TestConfigMapController(t *testing.T) { } }) - configmapController.clusterAvailableDelay = time.Second - configmapController.configmapReviewDelay = 50 * time.Millisecond - configmapController.smallDelay = 20 * time.Millisecond - configmapController.updateTimeout = 5 * time.Second + configmapController.minimizeLatency() stop := make(chan struct{}) configmapController.Run(stop) @@ -115,7 +111,7 @@ func TestConfigMapController(t *testing.T) { // Wait for the configmap to appear in the informer store err := WaitForStoreUpdate( - configmapController.configmapFederatedInformer.GetTargetStore(), + configmapController.informer.GetTargetStore(), cluster1.Name, types.NamespacedName{Namespace: configmap1.Namespace, Name: configmap1.Name}.String(), wait.ForeverTestTimeout) assert.Nil(t, err, informerStoreErr) @@ -132,7 +128,7 @@ func TestConfigMapController(t *testing.T) { // Wait for the configmap to appear in the informer store err = WaitForConfigMapStoreUpdate( - configmapController.configmapFederatedInformer.GetTargetStore(), + configmapController.informer.GetTargetStore(), cluster1.Name, types.NamespacedName{Namespace: configmap1.Namespace, Name: configmap1.Name}.String(), configmap1, wait.ForeverTestTimeout) assert.Nil(t, err, informerStoreErr) diff --git a/federation/pkg/federation-controller/sync/secret_controller_test.go b/federation/pkg/federation-controller/sync/secret_controller_test.go index 7499070cc26..870fa1e36fe 100644 --- a/federation/pkg/federation-controller/sync/secret_controller_test.go +++ b/federation/pkg/federation-controller/sync/secret_controller_test.go @@ -40,12 +40,10 @@ import ( "github.com/stretchr/testify/assert" ) -const ( - clusters string = "clusters" - secrets string = "secrets" -) - func TestSecretController(t *testing.T) { + clusters := "clusters" + secrets := "secrets" + cluster1 := NewCluster("cluster1", apiv1.ConditionTrue) cluster2 := NewCluster("cluster2", apiv1.ConditionTrue) diff --git a/test/test_owners.csv b/test/test_owners.csv index f42046f7976..cc34b3096d4 100644 --- a/test/test_owners.csv +++ b/test/test_owners.csv @@ -600,7 +600,6 @@ k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53,cjcullen,1, k8s.io/kubernetes/federation/pkg/dnsprovider/providers/coredns,brendandburns,0, k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns,madhusudancs,1, k8s.io/kubernetes/federation/pkg/federation-controller/cluster,nikhiljindal,0, -k8s.io/kubernetes/federation/pkg/federation-controller/configmap,mwielgus,0, k8s.io/kubernetes/federation/pkg/federation-controller/daemonset,childsb,1, k8s.io/kubernetes/federation/pkg/federation-controller/deployment,zmerlynn,1, k8s.io/kubernetes/federation/pkg/federation-controller/ingress,vishh,1, From 15e81959e9af124aadb9fae376c176a10f2f00e3 Mon Sep 17 00:00:00 2001 From: Christian Bell Date: Wed, 26 Apr 2017 10:21:59 -0700 Subject: [PATCH 2/2] [Federation] Convert Daemonset to use the generic sync controller --- .../federation-controller-manager/app/BUILD | 1 - .../app/controllermanager.go | 7 - federation/pkg/federatedtypes/BUILD | 2 + federation/pkg/federatedtypes/daemonset.go | 163 +++++++ federation/pkg/federation-controller/BUILD | 1 - .../pkg/federation-controller/daemonset/BUILD | 72 --- .../daemonset/daemonset_controller.go | 436 ------------------ .../pkg/federation-controller/sync/BUILD | 2 + .../daemonset_controller_test.go | 34 +- test/test_owners.csv | 1 - 10 files changed, 185 insertions(+), 534 deletions(-) create mode 100644 federation/pkg/federatedtypes/daemonset.go delete mode 100644 federation/pkg/federation-controller/daemonset/BUILD delete mode 100644 federation/pkg/federation-controller/daemonset/daemonset_controller.go rename federation/pkg/federation-controller/{daemonset => sync}/daemonset_controller_test.go (90%) diff --git a/federation/cmd/federation-controller-manager/app/BUILD b/federation/cmd/federation-controller-manager/app/BUILD index a942168e54b..cf0a729775c 100644 --- a/federation/cmd/federation-controller-manager/app/BUILD +++ b/federation/cmd/federation-controller-manager/app/BUILD @@ -24,7 +24,6 @@ go_library( "//federation/pkg/dnsprovider/providers/google/clouddns:go_default_library", "//federation/pkg/federatedtypes:go_default_library", "//federation/pkg/federation-controller/cluster:go_default_library", - "//federation/pkg/federation-controller/daemonset:go_default_library", "//federation/pkg/federation-controller/deployment:go_default_library", "//federation/pkg/federation-controller/ingress:go_default_library", "//federation/pkg/federation-controller/namespace:go_default_library", diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index cf22253c929..7bd7f9b1f39 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -38,7 +38,6 @@ import ( "k8s.io/kubernetes/federation/pkg/dnsprovider" "k8s.io/kubernetes/federation/pkg/federatedtypes" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" - daemonsetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/daemonset" deploymentcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/deployment" ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress" namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace" @@ -165,12 +164,6 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err } } - if controllerEnabled(s.Controllers, serverResources, daemonsetcontroller.ControllerName, daemonsetcontroller.RequiredResources, true) { - daemonsetcontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "daemonset-controller")) - daemonsetcontroller := daemonsetcontroller.NewDaemonSetController(daemonsetcontrollerClientset) - daemonsetcontroller.Run(wait.NeverStop) - } - if controllerEnabled(s.Controllers, serverResources, replicasetcontroller.ControllerName, replicasetcontroller.RequiredResources, true) { replicaSetClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, replicasetcontroller.UserAgentName)) replicaSetController := replicasetcontroller.NewReplicaSetController(replicaSetClientset) diff --git a/federation/pkg/federatedtypes/BUILD b/federation/pkg/federatedtypes/BUILD index 3fe24120af5..a9bcfce4f56 100644 --- a/federation/pkg/federatedtypes/BUILD +++ b/federation/pkg/federatedtypes/BUILD @@ -12,6 +12,7 @@ go_library( srcs = [ "adapter.go", "configmap.go", + "daemonset.go", "registry.go", "secret.go", ], @@ -20,6 +21,7 @@ go_library( "//federation/client/clientset_generated/federation_clientset:go_default_library", "//federation/pkg/federation-controller/util:go_default_library", "//pkg/api/v1:go_default_library", + "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", diff --git a/federation/pkg/federatedtypes/daemonset.go b/federation/pkg/federatedtypes/daemonset.go new file mode 100644 index 00000000000..593c5ee9bd3 --- /dev/null +++ b/federation/pkg/federatedtypes/daemonset.go @@ -0,0 +1,163 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package federatedtypes + +import ( + "reflect" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pkgruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/api/v1" + extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" +) + +const ( + DaemonSetKind = "daemonset" + DaemonSetControllerName = "daemonsets" +) + +func init() { + RegisterFederatedType(DaemonSetKind, DaemonSetControllerName, []schema.GroupVersionResource{extensionsv1.SchemeGroupVersion.WithResource(DaemonSetControllerName)}, NewDaemonSetAdapter) +} + +type DaemonSetAdapter struct { + client federationclientset.Interface +} + +func NewDaemonSetAdapter(client federationclientset.Interface) FederatedTypeAdapter { + return &DaemonSetAdapter{client: client} +} + +func (a *DaemonSetAdapter) Kind() string { + return DaemonSetKind +} + +func (a *DaemonSetAdapter) ObjectType() pkgruntime.Object { + return &extensionsv1.DaemonSet{} +} + +func (a *DaemonSetAdapter) IsExpectedType(obj interface{}) bool { + _, ok := obj.(*extensionsv1.DaemonSet) + return ok +} + +func (a *DaemonSetAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object { + daemonset := obj.(*extensionsv1.DaemonSet) + return &extensionsv1.DaemonSet{ + ObjectMeta: util.DeepCopyRelevantObjectMeta(daemonset.ObjectMeta), + Spec: *(util.DeepCopyApiTypeOrPanic(&daemonset.Spec).(*extensionsv1.DaemonSetSpec)), + } +} + +func (a *DaemonSetAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { + daemonset1 := obj1.(*extensionsv1.DaemonSet) + daemonset2 := obj2.(*extensionsv1.DaemonSet) + return util.ObjectMetaEquivalent(daemonset1.ObjectMeta, daemonset2.ObjectMeta) && reflect.DeepEqual(daemonset1.Spec, daemonset2.Spec) +} + +func (a *DaemonSetAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { + daemonset := obj.(*extensionsv1.DaemonSet) + return types.NamespacedName{Namespace: daemonset.Namespace, Name: daemonset.Name} +} + +func (a *DaemonSetAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { + return &obj.(*extensionsv1.DaemonSet).ObjectMeta +} + +func (a *DaemonSetAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) { + daemonset := obj.(*extensionsv1.DaemonSet) + return a.client.Extensions().DaemonSets(daemonset.Namespace).Create(daemonset) +} + +func (a *DaemonSetAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { + return a.client.Extensions().DaemonSets(namespacedName.Namespace).Delete(namespacedName.Name, options) +} + +func (a *DaemonSetAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { + return a.client.Extensions().DaemonSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +} + +func (a *DaemonSetAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return a.client.Extensions().DaemonSets(namespace).List(options) +} + +func (a *DaemonSetAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) { + daemonset := obj.(*extensionsv1.DaemonSet) + return a.client.Extensions().DaemonSets(daemonset.Namespace).Update(daemonset) +} + +func (a *DaemonSetAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) { + return a.client.Extensions().DaemonSets(namespace).Watch(options) +} + +func (a *DaemonSetAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + daemonset := obj.(*extensionsv1.DaemonSet) + return client.Extensions().DaemonSets(daemonset.Namespace).Create(daemonset) +} + +func (a *DaemonSetAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { + return client.Extensions().DaemonSets(nsName.Namespace).Delete(nsName.Name, options) +} + +func (a *DaemonSetAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { + return client.Extensions().DaemonSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +} + +func (a *DaemonSetAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return client.Extensions().DaemonSets(namespace).List(options) +} + +func (a *DaemonSetAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + daemonset := obj.(*extensionsv1.DaemonSet) + return client.Extensions().DaemonSets(daemonset.Namespace).Update(daemonset) +} + +func (a *DaemonSetAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) { + return client.Extensions().DaemonSets(namespace).Watch(options) +} + +func (a *DaemonSetAdapter) NewTestObject(namespace string) pkgruntime.Object { + return &extensionsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-daemonset-", + Namespace: namespace, + Labels: map[string]string{"app": "test-daemonset"}, + }, + Spec: extensionsv1.DaemonSetSpec{ + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"name": "test-pod"}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "test-daemonset", + Image: "images/test-daemonset", + Ports: []v1.ContainerPort{{ContainerPort: 9376}}, + }, + }, + }, + }, + }, + } +} diff --git a/federation/pkg/federation-controller/BUILD b/federation/pkg/federation-controller/BUILD index 9e557def121..89eb75e9364 100644 --- a/federation/pkg/federation-controller/BUILD +++ b/federation/pkg/federation-controller/BUILD @@ -25,7 +25,6 @@ filegroup( srcs = [ ":package-srcs", "//federation/pkg/federation-controller/cluster:all-srcs", - "//federation/pkg/federation-controller/daemonset:all-srcs", "//federation/pkg/federation-controller/deployment:all-srcs", "//federation/pkg/federation-controller/ingress:all-srcs", "//federation/pkg/federation-controller/namespace:all-srcs", diff --git a/federation/pkg/federation-controller/daemonset/BUILD b/federation/pkg/federation-controller/daemonset/BUILD deleted file mode 100644 index 7b1d8a67e8d..00000000000 --- a/federation/pkg/federation-controller/daemonset/BUILD +++ /dev/null @@ -1,72 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_library( - name = "go_default_library", - srcs = ["daemonset_controller.go"], - tags = ["automanaged"], - deps = [ - "//federation/apis/federation/v1beta1:go_default_library", - "//federation/client/clientset_generated/federation_clientset:go_default_library", - "//federation/pkg/federation-controller/util:go_default_library", - "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", - "//federation/pkg/federation-controller/util/eventsink:go_default_library", - "//pkg/api:go_default_library", - "//pkg/apis/extensions/v1beta1:go_default_library", - "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/controller:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", - "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", - "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = ["daemonset_controller_test.go"], - library = ":go_default_library", - tags = ["automanaged"], - deps = [ - "//federation/apis/federation/v1beta1:go_default_library", - "//federation/client/clientset_generated/federation_clientset/fake:go_default_library", - "//federation/pkg/federation-controller/util:go_default_library", - "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", - "//federation/pkg/federation-controller/util/test:go_default_library", - "//pkg/api/v1:go_default_library", - "//pkg/apis/extensions/v1beta1:go_default_library", - "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/clientset_generated/clientset/fake:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/federation/pkg/federation-controller/daemonset/daemonset_controller.go b/federation/pkg/federation-controller/daemonset/daemonset_controller.go deleted file mode 100644 index 2ee33334887..00000000000 --- a/federation/pkg/federation-controller/daemonset/daemonset_controller.go +++ /dev/null @@ -1,436 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package daemonset - -import ( - "fmt" - "reflect" - "time" - - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - pkgruntime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" - clientv1 "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" - federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" - federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" - "k8s.io/kubernetes/federation/pkg/federation-controller/util" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" - "k8s.io/kubernetes/pkg/api" - extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" - kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/controller" - - "github.com/golang/glog" -) - -const ( - allClustersKey = "ALL_CLUSTERS" - ControllerName = "daemonsets" -) - -var ( - RequiredResources = []schema.GroupVersionResource{extensionsv1.SchemeGroupVersion.WithResource("daemonsets")} -) - -type DaemonSetController struct { - // For triggering single daemonset reconciliation. This is used when there is an - // add/update/delete operation on a daemonset in either federated API server or - // in some member of the federation. - daemonsetDeliverer *util.DelayingDeliverer - - // For triggering all daemonsets reconciliation. This is used when - // a new cluster becomes available. - clusterDeliverer *util.DelayingDeliverer - - // Contains daemonsets present in members of federation. - daemonsetFederatedInformer util.FederatedInformer - // For updating members of federation. - federatedUpdater util.FederatedUpdater - // Definitions of daemonsets that should be federated. - daemonsetInformerStore cache.Store - // Informer controller for daemonsets that should be federated. - daemonsetInformerController cache.Controller - - // Client to federated api server. - federatedApiClient federationclientset.Interface - - // Backoff manager for daemonsets - daemonsetBackoff *flowcontrol.Backoff - - // For events - eventRecorder record.EventRecorder - - deletionHelper *deletionhelper.DeletionHelper - - daemonsetReviewDelay time.Duration - clusterAvailableDelay time.Duration - smallDelay time.Duration - updateTimeout time.Duration -} - -// NewDaemonSetController returns a new daemonset controller -func NewDaemonSetController(client federationclientset.Interface) *DaemonSetController { - broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client)) - recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "federated-daemonset-controller"}) - - daemonsetcontroller := &DaemonSetController{ - federatedApiClient: client, - daemonsetReviewDelay: time.Second * 10, - clusterAvailableDelay: time.Second * 20, - smallDelay: time.Second * 3, - updateTimeout: time.Second * 30, - daemonsetBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), - eventRecorder: recorder, - } - - // Build deliverers for triggering reconciliations. - daemonsetcontroller.daemonsetDeliverer = util.NewDelayingDeliverer() - daemonsetcontroller.clusterDeliverer = util.NewDelayingDeliverer() - - // Start informer in federated API servers on daemonsets that should be federated. - daemonsetcontroller.daemonsetInformerStore, daemonsetcontroller.daemonsetInformerController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { - return client.Extensions().DaemonSets(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return client.Extensions().DaemonSets(metav1.NamespaceAll).Watch(options) - }, - }, - &extensionsv1.DaemonSet{}, - controller.NoResyncPeriodFunc(), - util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) { daemonsetcontroller.deliverDaemonSetObj(obj, 0, false) })) - - // Federated informer on daemonsets in members of federation. - daemonsetcontroller.daemonsetFederatedInformer = util.NewFederatedInformer( - client, - func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) { - return cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) { - return targetClient.Extensions().DaemonSets(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return targetClient.Extensions().DaemonSets(metav1.NamespaceAll).Watch(options) - }, - }, - &extensionsv1.DaemonSet{}, - controller.NoResyncPeriodFunc(), - // Trigger reconciliation whenever something in federated cluster is changed. In most cases it - // would be just confirmation that some daemonset operation succeeded. - util.NewTriggerOnAllChanges( - func(obj pkgruntime.Object) { - daemonsetcontroller.deliverDaemonSetObj(obj, daemonsetcontroller.daemonsetReviewDelay, false) - }, - )) - }, - - &util.ClusterLifecycleHandlerFuncs{ - ClusterAvailable: func(cluster *federationapi.Cluster) { - // When new cluster becomes available process all the daemonsets again. - daemonsetcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(daemonsetcontroller.clusterAvailableDelay)) - }, - }, - ) - - // Federated updater along with Create/Update/Delete operations. - daemonsetcontroller.federatedUpdater = util.NewFederatedUpdater(daemonsetcontroller.daemonsetFederatedInformer, - func(client kubeclientset.Interface, obj pkgruntime.Object) error { - daemonset := obj.(*extensionsv1.DaemonSet) - glog.V(4).Infof("Attempting to create daemonset: %s/%s", daemonset.Namespace, daemonset.Name) - _, err := client.Extensions().DaemonSets(daemonset.Namespace).Create(daemonset) - if err != nil { - glog.Errorf("Error creating daemonset %s/%s/: %v", daemonset.Namespace, daemonset.Name, err) - } else { - glog.V(4).Infof("Successfully created daemonset %s/%s", daemonset.Namespace, daemonset.Name) - } - return err - }, - func(client kubeclientset.Interface, obj pkgruntime.Object) error { - daemonset := obj.(*extensionsv1.DaemonSet) - glog.V(4).Infof("Attempting to update daemonset: %s/%s", daemonset.Namespace, daemonset.Name) - _, err := client.Extensions().DaemonSets(daemonset.Namespace).Update(daemonset) - if err != nil { - glog.Errorf("Error updating daemonset %s/%s/: %v", daemonset.Namespace, daemonset.Name, err) - } else { - glog.V(4).Infof("Successfully updating daemonset %s/%s", daemonset.Namespace, daemonset.Name) - } - return err - }, - func(client kubeclientset.Interface, obj pkgruntime.Object) error { - daemonset := obj.(*extensionsv1.DaemonSet) - glog.V(4).Infof("Attempting to delete daemonset: %s/%s", daemonset.Namespace, daemonset.Name) - orphanDependents := false - err := client.Extensions().DaemonSets(daemonset.Namespace).Delete(daemonset.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) - if err != nil { - glog.Errorf("Error deleting daemonset %s/%s/: %v", daemonset.Namespace, daemonset.Name, err) - } else { - glog.V(4).Infof("Successfully deleting daemonset %s/%s", daemonset.Namespace, daemonset.Name) - } - return err - }) - - daemonsetcontroller.deletionHelper = deletionhelper.NewDeletionHelper( - daemonsetcontroller.updateDaemonSet, - // objNameFunc - func(obj pkgruntime.Object) string { - daemonset := obj.(*extensionsv1.DaemonSet) - return daemonset.Name - }, - daemonsetcontroller.updateTimeout, - daemonsetcontroller.eventRecorder, - daemonsetcontroller.daemonsetFederatedInformer, - daemonsetcontroller.federatedUpdater, - ) - - return daemonsetcontroller -} - -// Sends the given updated object to apiserver. -// Assumes that the given object is a daemonset. -func (daemonsetcontroller *DaemonSetController) updateDaemonSet(obj pkgruntime.Object) (pkgruntime.Object, error) { - daemonset := obj.(*extensionsv1.DaemonSet) - return daemonsetcontroller.federatedApiClient.Extensions().DaemonSets(daemonset.Namespace).Update(daemonset) -} - -func (daemonsetcontroller *DaemonSetController) Run(stopChan <-chan struct{}) { - glog.V(1).Infof("Starting daemonset controller") - go daemonsetcontroller.daemonsetInformerController.Run(stopChan) - - glog.V(1).Infof("Starting daemonset federated informer") - daemonsetcontroller.daemonsetFederatedInformer.Start() - go func() { - <-stopChan - daemonsetcontroller.daemonsetFederatedInformer.Stop() - }() - glog.V(1).Infof("Starting daemonset deliverers") - daemonsetcontroller.daemonsetDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { - daemonset := item.Value.(*types.NamespacedName) - glog.V(4).Infof("Trigerring reconciliation of daemonset %s", daemonset.String()) - daemonsetcontroller.reconcileDaemonSet(daemonset.Namespace, daemonset.Name) - }) - daemonsetcontroller.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { - glog.V(4).Infof("Triggering reconciliation of all daemonsets") - daemonsetcontroller.reconcileDaemonSetsOnClusterChange() - }) - util.StartBackoffGC(daemonsetcontroller.daemonsetBackoff, stopChan) -} - -func getDaemonSetKey(namespace, name string) string { - return types.NamespacedName{ - Namespace: namespace, - Name: name, - }.String() -} - -func (daemonsetcontroller *DaemonSetController) deliverDaemonSetObj(obj interface{}, delay time.Duration, failed bool) { - daemonset := obj.(*extensionsv1.DaemonSet) - daemonsetcontroller.deliverDaemonSet(daemonset.Namespace, daemonset.Name, delay, failed) -} - -// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure. -func (daemonsetcontroller *DaemonSetController) deliverDaemonSet(namespace string, name string, delay time.Duration, failed bool) { - key := getDaemonSetKey(namespace, name) - if failed { - daemonsetcontroller.daemonsetBackoff.Next(key, time.Now()) - delay = delay + daemonsetcontroller.daemonsetBackoff.Get(key) - } else { - daemonsetcontroller.daemonsetBackoff.Reset(key) - } - daemonsetcontroller.daemonsetDeliverer.DeliverAfter(key, - &types.NamespacedName{Namespace: namespace, Name: name}, delay) -} - -// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet -// synced with the corresponding api server. -func (daemonsetcontroller *DaemonSetController) isSynced() bool { - if !daemonsetcontroller.daemonsetFederatedInformer.ClustersSynced() { - glog.V(2).Infof("Cluster list not synced") - return false - } - clusters, err := daemonsetcontroller.daemonsetFederatedInformer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get ready clusters: %v", err) - return false - } - if !daemonsetcontroller.daemonsetFederatedInformer.GetTargetStore().ClustersSynced(clusters) { - return false - } - return true -} - -// The function triggers reconciliation of all federated daemonsets. -func (daemonsetcontroller *DaemonSetController) reconcileDaemonSetsOnClusterChange() { - if !daemonsetcontroller.isSynced() { - daemonsetcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(daemonsetcontroller.clusterAvailableDelay)) - } - for _, obj := range daemonsetcontroller.daemonsetInformerStore.List() { - daemonset := obj.(*extensionsv1.DaemonSet) - daemonsetcontroller.deliverDaemonSet(daemonset.Namespace, daemonset.Name, daemonsetcontroller.smallDelay, false) - } -} - -func (daemonsetcontroller *DaemonSetController) reconcileDaemonSet(namespace string, daemonsetName string) { - glog.V(4).Infof("Reconciling daemonset %s/%s", namespace, daemonsetName) - - if !daemonsetcontroller.isSynced() { - glog.V(4).Infof("Daemonset controller is not synced") - daemonsetcontroller.deliverDaemonSet(namespace, daemonsetName, daemonsetcontroller.clusterAvailableDelay, false) - return - } - - key := getDaemonSetKey(namespace, daemonsetName) - baseDaemonSetObjFromStore, exist, err := daemonsetcontroller.daemonsetInformerStore.GetByKey(key) - if err != nil { - glog.Errorf("Failed to query main daemonset store for %v: %v", key, err) - daemonsetcontroller.deliverDaemonSet(namespace, daemonsetName, 0, true) - return - } - - if !exist { - glog.V(4).Infof("Skipping daemonset %s/%s - not federated", namespace, daemonsetName) - // Not federated daemonset, ignoring. - return - } - baseDaemonSetObj, err := api.Scheme.DeepCopy(baseDaemonSetObjFromStore) - baseDaemonSet, ok := baseDaemonSetObj.(*extensionsv1.DaemonSet) - if err != nil || !ok { - glog.Errorf("Error in retrieving obj %s from store: %v, %v", daemonsetName, ok, err) - daemonsetcontroller.deliverDaemonSet(namespace, daemonsetName, 0, true) - return - } - if baseDaemonSet.DeletionTimestamp != nil { - if err := daemonsetcontroller.delete(baseDaemonSet); err != nil { - glog.Errorf("Failed to delete %s: %v", daemonsetName, err) - daemonsetcontroller.eventRecorder.Eventf(baseDaemonSet, api.EventTypeWarning, "DeleteFailed", - "DaemonSet delete failed: %v", err) - daemonsetcontroller.deliverDaemonSet(namespace, daemonsetName, 0, true) - } - return - } - - glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for daemonset: %s", - baseDaemonSet.Name) - // Add the required finalizers before creating a daemonset in underlying clusters. - updatedDaemonSetObj, err := daemonsetcontroller.deletionHelper.EnsureFinalizers(baseDaemonSet) - if err != nil { - glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in daemonset %s: %v", - baseDaemonSet.Name, err) - daemonsetcontroller.deliverDaemonSet(namespace, daemonsetName, 0, false) - return - } - baseDaemonSet = updatedDaemonSetObj.(*extensionsv1.DaemonSet) - - glog.V(3).Infof("Syncing daemonset %s in underlying clusters", baseDaemonSet.Name) - - clusters, err := daemonsetcontroller.daemonsetFederatedInformer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get cluster list: %v", err) - daemonsetcontroller.deliverDaemonSet(namespace, daemonsetName, daemonsetcontroller.clusterAvailableDelay, false) - return - } - - operations := make([]util.FederatedOperation, 0) - for _, cluster := range clusters { - clusterDaemonSetObj, found, err := daemonsetcontroller.daemonsetFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key) - if err != nil { - glog.Errorf("Failed to get %s from %s: %v", key, cluster.Name, err) - daemonsetcontroller.deliverDaemonSet(namespace, daemonsetName, 0, true) - return - } - - // Do not modify. Otherwise make a deep copy. - desiredDaemonSet := &extensionsv1.DaemonSet{ - ObjectMeta: util.DeepCopyRelevantObjectMeta(baseDaemonSet.ObjectMeta), - Spec: *(util.DeepCopyApiTypeOrPanic(&baseDaemonSet.Spec).(*extensionsv1.DaemonSetSpec)), - } - - if !found { - glog.V(4).Infof("Creating daemonset %s/%s in cluster %s", namespace, daemonsetName, cluster.Name) - - daemonsetcontroller.eventRecorder.Eventf(baseDaemonSet, api.EventTypeNormal, "CreateInCluster", - "Creating daemonset in cluster %s", cluster.Name) - - operations = append(operations, util.FederatedOperation{ - Type: util.OperationTypeAdd, - Obj: desiredDaemonSet, - ClusterName: cluster.Name, - }) - } else { - clusterDaemonSet := clusterDaemonSetObj.(*extensionsv1.DaemonSet) - - // Update existing daemonset, if needed. - if !util.ObjectMetaEquivalent(desiredDaemonSet.ObjectMeta, clusterDaemonSet.ObjectMeta) || - !reflect.DeepEqual(desiredDaemonSet.Spec, clusterDaemonSet.Spec) { - - glog.V(4).Infof("Upadting daemonset %s/%s in cluster %s", namespace, daemonsetName, cluster.Name) - daemonsetcontroller.eventRecorder.Eventf(baseDaemonSet, api.EventTypeNormal, "UpdateInCluster", - "Updating daemonset in cluster %s", cluster.Name) - operations = append(operations, util.FederatedOperation{ - Type: util.OperationTypeUpdate, - Obj: desiredDaemonSet, - ClusterName: cluster.Name, - }) - } - } - } - - if len(operations) == 0 { - glog.V(4).Infof("No operation needed for %s/%s", namespace, daemonsetName) - // Everything is in order - return - } - err = daemonsetcontroller.federatedUpdater.UpdateWithOnError(operations, daemonsetcontroller.updateTimeout, - func(op util.FederatedOperation, operror error) { - daemonsetcontroller.eventRecorder.Eventf(baseDaemonSet, api.EventTypeWarning, "UpdateInClusterFailed", - "DaemonSet update in cluster %s failed: %v", op.ClusterName, operror) - }) - - if err != nil { - glog.Errorf("Failed to execute updates for %s: %v, retrying shortly", key, err) - daemonsetcontroller.deliverDaemonSet(namespace, daemonsetName, 0, true) - return - } -} - -// delete deletes the given daemonset or returns error if the deletion was not complete. -func (daemonsetcontroller *DaemonSetController) delete(daemonset *extensionsv1.DaemonSet) error { - glog.V(3).Infof("Handling deletion of daemonset: %v", *daemonset) - _, err := daemonsetcontroller.deletionHelper.HandleObjectInUnderlyingClusters(daemonset) - if err != nil { - return err - } - - err = daemonsetcontroller.federatedApiClient.Extensions().DaemonSets(daemonset.Namespace).Delete(daemonset.Name, nil) - if err != nil { - // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. - // This is expected when we are processing an update as a result of daemonset finalizer deletion. - // The process that deleted the last finalizer is also going to delete the daemonset and we do not have to do anything. - if !errors.IsNotFound(err) { - return fmt.Errorf("failed to delete daemonset: %v", err) - } - } - return nil -} diff --git a/federation/pkg/federation-controller/sync/BUILD b/federation/pkg/federation-controller/sync/BUILD index eb226c3b0f4..710519cfeff 100644 --- a/federation/pkg/federation-controller/sync/BUILD +++ b/federation/pkg/federation-controller/sync/BUILD @@ -53,6 +53,7 @@ go_test( name = "go_default_test", srcs = [ "configmap_controller_test.go", + "daemonset_controller_test.go", "secret_controller_test.go", ], library = ":go_default_library", @@ -65,6 +66,7 @@ go_test( "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", "//federation/pkg/federation-controller/util/test:go_default_library", "//pkg/api/v1:go_default_library", + "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library", "//vendor/github.com/golang/glog:go_default_library", diff --git a/federation/pkg/federation-controller/daemonset/daemonset_controller_test.go b/federation/pkg/federation-controller/sync/daemonset_controller_test.go similarity index 90% rename from federation/pkg/federation-controller/daemonset/daemonset_controller_test.go rename to federation/pkg/federation-controller/sync/daemonset_controller_test.go index 52b9f9f39bc..1a9a035b65d 100644 --- a/federation/pkg/federation-controller/daemonset/daemonset_controller_test.go +++ b/federation/pkg/federation-controller/sync/daemonset_controller_test.go @@ -14,19 +14,20 @@ See the License for the specific language governing permissions and limitations under the License. */ -package daemonset +package sync import ( "fmt" "reflect" "testing" - "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" + "k8s.io/kubernetes/federation/pkg/federatedtypes" "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" @@ -38,12 +39,10 @@ import ( "github.com/stretchr/testify/assert" ) -const ( - daemonsets string = "daemonsets" - clusters string = "clusters" -) - func TestDaemonSetController(t *testing.T) { + daemonsets := "daemonsets" + clusters := "clusters" + cluster1 := NewCluster("cluster1", apiv1.ConditionTrue) cluster2 := NewCluster("cluster2", apiv1.ConditionTrue) @@ -65,8 +64,8 @@ func TestDaemonSetController(t *testing.T) { RegisterFakeList(daemonsets, &cluster2Client.Fake, &extensionsv1.DaemonSetList{Items: []extensionsv1.DaemonSet{}}) cluster2CreateChan := RegisterFakeCopyOnCreate(daemonsets, &cluster2Client.Fake, cluster2Watch) - daemonsetController := NewDaemonSetController(fakeClient) - informer := ToFederatedInformerForTestOnly(daemonsetController.daemonsetFederatedInformer) + daemonsetController := newFederationSyncController(fakeClient, federatedtypes.NewDaemonSetAdapter(fakeClient)) + informer := ToFederatedInformerForTestOnly(daemonsetController.informer) informer.SetClientFactory(func(cluster *federationapi.Cluster) (kubeclientset.Interface, error) { switch cluster.Name { case cluster1.Name: @@ -78,10 +77,7 @@ func TestDaemonSetController(t *testing.T) { } }) - daemonsetController.clusterAvailableDelay = time.Second - daemonsetController.daemonsetReviewDelay = 50 * time.Millisecond - daemonsetController.smallDelay = 20 * time.Millisecond - daemonsetController.updateTimeout = 5 * time.Second + daemonsetController.minimizeLatency() stop := make(chan struct{}) daemonsetController.Run(stop) @@ -94,7 +90,9 @@ func TestDaemonSetController(t *testing.T) { }, Spec: extensionsv1.DaemonSetSpec{ Selector: &metav1.LabelSelector{ - MatchLabels: make(map[string]string), + MatchLabels: map[string]string{ + "A": "xyz", + }, }, }, } @@ -115,10 +113,14 @@ func TestDaemonSetController(t *testing.T) { assert.True(t, daemonsetsEqual(daemonset1, *createdDaemonSet), fmt.Sprintf("expected: %v, actual: %v", daemonset1, *createdDaemonSet)) + daemonsetKey := types.NamespacedName{ + Namespace: daemonset1.Namespace, + Name: daemonset1.Name, + }.String() // Wait for the daemonset to appear in the informer store err := WaitForStoreUpdate( - daemonsetController.daemonsetFederatedInformer.GetTargetStore(), - cluster1.Name, getDaemonSetKey(daemonset1.Namespace, daemonset1.Name), wait.ForeverTestTimeout) + daemonsetController.informer.GetTargetStore(), + cluster1.Name, daemonsetKey, wait.ForeverTestTimeout) assert.Nil(t, err, "daemonset should have appeared in the informer store") // TODO: Re-enable this when we have fixed these flaky tests: https://github.com/kubernetes/kubernetes/issues/36540. diff --git a/test/test_owners.csv b/test/test_owners.csv index cc34b3096d4..7a22c359d51 100644 --- a/test/test_owners.csv +++ b/test/test_owners.csv @@ -600,7 +600,6 @@ k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53,cjcullen,1, k8s.io/kubernetes/federation/pkg/dnsprovider/providers/coredns,brendandburns,0, k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns,madhusudancs,1, k8s.io/kubernetes/federation/pkg/federation-controller/cluster,nikhiljindal,0, -k8s.io/kubernetes/federation/pkg/federation-controller/daemonset,childsb,1, k8s.io/kubernetes/federation/pkg/federation-controller/deployment,zmerlynn,1, k8s.io/kubernetes/federation/pkg/federation-controller/ingress,vishh,1, k8s.io/kubernetes/federation/pkg/federation-controller/namespace,rrati,0,