diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 0b607d5cc06..a6abfd3cd2d 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -241,6 +241,7 @@ type VolumeHost interface { type VolumePluginMgr struct { mutex sync.Mutex plugins map[string]VolumePlugin + Host VolumeHost } // Spec is an internal representation of a volume. All API volume types translate to Spec. @@ -339,6 +340,7 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, host VolumeHost) pm.mutex.Lock() defer pm.mutex.Unlock() + pm.Host = host if pm.plugins == nil { pm.plugins = map[string]VolumePlugin{} } diff --git a/pkg/volume/util/BUILD b/pkg/volume/util/BUILD index 30bc0164826..a70b367a070 100644 --- a/pkg/volume/util/BUILD +++ b/pkg/volume/util/BUILD @@ -29,6 +29,7 @@ go_library( "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", ], ) @@ -38,6 +39,7 @@ go_test( srcs = [ "atomic_writer_test.go", "device_util_linux_test.go", + "util_test.go", ], library = ":go_default_library", tags = ["automanaged"], diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD index ec455f186bc..5bee309d255 100644 --- a/pkg/volume/util/operationexecutor/BUILD +++ b/pkg/volume/util/operationexecutor/BUILD @@ -18,6 +18,7 @@ go_library( deps = [ "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/features:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", @@ -28,6 +29,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index f6ced832005..12a065d4f5a 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -24,12 +24,15 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/pkg/features" kevents "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) @@ -362,6 +365,11 @@ func (og *operationGenerator) GenerateMountVolumeFunc( return nil, volumeToMount.GenerateErrorDetailed("MountVolume.FindPluginBySpec failed", err) } + affinityErr := checkNodeAffinity(og, volumeToMount, volumePlugin) + if affinityErr != nil { + return nil, affinityErr + } + volumeMounter, newMounterErr := volumePlugin.NewMounter( volumeToMount.VolumeSpec, volumeToMount.Pod, @@ -708,3 +716,27 @@ func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount } return nil } + +// checkNodeAffinity looks at the PV node affinity, and checks if the node has the same corresponding labels +// This ensures that we don't mount a volume that doesn't belong to this node +func checkNodeAffinity(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error { + if !utilfeature.DefaultFeatureGate.Enabled(features.PersistentLocalVolumes) { + return nil + } + + pv := volumeToMount.VolumeSpec.PersistentVolume + if pv != nil { + nodeLabels, err := og.volumePluginMgr.Host.GetNodeLabels() + if err != nil { + return volumeToMount.GenerateErrorDetailed("Error getting node labels", err) + } + + err = util.CheckNodeAffinity(pv, nodeLabels) + if err != nil { + eventErr, detailedErr := volumeToMount.GenerateError("Storage node affinity check failed", err) + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) + return detailedErr + } + } + return nil +} diff --git a/pkg/volume/util/util.go b/pkg/volume/util/util.go index 97561e7e658..a9ab2845a70 100644 --- a/pkg/volume/util/util.go +++ b/pkg/volume/util/util.go @@ -23,6 +23,7 @@ import ( "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/pkg/api/v1" v1helper "k8s.io/kubernetes/pkg/api/v1/helper" storage "k8s.io/kubernetes/pkg/apis/storage/v1" @@ -164,3 +165,30 @@ func GetClassForVolume(kubeClient clientset.Interface, pv *v1.PersistentVolume) } return class, nil } + +// CheckNodeAffinity looks at the PV node affinity, and checks if the node has the same corresponding labels +// This ensures that we don't mount a volume that doesn't belong to this node +func CheckNodeAffinity(pv *v1.PersistentVolume, nodeLabels map[string]string) error { + affinity, err := v1helper.GetStorageNodeAffinityFromAnnotation(pv.Annotations) + if err != nil { + return fmt.Errorf("Error getting storage node affinity: %v", err) + } + if affinity == nil { + return nil + } + + if affinity.RequiredDuringSchedulingIgnoredDuringExecution != nil { + terms := affinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms + glog.V(10).Infof("Match for RequiredDuringSchedulingIgnoredDuringExecution node selector terms %+v", terms) + for _, term := range terms { + selector, err := v1helper.NodeSelectorRequirementsAsSelector(term.MatchExpressions) + if err != nil { + return fmt.Errorf("Failed to parse MatchExpressions: %v", err) + } + if !selector.Matches(labels.Set(nodeLabels)) { + return fmt.Errorf("NodeSelectorTerm %+v does not match node labels", term.MatchExpressions) + } + } + } + return nil +} diff --git a/pkg/volume/util/util_test.go b/pkg/volume/util/util_test.go new file mode 100644 index 00000000000..f491d45a404 --- /dev/null +++ b/pkg/volume/util/util_test.go @@ -0,0 +1,142 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/api/v1/helper" +) + +var nodeLabels map[string]string = map[string]string{ + "test-key1": "test-value1", + "test-key2": "test-value2", +} + +func TestCheckNodeAffinity(t *testing.T) { + type affinityTest struct { + name string + expectSuccess bool + pv *v1.PersistentVolume + } + + cases := []affinityTest{ + { + name: "valid-no-constraints", + expectSuccess: true, + pv: testVolumeWithNodeAffinity(t, &v1.NodeAffinity{}), + }, + { + name: "valid-constraints", + expectSuccess: true, + pv: testVolumeWithNodeAffinity(t, &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "test-key1", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value1", "test-value3"}, + }, + { + Key: "test-key2", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value0", "test-value2"}, + }, + }, + }, + }, + }, + }), + }, + { + name: "invalid-key", + expectSuccess: false, + pv: testVolumeWithNodeAffinity(t, &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "test-key1", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value1", "test-value3"}, + }, + { + Key: "test-key3", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value0", "test-value2"}, + }, + }, + }, + }, + }, + }), + }, + { + name: "invalid-values", + expectSuccess: false, + pv: testVolumeWithNodeAffinity(t, &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "test-key1", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value3", "test-value4"}, + }, + { + Key: "test-key2", + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-value0", "test-value2"}, + }, + }, + }, + }, + }, + }), + }, + } + + for _, c := range cases { + err := CheckNodeAffinity(c.pv, nodeLabels) + + if err != nil && c.expectSuccess { + t.Errorf("CheckTopology %v returned error: %v", c.name, err) + } + if err == nil && !c.expectSuccess { + t.Errorf("CheckTopology %v returned success, expected error", c.name) + } + } +} + +func testVolumeWithNodeAffinity(t *testing.T, affinity *v1.NodeAffinity) *v1.PersistentVolume { + objMeta := metav1.ObjectMeta{Name: "test-constraints"} + objMeta.Annotations = map[string]string{} + err := helper.StorageNodeAffinityToAlphaAnnotation(objMeta.Annotations, affinity) + if err != nil { + t.Fatalf("Failed to get node affinity annotation: %v", err) + } + + return &v1.PersistentVolume{ + ObjectMeta: objMeta, + } +}