diff --git a/federation/client/cache/cluster_cache.go b/federation/client/cache/cluster_cache.go new file mode 100644 index 00000000000..10ef095589e --- /dev/null +++ b/federation/client/cache/cluster_cache.go @@ -0,0 +1,35 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" + kubeCache "k8s.io/kubernetes/pkg/client/cache" +) + +// StoreToClusterLister makes a Store have the List method of the unversioned.ClusterInterface +// The Store must contain (only) clusters. +type StoreToClusterLister struct { + kubeCache.Store +} + +func (s *StoreToClusterLister) List() (clusters federation_v1alpha1.ClusterList, err error) { + for _, m := range s.Store.List() { + clusters.Items = append(clusters.Items, *(m.(*federation_v1alpha1.Cluster))) + } + return clusters, nil +} diff --git a/federation/cmd/federation-controller-manager/OWNERS b/federation/cmd/federation-controller-manager/OWNERS new file mode 100644 index 00000000000..c6b4c5c4f65 --- /dev/null +++ b/federation/cmd/federation-controller-manager/OWNERS @@ -0,0 +1,4 @@ +assignees: + - quinton-hoole + - nikhiljindal + - madhusundancs diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go new file mode 100644 index 00000000000..582c938ce80 --- /dev/null +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -0,0 +1,109 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package app implements a server that runs a set of active +// components. This includes cluster controller + +package app + +import ( + "net" + "net/http" + "net/http/pprof" + "strconv" + + "k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options" + "k8s.io/kubernetes/pkg/client/restclient" + + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" + clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" + "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" + "k8s.io/kubernetes/pkg/healthz" + "k8s.io/kubernetes/pkg/util/configz" + + "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus" + "github.com/spf13/cobra" + "github.com/spf13/pflag" +) + +// NewControllerManagerCommand creates a *cobra.Command object with default parameters +func NewControllerManagerCommand() *cobra.Command { + s := options.NewCMServer() + s.AddFlags(pflag.CommandLine) + cmd := &cobra.Command{ + Use: "federation-controller-manager", + Long: `The federation controller manager is a daemon that embeds +the core control loops shipped with federation. In applications of robotics and +automation, a control loop is a non-terminating loop that regulates the state of +the system. In federation, a controller is a control loop that watches the shared +state of the federation cluster through the apiserver and makes changes attempting +to move the current state towards the desired state. Examples of controllers that +ship with federation today is the cluster controller.`, + Run: func(cmd *cobra.Command, args []string) { + }, + } + + return cmd +} + +// Run runs the CMServer. This should never exit. +func Run(s *options.CMServer) error { + if c, err := configz.New("componentconfig"); err == nil { + c.Set(s.ControllerManagerConfiguration) + } else { + glog.Errorf("unable to register configz: %s", err) + } + restClientCfg, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig) + if err != nil { + return err + } + + // Override restClientCfg qps/burst settings from flags + restClientCfg.QPS = s.APIServerQPS + restClientCfg.Burst = s.APIServerBurst + + go func() { + mux := http.NewServeMux() + healthz.InstallHandler(mux) + if s.EnableProfiling { + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + } + mux.Handle("/metrics", prometheus.Handler()) + + server := &http.Server{ + Addr: net.JoinHostPort(s.Address, strconv.Itoa(s.Port)), + Handler: mux, + } + glog.Fatal(server.ListenAndServe()) + }() + + run := func() { + err := StartControllers(s, restClientCfg) + glog.Fatalf("error running controllers: %v", err) + panic("unreachable") + } + run() + panic("unreachable") +} + +func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error { + federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) + go clustercontroller.NewclusterController(federationClientSet, s.ClusterMonitorPeriod.Duration).Run() + select {} +} diff --git a/federation/cmd/federation-controller-manager/app/options/options.go b/federation/cmd/federation-controller-manager/app/options/options.go new file mode 100644 index 00000000000..9a6e8735f9f --- /dev/null +++ b/federation/cmd/federation-controller-manager/app/options/options.go @@ -0,0 +1,89 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package options provides the flags used for the controller manager. + +package options + +import ( + "time" + + "github.com/spf13/pflag" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/componentconfig" + "k8s.io/kubernetes/pkg/client/leaderelection" +) + +type ControllerManagerConfiguration struct { + // port is the port that the controller-manager's http service runs on. + Port int `json:"port"` + // address is the IP address to serve on (set to 0.0.0.0 for all interfaces). + Address string `json:"address"` + // clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller. + ClusterMonitorPeriod unversioned.Duration `json:"clusterMonitorPeriod"` + // APIServerQPS is the QPS to use while talking with federation apiserver. + APIServerQPS float32 `json:"federatedAPIQPS"` + // APIServerBurst is the burst to use while talking with federation apiserver. + APIServerBurst int `json:"federatedAPIBurst"` + // enableProfiling enables profiling via web interface host:port/debug/pprof/ + EnableProfiling bool `json:"enableProfiling"` + // leaderElection defines the configuration of leader election client. + LeaderElection componentconfig.LeaderElectionConfiguration `json:"leaderElection"` + // contentType is contentType of requests sent to apiserver. + ContentType string `json:"contentType"` +} + +// CMServer is the main context object for the controller manager. +type CMServer struct { + ControllerManagerConfiguration + Master string + Kubeconfig string +} + +const ( + // FederatedControllerManagerPort is the default port for the federation controller manager status server. + // May be overridden by a flag at startup. + FederatedControllerManagerPort = 10253 +) + +// NewCMServer creates a new CMServer with a default config. +func NewCMServer() *CMServer { + s := CMServer{ + ControllerManagerConfiguration: ControllerManagerConfiguration{ + Port: FederatedControllerManagerPort, + Address: "0.0.0.0", + ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second}, + APIServerQPS: 20.0, + APIServerBurst: 30, + LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(), + }, + } + return &s +} + +// AddFlags adds flags for a specific CMServer to the specified FlagSet +func (s *CMServer) AddFlags(fs *pflag.FlagSet) { + fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on") + fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)") + fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.") + fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") + fs.StringVar(&s.Master, "master", s.Master, "The address of the federation API server (overrides any value in kubeconfig)") + fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.") + fs.StringVar(&s.ContentType, "kube-api-content-type", s.ContentType, "ContentType of requests sent to apiserver. Passing application/vnd.kubernetes.protobuf is an experimental feature now.") + fs.Float32Var(&s.APIServerQPS, "federated-api-qps", s.APIServerQPS, "QPS to use while talking with federation apiserver") + fs.IntVar(&s.APIServerBurst, "federated-api-burst", s.APIServerBurst, "Burst to use while talking with federation apiserver") + leaderelection.BindFlags(&s.LeaderElection, fs) +} diff --git a/federation/cmd/federation-controller-manager/controller-manager.go b/federation/cmd/federation-controller-manager/controller-manager.go new file mode 100644 index 00000000000..56a7c0c9715 --- /dev/null +++ b/federation/cmd/federation-controller-manager/controller-manager.go @@ -0,0 +1,52 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "os" + "runtime" + + "github.com/spf13/pflag" + "k8s.io/kubernetes/federation/cmd/federation-controller-manager/app" + "k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options" + "k8s.io/kubernetes/pkg/healthz" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/flag" + "k8s.io/kubernetes/pkg/version/verflag" +) + +func init() { + healthz.DefaultHealthz() +} + +func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + s := options.NewCMServer() + s.AddFlags(pflag.CommandLine) + + flag.InitFlags() + util.InitLogs() + defer util.FlushLogs() + + verflag.PrintAndExitIfRequested() + + if err := app.Run(s); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +} diff --git a/federation/pkg/federation-controller/OWNERS b/federation/pkg/federation-controller/OWNERS new file mode 100644 index 00000000000..c6b4c5c4f65 --- /dev/null +++ b/federation/pkg/federation-controller/OWNERS @@ -0,0 +1,4 @@ +assignees: + - quinton-hoole + - nikhiljindal + - madhusundancs diff --git a/federation/pkg/federation-controller/cluster/cluster_client.go b/federation/pkg/federation-controller/cluster/cluster_client.go new file mode 100644 index 00000000000..0864718b772 --- /dev/null +++ b/federation/pkg/federation-controller/cluster/cluster_client.go @@ -0,0 +1,123 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "net" + "strings" + + federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/typed/discovery" + "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" + utilnet "k8s.io/kubernetes/pkg/util/net" +) + +const ( + UserAgentName = "Cluster-Controller" + KubeAPIQPS = 20.0 + KubeAPIBurst = 30 +) + +type ClusterClient struct { + discoveryClient *discovery.DiscoveryClient +} + +func NewClusterClientSet(c *federation_v1alpha1.Cluster) (*ClusterClient, error) { + var serverAddress string + hostIP, err := utilnet.ChooseHostInterface() + if err != nil { + return nil, err + } + + for _, item := range c.Spec.ServerAddressByClientCIDRs { + _, cidrnet, err := net.ParseCIDR(item.ClientCIDR) + if err != nil { + return nil, err + } + myaddr := net.ParseIP(hostIP.String()) + if cidrnet.Contains(myaddr) == true { + serverAddress = item.ServerAddress + break + } + } + var clusterClientSet = ClusterClient{} + if serverAddress != "" { + clusterConfig, err := clientcmd.BuildConfigFromFlags(serverAddress, "") + if err != nil { + return nil, err + } + clusterConfig.QPS = KubeAPIQPS + clusterConfig.Burst = KubeAPIBurst + clusterClientSet.discoveryClient = discovery.NewDiscoveryClientForConfigOrDie((restclient.AddUserAgent(clusterConfig, UserAgentName))) + if clusterClientSet.discoveryClient == nil { + return nil, nil + } + } + return &clusterClientSet, err +} + +// GetClusterHealthStatus gets the kubernetes cluster health status by requesting "/healthz" +func (self *ClusterClient) GetClusterHealthStatus() *federation_v1alpha1.ClusterStatus { + clusterStatus := federation_v1alpha1.ClusterStatus{} + currentTime := unversioned.Now() + newClusterReadyCondition := federation_v1alpha1.ClusterCondition{ + Type: federation_v1alpha1.ClusterReady, + Status: v1.ConditionTrue, + Reason: "ClusterReady", + Message: "/healthz responded with ok", + LastProbeTime: currentTime, + LastTransitionTime: currentTime, + } + newClusterNotReadyCondition := federation_v1alpha1.ClusterCondition{ + Type: federation_v1alpha1.ClusterReady, + Status: v1.ConditionFalse, + Reason: "ClusterNotReady", + Message: "/healthz responded without ok", + LastProbeTime: currentTime, + LastTransitionTime: currentTime, + } + newNodeOfflineCondition := federation_v1alpha1.ClusterCondition{ + Type: federation_v1alpha1.ClusterOffline, + Status: v1.ConditionTrue, + Reason: "ClusterNotReachable", + Message: "cluster is not reachable", + LastProbeTime: currentTime, + LastTransitionTime: currentTime, + } + newNodeNotOfflineCondition := federation_v1alpha1.ClusterCondition{ + Type: federation_v1alpha1.ClusterOffline, + Status: v1.ConditionFalse, + Reason: "ClusterReachable", + Message: "cluster is reachable", + LastProbeTime: currentTime, + LastTransitionTime: currentTime, + } + body, err := self.discoveryClient.Get().AbsPath("/healthz").Do().Raw() + if err != nil { + clusterStatus.Conditions = append(clusterStatus.Conditions, newNodeOfflineCondition) + } else { + if !strings.EqualFold(string(body), "ok") { + clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterNotReadyCondition, newNodeNotOfflineCondition) + } else { + clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterReadyCondition) + } + } + return &clusterStatus +} diff --git a/federation/pkg/federation-controller/cluster/clustercontroller.go b/federation/pkg/federation-controller/cluster/clustercontroller.go new file mode 100644 index 00000000000..3b1ae48e9ac --- /dev/null +++ b/federation/pkg/federation-controller/cluster/clustercontroller.go @@ -0,0 +1,196 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "strings" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/federation/apis/federation" + federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" + cluster_cache "k8s.io/kubernetes/federation/client/cache" + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/runtime" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" +) + +type ClusterController struct { + knownClusterSet sets.String + + // federationClient used to operate cluster + federationClient federationclientset.Interface + + // clusterMonitorPeriod is the period for updating status of cluster + clusterMonitorPeriod time.Duration + // clusterClusterStatusMap is a mapping of clusterName and cluster status of last sampling + clusterClusterStatusMap map[string]federation_v1alpha1.ClusterStatus + + // clusterKubeClientMap is a mapping of clusterName and restclient + clusterKubeClientMap map[string]ClusterClient + + // cluster framework and store + clusterController *framework.Controller + clusterStore cluster_cache.StoreToClusterLister +} + +// NewclusterController returns a new cluster controller +func NewclusterController(federationClient federationclientset.Interface, clusterMonitorPeriod time.Duration) *ClusterController { + cc := &ClusterController{ + knownClusterSet: make(sets.String), + federationClient: federationClient, + clusterMonitorPeriod: clusterMonitorPeriod, + clusterClusterStatusMap: make(map[string]federation_v1alpha1.ClusterStatus), + clusterKubeClientMap: make(map[string]ClusterClient), + } + cc.clusterStore.Store, cc.clusterController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return cc.federationClient.Federation().Clusters().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return cc.federationClient.Federation().Clusters().Watch(options) + }, + }, + &federation.Cluster{}, + controller.NoResyncPeriodFunc(), + framework.ResourceEventHandlerFuncs{ + DeleteFunc: cc.delFromClusterSet, + AddFunc: cc.addToClusterSet, + }, + ) + return cc +} + +// delFromClusterSet delete a cluster from clusterSet and +// delete the corresponding restclient from the map clusterKubeClientMap +func (cc *ClusterController) delFromClusterSet(obj interface{}) { + cluster := obj.(*federation_v1alpha1.Cluster) + cc.knownClusterSet.Delete(cluster.Name) + delete(cc.clusterKubeClientMap, cluster.Name) +} + +// addToClusterSet insert the new cluster to clusterSet and create a corresponding +// restclient to map clusterKubeClientMap +func (cc *ClusterController) addToClusterSet(obj interface{}) { + cluster := obj.(*federation_v1alpha1.Cluster) + cc.knownClusterSet.Insert(cluster.Name) + // create the restclient of cluster + restClient, err := NewClusterClientSet(cluster) + if err != nil || restClient == nil { + glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err) + return + } + cc.clusterKubeClientMap[cluster.Name] = *restClient +} + +// Run begins watching and syncing. +func (cc *ClusterController) Run() { + defer utilruntime.HandleCrash() + go cc.clusterController.Run(wait.NeverStop) + // monitor cluster status periodically, in phase 1 we just get the health state from "/healthz" + go wait.Until(func() { + if err := cc.UpdateClusterStatus(); err != nil { + glog.Errorf("Error monitoring cluster status: %v", err) + } + }, cc.clusterMonitorPeriod, wait.NeverStop) +} + +func (cc *ClusterController) GetClusterStatus(cluster *federation_v1alpha1.Cluster) (*federation_v1alpha1.ClusterStatus, error) { + // just get the status of cluster, by requesting the restapi "/healthz" + clusterClient, found := cc.clusterKubeClientMap[cluster.Name] + if !found { + glog.Infof("It's a new cluster, a cluster client will be created") + client, err := NewClusterClientSet(cluster) + if err != nil || client == nil { + glog.Infof("Failed to create cluster client, err: %v", err) + return nil, err + } + clusterClient = *client + cc.clusterKubeClientMap[cluster.Name] = clusterClient + } + clusterStatus := clusterClient.GetClusterHealthStatus() + return clusterStatus, nil +} + +// UpdateClusterStatus checks cluster status and get the metrics from cluster's restapi +func (cc *ClusterController) UpdateClusterStatus() error { + clusters, err := cc.federationClient.Federation().Clusters().List(api.ListOptions{}) + if err != nil { + return err + } + for _, cluster := range clusters.Items { + if !cc.knownClusterSet.Has(cluster.Name) { + glog.V(1).Infof("ClusterController observed a new cluster: %#v", cluster) + cc.knownClusterSet.Insert(cluster.Name) + } + } + + // If there's a difference between lengths of known clusters and observed clusters + if len(cc.knownClusterSet) != len(clusters.Items) { + observedSet := make(sets.String) + for _, cluster := range clusters.Items { + observedSet.Insert(cluster.Name) + } + deleted := cc.knownClusterSet.Difference(observedSet) + for clusterName := range deleted { + glog.V(1).Infof("ClusterController observed a Cluster deletion: %v", clusterName) + cc.knownClusterSet.Delete(clusterName) + } + } + for _, cluster := range clusters.Items { + clusterStatusNew, err := cc.GetClusterStatus(&cluster) + if err != nil { + glog.Infof("Failed to Get the status of cluster: %v", cluster.Name) + continue + } + clusterStatusOld, found := cc.clusterClusterStatusMap[cluster.Name] + if !found { + glog.Infof("There is no status stored for cluster: %v before", cluster.Name) + + } else { + hasTransition := false + for i := 0; i < len(clusterStatusNew.Conditions); i++ { + if !(strings.EqualFold(string(clusterStatusNew.Conditions[i].Type), string(clusterStatusOld.Conditions[i].Type)) && + strings.EqualFold(string(clusterStatusNew.Conditions[i].Status), string(clusterStatusOld.Conditions[i].Status))) { + hasTransition = true + break + } + } + if !hasTransition { + for j := 0; j < len(clusterStatusNew.Conditions); j++ { + clusterStatusNew.Conditions[j].LastTransitionTime = clusterStatusOld.Conditions[j].LastTransitionTime + } + } + } + cc.clusterClusterStatusMap[cluster.Name] = *clusterStatusNew + cluster.Status = *clusterStatusNew + cluster, err := cc.federationClient.Federation().Clusters().UpdateStatus(&cluster) + if err != nil { + glog.Infof("Failed to update the status of cluster: %v ,error is : %v", cluster.Name, err) + continue + } + } + return nil +} diff --git a/federation/pkg/federation-controller/cluster/clustercontroller_test.go b/federation/pkg/federation-controller/cluster/clustercontroller_test.go new file mode 100644 index 00000000000..13491dccd40 --- /dev/null +++ b/federation/pkg/federation-controller/cluster/clustercontroller_test.go @@ -0,0 +1,138 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" + "k8s.io/kubernetes/pkg/util" +) + +func newCluster(clusterName string, serverUrl string) *federation_v1alpha1.Cluster { + cluster := federation_v1alpha1.Cluster{ + TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Federation.GroupVersion().String()}, + ObjectMeta: v1.ObjectMeta{ + UID: util.NewUUID(), + Name: clusterName, + }, + Spec: federation_v1alpha1.ClusterSpec{ + ServerAddressByClientCIDRs: []federation_v1alpha1.ServerAddressByClientCIDR{ + { + ClientCIDR: "0.0.0.0/0", + ServerAddress: serverUrl, + }, + }, + }, + } + return &cluster +} + +func newClusterList(cluster *federation_v1alpha1.Cluster) *federation_v1alpha1.ClusterList { + clusterList := federation_v1alpha1.ClusterList{ + TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Federation.GroupVersion().String()}, + ListMeta: unversioned.ListMeta{ + SelfLink: "foobar", + }, + Items: []federation_v1alpha1.Cluster{}, + } + clusterList.Items = append(clusterList.Items, *cluster) + return &clusterList +} + +// init a fake http handler, simulate a federation apiserver, response the "DELETE" "PUT" "GET" "UPDATE" +// when "canBeGotten" is false, means that user can not get the cluster cluster from apiserver +func createHttptestFakeHandlerForFederation(clusterList *federation_v1alpha1.ClusterList, canBeGotten bool) *http.HandlerFunc { + fakeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + clusterListString, _ := json.Marshal(*clusterList) + w.Header().Set("Content-Type", "application/json") + switch r.Method { + case "PUT": + fmt.Fprintln(w, string(clusterListString)) + case "GET": + if canBeGotten { + fmt.Fprintln(w, string(clusterListString)) + } else { + fmt.Fprintln(w, "") + } + default: + fmt.Fprintln(w, "") + } + }) + return &fakeHandler +} + +// init a fake http handler, simulate a cluster apiserver, response the "/healthz" +// when "canBeGotten" is false, means that user can not get response from apiserver +func createHttptestFakeHandlerForCluster(canBeGotten bool) *http.HandlerFunc { + fakeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + switch r.Method { + case "GET": + if canBeGotten { + fmt.Fprintln(w, "ok") + } else { + w.WriteHeader(http.StatusNotFound) + } + default: + fmt.Fprintln(w, "") + } + }) + return &fakeHandler +} + +func TestUpdateClusterStatusOK(t *testing.T) { + clusterName := "foobarCluster" + // create dummy httpserver + testClusterServer := httptest.NewServer(createHttptestFakeHandlerForCluster(true)) + defer testClusterServer.Close() + federationCluster := newCluster(clusterName, testClusterServer.URL) + federationClusterList := newClusterList(federationCluster) + + testFederationServer := httptest.NewServer(createHttptestFakeHandlerForFederation(federationClusterList, true)) + defer testFederationServer.Close() + + restClientCfg, err := clientcmd.BuildConfigFromFlags(testFederationServer.URL, "") + if err != nil { + t.Errorf("Failed to build client config") + } + federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) + + manager := NewclusterController(federationClientSet, 5) + err = manager.UpdateClusterStatus() + if err != nil { + t.Errorf("Failed to Update Cluster Status: %v", err) + } + clusterStatus, found := manager.clusterClusterStatusMap[clusterName] + if !found { + t.Errorf("Failed to Update Cluster Status") + } else { + if (clusterStatus.Conditions[1].Status != v1.ConditionFalse) || (clusterStatus.Conditions[1].Type != federation_v1alpha1.ClusterOffline) { + t.Errorf("Failed to Update Cluster Status") + } + } +} diff --git a/federation/pkg/federation-controller/cluster/doc.go b/federation/pkg/federation-controller/cluster/doc.go new file mode 100644 index 00000000000..0815e8418a1 --- /dev/null +++ b/federation/pkg/federation-controller/cluster/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package cluster contains code for syncing cluster +package cluster diff --git a/federation/pkg/federation-controller/doc.go b/federation/pkg/federation-controller/doc.go new file mode 100644 index 00000000000..2d806482ed4 --- /dev/null +++ b/federation/pkg/federation-controller/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package federation_controller contains code for controllers (like the cluster +// controller). +package federation_controller diff --git a/hack/lib/golang.sh b/hack/lib/golang.sh index 95e1ce8c790..4f5c4b7f52d 100755 --- a/hack/lib/golang.sh +++ b/hack/lib/golang.sh @@ -37,6 +37,7 @@ kube::golang::server_targets() { cmd/kubemark cmd/hyperkube federation/cmd/federated-apiserver + federation/cmd/federation-controller-manager plugin/cmd/kube-scheduler ) if [ -n "${KUBERNETES_CONTRIB:-}" ]; then diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 6befaeb17d1..3010d263fb2 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -54,6 +54,7 @@ cluster-dns cluster-domain cluster-name cluster-tag +cluster-monitor-period concurrent-deployment-syncs concurrent-endpoint-syncs concurrent-namespace-syncs @@ -136,6 +137,8 @@ external-ip failover-timeout failure-domains fake-clientset +federated-api-burst +federated-api-qps file-check-frequency file-suffix file_content_in_loop