Merge pull request #34109 from mwielgus/fed-deployment-controller

Automatic merge from submit-queue

Federated deployment controller - part 1

Based on the federated replicaset controller (copy + find/replace).

Remaining work:
- refactoring common elements out into shared libraries
- using ownerRef in pod analysis
- e2e tests
- renaming the replicaset concurrency flag and reusing it for deployments
- updating only one cluster at a time when the rolling-update strategy is used.

cc: @quinton-hoole @kubernetes/sig-cluster-federation

**Release note**:

```release-note
Federated deployment controller that supports the same API as the regular Kubernetes deployment controller.
```
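Since the federated controller accepts the same Deployment objects as the regular one, per-cluster placement is driven entirely by an annotation. Below is a minimal sketch (illustration only, not part of this PR) of a client creating such a deployment; the package, helper, cluster names, and weights are assumptions, while the annotation key and JSON shape come from the controller code and tests in this diff:

```go
package example

import (
	fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5"
	apiv1 "k8s.io/kubernetes/pkg/api/v1"
	extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
	"k8s.io/kubernetes/pkg/client/restclient"
)

// createFederatedDeployment is a hypothetical helper, not part of this PR:
// it creates a federated deployment whose 9 replicas the controller should
// spread 2:1 in favor of cluster1, per the preferences annotation.
func createFederatedDeployment(cfg *restclient.Config) error {
	client := fedclientset.NewForConfigOrDie(cfg)
	replicas := int32(9)
	dep := &extensionsv1.Deployment{
		ObjectMeta: apiv1.ObjectMeta{
			Name:      "mydep", // assumed name
			Namespace: apiv1.NamespaceDefault,
			Annotations: map[string]string{
				// Key and JSON shape as defined by FedDeploymentPreferencesAnnotation below.
				"federation.kubernetes.io/deployment-preferences": `{"rebalance": true, "clusters": {"cluster1": {"weight": 2}, "*": {"weight": 1}}}`,
			},
		},
		Spec: extensionsv1.DeploymentSpec{Replicas: &replicas},
	}
	_, err := client.Extensions().Deployments(apiv1.NamespaceDefault).Create(dep)
	return err
}
```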
Kubernetes Submit Queue 2016-10-10 10:08:01 -07:00 committed by GitHub
commit e72f26a3ff
8 changed files with 735 additions and 6 deletions


@@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
+deploymentcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/deployment"
ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress"
namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace"
replicasetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset"
@@ -160,6 +161,11 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
replicaSetController := replicasetcontroller.NewReplicaSetController(replicaSetClientset)
go replicaSetController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop)
+deploymentClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, deploymentcontroller.UserAgentName))
+deploymentController := deploymentcontroller.NewDeploymentController(deploymentClientset)
+// TODO: rename s.ConcurrentReplicaSetSyncs so it covers deployments as well
+go deploymentController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop)
glog.Infof("Loading client config for ingress controller %q", "ingress-controller")
ingClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "ingress-controller"))
ingressController := ingresscontroller.NewIngressController(ingClientset)


@@ -0,0 +1,555 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deployment
import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"sort"
"time"
"github.com/golang/glog"
fed "k8s.io/kubernetes/federation/apis/federation"
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5"
"k8s.io/kubernetes/federation/pkg/federation-controller/replicaset"
"k8s.io/kubernetes/federation/pkg/federation-controller/replicaset/planner"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/pkg/api"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/cache"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
const (
FedDeploymentPreferencesAnnotation = "federation.kubernetes.io/deployment-preferences"
allClustersKey = "THE_ALL_CLUSTER_KEY"
UserAgentName = "Federation-Deployment-Controller"
)
var (
deploymentReviewDelay = 10 * time.Second
clusterAvailableDelay = 20 * time.Second
clusterUnavailableDelay = 60 * time.Second
allDeploymentReviewDelay = 2 * time.Minute
updateTimeout = 30 * time.Second
)
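// parseFederationDeploymentPreference reads scheduling preferences from the
// deployment's annotations. The annotation value is a JSON-encoded
// fed.FederatedReplicaSetPreferences, e.g. (from the tests below):
//
//	{"rebalance": true, "clusters": {"k8s-1": {"minReplicas": 10, "maxReplicas": 20, "weight": 2}, "*": {"weight": 1}}}
//
// A nil preference with a nil error means the annotation is absent.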
func parseFederationDeploymentPreference(fd *extensionsv1.Deployment) (*fed.FederatedReplicaSetPreferences, error) {
if fd.Annotations == nil {
return nil, nil
}
fdPrefString, found := fd.Annotations[FedDeploymentPreferencesAnnotation]
if !found {
return nil, nil
}
var fdPref fed.FederatedReplicaSetPreferences
if err := json.Unmarshal([]byte(fdPrefString), &fdPref); err != nil {
return nil, err
}
return &fdPref, nil
}
type DeploymentController struct {
fedClient fedclientset.Interface
deploymentController *cache.Controller
deploymentStore cache.Store
fedDeploymentInformer fedutil.FederatedInformer
fedPodInformer fedutil.FederatedInformer
deploymentDeliverer *fedutil.DelayingDeliverer
clusterDeliverer *fedutil.DelayingDeliverer
deploymentWorkQueue workqueue.Interface
// For updating members of federation.
fedUpdater fedutil.FederatedUpdater
deploymentBackoff *flowcontrol.Backoff
eventRecorder record.EventRecorder
defaultPlanner *planner.Planner
}
// NewDeploymentController returns a new deployment controller.
func NewDeploymentController(federationClient fedclientset.Interface) *DeploymentController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(federationClient))
recorder := broadcaster.NewRecorder(api.EventSource{Component: "federated-deployment-controller"})
fdc := &DeploymentController{
fedClient: federationClient,
deploymentDeliverer: fedutil.NewDelayingDeliverer(),
clusterDeliverer: fedutil.NewDelayingDeliverer(),
deploymentWorkQueue: workqueue.New(),
deploymentBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
defaultPlanner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{
Clusters: map[string]fed.ClusterReplicaSetPreferences{
"*": {Weight: 1},
},
}),
eventRecorder: recorder,
}
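// deploymentFedInformerFactory builds, for each federated cluster, an informer
// that watches that cluster's deployments and redelivers the corresponding
// federated deployment on any change.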
deploymentFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
versionedOptions := fedutil.VersionizeV1ListOptions(options)
return clientset.Extensions().Deployments(apiv1.NamespaceAll).List(versionedOptions)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
versionedOptions := fedutil.VersionizeV1ListOptions(options)
return clientset.Extensions().Deployments(apiv1.NamespaceAll).Watch(versionedOptions)
},
},
&extensionsv1.Deployment{},
controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnAllChanges(
func(obj runtime.Object) { fdc.deliverLocalDeployment(obj, deploymentReviewDelay) },
),
)
}
clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *fedv1.Cluster) {
fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
},
ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) {
fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
},
}
fdc.fedDeploymentInformer = fedutil.NewFederatedInformer(federationClient, deploymentFedInformerFactory, &clusterLifecycle)
podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
versionedOptions := fedutil.VersionizeV1ListOptions(options)
return clientset.Core().Pods(apiv1.NamespaceAll).List(versionedOptions)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
versionedOptions := fedutil.VersionizeV1ListOptions(options)
return clientset.Core().Pods(apiv1.NamespaceAll).Watch(versionedOptions)
},
},
&apiv1.Pod{},
controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnAllChanges(
func(obj runtime.Object) {
fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, allDeploymentReviewDelay)
},
),
)
}
fdc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{})
fdc.deploymentStore, fdc.deploymentController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
versionedOptions := fedutil.VersionizeV1ListOptions(options)
return fdc.fedClient.Extensions().Deployments(apiv1.NamespaceAll).List(versionedOptions)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
versionedOptions := fedutil.VersionizeV1ListOptions(options)
return fdc.fedClient.Extensions().Deployments(apiv1.NamespaceAll).Watch(versionedOptions)
},
},
&extensionsv1.Deployment{},
controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnMetaAndSpecChanges(
func(obj runtime.Object) { fdc.deliverFedDeploymentObj(obj, deploymentReviewDelay) },
),
)
fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer,
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.Deployment)
_, err := client.Extensions().Deployments(rs.Namespace).Create(rs)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.Deployment)
_, err := client.Extensions().Deployments(rs.Namespace).Update(rs)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.Deployment)
err := client.Extensions().Deployments(rs.Namespace).Delete(rs.Name, &apiv1.DeleteOptions{})
return err
})
return fdc
}
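// Run starts the federation-level deployment informer, the per-cluster
// informers and the delaying deliverers, waits until everything has synced,
// and then runs the given number of reconciliation workers until stopCh is
// closed.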
func (fdc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
go fdc.deploymentController.Run(stopCh)
fdc.fedDeploymentInformer.Start()
fdc.fedPodInformer.Start()
fdc.deploymentDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) {
fdc.deploymentWorkQueue.Add(item.Key)
})
fdc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) {
fdc.reconcileDeploymentsOnClusterChange()
})
// Wait until everything is synced to prevent an update storm at the very beginning.
for !fdc.isSynced() {
time.Sleep(5 * time.Millisecond)
glog.Infof("Waiting for controller to sync up")
}
for i := 0; i < workers; i++ {
go wait.Until(fdc.worker, time.Second, stopCh)
}
go func() {
for {
// Perform backoff registry cleanup from time to time.
select {
case <-time.After(time.Minute):
fdc.deploymentBackoff.GC()
case <-stopCh:
return
}
}
}()
<-stopCh
glog.Infof("Shutting down DeploymentController")
fdc.deploymentDeliverer.Stop()
fdc.clusterDeliverer.Stop()
fdc.deploymentWorkQueue.ShutDown()
fdc.fedDeploymentInformer.Stop()
fdc.fedPodInformer.Stop()
}
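// isSynced reports whether the cluster lists and target stores of both
// federated informers, as well as the federation-level deployment informer,
// have caught up, so that reconciliation never acts on a partial view.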
func (fdc *DeploymentController) isSynced() bool {
if !fdc.fedDeploymentInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced")
return false
}
clustersFromDeps, err := fdc.fedDeploymentInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters: %v", err)
return false
}
if !fdc.fedDeploymentInformer.GetTargetStore().ClustersSynced(clustersFromDeps) {
return false
}
if !fdc.fedPodInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced")
return false
}
clustersFromPods, err := fdc.fedPodInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters: %v", err)
return false
}
// This also checks whether podInformer and deploymentInformer have the
// same cluster lists.
if !fdc.fedPodInformer.GetTargetStore().ClustersSynced(clustersFromDeps) {
glog.V(2).Infof("Pod informer not synced")
return false
}
if !fdc.fedPodInformer.GetTargetStore().ClustersSynced(clustersFromPods) {
glog.V(2).Infof("Pod informer not synced")
return false
}
if !fdc.deploymentController.HasSynced() {
glog.V(2).Infof("federation deployment list not synced")
return false
}
return true
}
func (fdc *DeploymentController) deliverLocalDeployment(obj interface{}, duration time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %v: %v", obj, err)
return
}
_, exists, err := fdc.deploymentStore.GetByKey(key)
if err != nil {
glog.Errorf("Couldn't get federation deployment %v: %v", key, err)
return
}
if exists { // ignore deployments that exist only in the local k8s cluster
fdc.deliverDeploymentByKey(key, duration, false)
}
}
func (fdc *DeploymentController) deliverFedDeploymentObj(obj interface{}, delay time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
fdc.deliverDeploymentByKey(key, delay, false)
}
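// deliverDeploymentByKey queues the deployment for reconciliation after the
// given delay. When failed is true, a per-key exponential backoff (5s initial
// interval, 1m cap, as configured in NewDeploymentController) is added on top.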
func (fdc *DeploymentController) deliverDeploymentByKey(key string, delay time.Duration, failed bool) {
if failed {
fdc.deploymentBackoff.Next(key, time.Now())
delay = delay + fdc.deploymentBackoff.Get(key)
} else {
fdc.deploymentBackoff.Reset(key)
}
fdc.deploymentDeliverer.DeliverAfter(key, nil, delay)
}
func (fdc *DeploymentController) worker() {
for {
item, quit := fdc.deploymentWorkQueue.Get()
if quit {
return
}
key := item.(string)
status, err := fdc.reconcileDeployment(key)
fdc.deploymentWorkQueue.Done(item)
if err != nil {
glog.Errorf("Error syncing cluster controller: %v", err)
fdc.deliverDeploymentByKey(key, 0, true)
} else {
switch status {
case statusAllOk:
break
case statusError:
fdc.deliverDeploymentByKey(key, 0, true)
case statusNeedRecheck:
fdc.deliverDeploymentByKey(key, deploymentReviewDelay, false)
case statusNotSynced:
fdc.deliverDeploymentByKey(key, clusterAvailableDelay, false)
default:
glog.Errorf("Unhandled reconciliation status: %s", status)
fdc.deliverDeploymentByKey(key, deploymentReviewDelay, false)
}
}
}
}
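// schedule decides how many replicas each ready cluster should run, using the
// annotated preferences when present and the default planner (weight 1 for
// every cluster, i.e. an even spread) otherwise. Illustrative example,
// assuming the planner's proportional split: 9 replicas over two clusters
// with equal weights yields a 5/4 split, and since the planner is given the
// deployment's namespace/name key, the choice of which cluster gets the extra
// replica is deterministic per object. Clusters that currently run replicas
// but are assigned none stay in the result with an explicit 0 so they get
// scaled down.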
func (fdc *DeploymentController) schedule(fd *extensionsv1.Deployment, clusters []*fedv1.Cluster,
current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 {
// TODO: integrate real scheduler
plannerToBeUsed := fdc.defaultPlanner
fdPref, err := parseFederationDeploymentPreference(fd)
if err != nil {
glog.Info("Invalid Deployment specific preference, use default. deployment: %v, err: %v", fd.Name, err)
}
if fdPref != nil { // create a new planner if user specified a preference
plannerToBeUsed = planner.NewPlanner(fdPref)
}
replicas := int64(*fd.Spec.Replicas)
var clusterNames []string
for _, cluster := range clusters {
clusterNames = append(clusterNames, cluster.Name)
}
scheduleResult, overflow := plannerToBeUsed.Plan(replicas, clusterNames, current, estimatedCapacity,
fd.Namespace+"/"+fd.Name)
// Make sure the result contains all clusters that currently have some replicas.
result := make(map[string]int64)
for clusterName := range current {
result[clusterName] = 0
}
for clusterName, replicas := range scheduleResult {
result[clusterName] = replicas
}
for clusterName, replicas := range overflow {
result[clusterName] += replicas
}
if glog.V(4) {
buf := bytes.NewBufferString(fmt.Sprintf("Schedule - Deployment: %s/%s\n", fd.Namespace, fd.Name))
sort.Strings(clusterNames)
for _, clusterName := range clusterNames {
cur := current[clusterName]
target := scheduleResult[clusterName]
fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target)
if over, found := overflow[clusterName]; found {
fmt.Fprintf(buf, " overflow: %d", over)
}
if capacity, found := estimatedCapacity[clusterName]; found {
fmt.Fprintf(buf, " capacity: %d", capacity)
}
fmt.Fprintf(buf, "\n")
}
glog.V(4).Info(buf.String())
}
return result
}
type reconciliationStatus string
const (
statusAllOk = reconciliationStatus("ALL_OK") // nothing to do; deployment is in the desired state everywhere
statusNeedRecheck = reconciliationStatus("RECHECK") // updates were sent; recheck after deploymentReviewDelay
statusError = reconciliationStatus("ERROR") // reconciliation failed; retry with backoff
statusNotSynced = reconciliationStatus("NOSYNC") // informers not synced; retry once clusters are available
)
func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliationStatus, error) {
if !fdc.isSynced() {
return statusNotSynced, nil
}
glog.V(4).Infof("Start reconcile deployment %q", key)
startTime := time.Now()
defer func() { glog.V(4).Infof("Finished reconcile deployment %q (%v)", key, time.Since(startTime)) }()
obj, exists, err := fdc.deploymentStore.GetByKey(key)
if err != nil {
return statusError, err
}
if !exists {
// Don't delete the per-cluster deployments for now; just stop reconciling this deployment.
return statusAllOk, nil
}
fd := obj.(*extensionsv1.Deployment)
clusters, err := fdc.fedDeploymentInformer.GetReadyClusters()
if err != nil {
return statusError, err
}
// Collect the current status and schedule replicas across clusters.
allPods, err := fdc.fedPodInformer.GetTargetStore().List()
if err != nil {
return statusError, err
}
podStatus, err := replicaset.AnalysePods(fd.Spec.Selector, allPods, time.Now())
if err != nil {
return statusError, err
}
current := make(map[string]int64)
estimatedCapacity := make(map[string]int64)
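// current counts running-and-ready pods per cluster; estimatedCapacity is
// set only for clusters with unschedulable pods, as an upper bound
// (desired replicas minus unschedulable pods) on what the cluster can host.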
for _, cluster := range clusters {
ldObj, exists, err := fdc.fedDeploymentInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
return statusError, err
}
if exists {
ld := ldObj.(*extensionsv1.Deployment)
current[cluster.Name] = int64(podStatus[cluster.Name].RunningAndReady) // include pending as well?
unschedulable := int64(podStatus[cluster.Name].Unschedulable)
if unschedulable > 0 {
estimatedCapacity[cluster.Name] = int64(*ld.Spec.Replicas) - unschedulable
}
}
}
scheduleResult := fdc.schedule(fd, clusters, current, estimatedCapacity)
glog.V(4).Infof("Start syncing local deployment %s: %v", key, scheduleResult)
fedStatus := extensionsv1.DeploymentStatus{ObservedGeneration: fd.Generation}
operations := make([]fedutil.FederatedOperation, 0)
for clusterName, replicas := range scheduleResult {
ldObj, exists, err := fdc.fedDeploymentInformer.GetTargetStore().GetByKey(clusterName, key)
if err != nil {
return statusError, err
}
ld := &extensionsv1.Deployment{
ObjectMeta: fedutil.CopyObjectMeta(fd.ObjectMeta),
Spec: fd.Spec,
}
specReplicas := int32(replicas)
ld.Spec.Replicas = &specReplicas
if !exists {
if replicas > 0 {
fdc.eventRecorder.Eventf(fd, api.EventTypeNormal, "CreateInCluster",
"Creating deployment in cluster %s", clusterName)
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeAdd,
Obj: ld,
ClusterName: clusterName,
})
}
} else {
// TODO: Update only one deployment at a time if the update strategy is rolling update.
currentLd := ldObj.(*extensionsv1.Deployment)
// Update the existing deployment, if needed.
if !fedutil.ObjectMetaEquivalent(ld.ObjectMeta, currentLd.ObjectMeta) ||
!reflect.DeepEqual(ld.Spec, currentLd.Spec) {
fdc.eventRecorder.Eventf(fd, api.EventTypeNormal, "UpdateInCluster",
"Updating deployment in cluster %s", clusterName)
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeUpdate,
Obj: ld,
ClusterName: clusterName,
})
glog.Infof("Updating %s in %s", currentLd.Name, clusterName)
}
fedStatus.Replicas += currentLd.Status.Replicas
fedStatus.AvailableReplicas += currentLd.Status.AvailableReplicas
fedStatus.UnavailableReplicas += currentLd.Status.UnavailableReplicas
}
}
if fedStatus.Replicas != fd.Status.Replicas ||
fedStatus.AvailableReplicas != fd.Status.AvailableReplicas ||
fedStatus.UnavailableReplicas != fd.Status.UnavailableReplicas {
fd.Status = fedStatus
_, err = fdc.fedClient.Extensions().Deployments(fd.Namespace).UpdateStatus(fd)
if err != nil {
return statusError, err
}
}
if len(operations) == 0 {
// Everything is in order
return statusAllOk, nil
}
err = fdc.fedUpdater.UpdateWithOnError(operations, updateTimeout, func(op fedutil.FederatedOperation, operror error) {
fdc.eventRecorder.Eventf(fd, api.EventTypeNormal, "FailedUpdateInCluster",
"Deployment update in cluster %s failed: %v", op.ClusterName, operror)
})
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", key, err)
return statusError, err
}
// Some operations were issued; recheck after a while.
return statusNeedRecheck, nil
}
func (fdc *DeploymentController) reconcileDeploymentsOnClusterChange() {
if !fdc.isSynced() {
fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
}
deps := fdc.deploymentStore.List()
for _, dep := range deps {
key, _ := controller.KeyFunc(dep)
fdc.deliverDeploymentByKey(key, 0, false)
}
}


@@ -0,0 +1,168 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deployment
import (
"flag"
"fmt"
"testing"
"time"
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fake_fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5/fake"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
"k8s.io/kubernetes/pkg/api/meta"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
fake_kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake"
"k8s.io/kubernetes/pkg/runtime"
"github.com/stretchr/testify/assert"
)
func TestParseFederationDeploymentPreference(t *testing.T) {
successPrefs := []string{
`{"rebalance": true,
"clusters": {
"k8s-1": {"minReplicas": 10, "maxReplicas": 20, "weight": 2},
"*": {"weight": 1}
}}`,
}
failedPrefs := []string{
`{`, // bad json
}
d := newDeploymentWithReplicas("d-1", 100)
accessor, _ := meta.Accessor(d)
anno := accessor.GetAnnotations()
if anno == nil {
anno = make(map[string]string)
accessor.SetAnnotations(anno)
}
for _, prefString := range successPrefs {
anno[FedDeploymentPreferencesAnnotation] = prefString
pref, err := parseFederationDeploymentPreference(d)
assert.NotNil(t, pref)
assert.Nil(t, err)
}
for _, prefString := range failedPrefs {
anno[FedDeploymentPreferencesAnnotation] = prefString
pref, err := parseFederationDeploymentPreference(d)
assert.Nil(t, pref)
assert.NotNil(t, err)
}
}
func TestDeploymentController(t *testing.T) {
flag.Set("logtostderr", "true")
flag.Set("v", "5")
flag.Parse()
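// Shorten the controller's delays so the test converges quickly.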
deploymentReviewDelay = 500 * time.Millisecond
clusterAvailableDelay = 100 * time.Millisecond
clusterUnavailableDelay = 100 * time.Millisecond
allDeploymentReviewDelay = 500 * time.Millisecond
cluster1 := NewCluster("cluster1", apiv1.ConditionTrue)
cluster2 := NewCluster("cluster2", apiv1.ConditionTrue)
fakeClient := &fake_fedclientset.Clientset{}
RegisterFakeList("clusters", &fakeClient.Fake, &fedv1.ClusterList{Items: []fedv1.Cluster{*cluster1}})
deploymentsWatch := RegisterFakeWatch("deployments", &fakeClient.Fake)
clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake)
cluster1Client := &fake_kubeclientset.Clientset{}
cluster1Watch := RegisterFakeWatch("deployments", &cluster1Client.Fake)
_ = RegisterFakeWatch("pods", &cluster1Client.Fake)
RegisterFakeList("deployments", &cluster1Client.Fake, &extensionsv1.DeploymentList{Items: []extensionsv1.Deployment{}})
cluster1CreateChan := RegisterFakeCopyOnCreate("deployments", &cluster1Client.Fake, cluster1Watch)
cluster1UpdateChan := RegisterFakeCopyOnUpdate("deployments", &cluster1Client.Fake, cluster1Watch)
cluster2Client := &fake_kubeclientset.Clientset{}
cluster2Watch := RegisterFakeWatch("deployments", &cluster2Client.Fake)
_ = RegisterFakeWatch("pods", &cluster2Client.Fake)
RegisterFakeList("deployments", &cluster2Client.Fake, &extensionsv1.DeploymentList{Items: []extensionsv1.Deployment{}})
cluster2CreateChan := RegisterFakeCopyOnCreate("deployments", &cluster2Client.Fake, cluster2Watch)
deploymentController := NewDeploymentController(fakeClient)
clientFactory := func(cluster *fedv1.Cluster) (kubeclientset.Interface, error) {
switch cluster.Name {
case cluster1.Name:
return cluster1Client, nil
case cluster2.Name:
return cluster2Client, nil
default:
return nil, fmt.Errorf("Unknown cluster")
}
}
ToFederatedInformerForTestOnly(deploymentController.fedDeploymentInformer).SetClientFactory(clientFactory)
ToFederatedInformerForTestOnly(deploymentController.fedPodInformer).SetClientFactory(clientFactory)
stop := make(chan struct{})
go deploymentController.Run(5, stop)
// Create deployment. Expect to see it in cluster1.
dep1 := newDeploymentWithReplicas("depA", 6)
deploymentsWatch.Add(dep1)
createdDep1 := GetDeploymentFromChan(cluster1CreateChan)
assert.NotNil(t, createdDep1)
assert.Equal(t, dep1.Namespace, createdDep1.Namespace)
assert.Equal(t, dep1.Name, createdDep1.Name)
assert.Equal(t, dep1.Spec.Replicas, createdDep1.Spec.Replicas)
// Increase replica count. Expect to see the update in cluster1.
newRep := int32(8)
dep1.Spec.Replicas = &newRep
deploymentsWatch.Modify(dep1)
updatedDep1 := GetDeploymentFromChan(cluster1UpdateChan)
assert.NotNil(t, updatedDep1)
assert.Equal(t, dep1.Namespace, updatedDep1.Namespace)
assert.Equal(t, dep1.Name, updatedDep1.Name)
assert.Equal(t, dep1.Spec.Replicas, updatedDep1.Spec.Replicas)
// Add a new cluster. Although rebalancing is not requested, no pods have been
// created yet, so the controller should rebalance across both clusters anyway.
clusterWatch.Add(cluster2)
updatedDep1 = GetDeploymentFromChan(cluster1UpdateChan)
createdDep2 := GetDeploymentFromChan(cluster2CreateChan)
assert.NotNil(t, updatedDep1)
assert.NotNil(t, createdDep2)
assert.Equal(t, dep1.Namespace, createdDep2.Namespace)
assert.Equal(t, dep1.Name, createdDep2.Name)
assert.Equal(t, *dep1.Spec.Replicas/2, *createdDep2.Spec.Replicas)
assert.Equal(t, *dep1.Spec.Replicas/2, *updatedDep1.Spec.Replicas)
}
func GetDeploymentFromChan(c chan runtime.Object) *extensionsv1.Deployment {
deployment := GetObjectFromChan(c).(*extensionsv1.Deployment)
return deployment
}
func newDeploymentWithReplicas(name string, replicas int32) *extensionsv1.Deployment {
return &extensionsv1.Deployment{
ObjectMeta: apiv1.ObjectMeta{
Name: name,
Namespace: apiv1.NamespaceDefault,
SelfLink: "/api/v1/namespaces/default/deployments/name",
},
Spec: extensionsv1.DeploymentSpec{
Replicas: &replicas,
},
}
}


@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package planer
+package planner
import (
"hash/fnv"


@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package planer
+package planner
import (
"testing"


@@ -47,8 +47,8 @@ const (
// A function that calculates how many pods from the list are in one of
// the meaningful (from the replica set perspective) states. This function is
// a temporary workaround against the current lack of ownerRef in pods.
-func AnalysePods(replicaSet *v1beta1.ReplicaSet, allPods []util.FederatedObject, currentTime time.Time) (map[string]PodAnalysisResult, error) {
-selector, err := labelSelectorAsSelector(replicaSet.Spec.Selector)
+func AnalysePods(selectorv1 *v1beta1.LabelSelector, allPods []util.FederatedObject, currentTime time.Time) (map[string]PodAnalysisResult, error) {
+selector, err := labelSelectorAsSelector(selectorv1)
if err != nil {
return nil, fmt.Errorf("invalid selector: %v", err)
}


@@ -75,7 +75,7 @@ func TestAnalyze(t *testing.T) {
{ClusterName: "c2", Object: podOtherRS},
}
-raport, err := AnalysePods(replicaSet, federatedObjects, now)
+raport, err := AnalysePods(replicaSet.Spec.Selector, federatedObjects, now)
assert.NoError(t, err)
assert.Equal(t, 2, len(raport))
c1Raport := raport["c1"]


@@ -440,7 +440,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio
if err != nil {
return statusError, err
}
-podStatus, err := AnalysePods(frs, allPods, time.Now())
+podStatus, err := AnalysePods(frs.Spec.Selector, allPods, time.Now())
current := make(map[string]int64)
estimatedCapacity := make(map[string]int64)
for _, cluster := range clusters {