Merge pull request #5619 from vmarmol/rate

Spread out pod status updates to apiserver.
This commit is contained in:
Dawn Chen 2015-03-18 17:26:02 -07:00
commit 52e1ee9d5b
2 changed files with 66 additions and 35 deletions

View File

@ -51,6 +51,7 @@ type KubeletServer struct {
FileCheckFrequency time.Duration
HTTPCheckFrequency time.Duration
StatusUpdateFrequency time.Duration
PodStatusUpdateFrequency time.Duration
ManifestURL string
EnableServer bool
Address util.IP
@ -83,13 +84,14 @@ type KubeletServer struct {
// NewKubeletServer will create a new KubeletServer with default values.
func NewKubeletServer() *KubeletServer {
return &KubeletServer{
SyncFrequency: 10 * time.Second,
FileCheckFrequency: 20 * time.Second,
HTTPCheckFrequency: 20 * time.Second,
StatusUpdateFrequency: 20 * time.Second,
EnableServer: true,
Address: util.IP(net.ParseIP("127.0.0.1")),
Port: ports.KubeletPort,
SyncFrequency: 10 * time.Second,
FileCheckFrequency: 20 * time.Second,
HTTPCheckFrequency: 20 * time.Second,
StatusUpdateFrequency: 20 * time.Second,
PodStatusUpdateFrequency: 2 * time.Minute,
EnableServer: true,
Address: util.IP(net.ParseIP("127.0.0.1")),
Port: ports.KubeletPort,
PodInfraContainerImage: kubelet.PodInfraContainerImage,
RootDirectory: defaultRootDir,
RegistryBurst: 10,
@ -110,6 +112,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files")
fs.DurationVar(&s.SyncFrequency, "sync_frequency", s.SyncFrequency, "Max period between synchronizing running containers and config")
fs.DurationVar(&s.StatusUpdateFrequency, "status_update_frequency", s.StatusUpdateFrequency, "Duration between posting node status to master")
fs.DurationVar(&s.PodStatusUpdateFrequency, "pod_status_update_frequency", s.PodStatusUpdateFrequency, "Duration between posting pod status updates to the master")
fs.DurationVar(&s.FileCheckFrequency, "file_check_frequency", s.FileCheckFrequency, "Duration between checking config files for new data")
fs.DurationVar(&s.HTTPCheckFrequency, "http_check_frequency", s.HTTPCheckFrequency, "Duration between checking http for new data")
fs.StringVar(&s.ManifestURL, "manifest_url", s.ManifestURL, "URL for accessing the container manifest")
@ -176,6 +179,7 @@ func (s *KubeletServer) Run(_ []string) error {
ConfigFile: s.Config,
ManifestURL: s.ManifestURL,
StatusUpdateFrequency: s.StatusUpdateFrequency,
PodStatusUpdateFrequency: s.PodStatusUpdateFrequency,
FileCheckFrequency: s.FileCheckFrequency,
HTTPCheckFrequency: s.HTTPCheckFrequency,
PodInfraContainerImage: s.PodInfraContainerImage,
@ -273,21 +277,22 @@ func SimpleRunKubelet(client *client.Client,
RootDirectory: rootDir,
ManifestURL: manifestURL,
PodInfraContainerImage: kubelet.PodInfraContainerImage,
Port: port,
Address: util.IP(net.ParseIP(address)),
EnableServer: true,
EnableDebuggingHandlers: true,
StatusUpdateFrequency: 3 * time.Second,
SyncFrequency: 3 * time.Second,
MinimumGCAge: 10 * time.Second,
MaxPerPodContainerCount: 5,
MaxContainerCount: 100,
MasterServiceNamespace: masterServiceNamespace,
VolumePlugins: volumePlugins,
TLSOptions: tlsOptions,
CadvisorInterface: cadvisorInterface,
ConfigFile: configFilePath,
ImageGCPolicy: imageGCPolicy,
Port: port,
Address: util.IP(net.ParseIP(address)),
EnableServer: true,
EnableDebuggingHandlers: true,
StatusUpdateFrequency: 3 * time.Second,
PodStatusUpdateFrequency: 2 * time.Minute,
SyncFrequency: 3 * time.Second,
MinimumGCAge: 10 * time.Second,
MaxPerPodContainerCount: 5,
MaxContainerCount: 100,
MasterServiceNamespace: masterServiceNamespace,
VolumePlugins: volumePlugins,
TLSOptions: tlsOptions,
CadvisorInterface: cadvisorInterface,
ConfigFile: configFilePath,
ImageGCPolicy: imageGCPolicy,
}
RunKubelet(&kcfg)
}
@ -373,6 +378,7 @@ type KubeletConfig struct {
ConfigFile string
ManifestURL string
StatusUpdateFrequency time.Duration
PodStatusUpdateFrequency time.Duration
FileCheckFrequency time.Duration
HTTPCheckFrequency time.Duration
Hostname string
@ -435,6 +441,7 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kc.Recorder,
kc.CadvisorInterface,
kc.StatusUpdateFrequency,
kc.PodStatusUpdateFrequency,
kc.ImageGCPolicy)
if err != nil {

View File

@ -122,6 +122,7 @@ func NewMainKubelet(
recorder record.EventRecorder,
cadvisorInterface cadvisor.Interface,
statusUpdateFrequency time.Duration,
podStatusUpdateFrequency time.Duration,
imageGCPolicy ImageGCPolicy) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
@ -129,6 +130,9 @@ func NewMainKubelet(
if resyncInterval <= 0 {
return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
}
if podStatusUpdateFrequency <= 0 {
return nil, fmt.Errorf("invalid status update frequency %d", podStatusUpdateFrequency)
}
dockerClient = metrics.NewInstrumentedDockerInterface(dockerClient)
// Wait for the Docker daemon to be up (with a timeout).
@ -179,6 +183,7 @@ func NewMainKubelet(
rootDirectory: rootDirectory,
statusUpdateFrequency: statusUpdateFrequency,
resyncInterval: resyncInterval,
podStatusUpdateFrequency: podStatusUpdateFrequency,
podInfraContainerImage: podInfraContainerImage,
containerIDToRef: map[string]*api.ObjectReference{},
runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
@ -232,16 +237,17 @@ type serviceLister interface {
// Kubelet is the main kubelet implementation.
type Kubelet struct {
hostname string
dockerClient dockertools.DockerInterface
dockerCache dockertools.DockerCache
kubeClient client.Interface
rootDirectory string
podInfraContainerImage string
podWorkers *podWorkers
statusUpdateFrequency time.Duration
resyncInterval time.Duration
sourcesReady SourcesReadyFn
hostname string
dockerClient dockertools.DockerInterface
dockerCache dockertools.DockerCache
kubeClient client.Interface
rootDirectory string
podInfraContainerImage string
podWorkers *podWorkers
statusUpdateFrequency time.Duration
resyncInterval time.Duration
podStatusUpdateFrequency time.Duration
sourcesReady SourcesReadyFn
// Protects the pods array
// We make complete array copies out of this while locked, which is OK because once added to this array,
@ -507,7 +513,11 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
glog.Warning("No api server defined - no node status update will be sent.")
}
go kl.syncNodeStatus()
go util.Forever(kl.syncStatus, kl.resyncInterval)
// syncStatus handles its own frequency and throttling, run it always.
go util.Forever(func() {
kl.syncStatus(kl.podStatusUpdateFrequency)
}, 0)
kl.syncLoop(updates, kl)
}
@ -1673,12 +1683,25 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
}
}
// syncStatus syncs pods statuses with the apiserver.
func (kl *Kubelet) syncStatus() {
// syncStatus syncs pods statuses with the apiserver. Spread the updates over the specified deadline.
func (kl *Kubelet) syncStatus(deadline time.Duration) {
start := time.Now()
glog.V(3).Infof("Syncing pods status")
pods, _ := kl.GetPods()
if len(pods) == 0 {
// No pods, sleep the rest of our deadline.
time.Sleep(deadline - time.Since(start))
return
}
// TODO(vmarmol): Enhance util.RateLimiter for our use here.
singleDeadline := time.Duration(deadline.Nanoseconds() / int64(len(pods)))
t := time.NewTicker(singleDeadline)
for _, pod := range pods {
// Don't hit the api server too hard, wait for the next time slot.
<-t.C
status, err := kl.GetPodStatus(GetPodFullName(&pod), pod.UID)
if err != nil {
glog.Warningf("Error getting pod %q status: %v, retry later", pod.Name, err)
@ -1691,6 +1714,7 @@ func (kl *Kubelet) syncStatus() {
glog.V(3).Infof("Status for pod %q updated successfully: %s", pod.Name, pod)
}
}
t.Stop()
}
// Update the Kubelet's internal pods with those provided by the update.