diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index eb4a1e7c8ec..c57ec70a616 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options" "k8s.io/kubernetes/federation/pkg/dnsprovider" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" + deploymentcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/deployment" ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress" namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace" replicasetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset" @@ -160,6 +161,11 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err replicaSetController := replicasetcontroller.NewReplicaSetController(replicaSetClientset) go replicaSetController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop) + deploymentClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, deploymentcontroller.UserAgentName)) + deploymentController := deploymentcontroller.NewDeploymentController(deploymentClientset) + // TODO: rename s.ConcurentReplicaSetSyncs + go deploymentController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop) + glog.Infof("Loading client config for ingress controller %q", "ingress-controller") ingClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "ingress-controller")) ingressController := ingresscontroller.NewIngressController(ingClientset) diff --git a/federation/pkg/federation-controller/deployment/deploymentcontroller.go b/federation/pkg/federation-controller/deployment/deploymentcontroller.go new file mode 100644 index 00000000000..74833527871 --- /dev/null +++ b/federation/pkg/federation-controller/deployment/deploymentcontroller.go @@ -0,0 +1,555 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "bytes" + "encoding/json" + "fmt" + "reflect" + "sort" + "time" + + "github.com/golang/glog" + + fed "k8s.io/kubernetes/federation/apis/federation" + fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5" + "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset" + "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset/planner" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" + "k8s.io/kubernetes/pkg/api" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "k8s.io/kubernetes/pkg/client/cache" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" +) + +const ( + FedDeploymentPreferencesAnnotation = "federation.kubernetes.io/deployment-preferences" + allClustersKey = "THE_ALL_CLUSTER_KEY" + UserAgentName = "Federation-Deployment-Controller" +) + +var ( + deploymentReviewDelay = 10 * time.Second + clusterAvailableDelay = 20 * time.Second + clusterUnavailableDelay = 60 * time.Second + allDeploymentReviewDelay = 2 * time.Minute + updateTimeout = 30 * time.Second +) + +func parseFederationDeploymentPreference(fd *extensionsv1.Deployment) (*fed.FederatedReplicaSetPreferences, error) { + if fd.Annotations == nil { + return nil, nil + } + fdPrefString, found := fd.Annotations[FedDeploymentPreferencesAnnotation] + if !found { + return nil, nil + } + var fdPref fed.FederatedReplicaSetPreferences + if err := json.Unmarshal([]byte(fdPrefString), &fdPref); err != nil { + return nil, err + } + return &fdPref, nil +} + +type DeploymentController struct { + fedClient fedclientset.Interface + + deploymentController *cache.Controller + deploymentStore cache.Store + + fedDeploymentInformer fedutil.FederatedInformer + fedPodInformer fedutil.FederatedInformer + + deploymentDeliverer *fedutil.DelayingDeliverer + clusterDeliverer *fedutil.DelayingDeliverer + deploymentWorkQueue workqueue.Interface + // For updating members of federation. + fedUpdater fedutil.FederatedUpdater + deploymentBackoff *flowcontrol.Backoff + eventRecorder record.EventRecorder + + defaultPlanner *planner.Planner +} + +// NewclusterController returns a new cluster controller +func NewDeploymentController(federationClient fedclientset.Interface) *DeploymentController { + broadcaster := record.NewBroadcaster() + broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(federationClient)) + recorder := broadcaster.NewRecorder(api.EventSource{Component: "federated-deployment-controller"}) + + fdc := &DeploymentController{ + fedClient: federationClient, + deploymentDeliverer: fedutil.NewDelayingDeliverer(), + clusterDeliverer: fedutil.NewDelayingDeliverer(), + deploymentWorkQueue: workqueue.New(), + deploymentBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), + defaultPlanner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{ + Clusters: map[string]fed.ClusterReplicaSetPreferences{ + "*": {Weight: 1}, + }, + }), + eventRecorder: recorder, + } + + deploymentFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + return cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + versionedOptions := fedutil.VersionizeV1ListOptions(options) + return clientset.Extensions().Deployments(apiv1.NamespaceAll).List(versionedOptions) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + versionedOptions := fedutil.VersionizeV1ListOptions(options) + return clientset.Extensions().Deployments(apiv1.NamespaceAll).Watch(versionedOptions) + }, + }, + &extensionsv1.Deployment{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnAllChanges( + func(obj runtime.Object) { fdc.deliverLocalDeployment(obj, deploymentReviewDelay) }, + ), + ) + } + clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{ + ClusterAvailable: func(cluster *fedv1.Cluster) { + fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) + }, + ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) { + fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay) + }, + } + fdc.fedDeploymentInformer = fedutil.NewFederatedInformer(federationClient, deploymentFedInformerFactory, &clusterLifecycle) + + podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + return cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + versionedOptions := fedutil.VersionizeV1ListOptions(options) + return clientset.Core().Pods(apiv1.NamespaceAll).List(versionedOptions) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + versionedOptions := fedutil.VersionizeV1ListOptions(options) + return clientset.Core().Pods(apiv1.NamespaceAll).Watch(versionedOptions) + }, + }, + &apiv1.Pod{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnAllChanges( + func(obj runtime.Object) { + fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, allDeploymentReviewDelay) + }, + ), + ) + } + fdc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{}) + + fdc.deploymentStore, fdc.deploymentController = cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + versionedOptions := fedutil.VersionizeV1ListOptions(options) + return fdc.fedClient.Extensions().Deployments(apiv1.NamespaceAll).List(versionedOptions) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + versionedOptions := fedutil.VersionizeV1ListOptions(options) + return fdc.fedClient.Extensions().Deployments(apiv1.NamespaceAll).Watch(versionedOptions) + }, + }, + &extensionsv1.Deployment{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnMetaAndSpecChanges( + func(obj runtime.Object) { fdc.deliverFedDeploymentObj(obj, deploymentReviewDelay) }, + ), + ) + + fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer, + func(client kubeclientset.Interface, obj runtime.Object) error { + rs := obj.(*extensionsv1.Deployment) + _, err := client.Extensions().Deployments(rs.Namespace).Create(rs) + return err + }, + func(client kubeclientset.Interface, obj runtime.Object) error { + rs := obj.(*extensionsv1.Deployment) + _, err := client.Extensions().Deployments(rs.Namespace).Update(rs) + return err + }, + func(client kubeclientset.Interface, obj runtime.Object) error { + rs := obj.(*extensionsv1.Deployment) + err := client.Extensions().Deployments(rs.Namespace).Delete(rs.Name, &apiv1.DeleteOptions{}) + return err + }) + + return fdc +} + +func (fdc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { + go fdc.deploymentController.Run(stopCh) + fdc.fedDeploymentInformer.Start() + fdc.fedPodInformer.Start() + + fdc.deploymentDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) { + fdc.deploymentWorkQueue.Add(item.Key) + }) + fdc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) { + fdc.reconcileDeploymentsOnClusterChange() + }) + + // Wait until the cluster is synced to prevent the update storm at the very beginning. + for !fdc.isSynced() { + time.Sleep(5 * time.Millisecond) + glog.Infof("Waiting for controller to sync up") + } + + for i := 0; i < workers; i++ { + go wait.Until(fdc.worker, time.Second, stopCh) + } + + go func() { + for { + // Perform backof registry cleanup from time to time. + select { + case <-time.After(time.Minute): + fdc.deploymentBackoff.GC() + case <-stopCh: + return + } + } + }() + + <-stopCh + glog.Infof("Shutting down DeploymentController") + fdc.deploymentDeliverer.Stop() + fdc.clusterDeliverer.Stop() + fdc.deploymentWorkQueue.ShutDown() + fdc.fedDeploymentInformer.Stop() + fdc.fedPodInformer.Stop() +} + +func (fdc *DeploymentController) isSynced() bool { + if !fdc.fedDeploymentInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced") + return false + } + clustersFromDeps, err := fdc.fedDeploymentInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get ready clusters: %v", err) + return false + } + if !fdc.fedDeploymentInformer.GetTargetStore().ClustersSynced(clustersFromDeps) { + return false + } + + if !fdc.fedPodInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced") + return false + } + clustersFromPods, err := fdc.fedPodInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get ready clusters: %v", err) + return false + } + + // This also checks whether podInformer and deploymentInformer have the + // same cluster lists. + if !fdc.fedPodInformer.GetTargetStore().ClustersSynced(clustersFromDeps) { + glog.V(2).Infof("Pod informer not synced") + return false + } + if !fdc.fedPodInformer.GetTargetStore().ClustersSynced(clustersFromPods) { + glog.V(2).Infof("Pod informer not synced") + return false + } + + if !fdc.deploymentController.HasSynced() { + glog.V(2).Infof("federation deployment list not synced") + return false + } + return true +} + +func (fdc *DeploymentController) deliverLocalDeployment(obj interface{}, duration time.Duration) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %v: %v", obj, err) + return + } + _, exists, err := fdc.deploymentStore.GetByKey(key) + if err != nil { + glog.Errorf("Couldn't get federation deployment %v: %v", key, err) + return + } + if exists { // ignore deployments exists only in local k8s + fdc.deliverDeploymentByKey(key, duration, false) + } +} + +func (fdc *DeploymentController) deliverFedDeploymentObj(obj interface{}, delay time.Duration) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + fdc.deliverDeploymentByKey(key, delay, false) +} + +func (fdc *DeploymentController) deliverDeploymentByKey(key string, delay time.Duration, failed bool) { + if failed { + fdc.deploymentBackoff.Next(key, time.Now()) + delay = delay + fdc.deploymentBackoff.Get(key) + } else { + fdc.deploymentBackoff.Reset(key) + } + fdc.deploymentDeliverer.DeliverAfter(key, nil, delay) +} + +func (fdc *DeploymentController) worker() { + for { + item, quit := fdc.deploymentWorkQueue.Get() + if quit { + return + } + key := item.(string) + status, err := fdc.reconcileDeployment(key) + fdc.deploymentWorkQueue.Done(item) + if err != nil { + glog.Errorf("Error syncing cluster controller: %v", err) + fdc.deliverDeploymentByKey(key, 0, true) + } else { + switch status { + case statusAllOk: + break + case statusError: + fdc.deliverDeploymentByKey(key, 0, true) + case statusNeedRecheck: + fdc.deliverDeploymentByKey(key, deploymentReviewDelay, false) + case statusNotSynced: + fdc.deliverDeploymentByKey(key, clusterAvailableDelay, false) + default: + glog.Errorf("Unhandled reconciliation status: %s", status) + fdc.deliverDeploymentByKey(key, deploymentReviewDelay, false) + } + } + } +} + +func (fdc *DeploymentController) schedule(fd *extensionsv1.Deployment, clusters []*fedv1.Cluster, + current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 { + // TODO: integrate real scheduler + + plannerToBeUsed := fdc.defaultPlanner + fdPref, err := parseFederationDeploymentPreference(fd) + if err != nil { + glog.Info("Invalid Deployment specific preference, use default. deployment: %v, err: %v", fd.Name, err) + } + if fdPref != nil { // create a new planner if user specified a preference + plannerToBeUsed = planner.NewPlanner(fdPref) + } + + replicas := int64(*fd.Spec.Replicas) + var clusterNames []string + for _, cluster := range clusters { + clusterNames = append(clusterNames, cluster.Name) + } + scheduleResult, overflow := plannerToBeUsed.Plan(replicas, clusterNames, current, estimatedCapacity, + fd.Namespace+"/"+fd.Name) + // make sure the result contains all clusters that currently have some replicas. + result := make(map[string]int64) + for clusterName := range current { + result[clusterName] = 0 + } + for clusterName, replicas := range scheduleResult { + result[clusterName] = replicas + } + for clusterName, replicas := range overflow { + result[clusterName] += replicas + } + if glog.V(4) { + buf := bytes.NewBufferString(fmt.Sprintf("Schedule - Deployment: %s/%s\n", fd.Namespace, fd.Name)) + sort.Strings(clusterNames) + for _, clusterName := range clusterNames { + cur := current[clusterName] + target := scheduleResult[clusterName] + fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target) + if over, found := overflow[clusterName]; found { + fmt.Fprintf(buf, " overflow: %d", over) + } + if capacity, found := estimatedCapacity[clusterName]; found { + fmt.Fprintf(buf, " capacity: %d", capacity) + } + fmt.Fprintf(buf, "\n") + } + glog.V(4).Infof(buf.String()) + } + return result +} + +type reconciliationStatus string + +const ( + statusAllOk = reconciliationStatus("ALL_OK") + statusNeedRecheck = reconciliationStatus("RECHECK") + statusError = reconciliationStatus("ERROR") + statusNotSynced = reconciliationStatus("NOSYNC") +) + +func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliationStatus, error) { + if !fdc.isSynced() { + return statusNotSynced, nil + } + + glog.V(4).Infof("Start reconcile deployment %q", key) + startTime := time.Now() + defer glog.V(4).Infof("Finished reconcile deployment %q (%v)", key, time.Now().Sub(startTime)) + + obj, exists, err := fdc.deploymentStore.GetByKey(key) + if err != nil { + return statusError, err + } + if !exists { + // don't delete local deployments for now. Do not reconcile it anymore. + return statusAllOk, nil + } + fd := obj.(*extensionsv1.Deployment) + + clusters, err := fdc.fedDeploymentInformer.GetReadyClusters() + if err != nil { + return statusError, err + } + + // collect current status and do schedule + allPods, err := fdc.fedPodInformer.GetTargetStore().List() + if err != nil { + return statusError, err + } + podStatus, err := replicaset.AnalysePods(fd.Spec.Selector, allPods, time.Now()) + current := make(map[string]int64) + estimatedCapacity := make(map[string]int64) + for _, cluster := range clusters { + ldObj, exists, err := fdc.fedDeploymentInformer.GetTargetStore().GetByKey(cluster.Name, key) + if err != nil { + return statusError, err + } + if exists { + ld := ldObj.(*extensionsv1.Deployment) + current[cluster.Name] = int64(podStatus[cluster.Name].RunningAndReady) // include pending as well? + unschedulable := int64(podStatus[cluster.Name].Unschedulable) + if unschedulable > 0 { + estimatedCapacity[cluster.Name] = int64(*ld.Spec.Replicas) - unschedulable + } + } + } + + scheduleResult := fdc.schedule(fd, clusters, current, estimatedCapacity) + + glog.V(4).Infof("Start syncing local deployment %s: %v", key, scheduleResult) + + fedStatus := extensionsv1.DeploymentStatus{ObservedGeneration: fd.Generation} + operations := make([]fedutil.FederatedOperation, 0) + for clusterName, replicas := range scheduleResult { + + ldObj, exists, err := fdc.fedDeploymentInformer.GetTargetStore().GetByKey(clusterName, key) + if err != nil { + return statusError, err + } + + ld := &extensionsv1.Deployment{ + ObjectMeta: fedutil.CopyObjectMeta(fd.ObjectMeta), + Spec: fd.Spec, + } + specReplicas := int32(replicas) + ld.Spec.Replicas = &specReplicas + + if !exists { + if replicas > 0 { + fdc.eventRecorder.Eventf(fd, api.EventTypeNormal, "CreateInCluster", + "Creating deployment in cluster %s", clusterName) + + operations = append(operations, fedutil.FederatedOperation{ + Type: fedutil.OperationTypeAdd, + Obj: ld, + ClusterName: clusterName, + }) + } + } else { + // TODO: Update only one deployment at a time if update strategy is rolling udpate. + + currentLd := ldObj.(*extensionsv1.Deployment) + // Update existing replica set, if needed. + if !fedutil.ObjectMetaEquivalent(ld.ObjectMeta, currentLd.ObjectMeta) || + !reflect.DeepEqual(ld.Spec, currentLd.Spec) { + fdc.eventRecorder.Eventf(fd, api.EventTypeNormal, "UpdateInCluster", + "Updating deployment in cluster %s", clusterName) + + operations = append(operations, fedutil.FederatedOperation{ + Type: fedutil.OperationTypeUpdate, + Obj: ld, + ClusterName: clusterName, + }) + glog.Infof("Updating %s in %s", currentLd.Name, clusterName) + } + fedStatus.Replicas += currentLd.Status.Replicas + fedStatus.AvailableReplicas += currentLd.Status.AvailableReplicas + fedStatus.UnavailableReplicas += currentLd.Status.UnavailableReplicas + } + } + if fedStatus.Replicas != fd.Status.Replicas || + fedStatus.AvailableReplicas != fd.Status.AvailableReplicas || + fedStatus.UnavailableReplicas != fd.Status.UnavailableReplicas { + fd.Status = fedStatus + _, err = fdc.fedClient.Extensions().Deployments(fd.Namespace).UpdateStatus(fd) + if err != nil { + return statusError, err + } + } + + if len(operations) == 0 { + // Everything is in order + return statusAllOk, nil + } + err = fdc.fedUpdater.UpdateWithOnError(operations, updateTimeout, func(op fedutil.FederatedOperation, operror error) { + fdc.eventRecorder.Eventf(fd, api.EventTypeNormal, "FailedUpdateInCluster", + "Deployment update in cluster %s failed: %v", op.ClusterName, operror) + }) + if err != nil { + glog.Errorf("Failed to execute updates for %s: %v", key, err) + return statusError, err + } + + // Some operations were made, reconcile after a while. + return statusNeedRecheck, nil +} + +func (fdc *DeploymentController) reconcileDeploymentsOnClusterChange() { + if !fdc.isSynced() { + fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) + } + deps := fdc.deploymentStore.List() + for _, dep := range deps { + key, _ := controller.KeyFunc(dep) + fdc.deliverDeploymentByKey(key, 0, false) + } +} diff --git a/federation/pkg/federation-controller/deployment/deploymentcontroller_test.go b/federation/pkg/federation-controller/deployment/deploymentcontroller_test.go new file mode 100644 index 00000000000..e74cda5ec5f --- /dev/null +++ b/federation/pkg/federation-controller/deployment/deploymentcontroller_test.go @@ -0,0 +1,168 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "flag" + "fmt" + "testing" + "time" + + fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fake_fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5/fake" + . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" + "k8s.io/kubernetes/pkg/api/meta" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" + fake_kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake" + "k8s.io/kubernetes/pkg/runtime" + + "github.com/stretchr/testify/assert" +) + +func TestParseFederationDeploymentPreference(t *testing.T) { + successPrefs := []string{ + `{"rebalance": true, + "clusters": { + "k8s-1": {"minReplicas": 10, "maxReplicas": 20, "weight": 2}, + "*": {"weight": 1} + }}`, + } + failedPrefes := []string{ + `{`, // bad json + } + + rs := newDeploymentWithReplicas("d-1", 100) + accessor, _ := meta.Accessor(rs) + anno := accessor.GetAnnotations() + if anno == nil { + anno = make(map[string]string) + accessor.SetAnnotations(anno) + } + for _, prefString := range successPrefs { + anno[FedDeploymentPreferencesAnnotation] = prefString + pref, err := parseFederationDeploymentPreference(rs) + assert.NotNil(t, pref) + assert.Nil(t, err) + } + for _, prefString := range failedPrefes { + anno[FedDeploymentPreferencesAnnotation] = prefString + pref, err := parseFederationDeploymentPreference(rs) + assert.Nil(t, pref) + assert.NotNil(t, err) + } +} + +func TestDeploymentController(t *testing.T) { + flag.Set("logtostderr", "true") + flag.Set("v", "5") + flag.Parse() + + deploymentReviewDelay = 500 * time.Millisecond + clusterAvailableDelay = 100 * time.Millisecond + clusterUnavailableDelay = 100 * time.Millisecond + allDeploymentReviewDelay = 500 * time.Millisecond + + cluster1 := NewCluster("cluster1", apiv1.ConditionTrue) + cluster2 := NewCluster("cluster2", apiv1.ConditionTrue) + + fakeClient := &fake_fedclientset.Clientset{} + RegisterFakeList("clusters", &fakeClient.Fake, &fedv1.ClusterList{Items: []fedv1.Cluster{*cluster1}}) + deploymentsWatch := RegisterFakeWatch("deployments", &fakeClient.Fake) + clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake) + + cluster1Client := &fake_kubeclientset.Clientset{} + cluster1Watch := RegisterFakeWatch("deployments", &cluster1Client.Fake) + _ = RegisterFakeWatch("pods", &cluster1Client.Fake) + RegisterFakeList("deployments", &cluster1Client.Fake, &extensionsv1.DeploymentList{Items: []extensionsv1.Deployment{}}) + cluster1CreateChan := RegisterFakeCopyOnCreate("deployments", &cluster1Client.Fake, cluster1Watch) + cluster1UpdateChan := RegisterFakeCopyOnUpdate("deployments", &cluster1Client.Fake, cluster1Watch) + + cluster2Client := &fake_kubeclientset.Clientset{} + cluster2Watch := RegisterFakeWatch("deployments", &cluster2Client.Fake) + _ = RegisterFakeWatch("pods", &cluster2Client.Fake) + RegisterFakeList("deployments", &cluster2Client.Fake, &extensionsv1.DeploymentList{Items: []extensionsv1.Deployment{}}) + cluster2CreateChan := RegisterFakeCopyOnCreate("deployments", &cluster2Client.Fake, cluster2Watch) + + deploymentController := NewDeploymentController(fakeClient) + clientFactory := func(cluster *fedv1.Cluster) (kubeclientset.Interface, error) { + switch cluster.Name { + case cluster1.Name: + return cluster1Client, nil + case cluster2.Name: + return cluster2Client, nil + default: + return nil, fmt.Errorf("Unknown cluster") + } + } + ToFederatedInformerForTestOnly(deploymentController.fedDeploymentInformer).SetClientFactory(clientFactory) + ToFederatedInformerForTestOnly(deploymentController.fedPodInformer).SetClientFactory(clientFactory) + + stop := make(chan struct{}) + go deploymentController.Run(5, stop) + + // Create deployment. Expect to see it in cluster1. + dep1 := newDeploymentWithReplicas("depA", 6) + deploymentsWatch.Add(dep1) + createdDep1 := GetDeploymentFromChan(cluster1CreateChan) + assert.NotNil(t, createdDep1) + assert.Equal(t, dep1.Namespace, createdDep1.Namespace) + assert.Equal(t, dep1.Name, createdDep1.Name) + assert.Equal(t, dep1.Spec.Replicas, createdDep1.Spec.Replicas) + + // Increase replica count. Expect to see the update in cluster1. + newRep := int32(8) + dep1.Spec.Replicas = &newRep + deploymentsWatch.Modify(dep1) + updatedDep1 := GetDeploymentFromChan(cluster1UpdateChan) + assert.NotNil(t, updatedDep1) + assert.Equal(t, dep1.Namespace, updatedDep1.Namespace) + assert.Equal(t, dep1.Name, updatedDep1.Name) + assert.Equal(t, dep1.Spec.Replicas, updatedDep1.Spec.Replicas) + + // Add new cluster. Although rebalance = false, no pods have been created yet so it should + // rebalance anyway. + clusterWatch.Add(cluster2) + updatedDep1 = GetDeploymentFromChan(cluster1UpdateChan) + createdDep2 := GetDeploymentFromChan(cluster2CreateChan) + assert.NotNil(t, updatedDep1) + assert.NotNil(t, createdDep2) + + assert.Equal(t, dep1.Namespace, createdDep2.Namespace) + assert.Equal(t, dep1.Name, createdDep2.Name) + assert.Equal(t, *dep1.Spec.Replicas/2, *createdDep2.Spec.Replicas) + assert.Equal(t, *dep1.Spec.Replicas/2, *updatedDep1.Spec.Replicas) +} + +func GetDeploymentFromChan(c chan runtime.Object) *extensionsv1.Deployment { + secret := GetObjectFromChan(c).(*extensionsv1.Deployment) + return secret +} + +func newDeploymentWithReplicas(name string, replicas int32) *extensionsv1.Deployment { + return &extensionsv1.Deployment{ + ObjectMeta: apiv1.ObjectMeta{ + Name: name, + Namespace: apiv1.NamespaceDefault, + SelfLink: "/api/v1/namespaces/default/deployments/name", + }, + Spec: extensionsv1.DeploymentSpec{ + Replicas: &replicas, + }, + } +} diff --git a/federation/pkg/federation-controller/replicaset/planner/planner.go b/federation/pkg/federation-controller/replicaset/planner/planner.go index 2da7c0d74b1..5c8a65c0c55 100644 --- a/federation/pkg/federation-controller/replicaset/planner/planner.go +++ b/federation/pkg/federation-controller/replicaset/planner/planner.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package planer +package planner import ( "hash/fnv" diff --git a/federation/pkg/federation-controller/replicaset/planner/planner_test.go b/federation/pkg/federation-controller/replicaset/planner/planner_test.go index 34b21f74dc9..60029e81993 100644 --- a/federation/pkg/federation-controller/replicaset/planner/planner_test.go +++ b/federation/pkg/federation-controller/replicaset/planner/planner_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package planer +package planner import ( "testing" diff --git a/federation/pkg/federation-controller/replicaset/pod_helper.go b/federation/pkg/federation-controller/replicaset/pod_helper.go index 35fa3260085..45834dcaa5b 100644 --- a/federation/pkg/federation-controller/replicaset/pod_helper.go +++ b/federation/pkg/federation-controller/replicaset/pod_helper.go @@ -47,8 +47,8 @@ const ( // A function that calculates how many pods from the list are in one of // the meaningful (from the replica set perspective) states. This function is // a temporary workaround against the current lack of ownerRef in pods. -func AnalysePods(replicaSet *v1beta1.ReplicaSet, allPods []util.FederatedObject, currentTime time.Time) (map[string]PodAnalysisResult, error) { - selector, err := labelSelectorAsSelector(replicaSet.Spec.Selector) +func AnalysePods(selectorv1 *v1beta1.LabelSelector, allPods []util.FederatedObject, currentTime time.Time) (map[string]PodAnalysisResult, error) { + selector, err := labelSelectorAsSelector(selectorv1) if err != nil { return nil, fmt.Errorf("invalid selector: %v", err) } diff --git a/federation/pkg/federation-controller/replicaset/pod_helper_test.go b/federation/pkg/federation-controller/replicaset/pod_helper_test.go index 3600f516950..9bef090ddbe 100644 --- a/federation/pkg/federation-controller/replicaset/pod_helper_test.go +++ b/federation/pkg/federation-controller/replicaset/pod_helper_test.go @@ -75,7 +75,7 @@ func TestAnalyze(t *testing.T) { {ClusterName: "c2", Object: podOtherRS}, } - raport, err := AnalysePods(replicaSet, federatedObjects, now) + raport, err := AnalysePods(replicaSet.Spec.Selector, federatedObjects, now) assert.NoError(t, err) assert.Equal(t, 2, len(raport)) c1Raport := raport["c1"] diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go index e70ef841237..90e02f1c25d 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -439,7 +439,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio if err != nil { return statusError, err } - podStatus, err := AnalysePods(frs, allPods, time.Now()) + podStatus, err := AnalysePods(frs.Spec.Selector, allPods, time.Now()) current := make(map[string]int64) estimatedCapacity := make(map[string]int64) for _, cluster := range clusters {