From fdefdff2225e4590957ef9ec6cdf269826e381d8 Mon Sep 17 00:00:00 2001 From: Anish Ramasekar Date: Wed, 12 Feb 2020 11:53:22 -0800 Subject: [PATCH] add delays between goroutines for vm instance update --- .../azure/azure_utils.go | 21 ++++++ .../azure/azure_utils_test.go | 65 +++++++++++++++++++ .../azure/azure_vmss.go | 12 +++- 3 files changed, 96 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils.go index 78ca666bddd..13657de5c2a 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils.go @@ -21,6 +21,9 @@ package azure import ( "context" "sync" + "time" + + utilerrors "k8s.io/apimachinery/pkg/util/errors" ) // lockMap used to lock on entries @@ -74,3 +77,21 @@ func (lm *lockMap) unlockEntry(entry string) { func getContextWithCancel() (context.Context, context.CancelFunc) { return context.WithCancel(context.Background()) } + +// aggregateGoroutinesWithDelay aggregates goroutines and runs them +// in parallel with delay before starting each goroutine +func aggregateGoroutinesWithDelay(delay time.Duration, funcs ...func() error) utilerrors.Aggregate { + errChan := make(chan error, len(funcs)) + + for _, f := range funcs { + go func(f func() error) { errChan <- f() }(f) + time.Sleep(delay) + } + errs := make([]error, 0) + for i := 0; i < cap(errChan); i++ { + if err := <-errChan; err != nil { + errs = append(errs, err) + } + } + return utilerrors.NewAggregate(errs) +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils_test.go index cb527f37dd2..fedca6bd3e5 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils_test.go @@ -19,6 +19,7 @@ limitations under the License. package azure import ( + "fmt" "testing" "time" ) @@ -83,3 +84,67 @@ func ensureNoCallback(t *testing.T, callbackChan <-chan interface{}) bool { return true } } + +// running same unit tests as https://github.com/kubernetes/apimachinery/blob/master/pkg/util/errors/errors_test.go#L371 +func TestAggregateGoroutinesWithDelay(t *testing.T) { + testCases := []struct { + errs []error + expected map[string]bool + }{ + { + []error{}, + nil, + }, + { + []error{nil}, + nil, + }, + { + []error{nil, nil}, + nil, + }, + { + []error{fmt.Errorf("1")}, + map[string]bool{"1": true}, + }, + { + []error{fmt.Errorf("1"), nil}, + map[string]bool{"1": true}, + }, + { + []error{fmt.Errorf("1"), fmt.Errorf("267")}, + map[string]bool{"1": true, "267": true}, + }, + { + []error{fmt.Errorf("1"), nil, fmt.Errorf("1234")}, + map[string]bool{"1": true, "1234": true}, + }, + { + []error{nil, fmt.Errorf("1"), nil, fmt.Errorf("1234"), fmt.Errorf("22")}, + map[string]bool{"1": true, "1234": true, "22": true}, + }, + } + for i, testCase := range testCases { + funcs := make([]func() error, len(testCase.errs)) + for i := range testCase.errs { + err := testCase.errs[i] + funcs[i] = func() error { return err } + } + agg := aggregateGoroutinesWithDelay(100*time.Millisecond, funcs...) + if agg == nil { + if len(testCase.expected) > 0 { + t.Errorf("%d: expected %v, got nil", i, testCase.expected) + } + continue + } + if len(agg.Errors()) != len(testCase.expected) { + t.Errorf("%d: expected %d errors in aggregate, got %v", i, len(testCase.expected), agg) + continue + } + for _, err := range agg.Errors() { + if !testCase.expected[err.Error()] { + t.Errorf("%d: expected %v, got aggregate containing %v", i, testCase.expected, err) + } + } + } +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go index 4a874be758e..09623870e93 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute" "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network" @@ -54,6 +55,13 @@ var ( vmssVMProviderIDRE = regexp.MustCompile(`azure:///subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines/(?:\d+)`) ) +const ( + // vmssVMInstanceUpdateDelay is used when updating multiple vm instances in parallel + // the optimum value is 3s to prevent any conflicts that result in concurrent vmss vm + // instances update + vmssVMInstanceUpdateDelay = 3 * time.Second +) + // scaleSet implements VMSet interface for Azure scale set. type scaleSet struct { *Cloud @@ -1081,7 +1089,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac hostUpdates = append(hostUpdates, f) } - errs := utilerrors.AggregateGoroutines(hostUpdates...) + errs := aggregateGoroutinesWithDelay(vmssVMInstanceUpdateDelay, hostUpdates...) if errs != nil { return utilerrors.Flatten(errs) } @@ -1354,7 +1362,7 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, hostUpdates = append(hostUpdates, f) } - errs := utilerrors.AggregateGoroutines(hostUpdates...) + errs := aggregateGoroutinesWithDelay(vmssVMInstanceUpdateDelay, hostUpdates...) if errs != nil { return utilerrors.Flatten(errs) }