Merge pull request #45364 from marun/fed-record-events-in-updater

Automatic merge from submit-queue

[Federation] Record events in federated updater

Controllers for federated types were previously recording events when generating the list of operations.  This change delegates responsibility for recording events to the federated updater so that events are recorded when the operations are actually executed, and ensures consistency across recording of both operation initiation and failure.  

The deletion helper was similarly updated to rely on the federated updater for event recording.  To support this change to the deletion helper, controllers have been updated to provide a namespace qualified name via the objNameFunc function to ensure that the updater can record events for deletions with the same detail as for add and update operations.

cc: @kubernetes/sig-federation-pr-reviews @perotinus
This commit is contained in:
Kubernetes Submit Queue 2017-05-04 16:27:15 -07:00 committed by GitHub
commit b3beeff9c2
11 changed files with 83 additions and 101 deletions

View File

@ -188,7 +188,7 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen
),
)
fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer,
fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer, "deployment", fdc.eventRecorder,
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.Deployment)
_, err := client.Extensions().Deployments(rs.Namespace).Create(rs)
@ -211,10 +211,9 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen
// objNameFunc
func(obj runtime.Object) string {
deployment := obj.(*extensionsv1.Deployment)
return deployment.Name
return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name)
},
updateTimeout,
fdc.eventRecorder,
fdc.fedDeploymentInformer,
fdc.fedUpdater,
)
@ -526,13 +525,11 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation
if !exists {
if replicas > 0 {
fdc.eventRecorder.Eventf(fd, api.EventTypeNormal, "CreateInCluster",
"Creating deployment in cluster %s", clusterName)
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeAdd,
Obj: ld,
ClusterName: clusterName,
Key: key,
})
}
} else {
@ -541,13 +538,11 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation
currentLd := ldObj.(*extensionsv1.Deployment)
// Update existing replica set, if needed.
if !fedutil.DeploymentEquivalent(ld, currentLd) {
fdc.eventRecorder.Eventf(fd, api.EventTypeNormal, "UpdateInCluster",
"Updating deployment in cluster %s", clusterName)
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeUpdate,
Obj: ld,
ClusterName: clusterName,
Key: key,
})
glog.Infof("Updating %s in %s", currentLd.Name, clusterName)
}
@ -572,10 +567,7 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation
// Everything is in order
return statusAllOk, nil
}
err = fdc.fedUpdater.UpdateWithOnError(operations, updateTimeout, func(op fedutil.FederatedOperation, operror error) {
fdc.eventRecorder.Eventf(fd, api.EventTypeWarning, "FailedUpdateInCluster",
"Deployment update in cluster %s failed: %v", op.ClusterName, operror)
})
err = fdc.fedUpdater.Update(operations, updateTimeout)
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", key, err)
return statusError, err

View File

