client-go/rest: backoff with context support

The BackoffManager interface sleeps without considering the caller's context,
i.e. cancellation is not supported. This alone is reason enough to deprecate it
and to replace it with an interface that supports a context parameter.

The other reason is that contextual logging needs that parameter.

Kubernetes-commit: b15a1943d51adfb8c5e0185d58d25e038c3d6ade
This commit is contained in:
Patrick Ohly 2024-09-02 20:18:47 +02:00 committed by Kubernetes Publisher
parent 2b2015d460
commit 7aa9904196
11 changed files with 421 additions and 57 deletions

10
go.mod
View File

@ -27,8 +27,8 @@ require (
golang.org/x/time v0.7.0
google.golang.org/protobuf v1.35.1
gopkg.in/evanphx/json-patch.v4 v4.12.0
k8s.io/api v0.0.0-20250115201908-3f63dba05c7a
k8s.io/apimachinery v0.0.0-20250116201610-c74304d2a679
k8s.io/api v0.0.0
k8s.io/apimachinery v0.0.0
k8s.io/klog/v2 v2.130.1
k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
@ -58,6 +58,7 @@ require (
github.com/onsi/ginkgo/v2 v2.21.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
@ -65,3 +66,8 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace (
k8s.io/api => ../api
k8s.io/apimachinery => ../apimachinery
)

17
go.sum
View File

@ -1,5 +1,8 @@
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@ -22,6 +25,7 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw=
@ -40,6 +44,7 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@ -83,6 +88,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
@ -93,13 +100,16 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -111,11 +121,13 @@ golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457/go.mod h1:pRgIJT+bRLFKnoM1ldnzKoxTIn14Yxz928LQRYYgIN0=
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -146,10 +158,7 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.0.0-20250115201908-3f63dba05c7a h1:2wVqfU2ZFQqc1hN9LkYE8MMQcjLQDJKn52a+YiVrS7Q=
k8s.io/api v0.0.0-20250115201908-3f63dba05c7a/go.mod h1:/mUSMQWyTyZYxFG6+AwbXd2lQZDLPEK4VwjtCDQDSjE=
k8s.io/apimachinery v0.0.0-20250116201610-c74304d2a679 h1:f+gXPU8rTkOiMzAjHDfeA0/CY+7jsxbuz5j6GpJ35f8=
k8s.io/apimachinery v0.0.0-20250116201610-c74304d2a679/go.mod h1:h8DnJz4KNjkQsP8iFir+s3sSBEK3Iy43bfB2gFjSR+A=
k8s.io/gengo/v2 v2.0.0-20240826214909-a7b603a56eb7/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7 h1:hcha5B1kVACrLujCKLbr8XWMxCxzQx42DY8QKYJrDLg=

10
rest/.mockery.yaml Normal file
View File

@ -0,0 +1,10 @@
---
dir: .
filename: "mock_{{.InterfaceName | snakecase}}_test.go"
boilerplate-file: ../../../../../hack/boilerplate/boilerplate.generatego.txt
outpkg: rest
with-expecter: true
packages:
k8s.io/client-go/rest:
interfaces:
BackoffManager:

View File

@ -93,7 +93,7 @@ type RESTClient struct {
content requestClientContentConfigProvider
// creates BackoffManager that is passed to requests.
createBackoffMgr func() BackoffManager
createBackoffMgr func() BackoffManagerWithContext
// rateLimiter is shared among all requests created by this client unless specifically
// overridden.
@ -178,7 +178,7 @@ func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter {
// readExpBackoffConfig handles the internal logic of determining what the
// backoff policy is. By default if no information is available, NoBackoff.
// TODO Generalize this see #17727 .
func readExpBackoffConfig() BackoffManager {
func readExpBackoffConfig() BackoffManagerWithContext {
backoffBase := os.Getenv(envBackoffBase)
backoffDuration := os.Getenv(envBackoffDuration)

View File

@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/klog/v2/ktesting"
"github.com/google/go-cmp/cmp"
)
@ -335,26 +336,26 @@ func TestHTTPProxy(t *testing.T) {
}
func TestCreateBackoffManager(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
theUrl, _ := url.Parse("http://localhost")
// 1 second base backoff + duration of 2 seconds -> exponential backoff for requests.
t.Setenv(envBackoffBase, "1")
t.Setenv(envBackoffDuration, "2")
backoff := readExpBackoffConfig()
backoff.UpdateBackoff(theUrl, nil, 500)
backoff.UpdateBackoff(theUrl, nil, 500)
if backoff.CalculateBackoff(theUrl)/time.Second != 2 {
backoff.UpdateBackoffWithContext(ctx, theUrl, nil, 500)
backoff.UpdateBackoffWithContext(ctx, theUrl, nil, 500)
if backoff.CalculateBackoffWithContext(ctx, theUrl)/time.Second != 2 {
t.Errorf("Backoff env not working.")
}
// 0 duration -> no backoff.
t.Setenv(envBackoffBase, "1")
t.Setenv(envBackoffDuration, "0")
backoff.UpdateBackoff(theUrl, nil, 500)
backoff.UpdateBackoff(theUrl, nil, 500)
backoff.UpdateBackoffWithContext(ctx, theUrl, nil, 500)
backoff.UpdateBackoffWithContext(ctx, theUrl, nil, 500)
backoff = readExpBackoffConfig()
if backoff.CalculateBackoff(theUrl)/time.Second != 0 {
if backoff.CalculateBackoffWithContext(ctx, theUrl)/time.Second != 0 {
t.Errorf("Zero backoff duration, but backoff still occurring.")
}
@ -362,9 +363,9 @@ func TestCreateBackoffManager(t *testing.T) {
t.Setenv(envBackoffBase, "")
t.Setenv(envBackoffDuration, "")
backoff = readExpBackoffConfig()
backoff.UpdateBackoff(theUrl, nil, 500)
backoff.UpdateBackoff(theUrl, nil, 500)
if backoff.CalculateBackoff(theUrl)/time.Second != 0 {
backoff.UpdateBackoffWithContext(ctx, theUrl, nil, 500)
backoff.UpdateBackoffWithContext(ctx, theUrl, nil, 500)
if backoff.CalculateBackoffWithContext(ctx, theUrl)/time.Second != 0 {
t.Errorf("Backoff should have been 0.")
}

View File

@ -0,0 +1,168 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by mockery v2.40.3. DO NOT EDIT.
package rest
import (
mock "github.com/stretchr/testify/mock"
time "time"
url "net/url"
)
// MockBackoffManager is an autogenerated mock type for the BackoffManager type
type MockBackoffManager struct {
mock.Mock
}
type MockBackoffManager_Expecter struct {
mock *mock.Mock
}
func (_m *MockBackoffManager) EXPECT() *MockBackoffManager_Expecter {
return &MockBackoffManager_Expecter{mock: &_m.Mock}
}
// CalculateBackoff provides a mock function with given fields: actualURL
func (_m *MockBackoffManager) CalculateBackoff(actualURL *url.URL) time.Duration {
ret := _m.Called(actualURL)
if len(ret) == 0 {
panic("no return value specified for CalculateBackoff")
}
var r0 time.Duration
if rf, ok := ret.Get(0).(func(*url.URL) time.Duration); ok {
r0 = rf(actualURL)
} else {
r0 = ret.Get(0).(time.Duration)
}
return r0
}
// MockBackoffManager_CalculateBackoff_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CalculateBackoff'
type MockBackoffManager_CalculateBackoff_Call struct {
*mock.Call
}
// CalculateBackoff is a helper method to define mock.On call
// - actualURL *url.URL
func (_e *MockBackoffManager_Expecter) CalculateBackoff(actualURL interface{}) *MockBackoffManager_CalculateBackoff_Call {
return &MockBackoffManager_CalculateBackoff_Call{Call: _e.mock.On("CalculateBackoff", actualURL)}
}
func (_c *MockBackoffManager_CalculateBackoff_Call) Run(run func(actualURL *url.URL)) *MockBackoffManager_CalculateBackoff_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*url.URL))
})
return _c
}
func (_c *MockBackoffManager_CalculateBackoff_Call) Return(_a0 time.Duration) *MockBackoffManager_CalculateBackoff_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockBackoffManager_CalculateBackoff_Call) RunAndReturn(run func(*url.URL) time.Duration) *MockBackoffManager_CalculateBackoff_Call {
_c.Call.Return(run)
return _c
}
// Sleep provides a mock function with given fields: d
func (_m *MockBackoffManager) Sleep(d time.Duration) {
_m.Called(d)
}
// MockBackoffManager_Sleep_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Sleep'
type MockBackoffManager_Sleep_Call struct {
*mock.Call
}
// Sleep is a helper method to define mock.On call
// - d time.Duration
func (_e *MockBackoffManager_Expecter) Sleep(d interface{}) *MockBackoffManager_Sleep_Call {
return &MockBackoffManager_Sleep_Call{Call: _e.mock.On("Sleep", d)}
}
func (_c *MockBackoffManager_Sleep_Call) Run(run func(d time.Duration)) *MockBackoffManager_Sleep_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(time.Duration))
})
return _c
}
func (_c *MockBackoffManager_Sleep_Call) Return() *MockBackoffManager_Sleep_Call {
_c.Call.Return()
return _c
}
func (_c *MockBackoffManager_Sleep_Call) RunAndReturn(run func(time.Duration)) *MockBackoffManager_Sleep_Call {
_c.Call.Return(run)
return _c
}
// UpdateBackoff provides a mock function with given fields: actualURL, err, responseCode
func (_m *MockBackoffManager) UpdateBackoff(actualURL *url.URL, err error, responseCode int) {
_m.Called(actualURL, err, responseCode)
}
// MockBackoffManager_UpdateBackoff_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateBackoff'
type MockBackoffManager_UpdateBackoff_Call struct {
*mock.Call
}
// UpdateBackoff is a helper method to define mock.On call
// - actualURL *url.URL
// - err error
// - responseCode int
func (_e *MockBackoffManager_Expecter) UpdateBackoff(actualURL interface{}, err interface{}, responseCode interface{}) *MockBackoffManager_UpdateBackoff_Call {
return &MockBackoffManager_UpdateBackoff_Call{Call: _e.mock.On("UpdateBackoff", actualURL, err, responseCode)}
}
func (_c *MockBackoffManager_UpdateBackoff_Call) Run(run func(actualURL *url.URL, err error, responseCode int)) *MockBackoffManager_UpdateBackoff_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*url.URL), args[1].(error), args[2].(int))
})
return _c
}
func (_c *MockBackoffManager_UpdateBackoff_Call) Return() *MockBackoffManager_UpdateBackoff_Call {
_c.Call.Return()
return _c
}
func (_c *MockBackoffManager_UpdateBackoff_Call) RunAndReturn(run func(*url.URL, error, int)) *MockBackoffManager_UpdateBackoff_Call {
_c.Call.Return(run)
return _c
}
// NewMockBackoffManager creates a new instance of MockBackoffManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockBackoffManager(t interface {
mock.TestingT
Cleanup(func())
}) *MockBackoffManager {
mock := &MockBackoffManager{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -106,7 +106,7 @@ type Request struct {
warningHandler WarningHandlerWithContext
rateLimiter flowcontrol.RateLimiter
backoff BackoffManager
backoff BackoffManagerWithContext
timeout time.Duration
maxRetries int
@ -136,7 +136,7 @@ type Request struct {
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
func NewRequest(c *RESTClient) *Request {
var backoff BackoffManager
var backoff BackoffManagerWithContext
if c.createBackoffMgr != nil {
backoff = c.createBackoffMgr()
}
@ -259,13 +259,27 @@ func (r *Request) Resource(resource string) *Request {
}
// BackOff sets the request's backoff manager to the one specified,
// or defaults to the stub implementation if nil is provided
// or defaults to the stub implementation if nil is provided.
//
// Deprecated: BackoffManager.Sleep ignores the caller's context. Use BackOffWithContext and BackoffManagerWithContext instead.
func (r *Request) BackOff(manager BackoffManager) *Request {
if manager == nil {
r.backoff = &NoBackoff{}
return r
}
r.backoff = &backoffManagerNopContext{BackoffManager: manager}
return r
}
// BackOffWithContext sets the request's backoff manager to the one specified,
// or defaults to the stub implementation if nil is provided.
func (r *Request) BackOffWithContext(manager BackoffManagerWithContext) *Request {
if manager == nil {
r.backoff = &NoBackoff{}
return r
}
r.backoff = manager
return r
}

View File

@ -1489,6 +1489,7 @@ func TestDoRequestNewWay(t *testing.T) {
// This test assumes that the client implementation backs off exponentially, for an individual request.
func TestBackoffLifecycle(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
count := 0
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
count++
@ -1508,22 +1509,30 @@ func TestBackoffLifecycle(t *testing.T) {
seconds := []int{0, 1, 2, 4, 8, 0, 1, 2, 4, 0}
request := c.Verb("POST").Prefix("backofftest").Suffix("abc")
clock := testingclock.FakeClock{}
request.backoff = &URLBackoff{
request.backoff = stepClockDuringSleep{
BackoffManagerWithContext: &URLBackoff{
// Use a fake backoff here to avoid flakes and speed the test up.
Backoff: flowcontrol.NewFakeBackOff(
time.Duration(1)*time.Second,
time.Duration(200)*time.Second,
&clock,
)}
),
},
clock: &clock,
}
for _, sec := range seconds {
thisBackoff := request.backoff.CalculateBackoff(request.URL())
thisBackoff := request.backoff.CalculateBackoffWithContext(ctx, request.URL())
t.Logf("Current backoff %v", thisBackoff)
if thisBackoff != time.Duration(sec)*time.Second {
t.Errorf("Backoff is %v instead of %v", thisBackoff, sec)
}
// This relies on advancing the fake clock by exactly the duration
// that SleepWithContext is being called for while DoRaw is executing.
// stepClockDuringSleep.SleepWithContext ensures that this happens.
now := clock.Now()
request.DoRaw(context.Background())
request.DoRaw(ctx)
elapsed := clock.Since(now)
if clock.Since(now) != thisBackoff {
t.Errorf("CalculatedBackoff not honored by clock: Expected time of %v, but got %v ", thisBackoff, elapsed)
@ -1531,18 +1540,51 @@ func TestBackoffLifecycle(t *testing.T) {
}
}
type stepClockDuringSleep struct {
BackoffManagerWithContext
clock *testingclock.FakeClock
}
// SleepWithContext wraps the underlying SleepWithContext and ensures that once
// that is sleeping, the fake clock advances by exactly the duration that
// it is sleeping for.
func (s stepClockDuringSleep) SleepWithContext(ctx context.Context, d time.Duration) {
// This code is sensitive to both the implementation of
// URLBackoff.SleepWithContext and of FakeClock.NewTimer:
// - SleepWithContext must be a no-op when the duration is zero
// => no need to step the fake clock
// - SleepWithContext must use FakeClock.NewTimer, not FakeClock.Sleep
// because the latter would advance time itself
if d != 0 {
go func() {
// Poll until the caller is sleeping.
for {
if s.clock.HasWaiters() {
s.clock.Step(d)
return
}
if ctx.Err() != nil {
return
}
time.Sleep(time.Millisecond)
}
}()
}
s.BackoffManagerWithContext.SleepWithContext(ctx, d)
}
type testBackoffManager struct {
sleeps []time.Duration
}
func (b *testBackoffManager) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
func (b *testBackoffManager) UpdateBackoffWithContext(ctx context.Context, actualURL *url.URL, err error, responseCode int) {
}
func (b *testBackoffManager) CalculateBackoff(actualUrl *url.URL) time.Duration {
func (b *testBackoffManager) CalculateBackoffWithContext(ctx context.Context, actualURL *url.URL) time.Duration {
return time.Duration(0)
}
func (b *testBackoffManager) Sleep(d time.Duration) {
func (b *testBackoffManager) SleepWithContext(ctx context.Context, d time.Duration) {
b.sleeps = append(b.sleeps, d)
}
@ -1568,7 +1610,7 @@ func TestCheckRetryClosesBody(t *testing.T) {
expectedSleeps := []time.Duration{0, time.Second, time.Second, time.Second, time.Second}
c := testRESTClient(t, testServer)
c.createBackoffMgr = func() BackoffManager { return backoff }
c.createBackoffMgr = func() BackoffManagerWithContext { return backoff }
_, err := c.Verb("POST").
Prefix("foo", "bar").
Suffix("baz").
@ -2612,6 +2654,8 @@ type noSleepBackOff struct {
func (n *noSleepBackOff) Sleep(d time.Duration) {}
func (n *noSleepBackOff) SleepWithContext(ctx context.Context, d time.Duration) {}
func TestRequestWithRetry(t *testing.T) {
tests := []struct {
name string
@ -2997,7 +3041,6 @@ const retryTestKey retryTestKeyType = iota
// metric calls are invoked appropriately in right order.
type withRateLimiterBackoffManagerAndMetrics struct {
flowcontrol.RateLimiter
*NoBackoff
metrics.ResultMetric
calculateBackoffSeq int64
calculateBackoffFn func(i int64) time.Duration
@ -3013,7 +3056,7 @@ func (lb *withRateLimiterBackoffManagerAndMetrics) Wait(ctx context.Context) err
return nil
}
func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoff(actualUrl *url.URL) time.Duration {
func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoffWithContext(ctx context.Context, actualURL *url.URL) time.Duration {
lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.CalculateBackoff")
waitFor := lb.calculateBackoffFn(lb.calculateBackoffSeq)
@ -3021,11 +3064,11 @@ func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoff(actualUrl *u
return waitFor
}
func (lb *withRateLimiterBackoffManagerAndMetrics) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
func (lb *withRateLimiterBackoffManagerAndMetrics) UpdateBackoffWithContext(ctx context.Context, actualURL *url.URL, err error, responseCode int) {
lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.UpdateBackoff")
}
func (lb *withRateLimiterBackoffManagerAndMetrics) Sleep(d time.Duration) {
func (lb *withRateLimiterBackoffManagerAndMetrics) SleepWithContext(ctx context.Context, d time.Duration) {
lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.Sleep")
lb.sleepsGot = append(lb.sleepsGot, d.String())
}
@ -3206,7 +3249,6 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
t.Run(test.name, func(t *testing.T) {
interceptor := &withRateLimiterBackoffManagerAndMetrics{
RateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
NoBackoff: &NoBackoff{},
calculateBackoffFn: test.calculateBackoffFn,
}

View File

@ -17,6 +17,8 @@ limitations under the License.
package rest
import (
"context"
"fmt"
"net/url"
"time"
@ -32,12 +34,24 @@ import (
var serverIsOverloadedSet = sets.NewInt(429)
var maxResponseCode = 499
//go:generate mockery
// Deprecated: BackoffManager.Sleep ignores the caller's context. Use BackoffManagerWithContext instead.
type BackoffManager interface {
UpdateBackoff(actualUrl *url.URL, err error, responseCode int)
CalculateBackoff(actualUrl *url.URL) time.Duration
UpdateBackoff(actualURL *url.URL, err error, responseCode int)
CalculateBackoff(actualURL *url.URL) time.Duration
Sleep(d time.Duration)
}
type BackoffManagerWithContext interface {
UpdateBackoffWithContext(ctx context.Context, actualURL *url.URL, err error, responseCode int)
CalculateBackoffWithContext(ctx context.Context, actualURL *url.URL) time.Duration
SleepWithContext(ctx context.Context, d time.Duration)
}
var _ BackoffManager = &URLBackoff{}
var _ BackoffManagerWithContext = &URLBackoff{}
// URLBackoff struct implements the semantics on top of Backoff which
// we need for URL specific exponential backoff.
type URLBackoff struct {
@ -49,11 +63,19 @@ type URLBackoff struct {
type NoBackoff struct {
}
func (n *NoBackoff) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
func (n *NoBackoff) UpdateBackoff(actualURL *url.URL, err error, responseCode int) {
// do nothing.
}
func (n *NoBackoff) CalculateBackoff(actualUrl *url.URL) time.Duration {
func (n *NoBackoff) UpdateBackoffWithContext(ctx context.Context, actualURL *url.URL, err error, responseCode int) {
// do nothing.
}
func (n *NoBackoff) CalculateBackoff(actualURL *url.URL) time.Duration {
return 0 * time.Second
}
func (n *NoBackoff) CalculateBackoffWithContext(ctx context.Context, actualURL *url.URL) time.Duration {
return 0 * time.Second
}
@ -61,10 +83,21 @@ func (n *NoBackoff) Sleep(d time.Duration) {
time.Sleep(d)
}
func (n *NoBackoff) SleepWithContext(ctx context.Context, d time.Duration) {
if d == 0 {
return
}
t := time.NewTimer(d)
defer t.Stop()
select {
case <-ctx.Done():
case <-t.C:
}
}
// Disable makes the backoff trivial, i.e., sets it to zero. This might be used
// by tests which want to run 1000s of mock requests without slowing down.
func (b *URLBackoff) Disable() {
klog.V(4).Infof("Disabling backoff strategy")
b.Backoff = flowcontrol.NewBackOff(0*time.Second, 0*time.Second)
}
@ -76,32 +109,74 @@ func (b *URLBackoff) baseUrlKey(rawurl *url.URL) string {
// in the future.
host, err := url.Parse(rawurl.String())
if err != nil {
klog.V(4).Infof("Error extracting url: %v", rawurl)
panic("bad url!")
panic(fmt.Sprintf("Error parsing bad URL %q: %v", rawurl, err))
}
return host.Host
}
// UpdateBackoff updates backoff metadata
func (b *URLBackoff) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
func (b *URLBackoff) UpdateBackoff(actualURL *url.URL, err error, responseCode int) {
b.UpdateBackoffWithContext(context.Background(), actualURL, err, responseCode)
}
// UpdateBackoffWithContext updates backoff metadata
func (b *URLBackoff) UpdateBackoffWithContext(ctx context.Context, actualURL *url.URL, err error, responseCode int) {
// range for retry counts that we store is [0,13]
if responseCode > maxResponseCode || serverIsOverloadedSet.Has(responseCode) {
b.Backoff.Next(b.baseUrlKey(actualUrl), b.Backoff.Clock.Now())
b.Backoff.Next(b.baseUrlKey(actualURL), b.Backoff.Clock.Now())
return
} else if responseCode >= 300 || err != nil {
klog.V(4).Infof("Client is returning errors: code %v, error %v", responseCode, err)
klog.FromContext(ctx).V(4).Info("Client is returning errors", "code", responseCode, "err", err)
}
//If we got this far, there is no backoff required for this URL anymore.
b.Backoff.Reset(b.baseUrlKey(actualUrl))
b.Backoff.Reset(b.baseUrlKey(actualURL))
}
// CalculateBackoff takes a url and back's off exponentially,
// based on its knowledge of existing failures.
func (b *URLBackoff) CalculateBackoff(actualUrl *url.URL) time.Duration {
return b.Backoff.Get(b.baseUrlKey(actualUrl))
func (b *URLBackoff) CalculateBackoff(actualURL *url.URL) time.Duration {
return b.Backoff.Get(b.baseUrlKey(actualURL))
}
// CalculateBackoffWithContext takes a url and back's off exponentially,
// based on its knowledge of existing failures.
func (b *URLBackoff) CalculateBackoffWithContext(ctx context.Context, actualURL *url.URL) time.Duration {
return b.Backoff.Get(b.baseUrlKey(actualURL))
}
func (b *URLBackoff) Sleep(d time.Duration) {
b.Backoff.Clock.Sleep(d)
}
func (b *URLBackoff) SleepWithContext(ctx context.Context, d time.Duration) {
if d == 0 {
return
}
t := b.Backoff.Clock.NewTimer(d)
defer t.Stop()
select {
case <-ctx.Done():
case <-t.C():
}
}
// backoffManagerNopContext wraps a BackoffManager and adds the *WithContext methods.
type backoffManagerNopContext struct {
BackoffManager
}
var _ BackoffManager = &backoffManagerNopContext{}
var _ BackoffManagerWithContext = &backoffManagerNopContext{}
func (b *backoffManagerNopContext) UpdateBackoffWithContext(ctx context.Context, actualURL *url.URL, err error, responseCode int) {
b.UpdateBackoff(actualURL, err, responseCode)
}
func (b *backoffManagerNopContext) CalculateBackoffWithContext(ctx context.Context, actualURL *url.URL) time.Duration {
return b.CalculateBackoff(actualURL)
}
func (b *backoffManagerNopContext) SleepWithContext(ctx context.Context, d time.Duration) {
b.Sleep(d)
}

View File

@ -17,10 +17,14 @@ limitations under the License.
package rest
import (
"context"
"errors"
"net/url"
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/util/flowcontrol"
)
@ -77,3 +81,38 @@ func TestURLBackoffFunctionality(t *testing.T) {
t.Errorf("The final return code %v should have resulted in a backoff ! ", returnCodes[7])
}
}
func TestBackoffManagerNopContext(t *testing.T) {
mock := NewMockBackoffManager(t)
sleepDuration := 42 * time.Second
mock.On("Sleep", sleepDuration).Return()
url := &url.URL{}
mock.On("CalculateBackoff", url).Return(time.Second)
err := errors.New("fake error")
responseCode := 404
mock.On("UpdateBackoff", url, err, responseCode).Return()
ctx := context.Background()
wrapper := backoffManagerNopContext{BackoffManager: mock}
wrapper.SleepWithContext(ctx, sleepDuration)
wrapper.CalculateBackoffWithContext(ctx, url)
wrapper.UpdateBackoffWithContext(ctx, url, err, responseCode)
}
func TestNoBackoff(t *testing.T) {
var backoff NoBackoff
assert.Equal(t, 0*time.Second, backoff.CalculateBackoff(nil))
assert.Equal(t, 0*time.Second, backoff.CalculateBackoffWithContext(context.Background(), nil))
start := time.Now()
backoff.Sleep(0 * time.Second)
assert.WithinDuration(t, start, time.Now(), time.Minute /* pretty generous, but we don't want to flake */, time.Since(start), "backoff.Sleep")
// Cancel right away to prevent sleeping.
ctx, cancel := context.WithCancel(context.Background())
cancel()
start = time.Now()
backoff.SleepWithContext(ctx, 10*time.Minute)
assert.WithinDuration(t, start, time.Now(), time.Minute /* pretty generous, but we don't want to flake */, time.Since(start), "backoff.SleepWithContext")
}

View File

@ -209,18 +209,18 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
// we do a backoff sleep before the first attempt is made,
// (preserving current behavior).
if request.backoff != nil {
request.backoff.Sleep(request.backoff.CalculateBackoff(url))
request.backoff.SleepWithContext(ctx, request.backoff.CalculateBackoffWithContext(ctx, url))
}
return nil
}
// if we are here, we have made attempt(s) at least once before.
if request.backoff != nil {
delay := request.backoff.CalculateBackoff(url)
delay := request.backoff.CalculateBackoffWithContext(ctx, url)
if r.retryAfter.Wait > delay {
delay = r.retryAfter.Wait
}
request.backoff.Sleep(delay)
request.backoff.SleepWithContext(ctx, delay)
}
// We are retrying the request that we already send to
@ -258,9 +258,9 @@ func (r *withRetry) After(ctx context.Context, request *Request, resp *http.Resp
if request.c.base != nil {
if err != nil {
request.backoff.UpdateBackoff(request.URL(), err, 0)
request.backoff.UpdateBackoffWithContext(ctx, request.URL(), err, 0)
} else {
request.backoff.UpdateBackoff(request.URL(), err, resp.StatusCode)
request.backoff.UpdateBackoffWithContext(ctx, request.URL(), err, resp.StatusCode)
}
}
}