mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 05:27:21 +00:00
Avoid thundering herd on etcd on masters upgrade
This commit is contained in:
parent
1bb68a2cde
commit
773d3583bc
@ -85,7 +85,12 @@ type Reflector struct {
|
||||
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
||||
lastSyncResourceVersionMutex sync.RWMutex
|
||||
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
|
||||
// Defaults to pager.PageSize.
|
||||
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
|
||||
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
|
||||
// it will turn off pagination to allow serving them from watch cache.
|
||||
// NOTE: It should be used carefully as paginated lists are always served directly from
|
||||
// etcd, which is significantly less efficient and may lead to serious performance and
|
||||
// scalability problems.
|
||||
WatchListPageSize int64
|
||||
}
|
||||
|
||||
@ -220,6 +225,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
}))
|
||||
if r.WatchListPageSize != 0 {
|
||||
pager.PageSize = r.WatchListPageSize
|
||||
} else {
|
||||
// User didn't explicitly request pagination.
|
||||
if options.ResourceVersion != "" && options.ResourceVersion != "0" {
|
||||
// We also don't turn off pagination for ResourceVersion="0", since watch cache
|
||||
// is ignoring Limit in that case anyway, and if watchcache is not enabled we
|
||||
// don't introduce regression.
|
||||
|
||||
// With ResourceVersion != "", we have a possibility to list from watch cache,
|
||||
// but we do that (for ResourceVersion != "0") only if Limit is unset.
|
||||
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
|
||||
// switch off pagination to force listing from watch cache (if enabled).
|
||||
// With the existing semantic of RV (result is at least as fresh as provided RV),
|
||||
// this is correct and doesn't lead to going back in time.
|
||||
pager.PageSize = 0
|
||||
}
|
||||
}
|
||||
|
||||
list, err = pager.List(context.Background(), options)
|
||||
@ -320,7 +340,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
if err != nil {
|
||||
switch {
|
||||
case isExpiredError(err):
|
||||
r.setIsLastSyncResourceVersionExpired(true)
|
||||
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
|
||||
// has a semantic that it returns data at least as fresh as provided RV.
|
||||
// So first try to LIST with setting RV to resource version of last observed object.
|
||||
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
||||
case err == io.EOF:
|
||||
// watch closed normally
|
||||
@ -344,8 +366,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
if err != errorStopRequested {
|
||||
switch {
|
||||
case isExpiredError(err):
|
||||
r.setIsLastSyncResourceVersionExpired(true)
|
||||
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
||||
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
|
||||
// has a semantic that it returns data at least as fresh as provided RV.
|
||||
// So first try to LIST with setting RV to resource version of last observed object.
|
||||
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
||||
default:
|
||||
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
||||
}
|
||||
|
@ -425,6 +425,8 @@ func TestReflectorWatchListPageSize(t *testing.T) {
|
||||
},
|
||||
}
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
// Set resource version to test pagination also for not consistent reads.
|
||||
r.setLastSyncResourceVersion("10")
|
||||
// Set the reflector to paginate the list request in 4 item chunks.
|
||||
r.WatchListPageSize = 4
|
||||
r.ListAndWatch(stopCh)
|
||||
@ -435,6 +437,41 @@ func TestReflectorWatchListPageSize(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
s := NewStore(MetaNamespaceKeyFunc)
|
||||
|
||||
lw := &testLW{
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
// Stop once the reflector begins watching since we're only interested in the list.
|
||||
close(stopCh)
|
||||
fw := watch.NewFake()
|
||||
return fw, nil
|
||||
},
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
if options.ResourceVersion != "10" {
|
||||
t.Fatalf("Expected ResourceVersion: \"10\", got: %s", options.ResourceVersion)
|
||||
}
|
||||
if options.Limit != 0 {
|
||||
t.Fatalf("Expected list Limit of 0 but got %d", options.Limit)
|
||||
}
|
||||
pods := make([]v1.Pod, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
|
||||
}
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods}, nil
|
||||
},
|
||||
}
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
r.setLastSyncResourceVersion("10")
|
||||
r.ListAndWatch(stopCh)
|
||||
|
||||
results := s.List()
|
||||
if len(results) != 10 {
|
||||
t.Errorf("Expected 10 results, got %d", len(results))
|
||||
}
|
||||
}
|
||||
|
||||
// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends
|
||||
// it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or
|
||||
// etcd that is partitioned and serving older data than the reflector has already processed.
|
||||
|
Loading…
Reference in New Issue
Block a user