mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #106045 from chenlinx17/kube-apiserver-panic
Fix concurrent map writes error in kube-apiserver
This commit is contained in:
commit
d67cbcb8df
@ -19,6 +19,7 @@ package admission
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
||||
"k8s.io/apiserver/pkg/audit"
|
||||
@ -27,7 +28,10 @@ import (
|
||||
// auditHandler logs annotations set by other admission handlers
|
||||
type auditHandler struct {
|
||||
Interface
|
||||
ae *auditinternal.Event
|
||||
// TODO: move the lock near the Annotations field of the audit event so it is always protected from concurrent access.
|
||||
// to protect the 'Annotations' map of the audit event from concurrent writes
|
||||
mutex sync.Mutex
|
||||
ae *auditinternal.Event
|
||||
}
|
||||
|
||||
var _ Interface = &auditHandler{}
|
||||
@ -42,10 +46,10 @@ func WithAudit(i Interface, ae *auditinternal.Event) Interface {
|
||||
if i == nil {
|
||||
return i
|
||||
}
|
||||
return &auditHandler{i, ae}
|
||||
return &auditHandler{Interface: i, ae: ae}
|
||||
}
|
||||
|
||||
func (handler auditHandler) Admit(ctx context.Context, a Attributes, o ObjectInterfaces) error {
|
||||
func (handler *auditHandler) Admit(ctx context.Context, a Attributes, o ObjectInterfaces) error {
|
||||
if !handler.Interface.Handles(a.GetOperation()) {
|
||||
return nil
|
||||
}
|
||||
@ -60,7 +64,7 @@ func (handler auditHandler) Admit(ctx context.Context, a Attributes, o ObjectInt
|
||||
return err
|
||||
}
|
||||
|
||||
func (handler auditHandler) Validate(ctx context.Context, a Attributes, o ObjectInterfaces) error {
|
||||
func (handler *auditHandler) Validate(ctx context.Context, a Attributes, o ObjectInterfaces) error {
|
||||
if !handler.Interface.Handles(a.GetOperation()) {
|
||||
return nil
|
||||
}
|
||||
@ -84,10 +88,13 @@ func ensureAnnotationGetter(a Attributes) error {
|
||||
return fmt.Errorf("attributes must be an instance of privateAnnotationsGetter or AnnotationsGetter")
|
||||
}
|
||||
|
||||
func (handler auditHandler) logAnnotations(a Attributes) {
|
||||
func (handler *auditHandler) logAnnotations(a Attributes) {
|
||||
if handler.ae == nil {
|
||||
return
|
||||
}
|
||||
handler.mutex.Lock()
|
||||
defer handler.mutex.Unlock()
|
||||
|
||||
switch a := a.(type) {
|
||||
case privateAnnotationsGetter:
|
||||
for key, value := range a.getAnnotations(handler.ae.Level) {
|
||||
|
@ -19,6 +19,7 @@ package admission
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@ -172,3 +173,32 @@ func TestWithAudit(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestWithAuditConcurrency(t *testing.T) {
|
||||
admitAnnotations := map[string]string{
|
||||
"plugin.example.com/foo": "foo",
|
||||
"plugin.example.com/bar": "bar",
|
||||
"plugin.example.com/baz": "baz",
|
||||
"plugin.example.com/qux": "qux",
|
||||
}
|
||||
var handler Interface = fakeHandler{admitAnnotations: admitAnnotations, handles: true}
|
||||
ae := &auditinternal.Event{Level: auditinternal.LevelMetadata}
|
||||
auditHandler := WithAudit(handler, ae)
|
||||
a := attributes()
|
||||
|
||||
// Simulate the scenario store.DeleteCollection
|
||||
workers := 2
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(workers)
|
||||
for i := 0; i < workers; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
mutator, ok := handler.(MutationInterface)
|
||||
require.True(t, ok)
|
||||
auditMutator, ok := auditHandler.(MutationInterface)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, mutator.Admit(context.TODO(), a, nil), auditMutator.Admit(context.TODO(), a, nil), "WithAudit decorator should not effect the return value")
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user