From d7f46718a8e599afefe8915e558c369a6e3f8817 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 6 Aug 2014 16:12:19 -0400 Subject: [PATCH] Kubelet should have a max think time before auto resync The sync frequency should be part of the syncLoop and resync no less often than every X seconds. The current implementation runs even if a config update was delivered less than X seconds ago. --- cmd/integration/integration.go | 2 -- cmd/kubelet/kubelet.go | 8 ++------ pkg/kubelet/kubelet.go | 31 +++++++++++++++++++------------ 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 00fe19b1d5e..b05474d8426 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -115,7 +115,6 @@ func startComponents(manifestURL string) (apiServerURL string) { config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url")) myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], &fakeDocker1) go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0) - go util.Forever(cfg1.Sync, 3*time.Second) go util.Forever(func() { kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), http.DefaultServeMux, "localhost", 10250) }, 0) @@ -127,7 +126,6 @@ func startComponents(manifestURL string) (apiServerURL string) { config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, 30*time.Second, cfg2.Channel("etcd")) otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], &fakeDocker2) go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0) - go util.Forever(cfg2.Sync, 3*time.Second) go util.Forever(func() { kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), http.DefaultServeMux, "localhost", 10251) }, 0) diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 7c4ef5152a1..83de031dbc0 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -148,16 +148,12 @@ func main() { dockerClient, cadvisorClient, etcdClient, - *rootDirectory) + *rootDirectory, + *syncFrequency) // start the kubelet go util.Forever(func() { k.Run(cfg.Updates()) }, 0) - // resynchronize periodically - // TODO: make this part of PodConfig so that it is only delivered after syncFrequency has elapsed without - // an update - go util.Forever(cfg.Sync, *syncFrequency) - // start the kubelet server if *enableServer { go util.Forever(func() { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 3cf1e356e5a..2ec196c2e18 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -64,13 +64,15 @@ func NewMainKubelet( dc DockerInterface, cc CadvisorInterface, ec tools.EtcdClient, - rd string) *Kubelet { + rd string, + ri time.Duration) *Kubelet { return &Kubelet{ hostname: hn, dockerClient: dc, cadvisorClient: cc, etcdClient: ec, rootDirectory: rd, + resyncInterval: ri, podWorkers: newPodWorkers(), } } @@ -79,19 +81,21 @@ func NewMainKubelet( // TODO: add more integration tests, and expand parameter list as needed. func NewIntegrationTestKubelet(hn string, dc DockerInterface) *Kubelet { return &Kubelet{ - hostname: hn, - dockerClient: dc, - dockerPuller: &FakeDockerPuller{}, - podWorkers: newPodWorkers(), + hostname: hn, + dockerClient: dc, + dockerPuller: &FakeDockerPuller{}, + resyncInterval: 3 * time.Second, + podWorkers: newPodWorkers(), } } // Kubelet is the main kubelet implementation. type Kubelet struct { - hostname string - dockerClient DockerInterface - rootDirectory string - podWorkers podWorkers + hostname string + dockerClient DockerInterface + rootDirectory string + podWorkers podWorkers + resyncInterval time.Duration // Optional, no events will be sent without it etcdClient tools.EtcdClient @@ -561,14 +565,15 @@ func filterHostPortConflicts(pods []Pod) []Pod { // no changes are seen to the configuration, will synchronize the last known desired // state every sync_frequency seconds. Never returns. func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { + var pods []Pod for { - var pods []Pod select { case u := <-updates: switch u.Op { case SET: glog.Infof("Containers changed [%s]", kl.hostname) pods = u.Pods + pods = filterHostPortConflicts(pods) case UPDATE: //TODO: implement updates of containers @@ -578,10 +583,12 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { default: panic("syncLoop does not support incremental changes") } + case <-time.After(kl.resyncInterval): + if pods == nil { + continue + } } - pods = filterHostPortConflicts(pods) - err := handler.SyncPods(pods) if err != nil { glog.Errorf("Couldn't sync containers : %v", err)