Batch updates of multiple Pods.

This commit is contained in:
Wojciech Tyczynski 2015-02-18 16:08:32 +01:00
parent e81d9abcac
commit 25c8f07c67
2 changed files with 30 additions and 14 deletions

View File

@ -60,7 +60,7 @@ type PodConfig struct {
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
updates := make(chan kubelet.PodUpdate, 1)
updates := make(chan kubelet.PodUpdate, 50)
storage := newPodStorage(updates, mode)
podConfig := &PodConfig{
pods: storage,

View File

@ -1389,24 +1389,25 @@ func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
// state every sync_frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
for {
unsyncedPod := false
select {
case u := <-updates:
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
kl.pods = u.Pods
kl.pods = filterHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = filterHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
kl.updatePods(u)
unsyncedPod = true
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
}
// If we already caught some update, try to wait for some short time
// to possibly batch it with other incoming updates.
for ; unsyncedPod; {
select {
case u := <-updates:
kl.updatePods(u)
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
}
}
err := handler.SyncPods(kl.pods)
if err != nil {
@ -1415,6 +1416,21 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
}
}
func (kl *Kubelet) updatePods(u PodUpdate) {
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
kl.pods = u.Pods
kl.pods = filterHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = filterHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
}
// Returns Docker version for this Kubelet.
func (kl *Kubelet) GetDockerVersion() ([]uint, error) {
if kl.dockerClient == nil {