Changes to Proxy common code

This commit is contained in:
Vinod K L Swamy 2020-06-29 14:29:46 -07:00
parent 71c352dee3
commit 4505d5b182
No known key found for this signature in database
GPG Key ID: 9F321C2977A31082
8 changed files with 61 additions and 41 deletions

View File

@ -94,6 +94,10 @@ func newBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string]
type makeEndpointFunc func(info *BaseEndpointInfo) Endpoint
// This handler is invoked by the apply function on every change. This function should not modify the
// EndpointsMap's but just use the changes for any Proxier specific cleanup.
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)
// EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of
// Endpoints, keyed by their namespace and name.
type EndpointChangeTracker struct {
@ -104,7 +108,8 @@ type EndpointChangeTracker struct {
// items maps a service to is endpointsChange.
items map[types.NamespacedName]*endpointsChange
// makeEndpointInfo allows proxier to inject customized information when processing endpoint.
makeEndpointInfo makeEndpointFunc
makeEndpointInfo makeEndpointFunc
processEndpointsMapChange processEndpointsMapChangeFunc
// endpointSliceCache holds a simplified version of endpoint slices.
endpointSliceCache *EndpointSliceCache
// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
@ -116,14 +121,15 @@ type EndpointChangeTracker struct {
}
// NewEndpointChangeTracker initializes an EndpointsChangeMap
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder, endpointSlicesEnabled bool) *EndpointChangeTracker {
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder, endpointSlicesEnabled bool, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker {
ect := &EndpointChangeTracker{
hostname: hostname,
items: make(map[types.NamespacedName]*endpointsChange),
makeEndpointInfo: makeEndpointInfo,
isIPv6Mode: isIPv6Mode,
recorder: recorder,
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
hostname: hostname,
items: make(map[types.NamespacedName]*endpointsChange),
makeEndpointInfo: makeEndpointInfo,
isIPv6Mode: isIPv6Mode,
recorder: recorder,
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
processEndpointsMapChange: processEndpointsMapChange,
}
if endpointSlicesEnabled {
ect.endpointSliceCache = NewEndpointSliceCache(hostname, isIPv6Mode, recorder, makeEndpointInfo)
@ -388,6 +394,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
// The changes map is cleared after applying them.
// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints
// that were changed and will result in syncing the proxy rules.
// apply triggers processEndpointsMapChange on every change.
func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
if ect == nil {
@ -396,6 +403,9 @@ func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]Servi
changes := ect.checkoutChanges()
for _, change := range changes {
if ect.processEndpointsMapChange != nil {
ect.processEndpointsMapChange(change.previous, change.current)
}
em.unmerge(change.previous)
em.merge(change.current)
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)

View File

@ -135,7 +135,7 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.
// This is a coarse test, but it offers some modicum of confidence as the code is evolved.
func TestEndpointsToEndpointsMap(t *testing.T) {
epTracker := NewEndpointChangeTracker("test-hostname", nil, nil, nil, false)
epTracker := NewEndpointChangeTracker("test-hostname", nil, nil, nil, false, nil)
trueVal := true
falseVal := false
@ -1446,7 +1446,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
// test starting from an empty state
"add a simple slice that doesn't already exist": {
startingSlices: []*discovery.EndpointSlice{},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false,
@ -1469,7 +1469,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false,
@ -1481,7 +1481,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false,
@ -1494,7 +1494,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false,
@ -1526,7 +1526,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}),
paramRemoveSlice: false,
@ -1556,7 +1556,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: true,
@ -1578,7 +1578,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: true,
@ -1590,7 +1590,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false,
@ -1602,7 +1602,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 2, 1, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false,
@ -1624,7 +1624,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false,
@ -1683,20 +1683,20 @@ func TestCheckoutChanges(t *testing.T) {
pendingSlices []*discovery.EndpointSlice
}{
"empty slices": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true),
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true, nil),
expectedChanges: []*endpointsChange{},
useEndpointSlices: true,
appliedSlices: []*discovery.EndpointSlice{},
pendingSlices: []*discovery.EndpointSlice{},
},
"without slices, empty items": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false),
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false, nil),
expectedChanges: []*endpointsChange{},
items: map[types.NamespacedName]*endpointsChange{},
useEndpointSlices: false,
},
"without slices, simple items": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false),
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false, nil),
expectedChanges: []*endpointsChange{{
previous: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")},
@ -1720,7 +1720,7 @@ func TestCheckoutChanges(t *testing.T) {
useEndpointSlices: false,
},
"adding initial slice": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true),
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true, nil),
expectedChanges: []*endpointsChange{{
previous: EndpointsMap{},
current: EndpointsMap{
@ -1734,7 +1734,7 @@ func TestCheckoutChanges(t *testing.T) {
},
},
"removing port in update": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true),
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true, nil),
expectedChanges: []*endpointsChange{{
previous: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1"), newTestEp("10.0.1.2:80", "host1")},

View File

@ -298,9 +298,9 @@ func NewProxier(ipt utiliptables.Interface,
proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled, nil),
syncPeriod: syncPeriod,
iptables: ipt,
masqueradeAll: masqueradeAll,

View File

@ -353,9 +353,9 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro
p := &Proxier{
exec: &fakeexec.FakeExec{},
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil, endpointSlicesEnabled),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil, endpointSlicesEnabled, nil),
iptables: ipt,
masqueradeMark: "0x4000",
localDetector: detectLocal,

View File

@ -441,9 +441,9 @@ func NewProxier(ipt utiliptables.Interface,
proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder, endpointSlicesEnabled),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder, endpointSlicesEnabled, nil),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
excludeCIDRs: parseExcludedCIDRs(excludeCIDRs),

