duplicate the controller manager from kubernetes

This commit is contained in:
huangyuqi 2016-04-21 21:01:47 +08:00
parent 9796900306
commit 227a17f317
4 changed files with 276 additions and 0 deletions

View File

@ -0,0 +1,4 @@
assignees:
- davidopp
- lavalamp
- mikedanese

View File

@ -0,0 +1,122 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a server that runs a set of active
// components. This includes replication controllers, service endpoints and
// nodes.
//
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/pkg/controllermanager/controllermanager.go
package app
import (
"net"
"net/http"
"net/http/pprof"
"strconv"
"k8s.io/kubernetes/federation/cmd/federated-controller-manager/app/options"
"k8s.io/kubernetes/pkg/client/restclient"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/internalclientset"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clustercontroller "k8s.io/kubernetes/federation/pkg/federated-controller/cluster"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/util/configz"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/util/wait"
)
// NewControllerManagerCommand creates a *cobra.Command object with default parameters
func NewControllerManagerCommand() *cobra.Command {
s := options.NewCMServer()
s.AddFlags(pflag.CommandLine)
cmd := &cobra.Command{
Use: "ube-controller-manager",
Long: `The ubernetes controller manager is a daemon that embeds
the core control loops shipped with ubernetes. In applications of robotics and
automation, a control loop is a non-terminating loop that regulates the state of
the system. In ubernetes, a controller is a control loop that watches the shared
state of the cluster sub-replication constroller through the apiserver and makes
changes attempting to move the current state towards the desired state. Examples
of controllers that ship with ubernetes today are the cluster controller, service
controller.`,
Run: func(cmd *cobra.Command, args []string) {
},
}
return cmd
}
// Run runs the CMServer. This should never exit.
func Run(s *options.CMServer) error {
if c, err := configz.New("componentconfig"); err == nil {
c.Set(s.ControllerManagerConfiguration)
} else {
glog.Errorf("unable to register configz: %s", err)
}
restClientCfg, err := clientcmd.BuildConfigFromFlags(s.Master, s.ApiServerconfig)
if err != nil {
return err
}
// Override restClientCfg qps/burst settings from flags
restClientCfg.QPS = s.UberAPIQPS
restClientCfg.Burst = s.UberAPIBurst
go func() {
mux := http.NewServeMux()
healthz.InstallHandler(mux)
if s.EnableProfiling {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
}
mux.Handle("/metrics", prometheus.Handler())
server := &http.Server{
Addr: net.JoinHostPort(s.Address, strconv.Itoa(s.Port)),
Handler: mux,
}
glog.Fatal(server.ListenAndServe())
}()
run := func(stop <-chan struct{}) {
err := StartControllers(s, restClientCfg, stop)
glog.Fatalf("error running controllers: %v", err)
panic("unreachable")
}
run(nil)
panic("unreachable")
}
func StartControllers(s *options.CMServer, restClientCfg *restclient.Config, stop <-chan struct{}) error {
kubernetesClientSet := clientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
go clustercontroller.NewclusterController(
kubernetesClientSet,
federationClientSet,
s.ClusterMonitorPeriod.Duration,
).Run(s.ConcurrentSubRCSyncs, wait.NeverStop)
select {}
}

View File

@ -0,0 +1,98 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package options provides the flags used for the controller manager.
//
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/pkg/controllermanager/controllermanager.go
package options
import (
"time"
"github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/componentconfig"
"k8s.io/kubernetes/pkg/client/leaderelection"
)
type ControllerManagerConfiguration struct {
unversioned.TypeMeta
// port is the port that the controller-manager's http service runs on.
Port int `json:"port"`
// address is the IP address to serve on (set to 0.0.0.0 for all interfaces).
Address string `json:"address"`
// concurrentSubRCSyncs is the number of sub replication controllers that are
// allowed to sync concurrently. Larger number = more responsive replica
// management, but more CPU (and network) load.
ConcurrentSubRCSyncs int `json:"concurrentSubRCSyncs"`
// clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller.
ClusterMonitorPeriod unversioned.Duration `json:"clusterMonitorPeriod"`
// uberAPIQPS is the QPS to use while talking with ubernetes apiserver.
UberAPIQPS float32 `json:"uberAPIQPS"`
// uberAPIBurst is the burst to use while talking with ubernetes apiserver.
UberAPIBurst int `json:"uberAPIBurst"`
// enableProfiling enables profiling via web interface host:port/debug/pprof/
EnableProfiling bool `json:"enableProfiling"`
// leaderElection defines the configuration of leader election client.
LeaderElection componentconfig.LeaderElectionConfiguration `json:"leaderElection"`
}
// CMServer is the main context object for the controller manager.
type CMServer struct {
ControllerManagerConfiguration
Master string
ApiServerconfig string
}
const (
// UberControllerManagerPort is the default port for the ubernetes controller manager status server.
// May be overridden by a flag at startup.
UberControllerManagerPort = 10252
)
// NewCMServer creates a new CMServer with a default config.
func NewCMServer() *CMServer {
s := CMServer{
ControllerManagerConfiguration: ControllerManagerConfiguration{
Port: UberControllerManagerPort,
Address: "0.0.0.0",
ConcurrentSubRCSyncs: 5,
ClusterMonitorPeriod: unversioned.Duration{40 * time.Second},
UberAPIQPS: 20.0,
UberAPIBurst: 30,
LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(),
},
}
return &s
}
// AddFlags adds flags for a specific CMServer to the specified FlagSet
func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on")
fs.Var(componentconfig.IPVar{&s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
fs.IntVar(&s.ConcurrentSubRCSyncs, "concurrent-subRc-syncs", s.ConcurrentSubRCSyncs, "The number of subRC syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.")
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
fs.StringVar(&s.ApiServerconfig, "uberconfig", s.ApiServerconfig, "Path to ApiServerconfig file with authorization and master location information.")
fs.Float32Var(&s.UberAPIQPS, "uber-api-qps", s.UberAPIQPS, "QPS to use while talking with ubernetes apiserver")
fs.IntVar(&s.UberAPIBurst, "uber-api-burst", s.UberAPIBurst, "Burst to use while talking with ubernetes apiserver")
leaderelection.BindFlags(&s.LeaderElection, fs)
}

View File

@ -0,0 +1,52 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"fmt"
"os"
"runtime"
"github.com/spf13/pflag"
"k8s.io/kubernetes/federation/cmd/federated-controller-manager/app"
"k8s.io/kubernetes/federation/cmd/federated-controller-manager/app/options"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/version/verflag"
)
func init() {
healthz.DefaultHealthz()
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
s := options.NewCMServer()
s.AddFlags(pflag.CommandLine)
flag.InitFlags()
util.InitLogs()
defer util.FlushLogs()
verflag.PrintAndExitIfRequested()
if err := app.Run(s); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}