mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 18:02:01 +00:00
Merge pull request #115077 from smarterclayton/reflector_mock_clock
cache: Reflector should have the same injected clock as its informer
This commit is contained in:
commit
4c4d4ad0a4
@ -139,11 +139,11 @@ func (c *controller) Run(stopCh <-chan struct{}) {
|
|||||||
ReflectorOptions{
|
ReflectorOptions{
|
||||||
ResyncPeriod: c.config.FullResyncPeriod,
|
ResyncPeriod: c.config.FullResyncPeriod,
|
||||||
TypeDescription: c.config.ObjectDescription,
|
TypeDescription: c.config.ObjectDescription,
|
||||||
|
Clock: c.clock,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
r.ShouldResync = c.config.ShouldResync
|
r.ShouldResync = c.config.ShouldResync
|
||||||
r.WatchListPageSize = c.config.WatchListPageSize
|
r.WatchListPageSize = c.config.WatchListPageSize
|
||||||
r.clock = c.clock
|
|
||||||
if c.config.WatchErrorHandler != nil {
|
if c.config.WatchErrorHandler != nil {
|
||||||
r.watchErrorHandler = c.config.WatchErrorHandler
|
r.watchErrorHandler = c.config.WatchErrorHandler
|
||||||
}
|
}
|
||||||
|
@ -183,6 +183,9 @@ type ReflectorOptions struct {
|
|||||||
// ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0
|
// ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0
|
||||||
// (do not resync).
|
// (do not resync).
|
||||||
ResyncPeriod time.Duration
|
ResyncPeriod time.Duration
|
||||||
|
|
||||||
|
// Clock allows tests to control time. If unset defaults to clock.RealClock{}
|
||||||
|
Clock clock.Clock
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewReflectorWithOptions creates a new Reflector object which will keep the
|
// NewReflectorWithOptions creates a new Reflector object which will keep the
|
||||||
@ -196,7 +199,10 @@ type ReflectorOptions struct {
|
|||||||
// everything as well as incrementally processing the things that
|
// everything as well as incrementally processing the things that
|
||||||
// change.
|
// change.
|
||||||
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
|
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
|
||||||
realClock := &clock.RealClock{}
|
reflectorClock := options.Clock
|
||||||
|
if reflectorClock == nil {
|
||||||
|
reflectorClock = clock.RealClock{}
|
||||||
|
}
|
||||||
r := &Reflector{
|
r := &Reflector{
|
||||||
name: options.Name,
|
name: options.Name,
|
||||||
resyncPeriod: options.ResyncPeriod,
|
resyncPeriod: options.ResyncPeriod,
|
||||||
@ -206,9 +212,9 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S
|
|||||||
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
|
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
|
||||||
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
|
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
|
||||||
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
|
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
|
||||||
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
|
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
|
||||||
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
|
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
|
||||||
clock: realClock,
|
clock: reflectorClock,
|
||||||
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
|
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
|
||||||
expectedType: reflect.TypeOf(expectedType),
|
expectedType: reflect.TypeOf(expectedType),
|
||||||
}
|
}
|
||||||
|
@ -348,6 +348,18 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
|
|||||||
// Simulate a connection loss (or even just a too-old-watch)
|
// Simulate a connection loss (or even just a too-old-watch)
|
||||||
source.ResetWatch()
|
source.ResetWatch()
|
||||||
|
|
||||||
|
// Wait long enough for the reflector to exit and the backoff function to start waiting
|
||||||
|
// on the fake clock, otherwise advancing the fake clock will have no effect.
|
||||||
|
// TODO: Make this deterministic by counting the number of waiters on FakeClock
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
|
// Advance the clock to cause the backoff wait to expire.
|
||||||
|
clock.Step(1601 * time.Millisecond)
|
||||||
|
|
||||||
|
// Wait long enough for backoff to invoke ListWatch a second time and distribute events
|
||||||
|
// to listeners.
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
for _, listener := range listeners {
|
for _, listener := range listeners {
|
||||||
if !listener.ok() {
|
if !listener.ok() {
|
||||||
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
|
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
|
||||||
|
Loading…
Reference in New Issue
Block a user