Merge pull request #43702 from wojtek-t/edge_based_proxy

Automatic merge from submit-queue

Edge-based userspace LB in kube-proxy

@thockin @bowei - if one of you could take a look if that PR doesn't break some basic kube-proxy assumptions. The similar change for winuserproxy should be pretty trivial.

And we should also do that for iptables, but that requires splitting the iptables code to syncProxyRules (which from what I know @thockin already started working on so we should probably wait for it to be done).
This commit is contained in:
Kubernetes Submit Queue 2017-04-12 00:30:53 -07:00 committed by GitHub
commit 284615d79d
7 changed files with 396 additions and 265 deletions

View File

@ -220,7 +220,10 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
var proxier proxy.ProxyProvider
var servicesHandler proxyconfig.ServiceConfigHandler
// TODO: Migrate all handlers to EndpointsHandler type and
// get rid of this one.
var endpointsHandler proxyconfig.EndpointsConfigHandler
var endpointsEventHandler proxyconfig.EndpointsHandler
proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
if proxyMode == proxyModeIPTables {
@ -247,7 +250,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
}
proxier = proxierIPTables
servicesHandler = proxierIPTables
endpointsHandler = proxierIPTables
endpointsEventHandler = proxierIPTables
// No turning back. Remove artifacts that might still exist from the userspace Proxier.
glog.V(0).Info("Tearing down userspace rules.")
userspace.CleanupLeftovers(iptInterface)
@ -257,7 +260,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
// our config.EndpointsConfigHandler.
loadBalancer := winuserspace.NewLoadBalancerRR()
// set EndpointsConfigHandler to our loadBalancer
// set EndpointsHandler to our loadBalancer
endpointsHandler = loadBalancer
proxierUserspace, err := winuserspace.NewProxier(
loadBalancer,
@ -278,7 +281,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// our config.EndpointsConfigHandler.
loadBalancer := userspace.NewLoadBalancerRR()
// set EndpointsConfigHandler to our loadBalancer
endpointsHandler = loadBalancer
endpointsEventHandler = loadBalancer
proxierUserspace, err := userspace.NewProxier(
loadBalancer,
net.ParseIP(config.BindAddress),
@ -318,7 +321,12 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
go serviceConfig.Run(wait.NeverStop)
endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod)
endpointsConfig.RegisterHandler(endpointsHandler)
if endpointsHandler != nil {
endpointsConfig.RegisterHandler(endpointsHandler)
}
if endpointsEventHandler != nil {
endpointsConfig.RegisterEventHandler(endpointsEventHandler)
}
go endpointsConfig.Run(wait.NeverStop)
// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those

View File

@ -60,12 +60,31 @@ type EndpointsConfigHandler interface {
OnEndpointsUpdate(endpoints []*api.Endpoints)
}
// EndpointsHandler is an abstract interface o objects which receive
// notifications about endpoints object changes.
type EndpointsHandler interface {
// OnEndpointsAdd is called whenever creation of new endpoints object
// is observed.
OnEndpointsAdd(endpoints *api.Endpoints)
// OnEndpointsUpdate is called whenever modification of an existing
// endpoints object is observed.
OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints)
// OnEndpointsDelete is called whever deletion of an existing endpoints
// object is observed.
OnEndpointsDelete(endpoints *api.Endpoints)
// OnEndpointsSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
OnEndpointsSynced()
}
// EndpointsConfig tracks a set of endpoints configurations.
// It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change.
type EndpointsConfig struct {
lister listers.EndpointsLister
listerSynced cache.InformerSynced
handlers []EndpointsConfigHandler
lister listers.EndpointsLister
listerSynced cache.InformerSynced
eventHandlers []EndpointsHandler
// TODO: Remove handlers by switching them to eventHandlers.
handlers []EndpointsConfigHandler
// updates channel is used to trigger registered handlers.
updates chan struct{}
stop chan struct{}
@ -101,6 +120,11 @@ func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
c.handlers = append(c.handlers, handler)
}
// RegisterEventHandler registers a handler which is called on every endpoints change.
func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
c.eventHandlers = append(c.eventHandlers, handler)
}
// Run starts the goroutine responsible for calling registered handlers.
func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
if !cache.WaitForCacheSync(stopCh, c.listerSynced) {
@ -111,6 +135,10 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
// We have synced informers. Now we can start delivering updates
// to the registered handler.
go func() {
for i := range c.eventHandlers {
glog.V(3).Infof("Calling handler.OnEndpointsSynced()")
c.eventHandlers[i].OnEndpointsSynced()
}
for {
select {
case <-c.updates:
@ -140,15 +168,54 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
}()
}
func (c *EndpointsConfig) handleAddEndpoints(_ interface{}) {
func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
endpoints, ok := obj.(*api.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
for i := range c.eventHandlers {
glog.V(4).Infof("Calling handler.OnEndpointsAdd")
c.eventHandlers[i].OnEndpointsAdd(endpoints)
}
c.dispatchUpdate()
}
func (c *EndpointsConfig) handleUpdateEndpoints(_, _ interface{}) {
func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
oldEndpoints, ok := oldObj.(*api.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
return
}
endpoints, ok := newObj.(*api.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
return
}
for i := range c.eventHandlers {
glog.V(4).Infof("Calling handler.OnEndpointsUpdate")
c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints)
}
c.dispatchUpdate()
}
func (c *EndpointsConfig) handleDeleteEndpoints(_ interface{}) {
func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
endpoints, ok := obj.(*api.Endpoints)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
if endpoints, ok = tombstone.Obj.(*api.Endpoints); !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
}
for i := range c.eventHandlers {
glog.V(4).Infof("Calling handler.OnEndpointsUpdate")
c.eventHandlers[i].OnEndpointsDelete(endpoints)
}
c.dispatchUpdate()
}

View File

@ -196,8 +196,8 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
return info
}
type endpointsMap map[types.NamespacedName]*api.Endpoints
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo
// Proxier is an iptables based proxy for connections between a localhost:lport
@ -211,9 +211,14 @@ type Proxier struct {
// pointers are shared with higher layers of kube-proxy. They are guaranteed
// to not be modified in the meantime, but also require to be not modified
// by Proxier.
// nil until we have seen an On*Update event.
allServices []*api.Service
allEndpoints []*api.Endpoints
allEndpoints endpointsMap
// allServices is nil until we have seen an OnServiceUpdate event.
allServices []*api.Service
// endpointsSynced is set to true when endpoints are synced after startup.
// This is used to avoid updating iptables with some partial data after
// kube-proxy restart.
endpointsSynced bool
throttle flowcontrol.RateLimiter
@ -328,6 +333,7 @@ func NewProxier(ipt utiliptables.Interface,
serviceMap: make(proxyServiceMap),
endpointsMap: make(proxyEndpointMap),
portsMap: make(map[localPort]closeable),
allEndpoints: make(endpointsMap),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
throttle: throttle,
@ -532,19 +538,42 @@ func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) {
proxier.syncProxyRules(syncReasonServices)
}
// OnEndpointsUpdate takes in a slice of updated endpoints.
func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
if proxier.allEndpoints == nil {
glog.V(2).Info("Received first Endpoints update")
}
proxier.allEndpoints = allEndpoints
proxier.allEndpoints[namespacedName] = endpoints
proxier.syncProxyRules(syncReasonEndpoints)
}
func (proxier *Proxier) OnEndpointsUpdate(_, endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.allEndpoints[namespacedName] = endpoints
proxier.syncProxyRules(syncReasonEndpoints)
}
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
delete(proxier.allEndpoints, namespacedName)
proxier.syncProxyRules(syncReasonEndpoints)
}
func (proxier *Proxier) OnEndpointsSynced() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.endpointsSynced = true
proxier.syncProxyRules(syncReasonEndpoints)
}
// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
func buildNewEndpointsMap(allEndpoints endpointsMap, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
// return values
newMap = make(proxyEndpointMap)
@ -552,8 +581,8 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap
staleSet = make(map[endpointServicePair]bool)
// Update endpoints for services.
for i := range allEndpoints {
accumulateEndpointsMap(allEndpoints[i], hostname, &newMap)
for _, endpoints := range allEndpoints {
accumulateEndpointsMap(endpoints, hostname, &newMap)
}
// Check stale connections against endpoints missing from the update.
// TODO: we should really only mark a connection stale if the proto was UDP
@ -608,7 +637,6 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap
// NOTE: endpoints object should NOT be modified.
//
// TODO: this could be simplified:
// - hostPortInfo and endpointsInfo overlap too much
// - the test for this is overlapped by the test for buildNewEndpointsMap
// - naming is poor and responsibilities are muddled
func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoints *proxyEndpointMap) {
@ -733,7 +761,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if proxier.allEndpoints == nil || proxier.allServices == nil {
if !proxier.endpointsSynced || proxier.allServices == nil {
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return
}

View File

@ -383,16 +383,17 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
// TODO: Call NewProxier after refactoring out the goroutine
// invocation into a Run() method.
return &Proxier{
exec: &exec.FakeExec{},
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
iptables: ipt,
clusterCIDR: "10.0.0.0/24",
allEndpoints: []*api.Endpoints{},
allServices: []*api.Service{},
hostname: testHostname,
portsMap: make(map[localPort]closeable),
portMapper: &fakePortOpener{[]*localPort{}},
healthChecker: newFakeHealthChecker(),
exec: &exec.FakeExec{},
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
iptables: ipt,
clusterCIDR: "10.0.0.0/24",
allEndpoints: make(endpointsMap),
allServices: []*api.Service{},
endpointsSynced: true,
hostname: testHostname,
portsMap: make(map[localPort]closeable),
portMapper: &fakePortOpener{[]*localPort{}},
healthChecker: newFakeHealthChecker(),
}
}
@ -611,7 +612,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
}
epIP := "10.180.0.1"
fp.allEndpoints = []*api.Endpoints{
fp.allEndpoints = makeEndpointsMap(
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -623,7 +624,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
}},
}}
}),
}
)
fp.syncProxyRules(syncReasonForce)
@ -675,7 +676,7 @@ func TestLoadBalancer(t *testing.T) {
}
epIP := "10.180.0.1"
fp.allEndpoints = []*api.Endpoints{
fp.allEndpoints = makeEndpointsMap(
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -687,7 +688,7 @@ func TestLoadBalancer(t *testing.T) {
}},
}}
}),
}
)
fp.syncProxyRules(syncReasonForce)
@ -732,7 +733,7 @@ func TestNodePort(t *testing.T) {
}
epIP := "10.180.0.1"
fp.allEndpoints = []*api.Endpoints{
fp.allEndpoints = makeEndpointsMap(
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -744,7 +745,7 @@ func TestNodePort(t *testing.T) {
}},
}}
}),
}
)
fp.syncProxyRules(syncReasonForce)
@ -826,7 +827,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
epIP2 := "10.180.2.1"
epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
fp.allEndpoints = []*api.Endpoints{
fp.allEndpoints = makeEndpointsMap(
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -842,7 +843,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
}},
}}
}),
}
)
fp.syncProxyRules(syncReasonForce)
@ -917,7 +918,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
epIP2 := "10.180.2.1"
epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
fp.allEndpoints = []*api.Endpoints{
fp.allEndpoints = makeEndpointsMap(
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -933,7 +934,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
}},
}}
}),
}
)
fp.syncProxyRules(syncReasonForce)
@ -1405,6 +1406,15 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *ap
return ept
}
func makeEndpointsMap(allEndpoints ...*api.Endpoints) endpointsMap {
result := make(endpointsMap)
for _, endpoints := range allEndpoints {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
result[namespacedName] = endpoints
}
return result
}
func makeNSN(namespace, name string) types.NamespacedName {
return types.NamespacedName{Namespace: namespace, Name: name}
}
@ -1420,21 +1430,21 @@ func Test_buildNewEndpointsMap(t *testing.T) {
var nodeName = "host"
testCases := []struct {
newEndpoints []*api.Endpoints
newEndpoints map[types.NamespacedName]*api.Endpoints
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
expectedResult map[proxy.ServicePortName][]*endpointsInfo
expectedStale []endpointServicePair
expectedHealthchecks map[types.NamespacedName]int
}{{
// Case[0]: nothing
newEndpoints: []*api.Endpoints{},
newEndpoints: map[types.NamespacedName]*api.Endpoints{},
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
expectedStale: []endpointServicePair{},
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[1]: no change, unnamed port
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1445,7 +1455,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): {
{"1.1.1.1:11", false},
@ -1460,7 +1470,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[2]: no change, named port, local
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1473,7 +1483,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", true},
@ -1490,7 +1500,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
},
}, {
// Case[3]: no change, multiple subsets
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1510,7 +1520,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1531,7 +1541,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[4]: no change, multiple subsets, multiple ports, local
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1555,7 +1565,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", true},
@ -1584,7 +1594,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
},
}, {
// Case[5]: no change, multiple endpoints, subsets, IPs, and ports
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1633,7 +1643,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1693,7 +1703,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
},
}, {
// Case[6]: add an Endpoints
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1705,7 +1715,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ /* empty */ },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): {
@ -1718,7 +1728,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
},
}, {
// Case[7]: remove an Endpoints
newEndpoints: []*api.Endpoints{ /* empty */ },
newEndpoints: makeEndpointsMap(),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): {
{"1.1.1.1:11", true},
@ -1732,7 +1742,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[8]: add an IP and port
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1750,7 +1760,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1772,7 +1782,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
},
}, {
// Case[9]: remove an IP and port
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1784,7 +1794,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1813,7 +1823,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[10]: add a subset
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1834,7 +1844,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1854,7 +1864,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
},
}, {
// Case[11]: remove a subset
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1866,7 +1876,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1887,7 +1897,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[12]: rename a port
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1899,7 +1909,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1917,7 +1927,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[13]: renumber a port
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1929,7 +1939,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},
@ -1947,7 +1957,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
// Case[14]: complex add and remove
newEndpoints: []*api.Endpoints{
newEndpoints: makeEndpointsMap(
makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
@ -1995,7 +2005,7 @@ func Test_buildNewEndpointsMap(t *testing.T) {
}},
}}
}),
},
),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false},

