Merge pull request #4531 from wojtek-t/batch_requests_in_kubelet

Batch updates of multiple Pods.
This commit is contained in:
Victor Marmol
2015-02-19 10:50:26 -08:00
2 changed files with 30 additions and 14 deletions

View File

@@ -60,7 +60,7 @@ type PodConfig struct {
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
updates := make(chan kubelet.PodUpdate, 1)
updates := make(chan kubelet.PodUpdate, 50)
storage := newPodStorage(updates, mode)
podConfig := &PodConfig{
pods: storage,

View File

@@ -1398,24 +1398,25 @@ func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
// state every sync_frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
for {
unsyncedPod := false
select {
case u := <-updates:
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
kl.pods = u.Pods
kl.pods = filterHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = filterHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
kl.updatePods(u)
unsyncedPod = true
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
}
// If we already caught some update, try to wait for some short time
// to possibly batch it with other incoming updates.
for unsyncedPod {
select {
case u := <-updates:
kl.updatePods(u)
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
}
}
err := handler.SyncPods(kl.pods)
if err != nil {
@@ -1424,6 +1425,21 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
}
}
func (kl *Kubelet) updatePods(u PodUpdate) {
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
kl.pods = u.Pods
kl.pods = filterHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = filterHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
}
// Returns Docker version for this Kubelet.
func (kl *Kubelet) GetDockerVersion() ([]uint, error) {
if kl.dockerClient == nil {