mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 20:53:33 +00:00
Limit number of instances in single update to GCE target pool
This commit is contained in:
parent
641321c94c
commit
8ec193ba9c
@ -68,8 +68,9 @@ const (
|
|||||||
// AffinityTypeClientIP - affinity based on Client IP.
|
// AffinityTypeClientIP - affinity based on Client IP.
|
||||||
gceAffinityTypeClientIP = "CLIENT_IP"
|
gceAffinityTypeClientIP = "CLIENT_IP"
|
||||||
|
|
||||||
operationPollInterval = time.Second
|
operationPollInterval = time.Second
|
||||||
maxTargetPoolCreateInstances = 200
|
maxTargetPoolCreateInstances = 200
|
||||||
|
maxInstancesPerTargetPoolUpdate = 1000
|
||||||
|
|
||||||
// HTTP Load Balancer parameters
|
// HTTP Load Balancer parameters
|
||||||
// Configure 8 second period for external health checks.
|
// Configure 8 second period for external health checks.
|
||||||
|
@ -591,16 +591,32 @@ func (g *Cloud) updateTargetPool(loadBalancerName string, hosts []*gceInstance)
|
|||||||
toRemove = append(toRemove, &compute.InstanceReference{Instance: link})
|
toRemove = append(toRemove, &compute.InstanceReference{Instance: link})
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(toAdd) > 0 {
|
for len(toAdd) > 0 {
|
||||||
if err := g.AddInstancesToTargetPool(loadBalancerName, g.region, toAdd); err != nil {
|
// Do not remove more than maxInstancesPerTargetPoolUpdate in a single call.
|
||||||
|
instancesCount := len(toAdd)
|
||||||
|
if instancesCount > maxInstancesPerTargetPoolUpdate {
|
||||||
|
instancesCount = maxInstancesPerTargetPoolUpdate
|
||||||
|
}
|
||||||
|
// The operation to add 1000 instances is fairly long (may take minutes), so
|
||||||
|
// we don't need to worry about saturating QPS limits.
|
||||||
|
if err := g.AddInstancesToTargetPool(loadBalancerName, g.region, toAdd[:instancesCount]); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
toAdd = toAdd[instancesCount:]
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(toRemove) > 0 {
|
for len(toRemove) > 0 {
|
||||||
if err := g.RemoveInstancesFromTargetPool(loadBalancerName, g.region, toRemove); err != nil {
|
// Do not remove more than maxInstancesPerTargetPoolUpdate in a single call.
|
||||||
|
instancesCount := len(toRemove)
|
||||||
|
if instancesCount > maxInstancesPerTargetPoolUpdate {
|
||||||
|
instancesCount = maxInstancesPerTargetPoolUpdate
|
||||||
|
}
|
||||||
|
// The operation to remove 1000 instances is fairly long (may take minutes), so
|
||||||
|
// we don't need to worry about saturating QPS limits.
|
||||||
|
if err := g.RemoveInstancesFromTargetPool(loadBalancerName, g.region, toRemove[:instancesCount]); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
toRemove = toRemove[instancesCount:]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to verify that the correct number of nodes are now in the target pool.
|
// Try to verify that the correct number of nodes are now in the target pool.
|
||||||
|
@ -568,6 +568,53 @@ func TestForwardingRuleNeedsUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTargetPoolAddsAndRemoveInstancesInBatches(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
vals := DefaultTestClusterValues()
|
||||||
|
gce, err := fakeGCECloud(DefaultTestClusterValues())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
addInstanceCalls := 0
|
||||||
|
addInstanceHook := func(req *compute.TargetPoolsAddInstanceRequest) {
|
||||||
|
addInstanceCalls++
|
||||||
|
}
|
||||||
|
removeInstanceCalls := 0
|
||||||
|
removeInstanceHook := func(req *compute.TargetPoolsRemoveInstanceRequest) {
|
||||||
|
removeInstanceCalls++
|
||||||
|
}
|
||||||
|
|
||||||
|
err = registerTargetPoolAddInstanceHook(gce, addInstanceHook)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = registerTargetPoolRemoveInstanceHook(gce, removeInstanceHook)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
svc := fakeLoadbalancerService("")
|
||||||
|
nodeName := "default-node"
|
||||||
|
_, err = createExternalLoadBalancer(gce, svc, []string{nodeName}, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Insert large number of nodes to test batching.
|
||||||
|
additionalNodeNames := []string{}
|
||||||
|
for i := 0; i < 2*maxInstancesPerTargetPoolUpdate+2; i++ {
|
||||||
|
additionalNodeNames = append(additionalNodeNames, fmt.Sprintf("node-%d", i))
|
||||||
|
}
|
||||||
|
allNodes, err := createAndInsertNodes(gce, append([]string{nodeName}, additionalNodeNames...), vals.ZoneName)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = gce.updateExternalLoadBalancer("", svc, allNodes)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, 3, addInstanceCalls)
|
||||||
|
|
||||||
|
// Remove large number of nodes to test batching.
|
||||||
|
allNodes, err = createAndInsertNodes(gce, []string{nodeName}, vals.ZoneName)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = gce.updateExternalLoadBalancer("", svc, allNodes)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, 3, removeInstanceCalls)
|
||||||
|
}
|
||||||
|
|
||||||
func TestTargetPoolNeedsRecreation(t *testing.T) {
|
func TestTargetPoolNeedsRecreation(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
@ -19,6 +19,7 @@ limitations under the License.
|
|||||||
package gce
|
package gce
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
@ -81,6 +82,34 @@ func fakeGCECloud(vals TestClusterValues) (*Cloud, error) {
|
|||||||
return gce, nil
|
return gce, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func registerTargetPoolAddInstanceHook(gce *Cloud, callback func(*compute.TargetPoolsAddInstanceRequest)) error {
|
||||||
|
mockGCE, ok := gce.c.(*cloud.MockGCE)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("couldn't cast cloud to mockGCE: %#v", gce)
|
||||||
|
}
|
||||||
|
existingHandler := mockGCE.MockTargetPools.AddInstanceHook
|
||||||
|
hook := func(ctx context.Context, key *meta.Key, req *compute.TargetPoolsAddInstanceRequest, m *cloud.MockTargetPools) error {
|
||||||
|
callback(req)
|
||||||
|
return existingHandler(ctx, key, req, m)
|
||||||
|
}
|
||||||
|
mockGCE.MockTargetPools.AddInstanceHook = hook
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func registerTargetPoolRemoveInstanceHook(gce *Cloud, callback func(*compute.TargetPoolsRemoveInstanceRequest)) error {
|
||||||
|
mockGCE, ok := gce.c.(*cloud.MockGCE)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("couldn't cast cloud to mockGCE: %#v", gce)
|
||||||
|
}
|
||||||
|
existingHandler := mockGCE.MockTargetPools.RemoveInstanceHook
|
||||||
|
hook := func(ctx context.Context, key *meta.Key, req *compute.TargetPoolsRemoveInstanceRequest, m *cloud.MockTargetPools) error {
|
||||||
|
callback(req)
|
||||||
|
return existingHandler(ctx, key, req, m)
|
||||||
|
}
|
||||||
|
mockGCE.MockTargetPools.RemoveInstanceHook = hook
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type gceInstance struct {
|
type gceInstance struct {
|
||||||
Zone string
|
Zone string
|
||||||
Name string
|
Name string
|
||||||
|
Loading…
Reference in New Issue
Block a user