Move CCM to staging k8s.io/cloud-provider

This commit is contained in:
cici37
2020-10-20 10:28:15 -07:00
parent ad6a2af7d8
commit 9465d95ea6
55 changed files with 387 additions and 278 deletions

View File

@@ -1,84 +1,4 @@
rules:
- selectorRegexp: k8s[.]io/kubernetes
allowedPrefixes:
- k8s.io/kubernetes/cmd/cloud-controller-manager/app
- k8s.io/kubernetes/cmd/cloud-controller-manager/app/apis/config
- k8s.io/kubernetes/cmd/cloud-controller-manager/app/apis/config/scheme
- k8s.io/kubernetes/cmd/cloud-controller-manager/app/apis/config/v1alpha1
- k8s.io/kubernetes/cmd/cloud-controller-manager/app/config
- k8s.io/kubernetes/cmd/cloud-controller-manager/app/options
- k8s.io/kubernetes/pkg/api/legacyscheme
- k8s.io/kubernetes/pkg/api/service
- k8s.io/kubernetes/pkg/api/v1/pod
- k8s.io/kubernetes/pkg/apis/apps
- k8s.io/kubernetes/pkg/apis/autoscaling
- k8s.io/kubernetes/pkg/apis/core
- k8s.io/kubernetes/pkg/apis/core/helper
- k8s.io/kubernetes/pkg/apis/core/install
- k8s.io/kubernetes/pkg/apis/core/pods
- k8s.io/kubernetes/pkg/apis/core/v1
- k8s.io/kubernetes/pkg/apis/core/v1/helper
- k8s.io/kubernetes/pkg/apis/core/validation
- k8s.io/kubernetes/pkg/apis/scheduling
- k8s.io/kubernetes/pkg/capabilities
- k8s.io/kubernetes/pkg/client/leaderelectionconfig
- k8s.io/kubernetes/pkg/cloudprovider/providers
- k8s.io/kubernetes/pkg/controller
- k8s.io/kubernetes/pkg/controller/certificates/signer/config
- k8s.io/kubernetes/pkg/controller/certificates/signer/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/cloud
- k8s.io/kubernetes/pkg/controller/daemon/config
- k8s.io/kubernetes/pkg/controller/daemon/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/deployment/config
- k8s.io/kubernetes/pkg/controller/deployment/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/endpoint/config
- k8s.io/kubernetes/pkg/controller/endpoint/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/endpointslice/config
- k8s.io/kubernetes/pkg/controller/endpointslice/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/endpointslicemirroring/config
- k8s.io/kubernetes/pkg/controller/endpointslicemirroring/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/garbagecollector/config
- k8s.io/kubernetes/pkg/controller/garbagecollector/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/job/config
- k8s.io/kubernetes/pkg/controller/job/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/namespace/config
- k8s.io/kubernetes/pkg/controller/namespace/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/nodeipam/config
- k8s.io/kubernetes/pkg/controller/nodeipam/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/nodelifecycle/config
- k8s.io/kubernetes/pkg/controller/nodelifecycle/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/podautoscaler/config
- k8s.io/kubernetes/pkg/controller/podautoscaler/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/podgc/config
- k8s.io/kubernetes/pkg/controller/podgc/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/replicaset/config
- k8s.io/kubernetes/pkg/controller/replicaset/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/replication/config
- k8s.io/kubernetes/pkg/controller/replication/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/resourcequota/config
- k8s.io/kubernetes/pkg/controller/resourcequota/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/route
- k8s.io/kubernetes/pkg/controller/service
- k8s.io/kubernetes/pkg/controller/serviceaccount/config
- k8s.io/kubernetes/pkg/controller/serviceaccount/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/statefulset/config
- k8s.io/kubernetes/pkg/controller/statefulset/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/testutil
- k8s.io/kubernetes/pkg/controller/ttlafterfinished/config
- k8s.io/kubernetes/pkg/controller/ttlafterfinished/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/volume/attachdetach/config
- k8s.io/kubernetes/pkg/controller/volume/attachdetach/config/v1alpha1
- k8s.io/kubernetes/pkg/controller/volume/persistentvolume/config
- k8s.io/kubernetes/pkg/controller/volume/persistentvolume/config/v1alpha1
- k8s.io/kubernetes/pkg/features
- k8s.io/kubernetes/pkg/fieldpath
- k8s.io/kubernetes/pkg/kubelet/types
- k8s.io/kubernetes/pkg/cluster/ports
- k8s.io/kubernetes/pkg/security/apparmor
- k8s.io/kubernetes/pkg/securitycontext
- k8s.io/kubernetes/pkg/serviceaccount
- k8s.io/kubernetes/pkg/util/configz
- k8s.io/kubernetes/pkg/util/hash
- k8s.io/kubernetes/pkg/util/node
- k8s.io/kubernetes/pkg/util/parsers
- k8s.io/kubernetes/pkg/util/taints
- k8s.io/kubernetes/cmd/cloud-controller-manager

View File

