From 77f03c174496c95cd0dd75e04e359d852318578d Mon Sep 17 00:00:00 2001 From: Tim Allclair Date: Tue, 30 Jan 2024 11:52:24 -0800 Subject: [PATCH] Don't export single-use types --- pkg/kubelet/config/config.go | 18 +++++++++--------- pkg/kubelet/config/mux.go | 18 +++++++++--------- pkg/kubelet/config/mux_test.go | 6 +++--- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 98266dfb281..f6e31189f0e 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -60,7 +60,7 @@ type podStartupSLIObserver interface { // in order. type PodConfig struct { pods *podStorage - mux *Mux + mux *mux // the channel of denormalized changes passed to listeners updates chan kubetypes.PodUpdate @@ -77,7 +77,7 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder, storage := newPodStorage(updates, mode, recorder, startupSLIObserver) podConfig := &PodConfig{ pods: storage, - mux: NewMux(storage), + mux: newMux(storage), updates: updates, sources: sets.String{}, } @@ -112,7 +112,7 @@ func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate { // Sync requests the full configuration be delivered to the update channel. func (c *PodConfig) Sync() { - c.pods.Sync() + c.pods.sync() } // podStorage manages the current pod state at any point in time and ensures updates @@ -193,7 +193,7 @@ func (s *podStorage) Merge(source string, change interface{}) error { case PodConfigNotificationSnapshotAndUpdates: if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet { - s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} + s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} } if len(updates.Pods) > 0 { s.updates <- *updates @@ -204,7 +204,7 @@ func (s *podStorage) Merge(source string, change interface{}) error { case PodConfigNotificationSnapshot: if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet { - s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} + s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} } case PodConfigNotificationUnknown: @@ -470,14 +470,14 @@ func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGr return } -// Sync sends a copy of the current state through the update channel. -func (s *podStorage) Sync() { +// sync sends a copy of the current state through the update channel. +func (s *podStorage) sync() { s.updateLock.Lock() defer s.updateLock.Unlock() - s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource} + s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource} } -func (s *podStorage) MergedState() interface{} { +func (s *podStorage) mergedState() interface{} { s.podLock.RLock() defer s.podLock.RUnlock() pods := make([]*v1.Pod, 0) diff --git a/pkg/kubelet/config/mux.go b/pkg/kubelet/config/mux.go index 5aea63f4f8f..b2bacff5f38 100644 --- a/pkg/kubelet/config/mux.go +++ b/pkg/kubelet/config/mux.go @@ -23,18 +23,18 @@ import ( "k8s.io/apimachinery/pkg/util/wait" ) -type Merger interface { +type merger interface { // Invoked when a change from a source is received. May also function as an incremental // merger if you wish to consume changes incrementally. Must be reentrant when more than // one source is defined. Merge(source string, update interface{}) error } -// Mux is a class for merging configuration from multiple sources. Changes are +// mux is a class for merging configuration from multiple sources. Changes are // pushed via channels and sent to the merge function. -type Mux struct { +type mux struct { // Invoked when an update is sent to a source. - merger Merger + merger merger // Sources and their lock. sourceLock sync.RWMutex @@ -42,9 +42,9 @@ type Mux struct { sources map[string]chan interface{} } -// NewMux creates a new mux that can merge changes from multiple sources. -func NewMux(merger Merger) *Mux { - mux := &Mux{ +// newMux creates a new mux that can merge changes from multiple sources. +func newMux(merger merger) *mux { + mux := &mux{ sources: make(map[string]chan interface{}), merger: merger, } @@ -56,7 +56,7 @@ func NewMux(merger Merger) *Mux { // source will return the same channel. This allows change and state based sources // to use the same channel. Different source names however will be treated as a // union. -func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interface{} { +func (m *mux) ChannelWithContext(ctx context.Context, source string) chan interface{} { if len(source) == 0 { panic("Channel given an empty name") } @@ -73,7 +73,7 @@ func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interf return newChannel } -func (m *Mux) listen(source string, listenChannel <-chan interface{}) { +func (m *mux) listen(source string, listenChannel <-chan interface{}) { for update := range listenChannel { m.merger.Merge(source, update) } diff --git a/pkg/kubelet/config/mux_test.go b/pkg/kubelet/config/mux_test.go index 767d4130e50..b24068505cf 100644 --- a/pkg/kubelet/config/mux_test.go +++ b/pkg/kubelet/config/mux_test.go @@ -26,7 +26,7 @@ func TestConfigurationChannels(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mux := NewMux(nil) + mux := newMux(nil) channelOne := mux.ChannelWithContext(ctx, "one") if channelOne != mux.ChannelWithContext(ctx, "one") { t.Error("Didn't get the same muxuration channel back with the same name") @@ -58,7 +58,7 @@ func TestMergeInvoked(t *testing.T) { defer cancel() merger := MergeMock{"one", "test", t} - mux := NewMux(&merger) + mux := newMux(&merger) mux.ChannelWithContext(ctx, "one") <- "test" } @@ -74,7 +74,7 @@ func TestSimultaneousMerge(t *testing.T) { defer cancel() ch := make(chan bool, 2) - mux := NewMux(mergeFunc(func(source string, update interface{}) error { + mux := newMux(mergeFunc(func(source string, update interface{}) error { switch source { case "one": if update.(string) != "test" {