From 7d87a1de99de5ac4bb2cef48781f63f46b91f075 Mon Sep 17 00:00:00 2001 From: mqliang Date: Tue, 20 Oct 2015 18:47:46 +0800 Subject: [PATCH] create and delete concurrently --- pkg/controller/daemon/controller.go | 67 +++++++++++++++++++++-------- 1 file changed, 50 insertions(+), 17 deletions(-) diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go index aadb6c7fa3a..41cf4af0d07 100644 --- a/pkg/controller/daemon/controller.go +++ b/pkg/controller/daemon/controller.go @@ -34,12 +34,17 @@ import ( "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" + "sync" ) const ( // Daemon sets will periodically check that their daemon pods are running as expected. FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable. + // Realistic value of the burstReplica field for the replication manager based off + // performance requirements for kubernetes 1.0. + BurstReplicas = 500 + // We must avoid counting pods until the pod store has synced. If it hasn't synced, to // avoid a hot loop, we'll wait this long between checks. PodStoreSyncedPollPeriod = 100 * time.Millisecond @@ -54,6 +59,10 @@ type DaemonSetsController struct { kubeClient client.Interface podControl controller.PodControlInterface + // An dsc is temporarily suspended after creating/deleting these many replicas. + // It resumes normal action after observing the watch events for them. + burstReplicas int + // To allow injection of syncDaemonSet for testing. syncHandler func(dsKey string) error // A TTLCache of pod creates/deletes each ds expects to see @@ -89,8 +98,9 @@ func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controlle KubeClient: kubeClient, Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "daemon-set"}), }, - expectations: controller.NewControllerExpectations(), - queue: workqueue.New(), + burstReplicas: BurstReplicas, + expectations: controller.NewControllerExpectations(), + queue: workqueue.New(), } // Manage addition/update of daemon sets. dsc.dsStore.Store, dsc.dsController = framework.NewInformer( @@ -403,25 +413,48 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) { glog.Errorf("Couldn't get key for object %+v: %v", ds, err) return } - dsc.expectations.SetExpectations(dsKey, len(nodesNeedingDaemonPods), len(podsToDelete)) - glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v", ds.Name, nodesNeedingDaemonPods) - for i := range nodesNeedingDaemonPods { - if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[i], ds.Namespace, ds.Spec.Template, ds); err != nil { - glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) - dsc.expectations.CreationObserved(dsKey) - util.HandleError(err) - } + createDiff := len(nodesNeedingDaemonPods) + deleteDiff := len(podsToDelete) + + if createDiff > dsc.burstReplicas { + createDiff = dsc.burstReplicas + } + if deleteDiff > dsc.burstReplicas { + deleteDiff = dsc.burstReplicas } - glog.V(4).Infof("Pods to delete for daemon set %s: %+v", ds.Name, podsToDelete) - for i := range podsToDelete { - if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[i], ds); err != nil { - glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) - dsc.expectations.DeletionObserved(dsKey) - util.HandleError(err) - } + dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff) + + glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff) + createWait := sync.WaitGroup{} + createWait.Add(createDiff) + for i := 0; i < createDiff; i++ { + go func(ix int) { + defer createWait.Done() + if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, ds.Spec.Template, ds); err != nil { + glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) + dsc.expectations.CreationObserved(dsKey) + util.HandleError(err) + } + }(i) } + createWait.Wait() + + glog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff) + deleteWait := sync.WaitGroup{} + deleteWait.Add(deleteDiff) + for i := 0; i < deleteDiff; i++ { + go func(ix int) { + defer deleteWait.Done() + if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil { + glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) + dsc.expectations.DeletionObserved(dsKey) + util.HandleError(err) + } + }(i) + } + deleteWait.Wait() } func storeDaemonSetStatus(dsClient client.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error {