Adding config extension to CCM.

This commit is contained in:
cici37 2020-08-03 03:27:45 -07:00
parent 4b65f70652
commit 895a0a8d5e
14 changed files with 732 additions and 60 deletions

View File

@ -1,4 +1,37 @@
rules:
- selectorRegexp: k8s[.]io/kubernetes
allowedPrefixes:
- k8s.io/kubernetes/cmd/cloud-controller-manager
- k8s.io/kubernetes/pkg/api/legacyscheme
- k8s.io/kubernetes/pkg/api/service
- k8s.io/kubernetes/pkg/api/v1/pod
- k8s.io/kubernetes/pkg/apis/apps
- k8s.io/kubernetes/pkg/apis/autoscaling
- k8s.io/kubernetes/pkg/apis/core
- k8s.io/kubernetes/pkg/apis/core/helper
- k8s.io/kubernetes/pkg/apis/core/install
- k8s.io/kubernetes/pkg/apis/core/pods
- k8s.io/kubernetes/pkg/apis/core/v1
- k8s.io/kubernetes/pkg/apis/core/v1/helper
- k8s.io/kubernetes/pkg/apis/core/validation
- k8s.io/kubernetes/pkg/apis/scheduling
- k8s.io/kubernetes/pkg/capabilities
- k8s.io/kubernetes/pkg/cluster/ports
- k8s.io/kubernetes/pkg/controller
- k8s.io/kubernetes/pkg/controller/nodeipam
- k8s.io/kubernetes/pkg/controller/nodeipam/config
- k8s.io/kubernetes/pkg/controller/nodeipam/ipam
- k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset
- k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync
- k8s.io/kubernetes/pkg/controller/nodeipam/ipam/test
- k8s.io/kubernetes/pkg/controller/testutil
- k8s.io/kubernetes/pkg/controller/util/node
- k8s.io/kubernetes/pkg/features
- k8s.io/kubernetes/pkg/fieldpath
- k8s.io/kubernetes/pkg/kubelet/types
- k8s.io/kubernetes/pkg/kubelet/util/format
- k8s.io/kubernetes/pkg/security/apparmor
- k8s.io/kubernetes/pkg/securitycontext
- k8s.io/kubernetes/pkg/util/hash
- k8s.io/kubernetes/pkg/util/node
- k8s.io/kubernetes/pkg/util/parsers
- k8s.io/kubernetes/pkg/util/taints

View File

@ -18,19 +18,32 @@ go_library(
name = "go_default_library",
srcs = [
"controller-manager.go",
"nodeipamcontroller.go",
"providers.go",
],
importpath = "k8s.io/kubernetes/cmd/cloud-controller-manager",
deps = [
"//pkg/controller/nodeipam:go_default_library",
"//pkg/controller/nodeipam/config:go_default_library",
"//pkg/controller/nodeipam/ipam:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/app:go_default_library",
"//staging/src/k8s.io/cloud-provider/app/config:go_default_library",
"//staging/src/k8s.io/cloud-provider/options:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
"//staging/src/k8s.io/component-base/logs:go_default_library",
"//staging/src/k8s.io/component-base/metrics/prometheus/clientgo:go_default_library",
"//staging/src/k8s.io/component-base/metrics/prometheus/version:go_default_library",
"//staging/src/k8s.io/controller-manager/app:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/aws:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/gce:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/openstack:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/vsphere:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/net:go_default_library",
],
)

View File

