mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #23430 from huangyuqi/uber-cluster-manager
Automatic merge from submit-queue Implement the cluster-controller of kubernetes federation This PR is the cluster-controller of kubernetes federation. This controller of federation phase 1 just collect the status of federated clusters.
This commit is contained in:
commit
3a5d53d467
35
federation/client/cache/cluster_cache.go
vendored
Normal file
35
federation/client/cache/cluster_cache.go
vendored
Normal file
@ -0,0 +1,35 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
|
||||
kubeCache "k8s.io/kubernetes/pkg/client/cache"
|
||||
)
|
||||
|
||||
// StoreToClusterLister makes a Store have the List method of the unversioned.ClusterInterface
|
||||
// The Store must contain (only) clusters.
|
||||
type StoreToClusterLister struct {
|
||||
kubeCache.Store
|
||||
}
|
||||
|
||||
func (s *StoreToClusterLister) List() (clusters federation_v1alpha1.ClusterList, err error) {
|
||||
for _, m := range s.Store.List() {
|
||||
clusters.Items = append(clusters.Items, *(m.(*federation_v1alpha1.Cluster)))
|
||||
}
|
||||
return clusters, nil
|
||||
}
|
4
federation/cmd/federation-controller-manager/OWNERS
Normal file
4
federation/cmd/federation-controller-manager/OWNERS
Normal file
@ -0,0 +1,4 @@
|
||||
assignees:
|
||||
- quinton-hoole
|
||||
- nikhiljindal
|
||||
- madhusundancs
|
@ -0,0 +1,109 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package app implements a server that runs a set of active
|
||||
// components. This includes cluster controller
|
||||
|
||||
package app
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
"strconv"
|
||||
|
||||
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
|
||||
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
|
||||
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
||||
"k8s.io/kubernetes/pkg/healthz"
|
||||
"k8s.io/kubernetes/pkg/util/configz"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
// NewControllerManagerCommand creates a *cobra.Command object with default parameters
|
||||
func NewControllerManagerCommand() *cobra.Command {
|
||||
s := options.NewCMServer()
|
||||
s.AddFlags(pflag.CommandLine)
|
||||
cmd := &cobra.Command{
|
||||
Use: "federation-controller-manager",
|
||||
Long: `The federation controller manager is a daemon that embeds
|
||||
the core control loops shipped with federation. In applications of robotics and
|
||||
automation, a control loop is a non-terminating loop that regulates the state of
|
||||
the system. In federation, a controller is a control loop that watches the shared
|
||||
state of the federation cluster through the apiserver and makes changes attempting
|
||||
to move the current state towards the desired state. Examples of controllers that
|
||||
ship with federation today is the cluster controller.`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
},
|
||||
}
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
// Run runs the CMServer. This should never exit.
|
||||
func Run(s *options.CMServer) error {
|
||||
if c, err := configz.New("componentconfig"); err == nil {
|
||||
c.Set(s.ControllerManagerConfiguration)
|
||||
} else {
|
||||
glog.Errorf("unable to register configz: %s", err)
|
||||
}
|
||||
restClientCfg, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Override restClientCfg qps/burst settings from flags
|
||||
restClientCfg.QPS = s.APIServerQPS
|
||||
restClientCfg.Burst = s.APIServerBurst
|
||||
|
||||
go func() {
|
||||
mux := http.NewServeMux()
|
||||
healthz.InstallHandler(mux)
|
||||
if s.EnableProfiling {
|
||||
mux.HandleFunc("/debug/pprof/", pprof.Index)
|
||||
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
||||
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
||||
}
|
||||
mux.Handle("/metrics", prometheus.Handler())
|
||||
|
||||
server := &http.Server{
|
||||
Addr: net.JoinHostPort(s.Address, strconv.Itoa(s.Port)),
|
||||
Handler: mux,
|
||||
}
|
||||
glog.Fatal(server.ListenAndServe())
|
||||
}()
|
||||
|
||||
run := func() {
|
||||
err := StartControllers(s, restClientCfg)
|
||||
glog.Fatalf("error running controllers: %v", err)
|
||||
panic("unreachable")
|
||||
}
|
||||
run()
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error {
|
||||
federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
|
||||
go clustercontroller.NewclusterController(federationClientSet, s.ClusterMonitorPeriod.Duration).Run()
|
||||
select {}
|
||||
}
|
@ -0,0 +1,89 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package options provides the flags used for the controller manager.
|
||||
|
||||
package options
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
||||
"k8s.io/kubernetes/pkg/client/leaderelection"
|
||||
)
|
||||
|
||||
type ControllerManagerConfiguration struct {
|
||||
// port is the port that the controller-manager's http service runs on.
|
||||
Port int `json:"port"`
|
||||
// address is the IP address to serve on (set to 0.0.0.0 for all interfaces).
|
||||
Address string `json:"address"`
|
||||
// clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller.
|
||||
ClusterMonitorPeriod unversioned.Duration `json:"clusterMonitorPeriod"`
|
||||
// APIServerQPS is the QPS to use while talking with federation apiserver.
|
||||
APIServerQPS float32 `json:"federatedAPIQPS"`
|
||||
// APIServerBurst is the burst to use while talking with federation apiserver.
|
||||
APIServerBurst int `json:"federatedAPIBurst"`
|
||||
// enableProfiling enables profiling via web interface host:port/debug/pprof/
|
||||
EnableProfiling bool `json:"enableProfiling"`
|
||||
// leaderElection defines the configuration of leader election client.
|
||||
LeaderElection componentconfig.LeaderElectionConfiguration `json:"leaderElection"`
|
||||
// contentType is contentType of requests sent to apiserver.
|
||||
ContentType string `json:"contentType"`
|
||||
}
|
||||
|
||||
// CMServer is the main context object for the controller manager.
|
||||
type CMServer struct {
|
||||
ControllerManagerConfiguration
|
||||
Master string
|
||||
Kubeconfig string
|
||||
}
|
||||
|
||||
const (
|
||||
// FederatedControllerManagerPort is the default port for the federation controller manager status server.
|
||||
// May be overridden by a flag at startup.
|
||||
FederatedControllerManagerPort = 10253
|
||||
)
|
||||
|
||||
// NewCMServer creates a new CMServer with a default config.
|
||||
func NewCMServer() *CMServer {
|
||||
s := CMServer{
|
||||
ControllerManagerConfiguration: ControllerManagerConfiguration{
|
||||
Port: FederatedControllerManagerPort,
|
||||
Address: "0.0.0.0",
|
||||
ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second},
|
||||
APIServerQPS: 20.0,
|
||||
APIServerBurst: 30,
|
||||
LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(),
|
||||
},
|
||||
}
|
||||
return &s
|
||||
}
|
||||
|
||||
// AddFlags adds flags for a specific CMServer to the specified FlagSet
|
||||
func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
|
||||
fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on")
|
||||
fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
|
||||
fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.")
|
||||
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
|
||||
fs.StringVar(&s.Master, "master", s.Master, "The address of the federation API server (overrides any value in kubeconfig)")
|
||||
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
|
||||
fs.StringVar(&s.ContentType, "kube-api-content-type", s.ContentType, "ContentType of requests sent to apiserver. Passing application/vnd.kubernetes.protobuf is an experimental feature now.")
|
||||
fs.Float32Var(&s.APIServerQPS, "federated-api-qps", s.APIServerQPS, "QPS to use while talking with federation apiserver")
|
||||
fs.IntVar(&s.APIServerBurst, "federated-api-burst", s.APIServerBurst, "Burst to use while talking with federation apiserver")
|
||||
leaderelection.BindFlags(&s.LeaderElection, fs)
|
||||
}
|
@ -0,0 +1,52 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app"
|
||||
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
|
||||
"k8s.io/kubernetes/pkg/healthz"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/flag"
|
||||
"k8s.io/kubernetes/pkg/version/verflag"
|
||||
)
|
||||
|
||||
func init() {
|
||||
healthz.DefaultHealthz()
|
||||
}
|
||||
|
||||
func main() {
|
||||
runtime.GOMAXPROCS(runtime.NumCPU())
|
||||
s := options.NewCMServer()
|
||||
s.AddFlags(pflag.CommandLine)
|
||||
|
||||
flag.InitFlags()
|
||||
util.InitLogs()
|
||||
defer util.FlushLogs()
|
||||
|
||||
verflag.PrintAndExitIfRequested()
|
||||
|
||||
if err := app.Run(s); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "%v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
4
federation/pkg/federation-controller/OWNERS
Normal file
4
federation/pkg/federation-controller/OWNERS
Normal file
@ -0,0 +1,4 @@
|
||||
assignees:
|
||||
- quinton-hoole
|
||||
- nikhiljindal
|
||||
- madhusundancs
|
123
federation/pkg/federation-controller/cluster/cluster_client.go
Normal file
123
federation/pkg/federation-controller/cluster/cluster_client.go
Normal file
@ -0,0 +1,123 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"net"
|
||||
"strings"
|
||||
|
||||
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/client/typed/discovery"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
||||
utilnet "k8s.io/kubernetes/pkg/util/net"
|
||||
)
|
||||
|
||||
const (
|
||||
UserAgentName = "Cluster-Controller"
|
||||
KubeAPIQPS = 20.0
|
||||
KubeAPIBurst = 30
|
||||
)
|
||||
|
||||
type ClusterClient struct {
|
||||
discoveryClient *discovery.DiscoveryClient
|
||||
}
|
||||
|
||||
func NewClusterClientSet(c *federation_v1alpha1.Cluster) (*ClusterClient, error) {
|
||||
var serverAddress string
|
||||
hostIP, err := utilnet.ChooseHostInterface()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, item := range c.Spec.ServerAddressByClientCIDRs {
|
||||
_, cidrnet, err := net.ParseCIDR(item.ClientCIDR)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
myaddr := net.ParseIP(hostIP.String())
|
||||
if cidrnet.Contains(myaddr) == true {
|
||||
serverAddress = item.ServerAddress
|
||||
break
|
||||
}
|
||||
}
|
||||
var clusterClientSet = ClusterClient{}
|
||||
if serverAddress != "" {
|
||||
clusterConfig, err := clientcmd.BuildConfigFromFlags(serverAddress, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
clusterConfig.QPS = KubeAPIQPS
|
||||
clusterConfig.Burst = KubeAPIBurst
|
||||
clusterClientSet.discoveryClient = discovery.NewDiscoveryClientForConfigOrDie((restclient.AddUserAgent(clusterConfig, UserAgentName)))
|
||||
if clusterClientSet.discoveryClient == nil {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
return &clusterClientSet, err
|
||||
}
|
||||
|
||||
// GetClusterHealthStatus gets the kubernetes cluster health status by requesting "/healthz"
|
||||
func (self *ClusterClient) GetClusterHealthStatus() *federation_v1alpha1.ClusterStatus {
|
||||
clusterStatus := federation_v1alpha1.ClusterStatus{}
|
||||
currentTime := unversioned.Now()
|
||||
newClusterReadyCondition := federation_v1alpha1.ClusterCondition{
|
||||
Type: federation_v1alpha1.ClusterReady,
|
||||
Status: v1.ConditionTrue,
|
||||
Reason: "ClusterReady",
|
||||
Message: "/healthz responded with ok",
|
||||
LastProbeTime: currentTime,
|
||||
LastTransitionTime: currentTime,
|
||||
}
|
||||
newClusterNotReadyCondition := federation_v1alpha1.ClusterCondition{
|
||||
Type: federation_v1alpha1.ClusterReady,
|
||||
Status: v1.ConditionFalse,
|
||||
Reason: "ClusterNotReady",
|
||||
Message: "/healthz responded without ok",
|
||||
LastProbeTime: currentTime,
|
||||
LastTransitionTime: currentTime,
|
||||
}
|
||||
newNodeOfflineCondition := federation_v1alpha1.ClusterCondition{
|
||||
Type: federation_v1alpha1.ClusterOffline,
|
||||
Status: v1.ConditionTrue,
|
||||
Reason: "ClusterNotReachable",
|
||||
Message: "cluster is not reachable",
|
||||
LastProbeTime: currentTime,
|
||||
LastTransitionTime: currentTime,
|
||||
}
|
||||
newNodeNotOfflineCondition := federation_v1alpha1.ClusterCondition{
|
||||
Type: federation_v1alpha1.ClusterOffline,
|
||||
Status: v1.ConditionFalse,
|
||||
Reason: "ClusterReachable",
|
||||
Message: "cluster is reachable",
|
||||
LastProbeTime: currentTime,
|
||||
LastTransitionTime: currentTime,
|
||||
}
|
||||
body, err := self.discoveryClient.Get().AbsPath("/healthz").Do().Raw()
|
||||
if err != nil {
|
||||
clusterStatus.Conditions = append(clusterStatus.Conditions, newNodeOfflineCondition)
|
||||
} else {
|
||||
if !strings.EqualFold(string(body), "ok") {
|
||||
clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterNotReadyCondition, newNodeNotOfflineCondition)
|
||||
} else {
|
||||
clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterReadyCondition)
|
||||
}
|
||||
}
|
||||
return &clusterStatus
|
||||
}
|
@ -0,0 +1,196 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/federation/apis/federation"
|
||||
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
|
||||
cluster_cache "k8s.io/kubernetes/federation/client/cache"
|
||||
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/framework"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
type ClusterController struct {
|
||||
knownClusterSet sets.String
|
||||
|
||||
// federationClient used to operate cluster
|
||||
federationClient federationclientset.Interface
|
||||
|
||||
// clusterMonitorPeriod is the period for updating status of cluster
|
||||
clusterMonitorPeriod time.Duration
|
||||
// clusterClusterStatusMap is a mapping of clusterName and cluster status of last sampling
|
||||
clusterClusterStatusMap map[string]federation_v1alpha1.ClusterStatus
|
||||
|
||||
// clusterKubeClientMap is a mapping of clusterName and restclient
|
||||
clusterKubeClientMap map[string]ClusterClient
|
||||
|
||||
// cluster framework and store
|
||||
clusterController *framework.Controller
|
||||
clusterStore cluster_cache.StoreToClusterLister
|
||||
}
|
||||
|
||||
// NewclusterController returns a new cluster controller
|
||||
func NewclusterController(federationClient federationclientset.Interface, clusterMonitorPeriod time.Duration) *ClusterController {
|
||||
cc := &ClusterController{
|
||||
knownClusterSet: make(sets.String),
|
||||
federationClient: federationClient,
|
||||
clusterMonitorPeriod: clusterMonitorPeriod,
|
||||
clusterClusterStatusMap: make(map[string]federation_v1alpha1.ClusterStatus),
|
||||
clusterKubeClientMap: make(map[string]ClusterClient),
|
||||
}
|
||||
cc.clusterStore.Store, cc.clusterController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return cc.federationClient.Federation().Clusters().List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return cc.federationClient.Federation().Clusters().Watch(options)
|
||||
},
|
||||
},
|
||||
&federation.Cluster{},
|
||||
controller.NoResyncPeriodFunc(),
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
DeleteFunc: cc.delFromClusterSet,
|
||||
AddFunc: cc.addToClusterSet,
|
||||
},
|
||||
)
|
||||
return cc
|
||||
}
|
||||
|
||||
// delFromClusterSet delete a cluster from clusterSet and
|
||||
// delete the corresponding restclient from the map clusterKubeClientMap
|
||||
func (cc *ClusterController) delFromClusterSet(obj interface{}) {
|
||||
cluster := obj.(*federation_v1alpha1.Cluster)
|
||||
cc.knownClusterSet.Delete(cluster.Name)
|
||||
delete(cc.clusterKubeClientMap, cluster.Name)
|
||||
}
|
||||
|
||||
// addToClusterSet insert the new cluster to clusterSet and create a corresponding
|
||||
// restclient to map clusterKubeClientMap
|
||||
func (cc *ClusterController) addToClusterSet(obj interface{}) {
|
||||
cluster := obj.(*federation_v1alpha1.Cluster)
|
||||
cc.knownClusterSet.Insert(cluster.Name)
|
||||
// create the restclient of cluster
|
||||
restClient, err := NewClusterClientSet(cluster)
|
||||
if err != nil || restClient == nil {
|
||||
glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err)
|
||||
return
|
||||
}
|
||||
cc.clusterKubeClientMap[cluster.Name] = *restClient
|
||||
}
|
||||
|
||||
// Run begins watching and syncing.
|
||||
func (cc *ClusterController) Run() {
|
||||
defer utilruntime.HandleCrash()
|
||||
go cc.clusterController.Run(wait.NeverStop)
|
||||
// monitor cluster status periodically, in phase 1 we just get the health state from "/healthz"
|
||||
go wait.Until(func() {
|
||||
if err := cc.UpdateClusterStatus(); err != nil {
|
||||
glog.Errorf("Error monitoring cluster status: %v", err)
|
||||
}
|
||||
}, cc.clusterMonitorPeriod, wait.NeverStop)
|
||||
}
|
||||
|
||||
func (cc *ClusterController) GetClusterStatus(cluster *federation_v1alpha1.Cluster) (*federation_v1alpha1.ClusterStatus, error) {
|
||||
// just get the status of cluster, by requesting the restapi "/healthz"
|
||||
clusterClient, found := cc.clusterKubeClientMap[cluster.Name]
|
||||
if !found {
|
||||
glog.Infof("It's a new cluster, a cluster client will be created")
|
||||
client, err := NewClusterClientSet(cluster)
|
||||
if err != nil || client == nil {
|
||||
glog.Infof("Failed to create cluster client, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
clusterClient = *client
|
||||
cc.clusterKubeClientMap[cluster.Name] = clusterClient
|
||||
}
|
||||
clusterStatus := clusterClient.GetClusterHealthStatus()
|
||||
return clusterStatus, nil
|
||||
}
|
||||
|
||||
// UpdateClusterStatus checks cluster status and get the metrics from cluster's restapi
|
||||
func (cc *ClusterController) UpdateClusterStatus() error {
|
||||
clusters, err := cc.federationClient.Federation().Clusters().List(api.ListOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, cluster := range clusters.Items {
|
||||
if !cc.knownClusterSet.Has(cluster.Name) {
|
||||
glog.V(1).Infof("ClusterController observed a new cluster: %#v", cluster)
|
||||
cc.knownClusterSet.Insert(cluster.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// If there's a difference between lengths of known clusters and observed clusters
|
||||
if len(cc.knownClusterSet) != len(clusters.Items) {
|
||||
observedSet := make(sets.String)
|
||||
for _, cluster := range clusters.Items {
|
||||
observedSet.Insert(cluster.Name)
|
||||
}
|
||||
deleted := cc.knownClusterSet.Difference(observedSet)
|
||||
for clusterName := range deleted {
|
||||
glog.V(1).Infof("ClusterController observed a Cluster deletion: %v", clusterName)
|
||||
cc.knownClusterSet.Delete(clusterName)
|
||||
}
|
||||
}
|
||||
for _, cluster := range clusters.Items {
|
||||
clusterStatusNew, err := cc.GetClusterStatus(&cluster)
|
||||
if err != nil {
|
||||
glog.Infof("Failed to Get the status of cluster: %v", cluster.Name)
|
||||
continue
|
||||
}
|
||||
clusterStatusOld, found := cc.clusterClusterStatusMap[cluster.Name]
|
||||
if !found {
|
||||
glog.Infof("There is no status stored for cluster: %v before", cluster.Name)
|
||||
|
||||
} else {
|
||||
hasTransition := false
|
||||
for i := 0; i < len(clusterStatusNew.Conditions); i++ {
|
||||
if !(strings.EqualFold(string(clusterStatusNew.Conditions[i].Type), string(clusterStatusOld.Conditions[i].Type)) &&
|
||||
strings.EqualFold(string(clusterStatusNew.Conditions[i].Status), string(clusterStatusOld.Conditions[i].Status))) {
|
||||
hasTransition = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasTransition {
|
||||
for j := 0; j < len(clusterStatusNew.Conditions); j++ {
|
||||
clusterStatusNew.Conditions[j].LastTransitionTime = clusterStatusOld.Conditions[j].LastTransitionTime
|
||||
}
|
||||
}
|
||||
}
|
||||
cc.clusterClusterStatusMap[cluster.Name] = *clusterStatusNew
|
||||
cluster.Status = *clusterStatusNew
|
||||
cluster, err := cc.federationClient.Federation().Clusters().UpdateStatus(&cluster)
|
||||
if err != nil {
|
||||
glog.Infof("Failed to update the status of cluster: %v ,error is : %v", cluster.Name, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -0,0 +1,138 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
|
||||
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
)
|
||||
|
||||
func newCluster(clusterName string, serverUrl string) *federation_v1alpha1.Cluster {
|
||||
cluster := federation_v1alpha1.Cluster{
|
||||
TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Federation.GroupVersion().String()},
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
UID: util.NewUUID(),
|
||||
Name: clusterName,
|
||||
},
|
||||
Spec: federation_v1alpha1.ClusterSpec{
|
||||
ServerAddressByClientCIDRs: []federation_v1alpha1.ServerAddressByClientCIDR{
|
||||
{
|
||||
ClientCIDR: "0.0.0.0/0",
|
||||
ServerAddress: serverUrl,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return &cluster
|
||||
}
|
||||
|
||||
func newClusterList(cluster *federation_v1alpha1.Cluster) *federation_v1alpha1.ClusterList {
|
||||
clusterList := federation_v1alpha1.ClusterList{
|
||||
TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Federation.GroupVersion().String()},
|
||||
ListMeta: unversioned.ListMeta{
|
||||
SelfLink: "foobar",
|
||||
},
|
||||
Items: []federation_v1alpha1.Cluster{},
|
||||
}
|
||||
clusterList.Items = append(clusterList.Items, *cluster)
|
||||
return &clusterList
|
||||
}
|
||||
|
||||
// init a fake http handler, simulate a federation apiserver, response the "DELETE" "PUT" "GET" "UPDATE"
|
||||
// when "canBeGotten" is false, means that user can not get the cluster cluster from apiserver
|
||||
func createHttptestFakeHandlerForFederation(clusterList *federation_v1alpha1.ClusterList, canBeGotten bool) *http.HandlerFunc {
|
||||
fakeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
clusterListString, _ := json.Marshal(*clusterList)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
switch r.Method {
|
||||
case "PUT":
|
||||
fmt.Fprintln(w, string(clusterListString))
|
||||
case "GET":
|
||||
if canBeGotten {
|
||||
fmt.Fprintln(w, string(clusterListString))
|
||||
} else {
|
||||
fmt.Fprintln(w, "")
|
||||
}
|
||||
default:
|
||||
fmt.Fprintln(w, "")
|
||||
}
|
||||
})
|
||||
return &fakeHandler
|
||||
}
|
||||
|
||||
// init a fake http handler, simulate a cluster apiserver, response the "/healthz"
|
||||
// when "canBeGotten" is false, means that user can not get response from apiserver
|
||||
func createHttptestFakeHandlerForCluster(canBeGotten bool) *http.HandlerFunc {
|
||||
fakeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
switch r.Method {
|
||||
case "GET":
|
||||
if canBeGotten {
|
||||
fmt.Fprintln(w, "ok")
|
||||
} else {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
}
|
||||
default:
|
||||
fmt.Fprintln(w, "")
|
||||
}
|
||||
})
|
||||
return &fakeHandler
|
||||
}
|
||||
|
||||
func TestUpdateClusterStatusOK(t *testing.T) {
|
||||
clusterName := "foobarCluster"
|
||||
// create dummy httpserver
|
||||
testClusterServer := httptest.NewServer(createHttptestFakeHandlerForCluster(true))
|
||||
defer testClusterServer.Close()
|
||||
federationCluster := newCluster(clusterName, testClusterServer.URL)
|
||||
federationClusterList := newClusterList(federationCluster)
|
||||
|
||||
testFederationServer := httptest.NewServer(createHttptestFakeHandlerForFederation(federationClusterList, true))
|
||||
defer testFederationServer.Close()
|
||||
|
||||
restClientCfg, err := clientcmd.BuildConfigFromFlags(testFederationServer.URL, "")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to build client config")
|
||||
}
|
||||
federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
|
||||
|
||||
manager := NewclusterController(federationClientSet, 5)
|
||||
err = manager.UpdateClusterStatus()
|
||||
if err != nil {
|
||||
t.Errorf("Failed to Update Cluster Status: %v", err)
|
||||
}
|
||||
clusterStatus, found := manager.clusterClusterStatusMap[clusterName]
|
||||
if !found {
|
||||
t.Errorf("Failed to Update Cluster Status")
|
||||
} else {
|
||||
if (clusterStatus.Conditions[1].Status != v1.ConditionFalse) || (clusterStatus.Conditions[1].Type != federation_v1alpha1.ClusterOffline) {
|
||||
t.Errorf("Failed to Update Cluster Status")
|
||||
}
|
||||
}
|
||||
}
|
18
federation/pkg/federation-controller/cluster/doc.go
Normal file
18
federation/pkg/federation-controller/cluster/doc.go
Normal file
@ -0,0 +1,18 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package cluster contains code for syncing cluster
|
||||
package cluster
|
19
federation/pkg/federation-controller/doc.go
Normal file
19
federation/pkg/federation-controller/doc.go
Normal file
@ -0,0 +1,19 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package federation_controller contains code for controllers (like the cluster
|
||||
// controller).
|
||||
package federation_controller
|
@ -37,6 +37,7 @@ kube::golang::server_targets() {
|
||||
cmd/kubemark
|
||||
cmd/hyperkube
|
||||
federation/cmd/federated-apiserver
|
||||
federation/cmd/federation-controller-manager
|
||||
plugin/cmd/kube-scheduler
|
||||
)
|
||||
if [ -n "${KUBERNETES_CONTRIB:-}" ]; then
|
||||
|
@ -54,6 +54,7 @@ cluster-dns
|
||||
cluster-domain
|
||||
cluster-name
|
||||
cluster-tag
|
||||
cluster-monitor-period
|
||||
concurrent-deployment-syncs
|
||||
concurrent-endpoint-syncs
|
||||
concurrent-namespace-syncs
|
||||
@ -136,6 +137,8 @@ external-ip
|
||||
failover-timeout
|
||||
failure-domains
|
||||
fake-clientset
|
||||
federated-api-burst
|
||||
federated-api-qps
|
||||
file-check-frequency
|
||||
file-suffix
|
||||
file_content_in_loop
|
||||
|
Loading…
Reference in New Issue
Block a user