diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go
index 8d015833..45eaff52 100644
--- a/tools/cache/reflector.go
+++ b/tools/cache/reflector.go
@@ -510,7 +510,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
 			pager.PageSize = 0
 		}
 
-		list, paginatedResult, err = pager.List(context.Background(), options)
+		list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
 		if isExpiredError(err) || isTooLargeResourceVersionError(err) {
 			r.setIsLastSyncResourceVersionUnavailable(true)
 			// Retry immediately if the resource version used to list is unavailable.
@@ -519,7 +519,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
 			// resource version it is listing at is expired or the cache may not yet be synced to the provided
 			// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
 			// the reflector makes forward progress.
-			list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
+			list, paginatedResult, err = pager.ListWithAlloc(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
 		}
 		close(listCh)
 	}()
@@ -557,7 +557,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
 	}
 	resourceVersion = listMetaInterface.GetResourceVersion()
 	initTrace.Step("Resource version extracted")
-	items, err := meta.ExtractList(list)
+	items, err := meta.ExtractListWithAlloc(list)
 	if err != nil {
 		return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
 	}
diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go
index 8b9457a7..b26fe345 100644
--- a/tools/cache/reflector_test.go
+++ b/tools/cache/reflector_test.go
@@ -17,10 +17,12 @@ limitations under the License.
 package cache
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"math/rand"
 	"reflect"
+	goruntime "runtime"
 	"strconv"
 	"syscall"
 	"testing"
@@ -28,10 +30,13 @@ import (
 
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/utils/clock"
@@ -1119,3 +1124,493 @@ func TestReflectorResourceVersionUpdate(t *testing.T) {
 		t.Errorf("Expected series of resource version updates of %#v but got: %#v", expectedRVs, s.resourceVersions)
 	}
 }
+
+const (
+	fakeItemsNum      = 100
+	exemptObjectIndex = fakeItemsNum / 4
+	pageNum           = 3
+)
+
+func getPodListItems(start int, numItems int) (string, string, *v1.PodList) {
+	out := &v1.PodList{
+		Items: make([]v1.Pod, numItems),
+	}
+
+	for i := 0; i < numItems; i++ {
+
+		out.Items[i] = v1.Pod{
+			TypeMeta: metav1.TypeMeta{
+				APIVersion: "v1",
+				Kind:       "Pod",
+			},
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      fmt.Sprintf("pod-%d", i+start),
+				Namespace: "default",
+				Labels: map[string]string{
+					"label-key-1": "label-value-1",
+				},
+				Annotations: map[string]string{
+					"annotations-key-1": "annotations-value-1",
+				},
+			},
+			Spec: v1.PodSpec{
+				Overhead: v1.ResourceList{
+					v1.ResourceCPU:    resource.MustParse("3"),
+					v1.ResourceMemory: resource.MustParse("8"),
+				},
+				NodeSelector: map[string]string{
+					"foo": "bar",
+					"baz": "quux",
+				},
+				Affinity: &v1.Affinity{
+					NodeAffinity: &v1.NodeAffinity{
+						RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+							NodeSelectorTerms: []v1.NodeSelectorTerm{
+								{MatchExpressions: []v1.NodeSelectorRequirement{{Key: `foo`}}},
+							},
+						},
+						PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
+							{Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{{Key: `foo`}}}},
+						},
+					},
+				},
+				TopologySpreadConstraints: []v1.TopologySpreadConstraint{
+					{TopologyKey: `foo`},
+				},
+				HostAliases: []v1.HostAlias{
+					{IP: "1.1.1.1"},
+					{IP: "2.2.2.2"},
+				},
+				ImagePullSecrets: []v1.LocalObjectReference{
+					{Name: "secret1"},
+					{Name: "secret2"},
+				},
+				Containers: []v1.Container{
+					{
+						Name:  "foobar",
+						Image: "alpine",
+						Resources: v1.ResourceRequirements{
+							Requests: v1.ResourceList{
+								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("1"),
+								v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
+							},
+							Limits: v1.ResourceList{
+								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("2"),
+								v1.ResourceName(v1.ResourceMemory): resource.MustParse("10"),
+							},
+						},
+					},
+					{
+						Name:  "foobar2",
+						Image: "alpine",
+						Resources: v1.ResourceRequirements{
+							Requests: v1.ResourceList{
+								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("4"),
+								v1.ResourceName(v1.ResourceMemory): resource.MustParse("12"),
+							},
+							Limits: v1.ResourceList{
+								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("8"),
+								v1.ResourceName(v1.ResourceMemory): resource.MustParse("24"),
+							},
+						},
+					},
+				},
+				InitContainers: []v1.Container{
+					{
+						Name:  "small-init",
+						Image: "alpine",
+						Resources: v1.ResourceRequirements{
+							Requests: v1.ResourceList{
+								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("1"),
+								v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
+							},
+							Limits: v1.ResourceList{
+								v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
+								v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
+							},
+						},
+					},
+					{
+						Name:  "big-init",
+						Image: "alpine",
+						Resources: v1.ResourceRequirements{
+							Requests: v1.ResourceList{
+								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("40"),
+								v1.ResourceName(v1.ResourceMemory): resource.MustParse("120"),
+							},
+							Limits: v1.ResourceList{
+								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("80"),
+								v1.ResourceName(v1.ResourceMemory): resource.MustParse("240"),
+							},
+						},
+					},
+				},
+				Hostname: fmt.Sprintf("node-%d", i),
+			},
+			Status: v1.PodStatus{
+				Phase: v1.PodRunning,
+				ContainerStatuses: []v1.ContainerStatus{
+					{
+						ContainerID: "docker://numbers",
+						Image:       "alpine",
+						Name:        "foobar",
+						Ready:       false,
+					},
+					{
+						ContainerID: "docker://numbers",
+						Image:       "alpine",
+						Name:        "foobar2",
+						Ready:       false,
+					},
+				},
+				InitContainerStatuses: []v1.ContainerStatus{
+					{
+						ContainerID: "docker://numbers",
+						Image:       "alpine",
+						Name:        "small-init",
+						Ready:       false,
+					},
+					{
+						ContainerID: "docker://numbers",
+						Image:       "alpine",
+						Name:        "big-init",
+						Ready:       false,
+					},
+				},
+				Conditions: []v1.PodCondition{
+					{
+						Type:               v1.PodScheduled,
+						Status:             v1.ConditionTrue,
+						Reason:             "successfully",
+						Message:            "sync pod successfully",
+						LastProbeTime:      metav1.Now(),
+						LastTransitionTime: metav1.Now(),
+					},
+				},
+			},
+		}
+	}
+
+	return out.Items[0].GetName(), out.Items[exemptObjectIndex].GetName(), out
+}
+
+func getConfigmapListItems(start int, numItems int) (string, string, *v1.ConfigMapList) {
+	out := &v1.ConfigMapList{
+		Items: make([]v1.ConfigMap, numItems),
+	}
+
+	for i := 0; i < numItems; i++ {
+		out.Items[i] = v1.ConfigMap{
+			TypeMeta: metav1.TypeMeta{
+				APIVersion: "v1",
+				Kind:       "ConfigMap",
+			},
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      fmt.Sprintf("cm-%d", i+start),
+				Namespace: "default",
+				Labels: map[string]string{
+					"label-key-1": "label-value-1",
+				},
+				Annotations: map[string]string{
+					"annotations-key-1": "annotations-value-1",
+				},
+			},
+			Data: map[string]string{
+				"data-1": "value-1",
+				"data-2": "value-2",
+			},
+		}
+	}
+
+	return out.Items[0].GetName(), out.Items[exemptObjectIndex].GetName(), out
+}
+
+type TestPagingPodsLW struct {
+	totalPageCount   int
+	fetchedPageCount int
+
+	detectedObjectNameList []string
+	exemptObjectNameList   []string
+}
+
+func newPageTestLW(totalPageNum int) *TestPagingPodsLW {
+	return &TestPagingPodsLW{
+		totalPageCount:   totalPageNum,
+		fetchedPageCount: 0,
+	}
+}
+
+func (t *TestPagingPodsLW) List(options metav1.ListOptions) (runtime.Object, error) {
+	firstPodName, exemptPodName, list := getPodListItems(t.fetchedPageCount*fakeItemsNum, fakeItemsNum)
+	t.detectedObjectNameList = append(t.detectedObjectNameList, firstPodName)
+	t.exemptObjectNameList = append(t.exemptObjectNameList, exemptPodName)
+	t.fetchedPageCount++
+	if t.fetchedPageCount >= t.totalPageCount {
+		return list, nil
+	}
+	list.SetContinue("true")
+	return list, nil
+}
+
+func (t *TestPagingPodsLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
+	return nil, nil
+}
+
+func TestReflectorListExtract(t *testing.T) {
+	store := NewStore(func(obj interface{}) (string, error) {
+		pod, ok := obj.(*v1.Pod)
+		if !ok {
+			return "", fmt.Errorf("expect *v1.Pod, but got %T", obj)
+		}
+		return pod.GetName(), nil
+	})
+
+	lw := newPageTestLW(5)
+	reflector := NewReflector(lw, &v1.Pod{}, store, 0)
+	reflector.WatchListPageSize = fakeItemsNum
+
+	// execute list to fill store
+	stopCh := make(chan struct{})
+	if err := reflector.list(stopCh); err != nil {
+		t.Fatal(err)
+	}
+
+	// We will not delete exemptPod,
+	// so we can check whether keeping it alive prevents the other, unused pods from being properly released.
+	for _, podName := range lw.exemptObjectNameList {
+		_, exist, err := store.GetByKey(podName)
+		if err != nil || !exist {
+			t.Fatalf("%s should exist in pod store", podName)
+		}
+	}
+
+	// We watch whether the memory occupied by the first pod of each page is released.
+	// Go can only set a finalizer on a pointer to the first element of an allocated array,
+	// so pod-0 is the object we track.
+	detectedPodAlreadyBeCleared := make(chan struct{}, len(lw.detectedObjectNameList))
+
+	for _, firstPodName := range lw.detectedObjectNameList {
+		_, exist, err := store.GetByKey(firstPodName)
+		if err != nil || !exist {
+			t.Fatalf("%s should exist in pod store", firstPodName)
+		}
+		firstPod, exist, err := store.GetByKey(firstPodName)
+		if err != nil || !exist {
+			t.Fatalf("%s should exist in pod store", firstPodName)
+		}
+		goruntime.SetFinalizer(firstPod, func(obj interface{}) {
+			t.Logf("%s has already been garbage collected\n", obj.(*v1.Pod).GetName())
+			detectedPodAlreadyBeCleared <- struct{}{}
+		})
+	}
+
+	storedObjectKeys := store.ListKeys()
+	for _, k := range storedObjectKeys {
+		// delete all pods except the exempt ones.
+		if sets.NewString(lw.exemptObjectNameList...).Has(k) {
+			continue
+		}
+		obj, exist, err := store.GetByKey(k)
+		if err != nil || !exist {
+			t.Fatalf("%s should exist in pod store", k)
+		}
+
+		if err := store.Delete(obj); err != nil {
+			t.Fatalf("delete object: %v", err)
+		}
+		goruntime.GC()
+	}
+
+	clearedNum := 0
+	for {
+		select {
+		case <-detectedPodAlreadyBeCleared:
+			clearedNum++
+			if clearedNum == len(lw.detectedObjectNameList) {
+				return
+			}
+		}
+	}
+}
+
+func BenchmarkExtractList(b *testing.B) {
+	_, _, podList := getPodListItems(0, fakeItemsNum)
+	_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
+	tests := []struct {
+		name string
+		list runtime.Object
+	}{
+		{
+			name: "PodList",
+			list: podList,
+		},
+		{
+			name: "ConfigMapList",
+			list: configMapList,
+		},
+	}
+
+	for _, tc := range tests {
+		b.Run(tc.name, func(b *testing.B) {
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				_, err := meta.ExtractList(tc.list)
+				if err != nil {
+					b.Errorf("extract list: %v", err)
+				}
+			}
+			b.StopTimer()
+		})
+	}
+}
+
+func BenchmarkEachListItem(b *testing.B) {
+	_, _, podList := getPodListItems(0, fakeItemsNum)
+	_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
+	tests := []struct {
+		name string
+		list runtime.Object
+	}{
+		{
+			name: "PodList",
+			list: podList,
+		},
+		{
+			name: "ConfigMapList",
+			list: configMapList,
+		},
+	}
+
+	for _, tc := range tests {
+		b.Run(tc.name, func(b *testing.B) {
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				err := meta.EachListItem(tc.list, func(object runtime.Object) error {
+					return nil
+				})
+				if err != nil {
+					b.Errorf("each list: %v", err)
+				}
+			}
+			b.StopTimer()
+		})
+	}
+}
+
+func BenchmarkExtractListWithAlloc(b *testing.B) {
+	_, _, podList := getPodListItems(0, fakeItemsNum)
+	_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
+	tests := []struct {
+		name string
+		list runtime.Object
+	}{
+		{
+			name: "PodList",
+			list: podList,
+		},
+		{
+			name: "ConfigMapList",
+			list: configMapList,
+		},
+	}
+
+	for _, tc := range tests {
+		b.Run(tc.name, func(b *testing.B) {
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				_, err := meta.ExtractListWithAlloc(tc.list)
+				if err != nil {
+					b.Errorf("extract list with alloc: %v", err)
+				}
+			}
+			b.StopTimer()
+		})
+	}
+}
+
+func BenchmarkEachListItemWithAlloc(b *testing.B) {
+	_, _, podList := getPodListItems(0, fakeItemsNum)
+	_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
+	tests := []struct {
+		name string
+		list runtime.Object
+	}{
+		{
+			name: "PodList",
+			list: podList,
+		},
+		{
+			name: "ConfigMapList",
+			list: configMapList,
+		},
+	}
+
+	for _, tc := range tests {
+		b.Run(tc.name, func(b *testing.B) {
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				err := meta.EachListItemWithAlloc(tc.list, func(object runtime.Object) error {
+					return nil
+				})
+				if err != nil {
+					b.Errorf("each list with alloc: %v", err)
+				}
+			}
+			b.StopTimer()
+		})
+	}
+}
+
+func BenchmarkReflectorList(b *testing.B) {
+	ctx, cancel := context.WithTimeout(context.Background(), wait.ForeverTestTimeout)
+	defer cancel()
+
+	store := NewStore(func(obj interface{}) (string, error) {
+		o, err := meta.Accessor(obj)
+		if err != nil {
+			return "", err
+		}
+		return o.GetName(), nil
+	})
+
+	_, _, podList := getPodListItems(0, fakeItemsNum)
+	_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
+	tests := []struct {
+		name   string
+		sample func() interface{}
+		list   runtime.Object
+	}{
+		{
+			name: "PodList",
+			sample: func() interface{} {
+				return v1.Pod{}
+			},
+			list: podList,
+		},
+		{
+			name: "ConfigMapList",
+			sample: func() interface{} {
+				return v1.ConfigMap{}
+			},
+			list: configMapList,
+		},
+	}
+
+	for _, tc := range tests {
+		b.Run(tc.name, func(b *testing.B) {
+
+			sample := tc.sample()
+			reflector := NewReflector(newPageTestLW(pageNum), &sample, store, 0)
+			reflector.WatchListPageSize = fakeItemsNum
+
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				err := reflector.list(ctx.Done())
+				if err != nil {
+					b.Fatalf("reflect list: %v", err)
+				}
+			}
+			b.StopTimer()
+		})
+	}
+}
diff --git a/tools/pager/pager.go b/tools/pager/pager.go
index 9ba988f6..3c77cc37 100644
--- a/tools/pager/pager.go
+++ b/tools/pager/pager.go
@@ -73,7 +73,23 @@ func New(fn ListPageFunc) *ListPager {
 // List returns a single list object, but attempts to retrieve smaller chunks from the
 // server to reduce the impact on the server. If the chunk attempt fails, it will load
 // the full list instead. The Limit field on options, if unset, will default to the page size.
+//
+// If items in the returned list are retained for different durations, and you want to avoid
+// retaining the whole slice returned by p.PageFn as long as any item is referenced,
+// use ListWithAlloc instead.
 func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
+	return p.list(ctx, options, false)
+}
+
+// ListWithAlloc works like List, but avoids retaining references to the items slice returned by p.PageFn.
+// It does this by making a shallow copy of non-pointer items in the slice returned by p.PageFn.
+//
+// If the items in the returned list are not retained, or are retained for the same duration, use List instead for memory efficiency.
+func (p *ListPager) ListWithAlloc(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
+	return p.list(ctx, options, true)
+}
+
+func (p *ListPager) list(ctx context.Context, options metav1.ListOptions, allocNew bool) (runtime.Object, bool, error) {
 	if options.Limit == 0 {
 		options.Limit = p.PageSize
 	}
@@ -123,7 +139,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
 		list.ResourceVersion = m.GetResourceVersion()
 		list.SelfLink = m.GetSelfLink()
 	}
-	if err := meta.EachListItem(obj, func(obj runtime.Object) error {
+	eachListItemFunc := meta.EachListItem
+	if allocNew {
+		eachListItemFunc = meta.EachListItemWithAlloc
+	}
+	if err := eachListItemFunc(obj, func(obj runtime.Object) error {
 		list.Items = append(list.Items, obj)
 		return nil
 	}); err != nil {
@@ -156,12 +176,26 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
 //
 // Items are retrieved in chunks from the server to reduce the impact on the server with up to
 // ListPager.PageBufferSize chunks buffered concurrently in the background.
+//
+// If items passed to fn are retained for different durations, and you want to avoid
+// retaining the whole slice returned by p.PageFn as long as any item is referenced,
+// use EachListItemWithAlloc instead.
 func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
 	return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
 		return meta.EachListItem(obj, fn)
 	})
 }
 
+// EachListItemWithAlloc works like EachListItem, but avoids retaining references to the items slice returned by p.PageFn.
+// It does this by making a shallow copy of non-pointer items in the slice returned by p.PageFn.
+//
+// If the items passed to fn are not retained, or are retained for the same duration, use EachListItem instead for memory efficiency.
+func (p *ListPager) EachListItemWithAlloc(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
+	return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
+		return meta.EachListItemWithAlloc(obj, fn)
+	})
+}
+
 // eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on
 // each list chunk. If fn returns an error, processing stops and that error is returned. If fn does
 // not return an error, any error encountered while retrieving the list from the server is
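Note for reviewers: the mechanism behind the WithAlloc variants is easiest to see outside the reflector. The sketch below is illustrative only, not client-go code (`item`, `firstAliased`, and `firstCopied` are hypothetical names and the payload size is arbitrary). It shows why returning a pointer into a page's backing array, as `meta.ExtractList` effectively does for each element, keeps the whole page reachable, and how the shallow copy performed by the WithAlloc variants breaks that link.

```go
package main

import "fmt"

// item stands in for a large API object; the 1 MiB payload makes the cost
// of pinning a whole page obvious. Hypothetical type, not a client-go one.
type item struct {
	name    string
	payload [1 << 20]byte
}

// firstAliased returns a pointer into the page's backing array; as long as
// the caller holds it, the garbage collector must keep the entire page.
func firstAliased(page []item) *item {
	return &page[0]
}

// firstCopied shallow-copies the element into a fresh allocation, so the
// page's backing array becomes collectible once the page itself is dropped.
// This is the idea behind ExtractListWithAlloc / EachListItemWithAlloc.
func firstCopied(page []item) *item {
	it := page[0]
	return &it
}

func main() {
	page := make([]item, 100)
	a := firstAliased(page) // retaining a pins ~100 MiB
	c := firstCopied(page)  // retaining c pins ~1 MiB
	fmt.Println(a == &page[0], c != &page[0]) // true true
}
```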
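The reflector now calls `ListWithAlloc` internally, but the method is also exported. A minimal usage sketch, assuming a configured clientset (the `listPodsWithAlloc` name and the clientset wiring are placeholders, not part of this change):

```go
package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/pager"
)

// listPodsWithAlloc pages through pods and shallow-copies items out of each
// page, so callers may retain individual pods without pinning whole pages.
func listPodsWithAlloc(ctx context.Context, client kubernetes.Interface) (runtime.Object, error) {
	pg := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
		return client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, opts)
	}))
	list, _, err := pg.ListWithAlloc(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	return list, nil
}
```

Per the new doc comments, this trades one extra allocation per item for the ability to free each page independently; callers that do not retain items past the page's lifetime should keep using `List`.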