Mirror of https://github.com/k3s-io/kubernetes.git

parent e09a353942
commit d789615902
@@ -22,6 +22,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/kubernetes/pkg/api"
@@ -43,5 +44,6 @@ func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}
 		}
 		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
 	}
-	cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
+	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
+	go r.Run(wait.NeverStop)
 }
@@ -392,7 +392,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
 	serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
 	if kubeDeps.KubeClient != nil {
 		serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
-		cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0).Run()
+		r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
+		go r.Run(wait.NeverStop)
 	}
 	serviceLister := corelisters.NewServiceLister(serviceIndexer)
 
@@ -400,7 +401,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
 	if kubeDeps.KubeClient != nil {
 		fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
 		nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
-		cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0).Run()
+		r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
+		go r.Run(wait.NeverStop)
 	}
 	nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
 
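Note on the hunks above: go r.Run(wait.NeverStop) preserves the old fire-and-forget behavior, because wait.NeverStop is a channel that is never closed. A minimal standalone sketch of that contract, with neverStop and run as illustrative stand-ins (reimplemented here so the snippet runs without the k8s.io/apimachinery tree):

package main

import (
	"fmt"
	"time"
)

// neverStop mimics wait.NeverStop: a receive-only channel that is never
// closed, so any loop selecting on it for cancellation runs forever.
var neverStop <-chan struct{} = make(chan struct{})

// run stands in for the new blocking Reflector.Run(stopCh).
func run(stopCh <-chan struct{}) {
	for i := 1; ; i++ {
		select {
		case <-stopCh:
			return
		case <-time.After(10 * time.Millisecond):
			fmt.Println("list-and-watch cycle", i)
		}
	}
}

func main() {
	go run(neverStop) // fire-and-forget, like the old Run(); never exits
	time.Sleep(35 * time.Millisecond)
}

Threading a stop channel through even these run-forever call sites means the signature now admits a bounded lifetime; callers that want one simply pass a real channel instead.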
@@ -19,6 +19,7 @@ package wait
 import (
 	"errors"
 	"math/rand"
+	"sync"
 	"time"
 
 	"k8s.io/apimachinery/pkg/util/runtime"
@@ -36,6 +37,15 @@ var ForeverTestTimeout = time.Second * 30
 // NeverStop may be passed to Until to make it never stop.
 var NeverStop <-chan struct{} = make(chan struct{})
 
+// StartUntil starts f in a new goroutine and calls done once f has finished.
+func StartUntil(stopCh <-chan struct{}, wg *sync.WaitGroup, f func(stopCh <-chan struct{})) {
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		f(stopCh)
+	}()
+}
+
 // Forever calls f every period for ever.
 //
 // Forever is syntactic sugar on top of Until.
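A standalone sketch of the contract the new helper gives callers; startUntil copies the StartUntil body added above, and the worker closure is illustrative:

package main

import (
	"fmt"
	"sync"
)

// startUntil copies the wait.StartUntil body above: run f in a goroutine
// and mark the WaitGroup done when f returns.
func startUntil(stopCh <-chan struct{}, wg *sync.WaitGroup, f func(stopCh <-chan struct{})) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		f(stopCh)
	}()
}

func main() {
	stopCh := make(chan struct{})
	var wg sync.WaitGroup

	startUntil(stopCh, &wg, func(stop <-chan struct{}) {
		<-stop // a blocking worker that honors its stop channel
		fmt.Println("worker exited")
	})

	close(stopCh) // request shutdown
	wg.Wait()     // returns only after the worker has actually finished
	fmt.Println("clean shutdown")
}

That final wg.Wait is the point of the change: controller.Run below can now block until its reflector goroutine has actually exited, instead of leaking it.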
@@ -116,7 +116,10 @@ func (c *controller) Run(stopCh <-chan struct{}) {
 	c.reflector = r
 	c.reflectorMutex.Unlock()
 
-	r.RunUntil(stopCh)
+	var wg sync.WaitGroup
+	defer wg.Wait()
+
+	wait.StartUntil(stopCh, &wg, r.Run)
 
 	wait.Until(c.processLoop, time.Second, stopCh)
 }
@@ -79,17 +79,15 @@ type cacheObj struct {
 
 func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
 	// we DON'T want protection from panics. If we're running this code, we want to die
-	go func() {
-		for {
-			d.CompareObjects()
+	for {
+		d.CompareObjects()
 
-			select {
-			case <-stopCh:
-				return
-			case <-time.After(d.period):
-			}
+		select {
+		case <-stopCh:
+			return
+		case <-time.After(d.period):
 		}
-	}()
+	}
 }
 
 // AddObject makes a deep copy of the object for later comparison. It only works on runtime.Object
@@ -182,21 +182,10 @@ func extractStackCreator() (string, int, bool) {
 }
 
 // Run starts a watch and handles watch events. Will restart the watch if it is closed.
-// Run starts a goroutine and returns immediately.
-func (r *Reflector) Run() {
+// Run will exit when stopCh is closed.
+func (r *Reflector) Run(stopCh <-chan struct{}) {
 	glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
-	go wait.Until(func() {
-		if err := r.ListAndWatch(wait.NeverStop); err != nil {
-			utilruntime.HandleError(err)
-		}
-	}, r.period, wait.NeverStop)
-}
-
-// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
-// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
-func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
-	glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
-	go wait.Until(func() {
+	wait.Until(func() {
 		if err := r.ListAndWatch(stopCh); err != nil {
 			utilruntime.HandleError(err)
 		}
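Net effect for callers of Reflector: Run no longer spawns its own goroutine, so cache.NewReflector(...).Run() becomes go r.Run(stopCh) (or go r.Run(wait.NeverStop) for the old run-forever behavior), and RunUntil(stopCh) call sites switch to go r.Run(stopCh), as the hunks below show. A self-contained sketch of the new blocking contract, with a toy reflector standing in for cache.Reflector:

package main

import (
	"fmt"
	"sync"
	"time"
)

// reflector is a toy stand-in for cache.Reflector with the new signature.
type reflector struct{ name string }

// Run blocks, repeating its watch cycle until stopCh is closed. The old
// Run()/RunUntil() variants started this loop in their own goroutine.
func (r *reflector) Run(stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			fmt.Println(r.name, "reflector stopped")
			return
		case <-time.After(20 * time.Millisecond):
			fmt.Println(r.name, "list-and-watch cycle")
		}
	}
}

func main() {
	r := &reflector{name: "pods"}
	stopCh := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); r.Run(stopCh) }() // caller owns the goroutine

	time.Sleep(70 * time.Millisecond)
	close(stopCh)
	wg.Wait() // shutdown is observable, which the old fire-and-forget Run never was
}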
@@ -83,7 +83,7 @@ func TestRunUntil(t *testing.T) {
 			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
 		},
 	}
-	r.RunUntil(stopCh)
+	go r.Run(stopCh)
 	// Synchronously add a dummy pod into the watch channel so we
 	// know the RunUntil go routine is in the watch handler.
 	fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
@@ -147,6 +147,7 @@ type sharedIndexInformer struct {
 	// stopCh is the channel used to stop the main Run process. We have to track it so that
 	// late joiners can have a proper stop
 	stopCh <-chan struct{}
+	wg     sync.WaitGroup
 }
 
 // dummyController hides the fact that a SharedInformer is different from a dedicated one
@@ -204,12 +205,14 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
 
 		s.controller = New(cfg)
 		s.controller.(*controller).clock = s.clock
+		s.stopCh = stopCh
 		s.started = true
 	}()
 
-	s.stopCh = stopCh
-	s.cacheMutationDetector.Run(stopCh)
-	s.processor.run(stopCh)
+	defer s.wg.Wait()
+
+	wait.StartUntil(stopCh, &s.wg, s.cacheMutationDetector.Run)
+	wait.StartUntil(stopCh, &s.wg, s.processor.run)
 	s.controller.Run(stopCh)
 }
 
@@ -324,8 +327,8 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
 
 	s.processor.addListener(listener)
 
-	go listener.run(s.stopCh)
-	go listener.pop(s.stopCh)
+	wait.StartUntil(s.stopCh, &s.wg, listener.run)
+	wait.StartUntil(s.stopCh, &s.wg, listener.pop)
 
 	items := s.indexer.List()
 	for i := range items {
@@ -395,13 +398,16 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
 }
 
 func (p *sharedProcessor) run(stopCh <-chan struct{}) {
-	p.listenersLock.RLock()
-	defer p.listenersLock.RUnlock()
-
-	for _, listener := range p.listeners {
-		go listener.run(stopCh)
-		go listener.pop(stopCh)
-	}
+	var wg sync.WaitGroup
+	func() {
+		p.listenersLock.RLock()
+		defer p.listenersLock.RUnlock()
+		for _, listener := range p.listeners {
+			wait.StartUntil(stopCh, &wg, listener.run)
+			wait.StartUntil(stopCh, &wg, listener.pop)
+		}
+	}()
+	wg.Wait()
 }
 
 // shouldResync queries every listener to determine if any of them need a resync, based on each
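One subtlety in the sharedProcessor.run hunk above: the listener loop is wrapped in an anonymous func so the deferred RUnlock fires before wg.Wait, leaving the lock free while listeners drain, presumably so writers such as addListener are not blocked for the whole shutdown. A standalone sketch of that scoping pattern (all names illustrative):

package main

import (
	"fmt"
	"sync"
	"time"
)

// runAndDrain mirrors the shape of sharedProcessor.run: hold the read lock
// only while launching workers, then wait for them with the lock released.
func runAndDrain(mu *sync.RWMutex, workers []func(<-chan struct{}), stopCh <-chan struct{}) {
	var wg sync.WaitGroup
	func() {
		mu.RLock()
		defer mu.RUnlock() // fires when this closure returns, before wg.Wait below
		for _, w := range workers {
			wg.Add(1)
			go func(w func(<-chan struct{})) {
				defer wg.Done()
				w(stopCh)
			}(w)
		}
	}()
	wg.Wait() // no lock held here, so writers can still acquire it while we drain
}

func main() {
	var mu sync.RWMutex
	stopCh := make(chan struct{})
	workers := []func(<-chan struct{}){
		func(stop <-chan struct{}) { <-stop; fmt.Println("run exited") },
		func(stop <-chan struct{}) { <-stop; fmt.Println("pop exited") },
	}
	done := make(chan struct{})
	go func() { runAndDrain(&mu, workers, stopCh); close(done) }()
	time.Sleep(10 * time.Millisecond)
	close(stopCh)
	<-done
}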
@@ -51,7 +51,7 @@ func NewPodStore(c clientset.Interface, namespace string, label labels.Selector,
 	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
 	stopCh := make(chan struct{})
 	reflector := cache.NewReflector(lw, &v1.Pod{}, store, 0)
-	reflector.RunUntil(stopCh)
+	go reflector.Run(stopCh)
 	return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector}
 }
 