diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index 12a5cee0ade..c0636e90d12 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -58,6 +58,9 @@ const ( loadBalancerSkuBasic = "basic" loadBalancerSkuStandard = "standard" + + externalResourceGroupLabel = "kubernetes.azure.com/resource-group" + managedByAzureLabel = "kubernetes.azure.com/managed" ) var ( @@ -156,11 +159,16 @@ type Cloud struct { metadata *InstanceMetadata vmSet VMSet - // Lock for access to nodeZones - nodeZonesLock sync.Mutex + // Lock for access to node caches + nodeCachesLock sync.Mutex // nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone // it is updated by the nodeInformer - nodeZones map[string]sets.String + nodeZones map[string]sets.String + // nodeResourceGroups holds nodes' external resource groups + nodeResourceGroups map[string]string + // unmanagedNodes holds a list of nodes not managed by Azure cloud provider. + unmanagedNodes sets.String + // nodeInformerSynced is for determining if the informer has synced. nodeInformerSynced cache.InformerSynced // Clients for vmss. @@ -254,9 +262,11 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { rateLimiterWriter: operationPollRateLimiterWrite, } az := Cloud{ - Config: *config, - Environment: *env, - nodeZones: map[string]sets.String{}, + Config: *config, + Environment: *env, + nodeZones: map[string]sets.String{}, + nodeResourceGroups: map[string]string{}, + unmanagedNodes: sets.NewString(), DisksClient: newAzDisksClient(azClientConfig), RoutesClient: newAzRoutesClient(azClientConfig), @@ -294,7 +304,7 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { Duration: time.Duration(az.CloudProviderBackoffDuration) * time.Second, Jitter: az.CloudProviderBackoffJitter, } - glog.V(2).Infof("Azure cloudprovider using retry backoff: retries=%d, exponent=%f, duration=%d, jitter=%f", + glog.V(2).Infof("Azure cloudprovider using retry backoff: retries=%d, exponent=%f, duration=%d, jitter=%f", az.CloudProviderBackoffRetries, az.CloudProviderBackoffExponent, az.CloudProviderBackoffDuration, @@ -449,7 +459,7 @@ func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) { nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { node := obj.(*v1.Node) - az.updateNodeZones(nil, node) + az.updateNodeCaches(nil, node) }, UpdateFunc: func(prev, obj interface{}) { prevNode := prev.(*v1.Node) @@ -458,7 +468,7 @@ func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) { prevNode.Labels[kubeletapis.LabelZoneFailureDomain] { return } - az.updateNodeZones(prevNode, newNode) + az.updateNodeCaches(prevNode, newNode) }, DeleteFunc: func(obj interface{}) { node, isNode := obj.(*v1.Node) @@ -476,16 +486,19 @@ func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) { return } } - az.updateNodeZones(node, nil) + az.updateNodeCaches(node, nil) }, }) az.nodeInformerSynced = nodeInformer.HasSynced } -func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) { - az.nodeZonesLock.Lock() - defer az.nodeZonesLock.Unlock() +// updateNodeCaches updates local cache for node's zones and external resource groups. +func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) { + az.nodeCachesLock.Lock() + defer az.nodeCachesLock.Unlock() + if prevNode != nil { + // Remove from nodeZones cache.
prevZone, ok := prevNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain] if ok && az.isAvailabilityZone(prevZone) { az.nodeZones[prevZone].Delete(prevNode.ObjectMeta.Name) @@ -493,8 +506,22 @@ func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) { az.nodeZones[prevZone] = nil } } + + // Remove from nodeResourceGroups cache. + _, ok = prevNode.ObjectMeta.Labels[externalResourceGroupLabel] + if ok { + delete(az.nodeResourceGroups, prevNode.ObjectMeta.Name) + } + + // Remove from unmanagedNodes cache. + managed, ok := prevNode.ObjectMeta.Labels[managedByAzureLabel] + if ok && managed == "false" { + az.unmanagedNodes.Delete(prevNode.ObjectMeta.Name) + } } + if newNode != nil { + // Add to nodeZones cache. newZone, ok := newNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain] if ok && az.isAvailabilityZone(newZone) { if az.nodeZones[newZone] == nil { @@ -502,6 +529,18 @@ func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) { } az.nodeZones[newZone].Insert(newNode.ObjectMeta.Name) } + + // Add to nodeResourceGroups cache. + newRG, ok := newNode.ObjectMeta.Labels[externalResourceGroupLabel] + if ok && len(newRG) > 0 { + az.nodeResourceGroups[newNode.ObjectMeta.Name] = newRG + } + + // Add to unmanagedNodes cache. + managed, ok := newNode.ObjectMeta.Labels[managedByAzureLabel] + if ok && managed == "false" { + az.unmanagedNodes.Insert(newNode.ObjectMeta.Name) + } } } @@ -511,8 +550,8 @@ func (az *Cloud) GetActiveZones() (sets.String, error) { return nil, fmt.Errorf("Azure cloud provider doesn't have informers set") } - az.nodeZonesLock.Lock() - defer az.nodeZonesLock.Unlock() + az.nodeCachesLock.Lock() + defer az.nodeCachesLock.Unlock() if !az.nodeInformerSynced() { return nil, fmt.Errorf("node informer is not synced when trying to GetActiveZones") } @@ -530,3 +569,76 @@ func (az *Cloud) GetActiveZones() (sets.String, error) { func (az *Cloud) GetLocation() string { return az.Location } + +// GetNodeResourceGroup gets resource group for given node. +func (az *Cloud) GetNodeResourceGroup(nodeName string) (string, error) { + // Kubelet won't set az.nodeInformerSynced, always return configured resourceGroup. + if az.nodeInformerSynced == nil { + return az.ResourceGroup, nil + } + + az.nodeCachesLock.Lock() + defer az.nodeCachesLock.Unlock() + if !az.nodeInformerSynced() { + return "", fmt.Errorf("node informer is not synced when trying to GetNodeResourceGroup") + } + + // Return external resource group if it has been cached. + if cachedRG, ok := az.nodeResourceGroups[nodeName]; ok { + return cachedRG, nil + } + + // Return resource group from cloud provider options. + return az.ResourceGroup, nil +} + +// GetResourceGroups returns a set of resource groups that all nodes are running on. +func (az *Cloud) GetResourceGroups() (sets.String, error) { + // Kubelet won't set az.nodeInformerSynced, always return configured resourceGroup. + if az.nodeInformerSynced == nil { + return sets.NewString(az.ResourceGroup), nil + } + + az.nodeCachesLock.Lock() + defer az.nodeCachesLock.Unlock() + if !az.nodeInformerSynced() { + return nil, fmt.Errorf("node informer is not synced when trying to GetResourceGroups") + } + + resourceGroups := sets.NewString(az.ResourceGroup) + for _, rg := range az.nodeResourceGroups { + resourceGroups.Insert(rg) + } + + return resourceGroups, nil +} + +// GetUnmanagedNodes returns a list of nodes not managed by Azure cloud provider (e.g. on-prem nodes). 
+func (az *Cloud) GetUnmanagedNodes() (sets.String, error) { + // Kubelet won't set az.nodeInformerSynced, always return nil. + if az.nodeInformerSynced == nil { + return nil, nil + } + + az.nodeCachesLock.Lock() + defer az.nodeCachesLock.Unlock() + if !az.nodeInformerSynced() { + return nil, fmt.Errorf("node informer is not synced when trying to GetUnmanagedNodes") + } + + return sets.NewString(az.unmanagedNodes.List()...), nil +} + +// ShouldNodeExcludedFromLoadBalancer returns true if node is unmanaged or in external resource group. +func (az *Cloud) ShouldNodeExcludedFromLoadBalancer(node *v1.Node) bool { + labels := node.ObjectMeta.Labels + if rg, ok := labels[externalResourceGroupLabel]; ok && rg != az.ResourceGroup { + return true + } + + if managed, ok := labels[managedByAzureLabel]; ok && managed == "false" { + return true + } + + return false +} diff --git a/pkg/cloudprovider/providers/azure/azure_backoff.go b/pkg/cloudprovider/providers/azure/azure_backoff.go index d085bcf222a..02a08d11707 100644 --- a/pkg/cloudprovider/providers/azure/azure_backoff.go +++ b/pkg/cloudprovider/providers/azure/azure_backoff.go @@ -69,20 +69,20 @@ func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.Virtua } // VirtualMachineClientListWithRetry invokes az.VirtualMachinesClient.List with exponential backoff retry -func (az *Cloud) VirtualMachineClientListWithRetry() ([]compute.VirtualMachine, error) { +func (az *Cloud) VirtualMachineClientListWithRetry(resourceGroup string) ([]compute.VirtualMachine, error) { allNodes := []compute.VirtualMachine{} err := wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) { var retryErr error ctx, cancel := getContextWithCancel() defer cancel() - allNodes, retryErr = az.VirtualMachinesClient.List(ctx, az.ResourceGroup) + allNodes, retryErr = az.VirtualMachinesClient.List(ctx, resourceGroup) if retryErr != nil { glog.Errorf("VirtualMachinesClient.List(%v) - backoff: failure, will retry,err=%v", - az.ResourceGroup, + resourceGroup, retryErr) return false, retryErr } - glog.V(2).Infof("VirtualMachinesClient.List(%v) - backoff: success", az.ResourceGroup) + glog.V(2).Infof("VirtualMachinesClient.List(%v) - backoff: success", resourceGroup) return true, nil }) if err != nil { @@ -281,12 +281,12 @@ func (az *Cloud) DeleteRouteWithRetry(routeName string) error { } // CreateOrUpdateVMWithRetry invokes az.VirtualMachinesClient.CreateOrUpdate with exponential backoff retry -func (az *Cloud) CreateOrUpdateVMWithRetry(vmName string, newVM compute.VirtualMachine) error { +func (az *Cloud) CreateOrUpdateVMWithRetry(resourceGroup, vmName string, newVM compute.VirtualMachine) error { return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) { ctx, cancel := getContextWithCancel() defer cancel() - resp, err := az.VirtualMachinesClient.CreateOrUpdate(ctx, az.ResourceGroup, vmName, newVM) + resp, err := az.VirtualMachinesClient.CreateOrUpdate(ctx, resourceGroup, vmName, newVM) glog.V(10).Infof("VirtualMachinesClient.CreateOrUpdate(%s): end", vmName) return processHTTPRetryResponse(resp, err) }) diff --git a/pkg/cloudprovider/providers/azure/azure_controller_standard.go b/pkg/cloudprovider/providers/azure/azure_controller_standard.go index 2ee8e8aba9b..5ac04bbf437 100644 --- a/pkg/cloudprovider/providers/azure/azure_controller_standard.go +++ b/pkg/cloudprovider/providers/azure/azure_controller_standard.go @@ -34,6 +34,12 @@ func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI stri return err } + vmName := 
mapNodeNameToVMName(nodeName) + nodeResourceGroup, err := as.GetNodeResourceGroup(vmName) + if err != nil { + return err + } + disks := *vm.StorageProfile.DataDisks if isManagedDisk { disks = append(disks, @@ -67,17 +73,16 @@ func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI stri }, }, } - vmName := mapNodeNameToVMName(nodeName) - glog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk", as.resourceGroup, vmName) + glog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk", nodeResourceGroup, vmName) ctx, cancel := getContextWithCancel() defer cancel() - resp, err := as.VirtualMachinesClient.CreateOrUpdate(ctx, as.resourceGroup, vmName, newVM) + resp, err := as.VirtualMachinesClient.CreateOrUpdate(ctx, nodeResourceGroup, vmName, newVM) if as.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) { - glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", as.resourceGroup, vmName) - retryErr := as.CreateOrUpdateVMWithRetry(vmName, newVM) + glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", nodeResourceGroup, vmName) + retryErr := as.CreateOrUpdateVMWithRetry(nodeResourceGroup, vmName, newVM) if retryErr != nil { err = retryErr - glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", as.resourceGroup, vmName) + glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", nodeResourceGroup, vmName) } } if err != nil { @@ -106,6 +111,12 @@ func (as *availabilitySet) DetachDiskByName(diskName, diskURI string, nodeName t return nil } + vmName := mapNodeNameToVMName(nodeName) + nodeResourceGroup, err := as.GetNodeResourceGroup(vmName) + if err != nil { + return err + } + disks := *vm.StorageProfile.DataDisks bFoundDisk := false for i, disk := range disks { @@ -132,17 +143,16 @@ func (as *availabilitySet) DetachDiskByName(diskName, diskURI string, nodeName t }, }, } - vmName := mapNodeNameToVMName(nodeName) - glog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk", as.resourceGroup, vmName) + glog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk", nodeResourceGroup, vmName) ctx, cancel := getContextWithCancel() defer cancel() - resp, err := as.VirtualMachinesClient.CreateOrUpdate(ctx, as.resourceGroup, vmName, newVM) + resp, err := as.VirtualMachinesClient.CreateOrUpdate(ctx, nodeResourceGroup, vmName, newVM) if as.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) { - glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", as.resourceGroup, vmName) - retryErr := as.CreateOrUpdateVMWithRetry(vmName, newVM) + glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", nodeResourceGroup, vmName) + retryErr := as.CreateOrUpdateVMWithRetry(nodeResourceGroup, vmName, newVM) if retryErr != nil { err = retryErr - glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", as.ResourceGroup, vmName) + glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", nodeResourceGroup, vmName) } } if err != nil { diff --git a/pkg/cloudprovider/providers/azure/azure_controller_vmss.go b/pkg/cloudprovider/providers/azure/azure_controller_vmss.go index ab24f8271b7..b5db5e37ae9 100644 --- a/pkg/cloudprovider/providers/azure/azure_controller_vmss.go +++ b/pkg/cloudprovider/providers/azure/azure_controller_vmss.go @@ -29,7 +29,13 @@ import ( // AttachDisk attaches a vhd to vm // the vhd must exist, can be identified by diskName, diskURI, and lun. 
func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error { - ssName, instanceID, vm, err := ss.getVmssVM(string(nodeName)) + vmName := mapNodeNameToVMName(nodeName) + ssName, instanceID, vm, err := ss.getVmssVM(vmName) + if err != nil { + return err + } + + nodeResourceGroup, err := ss.GetNodeResourceGroup(vmName) if err != nil { return err } @@ -65,14 +71,14 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod ctx, cancel := getContextWithCancel() defer cancel() - glog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk", ss.resourceGroup, nodeName) - resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, ss.resourceGroup, ssName, instanceID, vm) + glog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk", nodeResourceGroup, nodeName) + resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, vm) if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) { - glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", ss.resourceGroup, nodeName) - retryErr := ss.UpdateVmssVMWithRetry(ctx, ss.resourceGroup, ssName, instanceID, vm) + glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", nodeResourceGroup, nodeName) + retryErr := ss.UpdateVmssVMWithRetry(ctx, nodeResourceGroup, ssName, instanceID, vm) if retryErr != nil { err = retryErr - glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", ss.resourceGroup, nodeName) + glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", nodeResourceGroup, nodeName) } } if err != nil { @@ -85,7 +91,8 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod } else { glog.V(4).Info("azureDisk - azure attach succeeded") // Invalidate the cache right after updating - ss.vmssVMCache.Delete(ss.makeVmssVMName(ssName, instanceID)) + key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID)) + ss.vmssVMCache.Delete(key) } return err } @@ -93,7 +100,13 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod // DetachDiskByName detaches a vhd from host // the vhd can be identified by diskName or diskURI func (ss *scaleSet) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error { - ssName, instanceID, vm, err := ss.getVmssVM(string(nodeName)) + vmName := mapNodeNameToVMName(nodeName) + ssName, instanceID, vm, err := ss.getVmssVM(vmName) + if err != nil { + return err + } + + nodeResourceGroup, err := ss.GetNodeResourceGroup(vmName) if err != nil { return err } @@ -122,14 +135,14 @@ func (ss *scaleSet) DetachDiskByName(diskName, diskURI string, nodeName types.No vm.StorageProfile.DataDisks = &disks ctx, cancel := getContextWithCancel() defer cancel() - glog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk", ss.resourceGroup, nodeName) - resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, ss.resourceGroup, ssName, instanceID, vm) + glog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk", nodeResourceGroup, nodeName) + resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, vm) if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) { - glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", ss.resourceGroup, nodeName) - retryErr := ss.UpdateVmssVMWithRetry(ctx, ss.resourceGroup, ssName, instanceID, vm) + glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", 
nodeResourceGroup, nodeName) + retryErr := ss.UpdateVmssVMWithRetry(ctx, nodeResourceGroup, ssName, instanceID, vm) if retryErr != nil { err = retryErr - glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", ss.resourceGroup, nodeName) + glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", nodeResourceGroup, nodeName) } } if err != nil { @@ -137,7 +150,8 @@ func (ss *scaleSet) DetachDiskByName(diskName, diskURI string, nodeName types.No } else { glog.V(4).Info("azureDisk - azure detach succeeded") // Invalidate the cache right after updating - ss.vmssVMCache.Delete(ss.makeVmssVMName(ssName, instanceID)) + key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID)) + ss.vmssVMCache.Delete(key) } return err diff --git a/pkg/cloudprovider/providers/azure/azure_instances.go b/pkg/cloudprovider/providers/azure/azure_instances.go index 3037f726ff5..e2d5b3cb754 100644 --- a/pkg/cloudprovider/providers/azure/azure_instances.go +++ b/pkg/cloudprovider/providers/azure/azure_instances.go @@ -171,9 +171,15 @@ func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, e return az.vmSet.GetInstanceIDByNodeName(nodeName) } + // Get resource group name. + resourceGroup, err := az.metadata.Text("instance/compute/resourceGroupName") + if err != nil { + return "", err + } + // Compose instanceID based on nodeName for standard instance. if az.VMType == vmTypeStandard { - return az.getStandardMachineID(nodeName), nil + return az.getStandardMachineID(resourceGroup, nodeName), nil } // Get scale set name and instanceID from vmName for vmss. @@ -181,12 +187,12 @@ func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, e if err != nil { if err == ErrorNotVmssInstance { // Compose machineID for standard Node. - return az.getStandardMachineID(nodeName), nil + return az.getStandardMachineID(resourceGroup, nodeName), nil } return "", err } // Compose instanceID based on ssName and instanceID for vmss instance. - return az.getVmssMachineID(ssName, instanceID), nil + return az.getVmssMachineID(resourceGroup, ssName, instanceID), nil } return az.vmSet.GetInstanceIDByNodeName(nodeName) diff --git a/pkg/cloudprovider/providers/azure/azure_standard.go b/pkg/cloudprovider/providers/azure/azure_standard.go index 4376c29cb9c..8fec048c256 100644 --- a/pkg/cloudprovider/providers/azure/azure_standard.go +++ b/pkg/cloudprovider/providers/azure/azure_standard.go @@ -62,22 +62,24 @@ const ( var errNotInVMSet = errors.New("vm is not in the vmset") var providerIDRE = regexp.MustCompile(`^` + CloudProviderName + `://(?:.*)/Microsoft.Compute/virtualMachines/(.+)$`) var backendPoolIDRE = regexp.MustCompile(`^/subscriptions/(?:.*)/resourceGroups/(?:.*)/providers/Microsoft.Network/loadBalancers/(.+)/backendAddressPools/(?:.*)`) +var nicResourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Network/networkInterfaces/(?:.*)`) +var publicIPResourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Network/publicIPAddresses/(?:.*)`) // getStandardMachineID returns the full identifier of a virtual machine. 
-func (az *Cloud) getStandardMachineID(machineName string) string { +func (az *Cloud) getStandardMachineID(resourceGroup, machineName string) string { return fmt.Sprintf( machineIDTemplate, az.SubscriptionID, - az.ResourceGroup, + resourceGroup, machineName) } // returns the full identifier of an availabilitySet -func (az *Cloud) getAvailabilitySetID(availabilitySetName string) string { +func (az *Cloud) getAvailabilitySetID(resourceGroup, availabilitySetName string) string { return fmt.Sprintf( availabilitySetIDTemplate, az.SubscriptionID, - az.ResourceGroup, + resourceGroup, availabilitySetName) } @@ -306,51 +308,6 @@ func MakeCRC32(str string) string { return strconv.FormatUint(uint64(hash), 10) } -//ExtractVMData : extract dataDisks, storageProfile from a map struct -func ExtractVMData(vmData map[string]interface{}) (dataDisks []interface{}, - storageProfile map[string]interface{}, - hardwareProfile map[string]interface{}, err error) { - props, ok := vmData["properties"].(map[string]interface{}) - if !ok { - return nil, nil, nil, fmt.Errorf("convert vmData(properties) to map error") - } - - storageProfile, ok = props["storageProfile"].(map[string]interface{}) - if !ok { - return nil, nil, nil, fmt.Errorf("convert vmData(storageProfile) to map error") - } - - hardwareProfile, ok = props["hardwareProfile"].(map[string]interface{}) - if !ok { - return nil, nil, nil, fmt.Errorf("convert vmData(hardwareProfile) to map error") - } - - dataDisks, ok = storageProfile["dataDisks"].([]interface{}) - if !ok { - return nil, nil, nil, fmt.Errorf("convert vmData(dataDisks) to map error") - } - return dataDisks, storageProfile, hardwareProfile, nil -} - -//ExtractDiskData : extract provisioningState, diskState from a map struct -func ExtractDiskData(diskData interface{}) (provisioningState string, diskState string, err error) { - fragment, ok := diskData.(map[string]interface{}) - if !ok { - return "", "", fmt.Errorf("convert diskData to map error") - } - - properties, ok := fragment["properties"].(map[string]interface{}) - if !ok { - return "", "", fmt.Errorf("convert diskData(properties) to map error") - } - - provisioningState, ok = properties["provisioningState"].(string) // if there is a disk, provisioningState property will be there - if ref, ok := properties["diskState"]; ok { - diskState = ref.(string) - } - return provisioningState, diskState, nil -} - // availabilitySet implements VMSet interface for Azure availability sets. type availabilitySet struct { *Cloud @@ -483,7 +440,7 @@ func (as *availabilitySet) GetIPByNodeName(name string) (string, string, error) // getAgentPoolAvailabiliySets lists the virtual machines for the resource group and then builds // a list of availability sets that match the nodes available to k8s. func (as *availabilitySet) getAgentPoolAvailabiliySets(nodes []*v1.Node) (agentPoolAvailabilitySets *[]string, err error) { - vms, err := as.VirtualMachineClientListWithRetry() + vms, err := as.VirtualMachineClientListWithRetry(as.ResourceGroup) if err != nil { glog.Errorf("as.getNodeAvailabilitySet - VirtualMachineClientListWithRetry failed, err=%v", err) return nil, err @@ -578,6 +535,26 @@ func (as *availabilitySet) GetPrimaryInterface(nodeName string) (network.Interfa return as.getPrimaryInterfaceWithVMSet(nodeName, "") } +// extractResourceGroupByNicID extracts the resource group name by nicID. 
+func extractResourceGroupByNicID(nicID string) (string, error) { + matches := nicResourceGroupRE.FindStringSubmatch(nicID) + if len(matches) != 2 { + return "", fmt.Errorf("error of extracting resourceGroup from nicID %q", nicID) + } + + return matches[1], nil +} + +// extractResourceGroupByPipID extracts the resource group name by publicIP ID. +func extractResourceGroupByPipID(pipID string) (string, error) { + matches := publicIPResourceGroupRE.FindStringSubmatch(pipID) + if len(matches) != 2 { + return "", fmt.Errorf("error of extracting resourceGroup from pipID %q", pipID) + } + + return matches[1], nil +} + // getPrimaryInterfaceWithVMSet gets machine primary network interface by node name and vmSet. func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName string) (network.Interface, error) { var machine compute.VirtualMachine @@ -596,6 +573,10 @@ func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName stri if err != nil { return network.Interface{}, err } + nodeResourceGroup, err := as.GetNodeResourceGroup(nodeName) + if err != nil { + return network.Interface{}, err + } // Check availability set name. Note that vmSetName is empty string when getting // the Node's IP address. While vmSetName is not empty, it should be checked with @@ -605,7 +586,7 @@ func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName stri // - For standard SKU load balancer, backend could belong to multiple VMAS, so we // don't check vmSet for it. if vmSetName != "" && !as.useStandardLoadBalancer() { - expectedAvailabilitySetName := as.getAvailabilitySetID(vmSetName) + expectedAvailabilitySetName := as.getAvailabilitySetID(nodeResourceGroup, vmSetName) if machine.AvailabilitySet == nil || !strings.EqualFold(*machine.AvailabilitySet.ID, expectedAvailabilitySetName) { glog.V(3).Infof( "GetPrimaryInterface: nic (%s) is not in the availabilitySet(%s)", nicName, vmSetName) @@ -613,9 +594,14 @@ func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName stri } } + nicResourceGroup, err := extractResourceGroupByNicID(primaryNicID) + if err != nil { + return network.Interface{}, err + } + ctx, cancel := getContextWithCancel() defer cancel() - nic, err := as.InterfacesClient.Get(ctx, as.ResourceGroup, nicName, "") + nic, err := as.InterfacesClient.Get(ctx, nicResourceGroup, nicName, "") if err != nil { return network.Interface{}, err } @@ -718,6 +704,11 @@ func (as *availabilitySet) EnsureHostsInPool(serviceName string, nodes []*v1.Nod continue } + if as.ShouldNodeExcludedFromLoadBalancer(node) { + glog.V(4).Infof("Excluding unmanaged/external-resource-group node %q", localNodeName) + continue + } + f := func() error { err := as.ensureHostInPool(serviceName, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal) if err != nil { diff --git a/pkg/cloudprovider/providers/azure/azure_test.go b/pkg/cloudprovider/providers/azure/azure_test.go index d2f4184eba8..bf694d84f3c 100644 --- a/pkg/cloudprovider/providers/azure/azure_test.go +++ b/pkg/cloudprovider/providers/azure/azure_test.go @@ -956,6 +956,8 @@ func getTestCloud() (az *Cloud) { }, nodeZones: map[string]sets.String{}, nodeInformerSynced: func() bool { return true }, + nodeResourceGroups: map[string]string{}, + unmanagedNodes: sets.NewString(), } az.DisksClient = newFakeDisksClient() az.InterfacesClient = newFakeAzureInterfacesClient() @@ -1065,7 +1067,7 @@ func getClusterResources(az *Cloud, vmCount int, availabilitySetCount int) (clus az.InterfacesClient.CreateOrUpdate(ctx, 
az.Config.ResourceGroup, nicName, newNIC) // create vm - asID := az.getAvailabilitySetID(asName) + asID := az.getAvailabilitySetID(az.Config.ResourceGroup, asName) newVM := compute.VirtualMachine{ Name: &vmName, Location: &az.Config.Location, @@ -2771,3 +2773,100 @@ func TestGetResourceGroupFromDiskURI(t *testing.T) { } } } + +func TestGetResourceGroups(t *testing.T) { + tests := []struct { + name string + nodeResourceGroups map[string]string + expected sets.String + informerSynced bool + expectError bool + }{ + { + name: "cloud provider configured RG should be returned by default", + nodeResourceGroups: map[string]string{}, + informerSynced: true, + expected: sets.NewString("rg"), + }, + { + name: "cloud provider configured RG and node RGs should be returned", + nodeResourceGroups: map[string]string{"node1": "rg1", "node2": "rg2"}, + informerSynced: true, + expected: sets.NewString("rg", "rg1", "rg2"), + }, + { + name: "error should be returned if informer hasn't synced yet", + nodeResourceGroups: map[string]string{"node1": "rg1", "node2": "rg2"}, + informerSynced: false, + expectError: true, + }, + } + + az := getTestCloud() + for _, test := range tests { + az.nodeResourceGroups = test.nodeResourceGroups + if test.informerSynced { + az.nodeInformerSynced = func() bool { return true } + } else { + az.nodeInformerSynced = func() bool { return false } + } + actual, err := az.GetResourceGroups() + if test.expectError { + assert.NotNil(t, err, test.name) + continue + } + + assert.Nil(t, err, test.name) + assert.Equal(t, test.expected, actual, test.name) + } +} + +func TestGetNodeResourceGroup(t *testing.T) { + tests := []struct { + name string + nodeResourceGroups map[string]string + node string + expected string + informerSynced bool + expectError bool + }{ + { + name: "cloud provider configured RG should be returned by default", + nodeResourceGroups: map[string]string{}, + informerSynced: true, + node: "node1", + expected: "rg", + }, + { + name: "node RGs should be returned", + nodeResourceGroups: map[string]string{"node1": "rg1", "node2": "rg2"}, + informerSynced: true, + node: "node1", + expected: "rg1", + }, + { + name: "error should be returned if informer hasn't synced yet", + nodeResourceGroups: map[string]string{"node1": "rg1", "node2": "rg2"}, + informerSynced: false, + expectError: true, + }, + } + + az := getTestCloud() + for _, test := range tests { + az.nodeResourceGroups = test.nodeResourceGroups + if test.informerSynced { + az.nodeInformerSynced = func() bool { return true } + } else { + az.nodeInformerSynced = func() bool { return false } + } + actual, err := az.GetNodeResourceGroup(test.node) + if test.expectError { + assert.NotNil(t, err, test.name) + continue + } + + assert.Nil(t, err, test.name) + assert.Equal(t, test.expected, actual, test.name) + } +} diff --git a/pkg/cloudprovider/providers/azure/azure_vmss.go b/pkg/cloudprovider/providers/azure/azure_vmss.go index 04e62e696e9..2fe598e0ccf 100644 --- a/pkg/cloudprovider/providers/azure/azure_vmss.go +++ b/pkg/cloudprovider/providers/azure/azure_vmss.go @@ -40,8 +40,10 @@ var ( // ErrorNotVmssInstance indicates an instance is not belongint to any vmss. 
ErrorNotVmssInstance = errors.New("not a vmss instance") - scaleSetNameRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`) - vmssMachineIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachineScaleSets/%s/virtualMachines/%s" + scaleSetNameRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`) + resourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(?:.*)/virtualMachines(?:.*)`) + vmssNicResourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(?:.*)/virtualMachines/(?:.*)/networkInterfaces/(?:.*)`) + vmssMachineIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachineScaleSets/%s/virtualMachines/%s" ) // scaleSet implements VMSet interface for Azure scale set. @@ -106,8 +108,14 @@ func (ss *scaleSet) getVmssVM(nodeName string) (ssName, instanceID string, vm co return "", "", vm, cloudprovider.InstanceNotFound } + resourceGroup, err := ss.GetNodeResourceGroup(nodeName) + if err != nil { + return "", "", vm, err + } + glog.V(4).Infof("getVmssVM gets scaleSetName (%q) and instanceID (%q) for node %q", ssName, instanceID, nodeName) - cachedVM, err := ss.vmssVMCache.Get(ss.makeVmssVMName(ssName, instanceID)) + key := buildVmssCacheKey(resourceGroup, ss.makeVmssVMName(ssName, instanceID)) + cachedVM, err := ss.vmssVMCache.Get(key) if err != nil { return ssName, instanceID, vm, err } @@ -122,9 +130,10 @@ func (ss *scaleSet) getVmssVM(nodeName string) (ssName, instanceID string, vm co // getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache. // The node must belong to one of scale sets. -func (ss *scaleSet) getVmssVMByInstanceID(scaleSetName, instanceID string) (vm compute.VirtualMachineScaleSetVM, err error) { +func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string) (vm compute.VirtualMachineScaleSetVM, err error) { vmName := ss.makeVmssVMName(scaleSetName, instanceID) - cachedVM, err := ss.vmssVMCache.Get(vmName) + key := buildVmssCacheKey(resourceGroup, vmName) + cachedVM, err := ss.vmssVMCache.Get(key) if err != nil { return vm, err } @@ -168,13 +177,18 @@ func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName, return ss.availabilitySet.GetNodeNameByProviderID(providerID) } + resourceGroup, err := extractResourceGroupByProviderID(providerID) + if err != nil { + return "", fmt.Errorf("error of extracting resource group for node %q", providerID) + } + instanceID, err := getLastSegment(providerID) if err != nil { glog.V(4).Infof("Can not extract instanceID from providerID (%s), assuming it is mananaged by availability set: %v", providerID, err) return ss.availabilitySet.GetNodeNameByProviderID(providerID) } - vm, err := ss.getVmssVMByInstanceID(scaleSetName, instanceID) + vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID) if err != nil { return "", err } @@ -308,7 +322,7 @@ func getScaleSetVMInstanceID(machineName string) (string, error) { return fmt.Sprintf("%d", instanceID), nil } -// extractScaleSetNameByProviderID extracts the scaleset name by node's ProviderID. +// extractScaleSetNameByProviderID extracts the scaleset name by vmss node's ProviderID. 
func extractScaleSetNameByProviderID(providerID string) (string, error) { matches := scaleSetNameRE.FindStringSubmatch(providerID) if len(matches) != 2 { @@ -318,13 +332,23 @@ func extractScaleSetNameByProviderID(providerID string) (string, error) { return matches[1], nil } +// extractResourceGroupByProviderID extracts the resource group name by vmss node's ProviderID. +func extractResourceGroupByProviderID(providerID string) (string, error) { + matches := resourceGroupRE.FindStringSubmatch(providerID) + if len(matches) != 2 { + return "", ErrorNotVmssInstance + } + + return matches[1], nil +} + // listScaleSets lists all scale sets. -func (ss *scaleSet) listScaleSets() ([]string, error) { +func (ss *scaleSet) listScaleSets(resourceGroup string) ([]string, error) { var err error ctx, cancel := getContextWithCancel() defer cancel() - allScaleSets, err := ss.VirtualMachineScaleSetsClient.List(ctx, ss.ResourceGroup) + allScaleSets, err := ss.VirtualMachineScaleSetsClient.List(ctx, resourceGroup) if err != nil { glog.Errorf("VirtualMachineScaleSetsClient.List failed: %v", err) return nil, err @@ -339,12 +363,12 @@ func (ss *scaleSet) listScaleSets() ([]string, error) { } // listScaleSetVMs lists VMs belonging to the specified scale set. -func (ss *scaleSet) listScaleSetVMs(scaleSetName string) ([]compute.VirtualMachineScaleSetVM, error) { +func (ss *scaleSet) listScaleSetVMs(scaleSetName, resourceGroup string) ([]compute.VirtualMachineScaleSetVM, error) { var err error ctx, cancel := getContextWithCancel() defer cancel() - allVMs, err := ss.VirtualMachineScaleSetVMsClient.List(ctx, ss.ResourceGroup, scaleSetName, "", "", string(compute.InstanceView)) + allVMs, err := ss.VirtualMachineScaleSetVMsClient.List(ctx, resourceGroup, scaleSetName, "", "", string(compute.InstanceView)) if err != nil { glog.Errorf("VirtualMachineScaleSetVMsClient.List failed: %v", err) return nil, err @@ -362,6 +386,10 @@ func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) { continue } + if ss.ShouldNodeExcludedFromLoadBalancer(nodes[nx]) { + continue + } + nodeName := nodes[nx].Name ssName, err := ss.getScaleSetNameByNodeName(nodeName) if err != nil { @@ -429,6 +457,16 @@ func (ss *scaleSet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (vmSetN return vmSetNames, nil } +// extractResourceGroupByVMSSNicID extracts the resource group name by vmss nicID. +func extractResourceGroupByVMSSNicID(nicID string) (string, error) { + matches := vmssNicResourceGroupRE.FindStringSubmatch(nicID) + if len(matches) != 2 { + return "", fmt.Errorf("error of extracting resourceGroup from nicID %q", nicID) + } + + return matches[1], nil +} + // GetPrimaryInterface gets machine primary network interface by node name and vmSet. func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, error) { managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName) @@ -443,6 +481,11 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err ssName, instanceID, vm, err := ss.getVmssVM(nodeName) if err != nil { + // VM is availability set, but not cached yet in availabilitySetNodesCache. 
+ if err == ErrorNotVmssInstance { + return ss.availabilitySet.GetPrimaryInterface(nodeName) + } + glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getVmssVM(%s), err=%v", nodeName, nodeName, err) return network.Interface{}, err } @@ -458,12 +501,16 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err glog.Errorf("error: ss.GetPrimaryInterface(%s), getLastSegment(%s), err=%v", nodeName, primaryInterfaceID, err) return network.Interface{}, err } + resourceGroup, err := extractResourceGroupByVMSSNicID(primaryInterfaceID) + if err != nil { + return network.Interface{}, err + } ctx, cancel := getContextWithCancel() defer cancel() - nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ctx, ss.ResourceGroup, ssName, instanceID, nicName, "") + nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ctx, resourceGroup, ssName, instanceID, nicName, "") if err != nil { - glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, ss.ResourceGroup, ssName, nicName, err) + glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, resourceGroup, ssName, nicName, err) return network.Interface{}, err } @@ -566,6 +613,11 @@ func (ss *scaleSet) getNodesScaleSets(nodes []*v1.Node) (map[string]sets.String, continue } + if ss.ShouldNodeExcludedFromLoadBalancer(curNode) { + glog.V(4).Infof("Excluding unmanaged/external-resource-group node %q", curNode.Name) + continue + } + curScaleSetName, err := extractScaleSetNameByProviderID(curNode.Spec.ProviderID) if err != nil { glog.V(4).Infof("Node %q is not belonging to any scale sets, assuming it is belong to availability sets", curNode.Name) @@ -884,11 +936,11 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string, backendAd } // getVmssMachineID returns the full identifier of a vmss virtual machine. -func (az *Cloud) getVmssMachineID(scaleSetName, instanceID string) string { +func (az *Cloud) getVmssMachineID(resourceGroup, scaleSetName, instanceID string) string { return fmt.Sprintf( vmssMachineIDTemplate, az.SubscriptionID, - az.ResourceGroup, + resourceGroup, scaleSetName, instanceID) } diff --git a/pkg/cloudprovider/providers/azure/azure_vmss_cache.go b/pkg/cloudprovider/providers/azure/azure_vmss_cache.go index b3c1b0e99e1..c09f71f588f 100644 --- a/pkg/cloudprovider/providers/azure/azure_vmss_cache.go +++ b/pkg/cloudprovider/providers/azure/azure_vmss_cache.go @@ -27,15 +27,16 @@ import ( ) var ( - vmssNameSeparator = "_" + vmssNameSeparator = "_" + vmssCacheSeparator = "#" nodeNameToScaleSetMappingKey = "k8sNodeNameToScaleSetMappingKey" availabilitySetNodesKey = "k8sAvailabilitySetNodesKey" vmssCacheTTL = time.Minute vmssVMCacheTTL = time.Minute - availabilitySetNodesCacheTTL = 15 * time.Minute - nodeNameToScaleSetMappingCacheTTL = 15 * time.Minute + availabilitySetNodesCacheTTL = 5 * time.Minute + nodeNameToScaleSetMappingCacheTTL = 5 * time.Minute ) // nodeNameToScaleSetMapping maps nodeName to scaleSet name. 
@@ -49,19 +50,21 @@ func (ss *scaleSet) makeVmssVMName(scaleSetName, instanceID string) string { func extractVmssVMName(name string) (string, string, error) { split := strings.SplitAfter(name, vmssNameSeparator) if len(split) < 2 { - glog.Errorf("Failed to extract vmssVMName %q", name) + glog.V(3).Infof("Failed to extract vmssVMName %q", name) return "", "", ErrorNotVmssInstance } ssName := strings.Join(split[0:len(split)-1], "") // removing the trailing `vmssNameSeparator` since we used SplitAfter ssName = ssName[:len(ssName)-1] instanceID := split[len(split)-1] return ssName, instanceID, nil } +// vmssCache only holds vmss from ss.ResourceGroup because nodes from other resourceGroups +// will be excluded from LB backends. func (ss *scaleSet) newVmssCache() (*timedCache, error) { getter := func(key string) (interface{}, error) { ctx, cancel := getContextWithCancel() @@ -85,26 +86,34 @@ func (ss *scaleSet) newVmssCache() (*timedCache, error) { func (ss *scaleSet) newNodeNameToScaleSetMappingCache() (*timedCache, error) { getter := func(key string) (interface{}, error) { - scaleSetNames, err := ss.listScaleSets() + localCache := make(nodeNameToScaleSetMapping) + + allResourceGroups, err := ss.GetResourceGroups() if err != nil { return nil, err } - localCache := make(nodeNameToScaleSetMapping) - for _, ssName := range scaleSetNames { - vms, err := ss.listScaleSetVMs(ssName) + for _, resourceGroup := range allResourceGroups.List() { + scaleSetNames, err := ss.listScaleSets(resourceGroup) if err != nil { return nil, err } - for _, vm := range vms { - if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil { - glog.Warningf("failed to get computerName for vmssVM (%q)", ssName) - continue + for _, ssName := range scaleSetNames { + vms, err := ss.listScaleSetVMs(ssName, resourceGroup) + if err != nil { + return nil, err } - computerName := strings.ToLower(*vm.OsProfile.ComputerName) - localCache[computerName] = ssName + for _, vm := range vms { + if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil { + glog.Warningf("failed to get computerName for vmssVM (%q)", ssName) + continue + } + + computerName := strings.ToLower(*vm.OsProfile.ComputerName) + localCache[computerName] = ssName + } } } @@ -116,14 +125,23 @@ func (ss *scaleSet) newNodeNameToScaleSetMappingCache() (*timedCache, error) { func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) { getter := func(key string) (interface{}, error) { - vmList, err := ss.Cloud.VirtualMachineClientListWithRetry() + localCache := sets.NewString() + resourceGroups, err := ss.GetResourceGroups() if err != nil { return nil, err } - localCache := sets.NewString() - for _, vm := range vmList { - localCache.Insert(*vm.Name) + for _, resourceGroup := range resourceGroups.List() { + vmList, err := ss.Cloud.VirtualMachineClientListWithRetry(resourceGroup) + if err != nil { + return nil, err + } + + for _, vm := range vmList { + if vm.Name != nil { + localCache.Insert(*vm.Name) + } + } } return localCache, nil @@ -132,10 +150,33 @@ func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) { return newTimedcache(availabilitySetNodesCacheTTL, getter) } +func buildVmssCacheKey(resourceGroup, name string) string { + // key is composed of <resourceGroup>#<vmName> + return fmt.Sprintf("%s%s%s", resourceGroup, vmssCacheSeparator, name) +} + +func extractVmssCacheKey(key string) (string, string, error) { + // key is composed of <resourceGroup>#<vmName> + keyItems := strings.Split(key, vmssCacheSeparator) + if len(keyItems) != 2 { + return "", "", fmt.Errorf("key %q is not in 
format '#'", key) + } + + resourceGroup := keyItems[0] + vmName := keyItems[1] + return resourceGroup, vmName, nil +} + func (ss *scaleSet) newVmssVMCache() (*timedCache, error) { getter := func(key string) (interface{}, error) { - // vmssVM name's format is 'scaleSetName_instanceID' - ssName, instanceID, err := extractVmssVMName(key) + // key is composed of # + resourceGroup, vmName, err := extractVmssCacheKey(key) + if err != nil { + return nil, err + } + + // vmName's format is 'scaleSetName_instanceID' + ssName, instanceID, err := extractVmssVMName(vmName) if err != nil { return nil, err } @@ -147,7 +188,7 @@ func (ss *scaleSet) newVmssVMCache() (*timedCache, error) { ctx, cancel := getContextWithCancel() defer cancel() - result, err := ss.VirtualMachineScaleSetVMsClient.Get(ctx, ss.ResourceGroup, ssName, instanceID) + result, err := ss.VirtualMachineScaleSetVMsClient.Get(ctx, resourceGroup, ssName, instanceID) exists, message, realErr := checkResourceExistsFromError(err) if realErr != nil { return nil, realErr diff --git a/pkg/cloudprovider/providers/azure/azure_wrap.go b/pkg/cloudprovider/providers/azure/azure_wrap.go index 920eb2ec928..f3d2e642805 100644 --- a/pkg/cloudprovider/providers/azure/azure_wrap.go +++ b/pkg/cloudprovider/providers/azure/azure_wrap.go @@ -189,7 +189,13 @@ func (az *Cloud) newVMCache() (*timedCache, error) { // Consider adding separate parameter for controlling 'InstanceView' once node update issue #56276 is fixed ctx, cancel := getContextWithCancel() defer cancel() - vm, err := az.VirtualMachinesClient.Get(ctx, az.ResourceGroup, key, compute.InstanceView) + + resourceGroup, err := az.GetNodeResourceGroup(key) + if err != nil { + return nil, err + } + + vm, err := az.VirtualMachinesClient.Get(ctx, resourceGroup, key, compute.InstanceView) exists, message, realErr := checkResourceExistsFromError(err) if realErr != nil { return nil, realErr diff --git a/pkg/volume/azure_dd/azure_provision.go b/pkg/volume/azure_dd/azure_provision.go index f77da039cc3..d685eb44781 100644 --- a/pkg/volume/azure_dd/azure_provision.go +++ b/pkg/volume/azure_dd/azure_provision.go @@ -304,14 +304,16 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie if zoned { // Set node affinity labels based on availability zone labels. - requirements := make([]v1.NodeSelectorRequirement, 0) - for k, v := range labels { - requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: []string{v}}) - } + if len(labels) > 0 { + requirements := make([]v1.NodeSelectorRequirement, 0) + for k, v := range labels { + requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: []string{v}}) + } - nodeSelectorTerms = append(nodeSelectorTerms, v1.NodeSelectorTerm{ - MatchExpressions: requirements, - }) + nodeSelectorTerms = append(nodeSelectorTerms, v1.NodeSelectorTerm{ + MatchExpressions: requirements, + }) + } } else { // Set node affinity labels based on fault domains. // This is required because unzoned AzureDisk can't be attached to zoned nodes. @@ -336,10 +338,12 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie } } - pv.Spec.NodeAffinity = &v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: nodeSelectorTerms, - }, + if len(nodeSelectorTerms) > 0 { + pv.Spec.NodeAffinity = &v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: nodeSelectorTerms, + }, + } } }