diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go index 6c29b68620c..696a0beabc6 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go @@ -530,6 +530,10 @@ func (fVMC *fakeVirtualMachineScaleSetVMsClient) Update(ctx context.Context, res return nil } +func (fVMC *fakeVirtualMachineScaleSetVMsClient) UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error { + return nil +} + type fakeVirtualMachineScaleSetsClient struct { mutex *sync.Mutex FakeStore map[string]map[string]compute.VirtualMachineScaleSet diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/azure_armclient.go b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/azure_armclient.go index 90a78299ef2..036f0651fc5 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/azure_armclient.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/azure_armclient.go @@ -26,6 +26,7 @@ import ( "io/ioutil" "net/http" "strings" + "sync" "time" "unicode" @@ -335,6 +336,86 @@ func (c *Client) PutResource(ctx context.Context, resourceID string, parameters return c.PutResourceWithDecorators(ctx, resourceID, parameters, putDecorators) } +// PutResources puts a list of resources from resources map[resourceID]parameters. +// Those resources sync requests are sequential while async requests are concurent. It 's especially +// useful when the ARM API doesn't support concurrent requests. +func (c *Client) PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse { + if len(resources) == 0 { + return nil + } + + // Sequential sync requests. + futures := make(map[string]*azure.Future) + responses := make(map[string]*PutResourcesResponse) + for resourceID, parameters := range resources { + decorators := []autorest.PrepareDecorator{ + autorest.WithPathParameters("{resourceID}", map[string]interface{}{"resourceID": resourceID}), + autorest.WithJSON(parameters), + } + request, err := c.PreparePutRequest(ctx, decorators...) + if err != nil { + klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "put.prepare", resourceID, err) + responses[resourceID] = &PutResourcesResponse{ + Error: retry.NewError(false, err), + } + continue + } + + future, resp, clientErr := c.SendAsync(ctx, request) + defer c.CloseResponse(ctx, resp) + if clientErr != nil { + klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "put.send", resourceID, clientErr.Error()) + responses[resourceID] = &PutResourcesResponse{ + Error: clientErr, + } + continue + } + + futures[resourceID] = future + } + + // Concurrent async requests. + wg := sync.WaitGroup{} + var responseLock sync.Mutex + for resourceID, future := range futures { + wg.Add(1) + go func(resourceID string, future *azure.Future) { + defer wg.Done() + response, err := c.WaitForAsyncOperationResult(ctx, future, "armclient.PutResource") + if err != nil { + if response != nil { + klog.V(5).Infof("Received error in WaitForAsyncOperationResult: '%s', response code %d", err.Error(), response.StatusCode) + } else { + klog.V(5).Infof("Received error in WaitForAsyncOperationResult: '%s', no response", err.Error()) + } + + retriableErr := retry.GetError(response, err) + if !retriableErr.Retriable && + strings.Contains(strings.ToUpper(err.Error()), strings.ToUpper("InternalServerError")) { + klog.V(5).Infof("Received InternalServerError in WaitForAsyncOperationResult: '%s', setting error retriable", err.Error()) + retriableErr.Retriable = true + } + + responseLock.Lock() + responses[resourceID] = &PutResourcesResponse{ + Error: retriableErr, + } + responseLock.Unlock() + return + } + + responseLock.Lock() + responses[resourceID] = &PutResourcesResponse{ + Response: response, + } + responseLock.Unlock() + }(resourceID, future) + } + + wg.Wait() + return responses +} + // PutResourceWithDecorators puts a resource by resource ID func (c *Client) PutResourceWithDecorators(ctx context.Context, resourceID string, parameters interface{}, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error) { request, err := c.PreparePutRequest(ctx, decorators...) diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/interface.go b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/interface.go index f2af8922c44..01195dee6b0 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/interface.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/interface.go @@ -27,6 +27,12 @@ import ( "k8s.io/legacy-cloud-providers/azure/retry" ) +// PutResourcesResponse defines the response for PutResources. +type PutResourcesResponse struct { + Response *http.Response + Error *retry.Error +} + // Interface is the client interface for ARM. // Don't forget to run the following command to generate the mock client: // mockgen -source=$GOPATH/src/k8s.io/kubernetes/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/interface.go -package=mockarmclient Interface > $GOPATH/src/k8s.io/kubernetes/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/mockarmclient/interface.go @@ -61,6 +67,11 @@ type Interface interface { // PutResource puts a resource by resource ID PutResource(ctx context.Context, resourceID string, parameters interface{}) (*http.Response, *retry.Error) + // PutResources puts a list of resources from resources map[resourceID]parameters. + // Those resources sync requests are sequential while async requests are concurent. It 's especially + // useful when the ARM API doesn't support concurrent requests. + PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse + // PutResourceWithDecorators puts a resource with decorators by resource ID PutResourceWithDecorators(ctx context.Context, resourceID string, parameters interface{}, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error) diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/mockarmclient/interface.go b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/mockarmclient/interface.go index 8f2e7daac6d..27d8a32877d 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/mockarmclient/interface.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/mockarmclient/interface.go @@ -26,6 +26,7 @@ import ( autorest "github.com/Azure/go-autorest/autorest" azure "github.com/Azure/go-autorest/autorest/azure" gomock "github.com/golang/mock/gomock" + armclient "k8s.io/legacy-cloud-providers/azure/clients/armclient" retry "k8s.io/legacy-cloud-providers/azure/retry" ) @@ -227,6 +228,20 @@ func (mr *MockInterfaceMockRecorder) PutResource(ctx, resourceID, parameters int return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutResource", reflect.TypeOf((*MockInterface)(nil).PutResource), ctx, resourceID, parameters) } +// PutResources mocks base method +func (m *MockInterface) PutResources(ctx context.Context, resources map[string]interface{}) map[string]*armclient.PutResourcesResponse { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutResources", ctx, resources) + ret0, _ := ret[0].(map[string]*armclient.PutResourcesResponse) + return ret0 +} + +// PutResources indicates an expected call of PutResources +func (mr *MockInterfaceMockRecorder) PutResources(ctx, resources interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutResources", reflect.TypeOf((*MockInterface)(nil).PutResources), ctx, resources) +} + // PutResourceWithDecorators mocks base method func (m *MockInterface) PutResourceWithDecorators(ctx context.Context, resourceID string, parameters interface{}, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error) { m.ctrl.T.Helper() diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/azure_vmssvmclient.go b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/azure_vmssvmclient.go index e073b0c6e35..a087493c737 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/azure_vmssvmclient.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/azure_vmssvmclient.go @@ -29,6 +29,7 @@ import ( "github.com/Azure/go-autorest/autorest/azure" "github.com/Azure/go-autorest/autorest/to" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/util/flowcontrol" "k8s.io/klog" azclients "k8s.io/legacy-cloud-providers/azure/clients" @@ -367,3 +368,89 @@ func (page VirtualMachineScaleSetVMListResultPage) Values() []compute.VirtualMac } return *page.vmssvlr.Value } + +// UpdateVMs updates a list of VirtualMachineScaleSetVM from map[instanceID]compute.VirtualMachineScaleSetVM. +func (c *Client) UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error { + mc := metrics.NewMetricContext("vmssvm", "update_vms", resourceGroupName, c.subscriptionID, source) + + // Report errors if the client is rate limited. + if !c.rateLimiterWriter.TryAccept() { + mc.RateLimitedCount() + return retry.GetRateLimitError(true, "VMSSVMUpdateVMs") + } + + // Report errors if the client is throttled. + if c.RetryAfterWriter.After(time.Now()) { + mc.ThrottledCount() + rerr := retry.GetThrottlingError("VMSSVMUpdateVMs", "client throttled", c.RetryAfterWriter) + return rerr + } + + rerr := c.updateVMSSVMs(ctx, resourceGroupName, VMScaleSetName, instances) + mc.Observe(rerr.Error()) + if rerr != nil { + if rerr.IsThrottled() { + // Update RetryAfterReader so that no more requests would be sent until RetryAfter expires. + c.RetryAfterWriter = rerr.RetryAfter + } + + return rerr + } + + return nil +} + +// updateVMSSVMs updates a list of VirtualMachineScaleSetVM from map[instanceID]compute.VirtualMachineScaleSetVM. +func (c *Client) updateVMSSVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM) *retry.Error { + resources := make(map[string]interface{}) + for instanceID, parameter := range instances { + resourceID := armclient.GetChildResourceID( + c.subscriptionID, + resourceGroupName, + "Microsoft.Compute/virtualMachineScaleSets", + VMScaleSetName, + "virtualMachines", + instanceID, + ) + resources[resourceID] = parameter + } + + responses := c.armClient.PutResources(ctx, resources) + errors := make([]*retry.Error, 0) + for resourceID, resp := range responses { + if resp == nil { + continue + } + + defer c.armClient.CloseResponse(ctx, resp.Response) + if resp.Error != nil { + klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "vmssvm.put.request", resourceID, resp.Error.Error()) + errors = append(errors, resp.Error) + continue + } + + if resp.Response != nil && resp.Response.StatusCode != http.StatusNoContent { + _, rerr := c.updateResponder(resp.Response) + if rerr != nil { + klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "vmssvm.put.respond", resourceID, rerr.Error()) + errors = append(errors, rerr) + } + } + } + + // Aggregate errors. + if len(errors) > 0 { + rerr := &retry.Error{} + errs := make([]error, 0) + for _, err := range errors { + if err.IsThrottled() && err.RetryAfter.After(err.RetryAfter) { + rerr.RetryAfter = err.RetryAfter + } + errs = append(errs, err.Error()) + } + rerr.RawError = utilerrors.Flatten(utilerrors.NewAggregate(errs)) + return rerr + } + + return nil +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/interface.go b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/interface.go index e92c0fd3c78..19d84c35f10 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/interface.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/interface.go @@ -42,4 +42,7 @@ type Interface interface { // Update updates a VirtualMachineScaleSetVM. Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) *retry.Error + + // UpdateVMs updates a list of VirtualMachineScaleSetVM from map[instanceID]compute.VirtualMachineScaleSetVM. + UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/mockvmssvmclient/interface.go b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/mockvmssvmclient/interface.go index 8258f462fda..442dc19580b 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/mockvmssvmclient/interface.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/mockvmssvmclient/interface.go @@ -93,3 +93,17 @@ func (mr *MockInterfaceMockRecorder) Update(ctx, resourceGroupName, VMScaleSetNa mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockInterface)(nil).Update), ctx, resourceGroupName, VMScaleSetName, instanceID, parameters, source) } + +// UpdateVMs mocks base method +func (m *MockInterface) UpdateVMs(ctx context.Context, resourceGroupName, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateVMs", ctx, resourceGroupName, VMScaleSetName, instances, source) + ret0, _ := ret[0].(*retry.Error) + return ret0 +} + +// UpdateVMs indicates an expected call of UpdateVMs +func (mr *MockInterfaceMockRecorder) UpdateVMs(ctx, resourceGroupName, VMScaleSetName, instances, source interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateVMs", reflect.TypeOf((*MockInterface)(nil).UpdateVMs), ctx, resourceGroupName, VMScaleSetName, instances, source) +}