[Federation] Convert the ReplicaSet controller to a sync controller.

This commit is contained in:
Jonathan MacMillan 2017-05-26 12:23:16 -07:00
parent e123311d8a
commit 16943f6f30
20 changed files with 692 additions and 774 deletions

View File

@ -26,7 +26,6 @@ go_library(
"//federation/pkg/federation-controller/deployment:go_default_library",
"//federation/pkg/federation-controller/ingress:go_default_library",
"//federation/pkg/federation-controller/namespace:go_default_library",
"//federation/pkg/federation-controller/replicaset:go_default_library",
"//federation/pkg/federation-controller/service:go_default_library",
"//federation/pkg/federation-controller/service/dns:go_default_library",
"//federation/pkg/federation-controller/sync:go_default_library",

View File

@ -40,7 +40,6 @@ import (
deploymentcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/deployment"
ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress"
namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace"
replicasetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset"
servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service"
servicednscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service/dns"
synccontroller "k8s.io/kubernetes/federation/pkg/federation-controller/sync"
@ -167,14 +166,6 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
}
}
if controllerEnabled(s.Controllers, serverResources, replicasetcontroller.ControllerName, replicasetcontroller.RequiredResources, true) {
glog.V(3).Infof("Loading client config for replica set controller %q", replicasetcontroller.UserAgentName)
replicaSetClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, replicasetcontroller.UserAgentName))
replicaSetController := replicasetcontroller.NewReplicaSetController(replicaSetClientset)
glog.V(3).Infof("Running replica set controller")
go replicaSetController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop)
}
if controllerEnabled(s.Controllers, serverResources, deploymentcontroller.ControllerName, deploymentcontroller.RequiredResources, true) {
glog.V(3).Infof("Loading client config for deployment controller %q", deploymentcontroller.UserAgentName)
deploymentClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, deploymentcontroller.UserAgentName))

View File

@ -5,6 +5,7 @@ licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
@ -14,15 +15,21 @@ go_library(
"configmap.go",
"daemonset.go",
"registry.go",
"replicaset.go",
"scheduling.go",
"secret.go",
],
tags = ["automanaged"],
deps = [
"//federation/apis/federation:go_default_library",
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/planner:go_default_library",
"//federation/pkg/federation-controller/util/podanalyzer:go_default_library",
"//federation/pkg/federation-controller/util/replicapreferences:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -48,3 +55,16 @@ filegroup(
],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["replicaset_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
],
)

View File

