mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Remove deprecated code from proxy/config
This commit is contained in:
parent
c3e9467b63
commit
af710835fa
@ -220,9 +220,6 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
|
||||
|
||||
var proxier proxy.ProxyProvider
|
||||
var servicesHandler proxyconfig.ServiceConfigHandler
|
||||
// TODO: Migrate all handlers to EndpointsHandler type and
|
||||
// get rid of this one.
|
||||
var endpointsHandler proxyconfig.EndpointsConfigHandler
|
||||
var endpointsEventHandler proxyconfig.EndpointsHandler
|
||||
|
||||
proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
|
||||
@ -321,12 +318,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
|
||||
go serviceConfig.Run(wait.NeverStop)
|
||||
|
||||
endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod)
|
||||
if endpointsHandler != nil {
|
||||
endpointsConfig.RegisterHandler(endpointsHandler)
|
||||
}
|
||||
if endpointsEventHandler != nil {
|
||||
endpointsConfig.RegisterEventHandler(endpointsEventHandler)
|
||||
}
|
||||
endpointsConfig.RegisterEventHandler(endpointsEventHandler)
|
||||
go endpointsConfig.Run(wait.NeverStop)
|
||||
|
||||
// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
|
||||
|
@ -143,7 +143,7 @@ func main() {
|
||||
serviceConfig.RegisterHandler(&kubemark.FakeProxyHandler{})
|
||||
|
||||
endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), configResyncPeriod)
|
||||
endpointsConfig.RegisterHandler(&kubemark.FakeProxyHandler{})
|
||||
endpointsConfig.RegisterEventHandler(&kubemark.FakeProxyHandler{})
|
||||
|
||||
eventClient, err := clientgoclientset.NewForConfig(clientConfig)
|
||||
if err != nil {
|
||||
|
@ -40,8 +40,11 @@ type HollowProxy struct {
|
||||
|
||||
type FakeProxyHandler struct{}
|
||||
|
||||
func (*FakeProxyHandler) OnServiceUpdate(services []*api.Service) {}
|
||||
func (*FakeProxyHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {}
|
||||
func (*FakeProxyHandler) OnServiceUpdate(services []*api.Service) {}
|
||||
func (*FakeProxyHandler) OnEndpointsAdd(endpoints *api.Endpoints) {}
|
||||
func (*FakeProxyHandler) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {}
|
||||
func (*FakeProxyHandler) OnEndpointsDelete(endpoints *api.Endpoints) {}
|
||||
func (*FakeProxyHandler) OnEndpointsSynced() {}
|
||||
|
||||
type FakeProxier struct{}
|
||||
|
||||
|
@ -40,6 +40,7 @@ go_test(
|
||||
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
|
||||
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
|
||||
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
||||
"//vendor:k8s.io/apimachinery/pkg/types",
|
||||
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
||||
"//vendor:k8s.io/apimachinery/pkg/watch",
|
||||
"//vendor:k8s.io/client-go/testing",
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
ktesting "k8s.io/client-go/testing"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
@ -124,40 +125,34 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
ch := make(chan struct{})
|
||||
handler := newEpsHandler(t, nil, func() { ch <- struct{}{} })
|
||||
handler := NewEndpointsHandlerMock()
|
||||
|
||||
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
|
||||
|
||||
endpointsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute)
|
||||
endpointsConfig.RegisterHandler(handler)
|
||||
endpointsConfig.RegisterEventHandler(handler)
|
||||
go sharedInformers.Start(stopCh)
|
||||
go endpointsConfig.Run(stopCh)
|
||||
|
||||
// Add the first endpoints
|
||||
handler.expected = []*api.Endpoints{endpoints1v1}
|
||||
fakeWatch.Add(endpoints1v1)
|
||||
<-ch
|
||||
handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v1})
|
||||
|
||||
// Add another endpoints
|
||||
handler.expected = []*api.Endpoints{endpoints1v1, endpoints2}
|
||||
fakeWatch.Add(endpoints2)
|
||||
<-ch
|
||||
handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v1, endpoints2})
|
||||
|
||||
// Modify endpoints1
|
||||
handler.expected = []*api.Endpoints{endpoints1v2, endpoints2}
|
||||
fakeWatch.Modify(endpoints1v2)
|
||||
<-ch
|
||||
handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v2, endpoints2})
|
||||
|
||||
// Delete endpoints1
|
||||
handler.expected = []*api.Endpoints{endpoints2}
|
||||
fakeWatch.Delete(endpoints1v2)
|
||||
<-ch
|
||||
handler.ValidateEndpoints(t, []*api.Endpoints{endpoints2})
|
||||
|
||||
// Delete endpoints2
|
||||
handler.expected = []*api.Endpoints{}
|
||||
fakeWatch.Delete(endpoints2)
|
||||
<-ch
|
||||
handler.ValidateEndpoints(t, []*api.Endpoints{})
|
||||
}
|
||||
|
||||
type svcHandler struct {
|
||||
@ -178,22 +173,17 @@ func (s *svcHandler) OnServiceUpdate(services []*api.Service) {
|
||||
}
|
||||
}
|
||||
|
||||
type epsHandler struct {
|
||||
t *testing.T
|
||||
expected []*api.Endpoints
|
||||
done func()
|
||||
}
|
||||
|
||||
func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) *epsHandler {
|
||||
return &epsHandler{t: t, expected: eps, done: done}
|
||||
}
|
||||
|
||||
func (e *epsHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {
|
||||
defer e.done()
|
||||
sort.Sort(sortedEndpoints(endpoints))
|
||||
if !reflect.DeepEqual(e.expected, endpoints) {
|
||||
e.t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, e.expected)
|
||||
func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) EndpointsHandler {
|
||||
ehm := &EndpointsHandlerMock{
|
||||
state: make(map[types.NamespacedName]*api.Endpoints),
|
||||
}
|
||||
ehm.process = func(endpoints []*api.Endpoints) {
|
||||
defer done()
|
||||
if !reflect.DeepEqual(eps, endpoints) {
|
||||
t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, eps)
|
||||
}
|
||||
}
|
||||
return ehm
|
||||
}
|
||||
|
||||
func TestInitialSync(t *testing.T) {
|
||||
@ -225,7 +215,7 @@ func TestInitialSync(t *testing.T) {
|
||||
svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done)
|
||||
svcConfig.RegisterHandler(svcHandler)
|
||||
epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done)
|
||||
epsConfig.RegisterHandler(epsHandler)
|
||||
epsConfig.RegisterEventHandler(epsHandler)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
@ -45,21 +45,6 @@ type ServiceConfigHandler interface {
|
||||
OnServiceUpdate(services []*api.Service)
|
||||
}
|
||||
|
||||
// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints.
|
||||
type EndpointsConfigHandler interface {
|
||||
// OnEndpointsUpdate gets called when endpoints configuration is changed for a given
|
||||
// service on any of the configuration sources. An example is when a new
|
||||
// service comes up, or when containers come up or down for an existing service.
|
||||
//
|
||||
// NOTE: For efficiency, endpoints are being passed by reference, thus,
|
||||
// OnEndpointsUpdate should NOT modify pointers of a given slice.
|
||||
// Those endpoints objects are shared with other layers of the system and
|
||||
// are guaranteed to be immutable with the assumption that are also
|
||||
// not mutated by those handlers. Make a deep copy if you need to modify
|
||||
// them in your code.
|
||||
OnEndpointsUpdate(endpoints []*api.Endpoints)
|
||||
}
|
||||
|
||||
// EndpointsHandler is an abstract interface o objects which receive
|
||||
// notifications about endpoints object changes.
|
||||
type EndpointsHandler interface {
|
||||
@ -83,11 +68,6 @@ type EndpointsConfig struct {
|
||||
lister listers.EndpointsLister
|
||||
listerSynced cache.InformerSynced
|
||||
eventHandlers []EndpointsHandler
|
||||
// TODO: Remove handlers by switching them to eventHandlers.
|
||||
handlers []EndpointsConfigHandler
|
||||
// updates channel is used to trigger registered handlers.
|
||||
updates chan struct{}
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
// NewEndpointsConfig creates a new EndpointsConfig.
|
||||
@ -95,12 +75,6 @@ func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyn
|
||||
result := &EndpointsConfig{
|
||||
lister: endpointsInformer.Lister(),
|
||||
listerSynced: endpointsInformer.Informer().HasSynced,
|
||||
// The updates channel is used to send interrupts to the Endpoints handler.
|
||||
// It's buffered because we never want to block for as long as there is a
|
||||
// pending interrupt, but don't want to drop them if the handler is doing
|
||||
// work.
|
||||
updates: make(chan struct{}, 1),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
|
||||
endpointsInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
@ -115,11 +89,6 @@ func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyn
|
||||
return result
|
||||
}
|
||||
|
||||
// RegisterHandler registers a handler which is called on every endpoints change.
|
||||
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
|
||||
c.handlers = append(c.handlers, handler)
|
||||
}
|
||||
|
||||
// RegisterEventHandler registers a handler which is called on every endpoints change.
|
||||
func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
|
||||
c.eventHandlers = append(c.eventHandlers, handler)
|
||||
@ -132,40 +101,12 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
|
||||
return
|
||||
}
|
||||
|
||||
// We have synced informers. Now we can start delivering updates
|
||||
// to the registered handler.
|
||||
go func() {
|
||||
for i := range c.eventHandlers {
|
||||
glog.V(3).Infof("Calling handler.OnEndpointsSynced()")
|
||||
c.eventHandlers[i].OnEndpointsSynced()
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-c.updates:
|
||||
endpoints, err := c.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("Error while listing endpoints from cache: %v", err)
|
||||
// This will cause a retry (if there isn't any other trigger in-flight).
|
||||
c.dispatchUpdate()
|
||||
continue
|
||||
}
|
||||
if endpoints == nil {
|
||||
endpoints = []*api.Endpoints{}
|
||||
}
|
||||
for i := range c.handlers {
|
||||
glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
|
||||
c.handlers[i].OnEndpointsUpdate(endpoints)
|
||||
}
|
||||
case <-c.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
// Close updates channel when stopCh is closed.
|
||||
go func() {
|
||||
<-stopCh
|
||||
close(c.stop)
|
||||
}()
|
||||
for i := range c.eventHandlers {
|
||||
glog.V(3).Infof("Calling handler.OnEndpointsSynced()")
|
||||
c.eventHandlers[i].OnEndpointsSynced()
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
|
||||
@ -178,7 +119,6 @@ func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
|
||||
glog.V(4).Infof("Calling handler.OnEndpointsAdd")
|
||||
c.eventHandlers[i].OnEndpointsAdd(endpoints)
|
||||
}
|
||||
c.dispatchUpdate()
|
||||
}
|
||||
|
||||
func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
|
||||
@ -196,7 +136,6 @@ func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
|
||||
glog.V(4).Infof("Calling handler.OnEndpointsUpdate")
|
||||
c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints)
|
||||
}
|
||||
c.dispatchUpdate()
|
||||
}
|
||||
|
||||
func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
|
||||
@ -216,18 +155,6 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
|
||||
glog.V(4).Infof("Calling handler.OnEndpointsUpdate")
|
||||
c.eventHandlers[i].OnEndpointsDelete(endpoints)
|
||||
}
|
||||
c.dispatchUpdate()
|
||||
}
|
||||
|
||||
func (c *EndpointsConfig) dispatchUpdate() {
|
||||
select {
|
||||
case c.updates <- struct{}{}:
|
||||
// Work enqueued successfully
|
||||
case <-c.stop:
|
||||
// We're shut down / avoid logging the message below
|
||||
default:
|
||||
glog.V(4).Infof("Endpoints handler already has a pending interrupt.")
|
||||
}
|
||||
}
|
||||
|
||||
// ServiceConfig tracks a set of service configurations.
|
||||
|
@ -19,10 +19,12 @@ package config
|
||||
import (
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
ktesting "k8s.io/client-go/testing"
|
||||
@ -45,7 +47,6 @@ func (s sortedServices) Less(i, j int) bool {
|
||||
|
||||
type ServiceHandlerMock struct {
|
||||
updated chan []*api.Service
|
||||
waits int
|
||||
}
|
||||
|
||||
func NewServiceHandlerMock() *ServiceHandlerMock {
|
||||
@ -90,17 +91,66 @@ func (s sortedEndpoints) Less(i, j int) bool {
|
||||
}
|
||||
|
||||
type EndpointsHandlerMock struct {
|
||||
lock sync.Mutex
|
||||
|
||||
state map[types.NamespacedName]*api.Endpoints
|
||||
synced bool
|
||||
updated chan []*api.Endpoints
|
||||
waits int
|
||||
process func([]*api.Endpoints)
|
||||
}
|
||||
|
||||
func NewEndpointsHandlerMock() *EndpointsHandlerMock {
|
||||
return &EndpointsHandlerMock{updated: make(chan []*api.Endpoints, 5)}
|
||||
ehm := &EndpointsHandlerMock{
|
||||
state: make(map[types.NamespacedName]*api.Endpoints),
|
||||
updated: make(chan []*api.Endpoints, 5),
|
||||
}
|
||||
ehm.process = func(endpoints []*api.Endpoints) {
|
||||
ehm.updated <- endpoints
|
||||
}
|
||||
return ehm
|
||||
}
|
||||
|
||||
func (h *EndpointsHandlerMock) OnEndpointsUpdate(endpoints []*api.Endpoints) {
|
||||
func (h *EndpointsHandlerMock) OnEndpointsAdd(endpoints *api.Endpoints) {
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
||||
h.state[namespacedName] = endpoints
|
||||
h.sendEndpoints()
|
||||
}
|
||||
|
||||
func (h *EndpointsHandlerMock) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
||||
h.state[namespacedName] = endpoints
|
||||
h.sendEndpoints()
|
||||
}
|
||||
|
||||
func (h *EndpointsHandlerMock) OnEndpointsDelete(endpoints *api.Endpoints) {
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
||||
delete(h.state, namespacedName)
|
||||
h.sendEndpoints()
|
||||
}
|
||||
|
||||
func (h *EndpointsHandlerMock) OnEndpointsSynced() {
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
h.synced = true
|
||||
h.sendEndpoints()
|
||||
}
|
||||
|
||||
func (h *EndpointsHandlerMock) sendEndpoints() {
|
||||
if !h.synced {
|
||||
return
|
||||
}
|
||||
endpoints := make([]*api.Endpoints, 0, len(h.state))
|
||||
for _, eps := range h.state {
|
||||
endpoints = append(endpoints, eps)
|
||||
}
|
||||
sort.Sort(sortedEndpoints(endpoints))
|
||||
h.updated <- endpoints
|
||||
h.process(endpoints)
|
||||
}
|
||||
|
||||
func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []*api.Endpoints) {
|
||||
@ -230,8 +280,8 @@ func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
|
||||
config := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute)
|
||||
handler := NewEndpointsHandlerMock()
|
||||
handler2 := NewEndpointsHandlerMock()
|
||||
config.RegisterHandler(handler)
|
||||
config.RegisterHandler(handler2)
|
||||
config.RegisterEventHandler(handler)
|
||||
config.RegisterEventHandler(handler2)
|
||||
go sharedInformers.Start(stopCh)
|
||||
go config.Run(stopCh)
|
||||
|
||||
@ -270,8 +320,8 @@ func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
|
||||
config := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute)
|
||||
handler := NewEndpointsHandlerMock()
|
||||
handler2 := NewEndpointsHandlerMock()
|
||||
config.RegisterHandler(handler)
|
||||
config.RegisterHandler(handler2)
|
||||
config.RegisterEventHandler(handler)
|
||||
config.RegisterEventHandler(handler2)
|
||||
go sharedInformers.Start(stopCh)
|
||||
go config.Run(stopCh)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user