mirror of
https://github.com/kubernetes/client-go.git
synced 2026-06-10 03:07:20 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ef9570953e |
14
Godeps/Godeps.json
generated
14
Godeps/Godeps.json
generated
@@ -76,7 +76,7 @@
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/evanphx/json-patch",
|
||||
"Rev": "162e5629780b"
|
||||
"Rev": "v4.2.0"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/fsnotify/fsnotify",
|
||||
@@ -178,10 +178,6 @@
|
||||
"ImportPath": "github.com/imdario/mergo",
|
||||
"Rev": "v0.3.5"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/jessevdk/go-flags",
|
||||
"Rev": "v1.4.0"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/json-iterator/go",
|
||||
"Rev": "v1.1.8"
|
||||
@@ -242,10 +238,6 @@
|
||||
"ImportPath": "github.com/peterbourgon/diskv",
|
||||
"Rev": "v2.0.1"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/pkg/errors",
|
||||
"Rev": "v0.8.1"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/pmezard/go-difflib",
|
||||
"Rev": "v1.0.0"
|
||||
@@ -356,11 +348,11 @@
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/api",
|
||||
"Rev": "v0.18.8"
|
||||
"Rev": "v0.18.5"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery",
|
||||
"Rev": "v0.18.8"
|
||||
"Rev": "v0.18.5"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/gengo",
|
||||
|
||||
10
go.mod
10
go.mod
@@ -9,7 +9,7 @@ require (
|
||||
github.com/Azure/go-autorest/autorest v0.9.0
|
||||
github.com/Azure/go-autorest/autorest/adal v0.5.0
|
||||
github.com/davecgh/go-spew v1.1.1
|
||||
github.com/evanphx/json-patch v0.0.0-20200808040245-162e5629780b
|
||||
github.com/evanphx/json-patch v4.2.0+incompatible
|
||||
github.com/gogo/protobuf v1.3.1
|
||||
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
|
||||
github.com/golang/protobuf v1.3.2
|
||||
@@ -28,8 +28,8 @@ require (
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
|
||||
google.golang.org/appengine v1.5.0 // indirect
|
||||
k8s.io/api v0.18.8
|
||||
k8s.io/apimachinery v0.18.8
|
||||
k8s.io/api v0.18.5
|
||||
k8s.io/apimachinery v0.18.5
|
||||
k8s.io/klog v1.0.0
|
||||
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89
|
||||
sigs.k8s.io/yaml v1.2.0
|
||||
@@ -38,6 +38,6 @@ require (
|
||||
replace (
|
||||
golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13
|
||||
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13
|
||||
k8s.io/api => k8s.io/api v0.18.8
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.18.8
|
||||
k8s.io/api => k8s.io/api v0.18.5
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.18.5
|
||||
)
|
||||
|
||||
11
go.sum
11
go.sum
@@ -30,8 +30,8 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ
|
||||
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
|
||||
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
|
||||
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
|
||||
github.com/evanphx/json-patch v0.0.0-20200808040245-162e5629780b h1:vCplRbYcTTeBVLjIU0KvipEeVBSxl6sakUBRmeLBTkw=
|
||||
github.com/evanphx/json-patch v0.0.0-20200808040245-162e5629780b/go.mod h1:NAJj0yf/KaRKURN6nyi7A9IZydMivZEm9oQLWNjfKDc=
|
||||
github.com/evanphx/json-patch v4.2.0+incompatible h1:fUDGZCv/7iAN7u0puUVhvKCcsR6vRfwrJatElLBEf0I=
|
||||
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
||||
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
@@ -79,7 +79,6 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
|
||||
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
|
||||
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||
github.com/json-iterator/go v1.1.8 h1:QiWkFLKq0T7mpzwOTu6BzNDbfTE8OLrYhVKYMLF46Ok=
|
||||
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
@@ -109,8 +108,6 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
|
||||
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
|
||||
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
|
||||
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
|
||||
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
|
||||
@@ -185,8 +182,8 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
|
||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
k8s.io/api v0.18.8/go.mod h1:d/CXqwWv+Z2XEG1LgceeDmHQwpUJhROPx16SlxJgERY=
|
||||
k8s.io/apimachinery v0.18.8/go.mod h1:6sQd+iHEqmOtALqOFjSWp2KZ9F0wlU/nWm0ZgsYWMig=
|
||||
k8s.io/api v0.18.5/go.mod h1:tN+e/2nbdGKOAH55NMV8oGrMG+3uRlA9GaRfvnCCSNk=
|
||||
k8s.io/apimachinery v0.18.5/go.mod h1:OaXp26zu/5J7p0f92ASynJa1pZo06YlV9fG7BoWbCko=
|
||||
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
|
||||
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||
|
||||
39
tools/cache/reflector.go
vendored
39
tools/cache/reflector.go
vendored
@@ -82,9 +82,9 @@ type Reflector struct {
|
||||
// observed when doing a sync with the underlying store
|
||||
// it is thread safe, but not synchronized with the underlying store
|
||||
lastSyncResourceVersion string
|
||||
// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
|
||||
// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
|
||||
isLastSyncResourceVersionUnavailable bool
|
||||
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
|
||||
// failed with an HTTP 410 (Gone) status code.
|
||||
isLastSyncResourceVersionGone bool
|
||||
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
||||
lastSyncResourceVersionMutex sync.RWMutex
|
||||
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
|
||||
@@ -256,14 +256,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
}
|
||||
|
||||
list, paginatedResult, err = pager.List(context.Background(), options)
|
||||
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
|
||||
r.setIsLastSyncResourceVersionUnavailable(true)
|
||||
// Retry immediately if the resource version used to list is unavailable.
|
||||
if isExpiredError(err) {
|
||||
r.setIsLastSyncResourceVersionExpired(true)
|
||||
// Retry immediately if the resource version used to list is expired.
|
||||
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
|
||||
// continuation pages, but the pager might not be enabled, the full list might fail because the
|
||||
// resource version it is listing at is expired or the cache may not yet be synced to the provided
|
||||
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
|
||||
// the reflector makes forward progress.
|
||||
// continuation pages, but the pager might not be enabled, or the full list might fail because the
|
||||
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
|
||||
// to recover and ensure the reflector makes forward progress.
|
||||
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
|
||||
}
|
||||
close(listCh)
|
||||
@@ -293,7 +292,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
r.paginatedResult = true
|
||||
}
|
||||
|
||||
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
|
||||
r.setIsLastSyncResourceVersionExpired(false) // list was successful
|
||||
initTrace.Step("Objects listed")
|
||||
listMetaInterface, err := meta.ListAccessor(list)
|
||||
if err != nil {
|
||||
@@ -397,7 +396,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
if err != errorStopRequested {
|
||||
switch {
|
||||
case isExpiredError(err):
|
||||
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
|
||||
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
|
||||
// has a semantic that it returns data at least as fresh as provided RV.
|
||||
// So first try to LIST with setting RV to resource version of last observed object.
|
||||
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
||||
@@ -520,9 +519,9 @@ func (r *Reflector) relistResourceVersion() string {
|
||||
r.lastSyncResourceVersionMutex.RLock()
|
||||
defer r.lastSyncResourceVersionMutex.RUnlock()
|
||||
|
||||
if r.isLastSyncResourceVersionUnavailable {
|
||||
if r.isLastSyncResourceVersionGone {
|
||||
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
|
||||
// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
|
||||
// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector
|
||||
// to the latest available ResourceVersion, using a consistent read from etcd.
|
||||
return ""
|
||||
}
|
||||
@@ -534,12 +533,12 @@ func (r *Reflector) relistResourceVersion() string {
|
||||
return r.lastSyncResourceVersion
|
||||
}
|
||||
|
||||
// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
|
||||
// "expired" or "too large resource version" error.
|
||||
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
|
||||
// setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a
|
||||
// expired error: HTTP 410 (Gone) Status Code.
|
||||
func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
|
||||
r.lastSyncResourceVersionMutex.Lock()
|
||||
defer r.lastSyncResourceVersionMutex.Unlock()
|
||||
r.isLastSyncResourceVersionUnavailable = isUnavailable
|
||||
r.isLastSyncResourceVersionGone = isExpired
|
||||
}
|
||||
|
||||
func isExpiredError(err error) bool {
|
||||
@@ -549,7 +548,3 @@ func isExpiredError(err error) bool {
|
||||
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
|
||||
return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
|
||||
}
|
||||
|
||||
func isTooLargeResourceVersionError(err error) bool {
|
||||
return apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge)
|
||||
}
|
||||
|
||||
56
tools/cache/reflector_test.go
vendored
56
tools/cache/reflector_test.go
vendored
@@ -714,62 +714,6 @@ func TestReflectorFullListIfExpired(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestReflectorFullListIfTooLarge(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
s := NewStore(MetaNamespaceKeyFunc)
|
||||
listCallRVs := []string{}
|
||||
|
||||
lw := &testLW{
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
// Stop once the reflector begins watching since we're only interested in the list.
|
||||
close(stopCh)
|
||||
fw := watch.NewFake()
|
||||
return fw, nil
|
||||
},
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
listCallRVs = append(listCallRVs, options.ResourceVersion)
|
||||
|
||||
switch options.ResourceVersion {
|
||||
// initial list
|
||||
case "0":
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "20"}}, nil
|
||||
// relist after the initial list
|
||||
case "20":
|
||||
err := apierrors.NewTimeoutError("too large resource version", 1)
|
||||
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Type: metav1.CauseTypeResourceVersionTooLarge}}
|
||||
return nil, err
|
||||
// relist from etcd after "too large" error
|
||||
case "":
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected List call: %s", options.ResourceVersion)
|
||||
}
|
||||
},
|
||||
}
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
|
||||
// Initial list should use RV=0
|
||||
if err := r.ListAndWatch(stopCh); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Relist from the future version.
|
||||
// This may happen, as watchcache is initialized from "current global etcd resource version"
|
||||
// when kube-apiserver is starting and if no objects are changing after that each kube-apiserver
|
||||
// may be synced to a different version and they will never converge.
|
||||
// TODO: We should use etcd progress-notify feature to avoid this behavior but until this is
|
||||
// done we simply try to relist from now to avoid continuous errors on relists.
|
||||
stopCh = make(chan struct{})
|
||||
if err := r.ListAndWatch(stopCh); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expectedRVs := []string{"0", "20", ""}
|
||||
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
|
||||
t.Errorf("Expected series of list calls with resource version of %#v but got: %#v", expectedRVs, listCallRVs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReflectorSetExpectedType(t *testing.T) {
|
||||
obj := &unstructured.Unstructured{}
|
||||
gvk := schema.GroupVersionKind{
|
||||
|
||||
Reference in New Issue
Block a user