@ -17,32 +17,130 @@ limitations under the License.
// The external controller manager is responsible for running controller loops that
// are cloud provider dependent. It uses the API to listen to new events on resources.
// This file should be written by each cloud provider.
// The current file demonstrate how other cloud provider should leverage CCM and it uses fake parameters. Please modify for your own use.
package main
import (
"fmt"
"math/rand"
"net/http"
"os"
"time"
"github.com/spf13/pflag"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/cloud-provider/app"
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
"k8s.io/cloud-provider/options"
"k8s.io/component-base/cli/flag"
"k8s.io/component-base/logs"
_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/klog/v2"
nodeipamconfig "k8s.io/kubernetes/pkg/controller/nodeipam/config"
)
const (
// cloudProviderName shows an sample of using hard coded parameter
cloudProviderName = "SampleCloudProviderName"
// defaultNodeMaskCIDRIPv4 is default mask size for IPv4 node cidr
defaultNodeMaskCIDRIPv4 = 24
// defaultNodeMaskCIDRIPv6 is default mask size for IPv6 node cidr
defaultNodeMaskCIDRIPv6 = 64
)
func main() {
rand.Seed(time.Now().UnixNano())
command := app.NewCloudControllerManagerCommand()
// cloudProviderConfigFile shows an sample of parse config file from flag option
var flagset *pflag.FlagSet = pflag.NewFlagSet("flagSet", pflag.ContinueOnError)
var cloudProviderConfigFile *string = flagset.String("cloud-provider-configfile", "", "This is the sample input for cloud provider config file")
pflag.CommandLine.ParseErrorsWhitelist.UnknownFlags = true
_ = pflag.CommandLine.Parse(os.Args[1:])
// this is an example of allow-listing specific controller loops
controllerList := []string{"cloud-node", "cloud-node-lifecycle", "service", "route"}
s, err := options.NewCloudControllerManagerOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
c, err := s.Config(controllerList, app.ControllersDisabledByDefault.List())
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
cloud, err := cloudprovider.InitCloudProvider(cloudProviderName, *cloudProviderConfigFile)
if err != nil {
klog.Fatalf("Cloud provider could not be initialized: %v", err)
}
if cloud == nil {
klog.Fatalf("cloud provider is nil")
}
if !cloud.HasClusterID() {
if c.ComponentConfig.KubeCloudShared.AllowUntaggedCloud {
klog.Warning("detected a cluster without a ClusterID. A ClusterID will be required in the future. Please tag your cluster to avoid any future issues")
} else {
klog.Fatalf("no ClusterID found. A ClusterID is required for the cloud provider to function properly. This check can be bypassed by setting the allow-untagged-cloud option")
}
}
// Initialize the cloud provider with a reference to the clientBuilder
cloud.Initialize(c.ClientBuilder, make(chan struct{}))
// Set the informer on the user cloud object
if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
informerUserCloud.SetInformers(c.SharedInformers)
}
controllerInitializers := app.DefaultControllerInitializers(c.Complete(), cloud)
// Here is an example to remove the controller which is not needed.
// e.g. remove the cloud-node-lifecycle controller which current cloud provider does not need.
//delete(controllerInitializers, "cloud-node-lifecycle")
// Here is an example to add an controller(NodeIpamController) which will be used by cloud provider
// generate nodeipamconfig. Here is an sample code. Please pass the right parameter in your code.
// If you do not need additional controller, please ignore.
nodeipamconfig := nodeipamconfig.NodeIPAMControllerConfiguration{
ServiceCIDR: "sample",
SecondaryServiceCIDR: "sample",
NodeCIDRMaskSize: 11,
NodeCIDRMaskSizeIPv4: 11,
NodeCIDRMaskSizeIPv6: 111,
}
controllerInitializers["nodeipam"] = startNodeIpamControllerWrapper(c.Complete(), nodeipamconfig, cloud)
command := app.NewCloudControllerManagerCommand(s, c, controllerInitializers)
// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
// normalize func and add the go flag set by hand.
// Here is an sample
pflag.CommandLine.SetNormalizeFunc(flag.WordSepNormalizeFunc)
// utilflag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()
// the flags could be set before execute
command.Flags().VisitAll(func(flag *pflag.Flag) {
if flag.Name == "cloud-provider" {
flag.Value.Set("SampleCloudProviderFlagValue")
return
}
})
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
func startNodeIpamControllerWrapper(ccmconfig *cloudcontrollerconfig.CompletedConfig, nodeipamconfig nodeipamconfig.NodeIPAMControllerConfiguration, cloud cloudprovider.Interface) func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return startNodeIpamController(ccmconfig, nodeipamconfig, ctx, cloud)
}
}

View File