@ -197,8 +197,17 @@ func (c *FederatedTypeCRUDTester) CheckPropagationForClients(obj pkgruntime.Obje
func (c *FederatedTypeCRUDTester) waitForResource(client clientset.Interface, obj pkgruntime.Object) error {
namespacedName := c.adapter.NamespacedName(obj)
err := wait.PollImmediate(c.waitInterval, c.clusterWaitTimeout, func() (bool, error) {
equivalenceFunc := c.adapter.Equivalent
if c.adapter.IsSchedulingAdapter() {
schedulingAdapter, ok := c.adapter.(federatedtypes.SchedulingAdapter)
if !ok {
c.tl.Fatalf("Adapter for kind %q does not properly implement SchedulingAdapter.", c.adapter.Kind())
}
equivalenceFunc = schedulingAdapter.EquivalentIgnoringSchedule
}
clusterObj, err := c.adapter.ClusterGet(client, namespacedName)
if err == nil && c.adapter.Equivalent(clusterObj, obj) {
if err == nil && equivalenceFunc(clusterObj, obj) {
return true, nil
}
if errors.IsNotFound(err) {

View File

@ -0,0 +1,354 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package federatedtypes
import (
"bytes"
"fmt"
"sort"
"time"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
fedapi "k8s.io/kubernetes/federation/apis/federation"
fedv1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)
const (
ReplicaSetKind = "replicaset"
ReplicaSetControllerName = "replicasets"
FedReplicaSetPreferencesAnnotation = "federation.kubernetes.io/replica-set-preferences"
)
type replicaSetUserInfo struct {
scheduleResult (map[string]int64)
fedStatus *extensionsv1.ReplicaSetStatus
}
func init() {
RegisterFederatedType(ReplicaSetKind, ReplicaSetControllerName, []schema.GroupVersionResource{extensionsv1.SchemeGroupVersion.WithResource(ReplicaSetControllerName)}, NewReplicaSetAdapter)
}
type ReplicaSetAdapter struct {
client federationclientset.Interface
defaultPlanner *planner.Planner
}
func NewReplicaSetAdapter(client federationclientset.Interface) FederatedTypeAdapter {
return &ReplicaSetAdapter{
client: client,
defaultPlanner: planner.NewPlanner(&fedapi.ReplicaAllocationPreferences{
Clusters: map[string]fedapi.ClusterPreferences{
"*": {Weight: 1},
},
})}
}
func (a *ReplicaSetAdapter) Kind() string {
return ReplicaSetKind
}
func (a *ReplicaSetAdapter) ObjectType() pkgruntime.Object {
return &extensionsv1.ReplicaSet{}
}
func (a *ReplicaSetAdapter) IsExpectedType(obj interface{}) bool {
_, ok := obj.(*extensionsv1.ReplicaSet)
return ok
}
func (a *ReplicaSetAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object {
rs := obj.(*extensionsv1.ReplicaSet)
return &extensionsv1.ReplicaSet{
ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(rs.ObjectMeta),
Spec: *fedutil.DeepCopyApiTypeOrPanic(&rs.Spec).(*extensionsv1.ReplicaSetSpec),
}
}
func (a *ReplicaSetAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
replicaset1 := obj1.(*extensionsv1.ReplicaSet)
replicaset2 := obj2.(*extensionsv1.ReplicaSet)
return fedutil.ObjectMetaAndSpecEquivalent(replicaset1, replicaset2)
}
func (a *ReplicaSetAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName {
replicaset := obj.(*extensionsv1.ReplicaSet)
return types.NamespacedName{Namespace: replicaset.Namespace, Name: replicaset.Name}
}
func (a *ReplicaSetAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
return &obj.(*extensionsv1.ReplicaSet).ObjectMeta
}
func (a *ReplicaSetAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) {
replicaset := obj.(*extensionsv1.ReplicaSet)
return a.client.Extensions().ReplicaSets(replicaset.Namespace).Create(replicaset)
}
func (a *ReplicaSetAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error {
return a.client.Extensions().ReplicaSets(namespacedName.Namespace).Delete(namespacedName.Name, options)
}
func (a *ReplicaSetAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return a.client.Extensions().ReplicaSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
}
func (a *ReplicaSetAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
return a.client.Extensions().ReplicaSets(namespace).List(options)
}
func (a *ReplicaSetAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) {
replicaset := obj.(*extensionsv1.ReplicaSet)
return a.client.Extensions().ReplicaSets(replicaset.Namespace).Update(replicaset)
}
func (a *ReplicaSetAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) {
return a.client.Extensions().ReplicaSets(namespace).Watch(options)
}
func (a *ReplicaSetAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) {
replicaset := obj.(*extensionsv1.ReplicaSet)
return client.Extensions().ReplicaSets(replicaset.Namespace).Create(replicaset)
}
func (a *ReplicaSetAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error {
return client.Extensions().ReplicaSets(nsName.Namespace).Delete(nsName.Name, options)
}
func (a *ReplicaSetAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return client.Extensions().ReplicaSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
}
func (a *ReplicaSetAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
return client.Extensions().ReplicaSets(namespace).List(options)
}
func (a *ReplicaSetAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) {
replicaset := obj.(*extensionsv1.ReplicaSet)
return client.Extensions().ReplicaSets(replicaset.Namespace).Update(replicaset)
}
func (a *ReplicaSetAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) {
return client.Extensions().ReplicaSets(namespace).Watch(options)
}
func (a *ReplicaSetAdapter) IsSchedulingAdapter() bool {
return true
}
func (a *ReplicaSetAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*fedv1beta1.Cluster, informer fedutil.FederatedInformer) (*SchedulingInfo, error) {
var clusterNames []string
for _, cluster := range clusters {
clusterNames = append(clusterNames, cluster.Name)
}
// Schedule the pods across the existing clusters.
replicaSetGetter := func(clusterName, key string) (interface{}, bool, error) {
return informer.GetTargetStore().GetByKey(clusterName, key)
}
podsGetter := func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) {
clientset, err := informer.GetClientsetForCluster(clusterName)
if err != nil {
return nil, err
}
selector, err := metav1.LabelSelectorAsSelector(replicaSet.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid selector: %v", err)
}
return clientset.Core().Pods(replicaSet.ObjectMeta.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
}
current, estimatedCapacity, err := clustersReplicaState(clusterNames, key, replicaSetGetter, podsGetter)
if err != nil {
return nil, err
}
rs := obj.(*extensionsv1.ReplicaSet)
return &SchedulingInfo{
Schedule: a.schedule(rs, clusterNames, current, estimatedCapacity),
Status: SchedulingStatus{},
}, nil
}
func (a *ReplicaSetAdapter) ScheduleObject(cluster *fedv1beta1.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo *SchedulingInfo) (pkgruntime.Object, bool, error) {
rs := federationObjCopy.(*extensionsv1.ReplicaSet)
replicas, ok := schedulingInfo.Schedule[cluster.Name]
if !ok {
replicas = 0
}
specReplicas := int32(replicas)
rs.Spec.Replicas = &specReplicas
if clusterObj != nil {
clusterRs := clusterObj.(*extensionsv1.ReplicaSet)
schedulingInfo.Status.Replicas += clusterRs.Status.Replicas
schedulingInfo.Status.FullyLabeledReplicas += clusterRs.Status.FullyLabeledReplicas
schedulingInfo.Status.ReadyReplicas += clusterRs.Status.ReadyReplicas
schedulingInfo.Status.AvailableReplicas += clusterRs.Status.AvailableReplicas
}
return rs, replicas > 0, nil
}
func (a *ReplicaSetAdapter) UpdateFederatedStatus(obj pkgruntime.Object, status SchedulingStatus) error {
rs := obj.(*extensionsv1.ReplicaSet)
if status.Replicas != rs.Status.Replicas || status.FullyLabeledReplicas != rs.Status.FullyLabeledReplicas ||
status.ReadyReplicas != rs.Status.ReadyReplicas || status.AvailableReplicas != rs.Status.AvailableReplicas {
rs.Status = extensionsv1.ReplicaSetStatus{
Replicas: status.Replicas,
FullyLabeledReplicas: status.Replicas,
ReadyReplicas: status.ReadyReplicas,
AvailableReplicas: status.AvailableReplicas,
}
_, err := a.client.Extensions().ReplicaSets(rs.Namespace).UpdateStatus(rs)
return err
}
return nil
}
func (a *ReplicaSetAdapter) EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool {
replicaset1 := obj1.(*extensionsv1.ReplicaSet)
replicaset2 := a.Copy(obj2).(*extensionsv1.ReplicaSet)
replicaset2.Spec.Replicas = replicaset1.Spec.Replicas
return fedutil.ObjectMetaAndSpecEquivalent(replicaset1, replicaset2)
}
func (a *ReplicaSetAdapter) schedule(frs *extensionsv1.ReplicaSet, clusterNames []string,
current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 {
// TODO: integrate real scheduler
plnr := a.defaultPlanner
frsPref, err := replicapreferences.GetAllocationPreferences(frs, FedReplicaSetPreferencesAnnotation)
if err != nil {
glog.Info("Invalid ReplicaSet specific preference, use default. rs: %v, err: %v", frs, err)
}
if frsPref != nil { // create a new planner if user specified a preference
plnr = planner.NewPlanner(frsPref)
}
replicas := int64(*frs.Spec.Replicas)
scheduleResult, overflow := plnr.Plan(replicas, clusterNames, current, estimatedCapacity,
frs.Namespace+"/"+frs.Name)
// Ensure that the schedule being returned has scheduling instructions for
// all of the clusters that currently have replicas. A cluster that was in
// the previous schedule but is not in the new schedule should have zero
// replicas.
result := make(map[string]int64)
for clusterName := range current {
result[clusterName] = 0
}
for clusterName, replicas := range scheduleResult {
result[clusterName] = replicas
}
for clusterName, replicas := range overflow {
result[clusterName] += replicas
}
if glog.V(4) {
buf := bytes.NewBufferString(fmt.Sprintf("Schedule - ReplicaSet: %s/%s\n", frs.Namespace, frs.Name))
sort.Strings(clusterNames)
for _, clusterName := range clusterNames {
cur := current[clusterName]
target := scheduleResult[clusterName]
fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target)
if over, found := overflow[clusterName]; found {
fmt.Fprintf(buf, " overflow: %d", over)
}
if capacity, found := estimatedCapacity[clusterName]; found {
fmt.Fprintf(buf, " capacity: %d", capacity)
}
fmt.Fprintf(buf, "\n")
}
glog.V(4).Infof(buf.String())
}
return result
}
// clusterReplicaState returns information about the scheduling state of the pods running in the federated clusters.
func clustersReplicaState(
clusterNames []string,
replicaSetKey string,
replicaSetGetter func(clusterName string, key string) (interface{}, bool, error),
podsGetter func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error)) (current map[string]int64, estimatedCapacity map[string]int64, err error) {
current = make(map[string]int64)
estimatedCapacity = make(map[string]int64)
for _, clusterName := range clusterNames {
rsObj, exists, err := replicaSetGetter(clusterName, replicaSetKey)
if err != nil {
return nil, nil, err
}
if !exists {
continue
}
rs := rsObj.(*extensionsv1.ReplicaSet)
if int32(*rs.Spec.Replicas) == rs.Status.ReadyReplicas {
current[clusterName] = int64(rs.Status.ReadyReplicas)
} else {
pods, err := podsGetter(clusterName, rs)
if err != nil {
return nil, nil, err
}
podStatus := podanalyzer.AnalyzePods(pods, time.Now())
current[clusterName] = int64(podStatus.RunningAndReady) // include pending as well?
unschedulable := int64(podStatus.Unschedulable)
if unschedulable > 0 {
estimatedCapacity[clusterName] = int64(*rs.Spec.Replicas) - unschedulable
}
}
}
return current, estimatedCapacity, nil
}
func (a *ReplicaSetAdapter) NewTestObject(namespace string) pkgruntime.Object {
replicas := int32(3)
zero := int64(0)
return &extensionsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-replicaset-",
Namespace: namespace,
},
Spec: extensionsv1.ReplicaSetSpec{
Replicas: &replicas,
Template: apiv1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"foo": "bar"},
},
Spec: apiv1.PodSpec{
TerminationGracePeriodSeconds: &zero,
Containers: []apiv1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
},
},
}
}

View File

@ -0,0 +1,169 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package federatedtypes
import (
"fmt"
"testing"
"time"
apiv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/stretchr/testify/assert"
)
const (
pods = "pods"
replicasets = "replicasets"
k8s1 = "k8s-1"
k8s2 = "k8s-2"
)
func TestClusterReplicaState(t *testing.T) {
uncalledPodsGetter := func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) {
t.Fatal("podsGetter should not be called when replica sets are all ready.")
return nil, nil
}
podsByReplicaSet := make(map[*extensionsv1.ReplicaSet][]*apiv1.Pod)
podsGetter := func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) {
pods, ok := podsByReplicaSet[replicaSet]
if !ok {
t.Fatalf("No pods found in test data for replica set named %v", replicaSet.Name)
return nil, fmt.Errorf("Not found")
}
var podListPods []apiv1.Pod
for _, pod := range pods {
podListPods = append(podListPods, *pod)
}
return &apiv1.PodList{Items: podListPods}, nil
}
readyCondition := apiv1.PodCondition{Type: apiv1.PodReady}
unschedulableCondition := apiv1.PodCondition{
Type: apiv1.PodScheduled,
Status: apiv1.ConditionFalse,
Reason: apiv1.PodReasonUnschedulable,
LastTransitionTime: metav1.NewTime(time.Now().Add(-1 * time.Hour)),
}
one := int64(1)
two := int64(2)
tests := map[string]struct {
rs1Replicas int32
rs2Replicas int32
rs1ReadyReplicas int32
rs2ReadyReplicas int32
podsGetter func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error)
pod1Phase apiv1.PodPhase
pod1Condition apiv1.PodCondition
pod2Phase apiv1.PodPhase
pod2Condition apiv1.PodCondition
cluster1Replicas *int64
cluster2Replicas *int64
cluster1UnschedulableReplicas *int64
cluster2UnschedulableReplicas *int64
}{
"All replica sets have an equal number of requested and ready replicas.": {rs1Replicas: 2, rs2Replicas: 2, rs1ReadyReplicas: 2, rs2ReadyReplicas: 2, podsGetter: uncalledPodsGetter, cluster1Replicas: &two, cluster2Replicas: &two},
"One replica set has a pending schedulable pod": {rs1Replicas: 2, rs2Replicas: 2, rs1ReadyReplicas: 1, rs2ReadyReplicas: 2, podsGetter: podsGetter, pod1Phase: apiv1.PodRunning, pod1Condition: readyCondition, pod2Phase: apiv1.PodPending, cluster1Replicas: &one, cluster2Replicas: &two},
"One replica set has an unschedulable pod": {rs1Replicas: 2, rs2Replicas: 2, rs1ReadyReplicas: 1, rs2ReadyReplicas: 2, podsGetter: podsGetter, pod1Phase: apiv1.PodRunning, pod1Condition: readyCondition, pod2Phase: apiv1.PodPending, pod2Condition: unschedulableCondition, cluster1Replicas: &one, cluster2Replicas: &two, cluster1UnschedulableReplicas: &one},
}
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
clusters := []string{"one", "two"}
replicaSetsByCluster := make(map[string]*extensionsv1.ReplicaSet)
replicaSetGetter := func(clusterName string, key string) (interface{}, bool, error) {
rs, ok := replicaSetsByCluster[clusterName]
if !ok {
t.Fatalf("No replica set found in test data for %v", clusterName)
return nil, false, fmt.Errorf("Not found")
}
return rs, true, nil
}
rs1 := newReplicaSetWithReplicas("one", tt.rs1Replicas)
rs2 := newReplicaSetWithReplicas("two", tt.rs2Replicas)
rs1.Spec.Replicas = &tt.rs1Replicas
rs2.Spec.Replicas = &tt.rs2Replicas
rs1.Status.ReadyReplicas = tt.rs1ReadyReplicas
rs2.Status.ReadyReplicas = tt.rs2ReadyReplicas
replicaSetsByCluster["one"] = rs1
replicaSetsByCluster["two"] = rs2
pod1 := newPod("one")
pod2 := newPod("two")
podThree := newPod("three")
podFour := newPod("four")
pod1.Status.Phase = tt.pod1Phase
pod2.Status.Phase = tt.pod2Phase
pod1.Status.Conditions = []apiv1.PodCondition{tt.pod1Condition}
pod2.Status.Conditions = []apiv1.PodCondition{tt.pod2Condition}
podsByReplicaSet[rs1] = []*apiv1.Pod{pod1, pod2}
podsByReplicaSet[rs2] = []*apiv1.Pod{podThree, podFour}
current, estimatedCapacity, err := clustersReplicaState(clusters, "", replicaSetGetter, tt.podsGetter)
assert.Nil(t, err)
wantedCurrent := make(map[string]int64)
if tt.cluster1Replicas != nil {
wantedCurrent["one"] = *tt.cluster1Replicas
}
if tt.cluster2Replicas != nil {
wantedCurrent["two"] = *tt.cluster2Replicas
}
assert.Equal(t, wantedCurrent, current)
wantedEstimatedCapacity := make(map[string]int64)
if tt.cluster1UnschedulableReplicas != nil {
wantedEstimatedCapacity["one"] = *tt.cluster1UnschedulableReplicas
}
if tt.cluster2UnschedulableReplicas != nil {
wantedEstimatedCapacity["two"] = *tt.cluster2UnschedulableReplicas
}
assert.Equal(t, wantedEstimatedCapacity, estimatedCapacity)
})
}
}
func newReplicaSetWithReplicas(name string, replicas int32) *extensionsv1.ReplicaSet {
return &extensionsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: metav1.NamespaceDefault,
SelfLink: "/api/v1/namespaces/default/replicasets/name",
},
Spec: extensionsv1.ReplicaSetSpec{
Replicas: &replicas,
},
}
}
func newPod(name string) *apiv1.Pod {
return &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: metav1.NamespaceDefault,
},
}
}