@@ -22,7 +22,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/cmd/cloud-controller-manager",
deps = [
"//cmd/cloud-controller-manager/app:go_default_library",
"//staging/src/k8s.io/cloud-provider/app:go_default_library",
"//staging/src/k8s.io/component-base/logs:go_default_library",
"//staging/src/k8s.io/component-base/metrics/prometheus/clientgo:go_default_library",
"//staging/src/k8s.io/component-base/metrics/prometheus/version:go_default_library",
@@ -43,9 +43,6 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//cmd/cloud-controller-manager/app:all-srcs",
],
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@@ -1,57 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"controllermanager.go",
"core.go",
],
importpath = "k8s.io/kubernetes/cmd/cloud-controller-manager/app",
visibility = ["//visibility:public"],
deps = [
"//cmd/cloud-controller-manager/app/config:go_default_library",
"//cmd/cloud-controller-manager/app/options:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library",
"//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/controllers/node:go_default_library",
"//staging/src/k8s.io/cloud-provider/controllers/nodelifecycle:go_default_library",
"//staging/src/k8s.io/cloud-provider/controllers/route:go_default_library",
"//staging/src/k8s.io/cloud-provider/controllers/service:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
"//staging/src/k8s.io/component-base/cli/globalflag:go_default_library",
"//staging/src/k8s.io/component-base/configz:go_default_library",
"//staging/src/k8s.io/component-base/term:go_default_library",
"//staging/src/k8s.io/component-base/version:go_default_library",
"//staging/src/k8s.io/component-base/version/verflag:go_default_library",
"//staging/src/k8s.io/controller-manager/app:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/net:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//cmd/cloud-controller-manager/app/config:all-srcs",
"//cmd/cloud-controller-manager/app/options:all-srcs",
"//cmd/cloud-controller-manager/app/testing:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -1,31 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["config.go"],
importpath = "k8s.io/kubernetes/cmd/cloud-controller-manager/app/config",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/app/apis/config:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -1,81 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
ccmconfig "k8s.io/cloud-provider/app/apis/config"
)
// Config is the main context object for the cloud controller manager.
type Config struct {
ComponentConfig ccmconfig.CloudControllerManagerConfiguration
SecureServing *apiserver.SecureServingInfo
// LoopbackClientConfig is a config for a privileged loopback connection
LoopbackClientConfig *restclient.Config
// TODO: remove deprecated insecure serving
InsecureServing *apiserver.DeprecatedInsecureServingInfo
Authentication apiserver.AuthenticationInfo
Authorization apiserver.AuthorizationInfo
// the general kube client
Client *clientset.Clientset
// the client only used for leader election
LeaderElectionClient *clientset.Clientset
// the rest config for the master
Kubeconfig *restclient.Config
// the event sink
EventRecorder record.EventRecorder
// ClientBuilder will provide a client for this controller to use
ClientBuilder cloudprovider.ControllerClientBuilder
// VersionedClient will provide a client for informers
VersionedClient clientset.Interface
// SharedInformers gives access to informers for the controller.
SharedInformers informers.SharedInformerFactory
}
type completedConfig struct {
*Config
}
// CompletedConfig same as Config, just to swap private object.
type CompletedConfig struct {
// Embed a private pointer that cannot be instantiated outside of this package.
*completedConfig
}
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
func (c *Config) Complete() *CompletedConfig {
cc := completedConfig{c}
apiserver.AuthorizeClientBearerToken(c.LoopbackClientConfig, &c.Authentication, &c.Authorization)
return &CompletedConfig{&cc}
}

View File

