chore: move caches to a separate package

Pengfei Ni 2020-02-18 06:08:26 +00:00
parent 5bd719b6a6
commit 1ade947698
23 changed files with 254 additions and 176 deletions
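Note: every hunk below is the same mechanical change. The unexported timedCache/cacheReadType symbols from the deleted azure_cache.go become the exported TimedCache/AzureCacheReadType of the new k8s.io/legacy-cloud-providers/azure/cache package, imported everywhere as azcache. As a reading aid, here is a minimal, self-contained sketch of the semantics that API implies. It is not the moved file itself: the real TimedCache is backed by a client-go cache.Store (see the .Store.GetByKey and *azcache.AzureCacheEntry usages in the vmss cache file below), and only the names shown in the hunks are taken from the commit.

// Sketch of the cache package surface implied by this diff; simplified
// to a plain map instead of the client-go store the real package uses.
package cache

import (
	"sync"
	"time"
)

// AzureCacheReadType (exported rename of cacheReadType) selects read semantics.
type AzureCacheReadType int

const (
	// CacheReadTypeDefault returns cached data when fresh and refetches when expired.
	CacheReadTypeDefault AzureCacheReadType = iota
	// CacheReadTypeUnsafe tolerates expired entries and returns them without refetching.
	CacheReadTypeUnsafe
	// CacheReadTypeForceRefresh always refetches through the getter.
	CacheReadTypeForceRefresh
)

// GetFunc fetches fresh data for a cache key.
type GetFunc func(key string) (interface{}, error)

type entry struct {
	data      interface{}
	createdOn time.Time
}

// TimedCache caches a getter's results for a TTL.
type TimedCache struct {
	mu     sync.Mutex
	ttl    time.Duration
	getter GetFunc
	data   map[string]*entry
}

// NewTimedcache mirrors the exported constructor used throughout the diff.
func NewTimedcache(ttl time.Duration, getter GetFunc) (*TimedCache, error) {
	return &TimedCache{ttl: ttl, getter: getter, data: map[string]*entry{}}, nil
}

// Get returns the value for key according to crt.
func (c *TimedCache) Get(key string, crt AzureCacheReadType) (interface{}, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if e, ok := c.data[key]; ok && crt != CacheReadTypeForceRefresh {
		// Serve a fresh hit, or a stale hit when the caller tolerates it.
		if time.Since(e.createdOn) < c.ttl || crt == CacheReadTypeUnsafe {
			return e.data, nil
		}
	}

	// Miss, expiry under Default, or forced refresh: go through the getter.
	data, err := c.getter(key)
	if err != nil {
		return nil, err
	}
	c.data[key] = &entry{data: data, createdOn: time.Now()}
	return data, nil
}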

View File

@@ -12,7 +12,6 @@ go_library(
         "azure.go",
         "azure_backoff.go",
         "azure_blobDiskController.go",
-        "azure_cache.go",
         "azure_config.go",
         "azure_controller_common.go",
         "azure_controller_standard.go",
@@ -62,6 +61,7 @@ go_library(
         "//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
         "//staging/src/k8s.io/component-base/featuregate:go_default_library",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library",
+        "//staging/src/k8s.io/legacy-cloud-providers/azure/cache:go_default_library",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients:go_default_library",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients/diskclient:go_default_library",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients/interfaceclient:go_default_library",
@@ -95,7 +95,6 @@ go_library(
 go_test(
     name = "go_default_test",
     srcs = [
-        "azure_cache_test.go",
         "azure_config_test.go",
         "azure_controller_common_test.go",
         "azure_controller_standard_test.go",
@@ -125,6 +124,7 @@ go_test(
         "//staging/src/k8s.io/cloud-provider:go_default_library",
         "//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library",
+        "//staging/src/k8s.io/legacy-cloud-providers/azure/cache:go_default_library",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients:go_default_library",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/retry:go_default_library",
         "//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute:go_default_library",
@@ -149,6 +149,7 @@ filegroup(
     srcs = [
         ":package-srcs",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/auth:all-srcs",
+        "//staging/src/k8s.io/legacy-cloud-providers/azure/cache:all-srcs",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients:all-srcs",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/metrics:all-srcs",
         "//staging/src/k8s.io/legacy-cloud-providers/azure/retry:all-srcs",

View File

@@ -43,6 +43,7 @@ import (
     cloudprovider "k8s.io/cloud-provider"
     "k8s.io/klog"
     "k8s.io/legacy-cloud-providers/azure/auth"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
     azclients "k8s.io/legacy-cloud-providers/azure/clients"
     "k8s.io/legacy-cloud-providers/azure/clients/diskclient"
     "k8s.io/legacy-cloud-providers/azure/clients/interfaceclient"
@@ -274,10 +275,10 @@ type Cloud struct {
     eventRecorder record.EventRecorder
     routeUpdater  *delayedRouteUpdater

-    vmCache  *timedCache
-    lbCache  *timedCache
-    nsgCache *timedCache
-    rtCache  *timedCache
+    vmCache  *azcache.TimedCache
+    lbCache  *azcache.TimedCache
+    nsgCache *azcache.TimedCache
+    rtCache  *azcache.TimedCache

     *BlobDiskController
     *ManagedDiskController

View File

@@ -32,6 +32,7 @@ import (
     "k8s.io/apimachinery/pkg/util/wait"
     cloudprovider "k8s.io/cloud-provider"
     "k8s.io/klog"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
     "k8s.io/legacy-cloud-providers/azure/retry"
 )
@@ -65,7 +66,7 @@ func (az *Cloud) Event(obj runtime.Object, eventtype, reason, message string) {
 }

 // GetVirtualMachineWithRetry invokes az.getVirtualMachine with exponential backoff retry
-func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName, crt cacheReadType) (compute.VirtualMachine, error) {
+func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName, crt azcache.AzureCacheReadType) (compute.VirtualMachine, error) {
     var machine compute.VirtualMachine
     var retryErr error
     err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {

View File

@@ -34,6 +34,7 @@ import (
     cloudprovider "k8s.io/cloud-provider"
     volerr "k8s.io/cloud-provider/volume/errors"
     "k8s.io/klog"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
     "k8s.io/legacy-cloud-providers/azure/retry"
 )
@@ -87,7 +88,7 @@ type controllerCommon struct {
 }

 // getNodeVMSet gets the VMSet interface based on config.VMType and the real virtual machine type.
-func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName, crt cacheReadType) (VMSet, error) {
+func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName, crt azcache.AzureCacheReadType) (VMSet, error) {
     // 1. vmType is standard, return cloud.vmSet directly.
     if c.cloud.VMType == vmTypeStandard {
         return c.cloud.vmSet, nil
@@ -155,7 +156,7 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
         }
     }

-    vmset, err := c.getNodeVMSet(nodeName, cacheReadTypeUnsafe)
+    vmset, err := c.getNodeVMSet(nodeName, azcache.CacheReadTypeUnsafe)
     if err != nil {
         return -1, err
     }
@@ -195,7 +196,7 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
         return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
     }

-    vmset, err := c.getNodeVMSet(nodeName, cacheReadTypeUnsafe)
+    vmset, err := c.getNodeVMSet(nodeName, azcache.CacheReadTypeUnsafe)
     if err != nil {
         return err
     }
@@ -239,7 +240,7 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
 }

 // getNodeDataDisks invokes vmSet interfaces to get data disks for the node.
-func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
+func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName, crt azcache.AzureCacheReadType) ([]compute.DataDisk, error) {
     vmset, err := c.getNodeVMSet(nodeName, crt)
     if err != nil {
         return nil, err
@@ -252,7 +253,7 @@ func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
 func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.NodeName) (int32, error) {
     // getNodeDataDisks need to fetch the cached data/fresh data if cache expired here
     // to ensure we get LUN based on latest entry.
-    disks, err := c.getNodeDataDisks(nodeName, cacheReadTypeDefault)
+    disks, err := c.getNodeDataDisks(nodeName, azcache.CacheReadTypeDefault)
     if err != nil {
         klog.Errorf("error of getting data disks for node %q: %v", nodeName, err)
         return -1, err
@@ -276,7 +277,7 @@ func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.N
 // GetNextDiskLun searches all vhd attachment on the host and find unused lun. Return -1 if all luns are used.
 func (c *controllerCommon) GetNextDiskLun(nodeName types.NodeName) (int32, error) {
-    disks, err := c.getNodeDataDisks(nodeName, cacheReadTypeDefault)
+    disks, err := c.getNodeDataDisks(nodeName, azcache.CacheReadTypeDefault)
     if err != nil {
         klog.Errorf("error of getting data disks for node %q: %v", nodeName, err)
         return -1, err
@@ -307,7 +308,7 @@ func (c *controllerCommon) DisksAreAttached(diskNames []string, nodeName types.N
     // for every reconcile call. The cache is invalidated after Attach/Detach
     // disk. So the new entry will be fetched and cached the first time reconcile
     // loop runs after the Attach/Disk OP which will reflect the latest model.
-    disks, err := c.getNodeDataDisks(nodeName, cacheReadTypeUnsafe)
+    disks, err := c.getNodeDataDisks(nodeName, azcache.CacheReadTypeUnsafe)
     if err != nil {
         if err == cloudprovider.InstanceNotFound {
             // if host doesn't exist, no need to detach

View File

@@ -26,12 +26,13 @@ import (
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/klog"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
 )

 // AttachDisk attaches a vhd to vm
 // the vhd must exist, can be identified by diskName, diskURI, and lun.
 func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes, diskEncryptionSetID string, writeAcceleratorEnabled bool) error {
-    vm, err := as.getVirtualMachine(nodeName, cacheReadTypeDefault)
+    vm, err := as.getVirtualMachine(nodeName, azcache.CacheReadTypeDefault)
     if err != nil {
         return err
     }
@@ -115,7 +116,7 @@ func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI stri
 // DetachDisk detaches a disk from host
 // the vhd can be identified by diskName or diskURI
 func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error {
-    vm, err := as.getVirtualMachine(nodeName, cacheReadTypeDefault)
+    vm, err := as.getVirtualMachine(nodeName, azcache.CacheReadTypeDefault)
     if err != nil {
         // if host doesn't exist, no need to detach
         klog.Warningf("azureDisk - cannot find node %s, skip detaching disk(%s, %s)", nodeName, diskName, diskURI)
@@ -172,7 +173,7 @@ func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.N
 }

 // GetDataDisks gets a list of data disks attached to the node.
-func (as *availabilitySet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
+func (as *availabilitySet) GetDataDisks(nodeName types.NodeName, crt azcache.AzureCacheReadType) ([]compute.DataDisk, error) {
     vm, err := as.getVirtualMachine(nodeName, crt)
     if err != nil {
         return nil, err

View File

@@ -27,6 +27,11 @@ import (
     "github.com/stretchr/testify/assert"
     "k8s.io/apimachinery/pkg/types"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
+)
+
+var (
+    fakeCacheTTL = 2 * time.Second
 )

 func TestStandardAttachDisk(t *testing.T) {
@@ -100,14 +105,14 @@ func TestGetDataDisks(t *testing.T) {
         nodeName          types.NodeName
         expectedDataDisks []compute.DataDisk
         expectedError     bool
-        crt               cacheReadType
+        crt               azcache.AzureCacheReadType
     }{
         {
             desc:              "an error shall be returned if there's no corresponding vm",
             nodeName:          "vm2",
             expectedDataDisks: nil,
             expectedError:     true,
-            crt:               cacheReadTypeDefault,
+            crt:               azcache.CacheReadTypeDefault,
         },
         {
             desc: "correct list of data disks shall be returned if everything is good",
@@ -119,7 +124,7 @@ func TestGetDataDisks(t *testing.T) {
                 },
             },
             expectedError: false,
-            crt:           cacheReadTypeDefault,
+            crt:           azcache.CacheReadTypeDefault,
         },
         {
             desc: "correct list of data disks shall be returned if everything is good",
@@ -131,7 +136,7 @@ func TestGetDataDisks(t *testing.T) {
                 },
             },
             expectedError: false,
-            crt:           cacheReadTypeUnsafe,
+            crt:           azcache.CacheReadTypeUnsafe,
         },
     }
     for i, test := range testCases {
@@ -143,7 +148,7 @@ func TestGetDataDisks(t *testing.T) {
         assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc)
         assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc)
-        if test.crt == cacheReadTypeUnsafe {
+        if test.crt == azcache.CacheReadTypeUnsafe {
             time.Sleep(fakeCacheTTL)
             dataDisks, err := vmSet.GetDataDisks(test.nodeName, test.crt)
             assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc)
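The fakeCacheTTL block moved into this file from the deleted azure_cache_test.go, and the time.Sleep(fakeCacheTTL) above deliberately lets the cache expire before checking that a CacheReadTypeUnsafe read still returns the old data disks. That contract, reduced to a toy example against the simplified TimedCache sketched after the commit header (the counting getter and the short TTL are illustrative, not from the commit):

// Assumes the simplified cache sketch above is in the same package.
package cache

import (
	"fmt"
	"time"
)

func ExampleStaleRead() {
	fetches := 0
	c, _ := NewTimedcache(100*time.Millisecond, func(key string) (interface{}, error) {
		fetches++ // count how often the getter actually runs
		return fetches, nil
	})

	v, _ := c.Get("vm1", CacheReadTypeDefault) // miss: getter runs once
	fmt.Println(v, fetches)                    // 1 1

	time.Sleep(150 * time.Millisecond) // let the entry expire, like time.Sleep(fakeCacheTTL)

	v, _ = c.Get("vm1", CacheReadTypeUnsafe) // expired, but stale data is acceptable
	fmt.Println(v, fetches)                  // 1 1 (no refetch)

	v, _ = c.Get("vm1", CacheReadTypeDefault) // expired under Default: refetch
	fmt.Println(v, fetches)                   // 2 2
}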

View File

@@ -26,13 +26,14 @@ import (
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/klog"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
 )

 // AttachDisk attaches a vhd to vm
 // the vhd must exist, can be identified by diskName, diskURI, and lun.
 func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes, diskEncryptionSetID string, writeAcceleratorEnabled bool) error {
     vmName := mapNodeNameToVMName(nodeName)
-    ssName, instanceID, vm, err := ss.getVmssVM(vmName, cacheReadTypeDefault)
+    ssName, instanceID, vm, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault)
     if err != nil {
         return err
     }
@@ -120,7 +121,7 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
 // the vhd can be identified by diskName or diskURI
 func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error {
     vmName := mapNodeNameToVMName(nodeName)
-    ssName, instanceID, vm, err := ss.getVmssVM(vmName, cacheReadTypeDefault)
+    ssName, instanceID, vm, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault)
     if err != nil {
         return err
     }
@@ -180,7 +181,7 @@ func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName
 }

 // GetDataDisks gets a list of data disks attached to the node.
-func (ss *scaleSet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
+func (ss *scaleSet) GetDataDisks(nodeName types.NodeName, crt azcache.AzureCacheReadType) ([]compute.DataDisk, error) {
     _, _, vm, err := ss.getVmssVM(string(nodeName), crt)
     if err != nil {
         return nil, err

View File

@@ -35,6 +35,7 @@ import (
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/types"
     cloudprovider "k8s.io/cloud-provider"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
     "k8s.io/legacy-cloud-providers/azure/retry"
 )
@@ -983,7 +984,7 @@ func (f *fakeVMSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName
     return fmt.Errorf("unimplemented")
 }

-func (f *fakeVMSet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
+func (f *fakeVMSet) GetDataDisks(nodeName types.NodeName, crt azcache.AzureCacheReadType) ([]compute.DataDisk, error) {
     return nil, fmt.Errorf("unimplemented")
 }

View File

@@ -24,6 +24,8 @@ import (
     "io/ioutil"
     "net/http"
     "time"
+
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
 )

 const (
@@ -87,7 +89,7 @@ type InstanceMetadata struct {
 // InstanceMetadataService knows how to query the Azure instance metadata server.
 type InstanceMetadataService struct {
     metadataURL string
-    imsCache    *timedCache
+    imsCache    *azcache.TimedCache
 }

 // NewInstanceMetadataService creates an instance of the InstanceMetadataService accessor object.
@@ -96,7 +98,7 @@ func NewInstanceMetadataService(metadataURL string) (*InstanceMetadataService, e
         metadataURL: metadataURL,
     }

-    imsCache, err := newTimedcache(metadataCacheTTL, ims.getInstanceMetadata)
+    imsCache, err := azcache.NewTimedcache(metadataCacheTTL, ims.getInstanceMetadata)
     if err != nil {
         return nil, err
     }
@@ -145,7 +147,7 @@ func (ims *InstanceMetadataService) getInstanceMetadata(key string) (interface{}
 // GetMetadata gets instance metadata from cache.
 // crt determines if we can get data from stalled cache/need fresh if cache expired.
-func (ims *InstanceMetadataService) GetMetadata(crt cacheReadType) (*InstanceMetadata, error) {
+func (ims *InstanceMetadataService) GetMetadata(crt azcache.AzureCacheReadType) (*InstanceMetadata, error) {
     cache, err := ims.imsCache.Get(metadataCacheKey, crt)
     if err != nil {
         return nil, err
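NewInstanceMetadataService shows the wiring pattern every cache in this commit follows: hand azcache.NewTimedcache a TTL plus a getter closure, then read through Get with an explicit read type. A hedged sketch of that wiring in isolation; newMetadataCache and fetchRawMetadata are illustrative stand-ins, not names from the commit:

// Hypothetical consumer package, sketching the NewInstanceMetadataService wiring.
package example

import (
	"time"

	azcache "k8s.io/legacy-cloud-providers/azure/cache"
)

// newMetadataCache mirrors the pattern above; fetchRawMetadata stands in
// for ims.getInstanceMetadata.
func newMetadataCache(ttl time.Duration, fetchRawMetadata func(key string) (interface{}, error)) (*azcache.TimedCache, error) {
	return azcache.NewTimedcache(ttl, func(key string) (interface{}, error) {
		// The getter only runs on a miss, on expiry under CacheReadTypeDefault,
		// or when a caller passes CacheReadTypeForceRefresh.
		return fetchRawMetadata(key)
	})
}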

View File

@@ -28,6 +28,7 @@ import (
     "k8s.io/apimachinery/pkg/types"
     cloudprovider "k8s.io/cloud-provider"
     "k8s.io/klog"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
 )

 const (
@@ -73,7 +74,7 @@ func (az *Cloud) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.N
     }

     if az.UseInstanceMetadata {
-        metadata, err := az.metadata.GetMetadata(cacheReadTypeUnsafe)
+        metadata, err := az.metadata.GetMetadata(azcache.CacheReadTypeUnsafe)
         if err != nil {
             return nil, err
         }
@@ -259,7 +260,7 @@ func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, e
     }

     if az.UseInstanceMetadata {
-        metadata, err := az.metadata.GetMetadata(cacheReadTypeUnsafe)
+        metadata, err := az.metadata.GetMetadata(azcache.CacheReadTypeUnsafe)
         if err != nil {
             return "", err
         }
@@ -346,7 +347,7 @@ func (az *Cloud) InstanceType(ctx context.Context, name types.NodeName) (string,
     }

     if az.UseInstanceMetadata {
-        metadata, err := az.metadata.GetMetadata(cacheReadTypeUnsafe)
+        metadata, err := az.metadata.GetMetadata(azcache.CacheReadTypeUnsafe)
         if err != nil {
             return "", err
         }

View File

@@ -35,6 +35,7 @@ import (
     cloudprovider "k8s.io/cloud-provider"
     servicehelpers "k8s.io/cloud-provider/service/helpers"
     "k8s.io/klog"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
     utilnet "k8s.io/utils/net"
 )
@@ -961,7 +962,7 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
     if isInternal {
         // Refresh updated lb which will be used later in other places.
-        newLB, exist, err := az.getAzureLoadBalancer(lbName, cacheReadTypeDefault)
+        newLB, exist, err := az.getAzureLoadBalancer(lbName, azcache.CacheReadTypeDefault)
         if err != nil {
             klog.V(2).Infof("reconcileLoadBalancer for service(%s): getAzureLoadBalancer(%s) failed: %v", serviceName, lbName, err)
             return nil, err
@@ -1125,7 +1126,7 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service,
         ports = []v1.ServicePort{}
     }

-    sg, err := az.getSecurityGroup(cacheReadTypeDefault)
+    sg, err := az.getSecurityGroup(azcache.CacheReadTypeDefault)
     if err != nil {
         return nil, err
     }
@@ -1466,7 +1467,7 @@ func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lbNa
     }

     if lbName != "" {
-        loadBalancer, _, err := az.getAzureLoadBalancer(lbName, cacheReadTypeDefault)
+        loadBalancer, _, err := az.getAzureLoadBalancer(lbName, azcache.CacheReadTypeDefault)
         if err != nil {
             return nil, err
         }

View File

@@ -31,6 +31,7 @@ import (
     "k8s.io/apimachinery/pkg/types"
     cloudprovider "k8s.io/cloud-provider"
     "k8s.io/klog"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
     utilnet "k8s.io/utils/net"
 )
@@ -117,7 +118,7 @@ func (d *delayedRouteUpdater) updateRoutes() {
     var routeTable network.RouteTable
     var existsRouteTable bool
-    routeTable, existsRouteTable, err = d.az.getRouteTable(cacheReadTypeDefault)
+    routeTable, existsRouteTable, err = d.az.getRouteTable(azcache.CacheReadTypeDefault)
     if err != nil {
         klog.Errorf("getRouteTable() failed with error: %v", err)
         return
@@ -131,7 +132,7 @@ func (d *delayedRouteUpdater) updateRoutes() {
         return
     }

-    routeTable, _, err = d.az.getRouteTable(cacheReadTypeDefault)
+    routeTable, _, err = d.az.getRouteTable(azcache.CacheReadTypeDefault)
     if err != nil {
         klog.Errorf("getRouteTable() failed with error: %v", err)
         return
@@ -200,7 +201,7 @@ func (d *delayedRouteUpdater) addRouteOperation(operation routeOperation, route
 // ListRoutes lists all managed routes that belong to the specified clusterName
 func (az *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) {
     klog.V(10).Infof("ListRoutes: START clusterName=%q", clusterName)
-    routeTable, existsRouteTable, err := az.getRouteTable(cacheReadTypeDefault)
+    routeTable, existsRouteTable, err := az.getRouteTable(azcache.CacheReadTypeDefault)
     routes, err := processRoutes(az.ipv6DualStackEnabled, routeTable, existsRouteTable, err)
     if err != nil {
         return nil, err

View File

@@ -39,6 +39,7 @@ import (
     "k8s.io/apimachinery/pkg/util/uuid"
     cloudprovider "k8s.io/cloud-provider"
     "k8s.io/klog"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
     "k8s.io/component-base/featuregate"
     utilnet "k8s.io/utils/net"
@@ -391,14 +392,14 @@ func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error)
     var machine compute.VirtualMachine
     var err error

-    machine, err = as.getVirtualMachine(types.NodeName(name), cacheReadTypeUnsafe)
+    machine, err = as.getVirtualMachine(types.NodeName(name), azcache.CacheReadTypeUnsafe)
     if err == cloudprovider.InstanceNotFound {
         return "", cloudprovider.InstanceNotFound
     }
     if err != nil {
         if as.CloudProviderBackoff {
             klog.V(2).Infof("GetInstanceIDByNodeName(%s) backing off", name)
-            machine, err = as.GetVirtualMachineWithRetry(types.NodeName(name), cacheReadTypeUnsafe)
+            machine, err = as.GetVirtualMachineWithRetry(types.NodeName(name), azcache.CacheReadTypeUnsafe)
             if err != nil {
                 klog.V(2).Infof("GetInstanceIDByNodeName(%s) abort backoff", name)
                 return "", err
@@ -419,7 +420,7 @@ func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error)
 // GetPowerStatusByNodeName returns the power state of the specified node.
 func (as *availabilitySet) GetPowerStatusByNodeName(name string) (powerState string, err error) {
-    vm, err := as.getVirtualMachine(types.NodeName(name), cacheReadTypeDefault)
+    vm, err := as.getVirtualMachine(types.NodeName(name), azcache.CacheReadTypeDefault)
     if err != nil {
         return powerState, err
     }
@@ -452,7 +453,7 @@ func (as *availabilitySet) GetNodeNameByProviderID(providerID string) (types.Nod
 // GetInstanceTypeByNodeName gets the instance type by node name.
 func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error) {
-    machine, err := as.getVirtualMachine(types.NodeName(name), cacheReadTypeUnsafe)
+    machine, err := as.getVirtualMachine(types.NodeName(name), azcache.CacheReadTypeUnsafe)
     if err != nil {
         klog.Errorf("as.GetInstanceTypeByNodeName(%s) failed: as.getVirtualMachine(%s) err=%v", name, name, err)
         return "", err
@@ -464,7 +465,7 @@ func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error
 // GetZoneByNodeName gets availability zone for the specified node. If the node is not running
 // with availability zone, then it returns fault domain.
 func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
-    vm, err := as.getVirtualMachine(types.NodeName(name), cacheReadTypeUnsafe)
+    vm, err := as.getVirtualMachine(types.NodeName(name), azcache.CacheReadTypeUnsafe)
     if err != nil {
         return cloudprovider.Zone{}, err
     }
@@ -665,7 +666,7 @@ func extractResourceGroupByNicID(nicID string) (string, error) {
 func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName string) (network.Interface, error) {
     var machine compute.VirtualMachine

-    machine, err := as.GetVirtualMachineWithRetry(types.NodeName(nodeName), cacheReadTypeDefault)
+    machine, err := as.GetVirtualMachineWithRetry(types.NodeName(nodeName), azcache.CacheReadTypeDefault)
     if err != nil {
         klog.V(2).Infof("GetPrimaryInterface(%s, %s) abort backoff", nodeName, vmSetName)
         return network.Interface{}, err

View File

@@ -25,6 +25,7 @@ import (
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/types"
     cloudprovider "k8s.io/cloud-provider"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
 )

 // VMSet defines functions all vmsets (including scale set and availability
@@ -68,7 +69,7 @@ type VMSet interface {
     // DetachDisk detaches a vhd from host. The vhd can be identified by diskName or diskURI.
     DetachDisk(diskName, diskURI string, nodeName types.NodeName) error
     // GetDataDisks gets a list of data disks attached to the node.
-    GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error)
+    GetDataDisks(nodeName types.NodeName, crt azcache.AzureCacheReadType) ([]compute.DataDisk, error)
     // GetPowerStatusByNodeName returns the power state of the specified node.
     GetPowerStatusByNodeName(name string) (string, error)

View File

@@ -36,6 +36,7 @@ import (
     utilerrors "k8s.io/apimachinery/pkg/util/errors"
     cloudprovider "k8s.io/cloud-provider"
     "k8s.io/klog"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
     utilnet "k8s.io/utils/net"
 )
@@ -62,9 +63,9 @@ type scaleSet struct {
     // (e.g. master nodes) may not belong to any scale sets.
     availabilitySet VMSet

-    vmssCache                 *timedCache
-    vmssVMCache               *timedCache
-    availabilitySetNodesCache *timedCache
+    vmssCache                 *azcache.TimedCache
+    vmssVMCache               *azcache.TimedCache
+    availabilitySetNodesCache *azcache.TimedCache
 }

 // newScaleSet creates a new scaleSet.
@@ -95,7 +96,7 @@ func newScaleSet(az *Cloud) (VMSet, error) {
     return ss, nil
 }

-func (ss *scaleSet) getVMSS(vmssName string, crt cacheReadType) (*compute.VirtualMachineScaleSet, error) {
+func (ss *scaleSet) getVMSS(vmssName string, crt azcache.AzureCacheReadType) (*compute.VirtualMachineScaleSet, error) {
     getter := func(vmssName string) (*compute.VirtualMachineScaleSet, error) {
         cached, err := ss.vmssCache.Get(vmssKey, crt)
         if err != nil {
@@ -134,8 +135,8 @@ func (ss *scaleSet) getVMSS(vmssName string, crt cacheReadType) (*compute.VirtualMachineScaleSet, error) {
 // getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
 // It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
-func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
-    getter := func(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, bool, error) {
+func (ss *scaleSet) getVmssVM(nodeName string, crt azcache.AzureCacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
+    getter := func(nodeName string, crt azcache.AzureCacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, bool, error) {
         var found bool
         cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
         if err != nil {
@@ -164,7 +165,7 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
     if !found {
         klog.V(2).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
-        vmssName, instanceID, vm, found, err = getter(nodeName, cacheReadTypeForceRefresh)
+        vmssName, instanceID, vm, found, err = getter(nodeName, azcache.CacheReadTypeForceRefresh)
         if err != nil {
             return "", "", nil, err
         }
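getVmssVM trusts the shared cache first and only escalates to CacheReadTypeForceRefresh when the node is absent, so a stale entry costs one extra refresh rather than an ARM round trip on every lookup. The shape of that two-phase read, reduced to its essentials; findNode and the map payload are hypothetical stand-ins (the real getter unpacks a *sync.Map of per-node entries):

// Hypothetical consumer package sketching the cache-miss fallback pattern.
package example

import (
	"fmt"

	azcache "k8s.io/legacy-cloud-providers/azure/cache"
)

// findNode: read from cache, force one refresh if absent, then give up.
func findNode(c *azcache.TimedCache, key, nodeName string) (interface{}, error) {
	lookup := func(crt azcache.AzureCacheReadType) (interface{}, bool, error) {
		cached, err := c.Get(key, crt)
		if err != nil {
			return nil, false, err
		}
		// Assumes the getter stored a map of node entries; illustrative only.
		vm, found := cached.(map[string]interface{})[nodeName]
		return vm, found, nil
	}

	vm, found, err := lookup(azcache.CacheReadTypeDefault)
	if err != nil {
		return nil, err
	}
	if !found {
		// The cache may simply predate the node: refresh once before giving up.
		vm, found, err = lookup(azcache.CacheReadTypeForceRefresh)
		if err != nil {
			return nil, err
		}
	}
	if !found {
		return nil, fmt.Errorf("node %q not found", nodeName)
	}
	return vm, nil
}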
@@ -182,7 +183,7 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
 // GetPowerStatusByNodeName returns the power state of the specified node.
 func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, err error) {
-    _, _, vm, err := ss.getVmssVM(name, cacheReadTypeDefault)
+    _, _, vm, err := ss.getVmssVM(name, azcache.CacheReadTypeDefault)
     if err != nil {
         return powerState, err
     }
@@ -204,8 +205,8 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er
 // getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
 // The node must belong to one of scale sets.
-func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt cacheReadType) (*compute.VirtualMachineScaleSetVM, error) {
-    getter := func(crt cacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
+func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt azcache.AzureCacheReadType) (*compute.VirtualMachineScaleSetVM, error) {
+    getter := func(crt azcache.AzureCacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
         cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
         if err != nil {
             return nil, false, err
@@ -234,7 +235,7 @@ func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceI
     }
     if !found {
         klog.V(2).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
-        vm, found, err = getter(cacheReadTypeForceRefresh)
+        vm, found, err = getter(azcache.CacheReadTypeForceRefresh)
         if err != nil {
             return nil, err
         }
@@ -253,7 +254,7 @@ func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceI
 // It must return ("", cloudprovider.InstanceNotFound) if the instance does
 // not exist or is no longer running.
 func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) {
-    managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, cacheReadTypeUnsafe)
+    managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, azcache.CacheReadTypeUnsafe)
     if err != nil {
         klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
         return "", err
@@ -263,7 +264,7 @@ func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) {
         return ss.availabilitySet.GetInstanceIDByNodeName(name)
     }

-    _, _, vm, err := ss.getVmssVM(name, cacheReadTypeUnsafe)
+    _, _, vm, err := ss.getVmssVM(name, azcache.CacheReadTypeUnsafe)
     if err != nil {
         return "", err
     }
@@ -297,7 +298,7 @@ func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName,
         return ss.availabilitySet.GetNodeNameByProviderID(providerID)
     }

-    vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, cacheReadTypeUnsafe)
+    vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, azcache.CacheReadTypeUnsafe)
     if err != nil {
         return "", err
     }
@@ -312,7 +313,7 @@ func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName,
 // GetInstanceTypeByNodeName gets the instance type by node name.
 func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
-    managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, cacheReadTypeUnsafe)
+    managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, azcache.CacheReadTypeUnsafe)
     if err != nil {
         klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
         return "", err
@@ -322,7 +323,7 @@ func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
         return ss.availabilitySet.GetInstanceTypeByNodeName(name)
     }

-    _, _, vm, err := ss.getVmssVM(name, cacheReadTypeUnsafe)
+    _, _, vm, err := ss.getVmssVM(name, azcache.CacheReadTypeUnsafe)
     if err != nil {
         return "", err
     }
@@ -337,7 +338,7 @@ func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
 // GetZoneByNodeName gets availability zone for the specified node. If the node is not running
 // with availability zone, then it returns fault domain.
 func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
-    managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, cacheReadTypeUnsafe)
+    managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, azcache.CacheReadTypeUnsafe)
     if err != nil {
         klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
         return cloudprovider.Zone{}, err
@@ -347,7 +348,7 @@ func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
         return ss.availabilitySet.GetZoneByNodeName(name)
     }

-    _, _, vm, err := ss.getVmssVM(name, cacheReadTypeUnsafe)
+    _, _, vm, err := ss.getVmssVM(name, azcache.CacheReadTypeUnsafe)
     if err != nil {
         return cloudprovider.Zone{}, err
     }
@@ -583,7 +584,7 @@ func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) {
         }

         nodeName := nodes[nx].Name
-        ssName, _, _, err := ss.getVmssVM(nodeName, cacheReadTypeDefault)
+        ssName, _, _, err := ss.getVmssVM(nodeName, azcache.CacheReadTypeDefault)
         if err != nil {
             return nil, err
         }
@@ -661,7 +662,7 @@ func extractResourceGroupByVMSSNicID(nicID string) (string, error) {
 // GetPrimaryInterface gets machine primary network interface by node name and vmSet.
 func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, error) {
-    managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName, cacheReadTypeDefault)
+    managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName, azcache.CacheReadTypeDefault)
     if err != nil {
         klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
         return network.Interface{}, err
@@ -671,7 +672,7 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err
         return ss.availabilitySet.GetPrimaryInterface(nodeName)
     }

-    ssName, instanceID, vm, err := ss.getVmssVM(nodeName, cacheReadTypeDefault)
+    ssName, instanceID, vm, err := ss.getVmssVM(nodeName, azcache.CacheReadTypeDefault)
     if err != nil {
         // VM is availability set, but not cached yet in availabilitySetNodesCache.
         if err == ErrorNotVmssInstance {
@@ -794,7 +795,7 @@ func (ss *scaleSet) getConfigForScaleSetByIPFamily(config *compute.VirtualMachin
 func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error {
     klog.V(3).Infof("ensuring node %q of scaleset %q in LB backendpool %q", nodeName, vmSetName, backendPoolID)
     vmName := mapNodeNameToVMName(nodeName)
-    ssName, instanceID, vm, err := ss.getVmssVM(vmName, cacheReadTypeDefault)
+    ssName, instanceID, vm, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault)
     if err != nil {
         return err
     }
@@ -947,7 +948,7 @@ func (ss *scaleSet) ensureVMSSInPool(service *v1.Service, nodes []*v1.Node, back
     }

     for vmssName := range vmssNamesMap {
-        vmss, err := ss.getVMSS(vmssName, cacheReadTypeDefault)
+        vmss, err := ss.getVMSS(vmssName, azcache.CacheReadTypeDefault)
         if err != nil {
             return err
         }
@@ -1056,7 +1057,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac
         f := func() error {
             // Check whether the node is VMAS virtual machine.
-            managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName, cacheReadTypeDefault)
+            managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName, azcache.CacheReadTypeDefault)
             if err != nil {
                 klog.Errorf("Failed to check isNodeManagedByAvailabilitySet(%s): %v", localNodeName, err)
                 return err
@@ -1097,7 +1098,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac
 // ensureBackendPoolDeletedFromNode ensures the loadBalancer backendAddressPools deleted from the specified node.
 func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeName, backendPoolID string) error {
-    ssName, instanceID, vm, err := ss.getVmssVM(nodeName, cacheReadTypeDefault)
+    ssName, instanceID, vm, err := ss.getVmssVM(nodeName, azcache.CacheReadTypeDefault)
     if err != nil {
         return err
     }
@@ -1186,7 +1187,7 @@ func (ss *scaleSet) getNodeNameByIPConfigurationID(ipConfigurationID string) (st
     resourceGroup := matches[1]
     scaleSetName := matches[2]
     instanceID := matches[3]
-    vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, cacheReadTypeUnsafe)
+    vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, azcache.CacheReadTypeUnsafe)
     if err != nil {
         return "", err
     }
@@ -1232,7 +1233,7 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromVMSS(service *v1.Service, backen
     }

     for vmssName := range vmssNamesMap {
-        vmss, err := ss.getVMSS(vmssName, cacheReadTypeDefault)
+        vmss, err := ss.getVMSS(vmssName, azcache.CacheReadTypeDefault)
         // When vmss is being deleted, CreateOrUpdate API would report "the vmss is being deleted" error.
         // Since it is being deleted, we shouldn't send more CreateOrUpdate requests for it.

View File

@@ -29,6 +29,7 @@ import (
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/klog"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
 )

 var (
@@ -56,7 +57,7 @@ type vmssEntry struct {
     lastUpdate time.Time
 }

-func (ss *scaleSet) newVMSSCache() (*timedCache, error) {
+func (ss *scaleSet) newVMSSCache() (*azcache.TimedCache, error) {
     getter := func(key string) (interface{}, error) {
         localCache := &sync.Map{} // [vmssName]*vmssEntry
@@ -90,7 +91,7 @@ func (ss *scaleSet) newVMSSCache() (*timedCache, error) {
     if ss.Config.VmssCacheTTLInSeconds == 0 {
         ss.Config.VmssCacheTTLInSeconds = vmssCacheTTLDefaultInSeconds
     }
-    return newTimedcache(time.Duration(ss.Config.VmssCacheTTLInSeconds)*time.Second, getter)
+    return azcache.NewTimedcache(time.Duration(ss.Config.VmssCacheTTLInSeconds)*time.Second, getter)
 }

 func extractVmssVMName(name string) (string, string, error) {
@@ -107,7 +108,7 @@ func extractVmssVMName(name string) (string, string, error) {
     return ssName, instanceID, nil
 }

-func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
+func (ss *scaleSet) newVMSSVirtualMachinesCache() (*azcache.TimedCache, error) {
     getter := func(key string) (interface{}, error) {
         localCache := &sync.Map{} // [nodeName]*vmssVirtualMachinesEntry
@@ -115,12 +116,12 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
         if ss.vmssVMCache != nil {
             // get old cache before refreshing the cache
-            entry, exists, err := ss.vmssVMCache.store.GetByKey(vmssVirtualMachinesKey)
+            entry, exists, err := ss.vmssVMCache.Store.GetByKey(vmssVirtualMachinesKey)
             if err != nil {
                 return nil, err
             }
             if exists {
-                cached := entry.(*cacheEntry).data
+                cached := entry.(*azcache.AzureCacheEntry).Data
                 if cached != nil {
                     virtualMachines := cached.(*sync.Map)
                     virtualMachines.Range(func(key, value interface{}) bool {
@@ -210,11 +211,11 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
     if ss.Config.VmssVirtualMachinesCacheTTLInSeconds == 0 {
         ss.Config.VmssVirtualMachinesCacheTTLInSeconds = vmssVirtualMachinesCacheTTLDefaultInSeconds
     }
-    return newTimedcache(time.Duration(ss.Config.VmssVirtualMachinesCacheTTLInSeconds)*time.Second, getter)
+    return azcache.NewTimedcache(time.Duration(ss.Config.VmssVirtualMachinesCacheTTLInSeconds)*time.Second, getter)
 }

 func (ss *scaleSet) deleteCacheForNode(nodeName string) error {
-    cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, cacheReadTypeUnsafe)
+    cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, azcache.CacheReadTypeUnsafe)
     if err != nil {
         klog.Errorf("deleteCacheForNode(%s) failed with error: %v", nodeName, err)
         return err
@@ -225,7 +226,7 @@ func (ss *scaleSet) deleteCacheForNode(nodeName string) error {
     return nil
 }

-func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) {
+func (ss *scaleSet) newAvailabilitySetNodesCache() (*azcache.TimedCache, error) {
     getter := func(key string) (interface{}, error) {
         localCache := sets.NewString()
         resourceGroups, err := ss.GetResourceGroups()
@@ -252,10 +253,10 @@ func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) {
     if ss.Config.AvailabilitySetNodesCacheTTLInSeconds == 0 {
         ss.Config.AvailabilitySetNodesCacheTTLInSeconds = availabilitySetNodesCacheTTLDefaultInSeconds
     }
-    return newTimedcache(time.Duration(ss.Config.AvailabilitySetNodesCacheTTLInSeconds)*time.Second, getter)
+    return azcache.NewTimedcache(time.Duration(ss.Config.AvailabilitySetNodesCacheTTLInSeconds)*time.Second, getter)
 }

-func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string, crt cacheReadType) (bool, error) {
+func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string, crt azcache.AzureCacheReadType) (bool, error) {
     // Assume all nodes are managed by VMSS when DisableAvailabilitySetNodes is enabled.
     if ss.DisableAvailabilitySetNodes {
         klog.V(2).Infof("Assuming node %q is managed by VMSS since DisableAvailabilitySetNodes is set to true", nodeName)

View File

@@ -27,6 +27,7 @@ import (
     "github.com/Azure/go-autorest/autorest/to"
     "github.com/stretchr/testify/assert"
     cloudprovider "k8s.io/cloud-provider"
+    azcache "k8s.io/legacy-cloud-providers/azure/cache"
 )

 func TestExtractVmssVMName(t *testing.T) {
@@ -87,7 +88,7 @@ func TestVMSSVMCache(t *testing.T) {
     for i := range virtualMachines {
         vm := virtualMachines[i]
         vmName := to.String(vm.OsProfile.ComputerName)
-        ssName, instanceID, realVM, err := ss.getVmssVM(vmName, cacheReadTypeDefault)
+        ssName, instanceID, realVM, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault)
         assert.Nil(t, err)
         assert.Equal(t, "vmss", ssName)
         assert.Equal(t, to.String(vm.InstanceID), instanceID)
@@ -101,14 +102,14 @@ func TestVMSSVMCache(t *testing.T) {
     assert.NoError(t, err)

     // the VM should be removed from cache after deleteCacheForNode().
-    cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, cacheReadTypeDefault)
+    cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, azcache.CacheReadTypeDefault)
     assert.NoError(t, err)
     cachedVirtualMachines := cached.(*sync.Map)
     _, ok := cachedVirtualMachines.Load(vmName)
     assert.Equal(t, false, ok)

     // the VM should be get back after another cache refresh.
-    ssName, instanceID, realVM, err := ss.getVmssVM(vmName, cacheReadTypeDefault)
+    ssName, instanceID, realVM, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault)
     assert.NoError(t, err)
     assert.Equal(t, "vmss", ssName)
     assert.Equal(t, to.String(vm.InstanceID), instanceID)
@@ -130,7 +131,7 @@ func TestVMSSVMCacheWithDeletingNodes(t *testing.T) {
     vmName := to.String(vm.OsProfile.ComputerName)
     assert.Equal(t, vm.ProvisioningState, to.StringPtr(string(compute.ProvisioningStateDeleting)))

-    ssName, instanceID, realVM, err := ss.getVmssVM(vmName, cacheReadTypeDefault)
+    ssName, instanceID, realVM, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault)
     assert.Nil(t, realVM)
     assert.Equal(t, "", ssName)
     assert.Equal(t, instanceID, ssName)

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
cloudprovider "k8s.io/cloud-provider" cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog" "k8s.io/klog"
azcache "k8s.io/legacy-cloud-providers/azure/cache"
"k8s.io/legacy-cloud-providers/azure/retry" "k8s.io/legacy-cloud-providers/azure/retry"
) )
@ -63,7 +64,7 @@ func checkResourceExistsFromError(err *retry.Error) (bool, *retry.Error) {
/// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache. /// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache.
/// The service side has throttling control that delays responses when there are /// The service side has throttling control that delays responses when there are
/// multiple requests for a certain vm resource in a short period. /// multiple requests for a certain vm resource in a short period.
func (az *Cloud) getVirtualMachine(nodeName types.NodeName, crt cacheReadType) (vm compute.VirtualMachine, err error) { func (az *Cloud) getVirtualMachine(nodeName types.NodeName, crt azcache.AzureCacheReadType) (vm compute.VirtualMachine, err error) {
vmName := string(nodeName) vmName := string(nodeName)
cachedVM, err := az.vmCache.Get(vmName, crt) cachedVM, err := az.vmCache.Get(vmName, crt)
if err != nil { if err != nil {
@ -77,7 +78,7 @@ func (az *Cloud) getVirtualMachine(nodeName types.NodeName, crt cacheReadType) (
return *(cachedVM.(*compute.VirtualMachine)), nil return *(cachedVM.(*compute.VirtualMachine)), nil
} }
func (az *Cloud) getRouteTable(crt cacheReadType) (routeTable network.RouteTable, exists bool, err error) { func (az *Cloud) getRouteTable(crt azcache.AzureCacheReadType) (routeTable network.RouteTable, exists bool, err error) {
cachedRt, err := az.rtCache.Get(az.RouteTableName, crt) cachedRt, err := az.rtCache.Get(az.RouteTableName, crt)
if err != nil { if err != nil {
return routeTable, false, err return routeTable, false, err
@ -136,7 +137,7 @@ func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (networ
return subnet, exists, nil return subnet, exists, nil
} }
func (az *Cloud) getAzureLoadBalancer(name string, crt cacheReadType) (lb network.LoadBalancer, exists bool, err error) { func (az *Cloud) getAzureLoadBalancer(name string, crt azcache.AzureCacheReadType) (lb network.LoadBalancer, exists bool, err error) {
cachedLB, err := az.lbCache.Get(name, crt) cachedLB, err := az.lbCache.Get(name, crt)
if err != nil { if err != nil {
return lb, false, err return lb, false, err
@ -149,7 +150,7 @@ func (az *Cloud) getAzureLoadBalancer(name string, crt cacheReadType) (lb networ
return *(cachedLB.(*network.LoadBalancer)), true, nil return *(cachedLB.(*network.LoadBalancer)), true, nil
} }
func (az *Cloud) getSecurityGroup(crt cacheReadType) (network.SecurityGroup, error) { func (az *Cloud) getSecurityGroup(crt azcache.AzureCacheReadType) (network.SecurityGroup, error) {
nsg := network.SecurityGroup{} nsg := network.SecurityGroup{}
if az.SecurityGroupName == "" { if az.SecurityGroupName == "" {
return nsg, fmt.Errorf("securityGroupName is not configured") return nsg, fmt.Errorf("securityGroupName is not configured")
@ -167,7 +168,7 @@ func (az *Cloud) getSecurityGroup(crt cacheReadType) (network.SecurityGroup, err
return *(securityGroup.(*network.SecurityGroup)), nil return *(securityGroup.(*network.SecurityGroup)), nil
} }
func (az *Cloud) newVMCache() (*timedCache, error) { func (az *Cloud) newVMCache() (*azcache.TimedCache, error) {
getter := func(key string) (interface{}, error) { getter := func(key string) (interface{}, error) {
// Currently InstanceView requests are used by azure_zones, while the calls come after non-InstanceView // Currently InstanceView requests are used by azure_zones, while the calls come after non-InstanceView
// requests. If we first send an InstanceView request and then a non-InstanceView request, the second // requests. If we first send an InstanceView request and then a non-InstanceView request, the second
@ -206,10 +207,10 @@ func (az *Cloud) newVMCache() (*timedCache, error) {
if az.VMCacheTTLInSeconds == 0 { if az.VMCacheTTLInSeconds == 0 {
az.VMCacheTTLInSeconds = vmCacheTTLDefaultInSeconds az.VMCacheTTLInSeconds = vmCacheTTLDefaultInSeconds
} }
return newTimedcache(time.Duration(az.VMCacheTTLInSeconds)*time.Second, getter) return azcache.NewTimedcache(time.Duration(az.VMCacheTTLInSeconds)*time.Second, getter)
} }
func (az *Cloud) newLBCache() (*timedCache, error) { func (az *Cloud) newLBCache() (*azcache.TimedCache, error) {
getter := func(key string) (interface{}, error) { getter := func(key string) (interface{}, error) {
ctx, cancel := getContextWithCancel() ctx, cancel := getContextWithCancel()
defer cancel() defer cancel()
@ -231,10 +232,10 @@ func (az *Cloud) newLBCache() (*timedCache, error) {
if az.LoadBalancerCacheTTLInSeconds == 0 { if az.LoadBalancerCacheTTLInSeconds == 0 {
az.LoadBalancerCacheTTLInSeconds = loadBalancerCacheTTLDefaultInSeconds az.LoadBalancerCacheTTLInSeconds = loadBalancerCacheTTLDefaultInSeconds
} }
return newTimedcache(time.Duration(az.LoadBalancerCacheTTLInSeconds)*time.Second, getter) return azcache.NewTimedcache(time.Duration(az.LoadBalancerCacheTTLInSeconds)*time.Second, getter)
} }
func (az *Cloud) newNSGCache() (*timedCache, error) { func (az *Cloud) newNSGCache() (*azcache.TimedCache, error) {
getter := func(key string) (interface{}, error) { getter := func(key string) (interface{}, error) {
ctx, cancel := getContextWithCancel() ctx, cancel := getContextWithCancel()
defer cancel() defer cancel()
@ -255,10 +256,10 @@ func (az *Cloud) newNSGCache() (*timedCache, error) {
if az.NsgCacheTTLInSeconds == 0 { if az.NsgCacheTTLInSeconds == 0 {
az.NsgCacheTTLInSeconds = nsgCacheTTLDefaultInSeconds az.NsgCacheTTLInSeconds = nsgCacheTTLDefaultInSeconds
} }
return newTimedcache(time.Duration(az.NsgCacheTTLInSeconds)*time.Second, getter) return azcache.NewTimedcache(time.Duration(az.NsgCacheTTLInSeconds)*time.Second, getter)
} }
func (az *Cloud) newRouteTableCache() (*timedCache, error) { func (az *Cloud) newRouteTableCache() (*azcache.TimedCache, error) {
getter := func(key string) (interface{}, error) { getter := func(key string) (interface{}, error) {
ctx, cancel := getContextWithCancel() ctx, cancel := getContextWithCancel()
defer cancel() defer cancel()
@ -279,7 +280,7 @@ func (az *Cloud) newRouteTableCache() (*timedCache, error) {
if az.RouteTableCacheTTLInSeconds == 0 { if az.RouteTableCacheTTLInSeconds == 0 {
az.RouteTableCacheTTLInSeconds = routeTableCacheTTLDefaultInSeconds az.RouteTableCacheTTLInSeconds = routeTableCacheTTLDefaultInSeconds
} }
return newTimedcache(time.Duration(az.RouteTableCacheTTLInSeconds)*time.Second, getter) return azcache.NewTimedcache(time.Duration(az.RouteTableCacheTTLInSeconds)*time.Second, getter)
} }
func (az *Cloud) useStandardLoadBalancer() bool { func (az *Cloud) useStandardLoadBalancer() bool {
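
The constructors above all follow one shape: a getter closure that owns the remote fetch, a TTL defaulted from configuration, and a call to azcache.NewTimedcache. A hedged sketch of that same pattern for a hypothetical "widget" resource; every name below is illustrative and not part of this commit:

package main

import (
	"fmt"
	"time"

	azcache "k8s.io/legacy-cloud-providers/azure/cache"
)

const widgetCacheTTLDefaultInSeconds = 120 // illustrative default

// newWidgetCache mirrors the newVMCache/newLBCache pattern for a made-up
// resource: the getter owns the remote fetch, the cache owns the TTL.
func newWidgetCache(ttlSeconds int) (*azcache.TimedCache, error) {
	getter := func(key string) (interface{}, error) {
		// A real getter would call the ARM client here and return
		// nil data (not an error) when the resource does not exist.
		return fmt.Sprintf("widget %q", key), nil
	}
	if ttlSeconds == 0 {
		ttlSeconds = widgetCacheTTLDefaultInSeconds
	}
	return azcache.NewTimedcache(time.Duration(ttlSeconds)*time.Second, getter)
}

func main() {
	c, err := newWidgetCache(0)
	if err != nil {
		panic(err)
	}
	w, _ := c.Get("w1", azcache.CacheReadTypeDefault)
	fmt.Println(w)
}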


@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
cloudprovider "k8s.io/cloud-provider" cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog" "k8s.io/klog"
azcache "k8s.io/legacy-cloud-providers/azure/cache"
) )
// makeZone returns the zone value in format of <region>-<zone-id>. // makeZone returns the zone value in format of <region>-<zone-id>.
@ -53,7 +54,7 @@ func (az *Cloud) GetZoneID(zoneLabel string) string {
// If the node is not running with availability zones, then it will fall back to fault domain. // If the node is not running with availability zones, then it will fall back to fault domain.
func (az *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) { func (az *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
if az.UseInstanceMetadata { if az.UseInstanceMetadata {
metadata, err := az.metadata.GetMetadata(cacheReadTypeUnsafe) metadata, err := az.metadata.GetMetadata(azcache.CacheReadTypeUnsafe)
if err != nil { if err != nil {
return cloudprovider.Zone{}, err return cloudprovider.Zone{}, err
} }
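
GetZone reads instance metadata with CacheReadTypeUnsafe: zone and fault domain rarely change, so serving an expired entry is acceptable and spares the metadata endpoint. A small sketch of that guarantee, with an illustrative counting getter and TTL:

package main

import (
	"fmt"
	"time"

	azcache "k8s.io/legacy-cloud-providers/azure/cache"
)

func main() {
	calls := 0
	getter := func(key string) (interface{}, error) {
		calls++
		return "westus2-1", nil // stands in for an IMDS call
	}
	c, _ := azcache.NewTimedcache(10*time.Millisecond, getter)

	c.Get("zone", azcache.CacheReadTypeDefault) // populates the entry, calls == 1
	time.Sleep(20 * time.Millisecond)           // let the entry expire

	// The unsafe read returns the expired entry without touching the getter.
	v, _ := c.Get("zone", azcache.CacheReadTypeUnsafe)
	fmt.Println(v, calls) // westus2-1 1
}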


@ -0,0 +1,34 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"azure_cache.go",
"doc.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/legacy-cloud-providers/azure/cache",
importpath = "k8s.io/legacy-cloud-providers/azure/cache",
visibility = ["//visibility:public"],
deps = ["//staging/src/k8s.io/client-go/tools/cache:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["azure_cache_test.go"],
embed = [":go_default_library"],
deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package azure package cache
import ( import (
"fmt" "fmt"
@ -26,143 +26,143 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
) )
// cacheReadType defines the read type for cache data // AzureCacheReadType defines the read type for cache data
type cacheReadType int type AzureCacheReadType int
const ( const (
// cacheReadTypeDefault returns data from cache if the cache entry has not expired; // CacheReadTypeDefault returns data from cache if the cache entry has not expired;
// if the entry has expired, it refetches the data using the getter, // if the entry has expired, it refetches the data using the getter,
// saves the entry in the cache, and then returns it // saves the entry in the cache, and then returns it
cacheReadTypeDefault cacheReadType = iota CacheReadTypeDefault AzureCacheReadType = iota
// cacheReadTypeUnsafe returns data from cache whether the cache entry is // CacheReadTypeUnsafe returns data from cache whether the cache entry is
// active or expired. If the entry doesn't exist in the cache, the data is fetched // active or expired. If the entry doesn't exist in the cache, the data is fetched
// using the getter, saved in the cache and returned // using the getter, saved in the cache and returned
cacheReadTypeUnsafe CacheReadTypeUnsafe
// cacheReadTypeForceRefresh force refreshes the cache even if the cache entry // CacheReadTypeForceRefresh force refreshes the cache even if the cache entry
// is not expired // is not expired
cacheReadTypeForceRefresh CacheReadTypeForceRefresh
) )
// getFunc defines a getter function for timedCache. // GetFunc defines a getter function for TimedCache.
type getFunc func(key string) (interface{}, error) type GetFunc func(key string) (interface{}, error)
// cacheEntry is the internal structure stored inside TTLStore. // AzureCacheEntry is the internal structure stored inside TTLStore.
type cacheEntry struct { type AzureCacheEntry struct {
key string Key string
data interface{} Data interface{}
// The lock to ensure not updating same entry simultaneously. // The lock to ensure not updating same entry simultaneously.
lock sync.Mutex Lock sync.Mutex
// time when entry was fetched and created // time when entry was fetched and created
createdOn time.Time CreatedOn time.Time
} }
// cacheKeyFunc defines the key function required in TTLStore. // cacheKeyFunc defines the key function required in TTLStore.
func cacheKeyFunc(obj interface{}) (string, error) { func cacheKeyFunc(obj interface{}) (string, error) {
return obj.(*cacheEntry).key, nil return obj.(*AzureCacheEntry).Key, nil
} }
// timedCache is a cache with TTL. // TimedCache is a cache with TTL.
type timedCache struct { type TimedCache struct {
store cache.Store Store cache.Store
lock sync.Mutex Lock sync.Mutex
getter getFunc Getter GetFunc
ttl time.Duration TTL time.Duration
} }
// newTimedcache creates a new timedCache. // NewTimedcache creates a new TimedCache.
func newTimedcache(ttl time.Duration, getter getFunc) (*timedCache, error) { func NewTimedcache(ttl time.Duration, getter GetFunc) (*TimedCache, error) {
if getter == nil { if getter == nil {
return nil, fmt.Errorf("getter is not provided") return nil, fmt.Errorf("getter is not provided")
} }
return &timedCache{ return &TimedCache{
getter: getter, Getter: getter,
// switch to using NewStore instead of NewTTLStore so that we can // switch to using NewStore instead of NewTTLStore so that we can
// reuse entries for calls that are fine with reading expired/stale data. // reuse entries for calls that are fine with reading expired/stale data.
// with NewTTLStore, entries are not returned if they have already expired. // with NewTTLStore, entries are not returned if they have already expired.
store: cache.NewStore(cacheKeyFunc), Store: cache.NewStore(cacheKeyFunc),
ttl: ttl, TTL: ttl,
}, nil }, nil
} }
// getInternal returns cacheEntry by key. If the key is not cached yet, // getInternal returns AzureCacheEntry by key. If the key is not cached yet,
// it returns a cacheEntry with nil data. // it returns an AzureCacheEntry with nil data.
func (t *timedCache) getInternal(key string) (*cacheEntry, error) { func (t *TimedCache) getInternal(key string) (*AzureCacheEntry, error) {
entry, exists, err := t.store.GetByKey(key) entry, exists, err := t.Store.GetByKey(key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// if entry exists, return the entry // if entry exists, return the entry
if exists { if exists {
return entry.(*cacheEntry), nil return entry.(*AzureCacheEntry), nil
} }
// lock here so that if the entry doesn't exist, we add a new one // lock here so that if the entry doesn't exist, we add a new one
// without overwriting a concurrent insert // without overwriting a concurrent insert
t.lock.Lock() t.Lock.Lock()
defer t.lock.Unlock() defer t.Lock.Unlock()
// Still not found, add new entry with nil data. // Still not found, add new entry with nil data.
// Note the data will be filled later by getter. // Note the data will be filled later by getter.
newEntry := &cacheEntry{ newEntry := &AzureCacheEntry{
key: key, Key: key,
data: nil, Data: nil,
} }
t.store.Add(newEntry) t.Store.Add(newEntry)
return newEntry, nil return newEntry, nil
} }
// Get returns the requested item by key. // Get returns the requested item by key.
func (t *timedCache) Get(key string, crt cacheReadType) (interface{}, error) { func (t *TimedCache) Get(key string, crt AzureCacheReadType) (interface{}, error) {
entry, err := t.getInternal(key) entry, err := t.getInternal(key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
entry.lock.Lock() entry.Lock.Lock()
defer entry.lock.Unlock() defer entry.Lock.Unlock()
// the entry has data and a force refresh was not requested // the entry has data and a force refresh was not requested
if entry.data != nil && crt != cacheReadTypeForceRefresh { if entry.Data != nil && crt != CacheReadTypeForceRefresh {
// allow unsafe read, so return data even if expired // allow unsafe read, so return data even if expired
if crt == cacheReadTypeUnsafe { if crt == CacheReadTypeUnsafe {
return entry.data, nil return entry.Data, nil
} }
// if cached data is not expired, return cached data // if cached data is not expired, return cached data
if crt == cacheReadTypeDefault && time.Since(entry.createdOn) < t.ttl { if crt == CacheReadTypeDefault && time.Since(entry.CreatedOn) < t.TTL {
return entry.data, nil return entry.Data, nil
} }
} }
// Data is not cached yet, the cached data has expired, or a force refresh was requested; // Data is not cached yet, the cached data has expired, or a force refresh was requested;
// fetch it with the getter. The entry is locked before fetching to ensure concurrent // fetch it with the getter. The entry is locked before fetching to ensure concurrent
// gets don't result in multiple ARM calls. // gets don't result in multiple ARM calls.
data, err := t.getter(key) data, err := t.Getter(key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// set the data in cache and also set the last update time // set the data in cache and also set the last update time
// to now as the data was recently fetched // to now as the data was recently fetched
entry.data = data entry.Data = data
entry.createdOn = time.Now().UTC() entry.CreatedOn = time.Now().UTC()
return entry.data, nil return entry.Data, nil
} }
// Delete removes an item from the cache. // Delete removes an item from the cache.
func (t *timedCache) Delete(key string) error { func (t *TimedCache) Delete(key string) error {
return t.store.Delete(&cacheEntry{ return t.Store.Delete(&AzureCacheEntry{
key: key, Key: key,
}) })
} }
// Set sets the data cache for the key. // Set sets the data cache for the key.
// It is only used for testing. // It is only used for testing.
func (t *timedCache) Set(key string, data interface{}) { func (t *TimedCache) Set(key string, data interface{}) {
t.store.Add(&cacheEntry{ t.Store.Add(&AzureCacheEntry{
key: key, Key: key,
data: data, Data: data,
createdOn: time.Now().UTC(), CreatedOn: time.Now().UTC(),
}) })
} }
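
Taken together, Get implements three read policies behind a single per-entry lock. A self-contained sketch exercising all three through the exported API; the key, TTL, and values are illustrative:

package main

import (
	"fmt"
	"time"

	azcache "k8s.io/legacy-cloud-providers/azure/cache"
)

func main() {
	calls := 0
	getter := func(key string) (interface{}, error) {
		calls++
		return fmt.Sprintf("%s#%d", key, calls), nil
	}
	c, err := azcache.NewTimedcache(time.Hour, getter)
	if err != nil {
		panic(err)
	}

	// Default: the first read fetches; a second read within the TTL is served from cache.
	v, _ := c.Get("k", azcache.CacheReadTypeDefault)
	fmt.Println(v, calls) // k#1 1
	v, _ = c.Get("k", azcache.CacheReadTypeDefault)
	fmt.Println(v, calls) // k#1 1

	// ForceRefresh: refetches even though the entry has not expired.
	v, _ = c.Get("k", azcache.CacheReadTypeForceRefresh)
	fmt.Println(v, calls) // k#2 2

	// Unsafe: returns whatever is cached, fresh or stale, without a fetch.
	v, _ = c.Get("k", azcache.CacheReadTypeUnsafe)
	fmt.Println(v, calls) // k#2 2
}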


@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package azure package cache
import ( import (
"fmt" "fmt"
@ -59,12 +59,12 @@ func (fake *fakeDataSource) set(data map[string]*fakeDataObj) {
fake.called = 0 fake.called = 0
} }
func newFakeCache(t *testing.T) (*fakeDataSource, *timedCache) { func newFakeCache(t *testing.T) (*fakeDataSource, *TimedCache) {
dataSource := &fakeDataSource{ dataSource := &fakeDataSource{
data: make(map[string]*fakeDataObj), data: make(map[string]*fakeDataObj),
} }
getter := dataSource.get getter := dataSource.get
cache, err := newTimedcache(fakeCacheTTL, getter) cache, err := NewTimedcache(fakeCacheTTL, getter)
assert.NoError(t, err) assert.NoError(t, err)
return dataSource, cache return dataSource, cache
} }
@ -99,7 +99,7 @@ func TestCacheGet(t *testing.T) {
for _, c := range cases { for _, c := range cases {
dataSource, cache := newFakeCache(t) dataSource, cache := newFakeCache(t)
dataSource.set(c.data) dataSource.set(c.data)
val, err := cache.Get(c.key, cacheReadTypeDefault) val, err := cache.Get(c.key, CacheReadTypeDefault)
assert.NoError(t, err, c.name) assert.NoError(t, err, c.name)
assert.Equal(t, c.expected, val, c.name) assert.Equal(t, c.expected, val, c.name)
} }
@ -110,10 +110,10 @@ func TestCacheGetError(t *testing.T) {
getter := func(key string) (interface{}, error) { getter := func(key string) (interface{}, error) {
return nil, getError return nil, getError
} }
cache, err := newTimedcache(fakeCacheTTL, getter) cache, err := NewTimedcache(fakeCacheTTL, getter)
assert.NoError(t, err) assert.NoError(t, err)
val, err := cache.Get("key", cacheReadTypeDefault) val, err := cache.Get("key", CacheReadTypeDefault)
assert.Error(t, err) assert.Error(t, err)
assert.Equal(t, getError, err) assert.Equal(t, getError, err)
assert.Nil(t, val) assert.Nil(t, val)
@ -128,13 +128,13 @@ func TestCacheDelete(t *testing.T) {
dataSource, cache := newFakeCache(t) dataSource, cache := newFakeCache(t)
dataSource.set(data) dataSource.set(data)
v, err := cache.Get(key, cacheReadTypeDefault) v, err := cache.Get(key, CacheReadTypeDefault)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, val, v, "cache should get correct data") assert.Equal(t, val, v, "cache should get correct data")
dataSource.set(nil) dataSource.set(nil)
cache.Delete(key) cache.Delete(key)
v, err = cache.Get(key, cacheReadTypeDefault) v, err = cache.Get(key, CacheReadTypeDefault)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 1, dataSource.called) assert.Equal(t, 1, dataSource.called)
assert.Equal(t, nil, v, "cache should get nil after data is removed") assert.Equal(t, nil, v, "cache should get nil after data is removed")
@ -149,13 +149,13 @@ func TestCacheExpired(t *testing.T) {
dataSource, cache := newFakeCache(t) dataSource, cache := newFakeCache(t)
dataSource.set(data) dataSource.set(data)
v, err := cache.Get(key, cacheReadTypeDefault) v, err := cache.Get(key, CacheReadTypeDefault)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 1, dataSource.called) assert.Equal(t, 1, dataSource.called)
assert.Equal(t, val, v, "cache should get correct data") assert.Equal(t, val, v, "cache should get correct data")
time.Sleep(fakeCacheTTL) time.Sleep(fakeCacheTTL)
v, err = cache.Get(key, cacheReadTypeDefault) v, err = cache.Get(key, CacheReadTypeDefault)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 2, dataSource.called) assert.Equal(t, 2, dataSource.called)
assert.Equal(t, val, v, "cache should get correct data even after expiry") assert.Equal(t, val, v, "cache should get correct data even after expiry")
@ -170,13 +170,13 @@ func TestCacheAllowUnsafeRead(t *testing.T) {
dataSource, cache := newFakeCache(t) dataSource, cache := newFakeCache(t)
dataSource.set(data) dataSource.set(data)
v, err := cache.Get(key, cacheReadTypeDefault) v, err := cache.Get(key, CacheReadTypeDefault)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 1, dataSource.called) assert.Equal(t, 1, dataSource.called)
assert.Equal(t, val, v, "cache should get correct data") assert.Equal(t, val, v, "cache should get correct data")
time.Sleep(fakeCacheTTL) time.Sleep(fakeCacheTTL)
v, err = cache.Get(key, cacheReadTypeUnsafe) v, err = cache.Get(key, CacheReadTypeUnsafe)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 1, dataSource.called) assert.Equal(t, 1, dataSource.called)
assert.Equal(t, val, v, "cache should return expired data as unsafe read is allowed") assert.Equal(t, val, v, "cache should return expired data as unsafe read is allowed")
@ -195,10 +195,10 @@ func TestCacheNoConcurrentGet(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
wg.Add(1) wg.Add(1)
go func() { defer wg.Done(); cache.Get(key, cacheReadTypeDefault) }() go func() { defer wg.Done(); cache.Get(key, CacheReadTypeDefault) }()
} }
v, err := cache.Get(key, cacheReadTypeDefault) v, err := cache.Get(key, CacheReadTypeDefault)
wg.Wait() wg.Wait()
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 1, dataSource.called) assert.Equal(t, 1, dataSource.called)
@ -214,12 +214,12 @@ func TestCacheForceRefresh(t *testing.T) {
dataSource, cache := newFakeCache(t) dataSource, cache := newFakeCache(t)
dataSource.set(data) dataSource.set(data)
v, err := cache.Get(key, cacheReadTypeDefault) v, err := cache.Get(key, CacheReadTypeDefault)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 1, dataSource.called) assert.Equal(t, 1, dataSource.called)
assert.Equal(t, val, v, "cache should get correct data") assert.Equal(t, val, v, "cache should get correct data")
v, err = cache.Get(key, cacheReadTypeForceRefresh) v, err = cache.Get(key, CacheReadTypeForceRefresh)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 2, dataSource.called) assert.Equal(t, 2, dataSource.called)
assert.Equal(t, val, v, "should refetch unexpired data as forced refresh") assert.Equal(t, val, v, "should refetch unexpired data as forced refresh")


@ -0,0 +1,20 @@
// +build !providerless
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package cache is an implementation of Azure caches.
package cache // import "k8s.io/legacy-cloud-providers/azure/cache"