Compare commits

..

7 Commits

Author SHA1 Message Date
Kubernetes Publisher
8a9119cfa8 Update dependencies to v0.23.16-rc.0 tag 2022-12-08 20:25:17 +00:00
Kubernetes Publisher
3dd56d254d Merge pull request #111914 from aojea/crd_recursion_bug_1_23
cherry pick pr #111557 to 1.23 :update smd to 4.2.3

Kubernetes-commit: 5f01bb40bcee9c5f16f08fe9a2fe5a39beb81101
2022-12-02 18:04:32 +00:00
Kubernetes Publisher
d33f57bea0 Merge pull request #113124 from jkh52/release-1.23
Bump konnectivity network proxy to v0.0.33.

Kubernetes-commit: c6c73eb853d52bed51ac341dc03ed13b5fe20868
2022-10-27 00:01:26 +00:00
Joseph Anttila Hall
e07918571c Bump konnectivity network proxy to v0.0.33.
Includes a couple bug fixes for better handling of dial failures.
[Agent &
Server](https://github.com/kubernetes-sigs/apiserver-network-proxy/commits/v0.0.33)
include numerous other fixes.

Kubernetes-commit: c2e5631742fff152edc3714f9b3e0064aaf92eb2
2022-10-17 15:46:15 -07:00
Kubernetes Publisher
4fe0cac705 Merge pull request #112067 from tkashem/automated-cherry-pick-of-#109114-upstream-release-1.23-1661519732
client-go: make retry in Request thread safe

Kubernetes-commit: bca383612aed64083b6b2c10d2634082877dc6da
2022-10-04 19:53:33 +00:00
Antonio Ojea
65a4692572 update structured-merge-diff to 4.2.3
Co-authored-by: silenceper <silenceper@gmail.com>

Kubernetes-commit: 93dedd539c4d0ad70166a0036b67964c333a692c
2022-08-18 16:41:10 +02:00
Abu Kashem
7763f75022 client-go: make retry in Request thread safe
Kubernetes-commit: 091f4f00395272e23a777d6bf068d67793bf8931
2022-03-29 13:09:26 -04:00
4 changed files with 107 additions and 35 deletions

12
go.mod
View File

@@ -23,23 +23,23 @@ require (
github.com/imdario/mergo v0.3.5
github.com/peterbourgon/diskv v2.0.1+incompatible
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.8.0
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
google.golang.org/protobuf v1.27.1
k8s.io/api v0.23.12-rc.0
k8s.io/apimachinery v0.23.12-rc.0
k8s.io/api v0.23.16-rc.0
k8s.io/apimachinery v0.23.16-rc.0
k8s.io/klog/v2 v2.30.0
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65
k8s.io/utils v0.0.0-20211116205334-6203023598ed
sigs.k8s.io/structured-merge-diff/v4 v4.2.1
sigs.k8s.io/structured-merge-diff/v4 v4.2.3
sigs.k8s.io/yaml v1.2.0
)
replace (
k8s.io/api => k8s.io/api v0.23.12-rc.0
k8s.io/apimachinery => k8s.io/apimachinery v0.23.12-rc.0
k8s.io/api => k8s.io/api v0.23.16-rc.0
k8s.io/apimachinery => k8s.io/apimachinery v0.23.16-rc.0
)

21
go.sum
View File

@@ -238,12 +238,14 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -601,8 +603,9 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
@@ -610,10 +613,10 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.23.12-rc.0 h1:AQxki2yv2bvrmjFcOOriE8Opm2oiiEKM9NRgLBUE1oA=
k8s.io/api v0.23.12-rc.0/go.mod h1:0SrbIrUuCUPAgGCwFfULPRDUoFKqyOp/JYPa2xgree0=
k8s.io/apimachinery v0.23.12-rc.0 h1:ZR0uJ6cwKtPpMASERdL4RDm17wI+rconfxmSUvdYEy8=
k8s.io/apimachinery v0.23.12-rc.0/go.mod h1:BEuFMMBaIbcOqVIJqNZJXGFTP4W6AycEpb5+m/97hrM=
k8s.io/api v0.23.16-rc.0 h1:uX6LmrC8ThEpbBtAAbqjuB84QrP6zIGytwKIYTk8mI8=
k8s.io/api v0.23.16-rc.0/go.mod h1:EZI/NGtoB69oae6KxTRoKrb7YNll1p9g37EYBZpa04Q=
k8s.io/apimachinery v0.23.16-rc.0 h1:yqVmy4UHlkj+75kLWSb4rBsaDZgvL29kMKz/v4DPM9Q=
k8s.io/apimachinery v0.23.16-rc.0/go.mod h1:mbefzm1H5rPdyibAc8rmzLAbr/oG60tDHQFj0FTqrZU=
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
@@ -630,7 +633,7 @@ rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 h1:fD1pz4yfdADVNfFmcP2aBEtudwUQ1AlLnRBALr33v3s=
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6/go.mod h1:p4QtZmO4uMYipTQNzagwnNoseA6OxSUutVw05NhYDRs=
sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLzkkmAkf+A6Y=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=

View File

@@ -82,6 +82,12 @@ func (r *RequestConstructionError) Error() string {
var noBackoff = &NoBackoff{}
type requestRetryFunc func(maxRetries int) WithRetry
func defaultRequestRetryFn(maxRetries int) WithRetry {
return &withRetry{maxRetries: maxRetries}
}
// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
@@ -93,6 +99,7 @@ type Request struct {
rateLimiter flowcontrol.RateLimiter
backoff BackoffManager
timeout time.Duration
maxRetries int
// generic components accessible via method setters
verb string
@@ -109,9 +116,10 @@ type Request struct {
subresource string
// output
err error
body io.Reader
retry WithRetry
err error
body io.Reader
retryFn requestRetryFunc
}
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
@@ -142,7 +150,8 @@ func NewRequest(c *RESTClient) *Request {
backoff: backoff,
timeout: timeout,
pathPrefix: pathPrefix,
retry: &withRetry{maxRetries: 10},
maxRetries: 10,
retryFn: defaultRequestRetryFn,
warningHandler: c.warningHandler,
}
@@ -408,7 +417,10 @@ func (r *Request) Timeout(d time.Duration) *Request {
// function is specifically called with a different value.
// A zero maxRetries prevent it from doing retires and return an error immediately.
func (r *Request) MaxRetries(maxRetries int) *Request {
r.retry.SetMaxRetries(maxRetries)
if maxRetries < 0 {
maxRetries = 0
}
r.maxRetries = maxRetries
return r
}
@@ -688,8 +700,10 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
}
return false
}
var retryAfter *RetryAfter
url := r.URL().String()
withRetry := r.retryFn(r.maxRetries)
for {
req, err := r.newHTTPRequest(ctx)
if err != nil {
@@ -724,9 +738,9 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
defer readAndCloseResponseBody(resp)
var retry bool
retryAfter, retry = r.retry.NextRetry(req, resp, err, isErrRetryableFunc)
retryAfter, retry = withRetry.NextRetry(req, resp, err, isErrRetryableFunc)
if retry {
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
err := withRetry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
if err == nil {
return false, nil
}
@@ -817,6 +831,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
}
var retryAfter *RetryAfter
withRetry := r.retryFn(r.maxRetries)
url := r.URL().String()
for {
req, err := r.newHTTPRequest(ctx)
@@ -862,9 +877,9 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
defer resp.Body.Close()
var retry bool
retryAfter, retry = r.retry.NextRetry(req, resp, err, neverRetryError)
retryAfter, retry = withRetry.NextRetry(req, resp, err, neverRetryError)
if retry {
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
err := withRetry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
if err == nil {
return false, nil
}
@@ -961,6 +976,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
// Right now we make about ten retry attempts if we get a Retry-After response.
var retryAfter *RetryAfter
withRetry := r.retryFn(r.maxRetries)
for {
req, err := r.newHTTPRequest(ctx)
if err != nil {
@@ -997,7 +1013,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
}
var retry bool
retryAfter, retry = r.retry.NextRetry(req, resp, err, func(req *http.Request, err error) bool {
retryAfter, retry = withRetry.NextRetry(req, resp, err, func(req *http.Request, err error) bool {
// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
// Thus in case of "GET" operations, we simply retry it.
// We are not automatically retrying "write" operations, as they are not idempotent.
@@ -1011,7 +1027,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
return false
})
if retry {
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body)
err := withRetry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body)
if err == nil {
return false
}

View File

@@ -32,6 +32,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
@@ -1194,7 +1195,8 @@ func TestRequestWatch(t *testing.T) {
c.Client = client
}
testCase.Request.backoff = &noSleepBackOff{}
testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries}
testCase.Request.maxRetries = testCase.maxRetries
testCase.Request.retryFn = defaultRequestRetryFn
watch, err := testCase.Request.Watch(context.Background())
@@ -1407,7 +1409,8 @@ func TestRequestStream(t *testing.T) {
c.Client = client
}
testCase.Request.backoff = &noSleepBackOff{}
testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries}
testCase.Request.maxRetries = testCase.maxRetries
testCase.Request.retryFn = defaultRequestRetryFn
body, err := testCase.Request.Stream(context.Background())
@@ -1462,7 +1465,7 @@ func TestRequestDo(t *testing.T) {
}
for i, testCase := range testCases {
testCase.Request.backoff = &NoBackoff{}
testCase.Request.retry = &withRetry{}
testCase.Request.retryFn = defaultRequestRetryFn
body, err := testCase.Request.Do(context.Background()).Raw()
hasErr := err != nil
if hasErr != testCase.Err {
@@ -1625,8 +1628,9 @@ func TestConnectionResetByPeerIsRetried(t *testing.T) {
return nil, &net.OpError{Err: syscall.ECONNRESET}
}),
},
backoff: backoff,
retry: &withRetry{maxRetries: 10},
backoff: backoff,
maxRetries: 10,
retryFn: defaultRequestRetryFn,
}
// We expect two retries of "connection reset by peer" and the success.
_, err := req.Do(context.Background()).Raw()
@@ -2699,8 +2703,9 @@ func TestRequestWithRetry(t *testing.T) {
c: &RESTClient{
Client: client,
},
backoff: &noSleepBackOff{},
retry: &withRetry{maxRetries: 1},
backoff: &noSleepBackOff{},
maxRetries: 1,
retryFn: defaultRequestRetryFn,
}
var transformFuncInvoked int
@@ -2890,8 +2895,9 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
content: defaultContentConfig(),
Client: client,
},
backoff: &noSleepBackOff{},
retry: &withRetry{maxRetries: test.maxRetries},
backoff: &noSleepBackOff{},
maxRetries: test.maxRetries,
retryFn: defaultRequestRetryFn,
}
doFunc(context.Background(), req)
@@ -3093,3 +3099,50 @@ func TestTransportConcurrency(t *testing.T) {
})
}
}
func TestRequestConcurrencyWithRetry(t *testing.T) {
var attempts int32
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
defer func() {
atomic.AddInt32(&attempts, 1)
}()
// always send a retry-after response
return &http.Response{
StatusCode: http.StatusInternalServerError,
Header: http.Header{"Retry-After": []string{"1"}},
}, nil
})
req := &Request{
verb: "POST",
c: &RESTClient{
content: defaultContentConfig(),
Client: client,
},
backoff: &noSleepBackOff{},
maxRetries: 9, // 10 attempts in total, including the first
retryFn: defaultRequestRetryFn,
}
concurrency := 20
wg := sync.WaitGroup{}
wg.Add(concurrency)
startCh := make(chan struct{})
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
<-startCh
req.Do(context.Background())
}()
}
close(startCh)
wg.Wait()
// we expect (concurrency*req.maxRetries+1) attempts to be recorded
expected := concurrency * (req.maxRetries + 1)
if atomic.LoadInt32(&attempts) != int32(expected) {
t.Errorf("Expected attempts: %d, but got: %d", expected, attempts)
}
}