mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Support endpoints event handlers in kube-proxy
This commit is contained in:
parent
e18843d353
commit
f7c06ad23c
@ -220,7 +220,10 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
|
||||
|
||||
var proxier proxy.ProxyProvider
|
||||
var servicesHandler proxyconfig.ServiceConfigHandler
|
||||
// TODO: Migrate all handlers to EndpointsHandler type and
|
||||
// get rid of this one.
|
||||
var endpointsHandler proxyconfig.EndpointsConfigHandler
|
||||
var endpointsEventHandler proxyconfig.EndpointsHandler
|
||||
|
||||
proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
|
||||
if proxyMode == proxyModeIPTables {
|
||||
@ -257,7 +260,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
|
||||
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
|
||||
// our config.EndpointsConfigHandler.
|
||||
loadBalancer := winuserspace.NewLoadBalancerRR()
|
||||
// set EndpointsConfigHandler to our loadBalancer
|
||||
// set EndpointsHandler to our loadBalancer
|
||||
endpointsHandler = loadBalancer
|
||||
proxierUserspace, err := winuserspace.NewProxier(
|
||||
loadBalancer,
|
||||
@ -318,7 +321,12 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
|
||||
go serviceConfig.Run(wait.NeverStop)
|
||||
|
||||
endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod)
|
||||
endpointsConfig.RegisterHandler(endpointsHandler)
|
||||
if endpointsHandler != nil {
|
||||
endpointsConfig.RegisterHandler(endpointsHandler)
|
||||
}
|
||||
if endpointsEventHandler != nil {
|
||||
endpointsConfig.RegisterEventHandler(endpointsEventHandler)
|
||||
}
|
||||
go endpointsConfig.Run(wait.NeverStop)
|
||||
|
||||
// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
|
||||
|
@ -60,12 +60,31 @@ type EndpointsConfigHandler interface {
|
||||
OnEndpointsUpdate(endpoints []*api.Endpoints)
|
||||
}
|
||||
|
||||
// EndpointsHandler is an abstract interface o objects which receive
|
||||
// notifications about endpoints object changes.
|
||||
type EndpointsHandler interface {
|
||||
// OnEndpointsAdd is called whenever creation of new endpoints object
|
||||
// is observed.
|
||||
OnEndpointsAdd(endpoints *api.Endpoints)
|
||||
// OnEndpointsUpdate is called whenever modification of an existing
|
||||
// endpoints object is observed.
|
||||
OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints)
|
||||
// OnEndpointsDelete is called whever deletion of an existing endpoints
|
||||
// object is observed.
|
||||
OnEndpointsDelete(endpoints *api.Endpoints)
|
||||
// OnEndpointsSynced is called once all the initial event handlers were
|
||||
// called and the state is fully propagated to local cache.
|
||||
OnEndpointsSynced()
|
||||
}
|
||||
|
||||
// EndpointsConfig tracks a set of endpoints configurations.
|
||||
// It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change.
|
||||
type EndpointsConfig struct {
|
||||
lister listers.EndpointsLister
|
||||
listerSynced cache.InformerSynced
|
||||
handlers []EndpointsConfigHandler
|
||||
lister listers.EndpointsLister
|
||||
listerSynced cache.InformerSynced
|
||||
eventHandlers []EndpointsHandler
|
||||
// TODO: Remove handlers by switching them to eventHandlers.
|
||||
handlers []EndpointsConfigHandler
|
||||
// updates channel is used to trigger registered handlers.
|
||||
updates chan struct{}
|
||||
stop chan struct{}
|
||||
@ -101,6 +120,11 @@ func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
|
||||
c.handlers = append(c.handlers, handler)
|
||||
}
|
||||
|
||||
// RegisterEventHandler registers a handler which is called on every endpoints change.
|
||||
func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
|
||||
c.eventHandlers = append(c.eventHandlers, handler)
|
||||
}
|
||||
|
||||
// Run starts the goroutine responsible for calling registered handlers.
|
||||
func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
|
||||
if !cache.WaitForCacheSync(stopCh, c.listerSynced) {
|
||||
@ -111,6 +135,10 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
|
||||
// We have synced informers. Now we can start delivering updates
|
||||
// to the registered handler.
|
||||
go func() {
|
||||
for i := range c.eventHandlers {
|
||||
glog.V(3).Infof("Calling handler.OnEndpointsSynced()")
|
||||
c.eventHandlers[i].OnEndpointsSynced()
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-c.updates:
|
||||
@ -140,15 +168,54 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *EndpointsConfig) handleAddEndpoints(_ interface{}) {
|
||||
func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
|
||||
endpoints, ok := obj.(*api.Endpoints)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
|
||||
return
|
||||
}
|
||||
for i := range c.eventHandlers {
|
||||
glog.V(4).Infof("Calling handler.OnEndpointsAdd")
|
||||
c.eventHandlers[i].OnEndpointsAdd(endpoints)
|
||||
}
|
||||
c.dispatchUpdate()
|
||||
}
|
||||
|
||||
func (c *EndpointsConfig) handleUpdateEndpoints(_, _ interface{}) {
|
||||
func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
|
||||
oldEndpoints, ok := oldObj.(*api.Endpoints)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
|
||||
return
|
||||
}
|
||||
endpoints, ok := newObj.(*api.Endpoints)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
|
||||
return
|
||||
}
|
||||
for i := range c.eventHandlers {
|
||||
glog.V(4).Infof("Calling handler.OnEndpointsUpdate")
|
||||
c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints)
|
||||
}
|
||||
c.dispatchUpdate()
|
||||
}
|
||||
|
||||
func (c *EndpointsConfig) handleDeleteEndpoints(_ interface{}) {
|
||||
func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
|
||||
endpoints, ok := obj.(*api.Endpoints)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
|
||||
return
|
||||
}
|
||||
if endpoints, ok = tombstone.Obj.(*api.Endpoints); !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
|
||||
return
|
||||
}
|
||||
}
|
||||
for i := range c.eventHandlers {
|
||||
glog.V(4).Infof("Calling handler.OnEndpointsUpdate")
|
||||
c.eventHandlers[i].OnEndpointsDelete(endpoints)
|
||||
}
|
||||
c.dispatchUpdate()
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user