From cda4b6c598a273da8e6774487af7ad764ae64e9a Mon Sep 17 00:00:00 2001 From: Victor Marmol Date: Wed, 18 Mar 2015 15:31:37 -0700 Subject: [PATCH] Spread out pod status updates to apiserver. Lowers the pod status update frequency to once every 2m and spreads updates within that time window. --- cmd/kubelet/app/server.go | 51 ++++++++++++++++++++++----------------- pkg/kubelet/kubelet.go | 50 ++++++++++++++++++++++++++++---------- 2 files changed, 66 insertions(+), 35 deletions(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 60d9beff73d..062aec4a4c0 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -51,6 +51,7 @@ type KubeletServer struct { FileCheckFrequency time.Duration HTTPCheckFrequency time.Duration StatusUpdateFrequency time.Duration + PodStatusUpdateFrequency time.Duration ManifestURL string EnableServer bool Address util.IP @@ -83,13 +84,14 @@ type KubeletServer struct { // NewKubeletServer will create a new KubeletServer with default values. func NewKubeletServer() *KubeletServer { return &KubeletServer{ - SyncFrequency: 10 * time.Second, - FileCheckFrequency: 20 * time.Second, - HTTPCheckFrequency: 20 * time.Second, - StatusUpdateFrequency: 20 * time.Second, - EnableServer: true, - Address: util.IP(net.ParseIP("127.0.0.1")), - Port: ports.KubeletPort, + SyncFrequency: 10 * time.Second, + FileCheckFrequency: 20 * time.Second, + HTTPCheckFrequency: 20 * time.Second, + StatusUpdateFrequency: 20 * time.Second, + PodStatusUpdateFrequency: 2 * time.Minute, + EnableServer: true, + Address: util.IP(net.ParseIP("127.0.0.1")), + Port: ports.KubeletPort, PodInfraContainerImage: kubelet.PodInfraContainerImage, RootDirectory: defaultRootDir, RegistryBurst: 10, @@ -110,6 +112,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files") fs.DurationVar(&s.SyncFrequency, "sync_frequency", s.SyncFrequency, "Max period between synchronizing running 
containers and config") fs.DurationVar(&s.StatusUpdateFrequency, "status_update_frequency", s.StatusUpdateFrequency, "Duration between posting node status to master") + fs.DurationVar(&s.PodStatusUpdateFrequency, "pod_status_update_frequency", s.PodStatusUpdateFrequency, "Duration between posting pod status updates to the master") fs.DurationVar(&s.FileCheckFrequency, "file_check_frequency", s.FileCheckFrequency, "Duration between checking config files for new data") fs.DurationVar(&s.HTTPCheckFrequency, "http_check_frequency", s.HTTPCheckFrequency, "Duration between checking http for new data") fs.StringVar(&s.ManifestURL, "manifest_url", s.ManifestURL, "URL for accessing the container manifest") @@ -176,6 +179,7 @@ func (s *KubeletServer) Run(_ []string) error { ConfigFile: s.Config, ManifestURL: s.ManifestURL, StatusUpdateFrequency: s.StatusUpdateFrequency, + PodStatusUpdateFrequency: s.PodStatusUpdateFrequency, FileCheckFrequency: s.FileCheckFrequency, HTTPCheckFrequency: s.HTTPCheckFrequency, PodInfraContainerImage: s.PodInfraContainerImage, @@ -273,21 +277,22 @@ func SimpleRunKubelet(client *client.Client, RootDirectory: rootDir, ManifestURL: manifestURL, PodInfraContainerImage: kubelet.PodInfraContainerImage, - Port: port, - Address: util.IP(net.ParseIP(address)), - EnableServer: true, - EnableDebuggingHandlers: true, - StatusUpdateFrequency: 3 * time.Second, - SyncFrequency: 3 * time.Second, - MinimumGCAge: 10 * time.Second, - MaxPerPodContainerCount: 5, - MaxContainerCount: 100, - MasterServiceNamespace: masterServiceNamespace, - VolumePlugins: volumePlugins, - TLSOptions: tlsOptions, - CadvisorInterface: cadvisorInterface, - ConfigFile: configFilePath, - ImageGCPolicy: imageGCPolicy, + Port: port, + Address: util.IP(net.ParseIP(address)), + EnableServer: true, + EnableDebuggingHandlers: true, + StatusUpdateFrequency: 3 * time.Second, + PodStatusUpdateFrequency: 2 * time.Minute, + SyncFrequency: 3 * time.Second, + MinimumGCAge: 10 * time.Second, + 
MaxPerPodContainerCount: 5, + MaxContainerCount: 100, + MasterServiceNamespace: masterServiceNamespace, + VolumePlugins: volumePlugins, + TLSOptions: tlsOptions, + CadvisorInterface: cadvisorInterface, + ConfigFile: configFilePath, + ImageGCPolicy: imageGCPolicy, } RunKubelet(&kcfg) } @@ -373,6 +378,7 @@ type KubeletConfig struct { ConfigFile string ManifestURL string StatusUpdateFrequency time.Duration + PodStatusUpdateFrequency time.Duration FileCheckFrequency time.Duration HTTPCheckFrequency time.Duration Hostname string @@ -435,6 +441,7 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub kc.Recorder, kc.CadvisorInterface, kc.StatusUpdateFrequency, + kc.PodStatusUpdateFrequency, kc.ImageGCPolicy) if err != nil { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c9ce6c376d6..e0d3735ad41 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -122,6 +122,7 @@ func NewMainKubelet( recorder record.EventRecorder, cadvisorInterface cadvisor.Interface, statusUpdateFrequency time.Duration, + podStatusUpdateFrequency time.Duration, imageGCPolicy ImageGCPolicy) (*Kubelet, error) { if rootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", rootDirectory) @@ -129,6 +130,9 @@ func NewMainKubelet( if resyncInterval <= 0 { return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval) } + if podStatusUpdateFrequency <= 0 { + return nil, fmt.Errorf("invalid status update frequency %d", podStatusUpdateFrequency) + } dockerClient = metrics.NewInstrumentedDockerInterface(dockerClient) // Wait for the Docker daemon to be up (with a timeout). 
@@ -179,6 +183,7 @@ func NewMainKubelet( rootDirectory: rootDirectory, statusUpdateFrequency: statusUpdateFrequency, resyncInterval: resyncInterval, + podStatusUpdateFrequency: podStatusUpdateFrequency, podInfraContainerImage: podInfraContainerImage, dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{}, runner: dockertools.NewDockerContainerCommandRunner(dockerClient), @@ -232,16 +237,17 @@ type serviceLister interface { // Kubelet is the main kubelet implementation. type Kubelet struct { - hostname string - dockerClient dockertools.DockerInterface - dockerCache dockertools.DockerCache - kubeClient client.Interface - rootDirectory string - podInfraContainerImage string - podWorkers *podWorkers - statusUpdateFrequency time.Duration - resyncInterval time.Duration - sourcesReady SourcesReadyFn + hostname string + dockerClient dockertools.DockerInterface + dockerCache dockertools.DockerCache + kubeClient client.Interface + rootDirectory string + podInfraContainerImage string + podWorkers *podWorkers + statusUpdateFrequency time.Duration + resyncInterval time.Duration + podStatusUpdateFrequency time.Duration + sourcesReady SourcesReadyFn // Protects the pods array // We make complete array copies out of this while locked, which is OK because once added to this array, @@ -507,7 +513,11 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { glog.Warning("No api server defined - no node status update will be sent.") } go kl.syncNodeStatus() - go util.Forever(kl.syncStatus, kl.resyncInterval) + + // syncStatus handles its own frequency and throttling, run it always. + go util.Forever(func() { + kl.syncStatus(kl.podStatusUpdateFrequency) + }, 0) kl.syncLoop(updates, kl) } @@ -1673,12 +1683,25 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { } } -// syncStatus syncs pods statuses with the apiserver. -func (kl *Kubelet) syncStatus() { +// syncStatus syncs pods statuses with the apiserver. Spread the updates over the specified deadline. 
+func (kl *Kubelet) syncStatus(deadline time.Duration) { + start := time.Now() glog.V(3).Infof("Syncing pods status") pods, _ := kl.GetPods() + if len(pods) == 0 { + // No pods, sleep the rest of our deadline. + time.Sleep(deadline - time.Since(start)) + return + } + + // TODO(vmarmol): Enhance util.RateLimiter for our use here. + singleDeadline := time.Duration(deadline.Nanoseconds() / int64(len(pods))) + t := time.NewTicker(singleDeadline) for _, pod := range pods { + // Don't hit the api server too hard, wait for the next time slot. + <-t.C + status, err := kl.GetPodStatus(GetPodFullName(&pod), pod.UID) if err != nil { glog.Warningf("Error getting pod %q status: %v, retry later", pod.Name, err) @@ -1691,6 +1714,7 @@ func (kl *Kubelet) syncStatus() { glog.V(3).Infof("Status for pod %q updated successfully: %s", pod.Name, pod) } } + t.Stop() } // Update the Kubelet's internal pods with those provided by the update.