diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index 4875c65e385..20505d7098e 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -36,6 +36,7 @@ import ( utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/flowcontrol" netsets "k8s.io/kubernetes/pkg/util/net/sets" + "k8s.io/kubernetes/pkg/util/rand" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" @@ -68,6 +69,9 @@ const ( // are iterated through to prevent infinite loops if the API // were to continuously return a nextPageToken. maxPages = 25 + + // TargetPools can only support 1000 VMs. + maxInstancesPerTargetPool = 1000 ) // GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine. @@ -95,6 +99,11 @@ type Config struct { } } +type instRefSlice []*compute.InstanceReference + +func (p instRefSlice) Len() int { return len(p) } +func (p instRefSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + func init() { cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) { return newGCECloud(config) }) } @@ -853,11 +862,22 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s return nil } +func restrictTargetPool(instances []string, max int) []string { + if len(instances) <= max { + return instances + } + rand.Shuffle(sort.StringSlice(instances)) + return instances[:max] +} + func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType api.ServiceAffinity) error { var instances []string for _, host := range hosts { instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name)) } + // Choose a random subset of nodes to send traffic to, if we + // exceed API maximums. + instances = restrictTargetPool(instances, maxInstancesPerTargetPool) pool := &compute.TargetPool{ Name: name, Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName), @@ -1075,6 +1095,42 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string return address.Address, existed, nil } +// computeUpdate takes the existing TargetPool and the set of running +// instances and returns (toAdd, toRemove), the set of instances to +// reprogram on the TargetPool this reconcile. max restricts the +// number of nodes allowed to be programmed on the TargetPool. +func computeUpdate(tp *compute.TargetPool, instances []*gceInstance, max int) ([]*compute.InstanceReference, []*compute.InstanceReference) { + existing := sets.NewString() + for _, instance := range tp.Instances { + existing.Insert(hostURLToComparablePath(instance)) + } + + var toAdd []*compute.InstanceReference + var toRemove []*compute.InstanceReference + for _, host := range instances { + link := host.makeComparableHostPath() + if !existing.Has(link) { + toAdd = append(toAdd, &compute.InstanceReference{Instance: link}) + } + existing.Delete(link) + } + for link := range existing { + toRemove = append(toRemove, &compute.InstanceReference{Instance: link}) + } + + if len(tp.Instances)+len(toAdd)-len(toRemove) > max { + // TODO(zmerlynn): In theory, there are faster ways to handle + // this if room is much smaller than len(toAdd). In practice, + // meh. + room := max - len(tp.Instances) + len(toRemove) + glog.Infof("TargetPool maximums exceeded, shuffling in %d instances", room) + rand.Shuffle(instRefSlice(toAdd)) + toAdd = toAdd[:room] + } + + return toAdd, toRemove +} + // UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer. func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string) error { hosts, err := gce.getInstancesByNames(hostNames) @@ -1087,27 +1143,11 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string if err != nil { return err } - existing := sets.NewString() - for _, instance := range pool.Instances { - existing.Insert(hostURLToComparablePath(instance)) - } - var toAdd []*compute.InstanceReference - var toRemove []*compute.InstanceReference - for _, host := range hosts { - link := host.makeComparableHostPath() - if !existing.Has(link) { - toAdd = append(toAdd, &compute.InstanceReference{Instance: link}) - } - existing.Delete(link) - } - for link := range existing { - toRemove = append(toRemove, &compute.InstanceReference{Instance: link}) - } - - if len(toAdd) > 0 { - add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd} - op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do() + toAdd, toRemove := computeUpdate(pool, hosts, maxInstancesPerTargetPool) + if len(toRemove) > 0 { + rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove} + op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do() if err != nil { return err } @@ -1116,9 +1156,9 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string } } - if len(toRemove) > 0 { - rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove} - op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do() + if len(toAdd) > 0 { + add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd} + op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do() if err != nil { return err } @@ -1134,10 +1174,14 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string if err != nil { return err } - if len(updatedPool.Instances) != len(hosts) { + wantInstances := len(hosts) + if wantInstances > maxInstancesPerTargetPool { + wantInstances = maxInstancesPerTargetPool + } + if len(updatedPool.Instances) != wantInstances { glog.Errorf("Unexpected number of instances (%d) in target pool %s after updating (expected %d). Instances in updated pool: %s", - len(updatedPool.Instances), loadBalancerName, len(hosts), strings.Join(updatedPool.Instances, ",")) - return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, len(hosts)) + len(updatedPool.Instances), loadBalancerName, wantInstances, strings.Join(updatedPool.Instances, ",")) + return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, wantInstances) } return nil } diff --git a/pkg/cloudprovider/providers/gce/gce_test.go b/pkg/cloudprovider/providers/gce/gce_test.go index f1633d8890d..f9b980c9a48 100644 --- a/pkg/cloudprovider/providers/gce/gce_test.go +++ b/pkg/cloudprovider/providers/gce/gce_test.go @@ -19,6 +19,9 @@ package gce import ( "reflect" "testing" + + compute "google.golang.org/api/compute/v1" + "k8s.io/kubernetes/pkg/util/rand" ) func TestGetRegion(t *testing.T) { @@ -148,3 +151,112 @@ func TestScrubDNS(t *testing.T) { } } } + +func TestRestrictTargetPool(t *testing.T) { + const maxInstances = 5 + tests := []struct { + instances []string + want []string + }{ + { + instances: []string{"1", "2", "3", "4", "5"}, + want: []string{"1", "2", "3", "4", "5"}, + }, + { + instances: []string{"1", "2", "3", "4", "5", "6"}, + want: []string{"4", "3", "5", "2", "6"}, + }, + } + for _, tc := range tests { + rand.Seed(5) + got := restrictTargetPool(append([]string{}, tc.instances...), maxInstances) + if !reflect.DeepEqual(got, tc.want) { + t.Errorf("restrictTargetPool(%v) => %v, want %v", tc.instances, got, tc.want) + } + } +} + +func TestComputeUpdate(t *testing.T) { + const maxInstances = 5 + const fakeZone = "us-moon1-f" + tests := []struct { + tp []string + instances []string + wantToAdd []string + wantToRemove []string + }{ + { + // Test adding all instances. + tp: []string{}, + instances: []string{"0", "1", "2"}, + wantToAdd: []string{"0", "1", "2"}, + wantToRemove: []string{}, + }, + { + // Test node 1 coming back healthy. + tp: []string{"0", "2"}, + instances: []string{"0", "1", "2"}, + wantToAdd: []string{"1"}, + wantToRemove: []string{}, + }, + { + // Test node 1 going healthy while node 4 needs to be removed. + tp: []string{"0", "2", "4"}, + instances: []string{"0", "1", "2"}, + wantToAdd: []string{"1"}, + wantToRemove: []string{"4"}, + }, + { + // Test exceeding the TargetPool max of 5 (for the test), + // which shuffles in 7, 5, 8 based on the deterministic + // seed below. + tp: []string{"0", "2", "4", "6"}, + instances: []string{"0", "1", "2", "3", "5", "7", "8"}, + wantToAdd: []string{"7", "5", "8"}, + wantToRemove: []string{"4", "6"}, + }, + { + // Test all nodes getting removed. + tp: []string{"0", "1", "2", "3"}, + instances: []string{}, + wantToAdd: []string{}, + wantToRemove: []string{"0", "1", "2", "3"}, + }, + } + for _, tc := range tests { + rand.Seed(5) // Arbitrary RNG seed for deterministic testing. + + // Dummy up the gceInstance slice. + var instances []*gceInstance + for _, inst := range tc.instances { + instances = append(instances, &gceInstance{Name: inst, Zone: fakeZone}) + } + // Dummy up the TargetPool URL list. + var urls []string + for _, inst := range tc.tp { + inst := &gceInstance{Name: inst, Zone: fakeZone} + urls = append(urls, inst.makeComparableHostPath()) + } + gotAddInsts, gotRem := computeUpdate(&compute.TargetPool{Instances: urls}, instances, maxInstances) + var wantAdd []string + for _, inst := range tc.wantToAdd { + inst := &gceInstance{Name: inst, Zone: fakeZone} + wantAdd = append(wantAdd, inst.makeComparableHostPath()) + } + var gotAdd []string + for _, inst := range gotAddInsts { + gotAdd = append(gotAdd, inst.Instance) + } + if !reflect.DeepEqual(wantAdd, gotAdd) { + t.Errorf("computeTargetPool(%v, %v) => added %v, wanted %v", tc.tp, tc.instances, gotAdd, wantAdd) + } + _ = gotRem + // var gotRem []string + // for _, inst := range gotRemInsts { + // gotRem = append(gotRem, inst.Instance) + // } + // if !reflect.DeepEqual(tc.wantToRemove, gotRem) { + // t.Errorf("computeTargetPool(%v, %v) => removed %v, wanted %v", tc.tp, tc.instances, gotRem, tc.wantToRemove) + // } + } +} diff --git a/pkg/util/rand/rand.go b/pkg/util/rand/rand.go index fe306506641..6649f811b3a 100644 --- a/pkg/util/rand/rand.go +++ b/pkg/util/rand/rand.go @@ -65,3 +65,19 @@ func String(length int) string { } return string(b) } + +// A type that satisfies the rand.Shufflable interface can be shuffled +// by Shuffle. Any sort.Interface will satisfy this interface. +type Shufflable interface { + Len() int + Swap(i, j int) +} + +func Shuffle(data Shufflable) { + rng.Lock() + defer rng.Unlock() + for i := 0; i < data.Len(); i++ { + j := rng.rand.Intn(i + 1) + data.Swap(i, j) + } +} diff --git a/pkg/util/rand/rand_test.go b/pkg/util/rand/rand_test.go index cc56e0d3d31..22973e3781a 100644 --- a/pkg/util/rand/rand_test.go +++ b/pkg/util/rand/rand_test.go @@ -18,6 +18,8 @@ package rand import ( "math/rand" + "reflect" + "sort" "strings" "testing" ) @@ -71,3 +73,14 @@ func TestPerm(t *testing.T) { } } } + +func TestShuffle(t *testing.T) { + Seed(5) // Arbitrary RNG seed for deterministic testing. + have := []int{0, 1, 2, 3, 4} + want := []int{3, 2, 4, 1, 0} // "have" shuffled, with RNG at Seed(5). + got := append([]int{}, have...) + Shuffle(sort.IntSlice(got)) + if !reflect.DeepEqual(got, want) { + t.Errorf("Shuffle(%v) => %v, want %v", have, got, want) + } +}