mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Merge pull request #41355 from ncdc/shared-informers-09-service
Automatic merge from submit-queue Switch service controller to shared informers Originally part of #40097 cc @deads2k @smarterclayton @gmarek @wojtek-t @timothysc @sttts @liggitt @kubernetes/sig-scalability-pr-reviews
This commit is contained in:
commit
58ec5cce28
@ -216,11 +216,17 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc
|
|||||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||||
|
|
||||||
// Start the service controller
|
// Start the service controller
|
||||||
serviceController, err := servicecontroller.New(cloud, client("service-controller"), s.ClusterName)
|
serviceController, err := servicecontroller.New(
|
||||||
|
cloud,
|
||||||
|
client("service-controller"),
|
||||||
|
newSharedInformers.Core().V1().Services(),
|
||||||
|
newSharedInformers.Core().V1().Nodes(),
|
||||||
|
s.ClusterName,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to start service controller: %v", err)
|
glog.Errorf("Failed to start service controller: %v", err)
|
||||||
} else {
|
} else {
|
||||||
serviceController.Run(int(s.ConcurrentServiceSyncs))
|
go serviceController.Run(stop, int(s.ConcurrentServiceSyncs))
|
||||||
}
|
}
|
||||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||||
|
|
||||||
|
@ -450,11 +450,17 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
|
|||||||
nodeController.Run()
|
nodeController.Run()
|
||||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||||
|
|
||||||
serviceController, err := servicecontroller.New(cloud, clientBuilder.ClientOrDie("service-controller"), s.ClusterName)
|
serviceController, err := servicecontroller.New(
|
||||||
|
cloud,
|
||||||
|
clientBuilder.ClientOrDie("service-controller"),
|
||||||
|
newSharedInformers.Core().V1().Services(),
|
||||||
|
newSharedInformers.Core().V1().Nodes(),
|
||||||
|
s.ClusterName,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to start service controller: %v", err)
|
glog.Errorf("Failed to start service controller: %v", err)
|
||||||
} else {
|
} else {
|
||||||
serviceController.Run(int(s.ConcurrentServiceSyncs))
|
go serviceController.Run(stop, int(s.ConcurrentServiceSyncs))
|
||||||
}
|
}
|
||||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||||
|
|
||||||
|
@ -19,18 +19,16 @@ go_library(
|
|||||||
"//pkg/api:go_default_library",
|
"//pkg/api:go_default_library",
|
||||||
"//pkg/api/v1:go_default_library",
|
"//pkg/api/v1:go_default_library",
|
||||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||||
"//pkg/client/legacylisters:go_default_library",
|
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
|
||||||
|
"//pkg/client/listers/core/v1:go_default_library",
|
||||||
"//pkg/cloudprovider:go_default_library",
|
"//pkg/cloudprovider:go_default_library",
|
||||||
"//pkg/controller:go_default_library",
|
"//pkg/controller:go_default_library",
|
||||||
"//pkg/util/metrics:go_default_library",
|
"//pkg/util/metrics:go_default_library",
|
||||||
"//vendor:github.com/golang/glog",
|
"//vendor:github.com/golang/glog",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/api/errors",
|
"//vendor:k8s.io/apimachinery/pkg/api/errors",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
"//vendor:k8s.io/apimachinery/pkg/labels",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/fields",
|
|
||||||
"//vendor:k8s.io/apimachinery/pkg/runtime",
|
|
||||||
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
|
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/watch",
|
|
||||||
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
|
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
|
||||||
"//vendor:k8s.io/client-go/pkg/api/v1",
|
"//vendor:k8s.io/client-go/pkg/api/v1",
|
||||||
"//vendor:k8s.io/client-go/tools/cache",
|
"//vendor:k8s.io/client-go/tools/cache",
|
||||||
@ -48,7 +46,9 @@ go_test(
|
|||||||
"//pkg/api/testapi:go_default_library",
|
"//pkg/api/testapi:go_default_library",
|
||||||
"//pkg/api/v1:go_default_library",
|
"//pkg/api/v1:go_default_library",
|
||||||
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
||||||
|
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
|
||||||
"//pkg/cloudprovider/providers/fake:go_default_library",
|
"//pkg/cloudprovider/providers/fake:go_default_library",
|
||||||
|
"//pkg/controller:go_default_library",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/types",
|
"//vendor:k8s.io/apimachinery/pkg/types",
|
||||||
"//vendor:k8s.io/client-go/tools/record",
|
"//vendor:k8s.io/client-go/tools/record",
|
||||||
|
@ -26,12 +26,9 @@ import (
|
|||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
|
||||||
pkgruntime "k8s.io/apimachinery/pkg/runtime"
|
|
||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
|
||||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
clientv1 "k8s.io/client-go/pkg/api/v1"
|
clientv1 "k8s.io/client-go/pkg/api/v1"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
@ -40,7 +37,8 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||||
"k8s.io/kubernetes/pkg/client/legacylisters"
|
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
|
||||||
|
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
||||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
"k8s.io/kubernetes/pkg/util/metrics"
|
"k8s.io/kubernetes/pkg/util/metrics"
|
||||||
@ -80,28 +78,33 @@ type serviceCache struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type ServiceController struct {
|
type ServiceController struct {
|
||||||
cloud cloudprovider.Interface
|
cloud cloudprovider.Interface
|
||||||
knownHosts []*v1.Node
|
knownHosts []*v1.Node
|
||||||
servicesToUpdate []*v1.Service
|
servicesToUpdate []*v1.Service
|
||||||
kubeClient clientset.Interface
|
kubeClient clientset.Interface
|
||||||
clusterName string
|
clusterName string
|
||||||
balancer cloudprovider.LoadBalancer
|
balancer cloudprovider.LoadBalancer
|
||||||
zone cloudprovider.Zone
|
zone cloudprovider.Zone
|
||||||
cache *serviceCache
|
cache *serviceCache
|
||||||
// A store of services, populated by the serviceController
|
serviceLister corelisters.ServiceLister
|
||||||
serviceStore listers.StoreToServiceLister
|
serviceListerSynced cache.InformerSynced
|
||||||
// Watches changes to all services
|
eventBroadcaster record.EventBroadcaster
|
||||||
serviceController cache.Controller
|
eventRecorder record.EventRecorder
|
||||||
eventBroadcaster record.EventBroadcaster
|
nodeLister corelisters.NodeLister
|
||||||
eventRecorder record.EventRecorder
|
nodeListerSynced cache.InformerSynced
|
||||||
nodeLister listers.StoreToNodeLister
|
|
||||||
// services that need to be synced
|
// services that need to be synced
|
||||||
workingQueue workqueue.DelayingInterface
|
workingQueue workqueue.DelayingInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new service controller to keep cloud provider service resources
|
// New returns a new service controller to keep cloud provider service resources
|
||||||
// (like load balancers) in sync with the registry.
|
// (like load balancers) in sync with the registry.
|
||||||
func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) (*ServiceController, error) {
|
func New(
|
||||||
|
cloud cloudprovider.Interface,
|
||||||
|
kubeClient clientset.Interface,
|
||||||
|
serviceInformer coreinformers.ServiceInformer,
|
||||||
|
nodeInformer coreinformers.NodeInformer,
|
||||||
|
clusterName string,
|
||||||
|
) (*ServiceController, error) {
|
||||||
broadcaster := record.NewBroadcaster()
|
broadcaster := record.NewBroadcaster()
|
||||||
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
|
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
|
||||||
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "service-controller"})
|
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "service-controller"})
|
||||||
@ -118,22 +121,12 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
|
|||||||
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
|
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
|
||||||
eventBroadcaster: broadcaster,
|
eventBroadcaster: broadcaster,
|
||||||
eventRecorder: recorder,
|
eventRecorder: recorder,
|
||||||
nodeLister: listers.StoreToNodeLister{
|
nodeLister: nodeInformer.Lister(),
|
||||||
Store: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
nodeListerSynced: nodeInformer.Informer().HasSynced,
|
||||||
},
|
workingQueue: workqueue.NewNamedDelayingQueue("service"),
|
||||||
workingQueue: workqueue.NewDelayingQueue(),
|
|
||||||
}
|
}
|
||||||
s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
|
|
||||||
&cache.ListWatch{
|
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||||
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
|
|
||||||
return s.kubeClient.Core().Services(metav1.NamespaceAll).List(options)
|
|
||||||
},
|
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
||||||
return s.kubeClient.Core().Services(metav1.NamespaceAll).Watch(options)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.Service{},
|
|
||||||
serviceSyncPeriod,
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: s.enqueueService,
|
AddFunc: s.enqueueService,
|
||||||
UpdateFunc: func(old, cur interface{}) {
|
UpdateFunc: func(old, cur interface{}) {
|
||||||
@ -145,8 +138,11 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
|
|||||||
},
|
},
|
||||||
DeleteFunc: s.enqueueService,
|
DeleteFunc: s.enqueueService,
|
||||||
},
|
},
|
||||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
serviceSyncPeriod,
|
||||||
)
|
)
|
||||||
|
s.serviceLister = serviceInformer.Lister()
|
||||||
|
s.serviceListerSynced = serviceInformer.Informer().HasSynced
|
||||||
|
|
||||||
if err := s.init(); err != nil {
|
if err := s.init(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -173,15 +169,24 @@ func (s *ServiceController) enqueueService(obj interface{}) {
|
|||||||
//
|
//
|
||||||
// It's an error to call Run() more than once for a given ServiceController
|
// It's an error to call Run() more than once for a given ServiceController
|
||||||
// object.
|
// object.
|
||||||
func (s *ServiceController) Run(workers int) {
|
func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
|
||||||
defer runtime.HandleCrash()
|
defer runtime.HandleCrash()
|
||||||
go s.serviceController.Run(wait.NeverStop)
|
defer s.workingQueue.ShutDown()
|
||||||
for i := 0; i < workers; i++ {
|
|
||||||
go wait.Until(s.worker, time.Second, wait.NeverStop)
|
glog.Info("Starting service controller")
|
||||||
|
|
||||||
|
if !cache.WaitForCacheSync(stopCh, s.serviceListerSynced, s.nodeListerSynced) {
|
||||||
|
runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
|
||||||
}
|
}
|
||||||
nodeLW := cache.NewListWatchFromClient(s.kubeClient.Core().RESTClient(), "nodes", metav1.NamespaceAll, fields.Everything())
|
|
||||||
cache.NewReflector(nodeLW, &v1.Node{}, s.nodeLister.Store, 0).Run()
|
for i := 0; i < workers; i++ {
|
||||||
go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, wait.NeverStop)
|
go wait.Until(s.worker, time.Second, stopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh)
|
||||||
|
|
||||||
|
<-stopCh
|
||||||
|
glog.Info("Stopping service controller")
|
||||||
}
|
}
|
||||||
|
|
||||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||||
@ -258,12 +263,13 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
|
|||||||
// Returns whatever error occurred along with a boolean indicator of whether it
|
// Returns whatever error occurred along with a boolean indicator of whether it
|
||||||
// should be retried.
|
// should be retried.
|
||||||
func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) (error, bool) {
|
func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) (error, bool) {
|
||||||
|
|
||||||
// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
|
// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
|
||||||
// which may involve service interruption. Also, we would like user-friendly events.
|
// which may involve service interruption. Also, we would like user-friendly events.
|
||||||
|
|
||||||
// Save the state so we can avoid a write if it doesn't change
|
// Save the state so we can avoid a write if it doesn't change
|
||||||
previousState := v1.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
|
previousState := v1.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
|
||||||
|
var newState *v1.LoadBalancerStatus
|
||||||
|
var err error
|
||||||
|
|
||||||
if !wantsLoadBalancer(service) {
|
if !wantsLoadBalancer(service) {
|
||||||
needDelete := true
|
needDelete := true
|
||||||
@ -284,7 +290,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
|
|||||||
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
|
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
|
||||||
}
|
}
|
||||||
|
|
||||||
service.Status.LoadBalancer = v1.LoadBalancerStatus{}
|
newState = &v1.LoadBalancerStatus{}
|
||||||
} else {
|
} else {
|
||||||
glog.V(2).Infof("Ensuring LB for service %s", key)
|
glog.V(2).Infof("Ensuring LB for service %s", key)
|
||||||
|
|
||||||
@ -292,7 +298,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
|
|||||||
|
|
||||||
// The load balancer doesn't exist yet, so create it.
|
// The load balancer doesn't exist yet, so create it.
|
||||||
s.eventRecorder.Event(service, v1.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
|
s.eventRecorder.Event(service, v1.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
|
||||||
err := s.createLoadBalancer(service)
|
newState, err = s.createLoadBalancer(service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to create load balancer for service %s: %v", key, err), retryable
|
return fmt.Errorf("Failed to create load balancer for service %s: %v", key, err), retryable
|
||||||
}
|
}
|
||||||
@ -301,7 +307,17 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
|
|||||||
|
|
||||||
// Write the state if changed
|
// Write the state if changed
|
||||||
// TODO: Be careful here ... what if there were other changes to the service?
|
// TODO: Be careful here ... what if there were other changes to the service?
|
||||||
if !v1.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
|
if !v1.LoadBalancerStatusEqual(previousState, newState) {
|
||||||
|
// Make a copy so we don't mutate the shared informer cache
|
||||||
|
copy, err := api.Scheme.DeepCopy(service)
|
||||||
|
if err != nil {
|
||||||
|
return err, retryable
|
||||||
|
}
|
||||||
|
service = copy.(*v1.Service)
|
||||||
|
|
||||||
|
// Update the status on the copy
|
||||||
|
service.Status.LoadBalancer = *newState
|
||||||
|
|
||||||
if err := s.persistUpdate(service); err != nil {
|
if err := s.persistUpdate(service); err != nil {
|
||||||
return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
|
return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
|
||||||
}
|
}
|
||||||
@ -340,30 +356,23 @@ func (s *ServiceController) persistUpdate(service *v1.Service) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ServiceController) createLoadBalancer(service *v1.Service) error {
|
func (s *ServiceController) createLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
|
||||||
nodes, err := s.nodeLister.List()
|
nodes, err := s.nodeLister.List(labels.Everything())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
lbNodes := []*v1.Node{}
|
lbNodes := []*v1.Node{}
|
||||||
for ix := range nodes.Items {
|
for ix := range nodes {
|
||||||
if includeNodeFromNodeList(&nodes.Items[ix]) {
|
if includeNodeFromNodeList(nodes[ix]) {
|
||||||
lbNodes = append(lbNodes, &nodes.Items[ix])
|
lbNodes = append(lbNodes, nodes[ix])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// - Only one protocol supported per service
|
// - Only one protocol supported per service
|
||||||
// - Not all cloud providers support all protocols and the next step is expected to return
|
// - Not all cloud providers support all protocols and the next step is expected to return
|
||||||
// an error for unsupported protocols
|
// an error for unsupported protocols
|
||||||
status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, lbNodes)
|
return s.balancer.EnsureLoadBalancer(s.clusterName, service, lbNodes)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
} else {
|
|
||||||
service.Status.LoadBalancer = *status
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListKeys implements the interface required by DeltaFIFO to list the keys we
|
// ListKeys implements the interface required by DeltaFIFO to list the keys we
|
||||||
@ -604,7 +613,7 @@ func includeNodeFromNodeList(node *v1.Node) bool {
|
|||||||
return !node.Spec.Unschedulable
|
return !node.Spec.Unschedulable
|
||||||
}
|
}
|
||||||
|
|
||||||
func getNodeConditionPredicate() listers.NodeConditionPredicate {
|
func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
|
||||||
return func(node *v1.Node) bool {
|
return func(node *v1.Node) bool {
|
||||||
// We add the master to the node list, but its unschedulable. So we use this to filter
|
// We add the master to the node list, but its unschedulable. So we use this to filter
|
||||||
// the master.
|
// the master.
|
||||||
@ -631,7 +640,7 @@ func getNodeConditionPredicate() listers.NodeConditionPredicate {
|
|||||||
// nodeSyncLoop handles updating the hosts pointed to by all load
|
// nodeSyncLoop handles updating the hosts pointed to by all load
|
||||||
// balancers whenever the set of nodes in the cluster changes.
|
// balancers whenever the set of nodes in the cluster changes.
|
||||||
func (s *ServiceController) nodeSyncLoop() {
|
func (s *ServiceController) nodeSyncLoop() {
|
||||||
newHosts, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
|
newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
|
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
|
||||||
return
|
return
|
||||||
@ -736,30 +745,26 @@ func (s *ServiceController) syncService(key string) error {
|
|||||||
defer func() {
|
defer func() {
|
||||||
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
|
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
|
||||||
}()
|
}()
|
||||||
// obj holds the latest service info from apiserver
|
|
||||||
obj, exists, err := s.serviceStore.Indexer.GetByKey(key)
|
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Infof("Unable to retrieve service %v from store: %v", key, err)
|
|
||||||
s.workingQueue.Add(key)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !exists {
|
|
||||||
|
// service holds the latest service info from apiserver
|
||||||
|
service, err := s.serviceLister.Services(namespace).Get(name)
|
||||||
|
switch {
|
||||||
|
case errors.IsNotFound(err):
|
||||||
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
|
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
|
||||||
glog.Infof("Service has been deleted %v", key)
|
glog.Infof("Service has been deleted %v", key)
|
||||||
err, retryDelay = s.processServiceDeletion(key)
|
err, retryDelay = s.processServiceDeletion(key)
|
||||||
} else {
|
case err != nil:
|
||||||
service, ok := obj.(*v1.Service)
|
glog.Infof("Unable to retrieve service %v from store: %v", key, err)
|
||||||
if ok {
|
s.workingQueue.Add(key)
|
||||||
cachedService = s.cache.getOrCreate(key)
|
return err
|
||||||
err, retryDelay = s.processServiceUpdate(cachedService, service, key)
|
default:
|
||||||
} else {
|
cachedService = s.cache.getOrCreate(key)
|
||||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
err, retryDelay = s.processServiceUpdate(cachedService, service, key)
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("object contained wasn't a service or a deleted key: %#v", obj)
|
|
||||||
}
|
|
||||||
glog.Infof("Found tombstone for %v", key)
|
|
||||||
err, retryDelay = s.processServiceDeletion(tombstone.Key)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if retryDelay != 0 {
|
if retryDelay != 0 {
|
||||||
|
@ -26,7 +26,9 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api/testapi"
|
"k8s.io/kubernetes/pkg/api/testapi"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||||
|
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||||
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
||||||
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
)
|
)
|
||||||
|
|
||||||
const region = "us-central"
|
const region = "us-central"
|
||||||
@ -35,6 +37,30 @@ func newService(name string, uid types.UID, serviceType v1.ServiceType) *v1.Serv
|
|||||||
return &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "namespace", UID: uid, SelfLink: testapi.Default.SelfLink("services", name)}, Spec: v1.ServiceSpec{Type: serviceType}}
|
return &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "namespace", UID: uid, SelfLink: testapi.Default.SelfLink("services", name)}, Spec: v1.ServiceSpec{Type: serviceType}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func alwaysReady() bool { return true }
|
||||||
|
|
||||||
|
func newController() (*ServiceController, *fakecloud.FakeCloud, *fake.Clientset) {
|
||||||
|
cloud := &fakecloud.FakeCloud{}
|
||||||
|
cloud.Region = region
|
||||||
|
|
||||||
|
client := fake.NewSimpleClientset()
|
||||||
|
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||||
|
serviceInformer := informerFactory.Core().V1().Services()
|
||||||
|
nodeInformer := informerFactory.Core().V1().Nodes()
|
||||||
|
|
||||||
|
controller, _ := New(cloud, client, serviceInformer, nodeInformer, "test-cluster")
|
||||||
|
controller.nodeListerSynced = alwaysReady
|
||||||
|
controller.serviceListerSynced = alwaysReady
|
||||||
|
controller.eventRecorder = record.NewFakeRecorder(100)
|
||||||
|
|
||||||
|
controller.init()
|
||||||
|
cloud.Calls = nil // ignore any cloud calls made in init()
|
||||||
|
client.ClearActions() // ignore any client calls made in init()
|
||||||
|
|
||||||
|
return controller, cloud, client
|
||||||
|
}
|
||||||
|
|
||||||
func TestCreateExternalLoadBalancer(t *testing.T) {
|
func TestCreateExternalLoadBalancer(t *testing.T) {
|
||||||
table := []struct {
|
table := []struct {
|
||||||
service *v1.Service
|
service *v1.Service
|
||||||
@ -93,14 +119,7 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
cloud := &fakecloud.FakeCloud{}
|
controller, cloud, client := newController()
|
||||||
cloud.Region = region
|
|
||||||
client := &fake.Clientset{}
|
|
||||||
controller, _ := New(cloud, client, "test-cluster")
|
|
||||||
controller.eventRecorder = record.NewFakeRecorder(100)
|
|
||||||
controller.init()
|
|
||||||
cloud.Calls = nil // ignore any cloud calls made in init()
|
|
||||||
client.ClearActions() // ignore any client calls made in init()
|
|
||||||
err, _ := controller.createLoadBalancerIfNeeded("foo/bar", item.service)
|
err, _ := controller.createLoadBalancerIfNeeded("foo/bar", item.service)
|
||||||
if !item.expectErr && err != nil {
|
if !item.expectErr && err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@ -217,14 +236,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
cloud := &fakecloud.FakeCloud{}
|
controller, cloud, _ := newController()
|
||||||
|
|
||||||
cloud.Region = region
|
|
||||||
client := &fake.Clientset{}
|
|
||||||
controller, _ := New(cloud, client, "test-cluster2")
|
|
||||||
controller.eventRecorder = record.NewFakeRecorder(100)
|
|
||||||
controller.init()
|
|
||||||
cloud.Calls = nil // ignore any cloud calls made in init()
|
|
||||||
|
|
||||||
var services []*v1.Service
|
var services []*v1.Service
|
||||||
for _, service := range item.services {
|
for _, service := range item.services {
|
||||||
|
Loading…
Reference in New Issue
Block a user