@@ -1,293 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
"context"
"flag"
"fmt"
"net/http"
"os"
"time"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
cloudprovider "k8s.io/cloud-provider"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/configz"
"k8s.io/component-base/term"
"k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/klog/v2"
cloudcontrollerconfig "k8s.io/kubernetes/cmd/cloud-controller-manager/app/config"
"k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
)
const (
// ControllerStartJitter is the jitter value used when starting controller managers.
ControllerStartJitter = 1.0
// ConfigzName is the name used for register cloud-controller manager /configz, same with GroupName.
ConfigzName = "cloudcontrollermanager.config.k8s.io"
)
// NewCloudControllerManagerCommand creates a *cobra.Command object with default parameters
func NewCloudControllerManagerCommand() *cobra.Command {
s, err := options.NewCloudControllerManagerOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
cmd := &cobra.Command{
Use: "cloud-controller-manager",
Long: `The Cloud controller manager is a daemon that embeds
the cloud specific control loops shipped with Kubernetes.`,
Run: func(cmd *cobra.Command, args []string) {
verflag.PrintAndExitIfRequested()
cliflag.PrintFlags(cmd.Flags())
c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
if err := Run(c.Complete(), wait.NeverStop); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
fs := cmd.Flags()
namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault.List())
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
if flag.CommandLine.Lookup("cloud-provider-gce-lb-src-cidrs") != nil {
// hoist this flag from the global flagset to preserve the commandline until
// the gce cloudprovider is removed.
globalflag.Register(namedFlagSets.FlagSet("generic"), "cloud-provider-gce-lb-src-cidrs")
}
if flag.CommandLine.Lookup("cloud-provider-gce-l7lb-src-cidrs") != nil {
globalflag.Register(namedFlagSets.FlagSet("generic"), "cloud-provider-gce-l7lb-src-cidrs")
}
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
usageFmt := "Usage:\n %s\n"
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cmd.SetUsageFunc(func(cmd *cobra.Command) error {
fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
return nil
})
cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
})
return cmd
}
// Run runs the ExternalCMServer. This should never exit.
func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
cloud, err := cloudprovider.InitCloudProvider(c.ComponentConfig.KubeCloudShared.CloudProvider.Name, c.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile)
if err != nil {
klog.Fatalf("Cloud provider could not be initialized: %v", err)
}
if cloud == nil {
klog.Fatalf("cloud provider is nil")
}
if !cloud.HasClusterID() {
if c.ComponentConfig.KubeCloudShared.AllowUntaggedCloud {
klog.Warning("detected a cluster without a ClusterID. A ClusterID will be required in the future. Please tag your cluster to avoid any future issues")
} else {
klog.Fatalf("no ClusterID found. A ClusterID is required for the cloud provider to function properly. This check can be bypassed by setting the allow-untagged-cloud option")
}
}
// setup /configz endpoint
if cz, err := configz.New(ConfigzName); err == nil {
cz.Set(c.ComponentConfig)
} else {
klog.Errorf("unable to register configz: %v", err)
}
// Setup any health checks we will want to use.
var checks []healthz.HealthChecker
var electionChecker *leaderelection.HealthzAdaptor
if c.ComponentConfig.Generic.LeaderElection.LeaderElect {
electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
checks = append(checks, electionChecker)
}
// Start the controller manager HTTP server
if c.SecureServing != nil {
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
return err
}
}
if c.InsecureServing != nil {
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
return err
}
}
run := func(ctx context.Context) {
if err := startControllers(c, ctx.Done(), cloud, newControllerInitializers()); err != nil {
klog.Fatalf("error running controllers: %v", err)
}
}
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
run(context.TODO())
panic("unreachable")
}
// Identity used to distinguish between multiple cloud controller manager instances
id, err := os.Hostname()
if err != nil {
return err
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + string(uuid.NewUUID())
// Lock required for leader election
rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
c.ComponentConfig.Generic.LeaderElection.ResourceName,
c.LeaderElectionClient.CoreV1(),
c.LeaderElectionClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: c.EventRecorder,
})
if err != nil {
klog.Fatalf("error creating lock: %v", err)
}
// Try and become the leader and start cloud controller manager loops
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
},
WatchDog: electionChecker,
Name: "cloud-controller-manager",
})
panic("unreachable")
}
// startControllers starts the cloud specific controller loops.
func startControllers(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}, cloud cloudprovider.Interface, controllers map[string]initFunc) error {
// Initialize the cloud provider with a reference to the clientBuilder
cloud.Initialize(c.ClientBuilder, stopCh)
// Set the informer on the user cloud object
if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
informerUserCloud.SetInformers(c.SharedInformers)
}
for controllerName, initFn := range controllers {
if !genericcontrollermanager.IsControllerEnabled(controllerName, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) {
klog.Warningf("%q is disabled", controllerName)
continue
}
klog.V(1).Infof("Starting %q", controllerName)
_, started, err := initFn(c, cloud, stopCh)
if err != nil {
klog.Errorf("Error starting %q", controllerName)
return err
}
if !started {
klog.Warningf("Skipping %q", controllerName)
continue
}
klog.Infof("Started %q", controllerName)
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
}
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
if err := genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second); err != nil {
klog.Fatalf("Failed to wait for apiserver being healthy: %v", err)
}
c.SharedInformers.Start(stopCh)
select {}
}
// initFunc is used to launch a particular controller. It may run additional "should I activate checks".
// Any error returned will cause the controller process to `Fatal`
// The bool indicates whether the controller was enabled.
type initFunc func(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stop <-chan struct{}) (debuggingHandler http.Handler, enabled bool, err error)
// KnownControllers indicate the default controller we are known.
func KnownControllers() []string {
ret := sets.StringKeySet(newControllerInitializers())
return ret.List()
}
// ControllersDisabledByDefault is the controller disabled default when starting cloud-controller managers.
var ControllersDisabledByDefault = sets.NewString()
// newControllerInitializers is a private map of named controller groups (you can start more than one in an init func)
// paired to their initFunc. This allows for structured downstream composition and subdivision.
func newControllerInitializers() map[string]initFunc {
controllers := map[string]initFunc{}
controllers["cloud-node"] = startCloudNodeController
controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
controllers["service"] = startServiceController
controllers["route"] = startRouteController
return controllers
}

View File

