diff --git a/pkg/controller/replication/conversion.go b/pkg/controller/replication/conversion.go index 2e71fc0a6ff..4eca1f00896 100644 --- a/pkg/controller/replication/conversion.go +++ b/pkg/controller/replication/conversion.go @@ -125,13 +125,13 @@ type conversionEventHandler struct { handler cache.ResourceEventHandler } -func (h conversionEventHandler) OnAdd(obj interface{}) { +func (h conversionEventHandler) OnAdd(obj interface{}, isInInitialList bool) { rs, err := convertRCtoRS(obj.(*v1.ReplicationController), nil) if err != nil { utilruntime.HandleError(fmt.Errorf("dropping RC OnAdd event: can't convert object %#v to RS: %v", obj, err)) return } - h.handler.OnAdd(rs) + h.handler.OnAdd(rs, isInInitialList) } func (h conversionEventHandler) OnUpdate(oldObj, newObj interface{}) { diff --git a/pkg/controller/serviceaccount/serviceaccounts_controller.go b/pkg/controller/serviceaccount/serviceaccounts_controller.go index 4ba2741e665..03451cf0c8f 100644 --- a/pkg/controller/serviceaccount/serviceaccounts_controller.go +++ b/pkg/controller/serviceaccount/serviceaccounts_controller.go @@ -68,18 +68,18 @@ func NewServiceAccountsController(saInformer coreinformers.ServiceAccountInforme queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount"), } - saInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + saHandler, _ := saInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ DeleteFunc: e.serviceAccountDeleted, }, options.ServiceAccountResync) e.saLister = saInformer.Lister() - e.saListerSynced = saInformer.Informer().HasSynced + e.saListerSynced = saHandler.HasSynced - nsInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + nsHandler, _ := nsInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ AddFunc: e.namespaceAdded, UpdateFunc: e.namespaceUpdated, }, options.NamespaceResync) e.nsLister = nsInformer.Lister() - e.nsListerSynced = nsInformer.Informer().HasSynced + e.nsListerSynced = nsHandler.HasSynced e.syncHandler = e.syncNamespace diff --git a/plugin/pkg/auth/authorizer/node/graph_populator.go b/plugin/pkg/auth/authorizer/node/graph_populator.go index 1eec6afb42e..aff1f80c63c 100644 --- a/plugin/pkg/auth/authorizer/node/graph_populator.go +++ b/plugin/pkg/auth/authorizer/node/graph_populator.go @@ -44,30 +44,26 @@ func AddGraphEventHandlers( graph: graph, } - var hasSynced []cache.InformerSynced - - pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + podHandler, _ := pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: g.addPod, UpdateFunc: g.updatePod, DeleteFunc: g.deletePod, }) - hasSynced = append(hasSynced, pods.Informer().HasSynced) - pvs.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + pvsHandler, _ := pvs.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: g.addPV, UpdateFunc: g.updatePV, DeleteFunc: g.deletePV, }) - hasSynced = append(hasSynced, pvs.Informer().HasSynced) - attachments.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + attachHandler, _ := attachments.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: g.addVolumeAttachment, UpdateFunc: g.updateVolumeAttachment, DeleteFunc: g.deleteVolumeAttachment, }) - hasSynced = append(hasSynced, attachments.Informer().HasSynced) - go cache.WaitForNamedCacheSync("node_authorizer", wait.NeverStop, hasSynced...) 
+ go cache.WaitForNamedCacheSync("node_authorizer", wait.NeverStop, + podHandler.HasSynced, pvsHandler.HasSynced, attachHandler.HasSynced) } func (g *graphPopulator) addPod(obj interface{}) { diff --git a/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager.go b/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager.go index ea58e6c3267..79fb0fc8ff2 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager.go @@ -36,11 +36,6 @@ type mutatingWebhookConfigurationManager struct { configuration *atomic.Value lister admissionregistrationlisters.MutatingWebhookConfigurationLister hasSynced func() bool - // initialConfigurationSynced tracks if - // the existing webhook configs have been synced (honored) by the - // manager at startup-- the informer has synced and either has no items - // or has finished executing updateConfiguration() once. - initialConfigurationSynced *atomic.Bool } var _ generic.Source = &mutatingWebhookConfigurationManager{} @@ -48,23 +43,25 @@ var _ generic.Source = &mutatingWebhookConfigurationManager{} func NewMutatingWebhookConfigurationManager(f informers.SharedInformerFactory) generic.Source { informer := f.Admissionregistration().V1().MutatingWebhookConfigurations() manager := &mutatingWebhookConfigurationManager{ - configuration: &atomic.Value{}, - lister: informer.Lister(), - hasSynced: informer.Informer().HasSynced, - initialConfigurationSynced: &atomic.Bool{}, + configuration: &atomic.Value{}, + lister: informer.Lister(), } // Start with an empty list manager.configuration.Store([]webhook.WebhookAccessor{}) - manager.initialConfigurationSynced.Store(false) // On any change, rebuild the config - informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + // TODO: the initial sync for this is N ^ 2, ideally we should make it N. + handler, _ := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(_ interface{}) { manager.updateConfiguration() }, UpdateFunc: func(_, _ interface{}) { manager.updateConfiguration() }, DeleteFunc: func(_ interface{}) { manager.updateConfiguration() }, }) + // Since our processing is synchronous, this is all we need to do to + // see if we have processed everything or not. + manager.hasSynced = handler.HasSynced + return manager } @@ -73,28 +70,9 @@ func (m *mutatingWebhookConfigurationManager) Webhooks() []webhook.WebhookAccess return m.configuration.Load().([]webhook.WebhookAccessor) } -// HasSynced returns true when the manager is synced with existing webhookconfig -// objects at startup-- which means the informer is synced and either has no items -// or updateConfiguration() has completed. -func (m *mutatingWebhookConfigurationManager) HasSynced() bool { - if !m.hasSynced() { - return false - } - if m.initialConfigurationSynced.Load() { - // the informer has synced and configuration has been updated - return true - } - if configurations, err := m.lister.List(labels.Everything()); err == nil && len(configurations) == 0 { - // the empty list we initially stored is valid to use. - // Setting initialConfigurationSynced to true, so subsequent checks - // would be able to take the fast path on the atomic boolean in a - // cluster without any admission webhooks configured. 
- m.initialConfigurationSynced.Store(true) - // the informer has synced and we don't have any items - return true - } - return false -} +// HasSynced returns true if the initial set of mutating webhook configurations +// has been loaded. +func (m *mutatingWebhookConfigurationManager) HasSynced() bool { return m.hasSynced() } func (m *mutatingWebhookConfigurationManager) updateConfiguration() { configurations, err := m.lister.List(labels.Everything()) @@ -103,7 +81,6 @@ func (m *mutatingWebhookConfigurationManager) updateConfiguration() { return } m.configuration.Store(mergeMutatingWebhookConfigurations(configurations)) - m.initialConfigurationSynced.Store(true) } func mergeMutatingWebhookConfigurations(configurations []*v1.MutatingWebhookConfiguration) []webhook.WebhookAccessor { diff --git a/staging/src/k8s.io/apiserver/pkg/admission/configuration/validating_webhook_manager.go b/staging/src/k8s.io/apiserver/pkg/admission/configuration/validating_webhook_manager.go index 00f954251f4..da8035674d8 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/configuration/validating_webhook_manager.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/configuration/validating_webhook_manager.go @@ -36,11 +36,6 @@ type validatingWebhookConfigurationManager struct { configuration *atomic.Value lister admissionregistrationlisters.ValidatingWebhookConfigurationLister hasSynced func() bool - // initialConfigurationSynced tracks if - // the existing webhook configs have been synced (honored) by the - // manager at startup-- the informer has synced and either has no items - // or has finished executing updateConfiguration() once. - initialConfigurationSynced *atomic.Bool } var _ generic.Source = &validatingWebhookConfigurationManager{} @@ -48,23 +43,25 @@ var _ generic.Source = &validatingWebhookConfigurationManager{} func NewValidatingWebhookConfigurationManager(f informers.SharedInformerFactory) generic.Source { informer := f.Admissionregistration().V1().ValidatingWebhookConfigurations() manager := &validatingWebhookConfigurationManager{ - configuration: &atomic.Value{}, - lister: informer.Lister(), - hasSynced: informer.Informer().HasSynced, - initialConfigurationSynced: &atomic.Bool{}, + configuration: &atomic.Value{}, + lister: informer.Lister(), } // Start with an empty list manager.configuration.Store([]webhook.WebhookAccessor{}) - manager.initialConfigurationSynced.Store(false) // On any change, rebuild the config - informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + // TODO: the initial sync for this is N ^ 2, ideally we should make it N. + handle, _ := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(_ interface{}) { manager.updateConfiguration() }, UpdateFunc: func(_, _ interface{}) { manager.updateConfiguration() }, DeleteFunc: func(_ interface{}) { manager.updateConfiguration() }, }) + // Since our processing is synchronous, this is all we need to do to + // see if we have processed everything or not. + manager.hasSynced = handle.HasSynced + return manager } @@ -73,29 +70,9 @@ func (v *validatingWebhookConfigurationManager) Webhooks() []webhook.WebhookAcce return v.configuration.Load().([]webhook.WebhookAccessor) } -// HasSynced returns true when the manager is synced with existing webhookconfig -// objects at startup-- which means the informer is synced and either has no items -// or updateConfiguration() has completed. 
-func (v *validatingWebhookConfigurationManager) HasSynced() bool {
-	if !v.hasSynced() {
-		return false
-	}
-	if v.initialConfigurationSynced.Load() {
-		// the informer has synced and configuration has been updated
-		return true
-	}
-	if configurations, err := v.lister.List(labels.Everything()); err == nil && len(configurations) == 0 {
-		// the empty list we initially stored is valid to use.
-		// Setting initialConfigurationSynced to true, so subsequent checks
-		// would be able to take the fast path on the atomic boolean in a
-		// cluster without any admission webhooks configured.
-		v.initialConfigurationSynced.Store(true)
-		// the informer has synced and we don't have any items
-		return true
-	}
-	return false
-
-}
+// HasSynced returns true if the initial set of validating webhook configurations
+// has been loaded.
+func (v *validatingWebhookConfigurationManager) HasSynced() bool { return v.hasSynced() }
 
 func (v *validatingWebhookConfigurationManager) updateConfiguration() {
 	configurations, err := v.lister.List(labels.Everything())
@@ -104,7 +81,6 @@ func (v *validatingWebhookConfigurationManager) updateConfiguration() {
 		return
 	}
 	v.configuration.Store(mergeValidatingWebhookConfigurations(configurations))
-	v.initialConfigurationSynced.Store(true)
 }
 
 func mergeValidatingWebhookConfigurations(configurations []*v1.ValidatingWebhookConfiguration) []webhook.WebhookAccessor {
diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller.go
index 4398aa6b133..bdb2a0680ab 100644
--- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller.go
+++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller.go
@@ -39,6 +39,7 @@ import (
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
 )
 
 var _ CELPolicyEvaluator = &celAdmissionController{}
@@ -165,6 +166,7 @@ func NewAdmissionController(
 }
 
 func (c *celAdmissionController) Run(stopCh <-chan struct{}) {
+	// TODO: Doesn't this comparison need a lock?
 	if c.runningContext != nil {
 		return
 	}
@@ -302,9 +304,10 @@ func (c *celAdmissionController) Validate(
 			// If the param informer for this admission policy has not yet
 			// had time to perform an initial listing, don't attempt to use
 			// it.
-			//!TOOD(alexzielenski): add a wait for a very short amount of
-			// time for the cache to sync
-			if !paramInfo.controller.HasSynced() {
+			//!TODO(alexzielenski): Add a shorter timeout
+			// than "forever" to this wait.
+ + if !cache.WaitForCacheSync(c.runningContext.Done(), paramInfo.controller.HasSynced) { addConfigError(fmt.Errorf("paramKind kind `%v` not yet synced to use for admission", paramKind.String()), definition, binding) continue diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go index cbc89b518d6..17c4a9a6492 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/controller_reconcile.go @@ -128,7 +128,7 @@ func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name strin c.dynamicClient, paramsGVR.Resource, corev1.NamespaceAll, - 30*time.Second, + 30*time.Second, // TODO: do we really need to ever resync these? cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil, ) diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller.go index bd5ea818d67..e1e1b04ebc6 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -30,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/cache/synctrack" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" ) @@ -45,6 +47,11 @@ type controller[T runtime.Object] struct { reconciler func(namespace, name string, newObj T) error options ControllerOptions + + // must hold a func() bool or nil + notificationsDelivered atomic.Value + + hasProcessed synctrack.AsyncTracker[string] } type ControllerOptions struct { @@ -69,12 +76,20 @@ func NewController[T runtime.Object]( options.Name = fmt.Sprintf("%T-controller", *new(T)) } - return &controller[T]{ + c := &controller[T]{ options: options, informer: informer, reconciler: reconciler, queue: nil, } + c.hasProcessed.UpstreamHasSynced = func() bool { + f := c.notificationsDelivered.Load() + if f == nil { + return false + } + return f.(func() bool)() + } + return c } // Runs the controller and returns an error explaining why running was stopped. 
@@ -92,20 +107,22 @@ func (c *controller[T]) Run(ctx context.Context) error { // would never shut down the workqueue defer c.queue.ShutDown() - enqueue := func(obj interface{}) { + enqueue := func(obj interface{}, isInInitialList bool) { var key string var err error if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil { utilruntime.HandleError(err) return } + if isInInitialList { + c.hasProcessed.Start(key) + } + c.queue.Add(key) } - registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - enqueue(obj) - }, + registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{ + AddFunc: enqueue, UpdateFunc: func(oldObj, newObj interface{}) { oldMeta, err1 := meta.Accessor(oldObj) newMeta, err2 := meta.Accessor(newObj) @@ -126,13 +143,14 @@ func (c *controller[T]) Run(ctx context.Context) error { return } - enqueue(newObj) + enqueue(newObj, false) }, DeleteFunc: func(obj interface{}) { // Enqueue - enqueue(obj) + enqueue(obj, false) }, }) + c.notificationsDelivered.Store(registration.HasSynced) // Error might be raised if informer was started and stopped already if err != nil { @@ -142,6 +160,7 @@ func (c *controller[T]) Run(ctx context.Context) error { // Make sure event handler is removed from informer in case return early from // an error defer func() { + c.notificationsDelivered.Store(func() bool { return false }) // Remove event handler and Handle Error here. Error should only be raised // for improper usage of event handler API. if err := c.informer.RemoveEventHandler(registration); err != nil { @@ -188,7 +207,7 @@ func (c *controller[T]) Run(ctx context.Context) error { } func (c *controller[T]) HasSynced() bool { - return c.informer.HasSynced() + return c.hasProcessed.HasSynced() } func (c *controller[T]) runWorker() { @@ -220,6 +239,7 @@ func (c *controller[T]) runWorker() { // but the key is invalid so there is no point in doing that) return fmt.Errorf("expected string in workqueue but got %#v", obj) } + defer c.hasProcessed.Finished(key) if err := c.reconcile(key); err != nil { // Put the item back on the workqueue to handle any transient errors. 
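The hasProcessed field wired up above is a synctrack.AsyncTracker (the tools/cache/synctrack package introduced later in this diff). A minimal sketch of the pattern it implements, assuming only that package; the keys and the upstreamSynced flag are illustrative stand-ins for an informer's initial list and its HasSynced:

package example

import (
	"fmt"

	"k8s.io/client-go/tools/cache/synctrack"
)

// upstreamSynced stands in for the informer's HasSynced (illustrative only).
var upstreamSynced bool

func demoAsyncTracker() {
	tracker := synctrack.AsyncTracker[string]{
		UpstreamHasSynced: func() bool { return upstreamSynced },
	}

	// Handler side: keys delivered as part of the initial list are marked
	// Started before they are handed to the workers (e.g. added to a workqueue).
	initialKeys := []string{"ns/a", "ns/b"}
	for _, key := range initialKeys {
		tracker.Start(key)
	}
	upstreamSynced = true // the informer has now delivered its whole initial list

	fmt.Println(tracker.HasSynced()) // false: workers have not processed the keys yet

	// Worker side: Finished is called once each key has been reconciled.
	for _, key := range initialKeys {
		tracker.Finished(key)
	}
	fmt.Println(tracker.HasSynced()) // true: upstream synced and every initial key processed
}

SingleFileTracker, defined alongside AsyncTracker, is the same idea reduced to a counter, for handlers that process notifications in order on a single goroutine.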
diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller_test.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller_test.go index 2d3af72f084..cfd805750c5 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic/controller_test.go @@ -106,6 +106,7 @@ func setupTest(ctx context.Context, customReconciler func(string, string, runtim controller generic.Controller[*unstructured.Unstructured], informer *testInformer, waitForReconcile func(runtime.Object) error, + verifyNoMoreEvents func() bool, ) { tracker = clienttesting.NewObjectTracker(scheme, codecs.UniversalDecoder()) reconciledObjects := make(chan runtime.Object) @@ -127,7 +128,11 @@ func setupTest(ctx context.Context, customReconciler func(string, string, runtim if customReconciler != nil { err = customReconciler(namespace, name, newObj) } - reconciledObjects <- copied + select { + case reconciledObjects <- copied: + case <-ctx.Done(): + panic("timed out attempting to deliver reconcile event") + } return err } @@ -149,23 +154,24 @@ func setupTest(ctx context.Context, customReconciler func(string, string, runtim generic.ControllerOptions{}, ) - go func() { - <-ctx.Done() - close(reconciledObjects) - + verifyNoMoreEvents = func() bool { + close(reconciledObjects) // closing means that a future attempt to send will crash for leftover := range reconciledObjects { panic(fmt.Errorf("leftover object which was not anticipated by test: %v", leftover)) } - }() + // TODO(alexzielenski): this effectively doesn't test anything since the + // controller drops any pending events when it shuts down. + return true + } - return tracker, myController, informer, waitForReconcile + return tracker, myController, informer, waitForReconcile, verifyNoMoreEvents } func TestReconcile(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) defer testCancel() - tracker, myController, informer, waitForReconcile := setupTest(testContext, nil) + tracker, myController, informer, waitForReconcile, verifyNoMoreEvents := setupTest(testContext, nil) // Add object to informer initialObject := &unstructured.Unstructured{} @@ -196,11 +202,16 @@ func TestReconcile(t *testing.T) { require.ErrorIs(t, stopReason, context.Canceled) }() - require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + // The controller is blocked because the reconcile function sends on an + // unbuffered channel. 
+ require.False(t, myController.HasSynced()) // Wait for all enqueued reconciliations require.NoError(t, waitForReconcile(initialObject)) + // Now it is safe to wait for it to Sync + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + // Updated object updatedObject := &unstructured.Unstructured{} updatedObject.SetUnstructuredContent(map[string]interface{}{ @@ -220,13 +231,15 @@ func TestReconcile(t *testing.T) { testCancel() wg.Wait() + + verifyNoMoreEvents() } func TestShutdown(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) defer testCancel() - _, myController, informer, _ := setupTest(testContext, nil) + _, myController, informer, _, verifyNoMoreEvents := setupTest(testContext, nil) wg := sync.WaitGroup{} @@ -256,6 +269,8 @@ func TestShutdown(t *testing.T) { // Ensure the event handler was cleaned up require.Empty(t, informer.registrations) + + verifyNoMoreEvents() } // Show an error is thrown informer isn't started when the controller runs @@ -263,7 +278,7 @@ func TestInformerNeverStarts(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 400*time.Millisecond) defer testCancel() - _, myController, informer, _ := setupTest(testContext, nil) + _, myController, informer, _, verifyNoMoreEvents := setupTest(testContext, nil) wg := sync.WaitGroup{} @@ -283,6 +298,8 @@ func TestInformerNeverStarts(t *testing.T) { // Ensure there are no event handlers require.Empty(t, informer.registrations) + + verifyNoMoreEvents() } // Shows that if RV does not change, the reconciler does not get called @@ -290,7 +307,7 @@ func TestIgnoredUpdate(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) defer testCancel() - tracker, myController, informer, waitForReconcile := setupTest(testContext, nil) + tracker, myController, informer, waitForReconcile, verifyNoMoreEvents := setupTest(testContext, nil) // Add object to informer initialObject := &unstructured.Unstructured{} @@ -321,11 +338,16 @@ func TestIgnoredUpdate(t *testing.T) { require.ErrorIs(t, stopReason, context.Canceled) }() - require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + // The controller is blocked because the reconcile function sends on an + // unbuffered channel. + require.False(t, myController.HasSynced()) // Wait for all enqueued reconciliations require.NoError(t, waitForReconcile(initialObject)) + // Now it is safe to wait for it to Sync + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + // Send update with the same object require.NoError(t, tracker.Update(fakeGVR, initialObject, "")) @@ -334,8 +356,9 @@ func TestIgnoredUpdate(t *testing.T) { testCancel() wg.Wait() - // Test infrastructure has logic to panic if there are any reconciled objects - // that weren't "expected" + // TODO(alexzielenski): Find a better way to test this since the + // controller drops any pending events when it shuts down. 
+ verifyNoMoreEvents() } // Shows that an object which fails reconciliation will retry @@ -345,7 +368,7 @@ func TestReconcileRetry(t *testing.T) { calls := atomic.Uint64{} success := atomic.Bool{} - tracker, myController, _, waitForReconcile := setupTest(testContext, func(s1, s2 string, o runtime.Object) error { + tracker, myController, _, waitForReconcile, verifyNoMoreEvents := setupTest(testContext, func(s1, s2 string, o runtime.Object) error { if calls.Add(1) > 2 { // Suddenly start liking the object @@ -390,13 +413,14 @@ func TestReconcileRetry(t *testing.T) { require.True(t, success.Load(), "last call to reconcile should return success") testCancel() wg.Wait() + + verifyNoMoreEvents() } func TestInformerList(t *testing.T) { testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) - defer testCancel() - tracker, myController, _, _ := setupTest(testContext, nil) + tracker, myController, _, _, _ := setupTest(testContext, nil) wg := sync.WaitGroup{} @@ -406,7 +430,12 @@ func TestInformerList(t *testing.T) { myController.Informer().Run(testContext.Done()) }() - require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + defer func() { + testCancel() + wg.Wait() + }() + + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.Informer().HasSynced)) object1 := &unstructured.Unstructured{} object1.SetUnstructuredContent(map[string]interface{}{ diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index 57b15fea1c9..31b94ea2878 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -85,7 +85,7 @@ type Config struct { type ShouldResyncFunc func() bool // ProcessFunc processes a single object. -type ProcessFunc func(obj interface{}) error +type ProcessFunc func(obj interface{}, isInInitialList bool) error // `*controller` implements Controller type controller struct { @@ -215,7 +215,7 @@ func (c *controller) processLoop() { // happen if the watch is closed and misses the delete event and we don't // notice the deletion until the subsequent re-list. type ResourceEventHandler interface { - OnAdd(obj interface{}) + OnAdd(obj interface{}, isInInitialList bool) OnUpdate(oldObj, newObj interface{}) OnDelete(obj interface{}) } @@ -224,6 +224,9 @@ type ResourceEventHandler interface { // as few of the notification functions as you want while still implementing // ResourceEventHandler. This adapter does not remove the prohibition against // modifying the objects. +// +// See ResourceEventHandlerDetailedFuncs if your use needs to propagate +// HasSynced. type ResourceEventHandlerFuncs struct { AddFunc func(obj interface{}) UpdateFunc func(oldObj, newObj interface{}) @@ -231,7 +234,7 @@ type ResourceEventHandlerFuncs struct { } // OnAdd calls AddFunc if it's not nil. -func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { +func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}, isInInitialList bool) { if r.AddFunc != nil { r.AddFunc(obj) } @@ -251,6 +254,36 @@ func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { } } +// ResourceEventHandlerDetailedFuncs is exactly like ResourceEventHandlerFuncs +// except its AddFunc accepts the isInInitialList parameter, for propagating +// HasSynced. 
+type ResourceEventHandlerDetailedFuncs struct { + AddFunc func(obj interface{}, isInInitialList bool) + UpdateFunc func(oldObj, newObj interface{}) + DeleteFunc func(obj interface{}) +} + +// OnAdd calls AddFunc if it's not nil. +func (r ResourceEventHandlerDetailedFuncs) OnAdd(obj interface{}, isInInitialList bool) { + if r.AddFunc != nil { + r.AddFunc(obj, isInInitialList) + } +} + +// OnUpdate calls UpdateFunc if it's not nil. +func (r ResourceEventHandlerDetailedFuncs) OnUpdate(oldObj, newObj interface{}) { + if r.UpdateFunc != nil { + r.UpdateFunc(oldObj, newObj) + } +} + +// OnDelete calls DeleteFunc if it's not nil. +func (r ResourceEventHandlerDetailedFuncs) OnDelete(obj interface{}) { + if r.DeleteFunc != nil { + r.DeleteFunc(obj) + } +} + // FilteringResourceEventHandler applies the provided filter to all events coming // in, ensuring the appropriate nested handler method is invoked. An object // that starts passing the filter after an update is considered an add, and an @@ -262,11 +295,11 @@ type FilteringResourceEventHandler struct { } // OnAdd calls the nested handler only if the filter succeeds -func (r FilteringResourceEventHandler) OnAdd(obj interface{}) { +func (r FilteringResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) { if !r.FilterFunc(obj) { return } - r.Handler.OnAdd(obj) + r.Handler.OnAdd(obj, isInInitialList) } // OnUpdate ensures the proper handler is called depending on whether the filter matches @@ -277,7 +310,7 @@ func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) { case newer && older: r.Handler.OnUpdate(oldObj, newObj) case newer && !older: - r.Handler.OnAdd(newObj) + r.Handler.OnAdd(newObj, false) case !newer && older: r.Handler.OnDelete(oldObj) default: @@ -417,6 +450,7 @@ func processDeltas( clientState Store, transformer TransformFunc, deltas Deltas, + isInInitialList bool, ) error { // from oldest to newest for _, d := range deltas { @@ -440,7 +474,7 @@ func processDeltas( if err := clientState.Add(obj); err != nil { return err } - handler.OnAdd(obj) + handler.OnAdd(obj, isInInitialList) } case Deleted: if err := clientState.Delete(obj); err != nil { @@ -488,9 +522,9 @@ func newInformer( FullResyncPeriod: resyncPeriod, RetryOnError: false, - Process: func(obj interface{}) error { + Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(h, clientState, transformer, deltas) + return processDeltas(h, clientState, transformer, deltas, isInInitialList) } return errors.New("object given as Process argument is not Deltas") }, diff --git a/staging/src/k8s.io/client-go/tools/cache/controller_test.go b/staging/src/k8s.io/client-go/tools/cache/controller_test.go index cf42478e057..e59a5c96650 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller_test.go @@ -62,7 +62,7 @@ func Example() { // Let's implement a simple controller that just deletes // everything that comes in. - Process: func(obj interface{}) error { + Process: func(obj interface{}, isInInitialList bool) error { // Obj is from the Pop method of the Queue we make above. 
newest := obj.(Deltas).Newest() @@ -137,8 +137,8 @@ func ExampleNewInformer() { source, &v1.Pod{}, time.Millisecond*100, - ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { + ResourceEventHandlerDetailedFuncs{ + AddFunc: func(obj interface{}, isInInitialList bool) { source.Delete(obj.(runtime.Object)) }, DeleteFunc: func(obj interface{}) { @@ -213,8 +213,8 @@ func TestHammerController(t *testing.T) { source, &v1.Pod{}, time.Millisecond*100, - ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { recordFunc("add", obj) }, + ResourceEventHandlerDetailedFuncs{ + AddFunc: func(obj interface{}, isInInitialList bool) { recordFunc("add", obj) }, UpdateFunc: func(oldObj, newObj interface{}) { recordFunc("update", newObj) }, DeleteFunc: func(obj interface{}) { recordFunc("delete", obj) }, }, @@ -416,8 +416,8 @@ func TestPanicPropagated(t *testing.T) { source, &v1.Pod{}, time.Millisecond*100, - ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { + ResourceEventHandlerDetailedFuncs{ + AddFunc: func(obj interface{}, isInInitialList bool) { // Create a panic. panic("Just panic.") }, @@ -526,8 +526,8 @@ func TestTransformingInformer(t *testing.T) { source, &v1.Pod{}, 0, - ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { recordEvent(watch.Added, nil, obj) }, + ResourceEventHandlerDetailedFuncs{ + AddFunc: func(obj interface{}, isInInitialList bool) { recordEvent(watch.Added, nil, obj) }, UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) }, DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) }, }, diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index 0c13a41f065..c4f2de7b251 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -271,6 +271,10 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { func (f *DeltaFIFO) HasSynced() bool { f.lock.Lock() defer f.lock.Unlock() + return f.hasSynced_locked() +} + +func (f *DeltaFIFO) hasSynced_locked() bool { return f.populated && f.initialPopulationCount == 0 } @@ -526,6 +530,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.cond.Wait() } + isInInitialList := !f.hasSynced_locked() id := f.queue[0] f.queue = f.queue[1:] depth := len(f.queue) @@ -551,7 +556,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"}) defer trace.LogIfLong(100 * time.Millisecond) } - err := process(item) + err := process(item, isInInitialList) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go index f17240da5c8..902fcaedb1d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go @@ -125,7 +125,7 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) f.Add(mkFifoObj("foo", 10)) - _, err := f.Pop(func(obj interface{}) error { + _, err := f.Pop(func(obj interface{}, isInInitialList bool) error { if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { t.Fatalf("unexpected object: %#v", obj) } @@ -138,7 +138,7 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) { t.Fatalf("object should 
have been requeued: %t %v", ok, err) } - _, err = f.Pop(func(obj interface{}) error { + _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { t.Fatalf("unexpected object: %#v", obj) } @@ -151,7 +151,7 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) { t.Fatalf("object should have been requeued: %t %v", ok, err) } - _, err = f.Pop(func(obj interface{}) error { + _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { t.Fatalf("unexpected object: %#v", obj) } @@ -480,6 +480,18 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { } } +// pop2 captures both parameters, unlike Pop(). +func pop2[T any](queue Queue) (T, bool) { + var result interface{} + var isList bool + queue.Pop(func(obj interface{}, isInInitialList bool) error { + result = obj + isList = isInInitialList + return nil + }) + return result.(T), isList +} + func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KeyFunction: testFifoObjectKeyFunc, @@ -501,10 +513,13 @@ func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) { if f.HasSynced() { t.Errorf("Expected HasSynced to be false") } - cur := Pop(f).(Deltas) + cur, initial := pop2[Deltas](f) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } + if initial != true { + t.Error("Expected initial list item") + } } if !f.HasSynced() { t.Errorf("Expected HasSynced to be true") @@ -676,7 +691,7 @@ func TestDeltaFIFO_PopShouldUnblockWhenClosed(t *testing.T) { const jobs = 10 for i := 0; i < jobs; i++ { go func() { - f.Pop(func(obj interface{}) error { + f.Pop(func(obj interface{}, isInInitialList bool) error { return nil }) c <- struct{}{} diff --git a/staging/src/k8s.io/client-go/tools/cache/fifo.go b/staging/src/k8s.io/client-go/tools/cache/fifo.go index 8f3313783d5..dd13c4ea774 100644 --- a/staging/src/k8s.io/client-go/tools/cache/fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/fifo.go @@ -25,7 +25,7 @@ import ( // PopProcessFunc is passed to Pop() method of Queue interface. // It is supposed to process the accumulator popped from the queue. -type PopProcessFunc func(interface{}) error +type PopProcessFunc func(obj interface{}, isInInitialList bool) error // ErrRequeue may be returned by a PopProcessFunc to safely requeue // the current item. The value of Err will be returned from Pop. @@ -82,9 +82,12 @@ type Queue interface { // Pop is helper function for popping from Queue. // WARNING: Do NOT use this function in non-test code to avoid races // unless you really really really really know what you are doing. +// +// NOTE: This function is deprecated and may be removed in the future without +// additional warning. 
func Pop(queue Queue) interface{} { var result interface{} - queue.Pop(func(obj interface{}) error { + queue.Pop(func(obj interface{}, isInInitialList bool) error { result = obj return nil }) @@ -149,6 +152,10 @@ func (f *FIFO) Close() { func (f *FIFO) HasSynced() bool { f.lock.Lock() defer f.lock.Unlock() + return f.hasSynced_locked() +} + +func (f *FIFO) hasSynced_locked() bool { return f.populated && f.initialPopulationCount == 0 } @@ -287,6 +294,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { f.cond.Wait() } + isInInitialList := !f.hasSynced_locked() id := f.queue[0] f.queue = f.queue[1:] if f.initialPopulationCount > 0 { @@ -298,7 +306,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { continue } delete(f.items, id) - err := process(item) + err := process(item, isInInitialList) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err diff --git a/staging/src/k8s.io/client-go/tools/cache/fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/fifo_test.go index 16b8502f4b3..655f1378539 100644 --- a/staging/src/k8s.io/client-go/tools/cache/fifo_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/fifo_test.go @@ -76,7 +76,7 @@ func TestFIFO_requeueOnPop(t *testing.T) { f := NewFIFO(testFifoObjectKeyFunc) f.Add(mkFifoObj("foo", 10)) - _, err := f.Pop(func(obj interface{}) error { + _, err := f.Pop(func(obj interface{}, isInInitialList bool) error { if obj.(testFifoObject).name != "foo" { t.Fatalf("unexpected object: %#v", obj) } @@ -89,7 +89,7 @@ func TestFIFO_requeueOnPop(t *testing.T) { t.Fatalf("object should have been requeued: %t %v", ok, err) } - _, err = f.Pop(func(obj interface{}) error { + _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { if obj.(testFifoObject).name != "foo" { t.Fatalf("unexpected object: %#v", obj) } @@ -102,7 +102,7 @@ func TestFIFO_requeueOnPop(t *testing.T) { t.Fatalf("object should have been requeued: %t %v", ok, err) } - _, err = f.Pop(func(obj interface{}) error { + _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { if obj.(testFifoObject).name != "foo" { t.Fatalf("unexpected object: %#v", obj) } @@ -289,7 +289,7 @@ func TestFIFO_PopShouldUnblockWhenClosed(t *testing.T) { const jobs = 10 for i := 0; i < jobs; i++ { go func() { - f.Pop(func(obj interface{}) error { + f.Pop(func(obj interface{}, isInInitialList bool) error { return nil }) c <- struct{}{} diff --git a/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go b/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go index 1da73420f05..fd658197d47 100644 --- a/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go @@ -39,7 +39,7 @@ func BenchmarkListener(b *testing.B) { AddFunc: func(obj interface{}) { swg.Done() }, - }, 0, 0, time.Now(), 1024*1024) + }, 0, 0, time.Now(), 1024*1024, func() bool { return true }) var wg wait.Group defer wg.Wait() // Wait for .run and .pop to stop defer close(pl.addCh) // Tell .run and .pop to stop diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 0e39f2a7eea..2717bde589c 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + 
"k8s.io/client-go/tools/cache/synctrack" "k8s.io/utils/buffer" "k8s.io/utils/clock" @@ -132,11 +133,13 @@ import ( // state, except that its ResourceVersion is replaced with a // ResourceVersion in which the object is actually absent. type SharedInformer interface { - // AddEventHandler adds an event handler to the shared informer using the shared informer's resync - // period. Events to a single handler are delivered sequentially, but there is no coordination - // between different handlers. - // It returns a registration handle for the handler that can be used to remove - // the handler again. + // AddEventHandler adds an event handler to the shared informer using + // the shared informer's resync period. Events to a single handler are + // delivered sequentially, but there is no coordination between + // different handlers. + // It returns a registration handle for the handler that can be used to + // remove the handler again, or to tell if the handler is synced (has + // seen every item in the initial list). AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) // AddEventHandlerWithResyncPeriod adds an event handler to the // shared informer with the requested resync period; zero means @@ -169,6 +172,10 @@ type SharedInformer interface { // HasSynced returns true if the shared informer's store has been // informed by at least one full LIST of the authoritative state // of the informer's object collection. This is unrelated to "resync". + // + // Note that this doesn't tell you if an individual handler is synced!! + // For that, please call HasSynced on the handle returned by + // AddEventHandler. HasSynced() bool // LastSyncResourceVersion is the resource version observed when last synced with the underlying // store. The value returned is not synchronized with access to the underlying store and is not @@ -213,7 +220,14 @@ type SharedInformer interface { // Opaque interface representing the registration of ResourceEventHandler for // a SharedInformer. Must be supplied back to the same SharedInformer's // `RemoveEventHandler` to unregister the handlers. -type ResourceEventHandlerRegistration interface{} +// +// Also used to tell if the handler is synced (has had all items in the initial +// list delivered). +type ResourceEventHandlerRegistration interface { + // HasSynced reports if both the parent has synced and all pre-sync + // events have been delivered. + HasSynced() bool +} // SharedIndexInformer provides add and get Indexers ability based on SharedInformer. 
type SharedIndexInformer interface { @@ -409,7 +423,8 @@ type updateNotification struct { } type addNotification struct { - newObj interface{} + newObj interface{} + isInInitialList bool } type deleteNotification struct { @@ -588,7 +603,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv } } - listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) + listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced) if !s.started { return s.processor.addListener(listener), nil @@ -604,27 +619,35 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv handle := s.processor.addListener(listener) for _, item := range s.indexer.List() { - listener.add(addNotification{newObj: item}) + // Note that we enqueue these notifications with the lock held + // and before returning the handle. That means there is never a + // chance for anyone to call the handle's HasSynced method in a + // state when it would falsely return true (i.e., when the + // shared informer is synced but it has not observed an Add + // with isInitialList being true, nor when the thread + // processing notifications somehow goes faster than this + // thread adding them and the counter is temporarily zero). + listener.add(addNotification{newObj: item, isInInitialList: true}) } return handle, nil } -func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { +func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() if deltas, ok := obj.(Deltas); ok { - return processDeltas(s, s.indexer, s.transform, deltas) + return processDeltas(s, s.indexer, s.transform, deltas, isInInitialList) } return errors.New("object given as Process argument is not Deltas") } // Conforms to ResourceEventHandler -func (s *sharedIndexInformer) OnAdd(obj interface{}) { +func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) { // Invocation of this function is locked under s.blockDeltas, so it is // save to distribute the notification s.cacheMutationDetector.AddObject(obj) - s.processor.distribute(addNotification{newObj: obj}, false) + s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false) } // Conforms to ResourceEventHandler @@ -846,6 +869,8 @@ type processorListener struct { handler ResourceEventHandler + syncTracker *synctrack.SingleFileTracker + // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed. // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications // added until we OOM. @@ -876,11 +901,18 @@ type processorListener struct { resyncLock sync.Mutex } -func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener { +// HasSynced returns true if the source informer has synced, and all +// corresponding events have been delivered. 
+func (p *processorListener) HasSynced() bool { + return p.syncTracker.HasSynced() +} + +func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener { ret := &processorListener{ nextCh: make(chan interface{}), addCh: make(chan interface{}), handler: handler, + syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced}, pendingNotifications: *buffer.NewRingGrowing(bufferSize), requestedResyncPeriod: requestedResyncPeriod, resyncPeriod: resyncPeriod, @@ -892,6 +924,9 @@ func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, res } func (p *processorListener) add(notification interface{}) { + if a, ok := notification.(addNotification); ok && a.isInInitialList { + p.syncTracker.Start() + } p.addCh <- notification } @@ -937,7 +972,10 @@ func (p *processorListener) run() { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: - p.handler.OnAdd(notification.newObj) + p.handler.OnAdd(notification.newObj, notification.isInInitialList) + if notification.isInInitialList { + p.syncTracker.Finished() + } case deleteNotification: p.handler.OnDelete(notification.oldObj) default: diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 2676e8f54c7..71f154b0d49 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -52,7 +52,7 @@ func newTestListener(name string, resyncPeriod time.Duration, expected ...string return l } -func (l *testListener) OnAdd(obj interface{}) { +func (l *testListener) OnAdd(obj interface{}, isInInitialList bool) { l.handle(obj) } @@ -68,7 +68,6 @@ func (l *testListener) handle(obj interface{}) { fmt.Printf("%s: handle: %v\n", l.name, key) l.lock.Lock() defer l.lock.Unlock() - objectMeta, _ := meta.Accessor(obj) l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName()) } @@ -649,8 +648,8 @@ func TestSharedInformerHandlerAbuse(t *testing.T) { worker := func() { // Keep adding and removing handler // Make sure no duplicate events? 
- funcs := ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) {}, + funcs := ResourceEventHandlerDetailedFuncs{ + AddFunc: func(obj interface{}, isInInitialList bool) {}, UpdateFunc: func(oldObj, newObj interface{}) {}, DeleteFunc: func(obj interface{}) {}, } @@ -902,9 +901,13 @@ func TestAddWhileActive(t *testing.T) { // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) listener1 := newTestListener("originalListener", 0, "pod1") - listener2 := newTestListener("originalListener", 0, "pod1", "pod2") + listener2 := newTestListener("listener2", 0, "pod1", "pod2") handle1, _ := informer.AddEventHandler(listener1) + if handle1.HasSynced() { + t.Error("Synced before Run??") + } + stop := make(chan struct{}) defer close(stop) @@ -916,7 +919,17 @@ func TestAddWhileActive(t *testing.T) { return } + if !handle1.HasSynced() { + t.Error("Not synced after Run??") + } + + listener2.lock.Lock() // ensure we observe it before it has synced handle2, _ := informer.AddEventHandler(listener2) + if handle2.HasSynced() { + t.Error("Synced before processing anything?") + } + listener2.lock.Unlock() // permit it to proceed and sync + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) if !listener2.ok() { @@ -924,6 +937,10 @@ func TestAddWhileActive(t *testing.T) { return } + if !handle2.HasSynced() { + t.Error("Not synced even after processing?") + } + if !isRegistered(informer, handle1) { t.Errorf("handle1 is not active") return diff --git a/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack.go b/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack.go new file mode 100644 index 00000000000..c488b497ff1 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack.go @@ -0,0 +1,116 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package synctrack contains utilities for helping controllers track whether +// they are "synced" or not, that is, whether they have processed all items +// from the informer's initial list. +package synctrack + +import ( + "sync" + "sync/atomic" + + "k8s.io/apimachinery/pkg/util/sets" +) + +// AsyncTracker helps propagate HasSynced in the face of multiple worker threads. +type AsyncTracker[T comparable] struct { + UpstreamHasSynced func() bool + + lock sync.Mutex + waiting sets.Set[T] +} + +// Start should be called prior to processing each key which is part of the +// initial list. +func (t *AsyncTracker[T]) Start(key T) { + t.lock.Lock() + defer t.lock.Unlock() + if t.waiting == nil { + t.waiting = sets.New[T](key) + } else { + t.waiting.Insert(key) + } +} + +// Finished should be called when finished processing a key which was part of +// the initial list. Since keys are tracked individually, nothing bad happens +// if you call Finished without a corresponding call to Start. This makes it +// easier to use this in combination with e.g. queues which don't make it easy +// to plumb through the isInInitialList boolean. 
+func (t *AsyncTracker[T]) Finished(key T) { + t.lock.Lock() + defer t.lock.Unlock() + if t.waiting != nil { + t.waiting.Delete(key) + } +} + +// HasSynced returns true if the source is synced and every key present in the +// initial list has been processed. This relies on the source not considering +// itself synced until *after* it has delivered the notification for the last +// key, and that notification handler must have called Start. +func (t *AsyncTracker[T]) HasSynced() bool { + // Call UpstreamHasSynced first: it might take a lock, which might take + // a significant amount of time, and we can't hold our lock while + // waiting on that or a user is likely to get a deadlock. + if !t.UpstreamHasSynced() { + return false + } + t.lock.Lock() + defer t.lock.Unlock() + return t.waiting.Len() == 0 +} + +// SingleFileTracker helps propagate HasSynced when events are processed in +// order (i.e. via a queue). +type SingleFileTracker struct { + UpstreamHasSynced func() bool + + count int64 +} + +// Start should be called prior to processing each key which is part of the +// initial list. +func (t *SingleFileTracker) Start() { + atomic.AddInt64(&t.count, 1) +} + +// Finished should be called when finished processing a key which was part of +// the initial list. You must never call Finished() before (or without) its +// corresponding Start(), that is a logic error that could cause HasSynced to +// return a wrong value. To help you notice this should it happen, Finished() +// will panic if the internal counter goes negative. +func (t *SingleFileTracker) Finished() { + result := atomic.AddInt64(&t.count, -1) + if result < 0 { + panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value") + } +} + +// HasSynced returns true if the source is synced and every key present in the +// initial list has been processed. This relies on the source not considering +// itself synced until *after* it has delivered the notification for the last +// key, and that notification handler must have called Start. +func (t *SingleFileTracker) HasSynced() bool { + // Call UpstreamHasSynced first: it might take a lock, which might take + // a significant amount of time, and we don't want to then act on a + // stale count value. + if !t.UpstreamHasSynced() { + return false + } + return atomic.LoadInt64(&t.count) <= 0 +} diff --git a/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack_test.go b/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack_test.go new file mode 100644 index 00000000000..4cf089e225c --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack_test.go @@ -0,0 +1,239 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package synctrack + +import ( + "strings" + "sync" + "time" + + "testing" +) + +func testSingleFileFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) { + tracker := SingleFileTracker{ + UpstreamHasSynced: upstreamHasSynced, + } + return tracker.Start, tracker.Finished, tracker.HasSynced +} + +func testAsyncFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) { + tracker := AsyncTracker[string]{ + UpstreamHasSynced: upstreamHasSynced, + } + return func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced +} + +func TestBasicLogic(t *testing.T) { + table := []struct { + name string + construct func(func() bool) (func(), func(), func() bool) + }{ + {"SingleFile", testSingleFileFuncs}, + {"Async", testAsyncFuncs}, + } + + for _, entry := range table { + t.Run(entry.name, func(t *testing.T) { + table := []struct { + synced bool + start bool + finish bool + expectSynced bool + }{ + {false, true, true, false}, + {true, true, false, false}, + {false, true, false, false}, + {true, true, true, true}, + } + for _, tt := range table { + Start, Finished, HasSynced := entry.construct(func() bool { return tt.synced }) + if tt.start { + Start() + } + if tt.finish { + Finished() + } + got := HasSynced() + if e, a := tt.expectSynced, got; e != a { + t.Errorf("for %#v got %v (wanted %v)", tt, a, e) + } + } + }) + } +} + +func TestAsyncLocking(t *testing.T) { + aft := AsyncTracker[int]{UpstreamHasSynced: func() bool { return true }} + + var wg sync.WaitGroup + for _, i := range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} { + wg.Add(1) + go func(i int) { + aft.Start(i) + go func() { + aft.Finished(i) + wg.Done() + }() + }(i) + } + wg.Wait() + if !aft.HasSynced() { + t.Errorf("async tracker must have made a threading error?") + } + +} + +func TestSingleFileCounting(t *testing.T) { + sft := SingleFileTracker{UpstreamHasSynced: func() bool { return true }} + + for i := 0; i < 100; i++ { + sft.Start() + } + if sft.HasSynced() { + t.Fatal("Unexpectedly synced?") + } + for i := 0; i < 99; i++ { + sft.Finished() + } + if sft.HasSynced() { + t.Fatal("Unexpectedly synced?") + } + + sft.Finished() + if !sft.HasSynced() { + t.Fatal("Unexpectedly not synced?") + } + + // Calling an extra time will panic. 
+ func() { + defer func() { + x := recover() + if x == nil { + t.Error("no panic?") + return + } + msg, ok := x.(string) + if !ok { + t.Errorf("unexpected panic value: %v", x) + return + } + if !strings.Contains(msg, "negative counter") { + t.Errorf("unexpected panic message: %v", msg) + return + } + }() + sft.Finished() + }() + + // Negative counter still means it is synced + if !sft.HasSynced() { + t.Fatal("Unexpectedly not synced?") + } +} + +func TestSingleFile(t *testing.T) { + table := []struct { + synced bool + starts int + stops int + expectSynced bool + }{ + {false, 1, 1, false}, + {true, 1, 0, false}, + {false, 1, 0, false}, + {true, 1, 1, true}, + } + for _, tt := range table { + sft := SingleFileTracker{UpstreamHasSynced: func() bool { return tt.synced }} + for i := 0; i < tt.starts; i++ { + sft.Start() + } + for i := 0; i < tt.stops; i++ { + sft.Finished() + } + got := sft.HasSynced() + if e, a := tt.expectSynced, got; e != a { + t.Errorf("for %#v got %v (wanted %v)", tt, a, e) + } + } + +} + +func TestNoStaleValue(t *testing.T) { + table := []struct { + name string + construct func(func() bool) (func(), func(), func() bool) + }{ + {"SingleFile", testSingleFileFuncs}, + {"Async", testAsyncFuncs}, + } + + for _, entry := range table { + t.Run(entry.name, func(t *testing.T) { + var lock sync.Mutex + upstreamHasSynced := func() bool { + lock.Lock() + defer lock.Unlock() + return true + } + + Start, Finished, HasSynced := entry.construct(upstreamHasSynced) + + // Ordinarily the corresponding lock would be held and you wouldn't be + // able to call this function at this point. + if !HasSynced() { + t.Fatal("Unexpectedly not synced??") + } + + Start() + if HasSynced() { + t.Fatal("Unexpectedly synced??") + } + Finished() + if !HasSynced() { + t.Fatal("Unexpectedly not synced??") + } + + // Now we will prove that if the lock is held, you can't get a false + // HasSynced return. + lock.Lock() + + // This goroutine calls HasSynced + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if HasSynced() { + t.Error("Unexpectedly synced??") + } + }() + + // This goroutine increments + unlocks. The sleep is to bias the + // runtime such that the other goroutine usually wins (it needs to work + // in both orderings, this one is more likely to be buggy). + go func() { + time.Sleep(time.Millisecond) + Start() + lock.Unlock() + }() + + wg.Wait() + }) + } + +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go index ba69c4dc369..7b84df3fb85 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go @@ -156,11 +156,8 @@ func NewAvailableConditionController( c := &AvailableConditionController{ apiServiceClient: apiServiceClient, apiServiceLister: apiServiceInformer.Lister(), - apiServiceSynced: apiServiceInformer.Informer().HasSynced, serviceLister: serviceInformer.Lister(), - servicesSynced: serviceInformer.Informer().HasSynced, endpointsLister: endpointsInformer.Lister(), - endpointsSynced: endpointsInformer.Informer().HasSynced, serviceResolver: serviceResolver, queue: workqueue.NewNamedRateLimitingQueue( // We want a fairly tight requeue time. 
The controller listens to the API, but because it relies on the routability of the @@ -189,25 +186,28 @@ func NewAvailableConditionController( // allows us to detect health in a more timely fashion when network connectivity to // nodes is snipped, but the network still attempts to route there. See // https://github.com/openshift/origin/issues/17159#issuecomment-341798063 - apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod( + apiServiceHandler, _ := apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: c.addAPIService, UpdateFunc: c.updateAPIService, DeleteFunc: c.deleteAPIService, }, 30*time.Second) + c.apiServiceSynced = apiServiceHandler.HasSynced - serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + serviceHandler, _ := serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addService, UpdateFunc: c.updateService, DeleteFunc: c.deleteService, }) + c.servicesSynced = serviceHandler.HasSynced - endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + endpointsHandler, _ := endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addEndpoints, UpdateFunc: c.updateEndpoints, DeleteFunc: c.deleteEndpoints, }) + c.endpointsSynced = endpointsHandler.HasSynced c.syncFn = c.sync @@ -494,6 +494,10 @@ func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{}) klog.Info("Starting AvailableConditionController") defer klog.Info("Shutting down AvailableConditionController") + // This waits not just for the informers to sync, but for our handlers + // to be called; since the handlers are three different ways of + // enqueueing the same thing, waiting for this permits the queue to + // maximally de-duplicate the entries. if !controllers.WaitForCacheSync("AvailableConditionController", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointsSynced) { return } diff --git a/test/integration/client/exec_test.go b/test/integration/client/exec_test.go index dda8cf37b4d..45faf5ce902 100644 --- a/test/integration/client/exec_test.go +++ b/test/integration/client/exec_test.go @@ -562,7 +562,7 @@ type informerSpy struct { deletes []interface{} } -func (is *informerSpy) OnAdd(obj interface{}) { +func (is *informerSpy) OnAdd(obj interface{}, isInInitialList bool) { is.mu.Lock() defer is.mu.Unlock() is.adds = append(is.adds, obj) diff --git a/vendor/modules.txt b/vendor/modules.txt index e9a85c0c571..b47c4dc859f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1906,6 +1906,7 @@ k8s.io/client-go/testing k8s.io/client-go/third_party/forked/golang/template k8s.io/client-go/tools/auth k8s.io/client-go/tools/cache +k8s.io/client-go/tools/cache/synctrack k8s.io/client-go/tools/clientcmd k8s.io/client-go/tools/clientcmd/api k8s.io/client-go/tools/clientcmd/api/latest
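Taken together, the consumer-visible change is that the registration handle returned by AddEventHandler now reports per-handler sync state, and OnAdd carries an isInInitialList flag. A minimal sketch of how a caller would use this, assuming a standard shared informer factory for pods; the factory wiring, function name, and log output are illustrative, not part of this diff:

package example

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

func watchPods(client kubernetes.Interface, stopCh <-chan struct{}) error {
	factory := informers.NewSharedInformerFactory(client, 10*time.Minute)
	podInformer := factory.Core().V1().Pods().Informer()

	// The handle's HasSynced reports whether this particular handler has been
	// delivered every item from the informer's initial list, not merely
	// whether the informer's store has synced.
	handle, err := podInformer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{
		AddFunc: func(obj interface{}, isInInitialList bool) {
			pod := obj.(*v1.Pod)
			fmt.Printf("add %s/%s (initial list: %v)\n", pod.Namespace, pod.Name, isInInitialList)
		},
	})
	if err != nil {
		return err
	}

	factory.Start(stopCh)

	// Wait on the handler's sync state rather than on podInformer.HasSynced.
	if !cache.WaitForCacheSync(stopCh, handle.HasSynced) {
		return fmt.Errorf("caches never synced")
	}
	return nil
}

This is the same pattern the ServiceAccountsController and AvailableConditionController hunks above switch to: each stored *Synced function now comes from the handler registration rather than from the informer itself.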