Fix "Endpoint" vs "Endpoints" in proxy type names

The use of "Endpoint" vs "Endpoints" in these type names is tricky
because it doesn't always make sense to use the same singular/plural
convention as the corresonding service-related types, since often the
service-related type is referring to a single service while the
endpoint-related type is referring to multiple endpoint IPs.

The "endpointsInfo" types in the iptables and winkernel proxiers are
now "endpointInfo" because they describe a single endpoint IP (and
wrap proxy.BaseEndpointInfo).

"UpdateEndpointMapResult" is now "UpdateEndpointsMapResult", because
it is the result of EndpointsMap.Update (and it's clearly correct for
EndpointsMap to have plural "Endpoints" because it's a map to an array
of proxy.Endpoint objects.)

"EndpointChangeTracker" is now "EndpointsChangeTracker" because it
tracks changes to the full set of endpoints for a particular service
(and the new name matches the existing "endpointsChange" type and
"Proxier.endpointsChanges" fields.)
This commit is contained in:
Dan Winship 2023-10-09 17:21:12 -04:00
parent 59424358cc
commit 6c395eb098
12 changed files with 236 additions and 236 deletions

View File

@ -27,23 +27,23 @@ import (
// CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints.
func CleanStaleEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap,
serviceUpdateResult proxy.UpdateServiceMapResult, endpointUpdateResult proxy.UpdateEndpointMapResult) {
serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
deleteStaleServiceConntrackEntries(isIPv6, exec, svcPortMap, serviceUpdateResult, endpointUpdateResult)
deleteStaleEndpointConntrackEntries(exec, svcPortMap, endpointUpdateResult)
deleteStaleServiceConntrackEntries(isIPv6, exec, svcPortMap, serviceUpdateResult, endpointsUpdateResult)
deleteStaleEndpointConntrackEntries(exec, svcPortMap, endpointsUpdateResult)
}
// deleteStaleServiceConntrackEntries takes care of flushing stale conntrack entries related
// to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack
// may create "black hole" entries for that IP+port. When the service gets endpoints we
// need to delete those entries so further traffic doesn't get dropped.
func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointUpdateResult proxy.UpdateEndpointMapResult) {
func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
conntrackCleanupServiceNodePorts := sets.New[int]()
// merge newly active services gathered from updateEndpointsMap
// merge newly active services gathered from endpointsUpdateResult
// a UDP service that changes from 0 to non-0 endpoints is newly active.
for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
for _, svcPortName := range endpointsUpdateResult.NewlyActiveUDPServices {
if svcInfo, ok := svcPortMap[svcPortName]; ok {
klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
@ -78,8 +78,8 @@ func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, sv
// deleteStaleEndpointConntrackEntries takes care of flushing stale conntrack entries related
// to UDP endpoints. After a UDP endpoint is removed we must flush any conntrack entries
// for it so that if the same client keeps sending, the packets will get routed to a new endpoint.
func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap proxy.ServicePortMap, endpointUpdateResult proxy.UpdateEndpointMapResult) {
for _, epSvcPair := range endpointUpdateResult.DeletedUDPEndpoints {
func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
for _, epSvcPair := range endpointsUpdateResult.DeletedUDPEndpoints {
if svcInfo, ok := svcPortMap[epSvcPair.ServicePortName]; ok {
endpointIP := proxyutil.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.NodePort()

View File

@ -147,9 +147,9 @@ type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName)
// EndpointsMap's but just use the changes for any Proxier specific cleanup.
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)
// EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of
// EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of
// Endpoints, keyed by their namespace and name.
type EndpointChangeTracker struct {
type EndpointsChangeTracker struct {
// lock protects lastChangeTriggerTimes
lock sync.Mutex
@ -159,16 +159,16 @@ type EndpointChangeTracker struct {
// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
// object to change. Used to calculate the network-programming-latency.
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
// record the time when the endpointChangeTracker was created so we can ignore the endpoints
// record the time when the endpointsChangeTracker was created so we can ignore the endpoints
// that were generated before, because we can't estimate the network-programming-latency on those.
// This is specially problematic on restarts, because we process all the endpoints that may have been
// created hours or days before.
trackerStartTime time.Time
}
// NewEndpointChangeTracker initializes an EndpointsChangeMap
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker {
return &EndpointChangeTracker{
// NewEndpointsChangeTracker initializes an EndpointsChangeTracker
func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker {
return &EndpointsChangeTracker{
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
trackerStartTime: time.Now(),
processEndpointsMapChange: processEndpointsMapChange,
@ -177,9 +177,9 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc
}
// EndpointSliceUpdate updates given service's endpoints change map based on the <previous, current> endpoints pair.
// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeMap.
// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeTracker.
// If removeSlice is true, slice will be removed, otherwise it will be added or updated.
func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
if !supportedEndpointSliceAddressTypes.Has(string(endpointSlice.AddressType)) {
klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType)
return false
@ -225,13 +225,13 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
// PendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time ect was used to update an EndpointsMap. (You must call
// this _before_ calling em.Update(ect).)
func (ect *EndpointChangeTracker) PendingChanges() sets.Set[string] {
func (ect *EndpointsChangeTracker) PendingChanges() sets.Set[string] {
return ect.endpointSliceCache.pendingChanges()
}
// checkoutChanges returns a list of pending endpointsChanges and marks them as
// applied.
func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange {
func (ect *EndpointsChangeTracker) checkoutChanges() []*endpointsChange {
metrics.EndpointChangesPending.Set(0)
return ect.endpointSliceCache.checkoutChanges()
@ -239,7 +239,7 @@ func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange {
// checkoutTriggerTimes applies the locally cached trigger times to a map of
// trigger times that have been passed in and empties the local cache.
func (ect *EndpointChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
func (ect *EndpointsChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
ect.lock.Lock()
defer ect.lock.Unlock()
@ -284,8 +284,8 @@ type endpointsChange struct {
current EndpointsMap
}
// UpdateEndpointMapResult is the updated results after applying endpoints changes.
type UpdateEndpointMapResult struct {
// UpdateEndpointsMapResult is the updated results after applying endpoints changes.
type UpdateEndpointsMapResult struct {
// DeletedUDPEndpoints identifies UDP endpoints that have just been deleted.
// Existing conntrack NAT entries pointing to these endpoints must be deleted to
// ensure that no further traffic for the Service gets delivered to them.
@ -304,7 +304,7 @@ type UpdateEndpointMapResult struct {
}
// Update updates endpointsMap base on the given changes.
func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
func (em EndpointsMap) Update(changes *EndpointsChangeTracker) (result UpdateEndpointsMapResult) {
result.DeletedUDPEndpoints = make([]ServiceEndpoint, 0)
result.NewlyActiveUDPServices = make([]ServicePortName, 0)
result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
@ -321,7 +321,7 @@ type EndpointsMap map[ServicePortName][]Endpoint
// and clear the changes map. In addition it returns (via argument) and resets the
// lastChangeTriggerTimes for all endpoints that were changed and will result in syncing
// the proxy rules. apply triggers processEndpointsMapChange on every change.
func (em EndpointsMap) apply(ect *EndpointChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint,
func (em EndpointsMap) apply(ect *EndpointsChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint,
newlyActiveUDPServices *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
if ect == nil {
return
@ -396,7 +396,7 @@ func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int {
}
// detectStaleConntrackEntries detects services that may be associated with stale conntrack entries.
// (See UpdateEndpointMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.)
// (See UpdateEndpointsMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.)
func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName) {
// Find the UDP endpoints that we were sending traffic to in oldEndpointsMap, but
// are no longer sending to newEndpointsMap. The proxier should make sure that

View File

@ -1340,7 +1340,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
testCases := map[string]struct {
startingSlices []*discovery.EndpointSlice
endpointChangeTracker *EndpointChangeTracker
endpointsChangeTracker *EndpointsChangeTracker
namespacedName types.NamespacedName
paramEndpointSlice *discovery.EndpointSlice
paramRemoveSlice bool
@ -1351,7 +1351,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
// test starting from an empty state
"add a simple slice that doesn't already exist": {
startingSlices: []*discovery.EndpointSlice{},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
@ -1375,7 +1375,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
@ -1388,7 +1388,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false,
@ -1402,7 +1402,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
@ -1435,7 +1435,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80)}),
paramRemoveSlice: false,
@ -1466,7 +1466,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: true,
@ -1489,7 +1489,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: true,
@ -1502,7 +1502,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
@ -1526,7 +1526,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 2, 1, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
@ -1549,7 +1549,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 2, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 2, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
@ -1578,7 +1578,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 2, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 2, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
@ -1605,19 +1605,19 @@ func TestEndpointSliceUpdate(t *testing.T) {
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
initializeCache(tc.endpointChangeTracker.endpointSliceCache, tc.startingSlices)
initializeCache(tc.endpointsChangeTracker.endpointSliceCache, tc.startingSlices)
got := tc.endpointChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice)
got := tc.endpointsChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice)
if !reflect.DeepEqual(got, tc.expectedReturnVal) {
t.Errorf("EndpointSliceUpdate return value got: %v, want %v", got, tc.expectedReturnVal)
}
pendingChanges := tc.endpointChangeTracker.PendingChanges()
pendingChanges := tc.endpointsChangeTracker.PendingChanges()
if !pendingChanges.Equal(tc.expectedChangedEndpoints) {
t.Errorf("expected changed endpoints %q, got %q", tc.expectedChangedEndpoints.UnsortedList(), pendingChanges.UnsortedList())
}
changes := tc.endpointChangeTracker.checkoutChanges()
changes := tc.endpointsChangeTracker.checkoutChanges()
if tc.expectedCurrentChange == nil {
if len(changes) != 0 {
t.Errorf("Expected %s to have no changes", tc.namespacedName)
@ -1637,20 +1637,20 @@ func TestCheckoutChanges(t *testing.T) {
svcPortName1 := ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, "port-1", v1.ProtocolTCP}
testCases := map[string]struct {
endpointChangeTracker *EndpointChangeTracker
endpointsChangeTracker *EndpointsChangeTracker
expectedChanges []*endpointsChange
items map[types.NamespacedName]*endpointsChange
appliedSlices []*discovery.EndpointSlice
pendingSlices []*discovery.EndpointSlice
}{
"empty slices": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
expectedChanges: []*endpointsChange{},
appliedSlices: []*discovery.EndpointSlice{},
pendingSlices: []*discovery.EndpointSlice{},
},
"adding initial slice": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
expectedChanges: []*endpointsChange{{
previous: EndpointsMap{},
current: EndpointsMap{
@ -1663,7 +1663,7 @@ func TestCheckoutChanges(t *testing.T) {
},
},
"removing port in update": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
expectedChanges: []*endpointsChange{{
previous: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1", true, true, false), newTestEp("10.0.1.2:80", "host1", true, true, false), newTestEp("10.0.1.3:80", "host1", false, false, false)},
@ -1685,13 +1685,13 @@ func TestCheckoutChanges(t *testing.T) {
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
for _, slice := range tc.appliedSlices {
tc.endpointChangeTracker.EndpointSliceUpdate(slice, false)
tc.endpointsChangeTracker.EndpointSliceUpdate(slice, false)
}
tc.endpointChangeTracker.checkoutChanges()
tc.endpointsChangeTracker.checkoutChanges()
for _, slice := range tc.pendingSlices {
tc.endpointChangeTracker.EndpointSliceUpdate(slice, false)
tc.endpointsChangeTracker.EndpointSliceUpdate(slice, false)
}
changes := tc.endpointChangeTracker.checkoutChanges()
changes := tc.endpointsChangeTracker.checkoutChanges()
if len(tc.expectedChanges) != len(changes) {
t.Fatalf("Expected %d changes, got %d", len(tc.expectedChanges), len(changes))
@ -1730,7 +1730,7 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser
for i := range expected[x] {
newEp, ok := newMap[x][i].(*BaseEndpointInfo)
if !ok {
t.Fatalf("Failed to cast endpointsInfo")
t.Fatalf("Failed to cast endpointInfo")
}
if !endpointEqual(newEp, expected[x][i]) {
t.Fatalf("expected new[%v][%d] to be %v, got %v"+

View File

@ -122,15 +122,15 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *pro
}
// internal struct for endpoints information
type endpointsInfo struct {
type endpointInfo struct {
*proxy.BaseEndpointInfo
ChainName utiliptables.Chain
}
// returns a new proxy.Endpoint which abstracts a endpointsInfo
// returns a new proxy.Endpoint which abstracts a endpointInfo
func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
return &endpointsInfo{
return &endpointInfo{
BaseEndpointInfo: baseInfo,
ChainName: servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.Endpoint),
}
@ -143,7 +143,7 @@ type Proxier struct {
// services that happened since iptables was synced. For a single object,
// changes are accumulated, i.e. previous is state from before all of them,
// current is state after applying all of those.
endpointsChanges *proxy.EndpointChangeTracker
endpointsChanges *proxy.EndpointsChangeTracker
serviceChanges *proxy.ServiceChangeTracker
mu sync.Mutex // protects the following fields
@ -265,7 +265,7 @@ func NewProxier(ipFamily v1.IPFamily,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
needFullSync: true,
syncPeriod: syncPeriod,
iptables: ipt,
@ -952,7 +952,7 @@ func (proxier *Proxier) syncProxyRules() {
// Note the endpoint chains that will be used
for _, ep := range allLocallyReachableEndpoints {
if epInfo, ok := ep.(*endpointsInfo); ok {
if epInfo, ok := ep.(*endpointInfo); ok {
activeNATChains[epInfo.ChainName] = true
}
}
@ -1345,9 +1345,9 @@ func (proxier *Proxier) syncProxyRules() {
// Generate the per-endpoint chains.
for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointsInfo)
epInfo, ok := ep.(*endpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep)
klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
continue
}
@ -1556,7 +1556,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe
// First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, ep := range endpoints {
epInfo, ok := ep.(*endpointsInfo)
epInfo, ok := ep.(*endpointInfo)
if !ok {
continue
}
@ -1578,7 +1578,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe
// Now write loadbalancing rules.
numEndpoints := len(endpoints)
for i, ep := range endpoints {
epInfo, ok := ep.(*endpointsInfo)
epInfo, ok := ep.(*endpointInfo)
if !ok {
continue
}

View File

@ -330,7 +330,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, ipfamily, nil, nil),
endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipfamily, nil, nil),
needFullSync: true,
iptables: ipt,
masqueradeMark: "0x4000",
@ -3371,7 +3371,7 @@ func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
proxier.servicesSynced = true
}
func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointInfo) {
if len(newMap) != len(expected) {
t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
}
@ -3380,9 +3380,9 @@ func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.End
t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x]))
} else {
for i := range expected[x] {
newEp, ok := newMap[x][i].(*endpointsInfo)
newEp, ok := newMap[x][i].(*endpointInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo")
t.Errorf("Failed to cast endpointInfo")
continue
}
if newEp.Endpoint != expected[x][i].Endpoint ||
@ -3731,16 +3731,16 @@ func TestUpdateEndpointsMap(t *testing.T) {
name string
previousEndpoints []*discovery.EndpointSlice
currentEndpoints []*discovery.EndpointSlice
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
expectedResult map[proxy.ServicePortName][]*endpointsInfo
oldEndpoints map[proxy.ServicePortName][]*endpointInfo
expectedResult map[proxy.ServicePortName][]*endpointInfo
expectedDeletedUDPEndpoints []proxy.ServiceEndpoint
expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool
expectedLocalEndpoints map[types.NamespacedName]int
}{{
// Case[0]: nothing
name: "nothing",
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{},
expectedResult: map[proxy.ServicePortName][]*endpointInfo{},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
@ -3749,12 +3749,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "no change, named port, local",
previousEndpoints: namedPortLocal,
currentEndpoints: namedPortLocal,
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
},
},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
},
@ -3769,7 +3769,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "no change, multiple subsets",
previousEndpoints: multipleSubsets,
currentEndpoints: multipleSubsets,
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
@ -3777,7 +3777,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
@ -3793,7 +3793,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "no change, multiple subsets, multiple ports, local",
previousEndpoints: multipleSubsetsMultiplePortsLocal,
currentEndpoints: multipleSubsetsMultiplePortsLocal,
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
},
@ -3804,7 +3804,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
},
@ -3825,7 +3825,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "no change, multiple endpoints, subsets, IPs, and ports",
previousEndpoints: multipleSubsetsIPsPorts,
currentEndpoints: multipleSubsetsIPsPorts,
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
@ -3851,7 +3851,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
},
},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
@ -3888,8 +3888,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "add an Endpoints",
previousEndpoints: []*discovery.EndpointSlice{nil},
currentEndpoints: namedPortLocal,
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{},
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
},
@ -3906,12 +3906,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "remove an Endpoints",
previousEndpoints: namedPortLocal,
currentEndpoints: []*discovery.EndpointSlice{nil},
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
},
},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
expectedResult: map[proxy.ServicePortName][]*endpointInfo{},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
@ -3923,12 +3923,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "add an IP and port",
previousEndpoints: namedPort,
currentEndpoints: namedPortsLocalNoLocal,
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
@ -3950,7 +3950,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "remove an IP and port",
previousEndpoints: namedPortsLocalNoLocal,
currentEndpoints: namedPort,
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
@ -3960,7 +3960,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
},
},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
@ -3982,12 +3982,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "add a subset",
previousEndpoints: []*discovery.EndpointSlice{namedPort[0], nil},
currentEndpoints: multipleSubsetsWithLocal,
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
@ -4007,7 +4007,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "remove a subset",
previousEndpoints: multipleSubsets,
currentEndpoints: []*discovery.EndpointSlice{namedPort[0], nil},
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
@ -4015,7 +4015,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
@ -4031,12 +4031,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "rename a port",
previousEndpoints: namedPort,
currentEndpoints: namedPortRenamed,
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
@ -4054,12 +4054,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "renumber a port",
previousEndpoints: namedPort,
currentEndpoints: namedPortRenumbered,
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
@ -4075,7 +4075,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "complex add and remove",
previousEndpoints: complexBefore,
currentEndpoints: complexAfter,
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},
@ -4094,7 +4094,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.4.4.6:45", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
},
},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.11:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
@ -4141,8 +4141,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
name: "change from 0 endpoint address to 1 unnamed port",
previousEndpoints: emptyEndpointSlices,
currentEndpoints: namedPort,
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
oldEndpoints: map[proxy.ServicePortName][]*endpointInfo{},
expectedResult: map[proxy.ServicePortName][]*endpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
},

View File

@ -219,7 +219,7 @@ type Proxier struct {
// services that happened since last syncProxyRules call. For a single object,
// changes are accumulated, i.e. previous is state from before all of them,
// current is state after applying all of those.
endpointsChanges *proxy.EndpointChangeTracker
endpointsChanges *proxy.EndpointsChangeTracker
serviceChanges *proxy.ServiceChangeTracker
mu sync.Mutex // protects the following fields
@ -413,7 +413,7 @@ func NewProxier(ipFamily v1.IPFamily,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil),
endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, nil, ipFamily, recorder, nil),
initialSync: true,
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,

View File

@ -154,7 +154,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, ipFamily, nil, nil),
endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, nil, ipFamily, nil, nil),
excludeCIDRs: excludeCIDRs,
iptables: ipt,
ipvs: ipvs,
@ -4358,7 +4358,7 @@ raid10 57344 0 - Live 0xffffffffc0597000`,
}
// The majority of EndpointSlice specific tests are not ipvs specific and focus on
// the shared EndpointChangeTracker and EndpointSliceCache. This test ensures that the
// the shared EndpointsChangeTracker and EndpointSliceCache. This test ensures that the
// ipvs proxier supports translating EndpointSlices to ipvs output.
func TestEndpointSliceE2E(t *testing.T) {
ipt := iptablestest.NewFake()

View File

@ -595,7 +595,7 @@ func TestServiceToServiceMap(t *testing.T) {
}
type FakeProxier struct {
endpointsChanges *EndpointChangeTracker
endpointsChanges *EndpointsChangeTracker
serviceChanges *ServiceChangeTracker
svcPortMap ServicePortMap
endpointsMap EndpointsMap
@ -607,7 +607,7 @@ func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier {
svcPortMap: make(ServicePortMap),
serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil),
endpointsMap: make(EndpointsMap),
endpointsChanges: &EndpointChangeTracker{
endpointsChanges: &EndpointsChangeTracker{
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
trackerStartTime: t,
processEndpointsMapChange: nil,

View File

@ -32,13 +32,13 @@ import (
type HostNetworkService interface {
getNetworkByName(name string) (*hnsNetworkInfo, error)
getAllEndpointsByNetwork(networkName string) (map[string]*endpointsInfo, error)
getEndpointByID(id string) (*endpointsInfo, error)
getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error)
getEndpointByName(id string) (*endpointsInfo, error)
createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error)
getAllEndpointsByNetwork(networkName string) (map[string]*endpointInfo, error)
getEndpointByID(id string) (*endpointInfo, error)
getEndpointByIpAddress(ip string, networkName string) (*endpointInfo, error)
getEndpointByName(id string) (*endpointInfo, error)
createEndpoint(ep *endpointInfo, networkName string) (*endpointInfo, error)
deleteEndpoint(hnsID string) error
getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error)
getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error)
getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error)
deleteLoadBalancer(hnsID string) error
}
@ -87,7 +87,7 @@ func (hns hns) getNetworkByName(name string) (*hnsNetworkInfo, error) {
}, nil
}
func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointsInfo), error) {
func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointInfo), error) {
hcnnetwork, err := hns.hcn.GetNetworkByName(networkName)
if err != nil {
klog.ErrorS(err, "failed to get HNS network by name", "name", networkName)
@ -97,7 +97,7 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi
if err != nil {
return nil, fmt.Errorf("failed to list endpoints: %w", err)
}
endpointInfos := make(map[string]*(endpointsInfo))
endpointInfos := make(map[string]*(endpointInfo))
for _, ep := range endpoints {
if len(ep.IpConfigurations) == 0 {
@ -108,7 +108,7 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi
// Add to map with key endpoint ID or IP address
// Storing this is expensive in terms of memory, however there is a bug in Windows Server 2019 that can cause two endpoints to be created with the same IP address.
// TODO: Store by IP only and remove any lookups by endpoint ID.
endpointInfos[ep.Id] = &endpointsInfo{
endpointInfos[ep.Id] = &endpointInfo{
ip: ep.IpConfigurations[0].IpAddress,
isLocal: uint32(ep.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0,
macAddress: ep.MacAddress,
@ -127,7 +127,7 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi
// If ipFamilyPolicy is RequireDualStack or PreferDualStack, then there will be 2 IPS (iPV4 and IPV6)
// in the endpoint list
endpointDualstack := &endpointsInfo{
endpointDualstack := &endpointInfo{
ip: ep.IpConfigurations[1].IpAddress,
isLocal: uint32(ep.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0,
macAddress: ep.MacAddress,
@ -145,12 +145,12 @@ func (hns hns) getAllEndpointsByNetwork(networkName string) (map[string]*(endpoi
return endpointInfos, nil
}
func (hns hns) getEndpointByID(id string) (*endpointsInfo, error) {
func (hns hns) getEndpointByID(id string) (*endpointInfo, error) {
hnsendpoint, err := hns.hcn.GetEndpointByID(id)
if err != nil {
return nil, err
}
return &endpointsInfo{ //TODO: fill out PA
return &endpointInfo{ //TODO: fill out PA
ip: hnsendpoint.IpConfigurations[0].IpAddress,
isLocal: uint32(hnsendpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote
macAddress: hnsendpoint.MacAddress,
@ -158,7 +158,7 @@ func (hns hns) getEndpointByID(id string) (*endpointsInfo, error) {
hns: hns,
}, nil
}
func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) {
func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpointInfo, error) {
hnsnetwork, err := hns.hcn.GetNetworkByName(networkName)
if err != nil {
klog.ErrorS(err, "Error getting network by name")
@ -179,7 +179,7 @@ func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpoints
}
}
if equal && strings.EqualFold(endpoint.HostComputeNetwork, hnsnetwork.Id) {
return &endpointsInfo{
return &endpointInfo{
ip: ip,
isLocal: uint32(endpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote
macAddress: endpoint.MacAddress,
@ -190,12 +190,12 @@ func (hns hns) getEndpointByIpAddress(ip string, networkName string) (*endpoints
}
return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
}
func (hns hns) getEndpointByName(name string) (*endpointsInfo, error) {
func (hns hns) getEndpointByName(name string) (*endpointInfo, error) {
hnsendpoint, err := hns.hcn.GetEndpointByName(name)
if err != nil {
return nil, err
}
return &endpointsInfo{ //TODO: fill out PA
return &endpointInfo{ //TODO: fill out PA
ip: hnsendpoint.IpConfigurations[0].IpAddress,
isLocal: uint32(hnsendpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote
macAddress: hnsendpoint.MacAddress,
@ -203,7 +203,7 @@ func (hns hns) getEndpointByName(name string) (*endpointsInfo, error) {
hns: hns,
}, nil
}
func (hns hns) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
func (hns hns) createEndpoint(ep *endpointInfo, networkName string) (*endpointInfo, error) {
hnsNetwork, err := hns.hcn.GetNetworkByName(networkName)
if err != nil {
return nil, err
@ -251,7 +251,7 @@ func (hns hns) createEndpoint(ep *endpointsInfo, networkName string) (*endpoints
return nil, err
}
}
return &endpointsInfo{
return &endpointInfo{
ip: createdEndpoint.IpConfigurations[0].IpAddress,
isLocal: uint32(createdEndpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0,
macAddress: createdEndpoint.MacAddress,
@ -273,7 +273,7 @@ func (hns hns) deleteEndpoint(hnsID string) error {
}
// findLoadBalancerID will construct a id from the provided loadbalancer fields
func findLoadBalancerID(endpoints []endpointsInfo, vip string, protocol, internalPort, externalPort uint16) (loadBalancerIdentifier, error) {
func findLoadBalancerID(endpoints []endpointInfo, vip string, protocol, internalPort, externalPort uint16) (loadBalancerIdentifier, error) {
// Compute hash from backends (endpoint IDs)
hash, err := hashEndpoints(endpoints)
if err != nil {
@ -315,7 +315,7 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn
return loadBalancers, nil
}
func (hns hns) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
var id loadBalancerIdentifier
vips := []string{}
// Compute hash from backends (endpoint IDs)
@ -424,7 +424,7 @@ func (hns hns) deleteLoadBalancer(hnsID string) error {
}
// Calculates a hash from the given endpoint IDs.
func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err error) {
func hashEndpoints[T string | endpointInfo](endpoints []T) (hash [20]byte, err error) {
var id string
// Recover in case something goes wrong. Return error and null byte array.
defer func() {
@ -437,7 +437,7 @@ func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err
// Iterate over endpoints, compute hash
for _, ep := range endpoints {
switch x := any(ep).(type) {
case endpointsInfo:
case endpointInfo:
id = strings.ToUpper(x.hnsID)
case string:
id = x

View File

@ -203,7 +203,7 @@ func TestCreateEndpointLocal(t *testing.T) {
hns := hns{hcn: newHcnImpl()}
Network := mustTestNetwork(t)
endpoint := &endpointsInfo{
endpoint := &endpointInfo{
ip: epIpAddress,
macAddress: epMacAddress,
isLocal: true,
@ -242,7 +242,7 @@ func TestCreateEndpointRemote(t *testing.T) {
Network := mustTestNetwork(t)
providerAddress := epPaAddress
endpoint := &endpointsInfo{
endpoint := &endpointInfo{
ip: epIpAddressRemote,
macAddress: epMacAddress,
isLocal: false,
@ -350,11 +350,11 @@ func TestGetLoadBalancerExisting(t *testing.T) {
if err != nil {
t.Error(err)
}
endpoint := &endpointsInfo{
endpoint := &endpointInfo{
ip: Endpoint.IpConfigurations[0].IpAddress,
hnsID: Endpoint.Id,
}
endpoints := []endpointsInfo{*endpoint}
endpoints := []endpointInfo{*endpoint}
hash, err := hashEndpoints(endpoints)
if err != nil {
t.Error(err)
@ -409,11 +409,11 @@ func TestGetLoadBalancerNew(t *testing.T) {
if err != nil {
t.Error(err)
}
endpoint := &endpointsInfo{
endpoint := &endpointInfo{
ip: Endpoint.IpConfigurations[0].IpAddress,
hnsID: Endpoint.Id,
}
endpoints := []endpointsInfo{*endpoint}
endpoints := []endpointInfo{*endpoint}
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)
if err != nil {
t.Error(err)
@ -523,7 +523,7 @@ func TestHashEndpoints(t *testing.T) {
if err != nil {
t.Error(err)
}
endpointInfoA := &endpointsInfo{
endpointInfoA := &endpointInfo{
ip: endpointA.IpConfigurations[0].IpAddress,
hnsID: endpointA.Id,
}
@ -543,12 +543,12 @@ func TestHashEndpoints(t *testing.T) {
if err != nil {
t.Error(err)
}
endpointInfoB := &endpointsInfo{
endpointInfoB := &endpointInfo{
ip: endpointB.IpConfigurations[0].IpAddress,
hnsID: endpointB.Id,
}
endpoints := []endpointsInfo{*endpointInfoA, *endpointInfoB}
endpointsReverse := []endpointsInfo{*endpointInfoB, *endpointInfoA}
endpoints := []endpointInfo{*endpointInfoA, *endpointInfoB}
endpointsReverse := []endpointInfo{*endpointInfoB, *endpointInfoA}
h1, err := hashEndpoints(endpoints)
if err != nil {
t.Error(err)

View File

@ -123,7 +123,7 @@ type serviceInfo struct {
hnsID string
nodePorthnsID string
policyApplied bool
remoteEndpoint *endpointsInfo
remoteEndpoint *endpointInfo
hns HostNetworkService
preserveDIP bool
localTrafficDSR bool
@ -273,7 +273,7 @@ func (t DualStackCompatTester) DualStackCompatible(networkName string) bool {
}
// internal struct for endpoints information
type endpointsInfo struct {
type endpointInfo struct {
ip string
port uint16
isLocal bool
@ -290,57 +290,57 @@ type endpointsInfo struct {
}
// String is part of proxy.Endpoint interface.
func (info *endpointsInfo) String() string {
func (info *endpointInfo) String() string {
return net.JoinHostPort(info.ip, strconv.Itoa(int(info.port)))
}
// GetIsLocal is part of proxy.Endpoint interface.
func (info *endpointsInfo) GetIsLocal() bool {
func (info *endpointInfo) GetIsLocal() bool {
return info.isLocal
}
// IsReady returns true if an endpoint is ready and not terminating.
func (info *endpointsInfo) IsReady() bool {
func (info *endpointInfo) IsReady() bool {
return info.ready
}
// IsServing returns true if an endpoint is ready, regardless of it's terminating state.
func (info *endpointsInfo) IsServing() bool {
func (info *endpointInfo) IsServing() bool {
return info.serving
}
// IsTerminating returns true if an endpoint is terminating.
func (info *endpointsInfo) IsTerminating() bool {
func (info *endpointInfo) IsTerminating() bool {
return info.terminating
}
// GetZoneHint returns the zone hint for the endpoint.
func (info *endpointsInfo) GetZoneHints() sets.Set[string] {
func (info *endpointInfo) GetZoneHints() sets.Set[string] {
return sets.Set[string]{}
}
// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
func (info *endpointsInfo) IP() string {
func (info *endpointInfo) IP() string {
return info.ip
}
// Port returns just the Port part of the endpoint.
func (info *endpointsInfo) Port() (int, error) {
func (info *endpointInfo) Port() (int, error) {
return int(info.port), nil
}
// Equal is part of proxy.Endpoint interface.
func (info *endpointsInfo) Equal(other proxy.Endpoint) bool {
func (info *endpointInfo) Equal(other proxy.Endpoint) bool {
return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
}
// GetNodeName returns the NodeName for this endpoint.
func (info *endpointsInfo) GetNodeName() string {
func (info *endpointInfo) GetNodeName() string {
return ""
}
// GetZone returns the Zone for this endpoint.
func (info *endpointsInfo) GetZone() string {
func (info *endpointInfo) GetZone() string {
return ""
}
@ -407,7 +407,7 @@ func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName,
if exists {
// Cleanup Endpoints references
for _, ep := range epInfos {
epInfo, ok := ep.(*endpointsInfo)
epInfo, ok := ep.(*endpointInfo)
if ok {
epInfo.Cleanup()
@ -448,7 +448,7 @@ func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) {
}
}
// returns a new proxy.Endpoint which abstracts a endpointsInfo
// returns a new proxy.Endpoint which abstracts a endpointInfo
func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *proxy.ServicePortName) proxy.Endpoint {
portNumber, err := baseInfo.Port()
@ -457,7 +457,7 @@ func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *pro
portNumber = 0
}
info := &endpointsInfo{
info := &endpointInfo{
ip: baseInfo.IP(),
port: uint16(portNumber),
isLocal: baseInfo.GetIsLocal(),
@ -474,8 +474,8 @@ func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *pro
return info
}
func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointsInfo, error) {
hnsEndpoint := &endpointsInfo{
func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointInfo, error) {
hnsEndpoint := &endpointInfo{
ip: ip,
isLocal: true,
macAddress: mac,
@ -489,15 +489,15 @@ func newSourceVIP(hns HostNetworkService, network string, ip string, mac string,
return ep, err
}
func (ep *endpointsInfo) DecrementRefCount() {
klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointsInfo", ep)
func (ep *endpointInfo) DecrementRefCount() {
klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointInfo", ep)
if !ep.GetIsLocal() && ep.refCount != nil && *ep.refCount > 0 {
*ep.refCount--
}
}
func (ep *endpointsInfo) Cleanup() {
klog.V(3).InfoS("Endpoint cleanup", "endpointsInfo", ep)
func (ep *endpointInfo) Cleanup() {
klog.V(3).InfoS("Endpoint cleanup", "endpointInfo", ep)
if !ep.GetIsLocal() && ep.refCount != nil {
*ep.refCount--
@ -601,7 +601,7 @@ type Proxier struct {
// services that happened since policies were synced. For a single object,
// changes are accumulated, i.e. previous is state from before all of them,
// current is state after applying all of those.
endpointsChanges *proxy.EndpointChangeTracker
endpointsChanges *proxy.EndpointsChangeTracker
serviceChanges *proxy.ServiceChangeTracker
endPointsRefCount endPointsReferenceCountMap
mu sync.Mutex // protects the following fields
@ -802,7 +802,7 @@ func NewProxier(
}
serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange)
endPointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange)
endPointChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange)
proxier.endpointsChanges = endPointChangeTracker
proxier.serviceChanges = serviceChanges
@ -868,7 +868,7 @@ func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint, mapSt
}
// Cleanup Endpoints references
for _, ep := range endpoints {
epInfo, ok := ep.(*endpointsInfo)
epInfo, ok := ep.(*endpointInfo)
if ok {
if winProxyOptimization {
epInfo.DecrementRefCount()
@ -1062,7 +1062,7 @@ func isNetworkNotFoundError(err error) bool {
// If atleast one is not terminating, then return false
func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
for _, epInfo := range proxier.endpointsMap[svcName] {
ep, ok := epInfo.(*endpointsInfo)
ep, ok := epInfo.(*endpointInfo)
if !ok {
continue
}
@ -1087,7 +1087,7 @@ func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName,
// If atleast one is serving, then return false
func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
for _, epInfo := range proxier.endpointsMap[svcName] {
ep, ok := epInfo.(*endpointsInfo)
ep, ok := epInfo.(*endpointInfo)
if !ok {
continue
}
@ -1102,7 +1102,7 @@ func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName,
}
// updateQueriedEndpoints updates the queriedEndpoints map with newly created endpoint details
func updateQueriedEndpoints(newHnsEndpoint *endpointsInfo, queriedEndpoints map[string]*endpointsInfo) {
func updateQueriedEndpoints(newHnsEndpoint *endpointInfo, queriedEndpoints map[string]*endpointInfo) {
// store newly created endpoints in queriedEndpoints
queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint
queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint
@ -1130,7 +1130,7 @@ func (proxier *Proxier) syncProxyRules() {
hnsNetworkName := proxier.network.name
hns := proxier.hns
var gatewayHnsendpoint *endpointsInfo
var gatewayHnsendpoint *endpointInfo
if proxier.forwardHealthCheckVip {
gatewayHnsendpoint, _ = hns.getEndpointByName(proxier.rootHnsEndpointName)
}
@ -1153,7 +1153,7 @@ func (proxier *Proxier) syncProxyRules() {
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
deletedUDPClusterIPs := serviceUpdateResult.DeletedUDPClusterIPs
// merge stale services gathered from updateEndpointsMap
// merge stale services gathered from EndpointsMap.Update
for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
klog.V(2).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
@ -1168,7 +1168,7 @@ func (proxier *Proxier) syncProxyRules() {
}
if queriedEndpoints == nil {
klog.V(4).InfoS("No existing endpoints found in HNS")
queriedEndpoints = make(map[string]*(endpointsInfo))
queriedEndpoints = make(map[string]*(endpointInfo))
}
queriedLoadBalancers, err := hns.getAllLoadBalancers()
if queriedLoadBalancers == nil {
@ -1208,7 +1208,7 @@ func (proxier *Proxier) syncProxyRules() {
serviceVipEndpoint := queriedEndpoints[svcInfo.ClusterIP().String()]
if serviceVipEndpoint == nil {
klog.V(4).InfoS("No existing remote endpoint", "IP", svcInfo.ClusterIP())
hnsEndpoint := &endpointsInfo{
hnsEndpoint := &endpointInfo{
ip: svcInfo.ClusterIP().String(),
isLocal: false,
macAddress: proxier.hostMac,
@ -1228,8 +1228,8 @@ func (proxier *Proxier) syncProxyRules() {
}
}
var hnsEndpoints []endpointsInfo
var hnsLocalEndpoints []endpointsInfo
var hnsEndpoints []endpointInfo
var hnsLocalEndpoints []endpointInfo
klog.V(4).InfoS("Applying Policy", "serviceInfo", svcName)
// Create Remote endpoints for every endpoint, corresponding to the service
containsPublicIP := false
@ -1249,9 +1249,9 @@ func (proxier *Proxier) syncProxyRules() {
}
for _, epInfo := range proxier.endpointsMap[svcName] {
ep, ok := epInfo.(*endpointsInfo)
ep, ok := epInfo.(*endpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast endpointsInfo", "serviceName", svcName)
klog.ErrorS(nil, "Failed to cast endpointInfo", "serviceName", svcName)
continue
}
@ -1274,7 +1274,7 @@ func (proxier *Proxier) syncProxyRules() {
}
var newHnsEndpoint *endpointsInfo
var newHnsEndpoint *endpointInfo
hnsNetworkName := proxier.network.name
var err error
@ -1319,7 +1319,7 @@ func (proxier *Proxier) syncProxyRules() {
providerAddress = proxier.nodeIP.String()
}
hnsEndpoint := &endpointsInfo{
hnsEndpoint := &endpointInfo{
ip: ep.ip,
isLocal: false,
macAddress: conjureMac("02-11", netutils.ParseIPSloppy(ep.ip)),
@ -1328,13 +1328,13 @@ func (proxier *Proxier) syncProxyRules() {
newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
if err != nil {
klog.ErrorS(err, "Remote endpoint creation failed", "endpointsInfo", hnsEndpoint)
klog.ErrorS(err, "Remote endpoint creation failed", "endpointInfo", hnsEndpoint)
continue
}
updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
} else {
hnsEndpoint := &endpointsInfo{
hnsEndpoint := &endpointInfo{
ip: ep.ip,
isLocal: false,
macAddress: ep.macAddress,
@ -1371,7 +1371,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// Save the hnsId for reference
klog.V(1).InfoS("Hns endpoint resource", "endpointsInfo", newHnsEndpoint)
klog.V(1).InfoS("Hns endpoint resource", "endpointInfo", newHnsEndpoint)
hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
if newHnsEndpoint.GetIsLocal() {
@ -1384,10 +1384,10 @@ func (proxier *Proxier) syncProxyRules() {
ep.hnsID = newHnsEndpoint.hnsID
klog.V(3).InfoS("Endpoint resource found", "endpointsInfo", ep)
klog.V(3).InfoS("Endpoint resource found", "endpointInfo", ep)
}
klog.V(3).InfoS("Associated endpoints for service", "endpointsInfo", hnsEndpoints, "serviceName", svcName)
klog.V(3).InfoS("Associated endpoints for service", "endpointInfo", hnsEndpoints, "serviceName", svcName)
if len(svcInfo.hnsID) > 0 {
// This should not happen
@ -1399,7 +1399,7 @@ func (proxier *Proxier) syncProxyRules() {
if len(hnsEndpoints) == 0 {
if svcInfo.winProxyOptimization {
// Deleting loadbalancers when there are no endpoints to serve.
klog.V(3).InfoS("Cleanup existing ", "endpointsInfo", hnsEndpoints, "serviceName", svcName)
klog.V(3).InfoS("Cleanup existing ", "endpointInfo", hnsEndpoints, "serviceName", svcName)
svcInfo.deleteLoadBalancerPolicy(proxier.mapStaleLoadbalancers)
}
klog.ErrorS(nil, "Endpoint information not available for service, not applying any policy", "serviceName", svcName)
@ -1560,10 +1560,10 @@ func (proxier *Proxier) syncProxyRules() {
nodeport = svcInfo.HealthCheckNodePort()
}
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointsInfo{*gatewayHnsendpoint}, queriedLoadBalancers)
proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointInfo{*gatewayHnsendpoint}, queriedLoadBalancers)
hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer(
[]endpointsInfo{*gatewayHnsendpoint},
[]endpointInfo{*gatewayHnsendpoint},
loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
sourceVip,
lbIngressIP.ip,
@ -1623,7 +1623,7 @@ func (proxier *Proxier) syncProxyRules() {
// deleteExistingLoadBalancer checks whether loadbalancer delete is needed or not.
// If it is needed, the function will delete the existing loadbalancer and return true, else false.
func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointsInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool {
func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool {
if !winProxyOptimization || *lbHnsID == "" {
// Loadbalancer delete not needed

View File

@ -123,8 +123,8 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
}
serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, v1.IPv4Protocol, nil, proxier.serviceMapChange)
endpointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, proxier.endpointsMapChange)
proxier.endpointsChanges = endpointChangeTracker
endpointsChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, proxier.endpointsMapChange)
proxier.endpointsChanges = endpointsChangeTracker
proxier.serviceChanges = serviceChanges
return proxier
@ -232,9 +232,9 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
proxier.syncProxyRules()
ep := proxier.endpointsMap[svcPortName][0]
epInfo, ok := ep.(*endpointsInfo)
epInfo, ok := ep.(*endpointInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String())
t.Errorf("Failed to cast endpointInfo %q", svcPortName.String())
} else {
if epInfo.hnsID != "EPID-3" {
@ -296,9 +296,9 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
proxier.setInitialized(true)
proxier.syncProxyRules()
ep := proxier.endpointsMap[svcPortName][0]
epInfo, ok := ep.(*endpointsInfo)
epInfo, ok := ep.(*endpointInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String())
t.Errorf("Failed to cast endpointInfo %q", svcPortName.String())
} else {
if epInfo.hnsID != endpointGuid1 {
@ -389,9 +389,9 @@ func TestSharedRemoteEndpointDelete(t *testing.T) {
proxier.setInitialized(true)
proxier.syncProxyRules()
ep := proxier.endpointsMap[svcPortName1][0]
epInfo, ok := ep.(*endpointsInfo)
epInfo, ok := ep.(*endpointInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String())
} else {
if epInfo.hnsID != endpointGuid1 {
@ -439,9 +439,9 @@ func TestSharedRemoteEndpointDelete(t *testing.T) {
proxier.syncProxyRules()
ep = proxier.endpointsMap[svcPortName1][0]
epInfo, ok = ep.(*endpointsInfo)
epInfo, ok = ep.(*endpointInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String())
} else {
if epInfo.hnsID != endpointGuid1 {
@ -534,9 +534,9 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) {
proxier.setInitialized(true)
proxier.syncProxyRules()
ep := proxier.endpointsMap[svcPortName1][0]
epInfo, ok := ep.(*endpointsInfo)
epInfo, ok := ep.(*endpointInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String())
} else {
if epInfo.hnsID != endpointGuid1 {
@ -613,10 +613,10 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) {
proxier.syncProxyRules()
ep = proxier.endpointsMap[svcPortName1][0]
epInfo, ok = ep.(*endpointsInfo)
epInfo, ok = ep.(*endpointInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
t.Errorf("Failed to cast endpointInfo %q", svcPortName1.String())
} else {
if epInfo.hnsID != endpointGuid1 {
@ -913,9 +913,9 @@ func TestEndpointSlice(t *testing.T) {
}
ep := proxier.endpointsMap[svcPortName][0]
epInfo, ok := ep.(*endpointsInfo)
epInfo, ok := ep.(*endpointInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String())
t.Errorf("Failed to cast endpointInfo %q", svcPortName.String())
} else {
if epInfo.hnsID != "EPID-3" {