Compare commits

..

11 Commits

Author SHA1 Message Date
Kubernetes Publisher
5f9873e4ca Update dependencies to v0.18.10 tag 2020-10-20 10:30:53 +00:00
Kubernetes Publisher
4c2c810a97 Merge pull request #94426 from gobomb/automated-cherry-pick-of-#93646-origin-release-1.18
Automated cherry pick of #93646: let panics propagate up when processLoop panic

Kubernetes-commit: 9150263c8c7a42f3ae28e6e27a471d2697dfe5ed
2020-09-04 05:30:45 +00:00
Kubernetes Publisher
f3104ae88e Merge pull request #94430 from janeczku/automated-cherry-pick-of-#94316-upstream-release-1.18
Automated cherry pick of #94316: Fixed reflector not recovering from "Too large resource

Kubernetes-commit: abb70d04424848f39a771d04521822c81481b99b
2020-09-04 05:30:44 +00:00
Kubernetes Publisher
82b9b1a146 Merge pull request #94148 from liggitt/json-patch-1.18
[1.18] Update to json-patch 4.9.0

Kubernetes-commit: 62eff8d42cff51ddab6ef3c5b3f7380a5c02815f
2020-09-03 13:32:01 +00:00
janeczku
94230b73bd Fixed reflector not recovering from "Too large resource version" errors with API servers 1.17.0-1.18.5
Kubernetes-commit: 744dd65b388e9fc17f53072db35c38748a83e777
2020-08-28 21:17:27 +02:00
Jordan Liggitt
0b89f44804 Update json-patch to v4.9.0 tagged release
Kubernetes-commit: f28d1007a56fa23c1874b973fa0024eddffc6fac
2020-08-20 17:14:41 -04:00
Kubernetes Publisher
c8494005d1 Merge pull request #93811 from liggitt/json-patch-4.8.0-1.18
[1.18] Update to json-patch 4.8.0

Kubernetes-commit: 7914bac28a993d4493e0c1e31323bb9330dbdcf6
2020-08-12 05:30:03 +00:00
Jordan Liggitt
146c268590 Update to json-patch 4.8.0
Kubernetes-commit: b924a9330380fd993ad9ca4ebb171c1a67ecb51f
2020-08-08 09:59:16 -04:00
gobomb
ce358577b6 let panics propagate up when processLoop panic
Kubernetes-commit: 2474a2ebc8f953104b25b48c771596d57e918470
2020-08-03 09:30:39 +00:00
Kubernetes Publisher
9e5bcc7f62 Merge pull request #92688 from wojtek-t/automated-cherry-pick-of-#92537-upstream-release-1.18
Automated cherry pick of #92537 upstream release 1.18

Kubernetes-commit: 91ae197b9d0b13b8280992595d0556d118bf11dd
2020-07-09 05:07:02 +00:00
wojtekt
3ac0631eb9 Fix bug in reflector not recovering from "Too large resource version" errors
Kubernetes-commit: 8012722d626d50fd18059ec5af6d195a7dc180c1
2020-06-26 09:45:29 +02:00
7 changed files with 172 additions and 30 deletions

10
Godeps/Godeps.json generated
View File

