From 6c395eb098dc726d870425eb2fb4e4f8cfe23694 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 9 Oct 2023 17:21:12 -0400 Subject: [PATCH] Fix "Endpoint" vs "Endpoints" in proxy type names The use of "Endpoint" vs "Endpoints" in these type names is tricky because it doesn't always make sense to use the same singular/plural convention as the corresonding service-related types, since often the service-related type is referring to a single service while the endpoint-related type is referring to multiple endpoint IPs. The "endpointsInfo" types in the iptables and winkernel proxiers are now "endpointInfo" because they describe a single endpoint IP (and wrap proxy.BaseEndpointInfo). "UpdateEndpointMapResult" is now "UpdateEndpointsMapResult", because it is the result of EndpointsMap.Update (and it's clearly correct for EndpointsMap to have plural "Endpoints" because it's a map to an array of proxy.Endpoint objects.) "EndpointChangeTracker" is now "EndpointsChangeTracker" because it tracks changes to the full set of endpoints for a particular service (and the new name matches the existing "endpointsChange" type and "Proxier.endpointsChanges" fields.) --- pkg/proxy/conntrack/cleanup.go | 16 ++-- pkg/proxy/endpoints.go | 32 +++---- pkg/proxy/endpoints_test.go | 130 ++++++++++++++-------------- pkg/proxy/iptables/proxier.go | 20 ++--- pkg/proxy/iptables/proxier_test.go | 72 +++++++-------- pkg/proxy/ipvs/proxier.go | 4 +- pkg/proxy/ipvs/proxier_test.go | 4 +- pkg/proxy/service_test.go | 4 +- pkg/proxy/winkernel/hns.go | 44 +++++----- pkg/proxy/winkernel/hns_test.go | 20 ++--- pkg/proxy/winkernel/proxier.go | 94 ++++++++++---------- pkg/proxy/winkernel/proxier_test.go | 32 +++---- 12 files changed, 236 insertions(+), 236 deletions(-) diff --git a/pkg/proxy/conntrack/cleanup.go b/pkg/proxy/conntrack/cleanup.go index b3b55dab978..4d25ff2f861 100644 --- a/pkg/proxy/conntrack/cleanup.go +++ b/pkg/proxy/conntrack/cleanup.go @@ -27,23 +27,23 @@ import ( // CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints. func CleanStaleEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, - serviceUpdateResult proxy.UpdateServiceMapResult, endpointUpdateResult proxy.UpdateEndpointMapResult) { + serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { - deleteStaleServiceConntrackEntries(isIPv6, exec, svcPortMap, serviceUpdateResult, endpointUpdateResult) - deleteStaleEndpointConntrackEntries(exec, svcPortMap, endpointUpdateResult) + deleteStaleServiceConntrackEntries(isIPv6, exec, svcPortMap, serviceUpdateResult, endpointsUpdateResult) + deleteStaleEndpointConntrackEntries(exec, svcPortMap, endpointsUpdateResult) } // deleteStaleServiceConntrackEntries takes care of flushing stale conntrack entries related // to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack // may create "black hole" entries for that IP+port. When the service gets endpoints we // need to delete those entries so further traffic doesn't get dropped. -func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointUpdateResult proxy.UpdateEndpointMapResult) { +func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs conntrackCleanupServiceNodePorts := sets.New[int]() - // merge newly active services gathered from updateEndpointsMap + // merge newly active services gathered from endpointsUpdateResult // a UDP service that changes from 0 to non-0 endpoints is newly active. - for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices { + for _, svcPortName := range endpointsUpdateResult.NewlyActiveUDPServices { if svcInfo, ok := svcPortMap[svcPortName]; ok { klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName) conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) @@ -78,8 +78,8 @@ func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, sv // deleteStaleEndpointConntrackEntries takes care of flushing stale conntrack entries related // to UDP endpoints. After a UDP endpoint is removed we must flush any conntrack entries // for it so that if the same client keeps sending, the packets will get routed to a new endpoint. -func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap proxy.ServicePortMap, endpointUpdateResult proxy.UpdateEndpointMapResult) { - for _, epSvcPair := range endpointUpdateResult.DeletedUDPEndpoints { +func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { + for _, epSvcPair := range endpointsUpdateResult.DeletedUDPEndpoints { if svcInfo, ok := svcPortMap[epSvcPair.ServicePortName]; ok { endpointIP := proxyutil.IPPart(epSvcPair.Endpoint) nodePort := svcInfo.NodePort() diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 47976dbf094..0a45c948a98 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -147,9 +147,9 @@ type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) // EndpointsMap's but just use the changes for any Proxier specific cleanup. type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap) -// EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of +// EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of // Endpoints, keyed by their namespace and name. -type EndpointChangeTracker struct { +type EndpointsChangeTracker struct { // lock protects lastChangeTriggerTimes lock sync.Mutex @@ -159,16 +159,16 @@ type EndpointChangeTracker struct { // Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints // object to change. Used to calculate the network-programming-latency. lastChangeTriggerTimes map[types.NamespacedName][]time.Time - // record the time when the endpointChangeTracker was created so we can ignore the endpoints + // record the time when the endpointsChangeTracker was created so we can ignore the endpoints // that were generated before, because we can't estimate the network-programming-latency on those. // This is specially problematic on restarts, because we process all the endpoints that may have been // created hours or days before. trackerStartTime time.Time } -// NewEndpointChangeTracker initializes an EndpointsChangeMap -func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker { - return &EndpointChangeTracker{ +// NewEndpointsChangeTracker initializes an EndpointsChangeTracker +func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker { + return &EndpointsChangeTracker{ lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), trackerStartTime: time.Now(), processEndpointsMapChange: processEndpointsMapChange, @@ -177,9 +177,9 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc } // EndpointSliceUpdate updates given service's endpoints change map based on the endpoints pair. -// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeMap. +// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeTracker. // If removeSlice is true, slice will be removed, otherwise it will be added or updated. -func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { +func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { if !supportedEndpointSliceAddressTypes.Has(string(endpointSlice.AddressType)) { klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType) return false @@ -225,13 +225,13 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E // PendingChanges returns a set whose keys are the names of the services whose endpoints // have changed since the last time ect was used to update an EndpointsMap. (You must call // this _before_ calling em.Update(ect).) -func (ect *EndpointChangeTracker) PendingChanges() sets.Set[string] { +func (ect *EndpointsChangeTracker) PendingChanges() sets.Set[string] { return ect.endpointSliceCache.pendingChanges() } // checkoutChanges returns a list of pending endpointsChanges and marks them as // applied. -func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange { +func (ect *EndpointsChangeTracker) checkoutChanges() []*endpointsChange { metrics.EndpointChangesPending.Set(0) return ect.endpointSliceCache.checkoutChanges() @@ -239,7 +239,7 @@ func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange { // checkoutTriggerTimes applies the locally cached trigger times to a map of // trigger times that have been passed in and empties the local cache. -func (ect *EndpointChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) { +func (ect *EndpointsChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) { ect.lock.Lock() defer ect.lock.Unlock() @@ -284,8 +284,8 @@ type endpointsChange struct { current EndpointsMap } -// UpdateEndpointMapResult is the updated results after applying endpoints changes. -type UpdateEndpointMapResult struct { +// UpdateEndpointsMapResult is the updated results after applying endpoints changes. +type UpdateEndpointsMapResult struct { // DeletedUDPEndpoints identifies UDP endpoints that have just been deleted. // Existing conntrack NAT entries pointing to these endpoints must be deleted to // ensure that no further traffic for the Service gets delivered to them. @@ -304,7 +304,7 @@ type UpdateEndpointMapResult struct { } // Update updates endpointsMap base on the given changes. -func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) { +func (em EndpointsMap) Update(changes *EndpointsChangeTracker) (result UpdateEndpointsMapResult) { result.DeletedUDPEndpoints = make([]ServiceEndpoint, 0) result.NewlyActiveUDPServices = make([]ServicePortName, 0) result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time) @@ -321,7 +321,7 @@ type EndpointsMap map[ServicePortName][]Endpoint // and clear the changes map. In addition it returns (via argument) and resets the // lastChangeTriggerTimes for all endpoints that were changed and will result in syncing // the proxy rules. apply triggers processEndpointsMapChange on every change. -func (em EndpointsMap) apply(ect *EndpointChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint, +func (em EndpointsMap) apply(ect *EndpointsChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) { if ect == nil { return @@ -396,7 +396,7 @@ func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int { } // detectStaleConntrackEntries detects services that may be associated with stale conntrack entries. -// (See UpdateEndpointMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.) +// (See UpdateEndpointsMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.) func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName) { // Find the UDP endpoints that we were sending traffic to in oldEndpointsMap, but // are no longer sending to newEndpointsMap. The proxier should make sure that diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 824a5a91461..35bfa16ca89 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -1340,7 +1340,7 @@ func TestEndpointSliceUpdate(t *testing.T) { testCases := map[string]struct { startingSlices []*discovery.EndpointSlice - endpointChangeTracker *EndpointChangeTracker + endpointsChangeTracker *EndpointsChangeTracker namespacedName types.NamespacedName paramEndpointSlice *discovery.EndpointSlice paramRemoveSlice bool @@ -1350,12 +1350,12 @@ func TestEndpointSliceUpdate(t *testing.T) { }{ // test starting from an empty state "add a simple slice that doesn't already exist": { - startingSlices: []*discovery.EndpointSlice{}, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: true, + startingSlices: []*discovery.EndpointSlice{}, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false, Ready: true, Serving: true, Terminating: false}, @@ -1375,7 +1375,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), paramRemoveSlice: false, @@ -1388,7 +1388,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: fqdnSlice, paramRemoveSlice: false, @@ -1402,11 +1402,11 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -1435,11 +1435,11 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80)}), - paramRemoveSlice: false, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -1466,11 +1466,11 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: true, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: true, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.2.1:80", IsLocal: false, Ready: true, Serving: true, Terminating: false}, @@ -1489,7 +1489,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), paramRemoveSlice: true, @@ -1502,11 +1502,11 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: false, Serving: false, Terminating: false}, @@ -1526,11 +1526,11 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 2, 1, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -1549,11 +1549,11 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 2, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 2, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -1578,11 +1578,11 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 2, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 2, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -1605,19 +1605,19 @@ func TestEndpointSliceUpdate(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - initializeCache(tc.endpointChangeTracker.endpointSliceCache, tc.startingSlices) + initializeCache(tc.endpointsChangeTracker.endpointSliceCache, tc.startingSlices) - got := tc.endpointChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice) + got := tc.endpointsChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice) if !reflect.DeepEqual(got, tc.expectedReturnVal) { t.Errorf("EndpointSliceUpdate return value got: %v, want %v", got, tc.expectedReturnVal) } - pendingChanges := tc.endpointChangeTracker.PendingChanges() + pendingChanges := tc.endpointsChangeTracker.PendingChanges() if !pendingChanges.Equal(tc.expectedChangedEndpoints) { t.Errorf("expected changed endpoints %q, got %q", tc.expectedChangedEndpoints.UnsortedList(), pendingChanges.UnsortedList()) } - changes := tc.endpointChangeTracker.checkoutChanges() + changes := tc.endpointsChangeTracker.checkoutChanges() if tc.expectedCurrentChange == nil { if len(changes) != 0 { t.Errorf("Expected %s to have no changes", tc.namespacedName) @@ -1637,20 +1637,20 @@ func TestCheckoutChanges(t *testing.T) { svcPortName1 := ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, "port-1", v1.ProtocolTCP} testCases := map[string]struct { - endpointChangeTracker *EndpointChangeTracker - expectedChanges []*endpointsChange - items map[types.NamespacedName]*endpointsChange - appliedSlices []*discovery.EndpointSlice - pendingSlices []*discovery.EndpointSlice + endpointsChangeTracker *EndpointsChangeTracker + expectedChanges []*endpointsChange + items map[types.NamespacedName]*endpointsChange + appliedSlices []*discovery.EndpointSlice + pendingSlices []*discovery.EndpointSlice }{ "empty slices": { - endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil), - expectedChanges: []*endpointsChange{}, - appliedSlices: []*discovery.EndpointSlice{}, - pendingSlices: []*discovery.EndpointSlice{}, + endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil), + expectedChanges: []*endpointsChange{}, + appliedSlices: []*discovery.EndpointSlice{}, + pendingSlices: []*discovery.EndpointSlice{}, }, "adding initial slice": { - endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil), expectedChanges: []*endpointsChange{{ previous: EndpointsMap{}, current: EndpointsMap{ @@ -1663,7 +1663,7 @@ func TestCheckoutChanges(t *testing.T) { }, }, "removing port in update": { - endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil), expectedChanges: []*endpointsChange{{ previous: EndpointsMap{ svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1", true, true, false), newTestEp("10.0.1.2:80", "host1", true, true, false), newTestEp("10.0.1.3:80", "host1", false, false, false)}, @@ -1685,13 +1685,13 @@ func TestCheckoutChanges(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { for _, slice := range tc.appliedSlices { - tc.endpointChangeTracker.EndpointSliceUpdate(slice, false) + tc.endpointsChangeTracker.EndpointSliceUpdate(slice, false) } - tc.endpointChangeTracker.checkoutChanges() + tc.endpointsChangeTracker.checkoutChanges() for _, slice := range tc.pendingSlices { - tc.endpointChangeTracker.EndpointSliceUpdate(slice, false) + tc.endpointsChangeTracker.EndpointSliceUpdate(slice, false) } - changes := tc.endpointChangeTracker.checkoutChanges() + changes := tc.endpointsChangeTracker.checkoutChanges() if len(tc.expectedChanges) != len(changes) { t.Fatalf("Expected %d changes, got %d", len(tc.expectedChanges), len(changes)) @@ -1730,7 +1730,7 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser for i := range expected[x] { newEp, ok := newMap[x][i].(*BaseEndpointInfo) if !ok { - t.Fatalf("Failed to cast endpointsInfo") + t.Fatalf("Failed to cast endpointInfo") } if !endpointEqual(newEp, expected[x][i]) { t.Fatalf("expected new[%v][%d] to be %v, got %v"+ diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index e8c252ccd58..95001d38c2a 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -122,15 +122,15 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *pro } // internal struct for endpoints information -type endpointsInfo struct { +type endpointInfo struct { *proxy.BaseEndpointInfo ChainName utiliptables.Chain } -// returns a new proxy.Endpoint which abstracts a endpointsInfo +// returns a new proxy.Endpoint which abstracts a endpointInfo func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint { - return &endpointsInfo{ + return &endpointInfo{ BaseEndpointInfo: baseInfo, ChainName: servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.Endpoint), } @@ -143,7 +143,7 @@ type Proxier struct { // services that happened since iptables was synced. For a single object, // changes are accumulated, i.e. previous is state from before all of them, // current is state after applying all of those. - endpointsChanges *proxy.EndpointChangeTracker + endpointsChanges *proxy.EndpointsChangeTracker serviceChanges *proxy.ServiceChangeTracker mu sync.Mutex // protects the following fields @@ -265,7 +265,7 @@ func NewProxier(ipFamily v1.IPFamily, svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil), needFullSync: true, syncPeriod: syncPeriod, iptables: ipt, @@ -952,7 +952,7 @@ func (proxier *Proxier) syncProxyRules() { // Note the endpoint chains that will be used for _, ep := range allLocallyReachableEndpoints { - if epInfo, ok := ep.(*endpointsInfo); ok { + if epInfo, ok := ep.(*endpointInfo); ok { activeNATChains[epInfo.ChainName] = true } } @@ -1345,9 +1345,9 @@ func (proxier *Proxier) syncProxyRules() { // Generate the per-endpoint chains. for _, ep := range allLocallyReachableEndpoints { - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { - klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep) + klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep) continue } @@ -1556,7 +1556,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe // First write session affinity rules, if applicable. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { for _, ep := range endpoints { - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { continue } @@ -1578,7 +1578,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe // Now write loadbalancing rules. numEndpoints := len(endpoints) for i, ep := range endpoints { - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { continue } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 6dc6a2c8335..3ba9d26581a 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -330,7 +330,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, ipfamily, nil, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipfamily, nil, nil), needFullSync: true, iptables: ipt, masqueradeMark: "0x4000", @@ -3371,7 +3371,7 @@ func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) { proxier.servicesSynced = true } -func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) { +func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointInfo) { if len(newMap) != len(expected) { t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap) } @@ -3380,9 +3380,9 @@ func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.End t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x])) } else { for i := range expected[x] { - newEp, ok := newMap[x][i].(*endpointsInfo) + newEp, ok := newMap[x][i].(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo") + t.Errorf("Failed to cast endpointInfo") continue } if newEp.Endpoint != expected[x][i].Endpoint || @@ -3731,16 +3731,16 @@ func TestUpdateEndpointsMap(t *testing.T) { name string previousEndpoints []*discovery.EndpointSlice currentEndpoints []*discovery.EndpointSlice - oldEndpoints map[proxy.ServicePortName][]*endpointsInfo - expectedResult map[proxy.ServicePortName][]*endpointsInfo + oldEndpoints map[proxy.ServicePortName][]*endpointInfo + expectedResult map[proxy.ServicePortName][]*endpointInfo expectedDeletedUDPEndpoints []proxy.ServiceEndpoint expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool expectedLocalEndpoints map[types.NamespacedName]int }{{ // Case[0]: nothing name: "nothing", - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointInfo{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, @@ -3749,12 +3749,12 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "no change, named port, local", previousEndpoints: namedPortLocal, currentEndpoints: namedPortLocal, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, @@ -3769,7 +3769,7 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "no change, multiple subsets", previousEndpoints: multipleSubsets, currentEndpoints: multipleSubsets, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -3777,7 +3777,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -3793,7 +3793,7 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "no change, multiple subsets, multiple ports, local", previousEndpoints: multipleSubsetsMultiplePortsLocal, currentEndpoints: multipleSubsetsMultiplePortsLocal, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, @@ -3804,7 +3804,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, @@ -3825,7 +3825,7 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "no change, multiple endpoints, subsets, IPs, and ports", previousEndpoints: multipleSubsetsIPsPorts, currentEndpoints: multipleSubsetsIPsPorts, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, @@ -3851,7 +3851,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, @@ -3888,8 +3888,8 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "add an Endpoints", previousEndpoints: []*discovery.EndpointSlice{nil}, currentEndpoints: namedPortLocal, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, @@ -3906,12 +3906,12 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "remove an Endpoints", previousEndpoints: namedPortLocal, currentEndpoints: []*discovery.EndpointSlice{nil}, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointInfo{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "10.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), @@ -3923,12 +3923,12 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "add an IP and port", previousEndpoints: namedPort, currentEndpoints: namedPortsLocalNoLocal, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, @@ -3950,7 +3950,7 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "remove an IP and port", previousEndpoints: namedPortsLocalNoLocal, currentEndpoints: namedPort, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, @@ -3960,7 +3960,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -3982,12 +3982,12 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "add a subset", previousEndpoints: []*discovery.EndpointSlice{namedPort[0], nil}, currentEndpoints: multipleSubsetsWithLocal, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -4007,7 +4007,7 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "remove a subset", previousEndpoints: multipleSubsets, currentEndpoints: []*discovery.EndpointSlice{namedPort[0], nil}, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -4015,7 +4015,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -4031,12 +4031,12 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "rename a port", previousEndpoints: namedPort, currentEndpoints: namedPortRenamed, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -4054,12 +4054,12 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "renumber a port", previousEndpoints: namedPort, currentEndpoints: namedPortRenumbered, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -4075,7 +4075,7 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "complex add and remove", previousEndpoints: complexBefore, currentEndpoints: complexAfter, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -4094,7 +4094,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.4.4.6:45", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.11:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, @@ -4141,8 +4141,8 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "change from 0 endpoint address to 1 unnamed port", previousEndpoints: emptyEndpointSlices, currentEndpoints: namedPort, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 87ede823c60..8dcdd4cd46d 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -219,7 +219,7 @@ type Proxier struct { // services that happened since last syncProxyRules call. For a single object, // changes are accumulated, i.e. previous is state from before all of them, // current is state after applying all of those. - endpointsChanges *proxy.EndpointChangeTracker + endpointsChanges *proxy.EndpointsChangeTracker serviceChanges *proxy.ServiceChangeTracker mu sync.Mutex // protects the following fields @@ -413,7 +413,7 @@ func NewProxier(ipFamily v1.IPFamily, svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, nil, ipFamily, recorder, nil), initialSync: true, syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 569d0179ec0..05064a76fc8 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -154,7 +154,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, ipFamily, nil, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, nil, ipFamily, nil, nil), excludeCIDRs: excludeCIDRs, iptables: ipt, ipvs: ipvs, @@ -4358,7 +4358,7 @@ raid10 57344 0 - Live 0xffffffffc0597000`, } // The majority of EndpointSlice specific tests are not ipvs specific and focus on -// the shared EndpointChangeTracker and EndpointSliceCache. This test ensures that the +// the shared EndpointsChangeTracker and EndpointSliceCache. This test ensures that the // ipvs proxier supports translating EndpointSlices to ipvs output. func TestEndpointSliceE2E(t *testing.T) { ipt := iptablestest.NewFake() diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index e8d93f2b597..f21b268dff3 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -595,7 +595,7 @@ func TestServiceToServiceMap(t *testing.T) { } type FakeProxier struct { - endpointsChanges *EndpointChangeTracker + endpointsChanges *EndpointsChangeTracker serviceChanges *ServiceChangeTracker svcPortMap ServicePortMap endpointsMap EndpointsMap @@ -607,7 +607,7 @@ func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier { svcPortMap: make(ServicePortMap), serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil), endpointsMap: make(EndpointsMap), - endpointsChanges: &EndpointChangeTracker{ + endpointsChanges: &EndpointsChangeTracker{ lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), trackerStartTime: t, processEndpointsMapChange: nil, diff --git a/pkg/proxy/winkernel/hns.go b/pkg/proxy/winkernel/hns.go index 9fa83e14ef9..b11eb7cacd4 100644 --- a/pkg/proxy/winkernel/hns.go +++ b/pkg/proxy/winkernel/hns.go @@ -32,13 +32,13 @@ import ( type HostNetworkService interface { getNetworkByName(name string) (*hnsNetworkInfo, error) - getAllEndpointsByNetwork(networkName string) (map[string]*endpointsInfo, error) - getEndpointByID(id string) (*endpointsInfo, error) - getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) - getEndpointByName(id string) (*endpointsInfo, error) - createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) + getAllEndpointsByNetwork(networkName string) (map[string]*endpointInfo, error) + getEndpointByID(id string) (*endpointInfo, error) + getEndpointByIpAddress(ip string, networkName string) (*endpointInfo, error) + getEndpointByName(id string) (*endpointInfo, error) + createEndpoint(ep *endpointInfo, networkName string) (*endpointInfo, error) deleteEndpoint(hnsID string) error - getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) + getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) deleteLoadBalancer(hnsID string) error } @@ -87,7 +87,7 @@ func (hns hns) getNetworkByName(name string) (*hnsNetworkInfo, error) { }, nil } -func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointsInfo), error) { +func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointInfo), error) { hcnnetwork, err := hns.hcn.GetNetworkByName(networkName) if err != nil { klog.ErrorS(err, "failed to get HNS network by name", "name", networkName) @@ -97,7 +97,7 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi if err != nil { return nil, fmt.Errorf("failed to list endpoints: %w", err) } - endpointInfos := make(map[string]*(endpointsInfo)) + endpointInfos := make(map[string]*(endpointInfo)) for _, ep := range endpoints { if len(ep.IpConfigurations) == 0 { @@ -108,7 +108,7 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi // Add to map with key endpoint ID or IP address // Storing this is expensive in terms of memory, however there is a bug in Windows Server 2019 that can cause two endpoints to be created with the same IP address. // TODO: Store by IP only and remove any lookups by endpoint ID. - endpointInfos[ep.Id] = &endpointsInfo{ + endpointInfos[ep.Id] = &endpointInfo{ ip: ep.IpConfigurations[0].IpAddress, isLocal: uint32(ep.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, macAddress: ep.MacAddress, @@ -127,7 +127,7 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi // If ipFamilyPolicy is RequireDualStack or PreferDualStack, then there will be 2 IPS (iPV4 and IPV6) // in the endpoint list - endpointDualstack := &endpointsInfo{ + endpointDualstack := &endpointInfo{ ip: ep.IpConfigurations[1].IpAddress, isLocal: uint32(ep.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, macAddress: ep.MacAddress, @@ -145,12 +145,12 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi return endpointInfos, nil } -func (hns hns) getEndpointByID(id string) (*endpointsInfo, error) { +func (hns hns) getEndpointByID(id string) (*endpointInfo, error) { hnsendpoint, err := hns.hcn.GetEndpointByID(id) if err != nil { return nil, err } - return &endpointsInfo{ //TODO: fill out PA + return &endpointInfo{ //TODO: fill out PA ip: hnsendpoint.IpConfigurations[0].IpAddress, isLocal: uint32(hnsendpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote macAddress: hnsendpoint.MacAddress, @@ -158,7 +158,7 @@ func (hns hns) getEndpointByID(id string) (*endpointsInfo, error) { hns: hns, }, nil } -func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) { +func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpointInfo, error) { hnsnetwork, err := hns.hcn.GetNetworkByName(networkName) if err != nil { klog.ErrorS(err, "Error getting network by name") @@ -179,7 +179,7 @@ func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpoints } } if equal && strings.EqualFold(endpoint.HostComputeNetwork, hnsnetwork.Id) { - return &endpointsInfo{ + return &endpointInfo{ ip: ip, isLocal: uint32(endpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote macAddress: endpoint.MacAddress, @@ -190,12 +190,12 @@ func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpoints } return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName) } -func (hns hns) getEndpointByName(name string) (*endpointsInfo, error) { +func (hns hns) getEndpointByName(name string) (*endpointInfo, error) { hnsendpoint, err := hns.hcn.GetEndpointByName(name) if err != nil { return nil, err } - return &endpointsInfo{ //TODO: fill out PA + return &endpointInfo{ //TODO: fill out PA ip: hnsendpoint.IpConfigurations[0].IpAddress, isLocal: uint32(hnsendpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote macAddress: hnsendpoint.MacAddress, @@ -203,7 +203,7 @@ func (hns hns) getEndpointByName(name string) (*endpointsInfo, error) { hns: hns, }, nil } -func (hns hns) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) { +func (hns hns) createEndpoint(ep *endpointInfo, networkName string) (*endpointInfo, error) { hnsNetwork, err := hns.hcn.GetNetworkByName(networkName) if err != nil { return nil, err @@ -251,7 +251,7 @@ func (hns hns) createEndpoint(ep *endpointsInfo, networkName string) (*endpoints return nil, err } } - return &endpointsInfo{ + return &endpointInfo{ ip: createdEndpoint.IpConfigurations[0].IpAddress, isLocal: uint32(createdEndpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, macAddress: createdEndpoint.MacAddress, @@ -273,7 +273,7 @@ func (hns hns) deleteEndpoint(hnsID string) error { } // findLoadBalancerID will construct a id from the provided loadbalancer fields -func findLoadBalancerID(endpoints []endpointsInfo, vip string, protocol, internalPort, externalPort uint16) (loadBalancerIdentifier, error) { +func findLoadBalancerID(endpoints []endpointInfo, vip string, protocol, internalPort, externalPort uint16) (loadBalancerIdentifier, error) { // Compute hash from backends (endpoint IDs) hash, err := hashEndpoints(endpoints) if err != nil { @@ -315,7 +315,7 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn return loadBalancers, nil } -func (hns hns) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) { +func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) { var id loadBalancerIdentifier vips := []string{} // Compute hash from backends (endpoint IDs) @@ -424,7 +424,7 @@ func (hns hns) deleteLoadBalancer(hnsID string) error { } // Calculates a hash from the given endpoint IDs. -func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err error) { +func hashEndpoints[T string | endpointInfo](endpoints []T) (hash [20]byte, err error) { var id string // Recover in case something goes wrong. Return error and null byte array. defer func() { @@ -437,7 +437,7 @@ func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err // Iterate over endpoints, compute hash for _, ep := range endpoints { switch x := any(ep).(type) { - case endpointsInfo: + case endpointInfo: id = strings.ToUpper(x.hnsID) case string: id = x diff --git a/pkg/proxy/winkernel/hns_test.go b/pkg/proxy/winkernel/hns_test.go index 6e674425f78..1567d467c09 100644 --- a/pkg/proxy/winkernel/hns_test.go +++ b/pkg/proxy/winkernel/hns_test.go @@ -203,7 +203,7 @@ func TestCreateEndpointLocal(t *testing.T) { hns := hns{hcn: newHcnImpl()} Network := mustTestNetwork(t) - endpoint := &endpointsInfo{ + endpoint := &endpointInfo{ ip: epIpAddress, macAddress: epMacAddress, isLocal: true, @@ -242,7 +242,7 @@ func TestCreateEndpointRemote(t *testing.T) { Network := mustTestNetwork(t) providerAddress := epPaAddress - endpoint := &endpointsInfo{ + endpoint := &endpointInfo{ ip: epIpAddressRemote, macAddress: epMacAddress, isLocal: false, @@ -350,11 +350,11 @@ func TestGetLoadBalancerExisting(t *testing.T) { if err != nil { t.Error(err) } - endpoint := &endpointsInfo{ + endpoint := &endpointInfo{ ip: Endpoint.IpConfigurations[0].IpAddress, hnsID: Endpoint.Id, } - endpoints := []endpointsInfo{*endpoint} + endpoints := []endpointInfo{*endpoint} hash, err := hashEndpoints(endpoints) if err != nil { t.Error(err) @@ -409,11 +409,11 @@ func TestGetLoadBalancerNew(t *testing.T) { if err != nil { t.Error(err) } - endpoint := &endpointsInfo{ + endpoint := &endpointInfo{ ip: Endpoint.IpConfigurations[0].IpAddress, hnsID: Endpoint.Id, } - endpoints := []endpointsInfo{*endpoint} + endpoints := []endpointInfo{*endpoint} lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs) if err != nil { t.Error(err) @@ -523,7 +523,7 @@ func TestHashEndpoints(t *testing.T) { if err != nil { t.Error(err) } - endpointInfoA := &endpointsInfo{ + endpointInfoA := &endpointInfo{ ip: endpointA.IpConfigurations[0].IpAddress, hnsID: endpointA.Id, } @@ -543,12 +543,12 @@ func TestHashEndpoints(t *testing.T) { if err != nil { t.Error(err) } - endpointInfoB := &endpointsInfo{ + endpointInfoB := &endpointInfo{ ip: endpointB.IpConfigurations[0].IpAddress, hnsID: endpointB.Id, } - endpoints := []endpointsInfo{*endpointInfoA, *endpointInfoB} - endpointsReverse := []endpointsInfo{*endpointInfoB, *endpointInfoA} + endpoints := []endpointInfo{*endpointInfoA, *endpointInfoB} + endpointsReverse := []endpointInfo{*endpointInfoB, *endpointInfoA} h1, err := hashEndpoints(endpoints) if err != nil { t.Error(err) diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 642a60cdece..522a15584a2 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -123,7 +123,7 @@ type serviceInfo struct { hnsID string nodePorthnsID string policyApplied bool - remoteEndpoint *endpointsInfo + remoteEndpoint *endpointInfo hns HostNetworkService preserveDIP bool localTrafficDSR bool @@ -273,7 +273,7 @@ func (t DualStackCompatTester) DualStackCompatible(networkName string) bool { } // internal struct for endpoints information -type endpointsInfo struct { +type endpointInfo struct { ip string port uint16 isLocal bool @@ -290,57 +290,57 @@ type endpointsInfo struct { } // String is part of proxy.Endpoint interface. -func (info *endpointsInfo) String() string { +func (info *endpointInfo) String() string { return net.JoinHostPort(info.ip, strconv.Itoa(int(info.port))) } // GetIsLocal is part of proxy.Endpoint interface. -func (info *endpointsInfo) GetIsLocal() bool { +func (info *endpointInfo) GetIsLocal() bool { return info.isLocal } // IsReady returns true if an endpoint is ready and not terminating. -func (info *endpointsInfo) IsReady() bool { +func (info *endpointInfo) IsReady() bool { return info.ready } // IsServing returns true if an endpoint is ready, regardless of it's terminating state. -func (info *endpointsInfo) IsServing() bool { +func (info *endpointInfo) IsServing() bool { return info.serving } // IsTerminating returns true if an endpoint is terminating. -func (info *endpointsInfo) IsTerminating() bool { +func (info *endpointInfo) IsTerminating() bool { return info.terminating } // GetZoneHint returns the zone hint for the endpoint. -func (info *endpointsInfo) GetZoneHints() sets.Set[string] { +func (info *endpointInfo) GetZoneHints() sets.Set[string] { return sets.Set[string]{} } // IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface. -func (info *endpointsInfo) IP() string { +func (info *endpointInfo) IP() string { return info.ip } // Port returns just the Port part of the endpoint. -func (info *endpointsInfo) Port() (int, error) { +func (info *endpointInfo) Port() (int, error) { return int(info.port), nil } // Equal is part of proxy.Endpoint interface. -func (info *endpointsInfo) Equal(other proxy.Endpoint) bool { +func (info *endpointInfo) Equal(other proxy.Endpoint) bool { return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal() } // GetNodeName returns the NodeName for this endpoint. -func (info *endpointsInfo) GetNodeName() string { +func (info *endpointInfo) GetNodeName() string { return "" } // GetZone returns the Zone for this endpoint. -func (info *endpointsInfo) GetZone() string { +func (info *endpointInfo) GetZone() string { return "" } @@ -407,7 +407,7 @@ func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName, if exists { // Cleanup Endpoints references for _, ep := range epInfos { - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if ok { epInfo.Cleanup() @@ -448,7 +448,7 @@ func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) { } } -// returns a new proxy.Endpoint which abstracts a endpointsInfo +// returns a new proxy.Endpoint which abstracts a endpointInfo func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *proxy.ServicePortName) proxy.Endpoint { portNumber, err := baseInfo.Port() @@ -457,7 +457,7 @@ func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *pro portNumber = 0 } - info := &endpointsInfo{ + info := &endpointInfo{ ip: baseInfo.IP(), port: uint16(portNumber), isLocal: baseInfo.GetIsLocal(), @@ -474,8 +474,8 @@ func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *pro return info } -func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointsInfo, error) { - hnsEndpoint := &endpointsInfo{ +func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointInfo, error) { + hnsEndpoint := &endpointInfo{ ip: ip, isLocal: true, macAddress: mac, @@ -489,15 +489,15 @@ func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, return ep, err } -func (ep *endpointsInfo) DecrementRefCount() { - klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointsInfo", ep) +func (ep *endpointInfo) DecrementRefCount() { + klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointInfo", ep) if !ep.GetIsLocal() && ep.refCount != nil && *ep.refCount > 0 { *ep.refCount-- } } -func (ep *endpointsInfo) Cleanup() { - klog.V(3).InfoS("Endpoint cleanup", "endpointsInfo", ep) +func (ep *endpointInfo) Cleanup() { + klog.V(3).InfoS("Endpoint cleanup", "endpointInfo", ep) if !ep.GetIsLocal() && ep.refCount != nil { *ep.refCount-- @@ -601,7 +601,7 @@ type Proxier struct { // services that happened since policies were synced. For a single object, // changes are accumulated, i.e. previous is state from before all of them, // current is state after applying all of those. - endpointsChanges *proxy.EndpointChangeTracker + endpointsChanges *proxy.EndpointsChangeTracker serviceChanges *proxy.ServiceChangeTracker endPointsRefCount endPointsReferenceCountMap mu sync.Mutex // protects the following fields @@ -802,7 +802,7 @@ func NewProxier( } serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange) - endPointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange) + endPointChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange) proxier.endpointsChanges = endPointChangeTracker proxier.serviceChanges = serviceChanges @@ -868,7 +868,7 @@ func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint, mapSt } // Cleanup Endpoints references for _, ep := range endpoints { - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if ok { if winProxyOptimization { epInfo.DecrementRefCount() @@ -1062,7 +1062,7 @@ func isNetworkNotFoundError(err error) bool { // If atleast one is not terminating, then return false func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool { for _, epInfo := range proxier.endpointsMap[svcName] { - ep, ok := epInfo.(*endpointsInfo) + ep, ok := epInfo.(*endpointInfo) if !ok { continue } @@ -1087,7 +1087,7 @@ func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName, // If atleast one is serving, then return false func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool { for _, epInfo := range proxier.endpointsMap[svcName] { - ep, ok := epInfo.(*endpointsInfo) + ep, ok := epInfo.(*endpointInfo) if !ok { continue } @@ -1102,7 +1102,7 @@ func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName, } // updateQueriedEndpoints updates the queriedEndpoints map with newly created endpoint details -func updateQueriedEndpoints(newHnsEndpoint *endpointsInfo, queriedEndpoints map[string]*endpointsInfo) { +func updateQueriedEndpoints(newHnsEndpoint *endpointInfo, queriedEndpoints map[string]*endpointInfo) { // store newly created endpoints in queriedEndpoints queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint @@ -1130,7 +1130,7 @@ func (proxier *Proxier) syncProxyRules() { hnsNetworkName := proxier.network.name hns := proxier.hns - var gatewayHnsendpoint *endpointsInfo + var gatewayHnsendpoint *endpointInfo if proxier.forwardHealthCheckVip { gatewayHnsendpoint, _ = hns.getEndpointByName(proxier.rootHnsEndpointName) } @@ -1153,7 +1153,7 @@ func (proxier *Proxier) syncProxyRules() { endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) deletedUDPClusterIPs := serviceUpdateResult.DeletedUDPClusterIPs - // merge stale services gathered from updateEndpointsMap + // merge stale services gathered from EndpointsMap.Update for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices { if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { klog.V(2).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName) @@ -1168,7 +1168,7 @@ func (proxier *Proxier) syncProxyRules() { } if queriedEndpoints == nil { klog.V(4).InfoS("No existing endpoints found in HNS") - queriedEndpoints = make(map[string]*(endpointsInfo)) + queriedEndpoints = make(map[string]*(endpointInfo)) } queriedLoadBalancers, err := hns.getAllLoadBalancers() if queriedLoadBalancers == nil { @@ -1208,7 +1208,7 @@ func (proxier *Proxier) syncProxyRules() { serviceVipEndpoint := queriedEndpoints[svcInfo.ClusterIP().String()] if serviceVipEndpoint == nil { klog.V(4).InfoS("No existing remote endpoint", "IP", svcInfo.ClusterIP()) - hnsEndpoint := &endpointsInfo{ + hnsEndpoint := &endpointInfo{ ip: svcInfo.ClusterIP().String(), isLocal: false, macAddress: proxier.hostMac, @@ -1228,8 +1228,8 @@ func (proxier *Proxier) syncProxyRules() { } } - var hnsEndpoints []endpointsInfo - var hnsLocalEndpoints []endpointsInfo + var hnsEndpoints []endpointInfo + var hnsLocalEndpoints []endpointInfo klog.V(4).InfoS("Applying Policy", "serviceInfo", svcName) // Create Remote endpoints for every endpoint, corresponding to the service containsPublicIP := false @@ -1249,9 +1249,9 @@ func (proxier *Proxier) syncProxyRules() { } for _, epInfo := range proxier.endpointsMap[svcName] { - ep, ok := epInfo.(*endpointsInfo) + ep, ok := epInfo.(*endpointInfo) if !ok { - klog.ErrorS(nil, "Failed to cast endpointsInfo", "serviceName", svcName) + klog.ErrorS(nil, "Failed to cast endpointInfo", "serviceName", svcName) continue } @@ -1274,7 +1274,7 @@ func (proxier *Proxier) syncProxyRules() { } - var newHnsEndpoint *endpointsInfo + var newHnsEndpoint *endpointInfo hnsNetworkName := proxier.network.name var err error @@ -1319,7 +1319,7 @@ func (proxier *Proxier) syncProxyRules() { providerAddress = proxier.nodeIP.String() } - hnsEndpoint := &endpointsInfo{ + hnsEndpoint := &endpointInfo{ ip: ep.ip, isLocal: false, macAddress: conjureMac("02-11", netutils.ParseIPSloppy(ep.ip)), @@ -1328,13 +1328,13 @@ func (proxier *Proxier) syncProxyRules() { newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName) if err != nil { - klog.ErrorS(err, "Remote endpoint creation failed", "endpointsInfo", hnsEndpoint) + klog.ErrorS(err, "Remote endpoint creation failed", "endpointInfo", hnsEndpoint) continue } updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints) } else { - hnsEndpoint := &endpointsInfo{ + hnsEndpoint := &endpointInfo{ ip: ep.ip, isLocal: false, macAddress: ep.macAddress, @@ -1371,7 +1371,7 @@ func (proxier *Proxier) syncProxyRules() { } // Save the hnsId for reference - klog.V(1).InfoS("Hns endpoint resource", "endpointsInfo", newHnsEndpoint) + klog.V(1).InfoS("Hns endpoint resource", "endpointInfo", newHnsEndpoint) hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint) if newHnsEndpoint.GetIsLocal() { @@ -1384,10 +1384,10 @@ func (proxier *Proxier) syncProxyRules() { ep.hnsID = newHnsEndpoint.hnsID - klog.V(3).InfoS("Endpoint resource found", "endpointsInfo", ep) + klog.V(3).InfoS("Endpoint resource found", "endpointInfo", ep) } - klog.V(3).InfoS("Associated endpoints for service", "endpointsInfo", hnsEndpoints, "serviceName", svcName) + klog.V(3).InfoS("Associated endpoints for service", "endpointInfo", hnsEndpoints, "serviceName", svcName) if len(svcInfo.hnsID) > 0 { // This should not happen @@ -1399,7 +1399,7 @@ func (proxier *Proxier) syncProxyRules() { if len(hnsEndpoints) == 0 { if svcInfo.winProxyOptimization { // Deleting loadbalancers when there are no endpoints to serve. - klog.V(3).InfoS("Cleanup existing ", "endpointsInfo", hnsEndpoints, "serviceName", svcName) + klog.V(3).InfoS("Cleanup existing ", "endpointInfo", hnsEndpoints, "serviceName", svcName) svcInfo.deleteLoadBalancerPolicy(proxier.mapStaleLoadbalancers) } klog.ErrorS(nil, "Endpoint information not available for service, not applying any policy", "serviceName", svcName) @@ -1560,10 +1560,10 @@ func (proxier *Proxier) syncProxyRules() { nodeport = svcInfo.HealthCheckNodePort() } - proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointsInfo{*gatewayHnsendpoint}, queriedLoadBalancers) + proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointInfo{*gatewayHnsendpoint}, queriedLoadBalancers) hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer( - []endpointsInfo{*gatewayHnsendpoint}, + []endpointInfo{*gatewayHnsendpoint}, loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP}, sourceVip, lbIngressIP.ip, @@ -1623,7 +1623,7 @@ func (proxier *Proxier) syncProxyRules() { // deleteExistingLoadBalancer checks whether loadbalancer delete is needed or not. // If it is needed, the function will delete the existing loadbalancer and return true, else false. -func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointsInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool { +func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool { if !winProxyOptimization || *lbHnsID == "" { // Loadbalancer delete not needed diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index b11a02cec12..fd2856471f9 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -123,8 +123,8 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust } serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, v1.IPv4Protocol, nil, proxier.serviceMapChange) - endpointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, proxier.endpointsMapChange) - proxier.endpointsChanges = endpointChangeTracker + endpointsChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, proxier.endpointsMapChange) + proxier.endpointsChanges = endpointsChangeTracker proxier.serviceChanges = serviceChanges return proxier @@ -232,9 +232,9 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) { proxier.syncProxyRules() ep := proxier.endpointsMap[svcPortName][0] - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName.String()) } else { if epInfo.hnsID != "EPID-3" { @@ -296,9 +296,9 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) { proxier.setInitialized(true) proxier.syncProxyRules() ep := proxier.endpointsMap[svcPortName][0] - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName.String()) } else { if epInfo.hnsID != endpointGuid1 { @@ -389,9 +389,9 @@ func TestSharedRemoteEndpointDelete(t *testing.T) { proxier.setInitialized(true) proxier.syncProxyRules() ep := proxier.endpointsMap[svcPortName1][0] - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String()) } else { if epInfo.hnsID != endpointGuid1 { @@ -439,9 +439,9 @@ func TestSharedRemoteEndpointDelete(t *testing.T) { proxier.syncProxyRules() ep = proxier.endpointsMap[svcPortName1][0] - epInfo, ok = ep.(*endpointsInfo) + epInfo, ok = ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String()) } else { if epInfo.hnsID != endpointGuid1 { @@ -534,9 +534,9 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) { proxier.setInitialized(true) proxier.syncProxyRules() ep := proxier.endpointsMap[svcPortName1][0] - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String()) } else { if epInfo.hnsID != endpointGuid1 { @@ -613,10 +613,10 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) { proxier.syncProxyRules() ep = proxier.endpointsMap[svcPortName1][0] - epInfo, ok = ep.(*endpointsInfo) + epInfo, ok = ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String()) } else { if epInfo.hnsID != endpointGuid1 { @@ -913,9 +913,9 @@ func TestEndpointSlice(t *testing.T) { } ep := proxier.endpointsMap[svcPortName][0] - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName.String()) } else { if epInfo.hnsID != "EPID-3" {