client-go/tools/cache/synctrack/synctrack.go
Tom Wieczorek 9e8c663097 Properly align synctrack.SingleFileTracker struct
count is used with atomic operations so it must be 64-bit aligned,
otherwise atomic operations will panic. Having it at the top of the
struct will guarantee that, even on 32-bit arches.

This fixes panics like that one observed in kube-apiserver:

    E0310 13:48:47.476124     676 runtime.go:77] Observed a panic: unaligned 64-bit atomic operation
    goroutine 141 [running]:
    k8s.io/apimachinery/pkg/util/runtime.logPanic({0x2482378, 0x2db2ff8})
    	vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:75 +0x94
    k8s.io/apimachinery/pkg/util/runtime.HandleCrash({0x0, 0x0, 0x0})
    	vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:49 +0x78
    panic({0x2482378, 0x2db2ff8})
    	/usr/local/go/src/runtime/panic.go:884 +0x218
    runtime/internal/atomic.panicUnaligned()
    	/usr/local/go/src/runtime/internal/atomic/unaligned.go:8 +0x24
    runtime/internal/atomic.Load64(0x685f794)
    	/usr/local/go/src/runtime/internal/atomic/atomic_arm.s:280 +0x14
    k8s.io/client-go/tools/cache/synctrack.(*SingleFileTracker).HasSynced(0x685f790)
    	vendor/k8s.io/client-go/tools/cache/synctrack/synctrack.go:115 +0x3c
    k8s.io/client-go/tools/cache.(*processorListener).HasSynced(0x6013e60)
    	vendor/k8s.io/client-go/tools/cache/shared_informer.go:907 +0x20
    k8s.io/client-go/tools/cache.WaitForCacheSync.func1()
    	vendor/k8s.io/client-go/tools/cache/shared_informer.go:332 +0x50
    k8s.io/apimachinery/pkg/util/wait.ConditionFunc.WithContext.func1({0x2dcf274, 0x607c600})
    	vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:222 +0x1c
    k8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtectionWithContext({0x2dcf274, 0x607c600}, 0x6382050)
    	vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:262 +0x64
    k8s.io/apimachinery/pkg/util/wait.waitForWithContext({0x2dcf274, 0x607c600}, 0x64a6060, 0x6382050)
    	vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:649 +0x11c
    k8s.io/apimachinery/pkg/util/wait.poll({0x2dcf274, 0x607c600}, 0x1, 0x64a6060, 0x6382050)
    	vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:600 +0xc4
    k8s.io/apimachinery/pkg/util/wait.PollImmediateUntilWithContext({0x2dcf274, 0x607c600}, 0x5f5e100, 0x6382050)
    	vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:551 +0x60
    k8s.io/apimachinery/pkg/util/wait.PollImmediateUntil(0x5f5e100, 0x6298020, 0x607c600)
    	vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:542 +0x48
    k8s.io/client-go/tools/cache.WaitForCacheSync(0x607c600, {0x6298000, 0x3, 0x3})
    	vendor/k8s.io/client-go/tools/cache/shared_informer.go:329 +0x80
    k8s.io/client-go/tools/cache.WaitForNamedCacheSync({0x283c5e1, 0xf}, 0x607c600, {0x6298000, 0x3, 0x3})
    	vendor/k8s.io/client-go/tools/cache/shared_informer.go:316 +0xe8
    created by k8s.io/kubernetes/plugin/pkg/auth/authorizer/node.AddGraphEventHandlers
    	plugin/pkg/auth/authorizer/node/graph_populator.go:65 +0x5b0
    panic: unaligned 64-bit atomic operation [recovered]
    	panic: unaligned 64-bit atomic operation

    goroutine 141 [running]:
    k8s.io/apimachinery/pkg/util/runtime.HandleCrash({0x0, 0x0, 0x0})
    	vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:56 +0xf4
    panic({0x2482378, 0x2db2ff8})
    	/usr/local/go/src/runtime/panic.go:884 +0x218
    runtime/internal/atomic.panicUnaligned()
    	/usr/local/go/src/runtime/internal/atomic/unaligned.go:8 +0x24
    runtime/internal/atomic.Load64(0x685f794)
    	/usr/local/go/src/runtime/internal/atomic/atomic_arm.s:280 +0x14
    k8s.io/client-go/tools/cache/synctrack.(*SingleFileTracker).HasSynced(0x685f790)
    	vendor/k8s.io/client-go/tools/cache/synctrack/synctrack.go:115 +0x3c
    k8s.io/client-go/tools/cache.(*processorListener).HasSynced(0x6013e60)
    	vendor/k8s.io/client-go/tools/cache/shared_informer.go:907 +0x20
    k8s.io/client-go/tools/cache.WaitForCacheSync.func1()
    	vendor/k8s.io/client-go/tools/cache/shared_informer.go:332 +0x50
    k8s.io/apimachinery/pkg/util/wait.ConditionFunc.WithContext.func1({0x2dcf274, 0x607c600})
    	vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:222 +0x1c
    k8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtectionWithContext({0x2dcf274, 0x607c600}, 0x6382050)
    	vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:262 +0x64
    k8s.io/apimachinery/pkg/util/wait.waitForWithContext({0x2dcf274, 0x607c600}, 0x64a6060, 0x6382050)
    	vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:649 +0x11c
    k8s.io/apimachinery/pkg/util/wait.poll({0x2dcf274, 0x607c600}, 0x1, 0x64a6060, 0x6382050)
    	vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:600 +0xc4
    k8s.io/apimachinery/pkg/util/wait.PollImmediateUntilWithContext({0x2dcf274, 0x607c600}, 0x5f5e100, 0x6382050)
    	vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:551 +0x60
    k8s.io/apimachinery/pkg/util/wait.PollImmediateUntil(0x5f5e100, 0x6298020, 0x607c600)
    	vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:542 +0x48
    k8s.io/client-go/tools/cache.WaitForCacheSync(0x607c600, {0x6298000, 0x3, 0x3})
    	vendor/k8s.io/client-go/tools/cache/shared_informer.go:329 +0x80
    k8s.io/client-go/tools/cache.WaitForNamedCacheSync({0x283c5e1, 0xf}, 0x607c600, {0x6298000, 0x3, 0x3})
    	vendor/k8s.io/client-go/tools/cache/shared_informer.go:316 +0xe8
    created by k8s.io/kubernetes/plugin/pkg/auth/authorizer/node.AddGraphEventHandlers
    	plugin/pkg/auth/authorizer/node/graph_populator.go:65 +0x5b0