@ -229,7 +229,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll
)
// Federated ingress updater along with Create/Update/Delete operations.
ic.federatedIngressUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer,
ic.federatedIngressUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer, "ingress", ic.eventRecorder,
func(client kubeclientset.Interface, obj pkgruntime.Object) error {
ingress := obj.(*extensionsv1beta1.Ingress)
glog.V(4).Infof("Attempting to create Ingress: %v", ingress)
@ -261,7 +261,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll
})
// Federated configmap updater along with Create/Update/Delete operations. Only Update should ever be called.
ic.federatedConfigMapUpdater = util.NewFederatedUpdater(ic.configMapFederatedInformer,
ic.federatedConfigMapUpdater = util.NewFederatedUpdater(ic.configMapFederatedInformer, "configmap", ic.eventRecorder,
func(client kubeclientset.Interface, obj pkgruntime.Object) error {
configMap := obj.(*v1.ConfigMap)
configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
@ -294,10 +294,9 @@ func NewIngressController(client federationclientset.Interface) *IngressControll
// objNameFunc
func(obj pkgruntime.Object) string {
ingress := obj.(*extensionsv1beta1.Ingress)
return ingress.Name
return fmt.Sprintf("%s/%s", ingress.Namespace, ingress.Name)
},
ic.updateTimeout,
ic.eventRecorder,
ic.ingressFederatedInformer,
ic.federatedIngressUpdater,
)
@ -566,12 +565,12 @@ func (ic *IngressController) reconcileConfigMap(cluster *federationapi.Cluster,
Type: util.OperationTypeUpdate,
Obj: configMap,
ClusterName: cluster.Name,
Key: configMapNsName.String(),
}}
glog.V(4).Infof("Calling federatedConfigMapUpdater.Update() - operations: %v", operations)
err := ic.federatedConfigMapUpdater.Update(operations, ic.updateTimeout)
if err != nil {
configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
glog.Errorf("Failed to execute update of ConfigMap %q on cluster %q: %v", configMapName, cluster.Name, err)
glog.Errorf("Failed to execute update of ConfigMap %q on cluster %q: %v", configMapNsName, cluster.Name, err)
ic.configMapDeliverer.DeliverAfter(cluster.Name, nil, ic.configMapReviewDelay)
}
}
@ -770,8 +769,6 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
glog.V(4).Infof("No existing Ingress %s in cluster %s - checking if appropriate to queue a create operation", ingress, cluster.Name)
// We can't supply server-created fields when creating a new object.
desiredIngress.ObjectMeta = util.DeepCopyRelevantObjectMeta(baseIngress.ObjectMeta)
ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "CreateInCluster",
"Creating ingress in cluster %s", cluster.Name)
// We always first create an ingress in the first available cluster. Once that ingress
// has been created and allocated a global IP (visible via an annotation),
@ -797,6 +794,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
Type: util.OperationTypeAdd,
Obj: desiredIngress,
ClusterName: cluster.Name,
Key: key,
})
} else {
glog.V(4).Infof("No annotation %q exists on ingress %q in federation and waiting for ingress in cluster %s. Not queueing create operation for ingress until annotation exists", staticIPNameKeyWritable, ingress, firstClusterName)
@ -867,13 +865,12 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
for key, val := range baseIngress.ObjectMeta.Labels {
desiredIngress.ObjectMeta.Labels[key] = val
}
ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "UpdateInCluster",
"Updating ingress in cluster %s", cluster.Name)
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeUpdate,
Obj: desiredIngress,
ClusterName: cluster.Name,
Key: key,
})
// TODO: Transfer any readonly (target-proxy, url-map etc) annotations from the master cluster to the federation, if this is the master cluster.
// This is only for consistency, so that the federation ingress metadata matches the underlying clusters. It's not actually required }
@ -887,10 +884,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
return
}
glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations)
err = ic.federatedIngressUpdater.UpdateWithOnError(operations, ic.updateTimeout, func(op util.FederatedOperation, operror error) {
ic.eventRecorder.Eventf(baseIngress, api.EventTypeWarning, "FailedClusterUpdate",
"Ingress update in cluster %s failed: %v", op.ClusterName, operror)
})
err = ic.federatedIngressUpdater.Update(operations, ic.updateTimeout)
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", ingress, err)
ic.deliverIngress(ingress, ic.ingressReviewDelay, true)

View File

