Garbage collecting images in the Kubelet.

Integrates the imageManager into the Kubelet and applies the garbage
collection policy every 5 minutes. The default policy allows up to 90%
disk usage; once that threshold is crossed, images are garbage collected
until disk usage is back down to 80%.

Fixes #157.
Victor Marmol 2015-03-15 21:00:46 -07:00
parent cbe4a1a679
commit d78ecf820e
3 changed files with 61 additions and 18 deletions
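
To make the policy above concrete, here is a rough, self-contained sketch of the trigger-and-free calculation it describes. The 90/80 defaults and the formula mirror the code in this change; the disk sizes are made up for illustration, and this is not the imageManager implementation itself.

package main

import "fmt"

// bytesToFree sketches the image GC policy: once disk usage reaches
// highPercent, try to free enough bytes to bring usage back down to
// lowPercent (defaults in this change: 90 and 80).
func bytesToFree(usage, capacity int64, highPercent, lowPercent int) int64 {
    usagePercent := int(usage * 100 / capacity)
    if usagePercent < highPercent {
        return 0 // below the high threshold: no image GC needed
    }
    return usage - int64(lowPercent)*capacity/100
}

func main() {
    const gib = int64(1) << 30
    // Hypothetical 100 GiB disk that is 92% full with the default 90/80 policy:
    // GC triggers and tries to free 92 GiB - 80 GiB = 12 GiB.
    fmt.Printf("free %d GiB\n", bytesToFree(92*gib, 100*gib, 90, 80)/gib)
}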


@@ -76,6 +76,8 @@ type KubeletServer struct {
     ClusterDNS util.IP
     ReallyCrashForTesting bool
     StreamingConnectionIdleTimeout time.Duration
+    ImageGCHighThresholdPercent int
+    ImageGCLowThresholdPercent int
 }

 // NewKubeletServer will create a new KubeletServer with default values.
@@ -98,6 +100,8 @@ func NewKubeletServer() *KubeletServer {
         CadvisorPort: 4194,
         OOMScoreAdj: -900,
         MasterServiceNamespace: api.NamespaceDefault,
+        ImageGCHighThresholdPercent: 90,
+        ImageGCLowThresholdPercent: 80,
     }
 }
@@ -133,6 +137,8 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
     fs.Var(&s.ClusterDNS, "cluster_dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers")
     fs.BoolVar(&s.ReallyCrashForTesting, "really_crash_for_testing", s.ReallyCrashForTesting, "If true, crash with panics more often.")
     fs.DurationVar(&s.StreamingConnectionIdleTimeout, "streaming_connection_idle_timeout", 0, "Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m'")
+    fs.IntVar(&s.ImageGCHighThresholdPercent, "image_gc_high_threshold", s.ImageGCHighThresholdPercent, "The percent of disk usage after which image garbage collection is always run. Default: 90%%")
+    fs.IntVar(&s.ImageGCLowThresholdPercent, "image_gc_low_threshold", s.ImageGCLowThresholdPercent, "The percent of disk usage before which image garbage collection is never run. Lowest disk usage to garbage collect to. Default: 80%%")
 }

 // Run runs the specified KubeletServer. This should never exit.
@@ -158,6 +164,10 @@ func (s *KubeletServer) Run(_ []string) error {
         return err
     }
+    imageGCPolicy := kubelet.ImageGCPolicy{
+        HighThresholdPercent: s.ImageGCHighThresholdPercent,
+        LowThresholdPercent: s.ImageGCLowThresholdPercent,
+    }
     kcfg := KubeletConfig{
         Address: s.Address,
         AllowPrivileged: s.AllowPrivileged,
@@ -187,6 +197,7 @@ func (s *KubeletServer) Run(_ []string) error {
         MasterServiceNamespace: s.MasterServiceNamespace,
         VolumePlugins: ProbeVolumePlugins(),
         StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout,
+        ImageGCPolicy: imageGCPolicy,
     }

     RunKubelet(&kcfg)
@@ -250,6 +261,11 @@ func SimpleRunKubelet(client *client.Client,
     tlsOptions *kubelet.TLSOptions,
     cadvisorInterface cadvisor.Interface,
     configFilePath string) {
+    imageGCPolicy := kubelet.ImageGCPolicy{
+        HighThresholdPercent: 90,
+        LowThresholdPercent: 80,
+    }
     kcfg := KubeletConfig{
         KubeClient: client,
         DockerClient: dockerClient,
@@ -271,6 +287,7 @@ func SimpleRunKubelet(client *client.Client,
         TLSOptions: tlsOptions,
         CadvisorInterface: cadvisorInterface,
         ConfigFile: configFilePath,
+        ImageGCPolicy: imageGCPolicy,
     }
     RunKubelet(&kcfg)
 }
@@ -377,6 +394,7 @@ type KubeletConfig struct {
     StreamingConnectionIdleTimeout time.Duration
     Recorder record.EventRecorder
     TLSOptions *kubelet.TLSOptions
+    ImageGCPolicy kubelet.ImageGCPolicy
 }

 func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kubelet, error) {
@@ -416,7 +434,8 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kubelet, error) {
         kc.StreamingConnectionIdleTimeout,
         kc.Recorder,
         kc.CadvisorInterface,
-        kc.StatusUpdateFrequency)
+        kc.StatusUpdateFrequency,
+        kc.ImageGCPolicy)
     if err != nil {
         return nil, err
@@ -424,7 +443,7 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kubelet, error) {
     k.BirthCry()

-    go k.GarbageCollectLoop()
+    k.StartGarbageCollection()

     return k, nil
 }


@@ -179,11 +179,18 @@ func (self *realImageManager) GarbageCollect() error {
     }
     usage := int64(fsInfo.Usage)
     capacity := int64(fsInfo.Capacity)
-    usagePercent := int(usage * 100 / capacity)
+
+    // Check valid capacity.
+    if capacity == 0 {
+        // TODO(vmarmol): Surface event.
+        return fmt.Errorf("invalid capacity %d on device %q at mount point %q", capacity, fsInfo.Device, fsInfo.Mountpoint)
+    }

     // If over the max threshold, free enough to place us at the lower threshold.
+    usagePercent := int(usage * 100 / capacity)
     if usagePercent >= self.policy.HighThresholdPercent {
         amountToFree := usage - (int64(self.policy.LowThresholdPercent) * capacity / 100)
+        glog.Infof("[ImageManager]: Disk usage on %q (%s) is at %d%% which is over the high threshold (%d%%). Trying to free %d bytes", fsInfo.Device, fsInfo.Mountpoint, usagePercent, self.policy.HighThresholdPercent, amountToFree)
         freed, err := self.freeSpace(amountToFree)
         if err != nil {
             return err
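
Working the freeing math above through with hypothetical numbers: on a 100 GiB filesystem that is 92 GiB full with the default LowThresholdPercent of 80, usagePercent is 92, so the high threshold (90) is exceeded and amountToFree = 92 GiB - (80 * 100 GiB / 100) = 12 GiB. freeSpace then removes candidate images until roughly that many bytes have been reclaimed, or it runs out of images to delete.
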
@@ -234,6 +241,7 @@ func (self *realImageManager) freeSpace(bytesToFree int64) (int64, error) {
         }

         // Remove image. Continue despite errors.
+        glog.Infof("[ImageManager]: Removing image %q to free %d bytes", image.id, image.size)
         err := self.dockerClient.RemoveImage(image.id)
         if err != nil {
             lastErr = err


@@ -121,7 +121,8 @@ func NewMainKubelet(
     streamingConnectionIdleTimeout time.Duration,
     recorder record.EventRecorder,
     cadvisorInterface cadvisor.Interface,
-    statusUpdateFrequency time.Duration) (*Kubelet, error) {
+    statusUpdateFrequency time.Duration,
+    imageGCPolicy ImageGCPolicy) (*Kubelet, error) {
     if rootDirectory == "" {
         return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
     }
@@ -166,6 +167,10 @@ func NewMainKubelet(
     if err != nil {
         return nil, err
     }
+    imageManager, err := newImageManager(dockerClient, cadvisorInterface, imageGCPolicy)
+    if err != nil {
+        return nil, fmt.Errorf("failed to initialize image manager: %v", err)
+    }

     klet := &Kubelet{
         hostname: hostname,
@@ -191,6 +196,7 @@ func NewMainKubelet(
         recorder: recorder,
         cadvisor: cadvisorInterface,
         containerGC: containerGC,
+        imageManager: imageManager,
     }

     dockerCache, err := dockertools.NewDockerCache(dockerClient)
@@ -302,6 +308,9 @@ type Kubelet struct {
     // Policy for handling garbage collection of dead containers.
     containerGC containerGC
+
+    // Manager for images.
+    imageManager imageManager
 }

 // getRootDir returns the full path to the directory under which kubelet can
@@ -443,12 +452,19 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
     return pods, nil
 }

-func (kl *Kubelet) GarbageCollectLoop() {
-    util.Forever(func() {
+// Starts garbage collection threads.
+func (kl *Kubelet) StartGarbageCollection() {
+    go util.Forever(func() {
         if err := kl.containerGC.GarbageCollect(); err != nil {
-            glog.Errorf("Container garbage collect failed: %v", err)
+            glog.Errorf("Container garbage collection failed: %v", err)
         }
-    }, time.Minute*1)
+    }, time.Minute)
+
+    go util.Forever(func() {
+        if err := kl.imageManager.GarbageCollect(); err != nil {
+            glog.Errorf("Image garbage collection failed: %v", err)
+        }
+    }, 5*time.Minute)
 }

 func (kl *Kubelet) getPodStatusFromCache(podFullName string) (api.PodStatus, bool) {
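
A note on the scheduling above: util.Forever itself is not part of this diff, but the pattern it provides is "run f, wait for the period, repeat", so StartGarbageCollection amounts to a container GC pass roughly every minute and an image GC pass roughly every five minutes. A minimal sketch of that pattern under those assumptions (the real helper includes hardening, such as panic recovery, that is omitted here):

package main

import (
    "fmt"
    "time"
)

// forever sketches the run-periodically pattern assumed above:
// call f, sleep for period, repeat indefinitely.
func forever(f func(), period time.Duration) {
    for {
        f()
        time.Sleep(period)
    }
}

func main() {
    // Hypothetical stand-ins for the two GC passes started by
    // StartGarbageCollection, using the intervals from the diff.
    go forever(func() { fmt.Println("container GC pass") }, time.Minute)
    go forever(func() { fmt.Println("image GC pass") }, 5*time.Minute)
    select {} // block, as the kubelet process itself keeps running
}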