From 8dfdc5b612967fdf42b932440cc100accb259ead Mon Sep 17 00:00:00 2001
From: Marcin Wielgus
Date: Fri, 19 Aug 2016 10:48:16 +0200
Subject: [PATCH] Support for preexisting replicas and estimated capacity in
 federated replica set planner

---
 .../replicaset/planner/planner.go      |  97 ++++++++++-
 .../replicaset/planner/planner_test.go | 154 +++++++++++++++++-
 2 files changed, 242 insertions(+), 9 deletions(-)

diff --git a/federation/pkg/federation-controller/replicaset/planner/planner.go b/federation/pkg/federation-controller/replicaset/planner/planner.go
index 2c86ec7a2cf..e16caf632eb 100644
--- a/federation/pkg/federation-controller/replicaset/planner/planner.go
+++ b/federation/pkg/federation-controller/replicaset/planner/planner.go
@@ -17,6 +17,7 @@ limitations under the License.
 package planer
 
 import (
+	"math"
 	"sort"
 
 	fed_api "k8s.io/kubernetes/federation/apis/federation"
@@ -54,9 +55,18 @@ func NewPlanner(preferences *fed_api.FederatedReplicaSetPreferences) *Planner {
 // sum of MinReplicas for all cluster is bigger thant replicasToDistribute then some cluster will not
 // have all of the replicas assigned. In such case a cluster with higher weight has priority over
 // cluster with lower weight (or with lexicographically smaller name in case of draw).
-func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string) map[string]int64 {
+// It can also use the current replica count and estimated capacity to provide better planning and
+// adhere to the rebalance policy.
+// Two maps are returned:
+// * a map that contains information on how many replicas it will be possible to run in each cluster.
+// * a map that contains information on how many extra replicas it would be nice to schedule in each
+//   cluster so that, if by chance they are scheduled, we get closer to the desired replica layout.
+func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string, currentReplicaCount map[string]int64,
+	estimatedCapacity map[string]int64) (map[string]int64, map[string]int64) {
+
 	preferences := make([]*namedClusterReplicaSetPreferences, 0, len(availableClusters))
 	plan := make(map[string]int64, len(preferences))
+	overflow := make(map[string]int64, len(preferences))
 
 	named := func(name string, pref fed_api.ClusterReplicaSetPreferences) *namedClusterReplicaSetPreferences {
 		return &namedClusterReplicaSetPreferences{
@@ -83,19 +93,59 @@ func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string) m
 	// Assign each cluster the minimum number of replicas it requested.
 	for _, preference := range preferences {
 		min := minInt64(preference.MinReplicas, remainingReplicas)
+		if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity {
+			min = minInt64(min, capacity)
+		}
 		remainingReplicas -= min
 		plan[preference.clusterName] = min
 	}
 
+	// This map contains information on how many replicas were assigned to
+	// each cluster based only on the current replica count and the
+	// rebalance=false preference. It is used later by the remaining-replica
+	// distribution code.
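+	// For example (see TestEqualWithExisting below): with rebalance=false, 50 replicas to
+	// distribute over clusters A and B, and a current count of 30 replicas in A, cluster A
+	// keeps its 30 replicas via preallocation and B receives the remaining 20.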
+	preallocated := make(map[string]int64)
+
+	if p.preferences.Rebalance == false {
+		for _, preference := range preferences {
+			planned := plan[preference.clusterName]
+			count, hasSome := currentReplicaCount[preference.clusterName]
+			if hasSome && count > planned {
+				target := count
+				if preference.MaxReplicas != nil {
+					target = minInt64(*preference.MaxReplicas, target)
+				}
+				if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity {
+					target = minInt64(capacity, target)
+				}
+				extra := minInt64(target-planned, remainingReplicas)
+				if extra < 0 {
+					extra = 0
+				}
+				remainingReplicas -= extra
+				preallocated[preference.clusterName] = extra
+				plan[preference.clusterName] = extra + planned
+			}
+		}
+	}
+
 	modified := true
 	// It is possible single pass of the loop is not enough to distribue all replicas among clusters due
 	// to weight, max and rounding corner cases. In such case we iterate until either
 	// there is no replicas or no cluster gets any more replicas or the number
-	// of attempts is less than available cluster count. Every loop either distributes all remainingReplicas
-	// or maxes out at least one cluster.
-	// TODO: This algorithm is O(clusterCount^2). When needed use sweep-like algorithm for O(n log n).
-	for trial := 0; trial < len(availableClusters) && modified && remainingReplicas > 0; trial++ {
+	// of attempts is less than available cluster count. If there are no preallocated pods,
+	// every loop either distributes all remainingReplicas or maxes out at least one cluster.
+	// If there are preallocated pods then the replica spreading may take longer.
+	// We reduce the number of pending preallocated replicas by at least half with each iteration, so
+	// we may need log(replicasAtStart) iterations.
+	// TODO: Prove that clusterCount * log(replicas) iterations solves the problem or adjust the number.
+	// TODO: This algorithm is O(clusterCount^2 * log(replicas)) which is good for up to 100 clusters.
+	// Find something faster.
+	replicasAtStart := remainingReplicas
+	for trial := 0; trial < int(2*(1+math.Log(float64(replicasAtStart)))*float64(len(availableClusters))) &&
+		modified && remainingReplicas > 0; trial++ {
+
 		modified = false
 		weightSum := int64(0)
 		for _, preference := range preferences {
@@ -104,21 +154,41 @@ func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string) m
 		newPreferences := make([]*namedClusterReplicaSetPreferences, 0, len(preferences))
 
 		distributeInThisLoop := remainingReplicas
+
 		for _, preference := range preferences {
 			if weightSum > 0 {
 				start := plan[preference.clusterName]
 				// Distribute the remaining replicas, rounding fractions always up.
 				extra := (distributeInThisLoop*preference.Weight + weightSum - 1) / weightSum
 				extra = minInt64(extra, remainingReplicas)
+
+				// Account for preallocated replicas.
+				prealloc := preallocated[preference.clusterName]
+				usedPrealloc := minInt64(extra, prealloc)
+				preallocated[preference.clusterName] = prealloc - usedPrealloc
+				extra = extra - usedPrealloc
+				if usedPrealloc > 0 {
+					modified = true
+				}
+
 				// In total there should be the amount that was there at start plus whatever is due
 				// in this iteration
 				total := start + extra

 				// Check if we don't overflow the cluster, and if yes don't consider this cluster
 				// in any of the following iterations.
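+				// Both the MaxReplicas preference and the estimated capacity cap the total for
+				// this cluster; replicas that do not fit within the estimated capacity are
+				// recorded in the overflow map that is returned to the caller.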
+				full := false
 				if preference.MaxReplicas != nil && total > *preference.MaxReplicas {
 					total = *preference.MaxReplicas
-				} else {
+					full = true
+				}
+				if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity && total > capacity {
+					overflow[preference.clusterName] = total - capacity
+					total = capacity
+					full = true
+				}
+
+				if !full {
 					newPreferences = append(newPreferences, preference)
 				}
@@ -137,7 +207,20 @@ func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string) m
 		preferences = newPreferences
 	}
 
-	return plan
+	if p.preferences.Rebalance {
+		return plan, overflow
+	} else {
+		// If rebalance = false then the overflow is trimmed so that it does not exceed
+		// the number of replicas that the planner failed to place anywhere.
+		newOverflow := make(map[string]int64)
+		for key, value := range overflow {
+			value = minInt64(value, remainingReplicas)
+			if value > 0 {
+				newOverflow[key] = value
+			}
+		}
+		return plan, newOverflow
+	}
 }
 
 func minInt64(a int64, b int64) int64 {
diff --git a/federation/pkg/federation-controller/replicaset/planner/planner_test.go b/federation/pkg/federation-controller/replicaset/planner/planner_test.go
index 2a9a4e68440..556758a7b70 100644
--- a/federation/pkg/federation-controller/replicaset/planner/planner_test.go
+++ b/federation/pkg/federation-controller/replicaset/planner/planner_test.go
@@ -28,8 +28,33 @@ func doCheck(t *testing.T, pref map[string]fed_api.ClusterReplicaSetPreferences,
 	planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{
 		Clusters: pref,
 	})
-	plan := planer.Plan(replicas, clusters)
+	plan, overflow := planer.Plan(replicas, clusters, map[string]int64{}, map[string]int64{})
 	assert.EqualValues(t, expected, plan)
+	assert.Equal(t, 0, len(overflow))
+}
+
+func doCheckWithExisting(t *testing.T, pref map[string]fed_api.ClusterReplicaSetPreferences, replicas int64, clusters []string,
+	existing map[string]int64, expected map[string]int64) {
+	planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{
+		Clusters: pref,
+	})
+	plan, overflow := planer.Plan(replicas, clusters, existing, map[string]int64{})
+	assert.Equal(t, 0, len(overflow))
+	assert.EqualValues(t, expected, plan)
+}
+
+func doCheckWithExistingAndCapacity(t *testing.T, rebalance bool, pref map[string]fed_api.ClusterReplicaSetPreferences, replicas int64, clusters []string,
+	existing map[string]int64,
+	capacity map[string]int64,
+	expected map[string]int64,
+	expectedOverflow map[string]int64) {
+	planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{
+		Rebalance: rebalance,
+		Clusters:  pref,
+	})
+	plan, overflow := planer.Plan(replicas, clusters, existing, capacity)
+	assert.EqualValues(t, expected, plan)
+	assert.Equal(t, expectedOverflow, overflow)
 }
 
 func pint(val int64) *int64 {
@@ -68,6 +93,132 @@ func TestEqual(t *testing.T) {
 		map[string]int64{})
 }
 
+func TestEqualWithExisting(t *testing.T) {
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		50, []string{"A", "B", "C"},
+		map[string]int64{"C": 30},
+		map[string]int64{"A": 10, "B": 10, "C": 30})
+
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		50, []string{"A", "B"},
+		map[string]int64{"A": 30},
+		map[string]int64{"A": 30, "B": 20})
+
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		500000, []string{"A", "B"},
+		map[string]int64{"A": 300000},
+		map[string]int64{"A": 300000, "B": 200000})
+
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		50, []string{"A", "B"},
+		map[string]int64{"A": 10},
+		map[string]int64{"A": 25, "B": 25})
+
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		50, []string{"A", "B"},
+		map[string]int64{"A": 10, "B": 70},
+		map[string]int64{"A": 10, "B": 40})
+
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		1, []string{"A", "B"},
+		map[string]int64{"A": 30},
+		map[string]int64{"A": 1, "B": 0})
+
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		50, []string{"A", "B"},
+		map[string]int64{"A": 10, "B": 20},
+		map[string]int64{"A": 25, "B": 25})
+}
+
+func TestWithExistingAndCapacity(t *testing.T) {
+	// desired without capacity: map[string]int64{"A": 17, "B": 17, "C": 16}
+	doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		50, []string{"A", "B", "C"},
+		map[string]int64{},
+		map[string]int64{"C": 10},
+		map[string]int64{"A": 20, "B": 20, "C": 10},
+		map[string]int64{"C": 6})
+
+	// desired B:50 C:0
+	doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
+		"A": {Weight: 10000},
+		"B": {Weight: 1}},
+		50, []string{"B", "C"},
+		map[string]int64{},
+		map[string]int64{"B": 10},
+		map[string]int64{"B": 10, "C": 0},
+		map[string]int64{"B": 40},
+	)
+
+	// desired A:20 B:40
+	doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
+		"A": {Weight: 1},
+		"B": {Weight: 2}},
+		60, []string{"A", "B", "C"},
+		map[string]int64{},
+		map[string]int64{"B": 10},
+		map[string]int64{"A": 50, "B": 10, "C": 0},
+		map[string]int64{"B": 30})
+
+	// desired without capacity: map[string]int64{"A": 10, "B": 30, "C": 21, "D": 10}
+	doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
+		"A": {Weight: 10000, MaxReplicas: pint(10)},
+		"B": {Weight: 1},
+		"C": {Weight: 1, MaxReplicas: pint(21)},
+		"D": {Weight: 1, MaxReplicas: pint(10)}},
+		71, []string{"A", "B", "C", "D"},
+		map[string]int64{},
+		map[string]int64{"C": 10},
+		map[string]int64{"A": 10, "B": 41, "C": 10, "D": 10},
+		map[string]int64{"C": 11},
+	)
+
+	// desired A:20 B:20
+	doCheckWithExistingAndCapacity(t, false, map[string]fed_api.ClusterReplicaSetPreferences{
+		"A": {Weight: 1},
+		"B": {Weight: 1}},
+		60, []string{"A", "B", "C"},
+		map[string]int64{},
+		map[string]int64{"A": 10, "B": 10},
+		map[string]int64{"A": 10, "B": 10, "C": 0},
+		map[string]int64{"A": 20, "B": 20})
+
+	// desired A:10 B:50, although A:50 B:10 is fully acceptable because rebalance = false
+	doCheckWithExistingAndCapacity(t, false, map[string]fed_api.ClusterReplicaSetPreferences{
+		"A": {Weight: 1},
+		"B": {Weight: 5}},
+		60, []string{"A", "B", "C"},
+		map[string]int64{},
+		map[string]int64{"B": 10},
+		map[string]int64{"A": 50, "B": 10, "C": 0},
+		map[string]int64{})
+
+	doCheckWithExistingAndCapacity(t, false, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {MinReplicas: 20, Weight: 0}},
+		50, []string{"A", "B", "C"},
+		map[string]int64{},
+		map[string]int64{"B": 10},
+		map[string]int64{"A": 20, "B": 10, "C": 20},
+		map[string]int64{})
+
+	// Ideally we would like 20 extra replicas in B, but an overflow of 15 is also acceptable.
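+	// (The overflow of 15 is what the weighted pass tried to place in B beyond its estimated capacity of 10.)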
+	doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {MinReplicas: 20, Weight: 1}},
+		60, []string{"A", "B"},
+		map[string]int64{},
+		map[string]int64{"B": 10},
+		map[string]int64{"A": 50, "B": 10},
+		map[string]int64{"B": 15})
+}
+
 func TestMin(t *testing.T) {
 	doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
 		"*": {MinReplicas: 2, Weight: 0}},
@@ -104,7 +255,6 @@ func TestMax(t *testing.T) {
 }
 
 func TestWeight(t *testing.T) {
-
 	doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
 		"A": {Weight: 1},
 		"B": {Weight: 2}},