mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #130899 from serathius/watchcache-error
Implement watchcache returning error from etcd that caused cache reinitialization
This commit is contained in:
commit
e5558a81c9
@ -686,7 +686,6 @@ func expectConversionFailureMessage(id, message string) func(t *testing.T, ctc *
|
|||||||
objv1beta2 := newConversionMultiVersionFixture(ns, id, "v1beta2")
|
objv1beta2 := newConversionMultiVersionFixture(ns, id, "v1beta2")
|
||||||
meta, _, _ := unstructured.NestedFieldCopy(obj.Object, "metadata")
|
meta, _, _ := unstructured.NestedFieldCopy(obj.Object, "metadata")
|
||||||
unstructured.SetNestedField(objv1beta2.Object, meta, "metadata")
|
unstructured.SetNestedField(objv1beta2.Object, meta, "metadata")
|
||||||
lastRV := objv1beta2.GetResourceVersion()
|
|
||||||
|
|
||||||
for _, verb := range []string{"get", "list", "create", "update", "patch", "delete", "deletecollection"} {
|
for _, verb := range []string{"get", "list", "create", "update", "patch", "delete", "deletecollection"} {
|
||||||
t.Run(verb, func(t *testing.T) {
|
t.Run(verb, func(t *testing.T) {
|
||||||
@ -694,10 +693,7 @@ func expectConversionFailureMessage(id, message string) func(t *testing.T, ctc *
|
|||||||
case "get":
|
case "get":
|
||||||
_, err = clients["v1beta2"].Get(context.TODO(), obj.GetName(), metav1.GetOptions{})
|
_, err = clients["v1beta2"].Get(context.TODO(), obj.GetName(), metav1.GetOptions{})
|
||||||
case "list":
|
case "list":
|
||||||
// With ResilientWatchcCacheInitialization feature, List requests are rejected with 429 if watchcache is not initialized.
|
_, err = clients["v1beta2"].List(context.TODO(), metav1.ListOptions{})
|
||||||
// However, in some of these tests that install faulty converter webhook, watchcache will never initialize by definition (as list will never succeed due to faulty converter webook).
|
|
||||||
// In such case, the returned error will differ from the one returned from the etcd, so we need to force the request to go to etcd.
|
|
||||||
_, err = clients["v1beta2"].List(context.TODO(), metav1.ListOptions{ResourceVersion: lastRV, ResourceVersionMatch: metav1.ResourceVersionMatchExact})
|
|
||||||
case "create":
|
case "create":
|
||||||
_, err = clients["v1beta2"].Create(context.TODO(), newConversionMultiVersionFixture(ns, id, "v1beta2"), metav1.CreateOptions{})
|
_, err = clients["v1beta2"].Create(context.TODO(), newConversionMultiVersionFixture(ns, id, "v1beta2"), metav1.CreateOptions{})
|
||||||
case "update":
|
case "update":
|
||||||
|
@ -463,31 +463,19 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
|
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
|
||||||
// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
|
|
||||||
// It is safe to use the cache after a successful list until a disconnection.
|
|
||||||
// We start with usable (write) locked. The below OnReplace function will
|
|
||||||
// unlock it after a successful list. The below defer will then re-lock
|
|
||||||
// it when this function exits (always due to disconnection), only if
|
|
||||||
// we actually got a successful list. This cycle will repeat as needed.
|
|
||||||
successfulList := false
|
|
||||||
c.watchCache.SetOnReplace(func() {
|
c.watchCache.SetOnReplace(func() {
|
||||||
successfulList = true
|
c.ready.setReady()
|
||||||
c.ready.set(true)
|
|
||||||
klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String())
|
klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String())
|
||||||
metrics.WatchCacheInitializations.WithLabelValues(c.groupResource.String()).Inc()
|
metrics.WatchCacheInitializations.WithLabelValues(c.groupResource.String()).Inc()
|
||||||
})
|
})
|
||||||
|
var err error
|
||||||
defer func() {
|
defer func() {
|
||||||
if successfulList {
|
c.ready.setError(err)
|
||||||
c.ready.set(false)
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
c.terminateAllWatchers()
|
c.terminateAllWatchers()
|
||||||
// Note that since onReplace may be not called due to errors, we explicitly
|
err = c.reflector.ListAndWatch(stopChannel)
|
||||||
// need to retry it on errors under lock.
|
if err != nil {
|
||||||
// Also note that startCaching is called in a loop, so there's no need
|
|
||||||
// to have another loop here.
|
|
||||||
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
|
|
||||||
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
|
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -506,11 +494,11 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
|||||||
|
|
||||||
var readyGeneration int
|
var readyGeneration int
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
var ok bool
|
var err error
|
||||||
var downtime time.Duration
|
var downtime time.Duration
|
||||||
readyGeneration, downtime, ok = c.ready.checkAndReadGeneration()
|
readyGeneration, downtime, err = c.ready.checkAndReadGeneration()
|
||||||
if !ok {
|
if err != nil {
|
||||||
return nil, errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime))
|
return nil, errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
|
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
|
||||||
@ -631,7 +619,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
|||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
|
|
||||||
if generation, _, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
|
if generation, _, err := c.ready.checkAndReadGeneration(); generation != readyGeneration || err != nil {
|
||||||
// We went unready or are already on a different generation.
|
// We went unready or are already on a different generation.
|
||||||
// Avoid registering and starting the watch as it will have to be
|
// Avoid registering and starting the watch as it will have to be
|
||||||
// terminated immediately anyway.
|
// terminated immediately anyway.
|
||||||
@ -749,10 +737,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||||||
defer span.End(500 * time.Millisecond)
|
defer span.End(500 * time.Millisecond)
|
||||||
|
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
|
||||||
if downtime, ok := c.ready.check(); !ok {
|
if downtime, err := c.ready.check(); err != nil {
|
||||||
// If Cacher is not initialized, reject List requests
|
// If Cacher is not initialized, reject List requests
|
||||||
// as described in https://kep.k8s.io/4568
|
// as described in https://kep.k8s.io/4568
|
||||||
return errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime))
|
return errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err := c.ready.wait(ctx); err != nil {
|
if err := c.ready.wait(ctx); err != nil {
|
||||||
@ -1304,8 +1292,8 @@ func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCach
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cacher) Ready() bool {
|
func (c *Cacher) Ready() bool {
|
||||||
_, ok := c.ready.check()
|
_, err := c.ready.check()
|
||||||
return ok
|
return err == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// errWatcher implements watch.Interface to return a single error
|
// errWatcher implements watch.Interface to return a single error
|
||||||
|
@ -3213,7 +3213,7 @@ func TestRetryAfterForUnreadyCache(t *testing.T) {
|
|||||||
t.Fatalf("Unexpected error waiting for the cache to be ready")
|
t.Fatalf("Unexpected error waiting for the cache to be ready")
|
||||||
}
|
}
|
||||||
|
|
||||||
cacher.ready.set(false)
|
cacher.ready.setError(nil)
|
||||||
clock.Step(14 * time.Second)
|
clock.Step(14 * time.Second)
|
||||||
|
|
||||||
opts := storage.ListOptions{
|
opts := storage.ListOptions{
|
||||||
|
@ -42,6 +42,7 @@ const (
|
|||||||
// └---------------------------┘
|
// └---------------------------┘
|
||||||
type ready struct {
|
type ready struct {
|
||||||
state status // represent the state of the variable
|
state status // represent the state of the variable
|
||||||
|
lastErr error
|
||||||
generation int // represent the number of times we have transtioned to ready
|
generation int // represent the number of times we have transtioned to ready
|
||||||
lock sync.RWMutex // protect the state and generation variables
|
lock sync.RWMutex // protect the state and generation variables
|
||||||
restartLock sync.Mutex // protect the transition from ready to pending where the channel is recreated
|
restartLock sync.Mutex // protect the transition from ready to pending where the channel is recreated
|
||||||
@ -87,8 +88,7 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
r.lock.RLock()
|
r.lock.RLock()
|
||||||
switch r.state {
|
if r.state == Pending {
|
||||||
case Pending:
|
|
||||||
// since we allow to switch between the states Pending and Ready
|
// since we allow to switch between the states Pending and Ready
|
||||||
// if there is a quick transition from Pending -> Ready -> Pending
|
// if there is a quick transition from Pending -> Ready -> Pending
|
||||||
// a process that was waiting can get unblocked and see a Pending
|
// a process that was waiting can get unblocked and see a Pending
|
||||||
@ -96,40 +96,61 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
|
|||||||
// avoid an inconsistent state on the system, with some processes not
|
// avoid an inconsistent state on the system, with some processes not
|
||||||
// waiting despite the state moved back to Pending.
|
// waiting despite the state moved back to Pending.
|
||||||
r.lock.RUnlock()
|
r.lock.RUnlock()
|
||||||
case Ready:
|
continue
|
||||||
generation := r.generation
|
|
||||||
r.lock.RUnlock()
|
|
||||||
return generation, nil
|
|
||||||
case Stopped:
|
|
||||||
r.lock.RUnlock()
|
|
||||||
return 0, fmt.Errorf("apiserver cacher is stopped")
|
|
||||||
default:
|
|
||||||
r.lock.RUnlock()
|
|
||||||
return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state)
|
|
||||||
}
|
}
|
||||||
|
generation, err := r.readGenerationLocked()
|
||||||
|
r.lock.RUnlock()
|
||||||
|
return generation, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check returns the time elapsed since the state was last changed and the current value.
|
// check returns the time elapsed since the state was last changed and the current value.
|
||||||
func (r *ready) check() (time.Duration, bool) {
|
func (r *ready) check() (time.Duration, error) {
|
||||||
_, elapsed, ok := r.checkAndReadGeneration()
|
_, elapsed, err := r.checkAndReadGeneration()
|
||||||
return elapsed, ok
|
return elapsed, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkAndReadGeneration returns the current generation, the time elapsed since the state was last changed and the current value.
|
// checkAndReadGeneration returns the current generation, the time elapsed since the state was last changed and the current value.
|
||||||
func (r *ready) checkAndReadGeneration() (int, time.Duration, bool) {
|
func (r *ready) checkAndReadGeneration() (int, time.Duration, error) {
|
||||||
r.lock.RLock()
|
r.lock.RLock()
|
||||||
defer r.lock.RUnlock()
|
defer r.lock.RUnlock()
|
||||||
return r.generation, r.clock.Since(r.lastStateChangeTime), r.state == Ready
|
generation, err := r.readGenerationLocked()
|
||||||
|
return generation, r.clock.Since(r.lastStateChangeTime), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ready) readGenerationLocked() (int, error) {
|
||||||
|
switch r.state {
|
||||||
|
case Pending:
|
||||||
|
if r.lastErr == nil {
|
||||||
|
return 0, fmt.Errorf("storage is (re)initializing")
|
||||||
|
} else {
|
||||||
|
return 0, fmt.Errorf("storage is (re)initializing: %w", r.lastErr)
|
||||||
|
}
|
||||||
|
case Ready:
|
||||||
|
return r.generation, nil
|
||||||
|
case Stopped:
|
||||||
|
return 0, fmt.Errorf("apiserver cacher is stopped")
|
||||||
|
default:
|
||||||
|
return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ready) setReady() {
|
||||||
|
r.set(true, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ready) setError(err error) {
|
||||||
|
r.set(false, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
|
// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
|
||||||
func (r *ready) set(ok bool) {
|
func (r *ready) set(ok bool, err error) {
|
||||||
r.lock.Lock()
|
r.lock.Lock()
|
||||||
defer r.lock.Unlock()
|
defer r.lock.Unlock()
|
||||||
if r.state == Stopped {
|
if r.state == Stopped {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
r.lastErr = err
|
||||||
if ok && r.state == Pending {
|
if ok && r.state == Pending {
|
||||||
r.state = Ready
|
r.state = Ready
|
||||||
r.generation++
|
r.generation++
|
||||||
|
@ -18,6 +18,7 @@ package cacher
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -28,7 +29,7 @@ import (
|
|||||||
func Test_newReady(t *testing.T) {
|
func Test_newReady(t *testing.T) {
|
||||||
errCh := make(chan error, 10)
|
errCh := make(chan error, 10)
|
||||||
ready := newReady(testingclock.NewFakeClock(time.Now()))
|
ready := newReady(testingclock.NewFakeClock(time.Now()))
|
||||||
ready.set(false)
|
ready.setError(nil)
|
||||||
// create 10 goroutines waiting for ready
|
// create 10 goroutines waiting for ready
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
@ -40,7 +41,7 @@ func Test_newReady(t *testing.T) {
|
|||||||
case <-errCh:
|
case <-errCh:
|
||||||
t.Errorf("ready should be blocking")
|
t.Errorf("ready should be blocking")
|
||||||
}
|
}
|
||||||
ready.set(true)
|
ready.setReady()
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
if err := <-errCh; err != nil {
|
if err := <-errCh; err != nil {
|
||||||
t.Errorf("unexpected error on channel %d", i)
|
t.Errorf("unexpected error on channel %d", i)
|
||||||
@ -51,22 +52,22 @@ func Test_newReady(t *testing.T) {
|
|||||||
func Test_newReadySetIdempotent(t *testing.T) {
|
func Test_newReadySetIdempotent(t *testing.T) {
|
||||||
errCh := make(chan error, 10)
|
errCh := make(chan error, 10)
|
||||||
ready := newReady(testingclock.NewFakeClock(time.Now()))
|
ready := newReady(testingclock.NewFakeClock(time.Now()))
|
||||||
ready.set(false)
|
ready.setError(nil)
|
||||||
ready.set(false)
|
ready.setError(nil)
|
||||||
ready.set(false)
|
ready.setError(nil)
|
||||||
if generation, _, ok := ready.checkAndReadGeneration(); generation != 0 || ok {
|
if generation, _, err := ready.checkAndReadGeneration(); generation != 0 || err == nil {
|
||||||
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
|
t.Errorf("unexpected state: generation=%v ready=%v", generation, err)
|
||||||
}
|
}
|
||||||
ready.set(true)
|
ready.setReady()
|
||||||
if generation, _, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
|
if generation, _, err := ready.checkAndReadGeneration(); generation != 1 || err != nil {
|
||||||
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
|
t.Errorf("unexpected state: generation=%v ready=%v", generation, err)
|
||||||
}
|
}
|
||||||
ready.set(true)
|
ready.setReady()
|
||||||
ready.set(true)
|
ready.setReady()
|
||||||
if generation, _, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
|
if generation, _, err := ready.checkAndReadGeneration(); generation != 1 || err != nil {
|
||||||
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
|
t.Errorf("unexpected state: generation=%v ready=%v", generation, err)
|
||||||
}
|
}
|
||||||
ready.set(false)
|
ready.setError(nil)
|
||||||
// create 10 goroutines waiting for ready and stop
|
// create 10 goroutines waiting for ready and stop
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
@ -78,9 +79,9 @@ func Test_newReadySetIdempotent(t *testing.T) {
|
|||||||
case <-errCh:
|
case <-errCh:
|
||||||
t.Errorf("ready should be blocking")
|
t.Errorf("ready should be blocking")
|
||||||
}
|
}
|
||||||
ready.set(true)
|
ready.setReady()
|
||||||
if generation, _, ok := ready.checkAndReadGeneration(); generation != 2 || !ok {
|
if generation, _, err := ready.checkAndReadGeneration(); generation != 2 || err != nil {
|
||||||
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
|
t.Errorf("unexpected state: generation=%v ready=%v", generation, err)
|
||||||
}
|
}
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
if err := <-errCh; err != nil {
|
if err := <-errCh; err != nil {
|
||||||
@ -95,7 +96,7 @@ func Test_newReadyRacy(t *testing.T) {
|
|||||||
concurrency := 1000
|
concurrency := 1000
|
||||||
errCh := make(chan error, concurrency)
|
errCh := make(chan error, concurrency)
|
||||||
ready := newReady(testingclock.NewFakeClock(time.Now()))
|
ready := newReady(testingclock.NewFakeClock(time.Now()))
|
||||||
ready.set(false)
|
ready.setError(nil)
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
wg.Add(2 * concurrency)
|
wg.Add(2 * concurrency)
|
||||||
@ -105,16 +106,16 @@ func Test_newReadyRacy(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
ready.set(false)
|
ready.setError(nil)
|
||||||
}()
|
}()
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
ready.set(true)
|
ready.setReady()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
// Last one has to be set to true.
|
// Last one has to be set to true.
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
ready.set(true)
|
ready.setReady()
|
||||||
|
|
||||||
for i := 0; i < concurrency; i++ {
|
for i := 0; i < concurrency; i++ {
|
||||||
if err := <-errCh; err != nil {
|
if err := <-errCh; err != nil {
|
||||||
@ -126,7 +127,7 @@ func Test_newReadyRacy(t *testing.T) {
|
|||||||
func Test_newReadyStop(t *testing.T) {
|
func Test_newReadyStop(t *testing.T) {
|
||||||
errCh := make(chan error, 10)
|
errCh := make(chan error, 10)
|
||||||
ready := newReady(testingclock.NewFakeClock(time.Now()))
|
ready := newReady(testingclock.NewFakeClock(time.Now()))
|
||||||
ready.set(false)
|
ready.setError(nil)
|
||||||
// create 10 goroutines waiting for ready and stop
|
// create 10 goroutines waiting for ready and stop
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
@ -149,22 +150,22 @@ func Test_newReadyStop(t *testing.T) {
|
|||||||
func Test_newReadyCheck(t *testing.T) {
|
func Test_newReadyCheck(t *testing.T) {
|
||||||
ready := newReady(testingclock.NewFakeClock(time.Now()))
|
ready := newReady(testingclock.NewFakeClock(time.Now()))
|
||||||
// it starts as false
|
// it starts as false
|
||||||
if _, ok := ready.check(); ok {
|
if _, err := ready.check(); err == nil {
|
||||||
t.Errorf("unexpected ready state %v", ok)
|
t.Errorf("unexpected ready state %v", err)
|
||||||
}
|
}
|
||||||
ready.set(true)
|
ready.setReady()
|
||||||
if _, ok := ready.check(); !ok {
|
if _, err := ready.check(); err != nil {
|
||||||
t.Errorf("unexpected ready state %v", ok)
|
t.Errorf("unexpected ready state %v", err)
|
||||||
}
|
}
|
||||||
// stop sets ready to false
|
// stop sets ready to false
|
||||||
ready.stop()
|
ready.stop()
|
||||||
if _, ok := ready.check(); ok {
|
if _, err := ready.check(); err == nil {
|
||||||
t.Errorf("unexpected ready state %v", ok)
|
t.Errorf("unexpected ready state %v", err)
|
||||||
}
|
}
|
||||||
// can not set to true if is stopped
|
// can not set to true if is stopped
|
||||||
ready.set(true)
|
ready.setReady()
|
||||||
if _, ok := ready.check(); ok {
|
if _, err := ready.check(); err == nil {
|
||||||
t.Errorf("unexpected ready state %v", ok)
|
t.Errorf("unexpected ready state %v", err)
|
||||||
}
|
}
|
||||||
err := ready.wait(context.Background())
|
err := ready.wait(context.Background())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@ -175,7 +176,7 @@ func Test_newReadyCheck(t *testing.T) {
|
|||||||
func Test_newReadyCancelPending(t *testing.T) {
|
func Test_newReadyCancelPending(t *testing.T) {
|
||||||
errCh := make(chan error, 10)
|
errCh := make(chan error, 10)
|
||||||
ready := newReady(testingclock.NewFakeClock(time.Now()))
|
ready := newReady(testingclock.NewFakeClock(time.Now()))
|
||||||
ready.set(false)
|
ready.setError(nil)
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
// create 10 goroutines stuck on pending
|
// create 10 goroutines stuck on pending
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
@ -204,19 +205,19 @@ func Test_newReadyStateChangeTimestamp(t *testing.T) {
|
|||||||
fakeClock.Step(time.Minute)
|
fakeClock.Step(time.Minute)
|
||||||
checkReadyTransitionTime(t, ready, time.Minute)
|
checkReadyTransitionTime(t, ready, time.Minute)
|
||||||
|
|
||||||
ready.set(true)
|
ready.setReady()
|
||||||
fakeClock.Step(time.Minute)
|
fakeClock.Step(time.Minute)
|
||||||
checkReadyTransitionTime(t, ready, time.Minute)
|
checkReadyTransitionTime(t, ready, time.Minute)
|
||||||
fakeClock.Step(time.Minute)
|
fakeClock.Step(time.Minute)
|
||||||
checkReadyTransitionTime(t, ready, 2*time.Minute)
|
checkReadyTransitionTime(t, ready, 2*time.Minute)
|
||||||
|
|
||||||
ready.set(false)
|
ready.setError(nil)
|
||||||
fakeClock.Step(time.Minute)
|
fakeClock.Step(time.Minute)
|
||||||
checkReadyTransitionTime(t, ready, time.Minute)
|
checkReadyTransitionTime(t, ready, time.Minute)
|
||||||
fakeClock.Step(time.Minute)
|
fakeClock.Step(time.Minute)
|
||||||
checkReadyTransitionTime(t, ready, 2*time.Minute)
|
checkReadyTransitionTime(t, ready, 2*time.Minute)
|
||||||
|
|
||||||
ready.set(true)
|
ready.setReady()
|
||||||
fakeClock.Step(time.Minute)
|
fakeClock.Step(time.Minute)
|
||||||
checkReadyTransitionTime(t, ready, time.Minute)
|
checkReadyTransitionTime(t, ready, time.Minute)
|
||||||
|
|
||||||
@ -232,3 +233,21 @@ func checkReadyTransitionTime(t *testing.T, r *ready, expectedLastStateChangeDur
|
|||||||
t.Errorf("unexpected last state change duration: %v, expected: %v", lastStateChangeDuration, expectedLastStateChangeDuration)
|
t.Errorf("unexpected last state change duration: %v, expected: %v", lastStateChangeDuration, expectedLastStateChangeDuration)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReadyError(t *testing.T) {
|
||||||
|
ready := newReady(testingclock.NewFakeClock(time.Now()))
|
||||||
|
_, _, err := ready.checkAndReadGeneration()
|
||||||
|
if err == nil || err.Error() != "storage is (re)initializing" {
|
||||||
|
t.Errorf("Unexpected error when unready, got %q", err)
|
||||||
|
}
|
||||||
|
ready.setError(errors.New("etcd is down"))
|
||||||
|
_, _, err = ready.checkAndReadGeneration()
|
||||||
|
if err == nil || err.Error() != "storage is (re)initializing: etcd is down" {
|
||||||
|
t.Errorf("Unexpected error when unready, got %q", err)
|
||||||
|
}
|
||||||
|
ready.setError(nil)
|
||||||
|
_, _, err = ready.checkAndReadGeneration()
|
||||||
|
if err == nil || err.Error() != "storage is (re)initializing" {
|
||||||
|
t.Errorf("Unexpected error when unready, got %q", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user