Add WithAlloc interface and stub implementations with base benchmarks

This commit is contained in:
scott 2023-05-27 17:57:35 -04:00 committed by Jordan Liggitt
parent 5539a5b80f
commit b8a3bd673d
No known key found for this signature in database
9 changed files with 1126 additions and 4 deletions

View File

@ -113,8 +113,27 @@ func getItemsPtr(list runtime.Object) (interface{}, error) {
// EachListItem invokes fn on each runtime.Object in the list. Any error immediately terminates
// the loop.
//
// If items passed to fn are retained for different durations, and you want to avoid
// retaining all items in obj as long as any item is referenced, use EachListItemWithAlloc instead.
func EachListItem(obj runtime.Object, fn func(runtime.Object) error) error {
return eachListItem(obj, fn, false)
}
// EachListItemWithAlloc works like EachListItem, but avoids retaining references to the items slice in obj.
// It does this by making a shallow copy of non-pointer items in obj.
//
// If the items passed to fn are not retained, or are retained for the same duration, use EachListItem instead for memory efficiency.
func EachListItemWithAlloc(obj runtime.Object, fn func(runtime.Object) error) error {
return eachListItem(obj, fn, true)
}
// allocNew: Whether shallow copy is required when the elements in Object.Items are struct
func eachListItem(obj runtime.Object, fn func(runtime.Object) error, allocNew bool) error {
if unstructured, ok := obj.(runtime.Unstructured); ok {
if allocNew {
return unstructured.EachListItemWithAlloc(fn)
}
return unstructured.EachListItem(fn)
}
// TODO: Change to an interface call?
@ -167,7 +186,23 @@ func EachListItem(obj runtime.Object, fn func(runtime.Object) error) error {
// ExtractList returns obj's Items element as an array of runtime.Objects.
// Returns an error if obj is not a List type (does not have an Items member).
//
// If items in the returned list are retained for different durations, and you want to avoid
// retaining all items in obj as long as any item is referenced, use ExtractListWithAlloc instead.
func ExtractList(obj runtime.Object) ([]runtime.Object, error) {
return extractList(obj, false)
}
// ExtractListWithAlloc works like ExtractList, but avoids retaining references to the items slice in obj.
// It does this by making a shallow copy of non-pointer items in obj.
//
// If the items in the returned list are not retained, or are retained for the same duration, use ExtractList instead for memory efficiency.
func ExtractListWithAlloc(obj runtime.Object) ([]runtime.Object, error) {
return extractList(obj, true)
}
// allocNew: Whether shallow copy is required when the elements in Object.Items are struct
func extractList(obj runtime.Object, allocNew bool) ([]runtime.Object, error) {
itemsPtr, err := GetItemsPtr(obj)
if err != nil {
return nil, err

View File

@ -0,0 +1,539 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package meta
import (
"reflect"
"strconv"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
const (
fakeObjectItemsNum = 1000
exemptObjectIndex = fakeObjectItemsNum / 4
)
type SampleSpec struct {
Flied int
}
type FooSpec struct {
Flied int
}
type FooList struct {
metav1.TypeMeta
metav1.ListMeta
Items []Foo
}
func (s *FooList) DeepCopyObject() runtime.Object { panic("unimplemented"); return nil }
type SampleList struct {
metav1.TypeMeta
metav1.ListMeta
Items []Sample
}
func (s *SampleList) DeepCopyObject() runtime.Object { panic("unimplemented"); return nil }
type RawExtensionList struct {
metav1.TypeMeta
metav1.ListMeta
Items []runtime.RawExtension
}
func (l RawExtensionList) DeepCopyObject() runtime.Object { panic("unimplemented"); return nil }
// NOTE: Foo struct itself is the implementer of runtime.Object.
type Foo struct {
metav1.TypeMeta
metav1.ObjectMeta
Spec FooSpec
}
func (f Foo) GetObjectKind() schema.ObjectKind {
tm := f.TypeMeta
return &tm
}
func (f Foo) DeepCopyObject() runtime.Object { panic("unimplemented"); return nil }
// NOTE: the pointer of Sample that is the implementer of runtime.Object.
// the behavior is similar to our corev1.Pod. corev1.Node
type Sample struct {
metav1.TypeMeta
metav1.ObjectMeta
Spec SampleSpec
}
func (s *Sample) GetObjectKind() schema.ObjectKind {
tm := s.TypeMeta
return &tm
}
func (s *Sample) DeepCopyObject() runtime.Object { panic("unimplemented"); return nil }
func fakeSampleList(numItems int) *SampleList {
out := &SampleList{
Items: make([]Sample, numItems),
}
for i := range out.Items {
out.Items[i] = Sample{
TypeMeta: metav1.TypeMeta{
APIVersion: "foo.org/v1",
Kind: "Sample",
},
ObjectMeta: metav1.ObjectMeta{
Name: strconv.Itoa(i),
Namespace: "default",
Labels: map[string]string{
"label-key-1": "label-value-1",
},
Annotations: map[string]string{
"annotations-key-1": "annotations-value-1",
},
},
Spec: SampleSpec{
Flied: i,
},
}
}
return out
}
func fakeExtensionList(numItems int) *RawExtensionList {
out := &RawExtensionList{
Items: make([]runtime.RawExtension, numItems),
}
for i := range out.Items {
out.Items[i] = runtime.RawExtension{
Object: &Foo{
TypeMeta: metav1.TypeMeta{
APIVersion: "sample.org/v1",
Kind: "Foo",
},
ObjectMeta: metav1.ObjectMeta{
Name: strconv.Itoa(i),
Namespace: "default",
Labels: map[string]string{
"label-key-1": "label-value-1",
},
Annotations: map[string]string{
"annotations-key-1": "annotations-value-1",
},
},
Spec: FooSpec{
Flied: i,
},
},
}
}
return out
}
func fakeUnstructuredList(numItems int) runtime.Unstructured {
out := &unstructured.UnstructuredList{
Items: make([]unstructured.Unstructured, numItems),
}
for i := range out.Items {
out.Items[i] = unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]interface{}{
"creationTimestamp": nil,
"name": strconv.Itoa(i),
},
"spec": map[string]interface{}{
"hostname": "example.com",
},
"status": map[string]interface{}{},
},
}
}
return out
}
func fakeFooList(numItems int) *FooList {
out := &FooList{
Items: make([]Foo, numItems),
}
for i := range out.Items {
out.Items[i] = Foo{
TypeMeta: metav1.TypeMeta{
APIVersion: "sample.org/v1",
Kind: "Foo",
},
ObjectMeta: metav1.ObjectMeta{
Name: strconv.Itoa(i),
Namespace: "default",
Labels: map[string]string{
"label-key-1": "label-value-1",
},
Annotations: map[string]string{
"annotations-key-1": "annotations-value-1",
},
},
Spec: FooSpec{
Flied: i,
},
}
}
return out
}
func TestEachList(t *testing.T) {
tests := []struct {
name string
generateFunc func(num int) (list runtime.Object)
expectObjectNum int
}{
{
name: "StructReceiverList",
generateFunc: func(num int) (list runtime.Object) {
return fakeFooList(num)
},
expectObjectNum: fakeObjectItemsNum,
},
{
name: "PointerReceiverList",
generateFunc: func(num int) (list runtime.Object) {
return fakeSampleList(num)
},
expectObjectNum: fakeObjectItemsNum,
},
{
name: "RawExtensionList",
generateFunc: func(num int) (list runtime.Object) {
return fakeExtensionList(num)
},
expectObjectNum: fakeObjectItemsNum,
},
{
name: "UnstructuredList",
generateFunc: func(num int) (list runtime.Object) {
return fakeUnstructuredList(fakeObjectItemsNum)
},
expectObjectNum: fakeObjectItemsNum,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Run("EachListItem", func(t *testing.T) {
expectObjectNames := map[string]struct{}{}
for i := 0; i < tc.expectObjectNum; i++ {
expectObjectNames[strconv.Itoa(i)] = struct{}{}
}
list := tc.generateFunc(tc.expectObjectNum)
err := EachListItem(list, func(object runtime.Object) error {
o, err := Accessor(object)
if err != nil {
return err
}
delete(expectObjectNames, o.GetName())
return nil
})
if err != nil {
t.Errorf("each list item %#v: %v", list, err)
}
if len(expectObjectNames) != 0 {
t.Fatal("expectObjectNames should be empty")
}
})
t.Run("EachListItemWithAlloc", func(t *testing.T) {
expectObjectNames := map[string]struct{}{}
for i := 0; i < tc.expectObjectNum; i++ {
expectObjectNames[strconv.Itoa(i)] = struct{}{}
}
list := tc.generateFunc(tc.expectObjectNum)
err := EachListItemWithAlloc(list, func(object runtime.Object) error {
o, err := Accessor(object)
if err != nil {
return err
}
delete(expectObjectNames, o.GetName())
return nil
})
if err != nil {
t.Errorf("each list %#v with alloc: %v", list, err)
}
if len(expectObjectNames) != 0 {
t.Fatal("expectObjectNames should be empty")
}
})
})
}
}
func TestExtractList(t *testing.T) {
tests := []struct {
name string
generateFunc func(num int) (list runtime.Object)
expectObjectNum int
}{
{
name: "StructReceiverList",
generateFunc: func(num int) (list runtime.Object) {
return fakeFooList(num)
},
expectObjectNum: fakeObjectItemsNum,
},
{
name: "PointerReceiverList",
generateFunc: func(num int) (list runtime.Object) {
return fakeSampleList(num)
},
expectObjectNum: fakeObjectItemsNum,
},
{
name: "RawExtensionList",
generateFunc: func(num int) (list runtime.Object) {
return fakeExtensionList(num)
},
expectObjectNum: fakeObjectItemsNum,
},
{
name: "UnstructuredList",
generateFunc: func(num int) (list runtime.Object) {
return fakeUnstructuredList(fakeObjectItemsNum)
},
expectObjectNum: fakeObjectItemsNum,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Run("ExtractList", func(t *testing.T) {
expectObjectNames := map[string]struct{}{}
for i := 0; i < tc.expectObjectNum; i++ {
expectObjectNames[strconv.Itoa(i)] = struct{}{}
}
list := tc.generateFunc(tc.expectObjectNum)
objs, err := ExtractList(list)
if err != nil {
t.Fatalf("extract list %#v: %v", list, err)
}
for i := range objs {
var (
o metav1.Object
err error
obj = objs[i]
)
if reflect.TypeOf(obj).Kind() == reflect.Struct {
copy := reflect.New(reflect.TypeOf(obj))
copy.Elem().Set(reflect.ValueOf(obj))
o, err = Accessor(copy.Interface())
} else {
o, err = Accessor(obj)
}
if err != nil {
t.Fatalf("Accessor object %#v: %v", obj, err)
}
delete(expectObjectNames, o.GetName())
}
if len(expectObjectNames) != 0 {
t.Fatal("expectObjectNames should be empty")
}
})
t.Run("ExtractListWithAlloc", func(t *testing.T) {
expectObjectNames := map[string]struct{}{}
for i := 0; i < tc.expectObjectNum; i++ {
expectObjectNames[strconv.Itoa(i)] = struct{}{}
}
list := tc.generateFunc(tc.expectObjectNum)
objs, err := ExtractListWithAlloc(list)
if err != nil {
t.Fatalf("extract list with alloc: %v", err)
}
for i := range objs {
var (
o metav1.Object
err error
obj = objs[i]
)
if reflect.TypeOf(obj).Kind() == reflect.Struct {
copy := reflect.New(reflect.TypeOf(obj))
copy.Elem().Set(reflect.ValueOf(obj))
o, err = Accessor(copy.Interface())
} else {
o, err = Accessor(obj)
}
if err != nil {
t.Fatalf("Accessor object %#v: %v", obj, err)
}
delete(expectObjectNames, o.GetName())
}
if len(expectObjectNames) != 0 {
t.Fatal("expectObjectNames should be empty")
}
})
})
}
}
func BenchmarkExtractListItem(b *testing.B) {
tests := []struct {
name string
list runtime.Object
}{
{
name: "StructReceiverList",
list: fakeFooList(fakeObjectItemsNum),
},
{
name: "PointerReceiverList",
list: fakeSampleList(fakeObjectItemsNum),
},
{
name: "RawExtensionList",
list: fakeExtensionList(fakeObjectItemsNum),
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := ExtractList(tc.list)
if err != nil {
b.Fatalf("ExtractList: %v", err)
}
}
b.StopTimer()
})
}
}
func BenchmarkEachListItem(b *testing.B) {
tests := []struct {
name string
list runtime.Object
}{
{
name: "StructReceiverList",
list: fakeFooList(fakeObjectItemsNum),
},
{
name: "PointerReceiverList",
list: fakeSampleList(fakeObjectItemsNum),
},
{
name: "RawExtensionList",
list: fakeExtensionList(fakeObjectItemsNum),
},
{
name: "UnstructuredList",
list: fakeUnstructuredList(fakeObjectItemsNum),
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := EachListItem(tc.list, func(object runtime.Object) error {
return nil
})
if err != nil {
b.Fatalf("EachListItem: %v", err)
}
}
b.StopTimer()
})
}
}
func BenchmarkExtractListItemWithAlloc(b *testing.B) {
tests := []struct {
name string
list runtime.Object
}{
{
name: "StructReceiverList",
list: fakeFooList(fakeObjectItemsNum),
},
{
name: "PointerReceiverList",
list: fakeSampleList(fakeObjectItemsNum),
},
{
name: "RawExtensionList",
list: fakeExtensionList(fakeObjectItemsNum),
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := ExtractListWithAlloc(tc.list)
if err != nil {
b.Fatalf("ExtractListWithAlloc: %v", err)
}
}
b.StopTimer()
})
}
}
func BenchmarkEachListItemWithAlloc(b *testing.B) {
tests := []struct {
name string
list runtime.Object
}{
{
name: "StructReceiverList",
list: fakeFooList(fakeObjectItemsNum),
},
{
name: "PointerReceiverList",
list: fakeSampleList(fakeObjectItemsNum),
},
{
name: "RawExtensionList",
list: fakeExtensionList(fakeObjectItemsNum),
},
{
name: "UnstructuredList",
list: fakeUnstructuredList(fakeObjectItemsNum),
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := EachListItemWithAlloc(tc.list, func(object runtime.Object) error {
return nil
})
if err != nil {
b.Fatalf("EachListItemWithAlloc: %v", err)
}
}
b.StopTimer()
})
}
}

