diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0ed51bad2df..6c11f12a3f0 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1725,13 +1725,41 @@ func (kl *Kubelet) getPullSecretsForPod(pod *api.Pod) ([]api.Secret, error) { return pullSecrets, nil } +// Return name of a volume. When the volume is a PersistentVolumeClaim, +// it returns name of the real PersistentVolume bound to the claim. +// It returns errror when the clam is not bound yet. +func (kl *Kubelet) resolveVolumeName(pod *api.Pod, volume *api.Volume) (string, error) { + claimSource := volume.VolumeSource.PersistentVolumeClaim + if claimSource != nil { + // resolve real volume behind the claim + claim, err := kl.kubeClient.Legacy().PersistentVolumeClaims(pod.Namespace).Get(claimSource.ClaimName) + if err != nil { + return "", fmt.Errorf("Cannot find claim %s/%s for volume %s", pod.Namespace, claimSource.ClaimName, volume.Name) + } + if claim.Status.Phase != api.ClaimBound { + return "", fmt.Errorf("Claim for volume %s/%s is not bound yet", pod.Namespace, claimSource.ClaimName) + } + // Use the real bound volume instead of PersistentVolume.Name + return claim.Spec.VolumeName, nil + } + return volume.Name, nil +} + // Stores all volumes defined by the set of pods into a map. +// It stores real volumes there, i.e. persistent volume claims are resolved +// to volumes that are bound to them. // Keys for each entry are in the format (POD_ID)/(VOLUME_NAME) -func getDesiredVolumes(pods []*api.Pod) map[string]api.Volume { +func (kl *Kubelet) getDesiredVolumes(pods []*api.Pod) map[string]api.Volume { desiredVolumes := make(map[string]api.Volume) for _, pod := range pods { for _, volume := range pod.Spec.Volumes { - identifier := path.Join(string(pod.UID), volume.Name) + volumeName, err := kl.resolveVolumeName(pod, &volume) + if err != nil { + glog.V(3).Infof("%v", err) + // Ignore the error and hope it's resolved next time + continue + } + identifier := path.Join(string(pod.UID), volumeName) desiredVolumes[identifier] = volume } } @@ -1815,8 +1843,11 @@ func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error { // Compares the map of current volumes to the map of desired volumes. // If an active volume does not have a respective desired volume, clean it up. +// This method is blocking: +// 1) it talks to API server to find volumes bound to persistent volume claims +// 2) it talks to cloud to detach volumes func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubecontainer.Pod) error { - desiredVolumes := getDesiredVolumes(pods) + desiredVolumes := kl.getDesiredVolumes(pods) currentVolumes := kl.getPodVolumesFromDisk() runningSet := sets.String{} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 9fee58d8874..eccb6b959d6 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -551,6 +551,106 @@ func TestGetPodVolumesFromDisk(t *testing.T) { } } +// Test for https://github.com/kubernetes/kubernetes/pull/19600 +func TestCleanupOrphanedVolumes(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + plug := &volume.FakeVolumePlugin{PluginName: "fake", Host: nil} + kubelet.volumePluginMgr.InitPlugins([]volume.VolumePlugin{plug}, &volumeHost{kubelet}) + + // create a volume "on disk" + volsOnDisk := []struct { + podUID types.UID + volName string + }{ + {"podUID", "myrealvol"}, + } + + pathsOnDisk := []string{} + for i := range volsOnDisk { + fv := volume.FakeVolume{PodUID: volsOnDisk[i].podUID, VolName: volsOnDisk[i].volName, Plugin: plug} + fv.SetUp(nil) + pathsOnDisk = append(pathsOnDisk, fv.GetPath()) + } + + // store the claim in fake kubelet database + claim := api.PersistentVolumeClaim{ + ObjectMeta: api.ObjectMeta{ + Name: "myclaim", + Namespace: "test", + }, + Spec: api.PersistentVolumeClaimSpec{ + VolumeName: "myrealvol", + }, + Status: api.PersistentVolumeClaimStatus{ + Phase: api.ClaimBound, + }, + } + kubeClient.ReactionChain = fake.NewSimpleClientset(&api.PersistentVolumeClaimList{Items: []api.PersistentVolumeClaim{ + claim, + }}).ReactionChain + + // Create a pod referencing the volume via a PersistentVolumeClaim + pod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "podUID", + Name: "pod", + Namespace: "test", + }, + Spec: api.PodSpec{ + Volumes: []api.Volume{ + { + Name: "myvolumeclaim", + VolumeSource: api.VolumeSource{ + PersistentVolumeClaim: &api.PersistentVolumeClaimVolumeSource{ + ClaimName: "myclaim", + }, + }, + }, + }, + }, + } + + // The pod is pending and not running yet. Test that cleanupOrphanedVolumes + // won't remove the volume from disk if the volume is referenced only + // indirectly by a claim. + err := kubelet.cleanupOrphanedVolumes([]*api.Pod{&pod}, []*kubecontainer.Pod{}) + if err != nil { + t.Errorf("cleanupOrphanedVolumes failed: %v", err) + } + + volumesFound := kubelet.getPodVolumesFromDisk() + if len(volumesFound) != len(pathsOnDisk) { + t.Errorf("Expected to find %d cleaners, got %d", len(pathsOnDisk), len(volumesFound)) + } + for _, ep := range pathsOnDisk { + found := false + for _, cl := range volumesFound { + if ep == cl.GetPath() { + found = true + break + } + } + if !found { + t.Errorf("Could not find a volume with path %s", ep) + } + } + + // The pod is deleted -> kubelet should delete the volume + err = kubelet.cleanupOrphanedVolumes([]*api.Pod{}, []*kubecontainer.Pod{}) + if err != nil { + t.Errorf("cleanupOrphanedVolumes failed: %v", err) + } + volumesFound = kubelet.getPodVolumesFromDisk() + if len(volumesFound) != 0 { + t.Errorf("Expected to find 0 cleaners, got %d", len(volumesFound)) + } + for _, cl := range volumesFound { + t.Errorf("Found unexpected volume %s", cl.GetPath()) + } +} + type stubVolume struct { path string volume.MetricsNil