From ebc65f1dd3d0cc5f506f0ae7d0b88973fd087cb1 Mon Sep 17 00:00:00 2001
From: Zach Loafman
Date: Mon, 13 Jun 2016 09:49:10 -0700
Subject: [PATCH 1/2] Revert "GCE: Allow nodes to exceed target pool maximums"

This reverts commit faf0c44429cfb285cdce811be61b5b39ac37e233.
---
 pkg/cloudprovider/providers/gce/gce.go      |  96 +++++------
 pkg/cloudprovider/providers/gce/gce_test.go | 112 --------------------
 pkg/util/rand/rand.go                       |  16 ---
 pkg/util/rand/rand_test.go                  |  13 ---
 4 files changed, 26 insertions(+), 211 deletions(-)

diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go
index d2cf415b54e..0ffcce45279 100644
--- a/pkg/cloudprovider/providers/gce/gce.go
+++ b/pkg/cloudprovider/providers/gce/gce.go
@@ -38,7 +38,6 @@ import (
 	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	netsets "k8s.io/kubernetes/pkg/util/net/sets"
-	"k8s.io/kubernetes/pkg/util/rand"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 
@@ -70,9 +69,6 @@ const (
 	// are iterated through to prevent infinite loops if the API
 	// were to continuously return a nextPageToken.
 	maxPages = 25
-
-	// TargetPools can only support 1000 VMs.
-	maxInstancesPerTargetPool = 1000
 )
 
 // GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
@@ -127,11 +123,6 @@ type Disks interface {
 	GetAutoLabelsForPD(name string) (map[string]string, error)
 }
 
-type instRefSlice []*compute.InstanceReference
-
-func (p instRefSlice) Len() int      { return len(p) }
-func (p instRefSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
-
 func init() {
 	cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) { return newGCECloud(config) })
 }
@@ -907,22 +898,11 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
 	return nil
 }
 
-func restrictTargetPool(instances []string, max int) []string {
-	if len(instances) <= max {
-		return instances
-	}
-	rand.Shuffle(sort.StringSlice(instances))
-	return instances[:max]
-}
-
 func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType api.ServiceAffinity) error {
 	var instances []string
 	for _, host := range hosts {
 		instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
 	}
-	// Choose a random subset of nodes to send traffic to, if we
-	// exceed API maximums.
-	instances = restrictTargetPool(instances, maxInstancesPerTargetPool)
 	pool := &compute.TargetPool{
 		Name:            name,
 		Description:     fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
@@ -1158,42 +1138,6 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
 	return address.Address, existed, nil
 }
 
-// computeUpdate takes the existing TargetPool and the set of running
-// instances and returns (toAdd, toRemove), the set of instances to
-// reprogram on the TargetPool this reconcile. max restricts the
-// number of nodes allowed to be programmed on the TargetPool.
-func computeUpdate(tp *compute.TargetPool, instances []*gceInstance, max int) ([]*compute.InstanceReference, []*compute.InstanceReference) {
-	existing := sets.NewString()
-	for _, instance := range tp.Instances {
-		existing.Insert(hostURLToComparablePath(instance))
-	}
-
-	var toAdd []*compute.InstanceReference
-	var toRemove []*compute.InstanceReference
-	for _, host := range instances {
-		link := host.makeComparableHostPath()
-		if !existing.Has(link) {
-			toAdd = append(toAdd, &compute.InstanceReference{Instance: link})
-		}
-		existing.Delete(link)
-	}
-	for link := range existing {
-		toRemove = append(toRemove, &compute.InstanceReference{Instance: link})
-	}
-
-	if len(tp.Instances)+len(toAdd)-len(toRemove) > max {
-		// TODO(zmerlynn): In theory, there are faster ways to handle
-		// this if room is much smaller than len(toAdd). In practice,
-		// meh.
-		room := max - len(tp.Instances) + len(toRemove)
-		glog.Infof("TargetPool maximums exceeded, shuffling in %d instances", room)
-		rand.Shuffle(instRefSlice(toAdd))
-		toAdd = toAdd[:room]
-	}
-
-	return toAdd, toRemove
-}
-
 // UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
 func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string) error {
 	hosts, err := gce.getInstancesByNames(hostNames)
@@ -1206,11 +1150,27 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
 	if err != nil {
 		return err
 	}
+	existing := sets.NewString()
+	for _, instance := range pool.Instances {
+		existing.Insert(hostURLToComparablePath(instance))
+	}
 
-	toAdd, toRemove := computeUpdate(pool, hosts, maxInstancesPerTargetPool)
-	if len(toRemove) > 0 {
-		rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
-		op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
+	var toAdd []*compute.InstanceReference
+	var toRemove []*compute.InstanceReference
+	for _, host := range hosts {
+		link := host.makeComparableHostPath()
+		if !existing.Has(link) {
+			toAdd = append(toAdd, &compute.InstanceReference{Instance: link})
+		}
+		existing.Delete(link)
+	}
+	for link := range existing {
+		toRemove = append(toRemove, &compute.InstanceReference{Instance: link})
+	}
+
+	if len(toAdd) > 0 {
+		add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
+		op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
 		if err != nil {
 			return err
 		}
@@ -1219,9 +1179,9 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
 		}
 	}
 
-	if len(toAdd) > 0 {
-		add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
-		op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
+	if len(toRemove) > 0 {
+		rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
+		op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
 		if err != nil {
 			return err
 		}
@@ -1237,14 +1197,10 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
 	if err != nil {
 		return err
 	}
-	wantInstances := len(hosts)
-	if wantInstances > maxInstancesPerTargetPool {
-		wantInstances = maxInstancesPerTargetPool
-	}
-	if len(updatedPool.Instances) != wantInstances {
+	if len(updatedPool.Instances) != len(hosts) {
 		glog.Errorf("Unexpected number of instances (%d) in target pool %s after updating (expected %d). Instances in updated pool: %s",
-			len(updatedPool.Instances), loadBalancerName, wantInstances, strings.Join(updatedPool.Instances, ","))
-		return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, wantInstances)
+			len(updatedPool.Instances), loadBalancerName, len(hosts), strings.Join(updatedPool.Instances, ","))
+		return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, len(hosts))
 	}
 	return nil
 }
diff --git a/pkg/cloudprovider/providers/gce/gce_test.go b/pkg/cloudprovider/providers/gce/gce_test.go
index ad20044cc11..f7ca0f881fd 100644
--- a/pkg/cloudprovider/providers/gce/gce_test.go
+++ b/pkg/cloudprovider/providers/gce/gce_test.go
@@ -19,9 +19,6 @@ package gce
 import (
 	"reflect"
 	"testing"
-
-	compute "google.golang.org/api/compute/v1"
-	"k8s.io/kubernetes/pkg/util/rand"
 )
 
 func TestGetRegion(t *testing.T) {
@@ -161,112 +158,3 @@ func TestCreateFirewallFails(t *testing.T) {
 		t.Errorf("error expected when creating firewall without any tags found")
 	}
 }
-
-func TestRestrictTargetPool(t *testing.T) {
-	const maxInstances = 5
-	tests := []struct {
-		instances []string
-		want      []string
-	}{
-		{
-			instances: []string{"1", "2", "3", "4", "5"},
-			want:      []string{"1", "2", "3", "4", "5"},
-		},
-		{
-			instances: []string{"1", "2", "3", "4", "5", "6"},
-			want:      []string{"4", "3", "5", "2", "6"},
-		},
-	}
-	for _, tc := range tests {
-		rand.Seed(5)
-		got := restrictTargetPool(append([]string{}, tc.instances...), maxInstances)
-		if !reflect.DeepEqual(got, tc.want) {
-			t.Errorf("restrictTargetPool(%v) => %v, want %v", tc.instances, got, tc.want)
-		}
-	}
-}
-
-func TestComputeUpdate(t *testing.T) {
-	const maxInstances = 5
-	const fakeZone = "us-moon1-f"
-	tests := []struct {
-		tp           []string
-		instances    []string
-		wantToAdd    []string
-		wantToRemove []string
-	}{
-		{
-			// Test adding all instances.
-			tp:           []string{},
-			instances:    []string{"0", "1", "2"},
-			wantToAdd:    []string{"0", "1", "2"},
-			wantToRemove: []string{},
-		},
-		{
-			// Test node 1 coming back healthy.
-			tp:           []string{"0", "2"},
-			instances:    []string{"0", "1", "2"},
-			wantToAdd:    []string{"1"},
-			wantToRemove: []string{},
-		},
-		{
-			// Test node 1 going healthy while node 4 needs to be removed.
-			tp:           []string{"0", "2", "4"},
-			instances:    []string{"0", "1", "2"},
-			wantToAdd:    []string{"1"},
-			wantToRemove: []string{"4"},
-		},
-		{
-			// Test exceeding the TargetPool max of 5 (for the test),
-			// which shuffles in 7, 5, 8 based on the deterministic
-			// seed below.
-			tp:           []string{"0", "2", "4", "6"},
-			instances:    []string{"0", "1", "2", "3", "5", "7", "8"},
-			wantToAdd:    []string{"7", "5", "8"},
-			wantToRemove: []string{"4", "6"},
-		},
-		{
-			// Test all nodes getting removed.
-			tp:           []string{"0", "1", "2", "3"},
-			instances:    []string{},
-			wantToAdd:    []string{},
-			wantToRemove: []string{"0", "1", "2", "3"},
-		},
-	}
-	for _, tc := range tests {
-		rand.Seed(5) // Arbitrary RNG seed for deterministic testing.
-
-		// Dummy up the gceInstance slice.
-		var instances []*gceInstance
-		for _, inst := range tc.instances {
-			instances = append(instances, &gceInstance{Name: inst, Zone: fakeZone})
-		}
-		// Dummy up the TargetPool URL list.
-		var urls []string
-		for _, inst := range tc.tp {
-			inst := &gceInstance{Name: inst, Zone: fakeZone}
-			urls = append(urls, inst.makeComparableHostPath())
-		}
-		gotAddInsts, gotRem := computeUpdate(&compute.TargetPool{Instances: urls}, instances, maxInstances)
-		var wantAdd []string
-		for _, inst := range tc.wantToAdd {
-			inst := &gceInstance{Name: inst, Zone: fakeZone}
-			wantAdd = append(wantAdd, inst.makeComparableHostPath())
-		}
-		var gotAdd []string
-		for _, inst := range gotAddInsts {
-			gotAdd = append(gotAdd, inst.Instance)
-		}
-		if !reflect.DeepEqual(wantAdd, gotAdd) {
-			t.Errorf("computeTargetPool(%v, %v) => added %v, wanted %v", tc.tp, tc.instances, gotAdd, wantAdd)
-		}
-		_ = gotRem
-		// var gotRem []string
-		// for _, inst := range gotRemInsts {
-		// 	gotRem = append(gotRem, inst.Instance)
-		// }
-		// if !reflect.DeepEqual(tc.wantToRemove, gotRem) {
-		// 	t.Errorf("computeTargetPool(%v, %v) => removed %v, wanted %v", tc.tp, tc.instances, gotRem, tc.wantToRemove)
-		// }
-	}
-}
diff --git a/pkg/util/rand/rand.go b/pkg/util/rand/rand.go
index 1f646fbd90a..f2647af337a 100644
--- a/pkg/util/rand/rand.go
+++ b/pkg/util/rand/rand.go
@@ -81,19 +81,3 @@ func String(length int) string {
 	}
 	return string(b)
 }
-
-// A type that satisfies the rand.Shufflable interface can be shuffled
-// by Shuffle. Any sort.Interface will satisfy this interface.
-type Shufflable interface {
-	Len() int
-	Swap(i, j int)
-}
-
-func Shuffle(data Shufflable) {
-	rng.Lock()
-	defer rng.Unlock()
-	for i := 0; i < data.Len(); i++ {
-		j := rng.rand.Intn(i + 1)
-		data.Swap(i, j)
-	}
-}
diff --git a/pkg/util/rand/rand_test.go b/pkg/util/rand/rand_test.go
index 23fa5a78b96..12cdb008e2a 100644
--- a/pkg/util/rand/rand_test.go
+++ b/pkg/util/rand/rand_test.go
@@ -18,8 +18,6 @@ package rand
 import (
 	"math/rand"
-	"reflect"
-	"sort"
 	"strings"
 	"testing"
 )
@@ -78,17 +76,6 @@ func TestPerm(t *testing.T) {
 	}
 }
 
-func TestShuffle(t *testing.T) {
-	Seed(5) // Arbitrary RNG seed for deterministic testing.
-	have := []int{0, 1, 2, 3, 4}
-	want := []int{3, 2, 4, 1, 0} // "have" shuffled, with RNG at Seed(5).
-	got := append([]int{}, have...)
-	Shuffle(sort.IntSlice(got))
-	if !reflect.DeepEqual(got, want) {
-		t.Errorf("Shuffle(%v) => %v, want %v", have, got, want)
-	}
-}
-
 func TestIntnRange(t *testing.T) {
 	// 0 is invalid.
 	for min, max := range map[int]int{1: 2, 10: 123, 100: 500} {

From f63ac1962cd6515f16fab4bfddbff09c3f5291a2 Mon Sep 17 00:00:00 2001
From: Zach Loafman
Date: Tue, 21 Jun 2016 18:09:31 -0700
Subject: [PATCH 2/2] GCE provider: Create TargetPool with 200 instances, then
 update with rest

Tested with 2000 nodes, this actually meets the GCE API specifications
(which is nutty). Previous PR (#25178) was based on a mistaken
understanding of a poorly documented set of limitations, and even
poorer testing, for which I am embarrassed.
---
 pkg/cloudprovider/providers/gce/gce.go | 30 +++++++++++++++++++++++---
 1 file changed, 27 insertions(+), 3 deletions(-)

diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go
index 0ffcce45279..92d09ce13be 100644
--- a/pkg/cloudprovider/providers/gce/gce.go
+++ b/pkg/cloudprovider/providers/gce/gce.go
@@ -69,6 +69,9 @@ const (
 	// are iterated through to prevent infinite loops if the API
 	// were to continuously return a nextPageToken.
 	maxPages = 25
+
+	// Target Pool creation is limited to 200 instances.
+	maxTargetPoolCreateInstances = 200
 )
 
 // GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
@@ -537,7 +540,7 @@ func (gce *GCECloud) EnsureLoadBalancer(apiService *api.Service, hostNames []str affinityType := apiService.Spec.SessionAffinity serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name} - glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", loadBalancerName, gce.region, loadBalancerIP, portStr, hosts, serviceName, apiService.Annotations) + glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", loadBalancerName, gce.region, loadBalancerIP, portStr, hostNames, serviceName, apiService.Annotations) // Check if the forwarding rule exists, and if so, what its IP is. fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(loadBalancerName, gce.region, loadBalancerIP, ports) @@ -706,10 +709,27 @@ func (gce *GCECloud) EnsureLoadBalancer(apiService *api.Service, hostNames []str // Once we've deleted the resources (if necessary), build them back up (or for // the first time if they're new). if tpNeedsUpdate { - if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, hosts, affinityType); err != nil { + createInstances := hosts + if len(hosts) > maxTargetPoolCreateInstances { + createInstances = createInstances[:maxTargetPoolCreateInstances] + } + + if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, createInstances, affinityType); err != nil { return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err) } - glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName) + if len(hosts) <= maxTargetPoolCreateInstances { + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName) + } else { + glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created initial target pool (now updating with %d hosts)", loadBalancerName, serviceName, len(hosts)-maxTargetPoolCreateInstances) + + created := sets.NewString() + for _, host := range createInstances { + created.Insert(host.makeComparableHostPath()) + } + if err := gce.updateTargetPool(loadBalancerName, created, hosts); err != nil { + return nil, fmt.Errorf("failed to update target pool %s: %v", loadBalancerName, err) + } + } } if tpNeedsUpdate || fwdRuleNeedsUpdate { if err := gce.createForwardingRule(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports); err != nil { @@ -1155,6 +1175,10 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string existing.Insert(hostURLToComparablePath(instance)) } + return gce.updateTargetPool(loadBalancerName, existing, hosts) +} + +func (gce *GCECloud) updateTargetPool(loadBalancerName string, existing sets.String, hosts []*gceInstance) error { var toAdd []*compute.InstanceReference var toRemove []*compute.InstanceReference for _, host := range hosts {
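-- 
A note on the flow PATCH 2/2 introduces, since the limit is easy to
misread: Target Pool creation accepts at most 200 instances, so
EnsureLoadBalancer now creates the pool with the first 200 hosts and
programs the remainder through the same updateTargetPool path that
UpdateLoadBalancer uses. The standalone Go sketch below illustrates that
cap-then-reconcile shape under stated assumptions: the pool operations
are print stubs rather than the real gce.service.TargetPools calls, and
the remainder is computed by slicing for brevity, where the patch itself
diffs against the set of already-created hosts.

package main

import "fmt"

// maxTargetPoolCreateInstances mirrors the constant added in PATCH 2/2:
// Target Pool creation is limited to 200 instances.
const maxTargetPoolCreateInstances = 200

// createTargetPool is a stand-in for the provider's creation call, not
// the real gce.service.TargetPools.Insert request.
func createTargetPool(name string, instances []string) {
	fmt.Printf("created pool %s with %d instances\n", name, len(instances))
}

// addInstances is a stand-in for the add half of the reconcile path
// (TargetPools.AddInstance in the real code).
func addInstances(name string, instances []string) {
	fmt.Printf("added %d instances to pool %s\n", len(instances), name)
}

// ensureTargetPool creates the pool with at most 200 hosts, then adds
// whatever remains, mirroring the shape of the EnsureLoadBalancer change.
func ensureTargetPool(name string, hosts []string) {
	createInstances := hosts
	if len(hosts) > maxTargetPoolCreateInstances {
		createInstances = createInstances[:maxTargetPoolCreateInstances]
	}
	createTargetPool(name, createInstances)

	// Anything past the creation cap goes through the update path.
	if rest := hosts[len(createInstances):]; len(rest) > 0 {
		addInstances(name, rest)
	}
}

func main() {
	hosts := make([]string, 2000) // the scale the patch was tested at
	for i := range hosts {
		hosts[i] = fmt.Sprintf("node-%d", i)
	}
	ensureTargetPool("k8s-lb", hosts)
}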