From 16943f6f30dd8340220f1f1558bb53c8ecd86e87 Mon Sep 17 00:00:00 2001 From: Jonathan MacMillan Date: Fri, 26 May 2017 12:23:16 -0700 Subject: [PATCH] [Federation] Convert the ReplicaSet controller to a sync controller. --- .../federation-controller-manager/app/BUILD | 1 - .../app/controllermanager.go | 9 - federation/pkg/federatedtypes/BUILD | 20 + .../federatedtypes/crudtester/crudtester.go | 11 +- federation/pkg/federatedtypes/replicaset.go | 354 +++++++++++ .../pkg/federatedtypes/replicaset_test.go | 169 ++++++ federation/pkg/federatedtypes/scheduling.go | 4 + federation/pkg/federation-controller/BUILD | 1 - .../federation-controller/deployment/BUILD | 2 +- .../deployment/deploymentcontroller.go | 67 +- .../federation-controller/replicaset/BUILD | 75 --- .../replicaset/replicasetcontroller.go | 574 ------------------ .../pkg/federation-controller/sync/BUILD | 12 +- .../federation-controller/sync/controller.go | 35 +- .../replicasetcontroller_test.go | 25 +- .../util/podanalyzer/BUILD | 8 +- .../util/podanalyzer/pod_helper.go | 51 +- .../util/podanalyzer/pod_helper_test.go | 43 +- test/e2e_federation/BUILD | 1 - test/e2e_federation/replicaset.go | 4 +- 20 files changed, 692 insertions(+), 774 deletions(-) create mode 100644 federation/pkg/federatedtypes/replicaset.go create mode 100644 federation/pkg/federatedtypes/replicaset_test.go delete mode 100644 federation/pkg/federation-controller/replicaset/BUILD delete mode 100644 federation/pkg/federation-controller/replicaset/replicasetcontroller.go rename federation/pkg/federation-controller/{replicaset => sync}/replicasetcontroller_test.go (90%) diff --git a/federation/cmd/federation-controller-manager/app/BUILD b/federation/cmd/federation-controller-manager/app/BUILD index 4f10afb2d7e..2be5e85b6d3 100644 --- a/federation/cmd/federation-controller-manager/app/BUILD +++ b/federation/cmd/federation-controller-manager/app/BUILD @@ -26,7 +26,6 @@ go_library( "//federation/pkg/federation-controller/deployment:go_default_library", "//federation/pkg/federation-controller/ingress:go_default_library", "//federation/pkg/federation-controller/namespace:go_default_library", - "//federation/pkg/federation-controller/replicaset:go_default_library", "//federation/pkg/federation-controller/service:go_default_library", "//federation/pkg/federation-controller/service/dns:go_default_library", "//federation/pkg/federation-controller/sync:go_default_library", diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 08a7934fd49..17b37d4004b 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -40,7 +40,6 @@ import ( deploymentcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/deployment" ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress" namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace" - replicasetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset" servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" servicednscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service/dns" synccontroller "k8s.io/kubernetes/federation/pkg/federation-controller/sync" @@ -167,14 +166,6 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err } } - if controllerEnabled(s.Controllers, serverResources, replicasetcontroller.ControllerName, replicasetcontroller.RequiredResources, true) { - glog.V(3).Infof("Loading client config for replica set controller %q", replicasetcontroller.UserAgentName) - replicaSetClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, replicasetcontroller.UserAgentName)) - replicaSetController := replicasetcontroller.NewReplicaSetController(replicaSetClientset) - glog.V(3).Infof("Running replica set controller") - go replicaSetController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop) - } - if controllerEnabled(s.Controllers, serverResources, deploymentcontroller.ControllerName, deploymentcontroller.RequiredResources, true) { glog.V(3).Infof("Loading client config for deployment controller %q", deploymentcontroller.UserAgentName) deploymentClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, deploymentcontroller.UserAgentName)) diff --git a/federation/pkg/federatedtypes/BUILD b/federation/pkg/federatedtypes/BUILD index 462fa323922..3391d597462 100644 --- a/federation/pkg/federatedtypes/BUILD +++ b/federation/pkg/federatedtypes/BUILD @@ -5,6 +5,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -14,15 +15,21 @@ go_library( "configmap.go", "daemonset.go", "registry.go", + "replicaset.go", "scheduling.go", "secret.go", ], tags = ["automanaged"], deps = [ + "//federation/apis/federation:go_default_library", "//federation/apis/federation/v1beta1:go_default_library", "//federation/client/clientset_generated/federation_clientset:go_default_library", "//federation/pkg/federation-controller/util:go_default_library", + "//federation/pkg/federation-controller/util/planner:go_default_library", + "//federation/pkg/federation-controller/util/podanalyzer:go_default_library", + "//federation/pkg/federation-controller/util/replicapreferences:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -48,3 +55,16 @@ filegroup( ], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["replicaset_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + ], +) diff --git a/federation/pkg/federatedtypes/crudtester/crudtester.go b/federation/pkg/federatedtypes/crudtester/crudtester.go index e6eb57e37af..45a7c5ea6f3 100644 --- a/federation/pkg/federatedtypes/crudtester/crudtester.go +++ b/federation/pkg/federatedtypes/crudtester/crudtester.go @@ -197,8 +197,17 @@ func (c *FederatedTypeCRUDTester) CheckPropagationForClients(obj pkgruntime.Obje func (c *FederatedTypeCRUDTester) waitForResource(client clientset.Interface, obj pkgruntime.Object) error { namespacedName := c.adapter.NamespacedName(obj) err := wait.PollImmediate(c.waitInterval, c.clusterWaitTimeout, func() (bool, error) { + equivalenceFunc := c.adapter.Equivalent + if c.adapter.IsSchedulingAdapter() { + schedulingAdapter, ok := c.adapter.(federatedtypes.SchedulingAdapter) + if !ok { + c.tl.Fatalf("Adapter for kind %q does not properly implement SchedulingAdapter.", c.adapter.Kind()) + } + equivalenceFunc = schedulingAdapter.EquivalentIgnoringSchedule + } + clusterObj, err := c.adapter.ClusterGet(client, namespacedName) - if err == nil && c.adapter.Equivalent(clusterObj, obj) { + if err == nil && equivalenceFunc(clusterObj, obj) { return true, nil } if errors.IsNotFound(err) { diff --git a/federation/pkg/federatedtypes/replicaset.go b/federation/pkg/federatedtypes/replicaset.go new file mode 100644 index 00000000000..85f18597a78 --- /dev/null +++ b/federation/pkg/federatedtypes/replicaset.go @@ -0,0 +1,354 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package federatedtypes + +import ( + "bytes" + "fmt" + "sort" + "time" + + "github.com/golang/glog" + apiv1 "k8s.io/api/core/v1" + extensionsv1 "k8s.io/api/extensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pkgruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + fedapi "k8s.io/kubernetes/federation/apis/federation" + fedv1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/planner" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" +) + +const ( + ReplicaSetKind = "replicaset" + ReplicaSetControllerName = "replicasets" + FedReplicaSetPreferencesAnnotation = "federation.kubernetes.io/replica-set-preferences" +) + +type replicaSetUserInfo struct { + scheduleResult (map[string]int64) + fedStatus *extensionsv1.ReplicaSetStatus +} + +func init() { + RegisterFederatedType(ReplicaSetKind, ReplicaSetControllerName, []schema.GroupVersionResource{extensionsv1.SchemeGroupVersion.WithResource(ReplicaSetControllerName)}, NewReplicaSetAdapter) +} + +type ReplicaSetAdapter struct { + client federationclientset.Interface + defaultPlanner *planner.Planner +} + +func NewReplicaSetAdapter(client federationclientset.Interface) FederatedTypeAdapter { + return &ReplicaSetAdapter{ + client: client, + defaultPlanner: planner.NewPlanner(&fedapi.ReplicaAllocationPreferences{ + Clusters: map[string]fedapi.ClusterPreferences{ + "*": {Weight: 1}, + }, + })} +} + +func (a *ReplicaSetAdapter) Kind() string { + return ReplicaSetKind +} + +func (a *ReplicaSetAdapter) ObjectType() pkgruntime.Object { + return &extensionsv1.ReplicaSet{} +} + +func (a *ReplicaSetAdapter) IsExpectedType(obj interface{}) bool { + _, ok := obj.(*extensionsv1.ReplicaSet) + return ok +} + +func (a *ReplicaSetAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object { + rs := obj.(*extensionsv1.ReplicaSet) + return &extensionsv1.ReplicaSet{ + ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(rs.ObjectMeta), + Spec: *fedutil.DeepCopyApiTypeOrPanic(&rs.Spec).(*extensionsv1.ReplicaSetSpec), + } +} + +func (a *ReplicaSetAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { + replicaset1 := obj1.(*extensionsv1.ReplicaSet) + replicaset2 := obj2.(*extensionsv1.ReplicaSet) + return fedutil.ObjectMetaAndSpecEquivalent(replicaset1, replicaset2) +} + +func (a *ReplicaSetAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { + replicaset := obj.(*extensionsv1.ReplicaSet) + return types.NamespacedName{Namespace: replicaset.Namespace, Name: replicaset.Name} +} + +func (a *ReplicaSetAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { + return &obj.(*extensionsv1.ReplicaSet).ObjectMeta +} + +func (a *ReplicaSetAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) { + replicaset := obj.(*extensionsv1.ReplicaSet) + return a.client.Extensions().ReplicaSets(replicaset.Namespace).Create(replicaset) +} + +func (a *ReplicaSetAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { + return a.client.Extensions().ReplicaSets(namespacedName.Namespace).Delete(namespacedName.Name, options) +} + +func (a *ReplicaSetAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { + return a.client.Extensions().ReplicaSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +} + +func (a *ReplicaSetAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return a.client.Extensions().ReplicaSets(namespace).List(options) +} + +func (a *ReplicaSetAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) { + replicaset := obj.(*extensionsv1.ReplicaSet) + return a.client.Extensions().ReplicaSets(replicaset.Namespace).Update(replicaset) +} + +func (a *ReplicaSetAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) { + return a.client.Extensions().ReplicaSets(namespace).Watch(options) +} + +func (a *ReplicaSetAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + replicaset := obj.(*extensionsv1.ReplicaSet) + return client.Extensions().ReplicaSets(replicaset.Namespace).Create(replicaset) +} + +func (a *ReplicaSetAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { + return client.Extensions().ReplicaSets(nsName.Namespace).Delete(nsName.Name, options) +} + +func (a *ReplicaSetAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { + return client.Extensions().ReplicaSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +} + +func (a *ReplicaSetAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return client.Extensions().ReplicaSets(namespace).List(options) +} + +func (a *ReplicaSetAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + replicaset := obj.(*extensionsv1.ReplicaSet) + return client.Extensions().ReplicaSets(replicaset.Namespace).Update(replicaset) +} + +func (a *ReplicaSetAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) { + return client.Extensions().ReplicaSets(namespace).Watch(options) +} + +func (a *ReplicaSetAdapter) IsSchedulingAdapter() bool { + return true +} + +func (a *ReplicaSetAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*fedv1beta1.Cluster, informer fedutil.FederatedInformer) (*SchedulingInfo, error) { + var clusterNames []string + for _, cluster := range clusters { + clusterNames = append(clusterNames, cluster.Name) + } + + // Schedule the pods across the existing clusters. + replicaSetGetter := func(clusterName, key string) (interface{}, bool, error) { + return informer.GetTargetStore().GetByKey(clusterName, key) + } + podsGetter := func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) { + clientset, err := informer.GetClientsetForCluster(clusterName) + if err != nil { + return nil, err + } + selector, err := metav1.LabelSelectorAsSelector(replicaSet.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("invalid selector: %v", err) + } + return clientset.Core().Pods(replicaSet.ObjectMeta.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()}) + } + current, estimatedCapacity, err := clustersReplicaState(clusterNames, key, replicaSetGetter, podsGetter) + if err != nil { + return nil, err + } + rs := obj.(*extensionsv1.ReplicaSet) + return &SchedulingInfo{ + Schedule: a.schedule(rs, clusterNames, current, estimatedCapacity), + Status: SchedulingStatus{}, + }, nil +} + +func (a *ReplicaSetAdapter) ScheduleObject(cluster *fedv1beta1.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo *SchedulingInfo) (pkgruntime.Object, bool, error) { + rs := federationObjCopy.(*extensionsv1.ReplicaSet) + + replicas, ok := schedulingInfo.Schedule[cluster.Name] + if !ok { + replicas = 0 + } + specReplicas := int32(replicas) + rs.Spec.Replicas = &specReplicas + + if clusterObj != nil { + clusterRs := clusterObj.(*extensionsv1.ReplicaSet) + schedulingInfo.Status.Replicas += clusterRs.Status.Replicas + schedulingInfo.Status.FullyLabeledReplicas += clusterRs.Status.FullyLabeledReplicas + schedulingInfo.Status.ReadyReplicas += clusterRs.Status.ReadyReplicas + schedulingInfo.Status.AvailableReplicas += clusterRs.Status.AvailableReplicas + } + return rs, replicas > 0, nil +} + +func (a *ReplicaSetAdapter) UpdateFederatedStatus(obj pkgruntime.Object, status SchedulingStatus) error { + rs := obj.(*extensionsv1.ReplicaSet) + + if status.Replicas != rs.Status.Replicas || status.FullyLabeledReplicas != rs.Status.FullyLabeledReplicas || + status.ReadyReplicas != rs.Status.ReadyReplicas || status.AvailableReplicas != rs.Status.AvailableReplicas { + rs.Status = extensionsv1.ReplicaSetStatus{ + Replicas: status.Replicas, + FullyLabeledReplicas: status.Replicas, + ReadyReplicas: status.ReadyReplicas, + AvailableReplicas: status.AvailableReplicas, + } + _, err := a.client.Extensions().ReplicaSets(rs.Namespace).UpdateStatus(rs) + return err + } + return nil +} + +func (a *ReplicaSetAdapter) EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool { + replicaset1 := obj1.(*extensionsv1.ReplicaSet) + replicaset2 := a.Copy(obj2).(*extensionsv1.ReplicaSet) + replicaset2.Spec.Replicas = replicaset1.Spec.Replicas + return fedutil.ObjectMetaAndSpecEquivalent(replicaset1, replicaset2) +} + +func (a *ReplicaSetAdapter) schedule(frs *extensionsv1.ReplicaSet, clusterNames []string, + current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 { + // TODO: integrate real scheduler + + plnr := a.defaultPlanner + frsPref, err := replicapreferences.GetAllocationPreferences(frs, FedReplicaSetPreferencesAnnotation) + if err != nil { + glog.Info("Invalid ReplicaSet specific preference, use default. rs: %v, err: %v", frs, err) + } + if frsPref != nil { // create a new planner if user specified a preference + plnr = planner.NewPlanner(frsPref) + } + + replicas := int64(*frs.Spec.Replicas) + scheduleResult, overflow := plnr.Plan(replicas, clusterNames, current, estimatedCapacity, + frs.Namespace+"/"+frs.Name) + // Ensure that the schedule being returned has scheduling instructions for + // all of the clusters that currently have replicas. A cluster that was in + // the previous schedule but is not in the new schedule should have zero + // replicas. + result := make(map[string]int64) + for clusterName := range current { + result[clusterName] = 0 + } + for clusterName, replicas := range scheduleResult { + result[clusterName] = replicas + } + for clusterName, replicas := range overflow { + result[clusterName] += replicas + } + if glog.V(4) { + buf := bytes.NewBufferString(fmt.Sprintf("Schedule - ReplicaSet: %s/%s\n", frs.Namespace, frs.Name)) + sort.Strings(clusterNames) + for _, clusterName := range clusterNames { + cur := current[clusterName] + target := scheduleResult[clusterName] + fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target) + if over, found := overflow[clusterName]; found { + fmt.Fprintf(buf, " overflow: %d", over) + } + if capacity, found := estimatedCapacity[clusterName]; found { + fmt.Fprintf(buf, " capacity: %d", capacity) + } + fmt.Fprintf(buf, "\n") + } + glog.V(4).Infof(buf.String()) + } + return result +} + +// clusterReplicaState returns information about the scheduling state of the pods running in the federated clusters. +func clustersReplicaState( + clusterNames []string, + replicaSetKey string, + replicaSetGetter func(clusterName string, key string) (interface{}, bool, error), + podsGetter func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error)) (current map[string]int64, estimatedCapacity map[string]int64, err error) { + + current = make(map[string]int64) + estimatedCapacity = make(map[string]int64) + + for _, clusterName := range clusterNames { + rsObj, exists, err := replicaSetGetter(clusterName, replicaSetKey) + if err != nil { + return nil, nil, err + } + if !exists { + continue + } + rs := rsObj.(*extensionsv1.ReplicaSet) + if int32(*rs.Spec.Replicas) == rs.Status.ReadyReplicas { + current[clusterName] = int64(rs.Status.ReadyReplicas) + } else { + pods, err := podsGetter(clusterName, rs) + if err != nil { + return nil, nil, err + } + podStatus := podanalyzer.AnalyzePods(pods, time.Now()) + current[clusterName] = int64(podStatus.RunningAndReady) // include pending as well? + unschedulable := int64(podStatus.Unschedulable) + if unschedulable > 0 { + estimatedCapacity[clusterName] = int64(*rs.Spec.Replicas) - unschedulable + } + } + } + return current, estimatedCapacity, nil +} + +func (a *ReplicaSetAdapter) NewTestObject(namespace string) pkgruntime.Object { + replicas := int32(3) + zero := int64(0) + return &extensionsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-replicaset-", + Namespace: namespace, + }, + Spec: extensionsv1.ReplicaSetSpec{ + Replicas: &replicas, + Template: apiv1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"foo": "bar"}, + }, + Spec: apiv1.PodSpec{ + TerminationGracePeriodSeconds: &zero, + Containers: []apiv1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + } +} diff --git a/federation/pkg/federatedtypes/replicaset_test.go b/federation/pkg/federatedtypes/replicaset_test.go new file mode 100644 index 00000000000..a19d2495e95 --- /dev/null +++ b/federation/pkg/federatedtypes/replicaset_test.go @@ -0,0 +1,169 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package federatedtypes + +import ( + "fmt" + "testing" + "time" + + apiv1 "k8s.io/api/core/v1" + extensionsv1 "k8s.io/api/extensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/stretchr/testify/assert" +) + +const ( + pods = "pods" + replicasets = "replicasets" + k8s1 = "k8s-1" + k8s2 = "k8s-2" +) + +func TestClusterReplicaState(t *testing.T) { + uncalledPodsGetter := func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) { + t.Fatal("podsGetter should not be called when replica sets are all ready.") + return nil, nil + } + + podsByReplicaSet := make(map[*extensionsv1.ReplicaSet][]*apiv1.Pod) + podsGetter := func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) { + pods, ok := podsByReplicaSet[replicaSet] + if !ok { + t.Fatalf("No pods found in test data for replica set named %v", replicaSet.Name) + return nil, fmt.Errorf("Not found") + } + var podListPods []apiv1.Pod + for _, pod := range pods { + podListPods = append(podListPods, *pod) + } + return &apiv1.PodList{Items: podListPods}, nil + } + + readyCondition := apiv1.PodCondition{Type: apiv1.PodReady} + unschedulableCondition := apiv1.PodCondition{ + Type: apiv1.PodScheduled, + Status: apiv1.ConditionFalse, + Reason: apiv1.PodReasonUnschedulable, + LastTransitionTime: metav1.NewTime(time.Now().Add(-1 * time.Hour)), + } + + one := int64(1) + two := int64(2) + + tests := map[string]struct { + rs1Replicas int32 + rs2Replicas int32 + rs1ReadyReplicas int32 + rs2ReadyReplicas int32 + podsGetter func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) + pod1Phase apiv1.PodPhase + pod1Condition apiv1.PodCondition + pod2Phase apiv1.PodPhase + pod2Condition apiv1.PodCondition + cluster1Replicas *int64 + cluster2Replicas *int64 + cluster1UnschedulableReplicas *int64 + cluster2UnschedulableReplicas *int64 + }{ + "All replica sets have an equal number of requested and ready replicas.": {rs1Replicas: 2, rs2Replicas: 2, rs1ReadyReplicas: 2, rs2ReadyReplicas: 2, podsGetter: uncalledPodsGetter, cluster1Replicas: &two, cluster2Replicas: &two}, + "One replica set has a pending schedulable pod": {rs1Replicas: 2, rs2Replicas: 2, rs1ReadyReplicas: 1, rs2ReadyReplicas: 2, podsGetter: podsGetter, pod1Phase: apiv1.PodRunning, pod1Condition: readyCondition, pod2Phase: apiv1.PodPending, cluster1Replicas: &one, cluster2Replicas: &two}, + "One replica set has an unschedulable pod": {rs1Replicas: 2, rs2Replicas: 2, rs1ReadyReplicas: 1, rs2ReadyReplicas: 2, podsGetter: podsGetter, pod1Phase: apiv1.PodRunning, pod1Condition: readyCondition, pod2Phase: apiv1.PodPending, pod2Condition: unschedulableCondition, cluster1Replicas: &one, cluster2Replicas: &two, cluster1UnschedulableReplicas: &one}, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + clusters := []string{"one", "two"} + replicaSetsByCluster := make(map[string]*extensionsv1.ReplicaSet) + replicaSetGetter := func(clusterName string, key string) (interface{}, bool, error) { + rs, ok := replicaSetsByCluster[clusterName] + if !ok { + t.Fatalf("No replica set found in test data for %v", clusterName) + return nil, false, fmt.Errorf("Not found") + } + return rs, true, nil + } + rs1 := newReplicaSetWithReplicas("one", tt.rs1Replicas) + rs2 := newReplicaSetWithReplicas("two", tt.rs2Replicas) + rs1.Spec.Replicas = &tt.rs1Replicas + rs2.Spec.Replicas = &tt.rs2Replicas + rs1.Status.ReadyReplicas = tt.rs1ReadyReplicas + rs2.Status.ReadyReplicas = tt.rs2ReadyReplicas + + replicaSetsByCluster["one"] = rs1 + replicaSetsByCluster["two"] = rs2 + + pod1 := newPod("one") + pod2 := newPod("two") + podThree := newPod("three") + podFour := newPod("four") + + pod1.Status.Phase = tt.pod1Phase + pod2.Status.Phase = tt.pod2Phase + pod1.Status.Conditions = []apiv1.PodCondition{tt.pod1Condition} + pod2.Status.Conditions = []apiv1.PodCondition{tt.pod2Condition} + + podsByReplicaSet[rs1] = []*apiv1.Pod{pod1, pod2} + podsByReplicaSet[rs2] = []*apiv1.Pod{podThree, podFour} + + current, estimatedCapacity, err := clustersReplicaState(clusters, "", replicaSetGetter, tt.podsGetter) + + assert.Nil(t, err) + + wantedCurrent := make(map[string]int64) + if tt.cluster1Replicas != nil { + wantedCurrent["one"] = *tt.cluster1Replicas + } + if tt.cluster2Replicas != nil { + wantedCurrent["two"] = *tt.cluster2Replicas + } + assert.Equal(t, wantedCurrent, current) + + wantedEstimatedCapacity := make(map[string]int64) + if tt.cluster1UnschedulableReplicas != nil { + wantedEstimatedCapacity["one"] = *tt.cluster1UnschedulableReplicas + } + if tt.cluster2UnschedulableReplicas != nil { + wantedEstimatedCapacity["two"] = *tt.cluster2UnschedulableReplicas + } + assert.Equal(t, wantedEstimatedCapacity, estimatedCapacity) + }) + } +} + +func newReplicaSetWithReplicas(name string, replicas int32) *extensionsv1.ReplicaSet { + return &extensionsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: metav1.NamespaceDefault, + SelfLink: "/api/v1/namespaces/default/replicasets/name", + }, + Spec: extensionsv1.ReplicaSetSpec{ + Replicas: &replicas, + }, + } +} + +func newPod(name string) *apiv1.Pod { + return &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: metav1.NamespaceDefault, + }, + } +} diff --git a/federation/pkg/federatedtypes/scheduling.go b/federation/pkg/federatedtypes/scheduling.go index 86d6dc785cd..c5d0b93a845 100644 --- a/federation/pkg/federatedtypes/scheduling.go +++ b/federation/pkg/federatedtypes/scheduling.go @@ -44,4 +44,8 @@ type SchedulingAdapter interface { GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (*SchedulingInfo, error) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo *SchedulingInfo) (pkgruntime.Object, bool, error) UpdateFederatedStatus(obj pkgruntime.Object, status SchedulingStatus) error + + // EquivalentIgnoringSchedule returns whether obj1 and obj2 are + // equivalent ignoring differences due to scheduling. + EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool } diff --git a/federation/pkg/federation-controller/BUILD b/federation/pkg/federation-controller/BUILD index 89eb75e9364..310b2860242 100644 --- a/federation/pkg/federation-controller/BUILD +++ b/federation/pkg/federation-controller/BUILD @@ -28,7 +28,6 @@ filegroup( "//federation/pkg/federation-controller/deployment:all-srcs", "//federation/pkg/federation-controller/ingress:all-srcs", "//federation/pkg/federation-controller/namespace:all-srcs", - "//federation/pkg/federation-controller/replicaset:all-srcs", "//federation/pkg/federation-controller/service:all-srcs", "//federation/pkg/federation-controller/sync:all-srcs", "//federation/pkg/federation-controller/util:all-srcs", diff --git a/federation/pkg/federation-controller/deployment/BUILD b/federation/pkg/federation-controller/deployment/BUILD index ba0e054d94c..207236f2eab 100644 --- a/federation/pkg/federation-controller/deployment/BUILD +++ b/federation/pkg/federation-controller/deployment/BUILD @@ -20,7 +20,6 @@ go_library( "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", "//federation/pkg/federation-controller/util/eventsink:go_default_library", "//federation/pkg/federation-controller/util/planner:go_default_library", - "//federation/pkg/federation-controller/util/podanalyzer:go_default_library", "//federation/pkg/federation-controller/util/replicapreferences:go_default_library", "//pkg/api:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", @@ -30,6 +29,7 @@ go_library( "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/federation/pkg/federation-controller/deployment/deploymentcontroller.go b/federation/pkg/federation-controller/deployment/deploymentcontroller.go index 10d589b17aa..ccb1e13a480 100644 --- a/federation/pkg/federation-controller/deployment/deploymentcontroller.go +++ b/federation/pkg/federation-controller/deployment/deploymentcontroller.go @@ -29,6 +29,7 @@ import ( extensionsv1 "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" @@ -44,7 +45,6 @@ import ( "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" "k8s.io/kubernetes/federation/pkg/federation-controller/util/planner" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer" "k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences" "k8s.io/kubernetes/pkg/api" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -352,6 +352,62 @@ func (fdc *DeploymentController) worker() { } } +type podAnalysisResult struct { + // Total number of pods created. + total int + // Number of pods that are running and ready. + runningAndReady int + // Number of pods that have been in unschedulable state for UnshedulableThreshold seconds. + unschedulable int + + // TODO: Handle other scenarios like pod waiting too long for scheduler etc. +} + +const ( + // TODO: make it configurable + unschedulableThreshold = 60 * time.Second +) + +// A function that calculates how many pods from the list are in one of +// the meaningful (from the replica set perspective) states. This function is +// a temporary workaround against the current lack of ownerRef in pods. +// TODO(perotinus): Unify this with the ReplicaSet controller. +func analyzePods(selectorv1 *metav1.LabelSelector, allPods []fedutil.FederatedObject, currentTime time.Time) (map[string]podAnalysisResult, error) { + selector, err := metav1.LabelSelectorAsSelector(selectorv1) + if err != nil { + return nil, fmt.Errorf("invalid selector: %v", err) + } + result := make(map[string]podAnalysisResult) + + for _, fedObject := range allPods { + pod, isPod := fedObject.Object.(*apiv1.Pod) + if !isPod { + return nil, fmt.Errorf("invalid arg content - not a *pod") + } + if !selector.Empty() && selector.Matches(labels.Set(pod.Labels)) { + status := result[fedObject.ClusterName] + status.total++ + for _, condition := range pod.Status.Conditions { + if pod.Status.Phase == apiv1.PodRunning { + if condition.Type == apiv1.PodReady { + status.runningAndReady++ + } + } else { + if condition.Type == apiv1.PodScheduled && + condition.Status == apiv1.ConditionFalse && + condition.Reason == apiv1.PodReasonUnschedulable && + condition.LastTransitionTime.Add(unschedulableThreshold).Before(currentTime) { + + status.unschedulable++ + } + } + } + result[fedObject.ClusterName] = status + } + } + return result, nil +} + func (fdc *DeploymentController) schedule(fd *extensionsv1.Deployment, clusters []*fedv1.Cluster, current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 { // TODO: integrate real scheduler @@ -471,10 +527,7 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation if err != nil { return statusError, err } - podStatus, err := podanalyzer.AnalysePods(fd.Spec.Selector, allPods, time.Now()) - if err != nil { - return statusError, err - } + podStatus, err := analyzePods(fd.Spec.Selector, allPods, time.Now()) current := make(map[string]int64) estimatedCapacity := make(map[string]int64) for _, cluster := range clusters { @@ -484,8 +537,8 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation } if exists { ld := ldObj.(*extensionsv1.Deployment) - current[cluster.Name] = int64(podStatus[cluster.Name].RunningAndReady) // include pending as well? - unschedulable := int64(podStatus[cluster.Name].Unschedulable) + current[cluster.Name] = int64(podStatus[cluster.Name].runningAndReady) // include pending as well? + unschedulable := int64(podStatus[cluster.Name].unschedulable) if unschedulable > 0 { estimatedCapacity[cluster.Name] = int64(*ld.Spec.Replicas) - unschedulable } diff --git a/federation/pkg/federation-controller/replicaset/BUILD b/federation/pkg/federation-controller/replicaset/BUILD deleted file mode 100644 index 86c03aa5339..00000000000 --- a/federation/pkg/federation-controller/replicaset/BUILD +++ /dev/null @@ -1,75 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_library( - name = "go_default_library", - srcs = ["replicasetcontroller.go"], - tags = ["automanaged"], - deps = [ - "//federation/apis/federation:go_default_library", - "//federation/apis/federation/v1beta1:go_default_library", - "//federation/client/clientset_generated/federation_clientset:go_default_library", - "//federation/pkg/federation-controller/util:go_default_library", - "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", - "//federation/pkg/federation-controller/util/eventsink:go_default_library", - "//federation/pkg/federation-controller/util/planner:go_default_library", - "//federation/pkg/federation-controller/util/podanalyzer:go_default_library", - "//federation/pkg/federation-controller/util/replicapreferences:go_default_library", - "//pkg/api:go_default_library", - "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/controller:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", - "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", - "//vendor/k8s.io/client-go/util/workqueue:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = ["replicasetcontroller_test.go"], - library = ":go_default_library", - tags = ["automanaged"], - deps = [ - "//federation/apis/federation/v1beta1:go_default_library", - "//federation/client/clientset_generated/federation_clientset/fake:go_default_library", - "//federation/pkg/federation-controller/util/test:go_default_library", - "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/clientset_generated/clientset/fake:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", - "//vendor/k8s.io/client-go/testing:go_default_library", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go deleted file mode 100644 index 15ad5414b3c..00000000000 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go +++ /dev/null @@ -1,574 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package replicaset - -import ( - "bytes" - "fmt" - "sort" - "time" - - "github.com/golang/glog" - - apiv1 "k8s.io/api/core/v1" - clientv1 "k8s.io/api/core/v1" - extensionsv1 "k8s.io/api/extensions/v1beta1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" - "k8s.io/client-go/util/workqueue" - fed "k8s.io/kubernetes/federation/apis/federation" - fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" - fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" - fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/planner" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences" - "k8s.io/kubernetes/pkg/api" - kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/controller" -) - -const ( - FedReplicaSetPreferencesAnnotation = "federation.kubernetes.io/replica-set-preferences" - allClustersKey = "THE_ALL_CLUSTER_KEY" - UserAgentName = "federation-replicaset-controller" - ControllerName = "replicasets" -) - -var ( - RequiredResources = []schema.GroupVersionResource{extensionsv1.SchemeGroupVersion.WithResource("replicasets")} - replicaSetReviewDelay = 10 * time.Second - clusterAvailableDelay = 20 * time.Second - clusterUnavailableDelay = 60 * time.Second - allReplicaSetReviewDelay = 2 * time.Minute - updateTimeout = 30 * time.Second -) - -type ReplicaSetController struct { - fedClient fedclientset.Interface - - replicaSetStore cache.Store - replicaSetController cache.Controller - - fedReplicaSetInformer fedutil.FederatedInformer - fedPodInformer fedutil.FederatedInformer - - replicasetDeliverer *fedutil.DelayingDeliverer - clusterDeliverer *fedutil.DelayingDeliverer - replicasetWorkQueue workqueue.Interface - // For updating members of federation. - fedUpdater fedutil.FederatedUpdater - - replicaSetBackoff *flowcontrol.Backoff - // For events - eventRecorder record.EventRecorder - - deletionHelper *deletionhelper.DeletionHelper - - defaultPlanner *planner.Planner -} - -// NewReplicaSetController returns a new replicaset controller -func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSetController { - broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(federationClient)) - recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName}) - - frsc := &ReplicaSetController{ - fedClient: federationClient, - replicasetDeliverer: fedutil.NewDelayingDeliverer(), - clusterDeliverer: fedutil.NewDelayingDeliverer(), - replicasetWorkQueue: workqueue.New(), - replicaSetBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), - defaultPlanner: planner.NewPlanner(&fed.ReplicaAllocationPreferences{ - Clusters: map[string]fed.ClusterPreferences{ - "*": {Weight: 1}, - }, - }), - eventRecorder: recorder, - } - - replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) { - return cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return clientset.Extensions().ReplicaSets(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return clientset.Extensions().ReplicaSets(metav1.NamespaceAll).Watch(options) - }, - }, - &extensionsv1.ReplicaSet{}, - controller.NoResyncPeriodFunc(), - fedutil.NewTriggerOnAllChanges( - func(obj runtime.Object) { frsc.deliverReplicaSetObj(obj, replicaSetReviewDelay) }, - ), - ) - } - clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{ - ClusterAvailable: func(cluster *fedv1.Cluster) { - frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) - }, - ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) { - frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay) - }, - } - frsc.fedReplicaSetInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle) - - podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) { - return cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return clientset.Core().Pods(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return clientset.Core().Pods(metav1.NamespaceAll).Watch(options) - }, - }, - &apiv1.Pod{}, - controller.NoResyncPeriodFunc(), - fedutil.NewTriggerOnAllChanges( - func(obj runtime.Object) { - frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, allReplicaSetReviewDelay) - }, - ), - ) - } - frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{}) - frsc.replicaSetStore, frsc.replicaSetController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return frsc.fedClient.Extensions().ReplicaSets(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return frsc.fedClient.Extensions().ReplicaSets(metav1.NamespaceAll).Watch(options) - }, - }, - &extensionsv1.ReplicaSet{}, - controller.NoResyncPeriodFunc(), - fedutil.NewTriggerOnMetaAndSpecChanges( - func(obj runtime.Object) { frsc.deliverReplicaSetObj(obj, replicaSetReviewDelay) }, - ), - ) - frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, "replicaset", updateTimeout, frsc.eventRecorder, - func(client kubeclientset.Interface, obj runtime.Object) error { - rs := obj.(*extensionsv1.ReplicaSet) - _, err := client.Extensions().ReplicaSets(rs.Namespace).Create(rs) - return err - }, - func(client kubeclientset.Interface, obj runtime.Object) error { - rs := obj.(*extensionsv1.ReplicaSet) - _, err := client.Extensions().ReplicaSets(rs.Namespace).Update(rs) - return err - }, - func(client kubeclientset.Interface, obj runtime.Object) error { - rs := obj.(*extensionsv1.ReplicaSet) - orphanDependents := false - err := client.Extensions().ReplicaSets(rs.Namespace).Delete(rs.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) - return err - }) - - frsc.deletionHelper = deletionhelper.NewDeletionHelper( - frsc.updateReplicaSet, - // objNameFunc - func(obj runtime.Object) string { - replicaset := obj.(*extensionsv1.ReplicaSet) - return fmt.Sprintf("%s/%s", replicaset.Namespace, replicaset.Name) - }, - frsc.fedReplicaSetInformer, - frsc.fedUpdater, - ) - - return frsc -} - -// Sends the given updated object to apiserver. -// Assumes that the given object is a replicaset. -func (frsc *ReplicaSetController) updateReplicaSet(obj runtime.Object) (runtime.Object, error) { - replicaset := obj.(*extensionsv1.ReplicaSet) - return frsc.fedClient.Extensions().ReplicaSets(replicaset.Namespace).Update(replicaset) -} - -func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { - go frsc.replicaSetController.Run(stopCh) - frsc.fedReplicaSetInformer.Start() - frsc.fedPodInformer.Start() - - frsc.replicasetDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) { - frsc.replicasetWorkQueue.Add(item.Key) - }) - frsc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) { - frsc.reconcileReplicaSetsOnClusterChange() - }) - - for !frsc.isSynced() { - time.Sleep(5 * time.Millisecond) - } - - for i := 0; i < workers; i++ { - go wait.Until(frsc.worker, time.Second, stopCh) - } - - fedutil.StartBackoffGC(frsc.replicaSetBackoff, stopCh) - - <-stopCh - glog.Infof("Shutting down ReplicaSetController") - frsc.replicasetDeliverer.Stop() - frsc.clusterDeliverer.Stop() - frsc.replicasetWorkQueue.ShutDown() - frsc.fedReplicaSetInformer.Stop() - frsc.fedPodInformer.Stop() -} - -func (frsc *ReplicaSetController) isSynced() bool { - if !frsc.fedReplicaSetInformer.ClustersSynced() { - glog.V(2).Infof("Cluster list not synced") - return false - } - clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get ready clusters: %v", err) - return false - } - if !frsc.fedReplicaSetInformer.GetTargetStore().ClustersSynced(clusters) { - return false - } - - if !frsc.fedPodInformer.ClustersSynced() { - glog.V(2).Infof("Cluster list not synced") - return false - } - clusters2, err := frsc.fedPodInformer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get ready clusters: %v", err) - return false - } - - // This also checks whether podInformer and replicaSetInformer have the - // same cluster lists. - if !frsc.fedPodInformer.GetTargetStore().ClustersSynced(clusters) { - return false - } - if !frsc.fedPodInformer.GetTargetStore().ClustersSynced(clusters2) { - return false - } - - if !frsc.replicaSetController.HasSynced() { - glog.V(2).Infof("federation replicaset list not synced") - return false - } - return true -} - -func (frsc *ReplicaSetController) deliverReplicaSetObj(obj interface{}, delay time.Duration) { - key, err := controller.KeyFunc(obj) - if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", obj, err) - return - } - frsc.deliverReplicaSetByKey(key, delay, false) -} - -func (frsc *ReplicaSetController) deliverReplicaSetByKey(key string, delay time.Duration, failed bool) { - if failed { - frsc.replicaSetBackoff.Next(key, time.Now()) - delay = delay + frsc.replicaSetBackoff.Get(key) - } else { - frsc.replicaSetBackoff.Reset(key) - } - frsc.replicasetDeliverer.DeliverAfter(key, nil, delay) -} - -func (frsc *ReplicaSetController) worker() { - for { - item, quit := frsc.replicasetWorkQueue.Get() - if quit { - return - } - key := item.(string) - status, err := frsc.reconcileReplicaSet(key) - frsc.replicasetWorkQueue.Done(item) - if err != nil { - glog.Errorf("Error syncing cluster controller: %v", err) - frsc.deliverReplicaSetByKey(key, 0, true) - } else { - switch status { - case statusAllOk: - break - case statusError: - frsc.deliverReplicaSetByKey(key, 0, true) - case statusNeedRecheck: - frsc.deliverReplicaSetByKey(key, replicaSetReviewDelay, false) - case statusNotSynced: - frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false) - default: - glog.Errorf("Unhandled reconciliation status: %s", status) - frsc.deliverReplicaSetByKey(key, replicaSetReviewDelay, false) - } - } - } -} - -func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, clusters []*fedv1.Cluster, - current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 { - // TODO: integrate real scheduler - - plnr := frsc.defaultPlanner - frsPref, err := replicapreferences.GetAllocationPreferences(frs, FedReplicaSetPreferencesAnnotation) - if err != nil { - glog.Info("Invalid ReplicaSet specific preference, use default. rs: %v, err: %v", frs, err) - } - if frsPref != nil { // create a new planner if user specified a preference - plnr = planner.NewPlanner(frsPref) - } - - replicas := int64(*frs.Spec.Replicas) - var clusterNames []string - for _, cluster := range clusters { - clusterNames = append(clusterNames, cluster.Name) - } - scheduleResult, overflow := plnr.Plan(replicas, clusterNames, current, estimatedCapacity, - frs.Namespace+"/"+frs.Name) - // make sure the return contains clusters need to zero the replicas - result := make(map[string]int64) - for clusterName := range current { - result[clusterName] = 0 - } - for clusterName, replicas := range scheduleResult { - result[clusterName] = replicas - } - for clusterName, replicas := range overflow { - result[clusterName] += replicas - } - if glog.V(4) { - buf := bytes.NewBufferString(fmt.Sprintf("Schedule - ReplicaSet: %s/%s\n", frs.Namespace, frs.Name)) - sort.Strings(clusterNames) - for _, clusterName := range clusterNames { - cur := current[clusterName] - target := scheduleResult[clusterName] - fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target) - if over, found := overflow[clusterName]; found { - fmt.Fprintf(buf, " overflow: %d", over) - } - if capacity, found := estimatedCapacity[clusterName]; found { - fmt.Fprintf(buf, " capacity: %d", capacity) - } - fmt.Fprintf(buf, "\n") - } - glog.V(4).Infof(buf.String()) - } - return result -} - -type reconciliationStatus string - -const ( - statusAllOk = reconciliationStatus("ALL_OK") - statusNeedRecheck = reconciliationStatus("RECHECK") - statusError = reconciliationStatus("ERROR") - statusNotSynced = reconciliationStatus("NOSYNC") -) - -func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliationStatus, error) { - if !frsc.isSynced() { - return statusNotSynced, nil - } - - glog.V(4).Infof("Start reconcile replicaset %q", key) - startTime := time.Now() - defer glog.V(4).Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime)) - - objFromStore, exists, err := frsc.replicaSetStore.GetByKey(key) - if err != nil { - return statusError, err - } - if !exists { - return statusAllOk, nil - } - - obj, err := api.Scheme.DeepCopy(objFromStore) - frs, ok := obj.(*extensionsv1.ReplicaSet) - if err != nil || !ok { - glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err) - frsc.deliverReplicaSetByKey(key, 0, true) - return statusError, err - } - if frs.DeletionTimestamp != nil { - if err := frsc.delete(frs); err != nil { - glog.Errorf("Failed to delete %s: %v", frs, err) - frsc.eventRecorder.Eventf(frs, api.EventTypeWarning, "DeleteFailed", - "ReplicaSet delete failed: %v", err) - frsc.deliverReplicaSetByKey(key, 0, true) - return statusError, err - } - return statusAllOk, nil - } - - glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for replicaset: %s", - frs.Name) - // Add the required finalizers before creating a replicaset in underlying clusters. - updatedRsObj, err := frsc.deletionHelper.EnsureFinalizers(frs) - if err != nil { - glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in replicaset %s: %v", - frs.Name, err) - frsc.deliverReplicaSetByKey(key, 0, false) - return statusError, err - } - frs = updatedRsObj.(*extensionsv1.ReplicaSet) - - glog.V(3).Infof("Syncing replicaset %s in underlying clusters", frs.Name) - - clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters() - if err != nil { - return statusError, err - } - - // collect current status and do schedule - allPods, err := frsc.fedPodInformer.GetTargetStore().List() - if err != nil { - return statusError, err - } - podStatus, err := podanalyzer.AnalysePods(frs.Spec.Selector, allPods, time.Now()) - if err != nil { - return statusError, err - } - current := make(map[string]int64) - estimatedCapacity := make(map[string]int64) - for _, cluster := range clusters { - lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key) - if err != nil { - return statusError, err - } - if exists { - lrs := lrsObj.(*extensionsv1.ReplicaSet) - current[cluster.Name] = int64(podStatus[cluster.Name].RunningAndReady) // include pending as well? - unschedulable := int64(podStatus[cluster.Name].Unschedulable) - if unschedulable > 0 { - estimatedCapacity[cluster.Name] = int64(*lrs.Spec.Replicas) - unschedulable - } - } - } - - scheduleResult := frsc.schedule(frs, clusters, current, estimatedCapacity) - - glog.V(4).Infof("Start syncing local replicaset %s: %v", key, scheduleResult) - - fedStatus := extensionsv1.ReplicaSetStatus{ObservedGeneration: frs.Generation} - operations := make([]fedutil.FederatedOperation, 0) - for clusterName, replicas := range scheduleResult { - - lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(clusterName, key) - if err != nil { - return statusError, err - } - - // The object can be modified. - lrs := &extensionsv1.ReplicaSet{ - ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(frs.ObjectMeta), - Spec: *fedutil.DeepCopyApiTypeOrPanic(&frs.Spec).(*extensionsv1.ReplicaSetSpec), - } - specReplicas := int32(replicas) - lrs.Spec.Replicas = &specReplicas - - if !exists { - if replicas > 0 { - operations = append(operations, fedutil.FederatedOperation{ - Type: fedutil.OperationTypeAdd, - Obj: lrs, - ClusterName: clusterName, - Key: key, - }) - } - } else { - currentLrs := lrsObj.(*extensionsv1.ReplicaSet) - // Update existing replica set, if needed. - if !fedutil.ObjectMetaAndSpecEquivalent(lrs, currentLrs) { - operations = append(operations, fedutil.FederatedOperation{ - Type: fedutil.OperationTypeUpdate, - Obj: lrs, - ClusterName: clusterName, - Key: key, - }) - } - fedStatus.Replicas += currentLrs.Status.Replicas - fedStatus.FullyLabeledReplicas += currentLrs.Status.FullyLabeledReplicas - fedStatus.ReadyReplicas += currentLrs.Status.ReadyReplicas - fedStatus.AvailableReplicas += currentLrs.Status.AvailableReplicas - } - } - if fedStatus.Replicas != frs.Status.Replicas || fedStatus.FullyLabeledReplicas != frs.Status.FullyLabeledReplicas || - fedStatus.ReadyReplicas != frs.Status.ReadyReplicas || fedStatus.AvailableReplicas != frs.Status.AvailableReplicas { - frs.Status = fedStatus - _, err = frsc.fedClient.Extensions().ReplicaSets(frs.Namespace).UpdateStatus(frs) - if err != nil { - return statusError, err - } - } - - if len(operations) == 0 { - // Everything is in order - return statusAllOk, nil - } - err = frsc.fedUpdater.Update(operations) - if err != nil { - glog.Errorf("Failed to execute updates for %s: %v", key, err) - return statusError, err - } - - // Some operations were made, reconcile after a while. - return statusNeedRecheck, nil -} - -func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() { - if !frsc.isSynced() { - frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) - } - - for _, rs := range frsc.replicaSetStore.List() { - key, _ := controller.KeyFunc(rs) - frsc.deliverReplicaSetByKey(key, 0, false) - } -} - -// delete deletes the given replicaset or returns error if the deletion was not complete. -func (frsc *ReplicaSetController) delete(replicaset *extensionsv1.ReplicaSet) error { - glog.V(3).Infof("Handling deletion of replicaset: %v", *replicaset) - _, err := frsc.deletionHelper.HandleObjectInUnderlyingClusters(replicaset) - if err != nil { - return err - } - - err = frsc.fedClient.Extensions().ReplicaSets(replicaset.Namespace).Delete(replicaset.Name, nil) - if err != nil { - // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. - // This is expected when we are processing an update as a result of replicaset finalizer deletion. - // The process that deleted the last finalizer is also going to delete the replicaset and we do not have to do anything. - if !errors.IsNotFound(err) { - return fmt.Errorf("failed to delete replicaset: %v", err) - } - } - return nil -} diff --git a/federation/pkg/federation-controller/sync/BUILD b/federation/pkg/federation-controller/sync/BUILD index 65e348d2786..1258db9f168 100644 --- a/federation/pkg/federation-controller/sync/BUILD +++ b/federation/pkg/federation-controller/sync/BUILD @@ -42,18 +42,28 @@ go_library( go_test( name = "go_default_test", - srcs = ["controller_test.go"], + srcs = [ + "controller_test.go", + "replicasetcontroller_test.go", + ], library = ":go_default_library", tags = ["automanaged"], deps = [ "//federation/apis/federation/v1beta1:go_default_library", + "//federation/client/clientset_generated/federation_clientset/fake:go_default_library", "//federation/pkg/federatedtypes:go_default_library", "//federation/pkg/federation-controller/util:go_default_library", "//federation/pkg/federation-controller/util/test:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", ], ) diff --git a/federation/pkg/federation-controller/sync/controller.go b/federation/pkg/federation-controller/sync/controller.go index e7c43c1cebe..38eb0fc65ed 100644 --- a/federation/pkg/federation-controller/sync/controller.go +++ b/federation/pkg/federation-controller/sync/controller.go @@ -84,10 +84,11 @@ type FederationSyncController struct { deletionHelper *deletionhelper.DeletionHelper - reviewDelay time.Duration - clusterAvailableDelay time.Duration - smallDelay time.Duration - updateTimeout time.Duration + reviewDelay time.Duration + clusterAvailableDelay time.Duration + clusterUnavailableDelay time.Duration + smallDelay time.Duration + updateTimeout time.Duration adapter federatedtypes.FederatedTypeAdapter } @@ -112,14 +113,15 @@ func newFederationSyncController(client federationclientset.Interface, adapter f recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: fmt.Sprintf("federation-%v-controller", adapter.Kind())}) s := &FederationSyncController{ - reviewDelay: time.Second * 10, - clusterAvailableDelay: time.Second * 20, - smallDelay: time.Second * 3, - updateTimeout: time.Second * 30, - workQueue: workqueue.New(), - backoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), - eventRecorder: recorder, - adapter: adapter, + reviewDelay: time.Second * 10, + clusterAvailableDelay: time.Second * 20, + clusterUnavailableDelay: time.Second * 60, + smallDelay: time.Second * 3, + updateTimeout: time.Second * 30, + workQueue: workqueue.New(), + backoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), + eventRecorder: recorder, + adapter: adapter, } // Build delivereres for triggering reconciliations. @@ -169,6 +171,10 @@ func newFederationSyncController(client federationclientset.Interface, adapter f // When new cluster becomes available process all the target resources again. s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay)) }, + // When a cluster becomes unavailable process all the target resources again. + ClusterUnavailable: func(cluster *federationapi.Cluster, _ []interface{}) { + s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterUnavailableDelay)) + }, }, ) @@ -205,6 +211,7 @@ func newFederationSyncController(client federationclientset.Interface, adapter f // minimizeLatency reduces delays and timeouts to make the controller more responsive (useful for testing). func (s *FederationSyncController) minimizeLatency() { s.clusterAvailableDelay = time.Second + s.clusterUnavailableDelay = time.Second s.reviewDelay = 50 * time.Millisecond s.smallDelay = 20 * time.Millisecond s.updateTimeout = 5 * time.Second @@ -328,6 +335,10 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName kind := s.adapter.Kind() key := namespacedName.String() + glog.V(4).Infof("Starting to reconcile %v %v", kind, key) + startTime := time.Now() + defer glog.V(4).Infof("Finished reconciling %v %v (duration: %v)", kind, key, time.Now().Sub(startTime)) + obj, err := s.objFromCache(kind, key) if err != nil { return statusError diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go b/federation/pkg/federation-controller/sync/replicasetcontroller_test.go similarity index 90% rename from federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go rename to federation/pkg/federation-controller/sync/replicasetcontroller_test.go index 0ddd132a1bb..fa94708677d 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller_test.go +++ b/federation/pkg/federation-controller/sync/replicasetcontroller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package replicaset +package sync import ( "flag" @@ -29,7 +29,8 @@ import ( core "k8s.io/client-go/testing" fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" fedclientfake "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" + "k8s.io/kubernetes/federation/pkg/federatedtypes" + testutil "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kubeclientfake "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" @@ -48,11 +49,6 @@ func TestReplicaSetController(t *testing.T) { flag.Set("v", "5") flag.Parse() - replicaSetReviewDelay = 10 * time.Millisecond - clusterAvailableDelay = 20 * time.Millisecond - clusterUnavailableDelay = 60 * time.Millisecond - allReplicaSetReviewDelay = 120 * time.Millisecond - fedclientset := fedclientfake.NewSimpleClientset() fedrswatch := watch.NewFake() fedclientset.PrependWatchReactor(replicasets, core.DefaultWatchReactor(fedrswatch, nil)) @@ -81,20 +77,19 @@ func TestReplicaSetController(t *testing.T) { return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name) } } - replicaSetController := NewReplicaSetController(fedclientset) - rsFedinformer := testutil.ToFederatedInformerForTestOnly(replicaSetController.fedReplicaSetInformer) + replicaSetController := newFederationSyncController(fedclientset, federatedtypes.NewReplicaSetAdapter(fedclientset)) + replicaSetController.minimizeLatency() + rsFedinformer := testutil.ToFederatedInformerForTestOnly(replicaSetController.informer) rsFedinformer.SetClientFactory(fedInformerClientFactory) - podFedinformer := testutil.ToFederatedInformerForTestOnly(replicaSetController.fedPodInformer) - podFedinformer.SetClientFactory(fedInformerClientFactory) stopChan := make(chan struct{}) defer close(stopChan) - go replicaSetController.Run(1, stopChan) + go replicaSetController.Run(stopChan) rs := newReplicaSetWithReplicas("rs", 9) rs, _ = fedclientset.Extensions().ReplicaSets(metav1.NamespaceDefault).Create(rs) fedrswatch.Add(rs) - time.Sleep(1 * time.Second) + time.Sleep(2 * time.Second) rs1, _ := kube1clientset.Extensions().ReplicaSets(metav1.NamespaceDefault).Get(rs.Name, metav1.GetOptions{}) kube1rswatch.Add(rs1) @@ -114,7 +109,7 @@ func TestReplicaSetController(t *testing.T) { rs2, _ = kube2clientset.Extensions().ReplicaSets(metav1.NamespaceDefault).UpdateStatus(rs2) kube2rswatch.Modify(rs2) - time.Sleep(1 * time.Second) + time.Sleep(2 * time.Second) rs, _ = fedclientset.Extensions().ReplicaSets(metav1.NamespaceDefault).Get(rs.Name, metav1.GetOptions{}) assert.Equal(t, *rs.Spec.Replicas, *rs1.Spec.Replicas+*rs2.Spec.Replicas) assert.Equal(t, rs.Status.Replicas, rs1.Status.Replicas+rs2.Status.Replicas) @@ -126,7 +121,7 @@ func TestReplicaSetController(t *testing.T) { rs.Spec.Replicas = &replicas rs, _ = fedclientset.Extensions().ReplicaSets(metav1.NamespaceDefault).Update(rs) fedrswatch.Modify(rs) - time.Sleep(1 * time.Second) + time.Sleep(2 * time.Second) rs1, _ = kube1clientset.Extensions().ReplicaSets(metav1.NamespaceDefault).Get(rs.Name, metav1.GetOptions{}) rs1.Status.Replicas = *rs1.Spec.Replicas diff --git a/federation/pkg/federation-controller/util/podanalyzer/BUILD b/federation/pkg/federation-controller/util/podanalyzer/BUILD index 9f16ce4f519..3e7b1b9da9f 100644 --- a/federation/pkg/federation-controller/util/podanalyzer/BUILD +++ b/federation/pkg/federation-controller/util/podanalyzer/BUILD @@ -12,12 +12,7 @@ go_library( name = "go_default_library", srcs = ["pod_helper.go"], tags = ["automanaged"], - deps = [ - "//federation/pkg/federation-controller/util:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", - ], + deps = ["//vendor/k8s.io/api/core/v1:go_default_library"], ) go_test( @@ -26,7 +21,6 @@ go_test( library = ":go_default_library", tags = ["automanaged"], deps = [ - "//federation/pkg/federation-controller/util:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", diff --git a/federation/pkg/federation-controller/util/podanalyzer/pod_helper.go b/federation/pkg/federation-controller/util/podanalyzer/pod_helper.go index 3212f52935e..53b308f1d96 100644 --- a/federation/pkg/federation-controller/util/podanalyzer/pod_helper.go +++ b/federation/pkg/federation-controller/util/podanalyzer/pod_helper.go @@ -17,13 +17,9 @@ limitations under the License. package podanalyzer import ( - "fmt" "time" api_v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/kubernetes/federation/pkg/federation-controller/util" ) type PodAnalysisResult struct { @@ -42,41 +38,26 @@ const ( UnschedulableThreshold = 60 * time.Second ) -// A function that calculates how many pods from the list are in one of +// AnalyzePods calculates how many pods from the list are in one of // the meaningful (from the replica set perspective) states. This function is // a temporary workaround against the current lack of ownerRef in pods. -func AnalysePods(selectorv1 *metav1.LabelSelector, allPods []util.FederatedObject, currentTime time.Time) (map[string]PodAnalysisResult, error) { - selector, err := metav1.LabelSelectorAsSelector(selectorv1) - if err != nil { - return nil, fmt.Errorf("invalid selector: %v", err) - } - result := make(map[string]PodAnalysisResult) - - for _, fedObject := range allPods { - pod, isPod := fedObject.Object.(*api_v1.Pod) - if !isPod { - return nil, fmt.Errorf("invalid arg content - not a *pod") - } - if !selector.Empty() && selector.Matches(labels.Set(pod.Labels)) { - status := result[fedObject.ClusterName] - status.Total++ - for _, condition := range pod.Status.Conditions { - if pod.Status.Phase == api_v1.PodRunning { - if condition.Type == api_v1.PodReady { - status.RunningAndReady++ - } - } else { - if condition.Type == api_v1.PodScheduled && - condition.Status == api_v1.ConditionFalse && - condition.Reason == api_v1.PodReasonUnschedulable && - condition.LastTransitionTime.Add(UnschedulableThreshold).Before(currentTime) { - - status.Unschedulable++ - } +func AnalyzePods(pods *api_v1.PodList, currentTime time.Time) PodAnalysisResult { + result := PodAnalysisResult{} + for _, pod := range pods.Items { + result.Total++ + for _, condition := range pod.Status.Conditions { + if pod.Status.Phase == api_v1.PodRunning { + if condition.Type == api_v1.PodReady { + result.RunningAndReady++ } + } else if condition.Type == api_v1.PodScheduled && + condition.Status == api_v1.ConditionFalse && + condition.Reason == api_v1.PodReasonUnschedulable && + condition.LastTransitionTime.Add(UnschedulableThreshold).Before(currentTime) { + + result.Unschedulable++ } - result[fedObject.ClusterName] = status } } - return result, nil + return result } diff --git a/federation/pkg/federation-controller/util/podanalyzer/pod_helper_test.go b/federation/pkg/federation-controller/util/podanalyzer/pod_helper_test.go index 46b577f291d..99a16112012 100644 --- a/federation/pkg/federation-controller/util/podanalyzer/pod_helper_test.go +++ b/federation/pkg/federation-controller/util/podanalyzer/pod_helper_test.go @@ -23,16 +23,13 @@ import ( api_v1 "k8s.io/api/core/v1" "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubernetes/federation/pkg/federation-controller/util" "github.com/stretchr/testify/assert" ) func TestAnalyze(t *testing.T) { now := time.Now() - replicaSet := newReplicaSet(map[string]string{"A": "B"}) - replicaSet2 := newReplicaSet(map[string]string{"C": "D"}) - podRunning := newPod("p1", replicaSet, + podRunning := newPod("p1", api_v1.PodStatus{ Phase: api_v1.PodRunning, Conditions: []api_v1.PodCondition{ @@ -42,7 +39,7 @@ func TestAnalyze(t *testing.T) { }, }, }) - podUnschedulable := newPod("pU", replicaSet, + podUnschedulable := newPod("pU", api_v1.PodStatus{ Phase: api_v1.PodPending, Conditions: []api_v1.PodCondition{ @@ -54,42 +51,25 @@ func TestAnalyze(t *testing.T) { }, }, }) - podOther := newPod("pO", replicaSet, - api_v1.PodStatus{ - Phase: api_v1.PodPending, - Conditions: []api_v1.PodCondition{}, - }) - podOtherRS := newPod("pO", replicaSet2, + podOther := newPod("pO", api_v1.PodStatus{ Phase: api_v1.PodPending, Conditions: []api_v1.PodCondition{}, }) - federatedObjects := []util.FederatedObject{ - {ClusterName: "c1", Object: podRunning}, - {ClusterName: "c1", Object: podRunning}, - {ClusterName: "c1", Object: podRunning}, - {ClusterName: "c1", Object: podUnschedulable}, - {ClusterName: "c1", Object: podUnschedulable}, - {ClusterName: "c2", Object: podOther}, - {ClusterName: "c2", Object: podOtherRS}, - } - - raport, err := AnalysePods(replicaSet.Spec.Selector, federatedObjects, now) - assert.NoError(t, err) - assert.Equal(t, 2, len(raport)) - c1Raport := raport["c1"] - c2Raport := raport["c2"] + result := AnalyzePods(&api_v1.PodList{Items: []api_v1.Pod{*podRunning, *podRunning, *podRunning, *podUnschedulable, *podUnschedulable}}, now) assert.Equal(t, PodAnalysisResult{ Total: 5, RunningAndReady: 3, Unschedulable: 2, - }, c1Raport) + }, result) + + result = AnalyzePods(&api_v1.PodList{Items: []api_v1.Pod{*podOther}}, now) assert.Equal(t, PodAnalysisResult{ Total: 1, RunningAndReady: 0, Unschedulable: 0, - }, c2Raport) + }, result) } func newReplicaSet(selectorMap map[string]string) *v1beta1.ReplicaSet { @@ -97,7 +77,7 @@ func newReplicaSet(selectorMap map[string]string) *v1beta1.ReplicaSet { rs := &v1beta1.ReplicaSet{ ObjectMeta: metav1.ObjectMeta{ Name: "foobar", - Namespace: "default", + Namespace: metav1.NamespaceDefault, }, Spec: v1beta1.ReplicaSetSpec{ Replicas: &replicas, @@ -107,12 +87,11 @@ func newReplicaSet(selectorMap map[string]string) *v1beta1.ReplicaSet { return rs } -func newPod(name string, rs *v1beta1.ReplicaSet, status api_v1.PodStatus) *api_v1.Pod { +func newPod(name string, status api_v1.PodStatus) *api_v1.Pod { return &api_v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: rs.Namespace, - Labels: rs.Spec.Selector.MatchLabels, + Namespace: metav1.NamespaceDefault, }, Status: status, } diff --git a/test/e2e_federation/BUILD b/test/e2e_federation/BUILD index 10480a9c5e3..2d7afd28d74 100644 --- a/test/e2e_federation/BUILD +++ b/test/e2e_federation/BUILD @@ -29,7 +29,6 @@ go_library( "//federation/client/clientset_generated/federation_clientset:go_default_library", "//federation/client/clientset_generated/federation_clientset/typed/core/v1:go_default_library", "//federation/pkg/federatedtypes:go_default_library", - "//federation/pkg/federation-controller/replicaset:go_default_library", "//federation/pkg/federation-controller/util:go_default_library", "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", diff --git a/test/e2e_federation/replicaset.go b/test/e2e_federation/replicaset.go index 9a778e89a37..1e588ccf6a6 100644 --- a/test/e2e_federation/replicaset.go +++ b/test/e2e_federation/replicaset.go @@ -37,7 +37,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/kubernetes/federation/apis/federation" - fedreplicsetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset" + federatedtypes "k8s.io/kubernetes/federation/pkg/federatedtypes" ) const ( @@ -513,7 +513,7 @@ func newReplicaSetObj(namespace string, replicas int32, pref *federation.Replica if pref != nil { prefBytes, _ := json.Marshal(pref) prefString := string(prefBytes) - rs.Annotations[fedreplicsetcontroller.FedReplicaSetPreferencesAnnotation] = prefString + rs.Annotations[federatedtypes.FedReplicaSetPreferencesAnnotation] = prefString } return rs