mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 04:06:03 +00:00
GCE: Allow nodes to exceed target pool maximums
If we would exceeded the TargetPool API maximums, instead just randomly select some subsection of the nodes to include in the TP instead.
This commit is contained in:
parent
e973b5d27a
commit
faf0c44429
@ -36,6 +36,7 @@ import (
|
||||
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
||||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||
netsets "k8s.io/kubernetes/pkg/util/net/sets"
|
||||
"k8s.io/kubernetes/pkg/util/rand"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
|
||||
@ -68,6 +69,9 @@ const (
|
||||
// are iterated through to prevent infinite loops if the API
|
||||
// were to continuously return a nextPageToken.
|
||||
maxPages = 25
|
||||
|
||||
// TargetPools can only support 1000 VMs.
|
||||
maxInstancesPerTargetPool = 1000
|
||||
)
|
||||
|
||||
// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
|
||||
@ -95,6 +99,11 @@ type Config struct {
|
||||
}
|
||||
}
|
||||
|
||||
type instRefSlice []*compute.InstanceReference
|
||||
|
||||
func (p instRefSlice) Len() int { return len(p) }
|
||||
func (p instRefSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||
|
||||
func init() {
|
||||
cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) { return newGCECloud(config) })
|
||||
}
|
||||
@ -853,11 +862,22 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
|
||||
return nil
|
||||
}
|
||||
|
||||
func restrictTargetPool(instances []string, max int) []string {
|
||||
if len(instances) <= max {
|
||||
return instances
|
||||
}
|
||||
rand.Shuffle(sort.StringSlice(instances))
|
||||
return instances[:max]
|
||||
}
|
||||
|
||||
func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType api.ServiceAffinity) error {
|
||||
var instances []string
|
||||
for _, host := range hosts {
|
||||
instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
|
||||
}
|
||||
// Choose a random subset of nodes to send traffic to, if we
|
||||
// exceed API maximums.
|
||||
instances = restrictTargetPool(instances, maxInstancesPerTargetPool)
|
||||
pool := &compute.TargetPool{
|
||||
Name: name,
|
||||
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
|
||||
@ -1075,6 +1095,42 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
|
||||
return address.Address, existed, nil
|
||||
}
|
||||
|
||||
// computeUpdate takes the existing TargetPool and the set of running
|
||||
// instances and returns (toAdd, toRemove), the set of instances to
|
||||
// reprogram on the TargetPool this reconcile. max restricts the
|
||||
// number of nodes allowed to be programmed on the TargetPool.
|
||||
func computeUpdate(tp *compute.TargetPool, instances []*gceInstance, max int) ([]*compute.InstanceReference, []*compute.InstanceReference) {
|
||||
existing := sets.NewString()
|
||||
for _, instance := range tp.Instances {
|
||||
existing.Insert(hostURLToComparablePath(instance))
|
||||
}
|
||||
|
||||
var toAdd []*compute.InstanceReference
|
||||
var toRemove []*compute.InstanceReference
|
||||
for _, host := range instances {
|
||||
link := host.makeComparableHostPath()
|
||||
if !existing.Has(link) {
|
||||
toAdd = append(toAdd, &compute.InstanceReference{Instance: link})
|
||||
}
|
||||
existing.Delete(link)
|
||||
}
|
||||
for link := range existing {
|
||||
toRemove = append(toRemove, &compute.InstanceReference{Instance: link})
|
||||
}
|
||||
|
||||
if len(tp.Instances)+len(toAdd)-len(toRemove) > max {
|
||||
// TODO(zmerlynn): In theory, there are faster ways to handle
|
||||
// this if room is much smaller than len(toAdd). In practice,
|
||||
// meh.
|
||||
room := max - len(tp.Instances) + len(toRemove)
|
||||
glog.Infof("TargetPool maximums exceeded, shuffling in %d instances", room)
|
||||
rand.Shuffle(instRefSlice(toAdd))
|
||||
toAdd = toAdd[:room]
|
||||
}
|
||||
|
||||
return toAdd, toRemove
|
||||
}
|
||||
|
||||
// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
|
||||
func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string) error {
|
||||
hosts, err := gce.getInstancesByNames(hostNames)
|
||||
@ -1087,27 +1143,11 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
existing := sets.NewString()
|
||||
for _, instance := range pool.Instances {
|
||||
existing.Insert(hostURLToComparablePath(instance))
|
||||
}
|
||||
|
||||
var toAdd []*compute.InstanceReference
|
||||
var toRemove []*compute.InstanceReference
|
||||
for _, host := range hosts {
|
||||
link := host.makeComparableHostPath()
|
||||
if !existing.Has(link) {
|
||||
toAdd = append(toAdd, &compute.InstanceReference{Instance: link})
|
||||
}
|
||||
existing.Delete(link)
|
||||
}
|
||||
for link := range existing {
|
||||
toRemove = append(toRemove, &compute.InstanceReference{Instance: link})
|
||||
}
|
||||
|
||||
if len(toAdd) > 0 {
|
||||
add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
|
||||
op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
|
||||
toAdd, toRemove := computeUpdate(pool, hosts, maxInstancesPerTargetPool)
|
||||
if len(toRemove) > 0 {
|
||||
rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
|
||||
op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1116,9 +1156,9 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
|
||||
}
|
||||
}
|
||||
|
||||
if len(toRemove) > 0 {
|
||||
rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
|
||||
op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
|
||||
if len(toAdd) > 0 {
|
||||
add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
|
||||
op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1134,10 +1174,14 @@ func (gce *GCECloud) UpdateLoadBalancer(service *api.Service, hostNames []string
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(updatedPool.Instances) != len(hosts) {
|
||||
wantInstances := len(hosts)
|
||||
if wantInstances > maxInstancesPerTargetPool {
|
||||
wantInstances = maxInstancesPerTargetPool
|
||||
}
|
||||
if len(updatedPool.Instances) != wantInstances {
|
||||
glog.Errorf("Unexpected number of instances (%d) in target pool %s after updating (expected %d). Instances in updated pool: %s",
|
||||
len(updatedPool.Instances), loadBalancerName, len(hosts), strings.Join(updatedPool.Instances, ","))
|
||||
return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, len(hosts))
|
||||
len(updatedPool.Instances), loadBalancerName, wantInstances, strings.Join(updatedPool.Instances, ","))
|
||||
return fmt.Errorf("Unexpected number of instances (%d) in target pool %s after update (expected %d)", len(updatedPool.Instances), loadBalancerName, wantInstances)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -19,6 +19,9 @@ package gce
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
"k8s.io/kubernetes/pkg/util/rand"
|
||||
)
|
||||
|
||||
func TestGetRegion(t *testing.T) {
|
||||
@ -148,3 +151,112 @@ func TestScrubDNS(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestrictTargetPool(t *testing.T) {
|
||||
const maxInstances = 5
|
||||
tests := []struct {
|
||||
instances []string
|
||||
want []string
|
||||
}{
|
||||
{
|
||||
instances: []string{"1", "2", "3", "4", "5"},
|
||||
want: []string{"1", "2", "3", "4", "5"},
|
||||
},
|
||||
{
|
||||
instances: []string{"1", "2", "3", "4", "5", "6"},
|
||||
want: []string{"4", "3", "5", "2", "6"},
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
rand.Seed(5)
|
||||
got := restrictTargetPool(append([]string{}, tc.instances...), maxInstances)
|
||||
if !reflect.DeepEqual(got, tc.want) {
|
||||
t.Errorf("restrictTargetPool(%v) => %v, want %v", tc.instances, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestComputeUpdate(t *testing.T) {
|
||||
const maxInstances = 5
|
||||
const fakeZone = "us-moon1-f"
|
||||
tests := []struct {
|
||||
tp []string
|
||||
instances []string
|
||||
wantToAdd []string
|
||||
wantToRemove []string
|
||||
}{
|
||||
{
|
||||
// Test adding all instances.
|
||||
tp: []string{},
|
||||
instances: []string{"0", "1", "2"},
|
||||
wantToAdd: []string{"0", "1", "2"},
|
||||
wantToRemove: []string{},
|
||||
},
|
||||
{
|
||||
// Test node 1 coming back healthy.
|
||||
tp: []string{"0", "2"},
|
||||
instances: []string{"0", "1", "2"},
|
||||
wantToAdd: []string{"1"},
|
||||
wantToRemove: []string{},
|
||||
},
|
||||
{
|
||||
// Test node 1 going healthy while node 4 needs to be removed.
|
||||
tp: []string{"0", "2", "4"},
|
||||
instances: []string{"0", "1", "2"},
|
||||
wantToAdd: []string{"1"},
|
||||
wantToRemove: []string{"4"},
|
||||
},
|
||||
{
|
||||
// Test exceeding the TargetPool max of 5 (for the test),
|
||||
// which shuffles in 7, 5, 8 based on the deterministic
|
||||
// seed below.
|
||||
tp: []string{"0", "2", "4", "6"},
|
||||
instances: []string{"0", "1", "2", "3", "5", "7", "8"},
|
||||
wantToAdd: []string{"7", "5", "8"},
|
||||
wantToRemove: []string{"4", "6"},
|
||||
},
|
||||
{
|
||||
// Test all nodes getting removed.
|
||||
tp: []string{"0", "1", "2", "3"},
|
||||
instances: []string{},
|
||||
wantToAdd: []string{},
|
||||
wantToRemove: []string{"0", "1", "2", "3"},
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
rand.Seed(5) // Arbitrary RNG seed for deterministic testing.
|
||||
|
||||
// Dummy up the gceInstance slice.
|
||||
var instances []*gceInstance
|
||||
for _, inst := range tc.instances {
|
||||
instances = append(instances, &gceInstance{Name: inst, Zone: fakeZone})
|
||||
}
|
||||
// Dummy up the TargetPool URL list.
|
||||
var urls []string
|
||||
for _, inst := range tc.tp {
|
||||
inst := &gceInstance{Name: inst, Zone: fakeZone}
|
||||
urls = append(urls, inst.makeComparableHostPath())
|
||||
}
|
||||
gotAddInsts, gotRem := computeUpdate(&compute.TargetPool{Instances: urls}, instances, maxInstances)
|
||||
var wantAdd []string
|
||||
for _, inst := range tc.wantToAdd {
|
||||
inst := &gceInstance{Name: inst, Zone: fakeZone}
|
||||
wantAdd = append(wantAdd, inst.makeComparableHostPath())
|
||||
}
|
||||
var gotAdd []string
|
||||
for _, inst := range gotAddInsts {
|
||||
gotAdd = append(gotAdd, inst.Instance)
|
||||
}
|
||||
if !reflect.DeepEqual(wantAdd, gotAdd) {
|
||||
t.Errorf("computeTargetPool(%v, %v) => added %v, wanted %v", tc.tp, tc.instances, gotAdd, wantAdd)
|
||||
}
|
||||
_ = gotRem
|
||||
// var gotRem []string
|
||||
// for _, inst := range gotRemInsts {
|
||||
// gotRem = append(gotRem, inst.Instance)
|
||||
// }
|
||||
// if !reflect.DeepEqual(tc.wantToRemove, gotRem) {
|
||||
// t.Errorf("computeTargetPool(%v, %v) => removed %v, wanted %v", tc.tp, tc.instances, gotRem, tc.wantToRemove)
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
@ -65,3 +65,19 @@ func String(length int) string {
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
// A type that satisfies the rand.Shufflable interface can be shuffled
|
||||
// by Shuffle. Any sort.Interface will satisfy this interface.
|
||||
type Shufflable interface {
|
||||
Len() int
|
||||
Swap(i, j int)
|
||||
}
|
||||
|
||||
func Shuffle(data Shufflable) {
|
||||
rng.Lock()
|
||||
defer rng.Unlock()
|
||||
for i := 0; i < data.Len(); i++ {
|
||||
j := rng.rand.Intn(i + 1)
|
||||
data.Swap(i, j)
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,8 @@ package rand
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
@ -71,3 +73,14 @@ func TestPerm(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestShuffle(t *testing.T) {
|
||||
Seed(5) // Arbitrary RNG seed for deterministic testing.
|
||||
have := []int{0, 1, 2, 3, 4}
|
||||
want := []int{3, 2, 4, 1, 0} // "have" shuffled, with RNG at Seed(5).
|
||||
got := append([]int{}, have...)
|
||||
Shuffle(sort.IntSlice(got))
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Errorf("Shuffle(%v) => %v, want %v", have, got, want)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user