Merge pull request #75389 from jpbetz/pagination-v1

Paginate watch cache->etcd List calls & reflector init/resync List calls not served by watch cache

Kubernetes-commit: c79fbabf234bea36f7b870da8e763c542c804be0
This commit is contained in:
Kubernetes Publisher 2019-04-03 11:14:25 -07:00
commit 1300bc81aa
2 changed files with 59 additions and 2 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache package cache
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -38,6 +39,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/pager"
"k8s.io/klog" "k8s.io/klog"
"k8s.io/utils/trace" "k8s.io/utils/trace"
) )
@ -68,6 +70,9 @@ type Reflector struct {
lastSyncResourceVersion string lastSyncResourceVersion string
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// Defaults to pager.PageSize.
WatchListPageSize int64
} }
var ( var (
@ -179,7 +184,16 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
panicCh <- r panicCh <- r
} }
}() }()
list, err = r.listerWatcher.List(options) // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
if r.WatchListPageSize != 0 {
pager.PageSize = r.WatchListPageSize
}
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
list, err = pager.List(context.Background(), options)
close(listCh) close(listCh)
}() }()
select { select {

View File

@ -24,7 +24,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -387,3 +387,46 @@ func TestReflectorResync(t *testing.T) {
t.Errorf("exactly 2 iterations were expected, got: %v", iteration) t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
} }
} }
func TestReflectorWatchListPageSize(t *testing.T) {
stopCh := make(chan struct{})
s := NewStore(MetaNamespaceKeyFunc)
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
close(stopCh)
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if options.Limit != 4 {
t.Fatalf("Expected list Limit of 4 but got %d", options.Limit)
}
pods := make([]v1.Pod, 10)
for i := 0; i < 10; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
switch options.Continue {
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
case "C1":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
case "C2":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
default:
t.Fatalf("Unrecognized continue: %s", options.Continue)
}
return nil, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
// Set the reflector to paginate the list request in 4 item chunks.
r.WatchListPageSize = 4
r.ListAndWatch(stopCh)
results := s.List()
if len(results) != 10 {
t.Errorf("Expected 10 results, got %d", len(results))
}
}