Don't export single-use types

This commit is contained in:
Tim Allclair 2024-01-30 11:52:24 -08:00
parent 01155f59c3
commit 77f03c1744
3 changed files with 21 additions and 21 deletions

View File

@ -60,7 +60,7 @@ type podStartupSLIObserver interface {
// in order. // in order.
type PodConfig struct { type PodConfig struct {
pods *podStorage pods *podStorage
mux *Mux mux *mux
// the channel of denormalized changes passed to listeners // the channel of denormalized changes passed to listeners
updates chan kubetypes.PodUpdate updates chan kubetypes.PodUpdate
@ -77,7 +77,7 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder,
storage := newPodStorage(updates, mode, recorder, startupSLIObserver) storage := newPodStorage(updates, mode, recorder, startupSLIObserver)
podConfig := &PodConfig{ podConfig := &PodConfig{
pods: storage, pods: storage,
mux: NewMux(storage), mux: newMux(storage),
updates: updates, updates: updates,
sources: sets.String{}, sources: sets.String{},
} }
@ -112,7 +112,7 @@ func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
// Sync requests the full configuration be delivered to the update channel. // Sync requests the full configuration be delivered to the update channel.
func (c *PodConfig) Sync() { func (c *PodConfig) Sync() {
c.pods.Sync() c.pods.sync()
} }
// podStorage manages the current pod state at any point in time and ensures updates // podStorage manages the current pod state at any point in time and ensures updates
@ -193,7 +193,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
case PodConfigNotificationSnapshotAndUpdates: case PodConfigNotificationSnapshotAndUpdates:
if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet { if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
} }
if len(updates.Pods) > 0 { if len(updates.Pods) > 0 {
s.updates <- *updates s.updates <- *updates
@ -204,7 +204,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
case PodConfigNotificationSnapshot: case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet { if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
} }
case PodConfigNotificationUnknown: case PodConfigNotificationUnknown:
@ -470,14 +470,14 @@ func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGr
return return
} }
// Sync sends a copy of the current state through the update channel. // sync sends a copy of the current state through the update channel.
func (s *podStorage) Sync() { func (s *podStorage) sync() {
s.updateLock.Lock() s.updateLock.Lock()
defer s.updateLock.Unlock() defer s.updateLock.Unlock()
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource} s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource}
} }
func (s *podStorage) MergedState() interface{} { func (s *podStorage) mergedState() interface{} {
s.podLock.RLock() s.podLock.RLock()
defer s.podLock.RUnlock() defer s.podLock.RUnlock()
pods := make([]*v1.Pod, 0) pods := make([]*v1.Pod, 0)

View File

@ -23,18 +23,18 @@ import (
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
) )
type Merger interface { type merger interface {
// Invoked when a change from a source is received. May also function as an incremental // Invoked when a change from a source is received. May also function as an incremental
// merger if you wish to consume changes incrementally. Must be reentrant when more than // merger if you wish to consume changes incrementally. Must be reentrant when more than
// one source is defined. // one source is defined.
Merge(source string, update interface{}) error Merge(source string, update interface{}) error
} }
// Mux is a class for merging configuration from multiple sources. Changes are // mux is a class for merging configuration from multiple sources. Changes are
// pushed via channels and sent to the merge function. // pushed via channels and sent to the merge function.
type Mux struct { type mux struct {
// Invoked when an update is sent to a source. // Invoked when an update is sent to a source.
merger Merger merger merger
// Sources and their lock. // Sources and their lock.
sourceLock sync.RWMutex sourceLock sync.RWMutex
@ -42,9 +42,9 @@ type Mux struct {
sources map[string]chan interface{} sources map[string]chan interface{}
} }
// NewMux creates a new mux that can merge changes from multiple sources. // newMux creates a new mux that can merge changes from multiple sources.
func NewMux(merger Merger) *Mux { func newMux(merger merger) *mux {
mux := &Mux{ mux := &mux{
sources: make(map[string]chan interface{}), sources: make(map[string]chan interface{}),
merger: merger, merger: merger,
} }
@ -56,7 +56,7 @@ func NewMux(merger Merger) *Mux {
// source will return the same channel. This allows change and state based sources // source will return the same channel. This allows change and state based sources
// to use the same channel. Different source names however will be treated as a // to use the same channel. Different source names however will be treated as a
// union. // union.
func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interface{} { func (m *mux) ChannelWithContext(ctx context.Context, source string) chan interface{} {
if len(source) == 0 { if len(source) == 0 {
panic("Channel given an empty name") panic("Channel given an empty name")
} }
@ -73,7 +73,7 @@ func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interf
return newChannel return newChannel
} }
func (m *Mux) listen(source string, listenChannel <-chan interface{}) { func (m *mux) listen(source string, listenChannel <-chan interface{}) {
for update := range listenChannel { for update := range listenChannel {
m.merger.Merge(source, update) m.merger.Merge(source, update)
} }

View File

@ -26,7 +26,7 @@ func TestConfigurationChannels(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
mux := NewMux(nil) mux := newMux(nil)
channelOne := mux.ChannelWithContext(ctx, "one") channelOne := mux.ChannelWithContext(ctx, "one")
if channelOne != mux.ChannelWithContext(ctx, "one") { if channelOne != mux.ChannelWithContext(ctx, "one") {
t.Error("Didn't get the same muxuration channel back with the same name") t.Error("Didn't get the same muxuration channel back with the same name")
@ -58,7 +58,7 @@ func TestMergeInvoked(t *testing.T) {
defer cancel() defer cancel()
merger := MergeMock{"one", "test", t} merger := MergeMock{"one", "test", t}
mux := NewMux(&merger) mux := newMux(&merger)
mux.ChannelWithContext(ctx, "one") <- "test" mux.ChannelWithContext(ctx, "one") <- "test"
} }
@ -74,7 +74,7 @@ func TestSimultaneousMerge(t *testing.T) {
defer cancel() defer cancel()
ch := make(chan bool, 2) ch := make(chan bool, 2)
mux := NewMux(mergeFunc(func(source string, update interface{}) error { mux := newMux(mergeFunc(func(source string, update interface{}) error {
switch source { switch source {
case "one": case "one":
if update.(string) != "test" { if update.(string) != "test" {