mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-27 15:39:39 +00:00
Merge pull request #86430 from wojtek-t/avoid_thundering_herd_on_etcd
Avoid thundering herd of relists on etcd Kubernetes-commit: d52ecd5f70cdf5f13f919bab56cf08cd556a2e26
This commit is contained in:
commit
203d909765
61
tools/cache/reflector.go
vendored
61
tools/cache/reflector.go
vendored
@ -75,6 +75,9 @@ type Reflector struct {
|
||||
ShouldResync func() bool
|
||||
// clock allows tests to manipulate time
|
||||
clock clock.Clock
|
||||
// paginatedResult defines whether pagination should be forced for list calls.
|
||||
// It is set based on the result of the initial list call.
|
||||
paginatedResult bool
|
||||
// lastSyncResourceVersion is the resource version token last
|
||||
// observed when doing a sync with the underlying store
|
||||
// it is thread safe, but not synchronized with the underlying store
|
||||
@ -85,7 +88,12 @@ type Reflector struct {
|
||||
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
||||
lastSyncResourceVersionMutex sync.RWMutex
|
||||
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
|
||||
// Defaults to pager.PageSize.
|
||||
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
|
||||
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
|
||||
// it will turn off pagination to allow serving them from watch cache.
|
||||
// NOTE: It should be used carefully as paginated lists are always served directly from
|
||||
// etcd, which is significantly less efficient and may lead to serious performance and
|
||||
// scalability problems.
|
||||
WatchListPageSize int64
|
||||
}
|
||||
|
||||
@ -204,6 +212,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
|
||||
defer initTrace.LogIfLong(10 * time.Second)
|
||||
var list runtime.Object
|
||||
var paginatedResult bool
|
||||
var err error
|
||||
listCh := make(chan struct{}, 1)
|
||||
panicCh := make(chan interface{}, 1)
|
||||
@ -218,11 +227,30 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
|
||||
return r.listerWatcher.List(opts)
|
||||
}))
|
||||
if r.WatchListPageSize != 0 {
|
||||
switch {
|
||||
case r.WatchListPageSize != 0:
|
||||
pager.PageSize = r.WatchListPageSize
|
||||
case r.paginatedResult:
|
||||
// We got a paginated result initially. Assume this resource and server honor
|
||||
// paging requests (i.e. watch cache is probably disabled) and leave the default
|
||||
// pager size set.
|
||||
case options.ResourceVersion != "" && options.ResourceVersion != "0":
|
||||
// User didn't explicitly request pagination.
|
||||
//
|
||||
// With ResourceVersion != "", we have a possibility to list from watch cache,
|
||||
// but we do that (for ResourceVersion != "0") only if Limit is unset.
|
||||
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
|
||||
// switch off pagination to force listing from watch cache (if enabled).
|
||||
// With the existing semantic of RV (result is at least as fresh as provided RV),
|
||||
// this is correct and doesn't lead to going back in time.
|
||||
//
|
||||
// We also don't turn off pagination for ResourceVersion="0", since watch cache
|
||||
// is ignoring Limit in that case anyway, and if watch cache is not enabled
|
||||
// we don't introduce regression.
|
||||
pager.PageSize = 0
|
||||
}
|
||||
|
||||
list, err = pager.List(context.Background(), options)
|
||||
list, paginatedResult, err = pager.List(context.Background(), options)
|
||||
if isExpiredError(err) {
|
||||
r.setIsLastSyncResourceVersionExpired(true)
|
||||
// Retry immediately if the resource version used to list is expired.
|
||||
@ -230,7 +258,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
// continuation pages, but the pager might not be enabled, or the full list might fail because the
|
||||
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
|
||||
// to recover and ensure the reflector makes forward progress.
|
||||
list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
|
||||
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
|
||||
}
|
||||
close(listCh)
|
||||
}()
|
||||
@ -244,6 +272,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
|
||||
}
|
||||
|
||||
// We check if the list was paginated and if so set the paginatedResult based on that.
|
||||
// However, we want to do that only for the initial list (which is the only case
|
||||
// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
|
||||
// situations we may force listing directly from etcd (by setting ResourceVersion="")
|
||||
// which will return paginated result, even if watch cache is enabled. However, in
|
||||
// that case, we still want to prefer sending requests to watch cache if possible.
|
||||
//
|
||||
// Paginated result returned for request with ResourceVersion="0" mean that watch
|
||||
// cache is disabled and there are a lot of objects of a given type. In such case,
|
||||
// there is no need to prefer listing from watch cache.
|
||||
if options.ResourceVersion == "0" && paginatedResult {
|
||||
r.paginatedResult = true
|
||||
}
|
||||
|
||||
r.setIsLastSyncResourceVersionExpired(false) // list was successful
|
||||
initTrace.Step("Objects listed")
|
||||
listMetaInterface, err := meta.ListAccessor(list)
|
||||
@ -320,7 +363,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
if err != nil {
|
||||
switch {
|
||||
case isExpiredError(err):
|
||||
r.setIsLastSyncResourceVersionExpired(true)
|
||||
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
|
||||
// has a semantic that it returns data at least as fresh as provided RV.
|
||||
// So first try to LIST with setting RV to resource version of last observed object.
|
||||
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
||||
case err == io.EOF:
|
||||
// watch closed normally
|
||||
@ -344,8 +389,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
if err != errorStopRequested {
|
||||
switch {
|
||||
case isExpiredError(err):
|
||||
r.setIsLastSyncResourceVersionExpired(true)
|
||||
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
||||
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
|
||||
// has a semantic that it returns data at least as fresh as provided RV.
|
||||
// So first try to LIST with setting RV to resource version of last observed object.
|
||||
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
||||
default:
|
||||
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
||||
}
|
||||
|
88
tools/cache/reflector_test.go
vendored
88
tools/cache/reflector_test.go
vendored
@ -425,6 +425,8 @@ func TestReflectorWatchListPageSize(t *testing.T) {
|
||||
},
|
||||
}
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
// Set resource version to test pagination also for not consistent reads.
|
||||
r.setLastSyncResourceVersion("10")
|
||||
// Set the reflector to paginate the list request in 4 item chunks.
|
||||
r.WatchListPageSize = 4
|
||||
r.ListAndWatch(stopCh)
|
||||
@ -435,6 +437,92 @@ func TestReflectorWatchListPageSize(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
s := NewStore(MetaNamespaceKeyFunc)
|
||||
|
||||
lw := &testLW{
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
// Stop once the reflector begins watching since we're only interested in the list.
|
||||
close(stopCh)
|
||||
fw := watch.NewFake()
|
||||
return fw, nil
|
||||
},
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
if options.ResourceVersion != "10" {
|
||||
t.Fatalf("Expected ResourceVersion: \"10\", got: %s", options.ResourceVersion)
|
||||
}
|
||||
if options.Limit != 0 {
|
||||
t.Fatalf("Expected list Limit of 0 but got %d", options.Limit)
|
||||
}
|
||||
pods := make([]v1.Pod, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
|
||||
}
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods}, nil
|
||||
},
|
||||
}
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
r.setLastSyncResourceVersion("10")
|
||||
r.ListAndWatch(stopCh)
|
||||
|
||||
results := s.List()
|
||||
if len(results) != 10 {
|
||||
t.Errorf("Expected 10 results, got %d", len(results))
|
||||
}
|
||||
}
|
||||
|
||||
func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T) {
|
||||
var stopCh chan struct{}
|
||||
s := NewStore(MetaNamespaceKeyFunc)
|
||||
|
||||
lw := &testLW{
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
// Stop once the reflector begins watching since we're only interested in the list.
|
||||
close(stopCh)
|
||||
fw := watch.NewFake()
|
||||
return fw, nil
|
||||
},
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
// Check that default pager limit is set.
|
||||
if options.Limit != 500 {
|
||||
t.Fatalf("Expected list Limit of 500 but got %d", options.Limit)
|
||||
}
|
||||
pods := make([]v1.Pod, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
|
||||
}
|
||||
switch options.Continue {
|
||||
case "":
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
|
||||
case "C1":
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
|
||||
case "C2":
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
|
||||
default:
|
||||
t.Fatalf("Unrecognized continue: %s", options.Continue)
|
||||
}
|
||||
return nil, nil
|
||||
},
|
||||
}
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
|
||||
// Initial list should initialize paginatedResult in the reflector.
|
||||
stopCh = make(chan struct{})
|
||||
r.ListAndWatch(stopCh)
|
||||
if results := s.List(); len(results) != 10 {
|
||||
t.Errorf("Expected 10 results, got %d", len(results))
|
||||
}
|
||||
|
||||
// Since initial list for ResourceVersion="0" was paginated, the subsequent
|
||||
// ones should also be paginated.
|
||||
stopCh = make(chan struct{})
|
||||
r.ListAndWatch(stopCh)
|
||||
if results := s.List(); len(results) != 10 {
|
||||
t.Errorf("Expected 10 results, got %d", len(results))
|
||||
}
|
||||
}
|
||||
|
||||
// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends
|
||||
// it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or
|
||||
// etcd that is partitioned and serving older data than the reflector has already processed.
|
||||
|
@ -73,16 +73,18 @@ func New(fn ListPageFunc) *ListPager {
|
||||
// List returns a single list object, but attempts to retrieve smaller chunks from the
|
||||
// server to reduce the impact on the server. If the chunk attempt fails, it will load
|
||||
// the full list instead. The Limit field on options, if unset, will default to the page size.
|
||||
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
|
||||
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
|
||||
if options.Limit == 0 {
|
||||
options.Limit = p.PageSize
|
||||
}
|
||||
requestedResourceVersion := options.ResourceVersion
|
||||
var list *metainternalversion.List
|
||||
paginatedResult := false
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
return nil, paginatedResult, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
@ -93,23 +95,24 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
|
||||
// failing when the resource versions is established by the first page request falls out of the compaction
|
||||
// during the subsequent list requests).
|
||||
if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
|
||||
return nil, err
|
||||
return nil, paginatedResult, err
|
||||
}
|
||||
// the list expired while we were processing, fall back to a full list at
|
||||
// the requested ResourceVersion.
|
||||
options.Limit = 0
|
||||
options.Continue = ""
|
||||
options.ResourceVersion = requestedResourceVersion
|
||||
return p.PageFn(ctx, options)
|
||||
result, err := p.PageFn(ctx, options)
|
||||
return result, paginatedResult, err
|
||||
}
|
||||
m, err := meta.ListAccessor(obj)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("returned object must be a list: %v", err)
|
||||
return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err)
|
||||
}
|
||||
|
||||
// exit early and return the object we got if we haven't processed any pages
|
||||
if len(m.GetContinue()) == 0 && list == nil {
|
||||
return obj, nil
|
||||
return obj, paginatedResult, nil
|
||||
}
|
||||
|
||||
// initialize the list and fill its contents
|
||||
@ -122,12 +125,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
|
||||
list.Items = append(list.Items, obj)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
return nil, paginatedResult, err
|
||||
}
|
||||
|
||||
// if we have no more items, return the list
|
||||
if len(m.GetContinue()) == 0 {
|
||||
return list, nil
|
||||
return list, paginatedResult, nil
|
||||
}
|
||||
|
||||
// set the next loop up
|
||||
@ -136,6 +139,8 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
|
||||
// `specifying resource version is not allowed when using continue` error.
|
||||
// See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
|
||||
options.ResourceVersion = ""
|
||||
// At this point, result is already paginated.
|
||||
paginatedResult = true
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -120,6 +120,7 @@ func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options
|
||||
}
|
||||
return p.PagedList(ctx, options)
|
||||
}
|
||||
|
||||
func TestListPager_List(t *testing.T) {
|
||||
type fields struct {
|
||||
PageSize int64
|
||||
@ -135,6 +136,7 @@ func TestListPager_List(t *testing.T) {
|
||||
fields fields
|
||||
args args
|
||||
want runtime.Object
|
||||
wantPaged bool
|
||||
wantErr bool
|
||||
isExpired bool
|
||||
}{
|
||||
@ -143,35 +145,41 @@ func TestListPager_List(t *testing.T) {
|
||||
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
|
||||
args: args{},
|
||||
want: list(0, "rv:20"),
|
||||
wantPaged: false,
|
||||
},
|
||||
{
|
||||
name: "one page",
|
||||
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
|
||||
args: args{},
|
||||
want: list(9, "rv:20"),
|
||||
wantPaged: false,
|
||||
},
|
||||
{
|
||||
name: "one full page",
|
||||
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
|
||||
args: args{},
|
||||
want: list(10, "rv:20"),
|
||||
wantPaged: false,
|
||||
},
|
||||
{
|
||||
name: "two pages",
|
||||
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
|
||||
args: args{},
|
||||
want: list(11, "rv:20"),
|
||||
wantPaged: true,
|
||||
},
|
||||
{
|
||||
name: "three pages",
|
||||
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
|
||||
args: args{},
|
||||
want: list(21, "rv:20"),
|
||||
wantPaged: true,
|
||||
},
|
||||
{
|
||||
name: "expires on second page",
|
||||
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
|
||||
args: args{},
|
||||
wantPaged: true,
|
||||
wantErr: true,
|
||||
isExpired: true,
|
||||
},
|
||||
@ -184,12 +192,14 @@ func TestListPager_List(t *testing.T) {
|
||||
},
|
||||
args: args{},
|
||||
want: list(21, "rv:20"),
|
||||
wantPaged: true,
|
||||
},
|
||||
{
|
||||
name: "two pages with resourceVersion",
|
||||
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
|
||||
args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}},
|
||||
want: list(11, "rv:20"),
|
||||
wantPaged: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
@ -203,7 +213,7 @@ func TestListPager_List(t *testing.T) {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
got, err := p.List(ctx, tt.args.options)
|
||||
got, paginatedResult, err := p.List(ctx, tt.args.options)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
@ -212,6 +222,9 @@ func TestListPager_List(t *testing.T) {
|
||||
t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired)
|
||||
return
|
||||
}
|
||||
if tt.wantPaged != paginatedResult {
|
||||
t.Errorf("paginatedResult = %t, want %t", paginatedResult, tt.wantPaged)
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("ListPager.List() = %v, want %v", got, tt.want)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user