add allowunsafe read

update common controller

add comment

update to cached data

add cacheReadType type

update logic for disk reconcile loop to read from cache

update delete cache for node

review feedback
This commit is contained in:
Anish Ramasekar 2019-10-08 13:13:36 -07:00
parent b717be8269
commit cebdd85186
No known key found for this signature in database
GPG Key ID: 9E68541CF00F1FA4
17 changed files with 203 additions and 98 deletions

View File

@ -65,11 +65,11 @@ func (az *Cloud) Event(obj runtime.Object, eventtype, reason, message string) {
} }
// GetVirtualMachineWithRetry invokes az.getVirtualMachine with exponential backoff retry // GetVirtualMachineWithRetry invokes az.getVirtualMachine with exponential backoff retry
func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.VirtualMachine, error) { func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName, crt cacheReadType) (compute.VirtualMachine, error) {
var machine compute.VirtualMachine var machine compute.VirtualMachine
var retryErr error var retryErr error
err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) { err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
machine, retryErr = az.getVirtualMachine(name) machine, retryErr = az.getVirtualMachine(name, crt)
if retryErr == cloudprovider.InstanceNotFound { if retryErr == cloudprovider.InstanceNotFound {
return true, cloudprovider.InstanceNotFound return true, cloudprovider.InstanceNotFound
} }

View File

@ -26,6 +26,20 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
) )
// cacheReadType defines the read type for cache data
type cacheReadType int
const (
// cachedData returns data from cache if cache entry not expired
// if cache entry expired, then it will refetch the data using getter
// save the entry in cache and then return
cachedData cacheReadType = iota
// allowUnsafeRead returns data from cache even if the cache entry is
// active/expired. If entry doesn't exist in cache, then data is fetched
// using getter, saved in cache and returned
allowUnsafeRead
)
// getFunc defines a getter function for timedCache. // getFunc defines a getter function for timedCache.
type getFunc func(key string) (interface{}, error) type getFunc func(key string) (interface{}, error)
@ -36,6 +50,8 @@ type cacheEntry struct {
// The lock to ensure not updating same entry simultaneously. // The lock to ensure not updating same entry simultaneously.
lock sync.Mutex lock sync.Mutex
// time when entry was fetched and created
createdOn time.Time
} }
// cacheKeyFunc defines the key function required in TTLStore. // cacheKeyFunc defines the key function required in TTLStore.
@ -48,6 +64,7 @@ type timedCache struct {
store cache.Store store cache.Store
lock sync.Mutex lock sync.Mutex
getter getFunc getter getFunc
ttl time.Duration
} }
// newTimedcache creates a new timedCache. // newTimedcache creates a new timedCache.
@ -58,7 +75,11 @@ func newTimedcache(ttl time.Duration, getter getFunc) (*timedCache, error) {
return &timedCache{ return &timedCache{
getter: getter, getter: getter,
store: cache.NewTTLStore(cacheKeyFunc, ttl), // switch to using NewStore instead of NewTTLStore so that we can
// reuse entries for calls that are fine with reading expired/stalled data.
// with NewTTLStore, entries are not returned if they have already expired.
store: cache.NewStore(cacheKeyFunc),
ttl: ttl,
}, nil }, nil
} }
@ -69,16 +90,12 @@ func (t *timedCache) getInternal(key string) (*cacheEntry, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if exists { // lock here to ensure if entry doesn't exist, we add a new entry
return entry.(*cacheEntry), nil // avoiding overwrites
}
t.lock.Lock() t.lock.Lock()
defer t.lock.Unlock() defer t.lock.Unlock()
entry, exists, err = t.store.GetByKey(key)
if err != nil { // if entry exists, return the entry
return nil, err
}
if exists { if exists {
return entry.(*cacheEntry), nil return entry.(*cacheEntry), nil
} }
@ -94,26 +111,38 @@ func (t *timedCache) getInternal(key string) (*cacheEntry, error) {
} }
// Get returns the requested item by key. // Get returns the requested item by key.
func (t *timedCache) Get(key string) (interface{}, error) { func (t *timedCache) Get(key string, crt cacheReadType) (interface{}, error) {
entry, err := t.getInternal(key) entry, err := t.getInternal(key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Data is still not cached yet, cache it by getter.
if entry.data == nil {
entry.lock.Lock() entry.lock.Lock()
defer entry.lock.Unlock() defer entry.lock.Unlock()
if entry.data == nil { // entry exists
if entry.data != nil {
// allow dirty, so return data even if expired
if crt == allowUnsafeRead {
return entry.data, nil
}
// if cached data is not expired, return cached data
if time.Since(entry.createdOn) < t.ttl && crt == cachedData {
return entry.data, nil
}
}
// Data is not cached yet or cache data is expired, cache it by getter.
// entry is locked before getting to ensure concurrent gets don't result in
// multiple ARM calls.
data, err := t.getter(key) data, err := t.getter(key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// set the data in cache and also set the last update time
// to now as the data was recently fetched
entry.data = data entry.data = data
} entry.createdOn = time.Now().UTC()
}
return entry.data, nil return entry.data, nil
} }
@ -131,5 +160,6 @@ func (t *timedCache) Set(key string, data interface{}) {
t.store.Add(&cacheEntry{ t.store.Add(&cacheEntry{
key: key, key: key,
data: data, data: data,
createdOn: time.Now().UTC(),
}) })
} }

View File

@ -99,7 +99,7 @@ func TestCacheGet(t *testing.T) {
for _, c := range cases { for _, c := range cases {
dataSource, cache := newFakeCache(t) dataSource, cache := newFakeCache(t)
dataSource.set(c.data) dataSource.set(c.data)
val, err := cache.Get(c.key) val, err := cache.Get(c.key, cachedData)
assert.NoError(t, err, c.name) assert.NoError(t, err, c.name)
assert.Equal(t, c.expected, val, c.name) assert.Equal(t, c.expected, val, c.name)
} }
@ -113,7 +113,7 @@ func TestCacheGetError(t *testing.T) {
cache, err := newTimedcache(fakeCacheTTL, getter) cache, err := newTimedcache(fakeCacheTTL, getter)
assert.NoError(t, err) assert.NoError(t, err)
val, err := cache.Get("key") val, err := cache.Get("key", cachedData)
assert.Error(t, err) assert.Error(t, err)
assert.Equal(t, getError, err) assert.Equal(t, getError, err)
assert.Nil(t, val) assert.Nil(t, val)
@ -128,13 +128,13 @@ func TestCacheDelete(t *testing.T) {
dataSource, cache := newFakeCache(t) dataSource, cache := newFakeCache(t)
dataSource.set(data) dataSource.set(data)
v, err := cache.Get(key) v, err := cache.Get(key, cachedData)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, val, v, "cache should get correct data") assert.Equal(t, val, v, "cache should get correct data")
dataSource.set(nil) dataSource.set(nil)
cache.Delete(key) cache.Delete(key)
v, err = cache.Get(key) v, err = cache.Get(key, cachedData)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 1, dataSource.called) assert.Equal(t, 1, dataSource.called)
assert.Equal(t, nil, v, "cache should get nil after data is removed") assert.Equal(t, nil, v, "cache should get nil after data is removed")
@ -149,14 +149,58 @@ func TestCacheExpired(t *testing.T) {
dataSource, cache := newFakeCache(t) dataSource, cache := newFakeCache(t)
dataSource.set(data) dataSource.set(data)
v, err := cache.Get(key) v, err := cache.Get(key, cachedData)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 1, dataSource.called) assert.Equal(t, 1, dataSource.called)
assert.Equal(t, val, v, "cache should get correct data") assert.Equal(t, val, v, "cache should get correct data")
time.Sleep(fakeCacheTTL) time.Sleep(fakeCacheTTL)
v, err = cache.Get(key) v, err = cache.Get(key, cachedData)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 2, dataSource.called) assert.Equal(t, 2, dataSource.called)
assert.Equal(t, val, v, "cache should get correct data even after expired") assert.Equal(t, val, v, "cache should get correct data even after expired")
} }
func TestCacheAllowUnsafeRead(t *testing.T) {
key := "key1"
val := &fakeDataObj{}
data := map[string]*fakeDataObj{
key: val,
}
dataSource, cache := newFakeCache(t)
dataSource.set(data)
v, err := cache.Get(key, cachedData)
assert.NoError(t, err)
assert.Equal(t, 1, dataSource.called)
assert.Equal(t, val, v, "cache should get correct data")
time.Sleep(fakeCacheTTL)
v, err = cache.Get(key, allowUnsafeRead)
assert.NoError(t, err)
assert.Equal(t, 1, dataSource.called)
assert.Equal(t, val, v, "cache should return expired as allow unsafe read is allowed")
}
func TestCacheNoConcurrentGet(t *testing.T) {
key := "key1"
val := &fakeDataObj{}
data := map[string]*fakeDataObj{
key: val,
}
dataSource, cache := newFakeCache(t)
dataSource.set(data)
time.Sleep(fakeCacheTTL)
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go cache.Get(key, cachedData)
wg.Done()
}
v, err := cache.Get(key, cachedData)
wg.Wait()
assert.NoError(t, err)
assert.Equal(t, 1, dataSource.called)
assert.Equal(t, val, v, "cache should get correct data")
}