View File

@ -44,4 +44,8 @@ type SchedulingAdapter interface {
GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (*SchedulingInfo, error)
ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo *SchedulingInfo) (pkgruntime.Object, bool, error)
UpdateFederatedStatus(obj pkgruntime.Object, status SchedulingStatus) error
// EquivalentIgnoringSchedule returns whether obj1 and obj2 are
// equivalent ignoring differences due to scheduling.
EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool
}

View File

@ -28,7 +28,6 @@ filegroup(
"//federation/pkg/federation-controller/deployment:all-srcs",
"//federation/pkg/federation-controller/ingress:all-srcs",
"//federation/pkg/federation-controller/namespace:all-srcs",
"//federation/pkg/federation-controller/replicaset:all-srcs",
"//federation/pkg/federation-controller/service:all-srcs",
"//federation/pkg/federation-controller/sync:all-srcs",
"//federation/pkg/federation-controller/util:all-srcs",

View File

@ -20,7 +20,6 @@ go_library(
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//federation/pkg/federation-controller/util/planner:go_default_library",
"//federation/pkg/federation-controller/util/podanalyzer:go_default_library",
"//federation/pkg/federation-controller/util/replicapreferences:go_default_library",
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
@ -30,6 +29,7 @@ go_library(
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@ -29,6 +29,7 @@ import (
extensionsv1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
@ -44,7 +45,6 @@ import (
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences"
"k8s.io/kubernetes/pkg/api"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -352,6 +352,62 @@ func (fdc *DeploymentController) worker() {
}
}
type podAnalysisResult struct {
// Total number of pods created.
total int
// Number of pods that are running and ready.
runningAndReady int
// Number of pods that have been in unschedulable state for UnshedulableThreshold seconds.
unschedulable int
// TODO: Handle other scenarios like pod waiting too long for scheduler etc.
}
const (
// TODO: make it configurable
unschedulableThreshold = 60 * time.Second
)
// A function that calculates how many pods from the list are in one of
// the meaningful (from the replica set perspective) states. This function is
// a temporary workaround against the current lack of ownerRef in pods.
// TODO(perotinus): Unify this with the ReplicaSet controller.
func analyzePods(selectorv1 *metav1.LabelSelector, allPods []fedutil.FederatedObject, currentTime time.Time) (map[string]podAnalysisResult, error) {
selector, err := metav1.LabelSelectorAsSelector(selectorv1)
if err != nil {
return nil, fmt.Errorf("invalid selector: %v", err)
}
result := make(map[string]podAnalysisResult)
for _, fedObject := range allPods {
pod, isPod := fedObject.Object.(*apiv1.Pod)
if !isPod {
return nil, fmt.Errorf("invalid arg content - not a *pod")
}
if !selector.Empty() && selector.Matches(labels.Set(pod.Labels)) {
status := result[fedObject.ClusterName]
status.total++
for _, condition := range pod.Status.Conditions {
if pod.Status.Phase == apiv1.PodRunning {
if condition.Type == apiv1.PodReady {
status.runningAndReady++
}
} else {
if condition.Type == apiv1.PodScheduled &&
condition.Status == apiv1.ConditionFalse &&
condition.Reason == apiv1.PodReasonUnschedulable &&
condition.LastTransitionTime.Add(unschedulableThreshold).Before(currentTime) {
status.unschedulable++
}
}
}
result[fedObject.ClusterName] = status
}
}
return result, nil
}
func (fdc *DeploymentController) schedule(fd *extensionsv1.Deployment, clusters []*fedv1.Cluster,
current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 {
// TODO: integrate real scheduler
@ -471,10 +527,7 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation
if err != nil {
return statusError, err
}
podStatus, err := podanalyzer.AnalysePods(fd.Spec.Selector, allPods, time.Now())
if err != nil {
return statusError, err
}
podStatus, err := analyzePods(fd.Spec.Selector, allPods, time.Now())
current := make(map[string]int64)
estimatedCapacity := make(map[string]int64)
for _, cluster := range clusters {
@ -484,8 +537,8 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation
}
if exists {
ld := ldObj.(*extensionsv1.Deployment)
current[cluster.Name] = int64(podStatus[cluster.Name].RunningAndReady) // include pending as well?
unschedulable := int64(podStatus[cluster.Name].Unschedulable)
current[cluster.Name] = int64(podStatus[cluster.Name].runningAndReady) // include pending as well?
unschedulable := int64(podStatus[cluster.Name].unschedulable)
if unschedulable > 0 {
estimatedCapacity[cluster.Name] = int64(*ld.Spec.Replicas) - unschedulable
}

View File

@ -1,75 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["replicasetcontroller.go"],
tags = ["automanaged"],
deps = [
"//federation/apis/federation:go_default_library",
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//federation/pkg/federation-controller/util/planner:go_default_library",
"//federation/pkg/federation-controller/util/podanalyzer:go_default_library",
"//federation/pkg/federation-controller/util/replicapreferences:go_default_library",
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/controller:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["replicasetcontroller_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset/fake:go_default_library",
"//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,574 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replicaset
import (
"bytes"
"fmt"
"sort"
"time"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
clientv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
fed "k8s.io/kubernetes/federation/apis/federation"
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences"
"k8s.io/kubernetes/pkg/api"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/controller"
)
const (
FedReplicaSetPreferencesAnnotation = "federation.kubernetes.io/replica-set-preferences"
allClustersKey = "THE_ALL_CLUSTER_KEY"
UserAgentName = "federation-replicaset-controller"
ControllerName = "replicasets"
)
var (
RequiredResources = []schema.GroupVersionResource{extensionsv1.SchemeGroupVersion.WithResource("replicasets")}
replicaSetReviewDelay = 10 * time.Second
clusterAvailableDelay = 20 * time.Second
clusterUnavailableDelay = 60 * time.Second
allReplicaSetReviewDelay = 2 * time.Minute
updateTimeout = 30 * time.Second
)
type ReplicaSetController struct {
fedClient fedclientset.Interface
replicaSetStore cache.Store
replicaSetController cache.Controller
fedReplicaSetInformer fedutil.FederatedInformer
fedPodInformer fedutil.FederatedInformer
replicasetDeliverer *fedutil.DelayingDeliverer
clusterDeliverer *fedutil.DelayingDeliverer
replicasetWorkQueue workqueue.Interface
// For updating members of federation.
fedUpdater fedutil.FederatedUpdater
replicaSetBackoff *flowcontrol.Backoff
// For events
eventRecorder record.EventRecorder
deletionHelper *deletionhelper.DeletionHelper
defaultPlanner *planner.Planner
}
// NewReplicaSetController returns a new replicaset controller
func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSetController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(federationClient))
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName})
frsc := &ReplicaSetController{
fedClient: federationClient,
replicasetDeliverer: fedutil.NewDelayingDeliverer(),
clusterDeliverer: fedutil.NewDelayingDeliverer(),
replicasetWorkQueue: workqueue.New(),
replicaSetBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
defaultPlanner: planner.NewPlanner(&fed.ReplicaAllocationPreferences{
Clusters: map[string]fed.ClusterPreferences{
"*": {Weight: 1},
},
}),
eventRecorder: recorder,
}
replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.Extensions().ReplicaSets(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.Extensions().ReplicaSets(metav1.NamespaceAll).Watch(options)
},
},
&extensionsv1.ReplicaSet{},
controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnAllChanges(
func(obj runtime.Object) { frsc.deliverReplicaSetObj(obj, replicaSetReviewDelay) },
),
)
}
clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *fedv1.Cluster) {
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
},
ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) {
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
},
}
frsc.fedReplicaSetInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle)
podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.Core().Pods(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.Core().Pods(metav1.NamespaceAll).Watch(options)
},
},
&apiv1.Pod{},
controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnAllChanges(
func(obj runtime.Object) {
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, allReplicaSetReviewDelay)
},
),
)
}
frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{})
frsc.replicaSetStore, frsc.replicaSetController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return frsc.fedClient.Extensions().ReplicaSets(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return frsc.fedClient.Extensions().ReplicaSets(metav1.NamespaceAll).Watch(options)
},
},
&extensionsv1.ReplicaSet{},
controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnMetaAndSpecChanges(
func(obj runtime.Object) { frsc.deliverReplicaSetObj(obj, replicaSetReviewDelay) },
),
)
frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, "replicaset", updateTimeout, frsc.eventRecorder,
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet)
_, err := client.Extensions().ReplicaSets(rs.Namespace).Create(rs)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet)
_, err := client.Extensions().ReplicaSets(rs.Namespace).Update(rs)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet)
orphanDependents := false
err := client.Extensions().ReplicaSets(rs.Namespace).Delete(rs.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents})
return err
})
frsc.deletionHelper = deletionhelper.NewDeletionHelper(
frsc.updateReplicaSet,
// objNameFunc
func(obj runtime.Object) string {
replicaset := obj.(*extensionsv1.ReplicaSet)
return fmt.Sprintf("%s/%s", replicaset.Namespace, replicaset.Name)
},
frsc.fedReplicaSetInformer,
frsc.fedUpdater,
)
return frsc
}
// Sends the given updated object to apiserver.
// Assumes that the given object is a replicaset.
func (frsc *ReplicaSetController) updateReplicaSet(obj runtime.Object) (runtime.Object, error) {
replicaset := obj.(*extensionsv1.ReplicaSet)
return frsc.fedClient.Extensions().ReplicaSets(replicaset.Namespace).Update(replicaset)
}
func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
go frsc.replicaSetController.Run(stopCh)
frsc.fedReplicaSetInformer.Start()
frsc.fedPodInformer.Start()
frsc.replicasetDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) {
frsc.replicasetWorkQueue.Add(item.Key)
})
frsc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) {
frsc.reconcileReplicaSetsOnClusterChange()
})
for !frsc.isSynced() {
time.Sleep(5 * time.Millisecond)
}
for i := 0; i < workers; i++ {
go wait.Until(frsc.worker, time.Second, stopCh)
}
fedutil.StartBackoffGC(frsc.replicaSetBackoff, stopCh)
<-stopCh
glog.Infof("Shutting down ReplicaSetController")
frsc.replicasetDeliverer.Stop()
frsc.clusterDeliverer.Stop()
frsc.replicasetWorkQueue.ShutDown()
frsc.fedReplicaSetInformer.Stop()
frsc.fedPodInformer.Stop()
}
func (frsc *ReplicaSetController) isSynced() bool {
if !frsc.fedReplicaSetInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced")
return false
}
clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters: %v", err)
return false
}
if !frsc.fedReplicaSetInformer.GetTargetStore().ClustersSynced(clusters) {
return false
}
if !frsc.fedPodInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced")
return false
}
clusters2, err := frsc.fedPodInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters: %v", err)
return false
}
// This also checks whether podInformer and replicaSetInformer have the
// same cluster lists.
if !frsc.fedPodInformer.GetTargetStore().ClustersSynced(clusters) {
return false
}
if !frsc.fedPodInformer.GetTargetStore().ClustersSynced(clusters2) {
return false
}
if !frsc.replicaSetController.HasSynced() {
glog.V(2).Infof("federation replicaset list not synced")
return false
}
return true
}
func (frsc *ReplicaSetController) deliverReplicaSetObj(obj interface{}, delay time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
frsc.deliverReplicaSetByKey(key, delay, false)
}
func (frsc *ReplicaSetController) deliverReplicaSetByKey(key string, delay time.Duration, failed bool) {
if failed {
frsc.replicaSetBackoff.Next(key, time.Now())
delay = delay + frsc.replicaSetBackoff.Get(key)
} else {
frsc.replicaSetBackoff.Reset(key)
}
frsc.replicasetDeliverer.DeliverAfter(key, nil, delay)
}
func (frsc *ReplicaSetController) worker() {
for {
item, quit := frsc.replicasetWorkQueue.Get()
if quit {
return
}
key := item.(string)
status, err := frsc.reconcileReplicaSet(key)
frsc.replicasetWorkQueue.Done(item)
if err != nil {
glog.Errorf("Error syncing cluster controller: %v", err)
frsc.deliverReplicaSetByKey(key, 0, true)
} else {
switch status {
case statusAllOk:
break
case statusError:
frsc.deliverReplicaSetByKey(key, 0, true)
case statusNeedRecheck:
frsc.deliverReplicaSetByKey(key, replicaSetReviewDelay, false)
case statusNotSynced:
frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false)
default:
glog.Errorf("Unhandled reconciliation status: %s", status)
frsc.deliverReplicaSetByKey(key, replicaSetReviewDelay, false)
}
}
}
}
func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, clusters []*fedv1.Cluster,
current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 {
// TODO: integrate real scheduler
plnr := frsc.defaultPlanner
frsPref, err := replicapreferences.GetAllocationPreferences(frs, FedReplicaSetPreferencesAnnotation)
if err != nil {
glog.Info("Invalid ReplicaSet specific preference, use default. rs: %v, err: %v", frs, err)
}
if frsPref != nil { // create a new planner if user specified a preference
plnr = planner.NewPlanner(frsPref)
}
replicas := int64(*frs.Spec.Replicas)
var clusterNames []string
for _, cluster := range clusters {
clusterNames = append(clusterNames, cluster.Name)
}
scheduleResult, overflow := plnr.Plan(replicas, clusterNames, current, estimatedCapacity,
frs.Namespace+"/"+frs.Name)
// make sure the return contains clusters need to zero the replicas
result := make(map[string]int64)
for clusterName := range current {
result[clusterName] = 0
}
for clusterName, replicas := range scheduleResult {
result[clusterName] = replicas
}
for clusterName, replicas := range overflow {
result[clusterName] += replicas
}
if glog.V(4) {
buf := bytes.NewBufferString(fmt.Sprintf("Schedule - ReplicaSet: %s/%s\n", frs.Namespace, frs.Name))
sort.Strings(clusterNames)
for _, clusterName := range clusterNames {
cur := current[clusterName]
target := scheduleResult[clusterName]
fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target)
if over, found := overflow[clusterName]; found {
fmt.Fprintf(buf, " overflow: %d", over)
}
if capacity, found := estimatedCapacity[clusterName]; found {
fmt.Fprintf(buf, " capacity: %d", capacity)
}
fmt.Fprintf(buf, "\n")
}
glog.V(4).Infof(buf.String())
}
return result
}
type reconciliationStatus string
const (
statusAllOk = reconciliationStatus("ALL_OK")
statusNeedRecheck = reconciliationStatus("RECHECK")
statusError = reconciliationStatus("ERROR")
statusNotSynced = reconciliationStatus("NOSYNC")
)
func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliationStatus, error) {
if !frsc.isSynced() {
return statusNotSynced, nil
}
glog.V(4).Infof("Start reconcile replicaset %q", key)
startTime := time.Now()
defer glog.V(4).Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime))
objFromStore, exists, err := frsc.replicaSetStore.GetByKey(key)
if err != nil {
return statusError, err
}
if !exists {
return statusAllOk, nil
}
obj, err := api.Scheme.DeepCopy(objFromStore)
frs, ok := obj.(*extensionsv1.ReplicaSet)
if err != nil || !ok {
glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err)
frsc.deliverReplicaSetByKey(key, 0, true)
return statusError, err
}
if frs.DeletionTimestamp != nil {
if err := frsc.delete(frs); err != nil {
glog.Errorf("Failed to delete %s: %v", frs, err)
frsc.eventRecorder.Eventf(frs, api.EventTypeWarning, "DeleteFailed",
"ReplicaSet delete failed: %v", err)
frsc.deliverReplicaSetByKey(key, 0, true)
return statusError, err
}
return statusAllOk, nil
}
glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for replicaset: %s",
frs.Name)
// Add the required finalizers before creating a replicaset in underlying clusters.
updatedRsObj, err := frsc.deletionHelper.EnsureFinalizers(frs)
if err != nil {
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in replicaset %s: %v",
frs.Name, err)
frsc.deliverReplicaSetByKey(key, 0, false)
return statusError, err
}
frs = updatedRsObj.(*extensionsv1.ReplicaSet)
glog.V(3).Infof("Syncing replicaset %s in underlying clusters", frs.Name)
clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters()
if err != nil {
return statusError, err
}
// collect current status and do schedule
allPods, err := frsc.fedPodInformer.GetTargetStore().List()
if err != nil {
return statusError, err
}
podStatus, err := podanalyzer.AnalysePods(frs.Spec.Selector, allPods, time.Now())
if err != nil {
return statusError, err
}
current := make(map[string]int64)
estimatedCapacity := make(map[string]int64)
for _, cluster := range clusters {
lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
return statusError, err
}
if exists {
lrs := lrsObj.(*extensionsv1.ReplicaSet)
current[cluster.Name] = int64(podStatus[cluster.Name].RunningAndReady) // include pending as well?
unschedulable := int64(podStatus[cluster.Name].Unschedulable)
if unschedulable > 0 {
estimatedCapacity[cluster.Name] = int64(*lrs.Spec.Replicas) - unschedulable
}
}
}
scheduleResult := frsc.schedule(frs, clusters, current, estimatedCapacity)
glog.V(4).Infof("Start syncing local replicaset %s: %v", key, scheduleResult)
fedStatus := extensionsv1.ReplicaSetStatus{ObservedGeneration: frs.Generation}
operations := make([]fedutil.FederatedOperation, 0)
for clusterName, replicas := range scheduleResult {
lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(clusterName, key)
if err != nil {
return statusError, err
}
// The object can be modified.
lrs := &extensionsv1.ReplicaSet{
ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(frs.ObjectMeta),
Spec: *fedutil.DeepCopyApiTypeOrPanic(&frs.Spec).(*extensionsv1.ReplicaSetSpec),
}
specReplicas := int32(replicas)
lrs.Spec.Replicas = &specReplicas
if !exists {
if replicas > 0 {
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeAdd,
Obj: lrs,
ClusterName: clusterName,
Key: key,
})
}
} else {
currentLrs := lrsObj.(*extensionsv1.ReplicaSet)
// Update existing replica set, if needed.
if !fedutil.ObjectMetaAndSpecEquivalent(lrs, currentLrs) {
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeUpdate,
Obj: lrs,
ClusterName: clusterName,
Key: key,
})
}
fedStatus.Replicas += currentLrs.Status.Replicas
fedStatus.FullyLabeledReplicas += currentLrs.Status.FullyLabeledReplicas
fedStatus.ReadyReplicas += currentLrs.Status.ReadyReplicas
fedStatus.AvailableReplicas += currentLrs.Status.AvailableReplicas
}
}
if fedStatus.Replicas != frs.Status.Replicas || fedStatus.FullyLabeledReplicas != frs.Status.FullyLabeledReplicas ||
fedStatus.ReadyReplicas != frs.Status.ReadyReplicas || fedStatus.AvailableReplicas != frs.Status.AvailableReplicas {
frs.Status = fedStatus
_, err = frsc.fedClient.Extensions().ReplicaSets(frs.Namespace).UpdateStatus(frs)
if err != nil {
return statusError, err
}
}
if len(operations) == 0 {
// Everything is in order
return statusAllOk, nil
}
err = frsc.fedUpdater.Update(operations)
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", key, err)
return statusError, err
}
// Some operations were made, reconcile after a while.
return statusNeedRecheck, nil
}
func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {
if !frsc.isSynced() {
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
}
for _, rs := range frsc.replicaSetStore.List() {
key, _ := controller.KeyFunc(rs)
frsc.deliverReplicaSetByKey(key, 0, false)
}
}
// delete deletes the given replicaset or returns error if the deletion was not complete.
func (frsc *ReplicaSetController) delete(replicaset *extensionsv1.ReplicaSet) error {
glog.V(3).Infof("Handling deletion of replicaset: %v", *replicaset)
_, err := frsc.deletionHelper.HandleObjectInUnderlyingClusters(replicaset)
if err != nil {
return err
}
err = frsc.fedClient.Extensions().ReplicaSets(replicaset.Namespace).Delete(replicaset.Name, nil)
if err != nil {
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
// This is expected when we are processing an update as a result of replicaset finalizer deletion.
// The process that deleted the last finalizer is also going to delete the replicaset and we do not have to do anything.
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete replicaset: %v", err)
}
}
return nil
}