@@ -1,169 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a server that runs a set of active
// components. This includes node controllers, service and
// route controller, and so on.
//
package app
import (
"fmt"
"net"
"net/http"
"strings"
cloudprovider "k8s.io/cloud-provider"
cloudnodecontroller "k8s.io/cloud-provider/controllers/node"
cloudnodelifecyclecontroller "k8s.io/cloud-provider/controllers/nodelifecycle"
routecontroller "k8s.io/cloud-provider/controllers/route"
servicecontroller "k8s.io/cloud-provider/controllers/service"
"k8s.io/klog/v2"
cloudcontrollerconfig "k8s.io/kubernetes/cmd/cloud-controller-manager/app/config"
netutils "k8s.io/utils/net"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
const (
// IPv6DualStack enables ipv6 dual stack feature
// Original copy from pkg/features/kube_features.go
IPv6DualStack = "IPv6DualStack"
)
func startCloudNodeController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
// Start the CloudNodeController
nodeController, err := cloudnodecontroller.NewCloudNodeController(
ctx.SharedInformers.Core().V1().Nodes(),
// cloud node controller uses existing cluster role from node-controller
ctx.ClientBuilder.ClientOrDie("node-controller"),
cloud,
ctx.ComponentConfig.NodeStatusUpdateFrequency.Duration,
)
if err != nil {
klog.Warningf("failed to start cloud node controller: %s", err)
return nil, false, nil
}
go nodeController.Run(stopCh)
return nil, true, nil
}
func startCloudNodeLifecycleController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
// Start the cloudNodeLifecycleController
cloudNodeLifecycleController, err := cloudnodelifecyclecontroller.NewCloudNodeLifecycleController(
ctx.SharedInformers.Core().V1().Nodes(),
// cloud node lifecycle controller uses existing cluster role from node-controller
ctx.ClientBuilder.ClientOrDie("node-controller"),
cloud,
ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
)
if err != nil {
klog.Warningf("failed to start cloud node lifecycle controller: %s", err)
return nil, false, nil
}
go cloudNodeLifecycleController.Run(stopCh)
return nil, true, nil
}
func startServiceController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
// Start the service controller
serviceController, err := servicecontroller.New(
cloud,
ctx.ClientBuilder.ClientOrDie("service-controller"),
ctx.SharedInformers.Core().V1().Services(),
ctx.SharedInformers.Core().V1().Nodes(),
ctx.ComponentConfig.KubeCloudShared.ClusterName,
utilfeature.DefaultFeatureGate,
)
if err != nil {
// This error shouldn't fail. It lives like this as a legacy.
klog.Errorf("Failed to start service controller: %v", err)
return nil, false, nil
}
go serviceController.Run(stopCh, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
return nil, true, nil
}
func startRouteController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
return nil, false, nil
}
// If CIDRs should be allocated for pods and set on the CloudProvider, then start the route controller
routes, ok := cloud.Routes()
if !ok {
klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
return nil, false, nil
}
// failure: bad cidrs in config
clusterCIDRs, dualStack, err := processCIDRs(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)
if err != nil {
return nil, false, err
}
// failure: more than one cidr and dual stack is not enabled
if len(clusterCIDRs) > 1 && !utilfeature.DefaultFeatureGate.Enabled(IPv6DualStack) {
return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and dualstack feature is not enabled", len(clusterCIDRs))
}
// failure: more than one cidr but they are not configured as dual stack
if len(clusterCIDRs) > 1 && !dualStack {
return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and they are not configured as dual stack (at least one from each IPFamily", len(clusterCIDRs))
}
// failure: more than cidrs is not allowed even with dual stack
if len(clusterCIDRs) > 2 {
return nil, false, fmt.Errorf("length of clusterCIDRs is:%v more than max allowed of 2", len(clusterCIDRs))
}
routeController := routecontroller.New(
routes,
ctx.ClientBuilder.ClientOrDie("route-controller"),
ctx.SharedInformers.Core().V1().Nodes(),
ctx.ComponentConfig.KubeCloudShared.ClusterName,
clusterCIDRs,
)
go routeController.Run(stopCh, ctx.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
return nil, true, nil
}
// processCIDRs is a helper function that works on a comma separated cidrs and returns
// a list of typed cidrs
// a flag if cidrs represents a dual stack
// error if failed to parse any of the cidrs
func processCIDRs(cidrsList string) ([]*net.IPNet, bool, error) {
cidrsSplit := strings.Split(strings.TrimSpace(cidrsList), ",")
cidrs, err := netutils.ParseCIDRs(cidrsSplit)
if err != nil {
return nil, false, err
}
// if cidrs has an error then the previous call will fail
// safe to ignore error checking on next call
dualstack, _ := netutils.IsDualStackCIDRs(cidrs)
return cidrs, dualstack, nil
}

View File

@@ -1,68 +0,0 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["options.go"],
importpath = "k8s.io/kubernetes/cmd/cloud-controller-manager/app/options",
deps = [
"//cmd/cloud-controller-manager/app/config:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/app/apis/config:go_default_library",
"//staging/src/k8s.io/cloud-provider/app/apis/config/scheme:go_default_library",
"//staging/src/k8s.io/cloud-provider/app/apis/config/v1alpha1:go_default_library",
"//staging/src/k8s.io/cloud-provider/options:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
"//staging/src/k8s.io/controller-manager/options:go_default_library",
"//staging/src/k8s.io/controller-manager/pkg/clientbuilder:go_default_library",
"//staging/src/k8s.io/controller-manager/pkg/features/register:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["options_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
"//staging/src/k8s.io/cloud-provider/app/apis/config:go_default_library",
"//staging/src/k8s.io/cloud-provider/options:go_default_library",
"//staging/src/k8s.io/cloud-provider/service/config:go_default_library",
"//staging/src/k8s.io/component-base/config:go_default_library",
"//staging/src/k8s.io/controller-manager/config:go_default_library",
"//staging/src/k8s.io/controller-manager/options:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
],
)

View File

