Move RegionTopologyHandler to in_tree_volume

This commit is contained in:
Ayberk Yilmaz 2021-02-19 20:15:43 +00:00
parent 339b8b450f
commit e500271b4a
4 changed files with 111 additions and 171 deletions

View File

@ -176,7 +176,7 @@ func (t *awsElasticBlockStoreCSITranslator) TranslateCSIPVToInTree(pv *v1.Persis
}
// translate CSI topology to In-tree topology for rollback compatibility
if err := translateTopologyFromCSIToInTree(pv, AWSEBSTopologyKey, awsRegionTopologyHandler); err != nil {
if err := translateTopologyFromCSIToInTree(pv, AWSEBSTopologyKey, getAwsRegionFromZones); err != nil {
return nil, fmt.Errorf("failed to translate topology. PV:%+v. Error:%v", *pv, err)
}
@ -260,66 +260,6 @@ func KubernetesVolumeIDToEBSVolumeID(kubernetesID string) (string, error) {
return awsID, nil
}
// This code is copied over from gce_pd as is. However, we're passing our own getRegionFromZones implementation.
// It might be worth moving this to a common place and pass that function as a parameter.
func awsRegionTopologyHandler(pv *v1.PersistentVolume) error {
// Make sure the necessary fields exist
if pv == nil || pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil ||
pv.Spec.NodeAffinity.Required.NodeSelectorTerms == nil || len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
return nil
}
zoneLabel, regionLabel := getTopologyLabel(pv)
// process each term
for index, nodeSelectorTerm := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
// In the first loop, see if regionLabel already exist
regionExist := false
var zoneVals []string
for _, nsRequirement := range nodeSelectorTerm.MatchExpressions {
if nsRequirement.Key == regionLabel {
regionExist = true
break
} else if nsRequirement.Key == zoneLabel {
zoneVals = append(zoneVals, nsRequirement.Values...)
}
}
if regionExist {
// Regionlabel already exist in this term, skip it
continue
}
// If no regionLabel found, generate region label from the zoneLabel we collect from this term
regionVal, err := getAwsRegionFromZones(zoneVals)
if err != nil {
return err
}
// Add the regionVal to this term
pv.Spec.NodeAffinity.Required.NodeSelectorTerms[index].MatchExpressions =
append(pv.Spec.NodeAffinity.Required.NodeSelectorTerms[index].MatchExpressions, v1.NodeSelectorRequirement{
Key: regionLabel,
Operator: v1.NodeSelectorOpIn,
Values: []string{regionVal},
})
}
// Add region label
regionVals := getTopologyValues(pv, regionLabel)
if len(regionVals) == 1 {
// We should only have exactly 1 region value
if pv.Labels == nil {
pv.Labels = make(map[string]string)
}
_, regionOK := pv.Labels[regionLabel]
if !regionOK {
pv.Labels[regionLabel] = regionVals[0]
}
}
return nil
}
func getAwsRegionFromZones(zones []string) (string, error) {
regions := sets.String{}
if len(zones) < 1 {

View File

@ -227,7 +227,7 @@ func (g *gcePersistentDiskCSITranslator) TranslateInTreePVToCSI(pv *v1.Persisten
volID = fmt.Sprintf(volIDZonalFmt, UnspecifiedValue, zones[0], pv.Spec.GCEPersistentDisk.PDName)
} else if len(zones) > 1 {
// Regional
region, err := getRegionFromZones(zones)
region, err := gceGetRegionFromZones(zones)
if err != nil {
return nil, fmt.Errorf("failed to get region from zones: %v", err)
}
@ -292,7 +292,7 @@ func (g *gcePersistentDiskCSITranslator) TranslateCSIPVToInTree(pv *v1.Persisten
}
// translate CSI topology to In-tree topology for rollback compatibility
if err := translateTopologyFromCSIToInTree(pv, GCEPDTopologyKey, gceRegionTopologyHandler); err != nil {
if err := translateTopologyFromCSIToInTree(pv, GCEPDTopologyKey, gceGetRegionFromZones); err != nil {
return nil, fmt.Errorf("failed to translate topology. PV:%+v. Error:%v", *pv, err)
}
@ -356,7 +356,7 @@ func (g *gcePersistentDiskCSITranslator) RepairVolumeHandle(volumeHandle, nodeID
case "regions":
region := ""
if tok[volIDZoneValue] == UnspecifiedValue {
region, err = getRegionFromZones([]string{nodeTok[volIDZoneValue]})
region, err = gceGetRegionFromZones([]string{nodeTok[volIDZoneValue]})
if err != nil {
return "", fmt.Errorf("failed to get region from zone %s: %v", nodeTok[volIDZoneValue], err)
}
@ -377,70 +377,9 @@ func pdNameFromVolumeID(id string) (string, error) {
return splitID[volIDDiskNameValue], nil
}
// gceRegionTopologyHandler will process the PV and add region
// kubernetes topology label to its NodeAffinity and labels
// It assumes the Zone NodeAffinity already exists
func gceRegionTopologyHandler(pv *v1.PersistentVolume) error {
// Make sure the necessary fields exist
if pv == nil || pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil ||
pv.Spec.NodeAffinity.Required.NodeSelectorTerms == nil || len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
return nil
}
zoneLabel, regionLabel := getTopologyLabel(pv)
// process each term
for index, nodeSelectorTerm := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
// In the first loop, see if regionLabel already exist
regionExist := false
var zoneVals []string
for _, nsRequirement := range nodeSelectorTerm.MatchExpressions {
if nsRequirement.Key == regionLabel {
regionExist = true
break
} else if nsRequirement.Key == zoneLabel {
zoneVals = append(zoneVals, nsRequirement.Values...)
}
}
if regionExist {
// Regionlabel already exist in this term, skip it
continue
}
// If no regionLabel found, generate region label from the zoneLabel we collect from this term
regionVal, err := getRegionFromZones(zoneVals)
if err != nil {
return err
}
// Add the regionVal to this term
pv.Spec.NodeAffinity.Required.NodeSelectorTerms[index].MatchExpressions =
append(pv.Spec.NodeAffinity.Required.NodeSelectorTerms[index].MatchExpressions, v1.NodeSelectorRequirement{
Key: regionLabel,
Operator: v1.NodeSelectorOpIn,
Values: []string{regionVal},
})
}
// Add region label
regionVals := getTopologyValues(pv, regionLabel)
if len(regionVals) == 1 {
// We should only have exactly 1 region value
if pv.Labels == nil {
pv.Labels = make(map[string]string)
}
_, regionOK := pv.Labels[regionLabel]
if !regionOK {
pv.Labels[regionLabel] = regionVals[0]
}
}
return nil
}
// TODO: Replace this with the imported one from GCE PD CSI Driver when
// the driver removes all k8s/k8s dependencies
func getRegionFromZones(zones []string) (string, error) {
func gceGetRegionFromZones(zones []string) (string, error) {
regions := sets.String{}
if len(zones) < 1 {
return "", fmt.Errorf("no zones specified")

View File

@ -255,22 +255,22 @@ func TopologyKeyExist(key string, vna *v1.VolumeNodeAffinity) bool {
return false
}
type regionTopologyHandler func(pv *v1.PersistentVolume) error
type regionParserFn func([]string) (string, error)
// translateTopologyFromCSIToInTree translate a CSI topology to
// Kubernetes topology and add topology labels to it. Note that this function
// will only work for plugin with a single topologyKey that translates to
// Kubernetes zone(and region if regionTopologyHandler is passed in).
// Kubernetes zone(and region if regionParser is passed in).
// If a plugin has more than one topologyKey, it will need to be processed
// separately by the plugin.
// regionTopologyHandler is a function to add region topology NodeAffinity
// and labels for the given PV. It assumes the Zone NodeAffinity already exists
// If regionTopologyHandler is nil, no region NodeAffinity will be added
//
// If regionParser is nil, no region NodeAffinity will be added. If not nil,
// it'll be passed to regionTopologyHandler, which will add region topology NodeAffinity
// and labels for the given PV. It assumes the Zone NodeAffinity already exists.
// In short this function will,
// 1. Replace all CSI topology to Kubernetes Zone topology label
// 2. Process and generate region topology if a regionTopologyHandler is passed
// 2. Process and generate region topology if a regionParser is passed
// 3. Add Kubernetes Topology labels(zone) if they do not exist
func translateTopologyFromCSIToInTree(pv *v1.PersistentVolume, csiTopologyKey string, regionTopologyHandler regionTopologyHandler) error {
func translateTopologyFromCSIToInTree(pv *v1.PersistentVolume, csiTopologyKey string, regionParser regionParserFn) error {
zoneLabel, _ := getTopologyLabel(pv)
@ -280,10 +280,10 @@ func translateTopologyFromCSIToInTree(pv *v1.PersistentVolume, csiTopologyKey st
return fmt.Errorf("Failed to replace CSI topology to Kubernetes topology, error: %v", err)
}
// 2. Take care of region topology if a regionTopologyHandler is passed
if regionTopologyHandler != nil {
// 2. Take care of region topology if a regionParser is passed
if regionParser != nil {
// let's make less strict on this one. Even if there is an error in the region processing, just ignore it
err = regionTopologyHandler(pv)
err = regionTopologyHandler(pv, regionParser)
if err != nil {
return fmt.Errorf("Failed to handle region topology. error: %v", err)
}
@ -333,3 +333,65 @@ func translateAllowedTopologies(terms []v1.TopologySelectorTerm, key string) ([]
}
return newTopologies, nil
}
// regionTopologyHandler will process the PV and add region
// kubernetes topology label to its NodeAffinity and labels
// It assumes the Zone NodeAffinity already exists
// Each provider is responsible for providing their own regionParser
func regionTopologyHandler(pv *v1.PersistentVolume, regionParser regionParserFn) error {
// Make sure the necessary fields exist
if pv == nil || pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil ||
pv.Spec.NodeAffinity.Required.NodeSelectorTerms == nil || len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
return nil
}
zoneLabel, regionLabel := getTopologyLabel(pv)
// process each term
for index, nodeSelectorTerm := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
// In the first loop, see if regionLabel already exist
regionExist := false
var zoneVals []string
for _, nsRequirement := range nodeSelectorTerm.MatchExpressions {
if nsRequirement.Key == regionLabel {
regionExist = true
break
} else if nsRequirement.Key == zoneLabel {
zoneVals = append(zoneVals, nsRequirement.Values...)
}
}
if regionExist {
// Regionlabel already exist in this term, skip it
continue
}
// If no regionLabel found, generate region label from the zoneLabel we collect from this term
regionVal, err := regionParser(zoneVals)
if err != nil {
return err
}
// Add the regionVal to this term
pv.Spec.NodeAffinity.Required.NodeSelectorTerms[index].MatchExpressions =
append(pv.Spec.NodeAffinity.Required.NodeSelectorTerms[index].MatchExpressions, v1.NodeSelectorRequirement{
Key: regionLabel,
Operator: v1.NodeSelectorOpIn,
Values: []string{regionVal},
})
}
// Add region label
regionVals := getTopologyValues(pv, regionLabel)
if len(regionVals) == 1 {
// We should only have exactly 1 region value
if pv.Labels == nil {
pv.Labels = make(map[string]string)
}
_, regionOK := pv.Labels[regionLabel]
if !regionOK {
pv.Labels[regionLabel] = regionVals[0]
}
}
return nil
}

View File

@ -108,16 +108,16 @@ func TestTranslateTopologyFromCSIToInTree(t *testing.T) {
name string
key string
expErr bool
regionTopologyHandler regionTopologyHandler
regionParser regionParserFn
pv *v1.PersistentVolume
expectedNodeSelectorTerms []v1.NodeSelectorTerm
expectedLabels map[string]string
}{
{
name: "Remove CSI Topology Key and do not change existing GA Kubernetes topology",
key: GCEPDTopologyKey,
expErr: false,
regionTopologyHandler: gceRegionTopologyHandler,
name: "Remove CSI Topology Key and do not change existing GA Kubernetes topology",
key: GCEPDTopologyKey,
expErr: false,
regionParser: gceGetRegionFromZones,
pv: &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "gcepd", Namespace: "myns",
@ -150,10 +150,10 @@ func TestTranslateTopologyFromCSIToInTree(t *testing.T) {
expectedLabels: useast1aGALabels,
},
{
name: "Remove CSI Topology Key and do not change existing Beta Kubernetes topology",
key: GCEPDTopologyKey,
expErr: false,
regionTopologyHandler: gceRegionTopologyHandler,
name: "Remove CSI Topology Key and do not change existing Beta Kubernetes topology",
key: GCEPDTopologyKey,
expErr: false,
regionParser: gceGetRegionFromZones,
pv: &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "gcepd", Namespace: "myns",
@ -186,10 +186,10 @@ func TestTranslateTopologyFromCSIToInTree(t *testing.T) {
expectedLabels: uswest2bBetaLabels,
},
{
name: "Remove CSI Topology Key and add Kubernetes topology from NodeAffinity, ignore labels",
key: GCEPDTopologyKey,
expErr: false,
regionTopologyHandler: gceRegionTopologyHandler,
name: "Remove CSI Topology Key and add Kubernetes topology from NodeAffinity, ignore labels",
key: GCEPDTopologyKey,
expErr: false,
regionParser: gceGetRegionFromZones,
pv: &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "gcepd", Namespace: "myns",
@ -223,10 +223,10 @@ func TestTranslateTopologyFromCSIToInTree(t *testing.T) {
},
},
{
name: "No CSI topology label exists and no change to the NodeAffinity",
key: GCEPDTopologyKey,
expErr: false,
regionTopologyHandler: gceRegionTopologyHandler,
name: "No CSI topology label exists and no change to the NodeAffinity",
key: GCEPDTopologyKey,
expErr: false,
regionParser: gceGetRegionFromZones,
pv: &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "gcepd", Namespace: "myns",
@ -250,10 +250,10 @@ func TestTranslateTopologyFromCSIToInTree(t *testing.T) {
},
},
{
name: "Generate GA labels and kubernetes topology only from CSI topology",
key: GCEPDTopologyKey,
expErr: false,
regionTopologyHandler: gceRegionTopologyHandler,
name: "Generate GA labels and kubernetes topology only from CSI topology",
key: GCEPDTopologyKey,
expErr: false,
regionParser: gceGetRegionFromZones,
pv: &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "gcepd", Namespace: "myns",
@ -280,10 +280,10 @@ func TestTranslateTopologyFromCSIToInTree(t *testing.T) {
expectedLabels: useast1aGALabels,
},
{
name: "Generate Beta labels and kubernetes topology from Beta NodeAffinity",
key: GCEPDTopologyKey,
expErr: false,
regionTopologyHandler: gceRegionTopologyHandler,
name: "Generate Beta labels and kubernetes topology from Beta NodeAffinity",
key: GCEPDTopologyKey,
expErr: false,
regionParser: gceGetRegionFromZones,
pv: &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "gcepd", Namespace: "myns",
@ -310,10 +310,9 @@ func TestTranslateTopologyFromCSIToInTree(t *testing.T) {
expectedLabels: uswest2bBetaLabels,
},
{
name: "regionParser is missing and only zone labels get generated",
key: GCEPDTopologyKey,
expErr: false,
regionTopologyHandler: nil,
name: "regionParser is missing and only zone labels get generated",
key: GCEPDTopologyKey,
expErr: false,
pv: &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "gcepd", Namespace: "myns",
@ -352,10 +351,10 @@ func TestTranslateTopologyFromCSIToInTree(t *testing.T) {
},
},
{
name: "Replace multi-term CSI Topology Key and add Region Kubernetes topology for both",
key: GCEPDTopologyKey,
expErr: false,
regionTopologyHandler: gceRegionTopologyHandler,
name: "Replace multi-term CSI Topology Key and add Region Kubernetes topology for both",
key: GCEPDTopologyKey,
expErr: false,
regionParser: gceGetRegionFromZones,
pv: &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "gcepd", Namespace: "myns",
@ -426,7 +425,7 @@ func TestTranslateTopologyFromCSIToInTree(t *testing.T) {
for _, tc := range testCases {
t.Logf("Running test: %v", tc.name)
err := translateTopologyFromCSIToInTree(tc.pv, tc.key, tc.regionTopologyHandler)
err := translateTopologyFromCSIToInTree(tc.pv, tc.key, tc.regionParser)
if err != nil && !tc.expErr {
t.Errorf("Did not expect an error, got: %v", err)
}