From b1c9ae5499b49b5630768050d92bc8ac3553d830 Mon Sep 17 00:00:00 2001 From: Ziheng Liu Date: Mon, 28 Oct 2019 18:32:27 -0400 Subject: [PATCH] Change the way of synchronization in staging/.../apiserver stopAllDelegates will signal other functions to stop updating, instead of acquiring a Mutex and never unlock it Signed-off-by: Ziheng Liu --- .../plugin/pkg/audit/dynamic/dynamic.go | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go index 80004a885b8..1eac3ace0de 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go @@ -123,6 +123,7 @@ func NewBackend(c *Config) (audit.Backend, error) { config: c, delegates: atomic.Value{}, delegateUpdateMutex: sync.Mutex{}, + stopped: false, webhookClientManager: cm, recorder: recorder, } @@ -159,6 +160,7 @@ func NewBackend(c *Config) (audit.Backend, error) { type backend struct { // delegateUpdateMutex holds an update lock on the delegates delegateUpdateMutex sync.Mutex + stopped bool config *Config delegates atomic.Value webhookClientManager webhook.ClientManager @@ -201,6 +203,11 @@ func (b *backend) Run(stopCh <-chan struct{}) error { // the primary stopChan to the current delegate map. func (b *backend) stopAllDelegates() { b.delegateUpdateMutex.Lock() + defer b.delegateUpdateMutex.Unlock() + if b.stopped { + return + } + b.stopped = true for _, d := range b.GetDelegates() { close(d.stopChan) } @@ -237,6 +244,11 @@ func (b *backend) setDelegates(delegates syncedDelegates) { func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) { b.delegateUpdateMutex.Lock() defer b.delegateUpdateMutex.Unlock() + if b.stopped { + msg := fmt.Sprintf("Could not add audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID) + klog.Error(msg) + return + } delegates := b.copyDelegates() if _, ok := delegates[sink.UID]; ok { klog.Errorf("Audit sink %q uid: %s already exists, could not readd", sink.Name, sink.UID) @@ -262,6 +274,11 @@ func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) { func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) { b.delegateUpdateMutex.Lock() defer b.delegateUpdateMutex.Unlock() + if b.stopped { + msg := fmt.Sprintf("Could not update old audit sink %q to new audit sink %q. Update to all delegates is stopped.", oldSink.Name, newSink.Name) + klog.Error(msg) + return + } delegates := b.copyDelegates() oldDelegate, ok := delegates[oldSink.UID] if !ok { @@ -300,6 +317,11 @@ func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) { func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) { b.delegateUpdateMutex.Lock() defer b.delegateUpdateMutex.Unlock() + if b.stopped { + msg := fmt.Sprintf("Could not delete audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID) + klog.Warning(msg) + return + } delegates := b.copyDelegates() delegate, ok := delegates[sink.UID] if !ok {