@@ -1,273 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"fmt"
"math/rand"
"net"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
ports "k8s.io/cloud-provider"
ccmconfig "k8s.io/cloud-provider/app/apis/config"
ccmconfigscheme "k8s.io/cloud-provider/app/apis/config/scheme"
ccmconfigv1alpha1 "k8s.io/cloud-provider/app/apis/config/v1alpha1"
cpoptions "k8s.io/cloud-provider/options"
cliflag "k8s.io/component-base/cli/flag"
cmoptions "k8s.io/controller-manager/options"
"k8s.io/controller-manager/pkg/clientbuilder"
cloudcontrollerconfig "k8s.io/kubernetes/cmd/cloud-controller-manager/app/config"
// add the related feature gates
_ "k8s.io/controller-manager/pkg/features/register"
)
const (
// CloudControllerManagerUserAgent is the userAgent name when starting cloud-controller managers.
CloudControllerManagerUserAgent = "cloud-controller-manager"
// DefaultInsecureCloudControllerManagerPort is the default insecure cloud-controller manager port.
DefaultInsecureCloudControllerManagerPort = 0
)
// CloudControllerManagerOptions is the main context object for the controller manager.
type CloudControllerManagerOptions struct {
Generic *cmoptions.GenericControllerManagerConfigurationOptions
KubeCloudShared *cpoptions.KubeCloudSharedOptions
ServiceController *cpoptions.ServiceControllerOptions
SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
// TODO: remove insecure serving mode
InsecureServing *apiserveroptions.DeprecatedInsecureServingOptionsWithLoopback
Authentication *apiserveroptions.DelegatingAuthenticationOptions
Authorization *apiserveroptions.DelegatingAuthorizationOptions
Master string
Kubeconfig string
// NodeStatusUpdateFrequency is the frequency at which the controller updates nodes' status
NodeStatusUpdateFrequency metav1.Duration
}
// NewCloudControllerManagerOptions creates a new ExternalCMServer with a default config.
func NewCloudControllerManagerOptions() (*CloudControllerManagerOptions, error) {
componentConfig, err := NewDefaultComponentConfig(DefaultInsecureCloudControllerManagerPort)
if err != nil {
return nil, err
}
s := CloudControllerManagerOptions{
Generic: cmoptions.NewGenericControllerManagerConfigurationOptions(&componentConfig.Generic),
KubeCloudShared: cpoptions.NewKubeCloudSharedOptions(&componentConfig.KubeCloudShared),
ServiceController: &cpoptions.ServiceControllerOptions{
ServiceControllerConfiguration: &componentConfig.ServiceController,
},
SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(),
InsecureServing: (&apiserveroptions.DeprecatedInsecureServingOptions{
BindAddress: net.ParseIP(componentConfig.Generic.Address),
BindPort: int(componentConfig.Generic.Port),
BindNetwork: "tcp",
}).WithLoopback(),
Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
Authorization: apiserveroptions.NewDelegatingAuthorizationOptions(),
NodeStatusUpdateFrequency: componentConfig.NodeStatusUpdateFrequency,
}
s.Authentication.RemoteKubeConfigFileOptional = true
s.Authorization.RemoteKubeConfigFileOptional = true
s.Authorization.AlwaysAllowPaths = []string{"/healthz"}
// Set the PairName but leave certificate directory blank to generate in-memory by default
s.SecureServing.ServerCert.CertDirectory = ""
s.SecureServing.ServerCert.PairName = "cloud-controller-manager"
s.SecureServing.BindPort = ports.CloudControllerManagerPort
s.Generic.LeaderElection.ResourceName = "cloud-controller-manager"
s.Generic.LeaderElection.ResourceNamespace = "kube-system"
return &s, nil
}
// NewDefaultComponentConfig returns cloud-controller manager configuration object.
func NewDefaultComponentConfig(insecurePort int32) (*ccmconfig.CloudControllerManagerConfiguration, error) {
versioned := &ccmconfigv1alpha1.CloudControllerManagerConfiguration{}
ccmconfigscheme.Scheme.Default(versioned)
internal := &ccmconfig.CloudControllerManagerConfiguration{}
if err := ccmconfigscheme.Scheme.Convert(versioned, internal, nil); err != nil {
return nil, err
}
internal.Generic.Port = insecurePort
return internal, nil
}
// Flags returns flags for a specific APIServer by section name
func (o *CloudControllerManagerOptions) Flags(allControllers, disabledByDefaultControllers []string) cliflag.NamedFlagSets {
fss := cliflag.NamedFlagSets{}
o.Generic.AddFlags(&fss, allControllers, disabledByDefaultControllers)
o.KubeCloudShared.AddFlags(fss.FlagSet("generic"))
o.ServiceController.AddFlags(fss.FlagSet("service controller"))
o.SecureServing.AddFlags(fss.FlagSet("secure serving"))
o.InsecureServing.AddUnqualifiedFlags(fss.FlagSet("insecure serving"))
o.Authentication.AddFlags(fss.FlagSet("authentication"))
o.Authorization.AddFlags(fss.FlagSet("authorization"))
fs := fss.FlagSet("misc")
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig).")
fs.StringVar(&o.Kubeconfig, "kubeconfig", o.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
fs.DurationVar(&o.NodeStatusUpdateFrequency.Duration, "node-status-update-frequency", o.NodeStatusUpdateFrequency.Duration, "Specifies how often the controller updates nodes' status.")
utilfeature.DefaultMutableFeatureGate.AddFlag(fss.FlagSet("generic"))
return fss
}
// ApplyTo fills up cloud controller manager config with options.
func (o *CloudControllerManagerOptions) ApplyTo(c *cloudcontrollerconfig.Config, userAgent string) error {
var err error
if err = o.Generic.ApplyTo(&c.ComponentConfig.Generic); err != nil {
return err
}
if err = o.KubeCloudShared.ApplyTo(&c.ComponentConfig.KubeCloudShared); err != nil {
return err
}
if err = o.ServiceController.ApplyTo(&c.ComponentConfig.ServiceController); err != nil {
return err
}
if err = o.InsecureServing.ApplyTo(&c.InsecureServing, &c.LoopbackClientConfig); err != nil {
return err
}
if err = o.SecureServing.ApplyTo(&c.SecureServing, &c.LoopbackClientConfig); err != nil {
return err
}
if o.SecureServing.BindPort != 0 || o.SecureServing.Listener != nil {
if err = o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {
return err
}
if err = o.Authorization.ApplyTo(&c.Authorization); err != nil {
return err
}
}
c.Kubeconfig, err = clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)
if err != nil {
return err
}
c.Kubeconfig.DisableCompression = true
c.Kubeconfig.ContentConfig.AcceptContentTypes = o.Generic.ClientConnection.AcceptContentTypes
c.Kubeconfig.ContentConfig.ContentType = o.Generic.ClientConnection.ContentType
c.Kubeconfig.QPS = o.Generic.ClientConnection.QPS
c.Kubeconfig.Burst = int(o.Generic.ClientConnection.Burst)
c.Client, err = clientset.NewForConfig(restclient.AddUserAgent(c.Kubeconfig, userAgent))
if err != nil {
return err
}
c.LeaderElectionClient = clientset.NewForConfigOrDie(restclient.AddUserAgent(c.Kubeconfig, "leader-election"))
c.EventRecorder = createRecorder(c.Client, userAgent)
rootClientBuilder := clientbuilder.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
}
if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
c.ClientBuilder = clientbuilder.SAControllerClientBuilder{
ClientConfig: restclient.AnonymousClientConfig(c.Kubeconfig),
CoreClient: c.Client.CoreV1(),
AuthenticationClient: c.Client.AuthenticationV1(),
Namespace: metav1.NamespaceSystem,
}
} else {
c.ClientBuilder = rootClientBuilder
}
c.VersionedClient = rootClientBuilder.ClientOrDie("shared-informers")
c.SharedInformers = informers.NewSharedInformerFactory(c.VersionedClient, resyncPeriod(c)())
// sync back to component config
// TODO: find more elegant way than syncing back the values.
c.ComponentConfig.Generic.Port = int32(o.InsecureServing.BindPort)
c.ComponentConfig.Generic.Address = o.InsecureServing.BindAddress.String()
c.ComponentConfig.NodeStatusUpdateFrequency = o.NodeStatusUpdateFrequency
return nil
}
// Validate is used to validate config before launching the cloud controller manager
func (o *CloudControllerManagerOptions) Validate(allControllers, disabledByDefaultControllers []string) error {
errors := []error{}
errors = append(errors, o.Generic.Validate(allControllers, disabledByDefaultControllers)...)
errors = append(errors, o.KubeCloudShared.Validate()...)
errors = append(errors, o.ServiceController.Validate()...)
errors = append(errors, o.SecureServing.Validate()...)
errors = append(errors, o.InsecureServing.Validate()...)
errors = append(errors, o.Authentication.Validate()...)
errors = append(errors, o.Authorization.Validate()...)
if len(o.KubeCloudShared.CloudProvider.Name) == 0 {
errors = append(errors, fmt.Errorf("--cloud-provider cannot be empty"))
}
return utilerrors.NewAggregate(errors)
}
// resyncPeriod computes the time interval a shared informer waits before resyncing with the api server
func resyncPeriod(c *cloudcontrollerconfig.Config) func() time.Duration {
return func() time.Duration {
factor := rand.Float64() + 1
return time.Duration(float64(c.ComponentConfig.Generic.MinResyncPeriod.Nanoseconds()) * factor)
}
}
// Config return a cloud controller manager config objective
func (o *CloudControllerManagerOptions) Config(allControllers, disabledByDefaultControllers []string) (*cloudcontrollerconfig.Config, error) {
if err := o.Validate(allControllers, disabledByDefaultControllers); err != nil {
return nil, err
}
if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
c := &cloudcontrollerconfig.Config{}
if err := o.ApplyTo(c, CloudControllerManagerUserAgent); err != nil {
return nil, err
}
return c, nil
}
func createRecorder(kubeClient clientset.Interface, userAgent string) record.EventRecorder {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
return eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: userAgent})
}

