From 731616e0b2e35bb359339f06cd2d7c1967e0391f Mon Sep 17 00:00:00 2001 From: wlan0 Date: Wed, 23 Nov 2016 18:30:01 -0800 Subject: [PATCH 1/3] start breaking up controller manager into two pieces Addresses: kubernetes/features#88 This commit starts breaking the controller manager into two pieces, namely, 1. cloudprovider dependent piece 2. coudprovider agnostic piece the controller manager has the following control loops - - nodeController - volumeController - routeController - serviceController - replicationController - endpointController - resourcequotacontroller - namespacecontroller - deploymentController etc.. among the above controller loops, - nodeController - volumeController - routeController - serviceController are cloud provider dependent. As kubernetes has evolved tremendously, it has become difficult for different cloudproviders (currently 8), to make changes and iterate quickly. Moreover, the cloudproviders are constrained by the kubernetes build/release lifecycle. This commit is the first step in moving towards a kubernetes code base where cloud providers specific code will move out of the core repository, and will be maintained by the cloud providers themselves. Finally, along with the controller-manager, the kubelet also has cloud-provider specific code, and that will be addressed in a different commit/issue. --- pkg/controller/cloud/BUILD | 49 ++++++ pkg/controller/cloud/cloud_node_controller.go | 153 ++++++++++++++++++ .../cloud/cloud_node_controller_test.go | 122 ++++++++++++++ pkg/controller/node/nodecontroller_test.go | 4 + 4 files changed, 328 insertions(+) create mode 100644 pkg/controller/cloud/BUILD create mode 100644 pkg/controller/cloud/cloud_node_controller.go create mode 100644 pkg/controller/cloud/cloud_node_controller_test.go diff --git a/pkg/controller/cloud/BUILD b/pkg/controller/cloud/BUILD new file mode 100644 index 00000000000..2fba1528357 --- /dev/null +++ b/pkg/controller/cloud/BUILD @@ -0,0 +1,49 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_binary", + "go_library", + "go_test", + "cgo_library", +) + +go_library( + name = "go_default_library", + srcs = ["cloud_node_controller.go"], + tags = ["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library", + "//pkg/client/record:go_default_library", + "//pkg/cloudprovider:go_default_library", + "//pkg/controller/informers:go_default_library", + "//pkg/types:go_default_library", + "//pkg/util/runtime:go_default_library", + "//pkg/util/wait:go_default_library", + "//vendor:github.com/golang/glog", + ], +) + +go_test( + name = "go_default_test", + srcs = ["cloud_node_controller_test.go"], + library = "go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//pkg/api/unversioned:go_default_library", + "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", + "//pkg/client/record:go_default_library", + "//pkg/cloudprovider:go_default_library", + "//pkg/cloudprovider/providers/fake:go_default_library", + "//pkg/controller:go_default_library", + "//pkg/controller/informers:go_default_library", + "//pkg/controller/node:go_default_library", + "//pkg/util/wait:go_default_library", + "//vendor:github.com/golang/glog", + ], +) diff --git a/pkg/controller/cloud/cloud_node_controller.go b/pkg/controller/cloud/cloud_node_controller.go new file mode 100644 index 00000000000..29d5edf0149 --- /dev/null +++ b/pkg/controller/cloud/cloud_node_controller.go @@ -0,0 +1,153 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloud + +import ( + "fmt" + "time" + + "github.com/golang/glog" + + "k8s.io/kubernetes/pkg/api" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/controller/informers" + "k8s.io/kubernetes/pkg/types" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" +) + +type CloudNodeController struct { + nodeInformer informers.NodeInformer + kubeClient clientset.Interface + recorder record.EventRecorder + + cloud cloudprovider.Interface + + // Value controlling NodeController monitoring period, i.e. how often does NodeController + // check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod + // set in controller-manager + nodeMonitorPeriod time.Duration +} + +const ( + // nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update. + nodeStatusUpdateRetry = 5 + + // The amount of time the nodecontroller should sleep between retrying NodeStatus updates + retrySleepTime = 20 * time.Millisecond +) + +func NewCloudNodeController( + nodeInformer informers.NodeInformer, + kubeClient clientset.Interface, + cloud cloudprovider.Interface, + nodeMonitorPeriod time.Duration) (*CloudNodeController, error) { + + eventBroadcaster := record.NewBroadcaster() + recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "cloudcontrollermanager"}) + eventBroadcaster.StartLogging(glog.Infof) + if kubeClient != nil { + glog.V(0).Infof("Sending events to api server.") + eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + } else { + glog.V(0).Infof("No api server defined - no events will be sent to API server.") + } + + cnc := &CloudNodeController{ + nodeInformer: nodeInformer, + kubeClient: kubeClient, + recorder: recorder, + cloud: cloud, + nodeMonitorPeriod: nodeMonitorPeriod, + } + return cnc, nil +} + +// This controller deletes a node if kubelet is not reporting +// and the node is gone from the cloud provider. +func (cnc *CloudNodeController) Run() { + go func() { + defer utilruntime.HandleCrash() + + go wait.Until(func() { + nodes, err := cnc.kubeClient.Core().Nodes().List(api.ListOptions{ResourceVersion: "0"}) + if err != nil { + glog.Errorf("Error monitoring node status: %v", err) + } + + for i := range nodes.Items { + var currentReadyCondition *api.NodeCondition + node := &nodes.Items[i] + // Try to get the current node status + // If node status is empty, then kubelet has not posted ready status yet. In this case, process next node + for rep := 0; rep < nodeStatusUpdateRetry; rep++ { + _, currentReadyCondition = api.GetNodeCondition(&node.Status, api.NodeReady) + if currentReadyCondition != nil { + break + } + name := node.Name + node, err = cnc.kubeClient.Core().Nodes().Get(name) + if err != nil { + glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name) + break + } + time.Sleep(retrySleepTime) + } + if currentReadyCondition == nil { + glog.Errorf("Update status of Node %v from CloudNodeController exceeds retry count.", node.Name) + continue + } + // If the known node status says that Node is NotReady, then check if the node has been removed + // from the cloud provider. If node cannot be found in cloudprovider, then delete the node immediately + if currentReadyCondition != nil { + if currentReadyCondition.Status != api.ConditionTrue { + instances, ok := cnc.cloud.Instances() + if !ok { + glog.Errorf("cloud provider does not support instances.") + continue + } + // Check with the cloud provider to see if the node still exists. If it + // doesn't, delete the node immediately. + if _, err := instances.ExternalID(types.NodeName(node.Name)); err != nil { + if err == cloudprovider.InstanceNotFound { + glog.V(2).Infof("Deleting node no longer present in cloud provider: %s", node.Name) + ref := &api.ObjectReference{ + Kind: "Node", + Name: node.Name, + UID: types.UID(node.UID), + Namespace: "", + } + glog.V(2).Infof("Recording %s event message for node %s", "DeletingNode", node.Name) + cnc.recorder.Eventf(ref, api.EventTypeNormal, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name), "Node %s event: %s", node.Name, "DeletingNode") + go func(nodeName string) { + defer utilruntime.HandleCrash() + if err := cnc.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil { + glog.Errorf("unable to delete node %q: %v", node.Name, err) + } + }(node.Name) + } + glog.Errorf("Error getting node data from cloud: %v", err) + } + } + } + } + }, cnc.nodeMonitorPeriod, wait.NeverStop) + }() +} diff --git a/pkg/controller/cloud/cloud_node_controller_test.go b/pkg/controller/cloud/cloud_node_controller_test.go new file mode 100644 index 00000000000..c3771dd8b5c --- /dev/null +++ b/pkg/controller/cloud/cloud_node_controller_test.go @@ -0,0 +1,122 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloud + +import ( + "testing" + "time" + + "github.com/golang/glog" + + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/cloudprovider" + fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/informers" + "k8s.io/kubernetes/pkg/controller/node" + "k8s.io/kubernetes/pkg/util/wait" +) + +// This test checks that the node is deleted when kubelet stops reporting +// and cloud provider says node is gone +func TestNodeDeleted(t *testing.T) { + pod0 := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Namespace: "default", + Name: "pod0", + }, + Spec: api.PodSpec{ + NodeName: "node0", + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Type: api.PodReady, + Status: api.ConditionTrue, + }, + }, + }, + } + + pod1 := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Namespace: "default", + Name: "pod1", + }, + Spec: api.PodSpec{ + NodeName: "node0", + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Type: api.PodReady, + Status: api.ConditionTrue, + }, + }, + }, + } + + fnh := &node.FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionUnknown, + LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*pod0, *pod1}}), + DeleteWaitChan: make(chan struct{}), + } + + factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) + + eventBroadcaster := record.NewBroadcaster() + cloudNodeController := &CloudNodeController{ + kubeClient: fnh, + nodeInformer: factory.Nodes(), + cloud: &fakecloud.FakeCloud{Err: cloudprovider.InstanceNotFound}, + nodeMonitorPeriod: 5 * time.Second, + recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"}), + } + eventBroadcaster.StartLogging(glog.Infof) + + cloudNodeController.Run() + + select { + case <-fnh.DeleteWaitChan: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Timed out waiting %v for node to be deleted", wait.ForeverTestTimeout) + } + if len(fnh.DeletedNodes) != 1 || fnh.DeletedNodes[0].Name != "node0" { + t.Errorf("Node was not deleted") + } +} diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index 012ade79b1c..bf8b2e25866 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -1261,7 +1261,11 @@ func TestCloudProviderNoRateLimit(t *testing.T) { }, }, }, +<<<<<<< HEAD Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0"), *testutil.NewPod("pod1", "node0")}}), +======= + Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}), +>>>>>>> start breaking up controller manager into two pieces DeleteWaitChan: make(chan struct{}), } nodeController, _ := NewNodeControllerFromClient(nil, fnh, 10*time.Minute, From 1e48fd18cbc57a3c9b95d6e97542b21972b65cda Mon Sep 17 00:00:00 2001 From: wlan0 Date: Fri, 7 Oct 2016 17:39:21 -0700 Subject: [PATCH 2/3] add cloud-controller-manager as the first step in breaking controller-manager --- cmd/cloud-controller-manager/BUILD | 31 +++ cmd/cloud-controller-manager/app/BUILD | 41 +++ .../app/controllermanager.go | 253 ++++++++++++++++++ .../app/options/BUILD | 25 ++ .../app/options/options.go | 95 +++++++ .../controller-manager.go | 64 +++++ .../app/controllermanager.go | 4 +- hack/.linted_packages | 1 + pkg/cloudprovider/plugins.go | 5 + pkg/controller/cloud/BUILD | 19 +- ...d_node_controller.go => nodecontroller.go} | 25 +- ...troller_test.go => nodecontroller_test.go} | 62 ++--- pkg/controller/node/nodecontroller_test.go | 4 - 13 files changed, 572 insertions(+), 57 deletions(-) create mode 100644 cmd/cloud-controller-manager/BUILD create mode 100644 cmd/cloud-controller-manager/app/BUILD create mode 100644 cmd/cloud-controller-manager/app/controllermanager.go create mode 100644 cmd/cloud-controller-manager/app/options/BUILD create mode 100644 cmd/cloud-controller-manager/app/options/options.go create mode 100644 cmd/cloud-controller-manager/controller-manager.go rename pkg/controller/cloud/{cloud_node_controller.go => nodecontroller.go} (81%) rename pkg/controller/cloud/{cloud_node_controller_test.go => nodecontroller_test.go} (60%) diff --git a/cmd/cloud-controller-manager/BUILD b/cmd/cloud-controller-manager/BUILD new file mode 100644 index 00000000000..aac1159749a --- /dev/null +++ b/cmd/cloud-controller-manager/BUILD @@ -0,0 +1,31 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_binary", + "go_library", + "go_test", + "cgo_library", +) + +go_binary( + name = "cloud-controller-manager", + srcs = ["controller-manager.go"], + tags = ["automanaged"], + deps = [ + "//cmd/cloud-controller-manager/app:go_default_library", + "//cmd/cloud-controller-manager/app/options:go_default_library", + "//pkg/client/metrics/prometheus:go_default_library", + "//pkg/cloudprovider:go_default_library", + "//pkg/cloudprovider/providers:go_default_library", + "//pkg/healthz:go_default_library", + "//pkg/util/flag:go_default_library", + "//pkg/util/logs:go_default_library", + "//pkg/version/prometheus:go_default_library", + "//pkg/version/verflag:go_default_library", + "//vendor:github.com/golang/glog", + "//vendor:github.com/spf13/pflag", + ], +) diff --git a/cmd/cloud-controller-manager/app/BUILD b/cmd/cloud-controller-manager/app/BUILD new file mode 100644 index 00000000000..996464b0de8 --- /dev/null +++ b/cmd/cloud-controller-manager/app/BUILD @@ -0,0 +1,41 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_binary", + "go_library", + "go_test", + "cgo_library", +) + +go_library( + name = "go_default_library", + srcs = ["controllermanager.go"], + tags = ["automanaged"], + deps = [ + "//cmd/cloud-controller-manager/app/options:go_default_library", + "//pkg/api/v1:go_default_library", + "//pkg/client/clientset_generated/release_1_5:go_default_library", + "//pkg/client/clientset_generated/release_1_5/typed/core/v1:go_default_library", + "//pkg/client/leaderelection:go_default_library", + "//pkg/client/leaderelection/resourcelock:go_default_library", + "//pkg/client/record:go_default_library", + "//pkg/client/restclient:go_default_library", + "//pkg/client/unversioned/clientcmd:go_default_library", + "//pkg/cloudprovider:go_default_library", + "//pkg/controller:go_default_library", + "//pkg/controller/cloud:go_default_library", + "//pkg/controller/informers:go_default_library", + "//pkg/controller/route:go_default_library", + "//pkg/controller/service:go_default_library", + "//pkg/healthz:go_default_library", + "//pkg/util/configz:go_default_library", + "//pkg/util/wait:go_default_library", + "//vendor:github.com/golang/glog", + "//vendor:github.com/prometheus/client_golang/prometheus", + "//vendor:github.com/spf13/cobra", + "//vendor:github.com/spf13/pflag", + ], +) diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go new file mode 100644 index 00000000000..e4e2fa4bb4d --- /dev/null +++ b/cmd/cloud-controller-manager/app/controllermanager.go @@ -0,0 +1,253 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package app implements a server that runs a set of active +// components. This includes replication controllers, service endpoints and +// nodes. +// +// CAUTION: If you update code in this file, you may need to also update code +// in contrib/mesos/pkg/controllermanager/controllermanager.go +package app + +import ( + "math/rand" + "net" + "net/http" + "net/http/pprof" + "os" + "strconv" + "time" + + "k8s.io/kubernetes/cmd/cloud-controller-manager/app/options" + "k8s.io/kubernetes/pkg/api/v1" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" + v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1" + "k8s.io/kubernetes/pkg/client/leaderelection" + "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" + "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/controller" + nodecontroller "k8s.io/kubernetes/pkg/controller/cloud" + "k8s.io/kubernetes/pkg/controller/informers" + routecontroller "k8s.io/kubernetes/pkg/controller/route" + servicecontroller "k8s.io/kubernetes/pkg/controller/service" + "k8s.io/kubernetes/pkg/healthz" + "k8s.io/kubernetes/pkg/util/configz" + "k8s.io/kubernetes/pkg/util/wait" + + "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus" + "github.com/spf13/cobra" + "github.com/spf13/pflag" +) + +const ( + // Jitter used when starting controller managers + ControllerStartJitter = 1.0 +) + +// NewControllerManagerCommand creates a *cobra.Command object with default parameters +func NewControllerManagerCommand() *cobra.Command { + s := options.NewExternalCMServer() + s.AddFlags(pflag.CommandLine) + cmd := &cobra.Command{ + Use: "cloud-controller-manager", + Long: `The Kubernetes controller manager is a daemon that embeds +the core control loops shipped with Kubernetes. In applications of robotics and +automation, a control loop is a non-terminating loop that regulates the state of +the system. In Kubernetes, a controller is a control loop that watches the shared +state of the cluster through the apiserver and makes changes attempting to move the +current state towards the desired state. Examples of controllers that ship with +Kubernetes today are the replication controller, endpoints controller, namespace +controller, and serviceaccounts controller.`, + Run: func(cmd *cobra.Command, args []string) { + }, + } + + return cmd +} + +func ResyncPeriod(s *options.ExternalCMServer) func() time.Duration { + return func() time.Duration { + factor := rand.Float64() + 1 + return time.Duration(float64(s.MinResyncPeriod.Nanoseconds()) * factor) + } +} + +// Run runs the ExternalCMServer. This should never exit. +func Run(s *options.ExternalCMServer, cloud cloudprovider.Interface) error { + if c, err := configz.New("componentconfig"); err == nil { + c.Set(s.KubeControllerManagerConfiguration) + } else { + glog.Errorf("unable to register configz: %s", err) + } + kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig) + if err != nil { + return err + } + + kubeconfig.ContentConfig.ContentType = s.ContentType + // Override kubeconfig qps/burst settings from flags + kubeconfig.QPS = s.KubeAPIQPS + kubeconfig.Burst = int(s.KubeAPIBurst) + kubeClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "cloud-controller-manager")) + if err != nil { + glog.Fatalf("Invalid API configuration: %v", err) + } + leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election")) + + go func() { + mux := http.NewServeMux() + healthz.InstallHandler(mux) + if s.EnableProfiling { + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + } + configz.InstallHandler(mux) + mux.Handle("/metrics", prometheus.Handler()) + + server := &http.Server{ + Addr: net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))), + Handler: mux, + } + glog.Fatal(server.ListenAndServe()) + }() + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + recorder := eventBroadcaster.NewRecorder(v1.EventSource{Component: "cloud-controller-manager"}) + + run := func(stop <-chan struct{}) { + rootClientBuilder := controller.SimpleControllerClientBuilder{ + ClientConfig: kubeconfig, + } + var clientBuilder controller.ControllerClientBuilder + if len(s.ServiceAccountKeyFile) > 0 && s.UseServiceAccountCredentials { + clientBuilder = controller.SAControllerClientBuilder{ + ClientConfig: restclient.AnonymousClientConfig(kubeconfig), + CoreClient: kubeClient.Core(), + Namespace: "kube-system", + } + } else { + clientBuilder = rootClientBuilder + } + + err := StartControllers(s, kubeconfig, rootClientBuilder, clientBuilder, stop, recorder, cloud) + glog.Fatalf("error running controllers: %v", err) + panic("unreachable") + } + + if !s.LeaderElection.LeaderElect { + run(nil) + panic("unreachable") + } + + id, err := os.Hostname() + if err != nil { + return err + } + + // TODO: enable other lock types + rl := resourcelock.EndpointsLock{ + EndpointsMeta: v1.ObjectMeta{ + Namespace: "kube-system", + Name: "cloud-controller-manager", + }, + Client: leaderElectionClient, + LockConfig: resourcelock.ResourceLockConfig{ + Identity: id + "-external-cloud-controller", + EventRecorder: recorder, + }, + } + + leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ + Lock: &rl, + LeaseDuration: s.LeaderElection.LeaseDuration.Duration, + RenewDeadline: s.LeaderElection.RenewDeadline.Duration, + RetryPeriod: s.LeaderElection.RetryPeriod.Duration, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: run, + OnStoppedLeading: func() { + glog.Fatalf("leaderelection lost") + }, + }, + }) + panic("unreachable") +} + +func StartControllers(s *options.ExternalCMServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}, recorder record.EventRecorder, cloud cloudprovider.Interface) error { + client := func(serviceAccountName string) clientset.Interface { + return rootClientBuilder.ClientOrDie(serviceAccountName) + } + sharedInformers := informers.NewSharedInformerFactory(client("shared-informers"), nil, ResyncPeriod(s)()) + + _, clusterCIDR, err := net.ParseCIDR(s.ClusterCIDR) + if err != nil { + glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", s.ClusterCIDR, err) + } + + nodeController, err := nodecontroller.NewCloudNodeController( + sharedInformers.Nodes(), + client("cloud-node-controller"), cloud, + s.NodeMonitorPeriod.Duration) + if err != nil { + glog.Fatalf("Failed to initialize nodecontroller: %v", err) + } + nodeController.Run() + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + + serviceController, err := servicecontroller.New(cloud, client("service-controller"), s.ClusterName) + if err != nil { + glog.Errorf("Failed to start service controller: %v", err) + } else { + serviceController.Run(int(s.ConcurrentServiceSyncs)) + } + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + + if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes { + if routes, ok := cloud.Routes(); !ok { + glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") + } else { + routeController := routecontroller.New(routes, client("route-controller"), s.ClusterName, clusterCIDR) + routeController.Run(s.RouteReconciliationPeriod.Duration) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + } + } else { + glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes) + } + + // If apiserver is not running we should wait for some time and fail only then. This is particularly + // important when we start apiserver and controller manager at the same time. + var versionStrings []string + err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { + if versionStrings, err = restclient.ServerAPIVersions(kubeconfig); err == nil { + return true, nil + } + glog.Errorf("Failed to get api versions from server: %v", err) + return false, nil + }) + if err != nil { + glog.Fatalf("Failed to get api versions from server: %v", err) + } + + sharedInformers.Start(stop) + + select {} +} diff --git a/cmd/cloud-controller-manager/app/options/BUILD b/cmd/cloud-controller-manager/app/options/BUILD new file mode 100644 index 00000000000..769db121132 --- /dev/null +++ b/cmd/cloud-controller-manager/app/options/BUILD @@ -0,0 +1,25 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_binary", + "go_library", + "go_test", + "cgo_library", +) + +go_library( + name = "go_default_library", + srcs = ["options.go"], + tags = ["automanaged"], + deps = [ + "//pkg/apis/componentconfig:go_default_library", + "//pkg/apis/meta/v1:go_default_library", + "//pkg/client/leaderelection:go_default_library", + "//pkg/master/ports:go_default_library", + "//pkg/util/config:go_default_library", + "//vendor:github.com/spf13/pflag", + ], +) diff --git a/cmd/cloud-controller-manager/app/options/options.go b/cmd/cloud-controller-manager/app/options/options.go new file mode 100644 index 00000000000..1da8b823bb0 --- /dev/null +++ b/cmd/cloud-controller-manager/app/options/options.go @@ -0,0 +1,95 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package options provides the flags used for the controller manager. +// +// CAUTION: If you update code in this file, you may need to also update code +// in contrib/mesos/pkg/controllermanager/controllermanager.go +package options + +import ( + "time" + + "k8s.io/kubernetes/pkg/apis/componentconfig" + metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/client/leaderelection" + "k8s.io/kubernetes/pkg/master/ports" + "k8s.io/kubernetes/pkg/util/config" + + "github.com/spf13/pflag" +) + +// ExternalCMServer is the main context object for the controller manager. +type ExternalCMServer struct { + componentconfig.KubeControllerManagerConfiguration + + Master string + Kubeconfig string +} + +// NewCMServer creates a new ExternalCMServer with a default config. +func NewExternalCMServer() *ExternalCMServer { + s := ExternalCMServer{ + KubeControllerManagerConfiguration: componentconfig.KubeControllerManagerConfiguration{ + Port: ports.ControllerManagerPort, + Address: "0.0.0.0", + ConcurrentServiceSyncs: 1, + MinResyncPeriod: metav1.Duration{Duration: 12 * time.Hour}, + RegisterRetryCount: 10, + NodeMonitorPeriod: metav1.Duration{Duration: 5 * time.Second}, + ClusterName: "kubernetes", + ConfigureCloudRoutes: true, + ContentType: "application/vnd.kubernetes.protobuf", + KubeAPIQPS: 20.0, + KubeAPIBurst: 30, + LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(), + ControllerStartInterval: metav1.Duration{Duration: 0 * time.Second}, + EnableGarbageCollector: true, + ConcurrentGCSyncs: 20, + ClusterSigningCertFile: "/etc/kubernetes/ca/ca.pem", + ClusterSigningKeyFile: "/etc/kubernetes/ca/ca.key", + }, + } + s.LeaderElection.LeaderElect = true + return &s +} + +// AddFlags adds flags for a specific ExternalCMServer to the specified FlagSet +func (s *ExternalCMServer) AddFlags(fs *pflag.FlagSet) { + fs.Int32Var(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on") + fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)") + fs.StringVar(&s.CloudProvider, "cloud-provider", s.CloudProvider, "The provider of cloud services. Empty for no provider.") + fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") + fs.DurationVar(&s.MinResyncPeriod.Duration, "min-resync-period", s.MinResyncPeriod.Duration, "The resync period in reflectors will be random between MinResyncPeriod and 2*MinResyncPeriod") + fs.DurationVar(&s.NodeMonitorPeriod.Duration, "node-monitor-period", s.NodeMonitorPeriod.Duration, + "The period for syncing NodeStatus in NodeController.") + fs.StringVar(&s.ServiceAccountKeyFile, "service-account-private-key-file", s.ServiceAccountKeyFile, "Filename containing a PEM-encoded private RSA or ECDSA key used to sign service account tokens.") + fs.BoolVar(&s.UseServiceAccountCredentials, "use-service-account-credentials", s.UseServiceAccountCredentials, "If true, use individual service account credentials for each controller.") + fs.DurationVar(&s.RouteReconciliationPeriod.Duration, "route-reconciliation-period", s.RouteReconciliationPeriod.Duration, "The period for reconciling routes created for Nodes by cloud provider.") + fs.BoolVar(&s.ConfigureCloudRoutes, "configure-cloud-routes", true, "Should CIDRs allocated by allocate-node-cidrs be configured on the cloud provider.") + fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") + fs.StringVar(&s.ClusterCIDR, "cluster-cidr", s.ClusterCIDR, "CIDR Range for Pods in cluster.") + fs.BoolVar(&s.AllocateNodeCIDRs, "allocate-node-cidrs", false, "Should CIDRs for Pods be allocated and set on the cloud provider.") + fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)") + fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.") + fs.StringVar(&s.ContentType, "kube-api-content-type", s.ContentType, "Content type of requests sent to apiserver.") + fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver") + fs.Int32Var(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver") + fs.DurationVar(&s.ControllerStartInterval.Duration, "controller-start-interval", s.ControllerStartInterval.Duration, "Interval between starting controller managers.") + + leaderelection.BindFlags(&s.LeaderElection, fs) + config.DefaultFeatureGate.AddFlag(fs) +} diff --git a/cmd/cloud-controller-manager/controller-manager.go b/cmd/cloud-controller-manager/controller-manager.go new file mode 100644 index 00000000000..2ce65417ccc --- /dev/null +++ b/cmd/cloud-controller-manager/controller-manager.go @@ -0,0 +1,64 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The external controller manager is responsible for running controller loops that +// are cloud provider dependent. It uses the API to listen for new events on resources. + +package main + +import ( + "fmt" + "os" + + "k8s.io/kubernetes/cmd/cloud-controller-manager/app" + "k8s.io/kubernetes/cmd/cloud-controller-manager/app/options" + _ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration + "k8s.io/kubernetes/pkg/cloudprovider" + _ "k8s.io/kubernetes/pkg/cloudprovider/providers" + "k8s.io/kubernetes/pkg/healthz" + "k8s.io/kubernetes/pkg/util/flag" + "k8s.io/kubernetes/pkg/util/logs" + _ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration + "k8s.io/kubernetes/pkg/version/verflag" + + "github.com/golang/glog" + "github.com/spf13/pflag" +) + +func init() { + healthz.DefaultHealthz() +} + +func main() { + s := options.NewExternalCMServer() + s.AddFlags(pflag.CommandLine) + + flag.InitFlags() + logs.InitLogs() + defer logs.FlushLogs() + + verflag.PrintAndExitIfRequested() + + cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) + if err != nil { + glog.Fatalf("Cloud provider could not be initialized: %v", err) + } + + if err := app.Run(s, cloud); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +} diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index f280b232cac..8cd656a8c44 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -378,7 +378,9 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes { - if cloud == nil { + if s.CloudProvider == "external" { + glog.Warning("configure-cloud-routes is set, but external cloudprovider is specified. This manager will not configure cloud provider routes.") + } else if cloud == nil { glog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.") } else if routes, ok := cloud.Routes(); !ok { glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") diff --git a/hack/.linted_packages b/hack/.linted_packages index 09f956746df..01314244600 100644 --- a/hack/.linted_packages +++ b/hack/.linted_packages @@ -2,6 +2,7 @@ cluster/addons/fluentd-elasticsearch/es-image cluster/images/etcd/attachlease cluster/images/etcd/rollback cmd/clicheck +cmd/cloud-controller-manager cmd/gendocs cmd/genkubedocs cmd/genman diff --git a/pkg/cloudprovider/plugins.go b/pkg/cloudprovider/plugins.go index c6083e0a859..99d9758ed0f 100644 --- a/pkg/cloudprovider/plugins.go +++ b/pkg/cloudprovider/plugins.go @@ -90,6 +90,11 @@ func InitCloudProvider(name string, configFilePath string) (Interface, error) { var cloud Interface var err error + if name == "external" { + glog.Info("cloud provider external specified.") + return nil, nil + } + if name == "" { glog.Info("No cloud provider specified.") return nil, nil diff --git a/pkg/controller/cloud/BUILD b/pkg/controller/cloud/BUILD index 2fba1528357..16f758aa9bb 100644 --- a/pkg/controller/cloud/BUILD +++ b/pkg/controller/cloud/BUILD @@ -12,12 +12,13 @@ load( go_library( name = "go_default_library", - srcs = ["cloud_node_controller.go"], + srcs = ["nodecontroller.go"], tags = ["automanaged"], deps = [ - "//pkg/api:go_default_library", - "//pkg/client/clientset_generated/internalclientset:go_default_library", - "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library", + "//pkg/api/v1:go_default_library", + "//pkg/apis/meta/v1:go_default_library", + "//pkg/client/clientset_generated/release_1_5:go_default_library", + "//pkg/client/clientset_generated/release_1_5/typed/core/v1:go_default_library", "//pkg/client/record:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/controller/informers:go_default_library", @@ -30,19 +31,19 @@ go_library( go_test( name = "go_default_test", - srcs = ["cloud_node_controller_test.go"], + srcs = ["nodecontroller_test.go"], library = "go_default_library", tags = ["automanaged"], deps = [ - "//pkg/api:go_default_library", - "//pkg/api/unversioned:go_default_library", - "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", + "//pkg/api/v1:go_default_library", + "//pkg/apis/meta/v1:go_default_library", + "//pkg/client/clientset_generated/release_1_5/fake:go_default_library", "//pkg/client/record:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers/fake:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/informers:go_default_library", - "//pkg/controller/node:go_default_library", + "//pkg/controller/node/testutil:go_default_library", "//pkg/util/wait:go_default_library", "//vendor:github.com/golang/glog", ], diff --git a/pkg/controller/cloud/cloud_node_controller.go b/pkg/controller/cloud/nodecontroller.go similarity index 81% rename from pkg/controller/cloud/cloud_node_controller.go rename to pkg/controller/cloud/nodecontroller.go index 29d5edf0149..48d4bb7937a 100644 --- a/pkg/controller/cloud/cloud_node_controller.go +++ b/pkg/controller/cloud/nodecontroller.go @@ -22,9 +22,10 @@ import ( "github.com/golang/glog" - "k8s.io/kubernetes/pkg/api" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" + "k8s.io/kubernetes/pkg/api/v1" + metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" + v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller/informers" @@ -61,11 +62,11 @@ func NewCloudNodeController( nodeMonitorPeriod time.Duration) (*CloudNodeController, error) { eventBroadcaster := record.NewBroadcaster() - recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "cloudcontrollermanager"}) + recorder := eventBroadcaster.NewRecorder(v1.EventSource{Component: "cloudcontrollermanager"}) eventBroadcaster.StartLogging(glog.Infof) if kubeClient != nil { glog.V(0).Infof("Sending events to api server.") - eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) } else { glog.V(0).Infof("No api server defined - no events will be sent to API server.") } @@ -87,23 +88,23 @@ func (cnc *CloudNodeController) Run() { defer utilruntime.HandleCrash() go wait.Until(func() { - nodes, err := cnc.kubeClient.Core().Nodes().List(api.ListOptions{ResourceVersion: "0"}) + nodes, err := cnc.kubeClient.Core().Nodes().List(v1.ListOptions{ResourceVersion: "0"}) if err != nil { glog.Errorf("Error monitoring node status: %v", err) } for i := range nodes.Items { - var currentReadyCondition *api.NodeCondition + var currentReadyCondition *v1.NodeCondition node := &nodes.Items[i] // Try to get the current node status // If node status is empty, then kubelet has not posted ready status yet. In this case, process next node for rep := 0; rep < nodeStatusUpdateRetry; rep++ { - _, currentReadyCondition = api.GetNodeCondition(&node.Status, api.NodeReady) + _, currentReadyCondition = v1.GetNodeCondition(&node.Status, v1.NodeReady) if currentReadyCondition != nil { break } name := node.Name - node, err = cnc.kubeClient.Core().Nodes().Get(name) + node, err = cnc.kubeClient.Core().Nodes().Get(name, metav1.GetOptions{}) if err != nil { glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name) break @@ -117,7 +118,7 @@ func (cnc *CloudNodeController) Run() { // If the known node status says that Node is NotReady, then check if the node has been removed // from the cloud provider. If node cannot be found in cloudprovider, then delete the node immediately if currentReadyCondition != nil { - if currentReadyCondition.Status != api.ConditionTrue { + if currentReadyCondition.Status != v1.ConditionTrue { instances, ok := cnc.cloud.Instances() if !ok { glog.Errorf("cloud provider does not support instances.") @@ -128,14 +129,14 @@ func (cnc *CloudNodeController) Run() { if _, err := instances.ExternalID(types.NodeName(node.Name)); err != nil { if err == cloudprovider.InstanceNotFound { glog.V(2).Infof("Deleting node no longer present in cloud provider: %s", node.Name) - ref := &api.ObjectReference{ + ref := &v1.ObjectReference{ Kind: "Node", Name: node.Name, UID: types.UID(node.UID), Namespace: "", } glog.V(2).Infof("Recording %s event message for node %s", "DeletingNode", node.Name) - cnc.recorder.Eventf(ref, api.EventTypeNormal, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name), "Node %s event: %s", node.Name, "DeletingNode") + cnc.recorder.Eventf(ref, v1.EventTypeNormal, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name), "Node %s event: %s", node.Name, "DeletingNode") go func(nodeName string) { defer utilruntime.HandleCrash() if err := cnc.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil { diff --git a/pkg/controller/cloud/cloud_node_controller_test.go b/pkg/controller/cloud/nodecontroller_test.go similarity index 60% rename from pkg/controller/cloud/cloud_node_controller_test.go rename to pkg/controller/cloud/nodecontroller_test.go index c3771dd8b5c..45c61d56dba 100644 --- a/pkg/controller/cloud/cloud_node_controller_test.go +++ b/pkg/controller/cloud/nodecontroller_test.go @@ -22,82 +22,82 @@ import ( "github.com/golang/glog" - "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/unversioned" + metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/cloudprovider" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/informers" - "k8s.io/kubernetes/pkg/controller/node" + "k8s.io/kubernetes/pkg/controller/node/testutil" "k8s.io/kubernetes/pkg/util/wait" ) // This test checks that the node is deleted when kubelet stops reporting // and cloud provider says node is gone func TestNodeDeleted(t *testing.T) { - pod0 := &api.Pod{ - ObjectMeta: api.ObjectMeta{ + pod0 := &v1.Pod{ + ObjectMeta: v1.ObjectMeta{ Namespace: "default", Name: "pod0", }, - Spec: api.PodSpec{ + Spec: v1.PodSpec{ NodeName: "node0", }, - Status: api.PodStatus{ - Conditions: []api.PodCondition{ + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{ { - Type: api.PodReady, - Status: api.ConditionTrue, + Type: v1.PodReady, + Status: v1.ConditionTrue, }, }, }, } - pod1 := &api.Pod{ - ObjectMeta: api.ObjectMeta{ + pod1 := &v1.Pod{ + ObjectMeta: v1.ObjectMeta{ Namespace: "default", Name: "pod1", }, - Spec: api.PodSpec{ + Spec: v1.PodSpec{ NodeName: "node0", }, - Status: api.PodStatus{ - Conditions: []api.PodCondition{ + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{ { - Type: api.PodReady, - Status: api.ConditionTrue, + Type: v1.PodReady, + Status: v1.ConditionTrue, }, }, }, } - fnh := &node.FakeNodeHandler{ - Existing: []*api.Node{ + fnh := &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ { - ObjectMeta: api.ObjectMeta{ + ObjectMeta: v1.ObjectMeta{ Name: "node0", - CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionUnknown, - LastHeartbeatTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), - LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + Type: v1.NodeReady, + Status: v1.ConditionUnknown, + LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), }, }, }, }, }, - Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*pod0, *pod1}}), + Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*pod0, *pod1}}), DeleteWaitChan: make(chan struct{}), } - factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) + factory := informers.NewSharedInformerFactory(fnh, nil, controller.NoResyncPeriodFunc()) eventBroadcaster := record.NewBroadcaster() cloudNodeController := &CloudNodeController{ @@ -105,7 +105,7 @@ func TestNodeDeleted(t *testing.T) { nodeInformer: factory.Nodes(), cloud: &fakecloud.FakeCloud{Err: cloudprovider.InstanceNotFound}, nodeMonitorPeriod: 5 * time.Second, - recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"}), + recorder: eventBroadcaster.NewRecorder(v1.EventSource{Component: "controllermanager"}), } eventBroadcaster.StartLogging(glog.Infof) diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index bf8b2e25866..012ade79b1c 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -1261,11 +1261,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) { }, }, }, -<<<<<<< HEAD Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0"), *testutil.NewPod("pod1", "node0")}}), -======= - Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}), ->>>>>>> start breaking up controller manager into two pieces DeleteWaitChan: make(chan struct{}), } nodeController, _ := NewNodeControllerFromClient(nil, fnh, 10*time.Minute, From 75da31075743fcbc88ab133bcabdc9cc53392ead Mon Sep 17 00:00:00 2001 From: wlan0 Date: Sat, 17 Dec 2016 09:27:48 -0800 Subject: [PATCH 3/3] sanitize names and add more comments, and other essential boilerplate changes --- cmd/cloud-controller-manager/BUILD | 3 -- cmd/cloud-controller-manager/app/BUILD | 7 +-- .../app/controllermanager.go | 44 +++++++++---------- .../app/options/BUILD | 3 -- .../app/options/options.go | 25 ++++------- .../controller-manager.go | 4 +- .../app/controllermanager.go | 4 +- pkg/cloudprovider/plugins.go | 5 --- pkg/controller/cloud/BUILD | 8 ++-- pkg/controller/cloud/nodecontroller.go | 5 ++- pkg/controller/cloud/nodecontroller_test.go | 2 +- pkg/master/ports/ports.go | 3 ++ 12 files changed, 44 insertions(+), 69 deletions(-) diff --git a/cmd/cloud-controller-manager/BUILD b/cmd/cloud-controller-manager/BUILD index aac1159749a..b3cd40080be 100644 --- a/cmd/cloud-controller-manager/BUILD +++ b/cmd/cloud-controller-manager/BUILD @@ -5,9 +5,6 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_binary", - "go_library", - "go_test", - "cgo_library", ) go_binary( diff --git a/cmd/cloud-controller-manager/app/BUILD b/cmd/cloud-controller-manager/app/BUILD index 996464b0de8..4a597874d32 100644 --- a/cmd/cloud-controller-manager/app/BUILD +++ b/cmd/cloud-controller-manager/app/BUILD @@ -4,10 +4,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", - "go_binary", "go_library", - "go_test", - "cgo_library", ) go_library( @@ -17,8 +14,8 @@ go_library( deps = [ "//cmd/cloud-controller-manager/app/options:go_default_library", "//pkg/api/v1:go_default_library", - "//pkg/client/clientset_generated/release_1_5:go_default_library", - "//pkg/client/clientset_generated/release_1_5/typed/core/v1:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library", "//pkg/client/leaderelection:go_default_library", "//pkg/client/leaderelection/resourcelock:go_default_library", "//pkg/client/record:go_default_library", diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go index e4e2fa4bb4d..8e6f4b2889d 100644 --- a/cmd/cloud-controller-manager/app/controllermanager.go +++ b/cmd/cloud-controller-manager/app/controllermanager.go @@ -14,12 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package app implements a server that runs a set of active -// components. This includes replication controllers, service endpoints and -// nodes. -// -// CAUTION: If you update code in this file, you may need to also update code -// in contrib/mesos/pkg/controllermanager/controllermanager.go package app import ( @@ -33,8 +27,8 @@ import ( "k8s.io/kubernetes/cmd/cloud-controller-manager/app/options" "k8s.io/kubernetes/pkg/api/v1" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" - v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1" "k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock" "k8s.io/kubernetes/pkg/client/record" @@ -61,20 +55,14 @@ const ( ControllerStartJitter = 1.0 ) -// NewControllerManagerCommand creates a *cobra.Command object with default parameters -func NewControllerManagerCommand() *cobra.Command { - s := options.NewExternalCMServer() +// NewCloudControllerManagerCommand creates a *cobra.Command object with default parameters +func NewCloudControllerManagerCommand() *cobra.Command { + s := options.NewCloudControllerManagerServer() s.AddFlags(pflag.CommandLine) cmd := &cobra.Command{ Use: "cloud-controller-manager", - Long: `The Kubernetes controller manager is a daemon that embeds -the core control loops shipped with Kubernetes. In applications of robotics and -automation, a control loop is a non-terminating loop that regulates the state of -the system. In Kubernetes, a controller is a control loop that watches the shared -state of the cluster through the apiserver and makes changes attempting to move the -current state towards the desired state. Examples of controllers that ship with -Kubernetes today are the replication controller, endpoints controller, namespace -controller, and serviceaccounts controller.`, + Long: `The Cloud controller manager is a daemon that embeds +the cloud specific control loops shipped with Kubernetes.`, Run: func(cmd *cobra.Command, args []string) { }, } @@ -82,7 +70,8 @@ controller, and serviceaccounts controller.`, return cmd } -func ResyncPeriod(s *options.ExternalCMServer) func() time.Duration { +// ResyncPeriod computes the time interval a shared informer waits before resyncing with the api server +func ResyncPeriod(s *options.CloudControllerManagerServer) func() time.Duration { return func() time.Duration { factor := rand.Float64() + 1 return time.Duration(float64(s.MinResyncPeriod.Nanoseconds()) * factor) @@ -90,7 +79,7 @@ func ResyncPeriod(s *options.ExternalCMServer) func() time.Duration { } // Run runs the ExternalCMServer. This should never exit. -func Run(s *options.ExternalCMServer, cloud cloudprovider.Interface) error { +func Run(s *options.CloudControllerManagerServer, cloud cloudprovider.Interface) error { if c, err := configz.New("componentconfig"); err == nil { c.Set(s.KubeControllerManagerConfiguration) } else { @@ -101,6 +90,7 @@ func Run(s *options.ExternalCMServer, cloud cloudprovider.Interface) error { return err } + // Set the ContentType of the requests from kube client kubeconfig.ContentConfig.ContentType = s.ContentType // Override kubeconfig qps/burst settings from flags kubeconfig.QPS = s.KubeAPIQPS @@ -111,6 +101,7 @@ func Run(s *options.ExternalCMServer, cloud cloudprovider.Interface) error { } leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election")) + // Start the external controller manager server go func() { mux := http.NewServeMux() healthz.InstallHandler(mux) @@ -159,12 +150,13 @@ func Run(s *options.ExternalCMServer, cloud cloudprovider.Interface) error { panic("unreachable") } + // Identity used to distinguish between multiple cloud controller manager instances id, err := os.Hostname() if err != nil { return err } - // TODO: enable other lock types + // Lock required for leader election rl := resourcelock.EndpointsLock{ EndpointsMeta: v1.ObjectMeta{ Namespace: "kube-system", @@ -177,6 +169,7 @@ func Run(s *options.ExternalCMServer, cloud cloudprovider.Interface) error { }, } + // Try and become the leader and start cloud controller manager loops leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ Lock: &rl, LeaseDuration: s.LeaderElection.LeaseDuration.Duration, @@ -192,7 +185,9 @@ func Run(s *options.ExternalCMServer, cloud cloudprovider.Interface) error { panic("unreachable") } -func StartControllers(s *options.ExternalCMServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}, recorder record.EventRecorder, cloud cloudprovider.Interface) error { +// StartControllers starts the cloud specific controller loops. +func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}, recorder record.EventRecorder, cloud cloudprovider.Interface) error { + // Function to build the kube client object client := func(serviceAccountName string) clientset.Interface { return rootClientBuilder.ClientOrDie(serviceAccountName) } @@ -203,6 +198,7 @@ func StartControllers(s *options.ExternalCMServer, kubeconfig *restclient.Config glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", s.ClusterCIDR, err) } + // Start the CloudNodeController nodeController, err := nodecontroller.NewCloudNodeController( sharedInformers.Nodes(), client("cloud-node-controller"), cloud, @@ -213,6 +209,7 @@ func StartControllers(s *options.ExternalCMServer, kubeconfig *restclient.Config nodeController.Run() time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + // Start the service controller serviceController, err := servicecontroller.New(cloud, client("service-controller"), s.ClusterName) if err != nil { glog.Errorf("Failed to start service controller: %v", err) @@ -221,6 +218,7 @@ func StartControllers(s *options.ExternalCMServer, kubeconfig *restclient.Config } time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + // If CIDRs should be allocated for pods and set on the CloudProvider, then start the route controller if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes { if routes, ok := cloud.Routes(); !ok { glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") diff --git a/cmd/cloud-controller-manager/app/options/BUILD b/cmd/cloud-controller-manager/app/options/BUILD index 769db121132..cc1f41a8605 100644 --- a/cmd/cloud-controller-manager/app/options/BUILD +++ b/cmd/cloud-controller-manager/app/options/BUILD @@ -4,10 +4,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", - "go_binary", "go_library", - "go_test", - "cgo_library", ) go_library( diff --git a/cmd/cloud-controller-manager/app/options/options.go b/cmd/cloud-controller-manager/app/options/options.go index 1da8b823bb0..b1db26ae7b9 100644 --- a/cmd/cloud-controller-manager/app/options/options.go +++ b/cmd/cloud-controller-manager/app/options/options.go @@ -14,10 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package options provides the flags used for the controller manager. -// -// CAUTION: If you update code in this file, you may need to also update code -// in contrib/mesos/pkg/controllermanager/controllermanager.go package options import ( @@ -32,23 +28,22 @@ import ( "github.com/spf13/pflag" ) -// ExternalCMServer is the main context object for the controller manager. -type ExternalCMServer struct { +// CloudControllerMangerServer is the main context object for the controller manager. +type CloudControllerManagerServer struct { componentconfig.KubeControllerManagerConfiguration Master string Kubeconfig string } -// NewCMServer creates a new ExternalCMServer with a default config. -func NewExternalCMServer() *ExternalCMServer { - s := ExternalCMServer{ +// NewCloudControllerManagerServer creates a new ExternalCMServer with a default config. +func NewCloudControllerManagerServer() *CloudControllerManagerServer { + s := CloudControllerManagerServer{ KubeControllerManagerConfiguration: componentconfig.KubeControllerManagerConfiguration{ - Port: ports.ControllerManagerPort, + Port: ports.CloudControllerManagerPort, Address: "0.0.0.0", ConcurrentServiceSyncs: 1, MinResyncPeriod: metav1.Duration{Duration: 12 * time.Hour}, - RegisterRetryCount: 10, NodeMonitorPeriod: metav1.Duration{Duration: 5 * time.Second}, ClusterName: "kubernetes", ConfigureCloudRoutes: true, @@ -57,10 +52,6 @@ func NewExternalCMServer() *ExternalCMServer { KubeAPIBurst: 30, LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(), ControllerStartInterval: metav1.Duration{Duration: 0 * time.Second}, - EnableGarbageCollector: true, - ConcurrentGCSyncs: 20, - ClusterSigningCertFile: "/etc/kubernetes/ca/ca.pem", - ClusterSigningKeyFile: "/etc/kubernetes/ca/ca.key", }, } s.LeaderElection.LeaderElect = true @@ -68,8 +59,8 @@ func NewExternalCMServer() *ExternalCMServer { } // AddFlags adds flags for a specific ExternalCMServer to the specified FlagSet -func (s *ExternalCMServer) AddFlags(fs *pflag.FlagSet) { - fs.Int32Var(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on") +func (s *CloudControllerManagerServer) AddFlags(fs *pflag.FlagSet) { + fs.Int32Var(&s.Port, "port", s.Port, "The port that the cloud-controller-manager's http service runs on") fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)") fs.StringVar(&s.CloudProvider, "cloud-provider", s.CloudProvider, "The provider of cloud services. Empty for no provider.") fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") diff --git a/cmd/cloud-controller-manager/controller-manager.go b/cmd/cloud-controller-manager/controller-manager.go index 2ce65417ccc..a465fec3596 100644 --- a/cmd/cloud-controller-manager/controller-manager.go +++ b/cmd/cloud-controller-manager/controller-manager.go @@ -15,7 +15,7 @@ limitations under the License. */ // The external controller manager is responsible for running controller loops that -// are cloud provider dependent. It uses the API to listen for new events on resources. +// are cloud provider dependent. It uses the API to listen to new events on resources. package main @@ -43,7 +43,7 @@ func init() { } func main() { - s := options.NewExternalCMServer() + s := options.NewCloudControllerManagerServer() s.AddFlags(pflag.CommandLine) flag.InitFlags() diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 8cd656a8c44..f280b232cac 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -378,9 +378,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes { - if s.CloudProvider == "external" { - glog.Warning("configure-cloud-routes is set, but external cloudprovider is specified. This manager will not configure cloud provider routes.") - } else if cloud == nil { + if cloud == nil { glog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.") } else if routes, ok := cloud.Routes(); !ok { glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") diff --git a/pkg/cloudprovider/plugins.go b/pkg/cloudprovider/plugins.go index 99d9758ed0f..c6083e0a859 100644 --- a/pkg/cloudprovider/plugins.go +++ b/pkg/cloudprovider/plugins.go @@ -90,11 +90,6 @@ func InitCloudProvider(name string, configFilePath string) (Interface, error) { var cloud Interface var err error - if name == "external" { - glog.Info("cloud provider external specified.") - return nil, nil - } - if name == "" { glog.Info("No cloud provider specified.") return nil, nil diff --git a/pkg/controller/cloud/BUILD b/pkg/controller/cloud/BUILD index 16f758aa9bb..d61579dde11 100644 --- a/pkg/controller/cloud/BUILD +++ b/pkg/controller/cloud/BUILD @@ -4,10 +4,8 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", - "go_binary", "go_library", "go_test", - "cgo_library", ) go_library( @@ -17,8 +15,8 @@ go_library( deps = [ "//pkg/api/v1:go_default_library", "//pkg/apis/meta/v1:go_default_library", - "//pkg/client/clientset_generated/release_1_5:go_default_library", - "//pkg/client/clientset_generated/release_1_5/typed/core/v1:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library", "//pkg/client/record:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/controller/informers:go_default_library", @@ -37,7 +35,7 @@ go_test( deps = [ "//pkg/api/v1:go_default_library", "//pkg/apis/meta/v1:go_default_library", - "//pkg/client/clientset_generated/release_1_5/fake:go_default_library", + "//pkg/client/clientset_generated/clientset/fake:go_default_library", "//pkg/client/record:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers/fake:go_default_library", diff --git a/pkg/controller/cloud/nodecontroller.go b/pkg/controller/cloud/nodecontroller.go index 48d4bb7937a..c0d9016038e 100644 --- a/pkg/controller/cloud/nodecontroller.go +++ b/pkg/controller/cloud/nodecontroller.go @@ -24,8 +24,8 @@ import ( "k8s.io/kubernetes/pkg/api/v1" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" - v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller/informers" @@ -55,6 +55,7 @@ const ( retrySleepTime = 20 * time.Millisecond ) +// NewCloudNodeController creates a CloudNodeController object func NewCloudNodeController( nodeInformer informers.NodeInformer, kubeClient clientset.Interface, diff --git a/pkg/controller/cloud/nodecontroller_test.go b/pkg/controller/cloud/nodecontroller_test.go index 45c61d56dba..1c8c1f1c57b 100644 --- a/pkg/controller/cloud/nodecontroller_test.go +++ b/pkg/controller/cloud/nodecontroller_test.go @@ -23,7 +23,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/client/record" diff --git a/pkg/master/ports/ports.go b/pkg/master/ports/ports.go index 6aa97963d5a..4f3eed88969 100644 --- a/pkg/master/ports/ports.go +++ b/pkg/master/ports/ports.go @@ -29,6 +29,9 @@ const ( // ControllerManagerPort is the default port for the controller manager status server. // May be overridden by a flag at startup. ControllerManagerPort = 10252 + // CloudControllerManagerPort is the default port for the cloud controller manager server. + // This value may be overriden by a flag at startup. + CloudControllerManagerPort = 10253 // KubeletReadOnlyPort exposes basic read-only services from the kubelet. // May be overridden by a flag at startup. // This is necessary for heapster to collect monitoring stats from the kubelet