Compare commits

...

5 Commits

Author SHA1 Message Date
Kubernetes Publisher
1d4a1e8243 Update dependencies to v0.19.2 tag 2020-09-16 20:14:05 +00:00
Kubernetes Publisher
bb0bc934b5 Merge pull request #94431 from janeczku/automated-cherry-pick-of-#94316-upstream-release-1.19
Automated cherry pick of #94316: Fixed reflector not recovering from "Too large resource

Kubernetes-commit: 8331637da07131bd5dfef49c8f8deaf248ee6669
2020-09-04 05:34:52 +00:00
Kubernetes Publisher
d3292e7379 Merge pull request #94427 from gobomb/automated-cherry-pick-of-#93646-origin-release-1.19
Automated cherry pick of #93646: let panics propagate up when processLoop panic

Kubernetes-commit: f61113118fd447e9e15174aa4f804d7754a9e495
2020-09-04 01:35:57 +00:00
janeczku
85ff1af514 Fixed reflector not recovering from "Too large resource version" errors with API servers 1.17.0-1.18.5
Kubernetes-commit: e21e490535fb7a5abd2f6f2c03a11613767e4800
2020-08-28 21:17:27 +02:00
gobomb
705dbea9c0 let panics propagate up when processLoop panic
Kubernetes-commit: c9d4923fa5438bc820bcf3b6a34cb747befaf59a
2020-08-03 09:30:39 +00:00
7 changed files with 90 additions and 15 deletions

4
Godeps/Godeps.json generated
View File

@@ -440,11 +440,11 @@
},
{
"ImportPath": "k8s.io/api",
"Rev": "21b59c1ded36"
"Rev": "v0.19.2"
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "b63a0c883fbf"
"Rev": "v0.19.2"
},
{
"ImportPath": "k8s.io/gengo",

8
go.mod
View File

@@ -26,14 +26,14 @@ require (
golang.org/x/net v0.0.0-20200707034311-ab3426394381
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
k8s.io/api v0.0.0-20200821172135-21b59c1ded36
k8s.io/apimachinery v0.0.0-20200821171749-b63a0c883fbf
k8s.io/api v0.19.2
k8s.io/apimachinery v0.19.2
k8s.io/klog/v2 v2.2.0
k8s.io/utils v0.0.0-20200729134348-d5654de09c73
sigs.k8s.io/yaml v1.2.0
)
replace (
k8s.io/api => k8s.io/api v0.0.0-20200821172135-21b59c1ded36
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20200821171749-b63a0c883fbf
k8s.io/api => k8s.io/api v0.19.2
k8s.io/apimachinery => k8s.io/apimachinery v0.19.2
)

4
go.sum
View File

@@ -333,8 +333,8 @@ honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
k8s.io/api v0.0.0-20200821172135-21b59c1ded36/go.mod h1:WXzrXjAr+IgCMGkIbOU4i87rvN7UWNGsyNmBEkW9rx8=
k8s.io/apimachinery v0.0.0-20200821171749-b63a0c883fbf/go.mod h1:DnPGDnARWFvYa3pMHgSxtbZb7gpzzAZ1pTfaUNDVlmA=
k8s.io/api v0.19.2/go.mod h1:IQpK0zFQ1xc5iNIQPqzgoOwuFugaYHK4iCknlAQP9nI=
k8s.io/apimachinery v0.19.2/go.mod h1:DnPGDnARWFvYa3pMHgSxtbZb7gpzzAZ1pTfaUNDVlmA=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.2.0 h1:XRvcwJozkgZ1UQJmfMGpvRthQHOvihEhYtDfAaxMz/A=

View File

@@ -144,11 +144,11 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
// Returns true once this controller has completed an initial resource listing

View File

@@ -402,3 +402,49 @@ func TestUpdate(t *testing.T) {
testDoneWG.Wait()
close(stop)
}
func TestPanicPropagated(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
// Make a controller that just panic if the AddFunc is called.
_, controller := NewInformer(
source,
&v1.Pod{},
time.Millisecond*100,
ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// Create a panic.
panic("Just panic.")
},
},
)
// Run the controller and run it until we close stop.
stop := make(chan struct{})
defer close(stop)
propagated := make(chan interface{})
go func() {
defer func() {
if r := recover(); r != nil {
propagated <- r
}
}()
controller.Run(stop)
}()
// Let's add a object to the source. It will trigger a panic.
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test"}})
// Check if the panic propagated up.
select {
case p := <-propagated:
if p == "Just panic." {
t.Logf("Test Passed")
} else {
t.Errorf("unrecognized panic in controller run: %v", p)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timeout: the panic failed to propagate from the controller run method!")
}
}

View File

@@ -570,5 +570,26 @@ func isExpiredError(err error) bool {
}
func isTooLargeResourceVersionError(err error) bool {
return apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge)
if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
return true
}
// In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to
// metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource
// version is larger than the largest currently available resource version. To ensure backward
// compatibility with these server versions we also need to detect the error based on the content
// of the error message field.
if !apierrors.IsTimeout(err) {
return false
}
apierr, ok := err.(apierrors.APIStatus)
if !ok || apierr == nil || apierr.Status().Details == nil {
return false
}
for _, cause := range apierr.Status().Details.Causes {
// Matches the message returned by api server 1.17.0-1.18.5 for this error condition
if cause.Message == "Too large resource version" {
return true
}
}
return false
}

View File

@@ -738,9 +738,14 @@ func TestReflectorFullListIfTooLarge(t *testing.T) {
err := apierrors.NewTimeoutError("too large resource version", 1)
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Type: metav1.CauseTypeResourceVersionTooLarge}}
return nil, err
// relist after the initial list (covers the error format used in api server 1.17.0-1.18.5)
case "30":
err := apierrors.NewTimeoutError("too large resource version", 1)
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Message: "Too large resource version"}}
return nil, err
// relist from etcd after "too large" error
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "30"}}, nil
default:
return nil, fmt.Errorf("unexpected List call: %s", options.ResourceVersion)
}
@@ -759,12 +764,15 @@ func TestReflectorFullListIfTooLarge(t *testing.T) {
// may be synced to a different version and they will never converge.
// TODO: We should use etcd progress-notify feature to avoid this behavior but until this is
// done we simply try to relist from now to avoid continuous errors on relists.
stopCh = make(chan struct{})
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
for i := 1; i <= 2; i++ {
// relist twice to cover the two variants of TooLargeResourceVersion api errors
stopCh = make(chan struct{})
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
}
expectedRVs := []string{"0", "20", ""}
expectedRVs := []string{"0", "20", "", "30", ""}
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
t.Errorf("Expected series of list calls with resource version of %#v but got: %#v", expectedRVs, listCallRVs)
}