mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-27 07:28:14 +00:00
Paginate List call performed by Reflector's ListAndWatch call
Kubernetes-commit: 84723c2d3ef5ff5d30aadd1ad72068bf2254358c
This commit is contained in:
parent
75debb4b68
commit
d858f18787
15
tools/cache/reflector.go
vendored
15
tools/cache/reflector.go
vendored
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -38,6 +39,7 @@ import (
|
|||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
"k8s.io/client-go/tools/pager"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
"k8s.io/utils/trace"
|
"k8s.io/utils/trace"
|
||||||
)
|
)
|
||||||
@ -68,6 +70,9 @@ type Reflector struct {
|
|||||||
lastSyncResourceVersion string
|
lastSyncResourceVersion string
|
||||||
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
||||||
lastSyncResourceVersionMutex sync.RWMutex
|
lastSyncResourceVersionMutex sync.RWMutex
|
||||||
|
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
|
||||||
|
// Defaults to pager.PageSize.
|
||||||
|
WatchListPageSize int64
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -179,7 +184,15 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
panicCh <- r
|
panicCh <- r
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
list, err = r.listerWatcher.List(options)
|
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
|
||||||
|
// list request will return the full response.
|
||||||
|
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
return r.listerWatcher.List(opts)
|
||||||
|
}))
|
||||||
|
if r.WatchListPageSize != 0 {
|
||||||
|
pager.PageSize = r.WatchListPageSize
|
||||||
|
}
|
||||||
|
list, err = pager.List(context.Background(), options)
|
||||||
close(listCh)
|
close(listCh)
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
|
45
tools/cache/reflector_test.go
vendored
45
tools/cache/reflector_test.go
vendored
@ -24,7 +24,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
@ -387,3 +387,46 @@ func TestReflectorResync(t *testing.T) {
|
|||||||
t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
|
t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReflectorWatchListPageSize(t *testing.T) {
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
s := NewStore(MetaNamespaceKeyFunc)
|
||||||
|
|
||||||
|
lw := &testLW{
|
||||||
|
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
// Stop once the reflector begins watching since we're only interested in the list.
|
||||||
|
close(stopCh)
|
||||||
|
fw := watch.NewFake()
|
||||||
|
return fw, nil
|
||||||
|
},
|
||||||
|
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
if options.Limit != 4 {
|
||||||
|
t.Fatalf("Expected list Limit of 4 but got %d", options.Limit)
|
||||||
|
}
|
||||||
|
pods := make([]v1.Pod, 10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
|
||||||
|
}
|
||||||
|
switch options.Continue {
|
||||||
|
case "":
|
||||||
|
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
|
||||||
|
case "C1":
|
||||||
|
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
|
||||||
|
case "C2":
|
||||||
|
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
|
||||||
|
default:
|
||||||
|
t.Fatalf("Unrecognized continue: %s", options.Continue)
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||||
|
// Set the reflector to paginate the list request in 4 item chunks.
|
||||||
|
r.WatchListPageSize = 4
|
||||||
|
r.ListAndWatch(stopCh)
|
||||||
|
|
||||||
|
results := s.List()
|
||||||
|
if len(results) != 10 {
|
||||||
|
t.Errorf("Expected 10 results, got %d", len(results))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user