@ -156,7 +156,7 @@ func NewNamespaceController(client federationclientset.Interface, dynamicClientP
)
// Federated updater along with Create/Update/Delete operations.
nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer,
nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer, "namespace", nc.eventRecorder,
func(client kubeclientset.Interface, obj runtime.Object) error {
namespace := obj.(*apiv1.Namespace)
_, err := client.Core().Namespaces().Create(namespace)
@ -183,10 +183,9 @@ func NewNamespaceController(client federationclientset.Interface, dynamicClientP
// objNameFunc
func(obj runtime.Object) string {
namespace := obj.(*apiv1.Namespace)
return namespace.Name
return fmt.Sprintf("%s/%s", namespace.Namespace, namespace.Name)
},
nc.updateTimeout,
nc.eventRecorder,
nc.namespaceFederatedInformer,
nc.federatedUpdater,
)
@ -370,26 +369,22 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) {
glog.V(5).Infof("Desired namespace in underlying clusters: %+v", desiredNamespace)
if !found {
nc.eventRecorder.Eventf(baseNamespace, api.EventTypeNormal, "CreateInCluster",
"Creating namespace in cluster %s", cluster.Name)
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeAdd,
Obj: desiredNamespace,
ClusterName: cluster.Name,
Key: namespace,
})
} else {
clusterNamespace := clusterNamespaceObj.(*apiv1.Namespace)
// Update existing namespace, if needed.
if !util.ObjectMetaAndSpecEquivalent(desiredNamespace, clusterNamespace) {
nc.eventRecorder.Eventf(baseNamespace, api.EventTypeNormal, "UpdateInCluster",
"Updating namespace in cluster %s. Desired: %+v\n Actual: %+v\n", cluster.Name, desiredNamespace, clusterNamespace)
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeUpdate,
Obj: desiredNamespace,
ClusterName: cluster.Name,
Key: namespace,
})
}
}
@ -401,10 +396,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) {
}
glog.V(2).Infof("Updating namespace %s in underlying clusters. Operations: %d", baseNamespace.Name, len(operations))
err = nc.federatedUpdater.UpdateWithOnError(operations, nc.updateTimeout, func(op util.FederatedOperation, operror error) {
nc.eventRecorder.Eventf(baseNamespace, api.EventTypeWarning, "UpdateInClusterFailed",
"Namespace update in cluster %s failed: %v", op.ClusterName, operror)
})
err = nc.federatedUpdater.Update(operations, nc.updateTimeout)
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", namespace, err)
nc.deliverNamespace(namespace, 0, true)

View File

@ -196,7 +196,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
)
frsc.replicaSetLister = extensionslisters.NewReplicaSetLister(replicaSetIndexer)
frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer,
frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, "replicaset", frsc.eventRecorder,
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet)
_, err := client.Extensions().ReplicaSets(rs.Namespace).Create(rs)
@ -219,10 +219,9 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
// objNameFunc
func(obj runtime.Object) string {
replicaset := obj.(*extensionsv1.ReplicaSet)
return replicaset.Name
return fmt.Sprintf("%s/%s", replicaset.Namespace, replicaset.Name)
},
updateTimeout,
frsc.eventRecorder,
frsc.fedReplicaSetInformer,
frsc.fedUpdater,
)
@ -543,26 +542,22 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio
if !exists {
if replicas > 0 {
frsc.eventRecorder.Eventf(frs, api.EventTypeNormal, "CreateInCluster",
"Creating replicaset in cluster %s", clusterName)
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeAdd,
Obj: lrs,
ClusterName: clusterName,
Key: key,
})
}
} else {
currentLrs := lrsObj.(*extensionsv1.ReplicaSet)
// Update existing replica set, if needed.
if !fedutil.ObjectMetaAndSpecEquivalent(lrs, currentLrs) {
frsc.eventRecorder.Eventf(frs, api.EventTypeNormal, "UpdateInCluster",
"Updating replicaset in cluster %s", clusterName)
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeUpdate,
Obj: lrs,
ClusterName: clusterName,
Key: key,
})
}
fedStatus.Replicas += currentLrs.Status.Replicas
@ -584,10 +579,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio
// Everything is in order
return statusAllOk, nil
}
err = frsc.fedUpdater.UpdateWithOnError(operations, updateTimeout, func(op fedutil.FederatedOperation, operror error) {
frsc.eventRecorder.Eventf(frs, api.EventTypeWarning, "FailedUpdateInCluster",
"Replicaset update in cluster %s failed: %v", op.ClusterName, operror)
})
err = frsc.fedUpdater.Update(operations, updateTimeout)
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", key, err)
return statusError, err

View File

