Merge pull request #113985 from lavalamp/improved-has-synced

Propagate HasSynced properly
Kubernetes Prow Robot committed 2022-12-15 12:15:47 -08:00 (via GitHub)
commit 843b40aeb4
23 changed files with 653 additions and 175 deletions
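The heart of the change: AddEventHandler and AddEventHandlerWithResyncPeriod now return a ResourceEventHandlerRegistration whose HasSynced reports whether that particular handler has been delivered every item from the informer's initial list, rather than merely whether the informer's store has been filled. A minimal sketch of the new pattern (the client and stop channel are assumed; waitForHandlerSync is an illustrative name):

import (
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// waitForHandlerSync blocks until *our handler* has seen the whole initial
// list, not merely until the informer's store has been populated.
func waitForHandlerSync(client kubernetes.Interface, stopCh <-chan struct{}) {
	factory := informers.NewSharedInformerFactory(client, 0)
	podInformer := factory.Core().V1().Pods().Informer()

	handle, _ := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) { /* enqueue the pod */ },
	})

	factory.Start(stopCh)

	// Previously the best available signal was podInformer.HasSynced, which
	// says nothing about whether AddFunc above has run for each initial item.
	cache.WaitForCacheSync(stopCh, handle.HasSynced)
}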


@ -125,13 +125,13 @@ type conversionEventHandler struct {
handler cache.ResourceEventHandler
}
func (h conversionEventHandler) OnAdd(obj interface{}) {
func (h conversionEventHandler) OnAdd(obj interface{}, isInInitialList bool) {
rs, err := convertRCtoRS(obj.(*v1.ReplicationController), nil)
if err != nil {
utilruntime.HandleError(fmt.Errorf("dropping RC OnAdd event: can't convert object %#v to RS: %v", obj, err))
return
}
h.handler.OnAdd(rs)
h.handler.OnAdd(rs, isInInitialList)
}
func (h conversionEventHandler) OnUpdate(oldObj, newObj interface{}) {


@ -68,18 +68,18 @@ func NewServiceAccountsController(saInformer coreinformers.ServiceAccountInforme
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount"),
}
saInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
saHandler, _ := saInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
DeleteFunc: e.serviceAccountDeleted,
}, options.ServiceAccountResync)
e.saLister = saInformer.Lister()
e.saListerSynced = saInformer.Informer().HasSynced
e.saListerSynced = saHandler.HasSynced
nsInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
nsHandler, _ := nsInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: e.namespaceAdded,
UpdateFunc: e.namespaceUpdated,
}, options.NamespaceResync)
e.nsLister = nsInformer.Lister()
e.nsListerSynced = nsInformer.Informer().HasSynced
e.nsListerSynced = nsHandler.HasSynced
e.syncHandler = e.syncNamespace


@ -44,30 +44,26 @@ func AddGraphEventHandlers(
graph: graph,
}
var hasSynced []cache.InformerSynced
pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
podHandler, _ := pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: g.addPod,
UpdateFunc: g.updatePod,
DeleteFunc: g.deletePod,
})
hasSynced = append(hasSynced, pods.Informer().HasSynced)
pvs.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
pvsHandler, _ := pvs.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: g.addPV,
UpdateFunc: g.updatePV,
DeleteFunc: g.deletePV,
})
hasSynced = append(hasSynced, pvs.Informer().HasSynced)
attachments.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
attachHandler, _ := attachments.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: g.addVolumeAttachment,
UpdateFunc: g.updateVolumeAttachment,
DeleteFunc: g.deleteVolumeAttachment,
})
hasSynced = append(hasSynced, attachments.Informer().HasSynced)
go cache.WaitForNamedCacheSync("node_authorizer", wait.NeverStop, hasSynced...)
go cache.WaitForNamedCacheSync("node_authorizer", wait.NeverStop,
podHandler.HasSynced, pvsHandler.HasSynced, attachHandler.HasSynced)
}
func (g *graphPopulator) addPod(obj interface{}) {


@ -36,11 +36,6 @@ type mutatingWebhookConfigurationManager struct {
configuration *atomic.Value
lister admissionregistrationlisters.MutatingWebhookConfigurationLister
hasSynced func() bool
// initialConfigurationSynced tracks if
// the existing webhook configs have been synced (honored) by the
// manager at startup-- the informer has synced and either has no items
// or has finished executing updateConfiguration() once.
initialConfigurationSynced *atomic.Bool
}
var _ generic.Source = &mutatingWebhookConfigurationManager{}
@ -48,23 +43,25 @@ var _ generic.Source = &mutatingWebhookConfigurationManager{}
func NewMutatingWebhookConfigurationManager(f informers.SharedInformerFactory) generic.Source {
informer := f.Admissionregistration().V1().MutatingWebhookConfigurations()
manager := &mutatingWebhookConfigurationManager{
configuration: &atomic.Value{},
lister: informer.Lister(),
hasSynced: informer.Informer().HasSynced,
initialConfigurationSynced: &atomic.Bool{},
configuration: &atomic.Value{},
lister: informer.Lister(),
}
// Start with an empty list
manager.configuration.Store([]webhook.WebhookAccessor{})
manager.initialConfigurationSynced.Store(false)
// On any change, rebuild the config
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// TODO: the initial sync for this is N ^ 2, ideally we should make it N.
handler, _ := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) { manager.updateConfiguration() },
UpdateFunc: func(_, _ interface{}) { manager.updateConfiguration() },
DeleteFunc: func(_ interface{}) { manager.updateConfiguration() },
})
// Since our processing is synchronous, this is all we need to do to
// see if we have processed everything or not.
manager.hasSynced = handler.HasSynced
return manager
}
@ -73,28 +70,9 @@ func (m *mutatingWebhookConfigurationManager) Webhooks() []webhook.WebhookAccess
return m.configuration.Load().([]webhook.WebhookAccessor)
}
// HasSynced returns true when the manager is synced with existing webhookconfig
// objects at startup-- which means the informer is synced and either has no items
// or updateConfiguration() has completed.
func (m *mutatingWebhookConfigurationManager) HasSynced() bool {
if !m.hasSynced() {
return false
}
if m.initialConfigurationSynced.Load() {
// the informer has synced and configuration has been updated
return true
}
if configurations, err := m.lister.List(labels.Everything()); err == nil && len(configurations) == 0 {
// the empty list we initially stored is valid to use.
// Setting initialConfigurationSynced to true, so subsequent checks
// would be able to take the fast path on the atomic boolean in a
// cluster without any admission webhooks configured.
m.initialConfigurationSynced.Store(true)
// the informer has synced and we don't have any items
return true
}
return false
}
// HasSynced returns true if the initial set of mutating webhook configurations
// has been loaded.
func (m *mutatingWebhookConfigurationManager) HasSynced() bool { return m.hasSynced() }
func (m *mutatingWebhookConfigurationManager) updateConfiguration() {
configurations, err := m.lister.List(labels.Everything())
@ -103,7 +81,6 @@ func (m *mutatingWebhookConfigurationManager) updateConfiguration() {
return
}
m.configuration.Store(mergeMutatingWebhookConfigurations(configurations))
m.initialConfigurationSynced.Store(true)
}
func mergeMutatingWebhookConfigurations(configurations []*v1.MutatingWebhookConfiguration) []webhook.WebhookAccessor {
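The deleted initialConfigurationSynced machinery existed only to answer "has updateConfiguration been honored for the initial list (or was that list empty)?". Since the event handler runs updateConfiguration synchronously with notification delivery, the registration's HasSynced now answers exactly that question. A hedged sketch of the resulting shape (webhookSnapshotManager and its fields are illustrative, not the real manager):

import (
	"sync/atomic"

	"k8s.io/client-go/tools/cache"
)

type webhookSnapshotManager struct {
	snapshot  atomic.Value // the real code stores []webhook.WebhookAccessor
	hasSynced func() bool
}

func newWebhookSnapshotManager(informer cache.SharedIndexInformer) *webhookSnapshotManager {
	m := &webhookSnapshotManager{}
	m.snapshot.Store([]string{}) // start with an empty but usable list

	rebuild := func() { /* list from the store, merge, m.snapshot.Store(...) */ }
	handle, _ := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(_ interface{}) { rebuild() },
		UpdateFunc: func(_, _ interface{}) { rebuild() },
		DeleteFunc: func(_ interface{}) { rebuild() },
	})
	// rebuild() runs synchronously with delivery, so once the registration
	// reports synced it has already covered every initial-list item.
	m.hasSynced = handle.HasSynced
	return m
}

func (m *webhookSnapshotManager) HasSynced() bool { return m.hasSynced() }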


@ -36,11 +36,6 @@ type validatingWebhookConfigurationManager struct {
configuration *atomic.Value
lister admissionregistrationlisters.ValidatingWebhookConfigurationLister
hasSynced func() bool
// initialConfigurationSynced tracks if
// the existing webhook configs have been synced (honored) by the
// manager at startup-- the informer has synced and either has no items
// or has finished executing updateConfiguration() once.
initialConfigurationSynced *atomic.Bool
}
var _ generic.Source = &validatingWebhookConfigurationManager{}
@ -48,23 +43,25 @@ var _ generic.Source = &validatingWebhookConfigurationManager{}
func NewValidatingWebhookConfigurationManager(f informers.SharedInformerFactory) generic.Source {
informer := f.Admissionregistration().V1().ValidatingWebhookConfigurations()
manager := &validatingWebhookConfigurationManager{
configuration: &atomic.Value{},
lister: informer.Lister(),
hasSynced: informer.Informer().HasSynced,
initialConfigurationSynced: &atomic.Bool{},
configuration: &atomic.Value{},
lister: informer.Lister(),
}
// Start with an empty list
manager.configuration.Store([]webhook.WebhookAccessor{})
manager.initialConfigurationSynced.Store(false)
// On any change, rebuild the config
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// TODO: the initial sync for this is N ^ 2, ideally we should make it N.
handle, _ := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) { manager.updateConfiguration() },
UpdateFunc: func(_, _ interface{}) { manager.updateConfiguration() },
DeleteFunc: func(_ interface{}) { manager.updateConfiguration() },
})
// Since our processing is synchronous, this is all we need to do to
// see if we have processed everything or not.
manager.hasSynced = handle.HasSynced
return manager
}
@ -73,29 +70,9 @@ func (v *validatingWebhookConfigurationManager) Webhooks() []webhook.WebhookAcce
return v.configuration.Load().([]webhook.WebhookAccessor)
}
// HasSynced returns true when the manager is synced with existing webhookconfig
// objects at startup-- which means the informer is synced and either has no items
// or updateConfiguration() has completed.
func (v *validatingWebhookConfigurationManager) HasSynced() bool {
if !v.hasSynced() {
return false
}
if v.initialConfigurationSynced.Load() {
// the informer has synced and configuration has been updated
return true
}
if configurations, err := v.lister.List(labels.Everything()); err == nil && len(configurations) == 0 {
// the empty list we initially stored is valid to use.
// Setting initialConfigurationSynced to true, so subsequent checks
// would be able to take the fast path on the atomic boolean in a
// cluster without any admission webhooks configured.
v.initialConfigurationSynced.Store(true)
// the informer has synced and we don't have any items
return true
}
return false
}
// HasSynced returns true if the initial set of mutating webhook configurations
// has been loaded.
func (v *validatingWebhookConfigurationManager) HasSynced() bool { return v.hasSynced() }
func (v *validatingWebhookConfigurationManager) updateConfiguration() {
configurations, err := v.lister.List(labels.Everything())
@ -104,7 +81,6 @@ func (v *validatingWebhookConfigurationManager) updateConfiguration() {
return
}
v.configuration.Store(mergeValidatingWebhookConfigurations(configurations))
v.initialConfigurationSynced.Store(true)
}
func mergeValidatingWebhookConfigurations(configurations []*v1.ValidatingWebhookConfiguration) []webhook.WebhookAccessor {


@ -39,6 +39,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
var _ CELPolicyEvaluator = &celAdmissionController{}
@ -165,6 +166,7 @@ func NewAdmissionController(
}
func (c *celAdmissionController) Run(stopCh <-chan struct{}) {
// TODO: Doesn't this comparison need a lock?
if c.runningContext != nil {
return
}
@ -302,9 +304,10 @@ func (c *celAdmissionController) Validate(
// If the param informer for this admission policy has not yet
// had time to perform an initial listing, don't attempt to use
// it.
//!TOOD(alexzielenski): add a wait for a very short amount of
// time for the cache to sync
if !paramInfo.controller.HasSynced() {
//!TODO(alexzielenski): Add a shorter timeout
// than "forever" to this wait.
if !cache.WaitForCacheSync(c.runningContext.Done(), paramInfo.controller.HasSynced) {
addConfigError(fmt.Errorf("paramKind kind `%v` not yet synced to use for admission",
paramKind.String()), definition, binding)
continue
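The call site changes from failing immediately when the param informer has not synced to blocking in WaitForCacheSync until it syncs or the controller shuts down. Per the TODO, a bounded wait is likely the end state; a hedged sketch of one way to cap it (boundedHasSynced and its timeout are invented for illustration):

import (
	"time"

	"k8s.io/client-go/tools/cache"
)

// boundedHasSynced waits up to timeout for hasSynced to become true, giving
// up early if stopCh closes first.
func boundedHasSynced(stopCh <-chan struct{}, timeout time.Duration, hasSynced func() bool) bool {
	deadline := make(chan struct{})
	go func() {
		defer close(deadline)
		select {
		case <-time.After(timeout):
		case <-stopCh:
		}
	}()
	return cache.WaitForCacheSync(deadline, hasSynced)
}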


@ -128,7 +128,7 @@ func (c *celAdmissionController) reconcilePolicyDefinition(namespace, name strin
c.dynamicClient,
paramsGVR.Resource,
corev1.NamespaceAll,
30*time.Second,
30*time.Second, // TODO: do we really need to ever resync these?
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
nil,
)


@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
kerrors "k8s.io/apimachinery/pkg/api/errors"
@ -30,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/cache/synctrack"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
@ -45,6 +47,11 @@ type controller[T runtime.Object] struct {
reconciler func(namespace, name string, newObj T) error
options ControllerOptions
// must hold a func() bool or nil
notificationsDelivered atomic.Value
hasProcessed synctrack.AsyncTracker[string]
}
type ControllerOptions struct {
@ -69,12 +76,20 @@ func NewController[T runtime.Object](
options.Name = fmt.Sprintf("%T-controller", *new(T))
}
return &controller[T]{
c := &controller[T]{
options: options,
informer: informer,
reconciler: reconciler,
queue: nil,
}
c.hasProcessed.UpstreamHasSynced = func() bool {
f := c.notificationsDelivered.Load()
if f == nil {
return false
}
return f.(func() bool)()
}
return c
}
// Runs the controller and returns an error explaining why running was stopped.
@ -92,20 +107,22 @@ func (c *controller[T]) Run(ctx context.Context) error {
// would never shut down the workqueue
defer c.queue.ShutDown()
enqueue := func(obj interface{}) {
enqueue := func(obj interface{}, isInInitialList bool) {
var key string
var err error
if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
if isInInitialList {
c.hasProcessed.Start(key)
}
c.queue.Add(key)
}
registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
enqueue(obj)
},
registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{
AddFunc: enqueue,
UpdateFunc: func(oldObj, newObj interface{}) {
oldMeta, err1 := meta.Accessor(oldObj)
newMeta, err2 := meta.Accessor(newObj)
@ -126,13 +143,14 @@ func (c *controller[T]) Run(ctx context.Context) error {
return
}
enqueue(newObj)
enqueue(newObj, false)
},
DeleteFunc: func(obj interface{}) {
// Enqueue
enqueue(obj)
enqueue(obj, false)
},
})
c.notificationsDelivered.Store(registration.HasSynced)
// Error might be raised if informer was started and stopped already
if err != nil {
@ -142,6 +160,7 @@ func (c *controller[T]) Run(ctx context.Context) error {
// Make sure event handler is removed from informer in case return early from
// an error
defer func() {
c.notificationsDelivered.Store(func() bool { return false })
// Remove event handler and Handle Error here. Error should only be raised
// for improper usage of event handler API.
if err := c.informer.RemoveEventHandler(registration); err != nil {
@ -188,7 +207,7 @@ func (c *controller[T]) Run(ctx context.Context) error {
}
func (c *controller[T]) HasSynced() bool {
return c.informer.HasSynced()
return c.hasProcessed.HasSynced()
}
func (c *controller[T]) runWorker() {
@ -220,6 +239,7 @@ func (c *controller[T]) runWorker() {
// but the key is invalid so there is no point in doing that)
return fmt.Errorf("expected string in workqueue but got %#v", obj)
}
defer c.hasProcessed.Finished(key)
if err := c.reconcile(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
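Because this controller fans work out through a workqueue, the strictly-ordered SingleFileTracker does not apply; AsyncTracker tracks each key from Start (at enqueue time, when isInInitialList is true) to Finished (after reconciling it). A condensed, hedged sketch of the plumbing above (wireUp and worker are illustrative names; the real code stores the upstream func in an atomic.Value to avoid the write race noted below):

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/cache/synctrack"
	"k8s.io/client-go/util/workqueue"
)

var tracker = synctrack.AsyncTracker[string]{
	UpstreamHasSynced: func() bool { return false }, // replaced in wireUp
}

func wireUp(informer cache.SharedIndexInformer, queue workqueue.Interface) {
	reg, _ := informer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{
		AddFunc: func(obj interface{}, isInInitialList bool) {
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err != nil {
				return
			}
			if isInInitialList {
				tracker.Start(key) // this key is now owed a Finished()
			}
			queue.Add(key)
		},
	})
	// Racy if HasSynced can already be called; hence atomic.Value upstream.
	tracker.UpstreamHasSynced = reg.HasSynced
}

func worker(queue workqueue.Interface, reconcile func(key string) error) {
	for {
		item, shutdown := queue.Get()
		if shutdown {
			return
		}
		key := item.(string)
		_ = reconcile(key)
		tracker.Finished(key) // harmless even if Start was never called
		queue.Done(item)
	}
}

// tracker.HasSynced() turns true only once the informer has delivered the
// whole initial list AND each of those keys has been reconciled once.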


@ -106,6 +106,7 @@ func setupTest(ctx context.Context, customReconciler func(string, string, runtim
controller generic.Controller[*unstructured.Unstructured],
informer *testInformer,
waitForReconcile func(runtime.Object) error,
verifyNoMoreEvents func() bool,
) {
tracker = clienttesting.NewObjectTracker(scheme, codecs.UniversalDecoder())
reconciledObjects := make(chan runtime.Object)
@ -127,7 +128,11 @@ func setupTest(ctx context.Context, customReconciler func(string, string, runtim
if customReconciler != nil {
err = customReconciler(namespace, name, newObj)
}
reconciledObjects <- copied
select {
case reconciledObjects <- copied:
case <-ctx.Done():
panic("timed out attempting to deliver reconcile event")
}
return err
}
@ -149,23 +154,24 @@ func setupTest(ctx context.Context, customReconciler func(string, string, runtim
generic.ControllerOptions{},
)
go func() {
<-ctx.Done()
close(reconciledObjects)
verifyNoMoreEvents = func() bool {
close(reconciledObjects) // closing means that a future attempt to send will crash
for leftover := range reconciledObjects {
panic(fmt.Errorf("leftover object which was not anticipated by test: %v", leftover))
}
}()
// TODO(alexzielenski): this effectively doesn't test anything since the
// controller drops any pending events when it shuts down.
return true
}
return tracker, myController, informer, waitForReconcile
return tracker, myController, informer, waitForReconcile, verifyNoMoreEvents
}
func TestReconcile(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer testCancel()
tracker, myController, informer, waitForReconcile := setupTest(testContext, nil)
tracker, myController, informer, waitForReconcile, verifyNoMoreEvents := setupTest(testContext, nil)
// Add object to informer
initialObject := &unstructured.Unstructured{}
@ -196,11 +202,16 @@ func TestReconcile(t *testing.T) {
require.ErrorIs(t, stopReason, context.Canceled)
}()
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
// The controller is blocked because the reconcile function sends on an
// unbuffered channel.
require.False(t, myController.HasSynced())
// Wait for all enqueued reconciliations
require.NoError(t, waitForReconcile(initialObject))
// Now it is safe to wait for it to Sync
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
// Updated object
updatedObject := &unstructured.Unstructured{}
updatedObject.SetUnstructuredContent(map[string]interface{}{
@ -220,13 +231,15 @@ func TestReconcile(t *testing.T) {
testCancel()
wg.Wait()
verifyNoMoreEvents()
}
func TestShutdown(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer testCancel()
_, myController, informer, _ := setupTest(testContext, nil)
_, myController, informer, _, verifyNoMoreEvents := setupTest(testContext, nil)
wg := sync.WaitGroup{}
@ -256,6 +269,8 @@ func TestShutdown(t *testing.T) {
// Ensure the event handler was cleaned up
require.Empty(t, informer.registrations)
verifyNoMoreEvents()
}
// Show an error is thrown if the informer isn't started when the controller runs
@ -263,7 +278,7 @@ func TestInformerNeverStarts(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 400*time.Millisecond)
defer testCancel()
_, myController, informer, _ := setupTest(testContext, nil)
_, myController, informer, _, verifyNoMoreEvents := setupTest(testContext, nil)
wg := sync.WaitGroup{}
@ -283,6 +298,8 @@ func TestInformerNeverStarts(t *testing.T) {
// Ensure there are no event handlers
require.Empty(t, informer.registrations)
verifyNoMoreEvents()
}
// Shows that if RV does not change, the reconciler does not get called
@ -290,7 +307,7 @@ func TestIgnoredUpdate(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer testCancel()
tracker, myController, informer, waitForReconcile := setupTest(testContext, nil)
tracker, myController, informer, waitForReconcile, verifyNoMoreEvents := setupTest(testContext, nil)
// Add object to informer
initialObject := &unstructured.Unstructured{}
@ -321,11 +338,16 @@ func TestIgnoredUpdate(t *testing.T) {
require.ErrorIs(t, stopReason, context.Canceled)
}()
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
// The controller is blocked because the reconcile function sends on an
// unbuffered channel.
require.False(t, myController.HasSynced())
// Wait for all enqueued reconciliations
require.NoError(t, waitForReconcile(initialObject))
// Now it is safe to wait for it to Sync
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
// Send update with the same object
require.NoError(t, tracker.Update(fakeGVR, initialObject, ""))
@ -334,8 +356,9 @@ func TestIgnoredUpdate(t *testing.T) {
testCancel()
wg.Wait()
// Test infrastructure has logic to panic if there are any reconciled objects
// that weren't "expected"
// TODO(alexzielenski): Find a better way to test this since the
// controller drops any pending events when it shuts down.
verifyNoMoreEvents()
}
// Shows that an object which fails reconciliation will retry
@ -345,7 +368,7 @@ func TestReconcileRetry(t *testing.T) {
calls := atomic.Uint64{}
success := atomic.Bool{}
tracker, myController, _, waitForReconcile := setupTest(testContext, func(s1, s2 string, o runtime.Object) error {
tracker, myController, _, waitForReconcile, verifyNoMoreEvents := setupTest(testContext, func(s1, s2 string, o runtime.Object) error {
if calls.Add(1) > 2 {
// Suddenly start liking the object
@ -390,13 +413,14 @@ func TestReconcileRetry(t *testing.T) {
require.True(t, success.Load(), "last call to reconcile should return success")
testCancel()
wg.Wait()
verifyNoMoreEvents()
}
func TestInformerList(t *testing.T) {
testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer testCancel()
tracker, myController, _, _ := setupTest(testContext, nil)
tracker, myController, _, _, _ := setupTest(testContext, nil)
wg := sync.WaitGroup{}
@ -406,7 +430,12 @@ func TestInformerList(t *testing.T) {
myController.Informer().Run(testContext.Done())
}()
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced))
defer func() {
testCancel()
wg.Wait()
}()
require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.Informer().HasSynced))
object1 := &unstructured.Unstructured{}
object1.SetUnstructuredContent(map[string]interface{}{


@ -85,7 +85,7 @@ type Config struct {
type ShouldResyncFunc func() bool
// ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error
type ProcessFunc func(obj interface{}, isInInitialList bool) error
// `*controller` implements Controller
type controller struct {
@ -215,7 +215,7 @@ func (c *controller) processLoop() {
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnAdd(obj interface{}, isInInitialList bool)
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
@ -224,6 +224,9 @@ type ResourceEventHandler interface {
// as few of the notification functions as you want while still implementing
// ResourceEventHandler. This adapter does not remove the prohibition against
// modifying the objects.
//
// See ResourceEventHandlerDetailedFuncs if your use case needs to propagate
// HasSynced.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
@ -231,7 +234,7 @@ type ResourceEventHandlerFuncs struct {
}
// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}, isInInitialList bool) {
if r.AddFunc != nil {
r.AddFunc(obj)
}
@ -251,6 +254,36 @@ func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
}
}
// ResourceEventHandlerDetailedFuncs is exactly like ResourceEventHandlerFuncs
// except its AddFunc accepts the isInInitialList parameter, for propagating
// HasSynced.
type ResourceEventHandlerDetailedFuncs struct {
AddFunc func(obj interface{}, isInInitialList bool)
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerDetailedFuncs) OnAdd(obj interface{}, isInInitialList bool) {
if r.AddFunc != nil {
r.AddFunc(obj, isInInitialList)
}
}
// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerDetailedFuncs) OnUpdate(oldObj, newObj interface{}) {
if r.UpdateFunc != nil {
r.UpdateFunc(oldObj, newObj)
}
}
// OnDelete calls DeleteFunc if it's not nil.
func (r ResourceEventHandlerDetailedFuncs) OnDelete(obj interface{}) {
if r.DeleteFunc != nil {
r.DeleteFunc(obj)
}
}
// FilteringResourceEventHandler applies the provided filter to all events coming
// in, ensuring the appropriate nested handler method is invoked. An object
// that starts passing the filter after an update is considered an add, and an
@ -262,11 +295,11 @@ type FilteringResourceEventHandler struct {
}
// OnAdd calls the nested handler only if the filter succeeds
func (r FilteringResourceEventHandler) OnAdd(obj interface{}) {
func (r FilteringResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) {
if !r.FilterFunc(obj) {
return
}
r.Handler.OnAdd(obj)
r.Handler.OnAdd(obj, isInInitialList)
}
// OnUpdate ensures the proper handler is called depending on whether the filter matches
@ -277,7 +310,7 @@ func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
case newer && older:
r.Handler.OnUpdate(oldObj, newObj)
case newer && !older:
r.Handler.OnAdd(newObj)
r.Handler.OnAdd(newObj, false)
case !newer && older:
r.Handler.OnDelete(oldObj)
default:
@ -417,6 +450,7 @@ func processDeltas(
clientState Store,
transformer TransformFunc,
deltas Deltas,
isInInitialList bool,
) error {
// from oldest to newest
for _, d := range deltas {
@ -440,7 +474,7 @@ func processDeltas(
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj)
handler.OnAdd(obj, isInInitialList)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
@ -488,9 +522,9 @@ func newInformer(
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, transformer, deltas)
return processDeltas(h, clientState, transformer, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
},
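ResourceEventHandlerFuncs keeps its one-argument AddFunc, so existing handlers compile unchanged and simply drop the flag; handlers that care opt in via the new ResourceEventHandlerDetailedFuncs. A small hedged example that uses the flag to count initial-list objects (newCountingHandler is illustrative):

import (
	"sync/atomic"

	"k8s.io/client-go/tools/cache"
)

// newCountingHandler counts how many Adds were replays of the initial LIST.
func newCountingHandler(initialCount *int64) cache.ResourceEventHandler {
	return cache.ResourceEventHandlerDetailedFuncs{
		AddFunc: func(obj interface{}, isInInitialList bool) {
			if isInInitialList {
				atomic.AddInt64(initialCount, 1) // part of the initial replay
			}
			// ...normal add handling...
		},
		UpdateFunc: func(oldObj, newObj interface{}) { /* ... */ },
		DeleteFunc: func(obj interface{}) { /* ... */ },
	}
}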


@ -62,7 +62,7 @@ func Example() {
// Let's implement a simple controller that just deletes
// everything that comes in.
Process: func(obj interface{}) error {
Process: func(obj interface{}, isInInitialList bool) error {
// Obj is from the Pop method of the Queue we make above.
newest := obj.(Deltas).Newest()
@ -137,8 +137,8 @@ func ExampleNewInformer() {
source,
&v1.Pod{},
time.Millisecond*100,
ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) {
source.Delete(obj.(runtime.Object))
},
DeleteFunc: func(obj interface{}) {
@ -213,8 +213,8 @@ func TestHammerController(t *testing.T) {
source,
&v1.Pod{},
time.Millisecond*100,
ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { recordFunc("add", obj) },
ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) { recordFunc("add", obj) },
UpdateFunc: func(oldObj, newObj interface{}) { recordFunc("update", newObj) },
DeleteFunc: func(obj interface{}) { recordFunc("delete", obj) },
},
@ -416,8 +416,8 @@ func TestPanicPropagated(t *testing.T) {
source,
&v1.Pod{},
time.Millisecond*100,
ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) {
// Create a panic.
panic("Just panic.")
},
@ -526,8 +526,8 @@ func TestTransformingInformer(t *testing.T) {
source,
&v1.Pod{},
0,
ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { recordEvent(watch.Added, nil, obj) },
ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) { recordEvent(watch.Added, nil, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) },
DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) },
},


@ -271,6 +271,10 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.hasSynced_locked()
}
func (f *DeltaFIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
}
@ -526,6 +530,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.cond.Wait()
}
isInInitialList := !f.hasSynced_locked()
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
@ -551,7 +556,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
err := process(item)
err := process(item, isInInitialList)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
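The flag is captured while the lock is still held: any item popped before the initial population has drained is, by definition, part of the initial list. A hedged illustration of the observable behavior (pod is a small helper invented for the example):

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func pod(name string) *v1.Pod {
	return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}}
}

func main() {
	f := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		KeyFunction: cache.MetaNamespaceKeyFunc,
	})
	f.Replace([]interface{}{pod("a"), pod("b")}, "1") // the initial LIST
	f.Add(pod("c"))                                   // a later watch event

	for i := 0; i < 3; i++ {
		f.Pop(func(obj interface{}, isInInitialList bool) error {
			fmt.Println(isInInitialList) // prints true, true, false
			return nil
		})
	}
}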


@ -125,7 +125,7 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
f.Add(mkFifoObj("foo", 10))
_, err := f.Pop(func(obj interface{}) error {
_, err := f.Pop(func(obj interface{}, isInInitialList bool) error {
if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" {
t.Fatalf("unexpected object: %#v", obj)
}
@ -138,7 +138,7 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) {
t.Fatalf("object should have been requeued: %t %v", ok, err)
}
_, err = f.Pop(func(obj interface{}) error {
_, err = f.Pop(func(obj interface{}, isInInitialList bool) error {
if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" {
t.Fatalf("unexpected object: %#v", obj)
}
@ -151,7 +151,7 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) {
t.Fatalf("object should have been requeued: %t %v", ok, err)
}
_, err = f.Pop(func(obj interface{}) error {
_, err = f.Pop(func(obj interface{}, isInInitialList bool) error {
if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" {
t.Fatalf("unexpected object: %#v", obj)
}
@ -480,6 +480,18 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) {
}
}
// pop2 captures both parameters, unlike Pop().
func pop2[T any](queue Queue) (T, bool) {
var result interface{}
var isList bool
queue.Pop(func(obj interface{}, isInInitialList bool) error {
result = obj
isList = isInInitialList
return nil
})
return result.(T), isList
}
func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
@ -501,10 +513,13 @@ func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) {
if f.HasSynced() {
t.Errorf("Expected HasSynced to be false")
}
cur := Pop(f).(Deltas)
cur, initial := pop2[Deltas](f)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
if initial != true {
t.Error("Expected initial list item")
}
}
if !f.HasSynced() {
t.Errorf("Expected HasSynced to be true")
@ -676,7 +691,7 @@ func TestDeltaFIFO_PopShouldUnblockWhenClosed(t *testing.T) {
const jobs = 10
for i := 0; i < jobs; i++ {
go func() {
f.Pop(func(obj interface{}) error {
f.Pop(func(obj interface{}, isInInitialList bool) error {
return nil
})
c <- struct{}{}


@ -25,7 +25,7 @@ import (
// PopProcessFunc is passed to Pop() method of Queue interface.
// It is supposed to process the accumulator popped from the queue.
type PopProcessFunc func(interface{}) error
type PopProcessFunc func(obj interface{}, isInInitialList bool) error
// ErrRequeue may be returned by a PopProcessFunc to safely requeue
// the current item. The value of Err will be returned from Pop.
@ -82,9 +82,12 @@ type Queue interface {
// Pop is a helper function for popping from a Queue.
// WARNING: Do NOT use this function in non-test code to avoid races
// unless you really really really really know what you are doing.
//
// NOTE: This function is deprecated and may be removed in the future without
// additional warning.
func Pop(queue Queue) interface{} {
var result interface{}
queue.Pop(func(obj interface{}) error {
queue.Pop(func(obj interface{}, isInInitialList bool) error {
result = obj
return nil
})
@ -149,6 +152,10 @@ func (f *FIFO) Close() {
func (f *FIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.hasSynced_locked()
}
func (f *FIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
}
@ -287,6 +294,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.cond.Wait()
}
isInInitialList := !f.hasSynced_locked()
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
@ -298,7 +306,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
continue
}
delete(f.items, id)
err := process(item)
err := process(item, isInInitialList)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err


@ -76,7 +76,7 @@ func TestFIFO_requeueOnPop(t *testing.T) {
f := NewFIFO(testFifoObjectKeyFunc)
f.Add(mkFifoObj("foo", 10))
_, err := f.Pop(func(obj interface{}) error {
_, err := f.Pop(func(obj interface{}, isInInitialList bool) error {
if obj.(testFifoObject).name != "foo" {
t.Fatalf("unexpected object: %#v", obj)
}
@ -89,7 +89,7 @@ func TestFIFO_requeueOnPop(t *testing.T) {
t.Fatalf("object should have been requeued: %t %v", ok, err)
}
_, err = f.Pop(func(obj interface{}) error {
_, err = f.Pop(func(obj interface{}, isInInitialList bool) error {
if obj.(testFifoObject).name != "foo" {
t.Fatalf("unexpected object: %#v", obj)
}
@ -102,7 +102,7 @@ func TestFIFO_requeueOnPop(t *testing.T) {
t.Fatalf("object should have been requeued: %t %v", ok, err)
}
_, err = f.Pop(func(obj interface{}) error {
_, err = f.Pop(func(obj interface{}, isInInitialList bool) error {
if obj.(testFifoObject).name != "foo" {
t.Fatalf("unexpected object: %#v", obj)
}
@ -289,7 +289,7 @@ func TestFIFO_PopShouldUnblockWhenClosed(t *testing.T) {
const jobs = 10
for i := 0; i < jobs; i++ {
go func() {
f.Pop(func(obj interface{}) error {
f.Pop(func(obj interface{}, isInInitialList bool) error {
return nil
})
c <- struct{}{}


@ -39,7 +39,7 @@ func BenchmarkListener(b *testing.B) {
AddFunc: func(obj interface{}) {
swg.Done()
},
}, 0, 0, time.Now(), 1024*1024)
}, 0, 0, time.Now(), 1024*1024, func() bool { return true })
var wg wait.Group
defer wg.Wait() // Wait for .run and .pop to stop
defer close(pl.addCh) // Tell .run and .pop to stop


@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache/synctrack"
"k8s.io/utils/buffer"
"k8s.io/utils/clock"
@ -132,11 +133,13 @@ import (
// state, except that its ResourceVersion is replaced with a
// ResourceVersion in which the object is actually absent.
type SharedInformer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers.
// It returns a registration handle for the handler that can be used to remove
// the handler again.
// AddEventHandler adds an event handler to the shared informer using
// the shared informer's resync period. Events to a single handler are
// delivered sequentially, but there is no coordination between
// different handlers.
// It returns a registration handle for the handler that can be used to
// remove the handler again, or to tell if the handler is synced (has
// seen every item in the initial list).
AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
// AddEventHandlerWithResyncPeriod adds an event handler to the
// shared informer with the requested resync period; zero means
@ -169,6 +172,10 @@ type SharedInformer interface {
// HasSynced returns true if the shared informer's store has been
// informed by at least one full LIST of the authoritative state
// of the informer's object collection. This is unrelated to "resync".
//
// Note that this doesn't tell you if an individual handler is synced!!
// For that, please call HasSynced on the handle returned by
// AddEventHandler.
HasSynced() bool
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
@ -213,7 +220,14 @@ type SharedInformer interface {
// Opaque interface representing the registration of ResourceEventHandler for
// a SharedInformer. Must be supplied back to the same SharedInformer's
// `RemoveEventHandler` to unregister the handlers.
type ResourceEventHandlerRegistration interface{}
//
// Also used to tell if the handler is synced (has had all items in the initial
// list delivered).
type ResourceEventHandlerRegistration interface {
// HasSynced reports if both the parent has synced and all pre-sync
// events have been delivered.
HasSynced() bool
}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface {
@ -409,7 +423,8 @@ type updateNotification struct {
}
type addNotification struct {
newObj interface{}
newObj interface{}
isInInitialList bool
}
type deleteNotification struct {
@ -588,7 +603,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
}
}
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
if !s.started {
return s.processor.addListener(listener), nil
@ -604,27 +619,35 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
handle := s.processor.addListener(listener)
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
// Note that we enqueue these notifications with the lock held
// and before returning the handle. That means there is never a
// chance for anyone to call the handle's HasSynced method in a
// state when it would falsely return true (i.e., when the
// shared informer is synced but it has not observed an Add
// with isInitialList being true, nor when the thread
// processing notifications somehow goes faster than this
// thread adding them and the counter is temporarily zero).
listener.add(addNotification{newObj: item, isInInitialList: true})
}
return handle, nil
}
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas)
return processDeltas(s, s.indexer, s.transform, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
}
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}) {
func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
// Invocation of this function is locked under s.blockDeltas, so it is
// safe to distribute the notification
s.cacheMutationDetector.AddObject(obj)
s.processor.distribute(addNotification{newObj: obj}, false)
s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
}
// Conforms to ResourceEventHandler
@ -846,6 +869,8 @@ type processorListener struct {
handler ResourceEventHandler
syncTracker *synctrack.SingleFileTracker
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
@ -876,11 +901,18 @@ type processorListener struct {
resyncLock sync.Mutex
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
// HasSynced returns true if the source informer has synced, and all
// corresponding events have been delivered.
func (p *processorListener) HasSynced() bool {
return p.syncTracker.HasSynced()
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
@ -892,6 +924,9 @@ func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, res
}
func (p *processorListener) add(notification interface{}) {
if a, ok := notification.(addNotification); ok && a.isInInitialList {
p.syncTracker.Start()
}
p.addCh <- notification
}
@ -937,7 +972,10 @@ func (p *processorListener) run() {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
p.handler.OnAdd(notification.newObj, notification.isInInitialList)
if notification.isInInitialList {
p.syncTracker.Finished()
}
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
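Tying it together: every listener owns a SingleFileTracker whose upstream signal is the informer's own HasSynced; add() counts initial-list notifications in, and run() counts them out after OnAdd returns. The visible consequence is that a handler added to an already-synced informer is not itself synced until its replayed Adds have been processed, as in this hedged sketch (podInformer and stopCh are assumed to exist, with the informer already running):

import (
	"fmt"
	"time"

	"k8s.io/client-go/tools/cache"
)

func demo(podInformer cache.SharedIndexInformer, stopCh <-chan struct{}) {
	handle, _ := podInformer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{
		AddFunc: func(obj interface{}, isInInitialList bool) {
			// every pre-existing object is replayed with isInInitialList == true
			time.Sleep(10 * time.Millisecond) // stand-in for slow work
		},
	})

	fmt.Println(podInformer.HasSynced()) // true: the store synced long ago
	fmt.Println(handle.HasSynced())      // almost certainly false: replay pending

	cache.WaitForCacheSync(stopCh, handle.HasSynced) // true once replay is done
}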


@ -52,7 +52,7 @@ func newTestListener(name string, resyncPeriod time.Duration, expected ...string
return l
}
func (l *testListener) OnAdd(obj interface{}) {
func (l *testListener) OnAdd(obj interface{}, isInInitialList bool) {
l.handle(obj)
}
@ -68,7 +68,6 @@ func (l *testListener) handle(obj interface{}) {
fmt.Printf("%s: handle: %v\n", l.name, key)
l.lock.Lock()
defer l.lock.Unlock()
objectMeta, _ := meta.Accessor(obj)
l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName())
}
@ -649,8 +648,8 @@ func TestSharedInformerHandlerAbuse(t *testing.T) {
worker := func() {
// Keep adding and removing handler
// Make sure no duplicate events?
funcs := ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
funcs := ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) {},
UpdateFunc: func(oldObj, newObj interface{}) {},
DeleteFunc: func(obj interface{}) {},
}
@ -902,9 +901,13 @@ func TestAddWhileActive(t *testing.T) {
// create the shared informer and resync every 12 hours
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
listener1 := newTestListener("originalListener", 0, "pod1")
listener2 := newTestListener("originalListener", 0, "pod1", "pod2")
listener2 := newTestListener("listener2", 0, "pod1", "pod2")
handle1, _ := informer.AddEventHandler(listener1)
if handle1.HasSynced() {
t.Error("Synced before Run??")
}
stop := make(chan struct{})
defer close(stop)
@ -916,7 +919,17 @@ func TestAddWhileActive(t *testing.T) {
return
}
if !handle1.HasSynced() {
t.Error("Not synced after Run??")
}
listener2.lock.Lock() // ensure we observe it before it has synced
handle2, _ := informer.AddEventHandler(listener2)
if handle2.HasSynced() {
t.Error("Synced before processing anything?")
}
listener2.lock.Unlock() // permit it to proceed and sync
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
if !listener2.ok() {
@ -924,6 +937,10 @@ func TestAddWhileActive(t *testing.T) {
return
}
if !handle2.HasSynced() {
t.Error("Not synced even after processing?")
}
if !isRegistered(informer, handle1) {
t.Errorf("handle1 is not active")
return


@ -0,0 +1,116 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package synctrack contains utilities for helping controllers track whether
// they are "synced" or not, that is, whether they have processed all items
// from the informer's initial list.
package synctrack
import (
"sync"
"sync/atomic"
"k8s.io/apimachinery/pkg/util/sets"
)
// AsyncTracker helps propagate HasSynced in the face of multiple worker threads.
type AsyncTracker[T comparable] struct {
UpstreamHasSynced func() bool
lock sync.Mutex
waiting sets.Set[T]
}
// Start should be called prior to processing each key which is part of the
// initial list.
func (t *AsyncTracker[T]) Start(key T) {
t.lock.Lock()
defer t.lock.Unlock()
if t.waiting == nil {
t.waiting = sets.New[T](key)
} else {
t.waiting.Insert(key)
}
}
// Finished should be called when finished processing a key which was part of
// the initial list. Since keys are tracked individually, nothing bad happens
// if you call Finished without a corresponding call to Start. This makes it
// easier to use this in combination with e.g. queues which don't make it easy
// to plumb through the isInInitialList boolean.
func (t *AsyncTracker[T]) Finished(key T) {
t.lock.Lock()
defer t.lock.Unlock()
if t.waiting != nil {
t.waiting.Delete(key)
}
}
// HasSynced returns true if the source is synced and every key present in the
// initial list has been processed. This relies on the source not considering
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *AsyncTracker[T]) HasSynced() bool {
// Call UpstreamHasSynced first: it might take a lock, which might take
// a significant amount of time, and we can't hold our lock while
// waiting on that or a user is likely to get a deadlock.
if !t.UpstreamHasSynced() {
return false
}
t.lock.Lock()
defer t.lock.Unlock()
return t.waiting.Len() == 0
}
// SingleFileTracker helps propagate HasSynced when events are processed in
// order (i.e. via a queue).
type SingleFileTracker struct {
UpstreamHasSynced func() bool
count int64
}
// Start should be called prior to processing each key which is part of the
// initial list.
func (t *SingleFileTracker) Start() {
atomic.AddInt64(&t.count, 1)
}
// Finished should be called when finished processing a key which was part of
// the initial list. You must never call Finished() before (or without) its
// corresponding Start(), that is a logic error that could cause HasSynced to
// return a wrong value. To help you notice this should it happen, Finished()
// will panic if the internal counter goes negative.
func (t *SingleFileTracker) Finished() {
result := atomic.AddInt64(&t.count, -1)
if result < 0 {
panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value")
}
}
// HasSynced returns true if the source is synced and every key present in the
// initial list has been processed. This relies on the source not considering
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *SingleFileTracker) HasSynced() bool {
// Call UpstreamHasSynced first: it might take a lock, which might take
// a significant amount of time, and we don't want to then act on a
// stale count value.
if !t.UpstreamHasSynced() {
return false
}
return atomic.LoadInt64(&t.count) <= 0
}
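The trackers also work on their own. SingleFileTracker fits strictly sequential consumers (a single atomic counter); AsyncTracker[T] fits fan-out consumers at the cost of a keyed set behind a mutex. A hedged standalone example of the sequential case:

import (
	"fmt"

	"k8s.io/client-go/tools/cache/synctrack"
)

func main() {
	listDelivered := false // stand-in for the upstream HasSynced signal
	t := synctrack.SingleFileTracker{
		UpstreamHasSynced: func() bool { return listDelivered },
	}

	for i := 0; i < 3; i++ {
		t.Start() // one Start per initial-list item, at delivery time
	}
	listDelivered = true

	fmt.Println(t.HasSynced()) // false: three items still in flight
	for i := 0; i < 3; i++ {
		t.Finished()
	}
	fmt.Println(t.HasSynced()) // true
}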


@ -0,0 +1,239 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package synctrack
import (
"strings"
"sync"
"time"
"testing"
)
func testSingleFileFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) {
tracker := SingleFileTracker{
UpstreamHasSynced: upstreamHasSynced,
}
return tracker.Start, tracker.Finished, tracker.HasSynced
}
func testAsyncFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) {
tracker := AsyncTracker[string]{
UpstreamHasSynced: upstreamHasSynced,
}
return func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced
}
func TestBasicLogic(t *testing.T) {
table := []struct {
name string
construct func(func() bool) (func(), func(), func() bool)
}{
{"SingleFile", testSingleFileFuncs},
{"Async", testAsyncFuncs},
}
for _, entry := range table {
t.Run(entry.name, func(t *testing.T) {
table := []struct {
synced bool
start bool
finish bool
expectSynced bool
}{
{false, true, true, false},
{true, true, false, false},
{false, true, false, false},
{true, true, true, true},
}
for _, tt := range table {
Start, Finished, HasSynced := entry.construct(func() bool { return tt.synced })
if tt.start {
Start()
}
if tt.finish {
Finished()
}
got := HasSynced()
if e, a := tt.expectSynced, got; e != a {
t.Errorf("for %#v got %v (wanted %v)", tt, a, e)
}
}
})
}
}
func TestAsyncLocking(t *testing.T) {
aft := AsyncTracker[int]{UpstreamHasSynced: func() bool { return true }}
var wg sync.WaitGroup
for _, i := range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
wg.Add(1)
go func(i int) {
aft.Start(i)
go func() {
aft.Finished(i)
wg.Done()
}()
}(i)
}
wg.Wait()
if !aft.HasSynced() {
t.Errorf("async tracker must have made a threading error?")
}
}
func TestSingleFileCounting(t *testing.T) {
sft := SingleFileTracker{UpstreamHasSynced: func() bool { return true }}
for i := 0; i < 100; i++ {
sft.Start()
}
if sft.HasSynced() {
t.Fatal("Unexpectedly synced?")
}
for i := 0; i < 99; i++ {
sft.Finished()
}
if sft.HasSynced() {
t.Fatal("Unexpectedly synced?")
}
sft.Finished()
if !sft.HasSynced() {
t.Fatal("Unexpectedly not synced?")
}
// Calling an extra time will panic.
func() {
defer func() {
x := recover()
if x == nil {
t.Error("no panic?")
return
}
msg, ok := x.(string)
if !ok {
t.Errorf("unexpected panic value: %v", x)
return
}
if !strings.Contains(msg, "negative counter") {
t.Errorf("unexpected panic message: %v", msg)
return
}
}()
sft.Finished()
}()
// Negative counter still means it is synced
if !sft.HasSynced() {
t.Fatal("Unexpectedly not synced?")
}
}
func TestSingleFile(t *testing.T) {
table := []struct {
synced bool
starts int
stops int
expectSynced bool
}{
{false, 1, 1, false},
{true, 1, 0, false},
{false, 1, 0, false},
{true, 1, 1, true},
}
for _, tt := range table {
sft := SingleFileTracker{UpstreamHasSynced: func() bool { return tt.synced }}
for i := 0; i < tt.starts; i++ {
sft.Start()
}
for i := 0; i < tt.stops; i++ {
sft.Finished()
}
got := sft.HasSynced()
if e, a := tt.expectSynced, got; e != a {
t.Errorf("for %#v got %v (wanted %v)", tt, a, e)
}
}
}
func TestNoStaleValue(t *testing.T) {
table := []struct {
name string
construct func(func() bool) (func(), func(), func() bool)
}{
{"SingleFile", testSingleFileFuncs},
{"Async", testAsyncFuncs},
}
for _, entry := range table {
t.Run(entry.name, func(t *testing.T) {
var lock sync.Mutex
upstreamHasSynced := func() bool {
lock.Lock()
defer lock.Unlock()
return true
}
Start, Finished, HasSynced := entry.construct(upstreamHasSynced)
// Ordinarily the corresponding lock would be held and you wouldn't be
// able to call this function at this point.
if !HasSynced() {
t.Fatal("Unexpectedly not synced??")
}
Start()
if HasSynced() {
t.Fatal("Unexpectedly synced??")
}
Finished()
if !HasSynced() {
t.Fatal("Unexpectedly not synced??")
}
// Now we will prove that if the lock is held, you can't get a false
// HasSynced return.
lock.Lock()
// This goroutine calls HasSynced
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if HasSynced() {
t.Error("Unexpectedly synced??")
}
}()
// This goroutine increments + unlocks. The sleep is to bias the
// runtime such that the other goroutine usually wins (it needs to work
// in both orderings, this one is more likely to be buggy).
go func() {
time.Sleep(time.Millisecond)
Start()
lock.Unlock()
}()
wg.Wait()
})
}
}


@ -156,11 +156,8 @@ func NewAvailableConditionController(
c := &AvailableConditionController{
apiServiceClient: apiServiceClient,
apiServiceLister: apiServiceInformer.Lister(),
apiServiceSynced: apiServiceInformer.Informer().HasSynced,
serviceLister: serviceInformer.Lister(),
servicesSynced: serviceInformer.Informer().HasSynced,
endpointsLister: endpointsInformer.Lister(),
endpointsSynced: endpointsInformer.Informer().HasSynced,
serviceResolver: serviceResolver,
queue: workqueue.NewNamedRateLimitingQueue(
// We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the
@ -189,25 +186,28 @@ func NewAvailableConditionController(
// allows us to detect health in a more timely fashion when network connectivity to
// nodes is snipped, but the network still attempts to route there. See
// https://github.com/openshift/origin/issues/17159#issuecomment-341798063
apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod(
apiServiceHandler, _ := apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addAPIService,
UpdateFunc: c.updateAPIService,
DeleteFunc: c.deleteAPIService,
},
30*time.Second)
c.apiServiceSynced = apiServiceHandler.HasSynced
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
serviceHandler, _ := serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addService,
UpdateFunc: c.updateService,
DeleteFunc: c.deleteService,
})
c.servicesSynced = serviceHandler.HasSynced
endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
endpointsHandler, _ := endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addEndpoints,
UpdateFunc: c.updateEndpoints,
DeleteFunc: c.deleteEndpoints,
})
c.endpointsSynced = endpointsHandler.HasSynced
c.syncFn = c.sync
@ -494,6 +494,10 @@ func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{})
klog.Info("Starting AvailableConditionController")
defer klog.Info("Shutting down AvailableConditionController")
// This waits not just for the informers to sync, but for our handlers
// to be called; since the handlers are three different ways of
// enqueueing the same thing, waiting for this permits the queue to
// maximally de-duplicate the entries.
if !controllers.WaitForCacheSync("AvailableConditionController", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointsSynced) {
return
}


@ -562,7 +562,7 @@ type informerSpy struct {
deletes []interface{}
}
func (is *informerSpy) OnAdd(obj interface{}) {
func (is *informerSpy) OnAdd(obj interface{}, isInInitialList bool) {
is.mu.Lock()
defer is.mu.Unlock()
is.adds = append(is.adds, obj)

vendor/modules.txt

@ -1906,6 +1906,7 @@ k8s.io/client-go/testing
k8s.io/client-go/third_party/forked/golang/template
k8s.io/client-go/tools/auth
k8s.io/client-go/tools/cache
k8s.io/client-go/tools/cache/synctrack
k8s.io/client-go/tools/clientcmd
k8s.io/client-go/tools/clientcmd/api
k8s.io/client-go/tools/clientcmd/api/latest