mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 06:54:01 +00:00
Merge pull request #123039 from tallclair/configutil
Clean up single use pkg/util/config
This commit is contained in:
commit
46c9bd1267
@ -32,7 +32,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/events"
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||
"k8s.io/kubernetes/pkg/util/config"
|
||||
)
|
||||
|
||||
// PodConfigNotificationMode describes how changes are sent to the update channel.
|
||||
@ -61,7 +60,7 @@ type podStartupSLIObserver interface {
|
||||
// in order.
|
||||
type PodConfig struct {
|
||||
pods *podStorage
|
||||
mux *config.Mux
|
||||
mux *mux
|
||||
|
||||
// the channel of denormalized changes passed to listeners
|
||||
updates chan kubetypes.PodUpdate
|
||||
@ -78,7 +77,7 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder,
|
||||
storage := newPodStorage(updates, mode, recorder, startupSLIObserver)
|
||||
podConfig := &PodConfig{
|
||||
pods: storage,
|
||||
mux: config.NewMux(storage),
|
||||
mux: newMux(storage),
|
||||
updates: updates,
|
||||
sources: sets.String{},
|
||||
}
|
||||
@ -113,7 +112,7 @@ func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
|
||||
|
||||
// Sync requests the full configuration be delivered to the update channel.
|
||||
func (c *PodConfig) Sync() {
|
||||
c.pods.Sync()
|
||||
c.pods.sync()
|
||||
}
|
||||
|
||||
// podStorage manages the current pod state at any point in time and ensures updates
|
||||
@ -194,7 +193,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
|
||||
|
||||
case PodConfigNotificationSnapshotAndUpdates:
|
||||
if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
|
||||
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
|
||||
s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
|
||||
}
|
||||
if len(updates.Pods) > 0 {
|
||||
s.updates <- *updates
|
||||
@ -205,7 +204,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
|
||||
|
||||
case PodConfigNotificationSnapshot:
|
||||
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
|
||||
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
|
||||
s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
|
||||
}
|
||||
|
||||
case PodConfigNotificationUnknown:
|
||||
@ -471,15 +470,14 @@ func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGr
|
||||
return
|
||||
}
|
||||
|
||||
// Sync sends a copy of the current state through the update channel.
|
||||
func (s *podStorage) Sync() {
|
||||
// sync sends a copy of the current state through the update channel.
|
||||
func (s *podStorage) sync() {
|
||||
s.updateLock.Lock()
|
||||
defer s.updateLock.Unlock()
|
||||
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource}
|
||||
s.updates <- kubetypes.PodUpdate{Pods: s.mergedState().([]*v1.Pod), Op: kubetypes.SET, Source: kubetypes.AllSource}
|
||||
}
|
||||
|
||||
// Object implements config.Accessor
|
||||
func (s *podStorage) MergedState() interface{} {
|
||||
func (s *podStorage) mergedState() interface{} {
|
||||
s.podLock.RLock()
|
||||
defer s.podLock.RUnlock()
|
||||
pods := make([]*v1.Pod, 0)
|
||||
|
@ -23,25 +23,18 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
)
|
||||
|
||||
type Merger interface {
|
||||
type merger interface {
|
||||
// Invoked when a change from a source is received. May also function as an incremental
|
||||
// merger if you wish to consume changes incrementally. Must be reentrant when more than
|
||||
// one source is defined.
|
||||
Merge(source string, update interface{}) error
|
||||
}
|
||||
|
||||
// MergeFunc implements the Merger interface
|
||||
type MergeFunc func(source string, update interface{}) error
|
||||
|
||||
func (f MergeFunc) Merge(source string, update interface{}) error {
|
||||
return f(source, update)
|
||||
}
|
||||
|
||||
// Mux is a class for merging configuration from multiple sources. Changes are
|
||||
// mux is a class for merging configuration from multiple sources. Changes are
|
||||
// pushed via channels and sent to the merge function.
|
||||
type Mux struct {
|
||||
type mux struct {
|
||||
// Invoked when an update is sent to a source.
|
||||
merger Merger
|
||||
merger merger
|
||||
|
||||
// Sources and their lock.
|
||||
sourceLock sync.RWMutex
|
||||
@ -49,9 +42,9 @@ type Mux struct {
|
||||
sources map[string]chan interface{}
|
||||
}
|
||||
|
||||
// NewMux creates a new mux that can merge changes from multiple sources.
|
||||
func NewMux(merger Merger) *Mux {
|
||||
mux := &Mux{
|
||||
// newMux creates a new mux that can merge changes from multiple sources.
|
||||
func newMux(merger merger) *mux {
|
||||
mux := &mux{
|
||||
sources: make(map[string]chan interface{}),
|
||||
merger: merger,
|
||||
}
|
||||
@ -63,7 +56,7 @@ func NewMux(merger Merger) *Mux {
|
||||
// source will return the same channel. This allows change and state based sources
|
||||
// to use the same channel. Different source names however will be treated as a
|
||||
// union.
|
||||
func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interface{} {
|
||||
func (m *mux) ChannelWithContext(ctx context.Context, source string) chan interface{} {
|
||||
if len(source) == 0 {
|
||||
panic("Channel given an empty name")
|
||||
}
|
||||
@ -80,63 +73,8 @@ func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interf
|
||||
return newChannel
|
||||
}
|
||||
|
||||
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
|
||||
func (m *mux) listen(source string, listenChannel <-chan interface{}) {
|
||||
for update := range listenChannel {
|
||||
m.merger.Merge(source, update)
|
||||
}
|
||||
}
|
||||
|
||||
// Accessor is an interface for retrieving the current merge state.
|
||||
type Accessor interface {
|
||||
// MergedState returns a representation of the current merge state.
|
||||
// Must be reentrant when more than one source is defined.
|
||||
MergedState() interface{}
|
||||
}
|
||||
|
||||
// AccessorFunc implements the Accessor interface.
|
||||
type AccessorFunc func() interface{}
|
||||
|
||||
func (f AccessorFunc) MergedState() interface{} {
|
||||
return f()
|
||||
}
|
||||
|
||||
type Listener interface {
|
||||
// OnUpdate is invoked when a change is made to an object.
|
||||
OnUpdate(instance interface{})
|
||||
}
|
||||
|
||||
// ListenerFunc receives a representation of the change or object.
|
||||
type ListenerFunc func(instance interface{})
|
||||
|
||||
func (f ListenerFunc) OnUpdate(instance interface{}) {
|
||||
f(instance)
|
||||
}
|
||||
|
||||
type Broadcaster struct {
|
||||
// Listeners for changes and their lock.
|
||||
listenerLock sync.RWMutex
|
||||
listeners []Listener
|
||||
}
|
||||
|
||||
// NewBroadcaster registers a set of listeners that support the Listener interface
|
||||
// and notifies them all on changes.
|
||||
func NewBroadcaster() *Broadcaster {
|
||||
return &Broadcaster{}
|
||||
}
|
||||
|
||||
// Add registers listener to receive updates of changes.
|
||||
func (b *Broadcaster) Add(listener Listener) {
|
||||
b.listenerLock.Lock()
|
||||
defer b.listenerLock.Unlock()
|
||||
b.listeners = append(b.listeners, listener)
|
||||
}
|
||||
|
||||
// Notify notifies all listeners.
|
||||
func (b *Broadcaster) Notify(instance interface{}) {
|
||||
b.listenerLock.RLock()
|
||||
listeners := b.listeners
|
||||
b.listenerLock.RUnlock()
|
||||
for _, listener := range listeners {
|
||||
listener.OnUpdate(instance)
|
||||
}
|
||||
}
|
@ -26,7 +26,7 @@ func TestConfigurationChannels(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
mux := NewMux(nil)
|
||||
mux := newMux(nil)
|
||||
channelOne := mux.ChannelWithContext(ctx, "one")
|
||||
if channelOne != mux.ChannelWithContext(ctx, "one") {
|
||||
t.Error("Didn't get the same muxuration channel back with the same name")
|
||||
@ -58,27 +58,15 @@ func TestMergeInvoked(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
merger := MergeMock{"one", "test", t}
|
||||
mux := NewMux(&merger)
|
||||
mux := newMux(&merger)
|
||||
mux.ChannelWithContext(ctx, "one") <- "test"
|
||||
}
|
||||
|
||||
func TestMergeFuncInvoked(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
// mergeFunc implements the Merger interface
|
||||
type mergeFunc func(source string, update interface{}) error
|
||||
|
||||
ch := make(chan bool)
|
||||
mux := NewMux(MergeFunc(func(source string, update interface{}) error {
|
||||
if source != "one" {
|
||||
t.Errorf("Expected %s, Got %s", "one", source)
|
||||
}
|
||||
if update.(string) != "test" {
|
||||
t.Errorf("Expected %s, Got %s", "test", update)
|
||||
}
|
||||
ch <- true
|
||||
return nil
|
||||
}))
|
||||
mux.ChannelWithContext(ctx, "one") <- "test"
|
||||
<-ch
|
||||
func (f mergeFunc) Merge(source string, update interface{}) error {
|
||||
return f(source, update)
|
||||
}
|
||||
|
||||
func TestSimultaneousMerge(t *testing.T) {
|
||||
@ -86,7 +74,7 @@ func TestSimultaneousMerge(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
ch := make(chan bool, 2)
|
||||
mux := NewMux(MergeFunc(func(source string, update interface{}) error {
|
||||
mux := newMux(mergeFunc(func(source string, update interface{}) error {
|
||||
switch source {
|
||||
case "one":
|
||||
if update.(string) != "test" {
|
||||
@ -109,25 +97,3 @@ func TestSimultaneousMerge(t *testing.T) {
|
||||
<-ch
|
||||
<-ch
|
||||
}
|
||||
|
||||
func TestBroadcaster(t *testing.T) {
|
||||
b := NewBroadcaster()
|
||||
b.Notify(struct{}{})
|
||||
|
||||
ch := make(chan bool, 2)
|
||||
b.Add(ListenerFunc(func(object interface{}) {
|
||||
if object != "test" {
|
||||
t.Errorf("Expected %s, Got %s", "test", object)
|
||||
}
|
||||
ch <- true
|
||||
}))
|
||||
b.Add(ListenerFunc(func(object interface{}) {
|
||||
if object != "test" {
|
||||
t.Errorf("Expected %s, Got %s", "test", object)
|
||||
}
|
||||
ch <- true
|
||||
}))
|
||||
b.Notify("test")
|
||||
<-ch
|
||||
<-ch
|
||||
}
|
@ -1,20 +0,0 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package config provides utility objects for decoupling sources of configuration and the
|
||||
// actual configuration state. Consumers must implement the Merger interface to unify
|
||||
// the sources of change into an object.
|
||||
package config // import "k8s.io/kubernetes/pkg/util/config"
|
Loading…
Reference in New Issue
Block a user