add cloud-controller-manager as the first step in breaking controller-manager

This commit is contained in:
wlan0 2016-10-07 17:39:21 -07:00
parent 731616e0b2
commit 1e48fd18cb
13 changed files with 572 additions and 57 deletions

View File

@ -0,0 +1,31 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_binary(
name = "cloud-controller-manager",
srcs = ["controller-manager.go"],
tags = ["automanaged"],
deps = [
"//cmd/cloud-controller-manager/app:go_default_library",
"//cmd/cloud-controller-manager/app/options:go_default_library",
"//pkg/client/metrics/prometheus:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers:go_default_library",
"//pkg/healthz:go_default_library",
"//pkg/util/flag:go_default_library",
"//pkg/util/logs:go_default_library",
"//pkg/version/prometheus:go_default_library",
"//pkg/version/verflag:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/spf13/pflag",
],
)

View File

@ -0,0 +1,41 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = ["controllermanager.go"],
tags = ["automanaged"],
deps = [
"//cmd/cloud-controller-manager/app/options:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/release_1_5:go_default_library",
"//pkg/client/clientset_generated/release_1_5/typed/core/v1:go_default_library",
"//pkg/client/leaderelection:go_default_library",
"//pkg/client/leaderelection/resourcelock:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/client/restclient:go_default_library",
"//pkg/client/unversioned/clientcmd:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/cloud:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/controller/route:go_default_library",
"//pkg/controller/service:go_default_library",
"//pkg/healthz:go_default_library",
"//pkg/util/configz:go_default_library",
"//pkg/util/wait:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/prometheus/client_golang/prometheus",
"//vendor:github.com/spf13/cobra",
"//vendor:github.com/spf13/pflag",
],
)

View File

