Add UpdateVMs() for VMSS client to allow update multiple VMSSVMs by

sequential sync requests and concurent async requests.
This commit is contained in:
Pengfei Ni 2020-02-29 12:52:00 +00:00
parent 665c6648eb
commit 2af26dca97
7 changed files with 215 additions and 0 deletions

View File

@ -530,6 +530,10 @@ func (fVMC *fakeVirtualMachineScaleSetVMsClient) Update(ctx context.Context, res
return nil
}
func (fVMC *fakeVirtualMachineScaleSetVMsClient) UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error {
return nil
}
type fakeVirtualMachineScaleSetsClient struct {
mutex *sync.Mutex
FakeStore map[string]map[string]compute.VirtualMachineScaleSet

View File

@ -26,6 +26,7 @@ import (
"io/ioutil"
"net/http"
"strings"
"sync"
"time"
"unicode"
@ -335,6 +336,86 @@ func (c *Client) PutResource(ctx context.Context, resourceID string, parameters
return c.PutResourceWithDecorators(ctx, resourceID, parameters, putDecorators)
}
// PutResources puts a list of resources from resources map[resourceID]parameters.
// Those resources sync requests are sequential while async requests are concurent. It 's especially
// useful when the ARM API doesn't support concurrent requests.
func (c *Client) PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse {
if len(resources) == 0 {
return nil
}
// Sequential sync requests.
futures := make(map[string]*azure.Future)
responses := make(map[string]*PutResourcesResponse)
for resourceID, parameters := range resources {
decorators := []autorest.PrepareDecorator{
autorest.WithPathParameters("{resourceID}", map[string]interface{}{"resourceID": resourceID}),
autorest.WithJSON(parameters),
}
request, err := c.PreparePutRequest(ctx, decorators...)
if err != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "put.prepare", resourceID, err)
responses[resourceID] = &PutResourcesResponse{
Error: retry.NewError(false, err),
}
continue
}
future, resp, clientErr := c.SendAsync(ctx, request)
defer c.CloseResponse(ctx, resp)
if clientErr != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "put.send", resourceID, clientErr.Error())
responses[resourceID] = &PutResourcesResponse{
Error: clientErr,
}
continue
}
futures[resourceID] = future
}
// Concurrent async requests.
wg := sync.WaitGroup{}
var responseLock sync.Mutex
for resourceID, future := range futures {
wg.Add(1)
go func(resourceID string, future *azure.Future) {
defer wg.Done()
response, err := c.WaitForAsyncOperationResult(ctx, future, "armclient.PutResource")
if err != nil {
if response != nil {
klog.V(5).Infof("Received error in WaitForAsyncOperationResult: '%s', response code %d", err.Error(), response.StatusCode)
} else {
klog.V(5).Infof("Received error in WaitForAsyncOperationResult: '%s', no response", err.Error())
}
retriableErr := retry.GetError(response, err)
if !retriableErr.Retriable &&
strings.Contains(strings.ToUpper(err.Error()), strings.ToUpper("InternalServerError")) {
klog.V(5).Infof("Received InternalServerError in WaitForAsyncOperationResult: '%s', setting error retriable", err.Error())
retriableErr.Retriable = true
}
responseLock.Lock()
responses[resourceID] = &PutResourcesResponse{
Error: retriableErr,
}
responseLock.Unlock()
return
}
responseLock.Lock()
responses[resourceID] = &PutResourcesResponse{
Response: response,
}
responseLock.Unlock()
}(resourceID, future)
}
wg.Wait()
return responses
}
// PutResourceWithDecorators puts a resource by resource ID
func (c *Client) PutResourceWithDecorators(ctx context.Context, resourceID string, parameters interface{}, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error) {
request, err := c.PreparePutRequest(ctx, decorators...)

View File

@ -27,6 +27,12 @@ import (
"k8s.io/legacy-cloud-providers/azure/retry"
)
// PutResourcesResponse defines the response for PutResources.
type PutResourcesResponse struct {
Response *http.Response
Error *retry.Error
}
// Interface is the client interface for ARM.
// Don't forget to run the following command to generate the mock client:
// mockgen -source=$GOPATH/src/k8s.io/kubernetes/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/interface.go -package=mockarmclient Interface > $GOPATH/src/k8s.io/kubernetes/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/mockarmclient/interface.go
@ -61,6 +67,11 @@ type Interface interface {
// PutResource puts a resource by resource ID
PutResource(ctx context.Context, resourceID string, parameters interface{}) (*http.Response, *retry.Error)
// PutResources puts a list of resources from resources map[resourceID]parameters.
// Those resources sync requests are sequential while async requests are concurent. It 's especially
// useful when the ARM API doesn't support concurrent requests.
PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse
// PutResourceWithDecorators puts a resource with decorators by resource ID
PutResourceWithDecorators(ctx context.Context, resourceID string, parameters interface{}, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error)

View File

@ -26,6 +26,7 @@ import (
autorest "github.com/Azure/go-autorest/autorest"
azure "github.com/Azure/go-autorest/autorest/azure"
gomock "github.com/golang/mock/gomock"
armclient "k8s.io/legacy-cloud-providers/azure/clients/armclient"
retry "k8s.io/legacy-cloud-providers/azure/retry"
)
@ -227,6 +228,20 @@ func (mr *MockInterfaceMockRecorder) PutResource(ctx, resourceID, parameters int
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutResource", reflect.TypeOf((*MockInterface)(nil).PutResource), ctx, resourceID, parameters)
}
// PutResources mocks base method
func (m *MockInterface) PutResources(ctx context.Context, resources map[string]interface{}) map[string]*armclient.PutResourcesResponse {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PutResources", ctx, resources)
ret0, _ := ret[0].(map[string]*armclient.PutResourcesResponse)
return ret0
}
// PutResources indicates an expected call of PutResources
func (mr *MockInterfaceMockRecorder) PutResources(ctx, resources interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutResources", reflect.TypeOf((*MockInterface)(nil).PutResources), ctx, resources)
}
// PutResourceWithDecorators mocks base method
func (m *MockInterface) PutResourceWithDecorators(ctx context.Context, resourceID string, parameters interface{}, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error) {
m.ctrl.T.Helper()

View File

@ -29,6 +29,7 @@ import (
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/to"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog"
azclients "k8s.io/legacy-cloud-providers/azure/clients"
@ -367,3 +368,89 @@ func (page VirtualMachineScaleSetVMListResultPage) Values() []compute.VirtualMac
}
return *page.vmssvlr.Value
}
// UpdateVMs updates a list of VirtualMachineScaleSetVM from map[instanceID]compute.VirtualMachineScaleSetVM.
func (c *Client) UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error {
mc := metrics.NewMetricContext("vmssvm", "update_vms", resourceGroupName, c.subscriptionID, source)
// Report errors if the client is rate limited.
if !c.rateLimiterWriter.TryAccept() {
mc.RateLimitedCount()
return retry.GetRateLimitError(true, "VMSSVMUpdateVMs")
}
// Report errors if the client is throttled.
if c.RetryAfterWriter.After(time.Now()) {
mc.ThrottledCount()
rerr := retry.GetThrottlingError("VMSSVMUpdateVMs", "client throttled", c.RetryAfterWriter)
return rerr
}
rerr := c.updateVMSSVMs(ctx, resourceGroupName, VMScaleSetName, instances)
mc.Observe(rerr.Error())
if rerr != nil {
if rerr.IsThrottled() {
// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
c.RetryAfterWriter = rerr.RetryAfter
}
return rerr
}
return nil
}
// updateVMSSVMs updates a list of VirtualMachineScaleSetVM from map[instanceID]compute.VirtualMachineScaleSetVM.
func (c *Client) updateVMSSVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM) *retry.Error {
resources := make(map[string]interface{})
for instanceID, parameter := range instances {
resourceID := armclient.GetChildResourceID(
c.subscriptionID,
resourceGroupName,
"Microsoft.Compute/virtualMachineScaleSets",
VMScaleSetName,
"virtualMachines",
instanceID,
)
resources[resourceID] = parameter
}
responses := c.armClient.PutResources(ctx, resources)
errors := make([]*retry.Error, 0)
for resourceID, resp := range responses {
if resp == nil {
continue
}
defer c.armClient.CloseResponse(ctx, resp.Response)
if resp.Error != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "vmssvm.put.request", resourceID, resp.Error.Error())
errors = append(errors, resp.Error)
continue
}
if resp.Response != nil && resp.Response.StatusCode != http.StatusNoContent {
_, rerr := c.updateResponder(resp.Response)
if rerr != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "vmssvm.put.respond", resourceID, rerr.Error())
errors = append(errors, rerr)
}
}
}
// Aggregate errors.
if len(errors) > 0 {
rerr := &retry.Error{}
errs := make([]error, 0)
for _, err := range errors {
if err.IsThrottled() && err.RetryAfter.After(err.RetryAfter) {
rerr.RetryAfter = err.RetryAfter
}
errs = append(errs, err.Error())
}
rerr.RawError = utilerrors.Flatten(utilerrors.NewAggregate(errs))
return rerr
}
return nil
}

View File

@ -42,4 +42,7 @@ type Interface interface {
// Update updates a VirtualMachineScaleSetVM.
Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) *retry.Error
// UpdateVMs updates a list of VirtualMachineScaleSetVM from map[instanceID]compute.VirtualMachineScaleSetVM.
UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error
}

View File

@ -93,3 +93,17 @@ func (mr *MockInterfaceMockRecorder) Update(ctx, resourceGroupName, VMScaleSetNa
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockInterface)(nil).Update), ctx, resourceGroupName, VMScaleSetName, instanceID, parameters, source)
}
// UpdateVMs mocks base method
func (m *MockInterface) UpdateVMs(ctx context.Context, resourceGroupName, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateVMs", ctx, resourceGroupName, VMScaleSetName, instances, source)
ret0, _ := ret[0].(*retry.Error)
return ret0
}
// UpdateVMs indicates an expected call of UpdateVMs
func (mr *MockInterfaceMockRecorder) UpdateVMs(ctx, resourceGroupName, VMScaleSetName, instances, source interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateVMs", reflect.TypeOf((*MockInterface)(nil).UpdateVMs), ctx, resourceGroupName, VMScaleSetName, instances, source)
}