Merge pull request #3260 from commonlisp/master

NewMainKubelet validates SyncFrequency, minimum GC age, etc.
This commit is contained in:
Tim Hockin 2015-01-08 08:20:55 -08:00
commit eac982137f
3 changed files with 37 additions and 21 deletions

View File

@ -57,7 +57,7 @@ var (
registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0") registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0")
runonce = flag.Bool("runonce", false, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers and --enable-server") runonce = flag.Bool("runonce", false, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers and --enable-server")
enableDebuggingHandlers = flag.Bool("enable_debugging_handlers", true, "Enables server endpoints for log collection and local running of containers and commands") enableDebuggingHandlers = flag.Bool("enable_debugging_handlers", true, "Enables server endpoints for log collection and local running of containers and commands")
minimumGCAge = flag.Duration("minimum_container_ttl_duration", 0, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'") minimumGCAge = flag.Duration("minimum_container_ttl_duration", 1*time.Minute, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'")
maxContainerCount = flag.Int("maximum_dead_containers_per_container", 5, "Maximum number of old instances of a container to retain per container. Each container takes up some disk space. Default: 5.") maxContainerCount = flag.Int("maximum_dead_containers_per_container", 5, "Maximum number of old instances of a container to retain per container. Each container takes up some disk space. Default: 5.")
authPath = flag.String("auth_path", "", "Path to .kubernetes_auth file, specifying how to authenticate to API server.") authPath = flag.String("auth_path", "", "Path to .kubernetes_auth file, specifying how to authenticate to API server.")
cAdvisorPort = flag.Uint("cadvisor_port", 4194, "The port of the localhost cAdvisor endpoint") cAdvisorPort = flag.Uint("cadvisor_port", 4194, "The port of the localhost cAdvisor endpoint")

View File

@ -61,29 +61,35 @@ type volumeMap map[string]volume.Interface
// New creates a new Kubelet for use in main // New creates a new Kubelet for use in main
func NewMainKubelet( func NewMainKubelet(
hn string, hostname string,
dc dockertools.DockerInterface, dockerClient dockertools.DockerInterface,
ec tools.EtcdClient, etcdClient tools.EtcdClient,
rd string, rootDirectory string,
ni string, networkContainerImage string,
ri time.Duration, resyncInterval time.Duration,
pullQPS float32, pullQPS float32,
pullBurst int, pullBurst int,
minimumGCAge time.Duration, minimumGCAge time.Duration,
maxContainerCount int, maxContainerCount int,
sourcesReady SourcesReadyFn, sourcesReady SourcesReadyFn,
clusterDomain string, clusterDomain string,
clusterDNS net.IP) *Kubelet { clusterDNS net.IP) (*Kubelet, error) {
if resyncInterval <= 0 {
return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
}
if minimumGCAge <= 0 {
return nil, fmt.Errorf("invalid minimum GC age %d", minimumGCAge)
}
return &Kubelet{ return &Kubelet{
hostname: hn, hostname: hostname,
dockerClient: dc, dockerClient: dockerClient,
etcdClient: ec, etcdClient: etcdClient,
rootDirectory: rd, rootDirectory: rootDirectory,
resyncInterval: ri, resyncInterval: resyncInterval,
networkContainerImage: ni, networkContainerImage: networkContainerImage,
podWorkers: newPodWorkers(), podWorkers: newPodWorkers(),
dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{}, dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
runner: dockertools.NewDockerContainerCommandRunner(dc), runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
httpClient: &http.Client{}, httpClient: &http.Client{},
pullQPS: pullQPS, pullQPS: pullQPS,
pullBurst: pullBurst, pullBurst: pullBurst,
@ -92,7 +98,7 @@ func NewMainKubelet(
sourcesReady: sourcesReady, sourcesReady: sourcesReady,
clusterDomain: clusterDomain, clusterDomain: clusterDomain,
clusterDNS: clusterDNS, clusterDNS: clusterDNS,
} }, nil
} }
type httpGetter interface { type httpGetter interface {
@ -202,7 +208,7 @@ func (kl *Kubelet) purgeOldest(ids []string) error {
if err != nil { if err != nil {
return err return err
} }
if !data.State.Running && (kl.minimumGCAge == 0 || time.Now().Sub(data.State.FinishedAt) > kl.minimumGCAge) { if !data.State.Running && (time.Now().Sub(data.State.FinishedAt) > kl.minimumGCAge) {
dockerData = append(dockerData, data) dockerData = append(dockerData, data)
} }
} }

View File

@ -156,6 +156,8 @@ func SimpleRunKubelet(client *client.Client, etcdClient tools.EtcdClient, docker
EnableServer: true, EnableServer: true,
EnableDebuggingHandlers: true, EnableDebuggingHandlers: true,
SyncFrequency: 3 * time.Second, SyncFrequency: 3 * time.Second,
MinimumGCAge: 10 * time.Second,
MaxContainerCount: 5,
} }
RunKubelet(&kcfg) RunKubelet(&kcfg)
} }
@ -180,7 +182,11 @@ func RunKubelet(kcfg *KubeletConfig) {
} }
cfg := makePodSourceConfig(kcfg) cfg := makePodSourceConfig(kcfg)
k := createAndInitKubelet(kcfg, cfg) k, err := createAndInitKubelet(kcfg, cfg)
if err != nil {
glog.Errorf("Failed to create kubelet: %s", err)
return
}
// process pods and exit. // process pods and exit.
if kcfg.Runonce { if kcfg.Runonce {
if _, err := k.RunOnce(cfg.Updates()); err != nil { if _, err := k.RunOnce(cfg.Updates()); err != nil {
@ -254,11 +260,11 @@ type KubeletConfig struct {
Runonce bool Runonce bool
} }
func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) *kubelet.Kubelet { func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kubelet, error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations // up into "per source" synchronizations
k := kubelet.NewMainKubelet( k, err := kubelet.NewMainKubelet(
kc.Hostname, kc.Hostname,
kc.DockerClient, kc.DockerClient,
kc.EtcdClient, kc.EtcdClient,
@ -273,11 +279,15 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) *kubelet.Kube
kc.ClusterDomain, kc.ClusterDomain,
net.IP(kc.ClusterDNS)) net.IP(kc.ClusterDNS))
if err != nil {
return nil, err
}
k.BirthCry() k.BirthCry()
go k.GarbageCollectLoop() go k.GarbageCollectLoop()
go kubelet.MonitorCAdvisor(k, kc.CAdvisorPort) go kubelet.MonitorCAdvisor(k, kc.CAdvisorPort)
kubelet.InitHealthChecking(k) kubelet.InitHealthChecking(k)
return k return k, nil
} }