diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 9cb00956ac5..9b3692b68c3 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -84,6 +84,7 @@ go_library( "//pkg/volume/gce_pd:go_default_library", "//pkg/volume/glusterfs:go_default_library", "//pkg/volume/host_path:go_default_library", + "//pkg/volume/local:go_default_library", "//pkg/volume/nfs:go_default_library", "//pkg/volume/photon_pd:go_default_library", "//pkg/volume/portworx:go_default_library", diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index 38f126f6b88..45dbec4f2a1 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -47,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/volume/gce_pd" "k8s.io/kubernetes/pkg/volume/glusterfs" "k8s.io/kubernetes/pkg/volume/host_path" + "k8s.io/kubernetes/pkg/volume/local" "k8s.io/kubernetes/pkg/volume/nfs" "k8s.io/kubernetes/pkg/volume/photon_pd" "k8s.io/kubernetes/pkg/volume/portworx" @@ -121,6 +122,7 @@ func ProbeControllerVolumePlugins(cloud cloudprovider.Interface, config componen allPlugins = append(allPlugins, flocker.ProbeVolumePlugins()...) allPlugins = append(allPlugins, portworx.ProbeVolumePlugins()...) allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, local.ProbeVolumePlugins()...) if cloud != nil { switch { diff --git a/cmd/kubelet/app/BUILD b/cmd/kubelet/app/BUILD index c29b3791ea5..0202f3d5b0f 100644 --- a/cmd/kubelet/app/BUILD +++ b/cmd/kubelet/app/BUILD @@ -92,6 +92,7 @@ go_library( "//pkg/volume/glusterfs:go_default_library", "//pkg/volume/host_path:go_default_library", "//pkg/volume/iscsi:go_default_library", + "//pkg/volume/local:go_default_library", "//pkg/volume/nfs:go_default_library", "//pkg/volume/photon_pd:go_default_library", "//pkg/volume/portworx:go_default_library", diff --git a/cmd/kubelet/app/plugins.go b/cmd/kubelet/app/plugins.go index 10be1927d7e..be45e063d49 100644 --- a/cmd/kubelet/app/plugins.go +++ b/cmd/kubelet/app/plugins.go @@ -45,6 +45,7 @@ import ( "k8s.io/kubernetes/pkg/volume/glusterfs" "k8s.io/kubernetes/pkg/volume/host_path" "k8s.io/kubernetes/pkg/volume/iscsi" + "k8s.io/kubernetes/pkg/volume/local" "k8s.io/kubernetes/pkg/volume/nfs" "k8s.io/kubernetes/pkg/volume/photon_pd" "k8s.io/kubernetes/pkg/volume/portworx" @@ -95,6 +96,7 @@ func ProbeVolumePlugins(pluginDir string) []volume.VolumePlugin { allPlugins = append(allPlugins, projected.ProbeVolumePlugins()...) allPlugins = append(allPlugins, portworx.ProbeVolumePlugins()...) allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, local.ProbeVolumePlugins()...) 
return allPlugins } diff --git a/pkg/api/helper/helpers.go b/pkg/api/helper/helpers.go index 891c012d994..95b8f8aa5ae 100644 --- a/pkg/api/helper/helpers.go +++ b/pkg/api/helper/helpers.go @@ -590,6 +590,10 @@ func GetStorageNodeAffinityFromAnnotation(annotations map[string]string) (*api.N // Converts NodeAffinity type to Alpha annotation for use in PersistentVolumes // TODO: update when storage node affinity graduates to beta func StorageNodeAffinityToAlphaAnnotation(annotations map[string]string, affinity *api.NodeAffinity) error { + if affinity == nil { + return nil + } + json, err := json.Marshal(*affinity) if err != nil { return err diff --git a/pkg/api/v1/helper/helpers.go b/pkg/api/v1/helper/helpers.go index 9a8caad82f2..6b51c5c8364 100644 --- a/pkg/api/v1/helper/helpers.go +++ b/pkg/api/v1/helper/helpers.go @@ -517,6 +517,10 @@ func GetStorageNodeAffinityFromAnnotation(annotations map[string]string) (*v1.No // Converts NodeAffinity type to Alpha annotation for use in PersistentVolumes // TODO: update when storage node affinity graduates to beta func StorageNodeAffinityToAlphaAnnotation(annotations map[string]string, affinity *v1.NodeAffinity) error { + if affinity == nil { + return nil + } + json, err := json.Marshal(*affinity) if err != nil { return err diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index cdfd24923ff..fd49f0f470e 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -565,3 +565,7 @@ func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.N adc.desiredStateOfWorld.AddNode(nodeName, keepTerminatedPodVolumes) } } + +func (adc *attachDetachController) GetNodeLabels() (map[string]string, error) { + return nil, fmt.Errorf("GetNodeLabels() unsupported in Attach/Detach controller") +} diff --git a/pkg/controller/volume/persistentvolume/volume_host.go b/pkg/controller/volume/persistentvolume/volume_host.go index c495c002159..111c2c0d8d5 100644 --- a/pkg/controller/volume/persistentvolume/volume_host.go +++ b/pkg/controller/volume/persistentvolume/volume_host.go @@ -86,3 +86,7 @@ func (adc *PersistentVolumeController) GetSecretFunc() func(namespace, name stri return nil, fmt.Errorf("GetSecret unsupported in PersistentVolumeController") } } + +func (ctrl *PersistentVolumeController) GetNodeLabels() (map[string]string, error) { + return nil, fmt.Errorf("GetNodeLabels() unsupported in PersistentVolumeController") +} diff --git a/pkg/kubelet/volume_host.go b/pkg/kubelet/volume_host.go index 547bf9b0cce..87d625f33a3 100644 --- a/pkg/kubelet/volume_host.go +++ b/pkg/kubelet/volume_host.go @@ -140,3 +140,11 @@ func (kvh *kubeletVolumeHost) GetNodeAllocatable() (v1.ResourceList, error) { func (kvh *kubeletVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) { return kvh.secretManager.GetSecret } + +func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) { + node, err := kvh.kubelet.GetNode() + if err != nil { + return nil, fmt.Errorf("error retrieving node: %v", err) + } + return node.Labels, nil +} diff --git a/pkg/volume/BUILD b/pkg/volume/BUILD index 923204d37d4..5ecc5b254a8 100644 --- a/pkg/volume/BUILD +++ b/pkg/volume/BUILD @@ -106,6 +106,7 @@ filegroup( "//pkg/volume/glusterfs:all-srcs", "//pkg/volume/host_path:all-srcs", "//pkg/volume/iscsi:all-srcs", + "//pkg/volume/local:all-srcs", "//pkg/volume/nfs:all-srcs", 
"//pkg/volume/photon_pd:all-srcs", "//pkg/volume/portworx:all-srcs", diff --git a/pkg/volume/local/BUILD b/pkg/volume/local/BUILD new file mode 100644 index 00000000000..e6172e5ce31 --- /dev/null +++ b/pkg/volume/local/BUILD @@ -0,0 +1,56 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = [ + "doc.go", + "local.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/api/v1:go_default_library", + "//pkg/util/mount:go_default_library", + "//pkg/util/strings:go_default_library", + "//pkg/volume:go_default_library", + "//pkg/volume/util:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["local_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/api/v1:go_default_library", + "//pkg/volume:go_default_library", + "//pkg/volume/testing:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/client-go/util/testing:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/volume/local/OWNERS b/pkg/volume/local/OWNERS new file mode 100644 index 00000000000..2737b7296e4 --- /dev/null +++ b/pkg/volume/local/OWNERS @@ -0,0 +1,21 @@ +approvers: +- saad-ali +- thockin +- vishh +- msau42 +- jingxu97 +- jsafrane +reviewers: +- thockin +- smarterclayton +- deads2k +- brendandburns +- derekwaynecarr +- pmorie +- saad-ali +- justinsb +- jsafrane +- rootfs +- jingxu97 +- msau42 +- vishh diff --git a/pkg/volume/local/doc.go b/pkg/volume/local/doc.go new file mode 100644 index 00000000000..6817475508e --- /dev/null +++ b/pkg/volume/local/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package local contains the internal representation of local volumes +package local // import "k8s.io/kubernetes/pkg/volume/local" diff --git a/pkg/volume/local/local.go b/pkg/volume/local/local.go new file mode 100644 index 00000000000..9e485c52b34 --- /dev/null +++ b/pkg/volume/local/local.go @@ -0,0 +1,267 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package local + +import ( + "fmt" + "os" + + "github.com/golang/glog" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util" +) + +// This is the primary entrypoint for volume plugins. +func ProbeVolumePlugins() []volume.VolumePlugin { + return []volume.VolumePlugin{&localVolumePlugin{}} +} + +type localVolumePlugin struct { + host volume.VolumeHost +} + +var _ volume.VolumePlugin = &localVolumePlugin{} +var _ volume.PersistentVolumePlugin = &localVolumePlugin{} + +const ( + localVolumePluginName = "kubernetes.io/local-volume" +) + +func (plugin *localVolumePlugin) Init(host volume.VolumeHost) error { + plugin.host = host + return nil +} + +func (plugin *localVolumePlugin) GetPluginName() string { + return localVolumePluginName +} + +func (plugin *localVolumePlugin) GetVolumeName(spec *volume.Spec) (string, error) { + // This volume is only supported as a PersistentVolumeSource, so the PV name is unique + return spec.Name(), nil +} + +func (plugin *localVolumePlugin) CanSupport(spec *volume.Spec) bool { + // This volume is only supported as a PersistentVolumeSource + return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Local != nil) +} + +func (plugin *localVolumePlugin) RequiresRemount() bool { + return false +} + +func (plugin *localVolumePlugin) SupportsMountOption() bool { + return false +} + +func (plugin *localVolumePlugin) SupportsBulkVolumeVerification() bool { + return false +} + +func (plugin *localVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode { + // The current meaning of AccessMode is how many nodes can attach to it, not how many pods can mount it + return []v1.PersistentVolumeAccessMode{ + v1.ReadWriteOnce, + } +} + +func getVolumeSource(spec *volume.Spec) (*v1.LocalVolumeSource, bool, error) { + if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Local != nil { + return spec.PersistentVolume.Spec.Local, spec.ReadOnly, nil + } + + return nil, false, fmt.Errorf("Spec does not reference a Local volume type") +} + +func (plugin *localVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) { + volumeSource, readOnly, err := getVolumeSource(spec) + if err != nil { + return nil, err + } + + return &localVolumeMounter{ + localVolume: &localVolume{ + podUID: pod.UID, + volName: spec.Name(), + mounter: plugin.host.GetMounter(), + plugin: plugin, + globalPath: volumeSource.Path, + }, + readOnly: readOnly, + }, nil + +} + +func (plugin *localVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) { + return &localVolumeUnmounter{ + localVolume: &localVolume{ + podUID: podUID, + volName: volName, + mounter: plugin.host.GetMounter(), + plugin: plugin, + }, + }, nil +} + +// TODO: check if no path and no topology constraints are ok +func (plugin *localVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + localVolume := 
&v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: volumeName, + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + Local: &v1.LocalVolumeSource{ + Path: "", + }, + }, + }, + } + return volume.NewSpecFromPersistentVolume(localVolume, false), nil +} + +// Local volumes represent a local directory on a node. +// The directory at the globalPath will be bind-mounted to the pod's directory +type localVolume struct { + volName string + podUID types.UID + // Global path to the volume + globalPath string + // Mounter interface that provides system calls to mount the global path to the pod local path. + mounter mount.Interface + plugin *localVolumePlugin + // TODO: add metrics + volume.MetricsNil +} + +func (l *localVolume) GetPath() string { + return l.plugin.host.GetPodVolumeDir(l.podUID, strings.EscapeQualifiedNameForDisk(localVolumePluginName), l.volName) +} + +type localVolumeMounter struct { + *localVolume + readOnly bool +} + +var _ volume.Mounter = &localVolumeMounter{} + +func (m *localVolumeMounter) GetAttributes() volume.Attributes { + return volume.Attributes{ + ReadOnly: m.readOnly, + Managed: !m.readOnly, + SupportsSELinux: true, + } +} + +// CanMount checks prior to mount operations to verify that the required components (binaries, etc.) +// to mount the volume are available on the underlying node. +// If not, it returns an error +func (m *localVolumeMounter) CanMount() error { + return nil +} + +// SetUp bind mounts the directory to the volume path +func (m *localVolumeMounter) SetUp(fsGroup *types.UnixGroupID) error { + return m.SetUpAt(m.GetPath(), fsGroup) +} + +// SetUpAt bind mounts the directory to the volume path and sets up volume ownership +func (m *localVolumeMounter) SetUpAt(dir string, fsGroup *types.UnixGroupID) error { + if m.globalPath == "" { + err := fmt.Errorf("LocalVolume volume %q path is empty", m.volName) + return err + } + + notMnt, err := m.mounter.IsLikelyNotMountPoint(dir) + glog.V(4).Infof("LocalVolume mount setup: PodDir(%s) VolDir(%s) Mounted(%t) Error(%v), ReadOnly(%t)", dir, m.globalPath, !notMnt, err, m.readOnly) + if err != nil && !os.IsNotExist(err) { + glog.Errorf("cannot validate mount point: %s %v", dir, err) + return err + } + if !notMnt { + return nil + } + + if err := os.MkdirAll(dir, 0750); err != nil { + glog.Errorf("mkdir failed on disk %s (%v)", dir, err) + return err + } + + // Perform a bind mount to the full path to allow duplicate mounts of the same volume. + options := []string{"bind"} + if m.readOnly { + options = append(options, "ro") + } + + glog.V(4).Infof("attempting to mount %s", dir) + err = m.mounter.Mount(m.globalPath, dir, "", options) + if err != nil { + glog.Errorf("Mount of volume %s failed: %v", dir, err) + notMnt, mntErr := m.mounter.IsLikelyNotMountPoint(dir) + if mntErr != nil { + glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr) + return err + } + if !notMnt { + if mntErr = m.mounter.Unmount(dir); mntErr != nil { + glog.Errorf("Failed to unmount: %v", mntErr) + return err + } + notMnt, mntErr = m.mounter.IsLikelyNotMountPoint(dir) + if mntErr != nil { + glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr) + return err + } + if !notMnt { + // This is very odd, we don't expect it. We'll try again next sync loop. + glog.Errorf("%s is still mounted, despite call to unmount(). 
Will try again next sync loop.", dir) + return err + } + } + os.Remove(dir) + return err + } + + if !m.readOnly { + // TODO: how to prevent multiple mounts with conflicting fsGroup? + return volume.SetVolumeOwnership(m, fsGroup) + } + return nil +} + +type localVolumeUnmounter struct { + *localVolume +} + +var _ volume.Unmounter = &localVolumeUnmounter{} + +// TearDown unmounts the bind mount +func (u *localVolumeUnmounter) TearDown() error { + return u.TearDownAt(u.GetPath()) +} + +// TearDownAt unmounts the bind mount +func (u *localVolumeUnmounter) TearDownAt(dir string) error { + glog.V(4).Infof("Unmounting volume %q at path %q\n", u.volName, dir) + return util.UnmountPath(dir, u.mounter) +} diff --git a/pkg/volume/local/local_test.go b/pkg/volume/local/local_test.go new file mode 100644 index 00000000000..b417eff3ea3 --- /dev/null +++ b/pkg/volume/local/local_test.go @@ -0,0 +1,288 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package local + +import ( + "os" + "path" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + utiltesting "k8s.io/client-go/util/testing" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/volume" + volumetest "k8s.io/kubernetes/pkg/volume/testing" +) + +const ( + testPVName = "pvA" + testMountPath = "pods/poduid/volumes/kubernetes.io~local-volume/pvA" + testNodeName = "fakeNodeName" +) + +func getPlugin(t *testing.T) (string, volume.VolumePlugin) { + tmpDir, err := utiltesting.MkTmpdir("localVolumeTest") + if err != nil { + t.Fatalf("can't make a temp dir: %v", err) + } + + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + + plug, err := plugMgr.FindPluginByName(localVolumePluginName) + if err != nil { + os.RemoveAll(tmpDir) + t.Fatalf("Can't find the plugin by name") + } + if plug.GetPluginName() != localVolumePluginName { + t.Errorf("Wrong name: %s", plug.GetPluginName()) + } + return tmpDir, plug +} + +func getPersistentPlugin(t *testing.T) (string, volume.PersistentVolumePlugin) { + tmpDir, err := utiltesting.MkTmpdir("localVolumeTest") + if err != nil { + t.Fatalf("can't make a temp dir: %v", err) + } + + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + + plug, err := plugMgr.FindPersistentPluginByName(localVolumePluginName) + if err != nil { + os.RemoveAll(tmpDir) + t.Fatalf("Can't find the plugin by name") + } + if plug.GetPluginName() != localVolumePluginName { + t.Errorf("Wrong name: %s", plug.GetPluginName()) + } + return tmpDir, plug +} + +func getTestVolume(readOnly bool) *volume.Spec { + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: testPVName, + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + Local: &v1.LocalVolumeSource{ + Path: "/test-vol", + }, + }, + }, + } + return volume.NewSpecFromPersistentVolume(pv, readOnly) +} + +func 
contains(modes []v1.PersistentVolumeAccessMode, mode v1.PersistentVolumeAccessMode) bool { + for _, m := range modes { + if m == mode { + return true + } + } + return false +} + +func TestCanSupport(t *testing.T) { + tmpDir, plug := getPlugin(t) + defer os.RemoveAll(tmpDir) + + if !plug.CanSupport(getTestVolume(false)) { + t.Errorf("Expected true") + } +} + +func TestGetAccessModes(t *testing.T) { + tmpDir, plug := getPersistentPlugin(t) + defer os.RemoveAll(tmpDir) + + modes := plug.GetAccessModes() + if !contains(modes, v1.ReadWriteOnce) { + t.Errorf("Expected AccessModeType %q", v1.ReadWriteOnce) + } + + if contains(modes, v1.ReadWriteMany) { + t.Errorf("Found AccessModeType %q, expected not", v1.ReadWriteMany) + } + if contains(modes, v1.ReadOnlyMany) { + t.Errorf("Found AccessModeType %q, expected not", v1.ReadOnlyMany) + } +} + +func TestGetVolumeName(t *testing.T) { + tmpDir, plug := getPersistentPlugin(t) + defer os.RemoveAll(tmpDir) + + volName, err := plug.GetVolumeName(getTestVolume(false)) + if err != nil { + t.Errorf("Failed to get volume name: %v", err) + } + if volName != testPVName { + t.Errorf("Expected volume name %q, got %q", testPVName, volName) + } +} + +func TestMountUnmount(t *testing.T) { + tmpDir, plug := getPlugin(t) + defer os.RemoveAll(tmpDir) + + pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("poduid")}} + mounter, err := plug.NewMounter(getTestVolume(false), pod, volume.VolumeOptions{}) + if err != nil { + t.Errorf("Failed to make a new Mounter: %v", err) + } + if mounter == nil { + t.Fatalf("Got a nil Mounter") + } + + volPath := path.Join(tmpDir, testMountPath) + path := mounter.GetPath() + if path != volPath { + t.Errorf("Got unexpected path: %s", path) + } + + if err := mounter.SetUp(nil); err != nil { + t.Errorf("Expected success, got: %v", err) + } + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + t.Errorf("SetUp() failed, volume path not created: %s", path) + } else { + t.Errorf("SetUp() failed: %v", err) + } + } + + unmounter, err := plug.NewUnmounter(testPVName, pod.UID) + if err != nil { + t.Errorf("Failed to make a new Unmounter: %v", err) + } + if unmounter == nil { + t.Fatalf("Got a nil Unmounter") + } + + if err := unmounter.TearDown(); err != nil { + t.Errorf("Expected success, got: %v", err) + } + if _, err := os.Stat(path); err == nil { + t.Errorf("TearDown() failed, volume path still exists: %s", path) + } else if !os.IsNotExist(err) { + t.Errorf("SetUp() failed: %v", err) + } +} + +func TestConstructVolumeSpec(t *testing.T) { + tmpDir, plug := getPlugin(t) + defer os.RemoveAll(tmpDir) + + volPath := path.Join(tmpDir, testMountPath) + spec, err := plug.ConstructVolumeSpec(testPVName, volPath) + if err != nil { + t.Errorf("ConstructVolumeSpec() failed: %v", err) + } + if spec == nil { + t.Fatalf("ConstructVolumeSpec() returned nil") + } + + volName := spec.Name() + if volName != testPVName { + t.Errorf("Expected volume name %q, got %q", testPVName, volName) + } + + if spec.Volume != nil { + t.Errorf("Volume object returned, expected nil") + } + + pv := spec.PersistentVolume + if pv == nil { + t.Fatalf("PersistentVolume object nil") + } + + ls := pv.Spec.PersistentVolumeSource.Local + if ls == nil { + t.Fatalf("LocalVolumeSource object nil") + } +} + +func TestPersistentClaimReadOnlyFlag(t *testing.T) { + tmpDir, plug := getPlugin(t) + defer os.RemoveAll(tmpDir) + + // Read only == true + pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("poduid")}} + mounter, err := 
plug.NewMounter(getTestVolume(true), pod, volume.VolumeOptions{}) + if err != nil { + t.Errorf("Failed to make a new Mounter: %v", err) + } + if mounter == nil { + t.Fatalf("Got a nil Mounter") + } + if !mounter.GetAttributes().ReadOnly { + t.Errorf("Expected true for mounter.IsReadOnly") + } + + // Read only == false + mounter, err = plug.NewMounter(getTestVolume(false), pod, volume.VolumeOptions{}) + if err != nil { + t.Errorf("Failed to make a new Mounter: %v", err) + } + if mounter == nil { + t.Fatalf("Got a nil Mounter") + } + if mounter.GetAttributes().ReadOnly { + t.Errorf("Expected false for mounter.IsReadOnly") + } +} + +func TestUnsupportedPlugins(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("localVolumeTest") + if err != nil { + t.Fatalf("can't make a temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil)) + spec := getTestVolume(false) + + recyclePlug, err := plugMgr.FindRecyclablePluginBySpec(spec) + if err == nil && recyclePlug != nil { + t.Errorf("Recyclable plugin found, expected none") + } + + deletePlug, err := plugMgr.FindDeletablePluginByName(localVolumePluginName) + if err == nil && deletePlug != nil { + t.Errorf("Deletable plugin found, expected none") + } + + attachPlug, err := plugMgr.FindAttachablePluginByName(localVolumePluginName) + if err == nil && attachPlug != nil { + t.Errorf("Attachable plugin found, expected none") + } + + createPlug, err := plugMgr.FindCreatablePluginBySpec(spec) + if err == nil && createPlug != nil { + t.Errorf("Creatable plugin found, expected none") + } + + provisionPlug, err := plugMgr.FindProvisionablePluginByName(localVolumePluginName) + if err == nil && provisionPlug != nil { + t.Errorf("Provisionable plugin found, expected none") + } +} diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 23dec329696..a6abfd3cd2d 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -232,12 +232,16 @@ type VolumeHost interface { // Returns a function that returns a secret. GetSecretFunc() func(namespace, name string) (*v1.Secret, error) + + // Returns the labels on the node + GetNodeLabels() (map[string]string, error) } // VolumePluginMgr tracks registered plugins. type VolumePluginMgr struct { mutex sync.Mutex plugins map[string]VolumePlugin + Host VolumeHost } // Spec is an internal representation of a volume. All API volume types translate to Spec. 
@@ -336,6 +340,7 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, host VolumeHost) pm.mutex.Lock() defer pm.mutex.Unlock() + pm.Host = host if pm.plugins == nil { pm.plugins = map[string]VolumePlugin{} } diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 95ee00c2aad..2e489b7a4b8 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -134,6 +134,10 @@ func (f *fakeVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secre } } +func (f *fakeVolumeHost) GetNodeLabels() (map[string]string, error) { + return map[string]string{"test-label": "test-value"}, nil +} + func ProbeVolumePlugins(config VolumeConfig) []VolumePlugin { if _, ok := config.OtherAttributes["fake-property"]; ok { return []VolumePlugin{ diff --git a/pkg/volume/util/BUILD b/pkg/volume/util/BUILD index 30bc0164826..c2ff2f56693 100644 --- a/pkg/volume/util/BUILD +++ b/pkg/volume/util/BUILD @@ -29,6 +29,7 @@ go_library( "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", ], ) @@ -38,10 +39,14 @@ go_test( srcs = [ "atomic_writer_test.go", "device_util_linux_test.go", + "util_test.go", ], library = ":go_default_library", tags = ["automanaged"], deps = [ + "//pkg/api/v1:go_default_library", + "//pkg/api/v1/helper:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/client-go/util/testing:go_default_library", ], diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD index ec455f186bc..9817a5f606e 100644 --- a/pkg/volume/util/operationexecutor/BUILD +++ b/pkg/volume/util/operationexecutor/BUILD @@ -18,9 +18,11 @@ go_library( deps = [ "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/features:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", + "//pkg/volume/util:go_default_library", "//pkg/volume/util/nestedpendingoperations:go_default_library", "//pkg/volume/util/types:go_default_library", "//pkg/volume/util/volumehelper:go_default_library", @@ -28,6 +30,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 20059b527ac..52629afd4c0 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -24,12 +24,15 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/pkg/features" kevents "k8s.io/kubernetes/pkg/kubelet/events" 
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) @@ -363,6 +366,11 @@ func (og *operationGenerator) GenerateMountVolumeFunc( return nil, volumeToMount.GenerateErrorDetailed("MountVolume.FindPluginBySpec failed", err) } + affinityErr := checkNodeAffinity(og, volumeToMount, volumePlugin) + if affinityErr != nil { + return nil, affinityErr + } + volumeMounter, newMounterErr := volumePlugin.NewMounter( volumeToMount.VolumeSpec, volumeToMount.Pod, @@ -713,3 +721,27 @@ func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount } return nil } + +// checkNodeAffinity looks at the PV node affinity, and checks if the node has the same corresponding labels +// This ensures that we don't mount a volume that doesn't belong to this node +func checkNodeAffinity(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error { + if !utilfeature.DefaultFeatureGate.Enabled(features.PersistentLocalVolumes) { + return nil + } + + pv := volumeToMount.VolumeSpec.PersistentVolume + if pv != nil { + nodeLabels, err := og.volumePluginMgr.Host.GetNodeLabels() + if err != nil { + return volumeToMount.GenerateErrorDetailed("Error getting node labels", err) + } + + err = util.CheckNodeAffinity(pv, nodeLabels) + if err != nil { + eventErr, detailedErr := volumeToMount.GenerateError("Storage node affinity check failed", err) + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) + return detailedErr + } + } + return nil +} diff --git a/pkg/volume/util/util.go b/pkg/volume/util/util.go index 97561e7e658..a9ab2845a70 100644 --- a/pkg/volume/util/util.go +++ b/pkg/volume/util/util.go @@ -23,6 +23,7 @@ import ( "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/pkg/api/v1" v1helper "k8s.io/kubernetes/pkg/api/v1/helper" storage "k8s.io/kubernetes/pkg/apis/storage/v1" @@ -164,3 +165,30 @@ func GetClassForVolume(kubeClient clientset.Interface, pv *v1.PersistentVolume) } return class, nil } + +// CheckNodeAffinity looks at the PV node affinity, and checks if the node has the same corresponding labels +// This ensures that we don't mount a volume that doesn't belong to this node +func CheckNodeAffinity(pv *v1.PersistentVolume, nodeLabels map[string]string) error { + affinity, err := v1helper.GetStorageNodeAffinityFromAnnotation(pv.Annotations) + if err != nil { + return fmt.Errorf("Error getting storage node affinity: %v", err) + } + if affinity == nil { + return nil + } + + if affinity.RequiredDuringSchedulingIgnoredDuringExecution != nil { + terms := affinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms + glog.V(10).Infof("Match for RequiredDuringSchedulingIgnoredDuringExecution node selector terms %+v", terms) + for _, term := range terms { + selector, err := v1helper.NodeSelectorRequirementsAsSelector(term.MatchExpressions) + if err != nil { + return fmt.Errorf("Failed to parse MatchExpressions: %v", err) + } + if !selector.Matches(labels.Set(nodeLabels)) { + return fmt.Errorf("NodeSelectorTerm %+v does not match node labels", term.MatchExpressions) + } + } + } + return nil +} diff --git a/pkg/volume/util/util_test.go b/pkg/volume/util/util_test.go new file mode 100644 index 00000000000..f491d45a404 --- /dev/null +++ b/pkg/volume/util/util_test.go @@ -0,0 +1,142 @@ +/* +Copyright 2017 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/api/v1/helper" +) + +var nodeLabels map[string]string = map[string]string{ + "test-key1": "test-value1", + "test-key2": "test-value2", +} + +func TestCheckNodeAffinity(t *testing.T) { + type affinityTest struct { + name string + expectSuccess bool + pv *v1.PersistentVolume + } + + cases := []affinityTest{ + { + name: "valid-no-constraints", + expectSuccess: true, + pv: testVolumeWithNodeAffinity(t, &v1.NodeAffinity{}), + }, + { + name: "valid-constraints", + expectSuccess: true, + pv: testVolumeWithNodeAffinity(t, &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "test-key1", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value1", "test-value3"}, + }, + { + Key: "test-key2", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value0", "test-value2"}, + }, + }, + }, + }, + }, + }), + }, + { + name: "invalid-key", + expectSuccess: false, + pv: testVolumeWithNodeAffinity(t, &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "test-key1", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value1", "test-value3"}, + }, + { + Key: "test-key3", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value0", "test-value2"}, + }, + }, + }, + }, + }, + }), + }, + { + name: "invalid-values", + expectSuccess: false, + pv: testVolumeWithNodeAffinity(t, &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "test-key1", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value3", "test-value4"}, + }, + { + Key: "test-key2", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value0", "test-value2"}, + }, + }, + }, + }, + }, + }), + }, + } + + for _, c := range cases { + err := CheckNodeAffinity(c.pv, nodeLabels) + + if err != nil && c.expectSuccess { + t.Errorf("CheckTopology %v returned error: %v", c.name, err) + } + if err == nil && !c.expectSuccess { + t.Errorf("CheckTopology %v returned success, expected error", c.name) + } + } +} + +func testVolumeWithNodeAffinity(t *testing.T, affinity *v1.NodeAffinity) *v1.PersistentVolume { + objMeta := metav1.ObjectMeta{Name: "test-constraints"} + objMeta.Annotations = map[string]string{} + err := helper.StorageNodeAffinityToAlphaAnnotation(objMeta.Annotations, affinity) + if err != nil { + t.Fatalf("Failed to get node affinity annotation: %v", err) + } + + return &v1.PersistentVolume{ + ObjectMeta: objMeta, + } +} diff --git a/plugin/pkg/scheduler/algorithm/predicates/BUILD b/plugin/pkg/scheduler/algorithm/predicates/BUILD index 
77880a33167..8f586f64b50 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/BUILD +++ b/plugin/pkg/scheduler/algorithm/predicates/BUILD @@ -22,6 +22,8 @@ go_library( "//pkg/api/v1/helper:go_default_library", "//pkg/api/v1/helper/qos:go_default_library", "//pkg/client/listers/core/v1:go_default_library", + "//pkg/features:go_default_library", + "//pkg/volume/util:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library", "//plugin/pkg/scheduler/algorithm/priorities/util:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", @@ -30,7 +32,9 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", + "//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset:go_default_library", ], ) diff --git a/plugin/pkg/scheduler/algorithm/predicates/error.go b/plugin/pkg/scheduler/algorithm/predicates/error.go index 36ac5f1deac..ecedab29cba 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/error.go +++ b/plugin/pkg/scheduler/algorithm/predicates/error.go @@ -37,6 +37,7 @@ var ( ErrMaxVolumeCountExceeded = newPredicateFailureError("MaxVolumeCount") ErrNodeUnderMemoryPressure = newPredicateFailureError("NodeUnderMemoryPressure") ErrNodeUnderDiskPressure = newPredicateFailureError("NodeUnderDiskPressure") + ErrVolumeNodeConflict = newPredicateFailureError("NoVolumeNodeConflict") // ErrFakePredicate is used for test only. The fake predicates returning false also returns error // as ErrFakePredicate. ErrFakePredicate = newPredicateFailureError("FakePredicateError") diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 6a86bebaf6a..10ea3eafea3 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -29,14 +29,18 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api/v1" v1helper "k8s.io/kubernetes/pkg/api/v1/helper" v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos" corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" + "k8s.io/kubernetes/pkg/features" + volumeutil "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + "k8s.io/metrics/pkg/client/clientset_generated/clientset" ) // predicatePrecomputations: Helper types/variables... @@ -1264,3 +1268,81 @@ func CheckNodeDiskPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *sch } return true, nil, nil } + +type VolumeNodeChecker struct { + pvInfo PersistentVolumeInfo + pvcInfo PersistentVolumeClaimInfo + client clientset.Interface +} + +// VolumeNodeChecker evaluates if a pod can fit due to the volumes it requests, given +// that some volumes have node topology constraints, particularly when using Local PVs. +// The requirement is that any pod that uses a PVC that is bound to a PV with topology constraints +// must be scheduled to a node that satisfies the PV's topology labels. 
+func NewVolumeNodePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo, client clientset.Interface) algorithm.FitPredicate { + c := &VolumeNodeChecker{ + pvInfo: pvInfo, + pvcInfo: pvcInfo, + client: client, + } + return c.predicate +} + +func (c *VolumeNodeChecker) predicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { + if !utilfeature.DefaultFeatureGate.Enabled(features.PersistentLocalVolumes) { + return true, nil, nil + } + + // If a pod doesn't have any volume attached to it, the predicate will always be true. + // Thus we make a fast path for it, to avoid unnecessary computations in this case. + if len(pod.Spec.Volumes) == 0 { + return true, nil, nil + } + + node := nodeInfo.Node() + if node == nil { + return false, nil, fmt.Errorf("node not found") + } + + glog.V(2).Infof("Checking for prebound volumes with node affinity") + namespace := pod.Namespace + manifest := &(pod.Spec) + for i := range manifest.Volumes { + volume := &manifest.Volumes[i] + if volume.PersistentVolumeClaim == nil { + continue + } + pvcName := volume.PersistentVolumeClaim.ClaimName + if pvcName == "" { + return false, nil, fmt.Errorf("PersistentVolumeClaim had no name") + } + pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName) + if err != nil { + return false, nil, err + } + + if pvc == nil { + return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName) + } + pvName := pvc.Spec.VolumeName + if pvName == "" { + return false, nil, fmt.Errorf("PersistentVolumeClaim is not bound: %q", pvcName) + } + + pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName) + if err != nil { + return false, nil, err + } + if pv == nil { + return false, nil, fmt.Errorf("PersistentVolume not found: %q", pvName) + } + + err = volumeutil.CheckNodeAffinity(pv, node.Labels) + if err != nil { + glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q node mismatch: %v", pod.Name, node.Name, pvName, err.Error()) + return false, []algorithm.PredicateFailureReason{ErrVolumeNodeConflict}, nil + } + glog.V(4).Infof("VolumeNode predicate allows node %q for pod %q due to volume %q", node.Name, pod.Name, pvName) + } + return true, nil, nil +} diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go index ccc57efc50b..be4d20cbede 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go @@ -313,6 +313,77 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }, }, }, + // Do not change this JSON after the corresponding release has been tagged. + // A failure indicates backwards compatibility with the specified release was broken. 
+ "1.7": { + JSON: `{ + "kind": "Policy", + "apiVersion": "v1", + "predicates": [ + {"name": "MatchNodeSelector"}, + {"name": "PodFitsResources"}, + {"name": "PodFitsHostPorts"}, + {"name": "HostName"}, + {"name": "NoDiskConflict"}, + {"name": "NoVolumeZoneConflict"}, + {"name": "PodToleratesNodeTaints"}, + {"name": "CheckNodeMemoryPressure"}, + {"name": "CheckNodeDiskPressure"}, + {"name": "MaxEBSVolumeCount"}, + {"name": "MaxGCEPDVolumeCount"}, + {"name": "MaxAzureDiskVolumeCount"}, + {"name": "MatchInterPodAffinity"}, + {"name": "GeneralPredicates"}, + {"name": "TestServiceAffinity", "argument": {"serviceAffinity" : {"labels" : ["region"]}}}, + {"name": "TestLabelsPresence", "argument": {"labelsPresence" : {"labels" : ["foo"], "presence":true}}}, + {"name": "NoVolumeNodeConflict"} + ],"priorities": [ + {"name": "EqualPriority", "weight": 2}, + {"name": "ImageLocalityPriority", "weight": 2}, + {"name": "LeastRequestedPriority", "weight": 2}, + {"name": "BalancedResourceAllocation", "weight": 2}, + {"name": "SelectorSpreadPriority", "weight": 2}, + {"name": "NodePreferAvoidPodsPriority", "weight": 2}, + {"name": "NodeAffinityPriority", "weight": 2}, + {"name": "TaintTolerationPriority", "weight": 2}, + {"name": "InterPodAffinityPriority", "weight": 2}, + {"name": "MostRequestedPriority", "weight": 2} + ] + }`, + ExpectedPolicy: schedulerapi.Policy{ + Predicates: []schedulerapi.PredicatePolicy{ + {Name: "MatchNodeSelector"}, + {Name: "PodFitsResources"}, + {Name: "PodFitsHostPorts"}, + {Name: "HostName"}, + {Name: "NoDiskConflict"}, + {Name: "NoVolumeZoneConflict"}, + {Name: "PodToleratesNodeTaints"}, + {Name: "CheckNodeMemoryPressure"}, + {Name: "CheckNodeDiskPressure"}, + {Name: "MaxEBSVolumeCount"}, + {Name: "MaxGCEPDVolumeCount"}, + {Name: "MaxAzureDiskVolumeCount"}, + {Name: "MatchInterPodAffinity"}, + {Name: "GeneralPredicates"}, + {Name: "TestServiceAffinity", Argument: &schedulerapi.PredicateArgument{ServiceAffinity: &schedulerapi.ServiceAffinity{Labels: []string{"region"}}}}, + {Name: "TestLabelsPresence", Argument: &schedulerapi.PredicateArgument{LabelsPresence: &schedulerapi.LabelsPresence{Labels: []string{"foo"}, Presence: true}}}, + {Name: "NoVolumeNodeConflict"}, + }, + Priorities: []schedulerapi.PriorityPolicy{ + {Name: "EqualPriority", Weight: 2}, + {Name: "ImageLocalityPriority", Weight: 2}, + {Name: "LeastRequestedPriority", Weight: 2}, + {Name: "BalancedResourceAllocation", Weight: 2}, + {Name: "SelectorSpreadPriority", Weight: 2}, + {Name: "NodePreferAvoidPodsPriority", Weight: 2}, + {Name: "NodeAffinityPriority", Weight: 2}, + {Name: "TaintTolerationPriority", Weight: 2}, + {Name: "InterPodAffinityPriority", Weight: 2}, + {Name: "MostRequestedPriority", Weight: 2}, + }, + }, + }, } registeredPredicates := sets.NewString(factory.ListRegisteredFitPredicates()...) diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index d03ca433ff9..94447611a13 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -176,6 +176,14 @@ func defaultPredicates() sets.String { // Fit is determined by node disk pressure condition. factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate), + + // Fit is determined by volume zone requirements. 
+ factory.RegisterFitPredicateFactory( + "NoVolumeNodeConflict", + func(args factory.PluginFactoryArgs) algorithm.FitPredicate { + return predicates.NewVolumeNodePredicate(args.PVInfo, args.PVCInfo, nil) + }, + ), ) } diff --git a/staging/src/k8s.io/client-go/pkg/api/helper/helpers.go b/staging/src/k8s.io/client-go/pkg/api/helper/helpers.go index 3bce0e54be9..068877d5e2d 100644 --- a/staging/src/k8s.io/client-go/pkg/api/helper/helpers.go +++ b/staging/src/k8s.io/client-go/pkg/api/helper/helpers.go @@ -590,6 +590,10 @@ func GetStorageNodeAffinityFromAnnotation(annotations map[string]string) (*api.N // Converts NodeAffinity type to Alpha annotation for use in PersistentVolumes // TODO: update when storage node affinity graduates to beta func StorageNodeAffinityToAlphaAnnotation(annotations map[string]string, affinity *api.NodeAffinity) error { + if affinity == nil { + return nil + } + json, err := json.Marshal(*affinity) if err != nil { return err diff --git a/test/e2e/framework/pv_util.go b/test/e2e/framework/pv_util.go index dc6bf8693db..5a913c20b83 100644 --- a/test/e2e/framework/pv_util.go +++ b/test/e2e/framework/pv_util.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/api/v1/helper" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" awscloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" @@ -73,11 +74,13 @@ type PVCMap map[types.NamespacedName]pvcval // }, // } type PersistentVolumeConfig struct { - PVSource v1.PersistentVolumeSource - Prebind *v1.PersistentVolumeClaim - ReclaimPolicy v1.PersistentVolumeReclaimPolicy - NamePrefix string - Labels labels.Set + PVSource v1.PersistentVolumeSource + Prebind *v1.PersistentVolumeClaim + ReclaimPolicy v1.PersistentVolumeReclaimPolicy + NamePrefix string + Labels labels.Set + StorageClassName string + NodeAffinity *v1.NodeAffinity } // PersistentVolumeClaimConfig is consumed by MakePersistentVolumeClaim() to generate a PVC object. @@ -85,9 +88,10 @@ type PersistentVolumeConfig struct { // (+optional) Annotations defines the PVC's annotations type PersistentVolumeClaimConfig struct { - AccessModes []v1.PersistentVolumeAccessMode - Annotations map[string]string - Selector *metav1.LabelSelector + AccessModes []v1.PersistentVolumeAccessMode + Annotations map[string]string + Selector *metav1.LabelSelector + StorageClassName *string } // Clean up a pv and pvc in a single pv/pvc test case. @@ -561,7 +565,7 @@ func makePvcKey(ns, name string) types.NamespacedName { // is assigned, assumes "Retain". Specs are expected to match the test's PVC. // Note: the passed-in claim does not have a name until it is created and thus the PV's // ClaimRef cannot be completely filled-in in this func. Therefore, the ClaimRef's name -// is added later in createPVCPV. +// is added later in CreatePVCPV. 
func MakePersistentVolume(pvConfig PersistentVolumeConfig) *v1.PersistentVolume { var claimRef *v1.ObjectReference // If the reclaimPolicy is not provided, assume Retain @@ -575,7 +579,7 @@ func MakePersistentVolume(pvConfig PersistentVolumeConfig) *v1.PersistentVolume Namespace: pvConfig.Prebind.Namespace, } } - return &v1.PersistentVolume{ + pv := &v1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{ GenerateName: pvConfig.NamePrefix, Labels: pvConfig.Labels, @@ -594,9 +598,16 @@ func MakePersistentVolume(pvConfig PersistentVolumeConfig) *v1.PersistentVolume v1.ReadOnlyMany, v1.ReadWriteMany, }, - ClaimRef: claimRef, + ClaimRef: claimRef, + StorageClassName: pvConfig.StorageClassName, }, } + err := helper.StorageNodeAffinityToAlphaAnnotation(pv.Annotations, pvConfig.NodeAffinity) + if err != nil { + Logf("Setting storage node affinity failed: %v", err) + return nil + } + return pv } // Returns a PVC definition based on the namespace. @@ -625,6 +636,7 @@ func MakePersistentVolumeClaim(cfg PersistentVolumeClaimConfig, ns string) *v1.P v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"), }, }, + StorageClassName: cfg.StorageClassName, }, } } diff --git a/test/e2e/framework/volume_util.go b/test/e2e/framework/volume_util.go index e22a42b7685..718a84ad2e0 100644 --- a/test/e2e/framework/volume_util.go +++ b/test/e2e/framework/volume_util.go @@ -76,11 +76,18 @@ type VolumeTestConfig struct { ServerImage string // Ports to export from the server pod. TCP only. ServerPorts []int + // Commands to run in the container image. + ServerCmds []string // Arguments to pass to the container image. ServerArgs []string // Volumes needed to be mounted to the server container from the host // map -> ServerVolumes map[string]string + // Wait for the pod to terminate successfully + // False indicates that the pod is long running + WaitForCompletion bool + // NodeName to run pod on. Default is any node. + NodeName string } // VolumeTest contains a volume to mount into a client pod and its @@ -133,6 +140,11 @@ func StartVolumeServer(client clientset.Interface, config VolumeTestConfig) *v1. By(fmt.Sprint("creating ", serverPodName, " pod")) privileged := new(bool) *privileged = true + + restartPolicy := v1.RestartPolicyAlways + if config.WaitForCompletion { + restartPolicy = v1.RestartPolicyNever + } serverPod := &v1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", @@ -153,12 +165,15 @@ func StartVolumeServer(client clientset.Interface, config VolumeTestConfig) *v1. SecurityContext: &v1.SecurityContext{ Privileged: privileged, }, + Command: config.ServerCmds, Args: config.ServerArgs, Ports: serverPodPorts, VolumeMounts: mounts, }, }, - Volumes: volumes, + Volumes: volumes, + RestartPolicy: restartPolicy, + NodeName: config.NodeName, }, } @@ -176,12 +191,16 @@ func StartVolumeServer(client clientset.Interface, config VolumeTestConfig) *v1. 
ExpectNoError(err, "Failed to create %q pod: %v", serverPodName, err) } } - ExpectNoError(WaitForPodRunningInNamespace(client, serverPod)) - - if pod == nil { - By(fmt.Sprintf("locating the %q server pod", serverPodName)) - pod, err = podClient.Get(serverPodName, metav1.GetOptions{}) - ExpectNoError(err, "Cannot locate the server pod %q: %v", serverPodName, err) + if config.WaitForCompletion { + ExpectNoError(WaitForPodSuccessInNamespace(client, serverPod.Name, serverPod.Namespace)) + ExpectNoError(podClient.Delete(serverPod.Name, nil)) + } else { + ExpectNoError(WaitForPodRunningInNamespace(client, serverPod)) + if pod == nil { + By(fmt.Sprintf("locating the %q server pod", serverPodName)) + pod, err = podClient.Get(serverPodName, metav1.GetOptions{}) + ExpectNoError(err, "Cannot locate the server pod %q: %v", serverPodName, err) + } } return pod } diff --git a/test/e2e/storage/BUILD b/test/e2e/storage/BUILD index eb3fdd0eb7c..44bad63721d 100644 --- a/test/e2e/storage/BUILD +++ b/test/e2e/storage/BUILD @@ -14,6 +14,7 @@ go_library( "persistent_volumes.go", "persistent_volumes-disruptive.go", "persistent_volumes-gce.go", + "persistent_volumes-local.go", "persistent_volumes-vsphere.go", "pv_reclaimpolicy.go", "pvc_label_selector.go", diff --git a/test/e2e/storage/persistent_volumes-local.go b/test/e2e/storage/persistent_volumes-local.go new file mode 100644 index 00000000000..0097d96b701 --- /dev/null +++ b/test/e2e/storage/persistent_volumes-local.go @@ -0,0 +1,245 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "fmt" + "path/filepath" + + . "github.com/onsi/ginkgo" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/test/e2e/framework" +) + +type localTestConfig struct { + ns string + nodes []v1.Node + client clientset.Interface +} + +type localTestVolume struct { + // Node that the volume is on + node *v1.Node + // Path to the volume on the host node + hostDir string + // Path to the volume in the local util container + containerDir string + // PVC for this volume + pvc *v1.PersistentVolumeClaim + // PV for this volume + pv *v1.PersistentVolume +} + +const ( + // TODO: This may not be available/writable on all images. 
+ hostBase = "/tmp" + containerBase = "/myvol" + testFile = "test-file" + testContents = "testdata" + testSC = "local-test-storagclass" +) + +var _ = framework.KubeDescribe("[Volume] PersistentVolumes-local [Feature:LocalPersistentVolumes] [Serial]", func() { + f := framework.NewDefaultFramework("persistent-local-volumes-test") + + var ( + config *localTestConfig + ) + + BeforeEach(func() { + config = &localTestConfig{ + ns: f.Namespace.Name, + client: f.ClientSet, + nodes: []v1.Node{}, + } + + // Get all the schedulable nodes + nodes, err := config.client.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + framework.Failf("Failed to get nodes: %v", err) + } + + for _, node := range nodes.Items { + if !node.Spec.Unschedulable { + // TODO: does this need to be a deep copy + config.nodes = append(config.nodes, node) + } + } + if len(config.nodes) == 0 { + framework.Failf("No available nodes for scheduling") + } + }) + + Context("when one pod requests one prebound PVC", func() { + var ( + testVol *localTestVolume + node *v1.Node + ) + + BeforeEach(func() { + // Choose the first node + node = &config.nodes[0] + }) + + AfterEach(func() { + cleanupLocalVolume(config, testVol) + testVol = nil + }) + + It("should be able to mount and read from the volume", func() { + By("Initializing test volume") + testVol = setupLocalVolume(config, node) + + By("Creating local PVC and PV") + createLocalPVCPV(config, testVol) + + By("Creating a pod to consume the PV") + readCmd := fmt.Sprintf("cat /mnt/volume1/%s", testFile) + podSpec := createLocalPod(config, testVol, readCmd) + f.TestContainerOutput("pod consumes PV", podSpec, 0, []string{testContents}) + }) + + It("should be able to mount and write to the volume", func() { + By("Initializing test volume") + testVol = setupLocalVolume(config, node) + + By("Creating local PVC and PV") + createLocalPVCPV(config, testVol) + + By("Creating a pod to write to the PV") + testFilePath := filepath.Join("/mnt/volume1", testFile) + cmd := fmt.Sprintf("echo %s > %s; cat %s", testVol.hostDir, testFilePath, testFilePath) + podSpec := createLocalPod(config, testVol, cmd) + f.TestContainerOutput("pod writes to PV", podSpec, 0, []string{testVol.hostDir}) + }) + }) +}) + +// Launches a pod with hostpath volume on a specific node to setup a directory to use +// for the local PV +func setupLocalVolume(config *localTestConfig, node *v1.Node) *localTestVolume { + testDirName := "local-volume-test-" + string(uuid.NewUUID()) + testDir := filepath.Join(containerBase, testDirName) + hostDir := filepath.Join(hostBase, testDirName) + testFilePath := filepath.Join(testDir, testFile) + writeCmd := fmt.Sprintf("mkdir %s; echo %s > %s", testDir, testContents, testFilePath) + framework.Logf("Creating local volume on node %q at path %q", node.Name, hostDir) + + runLocalUtil(config, node.Name, writeCmd) + return &localTestVolume{ + node: node, + hostDir: hostDir, + containerDir: testDir, + } +} + +// Deletes the PVC/PV, and launches a pod with hostpath volume to remove the test directory +func cleanupLocalVolume(config *localTestConfig, volume *localTestVolume) { + if volume == nil { + return + } + + By("Cleaning up PVC and PV") + errs := framework.PVPVCCleanup(config.client, config.ns, volume.pv, volume.pvc) + if len(errs) > 0 { + framework.Logf("AfterEach: Failed to delete PV and/or PVC: %v", utilerrors.NewAggregate(errs)) + } + + By("Removing the test directory") + removeCmd := fmt.Sprintf("rm -r %s", volume.containerDir) + runLocalUtil(config, volume.node.Name, removeCmd) +} + 
+func runLocalUtil(config *localTestConfig, nodeName, cmd string) { + framework.StartVolumeServer(config.client, framework.VolumeTestConfig{ + Namespace: config.ns, + Prefix: "local-volume-init", + ServerImage: "gcr.io/google_containers/busybox:1.24", + ServerCmds: []string{"/bin/sh"}, + ServerArgs: []string{"-c", cmd}, + ServerVolumes: map[string]string{ + hostBase: containerBase, + }, + WaitForCompletion: true, + NodeName: nodeName, + }) +} + +func makeLocalPVCConfig() framework.PersistentVolumeClaimConfig { + sc := testSC + return framework.PersistentVolumeClaimConfig{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + StorageClassName: &sc, + } +} + +func makeLocalPVConfig(volume *localTestVolume) framework.PersistentVolumeConfig { + // TODO: hostname may not be the best option + nodeKey := "kubernetes.io/hostname" + if volume.node.Labels == nil { + framework.Failf("Node does not have labels") + } + nodeValue, found := volume.node.Labels[nodeKey] + if !found { + framework.Failf("Node does not have required label %q", nodeKey) + } + + return framework.PersistentVolumeConfig{ + PVSource: v1.PersistentVolumeSource{ + Local: &v1.LocalVolumeSource{ + Path: volume.hostDir, + }, + }, + NamePrefix: "local-pv", + StorageClassName: testSC, + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: nodeKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{nodeValue}, + }, + }, + }, + }, + }, + }, + } +} + +// Creates a PVC and PV with prebinding +func createLocalPVCPV(config *localTestConfig, volume *localTestVolume) { + pvcConfig := makeLocalPVCConfig() + pvConfig := makeLocalPVConfig(volume) + + var err error + volume.pv, volume.pvc, err = framework.CreatePVPVC(config.client, pvConfig, pvcConfig, config.ns, true) + framework.ExpectNoError(err) + framework.WaitOnPVandPVC(config.client, config.ns, volume.pv, volume.pvc) +} + +func createLocalPod(config *localTestConfig, volume *localTestVolume, cmd string) *v1.Pod { + return framework.MakePod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, cmd) +}
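For a sense of how the pieces in this change fit together, here is a minimal usage sketch (illustrative only, not part of the patch): it builds a local PV, stores its node constraint through the alpha-annotation helper added in pkg/api/v1/helper, and then runs the same volumeutil.CheckNodeAffinity check that both the NoVolumeNodeConflict scheduler predicate and the kubelet's MountVolume path perform. It assumes the 1.7-era package paths used in this diff; the node name "node-1", the hostname label value, and the path /mnt/disks/vol1 are made-up example values.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/api/v1"
	v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
	volumeutil "k8s.io/kubernetes/pkg/volume/util"
)

func main() {
	// A local PV that should only be used on the node labeled
	// kubernetes.io/hostname=node-1 (hypothetical example values).
	pv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name:        "example-local-pv",
			Annotations: map[string]string{},
		},
		Spec: v1.PersistentVolumeSpec{
			PersistentVolumeSource: v1.PersistentVolumeSource{
				Local: &v1.LocalVolumeSource{Path: "/mnt/disks/vol1"},
			},
		},
	}

	affinity := &v1.NodeAffinity{
		RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
			NodeSelectorTerms: []v1.NodeSelectorTerm{
				{
					MatchExpressions: []v1.NodeSelectorRequirement{
						{
							Key:      "kubernetes.io/hostname",
							Operator: v1.NodeSelectorOpIn,
							Values:   []string{"node-1"},
						},
					},
				},
			},
		},
	}

	// Store the constraint as the alpha annotation, the same way the
	// e2e framework's MakePersistentVolume does in this change.
	if err := v1helper.StorageNodeAffinityToAlphaAnnotation(pv.Annotations, affinity); err != nil {
		fmt.Println("failed to set node affinity annotation:", err)
		return
	}

	// The same check performed by the NoVolumeNodeConflict predicate at
	// schedule time and by checkNodeAffinity in the operation generator
	// at mount time.
	nodeLabels := map[string]string{"kubernetes.io/hostname": "node-1"}
	if err := volumeutil.CheckNodeAffinity(pv, nodeLabels); err != nil {
		fmt.Println("volume does not fit this node:", err)
		return
	}
	fmt.Println("volume fits this node")
}

The two call sites differ only in where the labels come from: the scheduler predicate reads nodeInfo.Node().Labels, while the kubelet mount path asks the new VolumeHost.GetNodeLabels(); both defer to the same helper, so a volume that would be rejected at schedule time is also rejected at mount time.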