use planner v2 and pod analyzer

jianhuiz 2016-08-19 22:22:23 -07:00
parent a491580597
commit 7598d43db0
2 changed files with 155 additions and 42 deletions
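This change splits the controller's single fedInformer into dedicated fedReplicaSetInformer and fedPodInformer federated informers, implements parseFederationReplicaSetReference so per-ReplicaSet scheduling preferences can be read from a JSON annotation, and moves schedule() onto the planner v2 API, whose Plan call now also receives the current per-cluster state and returns a second result. Pod status from the pod analyzer (AnalysePods) feeds the capacity estimate handed to the planner.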

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package replicaset
 
 import (
+    "encoding/json"
     "reflect"
     "time"
@ -25,13 +26,14 @@ import (
fed "k8s.io/kubernetes/federation/apis/federation" fed "k8s.io/kubernetes/federation/apis/federation"
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
//kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
planner "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset/planner" planner "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset/planner"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
apiv1 "k8s.io/kubernetes/pkg/api/v1" apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
@@ -42,10 +44,9 @@ import (
 )
 
 const (
-    // schedule result was put into annotation in a format of "clusterName:replicas[/clusterName:replicas]..."
-    ExpectedReplicasAnnotation = "kubernetes.io/expected-replicas"
+    FedReplicaSetPreferencesAnnotation = ""
     allClustersKey = "THE_ALL_CLUSTER_KEY"
     UserAgentName = "Federation-replicaset-Controller"
 )
 
 var (
@@ -55,7 +56,23 @@ var (
 )
 
 func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) {
-    return nil, nil
+    accessor, err := meta.Accessor(frs)
+    if err != nil {
+        return nil, err
+    }
+    anno := accessor.GetAnnotations()
+    if anno == nil {
+        return nil, nil
+    }
+    frsPrefString, found := anno[FedReplicaSetPreferencesAnnotation]
+    if !found {
+        return nil, nil
+    }
+    var frsPref fed.FederatedReplicaSetPreferences
+    if err := json.Unmarshal([]byte(frsPrefString), &frsPref); err != nil {
+        return nil, err
+    }
+    return &frsPref, nil
 }
 
 type ReplicaSetController struct {
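For context, a minimal sketch of how the new annotation is consumed, assuming the federation tree at this commit; the JSON shape is copied from TestParseFederationReplicaSetReference in the second file, and examplePreferences is a hypothetical helper, not part of the commit:

    package replicaset

    import extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"

    // examplePreferences attaches preferences to a ReplicaSet, then parses
    // them back. Note the annotation key is still "" at this point in history.
    func examplePreferences() {
        rs := &extensionsv1.ReplicaSet{}
        rs.Annotations = map[string]string{
            FedReplicaSetPreferencesAnnotation: `{"rebalance": true,
                "clusters": {
                    "k8s-1": {"minReplicas": 10, "maxReplicas": 20, "weight": 2},
                    "*": {"weight": 1}
                }}`,
        }
        pref, err := parseFederationReplicaSetReference(rs)
        if err != nil {
            return // malformed JSON: schedule() falls back to the default planner
        }
        _ = pref // pref.Clusters["k8s-1"].Weight == 2
    }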
@@ -64,7 +81,8 @@ type ReplicaSetController struct {
     replicaSetController *framework.Controller
     replicaSetStore cache.StoreToReplicaSetLister
 
-    fedInformer fedutil.FederatedInformer
+    fedReplicaSetInformer fedutil.FederatedInformer
+    fedPodInformer fedutil.FederatedInformer
 
     replicasetDeliverer *fedutil.DelayingDeliverer
     clusterDeliverer *fedutil.DelayingDeliverer
@@ -72,7 +90,7 @@ type ReplicaSetController struct {
     replicaSetBackoff *flowcontrol.Backoff
 
-    planner *planner.Planner
+    defaultPlanner *planner.Planner
 }
 
 // NewclusterController returns a new cluster controller
@@ -83,14 +101,14 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSetController {
         clusterDeliverer: fedutil.NewDelayingDeliverer(),
         replicasetWorkQueue: workqueue.New(),
         replicaSetBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
-        planner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{
+        defaultPlanner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{
             Clusters: map[string]fed.ClusterReplicaSetPreferences{
                 "*": {Weight: 1},
             },
         }),
     }
 
-    replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset fedclientset.Interface) (cache.Store, framework.ControllerInterface) {
+    replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, framework.ControllerInterface) {
         return framework.NewInformer(
             &cache.ListWatch{
                 ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@@ -113,7 +131,28 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSetController {
             frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
         },
     }
-    frsc.fedInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle)
+    frsc.fedReplicaSetInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle)
+
+    podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, framework.ControllerInterface) {
+        return framework.NewInformer(
+            &cache.ListWatch{
+                ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+                    return clientset.Core().Pods(apiv1.NamespaceAll).List(options)
+                },
+                WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+                    return clientset.Core().Pods(apiv1.NamespaceAll).Watch(options)
+                },
+            },
+            &apiv1.Pod{},
+            controller.NoResyncPeriodFunc(),
+            fedutil.NewTriggerOnAllChanges(
+                func(obj runtime.Object) {
+                    //frsc.deliverLocalReplicaSet(obj, replicaSetReviewDelay)
+                },
+            ),
+        )
+    }
+    frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{})
 
     frsc.replicaSetStore.Store, frsc.replicaSetController = framework.NewInformer(
         &cache.ListWatch{
@@ -136,11 +175,8 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSetController {
 func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
     go frsc.replicaSetController.Run(stopCh)
-    frsc.fedInformer.Start()
-
-    for !frsc.isSynced() {
-        time.Sleep(5 * time.Millisecond)
-    }
+    frsc.fedReplicaSetInformer.Start()
+    frsc.fedPodInformer.Start()
 
     frsc.replicasetDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) {
         frsc.replicasetWorkQueue.Add(item.Key)
@ -149,9 +185,14 @@ func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
frsc.reconcileNamespacesOnClusterChange() frsc.reconcileNamespacesOnClusterChange()
}) })
for !frsc.isSynced() {
time.Sleep(5 * time.Millisecond)
}
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go wait.Until(frsc.worker, time.Second, stopCh) go wait.Until(frsc.worker, time.Second, stopCh)
} }
go func() { go func() {
select { select {
case <-time.After(time.Minute): case <-time.After(time.Minute):
@@ -166,21 +207,37 @@ func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
     frsc.replicasetDeliverer.Stop()
     frsc.clusterDeliverer.Stop()
     frsc.replicasetWorkQueue.ShutDown()
+    frsc.fedReplicaSetInformer.Stop()
+    frsc.fedPodInformer.Stop()
 }
 
 func (frsc *ReplicaSetController) isSynced() bool {
-    if !frsc.fedInformer.ClustersSynced() {
+    if !frsc.fedReplicaSetInformer.ClustersSynced() {
         glog.V(2).Infof("Cluster list not synced")
         return false
     }
-    clusters, err := frsc.fedInformer.GetReadyClusters()
+    clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters()
     if err != nil {
         glog.Errorf("Failed to get ready clusters: %v", err)
         return false
     }
-    if !frsc.fedInformer.GetTargetStore().ClustersSynced(clusters) {
+    if !frsc.fedReplicaSetInformer.GetTargetStore().ClustersSynced(clusters) {
         return false
     }
+
+    if !frsc.fedPodInformer.ClustersSynced() {
+        glog.V(2).Infof("Cluster list not synced")
+        return false
+    }
+    clusters, err = frsc.fedPodInformer.GetReadyClusters()
+    if err != nil {
+        glog.Errorf("Failed to get ready clusters: %v", err)
+        return false
+    }
+    if !frsc.fedPodInformer.GetTargetStore().ClustersSynced(clusters) {
+        return false
+    }
+
     if !frsc.replicaSetController.HasSynced() {
         glog.V(2).Infof("federation replicaset list not synced")
         return false
@@ -194,7 +251,7 @@ func (frsc *ReplicaSetController) deliverLocalReplicaSet(obj interface{}, duration time.Duration) {
         glog.Errorf("Couldn't get key for object %v: %v", obj, err)
         return
     }
-    _, exists, err := frsc.replicaSetStore.GetByKey(key)
+    _, exists, err := frsc.replicaSetStore.Store.GetByKey(key)
     if err != nil {
         glog.Errorf("Couldn't get federation replicaset %v: %v", key, err)
         return
@@ -243,8 +300,11 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, clusters []*fedv1.Cluster,
     expected map[string]int64, actual map[string]int64) map[string]int64 {
     // TODO: integrate real scheduler
 
-    plnr := frsc.planner
-    frsPref, _ := parseFederationReplicaSetReference(frs)
+    plnr := frsc.defaultPlanner
+    frsPref, err := parseFederationReplicaSetReference(frs)
+    if err != nil {
+        glog.Infof("Invalid ReplicaSet specific preference, use default. rs: %v, err: %v", frs, err)
+    }
     if frsPref != nil { // create a new planner if user specified a preference
         plnr = planner.NewPlanner(frsPref)
     }
@@ -254,7 +314,7 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, clusters []*fedv1.Cluster,
     for _, cluster := range clusters {
         clusterNames = append(clusterNames, cluster.Name)
     }
-    scheduleResult := plnr.Plan(replicas, clusterNames)
+    scheduleResult, _ := plnr.Plan(replicas, clusterNames, expected, actual)
 
     // make sure the return contains clusters need to zero the replicas
     result := make(map[string]int64)
     for clusterName := range expected {
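For reference, a minimal sketch of the planner v2 call pattern used by schedule() above, assuming the planner package at this commit; the cluster names, counts, and the examplePlan helper are illustrative only:

    package replicaset

    import (
        fed "k8s.io/kubernetes/federation/apis/federation"
        planner "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset/planner"
    )

    // examplePlan is a hypothetical illustration, not part of the commit.
    func examplePlan() map[string]int64 {
        // The same default preferences the controller is constructed with.
        plnr := planner.NewPlanner(&fed.FederatedReplicaSetPreferences{
            Clusters: map[string]fed.ClusterReplicaSetPreferences{
                "*": {Weight: 1},
            },
        })
        // expected: replicas currently requested per cluster (Spec.Replicas);
        // actual: capacity hints, only set for clusters with unschedulable pods.
        expected := map[string]int64{"k8s-1": 6, "k8s-2": 3}
        actual := map[string]int64{"k8s-1": 6}
        // Planner v2 also takes the current state and returns a second value,
        // which the controller above discards.
        scheduleResult, _ := plnr.Plan(9, []string{"k8s-1", "k8s-2"}, expected, actual)
        return scheduleResult
    }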
@@ -286,26 +346,34 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
     }
     frs := obj.(*extensionsv1.ReplicaSet)
 
-    clusters, err := frsc.fedInformer.GetReadyClusters()
+    clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters()
     if err != nil {
         return err
     }
 
     // collect current status and do schedule
+    allPods, err := frsc.fedPodInformer.GetTargetStore().List()
+    if err != nil {
+        return err
+    }
+    podStatus, err := AnalysePods(frs, allPods, time.Now())
+
     expected := make(map[string]int64)
     actual := make(map[string]int64)
     for _, cluster := range clusters {
-        lrsObj, exists, err := frsc.fedInformer.GetTargetStore().GetByKey(cluster.Name, key)
+        lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key)
         if err != nil {
             return err
         }
         if exists {
             lrs := lrsObj.(*extensionsv1.ReplicaSet)
             expected[cluster.Name] = int64(*lrs.Spec.Replicas)
-            // will get this via pod status
-            actual[cluster.Name] = int64(lrs.Status.Replicas)
+            unscheduleable := int64(podStatus[cluster.Name].Unschedulable)
+            if unscheduleable > 0 {
+                actual[cluster.Name] = int64(*lrs.Spec.Replicas)
+            }
         }
     }
 
     scheduleResult := frsc.schedule(frs, clusters, expected, actual)
     glog.Infof("Start syncing local replicaset %v", scheduleResult)
@@ -313,11 +381,11 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
 
     fedStatus := extensionsv1.ReplicaSetStatus{ObservedGeneration: frs.Generation}
     for clusterName, replicas := range scheduleResult {
-        clusterClient, err := frsc.fedInformer.GetClientsetForCluster(clusterName)
+        // TODO: updater or parallelizer doesn't help as results are needed for updating fed rs status
+        clusterClient, err := frsc.fedReplicaSetInformer.GetClientsetForCluster(clusterName)
         if err != nil {
             return err
         }
-        lrsObj, exists, err := frsc.fedInformer.GetTargetStore().GetByKey(clusterName, key)
+        lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(clusterName, key)
         if err != nil {
             return err
         } else if !exists {
@@ -372,7 +440,7 @@ func (frsc *ReplicaSetController) reconcileNamespacesOnClusterChange() {
     if !frsc.isSynced() {
         frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
     }
-    rss, _ := frsc.replicaSetStore.List()
+    rss := frsc.replicaSetStore.Store.List()
     for _, rs := range rss {
         key, _ := controller.KeyFunc(rs)
         frsc.deliverReplicaSetByKey(key, 0, false)

View File

@@ -17,22 +17,60 @@ limitations under the License.
 package replicaset
 
 import (
+    "flag"
     "fmt"
     "github.com/stretchr/testify/assert"
     fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
     fedclientfake "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake"
-    fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
+    "k8s.io/kubernetes/pkg/api/meta"
     apiv1 "k8s.io/kubernetes/pkg/api/v1"
     extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
-    "testing"
-    "time"
-    //kubeclientfake "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4/fake"
-    "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
+    kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
+    kubeclientfake "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4/fake"
+
+    fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
     "k8s.io/kubernetes/pkg/client/testing/core"
     "k8s.io/kubernetes/pkg/watch"
+    "testing"
+    "time"
 )
 
+func TestParseFederationReplicaSetReference(t *testing.T) {
+    successPrefs := []string{
+        `{"rebalance": true,
+            "clusters": {
+                "k8s-1": {"minReplicas": 10, "maxReplicas": 20, "weight": 2},
+                "*": {"weight": 1}
+            }}`,
+    }
+    failedPrefes := []string{
+        `{`, // bad json
+    }
+    rs := mkReplicaSet("rs-1", 100)
+    accessor, _ := meta.Accessor(rs)
+    anno := accessor.GetAnnotations()
+    if anno == nil {
+        anno = make(map[string]string)
+        accessor.SetAnnotations(anno)
+    }
+    for _, prefString := range successPrefs {
+        anno[FedReplicaSetPreferencesAnnotation] = prefString
+        pref, err := parseFederationReplicaSetReference(rs)
+        assert.NotNil(t, pref)
+        assert.Nil(t, err)
+    }
+    for _, prefString := range failedPrefes {
+        anno[FedReplicaSetPreferencesAnnotation] = prefString
+        pref, err := parseFederationReplicaSetReference(rs)
+        assert.Nil(t, pref)
+        assert.NotNil(t, err)
+    }
+}
+
 func TestReplicaSetController(t *testing.T) {
+    flag.Set("logtostderr", "true")
+    flag.Set("v", "5")
+    flag.Parse()
 
     replicaSetReviewDelay = 10 * time.Millisecond
     clusterAvailableDelay = 20 * time.Millisecond
@@ -45,17 +83,18 @@ func TestReplicaSetController(t *testing.T) {
     fedclientset.Federation().Clusters().Create(mkCluster("k8s-1", apiv1.ConditionTrue))
     fedclientset.Federation().Clusters().Create(mkCluster("k8s-2", apiv1.ConditionTrue))
 
-    kube1clientset := fedclientfake.NewSimpleClientset()
+    kube1clientset := kubeclientfake.NewSimpleClientset()
     kube1rswatch := watch.NewFake()
     kube1clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube1rswatch, nil))
-    kube2clientset := fedclientfake.NewSimpleClientset()
+    kube1Podwatch := watch.NewFake()
+    kube1clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(kube1Podwatch, nil))
+    kube2clientset := kubeclientfake.NewSimpleClientset()
     kube2rswatch := watch.NewFake()
     kube2clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube2rswatch, nil))
+    kube2Podwatch := watch.NewFake()
+    kube2clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(kube2Podwatch, nil))
 
-    stopChan := make(chan struct{})
-    replicaSetController := NewReplicaSetController(fedclientset)
-    informer := toFederatedInformerForTestOnly(replicaSetController.fedInformer)
-    informer.SetClientFactory(func(cluster *fedv1.Cluster) (federation_release_1_4.Interface, error) {
+    fedInformerClientFactory := func(cluster *fedv1.Cluster) (kube_release_1_4.Interface, error) {
         switch cluster.Name {
         case "k8s-1":
             return kube1clientset, nil
@@ -64,7 +103,13 @@ func TestReplicaSetController(t *testing.T) {
         default:
             return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name)
         }
-    })
+    }
+    stopChan := make(chan struct{})
+    replicaSetController := NewReplicaSetController(fedclientset)
+    rsFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedReplicaSetInformer)
+    rsFedinformer.SetClientFactory(fedInformerClientFactory)
+    podFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedPodInformer)
+    podFedinformer.SetClientFactory(fedInformerClientFactory)
 
     go replicaSetController.Run(1, stopChan)
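With both federated informers sharing fedInformerClientFactory, events sent on the fake watches drive the controller end to end; a minimal sketch reusing the test's own names (the pod literal is illustrative only):

    // Sketch: a pod appearing in cluster k8s-1 flows through kube1Podwatch
    // into fedPodInformer, where the pod analyzer can see it.
    pod := &apiv1.Pod{ObjectMeta: apiv1.ObjectMeta{Name: "rs-1-x1", Namespace: apiv1.NamespaceDefault}}
    kube1Podwatch.Add(pod)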