From fd4ff0caa5dae26316798ed174473f0d0a24751e Mon Sep 17 00:00:00 2001 From: Maru Newby Date: Tue, 2 May 2017 14:12:25 -0700 Subject: [PATCH 1/2] fed: Fix deletion helper to use namespace-qualified object names --- .../federation-controller/deployment/deploymentcontroller.go | 2 +- .../pkg/federation-controller/ingress/ingress_controller.go | 2 +- .../pkg/federation-controller/namespace/namespace_controller.go | 2 +- .../federation-controller/replicaset/replicasetcontroller.go | 2 +- .../pkg/federation-controller/service/servicecontroller.go | 2 +- federation/pkg/federation-controller/sync/controller.go | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/federation/pkg/federation-controller/deployment/deploymentcontroller.go b/federation/pkg/federation-controller/deployment/deploymentcontroller.go index 4b2328428cf..b6e629a554e 100644 --- a/federation/pkg/federation-controller/deployment/deploymentcontroller.go +++ b/federation/pkg/federation-controller/deployment/deploymentcontroller.go @@ -211,7 +211,7 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen // objNameFunc func(obj runtime.Object) string { deployment := obj.(*extensionsv1.Deployment) - return deployment.Name + return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name) }, updateTimeout, fdc.eventRecorder, diff --git a/federation/pkg/federation-controller/ingress/ingress_controller.go b/federation/pkg/federation-controller/ingress/ingress_controller.go index ad9ff847636..46215e72cb7 100644 --- a/federation/pkg/federation-controller/ingress/ingress_controller.go +++ b/federation/pkg/federation-controller/ingress/ingress_controller.go @@ -294,7 +294,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll // objNameFunc func(obj pkgruntime.Object) string { ingress := obj.(*extensionsv1beta1.Ingress) - return ingress.Name + return fmt.Sprintf("%s/%s", ingress.Namespace, ingress.Name) }, ic.updateTimeout, ic.eventRecorder, diff --git a/federation/pkg/federation-controller/namespace/namespace_controller.go b/federation/pkg/federation-controller/namespace/namespace_controller.go index b33e4ad1dba..37f5748063b 100644 --- a/federation/pkg/federation-controller/namespace/namespace_controller.go +++ b/federation/pkg/federation-controller/namespace/namespace_controller.go @@ -183,7 +183,7 @@ func NewNamespaceController(client federationclientset.Interface, dynamicClientP // objNameFunc func(obj runtime.Object) string { namespace := obj.(*apiv1.Namespace) - return namespace.Name + return fmt.Sprintf("%s/%s", namespace.Namespace, namespace.Name) }, nc.updateTimeout, nc.eventRecorder, diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go index 2d2b5242170..69e158c0dee 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -219,7 +219,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe // objNameFunc func(obj runtime.Object) string { replicaset := obj.(*extensionsv1.ReplicaSet) - return replicaset.Name + return fmt.Sprintf("%s/%s", replicaset.Namespace, replicaset.Name) }, updateTimeout, frsc.eventRecorder, diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index ef89a27df80..2219abd6252 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -240,7 +240,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, // objNameFunc func(obj pkgruntime.Object) string { service := obj.(*v1.Service) - return service.Name + return fmt.Sprintf("%s/%s", service.Namespace, service.Name) }, updateTimeout, s.eventRecorder, diff --git a/federation/pkg/federation-controller/sync/controller.go b/federation/pkg/federation-controller/sync/controller.go index 529521fd787..7d850667717 100644 --- a/federation/pkg/federation-controller/sync/controller.go +++ b/federation/pkg/federation-controller/sync/controller.go @@ -186,7 +186,7 @@ func newFederationSyncController(client federationclientset.Interface, adapter f s.updateObject, // objNameFunc func(obj pkgruntime.Object) string { - return adapter.ObjectMeta(obj).Name + return adapter.NamespacedName(obj).String() }, s.updateTimeout, s.eventRecorder, From 00ea2eb1cb8cc6717bec01c0544ed0047f32770d Mon Sep 17 00:00:00 2001 From: Maru Newby Date: Mon, 1 May 2017 14:52:56 -0700 Subject: [PATCH 2/2] fed: Make federated updater responsible for recording events --- .../deployment/deploymentcontroller.go | 16 ++----- .../ingress/ingress_controller.go | 20 +++------ .../namespace/namespace_controller.go | 16 ++----- .../replicaset/replicasetcontroller.go | 16 ++----- .../service/servicecontroller.go | 13 ++---- .../federation-controller/sync/controller.go | 16 ++----- .../pkg/federation-controller/util/BUILD | 1 + .../util/deletionhelper/BUILD | 2 - .../util/deletionhelper/deletion_helper.go | 14 ++---- .../util/federated_updater.go | 44 +++++++++++++++---- .../util/federated_updater_test.go | 14 ++++-- 11 files changed, 77 insertions(+), 95 deletions(-) diff --git a/federation/pkg/federation-controller/deployment/deploymentcontroller.go b/federation/pkg/federation-controller/deployment/deploymentcontroller.go index b6e629a554e..7a0e9864f54 100644 --- a/federation/pkg/federation-controller/deployment/deploymentcontroller.go +++ b/federation/pkg/federation-controller/deployment/deploymentcontroller.go @@ -188,7 +188,7 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen ), ) - fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer, + fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer, "deployment", fdc.eventRecorder, func(client kubeclientset.Interface, obj runtime.Object) error { rs := obj.(*extensionsv1.Deployment) _, err := client.Extensions().Deployments(rs.Namespace).Create(rs) @@ -214,7 +214,6 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name) }, updateTimeout, - fdc.eventRecorder, fdc.fedDeploymentInformer, fdc.fedUpdater, ) @@ -526,13 +525,11 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation if !exists { if replicas > 0 { - fdc.eventRecorder.Eventf(fd, api.EventTypeNormal, "CreateInCluster", - "Creating deployment in cluster %s", clusterName) - operations = append(operations, fedutil.FederatedOperation{ Type: fedutil.OperationTypeAdd, Obj: ld, ClusterName: clusterName, + Key: key, }) } } else { @@ -541,13 +538,11 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation currentLd := ldObj.(*extensionsv1.Deployment) // Update existing replica set, if needed. if !fedutil.DeploymentEquivalent(ld, currentLd) { - fdc.eventRecorder.Eventf(fd, api.EventTypeNormal, "UpdateInCluster", - "Updating deployment in cluster %s", clusterName) - operations = append(operations, fedutil.FederatedOperation{ Type: fedutil.OperationTypeUpdate, Obj: ld, ClusterName: clusterName, + Key: key, }) glog.Infof("Updating %s in %s", currentLd.Name, clusterName) } @@ -572,10 +567,7 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation // Everything is in order return statusAllOk, nil } - err = fdc.fedUpdater.UpdateWithOnError(operations, updateTimeout, func(op fedutil.FederatedOperation, operror error) { - fdc.eventRecorder.Eventf(fd, api.EventTypeWarning, "FailedUpdateInCluster", - "Deployment update in cluster %s failed: %v", op.ClusterName, operror) - }) + err = fdc.fedUpdater.Update(operations, updateTimeout) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", key, err) return statusError, err diff --git a/federation/pkg/federation-controller/ingress/ingress_controller.go b/federation/pkg/federation-controller/ingress/ingress_controller.go index 46215e72cb7..debae9d4006 100644 --- a/federation/pkg/federation-controller/ingress/ingress_controller.go +++ b/federation/pkg/federation-controller/ingress/ingress_controller.go @@ -229,7 +229,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll ) // Federated ingress updater along with Create/Update/Delete operations. - ic.federatedIngressUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer, + ic.federatedIngressUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer, "ingress", ic.eventRecorder, func(client kubeclientset.Interface, obj pkgruntime.Object) error { ingress := obj.(*extensionsv1beta1.Ingress) glog.V(4).Infof("Attempting to create Ingress: %v", ingress) @@ -261,7 +261,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll }) // Federated configmap updater along with Create/Update/Delete operations. Only Update should ever be called. - ic.federatedConfigMapUpdater = util.NewFederatedUpdater(ic.configMapFederatedInformer, + ic.federatedConfigMapUpdater = util.NewFederatedUpdater(ic.configMapFederatedInformer, "configmap", ic.eventRecorder, func(client kubeclientset.Interface, obj pkgruntime.Object) error { configMap := obj.(*v1.ConfigMap) configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace} @@ -297,7 +297,6 @@ func NewIngressController(client federationclientset.Interface) *IngressControll return fmt.Sprintf("%s/%s", ingress.Namespace, ingress.Name) }, ic.updateTimeout, - ic.eventRecorder, ic.ingressFederatedInformer, ic.federatedIngressUpdater, ) @@ -566,12 +565,12 @@ func (ic *IngressController) reconcileConfigMap(cluster *federationapi.Cluster, Type: util.OperationTypeUpdate, Obj: configMap, ClusterName: cluster.Name, + Key: configMapNsName.String(), }} glog.V(4).Infof("Calling federatedConfigMapUpdater.Update() - operations: %v", operations) err := ic.federatedConfigMapUpdater.Update(operations, ic.updateTimeout) if err != nil { - configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace} - glog.Errorf("Failed to execute update of ConfigMap %q on cluster %q: %v", configMapName, cluster.Name, err) + glog.Errorf("Failed to execute update of ConfigMap %q on cluster %q: %v", configMapNsName, cluster.Name, err) ic.configMapDeliverer.DeliverAfter(cluster.Name, nil, ic.configMapReviewDelay) } } @@ -770,8 +769,6 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { glog.V(4).Infof("No existing Ingress %s in cluster %s - checking if appropriate to queue a create operation", ingress, cluster.Name) // We can't supply server-created fields when creating a new object. desiredIngress.ObjectMeta = util.DeepCopyRelevantObjectMeta(baseIngress.ObjectMeta) - ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "CreateInCluster", - "Creating ingress in cluster %s", cluster.Name) // We always first create an ingress in the first available cluster. Once that ingress // has been created and allocated a global IP (visible via an annotation), @@ -797,6 +794,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { Type: util.OperationTypeAdd, Obj: desiredIngress, ClusterName: cluster.Name, + Key: key, }) } else { glog.V(4).Infof("No annotation %q exists on ingress %q in federation and waiting for ingress in cluster %s. Not queueing create operation for ingress until annotation exists", staticIPNameKeyWritable, ingress, firstClusterName) @@ -867,13 +865,12 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { for key, val := range baseIngress.ObjectMeta.Labels { desiredIngress.ObjectMeta.Labels[key] = val } - ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "UpdateInCluster", - "Updating ingress in cluster %s", cluster.Name) operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeUpdate, Obj: desiredIngress, ClusterName: cluster.Name, + Key: key, }) // TODO: Transfer any readonly (target-proxy, url-map etc) annotations from the master cluster to the federation, if this is the master cluster. // This is only for consistency, so that the federation ingress metadata matches the underlying clusters. It's not actually required } @@ -887,10 +884,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { return } glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations) - err = ic.federatedIngressUpdater.UpdateWithOnError(operations, ic.updateTimeout, func(op util.FederatedOperation, operror error) { - ic.eventRecorder.Eventf(baseIngress, api.EventTypeWarning, "FailedClusterUpdate", - "Ingress update in cluster %s failed: %v", op.ClusterName, operror) - }) + err = ic.federatedIngressUpdater.Update(operations, ic.updateTimeout) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", ingress, err) ic.deliverIngress(ingress, ic.ingressReviewDelay, true) diff --git a/federation/pkg/federation-controller/namespace/namespace_controller.go b/federation/pkg/federation-controller/namespace/namespace_controller.go index 37f5748063b..12144ac487f 100644 --- a/federation/pkg/federation-controller/namespace/namespace_controller.go +++ b/federation/pkg/federation-controller/namespace/namespace_controller.go @@ -156,7 +156,7 @@ func NewNamespaceController(client federationclientset.Interface, dynamicClientP ) // Federated updater along with Create/Update/Delete operations. - nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer, + nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer, "namespace", nc.eventRecorder, func(client kubeclientset.Interface, obj runtime.Object) error { namespace := obj.(*apiv1.Namespace) _, err := client.Core().Namespaces().Create(namespace) @@ -186,7 +186,6 @@ func NewNamespaceController(client federationclientset.Interface, dynamicClientP return fmt.Sprintf("%s/%s", namespace.Namespace, namespace.Name) }, nc.updateTimeout, - nc.eventRecorder, nc.namespaceFederatedInformer, nc.federatedUpdater, ) @@ -370,26 +369,22 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) { glog.V(5).Infof("Desired namespace in underlying clusters: %+v", desiredNamespace) if !found { - nc.eventRecorder.Eventf(baseNamespace, api.EventTypeNormal, "CreateInCluster", - "Creating namespace in cluster %s", cluster.Name) - operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeAdd, Obj: desiredNamespace, ClusterName: cluster.Name, + Key: namespace, }) } else { clusterNamespace := clusterNamespaceObj.(*apiv1.Namespace) // Update existing namespace, if needed. if !util.ObjectMetaAndSpecEquivalent(desiredNamespace, clusterNamespace) { - nc.eventRecorder.Eventf(baseNamespace, api.EventTypeNormal, "UpdateInCluster", - "Updating namespace in cluster %s. Desired: %+v\n Actual: %+v\n", cluster.Name, desiredNamespace, clusterNamespace) - operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeUpdate, Obj: desiredNamespace, ClusterName: cluster.Name, + Key: namespace, }) } } @@ -401,10 +396,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) { } glog.V(2).Infof("Updating namespace %s in underlying clusters. Operations: %d", baseNamespace.Name, len(operations)) - err = nc.federatedUpdater.UpdateWithOnError(operations, nc.updateTimeout, func(op util.FederatedOperation, operror error) { - nc.eventRecorder.Eventf(baseNamespace, api.EventTypeWarning, "UpdateInClusterFailed", - "Namespace update in cluster %s failed: %v", op.ClusterName, operror) - }) + err = nc.federatedUpdater.Update(operations, nc.updateTimeout) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", namespace, err) nc.deliverNamespace(namespace, 0, true) diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go index 69e158c0dee..0cd8c4fde60 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -196,7 +196,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe ) frsc.replicaSetLister = extensionslisters.NewReplicaSetLister(replicaSetIndexer) - frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, + frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, "replicaset", frsc.eventRecorder, func(client kubeclientset.Interface, obj runtime.Object) error { rs := obj.(*extensionsv1.ReplicaSet) _, err := client.Extensions().ReplicaSets(rs.Namespace).Create(rs) @@ -222,7 +222,6 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe return fmt.Sprintf("%s/%s", replicaset.Namespace, replicaset.Name) }, updateTimeout, - frsc.eventRecorder, frsc.fedReplicaSetInformer, frsc.fedUpdater, ) @@ -543,26 +542,22 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio if !exists { if replicas > 0 { - frsc.eventRecorder.Eventf(frs, api.EventTypeNormal, "CreateInCluster", - "Creating replicaset in cluster %s", clusterName) - operations = append(operations, fedutil.FederatedOperation{ Type: fedutil.OperationTypeAdd, Obj: lrs, ClusterName: clusterName, + Key: key, }) } } else { currentLrs := lrsObj.(*extensionsv1.ReplicaSet) // Update existing replica set, if needed. if !fedutil.ObjectMetaAndSpecEquivalent(lrs, currentLrs) { - frsc.eventRecorder.Eventf(frs, api.EventTypeNormal, "UpdateInCluster", - "Updating replicaset in cluster %s", clusterName) - operations = append(operations, fedutil.FederatedOperation{ Type: fedutil.OperationTypeUpdate, Obj: lrs, ClusterName: clusterName, + Key: key, }) } fedStatus.Replicas += currentLrs.Status.Replicas @@ -584,10 +579,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio // Everything is in order return statusAllOk, nil } - err = frsc.fedUpdater.UpdateWithOnError(operations, updateTimeout, func(op fedutil.FederatedOperation, operror error) { - frsc.eventRecorder.Eventf(frs, api.EventTypeWarning, "FailedUpdateInCluster", - "Replicaset update in cluster %s failed: %v", op.ClusterName, operror) - }) + err = frsc.fedUpdater.Update(operations, updateTimeout) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", key, err) return statusError, err diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 2219abd6252..f04abeef583 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -185,7 +185,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle) - s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer, + s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer, "service", s.eventRecorder, func(client kubeclientset.Interface, obj pkgruntime.Object) error { svc := obj.(*v1.Service) _, err := client.Core().Services(svc.Namespace).Create(svc) @@ -243,7 +243,6 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, return fmt.Sprintf("%s/%s", service.Namespace, service.Name) }, updateTimeout, - s.eventRecorder, s.federatedInformer, s.federatedUpdater, ) @@ -601,11 +600,7 @@ func (s *ServiceController) reconcileService(key string) reconciliationStatus { } if len(operations) != 0 { - err = s.federatedUpdater.UpdateWithOnError(operations, s.updateTimeout, - func(op fedutil.FederatedOperation, operror error) { - runtime.HandleError(fmt.Errorf("Service update in cluster %s failed: %v", op.ClusterName, operror)) - s.eventRecorder.Eventf(fedService, api.EventTypeWarning, "UpdateInClusterFailed", "Service update in cluster %s failed: %v", op.ClusterName, operror) - }) + err = s.federatedUpdater.Update(operations, s.updateTimeout) if err != nil { if !errors.IsAlreadyExists(err) { runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err)) @@ -642,12 +637,12 @@ func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Clu desiredService.ResourceVersion = "" glog.V(4).Infof("Creating service in underlying cluster %s: %+v", cluster.Name, desiredService) - s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "CreateInCluster", "Creating service in cluster %s", cluster.Name) operation = &fedutil.FederatedOperation{ Type: fedutil.OperationTypeAdd, Obj: desiredService, ClusterName: cluster.Name, + Key: key, } } else { clusterService, ok := clusterServiceObj.(*v1.Service) @@ -674,7 +669,6 @@ func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Clu // Update existing service, if needed. if !Equivalent(desiredService, clusterService) { glog.V(4).Infof("Service in underlying cluster %s does not match, Desired: %+v, Existing: %+v", cluster.Name, desiredService, clusterService) - s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "UpdateInCluster", "Updating service in cluster %s. Desired: %+v\n Actual: %+v\n", cluster.Name, desiredService, clusterService) // ResourceVersion of cluster service can be different from federated service, // so do not update ResourceVersion while updating cluster service @@ -684,6 +678,7 @@ func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Clu Type: fedutil.OperationTypeUpdate, Obj: desiredService, ClusterName: cluster.Name, + Key: key, } } else { glog.V(5).Infof("Service in underlying cluster %s is up to date: %+v", cluster.Name, desiredService) diff --git a/federation/pkg/federation-controller/sync/controller.go b/federation/pkg/federation-controller/sync/controller.go index 7d850667717..4f70fb694a1 100644 --- a/federation/pkg/federation-controller/sync/controller.go +++ b/federation/pkg/federation-controller/sync/controller.go @@ -166,7 +166,7 @@ func newFederationSyncController(client federationclientset.Interface, adapter f ) // Federated updeater along with Create/Update/Delete operations. - s.updater = util.NewFederatedUpdater(s.informer, + s.updater = util.NewFederatedUpdater(s.informer, adapter.Kind(), s.eventRecorder, func(client kubeclientset.Interface, obj pkgruntime.Object) error { _, err := adapter.ClusterCreate(client, obj) return err @@ -189,7 +189,6 @@ func newFederationSyncController(client federationclientset.Interface, adapter f return adapter.NamespacedName(obj).String() }, s.updateTimeout, - s.eventRecorder, s.informer, s.updater, ) @@ -352,25 +351,22 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName desiredObj := s.adapter.Copy(obj) if !found { - s.eventRecorder.Eventf(obj, api.EventTypeNormal, "CreateInCluster", - "Creating %s in cluster %s", kind, cluster.Name) - operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeAdd, Obj: desiredObj, ClusterName: cluster.Name, + Key: key, }) } else { clusterObj := clusterObj.(pkgruntime.Object) // Update existing resource, if needed. if !s.adapter.Equivalent(desiredObj, clusterObj) { - s.eventRecorder.Eventf(obj, api.EventTypeNormal, "UpdateInCluster", - "Updating %s in cluster %s", kind, cluster.Name) operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeUpdate, Obj: desiredObj, ClusterName: cluster.Name, + Key: key, }) } } @@ -380,11 +376,7 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName // Everything is in order return } - err = s.updater.UpdateWithOnError(operations, s.updateTimeout, - func(op util.FederatedOperation, operror error) { - s.eventRecorder.Eventf(obj, api.EventTypeWarning, "UpdateInClusterFailed", - "%s update in cluster %s failed: %v", strings.ToTitle(kind), op.ClusterName, operror) - }) + err = s.updater.Update(operations, s.updateTimeout) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", key, err) diff --git a/federation/pkg/federation-controller/util/BUILD b/federation/pkg/federation-controller/util/BUILD index f73dce965ab..fca4e867b0b 100644 --- a/federation/pkg/federation-controller/util/BUILD +++ b/federation/pkg/federation-controller/util/BUILD @@ -43,6 +43,7 @@ go_library( "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", ], ) diff --git a/federation/pkg/federation-controller/util/deletionhelper/BUILD b/federation/pkg/federation-controller/util/deletionhelper/BUILD index be5628c873e..6565c0c7de8 100644 --- a/federation/pkg/federation-controller/util/deletionhelper/BUILD +++ b/federation/pkg/federation-controller/util/deletionhelper/BUILD @@ -14,12 +14,10 @@ go_library( deps = [ "//federation/pkg/federation-controller/util:go_default_library", "//federation/pkg/federation-controller/util/finalizers:go_default_library", - "//pkg/api:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/federation/pkg/federation-controller/util/deletionhelper/deletion_helper.go b/federation/pkg/federation-controller/util/deletionhelper/deletion_helper.go index 9efeeb573d4..4171b8f6985 100644 --- a/federation/pkg/federation-controller/util/deletionhelper/deletion_helper.go +++ b/federation/pkg/federation-controller/util/deletionhelper/deletion_helper.go @@ -27,10 +27,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/tools/record" "k8s.io/kubernetes/federation/pkg/federation-controller/util" finalizersutil "k8s.io/kubernetes/federation/pkg/federation-controller/util/finalizers" - "k8s.io/kubernetes/pkg/api" "github.com/golang/glog" ) @@ -53,20 +51,17 @@ type DeletionHelper struct { updateObjFunc UpdateObjFunc objNameFunc ObjNameFunc updateTimeout time.Duration - eventRecorder record.EventRecorder informer util.FederatedInformer updater util.FederatedUpdater } func NewDeletionHelper( - updateObjFunc UpdateObjFunc, objNameFunc ObjNameFunc, - updateTimeout time.Duration, eventRecorder record.EventRecorder, + updateObjFunc UpdateObjFunc, objNameFunc ObjNameFunc, updateTimeout time.Duration, informer util.FederatedInformer, updater util.FederatedUpdater) *DeletionHelper { return &DeletionHelper{ updateObjFunc: updateObjFunc, objNameFunc: objNameFunc, updateTimeout: updateTimeout, - eventRecorder: eventRecorder, informer: informer, updater: updater, } @@ -157,13 +152,10 @@ func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) ( Type: util.OperationTypeDelete, ClusterName: clusterNsObj.ClusterName, Obj: clusterNsObj.Object.(runtime.Object), + Key: objName, }) } - err = dh.updater.UpdateWithOnError(operations, dh.updateTimeout, func(op util.FederatedOperation, operror error) { - objName := dh.objNameFunc(op.Obj) - dh.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteInClusterFailed", - "Failed to delete obj %s in cluster %s: %v", objName, op.ClusterName, operror) - }) + err = dh.updater.Update(operations, dh.updateTimeout) if err != nil { return nil, fmt.Errorf("failed to execute updates for obj %s: %v", objName, err) } diff --git a/federation/pkg/federation-controller/util/federated_updater.go b/federation/pkg/federation-controller/util/federated_updater.go index 929f397966f..c26a93832b8 100644 --- a/federation/pkg/federation-controller/util/federated_updater.go +++ b/federation/pkg/federation-controller/util/federated_updater.go @@ -18,10 +18,14 @@ package util import ( "fmt" + "strings" "time" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" pkgruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/api" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" ) @@ -39,6 +43,7 @@ type FederatedOperation struct { Type FederatedOperationType ClusterName string Obj pkgruntime.Object + Key string } // A helper that executes the given set of updates on federation, in parallel. @@ -48,8 +53,6 @@ type FederatedUpdater interface { // stopped when it is reached. However the function will return after the timeout // with a non-nil error. Update([]FederatedOperation, time.Duration) error - - UpdateWithOnError([]FederatedOperation, time.Duration, func(FederatedOperation, error)) error } // A function that executes some operation using the passed client and object. @@ -58,25 +61,32 @@ type FederatedOperationHandler func(kubeclientset.Interface, pkgruntime.Object) type federatedUpdaterImpl struct { federation FederationView + kind string + + eventRecorder record.EventRecorder + addFunction FederatedOperationHandler updateFunction FederatedOperationHandler deleteFunction FederatedOperationHandler } -func NewFederatedUpdater(federation FederationView, add, update, del FederatedOperationHandler) FederatedUpdater { +func NewFederatedUpdater(federation FederationView, kind string, recorder record.EventRecorder, add, update, del FederatedOperationHandler) FederatedUpdater { return &federatedUpdaterImpl{ federation: federation, + kind: kind, + eventRecorder: recorder, addFunction: add, updateFunction: update, deleteFunction: del, } } -func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Duration) error { - return fu.UpdateWithOnError(ops, timeout, nil) +func (fu *federatedUpdaterImpl) recordEvent(obj runtime.Object, eventType, eventVerb string, args ...interface{}) { + messageFmt := eventVerb + " %s %q in cluster %s" + fu.eventRecorder.Eventf(obj, api.EventTypeNormal, eventType, messageFmt, args...) } -func (fu *federatedUpdaterImpl) UpdateWithOnError(ops []FederatedOperation, timeout time.Duration, onError func(FederatedOperation, error)) error { +func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Duration) error { done := make(chan error, len(ops)) for _, op := range ops { go func(op FederatedOperation) { @@ -89,21 +99,37 @@ func (fu *federatedUpdaterImpl) UpdateWithOnError(ops []FederatedOperation, time return } + eventArgs := []interface{}{fu.kind, op.Key, clusterName} + baseEventType := fmt.Sprintf("%s", op.Type) + eventType := fmt.Sprintf("%sInCluster", strings.Title(baseEventType)) + switch op.Type { case OperationTypeAdd: + // TODO s+OperationTypeAdd+OperationTypeCreate+ + baseEventType = "create" + eventType := "CreateInCluster" + + fu.recordEvent(op.Obj, eventType, "Creating", eventArgs...) err = fu.addFunction(clientset, op.Obj) case OperationTypeUpdate: + fu.recordEvent(op.Obj, eventType, "Updating", eventArgs...) err = fu.updateFunction(clientset, op.Obj) case OperationTypeDelete: + fu.recordEvent(op.Obj, eventType, "Deleting", eventArgs...) err = fu.deleteFunction(clientset, op.Obj) // IsNotFound error is fine since that means the object is deleted already. - if errors.IsNotFound(err) { + if err != nil && !errors.IsNotFound(err) { err = nil } } - if err != nil && onError != nil { - onError(op, err) + + if err != nil { + eventType := eventType + "Failed" + messageFmt := "Failed to " + baseEventType + " %s %q in cluster %s: %v" + eventArgs = append(eventArgs, err) + fu.eventRecorder.Eventf(op.Obj, api.EventTypeWarning, eventType, messageFmt, eventArgs...) } + done <- err }(op) } diff --git a/federation/pkg/federation-controller/util/federated_updater_test.go b/federation/pkg/federation-controller/util/federated_updater_test.go index 43a44818b23..88efd99a569 100644 --- a/federation/pkg/federation-controller/util/federated_updater_test.go +++ b/federation/pkg/federation-controller/util/federated_updater_test.go @@ -58,11 +58,19 @@ func (f *fakeFederationView) ClustersSynced() bool { return true } +type fakeEventRecorder struct{} + +func (f *fakeEventRecorder) Event(object pkgruntime.Object, eventtype, reason, message string) {} +func (f *fakeEventRecorder) Eventf(object pkgruntime.Object, eventtype, reason, messageFmt string, args ...interface{}) { +} +func (f *fakeEventRecorder) PastEventf(object pkgruntime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) { +} + func TestFederatedUpdaterOK(t *testing.T) { addChan := make(chan string, 5) updateChan := make(chan string, 5) - updater := NewFederatedUpdater(&fakeFederationView{}, + updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{}, func(_ kubeclientset.Interface, obj pkgruntime.Object) error { service := obj.(*apiv1.Service) addChan <- service.Name @@ -93,7 +101,7 @@ func TestFederatedUpdaterOK(t *testing.T) { } func TestFederatedUpdaterError(t *testing.T) { - updater := NewFederatedUpdater(&fakeFederationView{}, + updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{}, func(_ kubeclientset.Interface, obj pkgruntime.Object) error { return fmt.Errorf("boom") }, noop, noop) @@ -113,7 +121,7 @@ func TestFederatedUpdaterError(t *testing.T) { func TestFederatedUpdaterTimeout(t *testing.T) { start := time.Now() - updater := NewFederatedUpdater(&fakeFederationView{}, + updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{}, func(_ kubeclientset.Interface, obj pkgruntime.Object) error { time.Sleep(time.Minute) return nil