View File

@@ -1,263 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"net"
"reflect"
"testing"
"time"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/diff"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
cpconfig "k8s.io/cloud-provider/app/apis/config"
cpoptions "k8s.io/cloud-provider/options"
serviceconfig "k8s.io/cloud-provider/service/config"
componentbaseconfig "k8s.io/component-base/config"
cmconfig "k8s.io/controller-manager/config"
cmoptions "k8s.io/controller-manager/options"
)
func TestDefaultFlags(t *testing.T) {
s, _ := NewCloudControllerManagerOptions()
expected := &CloudControllerManagerOptions{
Generic: &cmoptions.GenericControllerManagerConfigurationOptions{
GenericControllerManagerConfiguration: &cmconfig.GenericControllerManagerConfiguration{
Port: DefaultInsecureCloudControllerManagerPort, // Note: InsecureServingOptions.ApplyTo will write the flag value back into the component config
Address: "0.0.0.0", // Note: InsecureServingOptions.ApplyTo will write the flag value back into the component config
MinResyncPeriod: metav1.Duration{Duration: 12 * time.Hour},
ClientConnection: componentbaseconfig.ClientConnectionConfiguration{
ContentType: "application/vnd.kubernetes.protobuf",
QPS: 20.0,
Burst: 30,
},
ControllerStartInterval: metav1.Duration{Duration: 0},
LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
ResourceLock: "leases",
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
RetryPeriod: metav1.Duration{Duration: 2 * time.Second},
ResourceName: "cloud-controller-manager",
ResourceNamespace: "kube-system",
},
Controllers: []string{"*"},
},
Debugging: &cmoptions.DebuggingOptions{
DebuggingConfiguration: &componentbaseconfig.DebuggingConfiguration{
EnableProfiling: true,
EnableContentionProfiling: false,
},
},
},
KubeCloudShared: &cpoptions.KubeCloudSharedOptions{
KubeCloudSharedConfiguration: &cpconfig.KubeCloudSharedConfiguration{
RouteReconciliationPeriod: metav1.Duration{Duration: 10 * time.Second},
NodeMonitorPeriod: metav1.Duration{Duration: 5 * time.Second},
ClusterName: "kubernetes",
ClusterCIDR: "",
AllocateNodeCIDRs: false,
CIDRAllocatorType: "",
ConfigureCloudRoutes: true,
},
CloudProvider: &cpoptions.CloudProviderOptions{
CloudProviderConfiguration: &cpconfig.CloudProviderConfiguration{
Name: "",
CloudConfigFile: "",
},
},
},
ServiceController: &cpoptions.ServiceControllerOptions{
ServiceControllerConfiguration: &serviceconfig.ServiceControllerConfiguration{
ConcurrentServiceSyncs: 1,
},
},
SecureServing: (&apiserveroptions.SecureServingOptions{
BindPort: 10258,
BindAddress: net.ParseIP("0.0.0.0"),
ServerCert: apiserveroptions.GeneratableKeyCert{
CertDirectory: "",
PairName: "cloud-controller-manager",
},
HTTP2MaxStreamsPerConnection: 0,
}).WithLoopback(),
InsecureServing: (&apiserveroptions.DeprecatedInsecureServingOptions{
BindAddress: net.ParseIP("0.0.0.0"),
BindPort: int(0),
BindNetwork: "tcp",
}).WithLoopback(),
Authentication: &apiserveroptions.DelegatingAuthenticationOptions{
CacheTTL: 10 * time.Second,
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{},
RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{
UsernameHeaders: []string{"x-remote-user"},
GroupHeaders: []string{"x-remote-group"},
ExtraHeaderPrefixes: []string{"x-remote-extra-"},
},
RemoteKubeConfigFileOptional: true,
},
Authorization: &apiserveroptions.DelegatingAuthorizationOptions{
AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second,
ClientTimeout: 10 * time.Second,
RemoteKubeConfigFileOptional: true,
AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or
},
Kubeconfig: "",
Master: "",
NodeStatusUpdateFrequency: metav1.Duration{Duration: 5 * time.Minute},
}
if !reflect.DeepEqual(expected, s) {
t.Errorf("Got different run options than expected.\nDifference detected on:\n%s", diff.ObjectReflectDiff(expected, s))
}
}
func TestAddFlags(t *testing.T) {
fs := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError)
s, _ := NewCloudControllerManagerOptions()
for _, f := range s.Flags([]string{""}, []string{""}).FlagSets {
fs.AddFlagSet(f)
}
args := []string{
"--address=192.168.4.10",
"--allocate-node-cidrs=true",
"--bind-address=192.168.4.21",
"--cert-dir=/a/b/c",
"--cloud-config=/cloud-config",
"--cloud-provider=gce",
"--cluster-cidr=1.2.3.4/24",
"--cluster-name=k8s",
"--configure-cloud-routes=false",
"--contention-profiling=true",
"--controller-start-interval=2m",
"--controllers=foo,bar",
"--http2-max-streams-per-connection=47",
"--kube-api-burst=100",
"--kube-api-content-type=application/vnd.kubernetes.protobuf",
"--kube-api-qps=50.0",
"--kubeconfig=/kubeconfig",
"--leader-elect=false",
"--leader-elect-lease-duration=30s",
"--leader-elect-renew-deadline=15s",
"--leader-elect-resource-lock=configmap",
"--leader-elect-retry-period=5s",
"--master=192.168.4.20",
"--min-resync-period=100m",
"--node-status-update-frequency=10m",
"--port=10000",
"--profiling=false",
"--route-reconciliation-period=30s",
"--secure-port=10001",
"--use-service-account-credentials=false",
}
fs.Parse(args)
expected := &CloudControllerManagerOptions{
Generic: &cmoptions.GenericControllerManagerConfigurationOptions{
GenericControllerManagerConfiguration: &cmconfig.GenericControllerManagerConfiguration{
Port: DefaultInsecureCloudControllerManagerPort, // Note: InsecureServingOptions.ApplyTo will write the flag value back into the component config
Address: "0.0.0.0", // Note: InsecureServingOptions.ApplyTo will write the flag value back into the component config
MinResyncPeriod: metav1.Duration{Duration: 100 * time.Minute},
ClientConnection: componentbaseconfig.ClientConnectionConfiguration{
ContentType: "application/vnd.kubernetes.protobuf",
QPS: 50.0,
Burst: 100,
},
ControllerStartInterval: metav1.Duration{Duration: 2 * time.Minute},
LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
ResourceLock: "configmap",
LeaderElect: false,
LeaseDuration: metav1.Duration{Duration: 30 * time.Second},
RenewDeadline: metav1.Duration{Duration: 15 * time.Second},
RetryPeriod: metav1.Duration{Duration: 5 * time.Second},
ResourceName: "cloud-controller-manager",
ResourceNamespace: "kube-system",
},
Controllers: []string{"foo", "bar"},
},
Debugging: &cmoptions.DebuggingOptions{
DebuggingConfiguration: &componentbaseconfig.DebuggingConfiguration{
EnableProfiling: false,
EnableContentionProfiling: true,
},
},
},
KubeCloudShared: &cpoptions.KubeCloudSharedOptions{
KubeCloudSharedConfiguration: &cpconfig.KubeCloudSharedConfiguration{
RouteReconciliationPeriod: metav1.Duration{Duration: 30 * time.Second},
NodeMonitorPeriod: metav1.Duration{Duration: 5 * time.Second},
ClusterName: "k8s",
ClusterCIDR: "1.2.3.4/24",
AllocateNodeCIDRs: true,
CIDRAllocatorType: "RangeAllocator",
ConfigureCloudRoutes: false,
},
CloudProvider: &cpoptions.CloudProviderOptions{
CloudProviderConfiguration: &cpconfig.CloudProviderConfiguration{
Name: "gce",
CloudConfigFile: "/cloud-config",
},
},
},
ServiceController: &cpoptions.ServiceControllerOptions{
ServiceControllerConfiguration: &serviceconfig.ServiceControllerConfiguration{
ConcurrentServiceSyncs: 1,
},
},
SecureServing: (&apiserveroptions.SecureServingOptions{
BindPort: 10001,
BindAddress: net.ParseIP("192.168.4.21"),
ServerCert: apiserveroptions.GeneratableKeyCert{
CertDirectory: "/a/b/c",
PairName: "cloud-controller-manager",
},
HTTP2MaxStreamsPerConnection: 47,
}).WithLoopback(),
InsecureServing: (&apiserveroptions.DeprecatedInsecureServingOptions{
BindAddress: net.ParseIP("192.168.4.10"),
BindPort: int(10000),
BindNetwork: "tcp",
}).WithLoopback(),
Authentication: &apiserveroptions.DelegatingAuthenticationOptions{
CacheTTL: 10 * time.Second,
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{},
RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{
UsernameHeaders: []string{"x-remote-user"},
GroupHeaders: []string{"x-remote-group"},
ExtraHeaderPrefixes: []string{"x-remote-extra-"},
},
RemoteKubeConfigFileOptional: true,
},
Authorization: &apiserveroptions.DelegatingAuthorizationOptions{
AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second,
ClientTimeout: 10 * time.Second,
RemoteKubeConfigFileOptional: true,
AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or
},
Kubeconfig: "/kubeconfig",
Master: "192.168.4.20",
NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Minute},
}
if !reflect.DeepEqual(expected, s) {
t.Errorf("Got different run options than expected.\nDifference detected on:\n%s", diff.ObjectReflectDiff(expected, s))
}
}