View File

@ -122,9 +122,9 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
p := &Proxier{
exec: fexec,
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil, endpointSlicesEnabled),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil, endpointSlicesEnabled, nil),
excludeCIDRs: excludeCIDRs,
iptables: ipt,
ipvs: ipvs,

View File

@ -198,6 +198,10 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServiceInfo) ServicePort
// This handler is invoked by the apply function on every change. This function should not modify the
// ServiceMap's but just use the changes for any Proxier specific cleanup.
type processServiceMapChangeFunc func(previous, current ServiceMap)
// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes.
@ -214,19 +218,21 @@ type ServiceChangeTracker struct {
// items maps a service to its serviceChange.
items map[types.NamespacedName]*serviceChange
// makeServiceInfo allows proxier to inject customized information when processing service.
makeServiceInfo makeServicePortFunc
makeServiceInfo makeServicePortFunc
processServiceMapChange processServiceMapChangeFunc
// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
isIPv6Mode *bool
recorder record.EventRecorder
}
// NewServiceChangeTracker initializes a ServiceChangeTracker
func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, isIPv6Mode *bool, recorder record.EventRecorder) *ServiceChangeTracker {
func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, isIPv6Mode *bool, recorder record.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker {
return &ServiceChangeTracker{
items: make(map[types.NamespacedName]*serviceChange),
makeServiceInfo: makeServiceInfo,
isIPv6Mode: isIPv6Mode,
recorder: recorder,
items: make(map[types.NamespacedName]*serviceChange),
makeServiceInfo: makeServiceInfo,
isIPv6Mode: isIPv6Mode,
recorder: recorder,
processServiceMapChange: processServiceMapChange,
}
}
@ -337,10 +343,14 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic
// apply the changes to ServiceMap and update the stale udp cluster IP set. The UDPStaleClusterIP argument is passed in to store the
// udp protocol service cluster ip when service is deleted from the ServiceMap.
// apply triggers processServiceMapChange on every change.
func (sm *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) {
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
if changes.processServiceMapChange != nil {
changes.processServiceMapChange(change.previous, change.current)
}
sm.merge(change.current)
// filter out the Update event of current changes from previous changes before calling unmerge() so that can
// skip deleting the Update events.

View File

@ -85,7 +85,7 @@ func makeServicePortName(ns, name, port string, protocol v1.Protocol) ServicePor
}
func TestServiceToServiceMap(t *testing.T) {
svcTracker := NewServiceChangeTracker(nil, nil, nil)
svcTracker := NewServiceChangeTracker(nil, nil, nil, nil)
trueVal := true
falseVal := false
@ -449,9 +449,9 @@ type FakeProxier struct {
func newFakeProxier() *FakeProxier {
return &FakeProxier{
serviceMap: make(ServiceMap),
serviceChanges: NewServiceChangeTracker(nil, nil, nil),
serviceChanges: NewServiceChangeTracker(nil, nil, nil, nil),
endpointsMap: make(EndpointsMap),
endpointsChanges: NewEndpointChangeTracker(testHostname, nil, nil, nil, false),
endpointsChanges: NewEndpointChangeTracker(testHostname, nil, nil, nil, false, nil),
}
}