@ -0,0 +1,203 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file holds the code related with the sample nodeipamcontroller
// which demonstrates how cloud providers add external controllers to cloud-controller-manager
package main
import (
"errors"
"fmt"
"net"
"net/http"
"strings"
utilfeature "k8s.io/apiserver/pkg/util/feature"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/cloud-provider/app"
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/klog/v2"
nodeipamcontroller "k8s.io/kubernetes/pkg/controller/nodeipam"
nodeipamconfig "k8s.io/kubernetes/pkg/controller/nodeipam/config"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
netutils "k8s.io/utils/net"
)
func startNodeIpamController(ccmconfig *cloudcontrollerconfig.CompletedConfig, nodeipamconfig nodeipamconfig.NodeIPAMControllerConfiguration, ctx genericcontrollermanager.ControllerContext, cloud cloudprovider.Interface) (http.Handler, bool, error) {
var serviceCIDR *net.IPNet
var secondaryServiceCIDR *net.IPNet
// should we start nodeIPAM
if !ccmconfig.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
return nil, false, nil
}
// failure: bad cidrs in config
clusterCIDRs, dualStack, err := processCIDRs(ccmconfig.ComponentConfig.KubeCloudShared.ClusterCIDR)
if err != nil {
return nil, false, err
}
// failure: more than one cidr and dual stack is not enabled
if len(clusterCIDRs) > 1 && !utilfeature.DefaultFeatureGate.Enabled(app.IPv6DualStack) {
return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and dualstack feature is not enabled", len(clusterCIDRs))
}
// failure: more than one cidr but they are not configured as dual stack
if len(clusterCIDRs) > 1 && !dualStack {
return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and they are not configured as dual stack (at least one from each IPFamily", len(clusterCIDRs))
}
// failure: more than cidrs is not allowed even with dual stack
if len(clusterCIDRs) > 2 {
return nil, false, fmt.Errorf("len of clusters is:%v > more than max allowed of 2", len(clusterCIDRs))
}
// service cidr processing
if len(strings.TrimSpace(nodeipamconfig.ServiceCIDR)) != 0 {
_, serviceCIDR, err = net.ParseCIDR(nodeipamconfig.ServiceCIDR)
if err != nil {
klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", nodeipamconfig.ServiceCIDR, err)
}
}
if len(strings.TrimSpace(nodeipamconfig.SecondaryServiceCIDR)) != 0 {
_, secondaryServiceCIDR, err = net.ParseCIDR(nodeipamconfig.SecondaryServiceCIDR)
if err != nil {
klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", nodeipamconfig.SecondaryServiceCIDR, err)
}
}
// the following checks are triggered if both serviceCIDR and secondaryServiceCIDR are provided
if serviceCIDR != nil && secondaryServiceCIDR != nil {
// should have dual stack flag enabled
if !utilfeature.DefaultFeatureGate.Enabled(app.IPv6DualStack) {
return nil, false, fmt.Errorf("secondary service cidr is provided and IPv6DualStack feature is not enabled")
}
// should be dual stack (from different IPFamilies)
dualstackServiceCIDR, err := netutils.IsDualStackCIDRs([]*net.IPNet{serviceCIDR, secondaryServiceCIDR})
if err != nil {
return nil, false, fmt.Errorf("failed to perform dualstack check on serviceCIDR and secondaryServiceCIDR error:%v", err)
}
if !dualstackServiceCIDR {
return nil, false, fmt.Errorf("serviceCIDR and secondaryServiceCIDR are not dualstack (from different IPfamiles)")
}
}
var nodeCIDRMaskSizeIPv4, nodeCIDRMaskSizeIPv6 int
if utilfeature.DefaultFeatureGate.Enabled(app.IPv6DualStack) {
// only --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 supported with dual stack clusters.
// --node-cidr-mask-size flag is incompatible with dual stack clusters.
nodeCIDRMaskSizeIPv4, nodeCIDRMaskSizeIPv6, err = setNodeCIDRMaskSizesDualStack(nodeipamconfig)
} else {
// only --node-cidr-mask-size supported with single stack clusters.
// --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 flags are incompatible with dual stack clusters.
nodeCIDRMaskSizeIPv4, nodeCIDRMaskSizeIPv6, err = setNodeCIDRMaskSizes(nodeipamconfig)
}
if err != nil {
return nil, false, err
}
// get list of node cidr mask sizes
nodeCIDRMaskSizes := getNodeCIDRMaskSizes(clusterCIDRs, nodeCIDRMaskSizeIPv4, nodeCIDRMaskSizeIPv6)
nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
ctx.InformerFactory.Core().V1().Nodes(),
cloud,
ctx.ClientBuilder.ClientOrDie("node-controller"),
clusterCIDRs,
serviceCIDR,
secondaryServiceCIDR,
nodeCIDRMaskSizes,
ipam.CIDRAllocatorType(ccmconfig.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
)
if err != nil {
return nil, true, err
}
go nodeIpamController.Run(ctx.Stop)
return nil, true, nil
}
// processCIDRs is a helper function that works on a comma separated cidrs and returns
// a list of typed cidrs
// a flag if cidrs represents a dual stack
// error if failed to parse any of the cidrs
func processCIDRs(cidrsList string) ([]*net.IPNet, bool, error) {
cidrsSplit := strings.Split(strings.TrimSpace(cidrsList), ",")
cidrs, err := netutils.ParseCIDRs(cidrsSplit)
if err != nil {
return nil, false, err
}
// if cidrs has an error then the previous call will fail
// safe to ignore error checking on next call
dualstack, _ := netutils.IsDualStackCIDRs(cidrs)
return cidrs, dualstack, nil
}
// setNodeCIDRMaskSizes returns the IPv4 and IPv6 node cidr mask sizes.
// If --node-cidr-mask-size not set, then it will return default IPv4 and IPv6 cidr mask sizes.
func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration) (int, int, error) {
ipv4Mask, ipv6Mask := defaultNodeMaskCIDRIPv4, defaultNodeMaskCIDRIPv6
// NodeCIDRMaskSizeIPv4 and NodeCIDRMaskSizeIPv6 can be used only for dual-stack clusters
if cfg.NodeCIDRMaskSizeIPv4 != 0 || cfg.NodeCIDRMaskSizeIPv6 != 0 {
return ipv4Mask, ipv6Mask, errors.New("usage of --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 are not allowed with non dual-stack clusters")
}
if cfg.NodeCIDRMaskSize != 0 {
ipv4Mask = int(cfg.NodeCIDRMaskSize)
ipv6Mask = int(cfg.NodeCIDRMaskSize)
}
return ipv4Mask, ipv6Mask, nil
}
// setNodeCIDRMaskSizesDualStack returns the IPv4 and IPv6 node cidr mask sizes to the value provided
// for --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 respectively. If value not provided,
// then it will return default IPv4 and IPv6 cidr mask sizes.
func setNodeCIDRMaskSizesDualStack(cfg nodeipamconfig.NodeIPAMControllerConfiguration) (int, int, error) {
ipv4Mask, ipv6Mask := defaultNodeMaskCIDRIPv4, defaultNodeMaskCIDRIPv6
// NodeCIDRMaskSize can be used only for single stack clusters
if cfg.NodeCIDRMaskSize != 0 {
return ipv4Mask, ipv6Mask, errors.New("usage of --node-cidr-mask-size is not allowed with dual-stack clusters")
}
if cfg.NodeCIDRMaskSizeIPv4 != 0 {
ipv4Mask = int(cfg.NodeCIDRMaskSizeIPv4)
}
if cfg.NodeCIDRMaskSizeIPv6 != 0 {
ipv6Mask = int(cfg.NodeCIDRMaskSizeIPv6)
}
return ipv4Mask, ipv6Mask, nil
}
// getNodeCIDRMaskSizes is a helper function that helps the generate the node cidr mask
// sizes slice based on the cluster cidr slice
func getNodeCIDRMaskSizes(clusterCIDRs []*net.IPNet, maskSizeIPv4, maskSizeIPv6 int) []int {
nodeMaskCIDRs := make([]int, len(clusterCIDRs))
for idx, clusterCIDR := range clusterCIDRs {
if netutils.IsIPv6CIDR(clusterCIDR) {
nodeMaskCIDRs[idx] = maskSizeIPv6
} else {
nodeMaskCIDRs[idx] = maskSizeIPv4
}
}
return nodeMaskCIDRs
}

