Merge pull request #31814 from mwielgus/add-hash-to-planer

Automatic merge from submit-queue

Use hash in federated replica set planner

Without this all replicaset of size 1 would land in the same cluster if the default preferences (weight=1) are used. 

cc: @jianhuiz  @quinton-hoole @kubernetes/sig-cluster-federation
This commit is contained in:
Kubernetes Submit Queue 2016-09-01 08:20:17 -07:00 committed by GitHub
commit 2907015b58
3 changed files with 32 additions and 15 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package planer package planer
import ( import (
"hash/fnv"
"math" "math"
"sort" "sort"
@ -31,6 +32,7 @@ type Planner struct {
type namedClusterReplicaSetPreferences struct { type namedClusterReplicaSetPreferences struct {
clusterName string clusterName string
hash uint32
fed_api.ClusterReplicaSetPreferences fed_api.ClusterReplicaSetPreferences
} }
@ -39,9 +41,10 @@ type byWeight []*namedClusterReplicaSetPreferences
func (a byWeight) Len() int { return len(a) } func (a byWeight) Len() int { return len(a) }
func (a byWeight) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byWeight) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// Preferences are sorted according by decreasing weight and increasing clusterName. // Preferences are sorted according by decreasing weight and increasing hash (built on top of cluster name and rs name).
// Sorting is made by a hash to avoid assigning single-replica rs to the alphabetically smallest cluster.
func (a byWeight) Less(i, j int) bool { func (a byWeight) Less(i, j int) bool {
return (a[i].Weight > a[j].Weight) || (a[i].Weight == a[j].Weight && a[i].clusterName < a[j].clusterName) return (a[i].Weight > a[j].Weight) || (a[i].Weight == a[j].Weight && a[i].hash < a[j].hash)
} }
func NewPlanner(preferences *fed_api.FederatedReplicaSetPreferences) *Planner { func NewPlanner(preferences *fed_api.FederatedReplicaSetPreferences) *Planner {
@ -56,21 +59,28 @@ func NewPlanner(preferences *fed_api.FederatedReplicaSetPreferences) *Planner {
// have all of the replicas assigned. In such case a cluster with higher weight has priority over // have all of the replicas assigned. In such case a cluster with higher weight has priority over
// cluster with lower weight (or with lexicographically smaller name in case of draw). // cluster with lower weight (or with lexicographically smaller name in case of draw).
// It can also use the current replica count and estimated capacity to provide better planning and // It can also use the current replica count and estimated capacity to provide better planning and
// adhere to rebalance policy. // adhere to rebalance policy. To avoid prioritization of clusters with smaller lexiconographical names
// a semi-random string (like replica set name) can be provided.
// Two maps are returned: // Two maps are returned:
// * a map that contains information how many replicas will be possible to run in a cluster. // * a map that contains information how many replicas will be possible to run in a cluster.
// * a map that contains information how many extra replicas would be nice to schedule in a cluster so, // * a map that contains information how many extra replicas would be nice to schedule in a cluster so,
// if by chance, they are scheudled we will be closer to the desired replicas layout. // if by chance, they are scheudled we will be closer to the desired replicas layout.
func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string, currentReplicaCount map[string]int64, func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string, currentReplicaCount map[string]int64,
estimatedCapacity map[string]int64) (map[string]int64, map[string]int64) { estimatedCapacity map[string]int64, replicaSetKey string) (map[string]int64, map[string]int64) {
preferences := make([]*namedClusterReplicaSetPreferences, 0, len(availableClusters)) preferences := make([]*namedClusterReplicaSetPreferences, 0, len(availableClusters))
plan := make(map[string]int64, len(preferences)) plan := make(map[string]int64, len(preferences))
overflow := make(map[string]int64, len(preferences)) overflow := make(map[string]int64, len(preferences))
named := func(name string, pref fed_api.ClusterReplicaSetPreferences) *namedClusterReplicaSetPreferences { named := func(name string, pref fed_api.ClusterReplicaSetPreferences) *namedClusterReplicaSetPreferences {
// Seems to work better than addler for our case.
hasher := fnv.New32()
hasher.Write([]byte(name))
hasher.Write([]byte(replicaSetKey))
return &namedClusterReplicaSetPreferences{ return &namedClusterReplicaSetPreferences{
clusterName: name, clusterName: name,
hash: hasher.Sum32(),
ClusterReplicaSetPreferences: pref, ClusterReplicaSetPreferences: pref,
} }
} }

View File

@ -28,7 +28,7 @@ func doCheck(t *testing.T, pref map[string]fed_api.ClusterReplicaSetPreferences,
planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{ planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{
Clusters: pref, Clusters: pref,
}) })
plan, overflow := planer.Plan(replicas, clusters, map[string]int64{}, map[string]int64{}) plan, overflow := planer.Plan(replicas, clusters, map[string]int64{}, map[string]int64{}, "")
assert.EqualValues(t, expected, plan) assert.EqualValues(t, expected, plan)
assert.Equal(t, 0, len(overflow)) assert.Equal(t, 0, len(overflow))
} }
@ -38,7 +38,7 @@ func doCheckWithExisting(t *testing.T, pref map[string]fed_api.ClusterReplicaSet
planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{ planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{
Clusters: pref, Clusters: pref,
}) })
plan, overflow := planer.Plan(replicas, clusters, existing, map[string]int64{}) plan, overflow := planer.Plan(replicas, clusters, existing, map[string]int64{}, "")
assert.Equal(t, 0, len(overflow)) assert.Equal(t, 0, len(overflow))
assert.EqualValues(t, expected, plan) assert.EqualValues(t, expected, plan)
} }
@ -52,7 +52,7 @@ func doCheckWithExistingAndCapacity(t *testing.T, rebalance bool, pref map[strin
Rebalance: rebalance, Rebalance: rebalance,
Clusters: pref, Clusters: pref,
}) })
plan, overflow := planer.Plan(replicas, clusters, existing, capacity) plan, overflow := planer.Plan(replicas, clusters, existing, capacity, "")
assert.EqualValues(t, expected, plan) assert.EqualValues(t, expected, plan)
assert.Equal(t, expectedOverflow, overflow) assert.Equal(t, expectedOverflow, overflow)
} }
@ -65,7 +65,8 @@ func TestEqual(t *testing.T) {
doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
"*": {Weight: 1}}, "*": {Weight: 1}},
50, []string{"A", "B", "C"}, 50, []string{"A", "B", "C"},
map[string]int64{"A": 17, "B": 17, "C": 16}) // hash dependent
map[string]int64{"A": 16, "B": 17, "C": 17})
doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
"*": {Weight: 1}}, "*": {Weight: 1}},
@ -75,12 +76,14 @@ func TestEqual(t *testing.T) {
doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
"*": {Weight: 1}}, "*": {Weight: 1}},
1, []string{"A", "B"}, 1, []string{"A", "B"},
map[string]int64{"A": 1, "B": 0}) // hash dependent
map[string]int64{"A": 0, "B": 1})
doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
"*": {Weight: 1}}, "*": {Weight: 1}},
1, []string{"A", "B", "C", "D"}, 1, []string{"A", "B", "C", "D"},
map[string]int64{"A": 1, "B": 0, "C": 0, "D": 0}) // hash dependent
map[string]int64{"A": 0, "B": 0, "C": 0, "D": 1})
doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
"*": {Weight: 1}}, "*": {Weight: 1}},
@ -122,7 +125,9 @@ func TestEqualWithExisting(t *testing.T) {
"*": {Weight: 1}}, "*": {Weight: 1}},
50, []string{"A", "B"}, 50, []string{"A", "B"},
map[string]int64{"A": 10, "B": 70}, map[string]int64{"A": 10, "B": 70},
map[string]int64{"A": 10, "B": 40}) // hash dependent
// TODO: Should be 10:40, update algorithm. Issue: #31816
map[string]int64{"A": 0, "B": 50})
doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{ doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
"*": {Weight: 1}}, "*": {Weight: 1}},
@ -145,7 +150,7 @@ func TestWithExistingAndCapacity(t *testing.T) {
map[string]int64{}, map[string]int64{},
map[string]int64{"C": 10}, map[string]int64{"C": 10},
map[string]int64{"A": 20, "B": 20, "C": 10}, map[string]int64{"A": 20, "B": 20, "C": 10},
map[string]int64{"C": 6}) map[string]int64{"C": 7})
// desired B:50 C:0 // desired B:50 C:0
doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{ doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
@ -228,7 +233,8 @@ func TestMin(t *testing.T) {
doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
"*": {MinReplicas: 20, Weight: 0}}, "*": {MinReplicas: 20, Weight: 0}},
50, []string{"A", "B", "C"}, 50, []string{"A", "B", "C"},
map[string]int64{"A": 20, "B": 20, "C": 10}) // hash dependant.
map[string]int64{"A": 10, "B": 20, "C": 20})
doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{ doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
"*": {MinReplicas: 20, Weight: 0}, "*": {MinReplicas: 20, Weight: 0},

View File

@ -363,7 +363,8 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, cluster
for _, cluster := range clusters { for _, cluster := range clusters {
clusterNames = append(clusterNames, cluster.Name) clusterNames = append(clusterNames, cluster.Name)
} }
scheduleResult, overflow := plnr.Plan(replicas, clusterNames, current, estimatedCapacity) scheduleResult, overflow := plnr.Plan(replicas, clusterNames, current, estimatedCapacity,
frs.Namespace+"/"+frs.Name)
// make sure the return contains clusters need to zero the replicas // make sure the return contains clusters need to zero the replicas
result := make(map[string]int64) result := make(map[string]int64)
for clusterName := range current { for clusterName := range current {