Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-30 15:05:27 +00:00
Merge pull request #15939 from mqliang/waitgroup
Auto commit by PR queue bot
commit 712fcf1c63
@@ -34,12 +34,17 @@ import (
     "k8s.io/kubernetes/pkg/util"
     "k8s.io/kubernetes/pkg/util/workqueue"
     "k8s.io/kubernetes/pkg/watch"
+    "sync"
 )

 const (
     // Daemon sets will periodically check that their daemon pods are running as expected.
     FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable.

+    // Realistic value of the burstReplica field for the replication manager based off
+    // performance requirements for kubernetes 1.0.
+    BurstReplicas = 500
+
     // We must avoid counting pods until the pod store has synced. If it hasn't synced, to
     // avoid a hot loop, we'll wait this long between checks.
     PodStoreSyncedPollPeriod = 100 * time.Millisecond
@@ -54,6 +59,10 @@ type DaemonSetsController struct {
     kubeClient client.Interface
     podControl controller.PodControlInterface

+    // An dsc is temporarily suspended after creating/deleting these many replicas.
+    // It resumes normal action after observing the watch events for them.
+    burstReplicas int
+
     // To allow injection of syncDaemonSet for testing.
     syncHandler func(dsKey string) error
     // A TTLCache of pod creates/deletes each ds expects to see
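The burstReplicas comment above ties into the controller's expectations mechanism: after issuing up to burstReplicas creates or deletes, the controller holds off on further action for that daemon set until it has observed the corresponding watch events. Below is a minimal, self-contained sketch of that suspend-and-resume idea; the expectations type and its methods are invented for illustration and are not the controller.NewControllerExpectations implementation the diff actually uses.

package main

import (
    "fmt"
    "sync/atomic"
)

// expectations counts the create/delete events the controller still expects
// to see from the watch before it will act on the same daemon set again.
type expectations struct {
    pending int64
}

func (e *expectations) Set(adds, dels int) { atomic.StoreInt64(&e.pending, int64(adds+dels)) }
func (e *expectations) Observed()          { atomic.AddInt64(&e.pending, -1) }
func (e *expectations) Satisfied() bool    { return atomic.LoadInt64(&e.pending) <= 0 }

func main() {
    var exp expectations
    exp.Set(2, 0) // two creates were just issued (capped by burstReplicas)

    fmt.Println("sync again?", exp.Satisfied()) // false: temporarily suspended

    exp.Observed() // watch event for the first pod
    exp.Observed() // watch event for the second pod
    fmt.Println("sync again?", exp.Satisfied()) // true: resumes normal action
}

As the diff's error paths show ("Failed creation, decrementing expectations"), a failed create or delete is counted as observed immediately, so the controller is not left waiting for a watch event that will never arrive.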
@@ -89,8 +98,9 @@ func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controlle
             KubeClient: kubeClient,
             Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "daemon-set"}),
         },
-        expectations: controller.NewControllerExpectations(),
-        queue:        workqueue.New(),
+        burstReplicas: BurstReplicas,
+        expectations:  controller.NewControllerExpectations(),
+        queue:         workqueue.New(),
     }
     // Manage addition/update of daemon sets.
     dsc.dsStore.Store, dsc.dsController = framework.NewInformer(
@@ -403,25 +413,48 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
         glog.Errorf("Couldn't get key for object %+v: %v", ds, err)
         return
     }
-    dsc.expectations.SetExpectations(dsKey, len(nodesNeedingDaemonPods), len(podsToDelete))
-
-    glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v", ds.Name, nodesNeedingDaemonPods)
-    for i := range nodesNeedingDaemonPods {
-        if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[i], ds.Namespace, ds.Spec.Template, ds); err != nil {
-            glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
-            dsc.expectations.CreationObserved(dsKey)
-            util.HandleError(err)
-        }
+    createDiff := len(nodesNeedingDaemonPods)
+    deleteDiff := len(podsToDelete)
+
+    if createDiff > dsc.burstReplicas {
+        createDiff = dsc.burstReplicas
+    }
+    if deleteDiff > dsc.burstReplicas {
+        deleteDiff = dsc.burstReplicas
     }

-    glog.V(4).Infof("Pods to delete for daemon set %s: %+v", ds.Name, podsToDelete)
-    for i := range podsToDelete {
-        if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[i], ds); err != nil {
-            glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
-            dsc.expectations.DeletionObserved(dsKey)
-            util.HandleError(err)
-        }
-    }
+    dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)
+
+    glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
+    createWait := sync.WaitGroup{}
+    createWait.Add(createDiff)
+    for i := 0; i < createDiff; i++ {
+        go func(ix int) {
+            defer createWait.Done()
+            if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, ds.Spec.Template, ds); err != nil {
+                glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
+                dsc.expectations.CreationObserved(dsKey)
+                util.HandleError(err)
+            }
+        }(i)
+    }
+    createWait.Wait()
+
+    glog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
+    deleteWait := sync.WaitGroup{}
+    deleteWait.Add(deleteDiff)
+    for i := 0; i < deleteDiff; i++ {
+        go func(ix int) {
+            defer deleteWait.Done()
+            if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
+                glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
+                dsc.expectations.DeletionObserved(dsKey)
+                util.HandleError(err)
+            }
+        }(i)
+    }
+    deleteWait.Wait()
 }

 func storeDaemonSetStatus(dsClient client.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error {
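For reference, here is a minimal, self-contained sketch of the pattern the new manage() code follows: cap the work for one sync pass at a burst limit, fan the calls out across goroutines, and block on a sync.WaitGroup until they all return. The createPod helper, managePods wrapper, and node names are placeholders for illustration, not the controller's real podControl API.

package main

import (
    "fmt"
    "sync"
)

const burstReplicas = 500 // same default the diff introduces

func createPod(node string) error {
    fmt.Println("creating daemon pod on", node)
    return nil
}

func managePods(nodesNeedingDaemonPods []string) {
    createDiff := len(nodesNeedingDaemonPods)
    if createDiff > burstReplicas {
        createDiff = burstReplicas // never issue more than one burst per sync pass
    }

    var createWait sync.WaitGroup
    createWait.Add(createDiff)
    for i := 0; i < createDiff; i++ {
        go func(ix int) {
            defer createWait.Done()
            if err := createPod(nodesNeedingDaemonPods[ix]); err != nil {
                // In the real controller a failure decrements the expectation
                // (CreationObserved) so the next sync is not blocked waiting
                // for a watch event that will never arrive.
                fmt.Println("create failed:", err)
            }
        }(i)
    }
    createWait.Wait() // block until every in-flight create has returned
}

func main() {
    managePods([]string{"node-a", "node-b", "node-c"})
}

Waiting for the whole burst before returning keeps the expectation counts set earlier in the pass consistent with the create/delete calls that were actually issued.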