diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 6a2fc3809b8..b797a01e10d 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -94,6 +94,10 @@ func newBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string] type makeEndpointFunc func(info *BaseEndpointInfo) Endpoint +// This handler is invoked by the apply function on every change. This function should not modify the +// EndpointsMap's but just use the changes for any Proxier specific cleanup. +type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap) + // EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of // Endpoints, keyed by their namespace and name. type EndpointChangeTracker struct { @@ -104,7 +108,8 @@ type EndpointChangeTracker struct { // items maps a service to is endpointsChange. items map[types.NamespacedName]*endpointsChange // makeEndpointInfo allows proxier to inject customized information when processing endpoint. - makeEndpointInfo makeEndpointFunc + makeEndpointInfo makeEndpointFunc + processEndpointsMapChange processEndpointsMapChangeFunc // endpointSliceCache holds a simplified version of endpoint slices. endpointSliceCache *EndpointSliceCache // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable. @@ -116,14 +121,15 @@ type EndpointChangeTracker struct { } // NewEndpointChangeTracker initializes an EndpointsChangeMap -func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder, endpointSlicesEnabled bool) *EndpointChangeTracker { +func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder, endpointSlicesEnabled bool, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker { ect := &EndpointChangeTracker{ - hostname: hostname, - items: make(map[types.NamespacedName]*endpointsChange), - makeEndpointInfo: makeEndpointInfo, - isIPv6Mode: isIPv6Mode, - recorder: recorder, - lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), + hostname: hostname, + items: make(map[types.NamespacedName]*endpointsChange), + makeEndpointInfo: makeEndpointInfo, + isIPv6Mode: isIPv6Mode, + recorder: recorder, + lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), + processEndpointsMapChange: processEndpointsMapChange, } if endpointSlicesEnabled { ect.endpointSliceCache = NewEndpointSliceCache(hostname, isIPv6Mode, recorder, makeEndpointInfo) @@ -388,6 +394,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint // The changes map is cleared after applying them. // In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints // that were changed and will result in syncing the proxy rules. +// apply triggers processEndpointsMapChange on every change. func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) { if ect == nil { @@ -396,6 +403,9 @@ func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]Servi changes := ect.checkoutChanges() for _, change := range changes { + if ect.processEndpointsMapChange != nil { + ect.processEndpointsMapChange(change.previous, change.current) + } em.unmerge(change.previous) em.merge(change.current) detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames) diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index d6abcc1ade9..1794d2b5500 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -135,7 +135,7 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1. // This is a coarse test, but it offers some modicum of confidence as the code is evolved. func TestEndpointsToEndpointsMap(t *testing.T) { - epTracker := NewEndpointChangeTracker("test-hostname", nil, nil, nil, false) + epTracker := NewEndpointChangeTracker("test-hostname", nil, nil, nil, false, nil) trueVal := true falseVal := false @@ -1446,7 +1446,7 @@ func TestEndpointSliceUpdate(t *testing.T) { // test starting from an empty state "add a simple slice that doesn't already exist": { startingSlices: []*discovery.EndpointSlice{}, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramRemoveSlice: false, @@ -1469,7 +1469,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramRemoveSlice: false, @@ -1481,7 +1481,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: fqdnSlice, paramRemoveSlice: false, @@ -1494,7 +1494,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramRemoveSlice: false, @@ -1526,7 +1526,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}), paramRemoveSlice: false, @@ -1556,7 +1556,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramRemoveSlice: true, @@ -1578,7 +1578,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramRemoveSlice: true, @@ -1590,7 +1590,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramRemoveSlice: false, @@ -1602,7 +1602,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 2, 1, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramRemoveSlice: false, @@ -1624,7 +1624,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramRemoveSlice: false, @@ -1683,20 +1683,20 @@ func TestCheckoutChanges(t *testing.T) { pendingSlices []*discovery.EndpointSlice }{ "empty slices": { - endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true), + endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true, nil), expectedChanges: []*endpointsChange{}, useEndpointSlices: true, appliedSlices: []*discovery.EndpointSlice{}, pendingSlices: []*discovery.EndpointSlice{}, }, "without slices, empty items": { - endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false), + endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false, nil), expectedChanges: []*endpointsChange{}, items: map[types.NamespacedName]*endpointsChange{}, useEndpointSlices: false, }, "without slices, simple items": { - endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false), + endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false, nil), expectedChanges: []*endpointsChange{{ previous: EndpointsMap{ svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")}, @@ -1720,7 +1720,7 @@ func TestCheckoutChanges(t *testing.T) { useEndpointSlices: false, }, "adding initial slice": { - endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true), + endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true, nil), expectedChanges: []*endpointsChange{{ previous: EndpointsMap{}, current: EndpointsMap{ @@ -1734,7 +1734,7 @@ func TestCheckoutChanges(t *testing.T) { }, }, "removing port in update": { - endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true), + endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true, nil), expectedChanges: []*endpointsChange{{ previous: EndpointsMap{ svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1"), newTestEp("10.0.1.2:80", "host1")}, diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 2cc09c31401..61161aecb5e 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -298,9 +298,9 @@ func NewProxier(ipt utiliptables.Interface, proxier := &Proxier{ portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), + serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled), + endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled, nil), syncPeriod: syncPeriod, iptables: ipt, masqueradeAll: masqueradeAll, diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 0b12b4f7bf7..9f1eb72d833 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -353,9 +353,9 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro p := &Proxier{ exec: &fakeexec.FakeExec{}, serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), + serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil, endpointSlicesEnabled), + endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil, endpointSlicesEnabled, nil), iptables: ipt, masqueradeMark: "0x4000", localDetector: detectLocal, diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index cf794639f00..387d0b14ae3 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -441,9 +441,9 @@ func NewProxier(ipt utiliptables.Interface, proxier := &Proxier{ portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), + serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder, endpointSlicesEnabled), + endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder, endpointSlicesEnabled, nil), syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, excludeCIDRs: parseExcludedCIDRs(excludeCIDRs), diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 53a056ab6ad..cb90c472235 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -122,9 +122,9 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u p := &Proxier{ exec: fexec, serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), + serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil, endpointSlicesEnabled), + endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil, endpointSlicesEnabled, nil), excludeCIDRs: excludeCIDRs, iptables: ipt, ipvs: ipvs, diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index 038eab23f59..3eea1fc7719 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -198,6 +198,10 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServiceInfo) ServicePort +// This handler is invoked by the apply function on every change. This function should not modify the +// ServiceMap's but just use the changes for any Proxier specific cleanup. +type processServiceMapChangeFunc func(previous, current ServiceMap) + // serviceChange contains all changes to services that happened since proxy rules were synced. For a single object, // changes are accumulated, i.e. previous is state from before applying the changes, // current is state after applying all of the changes. @@ -214,19 +218,21 @@ type ServiceChangeTracker struct { // items maps a service to its serviceChange. items map[types.NamespacedName]*serviceChange // makeServiceInfo allows proxier to inject customized information when processing service. - makeServiceInfo makeServicePortFunc + makeServiceInfo makeServicePortFunc + processServiceMapChange processServiceMapChangeFunc // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable. isIPv6Mode *bool recorder record.EventRecorder } // NewServiceChangeTracker initializes a ServiceChangeTracker -func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, isIPv6Mode *bool, recorder record.EventRecorder) *ServiceChangeTracker { +func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, isIPv6Mode *bool, recorder record.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker { return &ServiceChangeTracker{ - items: make(map[types.NamespacedName]*serviceChange), - makeServiceInfo: makeServiceInfo, - isIPv6Mode: isIPv6Mode, - recorder: recorder, + items: make(map[types.NamespacedName]*serviceChange), + makeServiceInfo: makeServiceInfo, + isIPv6Mode: isIPv6Mode, + recorder: recorder, + processServiceMapChange: processServiceMapChange, } } @@ -337,10 +343,14 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic // apply the changes to ServiceMap and update the stale udp cluster IP set. The UDPStaleClusterIP argument is passed in to store the // udp protocol service cluster ip when service is deleted from the ServiceMap. +// apply triggers processServiceMapChange on every change. func (sm *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) { changes.lock.Lock() defer changes.lock.Unlock() for _, change := range changes.items { + if changes.processServiceMapChange != nil { + changes.processServiceMapChange(change.previous, change.current) + } sm.merge(change.current) // filter out the Update event of current changes from previous changes before calling unmerge() so that can // skip deleting the Update events. diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index 9f1195968f2..656dc7306cf 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -85,7 +85,7 @@ func makeServicePortName(ns, name, port string, protocol v1.Protocol) ServicePor } func TestServiceToServiceMap(t *testing.T) { - svcTracker := NewServiceChangeTracker(nil, nil, nil) + svcTracker := NewServiceChangeTracker(nil, nil, nil, nil) trueVal := true falseVal := false @@ -449,9 +449,9 @@ type FakeProxier struct { func newFakeProxier() *FakeProxier { return &FakeProxier{ serviceMap: make(ServiceMap), - serviceChanges: NewServiceChangeTracker(nil, nil, nil), + serviceChanges: NewServiceChangeTracker(nil, nil, nil, nil), endpointsMap: make(EndpointsMap), - endpointsChanges: NewEndpointChangeTracker(testHostname, nil, nil, nil, false), + endpointsChanges: NewEndpointChangeTracker(testHostname, nil, nil, nil, false, nil), } }