Fixup cmd/*controller-manager code after struct changes. Co-authored by @stewart-yu

This commit is contained in:
Lucas Käldström
2018-09-02 14:10:46 +03:00
parent 3187f2221a
commit 8aaa527d35
11 changed files with 234 additions and 231 deletions

View File

@@ -102,7 +102,7 @@ the cloud specific control loops shipped with Kubernetes.`,
// Run runs the ExternalCMServer. This should never exit.
func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error {
cloud, err := cloudprovider.InitCloudProvider(c.ComponentConfig.CloudProvider.Name, c.ComponentConfig.CloudProvider.CloudConfigFile)
cloud, err := cloudprovider.InitCloudProvider(c.ComponentConfig.KubeCloudShared.CloudProvider.Name, c.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile)
if err != nil {
glog.Fatalf("Cloud provider could not be initialized: %v", err)
}
@@ -127,14 +127,14 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error
// Start the controller manager HTTP server
if c.SecureServing != nil {
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging)
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging)
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
return err
}
}
if c.InsecureServing != nil {
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging)
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging)
insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
@@ -148,7 +148,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error
}
}
if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect {
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
run(context.TODO())
panic("unreachable")
}
@@ -162,7 +162,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error
id = id + "_" + string(uuid.NewUUID())
// Lock required for leader election
rl, err := resourcelock.New(c.ComponentConfig.GenericComponent.LeaderElection.ResourceLock,
rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
"kube-system",
"cloud-controller-manager",
c.LeaderElectionClient.CoreV1(),
@@ -177,9 +177,9 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error
// Try and become the leader and start cloud controller manager loops
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration,
RetryPeriod: c.ComponentConfig.GenericComponent.LeaderElection.RetryPeriod.Duration,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
@@ -208,12 +208,12 @@ func startControllers(c *cloudcontrollerconfig.CompletedConfig, stop <-chan stru
c.ComponentConfig.NodeStatusUpdateFrequency.Duration)
nodeController.Run(stop)
time.Sleep(wait.Jitter(c.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
// Start the PersistentVolumeLabelController
pvlController := cloudcontrollers.NewPersistentVolumeLabelController(client("pvl-controller"), cloud)
go pvlController.Run(5, stop)
time.Sleep(wait.Jitter(c.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
// Start the service controller
serviceController, err := servicecontroller.New(
@@ -227,7 +227,7 @@ func startControllers(c *cloudcontrollerconfig.CompletedConfig, stop <-chan stru
glog.Errorf("Failed to start service controller: %v", err)
} else {
go serviceController.Run(stop, int(c.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
time.Sleep(wait.Jitter(c.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
}
// If CIDRs should be allocated for pods and set on the CloudProvider, then start the route controller
@@ -245,7 +245,7 @@ func startControllers(c *cloudcontrollerconfig.CompletedConfig, stop <-chan stru
routeController := routecontroller.New(routes, client("route-controller"), c.SharedInformers.Core().V1().Nodes(), c.ComponentConfig.KubeCloudShared.ClusterName, clusterCIDR)
go routeController.Run(stop, c.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
time.Sleep(wait.Jitter(c.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
}
} else {
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", c.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, c.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)

View File

@@ -55,9 +55,7 @@ const (
// CloudControllerManagerOptions is the main context object for the controller manager.
type CloudControllerManagerOptions struct {
CloudProvider *cmoptions.CloudProviderOptions
Debugging *cmoptions.DebuggingOptions
GenericComponent *cmoptions.GenericComponentConfigOptions
Generic *cmoptions.GenericControllerManagerConfigurationOptions
KubeCloudShared *cmoptions.KubeCloudSharedOptions
ServiceController *cmoptions.ServiceControllerOptions
@@ -82,17 +80,15 @@ func NewCloudControllerManagerOptions() (*CloudControllerManagerOptions, error)
}
s := CloudControllerManagerOptions{
CloudProvider: &cmoptions.CloudProviderOptions{},
Debugging: &cmoptions.DebuggingOptions{},
GenericComponent: cmoptions.NewGenericComponentConfigOptions(componentConfig.GenericComponent),
KubeCloudShared: cmoptions.NewKubeCloudSharedOptions(componentConfig.KubeCloudShared),
Generic: cmoptions.NewGenericControllerManagerConfigurationOptions(componentConfig.Generic),
KubeCloudShared: cmoptions.NewKubeCloudSharedOptions(componentConfig.KubeCloudShared),
ServiceController: &cmoptions.ServiceControllerOptions{
ConcurrentServiceSyncs: componentConfig.ServiceController.ConcurrentServiceSyncs,
},
SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(),
InsecureServing: (&apiserveroptions.DeprecatedInsecureServingOptions{
BindAddress: net.ParseIP(componentConfig.KubeCloudShared.Address),
BindPort: int(componentConfig.KubeCloudShared.Port),
BindAddress: net.ParseIP(componentConfig.Generic.Address),
BindPort: int(componentConfig.Generic.Port),
BindNetwork: "tcp",
}).WithLoopback(),
Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
@@ -112,31 +108,35 @@ func NewCloudControllerManagerOptions() (*CloudControllerManagerOptions, error)
}
// NewDefaultComponentConfig returns cloud-controller manager configuration object.
func NewDefaultComponentConfig(insecurePort int32) (componentconfig.CloudControllerManagerConfiguration, error) {
func NewDefaultComponentConfig(insecurePort int32) (*componentconfig.CloudControllerManagerConfiguration, error) {
// TODO: This code will be fixed up/improved when the ccm API types are moved to their own, real API group out of
// pkg/apis/componentconfig to cmd/cloud-controller-manager/app/apis/
scheme := runtime.NewScheme()
if err := componentconfigv1alpha1.AddToScheme(scheme); err != nil {
return componentconfig.CloudControllerManagerConfiguration{}, err
return nil, err
}
if err := componentconfig.AddToScheme(scheme); err != nil {
return componentconfig.CloudControllerManagerConfiguration{}, err
return nil, err
}
scheme.AddKnownTypes(componentconfigv1alpha1.SchemeGroupVersion, &componentconfigv1alpha1.CloudControllerManagerConfiguration{})
scheme.AddKnownTypes(componentconfig.SchemeGroupVersion, &componentconfig.CloudControllerManagerConfiguration{})
versioned := componentconfigv1alpha1.CloudControllerManagerConfiguration{}
scheme.Default(&versioned)
internal := componentconfig.CloudControllerManagerConfiguration{}
if err := scheme.Convert(&versioned, &internal, nil); err != nil {
versioned := &componentconfigv1alpha1.CloudControllerManagerConfiguration{}
internal := &componentconfig.CloudControllerManagerConfiguration{}
scheme.Default(versioned)
if err := scheme.Convert(versioned, internal, nil); err != nil {
return internal, err
}
internal.KubeCloudShared.Port = insecurePort
internal.Generic.Port = insecurePort
return internal, nil
}
// Flags returns flags for a specific APIServer by section name
func (o *CloudControllerManagerOptions) Flags() (fss apiserverflag.NamedFlagSets) {
o.CloudProvider.AddFlags(fss.FlagSet("cloud provider"))
o.Debugging.AddFlags(fss.FlagSet("debugging"))
o.GenericComponent.AddFlags(fss.FlagSet("generic"))
func (o *CloudControllerManagerOptions) Flags() apiserverflag.NamedFlagSets {
fss := apiserverflag.NamedFlagSets{}
o.Generic.AddFlags(&fss, []string{}, []string{})
// TODO: Implement the --controllers flag fully for the ccm
fss.FlagSet("generic").MarkHidden("controllers")
o.KubeCloudShared.AddFlags(fss.FlagSet("generic"))
o.ServiceController.AddFlags(fss.FlagSet("service controller"))
@@ -158,13 +158,7 @@ func (o *CloudControllerManagerOptions) Flags() (fss apiserverflag.NamedFlagSets
// ApplyTo fills up cloud controller manager config with options.
func (o *CloudControllerManagerOptions) ApplyTo(c *cloudcontrollerconfig.Config, userAgent string) error {
var err error
if err = o.CloudProvider.ApplyTo(&c.ComponentConfig.CloudProvider); err != nil {
return err
}
if err = o.Debugging.ApplyTo(&c.ComponentConfig.Debugging); err != nil {
return err
}
if err = o.GenericComponent.ApplyTo(&c.ComponentConfig.GenericComponent); err != nil {
if err = o.Generic.ApplyTo(&c.ComponentConfig.Generic); err != nil {
return err
}
if err = o.KubeCloudShared.ApplyTo(&c.ComponentConfig.KubeCloudShared); err != nil {
@@ -192,9 +186,9 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *cloudcontrollerconfig.Config,
if err != nil {
return err
}
c.Kubeconfig.ContentConfig.ContentType = o.GenericComponent.ContentType
c.Kubeconfig.QPS = o.GenericComponent.KubeAPIQPS
c.Kubeconfig.Burst = int(o.GenericComponent.KubeAPIBurst)
c.Kubeconfig.ContentConfig.ContentType = o.Generic.ClientConnection.ContentType
c.Kubeconfig.QPS = o.Generic.ClientConnection.QPS
c.Kubeconfig.Burst = int(o.Generic.ClientConnection.Burst)
c.Client, err = clientset.NewForConfig(restclient.AddUserAgent(c.Kubeconfig, userAgent))
if err != nil {
@@ -213,7 +207,7 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *cloudcontrollerconfig.Config,
ClientConfig: restclient.AnonymousClientConfig(c.Kubeconfig),
CoreClient: c.Client.CoreV1(),
AuthenticationClient: c.Client.AuthenticationV1(),
Namespace: "kube-system",
Namespace: metav1.NamespaceSystem,
}
} else {
c.ClientBuilder = rootClientBuilder
@@ -223,8 +217,8 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *cloudcontrollerconfig.Config,
// sync back to component config
// TODO: find more elegant way than syncing back the values.
c.ComponentConfig.KubeCloudShared.Port = int32(o.InsecureServing.BindPort)
c.ComponentConfig.KubeCloudShared.Address = o.InsecureServing.BindAddress.String()
c.ComponentConfig.Generic.Port = int32(o.InsecureServing.BindPort)
c.ComponentConfig.Generic.Address = o.InsecureServing.BindAddress.String()
c.ComponentConfig.NodeStatusUpdateFrequency = o.NodeStatusUpdateFrequency
@@ -235,9 +229,7 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *cloudcontrollerconfig.Config,
func (o *CloudControllerManagerOptions) Validate() error {
errors := []error{}
errors = append(errors, o.CloudProvider.Validate()...)
errors = append(errors, o.Debugging.Validate()...)
errors = append(errors, o.GenericComponent.Validate()...)
errors = append(errors, o.Generic.Validate(nil, nil)...)
errors = append(errors, o.KubeCloudShared.Validate()...)
errors = append(errors, o.ServiceController.Validate()...)
errors = append(errors, o.SecureServing.Validate()...)
@@ -245,7 +237,7 @@ func (o *CloudControllerManagerOptions) Validate() error {
errors = append(errors, o.Authentication.Validate()...)
errors = append(errors, o.Authorization.Validate()...)
if len(o.CloudProvider.Name) == 0 {
if len(o.KubeCloudShared.CloudProvider.Name) == 0 {
errors = append(errors, fmt.Errorf("--cloud-provider cannot be empty"))
}
@@ -256,7 +248,7 @@ func (o *CloudControllerManagerOptions) Validate() error {
func resyncPeriod(c *cloudcontrollerconfig.Config) func() time.Duration {
return func() time.Duration {
factor := rand.Float64() + 1
return time.Duration(float64(c.ComponentConfig.GenericComponent.MinResyncPeriod.Nanoseconds()) * factor)
return time.Duration(float64(c.ComponentConfig.Generic.MinResyncPeriod.Nanoseconds()) * factor)
}
}

View File

@@ -24,6 +24,7 @@ import (
"github.com/spf13/pflag"
apimachineryconfig "k8s.io/apimachinery/pkg/apis/config"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/diff"
apiserverconfig "k8s.io/apiserver/pkg/apis/config"
@@ -35,18 +36,15 @@ func TestDefaultFlags(t *testing.T) {
s, _ := NewCloudControllerManagerOptions()
expected := &CloudControllerManagerOptions{
CloudProvider: &cmoptions.CloudProviderOptions{
Name: "",
CloudConfigFile: "",
},
Debugging: &cmoptions.DebuggingOptions{
EnableContentionProfiling: false,
},
GenericComponent: &cmoptions.GenericComponentConfigOptions{
MinResyncPeriod: metav1.Duration{Duration: 12 * time.Hour},
ContentType: "application/vnd.kubernetes.protobuf",
KubeAPIQPS: 20.0,
KubeAPIBurst: 30,
Generic: &cmoptions.GenericControllerManagerConfigurationOptions{
Port: 10253, // Note: InsecureServingOptions.ApplyTo will write the flag value back into the component config
Address: "0.0.0.0", // Note: InsecureServingOptions.ApplyTo will write the flag value back into the component config
MinResyncPeriod: metav1.Duration{Duration: 12 * time.Hour},
ClientConnection: apimachineryconfig.ClientConnectionConfiguration{
ContentType: "application/vnd.kubernetes.protobuf",
QPS: 20.0,
Burst: 30,
},
ControllerStartInterval: metav1.Duration{Duration: 0},
LeaderElection: apiserverconfig.LeaderElectionConfiguration{
ResourceLock: "endpoints",
@@ -55,10 +53,12 @@ func TestDefaultFlags(t *testing.T) {
RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
RetryPeriod: metav1.Duration{Duration: 2 * time.Second},
},
Debugging: &cmoptions.DebuggingOptions{
EnableContentionProfiling: false,
},
Controllers: []string{"*"},
},
KubeCloudShared: &cmoptions.KubeCloudSharedOptions{
Port: 10253, // Note: DeprecatedInsecureServingOptions.ApplyTo will write the flag value back into the component config
Address: "0.0.0.0", // Note: DeprecatedInsecureServingOptions.ApplyTo will write the flag value back into the component config
RouteReconciliationPeriod: metav1.Duration{Duration: 10 * time.Second},
NodeMonitorPeriod: metav1.Duration{Duration: 5 * time.Second},
ClusterName: "kubernetes",
@@ -66,6 +66,10 @@ func TestDefaultFlags(t *testing.T) {
AllocateNodeCIDRs: false,
CIDRAllocatorType: "",
ConfigureCloudRoutes: true,
CloudProvider: &cmoptions.CloudProviderOptions{
Name: "",
CloudConfigFile: "",
},
},
ServiceController: &cmoptions.ServiceControllerOptions{
ConcurrentServiceSyncs: 1,
@@ -150,18 +154,15 @@ func TestAddFlags(t *testing.T) {
fs.Parse(args)
expected := &CloudControllerManagerOptions{
CloudProvider: &cmoptions.CloudProviderOptions{
Name: "gce",
CloudConfigFile: "/cloud-config",
},
Debugging: &cmoptions.DebuggingOptions{
EnableContentionProfiling: true,
},
GenericComponent: &cmoptions.GenericComponentConfigOptions{
MinResyncPeriod: metav1.Duration{Duration: 100 * time.Minute},
ContentType: "application/vnd.kubernetes.protobuf",
KubeAPIQPS: 50.0,
KubeAPIBurst: 100,
Generic: &cmoptions.GenericControllerManagerConfigurationOptions{
Port: 10253, // Note: InsecureServingOptions.ApplyTo will write the flag value back into the component config
Address: "0.0.0.0", // Note: InsecureServingOptions.ApplyTo will write the flag value back into the component config
MinResyncPeriod: metav1.Duration{Duration: 100 * time.Minute},
ClientConnection: apimachineryconfig.ClientConnectionConfiguration{
ContentType: "application/vnd.kubernetes.protobuf",
QPS: 50.0,
Burst: 100,
},
ControllerStartInterval: metav1.Duration{Duration: 2 * time.Minute},
LeaderElection: apiserverconfig.LeaderElectionConfiguration{
ResourceLock: "configmap",
@@ -170,10 +171,16 @@ func TestAddFlags(t *testing.T) {
RenewDeadline: metav1.Duration{Duration: 15 * time.Second},
RetryPeriod: metav1.Duration{Duration: 5 * time.Second},
},
Debugging: &cmoptions.DebuggingOptions{
EnableContentionProfiling: true,
},
Controllers: []string{"*"},
},
KubeCloudShared: &cmoptions.KubeCloudSharedOptions{
Port: 10253, // Note: DeprecatedInsecureServingOptions.ApplyTo will write the flag value back into the component config
Address: "0.0.0.0", // Note: DeprecatedInsecureServingOptions.ApplyTo will write the flag value back into the component config
CloudProvider: &cmoptions.CloudProviderOptions{
Name: "gce",
CloudConfigFile: "/cloud-config",
},
RouteReconciliationPeriod: metav1.Duration{Duration: 30 * time.Second},
NodeMonitorPeriod: metav1.Duration{Duration: 5 * time.Second},
ClusterName: "k8s",