diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index ae2701cd3be..49467a10769 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -60,7 +60,7 @@ type PodConfig struct { // NewPodConfig creates an object that can merge many configuration sources into a stream // of normalized updates to a pod configuration. func NewPodConfig(mode PodConfigNotificationMode) *PodConfig { - updates := make(chan kubelet.PodUpdate, 1) + updates := make(chan kubelet.PodUpdate, 50) storage := newPodStorage(updates, mode) podConfig := &PodConfig{ pods: storage, diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 1507115c72c..3d34044073e 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1389,24 +1389,25 @@ func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod { // state every sync_frequency seconds. Never returns. func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { for { + unsyncedPod := false select { case u := <-updates: - switch u.Op { - case SET: - glog.V(3).Infof("SET: Containers changed") - kl.pods = u.Pods - kl.pods = filterHostPortConflicts(kl.pods) - case UPDATE: - glog.V(3).Infof("Update: Containers changed") - kl.pods = updateBoundPods(u.Pods, kl.pods) - kl.pods = filterHostPortConflicts(kl.pods) - - default: - panic("syncLoop does not support incremental changes") - } + kl.updatePods(u) + unsyncedPod = true case <-time.After(kl.resyncInterval): glog.V(4).Infof("Periodic sync") } + // If we already caught some update, try to wait for some short time + // to possibly batch it with other incoming updates. + for ; unsyncedPod; { + select { + case u := <-updates: + kl.updatePods(u) + case <-time.After(5 * time.Millisecond): + // Break the for loop. + unsyncedPod = false + } + } err := handler.SyncPods(kl.pods) if err != nil { @@ -1415,6 +1416,21 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { } } +func (kl *Kubelet) updatePods(u PodUpdate) { + switch u.Op { + case SET: + glog.V(3).Infof("SET: Containers changed") + kl.pods = u.Pods + kl.pods = filterHostPortConflicts(kl.pods) + case UPDATE: + glog.V(3).Infof("Update: Containers changed") + kl.pods = updateBoundPods(u.Pods, kl.pods) + kl.pods = filterHostPortConflicts(kl.pods) + default: + panic("syncLoop does not support incremental changes") + } +} + // Returns Docker version for this Kubelet. func (kl *Kubelet) GetDockerVersion() ([]uint, error) { if kl.dockerClient == nil {