From 8ec193ba9cebc4d7e3a4cc36f2a170c781dbf5a4 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Thu, 6 Feb 2020 10:04:38 +0100 Subject: [PATCH] Limit number of instances in single update to GCE target pool --- .../k8s.io/legacy-cloud-providers/gce/gce.go | 5 +- .../gce/gce_loadbalancer_external.go | 24 ++++++++-- .../gce/gce_loadbalancer_external_test.go | 47 +++++++++++++++++++ .../legacy-cloud-providers/gce/gce_util.go | 29 ++++++++++++ 4 files changed, 99 insertions(+), 6 deletions(-) diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce.go index 70f46dc3a40..c4c33e446ba 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce.go @@ -68,8 +68,9 @@ const ( // AffinityTypeClientIP - affinity based on Client IP. gceAffinityTypeClientIP = "CLIENT_IP" - operationPollInterval = time.Second - maxTargetPoolCreateInstances = 200 + operationPollInterval = time.Second + maxTargetPoolCreateInstances = 200 + maxInstancesPerTargetPoolUpdate = 1000 // HTTP Load Balancer parameters // Configure 8 second period for external health checks. diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external.go index 7fb9ce22c7a..331903082de 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external.go @@ -591,16 +591,32 @@ func (g *Cloud) updateTargetPool(loadBalancerName string, hosts []*gceInstance) toRemove = append(toRemove, &compute.InstanceReference{Instance: link}) } - if len(toAdd) > 0 { - if err := g.AddInstancesToTargetPool(loadBalancerName, g.region, toAdd); err != nil { + for len(toAdd) > 0 { + // Do not add more than maxInstancesPerTargetPoolUpdate in a single call. 
+ instancesCount := len(toAdd) + if instancesCount > maxInstancesPerTargetPoolUpdate { + instancesCount = maxInstancesPerTargetPoolUpdate + } + // The operation to add 1000 instances is fairly long (may take minutes), so + // we don't need to worry about saturating QPS limits. + if err := g.AddInstancesToTargetPool(loadBalancerName, g.region, toAdd[:instancesCount]); err != nil { return err } + toAdd = toAdd[instancesCount:] } - if len(toRemove) > 0 { - if err := g.RemoveInstancesFromTargetPool(loadBalancerName, g.region, toRemove); err != nil { + for len(toRemove) > 0 { + // Do not remove more than maxInstancesPerTargetPoolUpdate in a single call. + instancesCount := len(toRemove) + if instancesCount > maxInstancesPerTargetPoolUpdate { + instancesCount = maxInstancesPerTargetPoolUpdate + } + // The operation to remove 1000 instances is fairly long (may take minutes), so + // we don't need to worry about saturating QPS limits. + if err := g.RemoveInstancesFromTargetPool(loadBalancerName, g.region, toRemove[:instancesCount]); err != nil { return err } + toRemove = toRemove[instancesCount:] } // Try to verify that the correct number of nodes are now in the target pool. 
diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external_test.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external_test.go index ef50ca68a63..268d664d134 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external_test.go @@ -568,6 +568,53 @@ func TestForwardingRuleNeedsUpdate(t *testing.T) { } } +func TestTargetPoolAddsAndRemoveInstancesInBatches(t *testing.T) { + t.Parallel() + + vals := DefaultTestClusterValues() + gce, err := fakeGCECloud(DefaultTestClusterValues()) + require.NoError(t, err) + + addInstanceCalls := 0 + addInstanceHook := func(req *compute.TargetPoolsAddInstanceRequest) { + addInstanceCalls++ + } + removeInstanceCalls := 0 + removeInstanceHook := func(req *compute.TargetPoolsRemoveInstanceRequest) { + removeInstanceCalls++ + } + + err = registerTargetPoolAddInstanceHook(gce, addInstanceHook) + assert.NoError(t, err) + err = registerTargetPoolRemoveInstanceHook(gce, removeInstanceHook) + assert.NoError(t, err) + + svc := fakeLoadbalancerService("") + nodeName := "default-node" + _, err = createExternalLoadBalancer(gce, svc, []string{nodeName}, vals.ClusterName, vals.ClusterID, vals.ZoneName) + assert.NoError(t, err) + + // Insert large number of nodes to test batching. + additionalNodeNames := []string{} + for i := 0; i < 2*maxInstancesPerTargetPoolUpdate+2; i++ { + additionalNodeNames = append(additionalNodeNames, fmt.Sprintf("node-%d", i)) + } + allNodes, err := createAndInsertNodes(gce, append([]string{nodeName}, additionalNodeNames...), vals.ZoneName) + assert.NoError(t, err) + err = gce.updateExternalLoadBalancer("", svc, allNodes) + assert.NoError(t, err) + + assert.Equal(t, 3, addInstanceCalls) + + // Remove large number of nodes to test batching. 
+ allNodes, err = createAndInsertNodes(gce, []string{nodeName}, vals.ZoneName) + assert.NoError(t, err) + err = gce.updateExternalLoadBalancer("", svc, allNodes) + assert.NoError(t, err) + + assert.Equal(t, 3, removeInstanceCalls) +} + func TestTargetPoolNeedsRecreation(t *testing.T) { t.Parallel() diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_util.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_util.go index 746b00c54cf..48ad9fe88b0 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_util.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_util.go @@ -19,6 +19,7 @@ limitations under the License. package gce import ( + "context" "errors" "fmt" "net" @@ -81,6 +82,34 @@ func fakeGCECloud(vals TestClusterValues) (*Cloud, error) { return gce, nil } +func registerTargetPoolAddInstanceHook(gce *Cloud, callback func(*compute.TargetPoolsAddInstanceRequest)) error { + mockGCE, ok := gce.c.(*cloud.MockGCE) + if !ok { + return fmt.Errorf("couldn't cast cloud to mockGCE: %#v", gce) + } + existingHandler := mockGCE.MockTargetPools.AddInstanceHook + hook := func(ctx context.Context, key *meta.Key, req *compute.TargetPoolsAddInstanceRequest, m *cloud.MockTargetPools) error { + callback(req) + return existingHandler(ctx, key, req, m) + } + mockGCE.MockTargetPools.AddInstanceHook = hook + return nil +} + +func registerTargetPoolRemoveInstanceHook(gce *Cloud, callback func(*compute.TargetPoolsRemoveInstanceRequest)) error { + mockGCE, ok := gce.c.(*cloud.MockGCE) + if !ok { + return fmt.Errorf("couldn't cast cloud to mockGCE: %#v", gce) + } + existingHandler := mockGCE.MockTargetPools.RemoveInstanceHook + hook := func(ctx context.Context, key *meta.Key, req *compute.TargetPoolsRemoveInstanceRequest, m *cloud.MockTargetPools) error { + callback(req) + return existingHandler(ctx, key, req, m) + } + mockGCE.MockTargetPools.RemoveInstanceHook = hook + return nil +} + type gceInstance struct { Zone string Name string