mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #88094 from aramase/vm-instance-update
add delays between goroutines for vm instance update
This commit is contained in:
commit
d3a10e132b
@ -21,6 +21,9 @@ package azure
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
)
|
||||
|
||||
// lockMap used to lock on entries
|
||||
@ -74,3 +77,21 @@ func (lm *lockMap) unlockEntry(entry string) {
|
||||
func getContextWithCancel() (context.Context, context.CancelFunc) {
|
||||
return context.WithCancel(context.Background())
|
||||
}
|
||||
|
||||
// aggregateGoroutinesWithDelay aggregates goroutines and runs them
|
||||
// in parallel with delay before starting each goroutine
|
||||
func aggregateGoroutinesWithDelay(delay time.Duration, funcs ...func() error) utilerrors.Aggregate {
|
||||
errChan := make(chan error, len(funcs))
|
||||
|
||||
for _, f := range funcs {
|
||||
go func(f func() error) { errChan <- f() }(f)
|
||||
time.Sleep(delay)
|
||||
}
|
||||
errs := make([]error, 0)
|
||||
for i := 0; i < cap(errChan); i++ {
|
||||
if err := <-errChan; err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ limitations under the License.
|
||||
package azure
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@ -83,3 +84,67 @@ func ensureNoCallback(t *testing.T, callbackChan <-chan interface{}) bool {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// running same unit tests as https://github.com/kubernetes/apimachinery/blob/master/pkg/util/errors/errors_test.go#L371
|
||||
func TestAggregateGoroutinesWithDelay(t *testing.T) {
|
||||
testCases := []struct {
|
||||
errs []error
|
||||
expected map[string]bool
|
||||
}{
|
||||
{
|
||||
[]error{},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
[]error{nil},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
[]error{nil, nil},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
[]error{fmt.Errorf("1")},
|
||||
map[string]bool{"1": true},
|
||||
},
|
||||
{
|
||||
[]error{fmt.Errorf("1"), nil},
|
||||
map[string]bool{"1": true},
|
||||
},
|
||||
{
|
||||
[]error{fmt.Errorf("1"), fmt.Errorf("267")},
|
||||
map[string]bool{"1": true, "267": true},
|
||||
},
|
||||
{
|
||||
[]error{fmt.Errorf("1"), nil, fmt.Errorf("1234")},
|
||||
map[string]bool{"1": true, "1234": true},
|
||||
},
|
||||
{
|
||||
[]error{nil, fmt.Errorf("1"), nil, fmt.Errorf("1234"), fmt.Errorf("22")},
|
||||
map[string]bool{"1": true, "1234": true, "22": true},
|
||||
},
|
||||
}
|
||||
for i, testCase := range testCases {
|
||||
funcs := make([]func() error, len(testCase.errs))
|
||||
for i := range testCase.errs {
|
||||
err := testCase.errs[i]
|
||||
funcs[i] = func() error { return err }
|
||||
}
|
||||
agg := aggregateGoroutinesWithDelay(100*time.Millisecond, funcs...)
|
||||
if agg == nil {
|
||||
if len(testCase.expected) > 0 {
|
||||
t.Errorf("%d: expected %v, got nil", i, testCase.expected)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if len(agg.Errors()) != len(testCase.expected) {
|
||||
t.Errorf("%d: expected %d errors in aggregate, got %v", i, len(testCase.expected), agg)
|
||||
continue
|
||||
}
|
||||
for _, err := range agg.Errors() {
|
||||
if !testCase.expected[err.Error()] {
|
||||
t.Errorf("%d: expected %v, got aggregate containing %v", i, testCase.expected, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
|
||||
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
|
||||
@ -55,6 +56,13 @@ var (
|
||||
vmssVMProviderIDRE = regexp.MustCompile(`azure:///subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines/(?:\d+)`)
|
||||
)
|
||||
|
||||
const (
|
||||
// vmssVMInstanceUpdateDelay is used when updating multiple vm instances in parallel
|
||||
// the optimum value is 3s to prevent any conflicts that result in concurrent vmss vm
|
||||
// instances update
|
||||
vmssVMInstanceUpdateDelay = 3 * time.Second
|
||||
)
|
||||
|
||||
// scaleSet implements VMSet interface for Azure scale set.
|
||||
type scaleSet struct {
|
||||
*Cloud
|
||||
@ -1082,7 +1090,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac
|
||||
hostUpdates = append(hostUpdates, f)
|
||||
}
|
||||
|
||||
errs := utilerrors.AggregateGoroutines(hostUpdates...)
|
||||
errs := aggregateGoroutinesWithDelay(vmssVMInstanceUpdateDelay, hostUpdates...)
|
||||
if errs != nil {
|
||||
return utilerrors.Flatten(errs)
|
||||
}
|
||||
@ -1355,7 +1363,7 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID,
|
||||
hostUpdates = append(hostUpdates, f)
|
||||
}
|
||||
|
||||
errs := utilerrors.AggregateGoroutines(hostUpdates...)
|
||||
errs := aggregateGoroutinesWithDelay(vmssVMInstanceUpdateDelay, hostUpdates...)
|
||||
if errs != nil {
|
||||
return utilerrors.Flatten(errs)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user