diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 6729fb15afc..3cf1e356e5a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "net/http" + "path" "strconv" "strings" "sync" @@ -257,7 +258,7 @@ func milliCPUToShares(milliCPU int) int { func (kl *Kubelet) mountExternalVolumes(manifest *api.ContainerManifest) (volumeMap, error) { podVolumes := make(volumeMap) for _, vol := range manifest.Volumes { - extVolume, err := volume.CreateVolume(&vol, manifest.ID, kl.rootDirectory) + extVolume, err := volume.CreateVolumeBuilder(&vol, manifest.ID, kl.rootDirectory) if err != nil { return nil, err } @@ -449,6 +450,39 @@ type podContainer struct { containerName string } +// Stores all volumes defined by the set of pods into a map. +// Keys for each entry are in the format (POD_ID)/(VOLUME_NAME) +func getDesiredVolumes(pods []Pod) map[string]api.Volume { + desiredVolumes := make(map[string]api.Volume) + for _, pod := range pods { + for _, volume := range pod.Manifest.Volumes { + identifier := path.Join(pod.Manifest.ID, volume.Name) + desiredVolumes[identifier] = volume + } + } + return desiredVolumes +} + +// Compares the map of current volumes to the map of desired volumes. +// If an active volume does not have a respective desired volume, clean it up. +func (kl *Kubelet) reconcileVolumes(pods []Pod) error { + desiredVolumes := getDesiredVolumes(pods) + currentVolumes := volume.GetCurrentVolumes(kl.rootDirectory) + for name, vol := range currentVolumes { + if _, ok := desiredVolumes[name]; !ok { + //TODO (jonesdl) We should somehow differentiate between volumes that are supposed + //to be deleted and volumes that are leftover after a crash. + glog.Infof("Orphaned volume %s found, tearing down volume", name) + //TODO (jonesdl) This should not block other kubelet synchronization procedures + err := vol.TearDown() + if err != nil { + glog.Infof("Could not tear down volume %s (%s)", name, err) + } + } + } + return nil +} + // SyncPods synchronizes the configured list of pods (desired state) with the host current state. func (kl *Kubelet) SyncPods(pods []Pod) error { glog.Infof("Desired [%s]: %+v", kl.hostname, pods) @@ -497,6 +531,10 @@ func (kl *Kubelet) SyncPods(pods []Pod) error { } } } + + // Remove any orphaned volumes. + kl.reconcileVolumes(pods) + return err } diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go index 0675d33160a..d2bd1ebf7ea 100644 --- a/pkg/volume/volume.go +++ b/pkg/volume/volume.go @@ -18,23 +18,33 @@ package volume import ( "errors" + "io/ioutil" "os" "path" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/golang/glog" ) var ErrUnsupportedVolumeType = errors.New("unsupported volume type") -// Interface is a directory used by pods or hosts. Interface implementations -// must be idempotent. +// Interface is a directory used by pods or hosts. +// All method implementations of methods in the volume interface must be idempotent type Interface interface { - // SetUp prepares and mounts/unpacks the volume to a directory path. - SetUp() error - // GetPath returns the directory path the volume is mounted to. GetPath() string +} +// The Builder interface provides the method to set up/mount the volume. +type Builder interface { + // Uses Interface to provide the path for Docker binds. + Interface + // SetUp prepares and mounts/unpacks the volume to a directory path. + SetUp() error +} + +// The Cleaner interface provides the method to cleanup/unmount the volumes. +type Cleaner interface { // TearDown unmounts the volume and removes traces of the SetUp procedure. TearDown() error } @@ -51,10 +61,6 @@ func (hostVol *HostDirectory) SetUp() error { return nil } -func (hostVol *HostDirectory) TearDown() error { - return nil -} - func (hostVol *HostDirectory) GetPath() string { return hostVol.Path } @@ -77,16 +83,36 @@ func (emptyDir *EmptyDirectory) SetUp() error { return nil } -// TODO(jonesdl) when we can properly invoke TearDown(), we should delete -// the directory created by SetUp. -func (emptyDir *EmptyDirectory) TearDown() error { - return nil -} - func (emptyDir *EmptyDirectory) GetPath() string { return path.Join(emptyDir.RootDir, emptyDir.PodID, "volumes", "empty", emptyDir.Name) } +func (emptyDir *EmptyDirectory) renameDirectory() (string, error) { + oldPath := emptyDir.GetPath() + newPath, err := ioutil.TempDir(path.Dir(oldPath), emptyDir.Name+".deleting~") + if err != nil { + return "", err + } + err = os.Rename(oldPath, newPath) + if err != nil { + return "", err + } + return newPath, nil +} + +// Simply delete everything in the directory. +func (emptyDir *EmptyDirectory) TearDown() error { + tmpDir, err := emptyDir.renameDirectory() + if err != nil { + return err + } + err = os.RemoveAll(tmpDir) + if err != nil { + return err + } + return nil +} + // Interprets API volume as a HostDirectory func createHostDirectory(volume *api.Volume) *HostDirectory { return &HostDirectory{volume.Source.HostDirectory.Path} @@ -97,16 +123,16 @@ func createEmptyDirectory(volume *api.Volume, podID string, rootDir string) *Emp return &EmptyDirectory{volume.Name, podID, rootDir} } -// CreateVolume returns an Interface capable of mounting a volume described by an -// *api.Volume and whether or not it is mounted, or an error. -func CreateVolume(volume *api.Volume, podID string, rootDir string) (Interface, error) { +// CreateVolumeBuilder returns a Builder capable of mounting a volume described by an +// *api.Volume, or an error. +func CreateVolumeBuilder(volume *api.Volume, podID string, rootDir string) (Builder, error) { source := volume.Source // TODO(jonesdl) We will want to throw an error here when we no longer // support the default behavior. if source == nil { return nil, nil } - var vol Interface + var vol Builder // TODO(jonesdl) We should probably not check every pointer and directly // resolve these types instead. if source.HostDirectory != nil { @@ -118,3 +144,54 @@ func CreateVolume(volume *api.Volume, podID string, rootDir string) (Interface, } return vol, nil } + +// CreateVolumeCleaner returns a Cleaner capable of tearing down a volume. +func CreateVolumeCleaner(kind string, name string, podID string, rootDir string) (Cleaner, error) { + switch kind { + case "empty": + return &EmptyDirectory{name, podID, rootDir}, nil + default: + return nil, ErrUnsupportedVolumeType + } +} + +// Examines directory structure to determine volumes that are presently +// active and mounted. Returns a map of Cleaner types. +func GetCurrentVolumes(rootDirectory string) map[string]Cleaner { + currentVolumes := make(map[string]Cleaner) + mountPath := rootDirectory + podIDDirs, err := ioutil.ReadDir(mountPath) + if err != nil { + glog.Errorf("Could not read directory: %s, (%s)", mountPath, err) + } + // Volume information is extracted from the directory structure: + // (ROOT_DIR)/(POD_ID)/volumes/(VOLUME_KIND)/(VOLUME_NAME) + for _, podIDDir := range podIDDirs { + podID := podIDDir.Name() + podIDPath := path.Join(mountPath, podID, "volumes") + volumeKindDirs, err := ioutil.ReadDir(podIDPath) + if err != nil { + glog.Errorf("Could not read directory: %s, (%s)", podIDPath, err) + } + for _, volumeKindDir := range volumeKindDirs { + volumeKind := volumeKindDir.Name() + volumeKindPath := path.Join(podIDPath, volumeKind) + volumeNameDirs, err := ioutil.ReadDir(volumeKindPath) + if err != nil { + glog.Errorf("Could not read directory: %s, (%s)", volumeKindPath, err) + } + for _, volumeNameDir := range volumeNameDirs { + volumeName := volumeNameDir.Name() + identifier := path.Join(podID, volumeName) + // TODO(thockin) This should instead return a reference to an extant volume object + cleaner, err := CreateVolumeCleaner(volumeKind, volumeName, podID, rootDirectory) + if err != nil { + glog.Errorf("Could not create volume cleaner: %s, (%s)", volumeNameDirs, err) + continue + } + currentVolumes[identifier] = cleaner + } + } + } + return currentVolumes +} diff --git a/pkg/volume/volume_test.go b/pkg/volume/volume_test.go index b7734fa97d4..845f2afa08a 100644 --- a/pkg/volume/volume_test.go +++ b/pkg/volume/volume_test.go @@ -25,7 +25,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) -func TestCreateVolumes(t *testing.T) { +func TestCreateVolumeBuilders(t *testing.T) { tempDir, err := ioutil.TempDir("", "CreateVolumes") if err != nil { t.Errorf("Unexpected error: %v", err) @@ -35,6 +35,7 @@ func TestCreateVolumes(t *testing.T) { volume api.Volume path string podID string + kind string }{ { api.Volume{ @@ -45,6 +46,7 @@ func TestCreateVolumes(t *testing.T) { }, "/dir/path", "my-id", + "", }, { api.Volume{ @@ -55,8 +57,9 @@ func TestCreateVolumes(t *testing.T) { }, path.Join(tempDir, "/my-id/volumes/empty/empty-dir"), "my-id", + "empty", }, - {api.Volume{}, "", ""}, + {api.Volume{}, "", "", ""}, { api.Volume{ Name: "empty-dir", @@ -64,13 +67,14 @@ func TestCreateVolumes(t *testing.T) { }, "", "", + "", }, } for _, createVolumesTest := range createVolumesTests { tt := createVolumesTest - v, err := CreateVolume(&tt.volume, tt.podID, tempDir) + vb, err := CreateVolumeBuilder(&tt.volume, tt.podID, tempDir) if tt.volume.Source == nil { - if v != nil { + if vb != nil { t.Errorf("Expected volume to be nil") } continue @@ -84,17 +88,56 @@ func TestCreateVolumes(t *testing.T) { if err != nil { t.Errorf("Unexpected error: %v", err) } - err = v.SetUp() + err = vb.SetUp() if err != nil { t.Errorf("Unexpected error: %v", err) } - path := v.GetPath() + path := vb.GetPath() if path != tt.path { t.Errorf("Unexpected bind path. Expected %v, got %v", tt.path, path) } - err = v.TearDown() + vc, err := CreateVolumeCleaner(tt.kind, tt.volume.Name, tt.podID, tempDir) + if tt.kind == "" { + if err != ErrUnsupportedVolumeType { + t.Errorf("Unexpected error: %v", err) + } + continue + } + err = vc.TearDown() if err != nil { t.Errorf("Unexpected error: %v", err) } + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Errorf("TearDown() failed, original volume path not properly removed: %v", path) + } + } +} + +func TestGetActiveVolumes(t *testing.T) { + tempDir, err := ioutil.TempDir("", "CreateVolumes") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + defer os.RemoveAll(tempDir) + getActiveVolumesTests := []struct { + name string + podID string + kind string + identifier string + }{ + {"fakeName", "fakeID", "empty", "fakeID/fakeName"}, + {"fakeName2", "fakeID2", "empty", "fakeID2/fakeName2"}, + } + expectedIdentifiers := []string{} + for _, test := range getActiveVolumesTests { + volumeDir := path.Join(tempDir, test.podID, "volumes", test.kind, test.name) + os.MkdirAll(volumeDir, 0750) + expectedIdentifiers = append(expectedIdentifiers, test.identifier) + } + volumeMap := GetCurrentVolumes(tempDir) + for _, name := range expectedIdentifiers { + if _, ok := volumeMap[name]; !ok { + t.Errorf("Expected volume map entry not found: %v", name) + } } }