fed: Provide updater timeout to instance rather than to Update()

This commit is contained in:
Maru Newby 2017-05-04 13:37:33 -07:00
parent 3fbfafdd0a
commit 3f2dab896c
9 changed files with 37 additions and 42 deletions

View File

@ -188,7 +188,7 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen
), ),
) )
fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer, "deployment", fdc.eventRecorder, fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer, "deployment", updateTimeout, fdc.eventRecorder,
func(client kubeclientset.Interface, obj runtime.Object) error { func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.Deployment) rs := obj.(*extensionsv1.Deployment)
_, err := client.Extensions().Deployments(rs.Namespace).Create(rs) _, err := client.Extensions().Deployments(rs.Namespace).Create(rs)
@ -213,7 +213,6 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen
deployment := obj.(*extensionsv1.Deployment) deployment := obj.(*extensionsv1.Deployment)
return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name) return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name)
}, },
updateTimeout,
fdc.fedDeploymentInformer, fdc.fedDeploymentInformer,
fdc.fedUpdater, fdc.fedUpdater,
) )
@ -567,7 +566,7 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation
// Everything is in order // Everything is in order
return statusAllOk, nil return statusAllOk, nil
} }
err = fdc.fedUpdater.Update(operations, updateTimeout) err = fdc.fedUpdater.Update(operations)
if err != nil { if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", key, err) glog.Errorf("Failed to execute updates for %s: %v", key, err)
return statusError, err return statusError, err

View File

@ -230,7 +230,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll
) )
// Federated ingress updater along with Create/Update/Delete operations. // Federated ingress updater along with Create/Update/Delete operations.
ic.federatedIngressUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer, "ingress", ic.eventRecorder, ic.federatedIngressUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer, "ingress", ic.updateTimeout, ic.eventRecorder,
func(client kubeclientset.Interface, obj pkgruntime.Object) error { func(client kubeclientset.Interface, obj pkgruntime.Object) error {
ingress := obj.(*extensionsv1beta1.Ingress) ingress := obj.(*extensionsv1beta1.Ingress)
glog.V(4).Infof("Attempting to create Ingress: %v", ingress) glog.V(4).Infof("Attempting to create Ingress: %v", ingress)
@ -262,7 +262,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll
}) })
// Federated configmap updater along with Create/Update/Delete operations. Only Update should ever be called. // Federated configmap updater along with Create/Update/Delete operations. Only Update should ever be called.
ic.federatedConfigMapUpdater = util.NewFederatedUpdater(ic.configMapFederatedInformer, "configmap", ic.eventRecorder, ic.federatedConfigMapUpdater = util.NewFederatedUpdater(ic.configMapFederatedInformer, "configmap", ic.updateTimeout, ic.eventRecorder,
func(client kubeclientset.Interface, obj pkgruntime.Object) error { func(client kubeclientset.Interface, obj pkgruntime.Object) error {
configMap := obj.(*v1.ConfigMap) configMap := obj.(*v1.ConfigMap)
configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace} configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
@ -297,7 +297,6 @@ func NewIngressController(client federationclientset.Interface) *IngressControll
ingress := obj.(*extensionsv1beta1.Ingress) ingress := obj.(*extensionsv1beta1.Ingress)
return fmt.Sprintf("%s/%s", ingress.Namespace, ingress.Name) return fmt.Sprintf("%s/%s", ingress.Namespace, ingress.Name)
}, },
ic.updateTimeout,
ic.ingressFederatedInformer, ic.ingressFederatedInformer,
ic.federatedIngressUpdater, ic.federatedIngressUpdater,
) )
@ -569,7 +568,7 @@ func (ic *IngressController) reconcileConfigMap(cluster *federationapi.Cluster,
Key: configMapNsName.String(), Key: configMapNsName.String(),
}} }}
glog.V(4).Infof("Calling federatedConfigMapUpdater.Update() - operations: %v", operations) glog.V(4).Infof("Calling federatedConfigMapUpdater.Update() - operations: %v", operations)
err := ic.federatedConfigMapUpdater.Update(operations, ic.updateTimeout) err := ic.federatedConfigMapUpdater.Update(operations)
if err != nil { if err != nil {
glog.Errorf("Failed to execute update of ConfigMap %q on cluster %q: %v", configMapNsName, cluster.Name, err) glog.Errorf("Failed to execute update of ConfigMap %q on cluster %q: %v", configMapNsName, cluster.Name, err)
ic.configMapDeliverer.DeliverAfter(cluster.Name, nil, ic.configMapReviewDelay) ic.configMapDeliverer.DeliverAfter(cluster.Name, nil, ic.configMapReviewDelay)
@ -885,7 +884,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
return return
} }
glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations) glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations)
err = ic.federatedIngressUpdater.Update(operations, ic.updateTimeout) err = ic.federatedIngressUpdater.Update(operations)
if err != nil { if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", ingress, err) glog.Errorf("Failed to execute updates for %s: %v", ingress, err)
ic.deliverIngress(ingress, ic.ingressReviewDelay, true) ic.deliverIngress(ingress, ic.ingressReviewDelay, true)

View File

@ -157,7 +157,7 @@ func NewNamespaceController(client federationclientset.Interface, dynamicClientP
) )
// Federated updater along with Create/Update/Delete operations. // Federated updater along with Create/Update/Delete operations.
nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer, "namespace", nc.eventRecorder, nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer, "namespace", nc.updateTimeout, nc.eventRecorder,
func(client kubeclientset.Interface, obj runtime.Object) error { func(client kubeclientset.Interface, obj runtime.Object) error {
namespace := obj.(*apiv1.Namespace) namespace := obj.(*apiv1.Namespace)
_, err := client.Core().Namespaces().Create(namespace) _, err := client.Core().Namespaces().Create(namespace)
@ -186,7 +186,6 @@ func NewNamespaceController(client federationclientset.Interface, dynamicClientP
namespace := obj.(*apiv1.Namespace) namespace := obj.(*apiv1.Namespace)
return fmt.Sprintf("%s/%s", namespace.Namespace, namespace.Name) return fmt.Sprintf("%s/%s", namespace.Namespace, namespace.Name)
}, },
nc.updateTimeout,
nc.namespaceFederatedInformer, nc.namespaceFederatedInformer,
nc.federatedUpdater, nc.federatedUpdater,
) )
@ -397,7 +396,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) {
} }
glog.V(2).Infof("Updating namespace %s in underlying clusters. Operations: %d", baseNamespace.Name, len(operations)) glog.V(2).Infof("Updating namespace %s in underlying clusters. Operations: %d", baseNamespace.Name, len(operations))
err = nc.federatedUpdater.Update(operations, nc.updateTimeout) err = nc.federatedUpdater.Update(operations)
if err != nil { if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", namespace, err) glog.Errorf("Failed to execute updates for %s: %v", namespace, err)
nc.deliverNamespace(namespace, 0, true) nc.deliverNamespace(namespace, 0, true)

View File

@ -196,7 +196,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
) )
frsc.replicaSetLister = extensionslisters.NewReplicaSetLister(replicaSetIndexer) frsc.replicaSetLister = extensionslisters.NewReplicaSetLister(replicaSetIndexer)
frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, "replicaset", frsc.eventRecorder, frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, "replicaset", updateTimeout, frsc.eventRecorder,
func(client kubeclientset.Interface, obj runtime.Object) error { func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet) rs := obj.(*extensionsv1.ReplicaSet)
_, err := client.Extensions().ReplicaSets(rs.Namespace).Create(rs) _, err := client.Extensions().ReplicaSets(rs.Namespace).Create(rs)
@ -221,7 +221,6 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
replicaset := obj.(*extensionsv1.ReplicaSet) replicaset := obj.(*extensionsv1.ReplicaSet)
return fmt.Sprintf("%s/%s", replicaset.Namespace, replicaset.Name) return fmt.Sprintf("%s/%s", replicaset.Namespace, replicaset.Name)
}, },
updateTimeout,
frsc.fedReplicaSetInformer, frsc.fedReplicaSetInformer,
frsc.fedUpdater, frsc.fedUpdater,
) )
@ -579,7 +578,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio
// Everything is in order // Everything is in order
return statusAllOk, nil return statusAllOk, nil
} }
err = frsc.fedUpdater.Update(operations, updateTimeout) err = frsc.fedUpdater.Update(operations)
if err != nil { if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", key, err) glog.Errorf("Failed to execute updates for %s: %v", key, err)
return statusError, err return statusError, err

View File

@ -185,7 +185,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle) s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle)
s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer, "service", s.eventRecorder, s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer, "service", updateTimeout, s.eventRecorder,
func(client kubeclientset.Interface, obj pkgruntime.Object) error { func(client kubeclientset.Interface, obj pkgruntime.Object) error {
svc := obj.(*v1.Service) svc := obj.(*v1.Service)
_, err := client.Core().Services(svc.Namespace).Create(svc) _, err := client.Core().Services(svc.Namespace).Create(svc)
@ -242,7 +242,6 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
service := obj.(*v1.Service) service := obj.(*v1.Service)
return fmt.Sprintf("%s/%s", service.Namespace, service.Name) return fmt.Sprintf("%s/%s", service.Namespace, service.Name)
}, },
updateTimeout,
s.federatedInformer, s.federatedInformer,
s.federatedUpdater, s.federatedUpdater,
) )
@ -600,7 +599,7 @@ func (s *ServiceController) reconcileService(key string) reconciliationStatus {
} }
if len(operations) != 0 { if len(operations) != 0 {
err = s.federatedUpdater.Update(operations, s.updateTimeout) err = s.federatedUpdater.Update(operations)
if err != nil { if err != nil {
if !errors.IsAlreadyExists(err) { if !errors.IsAlreadyExists(err) {
runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err)) runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err))

View File

@ -172,7 +172,7 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
) )
// Federated updeater along with Create/Update/Delete operations. // Federated updeater along with Create/Update/Delete operations.
s.updater = util.NewFederatedUpdater(s.informer, adapter.Kind(), s.eventRecorder, s.updater = util.NewFederatedUpdater(s.informer, adapter.Kind(), s.updateTimeout, s.eventRecorder,
func(client kubeclientset.Interface, obj pkgruntime.Object) error { func(client kubeclientset.Interface, obj pkgruntime.Object) error {
_, err := adapter.ClusterCreate(client, obj) _, err := adapter.ClusterCreate(client, obj)
return err return err
@ -194,7 +194,6 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
func(obj pkgruntime.Object) string { func(obj pkgruntime.Object) string {
return adapter.NamespacedName(obj).String() return adapter.NamespacedName(obj).String()
}, },
s.updateTimeout,
s.informer, s.informer,
s.updater, s.updater,
) )
@ -414,7 +413,7 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
return statusAllOK return statusAllOK
} }
err = s.updater.Update(operations, s.updateTimeout) err = s.updater.Update(operations)
if err != nil { if err != nil {
glog.Errorf("failed to execute updates for %s: %v", key, err) glog.Errorf("failed to execute updates for %s: %v", key, err)
return statusError return statusError

View File

@ -22,7 +22,6 @@ package deletionhelper
import ( import (
"fmt" "fmt"
"strings" "strings"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@ -50,18 +49,16 @@ type ObjNameFunc func(runtime.Object) string
type DeletionHelper struct { type DeletionHelper struct {
updateObjFunc UpdateObjFunc updateObjFunc UpdateObjFunc
objNameFunc ObjNameFunc objNameFunc ObjNameFunc
updateTimeout time.Duration
informer util.FederatedInformer informer util.FederatedInformer
updater util.FederatedUpdater updater util.FederatedUpdater
} }
func NewDeletionHelper( func NewDeletionHelper(
updateObjFunc UpdateObjFunc, objNameFunc ObjNameFunc, updateTimeout time.Duration, updateObjFunc UpdateObjFunc, objNameFunc ObjNameFunc,
informer util.FederatedInformer, updater util.FederatedUpdater) *DeletionHelper { informer util.FederatedInformer, updater util.FederatedUpdater) *DeletionHelper {
return &DeletionHelper{ return &DeletionHelper{
updateObjFunc: updateObjFunc, updateObjFunc: updateObjFunc,
objNameFunc: objNameFunc, objNameFunc: objNameFunc,
updateTimeout: updateTimeout,
informer: informer, informer: informer,
updater: updater, updater: updater,
} }
@ -155,7 +152,7 @@ func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) (
Key: objName, Key: objName,
}) })
} }
err = dh.updater.Update(operations, dh.updateTimeout) err = dh.updater.Update(operations)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to execute updates for obj %s: %v", objName, err) return nil, fmt.Errorf("failed to execute updates for obj %s: %v", objName, err)
} }

