mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Switch route controller to shared informers
This commit is contained in:
parent
beaf5ffacc
commit
effde6b8dd
@ -229,8 +229,8 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc
|
|||||||
if routes, ok := cloud.Routes(); !ok {
|
if routes, ok := cloud.Routes(); !ok {
|
||||||
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
|
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
|
||||||
} else {
|
} else {
|
||||||
routeController := routecontroller.New(routes, client("route-controller"), s.ClusterName, clusterCIDR)
|
routeController := routecontroller.New(routes, client("route-controller"), newSharedInformers.Core().V1().Nodes(), s.ClusterName, clusterCIDR)
|
||||||
routeController.Run(s.RouteReconciliationPeriod.Duration)
|
routeController.Run(stop, s.RouteReconciliationPeriod.Duration)
|
||||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -464,8 +464,8 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
|
|||||||
} else if routes, ok := cloud.Routes(); !ok {
|
} else if routes, ok := cloud.Routes(); !ok {
|
||||||
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
|
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
|
||||||
} else {
|
} else {
|
||||||
routeController := routecontroller.New(routes, clientBuilder.ClientOrDie("route-controller"), s.ClusterName, clusterCIDR)
|
routeController := routecontroller.New(routes, clientBuilder.ClientOrDie("route-controller"), newSharedInformers.Core().V1().Nodes(), s.ClusterName, clusterCIDR)
|
||||||
routeController.Run(s.RouteReconciliationPeriod.Duration)
|
go routeController.Run(stop, s.RouteReconciliationPeriod.Duration)
|
||||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -18,18 +18,18 @@ go_library(
|
|||||||
deps = [
|
deps = [
|
||||||
"//pkg/api/v1:go_default_library",
|
"//pkg/api/v1:go_default_library",
|
||||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||||
"//pkg/client/legacylisters:go_default_library",
|
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
|
||||||
|
"//pkg/client/listers/core/v1:go_default_library",
|
||||||
"//pkg/cloudprovider:go_default_library",
|
"//pkg/cloudprovider:go_default_library",
|
||||||
"//pkg/controller:go_default_library",
|
|
||||||
"//pkg/util/metrics:go_default_library",
|
"//pkg/util/metrics:go_default_library",
|
||||||
"//pkg/util/node:go_default_library",
|
"//pkg/util/node:go_default_library",
|
||||||
"//vendor:github.com/golang/glog",
|
"//vendor:github.com/golang/glog",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/api/errors",
|
"//vendor:k8s.io/apimachinery/pkg/api/errors",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/runtime",
|
"//vendor:k8s.io/apimachinery/pkg/labels",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/types",
|
"//vendor:k8s.io/apimachinery/pkg/types",
|
||||||
|
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/watch",
|
|
||||||
"//vendor:k8s.io/client-go/tools/cache",
|
"//vendor:k8s.io/client-go/tools/cache",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@ -42,8 +42,10 @@ go_test(
|
|||||||
deps = [
|
deps = [
|
||||||
"//pkg/api/v1:go_default_library",
|
"//pkg/api/v1:go_default_library",
|
||||||
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
||||||
|
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
|
||||||
"//pkg/cloudprovider:go_default_library",
|
"//pkg/cloudprovider:go_default_library",
|
||||||
"//pkg/cloudprovider/providers/fake:go_default_library",
|
"//pkg/cloudprovider/providers/fake:go_default_library",
|
||||||
|
"//pkg/controller:go_default_library",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/types",
|
"//vendor:k8s.io/apimachinery/pkg/types",
|
||||||
"//vendor:k8s.io/client-go/testing",
|
"//vendor:k8s.io/client-go/testing",
|
||||||
|
@ -25,16 +25,16 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||||
"k8s.io/kubernetes/pkg/client/legacylisters"
|
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
|
||||||
|
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
||||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
|
||||||
"k8s.io/kubernetes/pkg/util/metrics"
|
"k8s.io/kubernetes/pkg/util/metrics"
|
||||||
nodeutil "k8s.io/kubernetes/pkg/util/node"
|
nodeutil "k8s.io/kubernetes/pkg/util/node"
|
||||||
)
|
)
|
||||||
@ -50,45 +50,39 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type RouteController struct {
|
type RouteController struct {
|
||||||
routes cloudprovider.Routes
|
routes cloudprovider.Routes
|
||||||
kubeClient clientset.Interface
|
kubeClient clientset.Interface
|
||||||
clusterName string
|
clusterName string
|
||||||
clusterCIDR *net.IPNet
|
clusterCIDR *net.IPNet
|
||||||
// Node framework and store
|
nodeLister corelisters.NodeLister
|
||||||
nodeController cache.Controller
|
nodeListerSynced cache.InformerSynced
|
||||||
nodeStore listers.StoreToNodeLister
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterName string, clusterCIDR *net.IPNet) *RouteController {
|
func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInformer coreinformers.NodeInformer, clusterName string, clusterCIDR *net.IPNet) *RouteController {
|
||||||
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
|
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
|
||||||
metrics.RegisterMetricAndTrackRateLimiterUsage("route_controller", kubeClient.Core().RESTClient().GetRateLimiter())
|
metrics.RegisterMetricAndTrackRateLimiterUsage("route_controller", kubeClient.Core().RESTClient().GetRateLimiter())
|
||||||
}
|
}
|
||||||
rc := &RouteController{
|
rc := &RouteController{
|
||||||
routes: routes,
|
routes: routes,
|
||||||
kubeClient: kubeClient,
|
kubeClient: kubeClient,
|
||||||
clusterName: clusterName,
|
clusterName: clusterName,
|
||||||
clusterCIDR: clusterCIDR,
|
clusterCIDR: clusterCIDR,
|
||||||
|
nodeLister: nodeInformer.Lister(),
|
||||||
|
nodeListerSynced: nodeInformer.Informer().HasSynced,
|
||||||
}
|
}
|
||||||
|
|
||||||
rc.nodeStore.Store, rc.nodeController = cache.NewInformer(
|
|
||||||
&cache.ListWatch{
|
|
||||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
|
||||||
return rc.kubeClient.Core().Nodes().List(options)
|
|
||||||
},
|
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
||||||
return rc.kubeClient.Core().Nodes().Watch(options)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.Node{},
|
|
||||||
controller.NoResyncPeriodFunc(),
|
|
||||||
cache.ResourceEventHandlerFuncs{},
|
|
||||||
)
|
|
||||||
|
|
||||||
return rc
|
return rc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rc *RouteController) Run(syncPeriod time.Duration) {
|
func (rc *RouteController) Run(stopCh <-chan struct{}, syncPeriod time.Duration) {
|
||||||
go rc.nodeController.Run(wait.NeverStop)
|
defer utilruntime.HandleCrash()
|
||||||
|
|
||||||
|
glog.Info("Starting the route controller")
|
||||||
|
|
||||||
|
if !cache.WaitForCacheSync(stopCh, rc.nodeListerSynced) {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: If we do just the full Resync every 5 minutes (default value)
|
// TODO: If we do just the full Resync every 5 minutes (default value)
|
||||||
// that means that we may wait up to 5 minutes before even starting
|
// that means that we may wait up to 5 minutes before even starting
|
||||||
@ -107,17 +101,14 @@ func (rc *RouteController) reconcileNodeRoutes() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error listing routes: %v", err)
|
return fmt.Errorf("error listing routes: %v", err)
|
||||||
}
|
}
|
||||||
if !rc.nodeController.HasSynced() {
|
nodes, err := rc.nodeLister.List(labels.Everything())
|
||||||
return fmt.Errorf("nodeController is not yet synced")
|
|
||||||
}
|
|
||||||
nodeList, err := rc.nodeStore.List()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error listing nodes: %v", err)
|
return fmt.Errorf("error listing nodes: %v", err)
|
||||||
}
|
}
|
||||||
return rc.reconcile(nodeList.Items, routeList)
|
return rc.reconcile(nodes, routeList)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rc *RouteController) reconcile(nodes []v1.Node, routes []*cloudprovider.Route) error {
|
func (rc *RouteController) reconcile(nodes []*v1.Node, routes []*cloudprovider.Route) error {
|
||||||
// nodeCIDRs maps nodeName->nodeCIDR
|
// nodeCIDRs maps nodeName->nodeCIDR
|
||||||
nodeCIDRs := make(map[types.NodeName]string)
|
nodeCIDRs := make(map[types.NodeName]string)
|
||||||
// routeMap maps routeTargetNode->route
|
// routeMap maps routeTargetNode->route
|
||||||
|
@ -26,10 +26,14 @@ import (
|
|||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||||
|
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||||
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
||||||
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func alwaysReady() bool { return true }
|
||||||
|
|
||||||
func TestIsResponsibleForRoute(t *testing.T) {
|
func TestIsResponsibleForRoute(t *testing.T) {
|
||||||
myClusterName := "my-awesome-cluster"
|
myClusterName := "my-awesome-cluster"
|
||||||
myClusterRoute := "my-awesome-cluster-12345678-90ab-cdef-1234-567890abcdef"
|
myClusterRoute := "my-awesome-cluster-12345678-90ab-cdef-1234-567890abcdef"
|
||||||
@ -57,7 +61,10 @@ func TestIsResponsibleForRoute(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("%d. Error in test case: unparsable cidr %q", i, testCase.clusterCIDR)
|
t.Errorf("%d. Error in test case: unparsable cidr %q", i, testCase.clusterCIDR)
|
||||||
}
|
}
|
||||||
rc := New(nil, nil, myClusterName, cidr)
|
client := fake.NewSimpleClientset()
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||||
|
rc := New(nil, nil, informerFactory.Core().V1().Nodes(), myClusterName, cidr)
|
||||||
|
rc.nodeListerSynced = alwaysReady
|
||||||
route := &cloudprovider.Route{
|
route := &cloudprovider.Route{
|
||||||
Name: testCase.routeName,
|
Name: testCase.routeName,
|
||||||
TargetNode: types.NodeName("doesnt-matter-for-this-test"),
|
TargetNode: types.NodeName("doesnt-matter-for-this-test"),
|
||||||
@ -76,7 +83,7 @@ func TestReconcile(t *testing.T) {
|
|||||||
nodeNoCidr := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2", UID: "02"}, Spec: v1.NodeSpec{PodCIDR: ""}}
|
nodeNoCidr := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2", UID: "02"}, Spec: v1.NodeSpec{PodCIDR: ""}}
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
nodes []v1.Node
|
nodes []*v1.Node
|
||||||
initialRoutes []*cloudprovider.Route
|
initialRoutes []*cloudprovider.Route
|
||||||
expectedRoutes []*cloudprovider.Route
|
expectedRoutes []*cloudprovider.Route
|
||||||
expectedNetworkUnavailable []bool
|
expectedNetworkUnavailable []bool
|
||||||
@ -84,9 +91,9 @@ func TestReconcile(t *testing.T) {
|
|||||||
}{
|
}{
|
||||||
// 2 nodes, routes already there
|
// 2 nodes, routes already there
|
||||||
{
|
{
|
||||||
nodes: []v1.Node{
|
nodes: []*v1.Node{
|
||||||
node1,
|
&node1,
|
||||||
node2,
|
&node2,
|
||||||
},
|
},
|
||||||
initialRoutes: []*cloudprovider.Route{
|
initialRoutes: []*cloudprovider.Route{
|
||||||
{cluster + "-01", "node-1", "10.120.0.0/24"},
|
{cluster + "-01", "node-1", "10.120.0.0/24"},
|
||||||
@ -101,9 +108,9 @@ func TestReconcile(t *testing.T) {
|
|||||||
},
|
},
|
||||||
// 2 nodes, one route already there
|
// 2 nodes, one route already there
|
||||||
{
|
{
|
||||||
nodes: []v1.Node{
|
nodes: []*v1.Node{
|
||||||
node1,
|
&node1,
|
||||||
node2,
|
&node2,
|
||||||
},
|
},
|
||||||
initialRoutes: []*cloudprovider.Route{
|
initialRoutes: []*cloudprovider.Route{
|
||||||
{cluster + "-01", "node-1", "10.120.0.0/24"},
|
{cluster + "-01", "node-1", "10.120.0.0/24"},
|
||||||
@ -117,9 +124,9 @@ func TestReconcile(t *testing.T) {
|
|||||||
},
|
},
|
||||||
// 2 nodes, no routes yet
|
// 2 nodes, no routes yet
|
||||||
{
|
{
|
||||||
nodes: []v1.Node{
|
nodes: []*v1.Node{
|
||||||
node1,
|
&node1,
|
||||||
node2,
|
&node2,
|
||||||
},
|
},
|
||||||
initialRoutes: []*cloudprovider.Route{},
|
initialRoutes: []*cloudprovider.Route{},
|
||||||
expectedRoutes: []*cloudprovider.Route{
|
expectedRoutes: []*cloudprovider.Route{
|
||||||
@ -131,9 +138,9 @@ func TestReconcile(t *testing.T) {
|
|||||||
},
|
},
|
||||||
// 2 nodes, a few too many routes
|
// 2 nodes, a few too many routes
|
||||||
{
|
{
|
||||||
nodes: []v1.Node{
|
nodes: []*v1.Node{
|
||||||
node1,
|
&node1,
|
||||||
node2,
|
&node2,
|
||||||
},
|
},
|
||||||
initialRoutes: []*cloudprovider.Route{
|
initialRoutes: []*cloudprovider.Route{
|
||||||
{cluster + "-01", "node-1", "10.120.0.0/24"},
|
{cluster + "-01", "node-1", "10.120.0.0/24"},
|
||||||
@ -150,9 +157,9 @@ func TestReconcile(t *testing.T) {
|
|||||||
},
|
},
|
||||||
// 2 nodes, 2 routes, but only 1 is right
|
// 2 nodes, 2 routes, but only 1 is right
|
||||||
{
|
{
|
||||||
nodes: []v1.Node{
|
nodes: []*v1.Node{
|
||||||
node1,
|
&node1,
|
||||||
node2,
|
&node2,
|
||||||
},
|
},
|
||||||
initialRoutes: []*cloudprovider.Route{
|
initialRoutes: []*cloudprovider.Route{
|
||||||
{cluster + "-01", "node-1", "10.120.0.0/24"},
|
{cluster + "-01", "node-1", "10.120.0.0/24"},
|
||||||
@ -167,9 +174,9 @@ func TestReconcile(t *testing.T) {
|
|||||||
},
|
},
|
||||||
// 2 nodes, one node without CIDR assigned.
|
// 2 nodes, one node without CIDR assigned.
|
||||||
{
|
{
|
||||||
nodes: []v1.Node{
|
nodes: []*v1.Node{
|
||||||
node1,
|
&node1,
|
||||||
nodeNoCidr,
|
&nodeNoCidr,
|
||||||
},
|
},
|
||||||
initialRoutes: []*cloudprovider.Route{},
|
initialRoutes: []*cloudprovider.Route{},
|
||||||
expectedRoutes: []*cloudprovider.Route{
|
expectedRoutes: []*cloudprovider.Route{
|
||||||
@ -192,7 +199,9 @@ func TestReconcile(t *testing.T) {
|
|||||||
t.Error("Error in test: fakecloud doesn't support Routes()")
|
t.Error("Error in test: fakecloud doesn't support Routes()")
|
||||||
}
|
}
|
||||||
_, cidr, _ := net.ParseCIDR("10.120.0.0/16")
|
_, cidr, _ := net.ParseCIDR("10.120.0.0/16")
|
||||||
rc := New(routes, testCase.clientset, cluster, cidr)
|
informerFactory := informers.NewSharedInformerFactory(testCase.clientset, controller.NoResyncPeriodFunc())
|
||||||
|
rc := New(routes, testCase.clientset, informerFactory.Core().V1().Nodes(), cluster, cidr)
|
||||||
|
rc.nodeListerSynced = alwaysReady
|
||||||
if err := rc.reconcile(testCase.nodes, testCase.initialRoutes); err != nil {
|
if err := rc.reconcile(testCase.nodes, testCase.initialRoutes); err != nil {
|
||||||
t.Errorf("%d. Error from rc.reconcile(): %v", i, err)
|
t.Errorf("%d. Error from rc.reconcile(): %v", i, err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user