From 3f2dab896c6be15044ff5797ae62674f95e36611 Mon Sep 17 00:00:00 2001 From: Maru Newby Date: Thu, 4 May 2017 13:37:33 -0700 Subject: [PATCH] fed: Provide updater timeout to instance rather than to Update() --- .../deployment/deploymentcontroller.go | 5 ++-- .../ingress/ingress_controller.go | 9 +++---- .../namespace/namespace_controller.go | 5 ++-- .../replicaset/replicasetcontroller.go | 5 ++-- .../service/servicecontroller.go | 5 ++-- .../federation-controller/sync/controller.go | 5 ++-- .../util/deletionhelper/deletion_helper.go | 7 ++--- .../util/federated_updater.go | 26 +++++++++++-------- .../util/federated_updater_test.go | 12 ++++----- 9 files changed, 37 insertions(+), 42 deletions(-) diff --git a/federation/pkg/federation-controller/deployment/deploymentcontroller.go b/federation/pkg/federation-controller/deployment/deploymentcontroller.go index 16122e4f261..64e50831347 100644 --- a/federation/pkg/federation-controller/deployment/deploymentcontroller.go +++ b/federation/pkg/federation-controller/deployment/deploymentcontroller.go @@ -188,7 +188,7 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen ), ) - fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer, "deployment", fdc.eventRecorder, + fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer, "deployment", updateTimeout, fdc.eventRecorder, func(client kubeclientset.Interface, obj runtime.Object) error { rs := obj.(*extensionsv1.Deployment) _, err := client.Extensions().Deployments(rs.Namespace).Create(rs) @@ -213,7 +213,6 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen deployment := obj.(*extensionsv1.Deployment) return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name) }, - updateTimeout, fdc.fedDeploymentInformer, fdc.fedUpdater, ) @@ -567,7 +566,7 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation // Everything is in order return statusAllOk, nil } - err = fdc.fedUpdater.Update(operations, updateTimeout) + err = fdc.fedUpdater.Update(operations) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", key, err) return statusError, err diff --git a/federation/pkg/federation-controller/ingress/ingress_controller.go b/federation/pkg/federation-controller/ingress/ingress_controller.go index ab2fee10935..6cccbae0808 100644 --- a/federation/pkg/federation-controller/ingress/ingress_controller.go +++ b/federation/pkg/federation-controller/ingress/ingress_controller.go @@ -230,7 +230,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll ) // Federated ingress updater along with Create/Update/Delete operations. - ic.federatedIngressUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer, "ingress", ic.eventRecorder, + ic.federatedIngressUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer, "ingress", ic.updateTimeout, ic.eventRecorder, func(client kubeclientset.Interface, obj pkgruntime.Object) error { ingress := obj.(*extensionsv1beta1.Ingress) glog.V(4).Infof("Attempting to create Ingress: %v", ingress) @@ -262,7 +262,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll }) // Federated configmap updater along with Create/Update/Delete operations. Only Update should ever be called. - ic.federatedConfigMapUpdater = util.NewFederatedUpdater(ic.configMapFederatedInformer, "configmap", ic.eventRecorder, + ic.federatedConfigMapUpdater = util.NewFederatedUpdater(ic.configMapFederatedInformer, "configmap", ic.updateTimeout, ic.eventRecorder, func(client kubeclientset.Interface, obj pkgruntime.Object) error { configMap := obj.(*v1.ConfigMap) configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace} @@ -297,7 +297,6 @@ func NewIngressController(client federationclientset.Interface) *IngressControll ingress := obj.(*extensionsv1beta1.Ingress) return fmt.Sprintf("%s/%s", ingress.Namespace, ingress.Name) }, - ic.updateTimeout, ic.ingressFederatedInformer, ic.federatedIngressUpdater, ) @@ -569,7 +568,7 @@ func (ic *IngressController) reconcileConfigMap(cluster *federationapi.Cluster, Key: configMapNsName.String(), }} glog.V(4).Infof("Calling federatedConfigMapUpdater.Update() - operations: %v", operations) - err := ic.federatedConfigMapUpdater.Update(operations, ic.updateTimeout) + err := ic.federatedConfigMapUpdater.Update(operations) if err != nil { glog.Errorf("Failed to execute update of ConfigMap %q on cluster %q: %v", configMapNsName, cluster.Name, err) ic.configMapDeliverer.DeliverAfter(cluster.Name, nil, ic.configMapReviewDelay) @@ -885,7 +884,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { return } glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations) - err = ic.federatedIngressUpdater.Update(operations, ic.updateTimeout) + err = ic.federatedIngressUpdater.Update(operations) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", ingress, err) ic.deliverIngress(ingress, ic.ingressReviewDelay, true) diff --git a/federation/pkg/federation-controller/namespace/namespace_controller.go b/federation/pkg/federation-controller/namespace/namespace_controller.go index b510f9c9ef0..0005fb8be82 100644 --- a/federation/pkg/federation-controller/namespace/namespace_controller.go +++ b/federation/pkg/federation-controller/namespace/namespace_controller.go @@ -157,7 +157,7 @@ func NewNamespaceController(client federationclientset.Interface, dynamicClientP ) // Federated updater along with Create/Update/Delete operations. - nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer, "namespace", nc.eventRecorder, + nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer, "namespace", nc.updateTimeout, nc.eventRecorder, func(client kubeclientset.Interface, obj runtime.Object) error { namespace := obj.(*apiv1.Namespace) _, err := client.Core().Namespaces().Create(namespace) @@ -186,7 +186,6 @@ func NewNamespaceController(client federationclientset.Interface, dynamicClientP namespace := obj.(*apiv1.Namespace) return fmt.Sprintf("%s/%s", namespace.Namespace, namespace.Name) }, - nc.updateTimeout, nc.namespaceFederatedInformer, nc.federatedUpdater, ) @@ -397,7 +396,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) { } glog.V(2).Infof("Updating namespace %s in underlying clusters. Operations: %d", baseNamespace.Name, len(operations)) - err = nc.federatedUpdater.Update(operations, nc.updateTimeout) + err = nc.federatedUpdater.Update(operations) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", namespace, err) nc.deliverNamespace(namespace, 0, true) diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go index 3e061873b56..63a8dc6203a 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -196,7 +196,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe ) frsc.replicaSetLister = extensionslisters.NewReplicaSetLister(replicaSetIndexer) - frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, "replicaset", frsc.eventRecorder, + frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, "replicaset", updateTimeout, frsc.eventRecorder, func(client kubeclientset.Interface, obj runtime.Object) error { rs := obj.(*extensionsv1.ReplicaSet) _, err := client.Extensions().ReplicaSets(rs.Namespace).Create(rs) @@ -221,7 +221,6 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe replicaset := obj.(*extensionsv1.ReplicaSet) return fmt.Sprintf("%s/%s", replicaset.Namespace, replicaset.Name) }, - updateTimeout, frsc.fedReplicaSetInformer, frsc.fedUpdater, ) @@ -579,7 +578,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio // Everything is in order return statusAllOk, nil } - err = frsc.fedUpdater.Update(operations, updateTimeout) + err = frsc.fedUpdater.Update(operations) if err != nil { glog.Errorf("Failed to execute updates for %s: %v", key, err) return statusError, err diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index f04abeef583..a73b6b5247a 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -185,7 +185,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle) - s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer, "service", s.eventRecorder, + s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer, "service", updateTimeout, s.eventRecorder, func(client kubeclientset.Interface, obj pkgruntime.Object) error { svc := obj.(*v1.Service) _, err := client.Core().Services(svc.Namespace).Create(svc) @@ -242,7 +242,6 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, service := obj.(*v1.Service) return fmt.Sprintf("%s/%s", service.Namespace, service.Name) }, - updateTimeout, s.federatedInformer, s.federatedUpdater, ) @@ -600,7 +599,7 @@ func (s *ServiceController) reconcileService(key string) reconciliationStatus { } if len(operations) != 0 { - err = s.federatedUpdater.Update(operations, s.updateTimeout) + err = s.federatedUpdater.Update(operations) if err != nil { if !errors.IsAlreadyExists(err) { runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err)) diff --git a/federation/pkg/federation-controller/sync/controller.go b/federation/pkg/federation-controller/sync/controller.go index 91833ae9ddc..651bd6c5259 100644 --- a/federation/pkg/federation-controller/sync/controller.go +++ b/federation/pkg/federation-controller/sync/controller.go @@ -172,7 +172,7 @@ func newFederationSyncController(client federationclientset.Interface, adapter f ) // Federated updeater along with Create/Update/Delete operations. - s.updater = util.NewFederatedUpdater(s.informer, adapter.Kind(), s.eventRecorder, + s.updater = util.NewFederatedUpdater(s.informer, adapter.Kind(), s.updateTimeout, s.eventRecorder, func(client kubeclientset.Interface, obj pkgruntime.Object) error { _, err := adapter.ClusterCreate(client, obj) return err @@ -194,7 +194,6 @@ func newFederationSyncController(client federationclientset.Interface, adapter f func(obj pkgruntime.Object) string { return adapter.NamespacedName(obj).String() }, - s.updateTimeout, s.informer, s.updater, ) @@ -414,7 +413,7 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName return statusAllOK } - err = s.updater.Update(operations, s.updateTimeout) + err = s.updater.Update(operations) if err != nil { glog.Errorf("failed to execute updates for %s: %v", key, err) return statusError diff --git a/federation/pkg/federation-controller/util/deletionhelper/deletion_helper.go b/federation/pkg/federation-controller/util/deletionhelper/deletion_helper.go index 4171b8f6985..d18570a9821 100644 --- a/federation/pkg/federation-controller/util/deletionhelper/deletion_helper.go +++ b/federation/pkg/federation-controller/util/deletionhelper/deletion_helper.go @@ -22,7 +22,6 @@ package deletionhelper import ( "fmt" "strings" - "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -50,18 +49,16 @@ type ObjNameFunc func(runtime.Object) string type DeletionHelper struct { updateObjFunc UpdateObjFunc objNameFunc ObjNameFunc - updateTimeout time.Duration informer util.FederatedInformer updater util.FederatedUpdater } func NewDeletionHelper( - updateObjFunc UpdateObjFunc, objNameFunc ObjNameFunc, updateTimeout time.Duration, + updateObjFunc UpdateObjFunc, objNameFunc ObjNameFunc, informer util.FederatedInformer, updater util.FederatedUpdater) *DeletionHelper { return &DeletionHelper{ updateObjFunc: updateObjFunc, objNameFunc: objNameFunc, - updateTimeout: updateTimeout, informer: informer, updater: updater, } @@ -155,7 +152,7 @@ func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) ( Key: objName, }) } - err = dh.updater.Update(operations, dh.updateTimeout) + err = dh.updater.Update(operations) if err != nil { return nil, fmt.Errorf("failed to execute updates for obj %s: %v", objName, err) } diff --git a/federation/pkg/federation-controller/util/federated_updater.go b/federation/pkg/federation-controller/util/federated_updater.go index c26a93832b8..244fa4c554b 100644 --- a/federation/pkg/federation-controller/util/federated_updater.go +++ b/federation/pkg/federation-controller/util/federated_updater.go @@ -48,11 +48,8 @@ type FederatedOperation struct { // A helper that executes the given set of updates on federation, in parallel. type FederatedUpdater interface { - // Executes the given set of operations within the specified timeout. - // Timeout is best-effort. There is no guarantee that the underlying operations are - // stopped when it is reached. However the function will return after the timeout - // with a non-nil error. - Update([]FederatedOperation, time.Duration) error + // Executes the given set of operations. + Update([]FederatedOperation) error } // A function that executes some operation using the passed client and object. @@ -63,6 +60,8 @@ type federatedUpdaterImpl struct { kind string + timeout time.Duration + eventRecorder record.EventRecorder addFunction FederatedOperationHandler @@ -70,10 +69,11 @@ type federatedUpdaterImpl struct { deleteFunction FederatedOperationHandler } -func NewFederatedUpdater(federation FederationView, kind string, recorder record.EventRecorder, add, update, del FederatedOperationHandler) FederatedUpdater { +func NewFederatedUpdater(federation FederationView, kind string, timeout time.Duration, recorder record.EventRecorder, add, update, del FederatedOperationHandler) FederatedUpdater { return &federatedUpdaterImpl{ federation: federation, kind: kind, + timeout: timeout, eventRecorder: recorder, addFunction: add, updateFunction: update, @@ -86,7 +86,11 @@ func (fu *federatedUpdaterImpl) recordEvent(obj runtime.Object, eventType, event fu.eventRecorder.Eventf(obj, api.EventTypeNormal, eventType, messageFmt, args...) } -func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Duration) error { +// Update executes the given set of operations within the timeout specified for +// the instance. Timeout is best-effort. There is no guarantee that the +// underlying operations are stopped when it is reached. However the function +// will return after the timeout with a non-nil error. +func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation) error { done := make(chan error, len(ops)) for _, op := range ops { go func(op FederatedOperation) { @@ -136,16 +140,16 @@ func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Du start := time.Now() for i := 0; i < len(ops); i++ { now := time.Now() - if !now.Before(start.Add(timeout)) { - return fmt.Errorf("failed to finish all operations in %v", timeout) + if !now.Before(start.Add(fu.timeout)) { + return fmt.Errorf("failed to finish all operations in %v", fu.timeout) } select { case err := <-done: if err != nil { return err } - case <-time.After(start.Add(timeout).Sub(now)): - return fmt.Errorf("failed to finish all operations in %v", timeout) + case <-time.After(start.Add(fu.timeout).Sub(now)): + return fmt.Errorf("failed to finish all operations in %v", fu.timeout) } } // All operations finished in time. diff --git a/federation/pkg/federation-controller/util/federated_updater_test.go b/federation/pkg/federation-controller/util/federated_updater_test.go index 88efd99a569..76c686f9dfb 100644 --- a/federation/pkg/federation-controller/util/federated_updater_test.go +++ b/federation/pkg/federation-controller/util/federated_updater_test.go @@ -70,7 +70,7 @@ func TestFederatedUpdaterOK(t *testing.T) { addChan := make(chan string, 5) updateChan := make(chan string, 5) - updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{}, + updater := NewFederatedUpdater(&fakeFederationView{}, "foo", time.Minute, &fakeEventRecorder{}, func(_ kubeclientset.Interface, obj pkgruntime.Object) error { service := obj.(*apiv1.Service) addChan <- service.Name @@ -92,7 +92,7 @@ func TestFederatedUpdaterOK(t *testing.T) { Type: OperationTypeUpdate, Obj: makeService("B", "s2"), }, - }, time.Minute) + }) assert.NoError(t, err) add := <-addChan update := <-updateChan @@ -101,7 +101,7 @@ func TestFederatedUpdaterOK(t *testing.T) { } func TestFederatedUpdaterError(t *testing.T) { - updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{}, + updater := NewFederatedUpdater(&fakeFederationView{}, "foo", time.Minute, &fakeEventRecorder{}, func(_ kubeclientset.Interface, obj pkgruntime.Object) error { return fmt.Errorf("boom") }, noop, noop) @@ -115,13 +115,13 @@ func TestFederatedUpdaterError(t *testing.T) { Type: OperationTypeUpdate, Obj: makeService("B", "s1"), }, - }, time.Minute) + }) assert.Error(t, err) } func TestFederatedUpdaterTimeout(t *testing.T) { start := time.Now() - updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{}, + updater := NewFederatedUpdater(&fakeFederationView{}, "foo", time.Second, &fakeEventRecorder{}, func(_ kubeclientset.Interface, obj pkgruntime.Object) error { time.Sleep(time.Minute) return nil @@ -137,7 +137,7 @@ func TestFederatedUpdaterTimeout(t *testing.T) { Type: OperationTypeUpdate, Obj: makeService("B", "s1"), }, - }, time.Second) + }) end := time.Now() assert.Error(t, err) assert.True(t, start.Add(10*time.Second).After(end))