diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 9929ba7c2e0..a65fb55045c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -39,6 +39,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" "github.com/fsouza/go-dockerclient" "github.com/golang/glog" @@ -76,13 +77,16 @@ func NewMainKubelet( sourceReady SourceReadyFn, clusterDomain string, clusterDNS net.IP) (*Kubelet, error) { + if rootDirectory == "" { + return nil, fmt.Errorf("invalid root directory %q", rootDirectory) + } if resyncInterval <= 0 { return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval) } if minimumGCAge <= 0 { return nil, fmt.Errorf("invalid minimum GC age %d", minimumGCAge) } - return &Kubelet{ + klet := &Kubelet{ hostname: hostname, dockerClient: dockerClient, etcdClient: etcdClient, @@ -100,7 +104,13 @@ func NewMainKubelet( sourceReady: sourceReady, clusterDomain: clusterDomain, clusterDNS: clusterDNS, - }, nil + } + + if err := klet.setupDataDirs(); err != nil { + return nil, err + } + + return klet, nil } type httpGetter interface { @@ -234,6 +244,32 @@ func dirExists(path string) bool { return s.IsDir() } +func (kl *Kubelet) setupDataDirs() error { + kl.rootDirectory = path.Clean(kl.rootDirectory) + if err := os.MkdirAll(kl.GetRootDir(), 0750); err != nil { + return fmt.Errorf("error creating root directory: %v", err) + } + if err := os.MkdirAll(kl.GetPodsDir(), 0750); err != nil { + return fmt.Errorf("error creating pods directory: %v", err) + } + return nil +} + +// Get a list of pods that have data directories. +func (kl *Kubelet) listPodsFromDisk() ([]string, error) { + podInfos, err := ioutil.ReadDir(kl.GetPodsDir()) + if err != nil { + return nil, err + } + pods := []string{} + for i := range podInfos { + if podInfos[i].IsDir() { + pods = append(pods, podInfos[i].Name()) + } + } + return pods, nil +} + type ByCreated []*docker.Container func (a ByCreated) Len() int { return len(a) } @@ -834,6 +870,14 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke killedContainers := make(map[dockertools.DockerID]empty) glog.V(4).Infof("Syncing Pod, podFullName: %q, uuid: %q", podFullName, uuid) + // Make data dirs. + if err := os.Mkdir(kl.GetPodDir(uuid), 0750); err != nil && !os.IsExist(err) { + return err + } + if err := os.Mkdir(kl.GetPodVolumesDir(uuid), 0750); err != nil && !os.IsExist(err) { + return err + } + // Make sure we have a network container var netID dockertools.DockerID if netDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, networkContainerName); found { @@ -1008,9 +1052,30 @@ func getDesiredVolumes(pods []api.BoundPod) map[string]api.Volume { return desiredVolumes } +func (kl *Kubelet) cleanupOrphanedPods(pods []api.BoundPod) error { + desired := util.NewStringSet() + for i := range pods { + desired.Insert(pods[i].UID) + } + found, err := kl.listPodsFromDisk() + if err != nil { + return err + } + errlist := []error{} + for i := range found { + if !desired.Has(found[i]) { + glog.V(3).Infof("Orphaned pod %q found, removing", found[i]) + if err := os.RemoveAll(kl.GetPodDir(found[i])); err != nil { + errlist = append(errlist, err) + } + } + } + return errors.NewAggregate(errlist) +} + // Compares the map of current volumes to the map of desired volumes. // If an active volume does not have a respective desired volume, clean it up. -func (kl *Kubelet) reconcileVolumes(pods []api.BoundPod) error { +func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod) error { desiredVolumes := getDesiredVolumes(pods) currentVolumes := volume.GetCurrentVolumes(kl.rootDirectory) for name, vol := range currentVolumes { @@ -1087,8 +1152,17 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error { } } + // Remove any orphaned pods. + err = kl.cleanupOrphanedPods(pods) + if err != nil { + return err + } + // Remove any orphaned volumes. - kl.reconcileVolumes(pods) + err = kl.cleanupOrphanedVolumes(pods) + if err != nil { + return err + } return err } diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index e747b214778..8a1f9af9b75 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -66,7 +66,12 @@ func (d *testDocker) InspectContainer(id string) (*docker.Container, error) { } func TestRunOnce(t *testing.T) { - kb := &Kubelet{} + kb := &Kubelet{ + rootDirectory: "/tmp/kubelet", + } + if err := kb.setupDataDirs(); err != nil { + t.Errorf("Failed to init data dirs: %v", err) + } podContainers := []docker.APIContainers{ { Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&api.Container{Name: "bar"}), 16) + "_foo.new.test_12345678_42"}, diff --git a/pkg/kubelet/util.go b/pkg/kubelet/util.go index 589bebf097f..117969481bc 100644 --- a/pkg/kubelet/util.go +++ b/pkg/kubelet/util.go @@ -18,8 +18,6 @@ package kubelet import ( "net/http" - "os" - "path" "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -70,17 +68,6 @@ func EtcdClientOrDie(etcdServerList util.StringList, etcdConfigFile string) *etc return nil } -// TODO: move this into pkg/util -func SetupRootDirectoryOrDie(rootDirectory string) { - if rootDirectory == "" { - glog.Fatal("Invalid root directory path.") - } - rootDirectory = path.Clean(rootDirectory) - if err := os.MkdirAll(rootDirectory, 0750); err != nil { - glog.Fatalf("Error creating root directory: %v", err) - } -} - // TODO: move this into pkg/capabilities func SetupCapabilities(allowPrivileged bool) { capabilities.Initialize(capabilities.Capabilities{ diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go index 012de76669e..d32fbb4821b 100644 --- a/pkg/standalone/standalone.go +++ b/pkg/standalone/standalone.go @@ -179,9 +179,6 @@ func RunKubelet(kcfg *KubeletConfig) { kubelet.SetupCapabilities(kcfg.AllowPrivileged) kcfg.Hostname = util.GetHostname(kcfg.HostnameOverride) - if len(kcfg.RootDirectory) > 0 { - kubelet.SetupRootDirectoryOrDie(kcfg.RootDirectory) - } cfg := makePodSourceConfig(kcfg) k, err := createAndInitKubelet(kcfg, cfg)