diff --git a/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere.go b/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere.go index fefc0ec6a18..d0ccb410425 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere.go +++ b/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "io" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "net" "net/url" @@ -47,6 +48,7 @@ import ( k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" cloudprovider "k8s.io/cloud-provider" nodehelpers "k8s.io/cloud-provider/node/helpers" @@ -62,6 +64,7 @@ import ( // VSphere Cloud Provider constants const ( ProviderName = "vsphere" + providerIDPrefix = "vsphere://" VolDir = "kubevols" RoundTripperDefaultCount = 3 DummyVMPrefixName = "vsphere-k8s" @@ -95,8 +98,9 @@ var _ cloudprovider.PVLabeler = (*VSphere)(nil) // VSphere is an implementation of cloud provider Interface for VSphere. type VSphere struct { - cfg *VSphereConfig - hostName string + cfg *VSphereConfig + kubeClient clientset.Interface + hostName string // Maps the VSphere IP address to VSphereInstance vsphereInstanceMap map[string]*VSphereInstance vsphereVolumeMap *VsphereVolumeMap @@ -268,6 +272,7 @@ func init() { // Initialize passes a Kubernetes clientBuilder interface to the cloud provider func (vs *VSphere) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) { + vs.kubeClient = clientBuilder.ClientOrDie("vsphere-legacy-cloud-provider") } // Initialize Node Informers @@ -1530,6 +1535,37 @@ func (vs *VSphere) NodeAdded(obj interface{}) { if err := vs.nodeManager.RegisterNode(node); err != nil { klog.Errorf("failed to add node %+v: %v", node, err) } + vs.reconcileZonesForNode(node) +} + +func (vs *VSphere) reconcileZonesForNode(node *v1.Node) { + nodeZone := node.ObjectMeta.Labels[v1.LabelTopologyZone] + nodeRegion := node.ObjectMeta.Labels[v1.LabelTopologyRegion] + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if vs.isSecretInfoProvided && vs.isZoneEnabled() { + zone, err := vs.GetZoneByProviderID(ctx, node.Spec.ProviderID) + if err != nil { + klog.Warningf("Can not get Zones from vCenter: %v", err) + } + + if zone.FailureDomain != nodeZone || zone.Region != nodeRegion { + updatedNode := node.DeepCopy() + labels := updatedNode.ObjectMeta.Labels + if labels == nil { + labels = make(map[string]string) + } + labels[v1.LabelTopologyZone] = zone.FailureDomain + labels[v1.LabelTopologyRegion] = zone.Region + + updatedNode, err = vs.kubeClient.CoreV1().Nodes().Update(ctx, updatedNode, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("vSphere cloud provider can not update node with zones info: %v", err) + } else { + klog.V(3).Infof("Node %s updated with zone and region lables", node.Name) + } + } + } } // Notification handler when node is removed from k8s cluster. @@ -1716,8 +1752,89 @@ func (vs *VSphere) GetZoneByNodeName(ctx context.Context, nodeName k8stypes.Node return cloudprovider.Zone{}, cloudprovider.NotImplemented } +//TODO (dmoiseev) refactor, commonize with GetZone func (vs *VSphere) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) { - return cloudprovider.Zone{}, cloudprovider.NotImplemented + zone := cloudprovider.Zone{} + vmUUID := strings.Replace(providerID, providerIDPrefix, "", 1) + + vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx) + if err != nil { + klog.Errorf("Cannot connect to vsphere. Get zone for vm %s error", vmUUID) + return cloudprovider.Zone{}, err + } + dc, err := vclib.GetDatacenter(ctx, vsi.conn, vs.cfg.Workspace.Datacenter) + if err != nil { + klog.Errorf("Cannot connect to datacenter. Get zone for vm %s error", vmUUID) + return cloudprovider.Zone{}, err + } + vmHost, err := dc.GetHostByVMUUID(ctx, vmUUID) + if err != nil { + klog.Errorf("Cannot find VM runtime host. Get zone for vm %s error", vmUUID) + return cloudprovider.Zone{}, err + } + + pc := vsi.conn.Client.ServiceContent.PropertyCollector + err = withTagsClient(ctx, vsi.conn, func(c *rest.Client) error { + client := tags.NewManager(c) + // example result: ["Folder", "Datacenter", "Cluster", "Host"] + objects, err := mo.Ancestors(ctx, vsi.conn.Client, pc, *vmHost) + if err != nil { + return err + } + + // search the hierarchy, example order: ["Host", "Cluster", "Datacenter", "Folder"] + for i := range objects { + obj := objects[len(objects)-1-i] + tags, err := client.ListAttachedTags(ctx, obj) + if err != nil { + klog.Errorf("Cannot list attached tags. Get zone for vm %s: %s", vmUUID, err) + return err + } + for _, value := range tags { + tag, err := client.GetTag(ctx, value) + if err != nil { + klog.Errorf("Get tag %s: %s", value, err) + return err + } + category, err := client.GetCategory(ctx, tag.CategoryID) + if err != nil { + klog.Errorf("Get category %s error", value) + return err + } + + found := func() { + klog.Errorf("Found %q tag (%s) for %s attached to %s", category.Name, tag.Name, vmUUID, obj.Reference()) + } + switch { + case category.Name == vs.cfg.Labels.Zone: + zone.FailureDomain = tag.Name + found() + case category.Name == vs.cfg.Labels.Region: + zone.Region = tag.Name + found() + } + + if zone.FailureDomain != "" && zone.Region != "" { + return nil + } + } + } + + if zone.Region == "" { + return fmt.Errorf("vSphere region category %q does not match any tags for vm %s", vs.cfg.Labels.Region, vmUUID) + } + if zone.FailureDomain == "" { + return fmt.Errorf("vSphere zone category %q does not match any tags for vm %s", vs.cfg.Labels.Zone, vmUUID) + } + + return nil + }) + if err != nil { + klog.Errorf("Get zone for vm %s: %s", vmUUID, err) + return cloudprovider.Zone{}, err + } + + return zone, nil } // GetLabelsForVolume implements the PVLabeler interface for VSphere