View File

@ -48,4 +48,5 @@ vendor/k8s.io/client-go/discovery
vendor/k8s.io/client-go/rest
vendor/k8s.io/client-go/rest/watch
vendor/k8s.io/client-go/transport
vendor/k8s.io/cloud-provider/sample
vendor/k8s.io/kubectl/pkg/cmd/scale

View File

@ -46,6 +46,7 @@ filegroup(
"//staging/src/k8s.io/cloud-provider/fake:all-srcs",
"//staging/src/k8s.io/cloud-provider/node:all-srcs",
"//staging/src/k8s.io/cloud-provider/options:all-srcs",
"//staging/src/k8s.io/cloud-provider/sample:all-srcs",
"//staging/src/k8s.io/cloud-provider/service/config:all-srcs",
"//staging/src/k8s.io/cloud-provider/service/helpers:all-srcs",
"//staging/src/k8s.io/cloud-provider/volume:all-srcs",

View File

@ -10,12 +10,19 @@ go_library(
importpath = "k8s.io/cloud-provider/app",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/metadata:go_default_library",
"//staging/src/k8s.io/client-go/metadata/metadatainformer:go_default_library",
"//staging/src/k8s.io/client-go/restmapper:go_default_library",
"//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library",
"//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
@ -32,6 +39,8 @@ go_library(
"//staging/src/k8s.io/component-base/version:go_default_library",
"//staging/src/k8s.io/component-base/version/verflag:go_default_library",
"//staging/src/k8s.io/controller-manager/app:go_default_library",
"//staging/src/k8s.io/controller-manager/pkg/clientbuilder:go_default_library",
"//staging/src/k8s.io/controller-manager/pkg/informerfactory:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/net:go_default_library",

View File

@ -20,14 +20,24 @@ import (
"context"
"flag"
"fmt"
"math/rand"
"net/http"
"os"
"time"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/informers"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/restmapper"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/cloud-provider/app/config"
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
"k8s.io/cloud-provider/options"
"k8s.io/controller-manager/pkg/clientbuilder"
"k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
@ -54,11 +64,7 @@ const (
)
// NewCloudControllerManagerCommand creates a *cobra.Command object with default parameters
func NewCloudControllerManagerCommand() *cobra.Command {
s, err := options.NewCloudControllerManagerOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
func NewCloudControllerManagerCommand(s *options.CloudControllerManagerOptions, c *cloudcontrollerconfig.Config, controllerInitializers map[string]InitFunc) *cobra.Command {
cmd := &cobra.Command{
Use: "cloud-controller-manager",
@ -68,13 +74,7 @@ the cloud specific control loops shipped with Kubernetes.`,
verflag.PrintAndExitIfRequested()
cliflag.PrintFlags(cmd.Flags())
c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
if err := Run(c.Complete(), wait.NeverStop); err != nil {
if err := Run(c.Complete(), controllerInitializers, wait.NeverStop); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
@ -91,7 +91,7 @@ the cloud specific control loops shipped with Kubernetes.`,
}
fs := cmd.Flags()
namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault.List())
namedFlagSets := s.Flags(KnownControllers(controllerInitializers), ControllersDisabledByDefault.List())
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
@ -122,26 +122,10 @@ the cloud specific control loops shipped with Kubernetes.`,
}
// Run runs the ExternalCMServer. This should never exit.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
func Run(c *cloudcontrollerconfig.CompletedConfig, controllerInitializers map[string]InitFunc, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
cloud, err := cloudprovider.InitCloudProvider(c.ComponentConfig.KubeCloudShared.CloudProvider.Name, c.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile)
if err != nil {
klog.Fatalf("Cloud provider could not be initialized: %v", err)
}
if cloud == nil {
klog.Fatalf("cloud provider is nil")
}
if !cloud.HasClusterID() {
if c.ComponentConfig.KubeCloudShared.AllowUntaggedCloud {
klog.Warning("detected a cluster without a ClusterID. A ClusterID will be required in the future. Please tag your cluster to avoid any future issues")
} else {
klog.Fatalf("no ClusterID found. A ClusterID is required for the cloud provider to function properly. This check can be bypassed by setting the allow-untagged-cloud option")
}
}
// setup /configz endpoint
if cz, err := configz.New(ConfigzName); err == nil {
cz.Set(c.ComponentConfig)
@ -176,7 +160,14 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
}
run := func(ctx context.Context) {
if err := startControllers(c, ctx.Done(), cloud, newControllerInitializers()); err != nil {
clientBuilder := clientbuilder.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
}
controllerContext, err := CreateControllerContext(c, clientBuilder, ctx.Done())
if err != nil {
klog.Fatalf("error building controller context: %v", err)
}
if err := startControllers(controllerContext, c, ctx.Done(), controllerInitializers); err != nil {
klog.Fatalf("error running controllers: %v", err)
}
}
@ -227,14 +218,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
}
// startControllers starts the cloud specific controller loops.
func startControllers(c *config.CompletedConfig, stopCh <-chan struct{}, cloud cloudprovider.Interface, controllers map[string]initFunc) error {
// Initialize the cloud provider with a reference to the clientBuilder
cloud.Initialize(c.ClientBuilder, stopCh)
// Set the informer on the user cloud object
if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
informerUserCloud.SetInformers(c.SharedInformers)
}
func startControllers(ctx genericcontrollermanager.ControllerContext, c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}, controllers map[string]InitFunc) error {
for controllerName, initFn := range controllers {
if !genericcontrollermanager.IsControllerEnabled(controllerName, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) {
klog.Warningf("%q is disabled", controllerName)
@ -242,7 +226,7 @@ func startControllers(c *config.CompletedConfig, stopCh <-chan struct{}, cloud c
}
klog.V(1).Infof("Starting %q", controllerName)
_, started, err := initFn(c, cloud, stopCh)
_, started, err := initFn(ctx)
if err != nil {
klog.Errorf("Error starting %q", controllerName)
return err
@ -267,27 +251,136 @@ func startControllers(c *config.CompletedConfig, stopCh <-chan struct{}, cloud c
select {}
}
// initFunc is used to launch a particular controller. It may run additional "should I activate checks".
// InitFunc is used to launch a particular controller. It may run additional "should I activate checks".
// Any error returned will cause the controller process to `Fatal`
// The bool indicates whether the controller was enabled.
type initFunc func(ctx *config.CompletedConfig, cloud cloudprovider.Interface, stop <-chan struct{}) (debuggingHandler http.Handler, enabled bool, err error)
type InitFunc func(ctx genericcontrollermanager.ControllerContext) (debuggingHandler http.Handler, enabled bool, err error)
// KnownControllers indicate the default controller we are known.
func KnownControllers() []string {
ret := sets.StringKeySet(newControllerInitializers())
func KnownControllers(controllerInitializers map[string]InitFunc) []string {
ret := sets.StringKeySet(controllerInitializers)
return ret.List()
}
// ControllersDisabledByDefault is the controller disabled default when starting cloud-controller managers.
var ControllersDisabledByDefault = sets.NewString()
// newControllerInitializers is a private map of named controller groups (you can start more than one in an init func)
// paired to their initFunc. This allows for structured downstream composition and subdivision.
func newControllerInitializers() map[string]initFunc {
controllers := map[string]initFunc{}
controllers["cloud-node"] = startCloudNodeController
controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
controllers["service"] = startServiceController
controllers["route"] = startRouteController
// DefaultControllerInitializers is a private map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
func DefaultControllerInitializers(completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["cloud-node"] = StartCloudNodeControllerWrapper(completedConfig, cloud)
controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleControllerWrapper(completedConfig, cloud)
controllers["service"] = startServiceControllerWrapper(completedConfig, cloud)
controllers["route"] = startRouteControllerWrapper(completedConfig, cloud)
return controllers
}
// StartCloudNodeControllerWrapper is used to take cloud cofig as input and start cloud node controller
func StartCloudNodeControllerWrapper(completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return startCloudNodeController(completedConfig, cloud, ctx.Stop)
}
}
// startCloudNodeLifecycleControllerWrapper is used to take cloud cofig as input and start cloud node lifecycle controller
func startCloudNodeLifecycleControllerWrapper(completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return startCloudNodeLifecycleController(completedConfig, cloud, ctx.Stop)
}
}
// startServiceControllerWrapper is used to take cloud cofig as input and start service controller
func startServiceControllerWrapper(completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return startServiceController(completedConfig, cloud, ctx.Stop)
}
}
// startRouteControllerWrapper is used to take cloud cofig as input and start route controller
func startRouteControllerWrapper(completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
return startRouteController(completedConfig, cloud, ctx.Stop)
}
}
// CreateControllerContext creates a context struct containing references to resources needed by the
// controllers such as the cloud provider and clientBuilder. rootClientBuilder is only used for
// the shared-informers client and token controller.
func CreateControllerContext(s *cloudcontrollerconfig.CompletedConfig, clientBuilder clientbuilder.ControllerClientBuilder, stop <-chan struct{}) (genericcontrollermanager.ControllerContext, error) {
versionedClient := clientBuilder.ClientOrDie("shared-informers")
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
metadataClient := metadata.NewForConfigOrDie(clientBuilder.ConfigOrDie("metadata-informers"))
metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, ResyncPeriod(s)())
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
return genericcontrollermanager.ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
}
// Use a discovery client capable of being refreshed.
discoveryClient := clientBuilder.ClientOrDie("controller-discovery")
cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient.Discovery())
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
go wait.Until(func() {
restMapper.Reset()
}, 30*time.Second, stop)
availableResources, err := GetAvailableResources(clientBuilder)
if err != nil {
return genericcontrollermanager.ControllerContext{}, err
}
ctx := genericcontrollermanager.ControllerContext{
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
ObjectOrMetadataInformerFactory: informerfactory.NewInformerFactory(sharedInformers, metadataInformers),
RESTMapper: restMapper,
AvailableResources: availableResources,
Stop: stop,
InformersStarted: make(chan struct{}),
ResyncPeriod: ResyncPeriod(s),
}
return ctx, nil
}
// GetAvailableResources gets the map which contains all available resources of the apiserver
// TODO: In general, any controller checking this needs to be dynamic so
// users don't have to restart their controller manager if they change the apiserver.
// Until we get there, the structure here needs to be exposed for the construction of a proper ControllerContext.
func GetAvailableResources(clientBuilder clientbuilder.ControllerClientBuilder) (map[schema.GroupVersionResource]bool, error) {
client := clientBuilder.ClientOrDie("controller-discovery")
discoveryClient := client.Discovery()
_, resourceMap, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to get all supported resources from server: %v", err))
}
if len(resourceMap) == 0 {
return nil, fmt.Errorf("unable to get any supported resources from server")
}
allResources := map[schema.GroupVersionResource]bool{}
for _, apiResourceList := range resourceMap {
version, err := schema.ParseGroupVersion(apiResourceList.GroupVersion)
if err != nil {
return nil, err
}
for _, apiResource := range apiResourceList.APIResources {
allResources[version.WithResource(apiResource.Name)] = true
}
}
return allResources, nil
}
// ResyncPeriod returns a function which generates a duration each time it is
// invoked; this is so that multiple controllers don't get into lock-step and all
// hammer the apiserver with list requests simultaneously.
func ResyncPeriod(c *cloudcontrollerconfig.CompletedConfig) func() time.Duration {
return func() time.Duration {
factor := rand.Float64() + 1
return time.Duration(float64(c.ComponentConfig.Generic.MinResyncPeriod.Nanoseconds()) * factor)
}
}

View File

@ -24,6 +24,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/app:go_default_library",
"//staging/src/k8s.io/cloud-provider/app/config:go_default_library",
"//staging/src/k8s.io/cloud-provider/options:go_default_library",

View File

@ -25,6 +25,7 @@ import (
"time"
"github.com/spf13/pflag"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/cloud-provider/app"
"k8s.io/cloud-provider/app/config"
"k8s.io/cloud-provider/options"
@ -84,13 +85,11 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
if err != nil {
return TestServer{}, err
}
all, disabled := app.KnownControllers(), app.ControllersDisabledByDefault.List()
namedFlagSets := s.Flags(all, disabled)
namedFlagSets := s.Flags([]string{}, []string{})
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
fs.Parse(customFlags)
if s.SecureServing.BindPort != 0 {
s.SecureServing.Listener, s.SecureServing.BindPort, err = createListenerOnFreePort()
if err != nil {
@ -110,14 +109,22 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
t.Logf("cloud-controller-manager will listen insecurely on port %d...", s.InsecureServing.BindPort)
}
config, err := s.Config(all, disabled)
config, err := s.Config([]string{}, []string{})
if err != nil {
return result, fmt.Errorf("failed to create config from options: %v", err)
}
cloudconfig := config.Complete().ComponentConfig.KubeCloudShared.CloudProvider
cloud, err := cloudprovider.InitCloudProvider(cloudconfig.Name, cloudconfig.CloudConfigFile)
if err != nil {
return result, fmt.Errorf("Cloud provider could not be initialized: %v", err)
}
if cloud == nil {
return result, fmt.Errorf("cloud provider is nil")
}
errCh := make(chan error)
go func(stopCh <-chan struct{}) {
if err := app.Run(config.Complete(), stopCh); err != nil {
if err := app.Run(config.Complete(), app.DefaultControllerInitializers(config.Complete(), cloud), stopCh); err != nil {
errCh <- err
}
}(stopCh)

View File

@ -0,0 +1,34 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["main.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/cloud-provider/sample",
importpath = "k8s.io/cloud-provider/sample",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/app:go_default_library",
"//staging/src/k8s.io/cloud-provider/options:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
"//staging/src/k8s.io/component-base/logs:go_default_library",
"//staging/src/k8s.io/component-base/metrics/prometheus/clientgo:go_default_library",
"//staging/src/k8s.io/component-base/metrics/prometheus/version:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,111 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file should be written by each cloud provider.
// This is an minimal example. For more details, please refer to k8s.io/kubernetes/cmd/cloud-controller-manager/controller-manager.go
package sample
import (
"fmt"
"math/rand"
"os"
"time"
"github.com/spf13/pflag"
"k8s.io/cloud-provider"
"k8s.io/cloud-provider/app"
"k8s.io/cloud-provider/options"
"k8s.io/component-base/cli/flag"
"k8s.io/component-base/logs"
_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
"k8s.io/klog/v2"
)
const (
// cloudProviderName shows an sample of using hard coded parameter
cloudProviderName = "SampleCloudProviderName"
)
func main() {
rand.Seed(time.Now().UnixNano())
// cloudProviderConfigFile shows an sample of parse config file from flag option
var flagset *pflag.FlagSet = pflag.NewFlagSet("flagSet", pflag.ContinueOnError)
var cloudProviderConfigFile *string = flagset.String("cloud-provider-configfile", "", "This is the sample input for cloud provider config file")
pflag.CommandLine.ParseErrorsWhitelist.UnknownFlags = true
_ = pflag.CommandLine.Parse(os.Args[1:])
// this is an example of allow-listing specific controller loops
controllerList := []string{"cloud-node", "cloud-node-lifecycle", "service", "route"}
s, err := options.NewCloudControllerManagerOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
c, err := s.Config(controllerList, app.ControllersDisabledByDefault.List())
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
cloud, err := cloudprovider.InitCloudProvider(cloudProviderName, *cloudProviderConfigFile)
if err != nil {
klog.Fatalf("Cloud provider could not be initialized: %v", err)
}
if cloud == nil {
klog.Fatalf("cloud provider is nil")
}
if !cloud.HasClusterID() {
if c.ComponentConfig.KubeCloudShared.AllowUntaggedCloud {
klog.Warning("detected a cluster without a ClusterID. A ClusterID will be required in the future. Please tag your cluster to avoid any future issues")
} else {
klog.Fatalf("no ClusterID found. A ClusterID is required for the cloud provider to function properly. This check can be bypassed by setting the allow-untagged-cloud option")
}
}
// Initialize the cloud provider with a reference to the clientBuilder
cloud.Initialize(c.ClientBuilder, make(chan struct{}))
// Set the informer on the user cloud object
if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
informerUserCloud.SetInformers(c.SharedInformers)
}
controllerInitializers := app.DefaultControllerInitializers(c.Complete(), cloud)
command := app.NewCloudControllerManagerCommand(s, c, controllerInitializers)
// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
// normalize func and add the go flag set by hand.
// Here is an sample
pflag.CommandLine.SetNormalizeFunc(flag.WordSepNormalizeFunc)
// utilflag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()
// the flags could be set before execute
command.Flags().VisitAll(func(flag *pflag.Flag) {
if flag.Name == "cloud-provider" {
flag.Value.Set("SampleCloudProviderFlagValue")
return
}
})
if err := command.Execute(); err != nil {
os.Exit(1)
}
}

View File

@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"controllercontext.go",
"helper.go",
"serve.go",
],
@ -10,6 +11,7 @@ go_library(
importpath = "k8s.io/controller-manager/app",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
@ -19,12 +21,16 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/routes:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/restmapper:go_default_library",
"//staging/src/k8s.io/component-base/config:go_default_library",
"//staging/src/k8s.io/component-base/configz:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
"//staging/src/k8s.io/component-base/metrics/prometheus/workqueue:go_default_library",
"//staging/src/k8s.io/controller-manager/pkg/clientbuilder:go_default_library",
"//staging/src/k8s.io/controller-manager/pkg/informerfactory:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)

View File

@ -0,0 +1,62 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
"k8s.io/client-go/restmapper"
"k8s.io/controller-manager/pkg/clientbuilder"
"k8s.io/controller-manager/pkg/informerfactory"
)
// ControllerContext defines the context object for controller
type ControllerContext struct {
// ClientBuilder will provide a client for this controller to use
ClientBuilder clientbuilder.ControllerClientBuilder
// InformerFactory gives access to informers for the controller.
InformerFactory informers.SharedInformerFactory
// ObjectOrMetadataInformerFactory gives access to informers for typed resources
// and dynamic resources by their metadata. All generic controllers currently use
// object metadata - if a future controller needs access to the full object this
// would become GenericInformerFactory and take a dynamic client.
ObjectOrMetadataInformerFactory informerfactory.InformerFactory
// DeferredDiscoveryRESTMapper is a RESTMapper that will defer
// initialization of the RESTMapper until the first mapping is
// requested.
RESTMapper *restmapper.DeferredDiscoveryRESTMapper
// AvailableResources is a map listing currently available resources
AvailableResources map[schema.GroupVersionResource]bool
// Stop is the stop channel
Stop <-chan struct{}
// InformersStarted is closed after all of the controllers have been initialized and are running. After this point it is safe,
// for an individual controller to start the shared informers. Before it is closed, they should not.
InformersStarted chan struct{}
// ResyncPeriod generates a duration each time it is invoked; this is so that
// multiple controllers don't get into lock-step and all hammer the apiserver
// with list requests simultaneously.
ResyncPeriod func() time.Duration
}