Merge pull request #100110 from CecileRobertMichon/azure-vm-cache

Cherry pick #537 from cloud provider azure: Refresh VM cache when node is not found
Commit 830055379b, authored by Kubernetes Prow Robot on 2021-04-08 17:12:13 -07:00, committed by GitHub.
4 changed files with 60 additions and 5 deletions


@@ -274,6 +274,8 @@ type Cloud struct {
     ipv6DualStackEnabled bool
     // Lock for access to node caches, includes nodeZones, nodeResourceGroups, and unmanagedNodes.
     nodeCachesLock sync.RWMutex
+    // nodeNames holds current nodes for tracking added nodes in VM caches.
+    nodeNames sets.String
     // nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone
     // it is updated by the nodeInformer
     nodeZones map[string]sets.String
@ -342,6 +344,7 @@ func NewCloudWithoutFeatureGates(configReader io.Reader) (*Cloud, error) {
}
az := &Cloud{
nodeNames: sets.NewString(),
nodeZones: map[string]sets.String{},
nodeResourceGroups: map[string]string{},
unmanagedNodes: sets.NewString(),
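
The new nodeNames field is a sets.String from k8s.io/apimachinery/pkg/util/sets, the same set type the struct already uses for nodeZones and unmanagedNodes. For reference, the handful of operations this change relies on look like this (a standalone snippet for illustration, not code from the PR):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	names := sets.NewString("node-a") // construct with initial members
	names.Insert("node-b")            // used when a node is added
	names.Delete("node-a")            // used when a node is removed
	fmt.Println(names.Has("node-b"))  // membership check: true

	// List returns a sorted slice; re-wrapping it yields an independent copy,
	// which is what GetNodeNames below hands back to callers.
	snapshot := sets.NewString(names.List()...)
	fmt.Println(snapshot.Equal(names)) // true
}
```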
@@ -782,6 +785,9 @@ func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) {
     defer az.nodeCachesLock.Unlock()
 
     if prevNode != nil {
+        // Remove from nodeNames cache.
+        az.nodeNames.Delete(prevNode.ObjectMeta.Name)
+
         // Remove from nodeZones cache.
         prevZone, ok := prevNode.ObjectMeta.Labels[LabelFailureDomainBetaZone]
         if ok && az.isAvailabilityZone(prevZone) {
@@ -805,6 +811,9 @@ func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) {
     }
 
     if newNode != nil {
+        // Add to nodeNames cache.
+        az.nodeNames.Insert(newNode.ObjectMeta.Name)
+
         // Add to nodeZones cache.
         newZone, ok := newNode.ObjectMeta.Labels[LabelFailureDomainBetaZone]
         if ok && az.isAvailabilityZone(newZone) {
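
For context, updateNodeCaches is driven by node informer events: adds pass (nil, newNode), deletes pass (prevNode, nil), and updates pass both, as the two branches above show. The registration itself is not part of this diff; the following is only a rough, hypothetical sketch of that wiring, using an interface stand-in for *Cloud so the snippet stays self-contained:

```go
package sketch

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// nodeCacheUpdater stands in for *Cloud here; the real method is the
// unexported updateNodeCaches shown in the diff above.
type nodeCacheUpdater interface {
	updateNodeCaches(prevNode, newNode *v1.Node)
}

// registerNodeHandlers is illustrative only; the actual registration in the
// azure provider differs in detail.
func registerNodeHandlers(informer cache.SharedIndexInformer, az nodeCacheUpdater) {
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			az.updateNodeCaches(nil, obj.(*v1.Node)) // insert into nodeNames, nodeZones, ...
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			az.updateNodeCaches(oldObj.(*v1.Node), newObj.(*v1.Node))
		},
		DeleteFunc: func(obj interface{}) {
			if node, ok := obj.(*v1.Node); ok {
				az.updateNodeCaches(node, nil) // drop from nodeNames, nodeZones, ...
			}
		},
	})
}
```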
@@ -876,6 +885,22 @@ func (az *Cloud) GetNodeResourceGroup(nodeName string) (string, error) {
     return az.ResourceGroup, nil
 }
 
+// GetNodeNames returns a set of all node names in the k8s cluster.
+func (az *Cloud) GetNodeNames() (sets.String, error) {
+    // Kubelet won't set az.nodeInformerSynced, return nil.
+    if az.nodeInformerSynced == nil {
+        return nil, nil
+    }
+
+    az.nodeCachesLock.RLock()
+    defer az.nodeCachesLock.RUnlock()
+    if !az.nodeInformerSynced() {
+        return nil, fmt.Errorf("node informer is not synced when trying to GetNodeNames")
+    }
+
+    return sets.NewString(az.nodeNames.List()...), nil
+}
+
 // GetResourceGroups returns a set of resource groups that all nodes are running on.
 func (az *Cloud) GetResourceGroups() (sets.String, error) {
     // Kubelet won't set az.nodeInformerSynced, always return configured resourceGroup.
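
GetNodeNames has two distinct "unavailable" outcomes: (nil, nil) when no node informer was ever registered (the kubelet case), and an error when the informer exists but has not synced yet. On success it returns a copy built under the read lock, so callers can hold the set without holding nodeCachesLock. A hedged sketch of how a caller could distinguish the three cases; the empty-set fallback is illustrative only, the real cache getter in this PR simply propagates the error and stores whatever value it receives:

```go
package sketch

import (
	"k8s.io/apimachinery/pkg/util/sets"
)

// nodeNamesProvider stands in for *Cloud; only GetNodeNames is needed here.
type nodeNamesProvider interface {
	GetNodeNames() (sets.String, error)
}

// clusterNodeSnapshot shows one possible caller-side handling of GetNodeNames.
func clusterNodeSnapshot(az nodeNamesProvider) (sets.String, error) {
	nodeNames, err := az.GetNodeNames()
	if err != nil {
		// Informer registered but not yet synced: fail now and retry later.
		return nil, err
	}
	if nodeNames == nil {
		// No node informer at all (e.g. kubelet): nothing to snapshot.
		return sets.NewString(), nil
	}
	// Success: an independent copy taken under the read lock.
	return nodeNames, nil
}
```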


@@ -3244,6 +3244,7 @@ func TestUpdateNodeCaches(t *testing.T) {
     az.nodeZones = map[string]sets.String{zone: nodesInZone}
     az.nodeResourceGroups = map[string]string{"prevNode": "rg"}
     az.unmanagedNodes = sets.NewString("prevNode")
+    az.nodeNames = sets.NewString("prevNode")
 
     prevNode := v1.Node{
         ObjectMeta: metav1.ObjectMeta{
@@ -3260,6 +3261,7 @@ func TestUpdateNodeCaches(t *testing.T) {
     assert.Equal(t, 0, len(az.nodeZones[zone]))
     assert.Equal(t, 0, len(az.nodeResourceGroups))
     assert.Equal(t, 0, len(az.unmanagedNodes))
+    assert.Equal(t, 0, len(az.nodeNames))
 
     newNode := v1.Node{
         ObjectMeta: metav1.ObjectMeta{
@@ -3276,6 +3278,7 @@ func TestUpdateNodeCaches(t *testing.T) {
     assert.Equal(t, 1, len(az.nodeZones[zone]))
     assert.Equal(t, 1, len(az.nodeResourceGroups))
     assert.Equal(t, 1, len(az.unmanagedNodes))
+    assert.Equal(t, 1, len(az.nodeNames))
 }
 
 func TestGetActiveZones(t *testing.T) {


@@ -74,7 +74,8 @@ type scaleSet struct {
     *Cloud
 
     // availabilitySet is also required for scaleSet because some instances
-    // (e.g. master nodes) may not belong to any scale sets.
+    // (e.g. control plane nodes) may not belong to any scale sets.
+    // this also allows for clusters with both VM and VMSS nodes.
     availabilitySet VMSet
 
     vmssCache *azcache.TimedCache
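
The reworded comment spells out why scaleSet keeps a standard availabilitySet VMSet around: a cluster can mix VMSS nodes with plain VMs (control plane nodes being a common case), so per-node calls have to pick the right backend. A rough sketch of that dispatch is below; the method name getInstanceID and the helper getVmssInstanceID are hypothetical, since the concrete call sites are not part of this diff:

```go
// getInstanceID is a hypothetical illustration of the dispatch pattern; the
// real scaleSet methods (for example GetInstanceIDByNodeName) follow this
// shape but differ in details. getVmssInstanceID is a made-up helper.
func (ss *scaleSet) getInstanceID(nodeName string) (string, error) {
	managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName, azcache.CacheReadTypeDefault)
	if err != nil {
		return "", err
	}
	if managedByAS {
		// A plain (non-VMSS) VM, e.g. a control plane node: delegate to the
		// availability-set implementation of VMSet.
		return ss.availabilitySet.GetInstanceIDByNodeName(nodeName)
	}
	// Otherwise resolve the node through the scale-set (VMSS) path.
	return ss.getVmssInstanceID(nodeName)
}
```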


@@ -58,6 +58,11 @@ type vmssEntry struct {
     lastUpdate time.Time
 }
 
+type availabilitySetEntry struct {
+    vmNames   sets.String
+    nodeNames sets.String
+}
+
 func (ss *scaleSet) newVMSSCache() (*azcache.TimedCache, error) {
     getter := func(key string) (interface{}, error) {
         localCache := &sync.Map{} // [vmssName]*vmssEntry
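
Both vmssCache and the availability-set node cache below are azcache.TimedCache instances built around a getter that repopulates the entry on demand. The snippet below is a minimal, hypothetical version of that pattern, written from scratch for illustration; the real azcache package differs in structure and API, but the force-refresh knob corresponds to passing CacheReadTypeForceRefresh in the diff:

```go
package sketch

import (
	"sync"
	"time"
)

// timedCache is a minimal sketch of a getter-backed TTL cache.
type timedCache struct {
	mu     sync.Mutex
	ttl    time.Duration
	getter func(key string) (interface{}, error)
	data   map[string]timedEntry
}

type timedEntry struct {
	value     interface{}
	createdOn time.Time
}

func newTimedCache(ttl time.Duration, getter func(string) (interface{}, error)) *timedCache {
	return &timedCache{ttl: ttl, getter: getter, data: map[string]timedEntry{}}
}

// get re-runs the getter when the entry is missing, expired, or forceRefresh
// is set: the analogue of a CacheReadTypeForceRefresh read in the diff.
func (c *timedCache) get(key string, forceRefresh bool) (interface{}, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.data[key]; ok && !forceRefresh && time.Since(e.createdOn) < c.ttl {
		return e.value, nil
	}
	value, err := c.getter(key)
	if err != nil {
		return nil, err
	}
	c.data[key] = timedEntry{value: value, createdOn: time.Now()}
	return value, nil
}
```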
@@ -273,7 +278,7 @@ func (ss *scaleSet) deleteCacheForNode(nodeName string) error {
 
 func (ss *scaleSet) newAvailabilitySetNodesCache() (*azcache.TimedCache, error) {
     getter := func(key string) (interface{}, error) {
-        localCache := sets.NewString()
+        vmNames := sets.NewString()
         resourceGroups, err := ss.GetResourceGroups()
         if err != nil {
             return nil, err
@@ -287,11 +292,22 @@ func (ss *scaleSet) newAvailabilitySetNodesCache() (*azcache.TimedCache, error)
             for _, vm := range vmList {
                 if vm.Name != nil {
-                    localCache.Insert(*vm.Name)
+                    vmNames.Insert(*vm.Name)
                 }
             }
         }
 
+        // store all the node names in the cluster when the cache data was created.
+        nodeNames, err := ss.GetNodeNames()
+        if err != nil {
+            return nil, err
+        }
+
+        localCache := availabilitySetEntry{
+            vmNames:   vmNames,
+            nodeNames: nodeNames,
+        }
+
         return localCache, nil
     }
@@ -313,6 +329,16 @@ func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string, crt azcache.
         return false, err
     }
 
-    availabilitySetNodes := cached.(sets.String)
-    return availabilitySetNodes.Has(nodeName), nil
+    cachedNodes := cached.(availabilitySetEntry).nodeNames
+    // if the node is not in the cache, assume the node has joined after the last cache refresh and attempt to refresh the cache.
+    if !cachedNodes.Has(nodeName) {
+        klog.V(2).Infof("Node %s has joined the cluster since the last VM cache refresh, refreshing the cache", nodeName)
+        cached, err = ss.availabilitySetNodesCache.Get(availabilitySetNodesKey, azcache.CacheReadTypeForceRefresh)
+        if err != nil {
+            return false, err
+        }
+    }
+
+    cachedVMs := cached.(availabilitySetEntry).vmNames
+    return cachedVMs.Has(nodeName), nil
 }
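
The effect of this last hunk is the core of the PR: previously, a standalone VM that joined the cluster after the availability-set node cache was built would not appear in the cached set and could be misclassified as a VMSS node until the cache TTL expired; now a miss on the node-name snapshot forces an immediate cache refresh before the VM-name check. A compact, standalone sketch of that decision, with plain maps standing in for the cached entry:

```go
package main

import "fmt"

// entry mirrors availabilitySetEntry: VM names listed from Azure plus the
// cluster node names captured when the entry was built.
type entry struct {
	vmNames   map[string]bool
	nodeNames map[string]bool
}

// isManagedByAvailabilitySet mirrors the fixed lookup: an unknown node name
// means the entry predates the node, so rebuild (force-refresh) before
// answering from vmNames.
func isManagedByAvailabilitySet(e entry, node string, rebuild func() entry) bool {
	if !e.nodeNames[node] {
		e = rebuild() // the real code reads the cache with CacheReadTypeForceRefresh
	}
	return e.vmNames[node]
}

func main() {
	// Cache entry built before "vm-new" joined the cluster.
	stale := entry{
		vmNames:   map[string]bool{"vm-old": true},
		nodeNames: map[string]bool{"vm-old": true},
	}
	// Rebuilding sees both the new VM and the new node.
	rebuild := func() entry {
		return entry{
			vmNames:   map[string]bool{"vm-old": true, "vm-new": true},
			nodeNames: map[string]bool{"vm-old": true, "vm-new": true},
		}
	}
	fmt.Println(isManagedByAvailabilitySet(stale, "vm-new", rebuild)) // true: refreshed on miss
}
```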