mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 06:54:01 +00:00
Add error handling and new tests
This commit is contained in:
parent
daec2bd745
commit
7634eacb4f
@ -249,29 +249,25 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
|
||||
az.vmSet = newAvailabilitySet(&az)
|
||||
}
|
||||
|
||||
vmCache, err := az.newVMCache()
|
||||
az.vmCache, err = az.newVMCache()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
az.vmCache = vmCache
|
||||
|
||||
lbCache, err := az.newLBCache()
|
||||
az.lbCache, err = az.newLBCache()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
az.lbCache = lbCache
|
||||
|
||||
nsgCache, err := az.newNSGCache()
|
||||
az.nsgCache, err = az.newNSGCache()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
az.nsgCache = nsgCache
|
||||
|
||||
rtCache, err := az.newRouteTableCache()
|
||||
az.rtCache, err = az.newRouteTableCache()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
az.rtCache = rtCache
|
||||
|
||||
if err := initDiskControllers(&az); err != nil {
|
||||
return nil, err
|
||||
|
@ -134,7 +134,7 @@ func (az *Cloud) CreateOrUpdateSGWithRetry(sg network.SecurityGroup) error {
|
||||
done, err := processRetryResponse(resp.Response, err)
|
||||
if done && err == nil {
|
||||
// Invalidate the cache right after updating
|
||||
az.lbCache.Delete(*sg.Name)
|
||||
az.nsgCache.Delete(*sg.Name)
|
||||
}
|
||||
return done, err
|
||||
})
|
||||
|
@ -38,11 +38,7 @@ type cacheEntry struct {
|
||||
|
||||
// cacheKeyFunc defines the key function required in TTLStore.
|
||||
func cacheKeyFunc(obj interface{}) (string, error) {
|
||||
if entry, ok := obj.(*cacheEntry); ok {
|
||||
return entry.key, nil
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("obj %q is not an object of cacheEntry", obj)
|
||||
return obj.(*cacheEntry).key, nil
|
||||
}
|
||||
|
||||
// timedCache is a cache with TTL.
|
||||
@ -107,25 +103,19 @@ func (t *timedCache) Get(key string) (interface{}, error) {
|
||||
entry.lock.Lock()
|
||||
defer entry.lock.Unlock()
|
||||
|
||||
data, err := t.getter(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if entry.data == nil {
|
||||
data, err := t.getter(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
entry.data = data
|
||||
entry.data = data
|
||||
}
|
||||
}
|
||||
|
||||
return entry.data, nil
|
||||
}
|
||||
|
||||
// Update sets an item in the cache to its updated state.
|
||||
func (t *timedCache) Update(key string, data interface{}) error {
|
||||
return t.store.Update(&cacheEntry{
|
||||
key: key,
|
||||
data: data,
|
||||
})
|
||||
}
|
||||
|
||||
// Delete removes an item from the cache.
|
||||
func (t *timedCache) Delete(key string) error {
|
||||
return t.store.Delete(&cacheEntry{
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package azure
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -31,14 +32,16 @@ var (
|
||||
type fakeDataObj struct{}
|
||||
|
||||
type fakeDataSource struct {
|
||||
data map[string]*fakeDataObj
|
||||
lock sync.Mutex
|
||||
called int
|
||||
data map[string]*fakeDataObj
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func (fake *fakeDataSource) get(key string) (interface{}, error) {
|
||||
fake.lock.Lock()
|
||||
defer fake.lock.Unlock()
|
||||
|
||||
fake.called = fake.called + 1
|
||||
if v, ok := fake.data[key]; ok {
|
||||
return v, nil
|
||||
}
|
||||
@ -51,6 +54,7 @@ func (fake *fakeDataSource) set(data map[string]*fakeDataObj) {
|
||||
defer fake.lock.Unlock()
|
||||
|
||||
fake.data = data
|
||||
fake.called = 0
|
||||
}
|
||||
|
||||
func newFakeCache(t *testing.T) (*fakeDataSource, *timedCache) {
|
||||
@ -99,6 +103,20 @@ func TestCacheGet(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCacheGetError(t *testing.T) {
|
||||
getError := fmt.Errorf("getError")
|
||||
getter := func(key string) (interface{}, error) {
|
||||
return nil, getError
|
||||
}
|
||||
cache, err := newTimedcache(fakeCacheTTL, getter)
|
||||
assert.NoError(t, err)
|
||||
|
||||
val, err := cache.Get("key")
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, getError, err)
|
||||
assert.Nil(t, val)
|
||||
}
|
||||
|
||||
func TestCacheDelete(t *testing.T) {
|
||||
key := "key1"
|
||||
val := &fakeDataObj{}
|
||||
@ -116,6 +134,7 @@ func TestCacheDelete(t *testing.T) {
|
||||
cache.Delete(key)
|
||||
v, err = cache.Get(key)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, dataSource.called)
|
||||
assert.Equal(t, nil, v, "cache should get nil after data is removed")
|
||||
}
|
||||
|
||||
@ -130,10 +149,12 @@ func TestCacheExpired(t *testing.T) {
|
||||
|
||||
v, err := cache.Get(key)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, dataSource.called)
|
||||
assert.Equal(t, val, v, "cache should get correct data")
|
||||
|
||||
time.Sleep(fakeCacheTTL)
|
||||
v, err = cache.Get(key)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, dataSource.called)
|
||||
assert.Equal(t, val, v, "cache should get correct data even after expired")
|
||||
}
|
||||
|
@ -167,6 +167,12 @@ func (az *Cloud) getSecurityGroup() (nsg network.SecurityGroup, err error) {
|
||||
|
||||
func (az *Cloud) newVMCache() (*timedCache, error) {
|
||||
getter := func(key string) (interface{}, error) {
|
||||
// Currently InstanceView request are used by azure_zones, while the calls come after non-InstanceView
|
||||
// request. If we first send an InstanceView request and then a non InstanceView request, the second
|
||||
// request will still hit throttling. This is what happens now for cloud controller manager: In this
|
||||
// case we do get instance view every time to fulfill the azure_zones requirement without hitting
|
||||
// throttling.
|
||||
// Consider adding separate parameter for controlling 'InstanceView' once node update issue #56276 is fixed
|
||||
vm, err := az.VirtualMachinesClient.Get(az.ResourceGroup, key, compute.InstanceView)
|
||||
exists, realErr := checkResourceExistsFromError(err)
|
||||
if realErr != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user