diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go index 39ce4753cc2..2f32bbd7316 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -17,6 +17,7 @@ limitations under the License. package replicaset import ( + "encoding/json" "reflect" "time" @@ -25,13 +26,14 @@ import ( fed "k8s.io/kubernetes/federation/apis/federation" fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" - //kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" planner "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset/planner" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/meta" apiv1 "k8s.io/kubernetes/pkg/api/v1" extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/cache" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" @@ -42,10 +44,9 @@ import ( ) const ( - // schedule result was put into annotation in a format of "clusterName:replicas[/clusterName:replicas]..." 
- ExpectedReplicasAnnotation = "kubernetes.io/expected-replicas" - allClustersKey = "THE_ALL_CLUSTER_KEY" - UserAgentName = "Federation-replicaset-Controller" + FedReplicaSetPreferencesAnnotation = "federation.kubernetes.io/replica-set-preferences" + allClustersKey = "THE_ALL_CLUSTER_KEY" + UserAgentName = "Federation-replicaset-Controller" ) var ( @@ -55,7 +56,23 @@ var ( ) func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) { - return nil, nil + accessor, err := meta.Accessor(frs) + if err != nil { + return nil, err + } + anno := accessor.GetAnnotations() + if anno == nil { + return nil, nil + } + frsPrefString, found := anno[FedReplicaSetPreferencesAnnotation] + if !found { + return nil, nil + } + var frsPref fed.FederatedReplicaSetPreferences + if err := json.Unmarshal([]byte(frsPrefString), &frsPref); err != nil { + return nil, err + } + return &frsPref, nil } type ReplicaSetController struct { @@ -64,7 +81,8 @@ type ReplicaSetController struct { replicaSetController *framework.Controller replicaSetStore cache.StoreToReplicaSetLister - fedInformer fedutil.FederatedInformer + fedReplicaSetInformer fedutil.FederatedInformer + fedPodInformer fedutil.FederatedInformer replicasetDeliverer *fedutil.DelayingDeliverer clusterDeliverer *fedutil.DelayingDeliverer @@ -72,7 +90,7 @@ type ReplicaSetController struct { replicaSetBackoff *flowcontrol.Backoff - planner *planner.Planner + defaultPlanner *planner.Planner } // NewclusterController returns a new cluster controller @@ -83,14 +101,14 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe clusterDeliverer: fedutil.NewDelayingDeliverer(), replicasetWorkQueue: workqueue.New(), replicaSetBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), - planner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{ + defaultPlanner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{ Clusters: map[string]fed.ClusterReplicaSetPreferences{ "*": {Weight: 1}, }, }), } - 
replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset fedclientset.Interface) (cache.Store, framework.ControllerInterface) { + replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, framework.ControllerInterface) { return framework.NewInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { @@ -113,7 +131,28 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay) }, } - frsc.fedInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle) + frsc.fedReplicaSetInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle) + + podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, framework.ControllerInterface) { + return framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return clientset.Core().Pods(apiv1.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return clientset.Core().Pods(apiv1.NamespaceAll).Watch(options) + }, + }, + &apiv1.Pod{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnAllChanges( + func(obj runtime.Object) { + //frsc.deliverLocalReplicaSet(obj, replicaSetReviewDelay) + }, + ), + ) + } + frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{}) frsc.replicaSetStore.Store, frsc.replicaSetController = framework.NewInformer( &cache.ListWatch{ @@ -136,11 +175,8 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { go frsc.replicaSetController.Run(stopCh) - frsc.fedInformer.Start() - - for 
!frsc.isSynced() { - time.Sleep(5 * time.Millisecond) - } + frsc.fedReplicaSetInformer.Start() + frsc.fedPodInformer.Start() frsc.replicasetDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) { frsc.replicasetWorkQueue.Add(item.Key) @@ -149,9 +185,14 @@ func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { frsc.reconcileNamespacesOnClusterChange() }) + for !frsc.isSynced() { + time.Sleep(5 * time.Millisecond) + } + for i := 0; i < workers; i++ { go wait.Until(frsc.worker, time.Second, stopCh) } + go func() { select { case <-time.After(time.Minute): @@ -166,21 +207,37 @@ func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { frsc.replicasetDeliverer.Stop() frsc.clusterDeliverer.Stop() frsc.replicasetWorkQueue.ShutDown() + frsc.fedReplicaSetInformer.Stop() + frsc.fedPodInformer.Stop() } func (frsc *ReplicaSetController) isSynced() bool { - if !frsc.fedInformer.ClustersSynced() { + if !frsc.fedReplicaSetInformer.ClustersSynced() { glog.V(2).Infof("Cluster list not synced") return false } - clusters, err := frsc.fedInformer.GetReadyClusters() + clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters() if err != nil { glog.Errorf("Failed to get ready clusters: %v", err) return false } - if !frsc.fedInformer.GetTargetStore().ClustersSynced(clusters) { + if !frsc.fedReplicaSetInformer.GetTargetStore().ClustersSynced(clusters) { return false } + + if !frsc.fedPodInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced") + return false + } + clusters, err = frsc.fedPodInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get ready clusters: %v", err) + return false + } + if !frsc.fedPodInformer.GetTargetStore().ClustersSynced(clusters) { + return false + } + if !frsc.replicaSetController.HasSynced() { glog.V(2).Infof("federation replicaset list not synced") return false @@ -194,7 +251,7 @@ func (frsc *ReplicaSetController) deliverLocalReplicaSet(obj interface{}, durati 
glog.Errorf("Couldn't get key for object %v: %v", obj, err) return } - _, exists, err := frsc.replicaSetStore.GetByKey(key) + _, exists, err := frsc.replicaSetStore.Store.GetByKey(key) if err != nil { glog.Errorf("Couldn't get federation replicaset %v: %v", key, err) return @@ -243,8 +300,11 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, cluster expected map[string]int64, actual map[string]int64) map[string]int64 { // TODO: integrate real scheduler - plnr := frsc.planner - frsPref, _ := parseFederationReplicaSetReference(frs) + plnr := frsc.defaultPlanner + frsPref, err := parseFederationReplicaSetReference(frs) + if err != nil { + glog.Infof("Invalid ReplicaSet specific preference, use default. rs: %v, err: %v", frs, err) + } if frsPref != nil { // create a new planner if user specified a preference plnr = planner.NewPlanner(frsPref) } @@ -254,7 +314,7 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, cluster for _, cluster := range clusters { clusterNames = append(clusterNames, cluster.Name) } - scheduleResult := plnr.Plan(replicas, clusterNames) + scheduleResult, _ := plnr.Plan(replicas, clusterNames, expected, actual) // make sure the return contains clusters need to zero the replicas result := make(map[string]int64) for clusterName := range expected { @@ -286,26 +346,34 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { } frs := obj.(*extensionsv1.ReplicaSet) - clusters, err := frsc.fedInformer.GetReadyClusters() + clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters() if err != nil { return err } // collect current status and do schedule + allPods, err := frsc.fedPodInformer.GetTargetStore().List() + if err != nil { + return err + } + podStatus, err := AnalysePods(frs, allPods, time.Now()) expected := make(map[string]int64) actual := make(map[string]int64) for _, cluster := range clusters { - lrsObj, exists, err := frsc.fedInformer.GetTargetStore().GetByKey(cluster.Name, 
key) + lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key) if err != nil { return err } if exists { lrs := lrsObj.(*extensionsv1.ReplicaSet) expected[cluster.Name] = int64(*lrs.Spec.Replicas) - // will get this via pod status - actual[cluster.Name] = int64(lrs.Status.Replicas) + unscheduleable := int64(podStatus[cluster.Name].Unschedulable) + if unscheduleable > 0 { + actual[cluster.Name] = int64(*lrs.Spec.Replicas) + } } } + scheduleResult := frsc.schedule(frs, clusters, expected, actual) glog.Infof("Start syncing local replicaset %v", scheduleResult) @@ -313,11 +381,11 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error { fedStatus := extensionsv1.ReplicaSetStatus{ObservedGeneration: frs.Generation} for clusterName, replicas := range scheduleResult { // TODO: updater or parallelizer doesnn't help as results are needed for updating fed rs status - clusterClient, err := frsc.fedInformer.GetClientsetForCluster(clusterName) + clusterClient, err := frsc.fedReplicaSetInformer.GetClientsetForCluster(clusterName) if err != nil { return err } - lrsObj, exists, err := frsc.fedInformer.GetTargetStore().GetByKey(clusterName, key) + lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(clusterName, key) if err != nil { return err } else if !exists { @@ -372,7 +440,7 @@ func (frsc *ReplicaSetController) reconcileNamespacesOnClusterChange() { if !frsc.isSynced() { frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) } - rss, _ := frsc.replicaSetStore.List() + rss := frsc.replicaSetStore.Store.List() for _, rs := range rss { key, _ := controller.KeyFunc(rs) frsc.deliverReplicaSetByKey(key, 0, false) diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go index b5e87714429..5ab0aa7ea81 100644 --- 
a/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go @@ -17,22 +17,60 @@ limitations under the License. package replicaset import ( + "flag" "fmt" "github.com/stretchr/testify/assert" fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" fedclientfake "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/api/meta" apiv1 "k8s.io/kubernetes/pkg/api/v1" extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" - "testing" - "time" - //kubeclientfake "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4/fake" - "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" - fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" + kubeclientfake "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4/fake" "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/watch" + "testing" + "time" ) +func TestParseFederationReplicaSetReference(t *testing.T) { + successPrefs := []string{ + `{"rebalance": true, + "clusters": { + "k8s-1": {"minReplicas": 10, "maxReplicas": 20, "weight": 2}, + "*": {"weight": 1} + }}`, + } + failedPrefes := []string{ + `{`, // bad json + } + + rs := mkReplicaSet("rs-1", 100) + accessor, _ := meta.Accessor(rs) + anno := accessor.GetAnnotations() + if anno == nil { + anno = make(map[string]string) + accessor.SetAnnotations(anno) + } + for _, prefString := range successPrefs { + anno[FedReplicaSetPreferencesAnnotation] = prefString + pref, err := parseFederationReplicaSetReference(rs) + assert.NotNil(t, pref) + assert.Nil(t, err) + } + for _, prefString := range failedPrefes { + anno[FedReplicaSetPreferencesAnnotation] = prefString + pref, err := 
parseFederationReplicaSetReference(rs) + assert.Nil(t, pref) + assert.NotNil(t, err) + } +} + func TestReplicaSetController(t *testing.T) { + flag.Set("logtostderr", "true") + flag.Set("v", "5") + flag.Parse() replicaSetReviewDelay = 10 * time.Millisecond clusterAvailableDelay = 20 * time.Millisecond @@ -45,17 +83,18 @@ func TestReplicaSetController(t *testing.T) { fedclientset.Federation().Clusters().Create(mkCluster("k8s-1", apiv1.ConditionTrue)) fedclientset.Federation().Clusters().Create(mkCluster("k8s-2", apiv1.ConditionTrue)) - kube1clientset := fedclientfake.NewSimpleClientset() + kube1clientset := kubeclientfake.NewSimpleClientset() kube1rswatch := watch.NewFake() kube1clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube1rswatch, nil)) - kube2clientset := fedclientfake.NewSimpleClientset() + kube1Podwatch := watch.NewFake() + kube1clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(kube1Podwatch, nil)) + kube2clientset := kubeclientfake.NewSimpleClientset() kube2rswatch := watch.NewFake() kube2clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube2rswatch, nil)) + kube2Podwatch := watch.NewFake() + kube2clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(kube2Podwatch, nil)) - stopChan := make(chan struct{}) - replicaSetController := NewReplicaSetController(fedclientset) - informer := toFederatedInformerForTestOnly(replicaSetController.fedInformer) - informer.SetClientFactory(func(cluster *fedv1.Cluster) (federation_release_1_4.Interface, error) { + fedInformerClientFactory := func(cluster *fedv1.Cluster) (kube_release_1_4.Interface, error) { switch cluster.Name { case "k8s-1": return kube1clientset, nil @@ -64,7 +103,13 @@ func TestReplicaSetController(t *testing.T) { default: return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name) } - }) + } + stopChan := make(chan struct{}) + replicaSetController := NewReplicaSetController(fedclientset) + rsFedinformer := 
toFederatedInformerForTestOnly(replicaSetController.fedReplicaSetInformer) + rsFedinformer.SetClientFactory(fedInformerClientFactory) + podFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedPodInformer) + podFedinformer.SetClientFactory(fedInformerClientFactory) go replicaSetController.Run(1, stopChan)