View File

@@ -1,31 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_library(
name = "go_default_library",
srcs = ["testserver.go"],
importpath = "k8s.io/kubernetes/cmd/cloud-controller-manager/app/testing",
visibility = ["//visibility:public"],
deps = [
"//cmd/cloud-controller-manager/app:go_default_library",
"//cmd/cloud-controller-manager/app/config:go_default_library",
"//cmd/cloud-controller-manager/app/options:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
],
)

View File

@@ -1,183 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"context"
"fmt"
"io/ioutil"
"net"
"os"
"time"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/cmd/cloud-controller-manager/app"
cloudcontrollerconfig "k8s.io/kubernetes/cmd/cloud-controller-manager/app/config"
"k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
)
// TearDownFunc is to be called to tear down a test server.
type TearDownFunc func()
// TestServer return values supplied by kube-test-ApiServer
type TestServer struct {
LoopbackClientConfig *restclient.Config // Rest client config using the magic token
Options *options.CloudControllerManagerOptions
Config *cloudcontrollerconfig.Config
TearDownFn TearDownFunc // TearDown function
TmpDir string // Temp Dir used, by the apiserver
}
// Logger allows t.Testing and b.Testing to be passed to StartTestServer and StartTestServerOrDie
type Logger interface {
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
Logf(format string, args ...interface{})
}
// StartTestServer starts a cloud-controller-manager. A rest client config and a tear-down func,
// and location of the tmpdir are returned.
//
// Note: we return a tear-down func instead of a stop channel because the later will leak temporary
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
// enough time to remove temporary files.
func StartTestServer(t Logger, customFlags []string) (result TestServer, err error) {
stopCh := make(chan struct{})
tearDown := func() {
close(stopCh)
if len(result.TmpDir) != 0 {
os.RemoveAll(result.TmpDir)
}
}
defer func() {
if result.TearDownFn == nil {
tearDown()
}
}()
result.TmpDir, err = ioutil.TempDir("", "cloud-controller-manager")
if err != nil {
return result, fmt.Errorf("failed to create temp dir: %v", err)
}
fs := pflag.NewFlagSet("test", pflag.PanicOnError)
s, err := options.NewCloudControllerManagerOptions()
if err != nil {
return TestServer{}, err
}
all, disabled := app.KnownControllers(), app.ControllersDisabledByDefault.List()
namedFlagSets := s.Flags(all, disabled)
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
fs.Parse(customFlags)
if s.SecureServing.BindPort != 0 {
s.SecureServing.Listener, s.SecureServing.BindPort, err = createListenerOnFreePort()
if err != nil {
return result, fmt.Errorf("failed to create listener: %v", err)
}
s.SecureServing.ServerCert.CertDirectory = result.TmpDir
t.Logf("cloud-controller-manager will listen securely on port %d...", s.SecureServing.BindPort)
}
if s.InsecureServing.BindPort != 0 {
s.InsecureServing.Listener, s.InsecureServing.BindPort, err = createListenerOnFreePort()
if err != nil {
return result, fmt.Errorf("failed to create listener: %v", err)
}
t.Logf("cloud-controller-manager will listen insecurely on port %d...", s.InsecureServing.BindPort)
}
config, err := s.Config(all, disabled)
if err != nil {
return result, fmt.Errorf("failed to create config from options: %v", err)
}
errCh := make(chan error)
go func(stopCh <-chan struct{}) {
if err := app.Run(config.Complete(), stopCh); err != nil {
errCh <- err
}
}(stopCh)
t.Logf("Waiting for /healthz to be ok...")
client, err := kubernetes.NewForConfig(config.LoopbackClientConfig)
if err != nil {
return result, fmt.Errorf("failed to create a client: %v", err)
}
err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
}
result := client.CoreV1().RESTClient().Get().AbsPath("/healthz").Do(context.TODO())
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
}
return false, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
}
// from here the caller must call tearDown
result.LoopbackClientConfig = config.LoopbackClientConfig
result.Options = s
result.Config = config
result.TearDownFn = tearDown
return result, nil
}
// StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed.
func StartTestServerOrDie(t Logger, flags []string) *TestServer {
result, err := StartTestServer(t, flags)
if err == nil {
return &result
}
t.Fatalf("failed to launch server: %v", err)
return nil
}
func createListenerOnFreePort() (net.Listener, int, error) {
ln, err := net.Listen("tcp", ":0")
if err != nil {
return nil, 0, err
}
// get port
tcpAddr, ok := ln.Addr().(*net.TCPAddr)
if !ok {
ln.Close()
return nil, 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String())
}
return ln, tcpAddr.Port, nil
}

View File

@@ -24,9 +24,8 @@ import (
"os"
"time"
"k8s.io/cloud-provider/app"
"k8s.io/component-base/logs"
"k8s.io/kubernetes/cmd/cloud-controller-manager/app"
_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
)