From ed92579d2ed014d6ada9825503a0f38a261c6037 Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Fri, 22 Jul 2016 17:07:05 +0200 Subject: [PATCH] Replica location planner for Federated ReplicaSet Controller --- .../replicaset/planner/planner.go | 148 +++++++++++++++++ .../replicaset/planner/planner_test.go | 156 ++++++++++++++++++ 2 files changed, 304 insertions(+) create mode 100644 federation/pkg/federation-controller/replicaset/planner/planner.go create mode 100644 federation/pkg/federation-controller/replicaset/planner/planner_test.go diff --git a/federation/pkg/federation-controller/replicaset/planner/planner.go b/federation/pkg/federation-controller/replicaset/planner/planner.go new file mode 100644 index 00000000000..2c86ec7a2cf --- /dev/null +++ b/federation/pkg/federation-controller/replicaset/planner/planner.go @@ -0,0 +1,148 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package planer + +import ( + "sort" + + fed_api "k8s.io/kubernetes/federation/apis/federation" +) + +// Planner decides how many out of the given replicas should be placed in each of the +// federated clusters. +type Planner struct { + preferences *fed_api.FederatedReplicaSetPreferences +} + +type namedClusterReplicaSetPreferences struct { + clusterName string + fed_api.ClusterReplicaSetPreferences +} + +type byWeight []*namedClusterReplicaSetPreferences + +func (a byWeight) Len() int { return len(a) } +func (a byWeight) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// Preferences are sorted according by decreasing weight and increasing clusterName. +func (a byWeight) Less(i, j int) bool { + return (a[i].Weight > a[j].Weight) || (a[i].Weight == a[j].Weight && a[i].clusterName < a[j].clusterName) +} + +func NewPlanner(preferences *fed_api.FederatedReplicaSetPreferences) *Planner { + return &Planner{ + preferences: preferences, + } +} + +// Distribute the desired number of replicas among the given cluster according to the planner preferences. +// The function tries its best to assign each cluster the prefered number of replicas, however if +// sum of MinReplicas for all cluster is bigger thant replicasToDistribute then some cluster will not +// have all of the replicas assigned. In such case a cluster with higher weight has priority over +// cluster with lower weight (or with lexicographically smaller name in case of draw). +func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string) map[string]int64 { + preferences := make([]*namedClusterReplicaSetPreferences, 0, len(availableClusters)) + plan := make(map[string]int64, len(preferences)) + + named := func(name string, pref fed_api.ClusterReplicaSetPreferences) *namedClusterReplicaSetPreferences { + return &namedClusterReplicaSetPreferences{ + clusterName: name, + ClusterReplicaSetPreferences: pref, + } + } + + for _, cluster := range availableClusters { + if localRSP, found := p.preferences.Clusters[cluster]; found { + preferences = append(preferences, named(cluster, localRSP)) + } else { + if localRSP, found := p.preferences.Clusters["*"]; found { + preferences = append(preferences, named(cluster, localRSP)) + } else { + plan[cluster] = int64(0) + } + } + } + sort.Sort(byWeight(preferences)) + + remainingReplicas := replicasToDistribute + + // Assign each cluster the minimum number of replicas it requested. + for _, preference := range preferences { + min := minInt64(preference.MinReplicas, remainingReplicas) + remainingReplicas -= min + plan[preference.clusterName] = min + } + + modified := true + + // It is possible single pass of the loop is not enough to distribue all replicas among clusters due + // to weight, max and rounding corner cases. In such case we iterate until either + // there is no replicas or no cluster gets any more replicas or the number + // of attempts is less than available cluster count. Every loop either distributes all remainingReplicas + // or maxes out at least one cluster. + // TODO: This algorithm is O(clusterCount^2). When needed use sweep-like algorithm for O(n log n). + for trial := 0; trial < len(availableClusters) && modified && remainingReplicas > 0; trial++ { + modified = false + weightSum := int64(0) + for _, preference := range preferences { + weightSum += preference.Weight + } + newPreferences := make([]*namedClusterReplicaSetPreferences, 0, len(preferences)) + + distributeInThisLoop := remainingReplicas + for _, preference := range preferences { + if weightSum > 0 { + start := plan[preference.clusterName] + // Distribute the remaining replicas, rounding fractions always up. + extra := (distributeInThisLoop*preference.Weight + weightSum - 1) / weightSum + extra = minInt64(extra, remainingReplicas) + // In total there should be the amount that was there at start plus whatever is due + // in this iteration + total := start + extra + + // Check if we don't overflow the cluster, and if yes don't consider this cluster + // in any of the following iterations. + if preference.MaxReplicas != nil && total > *preference.MaxReplicas { + total = *preference.MaxReplicas + } else { + newPreferences = append(newPreferences, preference) + } + + // Only total-start replicas were actually taken. + remainingReplicas -= (total - start) + plan[preference.clusterName] = total + + // Something extra got scheduled on this cluster. + if total > start { + modified = true + } + } else { + break + } + } + preferences = newPreferences + } + + return plan +} + +func minInt64(a int64, b int64) int64 { + if a < b { + return a + } + return b +} diff --git a/federation/pkg/federation-controller/replicaset/planner/planner_test.go b/federation/pkg/federation-controller/replicaset/planner/planner_test.go new file mode 100644 index 00000000000..2a9a4e68440 --- /dev/null +++ b/federation/pkg/federation-controller/replicaset/planner/planner_test.go @@ -0,0 +1,156 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package planer + +import ( + "testing" + + fed_api "k8s.io/kubernetes/federation/apis/federation" + + "github.com/stretchr/testify/assert" +) + +func doCheck(t *testing.T, pref map[string]fed_api.ClusterReplicaSetPreferences, replicas int64, clusters []string, expected map[string]int64) { + planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{ + Clusters: pref, + }) + plan := planer.Plan(replicas, clusters) + assert.EqualValues(t, expected, plan) +} + +func pint(val int64) *int64 { + return &val +} + +func TestEqual(t *testing.T) { + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "*": {Weight: 1}}, + 50, []string{"A", "B", "C"}, + map[string]int64{"A": 17, "B": 17, "C": 16}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "*": {Weight: 1}}, + 50, []string{"A", "B"}, + map[string]int64{"A": 25, "B": 25}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "*": {Weight: 1}}, + 1, []string{"A", "B"}, + map[string]int64{"A": 1, "B": 0}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "*": {Weight: 1}}, + 1, []string{"A", "B", "C", "D"}, + map[string]int64{"A": 1, "B": 0, "C": 0, "D": 0}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "*": {Weight: 1}}, + 1, []string{"A"}, + map[string]int64{"A": 1}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "*": {Weight: 1}}, + 1, []string{}, + map[string]int64{}) +} + +func TestMin(t *testing.T) { + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "*": {MinReplicas: 2, Weight: 0}}, + 50, []string{"A", "B", "C"}, + map[string]int64{"A": 2, "B": 2, "C": 2}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "*": {MinReplicas: 20, Weight: 0}}, + 50, []string{"A", "B", "C"}, + map[string]int64{"A": 20, "B": 20, "C": 10}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "*": {MinReplicas: 20, Weight: 0}, + "A": {MinReplicas: 100, Weight: 1}}, + 50, []string{"A", "B", "C"}, + map[string]int64{"A": 50, "B": 0, "C": 0}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "*": {MinReplicas: 10, Weight: 1, MaxReplicas: pint(12)}}, + 50, []string{"A", "B", "C"}, + map[string]int64{"A": 12, "B": 12, "C": 12}) +} + +func TestMax(t *testing.T) { + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "*": {Weight: 1, MaxReplicas: pint(2)}}, + 50, []string{"A", "B", "C"}, + map[string]int64{"A": 2, "B": 2, "C": 2}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "*": {Weight: 0, MaxReplicas: pint(2)}}, + 50, []string{"A", "B", "C"}, + map[string]int64{"A": 0, "B": 0, "C": 0}) +} + +func TestWeight(t *testing.T) { + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "A": {Weight: 1}, + "B": {Weight: 2}}, + 60, []string{"A", "B", "C"}, + map[string]int64{"A": 20, "B": 40, "C": 0}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "A": {Weight: 10000}, + "B": {Weight: 1}}, + 50, []string{"A", "B", "C"}, + map[string]int64{"A": 50, "B": 0, "C": 0}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "A": {Weight: 10000}, + "B": {Weight: 1}}, + 50, []string{"B", "C"}, + map[string]int64{"B": 50, "C": 0}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "A": {Weight: 10000, MaxReplicas: pint(10)}, + "B": {Weight: 1}, + "C": {Weight: 1}}, + 50, []string{"A", "B", "C"}, + map[string]int64{"A": 10, "B": 20, "C": 20}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "A": {Weight: 10000, MaxReplicas: pint(10)}, + "B": {Weight: 1}, + "C": {Weight: 1, MaxReplicas: pint(10)}}, + 50, []string{"A", "B", "C"}, + map[string]int64{"A": 10, "B": 30, "C": 10}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "A": {Weight: 10000, MaxReplicas: pint(10)}, + "B": {Weight: 1}, + "C": {Weight: 1, MaxReplicas: pint(21)}, + "D": {Weight: 1, MaxReplicas: pint(10)}}, + 71, []string{"A", "B", "C", "D"}, + map[string]int64{"A": 10, "B": 30, "C": 21, "D": 10}) + + doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ + "A": {Weight: 10000, MaxReplicas: pint(10)}, + "B": {Weight: 1}, + "C": {Weight: 1, MaxReplicas: pint(21)}, + "D": {Weight: 1, MaxReplicas: pint(10)}, + "E": {Weight: 1}}, + 91, []string{"A", "B", "C", "D", "E"}, + map[string]int64{"A": 10, "B": 25, "C": 21, "D": 10, "E": 25}) +}