mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Merge pull request #84483 from zxl381/MU_stopAllD
Fix a double lock bug in staging/.../apiserver
This commit is contained in:
commit
6dde01d314
@ -123,6 +123,7 @@ func NewBackend(c *Config) (audit.Backend, error) {
|
|||||||
config: c,
|
config: c,
|
||||||
delegates: atomic.Value{},
|
delegates: atomic.Value{},
|
||||||
delegateUpdateMutex: sync.Mutex{},
|
delegateUpdateMutex: sync.Mutex{},
|
||||||
|
stopped: false,
|
||||||
webhookClientManager: cm,
|
webhookClientManager: cm,
|
||||||
recorder: recorder,
|
recorder: recorder,
|
||||||
}
|
}
|
||||||
@ -159,6 +160,7 @@ func NewBackend(c *Config) (audit.Backend, error) {
|
|||||||
type backend struct {
|
type backend struct {
|
||||||
// delegateUpdateMutex holds an update lock on the delegates
|
// delegateUpdateMutex holds an update lock on the delegates
|
||||||
delegateUpdateMutex sync.Mutex
|
delegateUpdateMutex sync.Mutex
|
||||||
|
stopped bool
|
||||||
config *Config
|
config *Config
|
||||||
delegates atomic.Value
|
delegates atomic.Value
|
||||||
webhookClientManager webhook.ClientManager
|
webhookClientManager webhook.ClientManager
|
||||||
@ -201,6 +203,11 @@ func (b *backend) Run(stopCh <-chan struct{}) error {
|
|||||||
// the primary stopChan to the current delegate map.
|
// the primary stopChan to the current delegate map.
|
||||||
func (b *backend) stopAllDelegates() {
|
func (b *backend) stopAllDelegates() {
|
||||||
b.delegateUpdateMutex.Lock()
|
b.delegateUpdateMutex.Lock()
|
||||||
|
defer b.delegateUpdateMutex.Unlock()
|
||||||
|
if b.stopped {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
b.stopped = true
|
||||||
for _, d := range b.GetDelegates() {
|
for _, d := range b.GetDelegates() {
|
||||||
close(d.stopChan)
|
close(d.stopChan)
|
||||||
}
|
}
|
||||||
@ -237,6 +244,11 @@ func (b *backend) setDelegates(delegates syncedDelegates) {
|
|||||||
func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) {
|
func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) {
|
||||||
b.delegateUpdateMutex.Lock()
|
b.delegateUpdateMutex.Lock()
|
||||||
defer b.delegateUpdateMutex.Unlock()
|
defer b.delegateUpdateMutex.Unlock()
|
||||||
|
if b.stopped {
|
||||||
|
msg := fmt.Sprintf("Could not add audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID)
|
||||||
|
klog.Error(msg)
|
||||||
|
return
|
||||||
|
}
|
||||||
delegates := b.copyDelegates()
|
delegates := b.copyDelegates()
|
||||||
if _, ok := delegates[sink.UID]; ok {
|
if _, ok := delegates[sink.UID]; ok {
|
||||||
klog.Errorf("Audit sink %q uid: %s already exists, could not readd", sink.Name, sink.UID)
|
klog.Errorf("Audit sink %q uid: %s already exists, could not readd", sink.Name, sink.UID)
|
||||||
@ -262,6 +274,11 @@ func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) {
|
|||||||
func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) {
|
func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) {
|
||||||
b.delegateUpdateMutex.Lock()
|
b.delegateUpdateMutex.Lock()
|
||||||
defer b.delegateUpdateMutex.Unlock()
|
defer b.delegateUpdateMutex.Unlock()
|
||||||
|
if b.stopped {
|
||||||
|
msg := fmt.Sprintf("Could not update old audit sink %q to new audit sink %q. Update to all delegates is stopped.", oldSink.Name, newSink.Name)
|
||||||
|
klog.Error(msg)
|
||||||
|
return
|
||||||
|
}
|
||||||
delegates := b.copyDelegates()
|
delegates := b.copyDelegates()
|
||||||
oldDelegate, ok := delegates[oldSink.UID]
|
oldDelegate, ok := delegates[oldSink.UID]
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -300,6 +317,11 @@ func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) {
|
|||||||
func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) {
|
func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) {
|
||||||
b.delegateUpdateMutex.Lock()
|
b.delegateUpdateMutex.Lock()
|
||||||
defer b.delegateUpdateMutex.Unlock()
|
defer b.delegateUpdateMutex.Unlock()
|
||||||
|
if b.stopped {
|
||||||
|
msg := fmt.Sprintf("Could not delete audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID)
|
||||||
|
klog.Warning(msg)
|
||||||
|
return
|
||||||
|
}
|
||||||
delegates := b.copyDelegates()
|
delegates := b.copyDelegates()
|
||||||
delegate, ok := delegates[sink.UID]
|
delegate, ok := delegates[sink.UID]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
Loading…
Reference in New Issue
Block a user