mirror of
https://github.com/kubernetes/client-go.git
synced 2025-08-21 00:23:39 +00:00
Merge pull request #116297 from p0lyn0mial/upstream-reflector-list-n-watch-refactor
reflector: extract watch and startResyncAsync methods Kubernetes-commit: 778b24c97e189dcc8f14a81ce32e022b959f8fd3
This commit is contained in:
commit
29a689d161
4
go.mod
4
go.mod
@ -24,7 +24,7 @@ require (
|
||||
golang.org/x/term v0.5.0
|
||||
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
|
||||
google.golang.org/protobuf v1.28.1
|
||||
k8s.io/api v0.0.0-20230303235756-fc1b77c3f4ab
|
||||
k8s.io/api v0.0.0-20230304080250-2d949b7889c7
|
||||
k8s.io/apimachinery v0.0.0-20230303235435-f357b1fa74b7
|
||||
k8s.io/klog/v2 v2.90.1
|
||||
k8s.io/kube-openapi v0.0.0-20230303024457-afdc3dddf62d
|
||||
@ -59,6 +59,6 @@ require (
|
||||
)
|
||||
|
||||
replace (
|
||||
k8s.io/api => k8s.io/api v0.0.0-20230303235756-fc1b77c3f4ab
|
||||
k8s.io/api => k8s.io/api v0.0.0-20230304080250-2d949b7889c7
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230303235435-f357b1fa74b7
|
||||
)
|
||||
|
4
go.sum
4
go.sum
@ -473,8 +473,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
|
||||
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
||||
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||
k8s.io/api v0.0.0-20230303235756-fc1b77c3f4ab h1:peGI8OxM+LOJxG8FqX/y6BVELxehZ4fDq9cm1a5Qaz0=
|
||||
k8s.io/api v0.0.0-20230303235756-fc1b77c3f4ab/go.mod h1:ihVCDKSD6f+H/yGKVpY9HSgpw4StgSvrI3dbK05M9a8=
|
||||
k8s.io/api v0.0.0-20230304080250-2d949b7889c7 h1:aiAGgZyin08AS6FReGcsRu4Jx++Z2h1jgqG4wl6mho8=
|
||||
k8s.io/api v0.0.0-20230304080250-2d949b7889c7/go.mod h1:esKbT+6XB9TZUHyxlJVQ3zUM0abhQZ81Ic68eirO+xM=
|
||||
k8s.io/apimachinery v0.0.0-20230303235435-f357b1fa74b7 h1:YN43Lvs3Pj9iQmuWGojeBiFdz1mkrxe0EZn7Ba3TMpQ=
|
||||
k8s.io/apimachinery v0.0.0-20230303235435-f357b1fa74b7/go.mod h1:jlJwObMa4oKAEOMnAeEaqeiM+Fwd/CbAwNyQ7OaEwS0=
|
||||
k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=
|
||||
|
57
tools/cache/reflector.go
vendored
57
tools/cache/reflector.go
vendored
@ -323,31 +323,40 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
resyncerrc := make(chan error, 1)
|
||||
cancelCh := make(chan struct{})
|
||||
defer close(cancelCh)
|
||||
go func() {
|
||||
resyncCh, cleanup := r.resyncChan()
|
||||
defer func() {
|
||||
cleanup() // Call the last one written into cleanup
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-resyncCh:
|
||||
case <-stopCh:
|
||||
return
|
||||
case <-cancelCh:
|
||||
return
|
||||
}
|
||||
if r.ShouldResync == nil || r.ShouldResync() {
|
||||
klog.V(4).Infof("%s: forcing resync", r.name)
|
||||
if err := r.store.Resync(); err != nil {
|
||||
resyncerrc <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
cleanup()
|
||||
resyncCh, cleanup = r.resyncChan()
|
||||
}
|
||||
}()
|
||||
go r.startResync(stopCh, cancelCh, resyncerrc)
|
||||
return r.watch(stopCh, resyncerrc)
|
||||
}
|
||||
|
||||
// startResync periodically calls r.store.Resync() method.
|
||||
// Note that this method is blocking and should be
|
||||
// called in a separate goroutine.
|
||||
func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) {
|
||||
resyncCh, cleanup := r.resyncChan()
|
||||
defer func() {
|
||||
cleanup() // Call the last one written into cleanup
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-resyncCh:
|
||||
case <-stopCh:
|
||||
return
|
||||
case <-cancelCh:
|
||||
return
|
||||
}
|
||||
if r.ShouldResync == nil || r.ShouldResync() {
|
||||
klog.V(4).Infof("%s: forcing resync", r.name)
|
||||
if err := r.store.Resync(); err != nil {
|
||||
resyncerrc <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
cleanup()
|
||||
resyncCh, cleanup = r.resyncChan()
|
||||
}
|
||||
}
|
||||
|
||||
// watch simply starts a watch request with the server.
|
||||
func (r *Reflector) watch(stopCh <-chan struct{}, resyncerrc chan error) error {
|
||||
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
|
||||
for {
|
||||
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
|
||||
|
Loading…
Reference in New Issue
Block a user