@ -185,7 +185,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle)
s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer,
s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer, "service", s.eventRecorder,
func(client kubeclientset.Interface, obj pkgruntime.Object) error {
svc := obj.(*v1.Service)
_, err := client.Core().Services(svc.Namespace).Create(svc)
@ -240,10 +240,9 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
// objNameFunc
func(obj pkgruntime.Object) string {
service := obj.(*v1.Service)
return service.Name
return fmt.Sprintf("%s/%s", service.Namespace, service.Name)
},
updateTimeout,
s.eventRecorder,
s.federatedInformer,
s.federatedUpdater,
)
@ -601,11 +600,7 @@ func (s *ServiceController) reconcileService(key string) reconciliationStatus {
}
if len(operations) != 0 {
err = s.federatedUpdater.UpdateWithOnError(operations, s.updateTimeout,
func(op fedutil.FederatedOperation, operror error) {
runtime.HandleError(fmt.Errorf("Service update in cluster %s failed: %v", op.ClusterName, operror))
s.eventRecorder.Eventf(fedService, api.EventTypeWarning, "UpdateInClusterFailed", "Service update in cluster %s failed: %v", op.ClusterName, operror)
})
err = s.federatedUpdater.Update(operations, s.updateTimeout)
if err != nil {
if !errors.IsAlreadyExists(err) {
runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err))
@ -642,12 +637,12 @@ func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Clu
desiredService.ResourceVersion = ""
glog.V(4).Infof("Creating service in underlying cluster %s: %+v", cluster.Name, desiredService)
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "CreateInCluster", "Creating service in cluster %s", cluster.Name)
operation = &fedutil.FederatedOperation{
Type: fedutil.OperationTypeAdd,
Obj: desiredService,
ClusterName: cluster.Name,
Key: key,
}
} else {
clusterService, ok := clusterServiceObj.(*v1.Service)
@ -674,7 +669,6 @@ func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Clu
// Update existing service, if needed.
if !Equivalent(desiredService, clusterService) {
glog.V(4).Infof("Service in underlying cluster %s does not match, Desired: %+v, Existing: %+v", cluster.Name, desiredService, clusterService)
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "UpdateInCluster", "Updating service in cluster %s. Desired: %+v\n Actual: %+v\n", cluster.Name, desiredService, clusterService)
// ResourceVersion of cluster service can be different from federated service,
// so do not update ResourceVersion while updating cluster service
@ -684,6 +678,7 @@ func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Clu
Type: fedutil.OperationTypeUpdate,
Obj: desiredService,
ClusterName: cluster.Name,
Key: key,
}
} else {
glog.V(5).Infof("Service in underlying cluster %s is up to date: %+v", cluster.Name, desiredService)

View File

@ -166,7 +166,7 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
)
// Federated updeater along with Create/Update/Delete operations.
s.updater = util.NewFederatedUpdater(s.informer,
s.updater = util.NewFederatedUpdater(s.informer, adapter.Kind(), s.eventRecorder,
func(client kubeclientset.Interface, obj pkgruntime.Object) error {
_, err := adapter.ClusterCreate(client, obj)
return err
@ -186,10 +186,9 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
s.updateObject,
// objNameFunc
func(obj pkgruntime.Object) string {
return adapter.ObjectMeta(obj).Name
return adapter.NamespacedName(obj).String()
},
s.updateTimeout,
s.eventRecorder,
s.informer,
s.updater,
)
@ -352,25 +351,22 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
desiredObj := s.adapter.Copy(obj)
if !found {
s.eventRecorder.Eventf(obj, api.EventTypeNormal, "CreateInCluster",
"Creating %s in cluster %s", kind, cluster.Name)
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeAdd,
Obj: desiredObj,
ClusterName: cluster.Name,
Key: key,
})
} else {
clusterObj := clusterObj.(pkgruntime.Object)
// Update existing resource, if needed.
if !s.adapter.Equivalent(desiredObj, clusterObj) {
s.eventRecorder.Eventf(obj, api.EventTypeNormal, "UpdateInCluster",
"Updating %s in cluster %s", kind, cluster.Name)
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeUpdate,
Obj: desiredObj,
ClusterName: cluster.Name,
Key: key,
})
}
}
@ -380,11 +376,7 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
// Everything is in order
return
}
err = s.updater.UpdateWithOnError(operations, s.updateTimeout,
func(op util.FederatedOperation, operror error) {
s.eventRecorder.Eventf(obj, api.EventTypeWarning, "UpdateInClusterFailed",
"%s update in cluster %s failed: %v", strings.ToTitle(kind), op.ClusterName, operror)
})
err = s.updater.Update(operations, s.updateTimeout)
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", key, err)

