mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-23 13:47:19 +00:00
count is used with atomic operations so it must be 64-bit aligned, otherwise atomic operations will panic. Having it at the top of the struct will guarantee that, even on 32-bit arches. This fixes panics like that one observed in kube-apiserver: E0310 13:48:47.476124 676 runtime.go:77] Observed a panic: unaligned 64-bit atomic operation goroutine 141 [running]: k8s.io/apimachinery/pkg/util/runtime.logPanic({0x2482378, 0x2db2ff8}) vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:75 +0x94 k8s.io/apimachinery/pkg/util/runtime.HandleCrash({0x0, 0x0, 0x0}) vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:49 +0x78 panic({0x2482378, 0x2db2ff8}) /usr/local/go/src/runtime/panic.go:884 +0x218 runtime/internal/atomic.panicUnaligned() /usr/local/go/src/runtime/internal/atomic/unaligned.go:8 +0x24 runtime/internal/atomic.Load64(0x685f794) /usr/local/go/src/runtime/internal/atomic/atomic_arm.s:280 +0x14 k8s.io/client-go/tools/cache/synctrack.(*SingleFileTracker).HasSynced(0x685f790) vendor/k8s.io/client-go/tools/cache/synctrack/synctrack.go:115 +0x3c k8s.io/client-go/tools/cache.(*processorListener).HasSynced(0x6013e60) vendor/k8s.io/client-go/tools/cache/shared_informer.go:907 +0x20 k8s.io/client-go/tools/cache.WaitForCacheSync.func1() vendor/k8s.io/client-go/tools/cache/shared_informer.go:332 +0x50 k8s.io/apimachinery/pkg/util/wait.ConditionFunc.WithContext.func1({0x2dcf274, 0x607c600}) vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:222 +0x1c k8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtectionWithContext({0x2dcf274, 0x607c600}, 0x6382050) vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:262 +0x64 k8s.io/apimachinery/pkg/util/wait.waitForWithContext({0x2dcf274, 0x607c600}, 0x64a6060, 0x6382050) vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:649 +0x11c k8s.io/apimachinery/pkg/util/wait.poll({0x2dcf274, 0x607c600}, 0x1, 0x64a6060, 0x6382050) vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:600 +0xc4 
k8s.io/apimachinery/pkg/util/wait.PollImmediateUntilWithContext({0x2dcf274, 0x607c600}, 0x5f5e100, 0x6382050) vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:551 +0x60 k8s.io/apimachinery/pkg/util/wait.PollImmediateUntil(0x5f5e100, 0x6298020, 0x607c600) vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:542 +0x48 k8s.io/client-go/tools/cache.WaitForCacheSync(0x607c600, {0x6298000, 0x3, 0x3}) vendor/k8s.io/client-go/tools/cache/shared_informer.go:329 +0x80 k8s.io/client-go/tools/cache.WaitForNamedCacheSync({0x283c5e1, 0xf}, 0x607c600, {0x6298000, 0x3, 0x3}) vendor/k8s.io/client-go/tools/cache/shared_informer.go:316 +0xe8 created by k8s.io/kubernetes/plugin/pkg/auth/authorizer/node.AddGraphEventHandlers plugin/pkg/auth/authorizer/node/graph_populator.go:65 +0x5b0 panic: unaligned 64-bit atomic operation [recovered] panic: unaligned 64-bit atomic operation goroutine 141 [running]: k8s.io/apimachinery/pkg/util/runtime.HandleCrash({0x0, 0x0, 0x0}) vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:56 +0xf4 panic({0x2482378, 0x2db2ff8}) /usr/local/go/src/runtime/panic.go:884 +0x218 runtime/internal/atomic.panicUnaligned() /usr/local/go/src/runtime/internal/atomic/unaligned.go:8 +0x24 runtime/internal/atomic.Load64(0x685f794) /usr/local/go/src/runtime/internal/atomic/atomic_arm.s:280 +0x14 k8s.io/client-go/tools/cache/synctrack.(*SingleFileTracker).HasSynced(0x685f790) vendor/k8s.io/client-go/tools/cache/synctrack/synctrack.go:115 +0x3c k8s.io/client-go/tools/cache.(*processorListener).HasSynced(0x6013e60) vendor/k8s.io/client-go/tools/cache/shared_informer.go:907 +0x20 k8s.io/client-go/tools/cache.WaitForCacheSync.func1() vendor/k8s.io/client-go/tools/cache/shared_informer.go:332 +0x50 k8s.io/apimachinery/pkg/util/wait.ConditionFunc.WithContext.func1({0x2dcf274, 0x607c600}) vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:222 +0x1c k8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtectionWithContext({0x2dcf274, 0x607c600}, 0x6382050) 
vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:262 +0x64 k8s.io/apimachinery/pkg/util/wait.waitForWithContext({0x2dcf274, 0x607c600}, 0x64a6060, 0x6382050) vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:649 +0x11c k8s.io/apimachinery/pkg/util/wait.poll({0x2dcf274, 0x607c600}, 0x1, 0x64a6060, 0x6382050) vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:600 +0xc4 k8s.io/apimachinery/pkg/util/wait.PollImmediateUntilWithContext({0x2dcf274, 0x607c600}, 0x5f5e100, 0x6382050) vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:551 +0x60 k8s.io/apimachinery/pkg/util/wait.PollImmediateUntil(0x5f5e100, 0x6298020, 0x607c600) vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:542 +0x48 k8s.io/client-go/tools/cache.WaitForCacheSync(0x607c600, {0x6298000, 0x3, 0x3}) vendor/k8s.io/client-go/tools/cache/shared_informer.go:329 +0x80 k8s.io/client-go/tools/cache.WaitForNamedCacheSync({0x283c5e1, 0xf}, 0x607c600, {0x6298000, 0x3, 0x3}) vendor/k8s.io/client-go/tools/cache/shared_informer.go:316 +0xe8 created by k8s.io/kubernetes/plugin/pkg/auth/authorizer/node.AddGraphEventHandlers plugin/pkg/auth/authorizer/node/graph_populator.go:65 +0x5b0 Kubernetes-commit: ffcf653e0666366e6241c99d9418e830840afa0f
121 lines · 4.2 KiB · Go
/*
|
|
Copyright 2022 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
// Package synctrack contains utilities for helping controllers track whether
|
|
// they are "synced" or not, that is, whether they have processed all items
|
|
// from the informer's initial list.
|
|
package synctrack
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
)
|
|
|
|
// AsyncTracker helps propagate HasSynced in the face of multiple worker threads.
|
|
type AsyncTracker[T comparable] struct {
	// UpstreamHasSynced reports whether the upstream source (e.g. an
	// informer) has delivered its entire initial list. It must be set
	// before use; HasSynced calls it without holding lock.
	UpstreamHasSynced func() bool

	// lock guards waiting.
	lock sync.Mutex
	// waiting holds the keys from the initial list whose processing has
	// Started but not yet Finished. Lazily allocated on first Start, so
	// the zero value of AsyncTracker is usable.
	waiting sets.Set[T]
}
|
|
|
|
// Start should be called prior to processing each key which is part of the
|
|
// initial list.
|
|
func (t *AsyncTracker[T]) Start(key T) {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
if t.waiting == nil {
|
|
t.waiting = sets.New[T](key)
|
|
} else {
|
|
t.waiting.Insert(key)
|
|
}
|
|
}
|
|
|
|
// Finished should be called when finished processing a key which was part of
|
|
// the initial list. Since keys are tracked individually, nothing bad happens
|
|
// if you call Finished without a corresponding call to Start. This makes it
|
|
// easier to use this in combination with e.g. queues which don't make it easy
|
|
// to plumb through the isInInitialList boolean.
|
|
func (t *AsyncTracker[T]) Finished(key T) {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
if t.waiting != nil {
|
|
t.waiting.Delete(key)
|
|
}
|
|
}
|
|
|
|
// HasSynced returns true if the source is synced and every key present in the
|
|
// initial list has been processed. This relies on the source not considering
|
|
// itself synced until *after* it has delivered the notification for the last
|
|
// key, and that notification handler must have called Start.
|
|
func (t *AsyncTracker[T]) HasSynced() bool {
|
|
// Call UpstreamHasSynced first: it might take a lock, which might take
|
|
// a significant amount of time, and we can't hold our lock while
|
|
// waiting on that or a user is likely to get a deadlock.
|
|
if !t.UpstreamHasSynced() {
|
|
return false
|
|
}
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
return t.waiting.Len() == 0
|
|
}
|
|
|
|
// SingleFileTracker helps propagate HasSynced when events are processed in
|
|
// order (i.e. via a queue).
|
|
type SingleFileTracker struct {
	// Important: count is used with atomic operations so it must be 64-bit
	// aligned, otherwise atomic operations will panic. Having it at the top of
	// the struct will guarantee that, even on 32-bit arches.
	// See https://pkg.go.dev/sync/atomic#pkg-note-BUG for more information.
	//
	// NOTE(review): on Go 1.19+ this could be an atomic.Int64, which
	// guarantees alignment by construction — confirm the module's minimum
	// Go version before making that change.
	count int64

	// UpstreamHasSynced reports whether the upstream source (e.g. an
	// informer) has delivered its entire initial list. It must be set
	// before use.
	UpstreamHasSynced func() bool
}
|
|
|
|
// Start should be called prior to processing each key which is part of the
|
|
// initial list.
|
|
// Start should be called prior to processing each key which is part of the
// initial list.
func (t *SingleFileTracker) Start() {
	// Bump the number of initial-list keys whose processing is in flight.
	atomic.AddInt64(&t.count, 1)
}
|
|
|
|
// Finished should be called when finished processing a key which was part of
|
|
// the initial list. You must never call Finished() before (or without) its
|
|
// corresponding Start(), that is a logic error that could cause HasSynced to
|
|
// return a wrong value. To help you notice this should it happen, Finished()
|
|
// will panic if the internal counter goes negative.
|
|
func (t *SingleFileTracker) Finished() {
|
|
result := atomic.AddInt64(&t.count, -1)
|
|
if result < 0 {
|
|
panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value")
|
|
}
|
|
}
|
|
|
|
// HasSynced returns true if the source is synced and every key present in the
|
|
// initial list has been processed. This relies on the source not considering
|
|
// itself synced until *after* it has delivered the notification for the last
|
|
// key, and that notification handler must have called Start.
|
|
func (t *SingleFileTracker) HasSynced() bool {
|
|
// Call UpstreamHasSynced first: it might take a lock, which might take
|
|
// a significant amount of time, and we don't want to then act on a
|
|
// stale count value.
|
|
if !t.UpstreamHasSynced() {
|
|
return false
|
|
}
|
|
return atomic.LoadInt64(&t.count) <= 0
|
|
}
|