mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-24 14:12:18 +00:00
Add WithAlloc interface and stub implementations with base benchmarks
Kubernetes-commit: b8a3bd673dc61b4d7a0ad0f54fa423e0160078cf
This commit is contained in:
parent
f21df6e02d
commit
6c7d1bc996
6
tools/cache/reflector.go
vendored
6
tools/cache/reflector.go
vendored
@ -510,7 +510,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
|
||||
pager.PageSize = 0
|
||||
}
|
||||
|
||||
list, paginatedResult, err = pager.List(context.Background(), options)
|
||||
list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
|
||||
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
|
||||
r.setIsLastSyncResourceVersionUnavailable(true)
|
||||
// Retry immediately if the resource version used to list is unavailable.
|
||||
@ -519,7 +519,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
|
||||
// resource version it is listing at is expired or the cache may not yet be synced to the provided
|
||||
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
|
||||
// the reflector makes forward progress.
|
||||
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
|
||||
list, paginatedResult, err = pager.ListWithAlloc(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
|
||||
}
|
||||
close(listCh)
|
||||
}()
|
||||
@ -557,7 +557,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
|
||||
}
|
||||
resourceVersion = listMetaInterface.GetResourceVersion()
|
||||
initTrace.Step("Resource version extracted")
|
||||
items, err := meta.ExtractList(list)
|
||||
items, err := meta.ExtractListWithAlloc(list)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
|
||||
}
|
||||
|
495
tools/cache/reflector_test.go
vendored
495
tools/cache/reflector_test.go
vendored
@ -17,10 +17,12 @@ limitations under the License.
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
goruntime "runtime"
|
||||
"strconv"
|
||||
"syscall"
|
||||
"testing"
|
||||
@ -28,10 +30,13 @@ import (
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/utils/clock"
|
||||
@ -1119,3 +1124,493 @@ func TestReflectorResourceVersionUpdate(t *testing.T) {
|
||||
t.Errorf("Expected series of resource version updates of %#v but got: %#v", expectedRVs, s.resourceVersions)
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
fakeItemsNum = 100
|
||||
exemptObjectIndex = fakeItemsNum / 4
|
||||
pageNum = 3
|
||||
)
|
||||
|
||||
func getPodListItems(start int, numItems int) (string, string, *v1.PodList) {
|
||||
out := &v1.PodList{
|
||||
Items: make([]v1.Pod, numItems),
|
||||
}
|
||||
|
||||
for i := 0; i < numItems; i++ {
|
||||
|
||||
out.Items[i] = v1.Pod{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: "v1",
|
||||
Kind: "Pod",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("pod-%d", i+start),
|
||||
Namespace: "default",
|
||||
Labels: map[string]string{
|
||||
"label-key-1": "label-value-1",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"annotations-key-1": "annotations-value-1",
|
||||
},
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Overhead: v1.ResourceList{
|
||||
v1.ResourceCPU: resource.MustParse("3"),
|
||||
v1.ResourceMemory: resource.MustParse("8"),
|
||||
},
|
||||
NodeSelector: map[string]string{
|
||||
"foo": "bar",
|
||||
"baz": "quux",
|
||||
},
|
||||
Affinity: &v1.Affinity{
|
||||
NodeAffinity: &v1.NodeAffinity{
|
||||
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
|
||||
NodeSelectorTerms: []v1.NodeSelectorTerm{
|
||||
{MatchExpressions: []v1.NodeSelectorRequirement{{Key: `foo`}}},
|
||||
},
|
||||
},
|
||||
PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
|
||||
{Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{{Key: `foo`}}}},
|
||||
},
|
||||
},
|
||||
},
|
||||
TopologySpreadConstraints: []v1.TopologySpreadConstraint{
|
||||
{TopologyKey: `foo`},
|
||||
},
|
||||
HostAliases: []v1.HostAlias{
|
||||
{IP: "1.1.1.1"},
|
||||
{IP: "2.2.2.2"},
|
||||
},
|
||||
ImagePullSecrets: []v1.LocalObjectReference{
|
||||
{Name: "secret1"},
|
||||
{Name: "secret2"},
|
||||
},
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "foobar",
|
||||
Image: "alpine",
|
||||
Resources: v1.ResourceRequirements{
|
||||
Requests: v1.ResourceList{
|
||||
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
|
||||
v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
|
||||
},
|
||||
Limits: v1.ResourceList{
|
||||
v1.ResourceName(v1.ResourceCPU): resource.MustParse("2"),
|
||||
v1.ResourceName(v1.ResourceMemory): resource.MustParse("10"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foobar2",
|
||||
Image: "alpine",
|
||||
Resources: v1.ResourceRequirements{
|
||||
Requests: v1.ResourceList{
|
||||
v1.ResourceName(v1.ResourceCPU): resource.MustParse("4"),
|
||||
v1.ResourceName(v1.ResourceMemory): resource.MustParse("12"),
|
||||
},
|
||||
Limits: v1.ResourceList{
|
||||
v1.ResourceName(v1.ResourceCPU): resource.MustParse("8"),
|
||||
v1.ResourceName(v1.ResourceMemory): resource.MustParse("24"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
InitContainers: []v1.Container{
|
||||
{
|
||||
Name: "small-init",
|
||||
Image: "alpine",
|
||||
Resources: v1.ResourceRequirements{
|
||||
Requests: v1.ResourceList{
|
||||
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
|
||||
v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
|
||||
},
|
||||
Limits: v1.ResourceList{
|
||||
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
|
||||
v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "big-init",
|
||||
Image: "alpine",
|
||||
Resources: v1.ResourceRequirements{
|
||||
Requests: v1.ResourceList{
|
||||
v1.ResourceName(v1.ResourceCPU): resource.MustParse("40"),
|
||||
v1.ResourceName(v1.ResourceMemory): resource.MustParse("120"),
|
||||
},
|
||||
Limits: v1.ResourceList{
|
||||
v1.ResourceName(v1.ResourceCPU): resource.MustParse("80"),
|
||||
v1.ResourceName(v1.ResourceMemory): resource.MustParse("240"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Hostname: fmt.Sprintf("node-%d", i),
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: v1.PodRunning,
|
||||
ContainerStatuses: []v1.ContainerStatus{
|
||||
{
|
||||
ContainerID: "docker://numbers",
|
||||
Image: "alpine",
|
||||
Name: "foobar",
|
||||
Ready: false,
|
||||
},
|
||||
{
|
||||
ContainerID: "docker://numbers",
|
||||
Image: "alpine",
|
||||
Name: "foobar2",
|
||||
Ready: false,
|
||||
},
|
||||
},
|
||||
InitContainerStatuses: []v1.ContainerStatus{
|
||||
{
|
||||
ContainerID: "docker://numbers",
|
||||
Image: "alpine",
|
||||
Name: "small-init",
|
||||
Ready: false,
|
||||
},
|
||||
{
|
||||
ContainerID: "docker://numbers",
|
||||
Image: "alpine",
|
||||
Name: "big-init",
|
||||
Ready: false,
|
||||
},
|
||||
},
|
||||
Conditions: []v1.PodCondition{
|
||||
{
|
||||
Type: v1.PodScheduled,
|
||||
Status: v1.ConditionTrue,
|
||||
Reason: "successfully",
|
||||
Message: "sync pod successfully",
|
||||
LastProbeTime: metav1.Now(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return out.Items[0].GetName(), out.Items[exemptObjectIndex].GetName(), out
|
||||
}
|
||||
|
||||
func getConfigmapListItems(start int, numItems int) (string, string, *v1.ConfigMapList) {
|
||||
out := &v1.ConfigMapList{
|
||||
Items: make([]v1.ConfigMap, numItems),
|
||||
}
|
||||
|
||||
for i := 0; i < numItems; i++ {
|
||||
out.Items[i] = v1.ConfigMap{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: "v1",
|
||||
Kind: "ConfigMap",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("cm-%d", i+start),
|
||||
Namespace: "default",
|
||||
Labels: map[string]string{
|
||||
"label-key-1": "label-value-1",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"annotations-key-1": "annotations-value-1",
|
||||
},
|
||||
},
|
||||
Data: map[string]string{
|
||||
"data-1": "value-1",
|
||||
"data-2": "value-2",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return out.Items[0].GetName(), out.Items[exemptObjectIndex].GetName(), out
|
||||
}
|
||||
|
||||
type TestPagingPodsLW struct {
|
||||
totalPageCount int
|
||||
fetchedPageCount int
|
||||
|
||||
detectedObjectNameList []string
|
||||
exemptObjectNameList []string
|
||||
}
|
||||
|
||||
func newPageTestLW(totalPageNum int) *TestPagingPodsLW {
|
||||
return &TestPagingPodsLW{
|
||||
totalPageCount: totalPageNum,
|
||||
fetchedPageCount: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TestPagingPodsLW) List(options metav1.ListOptions) (runtime.Object, error) {
|
||||
firstPodName, exemptPodName, list := getPodListItems(t.fetchedPageCount*fakeItemsNum, fakeItemsNum)
|
||||
t.detectedObjectNameList = append(t.detectedObjectNameList, firstPodName)
|
||||
t.exemptObjectNameList = append(t.exemptObjectNameList, exemptPodName)
|
||||
t.fetchedPageCount++
|
||||
if t.fetchedPageCount >= t.totalPageCount {
|
||||
return list, nil
|
||||
}
|
||||
list.SetContinue("true")
|
||||
return list, nil
|
||||
}
|
||||
|
||||
func (t *TestPagingPodsLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestReflectorListExtract(t *testing.T) {
|
||||
store := NewStore(func(obj interface{}) (string, error) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("expect *v1.Pod, but got %T", obj)
|
||||
}
|
||||
return pod.GetName(), nil
|
||||
})
|
||||
|
||||
lw := newPageTestLW(5)
|
||||
reflector := NewReflector(lw, &v1.Pod{}, store, 0)
|
||||
reflector.WatchListPageSize = fakeItemsNum
|
||||
|
||||
// execute list to fill store
|
||||
stopCh := make(chan struct{})
|
||||
if err := reflector.list(stopCh); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// We will not delete exemptPod,
|
||||
// in order to see if the existence of this Pod causes other Pods that are not used to be unable to properly clear.
|
||||
for _, podName := range lw.exemptObjectNameList {
|
||||
_, exist, err := store.GetByKey(podName)
|
||||
if err != nil || !exist {
|
||||
t.Fatalf("%s should exist in pod store", podName)
|
||||
}
|
||||
}
|
||||
|
||||
// we will pay attention to whether the memory occupied by the first Pod is released
|
||||
// Golang's can only be SetFinalizer for the first element of the array,
|
||||
// so pod-0 will be the object of our attention
|
||||
detectedPodAlreadyBeCleared := make(chan struct{}, len(lw.detectedObjectNameList))
|
||||
|
||||
for _, firstPodName := range lw.detectedObjectNameList {
|
||||
_, exist, err := store.GetByKey(firstPodName)
|
||||
if err != nil || !exist {
|
||||
t.Fatalf("%s should exist in pod store", firstPodName)
|
||||
}
|
||||
firstPod, exist, err := store.GetByKey(firstPodName)
|
||||
if err != nil || !exist {
|
||||
t.Fatalf("%s should exist in pod store", firstPodName)
|
||||
}
|
||||
goruntime.SetFinalizer(firstPod, func(obj interface{}) {
|
||||
t.Logf("%s already be gc\n", obj.(*v1.Pod).GetName())
|
||||
detectedPodAlreadyBeCleared <- struct{}{}
|
||||
})
|
||||
}
|
||||
|
||||
storedObjectKeys := store.ListKeys()
|
||||
for _, k := range storedObjectKeys {
|
||||
// delete all Pods except the exempted Pods.
|
||||
if sets.NewString(lw.exemptObjectNameList...).Has(k) {
|
||||
continue
|
||||
}
|
||||
obj, exist, err := store.GetByKey(k)
|
||||
if err != nil || !exist {
|
||||
t.Fatalf("%s should exist in pod store", k)
|
||||
}
|
||||
|
||||
if err := store.Delete(obj); err != nil {
|
||||
t.Fatalf("delete object: %v", err)
|
||||
}
|
||||
goruntime.GC()
|
||||
}
|
||||
|
||||
clearedNum := 0
|
||||
for {
|
||||
select {
|
||||
case <-detectedPodAlreadyBeCleared:
|
||||
clearedNum++
|
||||
if clearedNum == len(lw.detectedObjectNameList) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkExtractList(b *testing.B) {
|
||||
_, _, podList := getPodListItems(0, fakeItemsNum)
|
||||
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
|
||||
tests := []struct {
|
||||
name string
|
||||
list runtime.Object
|
||||
}{
|
||||
{
|
||||
name: "PodList",
|
||||
list: podList,
|
||||
},
|
||||
{
|
||||
name: "ConfigMapList",
|
||||
list: configMapList,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
b.Run(tc.name, func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := meta.ExtractList(tc.list)
|
||||
if err != nil {
|
||||
b.Errorf("extract list: %v", err)
|
||||
}
|
||||
}
|
||||
b.StopTimer()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEachListItem(b *testing.B) {
|
||||
_, _, podList := getPodListItems(0, fakeItemsNum)
|
||||
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
|
||||
tests := []struct {
|
||||
name string
|
||||
list runtime.Object
|
||||
}{
|
||||
{
|
||||
name: "PodList",
|
||||
list: podList,
|
||||
},
|
||||
{
|
||||
name: "ConfigMapList",
|
||||
list: configMapList,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
b.Run(tc.name, func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := meta.EachListItem(tc.list, func(object runtime.Object) error {
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
b.Errorf("each list: %v", err)
|
||||
}
|
||||
}
|
||||
b.StopTimer()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkExtractListWithAlloc(b *testing.B) {
|
||||
_, _, podList := getPodListItems(0, fakeItemsNum)
|
||||
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
|
||||
tests := []struct {
|
||||
name string
|
||||
list runtime.Object
|
||||
}{
|
||||
{
|
||||
name: "PodList",
|
||||
list: podList,
|
||||
},
|
||||
{
|
||||
name: "ConfigMapList",
|
||||
list: configMapList,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
b.Run(tc.name, func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := meta.ExtractListWithAlloc(tc.list)
|
||||
if err != nil {
|
||||
b.Errorf("extract list with alloc: %v", err)
|
||||
}
|
||||
}
|
||||
b.StopTimer()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEachListItemWithAlloc(b *testing.B) {
|
||||
_, _, podList := getPodListItems(0, fakeItemsNum)
|
||||
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
|
||||
tests := []struct {
|
||||
name string
|
||||
list runtime.Object
|
||||
}{
|
||||
{
|
||||
name: "PodList",
|
||||
list: podList,
|
||||
},
|
||||
{
|
||||
name: "ConfigMapList",
|
||||
list: configMapList,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
b.Run(tc.name, func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := meta.EachListItemWithAlloc(tc.list, func(object runtime.Object) error {
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
b.Errorf("each list with alloc: %v", err)
|
||||
}
|
||||
}
|
||||
b.StopTimer()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkReflectorList(b *testing.B) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), wait.ForeverTestTimeout)
|
||||
defer cancel()
|
||||
|
||||
store := NewStore(func(obj interface{}) (string, error) {
|
||||
o, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return o.GetName(), nil
|
||||
})
|
||||
|
||||
_, _, podList := getPodListItems(0, fakeItemsNum)
|
||||
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
|
||||
tests := []struct {
|
||||
name string
|
||||
sample func() interface{}
|
||||
list runtime.Object
|
||||
}{
|
||||
{
|
||||
name: "PodList",
|
||||
sample: func() interface{} {
|
||||
return v1.Pod{}
|
||||
},
|
||||
list: podList,
|
||||
},
|
||||
{
|
||||
name: "ConfigMapList",
|
||||
sample: func() interface{} {
|
||||
return v1.ConfigMap{}
|
||||
},
|
||||
list: configMapList,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
b.Run(tc.name, func(b *testing.B) {
|
||||
|
||||
sample := tc.sample()
|
||||
reflector := NewReflector(newPageTestLW(pageNum), &sample, store, 0)
|
||||
reflector.WatchListPageSize = fakeItemsNum
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := reflector.list(ctx.Done())
|
||||
if err != nil {
|
||||
b.Fatalf("reflect list: %v", err)
|
||||
}
|
||||
}
|
||||
b.StopTimer()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -73,7 +73,23 @@ func New(fn ListPageFunc) *ListPager {
|
||||
// List returns a single list object, but attempts to retrieve smaller chunks from the
|
||||
// server to reduce the impact on the server. If the chunk attempt fails, it will load
|
||||
// the full list instead. The Limit field on options, if unset, will default to the page size.
|
||||
//
|
||||
// If items in the returned list are retained for different durations, and you want to avoid
|
||||
// retaining the whole slice returned by p.PageFn as long as any item is referenced,
|
||||
// use ListWithAlloc instead.
|
||||
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
|
||||
return p.list(ctx, options, false)
|
||||
}
|
||||
|
||||
// ListWithAlloc works like List, but avoids retaining references to the items slice returned by p.PageFn.
|
||||
// It does this by making a shallow copy of non-pointer items in the slice returned by p.PageFn.
|
||||
//
|
||||
// If the items in the returned list are not retained, or are retained for the same duration, use List instead for memory efficiency.
|
||||
func (p *ListPager) ListWithAlloc(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
|
||||
return p.list(ctx, options, true)
|
||||
}
|
||||
|
||||
func (p *ListPager) list(ctx context.Context, options metav1.ListOptions, allocNew bool) (runtime.Object, bool, error) {
|
||||
if options.Limit == 0 {
|
||||
options.Limit = p.PageSize
|
||||
}
|
||||
@ -123,7 +139,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
|
||||
list.ResourceVersion = m.GetResourceVersion()
|
||||
list.SelfLink = m.GetSelfLink()
|
||||
}
|
||||
if err := meta.EachListItem(obj, func(obj runtime.Object) error {
|
||||
eachListItemFunc := meta.EachListItem
|
||||
if allocNew {
|
||||
eachListItemFunc = meta.EachListItemWithAlloc
|
||||
}
|
||||
if err := eachListItemFunc(obj, func(obj runtime.Object) error {
|
||||
list.Items = append(list.Items, obj)
|
||||
return nil
|
||||
}); err != nil {
|
||||
@ -156,12 +176,26 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
|
||||
//
|
||||
// Items are retrieved in chunks from the server to reduce the impact on the server with up to
|
||||
// ListPager.PageBufferSize chunks buffered concurrently in the background.
|
||||
//
|
||||
// If items passed to fn are retained for different durations, and you want to avoid
|
||||
// retaining the whole slice returned by p.PageFn as long as any item is referenced,
|
||||
// use EachListItemWithAlloc instead.
|
||||
func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
|
||||
return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
|
||||
return meta.EachListItem(obj, fn)
|
||||
})
|
||||
}
|
||||
|
||||
// EachListItemWithAlloc works like EachListItem, but avoids retaining references to the items slice returned by p.PageFn.
|
||||
// It does this by making a shallow copy of non-pointer items in the slice returned by p.PageFn.
|
||||
//
|
||||
// If the items passed to fn are not retained, or are retained for the same duration, use EachListItem instead for memory efficiency.
|
||||
func (p *ListPager) EachListItemWithAlloc(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
|
||||
return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
|
||||
return meta.EachListItemWithAlloc(obj, fn)
|
||||
})
|
||||
}
|
||||
|
||||
// eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on
|
||||
// each list chunk. If fn returns an error, processing stops and that error is returned. If fn does
|
||||
// not return an error, any error encountered while retrieving the list from the server is
|
||||
|
Loading…
Reference in New Issue
Block a user