mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Reduce timeout for waiting for resource version
This commit is contained in:
parent
8e888a7671
commit
d5e235c831
@ -404,6 +404,12 @@ func IsForbidden(err error) bool {
|
|||||||
return reasonForError(err) == metav1.StatusReasonForbidden
|
return reasonForError(err) == metav1.StatusReasonForbidden
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsTimeout determines if err is an error which indicates that request times out due to long
|
||||||
|
// processing.
|
||||||
|
func IsTimeout(err error) bool {
|
||||||
|
return reasonForError(err) == metav1.StatusReasonTimeout
|
||||||
|
}
|
||||||
|
|
||||||
// IsServerTimeout determines if err is an error which indicates that the request needs to be retried
|
// IsServerTimeout determines if err is an error which indicates that the request needs to be retried
|
||||||
// by the client.
|
// by the client.
|
||||||
func IsServerTimeout(err error) bool {
|
func IsServerTimeout(err error) bool {
|
||||||
|
@ -376,7 +376,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
|
|||||||
|
|
||||||
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
|
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to wait for fresh get: %v", err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if exists {
|
if exists {
|
||||||
@ -429,7 +429,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
|
|||||||
|
|
||||||
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
|
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to wait for fresh get: %v", err)
|
return err
|
||||||
}
|
}
|
||||||
trace.Step("Got from cache")
|
trace.Step("Got from cache")
|
||||||
|
|
||||||
@ -485,7 +485,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||||||
|
|
||||||
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
|
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to wait for fresh list: %v", err)
|
return err
|
||||||
}
|
}
|
||||||
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
|
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
|
||||||
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
|
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
|
||||||
|
@ -208,6 +208,29 @@ func TestList(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInfiniteList(t *testing.T) {
|
||||||
|
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||||
|
defer server.Terminate(t)
|
||||||
|
cacher := newTestCacher(etcdStorage, 10)
|
||||||
|
defer cacher.Stop()
|
||||||
|
|
||||||
|
podFoo := makeTestPod("foo")
|
||||||
|
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
||||||
|
|
||||||
|
// Set up List at fooCreated.ResourceVersion + 10
|
||||||
|
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
listRV := strconv.Itoa(int(rv + 10))
|
||||||
|
|
||||||
|
result := &api.PodList{}
|
||||||
|
err = cacher.List(context.TODO(), "pods/ns", listRV, storage.Everything, result)
|
||||||
|
if !errors.IsTimeout(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
|
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
|
||||||
_, _, line, _ := goruntime.Caller(1)
|
_, _, line, _ := goruntime.Caller(1)
|
||||||
select {
|
select {
|
||||||
|
@ -35,9 +35,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// MaximumListWait determines how long we're willing to wait for a
|
// blockTimeout determines how long we're willing to block the request
|
||||||
// list if a client specified a resource version in the future.
|
// to wait for a given resource version to be propagated to cache,
|
||||||
MaximumListWait = 60 * time.Second
|
// before terminating request and returning Timeout error with retry
|
||||||
|
// after suggestion.
|
||||||
|
blockTimeout = 3 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// watchCacheEvent is a single "watch event" that is send to users of
|
// watchCacheEvent is a single "watch event" that is send to users of
|
||||||
@ -206,7 +208,8 @@ func parseResourceVersion(resourceVersion string) (uint64, error) {
|
|||||||
if resourceVersion == "" {
|
if resourceVersion == "" {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
return strconv.ParseUint(resourceVersion, 10, 64)
|
// Use bitsize being the size of int on the machine.
|
||||||
|
return strconv.ParseUint(resourceVersion, 10, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
|
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
|
||||||
@ -288,7 +291,7 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *util.
|
|||||||
// it will wake up the loop below sometime after the broadcast,
|
// it will wake up the loop below sometime after the broadcast,
|
||||||
// we don't need to worry about waking it up before the time
|
// we don't need to worry about waking it up before the time
|
||||||
// has expired accidentally.
|
// has expired accidentally.
|
||||||
<-w.clock.After(MaximumListWait)
|
<-w.clock.After(blockTimeout)
|
||||||
w.cond.Broadcast()
|
w.cond.Broadcast()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -297,8 +300,9 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *util.
|
|||||||
trace.Step("watchCache locked acquired")
|
trace.Step("watchCache locked acquired")
|
||||||
}
|
}
|
||||||
for w.resourceVersion < resourceVersion {
|
for w.resourceVersion < resourceVersion {
|
||||||
if w.clock.Since(startTime) >= MaximumListWait {
|
if w.clock.Since(startTime) >= blockTimeout {
|
||||||
return fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion)
|
// Timeout with retry after 1 second.
|
||||||
|
return errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion), 1)
|
||||||
}
|
}
|
||||||
w.cond.Wait()
|
w.cond.Wait()
|
||||||
}
|
}
|
||||||
|
@ -302,7 +302,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
|
|||||||
for !fc.HasWaiters() {
|
for !fc.HasWaiters() {
|
||||||
time.Sleep(time.Millisecond)
|
time.Sleep(time.Millisecond)
|
||||||
}
|
}
|
||||||
fc.Step(MaximumListWait)
|
fc.Step(blockTimeout)
|
||||||
|
|
||||||
// Add an object to make sure the test would
|
// Add an object to make sure the test would
|
||||||
// eventually fail instead of just waiting
|
// eventually fail instead of just waiting
|
||||||
|
Loading…
Reference in New Issue
Block a user