Kubernetes-commit: ffcf653e0666366e6241c99d9418e830840afa0f
2023-03-13 08:37:13 +01:00

121 lines
4.2 KiB
Go

/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package synctrack contains utilities for helping controllers track whether
// they are "synced" or not, that is, whether they have processed all items
// from the informer's initial list.
package synctrack
import (
"sync"
"sync/atomic"
"k8s.io/apimachinery/pkg/util/sets"
)
// AsyncTracker helps propagate HasSynced in the face of multiple worker threads.
type AsyncTracker[T comparable] struct {
UpstreamHasSynced func() bool
lock sync.Mutex
waiting sets.Set[T]
}
// Start should be called prior to processing each key which is part of the
// initial list.
func (t *AsyncTracker[T]) Start(key T) {
t.lock.Lock()
defer t.lock.Unlock()
if t.waiting == nil {
t.waiting = sets.New[T](key)
} else {
t.waiting.Insert(key)
}
}
// Finished should be called when finished processing a key which was part of
// the initial list. Since keys are tracked individually, nothing bad happens
// if you call Finished without a corresponding call to Start. This makes it
// easier to use this in combination with e.g. queues which don't make it easy
// to plumb through the isInInitialList boolean.
func (t *AsyncTracker[T]) Finished(key T) {
t.lock.Lock()
defer t.lock.Unlock()
if t.waiting != nil {
t.waiting.Delete(key)
}
}
// HasSynced returns true if the source is synced and every key present in the
// initial list has been processed. This relies on the source not considering
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *AsyncTracker[T]) HasSynced() bool {
// Call UpstreamHasSynced first: it might take a lock, which might take
// a significant amount of time, and we can't hold our lock while
// waiting on that or a user is likely to get a deadlock.
if !t.UpstreamHasSynced() {
return false
}
t.lock.Lock()
defer t.lock.Unlock()
return t.waiting.Len() == 0
}
// SingleFileTracker helps propagate HasSynced when events are processed in
// order (i.e. via a queue).
type SingleFileTracker struct {
// Important: count is used with atomic operations so it must be 64-bit
// aligned, otherwise atomic operations will panic. Having it at the top of
// the struct will guarantee that, even on 32-bit arches.
// See https://pkg.go.dev/sync/atomic#pkg-note-BUG for more information.
count int64
UpstreamHasSynced func() bool
}
// Start should be called prior to processing each key which is part of the
// initial list.
func (t *SingleFileTracker) Start() {
atomic.AddInt64(&t.count, 1)
}
// Finished should be called when finished processing a key which was part of
// the initial list. You must never call Finished() before (or without) its
// corresponding Start(), that is a logic error that could cause HasSynced to
// return a wrong value. To help you notice this should it happen, Finished()
// will panic if the internal counter goes negative.
func (t *SingleFileTracker) Finished() {
result := atomic.AddInt64(&t.count, -1)
if result < 0 {
panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value")
}
}
// HasSynced returns true if the source is synced and every key present in the
// initial list has been processed. This relies on the source not considering
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *SingleFileTracker) HasSynced() bool {
// Call UpstreamHasSynced first: it might take a lock, which might take
// a significant amount of time, and we don't want to then act on a
// stale count value.
if !t.UpstreamHasSynced() {
return false
}
return atomic.LoadInt64(&t.count) <= 0
}