View File

@ -202,14 +202,12 @@ func waitForNumProxyClients(t *testing.T, s *ServiceInfo, want int, timeout time
func TestTCPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]*api.Endpoints{
{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
},
lb.OnEndpointsAdd(&api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
})
fexec := makeFakeExec()
@ -232,14 +230,12 @@ func TestTCPProxy(t *testing.T) {
func TestUDPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]*api.Endpoints{
{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
},
lb.OnEndpointsAdd(&api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
})
fexec := makeFakeExec()
@ -262,14 +258,12 @@ func TestUDPProxy(t *testing.T) {
func TestUDPProxyTimeout(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]*api.Endpoints{
{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
},
lb.OnEndpointsAdd(&api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
})
fexec := makeFakeExec()
@ -297,19 +291,20 @@ func TestMultiPortProxy(t *testing.T) {
lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"}
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"}
lb.OnEndpointsUpdate([]*api.Endpoints{{
lb.OnEndpointsAdd(&api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Protocol: "TCP", Port: tcpServerPort}},
}},
}, {
})
lb.OnEndpointsAdd(&api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: serviceQ.Name, Namespace: serviceQ.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "q", Protocol: "UDP", Port: udpServerPort}},
}},
}})
})
fexec := makeFakeExec()
@ -397,14 +392,12 @@ func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error {
func TestTCPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]*api.Endpoints{
{
ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
},
lb.OnEndpointsAdd(&api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
})
fexec := makeFakeExec()
@ -444,14 +437,12 @@ func TestTCPProxyStop(t *testing.T) {
func TestUDPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]*api.Endpoints{
{
ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
},
lb.OnEndpointsAdd(&api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
})
fexec := makeFakeExec()
@ -485,14 +476,12 @@ func TestUDPProxyStop(t *testing.T) {
func TestTCPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]*api.Endpoints{
{
ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
},
lb.OnEndpointsAdd(&api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
})
fexec := makeFakeExec()
@ -525,14 +514,12 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
func TestUDPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]*api.Endpoints{
{
ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
},
lb.OnEndpointsAdd(&api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
})
fexec := makeFakeExec()
@ -572,7 +559,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
}
lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
lb.OnEndpointsAdd(endpoint)
fexec := makeFakeExec()
@ -601,7 +588,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
waitForNumProxyLoops(t, p, 0)
// need to add endpoint here because it got clean up during service delete
lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
lb.OnEndpointsAdd(endpoint)
p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
@ -628,7 +615,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
}
lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
lb.OnEndpointsAdd(endpoint)
fexec := makeFakeExec()
@ -657,7 +644,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
waitForNumProxyLoops(t, p, 0)
// need to add endpoint here because it got clean up during service delete
lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
lb.OnEndpointsAdd(endpoint)
p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
@ -677,14 +664,12 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
func TestTCPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]*api.Endpoints{
{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
},
lb.OnEndpointsAdd(&api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
})
fexec := makeFakeExec()
@ -728,14 +713,12 @@ func TestTCPProxyUpdatePort(t *testing.T) {
func TestUDPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]*api.Endpoints{
{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
},
lb.OnEndpointsAdd(&api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
})
fexec := makeFakeExec()
@ -776,14 +759,12 @@ func TestUDPProxyUpdatePort(t *testing.T) {
func TestProxyUpdatePublicIPs(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]*api.Endpoints{
{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
},
lb.OnEndpointsAdd(&api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
})
fexec := makeFakeExec()
@ -838,7 +819,7 @@ func TestProxyUpdatePortal(t *testing.T) {
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
}},
}
lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
lb.OnEndpointsAdd(endpoint)
fexec := makeFakeExec()
@ -890,7 +871,7 @@ func TestProxyUpdatePortal(t *testing.T) {
Protocol: "TCP",
}}},
}})
lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
lb.OnEndpointsAdd(endpoint)
svcInfo, exists = p.getServiceInfo(service)
if !exists {
t.Fatalf("service with ClusterIP set not found in the proxy")

View File

@ -243,65 +243,92 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn
}
}
// OnEndpointsUpdate manages the registered service endpoints.
// Registered endpoints are updated if found in the update set or
// unregistered if missing from the update set.
func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
registeredEndpoints := make(map[proxy.ServicePortName]bool)
// buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that
// portname. Expode Endpoints.Subsets[*] into this structure.
func buildPortsToEndpointsMap(endpoints *api.Endpoints) map[string][]hostPortPair {
portsToEndpoints := map[string][]hostPortPair{}
for i := range endpoints.Subsets {
ss := &endpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
for i := range ss.Addresses {
addr := &ss.Addresses[i]
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)})
// Ignore the protocol field - we'll get that from the Service objects.
}
}
}
return portsToEndpoints
}
func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *api.Endpoints) {
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
lb.lock.Lock()
defer lb.lock.Unlock()
// Update endpoints for services.
for i := range allEndpoints {
// svcEndpoints object should NOT be modified.
svcEndpoints := allEndpoints[i]
for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
state, exists := lb.services[svcPort]
// We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
portsToEndpoints := map[string][]hostPortPair{}
for i := range svcEndpoints.Subsets {
ss := &svcEndpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
for i := range ss.Addresses {
addr := &ss.Addresses[i]
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)})
// Ignore the protocol field - we'll get that from the Service objects.
}
}
}
if !exists || state == nil || len(newEndpoints) > 0 {
glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
lb.updateAffinityMap(svcPort, newEndpoints)
// OnEndpointsAdd can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created
// if one does not already exist. The affinity will be updated
// later, once NewService is called.
state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0)
state.endpoints = slice.ShuffleStrings(newEndpoints)
for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
state, exists := lb.services[svcPort]
curEndpoints := []string{}
if state != nil {
curEndpoints = state.endpoints
}
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
lb.updateAffinityMap(svcPort, newEndpoints)
// OnEndpointsUpdate can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created
// if one does not already exist. The affinity will be updated
// later, once NewService is called.
state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0)
state.endpoints = slice.ShuffleStrings(newEndpoints)
// Reset the round-robin index.
state.index = 0
}
registeredEndpoints[svcPort] = true
// Reset the round-robin index.
state.index = 0
}
}
// Remove endpoints missing from the update.
for k := range lb.services {
if _, exists := registeredEndpoints[k]; !exists {
glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", k)
}
func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints)
registeredEndpoints := make(map[proxy.ServicePortName]bool)
lb.lock.Lock()
defer lb.lock.Unlock()
for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
state, exists := lb.services[svcPort]
curEndpoints := []string{}
if state != nil {
curEndpoints = state.endpoints
}
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
lb.updateAffinityMap(svcPort, newEndpoints)
// OnEndpointsUpdate can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created
// if one does not already exist. The affinity will be updated
// later, once NewService is called.
state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0)
state.endpoints = slice.ShuffleStrings(newEndpoints)
// Reset the round-robin index.
state.index = 0
}
registeredEndpoints[svcPort] = true
}
// Now remove all endpoints missing from the update.
for portname := range oldPortsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: oldEndpoints.Namespace, Name: oldEndpoints.Name}, Port: portname}
if _, exists := registeredEndpoints[svcPort]; !exists {
glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort)
// Reset but don't delete.
state := lb.services[k]
state := lb.services[svcPort]
state.endpoints = []string{}
state.index = 0
state.affinity.affinityMap = map[string]*affinityState{}
@ -309,6 +336,27 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
}
}
func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *api.Endpoints) {
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
lb.lock.Lock()
defer lb.lock.Unlock()
for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort)
// If the service is still around, reset but don't delete.
if state, ok := lb.services[svcPort]; ok {
state.endpoints = []string{}
state.index = 0
state.affinity.affinityMap = map[string]*affinityState{}
}
}
}
func (lb *LoadBalancerRR) OnEndpointsSynced() {
}
// Tests whether two slices are equivalent. This sorts both slices in-place.
func slicesEquiv(lhs, rhs []string) bool {
if len(lhs) != len(rhs) {

View File

@ -67,8 +67,6 @@ func TestFilterWorks(t *testing.T) {
func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
var endpoints []*api.Endpoints
loadBalancer.OnEndpointsUpdate(endpoints)
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil {
@ -106,15 +104,14 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]*api.Endpoints, 1)
endpoints[0] = &api.Endpoints{
endpoints := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "endpoint1"}},
Ports: []api.EndpointPort{{Name: "p", Port: 40}},
}},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsAdd(endpoints)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
@ -144,15 +141,14 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]*api.Endpoints, 1)
endpoints[0] = &api.Endpoints{
endpoints := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "endpoint"}},
Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "p", Port: 2}, {Name: "p", Port: 3}},
}},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsAdd(endpoints)
shuffledEndpoints := loadBalancer.services[service].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
@ -172,8 +168,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]*api.Endpoints, 1)
endpoints[0] = &api.Endpoints{
endpoints := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Subsets: []api.EndpointSubset{
{
@ -186,7 +181,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsAdd(endpoints)
shuffledEndpoints := loadBalancer.services[serviceP].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") {
@ -215,8 +210,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]*api.Endpoints, 1)
endpoints[0] = &api.Endpoints{
endpointsv1 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Subsets: []api.EndpointSubset{
{
@ -233,7 +227,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsAdd(endpointsv1)
shuffledEndpoints := loadBalancer.services[serviceP].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") {
@ -255,7 +249,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = &api.Endpoints{
endpointsv2 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Subsets: []api.EndpointSubset{
{
@ -268,7 +262,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
shuffledEndpoints = loadBalancer.services[serviceP].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") {
@ -289,8 +283,8 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
// Clear endpoints
endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil}
loadBalancer.OnEndpointsUpdate(endpoints)
endpointsv3 := &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil}
loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false)
if err == nil || len(endpoint) != 0 {
@ -306,8 +300,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]*api.Endpoints, 2)
endpoints[0] = &api.Endpoints{
endpoints1 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: fooServiceP.Name, Namespace: fooServiceP.Namespace},
Subsets: []api.EndpointSubset{
{
@ -316,7 +309,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
},
},
}
endpoints[1] = &api.Endpoints{
endpoints2 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: barServiceP.Name, Namespace: barServiceP.Namespace},
Subsets: []api.EndpointSubset{
{
@ -325,7 +318,8 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsAdd(endpoints1)
loadBalancer.OnEndpointsAdd(endpoints2)
shuffledFooEndpoints := loadBalancer.services[fooServiceP].endpoints
expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil)
@ -341,7 +335,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
// Then update the configuration by removing foo
loadBalancer.OnEndpointsUpdate(endpoints[1:])
loadBalancer.OnEndpointsDelete(endpoints1)
endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
@ -364,8 +358,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
// Call NewService() before OnEndpointsUpdate()
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
endpoints := make([]*api.Endpoints, 1)
endpoints[0] = &api.Endpoints{
endpoints := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
@ -373,7 +366,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
{Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsAdd(endpoints)
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
@ -420,15 +413,14 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
}
// Call OnEndpointsUpdate() before NewService()
endpoints := make([]*api.Endpoints, 1)
endpoints[0] = &api.Endpoints{
endpoints := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
{Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsAdd(endpoints)
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
@ -482,8 +474,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
}
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
endpoints := make([]*api.Endpoints, 1)
endpoints[0] = &api.Endpoints{
endpointsv1 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{
@ -492,7 +483,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsAdd(endpointsv1)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
client1Endpoint := shuffledEndpoints[0]
@ -503,7 +494,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
client3Endpoint := shuffledEndpoints[2]
endpoints[0] = &api.Endpoints{
endpointsv2 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{
@ -512,7 +503,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
shuffledEndpoints = loadBalancer.services[service].endpoints
if client1Endpoint == "endpoint:3" {
client1Endpoint = shuffledEndpoints[0]
@ -525,7 +516,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
endpoints[0] = &api.Endpoints{
endpointsv3 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{
@ -534,7 +525,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
@ -556,8 +547,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
}
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
endpoints := make([]*api.Endpoints, 1)
endpoints[0] = &api.Endpoints{
endpointsv1 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{
@ -566,7 +556,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsAdd(endpointsv1)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
@ -577,7 +567,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = &api.Endpoints{
endpointsv2 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{
@ -586,7 +576,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
@ -596,8 +586,8 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
// Clear endpoints
endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
loadBalancer.OnEndpointsUpdate(endpoints)
endpointsv3 := &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
endpoint, err = loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 {
@ -616,8 +606,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService(fooService, api.ServiceAffinityClientIP, 0)
endpoints := make([]*api.Endpoints, 2)
endpoints[0] = &api.Endpoints{
endpoints1 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
Subsets: []api.EndpointSubset{
{
@ -628,7 +617,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
}
barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""}
loadBalancer.NewService(barService, api.ServiceAffinityClientIP, 0)
endpoints[1] = &api.Endpoints{
endpoints2 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
Subsets: []api.EndpointSubset{
{
@ -637,7 +626,8 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsAdd(endpoints1)
loadBalancer.OnEndpointsAdd(endpoints2)
shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
@ -659,7 +649,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
// Then update the configuration by removing foo
loadBalancer.OnEndpointsUpdate(endpoints[1:])
loadBalancer.OnEndpointsDelete(endpoints1)
endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
@ -685,8 +675,7 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) {
// Call NewService() before OnEndpointsUpdate()
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
endpoints := make([]*api.Endpoints, 1)
endpoints[0] = &api.Endpoints{
endpoints := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
@ -694,7 +683,7 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) {
{Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.OnEndpointsAdd(endpoints)
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}