From d499855ba2c7c62e2d0e61525a7ab40329d5730f Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Thu, 16 Aug 2018 17:10:37 +0800 Subject: [PATCH] Add caches for cross node resource groups and unmanaged nodes. --- pkg/cloudprovider/providers/azure/azure.go | 142 ++++++++++++++++++--- 1 file changed, 127 insertions(+), 15 deletions(-) diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index 12a5cee0ade..c0636e90d12 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -58,6 +58,9 @@ const ( loadBalancerSkuBasic = "basic" loadBalancerSkuStandard = "standard" + + externalResourceGroupLabel = "kubernetes.azure.com/resource-group" + managedByAzureLabel = "kubernetes.azure.com/managed" ) var ( @@ -156,11 +159,16 @@ type Cloud struct { metadata *InstanceMetadata vmSet VMSet - // Lock for access to nodeZones - nodeZonesLock sync.Mutex + // Lock for access to node caches + nodeCachesLock sync.Mutex // nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone // it is updated by the nodeInformer - nodeZones map[string]sets.String + nodeZones map[string]sets.String + // nodeResourceGroups holds nodes external resource groups + nodeResourceGroups map[string]string + // unmanagedNodes holds a list of nodes not managed by Azure cloud provider. + unmanagedNodes sets.String + // nodeInformerSynced is for determining if the informer has synced. nodeInformerSynced cache.InformerSynced // Clients for vmss. 
@@ -254,9 +262,11 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { rateLimiterWriter: operationPollRateLimiterWrite, } az := Cloud{ - Config: *config, - Environment: *env, - nodeZones: map[string]sets.String{}, + Config: *config, + Environment: *env, + nodeZones: map[string]sets.String{}, + nodeResourceGroups: map[string]string{}, + unmanagedNodes: sets.NewString(), DisksClient: newAzDisksClient(azClientConfig), RoutesClient: newAzRoutesClient(azClientConfig), @@ -294,7 +304,7 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { Duration: time.Duration(az.CloudProviderBackoffDuration) * time.Second, Jitter: az.CloudProviderBackoffJitter, } - glog.V(2).Infof("Azure cloudprovider using retry backoff: retries=%d, exponent=%f, duration=%d, jitter=%f", + glog.V(2).Infof("Azure cloudprovider using retry backoff: retries=%d, exponent=%f, duration=%d, jitter=%f", az.CloudProviderBackoffRetries, az.CloudProviderBackoffExponent, az.CloudProviderBackoffDuration, @@ -449,7 +459,7 @@ func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) { nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { node := obj.(*v1.Node) - az.updateNodeZones(nil, node) + az.updateNodeCaches(nil, node) }, UpdateFunc: func(prev, obj interface{}) { prevNode := prev.(*v1.Node) @@ -458,7 +468,7 @@ func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) { prevNode.Labels[kubeletapis.LabelZoneFailureDomain] { return } - az.updateNodeZones(prevNode, newNode) + az.updateNodeCaches(prevNode, newNode) }, DeleteFunc: func(obj interface{}) { node, isNode := obj.(*v1.Node) @@ -476,16 +486,19 @@ func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) { return } } - az.updateNodeZones(node, nil) + az.updateNodeCaches(node, nil) }, }) az.nodeInformerSynced = nodeInformer.HasSynced } -func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) { - 
az.nodeZonesLock.Lock() - defer az.nodeZonesLock.Unlock() +// updateNodeCaches updates local cache for node's zones and external resource groups. +func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) { + az.nodeCachesLock.Lock() + defer az.nodeCachesLock.Unlock() + if prevNode != nil { + // Remove from nodeZones cache. prevZone, ok := prevNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain] if ok && az.isAvailabilityZone(prevZone) { az.nodeZones[prevZone].Delete(prevNode.ObjectMeta.Name) @@ -493,8 +506,22 @@ func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) { az.nodeZones[prevZone] = nil } } + + // Remove from nodeResourceGroups cache. + _, ok = prevNode.ObjectMeta.Labels[externalResourceGroupLabel] + if ok { + delete(az.nodeResourceGroups, prevNode.ObjectMeta.Name) + } + + // Remove from unmanagedNodes cache. + managed, ok := prevNode.ObjectMeta.Labels[managedByAzureLabel] + if ok && managed == "false" { + az.unmanagedNodes.Delete(prevNode.ObjectMeta.Name) + } } + if newNode != nil { + // Add to nodeZones cache. newZone, ok := newNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain] if ok && az.isAvailabilityZone(newZone) { if az.nodeZones[newZone] == nil { @@ -502,6 +529,18 @@ func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) { } az.nodeZones[newZone].Insert(newNode.ObjectMeta.Name) } + + // Add to nodeResourceGroups cache. + newRG, ok := newNode.ObjectMeta.Labels[externalResourceGroupLabel] + if ok && len(newRG) > 0 { + az.nodeResourceGroups[newNode.ObjectMeta.Name] = newRG + } + + // Add to unmanagedNodes cache. 
+ managed, ok := newNode.ObjectMeta.Labels[managedByAzureLabel] + if ok && managed == "false" { + az.unmanagedNodes.Insert(newNode.ObjectMeta.Name) + } } } @@ -511,8 +550,8 @@ func (az *Cloud) GetActiveZones() (sets.String, error) { return nil, fmt.Errorf("Azure cloud provider doesn't have informers set") } - az.nodeZonesLock.Lock() - defer az.nodeZonesLock.Unlock() + az.nodeCachesLock.Lock() + defer az.nodeCachesLock.Unlock() if !az.nodeInformerSynced() { return nil, fmt.Errorf("node informer is not synced when trying to GetActiveZones") } @@ -530,3 +569,76 @@ func (az *Cloud) GetActiveZones() (sets.String, error) { func (az *Cloud) GetLocation() string { return az.Location } + +// GetNodeResourceGroup gets resource group for given node. +func (az *Cloud) GetNodeResourceGroup(nodeName string) (string, error) { + // Kubelet won't set az.nodeInformerSynced, always return configured resourceGroup. + if az.nodeInformerSynced == nil { + return az.ResourceGroup, nil + } + + az.nodeCachesLock.Lock() + defer az.nodeCachesLock.Unlock() + if !az.nodeInformerSynced() { + return "", fmt.Errorf("node informer is not synced when trying to GetNodeResourceGroup") + } + + // Return external resource group if it has been cached. + if cachedRG, ok := az.nodeResourceGroups[nodeName]; ok { + return cachedRG, nil + } + + // Return resource group from cloud provider options. + return az.ResourceGroup, nil +} + +// GetResourceGroups returns a set of resource groups that all nodes are running on. +func (az *Cloud) GetResourceGroups() (sets.String, error) { + // Kubelet won't set az.nodeInformerSynced, always return configured resourceGroup. 
+ if az.nodeInformerSynced == nil { + return sets.NewString(az.ResourceGroup), nil + } + + az.nodeCachesLock.Lock() + defer az.nodeCachesLock.Unlock() + if !az.nodeInformerSynced() { + return nil, fmt.Errorf("node informer is not synced when trying to GetResourceGroups") + } + + resourceGroups := sets.NewString(az.ResourceGroup) + for _, rg := range az.nodeResourceGroups { + resourceGroups.Insert(rg) + } + + return resourceGroups, nil +} + +// GetUnmanagedNodes returns a list of nodes not managed by Azure cloud provider (e.g. on-prem nodes). +func (az *Cloud) GetUnmanagedNodes() (sets.String, error) { + // Kubelet won't set az.nodeInformerSynced, always return nil. + if az.nodeInformerSynced == nil { + return nil, nil + } + + az.nodeCachesLock.Lock() + defer az.nodeCachesLock.Unlock() + if !az.nodeInformerSynced() { + return nil, fmt.Errorf("node informer is not synced when trying to GetUnmanagedNodes") + } + + return sets.NewString(az.unmanagedNodes.List()...), nil +} + +// ShouldNodeExcludedFromLoadBalancer returns true if node is unmanaged or in external resource group. +func (az *Cloud) ShouldNodeExcludedFromLoadBalancer(node *v1.Node) bool { + labels := node.ObjectMeta.Labels + if rg, ok := labels[externalResourceGroupLabel]; ok && rg != az.ResourceGroup { + return true + } + + if managed, ok := labels[managedByAzureLabel]; ok && managed == "false" { + return true + } + + return false +}