diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index ca7071a9eb6..23fe42fa0f9 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -18,7 +18,6 @@ package proxy import ( "net" - "reflect" "strconv" "sync" "time" @@ -32,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/proxy/metrics" utilproxy "k8s.io/kubernetes/pkg/proxy/util" - utilnet "k8s.io/utils/net" ) var supportedEndpointSliceAddressTypes = sets.NewString( @@ -159,20 +157,12 @@ type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap Endpoin // EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of // Endpoints, keyed by their namespace and name. type EndpointChangeTracker struct { - // lock protects items. + // lock protects lastChangeTriggerTimes lock sync.Mutex - // hostname is the host where kube-proxy is running. - hostname string - // items maps a service to is endpointsChange. - items map[types.NamespacedName]*endpointsChange - // makeEndpointInfo allows proxier to inject customized information when processing endpoint. - makeEndpointInfo makeEndpointFunc + processEndpointsMapChange processEndpointsMapChangeFunc // endpointSliceCache holds a simplified version of endpoint slices. endpointSliceCache *EndpointSliceCache - // ipfamily identify the ip family on which the tracker is operating on - ipFamily v1.IPFamily - recorder events.EventRecorder // Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints // object to change. Used to calculate the network-programming-latency. lastChangeTriggerTimes map[types.NamespacedName][]time.Time @@ -186,11 +176,6 @@ type EndpointChangeTracker struct { // NewEndpointChangeTracker initializes an EndpointsChangeMap func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker { return &EndpointChangeTracker{ - hostname: hostname, - items: make(map[types.NamespacedName]*endpointsChange), - makeEndpointInfo: makeEndpointInfo, - ipFamily: ipFamily, - recorder: recorder, lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), trackerStartTime: time.Now(), processEndpointsMapChange: processEndpointsMapChange, @@ -198,66 +183,6 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc } } -// Update updates given service's endpoints change map based on the endpoints pair. It returns true -// if items changed, otherwise return false. Update can be used to add/update/delete items of EndpointsChangeMap. For example, -// Add item -// - pass as the pair. -// -// Update item -// - pass as the pair. -// -// Delete item -// - pass as the pair. -func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool { - endpoints := current - if endpoints == nil { - endpoints = previous - } - // previous == nil && current == nil is unexpected, we should return false directly. - if endpoints == nil { - return false - } - metrics.EndpointChangesTotal.Inc() - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - - ect.lock.Lock() - defer ect.lock.Unlock() - - change, exists := ect.items[namespacedName] - if !exists { - change = &endpointsChange{} - change.previous = ect.endpointsToEndpointsMap(previous) - ect.items[namespacedName] = change - } - - // In case of Endpoints deletion, the LastChangeTriggerTime annotation is - // by-definition coming from the time of last update, which is not what - // we want to measure. So we simply ignore it in this cases. - if t := getLastChangeTriggerTime(endpoints.Annotations); !t.IsZero() && current != nil && t.After(ect.trackerStartTime) { - ect.lastChangeTriggerTimes[namespacedName] = append(ect.lastChangeTriggerTimes[namespacedName], t) - } - - change.current = ect.endpointsToEndpointsMap(current) - // if change.previous equal to change.current, it means no change - if reflect.DeepEqual(change.previous, change.current) { - delete(ect.items, namespacedName) - // Reset the lastChangeTriggerTimes for the Endpoints object. Given that the network programming - // SLI is defined as the duration between a time of an event and a time when the network was - // programmed to incorporate that event, if there are events that happened between two - // consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted, - // there will be no network programming for them and thus no network programming latency metric - // should be exported. - delete(ect.lastChangeTriggerTimes, namespacedName) - } else { - for spn, eps := range change.current { - klog.V(2).InfoS("Service port endpoints update", "servicePort", spn, "endpoints", len(eps)) - } - } - - metrics.EndpointChangesPending.Set(float64(len(ect.items))) - return len(ect.items) > 0 -} - // EndpointSliceUpdate updates given service's endpoints change map based on the endpoints pair. // It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeMap. // If removeSlice is true, slice will be removed, otherwise it will be added or updated. @@ -293,7 +218,9 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E // we want to measure. So we simply ignore it in this cases. // TODO(wojtek-t, robscott): Address the problem for EndpointSlice deletion // when other EndpointSlice for that service still exist. - if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && !removeSlice && t.After(ect.trackerStartTime) { + if removeSlice { + delete(ect.lastChangeTriggerTimes, namespacedName) + } else if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && t.After(ect.trackerStartTime) { ect.lastChangeTriggerTimes[namespacedName] = append(ect.lastChangeTriggerTimes[namespacedName], t) } @@ -306,38 +233,15 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E // have changed since the last time ect was used to update an EndpointsMap. (You must call // this _before_ calling em.Update(ect).) func (ect *EndpointChangeTracker) PendingChanges() sets.String { - if ect.endpointSliceCache != nil { - return ect.endpointSliceCache.pendingChanges() - } - - ect.lock.Lock() - defer ect.lock.Unlock() - - changes := sets.NewString() - for name := range ect.items { - changes.Insert(name.String()) - } - return changes + return ect.endpointSliceCache.pendingChanges() } // checkoutChanges returns a list of pending endpointsChanges and marks them as // applied. func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange { - ect.lock.Lock() - defer ect.lock.Unlock() - metrics.EndpointChangesPending.Set(0) - if ect.endpointSliceCache != nil { - return ect.endpointSliceCache.checkoutChanges() - } - - changes := []*endpointsChange{} - for _, change := range ect.items { - changes = append(changes, change) - } - ect.items = make(map[types.NamespacedName]*endpointsChange) - return changes + return ect.endpointSliceCache.checkoutChanges() } // checkoutTriggerTimes applies the locally cached trigger times to a map of @@ -424,76 +328,6 @@ func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndp // EndpointsMap maps a service name to a list of all its Endpoints. type EndpointsMap map[ServicePortName][]Endpoint -// endpointsToEndpointsMap translates single Endpoints object to EndpointsMap. -// This function is used for incremental updated of endpointsMap. -// -// NOTE: endpoints object should NOT be modified. -func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoints) EndpointsMap { - if endpoints == nil { - return nil - } - - endpointsMap := make(EndpointsMap) - // We need to build a map of portname -> all ip:ports for that portname. - // Explode Endpoints.Subsets[*] into this structure. - for i := range endpoints.Subsets { - ss := &endpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - if port.Port == 0 { - klog.ErrorS(nil, "Ignoring invalid endpoint port", "portName", port.Name) - continue - } - svcPortName := ServicePortName{ - NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, - Port: port.Name, - Protocol: port.Protocol, - } - for i := range ss.Addresses { - addr := &ss.Addresses[i] - if addr.IP == "" { - klog.ErrorS(nil, "Ignoring invalid endpoint port with empty host", "portName", port.Name) - continue - } - - // Filter out the incorrect IP version case. - // Any endpoint port that contains incorrect IP version will be ignored. - if (ect.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(addr.IP) { - // Emit event on the corresponding service which had a different - // IP version than the endpoint. - utilproxy.LogAndEmitIncorrectIPVersionEvent(ect.recorder, "endpoints", addr.IP, endpoints.Namespace, endpoints.Name, "") - continue - } - - // it is safe to assume that any address in endpoints.subsets[*].addresses is - // ready and NOT terminating - isReady := true - isServing := true - isTerminating := false - isLocal := false - nodeName := "" - if addr.NodeName != nil { - isLocal = *addr.NodeName == ect.hostname - nodeName = *addr.NodeName - } - // Only supported with EndpointSlice API - zoneHints := sets.String{} - - // Zone information is only supported with EndpointSlice API - baseEndpointInfo := newBaseEndpointInfo(addr.IP, nodeName, "", int(port.Port), isLocal, isReady, isServing, isTerminating, zoneHints) - if ect.makeEndpointInfo != nil { - endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo, &svcPortName)) - } else { - endpointsMap[svcPortName] = append(endpointsMap[svcPortName], baseEndpointInfo) - } - } - - klog.V(3).InfoS("Setting endpoints for service port", "portName", svcPortName, "endpoints", formatEndpointsList(endpointsMap[svcPortName])) - } - } - return endpointsMap -} - // apply the changes to EndpointsMap and updates stale endpoints and service-endpoints pair. The `staleEndpoints` argument // is passed in to store the stale udp endpoints and `staleServiceNames` argument is passed in to store the stale udp service. // The changes map is cleared after applying them. diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 660a3a2efe7..cd5662cb11a 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -17,12 +17,11 @@ limitations under the License. package proxy import ( + "fmt" "reflect" "testing" "time" - "github.com/davecgh/go-spew/spew" - v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,16 +30,16 @@ import ( "k8s.io/utils/pointer" ) -func (proxier *FakeProxier) addEndpoints(endpoints *v1.Endpoints) { - proxier.endpointsChanges.Update(nil, endpoints) +func (proxier *FakeProxier) addEndpointSlice(slice *discovery.EndpointSlice) { + proxier.endpointsChanges.EndpointSliceUpdate(slice, false) } -func (proxier *FakeProxier) updateEndpoints(oldEndpoints, endpoints *v1.Endpoints) { - proxier.endpointsChanges.Update(oldEndpoints, endpoints) +func (proxier *FakeProxier) updateEndpointSlice(oldSlice, slice *discovery.EndpointSlice) { + proxier.endpointsChanges.EndpointSliceUpdate(slice, false) } -func (proxier *FakeProxier) deleteEndpoints(endpoints *v1.Endpoints) { - proxier.endpointsChanges.Update(endpoints, nil) +func (proxier *FakeProxier) deleteEndpointSlice(slice *discovery.EndpointSlice) { + proxier.endpointsChanges.EndpointSliceUpdate(slice, true) } func TestGetLocalEndpointIPs(t *testing.T) { @@ -168,658 +167,340 @@ func TestGetLocalEndpointIPs(t *testing.T) { } } -func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints { - ept := &v1.Endpoints{ +func makeTestEndpointSlice(namespace, name string, slice int, epsFunc func(*discovery.EndpointSlice)) *discovery.EndpointSlice { + eps := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: name, + Name: fmt.Sprintf("%s-%d", name, slice), Namespace: namespace, - Annotations: make(map[string]string), + Annotations: map[string]string{}, + Labels: map[string]string{ + discovery.LabelServiceName: name, + }, }, + AddressType: discovery.AddressTypeIPv4, } - eptFunc(ept) - return ept -} - -// This is a coarse test, but it offers some modicum of confidence as the code is evolved. -func TestEndpointsToEndpointsMap(t *testing.T) { - testCases := []struct { - desc string - newEndpoints *v1.Endpoints - expected map[ServicePortName][]*BaseEndpointInfo - isIPv6Mode *bool - ipFamily v1.IPFamily - }{ - { - desc: "nothing", - ipFamily: v1.IPv4Protocol, - - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *v1.Endpoints) {}), - expected: map[ServicePortName][]*BaseEndpointInfo{}, - }, - { - desc: "no changes, unnamed port", - ipFamily: v1.IPv4Protocol, - - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []v1.EndpointPort{{ - Name: "", - Port: 11, - Protocol: v1.ProtocolTCP, - }}, - }, - } - }), - expected: map[ServicePortName][]*BaseEndpointInfo{ - makeServicePortName("ns1", "ep1", "", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - }, - }, - }, - { - desc: "no changes, named port", - ipFamily: v1.IPv4Protocol, - - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []v1.EndpointPort{{ - Name: "port", - Port: 11, - Protocol: v1.ProtocolTCP, - }}, - }, - } - }), - expected: map[ServicePortName][]*BaseEndpointInfo{ - makeServicePortName("ns1", "ep1", "port", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - }, - }, - }, - { - desc: "new port", - ipFamily: v1.IPv4Protocol, - - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []v1.EndpointPort{{ - Port: 11, - Protocol: v1.ProtocolTCP, - }}, - }, - } - }), - expected: map[ServicePortName][]*BaseEndpointInfo{ - makeServicePortName("ns1", "ep1", "", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - }, - }, - }, - { - desc: "remove port", - ipFamily: v1.IPv4Protocol, - - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *v1.Endpoints) {}), - expected: map[ServicePortName][]*BaseEndpointInfo{}, - }, - { - desc: "new IP and port", - ipFamily: v1.IPv4Protocol, - - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }, { - IP: "2.2.2.2", - }}, - Ports: []v1.EndpointPort{{ - Name: "p1", - Port: 11, - Protocol: v1.ProtocolTCP, - }, { - Name: "p2", - Port: 22, - Protocol: v1.ProtocolTCP, - }}, - }, - } - }), - expected: map[ServicePortName][]*BaseEndpointInfo{ - makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - {Endpoint: "2.2.2.2:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - }, - makeServicePortName("ns1", "ep1", "p2", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - {Endpoint: "2.2.2.2:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - }, - }, - }, - { - desc: "remove IP and port", - ipFamily: v1.IPv4Protocol, - - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []v1.EndpointPort{{ - Name: "p1", - Port: 11, - Protocol: v1.ProtocolTCP, - }}, - }, - } - }), - expected: map[ServicePortName][]*BaseEndpointInfo{ - makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - }, - }, - }, - { - desc: "rename port", - ipFamily: v1.IPv4Protocol, - - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []v1.EndpointPort{{ - Name: "p2", - Port: 11, - Protocol: v1.ProtocolTCP, - }}, - }, - } - }), - expected: map[ServicePortName][]*BaseEndpointInfo{ - makeServicePortName("ns1", "ep1", "p2", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - }, - }, - }, - { - desc: "renumber port", - ipFamily: v1.IPv4Protocol, - - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []v1.EndpointPort{{ - Name: "p1", - Port: 22, - Protocol: v1.ProtocolTCP, - }}, - }, - } - }), - expected: map[ServicePortName][]*BaseEndpointInfo{ - makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - }, - }, - }, - { - desc: "should omit IPv6 address in IPv4 mode", - ipFamily: v1.IPv4Protocol, - - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }, { - IP: "2001:db8:85a3:0:0:8a2e:370:7334", - }}, - Ports: []v1.EndpointPort{{ - Name: "p1", - Port: 11, - Protocol: v1.ProtocolTCP, - }, { - Name: "p2", - Port: 22, - Protocol: v1.ProtocolTCP, - }}, - }, - } - }), - expected: map[ServicePortName][]*BaseEndpointInfo{ - makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - }, - makeServicePortName("ns1", "ep1", "p2", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - }, - }, - }, - { - desc: "should omit IPv4 address in IPv6 mode", - ipFamily: v1.IPv6Protocol, - - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }, { - IP: "2001:db8:85a3:0:0:8a2e:370:7334", - }}, - Ports: []v1.EndpointPort{{ - Name: "p1", - Port: 11, - Protocol: v1.ProtocolTCP, - }, { - Name: "p2", - Port: 22, - Protocol: v1.ProtocolTCP, - }}, - }, - } - }), - expected: map[ServicePortName][]*BaseEndpointInfo{ - makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): { - {Endpoint: "[2001:db8:85a3:0:0:8a2e:370:7334]:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - }, - makeServicePortName("ns1", "ep1", "p2", v1.ProtocolTCP): { - {Endpoint: "[2001:db8:85a3:0:0:8a2e:370:7334]:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, - }, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - - epTracker := NewEndpointChangeTracker("test-hostname", nil, tc.ipFamily, nil, nil) - - // outputs - newEndpoints := epTracker.endpointsToEndpointsMap(tc.newEndpoints) - - if len(newEndpoints) != len(tc.expected) { - t.Fatalf("[%s] expected %d new, got %d: %v", tc.desc, len(tc.expected), len(newEndpoints), spew.Sdump(newEndpoints)) - } - for x := range tc.expected { - if len(newEndpoints[x]) != len(tc.expected[x]) { - t.Fatalf("[%s] expected %d endpoints for %v, got %d", tc.desc, len(tc.expected[x]), x, len(newEndpoints[x])) - } else { - for i := range newEndpoints[x] { - ep := newEndpoints[x][i].(*BaseEndpointInfo) - if !(reflect.DeepEqual(*ep, *(tc.expected[x][i]))) { - t.Fatalf("[%s] expected new[%v][%d] to be %v, got %v", tc.desc, x, i, tc.expected[x][i], *ep) - } - } - } - } - }) - } + epsFunc(eps) + return eps } func TestUpdateEndpointsMap(t *testing.T) { var nodeName = testHostname + udp := v1.ProtocolUDP - emptyEndpoint := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{} + emptyEndpoint := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{} } - unnamedPort := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []v1.EndpointPort{{ - Port: 11, - Protocol: v1.ProtocolUDP, - }}, + unnamedPort := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String(""), + Port: pointer.Int32(11), + Protocol: &udp, }} } - unnamedPortLocal := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - NodeName: &nodeName, - }}, - Ports: []v1.EndpointPort{{ - Port: 11, - Protocol: v1.ProtocolUDP, - }}, + unnamedPortLocal := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + NodeName: &nodeName, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String(""), + Port: pointer.Int32(11), + Protocol: &udp, }} } - namedPortLocal := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - NodeName: &nodeName, - }}, - Ports: []v1.EndpointPort{{ - Name: "p11", - Port: 11, - Protocol: v1.ProtocolUDP, - }}, + namedPortLocal := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + NodeName: &nodeName, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p11"), + Port: pointer.Int32(11), + Protocol: &udp, }} } - namedPort := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []v1.EndpointPort{{ - Name: "p11", - Port: 11, - Protocol: v1.ProtocolUDP, - }}, + namedPort := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p11"), + Port: pointer.Int32(11), + Protocol: &udp, }} } - namedPortRenamed := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []v1.EndpointPort{{ - Name: "p11-2", - Port: 11, - Protocol: v1.ProtocolUDP, - }}, + namedPortRenamed := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p11-2"), + Port: pointer.Int32(11), + Protocol: &udp, }} } - namedPortRenumbered := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []v1.EndpointPort{{ - Name: "p11", - Port: 22, - Protocol: v1.ProtocolUDP, - }}, + namedPortRenumbered := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p11"), + Port: pointer.Int32(22), + Protocol: &udp, }} } - namedPortsLocalNoLocal := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }, { - IP: "1.1.1.2", - NodeName: &nodeName, - }}, - Ports: []v1.EndpointPort{{ - Name: "p11", - Port: 11, - Protocol: v1.ProtocolUDP, - }, { - Name: "p12", - Port: 12, - Protocol: v1.ProtocolUDP, - }}, - }} - } - multipleSubsets := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []v1.EndpointPort{{ - Name: "p11", - Port: 11, - Protocol: v1.ProtocolUDP, - }}, + namedPortsLocalNoLocal := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, }, { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.2", - }}, - Ports: []v1.EndpointPort{{ - Name: "p12", - Port: 12, - Protocol: v1.ProtocolUDP, - }}, + Addresses: []string{"1.1.1.2"}, + NodeName: &nodeName, }} - } - multipleSubsetsWithLocal := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []v1.EndpointPort{{ - Name: "p11", - Port: 11, - Protocol: v1.ProtocolUDP, - }}, + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p11"), + Port: pointer.Int32(11), + Protocol: &udp, }, { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.2", - NodeName: &nodeName, - }}, - Ports: []v1.EndpointPort{{ - Name: "p12", - Port: 12, - Protocol: v1.ProtocolUDP, - }}, + Name: pointer.String("p12"), + Port: pointer.Int32(12), + Protocol: &udp, }} } - multipleSubsetsMultiplePortsLocal := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - NodeName: &nodeName, - }}, - Ports: []v1.EndpointPort{{ - Name: "p11", - Port: 11, - Protocol: v1.ProtocolUDP, - }, { - Name: "p12", - Port: 12, - Protocol: v1.ProtocolUDP, - }}, + multipleSubsets_s1 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p11"), + Port: pointer.Int32(11), + Protocol: &udp, + }} + } + multipleSubsets_s2 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.2"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p12"), + Port: pointer.Int32(12), + Protocol: &udp, + }} + } + multipleSubsetsWithLocal_s1 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p11"), + Port: pointer.Int32(11), + Protocol: &udp, + }} + } + multipleSubsetsWithLocal_s2 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.2"}, + NodeName: &nodeName, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p12"), + Port: pointer.Int32(12), + Protocol: &udp, + }} + } + multipleSubsetsMultiplePortsLocal_s1 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + NodeName: &nodeName, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p11"), + Port: pointer.Int32(11), + Protocol: &udp, }, { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.3", - }}, - Ports: []v1.EndpointPort{{ - Name: "p13", - Port: 13, - Protocol: v1.ProtocolUDP, - }}, + Name: pointer.String("p12"), + Port: pointer.Int32(12), + Protocol: &udp, }} } - multipleSubsetsIPsPorts1 := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }, { - IP: "1.1.1.2", - NodeName: &nodeName, - }}, - Ports: []v1.EndpointPort{{ - Name: "p11", - Port: 11, - Protocol: v1.ProtocolUDP, - }, { - Name: "p12", - Port: 12, - Protocol: v1.ProtocolUDP, - }}, + multipleSubsetsMultiplePortsLocal_s2 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.3"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p13"), + Port: pointer.Int32(13), + Protocol: &udp, + }} + } + multipleSubsetsIPsPorts1_s1 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, }, { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.3", - }, { - IP: "1.1.1.4", - NodeName: &nodeName, - }}, - Ports: []v1.EndpointPort{{ - Name: "p13", - Port: 13, - Protocol: v1.ProtocolUDP, - }, { - Name: "p14", - Port: 14, - Protocol: v1.ProtocolUDP, - }}, + Addresses: []string{"1.1.1.2"}, + NodeName: &nodeName, }} - } - multipleSubsetsIPsPorts2 := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "2.2.2.1", - }, { - IP: "2.2.2.2", - NodeName: &nodeName, - }}, - Ports: []v1.EndpointPort{{ - Name: "p21", - Port: 21, - Protocol: v1.ProtocolUDP, - }, { - Name: "p22", - Port: 22, - Protocol: v1.ProtocolUDP, - }}, - }} - } - complexBefore1 := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []v1.EndpointPort{{ - Name: "p11", - Port: 11, - Protocol: v1.ProtocolUDP, - }}, - }} - } - complexBefore2 := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "2.2.2.2", - NodeName: &nodeName, - }, { - IP: "2.2.2.22", - NodeName: &nodeName, - }}, - Ports: []v1.EndpointPort{{ - Name: "p22", - Port: 22, - Protocol: v1.ProtocolUDP, - }}, + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p11"), + Port: pointer.Int32(11), + Protocol: &udp, }, { - Addresses: []v1.EndpointAddress{{ - IP: "2.2.2.3", - NodeName: &nodeName, - }}, - Ports: []v1.EndpointPort{{ - Name: "p23", - Port: 23, - Protocol: v1.ProtocolUDP, - }}, + Name: pointer.String("p12"), + Port: pointer.Int32(12), + Protocol: &udp, }} } - complexBefore4 := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "4.4.4.4", - NodeName: &nodeName, - }, { - IP: "4.4.4.5", - NodeName: &nodeName, - }}, - Ports: []v1.EndpointPort{{ - Name: "p44", - Port: 44, - Protocol: v1.ProtocolUDP, - }}, + multipleSubsetsIPsPorts1_s2 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.3"}, }, { - Addresses: []v1.EndpointAddress{{ - IP: "4.4.4.6", - NodeName: &nodeName, - }}, - Ports: []v1.EndpointPort{{ - Name: "p45", - Port: 45, - Protocol: v1.ProtocolUDP, - }}, + Addresses: []string{"1.1.1.4"}, + NodeName: &nodeName, }} - } - complexAfter1 := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.1", - }, { - IP: "1.1.1.11", - }}, - Ports: []v1.EndpointPort{{ - Name: "p11", - Port: 11, - Protocol: v1.ProtocolUDP, - }}, + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p13"), + Port: pointer.Int32(13), + Protocol: &udp, }, { - Addresses: []v1.EndpointAddress{{ - IP: "1.1.1.2", - }}, - Ports: []v1.EndpointPort{{ - Name: "p12", - Port: 12, - Protocol: v1.ProtocolUDP, - }, { - Name: "p122", - Port: 122, - Protocol: v1.ProtocolUDP, - }}, + Name: pointer.String("p14"), + Port: pointer.Int32(14), + Protocol: &udp, }} } - complexAfter3 := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "3.3.3.3", - }}, - Ports: []v1.EndpointPort{{ - Name: "p33", - Port: 33, - Protocol: v1.ProtocolUDP, - }}, + multipleSubsetsIPsPorts2 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"2.2.2.1"}, + }, { + Addresses: []string{"2.2.2.2"}, + NodeName: &nodeName, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p21"), + Port: pointer.Int32(21), + Protocol: &udp, + }, { + Name: pointer.String("p22"), + Port: pointer.Int32(22), + Protocol: &udp, }} } - complexAfter4 := func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{ - IP: "4.4.4.4", - NodeName: &nodeName, - }}, - Ports: []v1.EndpointPort{{ - Name: "p44", - Port: 44, - Protocol: v1.ProtocolUDP, - }}, + complexBefore1 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p11"), + Port: pointer.Int32(11), + Protocol: &udp, + }} + } + complexBefore2_s1 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"2.2.2.2"}, + NodeName: &nodeName, + }, { + Addresses: []string{"2.2.2.22"}, + NodeName: &nodeName, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p22"), + Port: pointer.Int32(22), + Protocol: &udp, + }} + } + complexBefore2_s2 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"2.2.2.3"}, + NodeName: &nodeName, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p23"), + Port: pointer.Int32(23), + Protocol: &udp, + }} + } + complexBefore4_s1 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"4.4.4.4"}, + NodeName: &nodeName, + }, { + Addresses: []string{"4.4.4.5"}, + NodeName: &nodeName, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p44"), + Port: pointer.Int32(44), + Protocol: &udp, + }} + } + complexBefore4_s2 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"4.4.4.6"}, + NodeName: &nodeName, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p45"), + Port: pointer.Int32(45), + Protocol: &udp, + }} + } + complexAfter1_s1 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }, { + Addresses: []string{"1.1.1.11"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p11"), + Port: pointer.Int32(11), + Protocol: &udp, + }} + } + complexAfter1_s2 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.2"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p12"), + Port: pointer.Int32(12), + Protocol: &udp, + }, { + Name: pointer.String("p122"), + Port: pointer.Int32(122), + Protocol: &udp, + }} + } + complexAfter3 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"3.3.3.3"}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p33"), + Port: pointer.Int32(33), + Protocol: &udp, + }} + } + complexAfter4 := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"4.4.4.4"}, + NodeName: &nodeName, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p44"), + Port: pointer.Int32(44), + Protocol: &udp, }} } testCases := []struct { // previousEndpoints and currentEndpoints are used to call appropriate - // handlers OnEndpoints* (based on whether corresponding values are nil + // handlers OnEndpointSlice* (based on whether corresponding values are nil // or non-nil) and must be of equal length. name string - previousEndpoints []*v1.Endpoints - currentEndpoints []*v1.Endpoints + previousEndpoints []*discovery.EndpointSlice + currentEndpoints []*discovery.EndpointSlice oldEndpoints map[ServicePortName][]*BaseEndpointInfo expectedResult map[ServicePortName][]*BaseEndpointInfo expectedStaleEndpoints []ServiceEndpoint @@ -836,11 +517,11 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.NewString(), }, { name: "no change, unnamed port", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", unnamedPort), + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, unnamedPort), }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", unnamedPort), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, unnamedPort), }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { @@ -858,11 +539,11 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.NewString(), }, { name: "no change, named port, local", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", namedPortLocal), + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, namedPortLocal), }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", namedPortLocal), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, namedPortLocal), }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { @@ -881,12 +562,14 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedChangedEndpoints: sets.NewString(), }, { - name: "no change, multiple subsets", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", multipleSubsets), + name: "no change, multiple slices", + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsets_s1), + makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsets_s2), }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", multipleSubsets), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsets_s1), + makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsets_s2), }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { @@ -909,12 +592,14 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString(), }, { - name: "no change, multiple subsets, multiple ports, local", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal), + name: "no change, multiple slices, multiple ports, local", + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsMultiplePortsLocal_s1), + makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsMultiplePortsLocal_s2), }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsMultiplePortsLocal_s1), + makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsMultiplePortsLocal_s2), }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { @@ -945,14 +630,16 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedChangedEndpoints: sets.NewString(), }, { - name: "no change, multiple endpoints, subsets, IPs, and ports", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1), - makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2), + name: "no change, multiple services, slices, IPs, and ports", + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsIPsPorts1_s1), + makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsIPsPorts1_s2), + makeTestEndpointSlice("ns2", "ep2", 1, multipleSubsetsIPsPorts2), }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1), - makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsIPsPorts1_s1), + makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsIPsPorts1_s2), + makeTestEndpointSlice("ns2", "ep2", 1, multipleSubsetsIPsPorts2), }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { @@ -1014,12 +701,12 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedChangedEndpoints: sets.NewString(), }, { - name: "add an Endpoints", - previousEndpoints: []*v1.Endpoints{ + name: "add an EndpointSlice", + previousEndpoints: []*discovery.EndpointSlice{ nil, }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", unnamedPortLocal), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortLocal), }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{}, expectedResult: map[ServicePortName][]*BaseEndpointInfo{ @@ -1036,11 +723,11 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { - name: "remove an Endpoints", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", unnamedPortLocal), + name: "remove an EndpointSlice", + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortLocal), }, - currentEndpoints: []*v1.Endpoints{ + currentEndpoints: []*discovery.EndpointSlice{ nil, }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ @@ -1058,11 +745,11 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "add an IP and port", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", namedPort), + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, namedPort), }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, namedPortsLocalNoLocal), }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { @@ -1089,11 +776,11 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "remove an IP and port", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal), + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, namedPortsLocalNoLocal), }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", namedPort), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, namedPort), }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { @@ -1124,12 +811,14 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedHealthchecks: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { - name: "add a subset", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", namedPort), + name: "add a slice to an endpoint", + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, namedPort), + nil, }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", multipleSubsetsWithLocal), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsWithLocal_s1), + makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsWithLocal_s2), }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { @@ -1153,12 +842,14 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { - name: "remove a subset", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", multipleSubsets), + name: "remove a slice from an endpoint", + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsets_s1), + makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsets_s2), }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", namedPort), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, namedPort), + nil, }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { @@ -1182,11 +873,11 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "rename a port", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", namedPort), + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, namedPort), }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", namedPortRenamed), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, namedPortRenamed), }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { @@ -1209,11 +900,11 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "renumber a port", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", namedPort), + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, namedPort), }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", namedPortRenumbered), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, namedPortRenumbered), }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { @@ -1234,25 +925,39 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "complex add and remove", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", complexBefore1), - makeTestEndpoints("ns2", "ep2", complexBefore2), + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, complexBefore1), nil, - makeTestEndpoints("ns4", "ep4", complexBefore4), + + makeTestEndpointSlice("ns2", "ep2", 1, complexBefore2_s1), + makeTestEndpointSlice("ns2", "ep2", 2, complexBefore2_s2), + + nil, + nil, + + makeTestEndpointSlice("ns4", "ep4", 1, complexBefore4_s1), + makeTestEndpointSlice("ns4", "ep4", 2, complexBefore4_s2), }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", complexAfter1), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, complexAfter1_s1), + makeTestEndpointSlice("ns1", "ep1", 2, complexAfter1_s2), + + nil, + nil, + + makeTestEndpointSlice("ns3", "ep3", 1, complexAfter3), + nil, + + makeTestEndpointSlice("ns4", "ep4", 1, complexAfter4), nil, - makeTestEndpoints("ns3", "ep3", complexAfter3), - makeTestEndpoints("ns4", "ep4", complexAfter4), }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): { - {Endpoint: "2.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "2.2.2.22:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "2.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP): { {Endpoint: "2.2.2.3:23", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -1267,8 +972,8 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedResult: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.11:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { {Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}, @@ -1310,11 +1015,11 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.NewString("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"), }, { name: "change from 0 endpoint address to 1 unnamed port", - previousEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", emptyEndpoint), + previousEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, emptyEndpoint), }, - currentEndpoints: []*v1.Endpoints{ - makeTestEndpoints("ns1", "ep1", unnamedPort), + currentEndpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, unnamedPort), }, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{}, expectedResult: map[ServicePortName][]*BaseEndpointInfo{ @@ -1340,7 +1045,7 @@ func TestUpdateEndpointsMap(t *testing.T) { // the fp.oldEndpoints is as we expect. for i := range tc.previousEndpoints { if tc.previousEndpoints[i] != nil { - fp.addEndpoints(tc.previousEndpoints[i]) + fp.addEndpointSlice(tc.previousEndpoints[i]) } } fp.endpointsMap.Update(fp.endpointsChanges) @@ -1355,12 +1060,14 @@ func TestUpdateEndpointsMap(t *testing.T) { for i := range tc.previousEndpoints { prev, curr := tc.previousEndpoints[i], tc.currentEndpoints[i] switch { + case prev == nil && curr == nil: + continue case prev == nil: - fp.addEndpoints(curr) + fp.addEndpointSlice(curr) case curr == nil: - fp.deleteEndpoints(prev) + fp.deleteEndpointSlice(prev) default: - fp.updateEndpoints(prev, curr) + fp.updateEndpointSlice(prev, curr) } } @@ -1416,24 +1123,38 @@ func TestLastChangeTriggerTime(t *testing.T) { t2 := t1.Add(time.Second) t3 := t2.Add(time.Second) - createEndpoints := func(namespace, name string, triggerTime time.Time) *v1.Endpoints { - e := makeTestEndpoints(namespace, name, func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}}, - Ports: []v1.EndpointPort{{Port: 11}}, - }} - }) - e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano) - return e + createEndpoints := func(namespace, name string, triggerTime time.Time) *discovery.EndpointSlice { + tcp := v1.ProtocolTCP + return &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Annotations: map[string]string{ + v1.EndpointsLastChangeTriggerTime: triggerTime.Format(time.RFC3339Nano), + }, + Labels: map[string]string{ + discovery.LabelServiceName: name, + }, + }, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }}, + Ports: []discovery.EndpointPort{{ + Name: pointer.String("p11"), + Port: pointer.Int32(11), + Protocol: &tcp, + }}, + } } createName := func(namespace, name string) types.NamespacedName { return types.NamespacedName{Namespace: namespace, Name: name} } - modifyEndpoints := func(endpoints *v1.Endpoints, triggerTime time.Time) *v1.Endpoints { - e := endpoints.DeepCopy() - e.Subsets[0].Ports[0].Port++ + modifyEndpoints := func(slice *discovery.EndpointSlice, triggerTime time.Time) *discovery.EndpointSlice { + e := slice.DeepCopy() + (*e.Ports[0].Port)++ e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano) return e } @@ -1447,7 +1168,7 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "Single addEndpoints", scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t0) - fp.addEndpoints(e) + fp.addEndpointSlice(e) }, expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0}}, }, @@ -1455,10 +1176,10 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "addEndpoints then updatedEndpoints", scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t0) - fp.addEndpoints(e) + fp.addEndpointSlice(e) e1 := modifyEndpoints(e, t1) - fp.updateEndpoints(e, e1) + fp.updateEndpointSlice(e, e1) }, expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0, t1}}, }, @@ -1466,13 +1187,13 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "Add two endpoints then modify one", scenario: func(fp *FakeProxier) { e1 := createEndpoints("ns", "ep1", t1) - fp.addEndpoints(e1) + fp.addEndpointSlice(e1) e2 := createEndpoints("ns", "ep2", t2) - fp.addEndpoints(e2) + fp.addEndpointSlice(e2) e11 := modifyEndpoints(e1, t3) - fp.updateEndpoints(e1, e11) + fp.updateEndpointSlice(e1, e11) }, expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t1, t3}, createName("ns", "ep2"): {t2}}, }, @@ -1481,7 +1202,7 @@ func TestLastChangeTriggerTime(t *testing.T) { scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t1) delete(e.Annotations, v1.EndpointsLastChangeTriggerTime) - fp.addEndpoints(e) + fp.addEndpointSlice(e) }, expected: map[types.NamespacedName][]time.Time{}, }, @@ -1489,7 +1210,7 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "Endpoints create before tracker started", scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t_1) - fp.addEndpoints(e) + fp.addEndpointSlice(e) }, expected: map[types.NamespacedName][]time.Time{}, }, @@ -1497,8 +1218,8 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "addEndpoints then deleteEndpoints", scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t1) - fp.addEndpoints(e) - fp.deleteEndpoints(e) + fp.addEndpointSlice(e) + fp.deleteEndpointSlice(e) }, expected: map[types.NamespacedName][]time.Time{}, }, @@ -1506,10 +1227,10 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "add then delete then add again", scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t1) - fp.addEndpoints(e) - fp.deleteEndpoints(e) + fp.addEndpointSlice(e) + fp.deleteEndpointSlice(e) e = modifyEndpoints(e, t2) - fp.addEndpoints(e) + fp.addEndpointSlice(e) }, expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t2}}, }, @@ -1517,7 +1238,7 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "delete", scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t1) - fp.deleteEndpoints(e) + fp.deleteEndpointSlice(e) }, expected: map[types.NamespacedName][]time.Time{}, }, @@ -1815,9 +1536,6 @@ func TestEndpointSliceUpdate(t *testing.T) { if !reflect.DeepEqual(got, tc.expectedReturnVal) { t.Errorf("EndpointSliceUpdate return value got: %v, want %v", got, tc.expectedReturnVal) } - if tc.endpointChangeTracker.items == nil { - t.Errorf("Expected ect.items to not be nil") - } pendingChanges := tc.endpointChangeTracker.PendingChanges() if !pendingChanges.Equal(tc.expectedChangedEndpoints) { diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index 8e3222eac29..a45ae5e490a 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -518,14 +518,10 @@ func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier { serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil), endpointsMap: make(EndpointsMap), endpointsChanges: &EndpointChangeTracker{ - hostname: testHostname, - items: make(map[types.NamespacedName]*endpointsChange), - makeEndpointInfo: nil, - ipFamily: ipFamily, - recorder: nil, lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), trackerStartTime: t, processEndpointsMapChange: nil, + endpointSliceCache: NewEndpointSliceCache(testHostname, ipFamily, nil, nil), }, } }