View File

@ -48,11 +48,8 @@ type FederatedOperation struct {
// A helper that executes the given set of updates on federation, in parallel. // A helper that executes the given set of updates on federation, in parallel.
type FederatedUpdater interface { type FederatedUpdater interface {
// Executes the given set of operations within the specified timeout. // Executes the given set of operations.
// Timeout is best-effort. There is no guarantee that the underlying operations are Update([]FederatedOperation) error
// stopped when it is reached. However the function will return after the timeout
// with a non-nil error.
Update([]FederatedOperation, time.Duration) error
} }
// A function that executes some operation using the passed client and object. // A function that executes some operation using the passed client and object.
@ -63,6 +60,8 @@ type federatedUpdaterImpl struct {
kind string kind string
timeout time.Duration
eventRecorder record.EventRecorder eventRecorder record.EventRecorder
addFunction FederatedOperationHandler addFunction FederatedOperationHandler
@ -70,10 +69,11 @@ type federatedUpdaterImpl struct {
deleteFunction FederatedOperationHandler deleteFunction FederatedOperationHandler
} }
func NewFederatedUpdater(federation FederationView, kind string, recorder record.EventRecorder, add, update, del FederatedOperationHandler) FederatedUpdater { func NewFederatedUpdater(federation FederationView, kind string, timeout time.Duration, recorder record.EventRecorder, add, update, del FederatedOperationHandler) FederatedUpdater {
return &federatedUpdaterImpl{ return &federatedUpdaterImpl{
federation: federation, federation: federation,
kind: kind, kind: kind,
timeout: timeout,
eventRecorder: recorder, eventRecorder: recorder,
addFunction: add, addFunction: add,
updateFunction: update, updateFunction: update,
@ -86,7 +86,11 @@ func (fu *federatedUpdaterImpl) recordEvent(obj runtime.Object, eventType, event
fu.eventRecorder.Eventf(obj, api.EventTypeNormal, eventType, messageFmt, args...) fu.eventRecorder.Eventf(obj, api.EventTypeNormal, eventType, messageFmt, args...)
} }
func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Duration) error { // Update executes the given set of operations within the timeout specified for
// the instance. Timeout is best-effort. There is no guarantee that the
// underlying operations are stopped when it is reached. However the function
// will return after the timeout with a non-nil error.
func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation) error {
done := make(chan error, len(ops)) done := make(chan error, len(ops))
for _, op := range ops { for _, op := range ops {
go func(op FederatedOperation) { go func(op FederatedOperation) {
@ -136,16 +140,16 @@ func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Du
start := time.Now() start := time.Now()
for i := 0; i < len(ops); i++ { for i := 0; i < len(ops); i++ {
now := time.Now() now := time.Now()
if !now.Before(start.Add(timeout)) { if !now.Before(start.Add(fu.timeout)) {
return fmt.Errorf("failed to finish all operations in %v", timeout) return fmt.Errorf("failed to finish all operations in %v", fu.timeout)
} }
select { select {
case err := <-done: case err := <-done:
if err != nil { if err != nil {
return err return err
} }
case <-time.After(start.Add(timeout).Sub(now)): case <-time.After(start.Add(fu.timeout).Sub(now)):
return fmt.Errorf("failed to finish all operations in %v", timeout) return fmt.Errorf("failed to finish all operations in %v", fu.timeout)
} }
} }
// All operations finished in time. // All operations finished in time.

View File

@ -70,7 +70,7 @@ func TestFederatedUpdaterOK(t *testing.T) {
addChan := make(chan string, 5) addChan := make(chan string, 5)
updateChan := make(chan string, 5) updateChan := make(chan string, 5)
updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{}, updater := NewFederatedUpdater(&fakeFederationView{}, "foo", time.Minute, &fakeEventRecorder{},
func(_ kubeclientset.Interface, obj pkgruntime.Object) error { func(_ kubeclientset.Interface, obj pkgruntime.Object) error {
service := obj.(*apiv1.Service) service := obj.(*apiv1.Service)
addChan <- service.Name addChan <- service.Name
@ -92,7 +92,7 @@ func TestFederatedUpdaterOK(t *testing.T) {
Type: OperationTypeUpdate, Type: OperationTypeUpdate,
Obj: makeService("B", "s2"), Obj: makeService("B", "s2"),
}, },
}, time.Minute) })
assert.NoError(t, err) assert.NoError(t, err)
add := <-addChan add := <-addChan
update := <-updateChan update := <-updateChan
@ -101,7 +101,7 @@ func TestFederatedUpdaterOK(t *testing.T) {
} }
func TestFederatedUpdaterError(t *testing.T) { func TestFederatedUpdaterError(t *testing.T) {
updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{}, updater := NewFederatedUpdater(&fakeFederationView{}, "foo", time.Minute, &fakeEventRecorder{},
func(_ kubeclientset.Interface, obj pkgruntime.Object) error { func(_ kubeclientset.Interface, obj pkgruntime.Object) error {
return fmt.Errorf("boom") return fmt.Errorf("boom")
}, noop, noop) }, noop, noop)
@ -115,13 +115,13 @@ func TestFederatedUpdaterError(t *testing.T) {
Type: OperationTypeUpdate, Type: OperationTypeUpdate,
Obj: makeService("B", "s1"), Obj: makeService("B", "s1"),
}, },
}, time.Minute) })
assert.Error(t, err) assert.Error(t, err)
} }
func TestFederatedUpdaterTimeout(t *testing.T) { func TestFederatedUpdaterTimeout(t *testing.T) {
start := time.Now() start := time.Now()
updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{}, updater := NewFederatedUpdater(&fakeFederationView{}, "foo", time.Second, &fakeEventRecorder{},
func(_ kubeclientset.Interface, obj pkgruntime.Object) error { func(_ kubeclientset.Interface, obj pkgruntime.Object) error {
time.Sleep(time.Minute) time.Sleep(time.Minute)
return nil return nil
@ -137,7 +137,7 @@ func TestFederatedUpdaterTimeout(t *testing.T) {
Type: OperationTypeUpdate, Type: OperationTypeUpdate,
Obj: makeService("B", "s1"), Obj: makeService("B", "s1"),
}, },
}, time.Second) })
end := time.Now() end := time.Now()
assert.Error(t, err) assert.Error(t, err)
assert.True(t, start.Add(10*time.Second).After(end)) assert.True(t, start.Add(10*time.Second).After(end))