diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go
index a7b06af94ea..43a9b0f8cdf 100644
--- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go
+++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go
@@ -154,6 +154,11 @@ type ActualStateOfWorld interface {
 	// mounted for the specified pod as requiring file system resize (if the plugin for the
 	// volume indicates it requires file system resize).
 	MarkFSResizeRequired(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName)
+
+	// GetAttachedVolumes returns a list of volumes that are known to be attached
+	// to the node. This list can be used to determine volumes that are either in-use
+	// or have a mount/unmount operation pending.
+	GetAttachedVolumes() []AttachedVolume
 }
 
 // MountedVolume represents a volume that has successfully been mounted to a pod.
@@ -710,6 +715,22 @@ func (asw *actualStateOfWorld) GetGloballyMountedVolumes() []AttachedVolume {
 	return globallyMountedVolumes
 }
 
+// GetAttachedVolumes returns, while holding the read lock, one AttachedVolume
+// for every entry currently recorded in asw.attachedVolumes.
+func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume {
+	asw.RLock()
+	defer asw.RUnlock()
+	allAttachedVolumes := make(
+		[]AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
+	for _, volumeObj := range asw.attachedVolumes {
+		allAttachedVolumes = append(
+			allAttachedVolumes,
+			asw.newAttachedVolume(&volumeObj))
+	}
+
+	return allAttachedVolumes
+}
+
 func (asw *actualStateOfWorld) GetUnmountedVolumes() []AttachedVolume {
 	asw.RLock()
 	defer asw.RUnlock()
diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go
index ba3d99d64c8..561733d592c 100644
--- a/pkg/kubelet/volumemanager/volume_manager.go
+++ b/pkg/kubelet/volumemanager/volume_manager.go
@@ -295,9 +295,9 @@ func (vm *volumeManager) GetVolumesInUse() []v1.UniqueVolumeName {
 	// that volumes are marked in use as soon as the decision is made that the
 	// volume *should* be attached to this node until it is safely unmounted.
 	desiredVolumes := vm.desiredStateOfWorld.GetVolumesToMount()
-	mountedVolumes := vm.actualStateOfWorld.GetGloballyMountedVolumes()
-	volumesToReportInUse := make([]v1.UniqueVolumeName, 0, len(desiredVolumes)+len(mountedVolumes))
-	desiredVolumesMap := make(map[v1.UniqueVolumeName]bool, len(desiredVolumes)+len(mountedVolumes))
+	allAttachedVolumes := vm.actualStateOfWorld.GetAttachedVolumes()
+	volumesToReportInUse := make([]v1.UniqueVolumeName, 0, len(desiredVolumes)+len(allAttachedVolumes))
+	desiredVolumesMap := make(map[v1.UniqueVolumeName]bool, len(desiredVolumes)+len(allAttachedVolumes))
 
 	for _, volume := range desiredVolumes {
 		if volume.PluginIsAttachable {
@@ -308,7 +308,7 @@ func (vm *volumeManager) GetVolumesInUse() []v1.UniqueVolumeName {
 		}
 	}
 
-	for _, volume := range mountedVolumes {
+	for _, volume := range allAttachedVolumes {
 		if volume.PluginIsAttachable {
 			if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
 				volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
diff --git a/test/e2e/storage/BUILD b/test/e2e/storage/BUILD
index 2d31246c771..118c3a259ba 100644
--- a/test/e2e/storage/BUILD
+++ b/test/e2e/storage/BUILD
@@ -4,6 +4,7 @@ go_library(
     name = "go_default_library",
     srcs = [
         "csi_volumes.go",
+        "detach_mounted.go",
         "empty_dir_wrapper.go",
         "ephemeral_volume.go",
         "flexvolume.go",
diff --git a/test/e2e/storage/detach_mounted.go b/test/e2e/storage/detach_mounted.go
new file mode 100644
index 00000000000..270d93fd1f2
--- /dev/null
+++ b/test/e2e/storage/detach_mounted.go
@@ -0,0 +1,215 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storage
+
+import (
+	"fmt"
+	"math/rand"
+	"path"
+	"time"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/kubernetes/test/e2e/framework"
+	"k8s.io/kubernetes/test/e2e/storage/utils"
+	imageutils "k8s.io/kubernetes/test/utils/image"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+var (
+	// BusyBoxImage is the e2e BusyBox image run by the client pod.
+	BusyBoxImage = imageutils.GetE2EImage(imageutils.BusyBox)
+	// durationForStuckMount is how long the test sleeps to let the driver's
+	// slow mountdevice call finish.
+	durationForStuckMount = 110 * time.Second
+)
+
+var _ = utils.SIGDescribe("Detaching volumes", func() {
+	f := framework.NewDefaultFramework("flexvolume")
+
+	// note that namespace deletion is handled by delete-namespace flag
+
+	var cs clientset.Interface
+	var ns *v1.Namespace
+	var node v1.Node
+	var suffix string
+
+	BeforeEach(func() {
+		framework.SkipUnlessProviderIs("gce", "local")
+		framework.SkipUnlessMasterOSDistroIs("debian", "ubuntu", "gci", "custom")
+		framework.SkipUnlessNodeOSDistroIs("debian", "ubuntu", "gci", "custom")
+		framework.SkipUnlessSSHKeyPresent()
+
+		cs = f.ClientSet
+		ns = f.Namespace
+		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
+		node = nodes.Items[rand.Intn(len(nodes.Items))]
+		suffix = ns.Name
+	})
+
+	It("should not work when mount is in progress", func() {
+		driver := "attachable-with-long-format"
+		driverInstallAs := driver + "-" + suffix
+
+		By(fmt.Sprintf("installing flexvolume %s on node %s as %s", path.Join(driverDir, driver), node.Name, driverInstallAs))
+		installFlex(cs, &node, "k8s", driverInstallAs, path.Join(driverDir, driver))
+		By(fmt.Sprintf("installing flexvolume %s on master as %s", path.Join(driverDir, driver), driverInstallAs))
+		installFlex(cs, nil, "k8s", driverInstallAs, path.Join(driverDir, driver))
+		// Uninstall the driver even when an assertion below fails, so that a
+		// failed run does not leave it behind on the node and the master.
+		defer func() {
+			By(fmt.Sprintf("uninstalling flexvolume %s from node %s", driverInstallAs, node.Name))
+			uninstallFlex(cs, &node, "k8s", driverInstallAs)
+			By(fmt.Sprintf("uninstalling flexvolume %s from master", driverInstallAs))
+			uninstallFlex(cs, nil, "k8s", driverInstallAs)
+		}()
+
+		volumeSource := v1.VolumeSource{
+			FlexVolume: &v1.FlexVolumeSource{
+				Driver: "k8s/" + driverInstallAs,
+			},
+		}
+
+		clientPod := getFlexVolumePod(volumeSource, node.Name)
+		By("Creating pod that uses slow format volume")
+		pod, err := cs.CoreV1().Pods(ns.Name).Create(clientPod)
+		Expect(err).NotTo(HaveOccurred())
+
+		uniqueVolumeName := getUniqueVolumeName(pod, driverInstallAs)
+
+		By("waiting for volume-in-use on the node after pod creation")
+		err = waitForVolumesInUse(cs, node.Name, uniqueVolumeName)
+		Expect(err).NotTo(HaveOccurred(), "while waiting for volume in use")
+
+		By("Deleting the flexvolume pod")
+		err = framework.DeletePodWithWait(f, cs, pod)
+		Expect(err).NotTo(HaveOccurred(), "in deleting the pod")
+
+		// Wait a bit for node to sync the volume status
+		time.Sleep(30 * time.Second)
+
+		By("waiting for volume-in-use on the node after pod deletion")
+		err = waitForVolumesInUse(cs, node.Name, uniqueVolumeName)
+		Expect(err).NotTo(HaveOccurred(), "while waiting for volume in use")
+
+		// Wait for 110s because the mount device operation sleeps for 120
+		// seconds and we have already waited for 30s above.
+		time.Sleep(durationForStuckMount)
+
+		By("waiting for volume to disappear from node in-use")
+		err = waitForVolumesNotInUse(cs, node.Name, uniqueVolumeName)
+		Expect(err).NotTo(HaveOccurred(), "while waiting for volume to be removed from in-use")
+	})
+})
+
+// getUniqueVolumeName builds the name that the test looks for in the node's
+// VolumesInUse: "flexvolume-k8s/<driverName>/<name of the pod's first volume>".
+func getUniqueVolumeName(pod *v1.Pod, driverName string) string {
+	return fmt.Sprintf("flexvolume-k8s/%s/%s", driverName, pod.Spec.Volumes[0].Name)
+}
+
+// waitForVolumesNotInUse polls the node every 10s, for up to 60s, until
+// volumeName is no longer listed in its Status.VolumesInUse.
+func waitForVolumesNotInUse(client clientset.Interface, nodeName, volumeName string) error {
+	return waitForVolumeInUseState(client, nodeName, volumeName, false)
+}
+
+// waitForVolumesInUse polls the node every 10s, for up to 60s, until
+// volumeName is listed in its Status.VolumesInUse.
+func waitForVolumesInUse(client clientset.Interface, nodeName, volumeName string) error {
+	return waitForVolumeInUseState(client, nodeName, volumeName, true)
+}
+
+// waitForVolumeInUseState polls the node every 10s, for up to 60s, until the
+// presence of volumeName in its Status.VolumesInUse equals wantInUse. An error
+// while fetching the node is returned from the poll condition.
+func waitForVolumeInUseState(client clientset.Interface, nodeName, volumeName string, wantInUse bool) error {
+	return wait.PollImmediate(10*time.Second, 60*time.Second, func() (bool, error) {
+		node, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
+		if err != nil {
+			return false, fmt.Errorf("error fetching node %s with %v", nodeName, err)
+		}
+		inUse := false
+		for _, volume := range node.Status.VolumesInUse {
+			if string(volume) == volumeName {
+				inUse = true
+				break
+			}
+		}
+		return inUse == wantInUse, nil
+	})
+}
+
+// getFlexVolumePod builds a BusyBox pod pinned to nodeName that mounts
+// volumeSource at /opt/foo and keeps reading /opt/foo/index.html in a loop.
+// Its termination grace period is zero.
+func getFlexVolumePod(volumeSource v1.VolumeSource, nodeName string) *v1.Pod {
+	var gracePeriod int64
+	clientPod := &v1.Pod{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "Pod",
+			APIVersion: "v1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "flexvolume-detach-test" + "-client",
+			Labels: map[string]string{
+				"role": "flexvolume-detach-test" + "-client",
+			},
+		},
+		Spec: v1.PodSpec{
+			Containers: []v1.Container{
+				{
+					Name:       "flexvolume-detach-test" + "-client",
+					Image:      BusyBoxImage,
+					WorkingDir: "/opt",
+					// An imperative and easily debuggable container which reads vol contents for
+					// us to scan in the tests or by eye.
+					// We expect that /opt is empty in the minimal containers which we use in this test.
+					Command: []string{
+						"/bin/sh",
+						"-c",
+						"while true ; do cat /opt/foo/index.html ; sleep 2 ; ls -altrh /opt/ ; sleep 2 ; done ",
+					},
+					VolumeMounts: []v1.VolumeMount{
+						{
+							Name:      "test-long-detach-flex",
+							MountPath: "/opt/foo",
+						},
+					},
+				},
+			},
+			TerminationGracePeriodSeconds: &gracePeriod,
+			SecurityContext: &v1.PodSecurityContext{
+				SELinuxOptions: &v1.SELinuxOptions{
+					Level: "s0:c0,c1",
+				},
+			},
+			Volumes: []v1.Volume{
+				{
+					Name:         "test-long-detach-flex",
+					VolumeSource: volumeSource,
+				},
+			},
+			NodeName: nodeName,
+		},
+	}
+	return clientPod
+}
diff --git a/test/e2e/testing-manifests/flexvolume/attachable-with-long-format b/test/e2e/testing-manifests/flexvolume/attachable-with-long-format
new file mode 100644
index 00000000000..6785147a20a
--- /dev/null
+++ b/test/e2e/testing-manifests/flexvolume/attachable-with-long-format
@@ -0,0 +1,161 @@
+#!/bin/sh
+
+# Copyright 2017 The Kubernetes Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This driver is specifically designed to test a long mounting scenario,
+# which can cause a volume to be detached while the mount is in progress.
+
+
+FLEX_DUMMY_LOG=${FLEX_DUMMY_LOG:-"/tmp/flex-dummy.log"}
+
+VALID_MNTDEVICE=foo
+
+# attach always returns one valid mount device so a different device
+# showing up in a subsequent driver call implies a bug
+validateMountDeviceOrDie() {
+	MNTDEVICE=$1
+	CALL=$2
+	if [ "$MNTDEVICE" != "$VALID_MNTDEVICE" ]; then
+		log "{\"status\":\"Failure\",\"message\":\"call ${CALL} expected device ${VALID_MNTDEVICE}, got device ${MNTDEVICE}\"}"
+		exit 0
+	fi
+}
+
+# log writes the driver's JSON response to stdout. The message is passed as
+# data rather than as the printf format, so '%' and '\' are emitted verbatim.
+log() {
+	printf '%s' "$*"
+}
+
+# debug appends a timestamped trace line to ${FLEX_DUMMY_LOG}.
+debug() {
+	echo "$(date) $*" >> "${FLEX_DUMMY_LOG}"
+}
+
+# attach reports the fixed device ${VALID_MNTDEVICE} as attached.
+attach() {
+	debug "attach $*"
+	log "{\"status\":\"Success\",\"device\":\"${VALID_MNTDEVICE}\"}"
+	exit 0
+}
+
+# detach always reports success.
+detach() {
+	debug "detach $*"
+	# TODO issue 44737 detach is passed PV name, not mount device
+	log "{\"status\":\"Success\"}"
+	exit 0
+}
+
+# waitforattach echoes back the device given in $1 after checking that it is
+# the one attach hands out.
+waitforattach() {
+	debug "waitforattach $*"
+	MNTDEVICE=$1
+	validateMountDeviceOrDie "$MNTDEVICE" "waitforattach"
+	log "{\"status\":\"Success\",\"device\":\"${MNTDEVICE}\"}"
+	exit 0
+}
+
+# isattached always reports the volume as attached.
+isattached() {
+	debug "isattached $*"
+	log "{\"status\":\"Success\",\"attached\":true}"
+	exit 0
+}
+
+# domountdevice mounts a tmpfs at $1 for the device in $2, then sleeps for
+# 120 seconds before writing index.html and reporting success. The sleep is
+# what keeps the mount operation pending for the test.
+domountdevice() {
+	debug "domountdevice $*"
+	MNTDEVICE=$2
+	validateMountDeviceOrDie "$MNTDEVICE" "domountdevice"
+	MNTPATH=$1
+	mkdir -p "${MNTPATH}" >/dev/null 2>&1
+	mount -t tmpfs none "${MNTPATH}" >/dev/null 2>&1
+	sleep 120
+	echo "Hello from flexvolume!" >> "${MNTPATH}/index.html"
+	log "{\"status\":\"Success\"}"
+	exit 0
+}
+
+# unmountdevice removes index.html and unmounts the tmpfs at $1.
+unmountdevice() {
+	debug "unmountdevice $*"
+	MNTPATH=$1
+	rm "${MNTPATH}/index.html" >/dev/null 2>&1
+	umount "${MNTPATH}" >/dev/null 2>&1
+	log "{\"status\":\"Success\"}"
+	exit 0
+}
+
+# expandvolume always reports success.
+expandvolume() {
+	debug "expandvolume $*"
+	log "{\"status\":\"Success\"}"
+	exit 0
+}
+
+# expandfs always reports success.
+expandfs() {
+	debug "expandfs $*"
+	log "{\"status\":\"Success\"}"
+	exit 0
+}
+
+op=$1
+
+if [ "$op" = "init" ]; then
+	debug "init $*"
+	log "{\"status\":\"Success\",\"capabilities\":{\"attach\":true, \"requiresFSResize\":true}}"
+	exit 0
+fi
+
+shift
+
+# Forward the remaining arguments with "$@" so that an argument containing
+# whitespace is handed to the handler as one argument instead of being re-split.
+case "$op" in
+	attach)
+		attach "$@"
+		;;
+	detach)
+		detach "$@"
+		;;
+	waitforattach)
+		waitforattach "$@"
+		;;
+	isattached)
+		isattached "$@"
+		;;
+	mountdevice)
+		domountdevice "$@"
+		;;
+	unmountdevice)
+		unmountdevice "$@"
+		;;
+	expandvolume)
+		expandvolume "$@"
+		;;
+	expandfs)
+		expandfs "$@"
+		;;
+	*)
+		log "{\"status\":\"Not supported\"}"
+		exit 0
+esac
+
+exit 1