diff --git a/staging/src/k8s.io/csi-translation-lib/BUILD b/staging/src/k8s.io/csi-translation-lib/BUILD index 64333e4fba2..cf7e98cc4c5 100644 --- a/staging/src/k8s.io/csi-translation-lib/BUILD +++ b/staging/src/k8s.io/csi-translation-lib/BUILD @@ -17,7 +17,11 @@ go_test( name = "go_default_test", srcs = ["translate_test.go"], embed = [":go_default_library"], - deps = ["//staging/src/k8s.io/api/core/v1:go_default_library"], + deps = [ + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", + ], ) filegroup( diff --git a/staging/src/k8s.io/csi-translation-lib/plugins/aws_ebs.go b/staging/src/k8s.io/csi-translation-lib/plugins/aws_ebs.go index 03eee92ad18..5cbb389f66b 100644 --- a/staging/src/k8s.io/csi-translation-lib/plugins/aws_ebs.go +++ b/staging/src/k8s.io/csi-translation-lib/plugins/aws_ebs.go @@ -31,6 +31,8 @@ import ( const ( // AWSEBSDriverName is the name of the CSI driver for EBS AWSEBSDriverName = "ebs.csi.aws.com" + // AWSEBSTopologyKey is the zonal topology key for AWS EBS CSI Driver + AWSEBSTopologyKey = "topology.ebs.csi.aws.com/zone" // AWSEBSInTreePluginName is the name of the intree plugin for EBS AWSEBSInTreePluginName = "kubernetes.io/aws-ebs" ) @@ -108,6 +110,10 @@ func (t *awsElasticBlockStoreCSITranslator) TranslateInTreePVToCSI(pv *v1.Persis }, } + if err := translateTopology(pv, AWSEBSTopologyKey); err != nil { + return nil, fmt.Errorf("failed to translate topology: %v", err) + } + pv.Spec.AWSElasticBlockStore = nil pv.Spec.CSI = csiSource return pv, nil diff --git a/staging/src/k8s.io/csi-translation-lib/plugins/gce_pd.go b/staging/src/k8s.io/csi-translation-lib/plugins/gce_pd.go index 7f07c814bb4..db93112f7ca 100644 --- a/staging/src/k8s.io/csi-translation-lib/plugins/gce_pd.go +++ b/staging/src/k8s.io/csi-translation-lib/plugins/gce_pd.go @@ -274,6 +274,10 @@ func (g *gcePersistentDiskCSITranslator) TranslateInTreePVToCSI(pv *v1.Persisten }, } + if err := translateTopology(pv, GCEPDTopologyKey); err != nil { + return nil, fmt.Errorf("failed to translate topology: %v", err) + } + pv.Spec.PersistentVolumeSource.GCEPersistentDisk = nil pv.Spec.PersistentVolumeSource.CSI = csiSource pv.Spec.AccessModes = backwardCompatibleAccessModes(pv.Spec.AccessModes) diff --git a/staging/src/k8s.io/csi-translation-lib/plugins/in_tree_volume.go b/staging/src/k8s.io/csi-translation-lib/plugins/in_tree_volume.go index 1415cbc7036..5fca9f6a960 100644 --- a/staging/src/k8s.io/csi-translation-lib/plugins/in_tree_volume.go +++ b/staging/src/k8s.io/csi-translation-lib/plugins/in_tree_volume.go @@ -17,8 +17,13 @@ limitations under the License. package plugins import ( + "fmt" + "strings" + v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/util/sets" + cloudvolume "k8s.io/cloud-provider/volume" ) // InTreePlugin handles translations between CSI and in-tree sources in a PV @@ -59,3 +64,82 @@ type InTreePlugin interface { // RepairVolumeHandle generates a correct volume handle based on node ID information. RepairVolumeHandle(volumeHandle, nodeID string) (string, error) } + +// getTopology returns the current topology zones with the given key. +func getTopology(pv *v1.PersistentVolume, key string) []string { + if pv.Spec.NodeAffinity == nil || + pv.Spec.NodeAffinity.Required == nil || + len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) < 1 { + return nil + } + for i := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms { + for _, r := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions { + if r.Key == key { + return r.Values + } + } + } + return nil +} + +// recordTopology writes the topology to the given PV. +// If topology already exists with the given key, assume the content is already accurate. +func recordTopology(pv *v1.PersistentVolume, key string, zones []string) error { + // Make sure there are no duplicate or empty strings + filteredZones := sets.String{} + for i := range zones { + zone := strings.TrimSpace(zones[i]) + if len(zone) > 0 { + filteredZones.Insert(zone) + } + } + + if filteredZones.Len() < 1 { + return fmt.Errorf("there are no valid zones for topology") + } + + // Make sure the necessary fields exist + if pv.Spec.NodeAffinity == nil { + pv.Spec.NodeAffinity = new(v1.VolumeNodeAffinity) + } + + if pv.Spec.NodeAffinity.Required == nil { + pv.Spec.NodeAffinity.Required = new(v1.NodeSelector) + } + + if len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 { + pv.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1) + } + + // Don't overwrite if topology is already set + if len(getTopology(pv, key)) > 0 { + return nil + } + + pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions = append(pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions, + v1.NodeSelectorRequirement{ + Key: key, + Operator: v1.NodeSelectorOpIn, + Values: filteredZones.List(), + }) + + return nil +} + +// translateTopology converts existing zone labels or in-tree topology to CSI topology. +// In-tree topology has precedence over zone labels. +func translateTopology(pv *v1.PersistentVolume, topologyKey string) error { + zones := getTopology(pv, v1.LabelZoneFailureDomain) + if len(zones) > 0 { + return recordTopology(pv, topologyKey, zones) + } + + if label, ok := pv.Labels[v1.LabelZoneFailureDomain]; ok { + zones = strings.Split(label, cloudvolume.LabelMultiZoneDelimiter) + if len(zones) > 0 { + return recordTopology(pv, topologyKey, zones) + } + } + + return nil +} diff --git a/staging/src/k8s.io/csi-translation-lib/plugins/openstack_cinder.go b/staging/src/k8s.io/csi-translation-lib/plugins/openstack_cinder.go index abf578fb4a5..33252bad340 100644 --- a/staging/src/k8s.io/csi-translation-lib/plugins/openstack_cinder.go +++ b/staging/src/k8s.io/csi-translation-lib/plugins/openstack_cinder.go @@ -27,6 +27,8 @@ import ( const ( // CinderDriverName is the name of the CSI driver for Cinder CinderDriverName = "cinder.csi.openstack.org" + // CinderTopologyKey is the zonal topology key for Cinder CSI Driver + CinderTopologyKey = "topology.cinder.csi.openstack.org/zone" // CinderInTreePluginName is the name of the intree plugin for Cinder CinderInTreePluginName = "kubernetes.io/cinder" ) @@ -92,6 +94,10 @@ func (t *osCinderCSITranslator) TranslateInTreePVToCSI(pv *v1.PersistentVolume) VolumeAttributes: map[string]string{}, } + if err := translateTopology(pv, CinderTopologyKey); err != nil { + return nil, fmt.Errorf("failed to translate topology: %v", err) + } + pv.Spec.Cinder = nil pv.Spec.CSI = csiSource return pv, nil diff --git a/staging/src/k8s.io/csi-translation-lib/translate_test.go b/staging/src/k8s.io/csi-translation-lib/translate_test.go index a78f46ae923..78047e90261 100644 --- a/staging/src/k8s.io/csi-translation-lib/translate_test.go +++ b/staging/src/k8s.io/csi-translation-lib/translate_test.go @@ -20,7 +20,9 @@ import ( "reflect" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/csi-translation-lib/plugins" ) func TestTranslationStability(t *testing.T) { @@ -77,6 +79,117 @@ func TestTranslationStability(t *testing.T) { } } +func TestZoneTranslation(t *testing.T) { + testCases := []struct { + name string + pv *v1.PersistentVolume + topologyKey string + }{ + { + name: "GCE PD PV Source", + pv: &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.LabelZoneFailureDomain: "us-east-1", + v1.LabelZoneRegion: "us-east-1a", + }, + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ + PDName: "test-disk", + FSType: "ext4", + Partition: 0, + ReadOnly: false, + }, + }, + }, + }, + topologyKey: plugins.GCEPDTopologyKey, + }, + { + name: "AWS EBS PV Source", + pv: &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.LabelZoneFailureDomain: "us-east-1", + v1.LabelZoneRegion: "us-east-1a", + }, + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{ + VolumeID: "vol01", + FSType: "ext3", + Partition: 1, + ReadOnly: true, + }, + }, + }, + }, + topologyKey: plugins.AWSEBSTopologyKey, + }, + { + name: "Cinder PV Source", + pv: &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.LabelZoneFailureDomain: "us-east-1", + v1.LabelZoneRegion: "us-east-1a", + }, + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + Cinder: &v1.CinderPersistentVolumeSource{ + VolumeID: "vol1", + FSType: "ext4", + ReadOnly: false, + }, + }, + }, + }, + topologyKey: plugins.CinderTopologyKey, + }, + } + for _, test := range testCases { + ctl := New() + t.Logf("Testing %v", test.name) + + zone := test.pv.ObjectMeta.Labels[v1.LabelZoneFailureDomain] + + // Translate to CSI PV and check translated node affinity + newCSIPV, err := ctl.TranslateInTreePVToCSI(test.pv) + if err != nil { + t.Errorf("Error when translating to CSI: %v", err) + } + + if !isNodeAffinitySet(newCSIPV, test.topologyKey, zone) { + t.Errorf("Volume after translation lacks topology: %#v", newCSIPV) + } + + // Translate back to in-tree and make sure node affinity is still set + newInTreePV, err := ctl.TranslateCSIPVToInTree(newCSIPV) + if err != nil { + t.Errorf("Error when translating to in-tree: %v", err) + } + + if !isNodeAffinitySet(newInTreePV, test.topologyKey, zone) { + t.Errorf("Volume after translation lacks topology: %#v", newInTreePV) + } + } +} + +func isNodeAffinitySet(pv *v1.PersistentVolume, topologyKey, zone string) bool { + for i := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms { + for _, r := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions { + if r.Key == topologyKey && r.Values[0] == zone { + return true + } + } + } + return false +} + func TestPluginNameMappings(t *testing.T) { testCases := []struct { name string