View File

@ -101,6 +101,11 @@ func (obj *Unstructured) EachListItem(fn func(runtime.Object) error) error {
return nil
}
func (obj *Unstructured) EachListItemWithAlloc(fn func(runtime.Object) error) error {
// EachListItem has allocated a new Object for the user, we can use it directly.
return obj.EachListItem(fn)
}
func (obj *Unstructured) UnstructuredContent() map[string]interface{} {
if obj.Object == nil {
return make(map[string]interface{})

View File

@ -52,6 +52,10 @@ func (u *UnstructuredList) EachListItem(fn func(runtime.Object) error) error {
return nil
}
func (u *UnstructuredList) EachListItemWithAlloc(fn func(runtime.Object) error) error {
return u.EachListItem(fn)
}
// NewEmptyInstance returns a new instance of the concrete type containing only kind/apiVersion and no other data.
// This should be called instead of reflect.New() for unstructured types because the go type alone does not preserve kind/apiVersion info.
func (u *UnstructuredList) NewEmptyInstance() runtime.Unstructured {

View File

@ -365,4 +365,9 @@ type Unstructured interface {
// error should terminate the iteration. If IsList() returns false, this method should return an error
// instead of calling the provided function.
EachListItem(func(Object) error) error
// EachListItemWithAlloc works like EachListItem, but avoids retaining references to a slice of items.
// It does this by making a shallow copy of non-pointer items before passing them to fn.
//
// If the items passed to fn are not retained, or are retained for the same duration, use EachListItem instead for memory efficiency.
EachListItemWithAlloc(func(Object) error) error
}

View File

@ -261,6 +261,11 @@ func (obj *Unstructured) EachListItem(fn func(runtime.Object) error) error {
return nil
}
func (obj *Unstructured) EachListItemWithAlloc(fn func(runtime.Object) error) error {
// EachListItem has allocated a new Object for the user, we can use it directly.
return obj.EachListItem(fn)
}
func (obj *Unstructured) NewEmptyInstance() runtime.Unstructured {
out := new(Unstructured)
if obj != nil {

View File

@ -399,7 +399,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
pager.PageSize = 0
}
list, paginatedResult, err = pager.List(context.Background(), options)
list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
// Retry immediately if the resource version used to list is unavailable.
@ -408,7 +408,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
// resource version it is listing at is expired or the cache may not yet be synced to the provided
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
// the reflector makes forward progress.
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
list, paginatedResult, err = pager.ListWithAlloc(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
}()
@ -446,7 +446,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
items, err := meta.ExtractList(list)
items, err := meta.ExtractListWithAlloc(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
}

View File

@ -17,10 +17,12 @@ limitations under the License.
package cache
import (
"context"
"errors"
"fmt"
"math/rand"
"reflect"
goruntime "runtime"
"strconv"
"syscall"
"testing"
@ -28,10 +30,13 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/utils/clock"
@ -1089,3 +1094,493 @@ func TestReflectorResourceVersionUpdate(t *testing.T) {
t.Errorf("Expected series of resource version updates of %#v but got: %#v", expectedRVs, s.resourceVersions)
}
}
const (
fakeItemsNum = 100
exemptObjectIndex = fakeItemsNum / 4
pageNum = 3
)
func getPodListItems(start int, numItems int) (string, string, *v1.PodList) {
out := &v1.PodList{
Items: make([]v1.Pod, numItems),
}
for i := 0; i < numItems; i++ {
out.Items[i] = v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", i+start),
Namespace: "default",
Labels: map[string]string{
"label-key-1": "label-value-1",
},
Annotations: map[string]string{
"annotations-key-1": "annotations-value-1",
},
},
Spec: v1.PodSpec{
Overhead: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("3"),
v1.ResourceMemory: resource.MustParse("8"),
},
NodeSelector: map[string]string{
"foo": "bar",
"baz": "quux",
},
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{MatchExpressions: []v1.NodeSelectorRequirement{{Key: `foo`}}},
},
},
PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
{Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{{Key: `foo`}}}},
},
},
},
TopologySpreadConstraints: []v1.TopologySpreadConstraint{
{TopologyKey: `foo`},
},
HostAliases: []v1.HostAlias{
{IP: "1.1.1.1"},
{IP: "2.2.2.2"},
},
ImagePullSecrets: []v1.LocalObjectReference{
{Name: "secret1"},
{Name: "secret2"},
},
Containers: []v1.Container{
{
Name: "foobar",
Image: "alpine",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("2"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("10"),
},
},
},
{
Name: "foobar2",
Image: "alpine",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("4"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("12"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("8"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("24"),
},
},
},
},
InitContainers: []v1.Container{
{
Name: "small-init",
Image: "alpine",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
},
},
},
{
Name: "big-init",
Image: "alpine",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("40"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("120"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("80"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("240"),
},
},
},
},
Hostname: fmt.Sprintf("node-%d", i),
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
ContainerID: "docker://numbers",
Image: "alpine",
Name: "foobar",
Ready: false,
},
{
ContainerID: "docker://numbers",
Image: "alpine",
Name: "foobar2",
Ready: false,
},
},
InitContainerStatuses: []v1.ContainerStatus{
{
ContainerID: "docker://numbers",
Image: "alpine",
Name: "small-init",
Ready: false,
},
{
ContainerID: "docker://numbers",
Image: "alpine",
Name: "big-init",
Ready: false,
},
},
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionTrue,
Reason: "successfully",
Message: "sync pod successfully",
LastProbeTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
},
},
},
}
}
return out.Items[0].GetName(), out.Items[exemptObjectIndex].GetName(), out
}
func getConfigmapListItems(start int, numItems int) (string, string, *v1.ConfigMapList) {
out := &v1.ConfigMapList{
Items: make([]v1.ConfigMap, numItems),
}
for i := 0; i < numItems; i++ {
out.Items[i] = v1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ConfigMap",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("cm-%d", i+start),
Namespace: "default",
Labels: map[string]string{
"label-key-1": "label-value-1",
},
Annotations: map[string]string{
"annotations-key-1": "annotations-value-1",
},
},
Data: map[string]string{
"data-1": "value-1",
"data-2": "value-2",
},
}
}
return out.Items[0].GetName(), out.Items[exemptObjectIndex].GetName(), out
}
type TestPagingPodsLW struct {
totalPageCount int
fetchedPageCount int
detectedObjectNameList []string
exemptObjectNameList []string
}
func newPageTestLW(totalPageNum int) *TestPagingPodsLW {
return &TestPagingPodsLW{
totalPageCount: totalPageNum,
fetchedPageCount: 0,
}
}
func (t *TestPagingPodsLW) List(options metav1.ListOptions) (runtime.Object, error) {
firstPodName, exemptPodName, list := getPodListItems(t.fetchedPageCount*fakeItemsNum, fakeItemsNum)
t.detectedObjectNameList = append(t.detectedObjectNameList, firstPodName)
t.exemptObjectNameList = append(t.exemptObjectNameList, exemptPodName)
t.fetchedPageCount++
if t.fetchedPageCount >= t.totalPageCount {
return list, nil
}
list.SetContinue("true")
return list, nil
}
func (t *TestPagingPodsLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
return nil, nil
}
func TestReflectorListExtract(t *testing.T) {
store := NewStore(func(obj interface{}) (string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return "", fmt.Errorf("expect *v1.Pod, but got %T", obj)
}
return pod.GetName(), nil
})
lw := newPageTestLW(5)
reflector := NewReflector(lw, &v1.Pod{}, store, 0)
reflector.WatchListPageSize = fakeItemsNum
// execute list to fill store
stopCh := make(chan struct{})
if err := reflector.list(stopCh); err != nil {
t.Fatal(err)
}
// We will not delete exemptPod,
// in order to see if the existence of this Pod causes other Pods that are not used to be unable to properly clear.
for _, podName := range lw.exemptObjectNameList {
_, exist, err := store.GetByKey(podName)
if err != nil || !exist {
t.Fatalf("%s should exist in pod store", podName)
}
}
// we will pay attention to whether the memory occupied by the first Pod is released
// Golang's can only be SetFinalizer for the first element of the array,
// so pod-0 will be the object of our attention
detectedPodAlreadyBeCleared := make(chan struct{}, len(lw.detectedObjectNameList))
for _, firstPodName := range lw.detectedObjectNameList {
_, exist, err := store.GetByKey(firstPodName)
if err != nil || !exist {
t.Fatalf("%s should exist in pod store", firstPodName)
}
firstPod, exist, err := store.GetByKey(firstPodName)
if err != nil || !exist {
t.Fatalf("%s should exist in pod store", firstPodName)
}
goruntime.SetFinalizer(firstPod, func(obj interface{}) {
t.Logf("%s already be gc\n", obj.(*v1.Pod).GetName())
detectedPodAlreadyBeCleared <- struct{}{}
})
}
storedObjectKeys := store.ListKeys()
for _, k := range storedObjectKeys {
// delete all Pods except the exempted Pods.
if sets.NewString(lw.exemptObjectNameList...).Has(k) {
continue
}
obj, exist, err := store.GetByKey(k)
if err != nil || !exist {
t.Fatalf("%s should exist in pod store", k)
}
if err := store.Delete(obj); err != nil {
t.Fatalf("delete object: %v", err)
}
goruntime.GC()
}
clearedNum := 0
for {
select {
case <-detectedPodAlreadyBeCleared:
clearedNum++
if clearedNum == len(lw.detectedObjectNameList) {
return
}
}
}
}
func BenchmarkExtractList(b *testing.B) {
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
tests := []struct {
name string
list runtime.Object
}{
{
name: "PodList",
list: podList,
},
{
name: "ConfigMapList",
list: configMapList,
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := meta.ExtractList(tc.list)
if err != nil {
b.Errorf("extract list: %v", err)
}
}
b.StopTimer()
})
}
}
func BenchmarkEachListItem(b *testing.B) {
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
tests := []struct {
name string
list runtime.Object
}{
{
name: "PodList",
list: podList,
},
{
name: "ConfigMapList",
list: configMapList,
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := meta.EachListItem(tc.list, func(object runtime.Object) error {
return nil
})
if err != nil {
b.Errorf("each list: %v", err)
}
}
b.StopTimer()
})
}
}
func BenchmarkExtractListWithAlloc(b *testing.B) {
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
tests := []struct {
name string
list runtime.Object
}{
{
name: "PodList",
list: podList,
},
{
name: "ConfigMapList",
list: configMapList,
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := meta.ExtractListWithAlloc(tc.list)
if err != nil {
b.Errorf("extract list with alloc: %v", err)
}
}
b.StopTimer()
})
}
}
func BenchmarkEachListItemWithAlloc(b *testing.B) {
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
tests := []struct {
name string
list runtime.Object
}{
{
name: "PodList",
list: podList,
},
{
name: "ConfigMapList",
list: configMapList,
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := meta.EachListItemWithAlloc(tc.list, func(object runtime.Object) error {
return nil
})
if err != nil {
b.Errorf("each list with alloc: %v", err)
}
}
b.StopTimer()
})
}
}
func BenchmarkReflectorList(b *testing.B) {
ctx, cancel := context.WithTimeout(context.Background(), wait.ForeverTestTimeout)
defer cancel()
store := NewStore(func(obj interface{}) (string, error) {
o, err := meta.Accessor(obj)
if err != nil {
return "", err
}
return o.GetName(), nil
})
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
tests := []struct {
name string
sample func() interface{}
list runtime.Object
}{
{
name: "PodList",
sample: func() interface{} {
return v1.Pod{}
},
list: podList,
},
{
name: "ConfigMapList",
sample: func() interface{} {
return v1.ConfigMap{}
},
list: configMapList,
},
}
for _, tc := range tests {
b.Run(tc.name, func(b *testing.B) {
sample := tc.sample()
reflector := NewReflector(newPageTestLW(pageNum), &sample, store, 0)
reflector.WatchListPageSize = fakeItemsNum
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := reflector.list(ctx.Done())
if err != nil {
b.Fatalf("reflect list: %v", err)
}
}
b.StopTimer()
})
}
}