View File

@ -43,6 +43,7 @@ go_library(
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)

View File

@ -14,12 +14,10 @@ go_library(
deps = [
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/finalizers:go_default_library",
"//pkg/api:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)

View File

@ -27,10 +27,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
finalizersutil "k8s.io/kubernetes/federation/pkg/federation-controller/util/finalizers"
"k8s.io/kubernetes/pkg/api"
"github.com/golang/glog"
)
@ -53,20 +51,17 @@ type DeletionHelper struct {
updateObjFunc UpdateObjFunc
objNameFunc ObjNameFunc
updateTimeout time.Duration
eventRecorder record.EventRecorder
informer util.FederatedInformer
updater util.FederatedUpdater
}
func NewDeletionHelper(
updateObjFunc UpdateObjFunc, objNameFunc ObjNameFunc,
updateTimeout time.Duration, eventRecorder record.EventRecorder,
updateObjFunc UpdateObjFunc, objNameFunc ObjNameFunc, updateTimeout time.Duration,
informer util.FederatedInformer, updater util.FederatedUpdater) *DeletionHelper {
return &DeletionHelper{
updateObjFunc: updateObjFunc,
objNameFunc: objNameFunc,
updateTimeout: updateTimeout,
eventRecorder: eventRecorder,
informer: informer,
updater: updater,
}
@ -157,13 +152,10 @@ func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) (
Type: util.OperationTypeDelete,
ClusterName: clusterNsObj.ClusterName,
Obj: clusterNsObj.Object.(runtime.Object),
Key: objName,
})
}
err = dh.updater.UpdateWithOnError(operations, dh.updateTimeout, func(op util.FederatedOperation, operror error) {
objName := dh.objNameFunc(op.Obj)
dh.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteInClusterFailed",
"Failed to delete obj %s in cluster %s: %v", objName, op.ClusterName, operror)
})
err = dh.updater.Update(operations, dh.updateTimeout)
if err != nil {
return nil, fmt.Errorf("failed to execute updates for obj %s: %v", objName, err)
}

View File

