diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index c967f9f5172..5671404fcd3 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -149,6 +149,9 @@ func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*end } delete(esTracker.pending, name) + if len(esTracker.applied) == 0 && len(esTracker.pending) == 0 { + delete(cache.trackerByServiceMap, serviceNN) + } } change.current = cache.getEndpointsMap(serviceNN, esTracker.applied) diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index c51203c1357..f1afd63a53c 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "testing" + "time" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" @@ -559,3 +560,107 @@ func (cmc *cacheMutationCheck) Check(t *testing.T) { } } } + +func TestEndpointSliceCacheClearedCorrectly(t *testing.T) { + initEndpointsPorts := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + NodeName: ptr.To(testHostname), + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To("n1"), + Port: ptr.To[int32](11), + Protocol: ptr.To(v1.ProtocolUDP), + }} + } + + testCases := []struct { + name string + currEndpointSlices []*discovery.EndpointSlice + }{ + { + name: "one endpoint slice", + currEndpointSlices: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, initEndpointsPorts), + }, + }, + { + name: "two endpoint slices, same namespace", + currEndpointSlices: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, initEndpointsPorts), + makeTestEndpointSlice("ns1", "ep2", 2, initEndpointsPorts), + }, + }, + { + name: "two endpoint slices, same service", + currEndpointSlices: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, initEndpointsPorts), + makeTestEndpointSlice("ns1", "ep1", 2, initEndpointsPorts), + }, + }, + { + name: "two endpoint slices, different namespace", + currEndpointSlices: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, initEndpointsPorts), + makeTestEndpointSlice("ns2", "ep1", 2, initEndpointsPorts), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) + fp.hostname = testHostname + + for _, epSlice := range tc.currEndpointSlices { + fp.addEndpointSlice(epSlice) + } + fp.endpointsMap.Update(fp.endpointsChanges) + + for _, epSlice := range tc.currEndpointSlices { + fp.deleteEndpointSlice(epSlice) + } + fp.endpointsMap.Update(fp.endpointsChanges) + + if len(fp.endpointsChanges.endpointSliceCache.trackerByServiceMap) != 0 { + t.Errorf("expected: endpointSliceCache does not have any entries, got: %v", fp.endpointsChanges.endpointSliceCache.trackerByServiceMap) + } + }) + } +} + +func TestSameServiceEndpointSliceCacheClearedCorrectly(t *testing.T) { + initEndpointsPorts := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + NodeName: ptr.To(testHostname), + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To("n1"), + Port: ptr.To[int32](11), + Protocol: ptr.To(v1.ProtocolUDP), + }} + } + + currEndpointSlices := []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "svc1", 1, initEndpointsPorts), + makeTestEndpointSlice("ns1", "svc1", 2, initEndpointsPorts), + } + + fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) + fp.hostname = testHostname + + for _, epSlice := range currEndpointSlices { + fp.addEndpointSlice(epSlice) + } + fp.endpointsMap.Update(fp.endpointsChanges) + + // only delete the first endpoint slice + fp.deleteEndpointSlice(currEndpointSlices[0]) + + fp.endpointsMap.Update(fp.endpointsChanges) + + if len(fp.endpointsChanges.endpointSliceCache.trackerByServiceMap) != 1 { + t.Errorf("expected: endpointSliceCache to have one entries, got: %v", fp.endpointsChanges.endpointSliceCache.trackerByServiceMap) + } +} diff --git a/pkg/proxy/servicechangetracker_test.go b/pkg/proxy/servicechangetracker_test.go index e139d664824..8f58424754b 100644 --- a/pkg/proxy/servicechangetracker_test.go +++ b/pkg/proxy/servicechangetracker_test.go @@ -900,3 +900,23 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) } } + +func TestServiceCacheLeaks(t *testing.T) { + fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) + + service := makeTestService("ns1", "svc1", func(svc *v1.Service) { + svc.Spec.Type = v1.ServiceTypeClusterIP + svc.Spec.ClusterIP = "172.16.55.4" + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p1", "UDP", 1234, 4321, 0) + svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p2", "TCP", 1235, 5321, 0) + }) + fp.addService(service) + if len(fp.serviceChanges.items) != 1 { + t.Errorf("Found %d items on the cache, 1 expected", len(fp.serviceChanges.items)) + } + + fp.deleteService(service) + if len(fp.serviceChanges.items) > 0 { + t.Errorf("Found %d items on the cache, 0 expected", len(fp.serviceChanges.items)) + } +}