diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go
index 30ad09c534a..3e9d13dedc3 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go
@@ -274,6 +274,8 @@ type Cloud struct {
 	ipv6DualStackEnabled bool
 	// Lock for access to node caches, includes nodeZones, nodeResourceGroups, and unmanagedNodes.
 	nodeCachesLock sync.RWMutex
+	// nodeNames holds current nodes for tracking added nodes in VM caches.
+	nodeNames sets.String
 	// nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone
 	// it is updated by the nodeInformer
 	nodeZones map[string]sets.String
@@ -342,6 +344,7 @@ func NewCloudWithoutFeatureGates(configReader io.Reader) (*Cloud, error) {
 	}
 
 	az := &Cloud{
+		nodeNames:          sets.NewString(),
 		nodeZones:          map[string]sets.String{},
 		nodeResourceGroups: map[string]string{},
 		unmanagedNodes:     sets.NewString(),
@@ -782,6 +785,9 @@ func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) {
 	defer az.nodeCachesLock.Unlock()
 
 	if prevNode != nil {
+		// Remove from nodeNames cache.
+		az.nodeNames.Delete(prevNode.ObjectMeta.Name)
+
 		// Remove from nodeZones cache.
 		prevZone, ok := prevNode.ObjectMeta.Labels[LabelFailureDomainBetaZone]
 		if ok && az.isAvailabilityZone(prevZone) {
@@ -805,6 +811,9 @@ func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) {
 	}
 
 	if newNode != nil {
+		// Add to nodeNames cache.
+		az.nodeNames.Insert(newNode.ObjectMeta.Name)
+
 		// Add to nodeZones cache.
 		newZone, ok := newNode.ObjectMeta.Labels[LabelFailureDomainBetaZone]
 		if ok && az.isAvailabilityZone(newZone) {
@@ -876,6 +885,22 @@ func (az *Cloud) GetNodeResourceGroup(nodeName string) (string, error) {
 	return az.ResourceGroup, nil
 }
 
+// GetNodeNames returns a set of all node names in the k8s cluster.
+func (az *Cloud) GetNodeNames() (sets.String, error) {
+	// Kubelet won't set az.nodeInformerSynced, return nil.
+	if az.nodeInformerSynced == nil {
+		return nil, nil
+	}
+
+	az.nodeCachesLock.RLock()
+	defer az.nodeCachesLock.RUnlock()
+	if !az.nodeInformerSynced() {
+		return nil, fmt.Errorf("node informer is not synced when trying to GetNodeNames")
+	}
+
+	return sets.NewString(az.nodeNames.List()...), nil
+}
+
 // GetResourceGroups returns a set of resource groups that all nodes are running on.
 func (az *Cloud) GetResourceGroups() (sets.String, error) {
 	// Kubelet won't set az.nodeInformerSynced, always return configured resourceGroup.
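For context, the new accessor follows the same informer-guarded pattern as the existing GetResourceGroups. The sketch below illustrates that pattern in isolation and is not part of the patch; the nodeCache type and its fields are hypothetical stand-ins for Cloud with its nodeCachesLock, nodeInformerSynced, and nodeNames fields.

```go
package main

import (
	"fmt"
	"sync"

	"k8s.io/apimachinery/pkg/util/sets"
)

type nodeCache struct {
	mu        sync.RWMutex
	synced    func() bool // mirrors az.nodeInformerSynced
	nodeNames sets.String // mirrors az.nodeNames, maintained by the node informer
}

func (c *nodeCache) GetNodeNames() (sets.String, error) {
	// No informer wired up (e.g. kubelet): report "unknown" rather than erroring.
	if c.synced == nil {
		return nil, nil
	}

	c.mu.RLock()
	defer c.mu.RUnlock()
	if !c.synced() {
		return nil, fmt.Errorf("node informer is not synced when trying to GetNodeNames")
	}

	// Return a copy so callers cannot mutate the cached set outside the lock.
	return sets.NewString(c.nodeNames.List()...), nil
}

func main() {
	c := &nodeCache{
		synced:    func() bool { return true },
		nodeNames: sets.NewString("node-1", "node-2"),
	}

	names, err := c.GetNodeNames()
	if err != nil {
		panic(err)
	}
	fmt.Println(names.List()) // [node-1 node-2]
}
```

Returning a fresh copy via sets.NewString(...List()...) keeps callers from mutating the informer-maintained set outside the lock, which is also what the patched Cloud.GetNodeNames does.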
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go
index ba4ad047b48..32cb052a924 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go
@@ -3244,6 +3244,7 @@ func TestUpdateNodeCaches(t *testing.T) {
 	az.nodeZones = map[string]sets.String{zone: nodesInZone}
 	az.nodeResourceGroups = map[string]string{"prevNode": "rg"}
 	az.unmanagedNodes = sets.NewString("prevNode")
+	az.nodeNames = sets.NewString("prevNode")
 
 	prevNode := v1.Node{
 		ObjectMeta: metav1.ObjectMeta{
@@ -3260,6 +3261,7 @@ func TestUpdateNodeCaches(t *testing.T) {
 	assert.Equal(t, 0, len(az.nodeZones[zone]))
 	assert.Equal(t, 0, len(az.nodeResourceGroups))
 	assert.Equal(t, 0, len(az.unmanagedNodes))
+	assert.Equal(t, 0, len(az.nodeNames))
 
 	newNode := v1.Node{
 		ObjectMeta: metav1.ObjectMeta{
@@ -3276,6 +3278,7 @@ func TestUpdateNodeCaches(t *testing.T) {
 	assert.Equal(t, 1, len(az.nodeZones[zone]))
 	assert.Equal(t, 1, len(az.nodeResourceGroups))
 	assert.Equal(t, 1, len(az.unmanagedNodes))
+	assert.Equal(t, 1, len(az.nodeNames))
 }
 
 func TestGetActiveZones(t *testing.T) {
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
index 8504aada304..49e0d755fde 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
@@ -74,7 +74,8 @@ type scaleSet struct {
 	*Cloud
 
 	// availabilitySet is also required for scaleSet because some instances
-	// (e.g. master nodes) may not belong to any scale sets.
+	// (e.g. control plane nodes) may not belong to any scale sets.
+	// this also allows for clusters with both VM and VMSS nodes.
 	availabilitySet VMSet
 
 	vmssCache *azcache.TimedCache
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
index 5abfec1936e..576e6539421 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
@@ -58,6 +58,11 @@ type vmssEntry struct {
 	lastUpdate time.Time
 }
 
+type availabilitySetEntry struct {
+	vmNames   sets.String
+	nodeNames sets.String
+}
+
 func (ss *scaleSet) newVMSSCache() (*azcache.TimedCache, error) {
 	getter := func(key string) (interface{}, error) {
 		localCache := &sync.Map{} // [vmssName]*vmssEntry
@@ -273,7 +278,7 @@ func (ss *scaleSet) deleteCacheForNode(nodeName string) error {
 
 func (ss *scaleSet) newAvailabilitySetNodesCache() (*azcache.TimedCache, error) {
 	getter := func(key string) (interface{}, error) {
-		localCache := sets.NewString()
+		vmNames := sets.NewString()
 		resourceGroups, err := ss.GetResourceGroups()
 		if err != nil {
 			return nil, err
@@ -287,11 +292,22 @@ func (ss *scaleSet) newAvailabilitySetNodesCache() (*azcache.TimedCache, error)
 
 			for _, vm := range vmList {
 				if vm.Name != nil {
-					localCache.Insert(*vm.Name)
+					vmNames.Insert(*vm.Name)
 				}
 			}
 		}
 
+		// store all the node names in the cluster when the cache data was created.
+		nodeNames, err := ss.GetNodeNames()
+		if err != nil {
+			return nil, err
+		}
+
+		localCache := availabilitySetEntry{
+			vmNames:   vmNames,
+			nodeNames: nodeNames,
+		}
+
 		return localCache, nil
 	}
 
@@ -313,6 +329,16 @@ func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string, crt azcache.
 		return false, err
 	}
 
-	availabilitySetNodes := cached.(sets.String)
-	return availabilitySetNodes.Has(nodeName), nil
+	cachedNodes := cached.(availabilitySetEntry).nodeNames
+	// if the node is not in the cache, assume the node has joined after the last cache refresh and attempt to refresh the cache.
+	if !cachedNodes.Has(nodeName) {
+		klog.V(2).Infof("Node %s has joined the cluster since the last VM cache refresh, refreshing the cache", nodeName)
+		cached, err = ss.availabilitySetNodesCache.Get(availabilitySetNodesKey, azcache.CacheReadTypeForceRefresh)
+		if err != nil {
+			return false, err
+		}
+	}
+
+	cachedVMs := cached.(availabilitySetEntry).vmNames
+	return cachedVMs.Has(nodeName), nil
 }
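The behavioral change in isNodeManagedByAvailabilitySet is easiest to see in isolation: the cache entry now carries both the availability-set VM names and a snapshot of all cluster node names taken when the entry was built, and a lookup for a node missing from that snapshot forces a refresh before deciding. Below is a minimal sketch of that refresh-on-miss flow; the toyCache and entry types are hypothetical simplifications of azcache.TimedCache and availabilitySetEntry, not the provider's real cache.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// entry mirrors availabilitySetEntry: availability-set VM names plus a snapshot
// of all cluster node names taken when the cache data was built.
type entry struct {
	vmNames   sets.String
	nodeNames sets.String
}

// toyCache stands in for azcache.TimedCache: it lazily builds an entry with a
// getter and can be forced to rebuild it.
type toyCache struct {
	cached *entry
	getter func() entry
}

func (c *toyCache) Get(forceRefresh bool) entry {
	if c.cached == nil || forceRefresh {
		e := c.getter()
		c.cached = &e
	}
	return *c.cached
}

// isManagedByAvailabilitySet mirrors the patched logic: a node missing from the
// cached node-name snapshot is assumed to have joined after the snapshot was
// taken, so the cache is refreshed before consulting vmNames.
func isManagedByAvailabilitySet(c *toyCache, nodeName string) bool {
	e := c.Get(false)
	if !e.nodeNames.Has(nodeName) {
		e = c.Get(true) // refresh-on-miss
	}
	return e.vmNames.Has(nodeName)
}

func main() {
	// The first build only sees node-1; node-2 (an availability-set VM) joins later.
	calls := 0
	c := &toyCache{getter: func() entry {
		calls++
		if calls == 1 {
			return entry{vmNames: sets.NewString("node-1"), nodeNames: sets.NewString("node-1")}
		}
		return entry{vmNames: sets.NewString("node-1", "node-2"), nodeNames: sets.NewString("node-1", "node-2")}
	}}

	fmt.Println(isManagedByAvailabilitySet(c, "node-1")) // true (no refresh needed)
	fmt.Println(isManagedByAvailabilitySet(c, "node-2")) // true (after a forced refresh)
}
```

The forced refresh only fires for nodes absent from the node-name snapshot, so steady-state lookups in the real code still hit the TTL cache rather than the ARM API.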