diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go index ea67e4f624b..2b9fdd58adf 100644 --- a/cmd/cloud-controller-manager/app/controllermanager.go +++ b/cmd/cloud-controller-manager/app/controllermanager.go @@ -229,8 +229,8 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc if routes, ok := cloud.Routes(); !ok { glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") } else { - routeController := routecontroller.New(routes, client("route-controller"), s.ClusterName, clusterCIDR) - routeController.Run(s.RouteReconciliationPeriod.Duration) + routeController := routecontroller.New(routes, client("route-controller"), newSharedInformers.Core().V1().Nodes(), s.ClusterName, clusterCIDR) + routeController.Run(stop, s.RouteReconciliationPeriod.Duration) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } } else { diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 515a334d19a..791f3b41c32 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -464,8 +464,8 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root } else if routes, ok := cloud.Routes(); !ok { glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") } else { - routeController := routecontroller.New(routes, clientBuilder.ClientOrDie("route-controller"), s.ClusterName, clusterCIDR) - routeController.Run(s.RouteReconciliationPeriod.Duration) + routeController := routecontroller.New(routes, clientBuilder.ClientOrDie("route-controller"), newSharedInformers.Core().V1().Nodes(), s.ClusterName, clusterCIDR) + go routeController.Run(stop, s.RouteReconciliationPeriod.Duration) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } } else { diff --git a/pkg/controller/route/BUILD b/pkg/controller/route/BUILD index eeba037ef78..2c2bc53190b 100644 --- a/pkg/controller/route/BUILD +++ b/pkg/controller/route/BUILD @@ -18,18 +18,18 @@ go_library( deps = [ "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/legacylisters:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", "//pkg/cloudprovider:go_default_library", - "//pkg/controller:go_default_library", "//pkg/util/metrics:go_default_library", "//pkg/util/node:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", - "//vendor:k8s.io/apimachinery/pkg/runtime", + "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/types", + "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", - "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/tools/cache", ], ) @@ -42,8 +42,10 @@ go_test( deps = [ "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers/fake:go_default_library", + "//pkg/controller:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/client-go/testing", diff --git a/pkg/controller/route/routecontroller.go b/pkg/controller/route/routecontroller.go index d4866f76830..d2dc0410107 100644 --- a/pkg/controller/route/routecontroller.go +++ b/pkg/controller/route/routecontroller.go @@ -25,16 +25,16 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/client/legacylisters" + coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/cloudprovider" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/metrics" nodeutil "k8s.io/kubernetes/pkg/util/node" ) @@ -50,45 +50,39 @@ const ( ) type RouteController struct { - routes cloudprovider.Routes - kubeClient clientset.Interface - clusterName string - clusterCIDR *net.IPNet - // Node framework and store - nodeController cache.Controller - nodeStore listers.StoreToNodeLister + routes cloudprovider.Routes + kubeClient clientset.Interface + clusterName string + clusterCIDR *net.IPNet + nodeLister corelisters.NodeLister + nodeListerSynced cache.InformerSynced } -func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterName string, clusterCIDR *net.IPNet) *RouteController { +func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInformer coreinformers.NodeInformer, clusterName string, clusterCIDR *net.IPNet) *RouteController { if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("route_controller", kubeClient.Core().RESTClient().GetRateLimiter()) } rc := &RouteController{ - routes: routes, - kubeClient: kubeClient, - clusterName: clusterName, - clusterCIDR: clusterCIDR, + routes: routes, + kubeClient: kubeClient, + clusterName: clusterName, + clusterCIDR: clusterCIDR, + nodeLister: nodeInformer.Lister(), + nodeListerSynced: nodeInformer.Informer().HasSynced, } - rc.nodeStore.Store, rc.nodeController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return rc.kubeClient.Core().Nodes().List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return rc.kubeClient.Core().Nodes().Watch(options) - }, - }, - &v1.Node{}, - controller.NoResyncPeriodFunc(), - cache.ResourceEventHandlerFuncs{}, - ) - return rc } -func (rc *RouteController) Run(syncPeriod time.Duration) { - go rc.nodeController.Run(wait.NeverStop) +func (rc *RouteController) Run(stopCh <-chan struct{}, syncPeriod time.Duration) { + defer utilruntime.HandleCrash() + + glog.Info("Starting the route controller") + + if !cache.WaitForCacheSync(stopCh, rc.nodeListerSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } // TODO: If we do just the full Resync every 5 minutes (default value) // that means that we may wait up to 5 minutes before even starting @@ -107,17 +101,14 @@ func (rc *RouteController) reconcileNodeRoutes() error { if err != nil { return fmt.Errorf("error listing routes: %v", err) } - if !rc.nodeController.HasSynced() { - return fmt.Errorf("nodeController is not yet synced") - } - nodeList, err := rc.nodeStore.List() + nodes, err := rc.nodeLister.List(labels.Everything()) if err != nil { return fmt.Errorf("error listing nodes: %v", err) } - return rc.reconcile(nodeList.Items, routeList) + return rc.reconcile(nodes, routeList) } -func (rc *RouteController) reconcile(nodes []v1.Node, routes []*cloudprovider.Route) error { +func (rc *RouteController) reconcile(nodes []*v1.Node, routes []*cloudprovider.Route) error { // nodeCIDRs maps nodeName->nodeCIDR nodeCIDRs := make(map[types.NodeName]string) // routeMap maps routeTargetNode->route diff --git a/pkg/controller/route/routecontroller_test.go b/pkg/controller/route/routecontroller_test.go index ba630930497..ced204b892e 100644 --- a/pkg/controller/route/routecontroller_test.go +++ b/pkg/controller/route/routecontroller_test.go @@ -26,10 +26,14 @@ import ( core "k8s.io/client-go/testing" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/cloudprovider" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" + "k8s.io/kubernetes/pkg/controller" ) +func alwaysReady() bool { return true } + func TestIsResponsibleForRoute(t *testing.T) { myClusterName := "my-awesome-cluster" myClusterRoute := "my-awesome-cluster-12345678-90ab-cdef-1234-567890abcdef" @@ -57,7 +61,10 @@ func TestIsResponsibleForRoute(t *testing.T) { if err != nil { t.Errorf("%d. Error in test case: unparsable cidr %q", i, testCase.clusterCIDR) } - rc := New(nil, nil, myClusterName, cidr) + client := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + rc := New(nil, nil, informerFactory.Core().V1().Nodes(), myClusterName, cidr) + rc.nodeListerSynced = alwaysReady route := &cloudprovider.Route{ Name: testCase.routeName, TargetNode: types.NodeName("doesnt-matter-for-this-test"), @@ -76,7 +83,7 @@ func TestReconcile(t *testing.T) { nodeNoCidr := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2", UID: "02"}, Spec: v1.NodeSpec{PodCIDR: ""}} testCases := []struct { - nodes []v1.Node + nodes []*v1.Node initialRoutes []*cloudprovider.Route expectedRoutes []*cloudprovider.Route expectedNetworkUnavailable []bool @@ -84,9 +91,9 @@ func TestReconcile(t *testing.T) { }{ // 2 nodes, routes already there { - nodes: []v1.Node{ - node1, - node2, + nodes: []*v1.Node{ + &node1, + &node2, }, initialRoutes: []*cloudprovider.Route{ {cluster + "-01", "node-1", "10.120.0.0/24"}, @@ -101,9 +108,9 @@ func TestReconcile(t *testing.T) { }, // 2 nodes, one route already there { - nodes: []v1.Node{ - node1, - node2, + nodes: []*v1.Node{ + &node1, + &node2, }, initialRoutes: []*cloudprovider.Route{ {cluster + "-01", "node-1", "10.120.0.0/24"}, @@ -117,9 +124,9 @@ func TestReconcile(t *testing.T) { }, // 2 nodes, no routes yet { - nodes: []v1.Node{ - node1, - node2, + nodes: []*v1.Node{ + &node1, + &node2, }, initialRoutes: []*cloudprovider.Route{}, expectedRoutes: []*cloudprovider.Route{ @@ -131,9 +138,9 @@ func TestReconcile(t *testing.T) { }, // 2 nodes, a few too many routes { - nodes: []v1.Node{ - node1, - node2, + nodes: []*v1.Node{ + &node1, + &node2, }, initialRoutes: []*cloudprovider.Route{ {cluster + "-01", "node-1", "10.120.0.0/24"}, @@ -150,9 +157,9 @@ func TestReconcile(t *testing.T) { }, // 2 nodes, 2 routes, but only 1 is right { - nodes: []v1.Node{ - node1, - node2, + nodes: []*v1.Node{ + &node1, + &node2, }, initialRoutes: []*cloudprovider.Route{ {cluster + "-01", "node-1", "10.120.0.0/24"}, @@ -167,9 +174,9 @@ func TestReconcile(t *testing.T) { }, // 2 nodes, one node without CIDR assigned. { - nodes: []v1.Node{ - node1, - nodeNoCidr, + nodes: []*v1.Node{ + &node1, + &nodeNoCidr, }, initialRoutes: []*cloudprovider.Route{}, expectedRoutes: []*cloudprovider.Route{ @@ -192,7 +199,9 @@ func TestReconcile(t *testing.T) { t.Error("Error in test: fakecloud doesn't support Routes()") } _, cidr, _ := net.ParseCIDR("10.120.0.0/16") - rc := New(routes, testCase.clientset, cluster, cidr) + informerFactory := informers.NewSharedInformerFactory(testCase.clientset, controller.NoResyncPeriodFunc()) + rc := New(routes, testCase.clientset, informerFactory.Core().V1().Nodes(), cluster, cidr) + rc.nodeListerSynced = alwaysReady if err := rc.reconcile(testCase.nodes, testCase.initialRoutes); err != nil { t.Errorf("%d. Error from rc.reconcile(): %v", i, err) }