mirror of
https://github.com/kubernetes/client-go.git
synced 2026-06-12 04:06:38 +00:00
Compare commits
19 Commits
v0.18.0-be
...
v0.18.6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dea2c88477 | ||
|
|
9e5bcc7f62 | ||
|
|
3ac0631eb9 | ||
|
|
303c51f3d4 | ||
|
|
7b592a8174 | ||
|
|
11b3f81436 | ||
|
|
19e102247d | ||
|
|
b3ff053fb1 | ||
|
|
3ce1317b14 | ||
|
|
aa2d6328a3 | ||
|
|
dbcb4c305c | ||
|
|
88ba1c9b80 | ||
|
|
136e483e70 | ||
|
|
dc8515ba96 | ||
|
|
8ca7f667a2 | ||
|
|
3e3c7a4209 | ||
|
|
6be3e0bf76 | ||
|
|
d0229e78f4 | ||
|
|
63c6663c56 |
8
Godeps/Godeps.json
generated
8
Godeps/Godeps.json
generated
@@ -348,11 +348,11 @@
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/api",
|
||||
"Rev": "5c56b6577da0"
|
||||
"Rev": "v0.18.6"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/apimachinery",
|
||||
"Rev": "b75c29caafab"
|
||||
"Rev": "v0.18.6"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/gengo",
|
||||
@@ -364,11 +364,11 @@
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/kube-openapi",
|
||||
"Rev": "bf4fb3bd569c"
|
||||
"Rev": "61e04a5be9a6"
|
||||
},
|
||||
{
|
||||
"ImportPath": "k8s.io/utils",
|
||||
"Rev": "0a110f9eb7ab"
|
||||
"Rev": "a9aa75ae1b89"
|
||||
},
|
||||
{
|
||||
"ImportPath": "sigs.k8s.io/structured-merge-diff/v3",
|
||||
|
||||
10
go.mod
10
go.mod
@@ -28,16 +28,16 @@ require (
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
|
||||
google.golang.org/appengine v1.5.0 // indirect
|
||||
k8s.io/api v0.0.0-20200308145338-5c56b6577da0
|
||||
k8s.io/apimachinery v0.0.0-20200308145338-b75c29caafab
|
||||
k8s.io/api v0.18.6
|
||||
k8s.io/apimachinery v0.18.6
|
||||
k8s.io/klog v1.0.0
|
||||
k8s.io/utils v0.0.0-20200229041039-0a110f9eb7ab
|
||||
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89
|
||||
sigs.k8s.io/yaml v1.2.0
|
||||
)
|
||||
|
||||
replace (
|
||||
golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13
|
||||
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13
|
||||
k8s.io/api => k8s.io/api v0.0.0-20200308145338-5c56b6577da0
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20200308145338-b75c29caafab
|
||||
k8s.io/api => k8s.io/api v0.18.6
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.18.6
|
||||
)
|
||||
|
||||
12
go.sum
12
go.sum
@@ -182,17 +182,17 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
|
||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
k8s.io/api v0.0.0-20200308145338-5c56b6577da0/go.mod h1:dQFmApLGF1AzHBoCa9z1a7ftM/1nZEAoknHzuttq6eY=
|
||||
k8s.io/apimachinery v0.0.0-20200308145338-b75c29caafab/go.mod h1:9SnR/e11v5IbyPCGbvJViimtJ0SwHG4nfZFjU77ftcA=
|
||||
k8s.io/api v0.18.6/go.mod h1:eeyxr+cwCjMdLAmr2W3RyDI0VvTawSg/3RFFBEnmZGI=
|
||||
k8s.io/apimachinery v0.18.6/go.mod h1:OaXp26zu/5J7p0f92ASynJa1pZo06YlV9fG7BoWbCko=
|
||||
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
|
||||
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
|
||||
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
|
||||
k8s.io/kube-openapi v0.0.0-20200121204235-bf4fb3bd569c h1:/KUFqjjqAcY4Us6luF5RDNZ16KJtb49HfR3ZHB9qYXM=
|
||||
k8s.io/kube-openapi v0.0.0-20200121204235-bf4fb3bd569c/go.mod h1:GRQhZsXIAJ1xR0C9bd8UpWHZ5plfAS9fzPjJuQ6JL3E=
|
||||
k8s.io/utils v0.0.0-20200229041039-0a110f9eb7ab h1:I3f2hcBrepGRXI1z4sukzAb8w1R4eqbsHrAsx06LGYM=
|
||||
k8s.io/utils v0.0.0-20200229041039-0a110f9eb7ab/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
|
||||
k8s.io/kube-openapi v0.0.0-20200410145947-61e04a5be9a6 h1:Oh3Mzx5pJ+yIumsAD0MOECPVeXsVot0UkiaCGVyfGQY=
|
||||
k8s.io/kube-openapi v0.0.0-20200410145947-61e04a5be9a6/go.mod h1:GRQhZsXIAJ1xR0C9bd8UpWHZ5plfAS9fzPjJuQ6JL3E=
|
||||
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89 h1:d4vVOjXm687F1iLSP2q3lyPPuyvTUt3aVoBpi2DqRsU=
|
||||
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
|
||||
sigs.k8s.io/structured-merge-diff/v3 v3.0.0-20200116222232-67a7b8c61874/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw=
|
||||
sigs.k8s.io/structured-merge-diff/v3 v3.0.0 h1:dOmIZBMfhcHS09XZkMyUgkq5trg3/jRyJYFZUiaOp8E=
|
||||
sigs.k8s.io/structured-merge-diff/v3 v3.0.0/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw=
|
||||
|
||||
@@ -273,8 +273,9 @@ func (ts *azureTokenSource) retrieveTokenFromCfg() (*azureToken, error) {
|
||||
if expiresOn == "" {
|
||||
return nil, fmt.Errorf("no expiresOn in cfg: %s", cfgExpiresOn)
|
||||
}
|
||||
tokenAudience := resourceID
|
||||
if ts.configMode == configModeDefault {
|
||||
resourceID = fmt.Sprintf("spn:%s", resourceID)
|
||||
tokenAudience = fmt.Sprintf("spn:%s", resourceID)
|
||||
}
|
||||
|
||||
return &azureToken{
|
||||
@@ -284,7 +285,7 @@ func (ts *azureTokenSource) retrieveTokenFromCfg() (*azureToken, error) {
|
||||
ExpiresIn: json.Number(expiresIn),
|
||||
ExpiresOn: json.Number(expiresOn),
|
||||
NotBefore: json.Number(expiresOn),
|
||||
Resource: resourceID,
|
||||
Resource: tokenAudience,
|
||||
Type: tokenType,
|
||||
},
|
||||
environment: environment,
|
||||
|
||||
@@ -18,6 +18,7 @@ package azure
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -170,6 +171,55 @@ func TestAzureTokenSource(t *testing.T) {
|
||||
expectedConfigModes := []string{"1", "0"}
|
||||
|
||||
for i, configMode := range configModes {
|
||||
t.Run(fmt.Sprintf("validate token from cfg with configMode %v", configMode), func(t *testing.T) {
|
||||
const (
|
||||
serverID = "fakeServerID"
|
||||
clientID = "fakeClientID"
|
||||
tenantID = "fakeTenantID"
|
||||
accessToken = "fakeToken"
|
||||
environment = "fakeEnvironment"
|
||||
refreshToken = "fakeToken"
|
||||
expiresIn = "foo"
|
||||
expiresOn = "foo"
|
||||
)
|
||||
cfg := map[string]string{
|
||||
cfgConfigMode: string(configMode),
|
||||
cfgApiserverID: serverID,
|
||||
cfgClientID: clientID,
|
||||
cfgTenantID: tenantID,
|
||||
cfgEnvironment: environment,
|
||||
cfgAccessToken: accessToken,
|
||||
cfgRefreshToken: refreshToken,
|
||||
cfgExpiresIn: expiresIn,
|
||||
cfgExpiresOn: expiresOn,
|
||||
}
|
||||
fakeSource := fakeTokenSource{}
|
||||
persiter := &fakePersister{cache: make(map[string]string)}
|
||||
tokenCache := newAzureTokenCache()
|
||||
tokenSource := newAzureTokenSource(&fakeSource, tokenCache, cfg, configMode, persiter)
|
||||
azTokenSource := tokenSource.(*azureTokenSource)
|
||||
token, err := azTokenSource.retrieveTokenFromCfg()
|
||||
if err != nil {
|
||||
t.Errorf("failed to retrieve the token form cfg: %s", err)
|
||||
}
|
||||
if token.apiserverID != serverID {
|
||||
t.Errorf("expecting token.apiserverID: %s, actual: %s", serverID, token.apiserverID)
|
||||
}
|
||||
if token.clientID != clientID {
|
||||
t.Errorf("expecting token.clientID: %s, actual: %s", clientID, token.clientID)
|
||||
}
|
||||
if token.tenantID != tenantID {
|
||||
t.Errorf("expecting token.tenantID: %s, actual: %s", tenantID, token.tenantID)
|
||||
}
|
||||
expectedAudience := serverID
|
||||
if configMode == configModeDefault {
|
||||
expectedAudience = fmt.Sprintf("spn:%s", serverID)
|
||||
}
|
||||
if token.token.Resource != expectedAudience {
|
||||
t.Errorf("expecting adal token.Resource: %s, actual: %s", expectedAudience, token.token.Resource)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("validate token against cache", func(t *testing.T) {
|
||||
fakeAccessToken := "fake token 1"
|
||||
fakeSource := fakeTokenSource{
|
||||
|
||||
@@ -655,7 +655,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
||||
if err != nil {
|
||||
// The watch stream mechanism handles many common partial data errors, so closed
|
||||
// connections can be retried in many cases.
|
||||
if net.IsProbableEOF(err) {
|
||||
if net.IsProbableEOF(err) || net.IsTimeout(err) {
|
||||
return watch.NewEmptyWatch(), nil
|
||||
}
|
||||
return nil, err
|
||||
|
||||
46
tools/cache/reflector.go
vendored
46
tools/cache/reflector.go
vendored
@@ -82,9 +82,9 @@ type Reflector struct {
|
||||
// observed when doing a sync with the underlying store
|
||||
// it is thread safe, but not synchronized with the underlying store
|
||||
lastSyncResourceVersion string
|
||||
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
|
||||
// failed with an HTTP 410 (Gone) status code.
|
||||
isLastSyncResourceVersionGone bool
|
||||
// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
|
||||
// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
|
||||
isLastSyncResourceVersionUnavailable bool
|
||||
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
||||
lastSyncResourceVersionMutex sync.RWMutex
|
||||
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
|
||||
@@ -256,13 +256,14 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
}
|
||||
|
||||
list, paginatedResult, err = pager.List(context.Background(), options)
|
||||
if isExpiredError(err) {
|
||||
r.setIsLastSyncResourceVersionExpired(true)
|
||||
// Retry immediately if the resource version used to list is expired.
|
||||
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
|
||||
r.setIsLastSyncResourceVersionUnavailable(true)
|
||||
// Retry immediately if the resource version used to list is unavailable.
|
||||
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
|
||||
// continuation pages, but the pager might not be enabled, or the full list might fail because the
|
||||
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
|
||||
// to recover and ensure the reflector makes forward progress.
|
||||
// continuation pages, but the pager might not be enabled, the full list might fail because the
|
||||
// resource version it is listing at is expired or the cache may not yet be synced to the provided
|
||||
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
|
||||
// the reflector makes forward progress.
|
||||
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
|
||||
}
|
||||
close(listCh)
|
||||
@@ -292,7 +293,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
r.paginatedResult = true
|
||||
}
|
||||
|
||||
r.setIsLastSyncResourceVersionExpired(false) // list was successful
|
||||
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
|
||||
initTrace.Step("Objects listed")
|
||||
listMetaInterface, err := meta.ListAccessor(list)
|
||||
if err != nil {
|
||||
@@ -364,6 +365,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
AllowWatchBookmarks: true,
|
||||
}
|
||||
|
||||
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
|
||||
start := r.clock.Now()
|
||||
w, err := r.listerWatcher.Watch(options)
|
||||
if err != nil {
|
||||
switch {
|
||||
@@ -390,11 +393,11 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
|
||||
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
|
||||
if err != errorStopRequested {
|
||||
switch {
|
||||
case isExpiredError(err):
|
||||
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
|
||||
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
|
||||
// has a semantic that it returns data at least as fresh as provided RV.
|
||||
// So first try to LIST with setting RV to resource version of last observed object.
|
||||
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
||||
@@ -417,8 +420,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
|
||||
}
|
||||
|
||||
// watchHandler watches w and keeps *resourceVersion up to date.
|
||||
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
|
||||
start := r.clock.Now()
|
||||
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
|
||||
eventCount := 0
|
||||
|
||||
// Stopping the watcher should be idempotent and if we return from this function there's no way
|
||||
@@ -518,9 +520,9 @@ func (r *Reflector) relistResourceVersion() string {
|
||||
r.lastSyncResourceVersionMutex.RLock()
|
||||
defer r.lastSyncResourceVersionMutex.RUnlock()
|
||||
|
||||
if r.isLastSyncResourceVersionGone {
|
||||
if r.isLastSyncResourceVersionUnavailable {
|
||||
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
|
||||
// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector
|
||||
// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
|
||||
// to the latest available ResourceVersion, using a consistent read from etcd.
|
||||
return ""
|
||||
}
|
||||
@@ -532,12 +534,12 @@ func (r *Reflector) relistResourceVersion() string {
|
||||
return r.lastSyncResourceVersion
|
||||
}
|
||||
|
||||
// setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a
|
||||
// expired error: HTTP 410 (Gone) Status Code.
|
||||
func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
|
||||
// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
|
||||
// "expired" or "too large resource version" error.
|
||||
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
|
||||
r.lastSyncResourceVersionMutex.Lock()
|
||||
defer r.lastSyncResourceVersionMutex.Unlock()
|
||||
r.isLastSyncResourceVersionGone = isExpired
|
||||
r.isLastSyncResourceVersionUnavailable = isUnavailable
|
||||
}
|
||||
|
||||
func isExpiredError(err error) bool {
|
||||
@@ -547,3 +549,7 @@ func isExpiredError(err error) bool {
|
||||
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
|
||||
return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
|
||||
}
|
||||
|
||||
func isTooLargeResourceVersionError(err error) bool {
|
||||
return apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge)
|
||||
}
|
||||
|
||||
62
tools/cache/reflector_test.go
vendored
62
tools/cache/reflector_test.go
vendored
@@ -136,7 +136,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
|
||||
fw.Stop()
|
||||
}()
|
||||
var resumeRV string
|
||||
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
|
||||
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
|
||||
if err == nil {
|
||||
t.Errorf("unexpected non-error")
|
||||
}
|
||||
@@ -156,7 +156,7 @@ func TestReflectorWatchHandler(t *testing.T) {
|
||||
fw.Stop()
|
||||
}()
|
||||
var resumeRV string
|
||||
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
|
||||
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error %v", err)
|
||||
}
|
||||
@@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) {
|
||||
var resumeRV string
|
||||
stopWatch := make(chan struct{}, 1)
|
||||
stopWatch <- struct{}{}
|
||||
err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch)
|
||||
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, stopWatch)
|
||||
if err != errorStopRequested {
|
||||
t.Errorf("expected stop error, got %q", err)
|
||||
}
|
||||
@@ -714,6 +714,62 @@ func TestReflectorFullListIfExpired(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestReflectorFullListIfTooLarge(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
s := NewStore(MetaNamespaceKeyFunc)
|
||||
listCallRVs := []string{}
|
||||
|
||||
lw := &testLW{
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
// Stop once the reflector begins watching since we're only interested in the list.
|
||||
close(stopCh)
|
||||
fw := watch.NewFake()
|
||||
return fw, nil
|
||||
},
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
listCallRVs = append(listCallRVs, options.ResourceVersion)
|
||||
|
||||
switch options.ResourceVersion {
|
||||
// initial list
|
||||
case "0":
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "20"}}, nil
|
||||
// relist after the initial list
|
||||
case "20":
|
||||
err := apierrors.NewTimeoutError("too large resource version", 1)
|
||||
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Type: metav1.CauseTypeResourceVersionTooLarge}}
|
||||
return nil, err
|
||||
// relist from etcd after "too large" error
|
||||
case "":
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected List call: %s", options.ResourceVersion)
|
||||
}
|
||||
},
|
||||
}
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
|
||||
// Initial list should use RV=0
|
||||
if err := r.ListAndWatch(stopCh); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Relist from the future version.
|
||||
// This may happen, as watchcache is initialized from "current global etcd resource version"
|
||||
// when kube-apiserver is starting and if no objects are changing after that each kube-apiserver
|
||||
// may be synced to a different version and they will never converge.
|
||||
// TODO: We should use etcd progress-notify feature to avoid this behavior but until this is
|
||||
// done we simply try to relist from now to avoid continuous errors on relists.
|
||||
stopCh = make(chan struct{})
|
||||
if err := r.ListAndWatch(stopCh); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expectedRVs := []string{"0", "20", ""}
|
||||
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
|
||||
t.Errorf("Expected series of list calls with resource version of %#v but got: %#v", expectedRVs, listCallRVs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReflectorSetExpectedType(t *testing.T) {
|
||||
obj := &unstructured.Unstructured{}
|
||||
gvk := schema.GroupVersionKind{
|
||||
|
||||
@@ -35,7 +35,7 @@ import (
|
||||
var (
|
||||
// ClusterDefaults has the same behavior as the old EnvVar and DefaultCluster fields
|
||||
// DEPRECATED will be replaced
|
||||
ClusterDefaults = clientcmdapi.Cluster{Server: os.Getenv("KUBERNETES_MASTER")}
|
||||
ClusterDefaults = clientcmdapi.Cluster{Server: getDefaultServer()}
|
||||
// DefaultClientConfig represents the legacy behavior of this package for defaulting
|
||||
// DEPRECATED will be replace
|
||||
DefaultClientConfig = DirectClientConfig{*clientcmdapi.NewConfig(), "", &ConfigOverrides{
|
||||
@@ -43,6 +43,15 @@ var (
|
||||
}, nil, NewDefaultClientConfigLoadingRules(), promptedCredentials{}}
|
||||
)
|
||||
|
||||
// getDefaultServer returns a default setting for DefaultClientConfig
|
||||
// DEPRECATED
|
||||
func getDefaultServer() string {
|
||||
if server := os.Getenv("KUBERNETES_MASTER"); len(server) > 0 {
|
||||
return server
|
||||
}
|
||||
return "http://localhost:8080"
|
||||
}
|
||||
|
||||
// ClientConfig is used to make it easy to get an api server client
|
||||
type ClientConfig interface {
|
||||
// RawConfig returns the merged result of all overrides
|
||||
|
||||
@@ -88,6 +88,9 @@ func (cml *ConfigMapLock) Update(ctx context.Context, ler LeaderElectionRecord)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if cml.cm.Annotations == nil {
|
||||
cml.cm.Annotations = make(map[string]string)
|
||||
}
|
||||
cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
|
||||
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
|
||||
return err
|
||||
|
||||
@@ -120,7 +120,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
|
||||
|
||||
default:
|
||||
msg := "Watch failed: %v"
|
||||
if net.IsProbableEOF(err) {
|
||||
if net.IsProbableEOF(err) || net.IsTimeout(err) {
|
||||
klog.V(5).Infof(msg, err)
|
||||
// Retry
|
||||
return false, 0
|
||||
|
||||
Reference in New Issue
Block a user