@@ -76,7 +76,7 @@
},
{
"ImportPath": "github.com/evanphx/json-patch",
"Rev": "v4.2.0"
"Rev": "v4.9.0"
},
{
"ImportPath": "github.com/fsnotify/fsnotify",
@@ -238,6 +238,10 @@
"ImportPath": "github.com/peterbourgon/diskv",
"Rev": "v2.0.1"
},
{
"ImportPath": "github.com/pkg/errors",
"Rev": "v0.8.1"
},
{
"ImportPath": "github.com/pmezard/go-difflib",
"Rev": "v1.0.0"
@@ -348,11 +352,11 @@
},
{
"ImportPath": "k8s.io/api",
"Rev": "v0.18.4"
"Rev": "v0.18.10"
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "v0.18.4"
"Rev": "v0.18.10"
},
{
"ImportPath": "k8s.io/gengo",

10
go.mod
View File

@@ -9,7 +9,7 @@ require (
github.com/Azure/go-autorest/autorest v0.9.0
github.com/Azure/go-autorest/autorest/adal v0.5.0
github.com/davecgh/go-spew v1.1.1
github.com/evanphx/json-patch v4.2.0+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
github.com/golang/protobuf v1.3.2
@@ -28,8 +28,8 @@ require (
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
google.golang.org/appengine v1.5.0 // indirect
k8s.io/api v0.18.4
k8s.io/apimachinery v0.18.4
k8s.io/api v0.18.10
k8s.io/apimachinery v0.18.10
k8s.io/klog v1.0.0
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89
sigs.k8s.io/yaml v1.2.0
@@ -38,6 +38,6 @@ require (
replace (
golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13
k8s.io/api => k8s.io/api v0.18.4
k8s.io/apimachinery => k8s.io/apimachinery v0.18.4
k8s.io/api => k8s.io/api v0.18.10
k8s.io/apimachinery => k8s.io/apimachinery v0.18.10
)

10
go.sum
View File

@@ -30,8 +30,8 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/evanphx/json-patch v4.2.0+incompatible h1:fUDGZCv/7iAN7u0puUVhvKCcsR6vRfwrJatElLBEf0I=
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@@ -108,6 +108,8 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
@@ -182,8 +184,8 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.18.4/go.mod h1:lOIQAKYgai1+vz9J7YcDZwC26Z0zQewYOGWdyIPUUQ4=
k8s.io/apimachinery v0.18.4/go.mod h1:OaXp26zu/5J7p0f92ASynJa1pZo06YlV9fG7BoWbCko=
k8s.io/api v0.18.10/go.mod h1:xWtwPX1v47j5RTncmlMFGCx8b0avh+nP8OgZZ9hjo3M=
k8s.io/apimachinery v0.18.10/go.mod h1:PF5taHbXgTEJLU+xMypMmYTXTWPJ5LaW8bfsisxnEXk=
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=

View File

@@ -138,11 +138,11 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
// Returns true once this controller has completed an initial resource listing

View File

@@ -402,3 +402,49 @@ func TestUpdate(t *testing.T) {
testDoneWG.Wait()
close(stop)
}
func TestPanicPropagated(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
// Make a controller that just panic if the AddFunc is called.
_, controller := NewInformer(
source,
&v1.Pod{},
time.Millisecond*100,
ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// Create a panic.
panic("Just panic.")
},
},
)
// Run the controller and run it until we close stop.
stop := make(chan struct{})
defer close(stop)
propagated := make(chan interface{})
go func() {
defer func() {
if r := recover(); r != nil {
propagated <- r
}
}()
controller.Run(stop)
}()
// Let's add a object to the source. It will trigger a panic.
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test"}})
// Check if the panic propagated up.
select {
case p := <-propagated:
if p == "Just panic." {
t.Logf("Test Passed")
} else {
t.Errorf("unrecognized panic in controller run: %v", p)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timeout: the panic failed to propagate from the controller run method!")
}
}

View File

@@ -82,9 +82,9 @@ type Reflector struct {
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
// failed with an HTTP 410 (Gone) status code.
isLastSyncResourceVersionGone bool
// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
isLastSyncResourceVersionUnavailable bool
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
@@ -256,13 +256,14 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}
list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) {
r.setIsLastSyncResourceVersionExpired(true)
// Retry immediately if the resource version used to list is expired.
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
// Retry immediately if the resource version used to list is unavailable.
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
// continuation pages, but the pager might not be enabled, or the full list might fail because the
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
// to recover and ensure the reflector makes forward progress.
// continuation pages, but the pager might not be enabled, the full list might fail because the
// resource version it is listing at is expired or the cache may not yet be synced to the provided
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
// the reflector makes forward progress.
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
@@ -292,7 +293,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
r.paginatedResult = true
}
r.setIsLastSyncResourceVersionExpired(false) // list was successful
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
@@ -396,7 +397,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != errorStopRequested {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
@@ -519,9 +520,9 @@ func (r *Reflector) relistResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionGone {
if r.isLastSyncResourceVersionUnavailable {
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector
// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
// to the latest available ResourceVersion, using a consistent read from etcd.
return ""
}
@@ -533,12 +534,12 @@ func (r *Reflector) relistResourceVersion() string {
return r.lastSyncResourceVersion
}
// setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a
// expired error: HTTP 410 (Gone) Status Code.
func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
// "expired" or "too large resource version" error.
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.isLastSyncResourceVersionGone = isExpired
r.isLastSyncResourceVersionUnavailable = isUnavailable
}
func isExpiredError(err error) bool {
@@ -548,3 +549,28 @@ func isExpiredError(err error) bool {
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
}
func isTooLargeResourceVersionError(err error) bool {
if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
return true
}
// In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to
// metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource
// version is larger than the largest currently available resource version. To ensure backward
// compatibility with these server versions we also need to detect the error based on the content
// of the error message field.
if !apierrors.IsTimeout(err) {
return false
}
apierr, ok := err.(apierrors.APIStatus)
if !ok || apierr == nil || apierr.Status().Details == nil {
return false
}
for _, cause := range apierr.Status().Details.Causes {
// Matches the message returned by api server 1.17.0-1.18.5 for this error condition
if cause.Message == "Too large resource version" {
return true
}
}
return false
}

View File

@@ -714,6 +714,70 @@ func TestReflectorFullListIfExpired(t *testing.T) {
}
}
func TestReflectorFullListIfTooLarge(t *testing.T) {
stopCh := make(chan struct{})
s := NewStore(MetaNamespaceKeyFunc)
listCallRVs := []string{}
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// Stop once the reflector begins watching since we're only interested in the list.
close(stopCh)
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listCallRVs = append(listCallRVs, options.ResourceVersion)
switch options.ResourceVersion {
// initial list
case "0":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "20"}}, nil
// relist after the initial list
case "20":
err := apierrors.NewTimeoutError("too large resource version", 1)
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Type: metav1.CauseTypeResourceVersionTooLarge}}
return nil, err
// relist after the initial list (covers the error format used in api server 1.17.0-1.18.5)
case "30":
err := apierrors.NewTimeoutError("too large resource version", 1)
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Message: "Too large resource version"}}
return nil, err
// relist from etcd after "too large" error
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "30"}}, nil
default:
return nil, fmt.Errorf("unexpected List call: %s", options.ResourceVersion)
}
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
// Initial list should use RV=0
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
// Relist from the future version.
// This may happen, as watchcache is initialized from "current global etcd resource version"
// when kube-apiserver is starting and if no objects are changing after that each kube-apiserver
// may be synced to a different version and they will never converge.
// TODO: We should use etcd progress-notify feature to avoid this behavior but until this is
// done we simply try to relist from now to avoid continuous errors on relists.
for i := 1; i <= 2; i++ {
// relist twice to cover the two variants of TooLargeResourceVersion api errors
stopCh = make(chan struct{})
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
}
expectedRVs := []string{"0", "20", "", "30", ""}
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
t.Errorf("Expected series of list calls with resource version of %#v but got: %#v", expectedRVs, listCallRVs)
}
}
func TestReflectorSetExpectedType(t *testing.T) {
obj := &unstructured.Unstructured{}
gvk := schema.GroupVersionKind{