mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #93441 from robscott/endpointslicemirroring-tracker-fix
Fixing memory leak in EndpointSliceMirroring EndpointSlice tracker
This commit is contained in:
commit
382107e6c8
@ -64,6 +64,7 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
|
@ -45,61 +45,76 @@ func newEndpointSliceTracker() *endpointSliceTracker {
|
||||
}
|
||||
}
|
||||
|
||||
// has returns true if the endpointSliceTracker has a resource version for the
|
||||
// Has returns true if the endpointSliceTracker has a resource version for the
|
||||
// provided EndpointSlice.
|
||||
func (est *endpointSliceTracker) has(endpointSlice *discovery.EndpointSlice) bool {
|
||||
func (est *endpointSliceTracker) Has(endpointSlice *discovery.EndpointSlice) bool {
|
||||
est.lock.Lock()
|
||||
defer est.lock.Unlock()
|
||||
|
||||
rrv := est.relatedResourceVersions(endpointSlice)
|
||||
_, ok := rrv[endpointSlice.Name]
|
||||
rrv, ok := est.relatedResourceVersions(endpointSlice)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
_, ok = rrv[endpointSlice.Name]
|
||||
return ok
|
||||
}
|
||||
|
||||
// stale returns true if this endpointSliceTracker does not have a resource
|
||||
// Stale returns true if this endpointSliceTracker does not have a resource
|
||||
// version for the provided EndpointSlice or it does not match the resource
|
||||
// version of the provided EndpointSlice.
|
||||
func (est *endpointSliceTracker) stale(endpointSlice *discovery.EndpointSlice) bool {
|
||||
func (est *endpointSliceTracker) Stale(endpointSlice *discovery.EndpointSlice) bool {
|
||||
est.lock.Lock()
|
||||
defer est.lock.Unlock()
|
||||
|
||||
rrv := est.relatedResourceVersions(endpointSlice)
|
||||
rrv, ok := est.relatedResourceVersions(endpointSlice)
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
return rrv[endpointSlice.Name] != endpointSlice.ResourceVersion
|
||||
}
|
||||
|
||||
// update adds or updates the resource version in this endpointSliceTracker for
|
||||
// Update adds or updates the resource version in this endpointSliceTracker for
|
||||
// the provided EndpointSlice.
|
||||
func (est *endpointSliceTracker) update(endpointSlice *discovery.EndpointSlice) {
|
||||
func (est *endpointSliceTracker) Update(endpointSlice *discovery.EndpointSlice) {
|
||||
est.lock.Lock()
|
||||
defer est.lock.Unlock()
|
||||
|
||||
rrv := est.relatedResourceVersions(endpointSlice)
|
||||
rrv, ok := est.relatedResourceVersions(endpointSlice)
|
||||
if !ok {
|
||||
rrv = endpointSliceResourceVersions{}
|
||||
est.resourceVersionsByService[getServiceNN(endpointSlice)] = rrv
|
||||
}
|
||||
rrv[endpointSlice.Name] = endpointSlice.ResourceVersion
|
||||
}
|
||||
|
||||
// delete removes the resource version in this endpointSliceTracker for the
|
||||
// provided EndpointSlice.
|
||||
func (est *endpointSliceTracker) delete(endpointSlice *discovery.EndpointSlice) {
|
||||
// DeleteService removes the set of resource versions tracked for the Service.
|
||||
func (est *endpointSliceTracker) DeleteService(namespace, name string) {
|
||||
est.lock.Lock()
|
||||
defer est.lock.Unlock()
|
||||
|
||||
rrv := est.relatedResourceVersions(endpointSlice)
|
||||
delete(rrv, endpointSlice.Name)
|
||||
serviceNN := types.NamespacedName{Name: name, Namespace: namespace}
|
||||
delete(est.resourceVersionsByService, serviceNN)
|
||||
}
|
||||
|
||||
// Delete removes the resource version in this endpointSliceTracker for the
|
||||
// provided EndpointSlice.
|
||||
func (est *endpointSliceTracker) Delete(endpointSlice *discovery.EndpointSlice) {
|
||||
est.lock.Lock()
|
||||
defer est.lock.Unlock()
|
||||
|
||||
rrv, ok := est.relatedResourceVersions(endpointSlice)
|
||||
if ok {
|
||||
delete(rrv, endpointSlice.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// relatedResourceVersions returns the set of resource versions tracked for the
|
||||
// Service corresponding to the provided EndpointSlice. If no resource versions
|
||||
// are currently tracked for this service, an empty set is initialized.
|
||||
func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) endpointSliceResourceVersions {
|
||||
// Service corresponding to the provided EndpointSlice, and a bool to indicate
|
||||
// if it exists.
|
||||
func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) (endpointSliceResourceVersions, bool) {
|
||||
serviceNN := getServiceNN(endpointSlice)
|
||||
vers, ok := est.resourceVersionsByService[serviceNN]
|
||||
|
||||
if !ok {
|
||||
vers = endpointSliceResourceVersions{}
|
||||
est.resourceVersionsByService[serviceNN] = vers
|
||||
}
|
||||
|
||||
return vers
|
||||
return vers, ok
|
||||
}
|
||||
|
||||
// getServiceNN returns a namespaced name for the Service corresponding to the
|
||||
|
@ -19,8 +19,11 @@ package endpointslicemirroring
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
func TestEndpointSliceTrackerUpdate(t *testing.T) {
|
||||
@ -43,47 +46,69 @@ func TestEndpointSliceTrackerUpdate(t *testing.T) {
|
||||
epSlice1DifferentRV.ResourceVersion = "rv2"
|
||||
|
||||
testCases := map[string]struct {
|
||||
updateParam *discovery.EndpointSlice
|
||||
checksParam *discovery.EndpointSlice
|
||||
expectHas bool
|
||||
expectStale bool
|
||||
updateParam *discovery.EndpointSlice
|
||||
checksParam *discovery.EndpointSlice
|
||||
expectHas bool
|
||||
expectStale bool
|
||||
expectResourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
|
||||
}{
|
||||
"same slice": {
|
||||
updateParam: epSlice1,
|
||||
checksParam: epSlice1,
|
||||
expectHas: true,
|
||||
expectStale: false,
|
||||
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
|
||||
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
|
||||
epSlice1.Name: epSlice1.ResourceVersion,
|
||||
},
|
||||
},
|
||||
},
|
||||
"different namespace": {
|
||||
updateParam: epSlice1,
|
||||
checksParam: epSlice1DifferentNS,
|
||||
expectHas: false,
|
||||
expectStale: true,
|
||||
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
|
||||
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
|
||||
epSlice1.Name: epSlice1.ResourceVersion,
|
||||
},
|
||||
},
|
||||
},
|
||||
"different service": {
|
||||
updateParam: epSlice1,
|
||||
checksParam: epSlice1DifferentService,
|
||||
expectHas: false,
|
||||
expectStale: true,
|
||||
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
|
||||
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
|
||||
epSlice1.Name: epSlice1.ResourceVersion,
|
||||
},
|
||||
},
|
||||
},
|
||||
"different resource version": {
|
||||
updateParam: epSlice1,
|
||||
checksParam: epSlice1DifferentRV,
|
||||
expectHas: true,
|
||||
expectStale: true,
|
||||
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
|
||||
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
|
||||
epSlice1.Name: epSlice1.ResourceVersion,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
esTracker := newEndpointSliceTracker()
|
||||
esTracker.update(tc.updateParam)
|
||||
if esTracker.has(tc.checksParam) != tc.expectHas {
|
||||
t.Errorf("tc.tracker.has(%+v) == %t, expected %t", tc.checksParam, esTracker.has(tc.checksParam), tc.expectHas)
|
||||
esTracker.Update(tc.updateParam)
|
||||
if esTracker.Has(tc.checksParam) != tc.expectHas {
|
||||
t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas)
|
||||
}
|
||||
if esTracker.stale(tc.checksParam) != tc.expectStale {
|
||||
t.Errorf("tc.tracker.stale(%+v) == %t, expected %t", tc.checksParam, esTracker.stale(tc.checksParam), tc.expectStale)
|
||||
if esTracker.Stale(tc.checksParam) != tc.expectStale {
|
||||
t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale)
|
||||
}
|
||||
assert.Equal(t, tc.expectResourceVersionsByService, esTracker.resourceVersionsByService)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -160,15 +185,81 @@ func TestEndpointSliceTrackerDelete(t *testing.T) {
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
esTracker := newEndpointSliceTracker()
|
||||
esTracker.update(epSlice1)
|
||||
esTracker.Update(epSlice1)
|
||||
|
||||
esTracker.delete(tc.deleteParam)
|
||||
if esTracker.has(tc.checksParam) != tc.expectHas {
|
||||
t.Errorf("esTracker.has(%+v) == %t, expected %t", tc.checksParam, esTracker.has(tc.checksParam), tc.expectHas)
|
||||
esTracker.Delete(tc.deleteParam)
|
||||
if esTracker.Has(tc.checksParam) != tc.expectHas {
|
||||
t.Errorf("esTracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas)
|
||||
}
|
||||
if esTracker.stale(tc.checksParam) != tc.expectStale {
|
||||
t.Errorf("esTracker.stale(%+v) == %t, expected %t", tc.checksParam, esTracker.stale(tc.checksParam), tc.expectStale)
|
||||
if esTracker.Stale(tc.checksParam) != tc.expectStale {
|
||||
t.Errorf("esTracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndpointSliceTrackerDeleteService(t *testing.T) {
|
||||
svcName1, svcNS1 := "svc1", "ns1"
|
||||
svcName2, svcNS2 := "svc2", "ns2"
|
||||
epSlice1 := &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "example-1",
|
||||
Namespace: svcNS1,
|
||||
ResourceVersion: "rv1",
|
||||
Labels: map[string]string{discovery.LabelServiceName: svcName1},
|
||||
},
|
||||
}
|
||||
|
||||
testCases := map[string]struct {
|
||||
updateParam *discovery.EndpointSlice
|
||||
deleteServiceParam *types.NamespacedName
|
||||
expectHas bool
|
||||
expectStale bool
|
||||
expectResourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
|
||||
}{
|
||||
"same service": {
|
||||
updateParam: epSlice1,
|
||||
deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName1},
|
||||
expectHas: false,
|
||||
expectStale: true,
|
||||
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{},
|
||||
},
|
||||
"different namespace": {
|
||||
updateParam: epSlice1,
|
||||
deleteServiceParam: &types.NamespacedName{Namespace: svcNS2, Name: svcName1},
|
||||
expectHas: true,
|
||||
expectStale: false,
|
||||
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
|
||||
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
|
||||
epSlice1.Name: epSlice1.ResourceVersion,
|
||||
},
|
||||
},
|
||||
},
|
||||
"different service": {
|
||||
updateParam: epSlice1,
|
||||
deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName2},
|
||||
expectHas: true,
|
||||
expectStale: false,
|
||||
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
|
||||
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
|
||||
epSlice1.Name: epSlice1.ResourceVersion,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
esTracker := newEndpointSliceTracker()
|
||||
esTracker.Update(tc.updateParam)
|
||||
esTracker.DeleteService(tc.deleteServiceParam.Namespace, tc.deleteServiceParam.Name)
|
||||
if esTracker.Has(tc.updateParam) != tc.expectHas {
|
||||
t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.updateParam, esTracker.Has(tc.updateParam), tc.expectHas)
|
||||
}
|
||||
if esTracker.Stale(tc.updateParam) != tc.expectStale {
|
||||
t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.updateParam, esTracker.Stale(tc.updateParam), tc.expectStale)
|
||||
}
|
||||
assert.Equal(t, tc.expectResourceVersionsByService, esTracker.resourceVersionsByService)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -285,6 +285,7 @@ func (c *Controller) syncEndpoints(key string) error {
|
||||
endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name)
|
||||
if err != nil || !c.shouldMirror(endpoints) {
|
||||
if apierrors.IsNotFound(err) || !c.shouldMirror(endpoints) {
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.reconciler.deleteEndpoints(namespace, name, endpointSlices)
|
||||
}
|
||||
return err
|
||||
@ -389,7 +390,7 @@ func (c *Controller) onEndpointSliceAdd(obj interface{}) {
|
||||
utilruntime.HandleError(fmt.Errorf("onEndpointSliceAdd() expected type discovery.EndpointSlice, got %T", obj))
|
||||
return
|
||||
}
|
||||
if managedByController(endpointSlice) && c.endpointSliceTracker.stale(endpointSlice) {
|
||||
if managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice) {
|
||||
c.queueEndpointsForEndpointSlice(endpointSlice)
|
||||
}
|
||||
}
|
||||
@ -405,7 +406,7 @@ func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) {
|
||||
utilruntime.HandleError(fmt.Errorf("onEndpointSliceUpdated() expected type discovery.EndpointSlice, got %T, %T", prevObj, obj))
|
||||
return
|
||||
}
|
||||
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.stale(endpointSlice)) {
|
||||
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice)) {
|
||||
c.queueEndpointsForEndpointSlice(endpointSlice)
|
||||
}
|
||||
}
|
||||
@ -419,7 +420,7 @@ func (c *Controller) onEndpointSliceDelete(obj interface{}) {
|
||||
utilruntime.HandleError(fmt.Errorf("onEndpointSliceDelete() expected type discovery.EndpointSlice, got %T", obj))
|
||||
return
|
||||
}
|
||||
if managedByController(endpointSlice) && c.endpointSliceTracker.has(endpointSlice) {
|
||||
if managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
|
||||
c.queueEndpointsForEndpointSlice(endpointSlice)
|
||||
}
|
||||
}
|
||||
|
@ -246,7 +246,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
|
||||
}
|
||||
errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Endpoints %s/%s: %v", endpoints.Namespace, endpoints.Name, err))
|
||||
} else {
|
||||
r.endpointSliceTracker.update(createdSlice)
|
||||
r.endpointSliceTracker.Update(createdSlice)
|
||||
metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
|
||||
}
|
||||
}
|
||||
@ -257,7 +257,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
|
||||
if err != nil {
|
||||
errs = append(errs, fmt.Errorf("Error updating %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err))
|
||||
} else {
|
||||
r.endpointSliceTracker.update(updatedSlice)
|
||||
r.endpointSliceTracker.Update(updatedSlice)
|
||||
metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
|
||||
}
|
||||
}
|
||||
@ -267,7 +267,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
|
||||
if err != nil {
|
||||
errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err))
|
||||
} else {
|
||||
r.endpointSliceTracker.delete(endpointSlice)
|
||||
r.endpointSliceTracker.Delete(endpointSlice)
|
||||
metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user