mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 10:19:50 +00:00
Merge pull request #29741 from jianhuiz/federation-replicaset-controller
Automatic merge from submit-queue Federation replicaset controller TBD: integrate with the scheduler, events @quinton-hoole @deepak-vij @kshafiee
This commit is contained in:
commit
b8990593f2
@ -31,6 +31,7 @@ import (
|
|||||||
"k8s.io/kubernetes/federation/pkg/dnsprovider"
|
"k8s.io/kubernetes/federation/pkg/dnsprovider"
|
||||||
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
|
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
|
||||||
namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace"
|
namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace"
|
||||||
|
replicasetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset"
|
||||||
secretcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/secret"
|
secretcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/secret"
|
||||||
servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service"
|
servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service"
|
||||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
||||||
@ -155,5 +156,9 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
|
|||||||
secretcontroller := secretcontroller.NewSecretController(secretcontrollerClientset)
|
secretcontroller := secretcontroller.NewSecretController(secretcontrollerClientset)
|
||||||
secretcontroller.Run(wait.NeverStop)
|
secretcontroller.Run(wait.NeverStop)
|
||||||
|
|
||||||
|
replicaSetClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, replicasetcontroller.UserAgentName))
|
||||||
|
replicaSetController := replicasetcontroller.NewReplicaSetController(replicaSetClientset)
|
||||||
|
go replicaSetController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop)
|
||||||
|
|
||||||
select {}
|
select {}
|
||||||
}
|
}
|
||||||
|
@ -47,6 +47,10 @@ type ControllerManagerConfiguration struct {
|
|||||||
// allowed to sync concurrently. Larger number = more responsive service
|
// allowed to sync concurrently. Larger number = more responsive service
|
||||||
// management, but more CPU (and network) load.
|
// management, but more CPU (and network) load.
|
||||||
ConcurrentServiceSyncs int `json:"concurrentServiceSyncs"`
|
ConcurrentServiceSyncs int `json:"concurrentServiceSyncs"`
|
||||||
|
// concurrentReplicaSetSyncs is the number of ReplicaSets that are
|
||||||
|
// allowed to sync concurrently. Larger number = more responsive service
|
||||||
|
// management, but more CPU (and network) load.
|
||||||
|
ConcurrentReplicaSetSyncs int `json:"concurrentReplicaSetSyncs"`
|
||||||
// clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller.
|
// clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller.
|
||||||
ClusterMonitorPeriod unversioned.Duration `json:"clusterMonitorPeriod"`
|
ClusterMonitorPeriod unversioned.Duration `json:"clusterMonitorPeriod"`
|
||||||
// APIServerQPS is the QPS to use while talking with federation apiserver.
|
// APIServerQPS is the QPS to use while talking with federation apiserver.
|
||||||
@ -81,6 +85,7 @@ func NewCMServer() *CMServer {
|
|||||||
Port: FederatedControllerManagerPort,
|
Port: FederatedControllerManagerPort,
|
||||||
Address: "0.0.0.0",
|
Address: "0.0.0.0",
|
||||||
ConcurrentServiceSyncs: 10,
|
ConcurrentServiceSyncs: 10,
|
||||||
|
ConcurrentReplicaSetSyncs: 10,
|
||||||
ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second},
|
ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second},
|
||||||
APIServerQPS: 20.0,
|
APIServerQPS: 20.0,
|
||||||
APIServerBurst: 30,
|
APIServerBurst: 30,
|
||||||
@ -97,6 +102,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
|
|||||||
fs.StringVar(&s.FederationName, "federation-name", s.FederationName, "Federation name.")
|
fs.StringVar(&s.FederationName, "federation-name", s.FederationName, "Federation name.")
|
||||||
fs.StringVar(&s.ZoneName, "zone-name", s.ZoneName, "Zone name, like example.com.")
|
fs.StringVar(&s.ZoneName, "zone-name", s.ZoneName, "Zone name, like example.com.")
|
||||||
fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
|
fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
|
||||||
|
fs.IntVar(&s.ConcurrentReplicaSetSyncs, "concurrent-replicaset-syncs", s.ConcurrentReplicaSetSyncs, "The number of ReplicaSets syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
|
||||||
fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.")
|
fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.")
|
||||||
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
|
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
|
||||||
fs.StringVar(&s.Master, "master", s.Master, "The address of the federation API server (overrides any value in kubeconfig)")
|
fs.StringVar(&s.Master, "master", s.Master, "The address of the federation API server (overrides any value in kubeconfig)")
|
||||||
|
@ -0,0 +1,449 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package replicaset
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"reflect"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
|
||||||
|
fed "k8s.io/kubernetes/federation/apis/federation"
|
||||||
|
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
||||||
|
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
|
||||||
|
planner "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset/planner"
|
||||||
|
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/api/meta"
|
||||||
|
apiv1 "k8s.io/kubernetes/pkg/api/v1"
|
||||||
|
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
||||||
|
"k8s.io/kubernetes/pkg/client/cache"
|
||||||
|
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
|
||||||
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
|
"k8s.io/kubernetes/pkg/controller/framework"
|
||||||
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
|
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||||
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
|
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||||
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
FedReplicaSetPreferencesAnnotation = ""
|
||||||
|
allClustersKey = "THE_ALL_CLUSTER_KEY"
|
||||||
|
UserAgentName = "Federation-replicaset-Controller"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
replicaSetReviewDelay = 10 * time.Second
|
||||||
|
clusterAvailableDelay = 20 * time.Second
|
||||||
|
clusterUnavailableDelay = 60 * time.Second
|
||||||
|
allReplicaSetReviewDealy = 2 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
|
func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) {
|
||||||
|
accessor, err := meta.Accessor(frs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
anno := accessor.GetAnnotations()
|
||||||
|
if anno == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
frsPrefString, found := anno[FedReplicaSetPreferencesAnnotation]
|
||||||
|
if !found {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
var frsPref fed.FederatedReplicaSetPreferences
|
||||||
|
if err := json.Unmarshal([]byte(frsPrefString), &frsPref); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &frsPref, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReplicaSetController struct {
|
||||||
|
fedClient fedclientset.Interface
|
||||||
|
|
||||||
|
replicaSetController *framework.Controller
|
||||||
|
replicaSetStore cache.StoreToReplicaSetLister
|
||||||
|
|
||||||
|
fedReplicaSetInformer fedutil.FederatedInformer
|
||||||
|
fedPodInformer fedutil.FederatedInformer
|
||||||
|
|
||||||
|
replicasetDeliverer *fedutil.DelayingDeliverer
|
||||||
|
clusterDeliverer *fedutil.DelayingDeliverer
|
||||||
|
replicasetWorkQueue workqueue.Interface
|
||||||
|
|
||||||
|
replicaSetBackoff *flowcontrol.Backoff
|
||||||
|
|
||||||
|
defaultPlanner *planner.Planner
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewclusterController returns a new cluster controller
|
||||||
|
func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSetController {
|
||||||
|
frsc := &ReplicaSetController{
|
||||||
|
fedClient: federationClient,
|
||||||
|
replicasetDeliverer: fedutil.NewDelayingDeliverer(),
|
||||||
|
clusterDeliverer: fedutil.NewDelayingDeliverer(),
|
||||||
|
replicasetWorkQueue: workqueue.New(),
|
||||||
|
replicaSetBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
|
||||||
|
defaultPlanner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{
|
||||||
|
Clusters: map[string]fed.ClusterReplicaSetPreferences{
|
||||||
|
"*": {Weight: 1},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, framework.ControllerInterface) {
|
||||||
|
return framework.NewInformer(
|
||||||
|
&cache.ListWatch{
|
||||||
|
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||||
|
return clientset.Extensions().ReplicaSets(apiv1.NamespaceAll).List(options)
|
||||||
|
},
|
||||||
|
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||||
|
return clientset.Extensions().ReplicaSets(apiv1.NamespaceAll).Watch(options)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&extensionsv1.ReplicaSet{},
|
||||||
|
controller.NoResyncPeriodFunc(),
|
||||||
|
fedutil.NewTriggerOnAllChanges(
|
||||||
|
func(obj runtime.Object) { frsc.deliverLocalReplicaSet(obj, allReplicaSetReviewDealy) },
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{
|
||||||
|
ClusterAvailable: func(cluster *fedv1.Cluster) {
|
||||||
|
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
|
||||||
|
},
|
||||||
|
ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) {
|
||||||
|
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
frsc.fedReplicaSetInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle)
|
||||||
|
|
||||||
|
podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, framework.ControllerInterface) {
|
||||||
|
return framework.NewInformer(
|
||||||
|
&cache.ListWatch{
|
||||||
|
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||||
|
return clientset.Core().Pods(apiv1.NamespaceAll).List(options)
|
||||||
|
},
|
||||||
|
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||||
|
return clientset.Core().Pods(apiv1.NamespaceAll).Watch(options)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&apiv1.Pod{},
|
||||||
|
controller.NoResyncPeriodFunc(),
|
||||||
|
fedutil.NewTriggerOnAllChanges(
|
||||||
|
func(obj runtime.Object) {
|
||||||
|
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{})
|
||||||
|
|
||||||
|
frsc.replicaSetStore.Store, frsc.replicaSetController = framework.NewInformer(
|
||||||
|
&cache.ListWatch{
|
||||||
|
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||||
|
return frsc.fedClient.Extensions().ReplicaSets(apiv1.NamespaceAll).List(options)
|
||||||
|
},
|
||||||
|
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||||
|
return frsc.fedClient.Extensions().ReplicaSets(apiv1.NamespaceAll).Watch(options)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&extensionsv1.ReplicaSet{},
|
||||||
|
controller.NoResyncPeriodFunc(),
|
||||||
|
fedutil.NewTriggerOnMetaAndSpecChanges(
|
||||||
|
func(obj runtime.Object) { frsc.deliverFedReplicaSetObj(obj, replicaSetReviewDelay) },
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
return frsc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (frsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
|
||||||
|
go frsc.replicaSetController.Run(stopCh)
|
||||||
|
frsc.fedReplicaSetInformer.Start()
|
||||||
|
frsc.fedPodInformer.Start()
|
||||||
|
|
||||||
|
frsc.replicasetDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) {
|
||||||
|
frsc.replicasetWorkQueue.Add(item.Key)
|
||||||
|
})
|
||||||
|
frsc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) {
|
||||||
|
frsc.reconcileReplicaSetsOnClusterChange()
|
||||||
|
})
|
||||||
|
|
||||||
|
for !frsc.isSynced() {
|
||||||
|
time.Sleep(5 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < workers; i++ {
|
||||||
|
go wait.Until(frsc.worker, time.Second, stopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Minute):
|
||||||
|
frsc.replicaSetBackoff.GC()
|
||||||
|
case <-stopCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-stopCh
|
||||||
|
glog.Infof("Shutting down ReplicaSetController")
|
||||||
|
frsc.replicasetDeliverer.Stop()
|
||||||
|
frsc.clusterDeliverer.Stop()
|
||||||
|
frsc.replicasetWorkQueue.ShutDown()
|
||||||
|
frsc.fedReplicaSetInformer.Stop()
|
||||||
|
frsc.fedPodInformer.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (frsc *ReplicaSetController) isSynced() bool {
|
||||||
|
if !frsc.fedReplicaSetInformer.ClustersSynced() {
|
||||||
|
glog.V(2).Infof("Cluster list not synced")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters()
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to get ready clusters: %v", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if !frsc.fedReplicaSetInformer.GetTargetStore().ClustersSynced(clusters) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if !frsc.fedPodInformer.ClustersSynced() {
|
||||||
|
glog.V(2).Infof("Cluster list not synced")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if !frsc.fedPodInformer.GetTargetStore().ClustersSynced(clusters) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if !frsc.replicaSetController.HasSynced() {
|
||||||
|
glog.V(2).Infof("federation replicaset list not synced")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (frsc *ReplicaSetController) deliverLocalReplicaSet(obj interface{}, duration time.Duration) {
|
||||||
|
key, err := controller.KeyFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Couldn't get key for object %v: %v", obj, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_, exists, err := frsc.replicaSetStore.Store.GetByKey(key)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Couldn't get federation replicaset %v: %v", key, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if exists { // ignore replicasets exists only in local k8s
|
||||||
|
frsc.deliverReplicaSetByKey(key, duration, false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (frsc *ReplicaSetController) deliverFedReplicaSetObj(obj interface{}, delay time.Duration) {
|
||||||
|
key, err := controller.KeyFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
frsc.deliverReplicaSetByKey(key, delay, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (frsc *ReplicaSetController) deliverReplicaSetByKey(key string, delay time.Duration, failed bool) {
|
||||||
|
if failed {
|
||||||
|
frsc.replicaSetBackoff.Next(key, time.Now())
|
||||||
|
delay = delay + frsc.replicaSetBackoff.Get(key)
|
||||||
|
} else {
|
||||||
|
frsc.replicaSetBackoff.Reset(key)
|
||||||
|
}
|
||||||
|
frsc.replicasetDeliverer.DeliverAfter(key, nil, delay)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (frsc *ReplicaSetController) worker() {
|
||||||
|
for {
|
||||||
|
item, quit := frsc.replicasetWorkQueue.Get()
|
||||||
|
if quit {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key := item.(string)
|
||||||
|
err := frsc.reconcileReplicaSet(key)
|
||||||
|
frsc.replicasetWorkQueue.Done(item)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error syncing cluster controller: %v", err)
|
||||||
|
frsc.deliverReplicaSetByKey(key, 0, true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, clusters []*fedv1.Cluster,
|
||||||
|
current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 {
|
||||||
|
// TODO: integrate real scheduler
|
||||||
|
|
||||||
|
plnr := frsc.defaultPlanner
|
||||||
|
frsPref, err := parseFederationReplicaSetReference(frs)
|
||||||
|
if err != nil {
|
||||||
|
glog.Info("Invalid ReplicaSet specific preference, use default. rs: %v, err: %v", frs, err)
|
||||||
|
}
|
||||||
|
if frsPref != nil { // create a new planner if user specified a preference
|
||||||
|
plnr = planner.NewPlanner(frsPref)
|
||||||
|
}
|
||||||
|
|
||||||
|
replicas := int64(*frs.Spec.Replicas)
|
||||||
|
var clusterNames []string
|
||||||
|
for _, cluster := range clusters {
|
||||||
|
clusterNames = append(clusterNames, cluster.Name)
|
||||||
|
}
|
||||||
|
scheduleResult, overflow := plnr.Plan(replicas, clusterNames, current, estimatedCapacity)
|
||||||
|
// make sure the return contains clusters need to zero the replicas
|
||||||
|
result := make(map[string]int64)
|
||||||
|
for clusterName := range current {
|
||||||
|
result[clusterName] = 0
|
||||||
|
}
|
||||||
|
for clusterName, replicas := range scheduleResult {
|
||||||
|
result[clusterName] = replicas
|
||||||
|
}
|
||||||
|
for clusterName, replicas := range overflow {
|
||||||
|
result[clusterName] += replicas
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
|
||||||
|
if !frsc.isSynced() {
|
||||||
|
frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.Infof("Start reconcile replicaset %q", key)
|
||||||
|
startTime := time.Now()
|
||||||
|
defer glog.Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime))
|
||||||
|
|
||||||
|
obj, exists, err := frsc.replicaSetStore.Store.GetByKey(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
// don't delete local replicasets for now
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
frs := obj.(*extensionsv1.ReplicaSet)
|
||||||
|
|
||||||
|
clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect current status and do schedule
|
||||||
|
allPods, err := frsc.fedPodInformer.GetTargetStore().List()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
podStatus, err := AnalysePods(frs, allPods, time.Now())
|
||||||
|
current := make(map[string]int64)
|
||||||
|
estimatedCapacity := make(map[string]int64)
|
||||||
|
for _, cluster := range clusters {
|
||||||
|
lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if exists {
|
||||||
|
lrs := lrsObj.(*extensionsv1.ReplicaSet)
|
||||||
|
current[cluster.Name] = int64(podStatus[cluster.Name].RunningAndReady) // include pending as well?
|
||||||
|
unschedulable := int64(podStatus[cluster.Name].Unschedulable)
|
||||||
|
if unschedulable > 0 {
|
||||||
|
estimatedCapacity[cluster.Name] = int64(*lrs.Spec.Replicas) - unschedulable
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
scheduleResult := frsc.schedule(frs, clusters, current, estimatedCapacity)
|
||||||
|
|
||||||
|
glog.Infof("Start syncing local replicaset %v", scheduleResult)
|
||||||
|
|
||||||
|
fedStatus := extensionsv1.ReplicaSetStatus{ObservedGeneration: frs.Generation}
|
||||||
|
for clusterName, replicas := range scheduleResult {
|
||||||
|
// TODO: updater or parallelizer doesnn't help as results are needed for updating fed rs status
|
||||||
|
clusterClient, err := frsc.fedReplicaSetInformer.GetClientsetForCluster(clusterName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(clusterName, key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if !exists {
|
||||||
|
if replicas > 0 {
|
||||||
|
lrs := &extensionsv1.ReplicaSet{
|
||||||
|
ObjectMeta: apiv1.ObjectMeta{
|
||||||
|
Name: frs.Name,
|
||||||
|
Namespace: frs.Namespace,
|
||||||
|
Labels: frs.Labels,
|
||||||
|
Annotations: frs.Annotations,
|
||||||
|
},
|
||||||
|
Spec: frs.Spec,
|
||||||
|
}
|
||||||
|
specReplicas := int32(replicas)
|
||||||
|
lrs.Spec.Replicas = &specReplicas
|
||||||
|
lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Create(lrs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fedStatus.Replicas += lrs.Status.Replicas
|
||||||
|
fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
lrs := lrsObj.(*extensionsv1.ReplicaSet)
|
||||||
|
lrsExpectedSpec := frs.Spec
|
||||||
|
specReplicas := int32(replicas)
|
||||||
|
lrsExpectedSpec.Replicas = &specReplicas
|
||||||
|
if !reflect.DeepEqual(lrs.Spec, lrsExpectedSpec) {
|
||||||
|
lrs.Spec = lrsExpectedSpec
|
||||||
|
lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Update(lrs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fedStatus.Replicas += lrs.Status.Replicas
|
||||||
|
fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas
|
||||||
|
// leave the replicaset even the replicas dropped to 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if fedStatus.Replicas != frs.Status.Replicas || fedStatus.FullyLabeledReplicas != frs.Status.FullyLabeledReplicas {
|
||||||
|
frs.Status = fedStatus
|
||||||
|
_, err = frsc.fedClient.Extensions().ReplicaSets(frs.Namespace).UpdateStatus(frs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {
|
||||||
|
if !frsc.isSynced() {
|
||||||
|
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
|
||||||
|
}
|
||||||
|
rss := frsc.replicaSetStore.Store.List()
|
||||||
|
for _, rs := range rss {
|
||||||
|
key, _ := controller.KeyFunc(rs)
|
||||||
|
frsc.deliverReplicaSetByKey(key, 0, false)
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,190 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package replicaset
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
||||||
|
fedclientfake "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake"
|
||||||
|
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
||||||
|
"k8s.io/kubernetes/pkg/api/meta"
|
||||||
|
apiv1 "k8s.io/kubernetes/pkg/api/v1"
|
||||||
|
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
||||||
|
kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
|
||||||
|
kubeclientfake "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4/fake"
|
||||||
|
"k8s.io/kubernetes/pkg/client/testing/core"
|
||||||
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParseFederationReplicaSetReference(t *testing.T) {
|
||||||
|
successPrefs := []string{
|
||||||
|
`{"rebalance": true,
|
||||||
|
"clusters": {
|
||||||
|
"k8s-1": {"minReplicas": 10, "maxReplicas": 20, "weight": 2},
|
||||||
|
"*": {"weight": 1}
|
||||||
|
}}`,
|
||||||
|
}
|
||||||
|
failedPrefes := []string{
|
||||||
|
`{`, // bad json
|
||||||
|
}
|
||||||
|
|
||||||
|
rs := newReplicaSetWithReplicas("rs-1", 100)
|
||||||
|
accessor, _ := meta.Accessor(rs)
|
||||||
|
anno := accessor.GetAnnotations()
|
||||||
|
if anno == nil {
|
||||||
|
anno = make(map[string]string)
|
||||||
|
accessor.SetAnnotations(anno)
|
||||||
|
}
|
||||||
|
for _, prefString := range successPrefs {
|
||||||
|
anno[FedReplicaSetPreferencesAnnotation] = prefString
|
||||||
|
pref, err := parseFederationReplicaSetReference(rs)
|
||||||
|
assert.NotNil(t, pref)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
}
|
||||||
|
for _, prefString := range failedPrefes {
|
||||||
|
anno[FedReplicaSetPreferencesAnnotation] = prefString
|
||||||
|
pref, err := parseFederationReplicaSetReference(rs)
|
||||||
|
assert.Nil(t, pref)
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReplicaSetController(t *testing.T) {
|
||||||
|
flag.Set("logtostderr", "true")
|
||||||
|
flag.Set("v", "5")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
replicaSetReviewDelay = 10 * time.Millisecond
|
||||||
|
clusterAvailableDelay = 20 * time.Millisecond
|
||||||
|
clusterUnavailableDelay = 60 * time.Millisecond
|
||||||
|
allReplicaSetReviewDealy = 120 * time.Millisecond
|
||||||
|
|
||||||
|
fedclientset := fedclientfake.NewSimpleClientset()
|
||||||
|
fedrswatch := watch.NewFake()
|
||||||
|
fedclientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(fedrswatch, nil))
|
||||||
|
|
||||||
|
fedclientset.Federation().Clusters().Create(newClusterWithReadyStatus("k8s-1", apiv1.ConditionTrue))
|
||||||
|
fedclientset.Federation().Clusters().Create(newClusterWithReadyStatus("k8s-2", apiv1.ConditionTrue))
|
||||||
|
|
||||||
|
kube1clientset := kubeclientfake.NewSimpleClientset()
|
||||||
|
kube1rswatch := watch.NewFake()
|
||||||
|
kube1clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube1rswatch, nil))
|
||||||
|
kube1Podwatch := watch.NewFake()
|
||||||
|
kube1clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(kube1Podwatch, nil))
|
||||||
|
kube2clientset := kubeclientfake.NewSimpleClientset()
|
||||||
|
kube2rswatch := watch.NewFake()
|
||||||
|
kube2clientset.PrependWatchReactor("replicasets", core.DefaultWatchReactor(kube2rswatch, nil))
|
||||||
|
kube2Podwatch := watch.NewFake()
|
||||||
|
kube2clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(kube2Podwatch, nil))
|
||||||
|
|
||||||
|
fedInformerClientFactory := func(cluster *fedv1.Cluster) (kube_release_1_4.Interface, error) {
|
||||||
|
switch cluster.Name {
|
||||||
|
case "k8s-1":
|
||||||
|
return kube1clientset, nil
|
||||||
|
case "k8s-2":
|
||||||
|
return kube2clientset, nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
replicaSetController := NewReplicaSetController(fedclientset)
|
||||||
|
rsFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedReplicaSetInformer)
|
||||||
|
rsFedinformer.SetClientFactory(fedInformerClientFactory)
|
||||||
|
podFedinformer := toFederatedInformerForTestOnly(replicaSetController.fedPodInformer)
|
||||||
|
podFedinformer.SetClientFactory(fedInformerClientFactory)
|
||||||
|
|
||||||
|
stopChan := make(chan struct{})
|
||||||
|
defer close(stopChan)
|
||||||
|
go replicaSetController.Run(1, stopChan)
|
||||||
|
|
||||||
|
rs := newReplicaSetWithReplicas("rs", 9)
|
||||||
|
rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Create(rs)
|
||||||
|
fedrswatch.Add(rs)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
rs1, _ := kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
|
||||||
|
kube1rswatch.Add(rs1)
|
||||||
|
rs1.Status.Replicas = *rs1.Spec.Replicas
|
||||||
|
rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs1)
|
||||||
|
kube1rswatch.Modify(rs1)
|
||||||
|
|
||||||
|
rs2, _ := kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
|
||||||
|
kube2rswatch.Add(rs2)
|
||||||
|
rs2.Status.Replicas = *rs2.Spec.Replicas
|
||||||
|
rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs2)
|
||||||
|
kube2rswatch.Modify(rs2)
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
|
||||||
|
assert.Equal(t, *rs.Spec.Replicas, *rs1.Spec.Replicas+*rs2.Spec.Replicas)
|
||||||
|
assert.Equal(t, rs.Status.Replicas, rs1.Status.Replicas+rs2.Status.Replicas)
|
||||||
|
|
||||||
|
var replicas int32 = 20
|
||||||
|
rs.Spec.Replicas = &replicas
|
||||||
|
rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Update(rs)
|
||||||
|
fedrswatch.Modify(rs)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
|
||||||
|
rs1.Status.Replicas = *rs1.Spec.Replicas
|
||||||
|
rs1, _ = kube1clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs1)
|
||||||
|
kube1rswatch.Modify(rs1)
|
||||||
|
|
||||||
|
rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
|
||||||
|
rs2.Status.Replicas = *rs2.Spec.Replicas
|
||||||
|
rs2, _ = kube2clientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).UpdateStatus(rs2)
|
||||||
|
kube2rswatch.Modify(rs2)
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
rs, _ = fedclientset.Extensions().ReplicaSets(apiv1.NamespaceDefault).Get(rs.Name)
|
||||||
|
assert.Equal(t, *rs.Spec.Replicas, *rs1.Spec.Replicas+*rs2.Spec.Replicas)
|
||||||
|
assert.Equal(t, rs.Status.Replicas, rs1.Status.Replicas+rs2.Status.Replicas)
|
||||||
|
}
|
||||||
|
|
||||||
|
func toFederatedInformerForTestOnly(informer fedutil.FederatedInformer) fedutil.FederatedInformerForTestOnly {
|
||||||
|
inter := informer.(interface{})
|
||||||
|
return inter.(fedutil.FederatedInformerForTestOnly)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newClusterWithReadyStatus(name string, readyStatus apiv1.ConditionStatus) *fedv1.Cluster {
|
||||||
|
return &fedv1.Cluster{
|
||||||
|
ObjectMeta: apiv1.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
},
|
||||||
|
Status: fedv1.ClusterStatus{
|
||||||
|
Conditions: []fedv1.ClusterCondition{
|
||||||
|
{Type: fedv1.ClusterReady, Status: readyStatus},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newReplicaSetWithReplicas(name string, replicas int32) *extensionsv1.ReplicaSet {
|
||||||
|
return &extensionsv1.ReplicaSet{
|
||||||
|
ObjectMeta: apiv1.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
Namespace: apiv1.NamespaceDefault,
|
||||||
|
},
|
||||||
|
Spec: extensionsv1.ReplicaSetSpec{
|
||||||
|
Replicas: &replicas,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user