@ -18,10 +18,14 @@ package util
import (
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)
@ -39,6 +43,7 @@ type FederatedOperation struct {
Type FederatedOperationType
ClusterName string
Obj pkgruntime.Object
Key string
}
// A helper that executes the given set of updates on federation, in parallel.
@ -48,8 +53,6 @@ type FederatedUpdater interface {
// stopped when it is reached. However the function will return after the timeout
// with a non-nil error.
Update([]FederatedOperation, time.Duration) error
UpdateWithOnError([]FederatedOperation, time.Duration, func(FederatedOperation, error)) error
}
// A function that executes some operation using the passed client and object.
@ -58,25 +61,32 @@ type FederatedOperationHandler func(kubeclientset.Interface, pkgruntime.Object)
type federatedUpdaterImpl struct {
federation FederationView
kind string
eventRecorder record.EventRecorder
addFunction FederatedOperationHandler
updateFunction FederatedOperationHandler
deleteFunction FederatedOperationHandler
}
func NewFederatedUpdater(federation FederationView, add, update, del FederatedOperationHandler) FederatedUpdater {
func NewFederatedUpdater(federation FederationView, kind string, recorder record.EventRecorder, add, update, del FederatedOperationHandler) FederatedUpdater {
return &federatedUpdaterImpl{
federation: federation,
kind: kind,
eventRecorder: recorder,
addFunction: add,
updateFunction: update,
deleteFunction: del,
}
}
func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Duration) error {
return fu.UpdateWithOnError(ops, timeout, nil)
func (fu *federatedUpdaterImpl) recordEvent(obj runtime.Object, eventType, eventVerb string, args ...interface{}) {
messageFmt := eventVerb + " %s %q in cluster %s"
fu.eventRecorder.Eventf(obj, api.EventTypeNormal, eventType, messageFmt, args...)
}
func (fu *federatedUpdaterImpl) UpdateWithOnError(ops []FederatedOperation, timeout time.Duration, onError func(FederatedOperation, error)) error {
func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Duration) error {
done := make(chan error, len(ops))
for _, op := range ops {
go func(op FederatedOperation) {
@ -89,21 +99,37 @@ func (fu *federatedUpdaterImpl) UpdateWithOnError(ops []FederatedOperation, time
return
}
eventArgs := []interface{}{fu.kind, op.Key, clusterName}
baseEventType := fmt.Sprintf("%s", op.Type)
eventType := fmt.Sprintf("%sInCluster", strings.Title(baseEventType))
switch op.Type {
case OperationTypeAdd:
// TODO s+OperationTypeAdd+OperationTypeCreate+
baseEventType = "create"
eventType := "CreateInCluster"
fu.recordEvent(op.Obj, eventType, "Creating", eventArgs...)
err = fu.addFunction(clientset, op.Obj)
case OperationTypeUpdate:
fu.recordEvent(op.Obj, eventType, "Updating", eventArgs...)
err = fu.updateFunction(clientset, op.Obj)
case OperationTypeDelete:
fu.recordEvent(op.Obj, eventType, "Deleting", eventArgs...)
err = fu.deleteFunction(clientset, op.Obj)
// IsNotFound error is fine since that means the object is deleted already.
if errors.IsNotFound(err) {
if err != nil && !errors.IsNotFound(err) {
err = nil
}
}
if err != nil && onError != nil {
onError(op, err)
if err != nil {
eventType := eventType + "Failed"
messageFmt := "Failed to " + baseEventType + " %s %q in cluster %s: %v"
eventArgs = append(eventArgs, err)
fu.eventRecorder.Eventf(op.Obj, api.EventTypeWarning, eventType, messageFmt, eventArgs...)
}
done <- err
}(op)
}

View File

@ -58,11 +58,19 @@ func (f *fakeFederationView) ClustersSynced() bool {
return true
}
type fakeEventRecorder struct{}
func (f *fakeEventRecorder) Event(object pkgruntime.Object, eventtype, reason, message string) {}
func (f *fakeEventRecorder) Eventf(object pkgruntime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
}
func (f *fakeEventRecorder) PastEventf(object pkgruntime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
}
func TestFederatedUpdaterOK(t *testing.T) {
addChan := make(chan string, 5)
updateChan := make(chan string, 5)
updater := NewFederatedUpdater(&fakeFederationView{},
updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{},
func(_ kubeclientset.Interface, obj pkgruntime.Object) error {
service := obj.(*apiv1.Service)
addChan <- service.Name
@ -93,7 +101,7 @@ func TestFederatedUpdaterOK(t *testing.T) {
}
func TestFederatedUpdaterError(t *testing.T) {
updater := NewFederatedUpdater(&fakeFederationView{},
updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{},
func(_ kubeclientset.Interface, obj pkgruntime.Object) error {
return fmt.Errorf("boom")
}, noop, noop)
@ -113,7 +121,7 @@ func TestFederatedUpdaterError(t *testing.T) {
func TestFederatedUpdaterTimeout(t *testing.T) {
start := time.Now()
updater := NewFederatedUpdater(&fakeFederationView{},
updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{},
func(_ kubeclientset.Interface, obj pkgruntime.Object) error {
time.Sleep(time.Minute)
return nil