diff --git a/pkg/proxy/conntrack/cleanup.go b/pkg/proxy/conntrack/cleanup.go index b3b55dab978..4d25ff2f861 100644 --- a/pkg/proxy/conntrack/cleanup.go +++ b/pkg/proxy/conntrack/cleanup.go @@ -27,23 +27,23 @@ import ( // CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints. func CleanStaleEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, - serviceUpdateResult proxy.UpdateServiceMapResult, endpointUpdateResult proxy.UpdateEndpointMapResult) { + serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { - deleteStaleServiceConntrackEntries(isIPv6, exec, svcPortMap, serviceUpdateResult, endpointUpdateResult) - deleteStaleEndpointConntrackEntries(exec, svcPortMap, endpointUpdateResult) + deleteStaleServiceConntrackEntries(isIPv6, exec, svcPortMap, serviceUpdateResult, endpointsUpdateResult) + deleteStaleEndpointConntrackEntries(exec, svcPortMap, endpointsUpdateResult) } // deleteStaleServiceConntrackEntries takes care of flushing stale conntrack entries related // to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack // may create "black hole" entries for that IP+port. When the service gets endpoints we // need to delete those entries so further traffic doesn't get dropped. -func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointUpdateResult proxy.UpdateEndpointMapResult) { +func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs conntrackCleanupServiceNodePorts := sets.New[int]() - // merge newly active services gathered from updateEndpointsMap + // merge newly active services gathered from endpointsUpdateResult // a UDP service that changes from 0 to non-0 endpoints is newly active. - for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices { + for _, svcPortName := range endpointsUpdateResult.NewlyActiveUDPServices { if svcInfo, ok := svcPortMap[svcPortName]; ok { klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName) conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) @@ -78,8 +78,8 @@ func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, sv // deleteStaleEndpointConntrackEntries takes care of flushing stale conntrack entries related // to UDP endpoints. After a UDP endpoint is removed we must flush any conntrack entries // for it so that if the same client keeps sending, the packets will get routed to a new endpoint. -func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap proxy.ServicePortMap, endpointUpdateResult proxy.UpdateEndpointMapResult) { - for _, epSvcPair := range endpointUpdateResult.DeletedUDPEndpoints { +func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { + for _, epSvcPair := range endpointsUpdateResult.DeletedUDPEndpoints { if svcInfo, ok := svcPortMap[epSvcPair.ServicePortName]; ok { endpointIP := proxyutil.IPPart(epSvcPair.Endpoint) nodePort := svcInfo.NodePort() diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 47976dbf094..0a45c948a98 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -147,9 +147,9 @@ type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) // EndpointsMap's but just use the changes for any Proxier specific cleanup. type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap) -// EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of +// EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of // Endpoints, keyed by their namespace and name. -type EndpointChangeTracker struct { +type EndpointsChangeTracker struct { // lock protects lastChangeTriggerTimes lock sync.Mutex @@ -159,16 +159,16 @@ type EndpointChangeTracker struct { // Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints // object to change. Used to calculate the network-programming-latency. lastChangeTriggerTimes map[types.NamespacedName][]time.Time - // record the time when the endpointChangeTracker was created so we can ignore the endpoints + // record the time when the endpointsChangeTracker was created so we can ignore the endpoints // that were generated before, because we can't estimate the network-programming-latency on those. // This is specially problematic on restarts, because we process all the endpoints that may have been // created hours or days before. trackerStartTime time.Time } -// NewEndpointChangeTracker initializes an EndpointsChangeMap -func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker { - return &EndpointChangeTracker{ +// NewEndpointsChangeTracker initializes an EndpointsChangeTracker +func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker { + return &EndpointsChangeTracker{ lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), trackerStartTime: time.Now(), processEndpointsMapChange: processEndpointsMapChange, @@ -177,9 +177,9 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc } // EndpointSliceUpdate updates given service's endpoints change map based on the endpoints pair. -// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeMap. +// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeTracker. // If removeSlice is true, slice will be removed, otherwise it will be added or updated. -func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { +func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { if !supportedEndpointSliceAddressTypes.Has(string(endpointSlice.AddressType)) { klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType) return false @@ -225,13 +225,13 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E // PendingChanges returns a set whose keys are the names of the services whose endpoints // have changed since the last time ect was used to update an EndpointsMap. (You must call // this _before_ calling em.Update(ect).) -func (ect *EndpointChangeTracker) PendingChanges() sets.Set[string] { +func (ect *EndpointsChangeTracker) PendingChanges() sets.Set[string] { return ect.endpointSliceCache.pendingChanges() } // checkoutChanges returns a list of pending endpointsChanges and marks them as // applied. -func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange { +func (ect *EndpointsChangeTracker) checkoutChanges() []*endpointsChange { metrics.EndpointChangesPending.Set(0) return ect.endpointSliceCache.checkoutChanges() @@ -239,7 +239,7 @@ func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange { // checkoutTriggerTimes applies the locally cached trigger times to a map of // trigger times that have been passed in and empties the local cache. -func (ect *EndpointChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) { +func (ect *EndpointsChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) { ect.lock.Lock() defer ect.lock.Unlock() @@ -284,8 +284,8 @@ type endpointsChange struct { current EndpointsMap } -// UpdateEndpointMapResult is the updated results after applying endpoints changes. -type UpdateEndpointMapResult struct { +// UpdateEndpointsMapResult is the updated results after applying endpoints changes. +type UpdateEndpointsMapResult struct { // DeletedUDPEndpoints identifies UDP endpoints that have just been deleted. // Existing conntrack NAT entries pointing to these endpoints must be deleted to // ensure that no further traffic for the Service gets delivered to them. @@ -304,7 +304,7 @@ type UpdateEndpointMapResult struct { } // Update updates endpointsMap base on the given changes. -func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) { +func (em EndpointsMap) Update(changes *EndpointsChangeTracker) (result UpdateEndpointsMapResult) { result.DeletedUDPEndpoints = make([]ServiceEndpoint, 0) result.NewlyActiveUDPServices = make([]ServicePortName, 0) result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time) @@ -321,7 +321,7 @@ type EndpointsMap map[ServicePortName][]Endpoint // and clear the changes map. In addition it returns (via argument) and resets the // lastChangeTriggerTimes for all endpoints that were changed and will result in syncing // the proxy rules. apply triggers processEndpointsMapChange on every change. -func (em EndpointsMap) apply(ect *EndpointChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint, +func (em EndpointsMap) apply(ect *EndpointsChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) { if ect == nil { return @@ -396,7 +396,7 @@ func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int { } // detectStaleConntrackEntries detects services that may be associated with stale conntrack entries. -// (See UpdateEndpointMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.) +// (See UpdateEndpointsMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.) func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName) { // Find the UDP endpoints that we were sending traffic to in oldEndpointsMap, but // are no longer sending to newEndpointsMap. The proxier should make sure that diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 824a5a91461..35bfa16ca89 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -1340,7 +1340,7 @@ func TestEndpointSliceUpdate(t *testing.T) { testCases := map[string]struct { startingSlices []*discovery.EndpointSlice - endpointChangeTracker *EndpointChangeTracker + endpointsChangeTracker *EndpointsChangeTracker namespacedName types.NamespacedName paramEndpointSlice *discovery.EndpointSlice paramRemoveSlice bool @@ -1350,12 +1350,12 @@ func TestEndpointSliceUpdate(t *testing.T) { }{ // test starting from an empty state "add a simple slice that doesn't already exist": { - startingSlices: []*discovery.EndpointSlice{}, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: true, + startingSlices: []*discovery.EndpointSlice{}, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false, Ready: true, Serving: true, Terminating: false}, @@ -1375,7 +1375,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), paramRemoveSlice: false, @@ -1388,7 +1388,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: fqdnSlice, paramRemoveSlice: false, @@ -1402,11 +1402,11 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -1435,11 +1435,11 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80)}), - paramRemoveSlice: false, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -1466,11 +1466,11 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: true, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: true, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.2.1:80", IsLocal: false, Ready: true, Serving: true, Terminating: false}, @@ -1489,7 +1489,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), paramRemoveSlice: true, @@ -1502,11 +1502,11 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: false, Serving: false, Terminating: false}, @@ -1526,11 +1526,11 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 2, 1, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -1549,11 +1549,11 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 2, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 2, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -1578,11 +1578,11 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 2, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 2, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), }, - endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), - paramRemoveSlice: false, - expectedReturnVal: true, + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -1605,19 +1605,19 @@ func TestEndpointSliceUpdate(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - initializeCache(tc.endpointChangeTracker.endpointSliceCache, tc.startingSlices) + initializeCache(tc.endpointsChangeTracker.endpointSliceCache, tc.startingSlices) - got := tc.endpointChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice) + got := tc.endpointsChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice) if !reflect.DeepEqual(got, tc.expectedReturnVal) { t.Errorf("EndpointSliceUpdate return value got: %v, want %v", got, tc.expectedReturnVal) } - pendingChanges := tc.endpointChangeTracker.PendingChanges() + pendingChanges := tc.endpointsChangeTracker.PendingChanges() if !pendingChanges.Equal(tc.expectedChangedEndpoints) { t.Errorf("expected changed endpoints %q, got %q", tc.expectedChangedEndpoints.UnsortedList(), pendingChanges.UnsortedList()) } - changes := tc.endpointChangeTracker.checkoutChanges() + changes := tc.endpointsChangeTracker.checkoutChanges() if tc.expectedCurrentChange == nil { if len(changes) != 0 { t.Errorf("Expected %s to have no changes", tc.namespacedName) @@ -1637,20 +1637,20 @@ func TestCheckoutChanges(t *testing.T) { svcPortName1 := ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, "port-1", v1.ProtocolTCP} testCases := map[string]struct { - endpointChangeTracker *EndpointChangeTracker - expectedChanges []*endpointsChange - items map[types.NamespacedName]*endpointsChange - appliedSlices []*discovery.EndpointSlice - pendingSlices []*discovery.EndpointSlice + endpointsChangeTracker *EndpointsChangeTracker + expectedChanges []*endpointsChange + items map[types.NamespacedName]*endpointsChange + appliedSlices []*discovery.EndpointSlice + pendingSlices []*discovery.EndpointSlice }{ "empty slices": { - endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil), - expectedChanges: []*endpointsChange{}, - appliedSlices: []*discovery.EndpointSlice{}, - pendingSlices: []*discovery.EndpointSlice{}, + endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil), + expectedChanges: []*endpointsChange{}, + appliedSlices: []*discovery.EndpointSlice{}, + pendingSlices: []*discovery.EndpointSlice{}, }, "adding initial slice": { - endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil), expectedChanges: []*endpointsChange{{ previous: EndpointsMap{}, current: EndpointsMap{ @@ -1663,7 +1663,7 @@ func TestCheckoutChanges(t *testing.T) { }, }, "removing port in update": { - endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil), expectedChanges: []*endpointsChange{{ previous: EndpointsMap{ svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1", true, true, false), newTestEp("10.0.1.2:80", "host1", true, true, false), newTestEp("10.0.1.3:80", "host1", false, false, false)}, @@ -1685,13 +1685,13 @@ func TestCheckoutChanges(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { for _, slice := range tc.appliedSlices { - tc.endpointChangeTracker.EndpointSliceUpdate(slice, false) + tc.endpointsChangeTracker.EndpointSliceUpdate(slice, false) } - tc.endpointChangeTracker.checkoutChanges() + tc.endpointsChangeTracker.checkoutChanges() for _, slice := range tc.pendingSlices { - tc.endpointChangeTracker.EndpointSliceUpdate(slice, false) + tc.endpointsChangeTracker.EndpointSliceUpdate(slice, false) } - changes := tc.endpointChangeTracker.checkoutChanges() + changes := tc.endpointsChangeTracker.checkoutChanges() if len(tc.expectedChanges) != len(changes) { t.Fatalf("Expected %d changes, got %d", len(tc.expectedChanges), len(changes)) @@ -1730,7 +1730,7 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser for i := range expected[x] { newEp, ok := newMap[x][i].(*BaseEndpointInfo) if !ok { - t.Fatalf("Failed to cast endpointsInfo") + t.Fatalf("Failed to cast endpointInfo") } if !endpointEqual(newEp, expected[x][i]) { t.Fatalf("expected new[%v][%d] to be %v, got %v"+ diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index e8c252ccd58..95001d38c2a 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -122,15 +122,15 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *pro } // internal struct for endpoints information -type endpointsInfo struct { +type endpointInfo struct { *proxy.BaseEndpointInfo ChainName utiliptables.Chain } -// returns a new proxy.Endpoint which abstracts a endpointsInfo +// returns a new proxy.Endpoint which abstracts a endpointInfo func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint { - return &endpointsInfo{ + return &endpointInfo{ BaseEndpointInfo: baseInfo, ChainName: servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.Endpoint), } @@ -143,7 +143,7 @@ type Proxier struct { // services that happened since iptables was synced. For a single object, // changes are accumulated, i.e. previous is state from before all of them, // current is state after applying all of those. - endpointsChanges *proxy.EndpointChangeTracker + endpointsChanges *proxy.EndpointsChangeTracker serviceChanges *proxy.ServiceChangeTracker mu sync.Mutex // protects the following fields @@ -265,7 +265,7 @@ func NewProxier(ipFamily v1.IPFamily, svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil), needFullSync: true, syncPeriod: syncPeriod, iptables: ipt, @@ -952,7 +952,7 @@ func (proxier *Proxier) syncProxyRules() { // Note the endpoint chains that will be used for _, ep := range allLocallyReachableEndpoints { - if epInfo, ok := ep.(*endpointsInfo); ok { + if epInfo, ok := ep.(*endpointInfo); ok { activeNATChains[epInfo.ChainName] = true } } @@ -1345,9 +1345,9 @@ func (proxier *Proxier) syncProxyRules() { // Generate the per-endpoint chains. for _, ep := range allLocallyReachableEndpoints { - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { - klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep) + klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep) continue } @@ -1556,7 +1556,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe // First write session affinity rules, if applicable. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { for _, ep := range endpoints { - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { continue } @@ -1578,7 +1578,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe // Now write loadbalancing rules. numEndpoints := len(endpoints) for i, ep := range endpoints { - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { continue } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 6dc6a2c8335..3ba9d26581a 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -330,7 +330,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, ipfamily, nil, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipfamily, nil, nil), needFullSync: true, iptables: ipt, masqueradeMark: "0x4000", @@ -3371,7 +3371,7 @@ func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) { proxier.servicesSynced = true } -func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) { +func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointInfo) { if len(newMap) != len(expected) { t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap) } @@ -3380,9 +3380,9 @@ func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.End t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x])) } else { for i := range expected[x] { - newEp, ok := newMap[x][i].(*endpointsInfo) + newEp, ok := newMap[x][i].(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo") + t.Errorf("Failed to cast endpointInfo") continue } if newEp.Endpoint != expected[x][i].Endpoint || @@ -3731,16 +3731,16 @@ func TestUpdateEndpointsMap(t *testing.T) { name string previousEndpoints []*discovery.EndpointSlice currentEndpoints []*discovery.EndpointSlice - oldEndpoints map[proxy.ServicePortName][]*endpointsInfo - expectedResult map[proxy.ServicePortName][]*endpointsInfo + oldEndpoints map[proxy.ServicePortName][]*endpointInfo + expectedResult map[proxy.ServicePortName][]*endpointInfo expectedDeletedUDPEndpoints []proxy.ServiceEndpoint expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool expectedLocalEndpoints map[types.NamespacedName]int }{{ // Case[0]: nothing name: "nothing", - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointInfo{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, @@ -3749,12 +3749,12 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "no change, named port, local", previousEndpoints: namedPortLocal, currentEndpoints: namedPortLocal, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, @@ -3769,7 +3769,7 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "no change, multiple subsets", previousEndpoints: multipleSubsets, currentEndpoints: multipleSubsets, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -3777,7 +3777,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -3793,7 +3793,7 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "no change, multiple subsets, multiple ports, local", previousEndpoints: multipleSubsetsMultiplePortsLocal, currentEndpoints: multipleSubsetsMultiplePortsLocal, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, @@ -3804,7 +3804,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, @@ -3825,7 +3825,7 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "no change, multiple endpoints, subsets, IPs, and ports", previousEndpoints: multipleSubsetsIPsPorts, currentEndpoints: multipleSubsetsIPsPorts, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, @@ -3851,7 +3851,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, @@ -3888,8 +3888,8 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "add an Endpoints", previousEndpoints: []*discovery.EndpointSlice{nil}, currentEndpoints: namedPortLocal, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, @@ -3906,12 +3906,12 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "remove an Endpoints", previousEndpoints: namedPortLocal, currentEndpoints: []*discovery.EndpointSlice{nil}, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointInfo{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "10.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), @@ -3923,12 +3923,12 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "add an IP and port", previousEndpoints: namedPort, currentEndpoints: namedPortsLocalNoLocal, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, @@ -3950,7 +3950,7 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "remove an IP and port", previousEndpoints: namedPortsLocalNoLocal, currentEndpoints: namedPort, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, @@ -3960,7 +3960,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -3982,12 +3982,12 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "add a subset", previousEndpoints: []*discovery.EndpointSlice{namedPort[0], nil}, currentEndpoints: multipleSubsetsWithLocal, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -4007,7 +4007,7 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "remove a subset", previousEndpoints: multipleSubsets, currentEndpoints: []*discovery.EndpointSlice{namedPort[0], nil}, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -4015,7 +4015,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -4031,12 +4031,12 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "rename a port", previousEndpoints: namedPort, currentEndpoints: namedPortRenamed, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -4054,12 +4054,12 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "renumber a port", previousEndpoints: namedPort, currentEndpoints: namedPortRenumbered, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -4075,7 +4075,7 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "complex add and remove", previousEndpoints: complexBefore, currentEndpoints: complexAfter, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, @@ -4094,7 +4094,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.4.4.6:45", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.11:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, @@ -4141,8 +4141,8 @@ func TestUpdateEndpointsMap(t *testing.T) { name: "change from 0 endpoint address to 1 unnamed port", previousEndpoints: emptyEndpointSlices, currentEndpoints: namedPort, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 87ede823c60..8dcdd4cd46d 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -219,7 +219,7 @@ type Proxier struct { // services that happened since last syncProxyRules call. For a single object, // changes are accumulated, i.e. previous is state from before all of them, // current is state after applying all of those. - endpointsChanges *proxy.EndpointChangeTracker + endpointsChanges *proxy.EndpointsChangeTracker serviceChanges *proxy.ServiceChangeTracker mu sync.Mutex // protects the following fields @@ -413,7 +413,7 @@ func NewProxier(ipFamily v1.IPFamily, svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, nil, ipFamily, recorder, nil), initialSync: true, syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 569d0179ec0..05064a76fc8 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -154,7 +154,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, ipFamily, nil, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, nil, ipFamily, nil, nil), excludeCIDRs: excludeCIDRs, iptables: ipt, ipvs: ipvs, @@ -4358,7 +4358,7 @@ raid10 57344 0 - Live 0xffffffffc0597000`, } // The majority of EndpointSlice specific tests are not ipvs specific and focus on -// the shared EndpointChangeTracker and EndpointSliceCache. This test ensures that the +// the shared EndpointsChangeTracker and EndpointSliceCache. This test ensures that the // ipvs proxier supports translating EndpointSlices to ipvs output. func TestEndpointSliceE2E(t *testing.T) { ipt := iptablestest.NewFake() diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index e8d93f2b597..f21b268dff3 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -595,7 +595,7 @@ func TestServiceToServiceMap(t *testing.T) { } type FakeProxier struct { - endpointsChanges *EndpointChangeTracker + endpointsChanges *EndpointsChangeTracker serviceChanges *ServiceChangeTracker svcPortMap ServicePortMap endpointsMap EndpointsMap @@ -607,7 +607,7 @@ func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier { svcPortMap: make(ServicePortMap), serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil), endpointsMap: make(EndpointsMap), - endpointsChanges: &EndpointChangeTracker{ + endpointsChanges: &EndpointsChangeTracker{ lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), trackerStartTime: t, processEndpointsMapChange: nil, diff --git a/pkg/proxy/winkernel/hns.go b/pkg/proxy/winkernel/hns.go index 9fa83e14ef9..b11eb7cacd4 100644 --- a/pkg/proxy/winkernel/hns.go +++ b/pkg/proxy/winkernel/hns.go @@ -32,13 +32,13 @@ import ( type HostNetworkService interface { getNetworkByName(name string) (*hnsNetworkInfo, error) - getAllEndpointsByNetwork(networkName string) (map[string]*endpointsInfo, error) - getEndpointByID(id string) (*endpointsInfo, error) - getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) - getEndpointByName(id string) (*endpointsInfo, error) - createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) + getAllEndpointsByNetwork(networkName string) (map[string]*endpointInfo, error) + getEndpointByID(id string) (*endpointInfo, error) + getEndpointByIpAddress(ip string, networkName string) (*endpointInfo, error) + getEndpointByName(id string) (*endpointInfo, error) + createEndpoint(ep *endpointInfo, networkName string) (*endpointInfo, error) deleteEndpoint(hnsID string) error - getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) + getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) deleteLoadBalancer(hnsID string) error } @@ -87,7 +87,7 @@ func (hns hns) getNetworkByName(name string) (*hnsNetworkInfo, error) { }, nil } -func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointsInfo), error) { +func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointInfo), error) { hcnnetwork, err := hns.hcn.GetNetworkByName(networkName) if err != nil { klog.ErrorS(err, "failed to get HNS network by name", "name", networkName) @@ -97,7 +97,7 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi if err != nil { return nil, fmt.Errorf("failed to list endpoints: %w", err) } - endpointInfos := make(map[string]*(endpointsInfo)) + endpointInfos := make(map[string]*(endpointInfo)) for _, ep := range endpoints { if len(ep.IpConfigurations) == 0 { @@ -108,7 +108,7 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi // Add to map with key endpoint ID or IP address // Storing this is expensive in terms of memory, however there is a bug in Windows Server 2019 that can cause two endpoints to be created with the same IP address. // TODO: Store by IP only and remove any lookups by endpoint ID. - endpointInfos[ep.Id] = &endpointsInfo{ + endpointInfos[ep.Id] = &endpointInfo{ ip: ep.IpConfigurations[0].IpAddress, isLocal: uint32(ep.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, macAddress: ep.MacAddress, @@ -127,7 +127,7 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi // If ipFamilyPolicy is RequireDualStack or PreferDualStack, then there will be 2 IPS (iPV4 and IPV6) // in the endpoint list - endpointDualstack := &endpointsInfo{ + endpointDualstack := &endpointInfo{ ip: ep.IpConfigurations[1].IpAddress, isLocal: uint32(ep.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, macAddress: ep.MacAddress, @@ -145,12 +145,12 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi return endpointInfos, nil } -func (hns hns) getEndpointByID(id string) (*endpointsInfo, error) { +func (hns hns) getEndpointByID(id string) (*endpointInfo, error) { hnsendpoint, err := hns.hcn.GetEndpointByID(id) if err != nil { return nil, err } - return &endpointsInfo{ //TODO: fill out PA + return &endpointInfo{ //TODO: fill out PA ip: hnsendpoint.IpConfigurations[0].IpAddress, isLocal: uint32(hnsendpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote macAddress: hnsendpoint.MacAddress, @@ -158,7 +158,7 @@ func (hns hns) getEndpointByID(id string) (*endpointsInfo, error) { hns: hns, }, nil } -func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) { +func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpointInfo, error) { hnsnetwork, err := hns.hcn.GetNetworkByName(networkName) if err != nil { klog.ErrorS(err, "Error getting network by name") @@ -179,7 +179,7 @@ func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpoints } } if equal && strings.EqualFold(endpoint.HostComputeNetwork, hnsnetwork.Id) { - return &endpointsInfo{ + return &endpointInfo{ ip: ip, isLocal: uint32(endpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote macAddress: endpoint.MacAddress, @@ -190,12 +190,12 @@ func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpoints } return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName) } -func (hns hns) getEndpointByName(name string) (*endpointsInfo, error) { +func (hns hns) getEndpointByName(name string) (*endpointInfo, error) { hnsendpoint, err := hns.hcn.GetEndpointByName(name) if err != nil { return nil, err } - return &endpointsInfo{ //TODO: fill out PA + return &endpointInfo{ //TODO: fill out PA ip: hnsendpoint.IpConfigurations[0].IpAddress, isLocal: uint32(hnsendpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote macAddress: hnsendpoint.MacAddress, @@ -203,7 +203,7 @@ func (hns hns) getEndpointByName(name string) (*endpointsInfo, error) { hns: hns, }, nil } -func (hns hns) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) { +func (hns hns) createEndpoint(ep *endpointInfo, networkName string) (*endpointInfo, error) { hnsNetwork, err := hns.hcn.GetNetworkByName(networkName) if err != nil { return nil, err @@ -251,7 +251,7 @@ func (hns hns) createEndpoint(ep *endpointsInfo, networkName string) (*endpoints return nil, err } } - return &endpointsInfo{ + return &endpointInfo{ ip: createdEndpoint.IpConfigurations[0].IpAddress, isLocal: uint32(createdEndpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, macAddress: createdEndpoint.MacAddress, @@ -273,7 +273,7 @@ func (hns hns) deleteEndpoint(hnsID string) error { } // findLoadBalancerID will construct a id from the provided loadbalancer fields -func findLoadBalancerID(endpoints []endpointsInfo, vip string, protocol, internalPort, externalPort uint16) (loadBalancerIdentifier, error) { +func findLoadBalancerID(endpoints []endpointInfo, vip string, protocol, internalPort, externalPort uint16) (loadBalancerIdentifier, error) { // Compute hash from backends (endpoint IDs) hash, err := hashEndpoints(endpoints) if err != nil { @@ -315,7 +315,7 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn return loadBalancers, nil } -func (hns hns) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) { +func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) { var id loadBalancerIdentifier vips := []string{} // Compute hash from backends (endpoint IDs) @@ -424,7 +424,7 @@ func (hns hns) deleteLoadBalancer(hnsID string) error { } // Calculates a hash from the given endpoint IDs. -func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err error) { +func hashEndpoints[T string | endpointInfo](endpoints []T) (hash [20]byte, err error) { var id string // Recover in case something goes wrong. Return error and null byte array. defer func() { @@ -437,7 +437,7 @@ func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err // Iterate over endpoints, compute hash for _, ep := range endpoints { switch x := any(ep).(type) { - case endpointsInfo: + case endpointInfo: id = strings.ToUpper(x.hnsID) case string: id = x diff --git a/pkg/proxy/winkernel/hns_test.go b/pkg/proxy/winkernel/hns_test.go index 6e674425f78..1567d467c09 100644 --- a/pkg/proxy/winkernel/hns_test.go +++ b/pkg/proxy/winkernel/hns_test.go @@ -203,7 +203,7 @@ func TestCreateEndpointLocal(t *testing.T) { hns := hns{hcn: newHcnImpl()} Network := mustTestNetwork(t) - endpoint := &endpointsInfo{ + endpoint := &endpointInfo{ ip: epIpAddress, macAddress: epMacAddress, isLocal: true, @@ -242,7 +242,7 @@ func TestCreateEndpointRemote(t *testing.T) { Network := mustTestNetwork(t) providerAddress := epPaAddress - endpoint := &endpointsInfo{ + endpoint := &endpointInfo{ ip: epIpAddressRemote, macAddress: epMacAddress, isLocal: false, @@ -350,11 +350,11 @@ func TestGetLoadBalancerExisting(t *testing.T) { if err != nil { t.Error(err) } - endpoint := &endpointsInfo{ + endpoint := &endpointInfo{ ip: Endpoint.IpConfigurations[0].IpAddress, hnsID: Endpoint.Id, } - endpoints := []endpointsInfo{*endpoint} + endpoints := []endpointInfo{*endpoint} hash, err := hashEndpoints(endpoints) if err != nil { t.Error(err) @@ -409,11 +409,11 @@ func TestGetLoadBalancerNew(t *testing.T) { if err != nil { t.Error(err) } - endpoint := &endpointsInfo{ + endpoint := &endpointInfo{ ip: Endpoint.IpConfigurations[0].IpAddress, hnsID: Endpoint.Id, } - endpoints := []endpointsInfo{*endpoint} + endpoints := []endpointInfo{*endpoint} lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs) if err != nil { t.Error(err) @@ -523,7 +523,7 @@ func TestHashEndpoints(t *testing.T) { if err != nil { t.Error(err) } - endpointInfoA := &endpointsInfo{ + endpointInfoA := &endpointInfo{ ip: endpointA.IpConfigurations[0].IpAddress, hnsID: endpointA.Id, } @@ -543,12 +543,12 @@ func TestHashEndpoints(t *testing.T) { if err != nil { t.Error(err) } - endpointInfoB := &endpointsInfo{ + endpointInfoB := &endpointInfo{ ip: endpointB.IpConfigurations[0].IpAddress, hnsID: endpointB.Id, } - endpoints := []endpointsInfo{*endpointInfoA, *endpointInfoB} - endpointsReverse := []endpointsInfo{*endpointInfoB, *endpointInfoA} + endpoints := []endpointInfo{*endpointInfoA, *endpointInfoB} + endpointsReverse := []endpointInfo{*endpointInfoB, *endpointInfoA} h1, err := hashEndpoints(endpoints) if err != nil { t.Error(err) diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 642a60cdece..522a15584a2 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -123,7 +123,7 @@ type serviceInfo struct { hnsID string nodePorthnsID string policyApplied bool - remoteEndpoint *endpointsInfo + remoteEndpoint *endpointInfo hns HostNetworkService preserveDIP bool localTrafficDSR bool @@ -273,7 +273,7 @@ func (t DualStackCompatTester) DualStackCompatible(networkName string) bool { } // internal struct for endpoints information -type endpointsInfo struct { +type endpointInfo struct { ip string port uint16 isLocal bool @@ -290,57 +290,57 @@ type endpointsInfo struct { } // String is part of proxy.Endpoint interface. -func (info *endpointsInfo) String() string { +func (info *endpointInfo) String() string { return net.JoinHostPort(info.ip, strconv.Itoa(int(info.port))) } // GetIsLocal is part of proxy.Endpoint interface. -func (info *endpointsInfo) GetIsLocal() bool { +func (info *endpointInfo) GetIsLocal() bool { return info.isLocal } // IsReady returns true if an endpoint is ready and not terminating. -func (info *endpointsInfo) IsReady() bool { +func (info *endpointInfo) IsReady() bool { return info.ready } // IsServing returns true if an endpoint is ready, regardless of it's terminating state. -func (info *endpointsInfo) IsServing() bool { +func (info *endpointInfo) IsServing() bool { return info.serving } // IsTerminating returns true if an endpoint is terminating. -func (info *endpointsInfo) IsTerminating() bool { +func (info *endpointInfo) IsTerminating() bool { return info.terminating } // GetZoneHint returns the zone hint for the endpoint. -func (info *endpointsInfo) GetZoneHints() sets.Set[string] { +func (info *endpointInfo) GetZoneHints() sets.Set[string] { return sets.Set[string]{} } // IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface. -func (info *endpointsInfo) IP() string { +func (info *endpointInfo) IP() string { return info.ip } // Port returns just the Port part of the endpoint. -func (info *endpointsInfo) Port() (int, error) { +func (info *endpointInfo) Port() (int, error) { return int(info.port), nil } // Equal is part of proxy.Endpoint interface. -func (info *endpointsInfo) Equal(other proxy.Endpoint) bool { +func (info *endpointInfo) Equal(other proxy.Endpoint) bool { return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal() } // GetNodeName returns the NodeName for this endpoint. -func (info *endpointsInfo) GetNodeName() string { +func (info *endpointInfo) GetNodeName() string { return "" } // GetZone returns the Zone for this endpoint. -func (info *endpointsInfo) GetZone() string { +func (info *endpointInfo) GetZone() string { return "" } @@ -407,7 +407,7 @@ func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName, if exists { // Cleanup Endpoints references for _, ep := range epInfos { - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if ok { epInfo.Cleanup() @@ -448,7 +448,7 @@ func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) { } } -// returns a new proxy.Endpoint which abstracts a endpointsInfo +// returns a new proxy.Endpoint which abstracts a endpointInfo func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *proxy.ServicePortName) proxy.Endpoint { portNumber, err := baseInfo.Port() @@ -457,7 +457,7 @@ func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *pro portNumber = 0 } - info := &endpointsInfo{ + info := &endpointInfo{ ip: baseInfo.IP(), port: uint16(portNumber), isLocal: baseInfo.GetIsLocal(), @@ -474,8 +474,8 @@ func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *pro return info } -func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointsInfo, error) { - hnsEndpoint := &endpointsInfo{ +func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointInfo, error) { + hnsEndpoint := &endpointInfo{ ip: ip, isLocal: true, macAddress: mac, @@ -489,15 +489,15 @@ func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, return ep, err } -func (ep *endpointsInfo) DecrementRefCount() { - klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointsInfo", ep) +func (ep *endpointInfo) DecrementRefCount() { + klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointInfo", ep) if !ep.GetIsLocal() && ep.refCount != nil && *ep.refCount > 0 { *ep.refCount-- } } -func (ep *endpointsInfo) Cleanup() { - klog.V(3).InfoS("Endpoint cleanup", "endpointsInfo", ep) +func (ep *endpointInfo) Cleanup() { + klog.V(3).InfoS("Endpoint cleanup", "endpointInfo", ep) if !ep.GetIsLocal() && ep.refCount != nil { *ep.refCount-- @@ -601,7 +601,7 @@ type Proxier struct { // services that happened since policies were synced. For a single object, // changes are accumulated, i.e. previous is state from before all of them, // current is state after applying all of those. - endpointsChanges *proxy.EndpointChangeTracker + endpointsChanges *proxy.EndpointsChangeTracker serviceChanges *proxy.ServiceChangeTracker endPointsRefCount endPointsReferenceCountMap mu sync.Mutex // protects the following fields @@ -802,7 +802,7 @@ func NewProxier( } serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange) - endPointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange) + endPointChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange) proxier.endpointsChanges = endPointChangeTracker proxier.serviceChanges = serviceChanges @@ -868,7 +868,7 @@ func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint, mapSt } // Cleanup Endpoints references for _, ep := range endpoints { - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if ok { if winProxyOptimization { epInfo.DecrementRefCount() @@ -1062,7 +1062,7 @@ func isNetworkNotFoundError(err error) bool { // If atleast one is not terminating, then return false func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool { for _, epInfo := range proxier.endpointsMap[svcName] { - ep, ok := epInfo.(*endpointsInfo) + ep, ok := epInfo.(*endpointInfo) if !ok { continue } @@ -1087,7 +1087,7 @@ func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName, // If atleast one is serving, then return false func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool { for _, epInfo := range proxier.endpointsMap[svcName] { - ep, ok := epInfo.(*endpointsInfo) + ep, ok := epInfo.(*endpointInfo) if !ok { continue } @@ -1102,7 +1102,7 @@ func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName, } // updateQueriedEndpoints updates the queriedEndpoints map with newly created endpoint details -func updateQueriedEndpoints(newHnsEndpoint *endpointsInfo, queriedEndpoints map[string]*endpointsInfo) { +func updateQueriedEndpoints(newHnsEndpoint *endpointInfo, queriedEndpoints map[string]*endpointInfo) { // store newly created endpoints in queriedEndpoints queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint @@ -1130,7 +1130,7 @@ func (proxier *Proxier) syncProxyRules() { hnsNetworkName := proxier.network.name hns := proxier.hns - var gatewayHnsendpoint *endpointsInfo + var gatewayHnsendpoint *endpointInfo if proxier.forwardHealthCheckVip { gatewayHnsendpoint, _ = hns.getEndpointByName(proxier.rootHnsEndpointName) } @@ -1153,7 +1153,7 @@ func (proxier *Proxier) syncProxyRules() { endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) deletedUDPClusterIPs := serviceUpdateResult.DeletedUDPClusterIPs - // merge stale services gathered from updateEndpointsMap + // merge stale services gathered from EndpointsMap.Update for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices { if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { klog.V(2).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName) @@ -1168,7 +1168,7 @@ func (proxier *Proxier) syncProxyRules() { } if queriedEndpoints == nil { klog.V(4).InfoS("No existing endpoints found in HNS") - queriedEndpoints = make(map[string]*(endpointsInfo)) + queriedEndpoints = make(map[string]*(endpointInfo)) } queriedLoadBalancers, err := hns.getAllLoadBalancers() if queriedLoadBalancers == nil { @@ -1208,7 +1208,7 @@ func (proxier *Proxier) syncProxyRules() { serviceVipEndpoint := queriedEndpoints[svcInfo.ClusterIP().String()] if serviceVipEndpoint == nil { klog.V(4).InfoS("No existing remote endpoint", "IP", svcInfo.ClusterIP()) - hnsEndpoint := &endpointsInfo{ + hnsEndpoint := &endpointInfo{ ip: svcInfo.ClusterIP().String(), isLocal: false, macAddress: proxier.hostMac, @@ -1228,8 +1228,8 @@ func (proxier *Proxier) syncProxyRules() { } } - var hnsEndpoints []endpointsInfo - var hnsLocalEndpoints []endpointsInfo + var hnsEndpoints []endpointInfo + var hnsLocalEndpoints []endpointInfo klog.V(4).InfoS("Applying Policy", "serviceInfo", svcName) // Create Remote endpoints for every endpoint, corresponding to the service containsPublicIP := false @@ -1249,9 +1249,9 @@ func (proxier *Proxier) syncProxyRules() { } for _, epInfo := range proxier.endpointsMap[svcName] { - ep, ok := epInfo.(*endpointsInfo) + ep, ok := epInfo.(*endpointInfo) if !ok { - klog.ErrorS(nil, "Failed to cast endpointsInfo", "serviceName", svcName) + klog.ErrorS(nil, "Failed to cast endpointInfo", "serviceName", svcName) continue } @@ -1274,7 +1274,7 @@ func (proxier *Proxier) syncProxyRules() { } - var newHnsEndpoint *endpointsInfo + var newHnsEndpoint *endpointInfo hnsNetworkName := proxier.network.name var err error @@ -1319,7 +1319,7 @@ func (proxier *Proxier) syncProxyRules() { providerAddress = proxier.nodeIP.String() } - hnsEndpoint := &endpointsInfo{ + hnsEndpoint := &endpointInfo{ ip: ep.ip, isLocal: false, macAddress: conjureMac("02-11", netutils.ParseIPSloppy(ep.ip)), @@ -1328,13 +1328,13 @@ func (proxier *Proxier) syncProxyRules() { newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName) if err != nil { - klog.ErrorS(err, "Remote endpoint creation failed", "endpointsInfo", hnsEndpoint) + klog.ErrorS(err, "Remote endpoint creation failed", "endpointInfo", hnsEndpoint) continue } updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints) } else { - hnsEndpoint := &endpointsInfo{ + hnsEndpoint := &endpointInfo{ ip: ep.ip, isLocal: false, macAddress: ep.macAddress, @@ -1371,7 +1371,7 @@ func (proxier *Proxier) syncProxyRules() { } // Save the hnsId for reference - klog.V(1).InfoS("Hns endpoint resource", "endpointsInfo", newHnsEndpoint) + klog.V(1).InfoS("Hns endpoint resource", "endpointInfo", newHnsEndpoint) hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint) if newHnsEndpoint.GetIsLocal() { @@ -1384,10 +1384,10 @@ func (proxier *Proxier) syncProxyRules() { ep.hnsID = newHnsEndpoint.hnsID - klog.V(3).InfoS("Endpoint resource found", "endpointsInfo", ep) + klog.V(3).InfoS("Endpoint resource found", "endpointInfo", ep) } - klog.V(3).InfoS("Associated endpoints for service", "endpointsInfo", hnsEndpoints, "serviceName", svcName) + klog.V(3).InfoS("Associated endpoints for service", "endpointInfo", hnsEndpoints, "serviceName", svcName) if len(svcInfo.hnsID) > 0 { // This should not happen @@ -1399,7 +1399,7 @@ func (proxier *Proxier) syncProxyRules() { if len(hnsEndpoints) == 0 { if svcInfo.winProxyOptimization { // Deleting loadbalancers when there are no endpoints to serve. - klog.V(3).InfoS("Cleanup existing ", "endpointsInfo", hnsEndpoints, "serviceName", svcName) + klog.V(3).InfoS("Cleanup existing ", "endpointInfo", hnsEndpoints, "serviceName", svcName) svcInfo.deleteLoadBalancerPolicy(proxier.mapStaleLoadbalancers) } klog.ErrorS(nil, "Endpoint information not available for service, not applying any policy", "serviceName", svcName) @@ -1560,10 +1560,10 @@ func (proxier *Proxier) syncProxyRules() { nodeport = svcInfo.HealthCheckNodePort() } - proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointsInfo{*gatewayHnsendpoint}, queriedLoadBalancers) + proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointInfo{*gatewayHnsendpoint}, queriedLoadBalancers) hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer( - []endpointsInfo{*gatewayHnsendpoint}, + []endpointInfo{*gatewayHnsendpoint}, loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP}, sourceVip, lbIngressIP.ip, @@ -1623,7 +1623,7 @@ func (proxier *Proxier) syncProxyRules() { // deleteExistingLoadBalancer checks whether loadbalancer delete is needed or not. // If it is needed, the function will delete the existing loadbalancer and return true, else false. -func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointsInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool { +func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool { if !winProxyOptimization || *lbHnsID == "" { // Loadbalancer delete not needed diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index b11a02cec12..fd2856471f9 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -123,8 +123,8 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust } serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, v1.IPv4Protocol, nil, proxier.serviceMapChange) - endpointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, proxier.endpointsMapChange) - proxier.endpointsChanges = endpointChangeTracker + endpointsChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, proxier.endpointsMapChange) + proxier.endpointsChanges = endpointsChangeTracker proxier.serviceChanges = serviceChanges return proxier @@ -232,9 +232,9 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) { proxier.syncProxyRules() ep := proxier.endpointsMap[svcPortName][0] - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName.String()) } else { if epInfo.hnsID != "EPID-3" { @@ -296,9 +296,9 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) { proxier.setInitialized(true) proxier.syncProxyRules() ep := proxier.endpointsMap[svcPortName][0] - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName.String()) } else { if epInfo.hnsID != endpointGuid1 { @@ -389,9 +389,9 @@ func TestSharedRemoteEndpointDelete(t *testing.T) { proxier.setInitialized(true) proxier.syncProxyRules() ep := proxier.endpointsMap[svcPortName1][0] - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String()) } else { if epInfo.hnsID != endpointGuid1 { @@ -439,9 +439,9 @@ func TestSharedRemoteEndpointDelete(t *testing.T) { proxier.syncProxyRules() ep = proxier.endpointsMap[svcPortName1][0] - epInfo, ok = ep.(*endpointsInfo) + epInfo, ok = ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String()) } else { if epInfo.hnsID != endpointGuid1 { @@ -534,9 +534,9 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) { proxier.setInitialized(true) proxier.syncProxyRules() ep := proxier.endpointsMap[svcPortName1][0] - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String()) } else { if epInfo.hnsID != endpointGuid1 { @@ -613,10 +613,10 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) { proxier.syncProxyRules() ep = proxier.endpointsMap[svcPortName1][0] - epInfo, ok = ep.(*endpointsInfo) + epInfo, ok = ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String()) } else { if epInfo.hnsID != endpointGuid1 { @@ -913,9 +913,9 @@ func TestEndpointSlice(t *testing.T) { } ep := proxier.endpointsMap[svcPortName][0] - epInfo, ok := ep.(*endpointsInfo) + epInfo, ok := ep.(*endpointInfo) if !ok { - t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String()) + t.Errorf("Failed to cast endpointInfo %q", svcPortName.String()) } else { if epInfo.hnsID != "EPID-3" {