mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Fix part of #9382
This commit is contained in:
parent
cc326c714b
commit
15d50f4211
@ -30,7 +30,6 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -75,6 +74,7 @@ type ServiceController struct {
|
|||||||
cache *serviceCache
|
cache *serviceCache
|
||||||
eventBroadcaster record.EventBroadcaster
|
eventBroadcaster record.EventBroadcaster
|
||||||
eventRecorder record.EventRecorder
|
eventRecorder record.EventRecorder
|
||||||
|
nodeLister cache.StoreToNodeLister
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new service controller to keep cloud provider service resources
|
// New returns a new service controller to keep cloud provider service resources
|
||||||
@ -91,6 +91,9 @@ func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName
|
|||||||
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
|
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
|
||||||
eventBroadcaster: broadcaster,
|
eventBroadcaster: broadcaster,
|
||||||
eventRecorder: recorder,
|
eventRecorder: recorder,
|
||||||
|
nodeLister: cache.StoreToNodeLister{
|
||||||
|
Store: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,6 +102,9 @@ func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName
|
|||||||
// load balancers created and deleted appropriately.
|
// load balancers created and deleted appropriately.
|
||||||
// nodeSyncPeriod controls how often we check the cluster's nodes to determine
|
// nodeSyncPeriod controls how often we check the cluster's nodes to determine
|
||||||
// if external load balancers need to be updated to point to a new set.
|
// if external load balancers need to be updated to point to a new set.
|
||||||
|
//
|
||||||
|
// It's an error to call Run() more than once for a given ServiceController
|
||||||
|
// object.
|
||||||
func (s *ServiceController) Run(nodeSyncPeriod time.Duration) error {
|
func (s *ServiceController) Run(nodeSyncPeriod time.Duration) error {
|
||||||
if err := s.init(); err != nil {
|
if err := s.init(); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -113,19 +119,27 @@ func (s *ServiceController) Run(nodeSyncPeriod time.Duration) error {
|
|||||||
|
|
||||||
// Get the currently existing set of services and then all future creates
|
// Get the currently existing set of services and then all future creates
|
||||||
// and updates of services.
|
// and updates of services.
|
||||||
// No delta compressor is needed for the DeltaFIFO queue because we only ever
|
// A delta compressor is needed for the DeltaFIFO queue because we only ever
|
||||||
// care about the most recent state.
|
// care about the most recent state.
|
||||||
serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, s.cache)
|
serviceQueue := cache.NewDeltaFIFO(
|
||||||
|
cache.MetaNamespaceKeyFunc,
|
||||||
|
cache.DeltaCompressorFunc(func(d cache.Deltas) cache.Deltas {
|
||||||
|
if len(d) == 0 {
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
return cache.Deltas{*d.Newest()}
|
||||||
|
}),
|
||||||
|
s.cache,
|
||||||
|
)
|
||||||
lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything())
|
lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything())
|
||||||
cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run()
|
cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run()
|
||||||
for i := 0; i < workerGoroutines; i++ {
|
for i := 0; i < workerGoroutines; i++ {
|
||||||
go s.watchServices(serviceQueue)
|
go s.watchServices(serviceQueue)
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeLister := &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}
|
|
||||||
nodeLW := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "nodes", api.NamespaceAll, fields.Everything())
|
nodeLW := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "nodes", api.NamespaceAll, fields.Everything())
|
||||||
cache.NewReflector(nodeLW, &api.Node{}, nodeLister.Store, 0).Run()
|
cache.NewReflector(nodeLW, &api.Node{}, s.nodeLister.Store, 0).Run()
|
||||||
go s.nodeSyncLoop(nodeLister, nodeSyncPeriod)
|
go s.nodeSyncLoop(nodeSyncPeriod)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -342,7 +356,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
nodes, err := s.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
|
nodes, err := s.nodeLister.List()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -352,7 +366,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err
|
|||||||
// TODO: Make this actually work for multiple IPs by using different
|
// TODO: Make this actually work for multiple IPs by using different
|
||||||
// names for each. For now, we'll just create the first and break.
|
// names for each. For now, we'll just create the first and break.
|
||||||
status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP),
|
status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP),
|
||||||
ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity)
|
ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else {
|
} else {
|
||||||
@ -362,7 +376,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, nil,
|
status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, nil,
|
||||||
ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity)
|
ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else {
|
} else {
|
||||||
@ -570,7 +584,7 @@ func hostsFromNodeList(list *api.NodeList) []string {
|
|||||||
|
|
||||||
// nodeSyncLoop handles updating the hosts pointed to by all external load
|
// nodeSyncLoop handles updating the hosts pointed to by all external load
|
||||||
// balancers whenever the set of nodes in the cluster changes.
|
// balancers whenever the set of nodes in the cluster changes.
|
||||||
func (s *ServiceController) nodeSyncLoop(nodeLister *cache.StoreToNodeLister, period time.Duration) {
|
func (s *ServiceController) nodeSyncLoop(period time.Duration) {
|
||||||
var prevHosts []string
|
var prevHosts []string
|
||||||
var servicesToUpdate []*cachedService
|
var servicesToUpdate []*cachedService
|
||||||
// TODO: Eliminate the unneeded now variable once we stop compiling in go1.3.
|
// TODO: Eliminate the unneeded now variable once we stop compiling in go1.3.
|
||||||
@ -578,7 +592,7 @@ func (s *ServiceController) nodeSyncLoop(nodeLister *cache.StoreToNodeLister, pe
|
|||||||
// something to compile, and gofmt1.4 complains about using `_ = range`.
|
// something to compile, and gofmt1.4 complains about using `_ = range`.
|
||||||
for now := range time.Tick(period) {
|
for now := range time.Tick(period) {
|
||||||
_ = now
|
_ = now
|
||||||
nodes, err := nodeLister.List()
|
nodes, err := s.nodeLister.List()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
|
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
|
||||||
continue
|
continue
|
||||||
|
Loading…
Reference in New Issue
Block a user