mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-18 19:37:36 +00:00
Signed-off-by: Abirdcfly <fp544037857@gmail.com> Kubernetes-commit: 5e84f6b6d7cd39b785eaca404c3f7e460f80f71b
472 lines
14 KiB
Go
472 lines
14 KiB
Go
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package pager
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
)
|
|
|
|
func list(count int, rv string) *metainternalversion.List {
|
|
var list metainternalversion.List
|
|
for i := 0; i < count; i++ {
|
|
list.Items = append(list.Items, &metav1beta1.PartialObjectMetadata{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: fmt.Sprintf("%d", i),
|
|
},
|
|
})
|
|
}
|
|
list.ResourceVersion = rv
|
|
return &list
|
|
}
|
|
|
|
// testPager is a fake paginated list backend for the pager tests. It serves
// items named after their position in the list and records enough state to
// check that every list call carries the expected limit and continue token.
type testPager struct {
	t *testing.T // receives a failure whenever a list call violates an invariant

	rv         string // resource version stamped on every page; also the continue-token prefix
	expectPage int64  // limit that every list call is required to carry

	index     int // number of items served so far; also the name of the next item
	remaining int // number of items still waiting to be served
	last      int // value of index at the end of the most recent page

	continuing bool // set once a page with a continue token has been returned
	done       bool // set once the final page was served or the list expired; later calls are rejected
}
|
|
|
|
func (p *testPager) reset() {
|
|
p.continuing = false
|
|
p.remaining += p.index
|
|
p.index = 0
|
|
p.last = 0
|
|
p.done = false
|
|
}
|
|
|
|
func (p *testPager) PagedList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
|
|
if p.done {
|
|
p.t.Errorf("did not expect additional call to paged list")
|
|
return nil, fmt.Errorf("unexpected list call")
|
|
}
|
|
expectedContinue := fmt.Sprintf("%s:%d", p.rv, p.last)
|
|
if options.Limit != p.expectPage || (p.continuing && options.Continue != expectedContinue) {
|
|
p.t.Errorf("invariant violated, expected limit %d and continue %s, got %#v", p.expectPage, expectedContinue, options)
|
|
return nil, fmt.Errorf("invariant violated")
|
|
}
|
|
if options.Continue != "" && options.ResourceVersion != "" {
|
|
p.t.Errorf("invariant violated, specifying resource version (%s) is not allowed when using continue (%s).", options.ResourceVersion, options.Continue)
|
|
return nil, fmt.Errorf("invariant violated")
|
|
}
|
|
if options.Continue != "" && options.ResourceVersionMatch != "" {
|
|
p.t.Errorf("invariant violated, specifying resource version match type (%s) is not allowed when using continue (%s).", options.ResourceVersionMatch, options.Continue)
|
|
return nil, fmt.Errorf("invariant violated")
|
|
}
|
|
var list metainternalversion.List
|
|
total := options.Limit
|
|
if total == 0 {
|
|
total = int64(p.remaining)
|
|
}
|
|
for i := int64(0); i < total; i++ {
|
|
if p.remaining <= 0 {
|
|
break
|
|
}
|
|
list.Items = append(list.Items, &metav1beta1.PartialObjectMetadata{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: fmt.Sprintf("%d", p.index),
|
|
},
|
|
})
|
|
p.remaining--
|
|
p.index++
|
|
}
|
|
p.last = p.index
|
|
if p.remaining > 0 {
|
|
list.Continue = fmt.Sprintf("%s:%d", p.rv, p.last)
|
|
p.continuing = true
|
|
} else {
|
|
p.done = true
|
|
}
|
|
list.ResourceVersion = p.rv
|
|
return &list, nil
|
|
}
|
|
|
|
func (p *testPager) ExpiresOnSecondPage(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
|
|
if p.continuing {
|
|
p.done = true
|
|
return nil, errors.NewResourceExpired("this list has expired")
|
|
}
|
|
return p.PagedList(ctx, options)
|
|
}
|
|
|
|
func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
|
|
if p.continuing {
|
|
p.reset()
|
|
p.expectPage = 0
|
|
return nil, errors.NewResourceExpired("this list has expired")
|
|
}
|
|
return p.PagedList(ctx, options)
|
|
}
|
|
|
|
func TestListPager_List(t *testing.T) {
|
|
type fields struct {
|
|
PageSize int64
|
|
PageFn ListPageFunc
|
|
FullListIfExpired bool
|
|
}
|
|
type args struct {
|
|
ctx context.Context
|
|
options metav1.ListOptions
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
args args
|
|
want runtime.Object
|
|
wantPaged bool
|
|
wantErr bool
|
|
isExpired bool
|
|
}{
|
|
{
|
|
name: "empty page",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
|
|
args: args{},
|
|
want: list(0, "rv:20"),
|
|
wantPaged: false,
|
|
},
|
|
{
|
|
name: "one page",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
|
|
args: args{},
|
|
want: list(9, "rv:20"),
|
|
wantPaged: false,
|
|
},
|
|
{
|
|
name: "one full page",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
|
|
args: args{},
|
|
want: list(10, "rv:20"),
|
|
wantPaged: false,
|
|
},
|
|
{
|
|
name: "two pages",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
|
|
args: args{},
|
|
want: list(11, "rv:20"),
|
|
wantPaged: true,
|
|
},
|
|
{
|
|
name: "three pages",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
|
|
args: args{},
|
|
want: list(21, "rv:20"),
|
|
wantPaged: true,
|
|
},
|
|
{
|
|
name: "expires on second page",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
|
|
args: args{},
|
|
wantPaged: true,
|
|
wantErr: true,
|
|
isExpired: true,
|
|
},
|
|
{
|
|
name: "expires on second page and then lists",
|
|
fields: fields{
|
|
FullListIfExpired: true,
|
|
PageSize: 10,
|
|
PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPageThenFullList,
|
|
},
|
|
args: args{},
|
|
want: list(21, "rv:20"),
|
|
wantPaged: true,
|
|
},
|
|
{
|
|
name: "two pages with resourceVersion",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
|
|
args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}},
|
|
want: list(11, "rv:20"),
|
|
wantPaged: true,
|
|
},
|
|
{
|
|
name: "two pages with resourceVersion and resourceVersionMatch",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
|
|
args: args{options: metav1.ListOptions{ResourceVersion: "rv:10", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan}},
|
|
want: list(11, "rv:20"),
|
|
wantPaged: true,
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
p := &ListPager{
|
|
PageSize: tt.fields.PageSize,
|
|
PageFn: tt.fields.PageFn,
|
|
FullListIfExpired: tt.fields.FullListIfExpired,
|
|
}
|
|
ctx := tt.args.ctx
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
got, paginatedResult, err := p.List(ctx, tt.args.options)
|
|
if (err != nil) != tt.wantErr {
|
|
t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr)
|
|
return
|
|
}
|
|
if tt.isExpired != errors.IsResourceExpired(err) {
|
|
t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired)
|
|
return
|
|
}
|
|
if tt.wantPaged != paginatedResult {
|
|
t.Errorf("paginatedResult = %t, want %t", paginatedResult, tt.wantPaged)
|
|
}
|
|
if !reflect.DeepEqual(got, tt.want) {
|
|
t.Errorf("ListPager.List() = %v, want %v", got, tt.want)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestListPager_EachListItem(t *testing.T) {
|
|
type fields struct {
|
|
PageSize int64
|
|
PageFn ListPageFunc
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
want runtime.Object
|
|
wantErr bool
|
|
wantPanic bool
|
|
isExpired bool
|
|
processorErrorOnItem int
|
|
processorPanicOnItem int
|
|
cancelContextOnItem int
|
|
}{
|
|
{
|
|
name: "empty page",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
|
|
want: list(0, "rv:20"),
|
|
},
|
|
{
|
|
name: "one page",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
|
|
want: list(9, "rv:20"),
|
|
},
|
|
{
|
|
name: "one full page",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
|
|
want: list(10, "rv:20"),
|
|
},
|
|
{
|
|
name: "two pages",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
|
|
want: list(11, "rv:20"),
|
|
},
|
|
{
|
|
name: "three pages",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
|
|
want: list(21, "rv:20"),
|
|
},
|
|
{
|
|
name: "expires on second page",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
|
|
want: list(10, "rv:20"), // all items on the first page should have been visited
|
|
wantErr: true,
|
|
isExpired: true,
|
|
},
|
|
{
|
|
name: "error processing item",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
|
|
want: list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited
|
|
wantPanic: true,
|
|
processorPanicOnItem: 3,
|
|
},
|
|
{
|
|
name: "cancel context while processing",
|
|
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
|
|
want: list(10, "rv:20"), // The whole PageSize worth of items got returned.
|
|
wantErr: true,
|
|
cancelContextOnItem: 3,
|
|
},
|
|
}
|
|
|
|
processorErr := fmt.Errorf("processor error")
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
p := &ListPager{
|
|
PageSize: tt.fields.PageSize,
|
|
PageFn: tt.fields.PageFn,
|
|
}
|
|
var items []runtime.Object
|
|
|
|
fn := func(obj runtime.Object) error {
|
|
items = append(items, obj)
|
|
if tt.processorErrorOnItem > 0 && len(items) == tt.processorErrorOnItem {
|
|
return processorErr
|
|
}
|
|
if tt.processorPanicOnItem > 0 && len(items) == tt.processorPanicOnItem {
|
|
panic(processorErr)
|
|
}
|
|
if tt.cancelContextOnItem > 0 && len(items) == tt.cancelContextOnItem {
|
|
cancel()
|
|
}
|
|
return nil
|
|
}
|
|
var err error
|
|
var panic interface{}
|
|
func() {
|
|
defer func() {
|
|
panic = recover()
|
|
}()
|
|
err = p.EachListItem(ctx, metav1.ListOptions{}, fn)
|
|
}()
|
|
if (panic != nil) && !tt.wantPanic {
|
|
t.Errorf(".EachListItem() panic = %v, wantPanic %v", panic, tt.wantPanic)
|
|
return
|
|
}
|
|
if (err != nil) != tt.wantErr {
|
|
t.Errorf("ListPager.EachListItem() error = %v, wantErr %v", err, tt.wantErr)
|
|
return
|
|
}
|
|
if tt.isExpired != errors.IsResourceExpired(err) {
|
|
t.Errorf("ListPager.EachListItem() error = %v, isExpired %v", err, tt.isExpired)
|
|
return
|
|
}
|
|
if tt.processorErrorOnItem > 0 && err != processorErr {
|
|
t.Errorf("ListPager.EachListItem() error = %v, processorErrorOnItem %d", err, tt.processorErrorOnItem)
|
|
return
|
|
}
|
|
l := tt.want.(*metainternalversion.List)
|
|
if !reflect.DeepEqual(items, l.Items) {
|
|
t.Errorf("ListPager.EachListItem() = %v, want %v", items, l.Items)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestListPager_eachListPageBuffered(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
totalPages int
|
|
pagesProcessed int
|
|
wantPageLists int
|
|
pageBufferSize int32
|
|
pageSize int
|
|
}{
|
|
{
|
|
name: "no buffer, one total page",
|
|
totalPages: 1,
|
|
pagesProcessed: 1,
|
|
wantPageLists: 1,
|
|
pageBufferSize: 0,
|
|
}, {
|
|
name: "no buffer, 1/5 pages processed",
|
|
totalPages: 5,
|
|
pagesProcessed: 1,
|
|
wantPageLists: 2, // 1 received for processing, 1 listed
|
|
pageBufferSize: 0,
|
|
},
|
|
{
|
|
name: "no buffer, 2/5 pages processed",
|
|
totalPages: 5,
|
|
pagesProcessed: 2,
|
|
wantPageLists: 3,
|
|
pageBufferSize: 0,
|
|
},
|
|
{
|
|
name: "no buffer, 5/5 pages processed",
|
|
totalPages: 5,
|
|
pagesProcessed: 5,
|
|
wantPageLists: 5,
|
|
pageBufferSize: 0,
|
|
},
|
|
{
|
|
name: "size 1 buffer, 1/5 pages processed",
|
|
totalPages: 5,
|
|
pagesProcessed: 1,
|
|
wantPageLists: 3,
|
|
pageBufferSize: 1,
|
|
},
|
|
{
|
|
name: "size 1 buffer, 5/5 pages processed",
|
|
totalPages: 5,
|
|
pagesProcessed: 5,
|
|
wantPageLists: 5,
|
|
pageBufferSize: 1,
|
|
},
|
|
{
|
|
name: "size 10 buffer, 1/5 page processed",
|
|
totalPages: 5,
|
|
pagesProcessed: 1,
|
|
wantPageLists: 5,
|
|
pageBufferSize: 10, // buffer is larger than list
|
|
},
|
|
}
|
|
processorErr := fmt.Errorf("processor error")
|
|
pageSize := 10
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
pgr := &testPager{t: t, expectPage: int64(pageSize), remaining: tt.totalPages * pageSize, rv: "rv:20"}
|
|
pageLists := 0
|
|
wantedPageListsDone := make(chan struct{})
|
|
listFn := func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
|
|
pageLists++
|
|
if pageLists == tt.wantPageLists {
|
|
close(wantedPageListsDone)
|
|
}
|
|
return pgr.PagedList(ctx, options)
|
|
}
|
|
p := &ListPager{
|
|
PageSize: int64(pageSize),
|
|
PageBufferSize: tt.pageBufferSize,
|
|
PageFn: listFn,
|
|
}
|
|
|
|
pagesProcessed := 0
|
|
fn := func(obj runtime.Object) error {
|
|
pagesProcessed++
|
|
if tt.pagesProcessed == pagesProcessed && tt.wantPageLists > 0 {
|
|
// wait for buffering to catch up
|
|
select {
|
|
case <-time.After(time.Second):
|
|
return fmt.Errorf("Timed out waiting for %d page lists", tt.wantPageLists)
|
|
case <-wantedPageListsDone:
|
|
}
|
|
return processorErr
|
|
}
|
|
return nil
|
|
}
|
|
err := p.eachListChunkBuffered(context.Background(), metav1.ListOptions{}, fn)
|
|
if tt.pagesProcessed > 0 && err == processorErr {
|
|
// expected
|
|
} else if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if tt.wantPageLists > 0 && pageLists != tt.wantPageLists {
|
|
t.Errorf("expected %d page lists, got %d", tt.wantPageLists, pageLists)
|
|
}
|
|
if pagesProcessed != tt.pagesProcessed {
|
|
t.Errorf("expected %d pages processed, got %d", tt.pagesProcessed, pagesProcessed)
|
|
}
|
|
})
|
|
}
|
|
}
|