@ -0,0 +1,253 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a server that runs a set of active
// components. This includes replication controllers, service endpoints and
// nodes.
//
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/pkg/controllermanager/controllermanager.go
package app
import (
"math/rand"
"net"
"net/http"
"net/http/pprof"
"os"
"strconv"
"time"
"k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
"k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
nodecontroller "k8s.io/kubernetes/pkg/controller/cloud"
"k8s.io/kubernetes/pkg/controller/informers"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)
const (
// Jitter used when starting controller managers
ControllerStartJitter = 1.0
)
// NewControllerManagerCommand creates a *cobra.Command object with default parameters
func NewControllerManagerCommand() *cobra.Command {
s := options.NewExternalCMServer()
s.AddFlags(pflag.CommandLine)
cmd := &cobra.Command{
Use: "cloud-controller-manager",
Long: `The Kubernetes controller manager is a daemon that embeds
the core control loops shipped with Kubernetes. In applications of robotics and
automation, a control loop is a non-terminating loop that regulates the state of
the system. In Kubernetes, a controller is a control loop that watches the shared
state of the cluster through the apiserver and makes changes attempting to move the
current state towards the desired state. Examples of controllers that ship with
Kubernetes today are the replication controller, endpoints controller, namespace
controller, and serviceaccounts controller.`,
Run: func(cmd *cobra.Command, args []string) {
},
}
return cmd
}
func ResyncPeriod(s *options.ExternalCMServer) func() time.Duration {
return func() time.Duration {
factor := rand.Float64() + 1
return time.Duration(float64(s.MinResyncPeriod.Nanoseconds()) * factor)
}
}
// Run runs the ExternalCMServer. This should never exit.
func Run(s *options.ExternalCMServer, cloud cloudprovider.Interface) error {
if c, err := configz.New("componentconfig"); err == nil {
c.Set(s.KubeControllerManagerConfiguration)
} else {
glog.Errorf("unable to register configz: %s", err)
}
kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
if err != nil {
return err
}
kubeconfig.ContentConfig.ContentType = s.ContentType
// Override kubeconfig qps/burst settings from flags
kubeconfig.QPS = s.KubeAPIQPS
kubeconfig.Burst = int(s.KubeAPIBurst)
kubeClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "cloud-controller-manager"))
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election"))
go func() {
mux := http.NewServeMux()
healthz.InstallHandler(mux)
if s.EnableProfiling {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
}
configz.InstallHandler(mux)
mux.Handle("/metrics", prometheus.Handler())
server := &http.Server{
Addr: net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))),
Handler: mux,
}
glog.Fatal(server.ListenAndServe())
}()
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
recorder := eventBroadcaster.NewRecorder(v1.EventSource{Component: "cloud-controller-manager"})
run := func(stop <-chan struct{}) {
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: kubeconfig,
}
var clientBuilder controller.ControllerClientBuilder
if len(s.ServiceAccountKeyFile) > 0 && s.UseServiceAccountCredentials {
clientBuilder = controller.SAControllerClientBuilder{
ClientConfig: restclient.AnonymousClientConfig(kubeconfig),
CoreClient: kubeClient.Core(),
Namespace: "kube-system",
}
} else {
clientBuilder = rootClientBuilder
}
err := StartControllers(s, kubeconfig, rootClientBuilder, clientBuilder, stop, recorder, cloud)
glog.Fatalf("error running controllers: %v", err)
panic("unreachable")
}
if !s.LeaderElection.LeaderElect {
run(nil)
panic("unreachable")
}
id, err := os.Hostname()
if err != nil {
return err
}
// TODO: enable other lock types
rl := resourcelock.EndpointsLock{
EndpointsMeta: v1.ObjectMeta{
Namespace: "kube-system",
Name: "cloud-controller-manager",
},
Client: leaderElectionClient,
LockConfig: resourcelock.ResourceLockConfig{
Identity: id + "-external-cloud-controller",
EventRecorder: recorder,
},
}
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: &rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
},
},
})
panic("unreachable")
}
func StartControllers(s *options.ExternalCMServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}, recorder record.EventRecorder, cloud cloudprovider.Interface) error {
client := func(serviceAccountName string) clientset.Interface {
return rootClientBuilder.ClientOrDie(serviceAccountName)
}
sharedInformers := informers.NewSharedInformerFactory(client("shared-informers"), nil, ResyncPeriod(s)())
_, clusterCIDR, err := net.ParseCIDR(s.ClusterCIDR)
if err != nil {
glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", s.ClusterCIDR, err)
}
nodeController, err := nodecontroller.NewCloudNodeController(
sharedInformers.Nodes(),
client("cloud-node-controller"), cloud,
s.NodeMonitorPeriod.Duration)
if err != nil {
glog.Fatalf("Failed to initialize nodecontroller: %v", err)
}
nodeController.Run()
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
serviceController, err := servicecontroller.New(cloud, client("service-controller"), s.ClusterName)
if err != nil {
glog.Errorf("Failed to start service controller: %v", err)
} else {
serviceController.Run(int(s.ConcurrentServiceSyncs))
}
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes {
if routes, ok := cloud.Routes(); !ok {
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
} else {
routeController := routecontroller.New(routes, client("route-controller"), s.ClusterName, clusterCIDR)
routeController.Run(s.RouteReconciliationPeriod.Duration)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
} else {
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes)
}
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
var versionStrings []string
err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
if versionStrings, err = restclient.ServerAPIVersions(kubeconfig); err == nil {
return true, nil
}
glog.Errorf("Failed to get api versions from server: %v", err)
return false, nil
})
if err != nil {
glog.Fatalf("Failed to get api versions from server: %v", err)
}
sharedInformers.Start(stop)
select {}
}

View File

@ -0,0 +1,25 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = ["options.go"],
tags = ["automanaged"],
deps = [
"//pkg/apis/componentconfig:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/client/leaderelection:go_default_library",
"//pkg/master/ports:go_default_library",
"//pkg/util/config:go_default_library",
"//vendor:github.com/spf13/pflag",
],
)

View File