View File

@ -69,7 +69,7 @@ type controllerCommon struct {
} }
// getNodeVMSet gets the VMSet interface based on config.VMType and the real virtual machine type. // getNodeVMSet gets the VMSet interface based on config.VMType and the real virtual machine type.
func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName) (VMSet, error) { func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName, crt cacheReadType) (VMSet, error) {
// 1. vmType is standard, return cloud.vmSet directly. // 1. vmType is standard, return cloud.vmSet directly.
if c.cloud.VMType == vmTypeStandard { if c.cloud.VMType == vmTypeStandard {
return c.cloud.vmSet, nil return c.cloud.vmSet, nil
@ -82,7 +82,7 @@ func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName) (VMSet, error)
} }
// 3. If the node is managed by availability set, then return ss.availabilitySet. // 3. If the node is managed by availability set, then return ss.availabilitySet.
managedByAS, err := ss.isNodeManagedByAvailabilitySet(mapNodeNameToVMName(nodeName)) managedByAS, err := ss.isNodeManagedByAvailabilitySet(mapNodeNameToVMName(nodeName), crt)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -124,14 +124,14 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
} }
} }
vmset, err := c.getNodeVMSet(nodeName) vmset, err := c.getNodeVMSet(nodeName, allowUnsafeRead)
if err != nil { if err != nil {
return -1, err return -1, err
} }
instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName) instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName)
if err != nil { if err != nil {
klog.Warningf("failed to get azure instance id (%v)", err) klog.Warningf("failed to get azure instance id (%v) for node %s", err, nodeName)
return -1, fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err) return -1, fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
} }
@ -162,7 +162,7 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err) return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
} }
vmset, err := c.getNodeVMSet(nodeName) vmset, err := c.getNodeVMSet(nodeName, allowUnsafeRead)
if err != nil { if err != nil {
return err return err
} }
@ -197,18 +197,20 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
} }
// getNodeDataDisks invokes vmSet interfaces to get data disks for the node. // getNodeDataDisks invokes vmSet interfaces to get data disks for the node.
func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) { func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
vmset, err := c.getNodeVMSet(nodeName) vmset, err := c.getNodeVMSet(nodeName, crt)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return vmset.GetDataDisks(nodeName) return vmset.GetDataDisks(nodeName, crt)
} }
// GetDiskLun finds the lun on the host that the vhd is attached to, given a vhd's diskName and diskURI. // GetDiskLun finds the lun on the host that the vhd is attached to, given a vhd's diskName and diskURI.
func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.NodeName) (int32, error) { func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.NodeName) (int32, error) {
disks, err := c.getNodeDataDisks(nodeName) // getNodeDataDisks need to fetch the cached data/fresh data if cache expired here
// to ensure we get LUN based on latest entry.
disks, err := c.getNodeDataDisks(nodeName, cachedData)
if err != nil { if err != nil {
klog.Errorf("error of getting data disks for node %q: %v", nodeName, err) klog.Errorf("error of getting data disks for node %q: %v", nodeName, err)
return -1, err return -1, err
@ -228,7 +230,7 @@ func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.N
// GetNextDiskLun searches all vhd attachment on the host and find unused lun. Return -1 if all luns are used. // GetNextDiskLun searches all vhd attachment on the host and find unused lun. Return -1 if all luns are used.
func (c *controllerCommon) GetNextDiskLun(nodeName types.NodeName) (int32, error) { func (c *controllerCommon) GetNextDiskLun(nodeName types.NodeName) (int32, error) {
disks, err := c.getNodeDataDisks(nodeName) disks, err := c.getNodeDataDisks(nodeName, cachedData)
if err != nil { if err != nil {
klog.Errorf("error of getting data disks for node %q: %v", nodeName, err) klog.Errorf("error of getting data disks for node %q: %v", nodeName, err)
return -1, err return -1, err
@ -255,7 +257,11 @@ func (c *controllerCommon) DisksAreAttached(diskNames []string, nodeName types.N
attached[diskName] = false attached[diskName] = false
} }
disks, err := c.getNodeDataDisks(nodeName) // doing stalled read for getNodeDataDisks to ensure we don't call ARM
// for every reconcile call. The cache is invalidated after Attach/Detach
// disk. So the new entry will be fetched and cached the first time reconcile
// loop runs after the Attach/Disk OP which will reflect the latest model.
disks, err := c.getNodeDataDisks(nodeName, allowUnsafeRead)
if err != nil { if err != nil {
if err == cloudprovider.InstanceNotFound { if err == cloudprovider.InstanceNotFound {
// if host doesn't exist, no need to detach // if host doesn't exist, no need to detach

View File

@ -31,7 +31,7 @@ import (
// AttachDisk attaches a vhd to vm // AttachDisk attaches a vhd to vm
// the vhd must exist, can be identified by diskName, diskURI, and lun. // the vhd must exist, can be identified by diskName, diskURI, and lun.
func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error { func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error {
vm, err := as.getVirtualMachine(nodeName) vm, err := as.getVirtualMachine(nodeName, cachedData)
if err != nil { if err != nil {
return err return err
} }
@ -102,7 +102,7 @@ func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI stri
// DetachDisk detaches a disk from host // DetachDisk detaches a disk from host
// the vhd can be identified by diskName or diskURI // the vhd can be identified by diskName or diskURI
func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) { func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) {
vm, err := as.getVirtualMachine(nodeName) vm, err := as.getVirtualMachine(nodeName, cachedData)
if err != nil { if err != nil {
// if host doesn't exist, no need to detach // if host doesn't exist, no need to detach
klog.Warningf("azureDisk - cannot find node %s, skip detaching disk(%s, %s)", nodeName, diskName, diskURI) klog.Warningf("azureDisk - cannot find node %s, skip detaching disk(%s, %s)", nodeName, diskName, diskURI)
@ -155,8 +155,8 @@ func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.N
} }
// GetDataDisks gets a list of data disks attached to the node. // GetDataDisks gets a list of data disks attached to the node.
func (as *availabilitySet) GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) { func (as *availabilitySet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
vm, err := as.getVirtualMachine(nodeName) vm, err := as.getVirtualMachine(nodeName, crt)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -20,6 +20,7 @@ package azure
import ( import (
"testing" "testing"
"time"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute" "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/go-autorest/autorest/to" "github.com/Azure/go-autorest/autorest/to"
@ -99,12 +100,14 @@ func TestGetDataDisks(t *testing.T) {
nodeName types.NodeName nodeName types.NodeName
expectedDataDisks []compute.DataDisk expectedDataDisks []compute.DataDisk
expectedError bool expectedError bool
crt cacheReadType
}{ }{
{ {
desc: "an error shall be returned if there's no corresponding vm", desc: "an error shall be returned if there's no corresponding vm",
nodeName: "vm2", nodeName: "vm2",
expectedDataDisks: nil, expectedDataDisks: nil,
expectedError: true, expectedError: true,
crt: cachedData,
}, },
{ {
desc: "correct list of data disks shall be returned if everything is good", desc: "correct list of data disks shall be returned if everything is good",
@ -116,6 +119,19 @@ func TestGetDataDisks(t *testing.T) {
}, },
}, },
expectedError: false, expectedError: false,
crt: cachedData,
},
{
desc: "correct list of data disks shall be returned if everything is good",
nodeName: "vm1",
expectedDataDisks: []compute.DataDisk{
{
Lun: to.Int32Ptr(0),
Name: to.StringPtr("disk1"),
},
},
expectedError: false,
crt: allowUnsafeRead,
}, },
} }
for i, test := range testCases { for i, test := range testCases {
@ -123,8 +139,15 @@ func TestGetDataDisks(t *testing.T) {
vmSet := testCloud.vmSet vmSet := testCloud.vmSet
setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, false) setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, false)
dataDisks, err := vmSet.GetDataDisks(test.nodeName) dataDisks, err := vmSet.GetDataDisks(test.nodeName, test.crt)
assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc)
if test.crt == allowUnsafeRead {
time.Sleep(fakeCacheTTL)
dataDisks, err := vmSet.GetDataDisks(test.nodeName, test.crt)
assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc) assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc) assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc)
} }
}
} }

View File

@ -32,7 +32,7 @@ import (
// the vhd must exist, can be identified by diskName, diskURI, and lun. // the vhd must exist, can be identified by diskName, diskURI, and lun.
func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error { func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error {
vmName := mapNodeNameToVMName(nodeName) vmName := mapNodeNameToVMName(nodeName)
ssName, instanceID, vm, err := ss.getVmssVM(vmName) ssName, instanceID, vm, err := ss.getVmssVM(vmName, cachedData)
if err != nil { if err != nil {
return err return err
} }
@ -109,7 +109,7 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
// the vhd can be identified by diskName or diskURI // the vhd can be identified by diskName or diskURI
func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) { func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) {
vmName := mapNodeNameToVMName(nodeName) vmName := mapNodeNameToVMName(nodeName)
ssName, instanceID, vm, err := ss.getVmssVM(vmName) ssName, instanceID, vm, err := ss.getVmssVM(vmName, cachedData)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -167,8 +167,8 @@ func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName
} }
// GetDataDisks gets a list of data disks attached to the node. // GetDataDisks gets a list of data disks attached to the node.
func (ss *scaleSet) GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) { func (ss *scaleSet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
_, _, vm, err := ss.getVmssVM(string(nodeName)) _, _, vm, err := ss.getVmssVM(string(nodeName), crt)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -950,7 +950,7 @@ func (f *fakeVMSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName
return nil, fmt.Errorf("unimplemented") return nil, fmt.Errorf("unimplemented")
} }
func (f *fakeVMSet) GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) { func (f *fakeVMSet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
return nil, fmt.Errorf("unimplemented") return nil, fmt.Errorf("unimplemented")
} }

View File

@ -145,7 +145,7 @@ func (ims *InstanceMetadataService) getInstanceMetadata(key string) (interface{}
// GetMetadata gets instance metadata from cache. // GetMetadata gets instance metadata from cache.
func (ims *InstanceMetadataService) GetMetadata() (*InstanceMetadata, error) { func (ims *InstanceMetadataService) GetMetadata() (*InstanceMetadata, error) {
cache, err := ims.imsCache.Get(metadataCacheKey) cache, err := ims.imsCache.Get(metadataCacheKey, cachedData)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -962,7 +962,7 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
if isInternal { if isInternal {
// Refresh updated lb which will be used later in other places. // Refresh updated lb which will be used later in other places.
newLB, exist, err := az.getAzureLoadBalancer(lbName) newLB, exist, err := az.getAzureLoadBalancer(lbName, cachedData)
if err != nil { if err != nil {
klog.V(2).Infof("reconcileLoadBalancer for service(%s): getAzureLoadBalancer(%s) failed: %v", serviceName, lbName, err) klog.V(2).Infof("reconcileLoadBalancer for service(%s): getAzureLoadBalancer(%s) failed: %v", serviceName, lbName, err)
return nil, err return nil, err
@ -1125,7 +1125,7 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service,
ports = []v1.ServicePort{} ports = []v1.ServicePort{}
} }
sg, err := az.getSecurityGroup() sg, err := az.getSecurityGroup(cachedData)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1466,7 +1466,7 @@ func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lbNa
} }
if lbName != "" { if lbName != "" {
loadBalancer, _, err := az.getAzureLoadBalancer(lbName) loadBalancer, _, err := az.getAzureLoadBalancer(lbName, cachedData)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -46,7 +46,7 @@ const (
// ListRoutes lists all managed routes that belong to the specified clusterName // ListRoutes lists all managed routes that belong to the specified clusterName
func (az *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) { func (az *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) {
klog.V(10).Infof("ListRoutes: START clusterName=%q", clusterName) klog.V(10).Infof("ListRoutes: START clusterName=%q", clusterName)
routeTable, existsRouteTable, err := az.getRouteTable() routeTable, existsRouteTable, err := az.getRouteTable(cachedData)
routes, err := processRoutes(routeTable, existsRouteTable, err) routes, err := processRoutes(routeTable, existsRouteTable, err)
if err != nil { if err != nil {
return nil, err return nil, err
@ -102,7 +102,7 @@ func processRoutes(routeTable network.RouteTable, exists bool, err error) ([]*cl
} }
func (az *Cloud) createRouteTableIfNotExists(clusterName string, kubeRoute *cloudprovider.Route) error { func (az *Cloud) createRouteTableIfNotExists(clusterName string, kubeRoute *cloudprovider.Route) error {
if _, existsRouteTable, err := az.getRouteTable(); err != nil { if _, existsRouteTable, err := az.getRouteTable(cachedData); err != nil {
klog.V(2).Infof("createRouteTableIfNotExists error: couldn't get routetable. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR) klog.V(2).Infof("createRouteTableIfNotExists error: couldn't get routetable. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
return err return err
} else if existsRouteTable { } else if existsRouteTable {

View File

@ -375,14 +375,14 @@ func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error)
var machine compute.VirtualMachine var machine compute.VirtualMachine
var err error var err error
machine, err = as.getVirtualMachine(types.NodeName(name)) machine, err = as.getVirtualMachine(types.NodeName(name), allowUnsafeRead)
if err == cloudprovider.InstanceNotFound { if err == cloudprovider.InstanceNotFound {
return "", cloudprovider.InstanceNotFound return "", cloudprovider.InstanceNotFound
} }
if err != nil { if err != nil {
if as.CloudProviderBackoff { if as.CloudProviderBackoff {
klog.V(2).Infof("GetInstanceIDByNodeName(%s) backing off", name) klog.V(2).Infof("GetInstanceIDByNodeName(%s) backing off", name)
machine, err = as.GetVirtualMachineWithRetry(types.NodeName(name)) machine, err = as.GetVirtualMachineWithRetry(types.NodeName(name), allowUnsafeRead)
if err != nil { if err != nil {
klog.V(2).Infof("GetInstanceIDByNodeName(%s) abort backoff", name) klog.V(2).Infof("GetInstanceIDByNodeName(%s) abort backoff", name)
return "", err return "", err
@ -403,7 +403,7 @@ func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error)
// GetPowerStatusByNodeName returns the power state of the specified node. // GetPowerStatusByNodeName returns the power state of the specified node.
func (as *availabilitySet) GetPowerStatusByNodeName(name string) (powerState string, err error) { func (as *availabilitySet) GetPowerStatusByNodeName(name string) (powerState string, err error) {
vm, err := as.getVirtualMachine(types.NodeName(name)) vm, err := as.getVirtualMachine(types.NodeName(name), cachedData)
if err != nil { if err != nil {
return powerState, err return powerState, err
} }
@ -436,7 +436,7 @@ func (as *availabilitySet) GetNodeNameByProviderID(providerID string) (types.Nod
// GetInstanceTypeByNodeName gets the instance type by node name. // GetInstanceTypeByNodeName gets the instance type by node name.
func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error) { func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error) {
machine, err := as.getVirtualMachine(types.NodeName(name)) machine, err := as.getVirtualMachine(types.NodeName(name), allowUnsafeRead)
if err != nil { if err != nil {
klog.Errorf("as.GetInstanceTypeByNodeName(%s) failed: as.getVirtualMachine(%s) err=%v", name, name, err) klog.Errorf("as.GetInstanceTypeByNodeName(%s) failed: as.getVirtualMachine(%s) err=%v", name, name, err)
return "", err return "", err
@ -448,7 +448,7 @@ func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error
// GetZoneByNodeName gets availability zone for the specified node. If the node is not running // GetZoneByNodeName gets availability zone for the specified node. If the node is not running
// with availability zone, then it returns fault domain. // with availability zone, then it returns fault domain.
func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
vm, err := as.getVirtualMachine(types.NodeName(name)) vm, err := as.getVirtualMachine(types.NodeName(name), allowUnsafeRead)
if err != nil { if err != nil {
return cloudprovider.Zone{}, err return cloudprovider.Zone{}, err
} }
@ -649,7 +649,7 @@ func extractResourceGroupByNicID(nicID string) (string, error) {
func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName string) (network.Interface, error) { func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName string) (network.Interface, error) {
var machine compute.VirtualMachine var machine compute.VirtualMachine
machine, err := as.GetVirtualMachineWithRetry(types.NodeName(nodeName)) machine, err := as.GetVirtualMachineWithRetry(types.NodeName(nodeName), cachedData)
if err != nil { if err != nil {
klog.V(2).Infof("GetPrimaryInterface(%s, %s) abort backoff", nodeName, vmSetName) klog.V(2).Infof("GetPrimaryInterface(%s, %s) abort backoff", nodeName, vmSetName)
return network.Interface{}, err return network.Interface{}, err

View File

@ -70,7 +70,7 @@ type VMSet interface {
// DetachDisk detaches a vhd from host. The vhd can be identified by diskName or diskURI. // DetachDisk detaches a vhd from host. The vhd can be identified by diskName or diskURI.
DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error)
// GetDataDisks gets a list of data disks attached to the node. // GetDataDisks gets a list of data disks attached to the node.
GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error)
// GetPowerStatusByNodeName returns the power state of the specified node. // GetPowerStatusByNodeName returns the power state of the specified node.
GetPowerStatusByNodeName(name string) (string, error) GetPowerStatusByNodeName(name string) (string, error)

View File

@ -88,9 +88,9 @@ func newScaleSet(az *Cloud) (VMSet, error) {
// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache. // getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets. // It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
func (ss *scaleSet) getVmssVM(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) { func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
getter := func(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) { getter := func(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey) cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
if err != nil { if err != nil {
return "", "", nil, err return "", "", nil, err
} }
@ -132,7 +132,7 @@ func (ss *scaleSet) getVmssVM(nodeName string) (string, string, *compute.Virtual
// GetPowerStatusByNodeName returns the power state of the specified node. // GetPowerStatusByNodeName returns the power state of the specified node.
func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, err error) { func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, err error) {
_, _, vm, err := ss.getVmssVM(name) _, _, vm, err := ss.getVmssVM(name, cachedData)
if err != nil { if err != nil {
return powerState, err return powerState, err
} }
@ -154,9 +154,9 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er
// getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache. // getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
// The node must belong to one of scale sets. // The node must belong to one of scale sets.
func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string) (*compute.VirtualMachineScaleSetVM, error) { func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt cacheReadType) (*compute.VirtualMachineScaleSetVM, error) {
getter := func() (vm *compute.VirtualMachineScaleSetVM, found bool, err error) { getter := func() (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey) cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -203,7 +203,7 @@ func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceI
// It must return ("", cloudprovider.InstanceNotFound) if the instance does // It must return ("", cloudprovider.InstanceNotFound) if the instance does
// not exist or is no longer running. // not exist or is no longer running.
func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) { func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) {
managedByAS, err := ss.isNodeManagedByAvailabilitySet(name) managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, allowUnsafeRead)
if err != nil { if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
return "", err return "", err
@ -213,7 +213,7 @@ func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) {
return ss.availabilitySet.GetInstanceIDByNodeName(name) return ss.availabilitySet.GetInstanceIDByNodeName(name)
} }
_, _, vm, err := ss.getVmssVM(name) _, _, vm, err := ss.getVmssVM(name, allowUnsafeRead)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -247,7 +247,7 @@ func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName,
return ss.availabilitySet.GetNodeNameByProviderID(providerID) return ss.availabilitySet.GetNodeNameByProviderID(providerID)
} }
vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID) vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, allowUnsafeRead)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -262,7 +262,7 @@ func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName,
// GetInstanceTypeByNodeName gets the instance type by node name. // GetInstanceTypeByNodeName gets the instance type by node name.
func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) { func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
managedByAS, err := ss.isNodeManagedByAvailabilitySet(name) managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, allowUnsafeRead)
if err != nil { if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
return "", err return "", err
@ -272,7 +272,7 @@ func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
return ss.availabilitySet.GetInstanceTypeByNodeName(name) return ss.availabilitySet.GetInstanceTypeByNodeName(name)
} }
_, _, vm, err := ss.getVmssVM(name) _, _, vm, err := ss.getVmssVM(name, allowUnsafeRead)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -287,7 +287,7 @@ func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
// GetZoneByNodeName gets availability zone for the specified node. If the node is not running // GetZoneByNodeName gets availability zone for the specified node. If the node is not running
// with availability zone, then it returns fault domain. // with availability zone, then it returns fault domain.
func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
managedByAS, err := ss.isNodeManagedByAvailabilitySet(name) managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, allowUnsafeRead)
if err != nil { if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
return cloudprovider.Zone{}, err return cloudprovider.Zone{}, err
@ -297,7 +297,7 @@ func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
return ss.availabilitySet.GetZoneByNodeName(name) return ss.availabilitySet.GetZoneByNodeName(name)
} }
_, _, vm, err := ss.getVmssVM(name) _, _, vm, err := ss.getVmssVM(name, allowUnsafeRead)
if err != nil { if err != nil {
return cloudprovider.Zone{}, err return cloudprovider.Zone{}, err
} }
@ -536,7 +536,7 @@ func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) {
} }
nodeName := nodes[nx].Name nodeName := nodes[nx].Name
ssName, _, _, err := ss.getVmssVM(nodeName) ssName, _, _, err := ss.getVmssVM(nodeName, cachedData)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -614,7 +614,7 @@ func extractResourceGroupByVMSSNicID(nicID string) (string, error) {
// GetPrimaryInterface gets machine primary network interface by node name and vmSet. // GetPrimaryInterface gets machine primary network interface by node name and vmSet.
func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, error) { func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, error) {
managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName) managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName, cachedData)
if err != nil { if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
return network.Interface{}, err return network.Interface{}, err
@ -624,7 +624,7 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err
return ss.availabilitySet.GetPrimaryInterface(nodeName) return ss.availabilitySet.GetPrimaryInterface(nodeName)
} }
ssName, instanceID, vm, err := ss.getVmssVM(nodeName) ssName, instanceID, vm, err := ss.getVmssVM(nodeName, cachedData)
if err != nil { if err != nil {
// VM is availability set, but not cached yet in availabilitySetNodesCache. // VM is availability set, but not cached yet in availabilitySetNodesCache.
if err == ErrorNotVmssInstance { if err == ErrorNotVmssInstance {
@ -747,7 +747,7 @@ func (ss *scaleSet) getConfigForScaleSetByIPFamily(config *compute.VirtualMachin
func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error { func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error {
klog.V(3).Infof("ensuring node %q of scaleset %q in LB backendpool %q", nodeName, vmSetName, backendPoolID) klog.V(3).Infof("ensuring node %q of scaleset %q in LB backendpool %q", nodeName, vmSetName, backendPoolID)
vmName := mapNodeNameToVMName(nodeName) vmName := mapNodeNameToVMName(nodeName)
ssName, instanceID, vm, err := ss.getVmssVM(vmName) ssName, instanceID, vm, err := ss.getVmssVM(vmName, cachedData)
if err != nil { if err != nil {
return err return err
} }
@ -1027,7 +1027,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac
f := func() error { f := func() error {
// Check whether the node is VMAS virtual machine. // Check whether the node is VMAS virtual machine.
managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName) managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName, cachedData)
if err != nil { if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet(%s): %v", localNodeName, err) klog.Errorf("Failed to check isNodeManagedByAvailabilitySet(%s): %v", localNodeName, err)
return err return err
@ -1068,7 +1068,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac
// ensureBackendPoolDeletedFromNode ensures the loadBalancer backendAddressPools deleted from the specified node. // ensureBackendPoolDeletedFromNode ensures the loadBalancer backendAddressPools deleted from the specified node.
func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeName, backendPoolID string) error { func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeName, backendPoolID string) error {
ssName, instanceID, vm, err := ss.getVmssVM(nodeName) ssName, instanceID, vm, err := ss.getVmssVM(nodeName, cachedData)
if err != nil { if err != nil {
return err return err
} }
@ -1167,7 +1167,7 @@ func (ss *scaleSet) getNodeNameByIPConfigurationID(ipConfigurationID string) (st
resourceGroup := matches[1] resourceGroup := matches[1]
scaleSetName := matches[2] scaleSetName := matches[2]
instanceID := matches[3] instanceID := matches[3]
vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID) vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, allowUnsafeRead)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@ -47,6 +47,7 @@ type vmssVirtualMachinesEntry struct {
vmssName string vmssName string
instanceID string instanceID string
virtualMachine *compute.VirtualMachineScaleSetVM virtualMachine *compute.VirtualMachineScaleSetVM
lastUpdate time.Time
} }
func (ss *scaleSet) makeVmssVMName(scaleSetName, instanceID string) string { func (ss *scaleSet) makeVmssVMName(scaleSetName, instanceID string) string {
@ -101,6 +102,7 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
vmssName: ssName, vmssName: ssName,
instanceID: to.String(vm.InstanceID), instanceID: to.String(vm.InstanceID),
virtualMachine: &vm, virtualMachine: &vm,
lastUpdate: time.Now().UTC(),
}) })
} }
} }
@ -113,7 +115,7 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
} }
func (ss *scaleSet) deleteCacheForNode(nodeName string) error { func (ss *scaleSet) deleteCacheForNode(nodeName string) error {
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey) cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, allowUnsafeRead)
if err != nil { if err != nil {
return err return err
} }
@ -150,8 +152,8 @@ func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) {
return newTimedcache(availabilitySetNodesCacheTTL, getter) return newTimedcache(availabilitySetNodesCacheTTL, getter)
} }
func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string) (bool, error) { func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string, crt cacheReadType) (bool, error) {
cached, err := ss.availabilitySetNodesCache.Get(availabilitySetNodesKey) cached, err := ss.availabilitySetNodesCache.Get(availabilitySetNodesKey, crt)
if err != nil { if err != nil {
return false, err return false, err
} }

View File

@ -85,7 +85,7 @@ func TestVMSSVMCache(t *testing.T) {
for i := range virtualMachines { for i := range virtualMachines {
vm := virtualMachines[i] vm := virtualMachines[i]
vmName := to.String(vm.OsProfile.ComputerName) vmName := to.String(vm.OsProfile.ComputerName)
ssName, instanceID, realVM, err := ss.getVmssVM(vmName) ssName, instanceID, realVM, err := ss.getVmssVM(vmName, cachedData)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "vmss", ssName) assert.Equal(t, "vmss", ssName)
assert.Equal(t, to.String(vm.InstanceID), instanceID) assert.Equal(t, to.String(vm.InstanceID), instanceID)
@ -99,14 +99,14 @@ func TestVMSSVMCache(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
// the VM should be removed from cache after deleteCacheForNode(). // the VM should be removed from cache after deleteCacheForNode().
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey) cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, cachedData)
assert.NoError(t, err) assert.NoError(t, err)
cachedVirtualMachines := cached.(*sync.Map) cachedVirtualMachines := cached.(*sync.Map)
_, ok := cachedVirtualMachines.Load(vmName) _, ok := cachedVirtualMachines.Load(vmName)
assert.Equal(t, false, ok) assert.Equal(t, false, ok)
// the VM should be get back after another cache refresh. // the VM should be get back after another cache refresh.
ssName, instanceID, realVM, err := ss.getVmssVM(vmName) ssName, instanceID, realVM, err := ss.getVmssVM(vmName, cachedData)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "vmss", ssName) assert.Equal(t, "vmss", ssName)
assert.Equal(t, to.String(vm.InstanceID), instanceID) assert.Equal(t, to.String(vm.InstanceID), instanceID)

View File

@ -90,9 +90,9 @@ func ignoreStatusForbiddenFromError(err error) error {
/// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache /// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache
/// The service side has throttling control that delays responses if there're multiple requests onto certain vm /// The service side has throttling control that delays responses if there're multiple requests onto certain vm
/// resource request in short period. /// resource request in short period.
func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualMachine, err error) { func (az *Cloud) getVirtualMachine(nodeName types.NodeName, crt cacheReadType) (vm compute.VirtualMachine, err error) {
vmName := string(nodeName) vmName := string(nodeName)
cachedVM, err := az.vmCache.Get(vmName) cachedVM, err := az.vmCache.Get(vmName, crt)
if err != nil { if err != nil {
return vm, err return vm, err
} }
@ -104,8 +104,8 @@ func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualM
return *(cachedVM.(*compute.VirtualMachine)), nil return *(cachedVM.(*compute.VirtualMachine)), nil
} }
func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) { func (az *Cloud) getRouteTable(crt cacheReadType) (routeTable network.RouteTable, exists bool, err error) {
cachedRt, err := az.rtCache.Get(az.RouteTableName) cachedRt, err := az.rtCache.Get(az.RouteTableName, crt)
if err != nil { if err != nil {
return routeTable, false, err return routeTable, false, err
} }
@ -168,8 +168,8 @@ func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (subnet
return subnet, exists, err return subnet, exists, err
} }
func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exists bool, err error) { func (az *Cloud) getAzureLoadBalancer(name string, crt cacheReadType) (lb network.LoadBalancer, exists bool, err error) {
cachedLB, err := az.lbCache.Get(name) cachedLB, err := az.lbCache.Get(name, crt)
if err != nil { if err != nil {
return lb, false, err return lb, false, err
} }
@ -181,12 +181,12 @@ func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exi
return *(cachedLB.(*network.LoadBalancer)), true, nil return *(cachedLB.(*network.LoadBalancer)), true, nil
} }
func (az *Cloud) getSecurityGroup() (nsg network.SecurityGroup, err error) { func (az *Cloud) getSecurityGroup(crt cacheReadType) (nsg network.SecurityGroup, err error) {
if az.SecurityGroupName == "" { if az.SecurityGroupName == "" {
return nsg, fmt.Errorf("securityGroupName is not configured") return nsg, fmt.Errorf("securityGroupName is not configured")
} }
securityGroup, err := az.nsgCache.Get(az.SecurityGroupName) securityGroup, err := az.nsgCache.Get(az.SecurityGroupName, crt)
if err != nil { if err != nil {
return nsg, err return nsg, err
} }