Add leader election support for controller-manager

This commit is contained in:
shashidharatd 2017-05-18 16:26:27 +05:30
parent c653437377
commit 716156348d
3 changed files with 112 additions and 7 deletions

View File

@ -154,4 +154,8 @@ const (
// FederationClusterSelectorAnnotation is used to determine placement of objects on federated clusters
FederationClusterSelectorAnnotation string = "federation.alpha.kubernetes.io/cluster-selector"
// FederationOnlyClusterSelector is the cluster selector to indicate any object in
// federation having this annotation should not be synced to federated clusters.
FederationOnlyClusterSelector string = "federation.kubernetes.io/federation-control-plane=true"
)

View File

@ -23,15 +23,24 @@ import (
"net"
"net/http"
"net/http/pprof"
"os"
goruntime "runtime"
"strconv"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/healthz"
utilflag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
"k8s.io/kubernetes/federation/pkg/federatedtypes"
@ -41,6 +50,8 @@ import (
servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service"
servicednscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service/dns"
synccontroller "k8s.io/kubernetes/federation/pkg/federation-controller/sync"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/pkg/version"
@ -52,6 +63,11 @@ import (
"k8s.io/client-go/discovery"
)
const (
apiserverWaitTimeout = 2 * time.Minute
apiserverRetryInterval = 2 * time.Second
)
// NewControllerManagerCommand creates a *cobra.Command object with default parameters
func NewControllerManagerCommand() *cobra.Command {
s := options.NewCMServer()
@ -112,17 +128,68 @@ func Run(s *options.CMServer) error {
glog.Fatal(server.ListenAndServe())
}()
run := func() {
err := StartControllers(s, restClientCfg)
federationClientset, err := federationclientset.NewForConfig(restclient.AddUserAgent(restClientCfg, "federation-controller-manager"))
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
run := func(stop <-chan struct{}) {
err := StartControllers(s, restClientCfg, stop)
glog.Fatalf("error running controllers: %v", err)
panic("unreachable")
}
run()
if !s.LeaderElection.LeaderElect {
run(nil)
// unreachable
}
if err := ensureFederationNamespace(federationClientset, s.FederationOnlyNamespace); err != nil {
glog.Fatalf("Failed to ensure federation only namespace %s: %v", s.FederationOnlyNamespace, err)
}
leaderElectionClient := kubernetes.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "leader-election"))
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(federationClientset))
recorder := eventBroadcaster.NewRecorder(api.Scheme, v1.EventSource{Component: "controller-manager"})
id, err := os.Hostname()
if err != nil {
return err
}
rl := resourcelock.ConfigMapLock{
ConfigMapMeta: metav1.ObjectMeta{
Namespace: s.FederationOnlyNamespace,
Name: "federation-controller-manager-leader-election",
Annotations: map[string]string{
federationapi.FederationClusterSelectorAnnotation: federationapi.FederationOnlyClusterSelector,
}},
Client: leaderElectionClient.CoreV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
},
}
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: &rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
},
},
})
panic("unreachable")
}
func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error {
stopChan := wait.NeverStop
func StartControllers(s *options.CMServer, restClientCfg *restclient.Config, stopChan <-chan struct{}) error {
minimizeLatency := false
discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(restClientCfg)
@ -147,7 +214,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
glog.V(3).Infof("Loading client config for service controller %q", servicecontroller.UserAgentName)
scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
serviceController := servicecontroller.New(scClientset)
go serviceController.Run(s.ConcurrentServiceSyncs, wait.NeverStop)
go serviceController.Run(s.ConcurrentServiceSyncs, stopChan)
}
adapterSpecificArgs := make(map[string]interface{})
@ -171,7 +238,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
ingClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, ingresscontroller.UserAgentName))
ingressController := ingresscontroller.NewIngressController(ingClientset)
glog.V(3).Infof("Running ingress controller")
ingressController.Run(wait.NeverStop)
ingressController.Run(stopChan)
}
select {}
@ -219,3 +286,33 @@ func hasRequiredResources(serverResources []*metav1.APIResourceList, requiredRes
}
return true
}
func ensureFederationNamespace(clientset *federationclientset.Clientset, namespace string) error {
ns := v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
Annotations: map[string]string{
federationapi.FederationClusterSelectorAnnotation: federationapi.FederationOnlyClusterSelector,
},
},
}
// Probably this is the first operation by controller manager on api server. So retry the operation
// until timeout to handle scenario where api server is not yet ready.
err := wait.PollImmediate(apiserverRetryInterval, apiserverWaitTimeout, func() (bool, error) {
var err error
_, err = clientset.CoreV1().Namespaces().Get(namespace, metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
glog.V(2).Infof("Failed to get namespace %s: %v", namespace, err)
return false, nil
}
_, err := clientset.CoreV1().Namespaces().Create(&ns)
if err != nil {
glog.V(2).Infof("Failed to create namespace %s: %v", namespace, err)
return false, nil
}
}
return true, nil
})
return err
}

View File

@ -83,6 +83,8 @@ type ControllerManagerConfiguration struct {
// by cluster local hpas on local replicas, but too low a value can result in thrashing.
// Higher values will result in slower response to scalibility conditions on local replicas.
HpaScaleForbiddenWindow metav1.Duration `json:"HpaScaleForbiddenWindow"`
// pre-configured namespace name that would be created only in federation control plane
FederationOnlyNamespace string `json:"federationOnlyNamespaceName"`
}
// CMServer is the main context object for the controller manager.
@ -113,6 +115,7 @@ func NewCMServer() *CMServer {
LeaderElection: leaderelectionconfig.DefaultLeaderElectionConfiguration(),
Controllers: make(utilflag.ConfigurationMap),
HpaScaleForbiddenWindow: metav1.Duration{Duration: 2 * time.Minute},
FederationOnlyNamespace: "federation-only",
},
}
return &s
@ -144,5 +147,6 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
"A set of key=value pairs that describe controller configuration "+
"to enable/disable specific controllers. Key should be the resource name (like services) and value should be true or false. "+
"For example: services=false,ingresses=false")
fs.StringVar(&s.FederationOnlyNamespace, "federation-only-namespace", s.FederationOnlyNamespace, "Name of the namespace that would be created only in federation control plane.")
leaderelectionconfig.BindFlags(&s.LeaderElection, fs)
}