diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 21cde64fd1f..f6e31189f0e 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -32,7 +32,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/events" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" - "k8s.io/kubernetes/pkg/util/config" ) // PodConfigNotificationMode describes how changes are sent to the update channel. @@ -61,7 +60,7 @@ type podStartupSLIObserver interface { // in order. type PodConfig struct { pods *podStorage - mux *config.Mux + mux *mux // the channel of denormalized changes passed to listeners updates chan kubetypes.PodUpdate @@ -78,7 +77,7 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder, storage := newPodStorage(updates, mode, recorder, startupSLIObserver) podConfig := &PodConfig{ pods: storage, - mux: config.NewMux(storage), + mux: newMux(storage), updates: updates, sources: sets.String{}, } @@ -113,7 +112,7 @@ func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate { // Sync requests the full configuration be delivered to the update channel. func (c *PodConfig) Sync() { - c.pods.Sync() + c.pods.sync() } // podStorage manages the current pod state at any point in time and ensures updates @@ -194,7 +193,7 @@ func (s *podStorage) Merge(source string, change interface{}) error { case PodConfigNotificationSnapshotAndUpdates: if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet { - s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} + s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} } if len(updates.Pods) > 0 { s.updates <- *updates @@ -205,7 +204,7 @@ func (s *podStorage) Merge(source string, change interface{}) error { case PodConfigNotificationSnapshot: if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet { - s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} + s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source} } case PodConfigNotificationUnknown: @@ -471,15 +470,14 @@ func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGr return } -// Sync sends a copy of the current state through the update channel. -func (s *podStorage) Sync() { +// sync sends a copy of the current state through the update channel. +func (s *podStorage) sync() { s.updateLock.Lock() defer s.updateLock.Unlock() - s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource} + s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource} } -// Object implements config.Accessor -func (s *podStorage) MergedState() interface{} { +func (s *podStorage) mergedState() interface{} { s.podLock.RLock() defer s.podLock.RUnlock() pods := make([]*v1.Pod, 0) diff --git a/pkg/util/config/config.go b/pkg/kubelet/config/mux.go similarity index 50% rename from pkg/util/config/config.go rename to pkg/kubelet/config/mux.go index 66fce13866d..b2bacff5f38 100644 --- a/pkg/util/config/config.go +++ b/pkg/kubelet/config/mux.go @@ -23,25 +23,18 @@ import ( "k8s.io/apimachinery/pkg/util/wait" ) -type Merger interface { +type merger interface { // Invoked when a change from a source is received. May also function as an incremental // merger if you wish to consume changes incrementally. Must be reentrant when more than // one source is defined. Merge(source string, update interface{}) error } -// MergeFunc implements the Merger interface -type MergeFunc func(source string, update interface{}) error - -func (f MergeFunc) Merge(source string, update interface{}) error { - return f(source, update) -} - -// Mux is a class for merging configuration from multiple sources. Changes are +// mux is a class for merging configuration from multiple sources. Changes are // pushed via channels and sent to the merge function. -type Mux struct { +type mux struct { // Invoked when an update is sent to a source. - merger Merger + merger merger // Sources and their lock. sourceLock sync.RWMutex @@ -49,9 +42,9 @@ type Mux struct { sources map[string]chan interface{} } -// NewMux creates a new mux that can merge changes from multiple sources. -func NewMux(merger Merger) *Mux { - mux := &Mux{ +// newMux creates a new mux that can merge changes from multiple sources. +func newMux(merger merger) *mux { + mux := &mux{ sources: make(map[string]chan interface{}), merger: merger, } @@ -63,7 +56,7 @@ func NewMux(merger Merger) *Mux { // source will return the same channel. This allows change and state based sources // to use the same channel. Different source names however will be treated as a // union. -func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interface{} { +func (m *mux) ChannelWithContext(ctx context.Context, source string) chan interface{} { if len(source) == 0 { panic("Channel given an empty name") } @@ -80,63 +73,8 @@ func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interf return newChannel } -func (m *Mux) listen(source string, listenChannel <-chan interface{}) { +func (m *mux) listen(source string, listenChannel <-chan interface{}) { for update := range listenChannel { m.merger.Merge(source, update) } } - -// Accessor is an interface for retrieving the current merge state. -type Accessor interface { - // MergedState returns a representation of the current merge state. - // Must be reentrant when more than one source is defined. - MergedState() interface{} -} - -// AccessorFunc implements the Accessor interface. -type AccessorFunc func() interface{} - -func (f AccessorFunc) MergedState() interface{} { - return f() -} - -type Listener interface { - // OnUpdate is invoked when a change is made to an object. - OnUpdate(instance interface{}) -} - -// ListenerFunc receives a representation of the change or object. -type ListenerFunc func(instance interface{}) - -func (f ListenerFunc) OnUpdate(instance interface{}) { - f(instance) -} - -type Broadcaster struct { - // Listeners for changes and their lock. - listenerLock sync.RWMutex - listeners []Listener -} - -// NewBroadcaster registers a set of listeners that support the Listener interface -// and notifies them all on changes. -func NewBroadcaster() *Broadcaster { - return &Broadcaster{} -} - -// Add registers listener to receive updates of changes. -func (b *Broadcaster) Add(listener Listener) { - b.listenerLock.Lock() - defer b.listenerLock.Unlock() - b.listeners = append(b.listeners, listener) -} - -// Notify notifies all listeners. -func (b *Broadcaster) Notify(instance interface{}) { - b.listenerLock.RLock() - listeners := b.listeners - b.listenerLock.RUnlock() - for _, listener := range listeners { - listener.OnUpdate(instance) - } -} diff --git a/pkg/util/config/config_test.go b/pkg/kubelet/config/mux_test.go similarity index 69% rename from pkg/util/config/config_test.go rename to pkg/kubelet/config/mux_test.go index 060e4ec4c98..b24068505cf 100644 --- a/pkg/util/config/config_test.go +++ b/pkg/kubelet/config/mux_test.go @@ -26,7 +26,7 @@ func TestConfigurationChannels(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mux := NewMux(nil) + mux := newMux(nil) channelOne := mux.ChannelWithContext(ctx, "one") if channelOne != mux.ChannelWithContext(ctx, "one") { t.Error("Didn't get the same muxuration channel back with the same name") @@ -58,27 +58,15 @@ func TestMergeInvoked(t *testing.T) { defer cancel() merger := MergeMock{"one", "test", t} - mux := NewMux(&merger) + mux := newMux(&merger) mux.ChannelWithContext(ctx, "one") <- "test" } -func TestMergeFuncInvoked(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +// mergeFunc implements the Merger interface +type mergeFunc func(source string, update interface{}) error - ch := make(chan bool) - mux := NewMux(MergeFunc(func(source string, update interface{}) error { - if source != "one" { - t.Errorf("Expected %s, Got %s", "one", source) - } - if update.(string) != "test" { - t.Errorf("Expected %s, Got %s", "test", update) - } - ch <- true - return nil - })) - mux.ChannelWithContext(ctx, "one") <- "test" - <-ch +func (f mergeFunc) Merge(source string, update interface{}) error { + return f(source, update) } func TestSimultaneousMerge(t *testing.T) { @@ -86,7 +74,7 @@ func TestSimultaneousMerge(t *testing.T) { defer cancel() ch := make(chan bool, 2) - mux := NewMux(MergeFunc(func(source string, update interface{}) error { + mux := newMux(mergeFunc(func(source string, update interface{}) error { switch source { case "one": if update.(string) != "test" { @@ -109,25 +97,3 @@ func TestSimultaneousMerge(t *testing.T) { <-ch <-ch } - -func TestBroadcaster(t *testing.T) { - b := NewBroadcaster() - b.Notify(struct{}{}) - - ch := make(chan bool, 2) - b.Add(ListenerFunc(func(object interface{}) { - if object != "test" { - t.Errorf("Expected %s, Got %s", "test", object) - } - ch <- true - })) - b.Add(ListenerFunc(func(object interface{}) { - if object != "test" { - t.Errorf("Expected %s, Got %s", "test", object) - } - ch <- true - })) - b.Notify("test") - <-ch - <-ch -} diff --git a/pkg/util/config/doc.go b/pkg/util/config/doc.go deleted file mode 100644 index 5e9a469dff6..00000000000 --- a/pkg/util/config/doc.go +++ /dev/null @@ -1,20 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package config provides utility objects for decoupling sources of configuration and the -// actual configuration state. Consumers must implement the Merger interface to unify -// the sources of change into an object. -package config // import "k8s.io/kubernetes/pkg/util/config"