View File

@ -73,7 +73,23 @@ func New(fn ListPageFunc) *ListPager {
// List returns a single list object, but attempts to retrieve smaller chunks from the
// server to reduce the impact on the server. If the chunk attempt fails, it will load
// the full list instead. The Limit field on options, if unset, will default to the page size.
//
// If items in the returned list are retained for different durations, and you want to avoid
// retaining the whole slice returned by p.PageFn as long as any item is referenced,
// use ListWithAlloc instead.
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
return p.list(ctx, options, false)
}
// ListWithAlloc works like List, but avoids retaining references to the items slice returned by p.PageFn.
// It does this by making a shallow copy of non-pointer items in the slice returned by p.PageFn.
//
// If the items in the returned list are not retained, or are retained for the same duration, use List instead for memory efficiency.
func (p *ListPager) ListWithAlloc(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
return p.list(ctx, options, true)
}
func (p *ListPager) list(ctx context.Context, options metav1.ListOptions, allocNew bool) (runtime.Object, bool, error) {
if options.Limit == 0 {
options.Limit = p.PageSize
}
@ -123,7 +139,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
list.ResourceVersion = m.GetResourceVersion()
list.SelfLink = m.GetSelfLink()
}
if err := meta.EachListItem(obj, func(obj runtime.Object) error {
eachListItemFunc := meta.EachListItem
if allocNew {
eachListItemFunc = meta.EachListItemWithAlloc
}
if err := eachListItemFunc(obj, func(obj runtime.Object) error {
list.Items = append(list.Items, obj)
return nil
}); err != nil {
@ -156,12 +176,26 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
//
// Items are retrieved in chunks from the server to reduce the impact on the server with up to
// ListPager.PageBufferSize chunks buffered concurrently in the background.
//
// If items passed to fn are retained for different durations, and you want to avoid
// retaining the whole slice returned by p.PageFn as long as any item is referenced,
// use EachListItemWithAlloc instead.
func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
return meta.EachListItem(obj, fn)
})
}
// EachListItemWithAlloc works like EachListItem, but avoids retaining references to the items slice returned by p.PageFn.
// It does this by making a shallow copy of non-pointer items in the slice returned by p.PageFn.
//
// If the items passed to fn are not retained, or are retained for the same duration, use EachListItem instead for memory efficiency.
func (p *ListPager) EachListItemWithAlloc(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
return meta.EachListItemWithAlloc(obj, fn)
})
}
// eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on
// each list chunk. If fn returns an error, processing stops and that error is returned. If fn does
// not return an error, any error encountered while retrieving the list from the server is