@ -0,0 +1,95 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package options provides the flags used for the controller manager.
//
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/pkg/controllermanager/controllermanager.go
package options
import (
"time"
"k8s.io/kubernetes/pkg/apis/componentconfig"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/util/config"
"github.com/spf13/pflag"
)
// ExternalCMServer is the main context object for the controller manager.
type ExternalCMServer struct {
componentconfig.KubeControllerManagerConfiguration
Master string
Kubeconfig string
}
// NewCMServer creates a new ExternalCMServer with a default config.
func NewExternalCMServer() *ExternalCMServer {
s := ExternalCMServer{
KubeControllerManagerConfiguration: componentconfig.KubeControllerManagerConfiguration{
Port: ports.ControllerManagerPort,
Address: "0.0.0.0",
ConcurrentServiceSyncs: 1,
MinResyncPeriod: metav1.Duration{Duration: 12 * time.Hour},
RegisterRetryCount: 10,
NodeMonitorPeriod: metav1.Duration{Duration: 5 * time.Second},
ClusterName: "kubernetes",
ConfigureCloudRoutes: true,
ContentType: "application/vnd.kubernetes.protobuf",
KubeAPIQPS: 20.0,
KubeAPIBurst: 30,
LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(),
ControllerStartInterval: metav1.Duration{Duration: 0 * time.Second},
EnableGarbageCollector: true,
ConcurrentGCSyncs: 20,
ClusterSigningCertFile: "/etc/kubernetes/ca/ca.pem",
ClusterSigningKeyFile: "/etc/kubernetes/ca/ca.key",
},
}
s.LeaderElection.LeaderElect = true
return &s
}
// AddFlags adds flags for a specific ExternalCMServer to the specified FlagSet
func (s *ExternalCMServer) AddFlags(fs *pflag.FlagSet) {
fs.Int32Var(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on")
fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
fs.StringVar(&s.CloudProvider, "cloud-provider", s.CloudProvider, "The provider of cloud services. Empty for no provider.")
fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
fs.DurationVar(&s.MinResyncPeriod.Duration, "min-resync-period", s.MinResyncPeriod.Duration, "The resync period in reflectors will be random between MinResyncPeriod and 2*MinResyncPeriod")
fs.DurationVar(&s.NodeMonitorPeriod.Duration, "node-monitor-period", s.NodeMonitorPeriod.Duration,
"The period for syncing NodeStatus in NodeController.")
fs.StringVar(&s.ServiceAccountKeyFile, "service-account-private-key-file", s.ServiceAccountKeyFile, "Filename containing a PEM-encoded private RSA or ECDSA key used to sign service account tokens.")
fs.BoolVar(&s.UseServiceAccountCredentials, "use-service-account-credentials", s.UseServiceAccountCredentials, "If true, use individual service account credentials for each controller.")
fs.DurationVar(&s.RouteReconciliationPeriod.Duration, "route-reconciliation-period", s.RouteReconciliationPeriod.Duration, "The period for reconciling routes created for Nodes by cloud provider.")
fs.BoolVar(&s.ConfigureCloudRoutes, "configure-cloud-routes", true, "Should CIDRs allocated by allocate-node-cidrs be configured on the cloud provider.")
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
fs.StringVar(&s.ClusterCIDR, "cluster-cidr", s.ClusterCIDR, "CIDR Range for Pods in cluster.")
fs.BoolVar(&s.AllocateNodeCIDRs, "allocate-node-cidrs", false, "Should CIDRs for Pods be allocated and set on the cloud provider.")
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
fs.StringVar(&s.ContentType, "kube-api-content-type", s.ContentType, "Content type of requests sent to apiserver.")
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
fs.Int32Var(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
fs.DurationVar(&s.ControllerStartInterval.Duration, "controller-start-interval", s.ControllerStartInterval.Duration, "Interval between starting controller managers.")
leaderelection.BindFlags(&s.LeaderElection, fs)
config.DefaultFeatureGate.AddFlag(fs)
}

View File

@ -0,0 +1,64 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// The external controller manager is responsible for running controller loops that
// are cloud provider dependent. It uses the API to listen for new events on resources.
package main
import (
"fmt"
"os"
"k8s.io/kubernetes/cmd/cloud-controller-manager/app"
"k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
_ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration
"k8s.io/kubernetes/pkg/cloudprovider"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/util/logs"
_ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration
"k8s.io/kubernetes/pkg/version/verflag"
"github.com/golang/glog"
"github.com/spf13/pflag"
)
func init() {
healthz.DefaultHealthz()
}
func main() {
s := options.NewExternalCMServer()
s.AddFlags(pflag.CommandLine)
flag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()
verflag.PrintAndExitIfRequested()
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
glog.Fatalf("Cloud provider could not be initialized: %v", err)
}
if err := app.Run(s, cloud); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}

View File

@ -378,7 +378,9 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes {
if cloud == nil {
if s.CloudProvider == "external" {
glog.Warning("configure-cloud-routes is set, but external cloudprovider is specified. This manager will not configure cloud provider routes.")
} else if cloud == nil {
glog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
} else if routes, ok := cloud.Routes(); !ok {
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")

View File

@ -2,6 +2,7 @@ cluster/addons/fluentd-elasticsearch/es-image
cluster/images/etcd/attachlease
cluster/images/etcd/rollback
cmd/clicheck
cmd/cloud-controller-manager
cmd/gendocs
cmd/genkubedocs
cmd/genman

View File

@ -90,6 +90,11 @@ func InitCloudProvider(name string, configFilePath string) (Interface, error) {
var cloud Interface
var err error
if name == "external" {
glog.Info("cloud provider external specified.")
return nil, nil
}
if name == "" {
glog.Info("No cloud provider specified.")
return nil, nil

View File

@ -12,12 +12,13 @@ load(
go_library(
name = "go_default_library",
srcs = ["cloud_node_controller.go"],
srcs = ["nodecontroller.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/client/clientset_generated/release_1_5:go_default_library",
"//pkg/client/clientset_generated/release_1_5/typed/core/v1:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/controller/informers:go_default_library",
@ -30,19 +31,19 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["cloud_node_controller_test.go"],
srcs = ["nodecontroller_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/unversioned:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/client/clientset_generated/release_1_5/fake:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/controller/node:go_default_library",
"//pkg/controller/node/testutil:go_default_library",
"//pkg/util/wait:go_default_library",
"//vendor:github.com/golang/glog",
],

View File

@ -22,9 +22,10 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/api/v1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/informers"
@ -61,11 +62,11 @@ func NewCloudNodeController(
nodeMonitorPeriod time.Duration) (*CloudNodeController, error) {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "cloudcontrollermanager"})
recorder := eventBroadcaster.NewRecorder(v1.EventSource{Component: "cloudcontrollermanager"})
eventBroadcaster.StartLogging(glog.Infof)
if kubeClient != nil {
glog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
} else {
glog.V(0).Infof("No api server defined - no events will be sent to API server.")
}
@ -87,23 +88,23 @@ func (cnc *CloudNodeController) Run() {
defer utilruntime.HandleCrash()
go wait.Until(func() {
nodes, err := cnc.kubeClient.Core().Nodes().List(api.ListOptions{ResourceVersion: "0"})
nodes, err := cnc.kubeClient.Core().Nodes().List(v1.ListOptions{ResourceVersion: "0"})
if err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
for i := range nodes.Items {
var currentReadyCondition *api.NodeCondition
var currentReadyCondition *v1.NodeCondition
node := &nodes.Items[i]
// Try to get the current node status
// If node status is empty, then kubelet has not posted ready status yet. In this case, process next node
for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
_, currentReadyCondition = api.GetNodeCondition(&node.Status, api.NodeReady)
_, currentReadyCondition = v1.GetNodeCondition(&node.Status, v1.NodeReady)
if currentReadyCondition != nil {
break
}
name := node.Name
node, err = cnc.kubeClient.Core().Nodes().Get(name)
node, err = cnc.kubeClient.Core().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name)
break
@ -117,7 +118,7 @@ func (cnc *CloudNodeController) Run() {
// If the known node status says that Node is NotReady, then check if the node has been removed
// from the cloud provider. If node cannot be found in cloudprovider, then delete the node immediately
if currentReadyCondition != nil {
if currentReadyCondition.Status != api.ConditionTrue {
if currentReadyCondition.Status != v1.ConditionTrue {
instances, ok := cnc.cloud.Instances()
if !ok {
glog.Errorf("cloud provider does not support instances.")
@ -128,14 +129,14 @@ func (cnc *CloudNodeController) Run() {
if _, err := instances.ExternalID(types.NodeName(node.Name)); err != nil {
if err == cloudprovider.InstanceNotFound {
glog.V(2).Infof("Deleting node no longer present in cloud provider: %s", node.Name)
ref := &api.ObjectReference{
ref := &v1.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: types.UID(node.UID),
Namespace: "",
}
glog.V(2).Infof("Recording %s event message for node %s", "DeletingNode", node.Name)
cnc.recorder.Eventf(ref, api.EventTypeNormal, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name), "Node %s event: %s", node.Name, "DeletingNode")
cnc.recorder.Eventf(ref, v1.EventTypeNormal, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name), "Node %s event: %s", node.Name, "DeletingNode")
go func(nodeName string) {
defer utilruntime.HandleCrash()
if err := cnc.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil {

View File

@ -22,82 +22,82 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/cloudprovider"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/controller/node"
"k8s.io/kubernetes/pkg/controller/node/testutil"
"k8s.io/kubernetes/pkg/util/wait"
)
// This test checks that the node is deleted when kubelet stops reporting
// and cloud provider says node is gone
func TestNodeDeleted(t *testing.T) {
pod0 := &api.Pod{
ObjectMeta: api.ObjectMeta{
pod0 := &v1.Pod{
ObjectMeta: v1.ObjectMeta{
Namespace: "default",
Name: "pod0",
},
Spec: api.PodSpec{
Spec: v1.PodSpec{
NodeName: "node0",
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: api.PodReady,
Status: api.ConditionTrue,
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
},
},
}
pod1 := &api.Pod{
ObjectMeta: api.ObjectMeta{
pod1 := &v1.Pod{
ObjectMeta: v1.ObjectMeta{
Namespace: "default",
Name: "pod1",
},
Spec: api.PodSpec{
Spec: v1.PodSpec{
NodeName: "node0",
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: api.PodReady,
Status: api.ConditionTrue,
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
},
},
}
fnh := &node.FakeNodeHandler{
Existing: []*api.Node{
fnh := &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: api.ObjectMeta{
ObjectMeta: v1.ObjectMeta{
Name: "node0",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionUnknown,
LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*pod0, *pod1}}),
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*pod0, *pod1}}),
DeleteWaitChan: make(chan struct{}),
}
factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc())
factory := informers.NewSharedInformerFactory(fnh, nil, controller.NoResyncPeriodFunc())
eventBroadcaster := record.NewBroadcaster()
cloudNodeController := &CloudNodeController{
@ -105,7 +105,7 @@ func TestNodeDeleted(t *testing.T) {
nodeInformer: factory.Nodes(),
cloud: &fakecloud.FakeCloud{Err: cloudprovider.InstanceNotFound},
nodeMonitorPeriod: 5 * time.Second,
recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"}),
recorder: eventBroadcaster.NewRecorder(v1.EventSource{Component: "controllermanager"}),
}
eventBroadcaster.StartLogging(glog.Infof)

View File

@ -1261,11 +1261,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
},
},
},
<<<<<<< HEAD
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0"), *testutil.NewPod("pod1", "node0")}}),
=======
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}),
>>>>>>> start breaking up controller manager into two pieces
DeleteWaitChan: make(chan struct{}),
}
nodeController, _ := NewNodeControllerFromClient(nil, fnh, 10*time.Minute,