diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 7a83868cb8a..954a6d3d2a2 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -95,6 +95,8 @@ func (c *PodConfig) SeenAllSources(seenSources sets.String) bool { if c.pods == nil { return false } + c.sourcesLock.Lock() + defer c.sourcesLock.Unlock() klog.V(5).InfoS("Looking for sources, have seen", "sources", c.sources.List(), "seenSources", seenSources) return seenSources.HasAll(c.sources.List()...) && c.pods.seenSources(c.sources.List()...) } diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index cc1cdc53077..92504227fac 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -22,6 +22,7 @@ import ( "reflect" "sort" "strconv" + "sync" "testing" "time" @@ -29,6 +30,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -456,3 +458,29 @@ func TestPodUpdateLabels(t *testing.T) { expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) } + +func TestPodConfigRace(t *testing.T) { + eventBroadcaster := record.NewBroadcaster() + config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"})) + seenSources := sets.NewString(TestSource) + var wg sync.WaitGroup + const iterations = 100 + wg.Add(2) + + go func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + defer wg.Done() + for i := 0; i < iterations; i++ { + config.Channel(ctx, strconv.Itoa(i)) + } + }() + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + config.SeenAllSources(seenSources) + } + }() + + wg.Wait() +}