View File

@ -42,18 +42,28 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["controller_test.go"],
srcs = [
"controller_test.go",
"replicasetcontroller_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset/fake:go_default_library",
"//federation/pkg/federatedtypes:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],
)

View File

@ -84,10 +84,11 @@ type FederationSyncController struct {
deletionHelper *deletionhelper.DeletionHelper
reviewDelay time.Duration
clusterAvailableDelay time.Duration
smallDelay time.Duration
updateTimeout time.Duration
reviewDelay time.Duration
clusterAvailableDelay time.Duration
clusterUnavailableDelay time.Duration
smallDelay time.Duration
updateTimeout time.Duration
adapter federatedtypes.FederatedTypeAdapter
}
@ -112,14 +113,15 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: fmt.Sprintf("federation-%v-controller", adapter.Kind())})
s := &FederationSyncController{
reviewDelay: time.Second * 10,
clusterAvailableDelay: time.Second * 20,
smallDelay: time.Second * 3,
updateTimeout: time.Second * 30,
workQueue: workqueue.New(),
backoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
eventRecorder: recorder,
adapter: adapter,
reviewDelay: time.Second * 10,
clusterAvailableDelay: time.Second * 20,
clusterUnavailableDelay: time.Second * 60,
smallDelay: time.Second * 3,
updateTimeout: time.Second * 30,
workQueue: workqueue.New(),
backoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
eventRecorder: recorder,
adapter: adapter,
}
// Build delivereres for triggering reconciliations.
@ -169,6 +171,10 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
// When new cluster becomes available process all the target resources again.
s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
},
// When a cluster becomes unavailable process all the target resources again.
ClusterUnavailable: func(cluster *federationapi.Cluster, _ []interface{}) {
s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterUnavailableDelay))
},
},
)
@ -205,6 +211,7 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
// minimizeLatency reduces delays and timeouts to make the controller more responsive (useful for testing).
func (s *FederationSyncController) minimizeLatency() {
s.clusterAvailableDelay = time.Second
s.clusterUnavailableDelay = time.Second
s.reviewDelay = 50 * time.Millisecond
s.smallDelay = 20 * time.Millisecond
s.updateTimeout = 5 * time.Second
@ -328,6 +335,10 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
kind := s.adapter.Kind()
key := namespacedName.String()
glog.V(4).Infof("Starting to reconcile %v %v", kind, key)
startTime := time.Now()
defer glog.V(4).Infof("Finished reconciling %v %v (duration: %v)", kind, key, time.Now().Sub(startTime))
obj, err := s.objFromCache(kind, key)
if err != nil {
return statusError

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package replicaset
package sync
import (
"flag"
@ -29,7 +29,8 @@ import (
core "k8s.io/client-go/testing"
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fedclientfake "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
"k8s.io/kubernetes/federation/pkg/federatedtypes"
testutil "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kubeclientfake "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
@ -48,11 +49,6 @@ func TestReplicaSetController(t *testing.T) {
flag.Set("v", "5")
flag.Parse()
replicaSetReviewDelay = 10 * time.Millisecond
clusterAvailableDelay = 20 * time.Millisecond
clusterUnavailableDelay = 60 * time.Millisecond
allReplicaSetReviewDelay = 120 * time.Millisecond
fedclientset := fedclientfake.NewSimpleClientset()
fedrswatch := watch.NewFake()
fedclientset.PrependWatchReactor(replicasets, core.DefaultWatchReactor(fedrswatch, nil))
@ -81,20 +77,19 @@ func TestReplicaSetController(t *testing.T) {
return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name)
}
}
replicaSetController := NewReplicaSetController(fedclientset)
rsFedinformer := testutil.ToFederatedInformerForTestOnly(replicaSetController.fedReplicaSetInformer)
replicaSetController := newFederationSyncController(fedclientset, federatedtypes.NewReplicaSetAdapter(fedclientset))
replicaSetController.minimizeLatency()
rsFedinformer := testutil.ToFederatedInformerForTestOnly(replicaSetController.informer)
rsFedinformer.SetClientFactory(fedInformerClientFactory)
podFedinformer := testutil.ToFederatedInformerForTestOnly(replicaSetController.fedPodInformer)
podFedinformer.SetClientFactory(fedInformerClientFactory)
stopChan := make(chan struct{})
defer close(stopChan)
go replicaSetController.Run(1, stopChan)
go replicaSetController.Run(stopChan)
rs := newReplicaSetWithReplicas("rs", 9)
rs, _ = fedclientset.Extensions().ReplicaSets(metav1.NamespaceDefault).Create(rs)
fedrswatch.Add(rs)
time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)
rs1, _ := kube1clientset.Extensions().ReplicaSets(metav1.NamespaceDefault).Get(rs.Name, metav1.GetOptions{})
kube1rswatch.Add(rs1)
@ -114,7 +109,7 @@ func TestReplicaSetController(t *testing.T) {
rs2, _ = kube2clientset.Extensions().ReplicaSets(metav1.NamespaceDefault).UpdateStatus(rs2)
kube2rswatch.Modify(rs2)
time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)
rs, _ = fedclientset.Extensions().ReplicaSets(metav1.NamespaceDefault).Get(rs.Name, metav1.GetOptions{})
assert.Equal(t, *rs.Spec.Replicas, *rs1.Spec.Replicas+*rs2.Spec.Replicas)
assert.Equal(t, rs.Status.Replicas, rs1.Status.Replicas+rs2.Status.Replicas)
@ -126,7 +121,7 @@ func TestReplicaSetController(t *testing.T) {
rs.Spec.Replicas = &replicas
rs, _ = fedclientset.Extensions().ReplicaSets(metav1.NamespaceDefault).Update(rs)
fedrswatch.Modify(rs)
time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)
rs1, _ = kube1clientset.Extensions().ReplicaSets(metav1.NamespaceDefault).Get(rs.Name, metav1.GetOptions{})
rs1.Status.Replicas = *rs1.Spec.Replicas

View File

@ -12,12 +12,7 @@ go_library(
name = "go_default_library",
srcs = ["pod_helper.go"],
tags = ["automanaged"],
deps = [
"//federation/pkg/federation-controller/util:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
],
deps = ["//vendor/k8s.io/api/core/v1:go_default_library"],
)
go_test(
@ -26,7 +21,6 @@ go_test(
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//federation/pkg/federation-controller/util:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",

View File

@ -17,13 +17,9 @@ limitations under the License.
package podanalyzer
import (
"fmt"
"time"
api_v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
)
type PodAnalysisResult struct {
@ -42,41 +38,26 @@ const (
UnschedulableThreshold = 60 * time.Second
)
// A function that calculates how many pods from the list are in one of
// AnalyzePods calculates how many pods from the list are in one of
// the meaningful (from the replica set perspective) states. This function is
// a temporary workaround against the current lack of ownerRef in pods.
func AnalysePods(selectorv1 *metav1.LabelSelector, allPods []util.FederatedObject, currentTime time.Time) (map[string]PodAnalysisResult, error) {
selector, err := metav1.LabelSelectorAsSelector(selectorv1)
if err != nil {
return nil, fmt.Errorf("invalid selector: %v", err)
}
result := make(map[string]PodAnalysisResult)
for _, fedObject := range allPods {
pod, isPod := fedObject.Object.(*api_v1.Pod)
if !isPod {
return nil, fmt.Errorf("invalid arg content - not a *pod")
}
if !selector.Empty() && selector.Matches(labels.Set(pod.Labels)) {
status := result[fedObject.ClusterName]
status.Total++
for _, condition := range pod.Status.Conditions {
if pod.Status.Phase == api_v1.PodRunning {
if condition.Type == api_v1.PodReady {
status.RunningAndReady++
}
} else {
if condition.Type == api_v1.PodScheduled &&
condition.Status == api_v1.ConditionFalse &&
condition.Reason == api_v1.PodReasonUnschedulable &&
condition.LastTransitionTime.Add(UnschedulableThreshold).Before(currentTime) {
status.Unschedulable++
}
func AnalyzePods(pods *api_v1.PodList, currentTime time.Time) PodAnalysisResult {
result := PodAnalysisResult{}
for _, pod := range pods.Items {
result.Total++
for _, condition := range pod.Status.Conditions {
if pod.Status.Phase == api_v1.PodRunning {
if condition.Type == api_v1.PodReady {
result.RunningAndReady++
}
} else if condition.Type == api_v1.PodScheduled &&
condition.Status == api_v1.ConditionFalse &&
condition.Reason == api_v1.PodReasonUnschedulable &&
condition.LastTransitionTime.Add(UnschedulableThreshold).Before(currentTime) {
result.Unschedulable++
}
result[fedObject.ClusterName] = status
}
}
return result, nil
return result
}

View File

@ -23,16 +23,13 @@ import (
api_v1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"github.com/stretchr/testify/assert"
)
func TestAnalyze(t *testing.T) {
now := time.Now()
replicaSet := newReplicaSet(map[string]string{"A": "B"})
replicaSet2 := newReplicaSet(map[string]string{"C": "D"})
podRunning := newPod("p1", replicaSet,
podRunning := newPod("p1",
api_v1.PodStatus{
Phase: api_v1.PodRunning,
Conditions: []api_v1.PodCondition{
@ -42,7 +39,7 @@ func TestAnalyze(t *testing.T) {
},
},
})
podUnschedulable := newPod("pU", replicaSet,
podUnschedulable := newPod("pU",
api_v1.PodStatus{
Phase: api_v1.PodPending,
Conditions: []api_v1.PodCondition{
@ -54,42 +51,25 @@ func TestAnalyze(t *testing.T) {
},
},
})
podOther := newPod("pO", replicaSet,
api_v1.PodStatus{
Phase: api_v1.PodPending,
Conditions: []api_v1.PodCondition{},
})
podOtherRS := newPod("pO", replicaSet2,
podOther := newPod("pO",
api_v1.PodStatus{
Phase: api_v1.PodPending,
Conditions: []api_v1.PodCondition{},
})
federatedObjects := []util.FederatedObject{
{ClusterName: "c1", Object: podRunning},
{ClusterName: "c1", Object: podRunning},
{ClusterName: "c1", Object: podRunning},
{ClusterName: "c1", Object: podUnschedulable},
{ClusterName: "c1", Object: podUnschedulable},
{ClusterName: "c2", Object: podOther},
{ClusterName: "c2", Object: podOtherRS},
}
raport, err := AnalysePods(replicaSet.Spec.Selector, federatedObjects, now)
assert.NoError(t, err)
assert.Equal(t, 2, len(raport))
c1Raport := raport["c1"]
c2Raport := raport["c2"]
result := AnalyzePods(&api_v1.PodList{Items: []api_v1.Pod{*podRunning, *podRunning, *podRunning, *podUnschedulable, *podUnschedulable}}, now)
assert.Equal(t, PodAnalysisResult{
Total: 5,
RunningAndReady: 3,
Unschedulable: 2,
}, c1Raport)
}, result)
result = AnalyzePods(&api_v1.PodList{Items: []api_v1.Pod{*podOther}}, now)
assert.Equal(t, PodAnalysisResult{
Total: 1,
RunningAndReady: 0,
Unschedulable: 0,
}, c2Raport)
}, result)
}
func newReplicaSet(selectorMap map[string]string) *v1beta1.ReplicaSet {
@ -97,7 +77,7 @@ func newReplicaSet(selectorMap map[string]string) *v1beta1.ReplicaSet {
rs := &v1beta1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "foobar",
Namespace: "default",
Namespace: metav1.NamespaceDefault,
},
Spec: v1beta1.ReplicaSetSpec{
Replicas: &replicas,
@ -107,12 +87,11 @@ func newReplicaSet(selectorMap map[string]string) *v1beta1.ReplicaSet {
return rs
}
func newPod(name string, rs *v1beta1.ReplicaSet, status api_v1.PodStatus) *api_v1.Pod {
func newPod(name string, status api_v1.PodStatus) *api_v1.Pod {
return &api_v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: rs.Namespace,
Labels: rs.Spec.Selector.MatchLabels,
Namespace: metav1.NamespaceDefault,
},
Status: status,
}

View File

@ -29,7 +29,6 @@ go_library(
"//federation/client/clientset_generated/federation_clientset:go_default_library",
"//federation/client/clientset_generated/federation_clientset/typed/core/v1:go_default_library",
"//federation/pkg/federatedtypes:go_default_library",
"//federation/pkg/federation-controller/replicaset:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",

View File

@ -37,7 +37,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/federation/apis/federation"
fedreplicsetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset"
federatedtypes "k8s.io/kubernetes/federation/pkg/federatedtypes"
)
const (
@ -513,7 +513,7 @@ func newReplicaSetObj(namespace string, replicas int32, pref *federation.Replica
if pref != nil {
prefBytes, _ := json.Marshal(pref)
prefString := string(prefBytes)
rs.Annotations[fedreplicsetcontroller.FedReplicaSetPreferencesAnnotation] = prefString
rs.Annotations[federatedtypes.FedReplicaSetPreferencesAnnotation] = prefString
}
return rs