mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 07:47:56 +00:00
Support for preexisting replicas and estimated capacity in federated replica set planner
This commit is contained in:
parent
214c916045
commit
8dfdc5b612
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package planer
|
package planer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
fed_api "k8s.io/kubernetes/federation/apis/federation"
|
fed_api "k8s.io/kubernetes/federation/apis/federation"
|
||||||
@ -54,9 +55,18 @@ func NewPlanner(preferences *fed_api.FederatedReplicaSetPreferences) *Planner {
|
|||||||
// sum of MinReplicas for all clusters is bigger than replicasToDistribute then some cluster will not
|
// sum of MinReplicas for all clusters is bigger than replicasToDistribute then some cluster will not
|
||||||
// have all of the replicas assigned. In such case a cluster with higher weight has priority over
|
// have all of the replicas assigned. In such case a cluster with higher weight has priority over
|
||||||
// cluster with lower weight (or with lexicographically smaller name in case of draw).
|
// cluster with lower weight (or with lexicographically smaller name in case of draw).
|
||||||
func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string) map[string]int64 {
|
// It can also use the current replica count and estimated capacity to provide better planning and
|
||||||
|
// adhere to rebalance policy.
|
||||||
|
// Two maps are returned:
|
||||||
|
// * a map that contains information how many replicas will be possible to run in a cluster.
|
||||||
|
// * a map that contains information how many extra replicas would be nice to schedule in a cluster so that,
|
||||||
|
// if by chance they are scheduled, we will be closer to the desired replicas layout.
|
||||||
|
func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string, currentReplicaCount map[string]int64,
|
||||||
|
estimatedCapacity map[string]int64) (map[string]int64, map[string]int64) {
|
||||||
|
|
||||||
preferences := make([]*namedClusterReplicaSetPreferences, 0, len(availableClusters))
|
preferences := make([]*namedClusterReplicaSetPreferences, 0, len(availableClusters))
|
||||||
plan := make(map[string]int64, len(preferences))
|
plan := make(map[string]int64, len(preferences))
|
||||||
|
overflow := make(map[string]int64, len(preferences))
|
||||||
|
|
||||||
named := func(name string, pref fed_api.ClusterReplicaSetPreferences) *namedClusterReplicaSetPreferences {
|
named := func(name string, pref fed_api.ClusterReplicaSetPreferences) *namedClusterReplicaSetPreferences {
|
||||||
return &namedClusterReplicaSetPreferences{
|
return &namedClusterReplicaSetPreferences{
|
||||||
@ -83,19 +93,59 @@ func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string) m
|
|||||||
// Assign each cluster the minimum number of replicas it requested.
|
// Assign each cluster the minimum number of replicas it requested.
|
||||||
for _, preference := range preferences {
|
for _, preference := range preferences {
|
||||||
min := minInt64(preference.MinReplicas, remainingReplicas)
|
min := minInt64(preference.MinReplicas, remainingReplicas)
|
||||||
|
if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity {
|
||||||
|
min = minInt64(min, capacity)
|
||||||
|
}
|
||||||
remainingReplicas -= min
|
remainingReplicas -= min
|
||||||
plan[preference.clusterName] = min
|
plan[preference.clusterName] = min
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This map contains information how many replicas were assigned to
|
||||||
|
// the cluster based only on the current replica count and
|
||||||
|
// rebalance=false preference. It will be later used in remaining replica
|
||||||
|
// distribution code.
|
||||||
|
preallocated := make(map[string]int64)
|
||||||
|
|
||||||
|
if p.preferences.Rebalance == false {
|
||||||
|
for _, preference := range preferences {
|
||||||
|
planned := plan[preference.clusterName]
|
||||||
|
count, hasSome := currentReplicaCount[preference.clusterName]
|
||||||
|
if hasSome && count > planned {
|
||||||
|
target := count
|
||||||
|
if preference.MaxReplicas != nil {
|
||||||
|
target = minInt64(*preference.MaxReplicas, target)
|
||||||
|
}
|
||||||
|
if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity {
|
||||||
|
target = minInt64(capacity, target)
|
||||||
|
}
|
||||||
|
extra := minInt64(target-planned, remainingReplicas)
|
||||||
|
if extra < 0 {
|
||||||
|
extra = 0
|
||||||
|
}
|
||||||
|
remainingReplicas -= extra
|
||||||
|
preallocated[preference.clusterName] = extra
|
||||||
|
plan[preference.clusterName] = extra + planned
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
modified := true
|
modified := true
|
||||||
|
|
||||||
// It is possible that a single pass of the loop is not enough to distribute all replicas among clusters due
|
// It is possible that a single pass of the loop is not enough to distribute all replicas among clusters due
|
||||||
// to weight, max and rounding corner cases. In such case we iterate until either
|
// to weight, max and rounding corner cases. In such case we iterate until either
|
||||||
// there are no replicas left or no cluster gets any more replicas or the number
|
// there are no replicas left or no cluster gets any more replicas or the number
|
||||||
// of attempts is less than available cluster count. Every loop either distributes all remainingReplicas
|
// of attempts is less than available cluster count. If there are no preallocated pods
|
||||||
// or maxes out at least one cluster.
|
// every loop either distributes all remainingReplicas or maxes out at least one cluster.
|
||||||
// TODO: This algorithm is O(clusterCount^2). When needed use sweep-like algorithm for O(n log n).
|
// If there are preallocated replicas then the replica spreading may take longer.
|
||||||
for trial := 0; trial < len(availableClusters) && modified && remainingReplicas > 0; trial++ {
|
// We reduce the number of pending preallocated replicas by at least half with each iteration so
|
||||||
|
// we may need log(replicasAtStart) iterations.
|
||||||
|
// TODO: Prove that clusterCount * log(replicas) iterations solves the problem or adjust the number.
|
||||||
|
// TODO: This algorithm is O(clusterCount^2 * log(replicas)) which is good for up to 100 clusters.
|
||||||
|
// Find something faster.
|
||||||
|
replicasAtStart := remainingReplicas
|
||||||
|
for trial := 0; trial < int(2*(1+math.Log(float64(replicasAtStart)))*float64(len(availableClusters))) &&
|
||||||
|
modified && remainingReplicas > 0; trial++ {
|
||||||
|
|
||||||
modified = false
|
modified = false
|
||||||
weightSum := int64(0)
|
weightSum := int64(0)
|
||||||
for _, preference := range preferences {
|
for _, preference := range preferences {
|
||||||
@ -104,21 +154,41 @@ func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string) m
|
|||||||
newPreferences := make([]*namedClusterReplicaSetPreferences, 0, len(preferences))
|
newPreferences := make([]*namedClusterReplicaSetPreferences, 0, len(preferences))
|
||||||
|
|
||||||
distributeInThisLoop := remainingReplicas
|
distributeInThisLoop := remainingReplicas
|
||||||
|
|
||||||
for _, preference := range preferences {
|
for _, preference := range preferences {
|
||||||
if weightSum > 0 {
|
if weightSum > 0 {
|
||||||
start := plan[preference.clusterName]
|
start := plan[preference.clusterName]
|
||||||
// Distribute the remaining replicas, rounding fractions always up.
|
// Distribute the remaining replicas, rounding fractions always up.
|
||||||
extra := (distributeInThisLoop*preference.Weight + weightSum - 1) / weightSum
|
extra := (distributeInThisLoop*preference.Weight + weightSum - 1) / weightSum
|
||||||
extra = minInt64(extra, remainingReplicas)
|
extra = minInt64(extra, remainingReplicas)
|
||||||
|
|
||||||
|
// Account preallocated.
|
||||||
|
prealloc := preallocated[preference.clusterName]
|
||||||
|
usedPrealloc := minInt64(extra, prealloc)
|
||||||
|
preallocated[preference.clusterName] = prealloc - usedPrealloc
|
||||||
|
extra = extra - usedPrealloc
|
||||||
|
if usedPrealloc > 0 {
|
||||||
|
modified = true
|
||||||
|
}
|
||||||
|
|
||||||
// In total there should be the amount that was there at start plus whatever is due
|
// In total there should be the amount that was there at start plus whatever is due
|
||||||
// in this iteration
|
// in this iteration
|
||||||
total := start + extra
|
total := start + extra
|
||||||
|
|
||||||
// Check if we don't overflow the cluster, and if yes don't consider this cluster
|
// Check if we don't overflow the cluster, and if yes don't consider this cluster
|
||||||
// in any of the following iterations.
|
// in any of the following iterations.
|
||||||
|
full := false
|
||||||
if preference.MaxReplicas != nil && total > *preference.MaxReplicas {
|
if preference.MaxReplicas != nil && total > *preference.MaxReplicas {
|
||||||
total = *preference.MaxReplicas
|
total = *preference.MaxReplicas
|
||||||
} else {
|
full = true
|
||||||
|
}
|
||||||
|
if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity && total > capacity {
|
||||||
|
overflow[preference.clusterName] = total - capacity
|
||||||
|
total = capacity
|
||||||
|
full = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if !full {
|
||||||
newPreferences = append(newPreferences, preference)
|
newPreferences = append(newPreferences, preference)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -137,7 +207,20 @@ func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string) m
|
|||||||
preferences = newPreferences
|
preferences = newPreferences
|
||||||
}
|
}
|
||||||
|
|
||||||
return plan
|
if p.preferences.Rebalance {
|
||||||
|
return plan, overflow
|
||||||
|
} else {
|
||||||
|
// If rebalance = false then overflow is trimmed at the level
|
||||||
|
// of replicas that it failed to place somewhere.
|
||||||
|
newOverflow := make(map[string]int64)
|
||||||
|
for key, value := range overflow {
|
||||||
|
value = minInt64(value, remainingReplicas)
|
||||||
|
if value > 0 {
|
||||||
|
newOverflow[key] = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return plan, newOverflow
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func minInt64(a int64, b int64) int64 {
|
func minInt64(a int64, b int64) int64 {
|
||||||
|
@ -28,8 +28,33 @@ func doCheck(t *testing.T, pref map[string]fed_api.ClusterReplicaSetPreferences,
|
|||||||
planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{
|
planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{
|
||||||
Clusters: pref,
|
Clusters: pref,
|
||||||
})
|
})
|
||||||
plan := planer.Plan(replicas, clusters)
|
plan, overflow := planer.Plan(replicas, clusters, map[string]int64{}, map[string]int64{})
|
||||||
assert.EqualValues(t, expected, plan)
|
assert.EqualValues(t, expected, plan)
|
||||||
|
assert.Equal(t, 0, len(overflow))
|
||||||
|
}
|
||||||
|
|
||||||
|
func doCheckWithExisting(t *testing.T, pref map[string]fed_api.ClusterReplicaSetPreferences, replicas int64, clusters []string,
|
||||||
|
existing map[string]int64, expected map[string]int64) {
|
||||||
|
planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{
|
||||||
|
Clusters: pref,
|
||||||
|
})
|
||||||
|
plan, overflow := planer.Plan(replicas, clusters, existing, map[string]int64{})
|
||||||
|
assert.Equal(t, 0, len(overflow))
|
||||||
|
assert.EqualValues(t, expected, plan)
|
||||||
|
}
|
||||||
|
|
||||||
|
func doCheckWithExistingAndCapacity(t *testing.T, rebalance bool, pref map[string]fed_api.ClusterReplicaSetPreferences, replicas int64, clusters []string,
|
||||||
|
existing map[string]int64,
|
||||||
|
capacity map[string]int64,
|
||||||
|
expected map[string]int64,
|
||||||
|
expectedOverflow map[string]int64) {
|
||||||
|
planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{
|
||||||
|
Rebalance: rebalance,
|
||||||
|
Clusters: pref,
|
||||||
|
})
|
||||||
|
plan, overflow := planer.Plan(replicas, clusters, existing, capacity)
|
||||||
|
assert.EqualValues(t, expected, plan)
|
||||||
|
assert.Equal(t, expectedOverflow, overflow)
|
||||||
}
|
}
|
||||||
|
|
||||||
func pint(val int64) *int64 {
|
func pint(val int64) *int64 {
|
||||||
@ -68,6 +93,132 @@ func TestEqual(t *testing.T) {
|
|||||||
map[string]int64{})
|
map[string]int64{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEqualWithExisting(t *testing.T) {
|
||||||
|
doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"*": {Weight: 1}},
|
||||||
|
50, []string{"A", "B", "C"},
|
||||||
|
map[string]int64{"C": 30},
|
||||||
|
map[string]int64{"A": 10, "B": 10, "C": 30})
|
||||||
|
|
||||||
|
doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"*": {Weight: 1}},
|
||||||
|
50, []string{"A", "B"},
|
||||||
|
map[string]int64{"A": 30},
|
||||||
|
map[string]int64{"A": 30, "B": 20})
|
||||||
|
|
||||||
|
doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"*": {Weight: 1}},
|
||||||
|
500000, []string{"A", "B"},
|
||||||
|
map[string]int64{"A": 300000},
|
||||||
|
map[string]int64{"A": 300000, "B": 200000})
|
||||||
|
|
||||||
|
doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"*": {Weight: 1}},
|
||||||
|
50, []string{"A", "B"},
|
||||||
|
map[string]int64{"A": 10},
|
||||||
|
map[string]int64{"A": 25, "B": 25})
|
||||||
|
|
||||||
|
doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"*": {Weight: 1}},
|
||||||
|
50, []string{"A", "B"},
|
||||||
|
map[string]int64{"A": 10, "B": 70},
|
||||||
|
map[string]int64{"A": 10, "B": 40})
|
||||||
|
|
||||||
|
doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"*": {Weight: 1}},
|
||||||
|
1, []string{"A", "B"},
|
||||||
|
map[string]int64{"A": 30},
|
||||||
|
map[string]int64{"A": 1, "B": 0})
|
||||||
|
|
||||||
|
doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"*": {Weight: 1}},
|
||||||
|
50, []string{"A", "B"},
|
||||||
|
map[string]int64{"A": 10, "B": 20},
|
||||||
|
map[string]int64{"A": 25, "B": 25})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWithExistingAndCapacity(t *testing.T) {
|
||||||
|
// desired without capacity: map[string]int64{"A": 17, "B": 17, "C": 16})
|
||||||
|
doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"*": {Weight: 1}},
|
||||||
|
50, []string{"A", "B", "C"},
|
||||||
|
map[string]int64{},
|
||||||
|
map[string]int64{"C": 10},
|
||||||
|
map[string]int64{"A": 20, "B": 20, "C": 10},
|
||||||
|
map[string]int64{"C": 6})
|
||||||
|
|
||||||
|
// desired B:50 C:0
|
||||||
|
doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"A": {Weight: 10000},
|
||||||
|
"B": {Weight: 1}},
|
||||||
|
50, []string{"B", "C"},
|
||||||
|
map[string]int64{},
|
||||||
|
map[string]int64{"B": 10},
|
||||||
|
map[string]int64{"B": 10, "C": 0},
|
||||||
|
map[string]int64{"B": 40},
|
||||||
|
)
|
||||||
|
|
||||||
|
// desired A:20 B:40
|
||||||
|
doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"A": {Weight: 1},
|
||||||
|
"B": {Weight: 2}},
|
||||||
|
60, []string{"A", "B", "C"},
|
||||||
|
map[string]int64{},
|
||||||
|
map[string]int64{"B": 10},
|
||||||
|
map[string]int64{"A": 50, "B": 10, "C": 0},
|
||||||
|
map[string]int64{"B": 30})
|
||||||
|
|
||||||
|
// map[string]int64{"A": 10, "B": 30, "C": 21, "D": 10})
|
||||||
|
doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"A": {Weight: 10000, MaxReplicas: pint(10)},
|
||||||
|
"B": {Weight: 1},
|
||||||
|
"C": {Weight: 1, MaxReplicas: pint(21)},
|
||||||
|
"D": {Weight: 1, MaxReplicas: pint(10)}},
|
||||||
|
71, []string{"A", "B", "C", "D"},
|
||||||
|
map[string]int64{},
|
||||||
|
map[string]int64{"C": 10},
|
||||||
|
map[string]int64{"A": 10, "B": 41, "C": 10, "D": 10},
|
||||||
|
map[string]int64{"C": 11},
|
||||||
|
)
|
||||||
|
|
||||||
|
// desired A:20 B:20
|
||||||
|
doCheckWithExistingAndCapacity(t, false, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"A": {Weight: 1},
|
||||||
|
"B": {Weight: 1}},
|
||||||
|
60, []string{"A", "B", "C"},
|
||||||
|
map[string]int64{},
|
||||||
|
map[string]int64{"A": 10, "B": 10},
|
||||||
|
map[string]int64{"A": 10, "B": 10, "C": 0},
|
||||||
|
map[string]int64{"A": 20, "B": 20})
|
||||||
|
|
||||||
|
// desired A:10 B:50 although A:50 B:10 is fully acceptable because rebalance = false
|
||||||
|
doCheckWithExistingAndCapacity(t, false, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"A": {Weight: 1},
|
||||||
|
"B": {Weight: 5}},
|
||||||
|
60, []string{"A", "B", "C"},
|
||||||
|
map[string]int64{},
|
||||||
|
map[string]int64{"B": 10},
|
||||||
|
map[string]int64{"A": 50, "B": 10, "C": 0},
|
||||||
|
map[string]int64{})
|
||||||
|
|
||||||
|
doCheckWithExistingAndCapacity(t, false, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"*": {MinReplicas: 20, Weight: 0}},
|
||||||
|
50, []string{"A", "B", "C"},
|
||||||
|
map[string]int64{},
|
||||||
|
map[string]int64{"B": 10},
|
||||||
|
map[string]int64{"A": 20, "B": 10, "C": 20},
|
||||||
|
map[string]int64{})
|
||||||
|
|
||||||
|
// Actually we would like to have extra 20 in B but 15 is also good.
|
||||||
|
doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
|
"*": {MinReplicas: 20, Weight: 1}},
|
||||||
|
60, []string{"A", "B"},
|
||||||
|
map[string]int64{},
|
||||||
|
map[string]int64{"B": 10},
|
||||||
|
map[string]int64{"A": 50, "B": 10},
|
||||||
|
map[string]int64{"B": 15})
|
||||||
|
}
|
||||||
|
|
||||||
func TestMin(t *testing.T) {
|
func TestMin(t *testing.T) {
|
||||||
doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
|
doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
"*": {MinReplicas: 2, Weight: 0}},
|
"*": {MinReplicas: 2, Weight: 0}},
|
||||||
@ -104,7 +255,6 @@ func TestMax(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestWeight(t *testing.T) {
|
func TestWeight(t *testing.T) {
|
||||||
|
|
||||||
doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
|
doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
|
||||||
"A": {Weight: 1},
|
"A": {Weight: 1},
|
||||||
"B": {Weight: 2}},
|
"B": {Weight: 2}},
|
||||||
|
Loading…
Reference in New Issue
Block a user