Merge pull request #58394 from deads2k/controller-08-redeliver

Automatic merge from submit-queue (batch tested with PRs 58412, 56132, 58506, 58542, 58394). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

don't stop informer delivery on error

If an informer delivery fails today, we stop delivering to it entirely.  The pull updates the code to skip that particular notification, delay, and continue delivery with the next time.

/assign derekwaynecarr 
/assign ncdc
/assign ash2k

@derekwaynecarr This would change the "the controller isn't doing anything?!" to "the controller missed my (individual) resource!"


```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2018-01-22 22:57:47 -08:00 committed by GitHub
commit 71426ba59f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 49 additions and 12 deletions

View File

@ -1634,6 +1634,10 @@
"ImportPath": "k8s.io/client-go/util/integer",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/util/retry",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/kube-openapi/pkg/builder",
"Rev": "a07b7bbb58e7fdc5144f8d7046331d29fc9ad3b3"

View File

@ -1762,6 +1762,10 @@
"ImportPath": "k8s.io/client-go/util/integer",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/util/retry",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/kube-openapi/pkg/builder",
"Rev": "a07b7bbb58e7fdc5144f8d7046331d29fc9ad3b3"

View File

@ -83,6 +83,7 @@ go_library(
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/pager:go_default_library",
"//vendor/k8s.io/client-go/util/buffer:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
],
)

View File

@ -26,6 +26,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/buffer"
"k8s.io/client-go/util/retry"
"github.com/golang/glog"
)
@ -540,20 +541,35 @@ func (p *processorListener) pop() {
}
func (p *processorListener) run() {
defer utilruntime.HandleCrash()
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}
}, 1*time.Minute, stopCh)
}
// shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,

View File

@ -1618,6 +1618,10 @@
"ImportPath": "k8s.io/client-go/util/integer",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/util/retry",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/util/workqueue",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -1610,6 +1610,10 @@
"ImportPath": "k8s.io/client-go/util/integer",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/util/retry",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/kube-openapi/pkg/builder",
"Rev": "a07b7bbb58e7fdc5144f8d7046331d29fc9ad3b3"

View File

@ -890,6 +890,10 @@
"ImportPath": "k8s.io/client-go/util/integer",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/util/retry",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/util/workqueue",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"