Use Informer framework in route controller

This commit is contained in:
Wojciech Tyczynski 2016-09-19 11:39:36 +02:00
parent 3d82251e92
commit a6ef37ece9

View File

@ -26,11 +26,15 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/metrics"
nodeutil "k8s.io/kubernetes/pkg/util/node" nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
) )
const ( const (
@ -48,21 +52,42 @@ type RouteController struct {
kubeClient clientset.Interface kubeClient clientset.Interface
clusterName string clusterName string
clusterCIDR *net.IPNet clusterCIDR *net.IPNet
// Node framework and store
nodeController *cache.Controller
nodeStore cache.StoreToNodeLister
} }
func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterName string, clusterCIDR *net.IPNet) *RouteController { func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterName string, clusterCIDR *net.IPNet) *RouteController {
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("route_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) metrics.RegisterMetricAndTrackRateLimiterUsage("route_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
} }
return &RouteController{ rc := &RouteController{
routes: routes, routes: routes,
kubeClient: kubeClient, kubeClient: kubeClient,
clusterName: clusterName, clusterName: clusterName,
clusterCIDR: clusterCIDR, clusterCIDR: clusterCIDR,
} }
rc.nodeStore.Store, rc.nodeController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return rc.kubeClient.Core().Nodes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return rc.kubeClient.Core().Nodes().Watch(options)
},
},
&api.Node{},
controller.NoResyncPeriodFunc(),
cache.ResourceEventHandlerFuncs{},
)
return rc
} }
func (rc *RouteController) Run(syncPeriod time.Duration) { func (rc *RouteController) Run(syncPeriod time.Duration) {
go rc.nodeController.Run(wait.NeverStop)
// TODO: If we do just the full Resync every 5 minutes (default value) // TODO: If we do just the full Resync every 5 minutes (default value)
// that means that we may wait up to 5 minutes before even starting // that means that we may wait up to 5 minutes before even starting
// creating a route for it. This is bad. // creating a route for it. This is bad.
@ -80,9 +105,10 @@ func (rc *RouteController) reconcileNodeRoutes() error {
if err != nil { if err != nil {
return fmt.Errorf("error listing routes: %v", err) return fmt.Errorf("error listing routes: %v", err)
} }
// TODO (cjcullen): use pkg/controller/cache.NewInformer to watch this if !rc.nodeController.HasSynced() {
// and reduce the number of lists needed. return fmt.Errorf("nodeController is not yet synced")
nodeList, err := rc.kubeClient.Core().Nodes().List(api.ListOptions{}) }
nodeList, err := rc.nodeStore.List()
if err != nil { if err != nil {
return fmt.Errorf("error listing nodes: %v", err) return fmt.Errorf("error listing nodes: %v", err)
} }