create and delete concurrently

This commit is contained in:
mqliang 2015-10-20 18:47:46 +08:00
parent 500493a3ac
commit 7d87a1de99

View File

@ -34,12 +34,17 @@ import (
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
"sync"
)
const (
// Daemon sets will periodically check that their daemon pods are running as expected.
FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable.
// Realistic value of the burstReplica field for the replication manager based off
// performance requirements for kubernetes 1.0.
BurstReplicas = 500
// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
// avoid a hot loop, we'll wait this long between checks.
PodStoreSyncedPollPeriod = 100 * time.Millisecond
@ -54,6 +59,10 @@ type DaemonSetsController struct {
kubeClient client.Interface
podControl controller.PodControlInterface
// An dsc is temporarily suspended after creating/deleting these many replicas.
// It resumes normal action after observing the watch events for them.
burstReplicas int
// To allow injection of syncDaemonSet for testing.
syncHandler func(dsKey string) error
// A TTLCache of pod creates/deletes each ds expects to see
@ -89,8 +98,9 @@ func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controlle
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "daemon-set"}),
},
expectations: controller.NewControllerExpectations(),
queue: workqueue.New(),
burstReplicas: BurstReplicas,
expectations: controller.NewControllerExpectations(),
queue: workqueue.New(),
}
// Manage addition/update of daemon sets.
dsc.dsStore.Store, dsc.dsController = framework.NewInformer(
@ -403,25 +413,48 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
glog.Errorf("Couldn't get key for object %+v: %v", ds, err)
return
}
dsc.expectations.SetExpectations(dsKey, len(nodesNeedingDaemonPods), len(podsToDelete))
glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v", ds.Name, nodesNeedingDaemonPods)
for i := range nodesNeedingDaemonPods {
if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[i], ds.Namespace, ds.Spec.Template, ds); err != nil {
glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.CreationObserved(dsKey)
util.HandleError(err)
}
createDiff := len(nodesNeedingDaemonPods)
deleteDiff := len(podsToDelete)
if createDiff > dsc.burstReplicas {
createDiff = dsc.burstReplicas
}
if deleteDiff > dsc.burstReplicas {
deleteDiff = dsc.burstReplicas
}
glog.V(4).Infof("Pods to delete for daemon set %s: %+v", ds.Name, podsToDelete)
for i := range podsToDelete {
if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[i], ds); err != nil {
glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.DeletionObserved(dsKey)
util.HandleError(err)
}
dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)
glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
createWait := sync.WaitGroup{}
createWait.Add(createDiff)
for i := 0; i < createDiff; i++ {
go func(ix int) {
defer createWait.Done()
if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, ds.Spec.Template, ds); err != nil {
glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.CreationObserved(dsKey)
util.HandleError(err)
}
}(i)
}
createWait.Wait()
glog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
deleteWait := sync.WaitGroup{}
deleteWait.Add(deleteDiff)
for i := 0; i < deleteDiff; i++ {
go func(ix int) {
defer deleteWait.Done()
if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.DeletionObserved(dsKey)
util.HandleError(err)
}
}(i)
}
deleteWait.Wait()
}
func storeDaemonSetStatus(dsClient client.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error {