From 7aa9904196bfe2f610273550f8abf07e3ea3a16b Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 2 Sep 2024 20:18:47 +0200 Subject: [PATCH] client-go/rest: backoff with context support The BackoffManager interface sleeps without considering the caller's context, i.e. cancellation is not supported. This alone is reason enough to deprecate it and to replace it with an interface that supports a context parameter. The other reason is that contextual logging needs that parameter. Kubernetes-commit: b15a1943d51adfb8c5e0185d58d25e038c3d6ade --- go.mod | 10 +- go.sum | 17 ++- rest/.mockery.yaml | 10 ++ rest/client.go | 4 +- rest/client_test.go | 21 ++-- rest/mock_backoff_manager_test.go | 168 ++++++++++++++++++++++++++++++ rest/request.go | 20 +++- rest/request_test.go | 78 ++++++++++---- rest/urlbackoff.go | 101 +++++++++++++++--- rest/urlbackoff_test.go | 39 +++++++ rest/with_retry.go | 10 +- 11 files changed, 421 insertions(+), 57 deletions(-) create mode 100644 rest/.mockery.yaml create mode 100644 rest/mock_backoff_manager_test.go diff --git a/go.mod b/go.mod index de3bfc32..3a50ca09 100644 --- a/go.mod +++ b/go.mod @@ -27,8 +27,8 @@ require ( golang.org/x/time v0.7.0 google.golang.org/protobuf v1.35.1 gopkg.in/evanphx/json-patch.v4 v4.12.0 - k8s.io/api v0.0.0-20250115201908-3f63dba05c7a - k8s.io/apimachinery v0.0.0-20250116201610-c74304d2a679 + k8s.io/api v0.0.0 + k8s.io/apimachinery v0.0.0 k8s.io/klog/v2 v2.130.1 k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7 k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 @@ -58,6 +58,7 @@ require ( github.com/onsi/ginkgo/v2 v2.21.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/x448/float16 v0.8.4 // indirect golang.org/x/sys v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect @@ -65,3 +66,8 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace ( + k8s.io/api => ../api + k8s.io/apimachinery => ../apimachinery +) diff --git a/go.sum b/go.sum index 3f6f5afb..d04ea081 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,8 @@ +cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= +github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -22,6 +25,7 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw= @@ -40,6 +44,7 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA= github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -83,6 +88,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -93,13 +100,16 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -111,11 +121,13 @@ golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457/go.mod h1:pRgIJT+bRLFKnoM1ldnzKoxTIn14Yxz928LQRYYgIN0= golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -146,10 +158,7 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.0.0-20250115201908-3f63dba05c7a h1:2wVqfU2ZFQqc1hN9LkYE8MMQcjLQDJKn52a+YiVrS7Q= -k8s.io/api v0.0.0-20250115201908-3f63dba05c7a/go.mod h1:/mUSMQWyTyZYxFG6+AwbXd2lQZDLPEK4VwjtCDQDSjE= -k8s.io/apimachinery v0.0.0-20250116201610-c74304d2a679 h1:f+gXPU8rTkOiMzAjHDfeA0/CY+7jsxbuz5j6GpJ35f8= -k8s.io/apimachinery v0.0.0-20250116201610-c74304d2a679/go.mod h1:h8DnJz4KNjkQsP8iFir+s3sSBEK3Iy43bfB2gFjSR+A= +k8s.io/gengo/v2 v2.0.0-20240826214909-a7b603a56eb7/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7 h1:hcha5B1kVACrLujCKLbr8XWMxCxzQx42DY8QKYJrDLg= diff --git a/rest/.mockery.yaml b/rest/.mockery.yaml new file mode 100644 index 00000000..e21d7b5b --- /dev/null +++ b/rest/.mockery.yaml @@ -0,0 +1,10 @@ +--- +dir: . +filename: "mock_{{.InterfaceName | snakecase}}_test.go" +boilerplate-file: ../../../../../hack/boilerplate/boilerplate.generatego.txt +outpkg: rest +with-expecter: true +packages: + k8s.io/client-go/rest: + interfaces: + BackoffManager: diff --git a/rest/client.go b/rest/client.go index 29a25448..a085c334 100644 --- a/rest/client.go +++ b/rest/client.go @@ -93,7 +93,7 @@ type RESTClient struct { content requestClientContentConfigProvider // creates BackoffManager that is passed to requests. - createBackoffMgr func() BackoffManager + createBackoffMgr func() BackoffManagerWithContext // rateLimiter is shared among all requests created by this client unless specifically // overridden. @@ -178,7 +178,7 @@ func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter { // readExpBackoffConfig handles the internal logic of determining what the // backoff policy is. By default if no information is available, NoBackoff. // TODO Generalize this see #17727 . -func readExpBackoffConfig() BackoffManager { +func readExpBackoffConfig() BackoffManagerWithContext { backoffBase := os.Getenv(envBackoffBase) backoffDuration := os.Getenv(envBackoffDuration) diff --git a/rest/client_test.go b/rest/client_test.go index ebb35c50..c03f6832 100644 --- a/rest/client_test.go +++ b/rest/client_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" utiltesting "k8s.io/client-go/util/testing" + "k8s.io/klog/v2/ktesting" "github.com/google/go-cmp/cmp" ) @@ -335,26 +336,26 @@ func TestHTTPProxy(t *testing.T) { } func TestCreateBackoffManager(t *testing.T) { - + _, ctx := ktesting.NewTestContext(t) theUrl, _ := url.Parse("http://localhost") // 1 second base backoff + duration of 2 seconds -> exponential backoff for requests. t.Setenv(envBackoffBase, "1") t.Setenv(envBackoffDuration, "2") backoff := readExpBackoffConfig() - backoff.UpdateBackoff(theUrl, nil, 500) - backoff.UpdateBackoff(theUrl, nil, 500) - if backoff.CalculateBackoff(theUrl)/time.Second != 2 { + backoff.UpdateBackoffWithContext(ctx, theUrl, nil, 500) + backoff.UpdateBackoffWithContext(ctx, theUrl, nil, 500) + if backoff.CalculateBackoffWithContext(ctx, theUrl)/time.Second != 2 { t.Errorf("Backoff env not working.") } // 0 duration -> no backoff. t.Setenv(envBackoffBase, "1") t.Setenv(envBackoffDuration, "0") - backoff.UpdateBackoff(theUrl, nil, 500) - backoff.UpdateBackoff(theUrl, nil, 500) + backoff.UpdateBackoffWithContext(ctx, theUrl, nil, 500) + backoff.UpdateBackoffWithContext(ctx, theUrl, nil, 500) backoff = readExpBackoffConfig() - if backoff.CalculateBackoff(theUrl)/time.Second != 0 { + if backoff.CalculateBackoffWithContext(ctx, theUrl)/time.Second != 0 { t.Errorf("Zero backoff duration, but backoff still occurring.") } @@ -362,9 +363,9 @@ func TestCreateBackoffManager(t *testing.T) { t.Setenv(envBackoffBase, "") t.Setenv(envBackoffDuration, "") backoff = readExpBackoffConfig() - backoff.UpdateBackoff(theUrl, nil, 500) - backoff.UpdateBackoff(theUrl, nil, 500) - if backoff.CalculateBackoff(theUrl)/time.Second != 0 { + backoff.UpdateBackoffWithContext(ctx, theUrl, nil, 500) + backoff.UpdateBackoffWithContext(ctx, theUrl, nil, 500) + if backoff.CalculateBackoffWithContext(ctx, theUrl)/time.Second != 0 { t.Errorf("Backoff should have been 0.") } diff --git a/rest/mock_backoff_manager_test.go b/rest/mock_backoff_manager_test.go new file mode 100644 index 00000000..3cd4585a --- /dev/null +++ b/rest/mock_backoff_manager_test.go @@ -0,0 +1,168 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by mockery v2.40.3. DO NOT EDIT. + +package rest + +import ( + mock "github.com/stretchr/testify/mock" + + time "time" + + url "net/url" +) + +// MockBackoffManager is an autogenerated mock type for the BackoffManager type +type MockBackoffManager struct { + mock.Mock +} + +type MockBackoffManager_Expecter struct { + mock *mock.Mock +} + +func (_m *MockBackoffManager) EXPECT() *MockBackoffManager_Expecter { + return &MockBackoffManager_Expecter{mock: &_m.Mock} +} + +// CalculateBackoff provides a mock function with given fields: actualURL +func (_m *MockBackoffManager) CalculateBackoff(actualURL *url.URL) time.Duration { + ret := _m.Called(actualURL) + + if len(ret) == 0 { + panic("no return value specified for CalculateBackoff") + } + + var r0 time.Duration + if rf, ok := ret.Get(0).(func(*url.URL) time.Duration); ok { + r0 = rf(actualURL) + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + +// MockBackoffManager_CalculateBackoff_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CalculateBackoff' +type MockBackoffManager_CalculateBackoff_Call struct { + *mock.Call +} + +// CalculateBackoff is a helper method to define mock.On call +// - actualURL *url.URL +func (_e *MockBackoffManager_Expecter) CalculateBackoff(actualURL interface{}) *MockBackoffManager_CalculateBackoff_Call { + return &MockBackoffManager_CalculateBackoff_Call{Call: _e.mock.On("CalculateBackoff", actualURL)} +} + +func (_c *MockBackoffManager_CalculateBackoff_Call) Run(run func(actualURL *url.URL)) *MockBackoffManager_CalculateBackoff_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*url.URL)) + }) + return _c +} + +func (_c *MockBackoffManager_CalculateBackoff_Call) Return(_a0 time.Duration) *MockBackoffManager_CalculateBackoff_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockBackoffManager_CalculateBackoff_Call) RunAndReturn(run func(*url.URL) time.Duration) *MockBackoffManager_CalculateBackoff_Call { + _c.Call.Return(run) + return _c +} + +// Sleep provides a mock function with given fields: d +func (_m *MockBackoffManager) Sleep(d time.Duration) { + _m.Called(d) +} + +// MockBackoffManager_Sleep_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Sleep' +type MockBackoffManager_Sleep_Call struct { + *mock.Call +} + +// Sleep is a helper method to define mock.On call +// - d time.Duration +func (_e *MockBackoffManager_Expecter) Sleep(d interface{}) *MockBackoffManager_Sleep_Call { + return &MockBackoffManager_Sleep_Call{Call: _e.mock.On("Sleep", d)} +} + +func (_c *MockBackoffManager_Sleep_Call) Run(run func(d time.Duration)) *MockBackoffManager_Sleep_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Duration)) + }) + return _c +} + +func (_c *MockBackoffManager_Sleep_Call) Return() *MockBackoffManager_Sleep_Call { + _c.Call.Return() + return _c +} + +func (_c *MockBackoffManager_Sleep_Call) RunAndReturn(run func(time.Duration)) *MockBackoffManager_Sleep_Call { + _c.Call.Return(run) + return _c +} + +// UpdateBackoff provides a mock function with given fields: actualURL, err, responseCode +func (_m *MockBackoffManager) UpdateBackoff(actualURL *url.URL, err error, responseCode int) { + _m.Called(actualURL, err, responseCode) +} + +// MockBackoffManager_UpdateBackoff_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateBackoff' +type MockBackoffManager_UpdateBackoff_Call struct { + *mock.Call +} + +// UpdateBackoff is a helper method to define mock.On call +// - actualURL *url.URL +// - err error +// - responseCode int +func (_e *MockBackoffManager_Expecter) UpdateBackoff(actualURL interface{}, err interface{}, responseCode interface{}) *MockBackoffManager_UpdateBackoff_Call { + return &MockBackoffManager_UpdateBackoff_Call{Call: _e.mock.On("UpdateBackoff", actualURL, err, responseCode)} +} + +func (_c *MockBackoffManager_UpdateBackoff_Call) Run(run func(actualURL *url.URL, err error, responseCode int)) *MockBackoffManager_UpdateBackoff_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*url.URL), args[1].(error), args[2].(int)) + }) + return _c +} + +func (_c *MockBackoffManager_UpdateBackoff_Call) Return() *MockBackoffManager_UpdateBackoff_Call { + _c.Call.Return() + return _c +} + +func (_c *MockBackoffManager_UpdateBackoff_Call) RunAndReturn(run func(*url.URL, error, int)) *MockBackoffManager_UpdateBackoff_Call { + _c.Call.Return(run) + return _c +} + +// NewMockBackoffManager creates a new instance of MockBackoffManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockBackoffManager(t interface { + mock.TestingT + Cleanup(func()) +}) *MockBackoffManager { + mock := &MockBackoffManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/rest/request.go b/rest/request.go index b10db0ad..864ccd87 100644 --- a/rest/request.go +++ b/rest/request.go @@ -106,7 +106,7 @@ type Request struct { warningHandler WarningHandlerWithContext rateLimiter flowcontrol.RateLimiter - backoff BackoffManager + backoff BackoffManagerWithContext timeout time.Duration maxRetries int @@ -136,7 +136,7 @@ type Request struct { // NewRequest creates a new request helper object for accessing runtime.Objects on a server. func NewRequest(c *RESTClient) *Request { - var backoff BackoffManager + var backoff BackoffManagerWithContext if c.createBackoffMgr != nil { backoff = c.createBackoffMgr() } @@ -259,13 +259,27 @@ func (r *Request) Resource(resource string) *Request { } // BackOff sets the request's backoff manager to the one specified, -// or defaults to the stub implementation if nil is provided +// or defaults to the stub implementation if nil is provided. +// +// Deprecated: BackoffManager.Sleep ignores the caller's context. Use BackOffWithContext and BackoffManagerWithContext instead. func (r *Request) BackOff(manager BackoffManager) *Request { if manager == nil { r.backoff = &NoBackoff{} return r } + r.backoff = &backoffManagerNopContext{BackoffManager: manager} + return r +} + +// BackOffWithContext sets the request's backoff manager to the one specified, +// or defaults to the stub implementation if nil is provided. +func (r *Request) BackOffWithContext(manager BackoffManagerWithContext) *Request { + if manager == nil { + r.backoff = &NoBackoff{} + return r + } + r.backoff = manager return r } diff --git a/rest/request_test.go b/rest/request_test.go index 013a2281..0096501b 100644 --- a/rest/request_test.go +++ b/rest/request_test.go @@ -1489,6 +1489,7 @@ func TestDoRequestNewWay(t *testing.T) { // This test assumes that the client implementation backs off exponentially, for an individual request. func TestBackoffLifecycle(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) count := 0 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { count++ @@ -1508,22 +1509,30 @@ func TestBackoffLifecycle(t *testing.T) { seconds := []int{0, 1, 2, 4, 8, 0, 1, 2, 4, 0} request := c.Verb("POST").Prefix("backofftest").Suffix("abc") clock := testingclock.FakeClock{} - request.backoff = &URLBackoff{ - // Use a fake backoff here to avoid flakes and speed the test up. - Backoff: flowcontrol.NewFakeBackOff( - time.Duration(1)*time.Second, - time.Duration(200)*time.Second, - &clock, - )} + request.backoff = stepClockDuringSleep{ + BackoffManagerWithContext: &URLBackoff{ + // Use a fake backoff here to avoid flakes and speed the test up. + Backoff: flowcontrol.NewFakeBackOff( + time.Duration(1)*time.Second, + time.Duration(200)*time.Second, + &clock, + ), + }, + clock: &clock, + } for _, sec := range seconds { - thisBackoff := request.backoff.CalculateBackoff(request.URL()) + thisBackoff := request.backoff.CalculateBackoffWithContext(ctx, request.URL()) t.Logf("Current backoff %v", thisBackoff) if thisBackoff != time.Duration(sec)*time.Second { t.Errorf("Backoff is %v instead of %v", thisBackoff, sec) } + + // This relies on advancing the fake clock by exactly the duration + // that SleepWithContext is being called for while DoRaw is executing. + // stepClockDuringSleep.SleepWithContext ensures that this happens. now := clock.Now() - request.DoRaw(context.Background()) + request.DoRaw(ctx) elapsed := clock.Since(now) if clock.Since(now) != thisBackoff { t.Errorf("CalculatedBackoff not honored by clock: Expected time of %v, but got %v ", thisBackoff, elapsed) @@ -1531,18 +1540,51 @@ func TestBackoffLifecycle(t *testing.T) { } } +type stepClockDuringSleep struct { + BackoffManagerWithContext + clock *testingclock.FakeClock +} + +// SleepWithContext wraps the underlying SleepWithContext and ensures that once +// that is sleeping, the fake clock advances by exactly the duration that +// it is sleeping for. +func (s stepClockDuringSleep) SleepWithContext(ctx context.Context, d time.Duration) { + // This code is sensitive to both the implementation of + // URLBackoff.SleepWithContext and of FakeClock.NewTimer: + // - SleepWithContext must be a no-op when the duration is zero + // => no need to step the fake clock + // - SleepWithContext must use FakeClock.NewTimer, not FakeClock.Sleep + // because the latter would advance time itself + if d != 0 { + go func() { + // Poll until the caller is sleeping. + for { + if s.clock.HasWaiters() { + s.clock.Step(d) + return + } + if ctx.Err() != nil { + return + } + time.Sleep(time.Millisecond) + } + }() + } + s.BackoffManagerWithContext.SleepWithContext(ctx, d) +} + type testBackoffManager struct { sleeps []time.Duration } -func (b *testBackoffManager) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) { +func (b *testBackoffManager) UpdateBackoffWithContext(ctx context.Context, actualURL *url.URL, err error, responseCode int) { } -func (b *testBackoffManager) CalculateBackoff(actualUrl *url.URL) time.Duration { +func (b *testBackoffManager) CalculateBackoffWithContext(ctx context.Context, actualURL *url.URL) time.Duration { return time.Duration(0) } -func (b *testBackoffManager) Sleep(d time.Duration) { +func (b *testBackoffManager) SleepWithContext(ctx context.Context, d time.Duration) { b.sleeps = append(b.sleeps, d) } @@ -1568,7 +1610,7 @@ func TestCheckRetryClosesBody(t *testing.T) { expectedSleeps := []time.Duration{0, time.Second, time.Second, time.Second, time.Second} c := testRESTClient(t, testServer) - c.createBackoffMgr = func() BackoffManager { return backoff } + c.createBackoffMgr = func() BackoffManagerWithContext { return backoff } _, err := c.Verb("POST"). Prefix("foo", "bar"). Suffix("baz"). @@ -2612,6 +2654,8 @@ type noSleepBackOff struct { func (n *noSleepBackOff) Sleep(d time.Duration) {} +func (n *noSleepBackOff) SleepWithContext(ctx context.Context, d time.Duration) {} + func TestRequestWithRetry(t *testing.T) { tests := []struct { name string @@ -2997,7 +3041,6 @@ const retryTestKey retryTestKeyType = iota // metric calls are invoked appropriately in right order. type withRateLimiterBackoffManagerAndMetrics struct { flowcontrol.RateLimiter - *NoBackoff metrics.ResultMetric calculateBackoffSeq int64 calculateBackoffFn func(i int64) time.Duration @@ -3013,7 +3056,7 @@ func (lb *withRateLimiterBackoffManagerAndMetrics) Wait(ctx context.Context) err return nil } -func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoff(actualUrl *url.URL) time.Duration { +func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoffWithContext(ctx context.Context, actualURL *url.URL) time.Duration { lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.CalculateBackoff") waitFor := lb.calculateBackoffFn(lb.calculateBackoffSeq) @@ -3021,11 +3064,11 @@ func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoff(actualUrl *u return waitFor } -func (lb *withRateLimiterBackoffManagerAndMetrics) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) { +func (lb *withRateLimiterBackoffManagerAndMetrics) UpdateBackoffWithContext(ctx context.Context, actualURL *url.URL, err error, responseCode int) { lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.UpdateBackoff") } -func (lb *withRateLimiterBackoffManagerAndMetrics) Sleep(d time.Duration) { +func (lb *withRateLimiterBackoffManagerAndMetrics) SleepWithContext(ctx context.Context, d time.Duration) { lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.Sleep") lb.sleepsGot = append(lb.sleepsGot, d.String()) } @@ -3206,7 +3249,6 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc t.Run(test.name, func(t *testing.T) { interceptor := &withRateLimiterBackoffManagerAndMetrics{ RateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(), - NoBackoff: &NoBackoff{}, calculateBackoffFn: test.calculateBackoffFn, } diff --git a/rest/urlbackoff.go b/rest/urlbackoff.go index 2f9962d7..5b7b4e21 100644 --- a/rest/urlbackoff.go +++ b/rest/urlbackoff.go @@ -17,6 +17,8 @@ limitations under the License. package rest import ( + "context" + "fmt" "net/url" "time" @@ -32,12 +34,24 @@ import ( var serverIsOverloadedSet = sets.NewInt(429) var maxResponseCode = 499 +//go:generate mockery + +// Deprecated: BackoffManager.Sleep ignores the caller's context. Use BackoffManagerWithContext instead. type BackoffManager interface { - UpdateBackoff(actualUrl *url.URL, err error, responseCode int) - CalculateBackoff(actualUrl *url.URL) time.Duration + UpdateBackoff(actualURL *url.URL, err error, responseCode int) + CalculateBackoff(actualURL *url.URL) time.Duration Sleep(d time.Duration) } +type BackoffManagerWithContext interface { + UpdateBackoffWithContext(ctx context.Context, actualURL *url.URL, err error, responseCode int) + CalculateBackoffWithContext(ctx context.Context, actualURL *url.URL) time.Duration + SleepWithContext(ctx context.Context, d time.Duration) +} + +var _ BackoffManager = &URLBackoff{} +var _ BackoffManagerWithContext = &URLBackoff{} + // URLBackoff struct implements the semantics on top of Backoff which // we need for URL specific exponential backoff. type URLBackoff struct { @@ -49,11 +63,19 @@ type URLBackoff struct { type NoBackoff struct { } -func (n *NoBackoff) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) { +func (n *NoBackoff) UpdateBackoff(actualURL *url.URL, err error, responseCode int) { // do nothing. } -func (n *NoBackoff) CalculateBackoff(actualUrl *url.URL) time.Duration { +func (n *NoBackoff) UpdateBackoffWithContext(ctx context.Context, actualURL *url.URL, err error, responseCode int) { + // do nothing. +} + +func (n *NoBackoff) CalculateBackoff(actualURL *url.URL) time.Duration { + return 0 * time.Second +} + +func (n *NoBackoff) CalculateBackoffWithContext(ctx context.Context, actualURL *url.URL) time.Duration { return 0 * time.Second } @@ -61,10 +83,21 @@ func (n *NoBackoff) Sleep(d time.Duration) { time.Sleep(d) } +func (n *NoBackoff) SleepWithContext(ctx context.Context, d time.Duration) { + if d == 0 { + return + } + t := time.NewTimer(d) + defer t.Stop() + select { + case <-ctx.Done(): + case <-t.C: + } +} + // Disable makes the backoff trivial, i.e., sets it to zero. This might be used // by tests which want to run 1000s of mock requests without slowing down. func (b *URLBackoff) Disable() { - klog.V(4).Infof("Disabling backoff strategy") b.Backoff = flowcontrol.NewBackOff(0*time.Second, 0*time.Second) } @@ -76,32 +109,74 @@ func (b *URLBackoff) baseUrlKey(rawurl *url.URL) string { // in the future. host, err := url.Parse(rawurl.String()) if err != nil { - klog.V(4).Infof("Error extracting url: %v", rawurl) - panic("bad url!") + panic(fmt.Sprintf("Error parsing bad URL %q: %v", rawurl, err)) } return host.Host } // UpdateBackoff updates backoff metadata -func (b *URLBackoff) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) { +func (b *URLBackoff) UpdateBackoff(actualURL *url.URL, err error, responseCode int) { + b.UpdateBackoffWithContext(context.Background(), actualURL, err, responseCode) +} + +// UpdateBackoffWithContext updates backoff metadata +func (b *URLBackoff) UpdateBackoffWithContext(ctx context.Context, actualURL *url.URL, err error, responseCode int) { // range for retry counts that we store is [0,13] if responseCode > maxResponseCode || serverIsOverloadedSet.Has(responseCode) { - b.Backoff.Next(b.baseUrlKey(actualUrl), b.Backoff.Clock.Now()) + b.Backoff.Next(b.baseUrlKey(actualURL), b.Backoff.Clock.Now()) return } else if responseCode >= 300 || err != nil { - klog.V(4).Infof("Client is returning errors: code %v, error %v", responseCode, err) + klog.FromContext(ctx).V(4).Info("Client is returning errors", "code", responseCode, "err", err) } //If we got this far, there is no backoff required for this URL anymore. - b.Backoff.Reset(b.baseUrlKey(actualUrl)) + b.Backoff.Reset(b.baseUrlKey(actualURL)) } // CalculateBackoff takes a url and back's off exponentially, // based on its knowledge of existing failures. -func (b *URLBackoff) CalculateBackoff(actualUrl *url.URL) time.Duration { - return b.Backoff.Get(b.baseUrlKey(actualUrl)) +func (b *URLBackoff) CalculateBackoff(actualURL *url.URL) time.Duration { + return b.Backoff.Get(b.baseUrlKey(actualURL)) +} + +// CalculateBackoffWithContext takes a url and back's off exponentially, +// based on its knowledge of existing failures. +func (b *URLBackoff) CalculateBackoffWithContext(ctx context.Context, actualURL *url.URL) time.Duration { + return b.Backoff.Get(b.baseUrlKey(actualURL)) } func (b *URLBackoff) Sleep(d time.Duration) { b.Backoff.Clock.Sleep(d) } + +func (b *URLBackoff) SleepWithContext(ctx context.Context, d time.Duration) { + if d == 0 { + return + } + t := b.Backoff.Clock.NewTimer(d) + defer t.Stop() + select { + case <-ctx.Done(): + case <-t.C(): + } +} + +// backoffManagerNopContext wraps a BackoffManager and adds the *WithContext methods. +type backoffManagerNopContext struct { + BackoffManager +} + +var _ BackoffManager = &backoffManagerNopContext{} +var _ BackoffManagerWithContext = &backoffManagerNopContext{} + +func (b *backoffManagerNopContext) UpdateBackoffWithContext(ctx context.Context, actualURL *url.URL, err error, responseCode int) { + b.UpdateBackoff(actualURL, err, responseCode) +} + +func (b *backoffManagerNopContext) CalculateBackoffWithContext(ctx context.Context, actualURL *url.URL) time.Duration { + return b.CalculateBackoff(actualURL) +} + +func (b *backoffManagerNopContext) SleepWithContext(ctx context.Context, d time.Duration) { + b.Sleep(d) +} diff --git a/rest/urlbackoff_test.go b/rest/urlbackoff_test.go index c5f43923..b80b721d 100644 --- a/rest/urlbackoff_test.go +++ b/rest/urlbackoff_test.go @@ -17,10 +17,14 @@ limitations under the License. package rest import ( + "context" + "errors" "net/url" "testing" "time" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/util/flowcontrol" ) @@ -77,3 +81,38 @@ func TestURLBackoffFunctionality(t *testing.T) { t.Errorf("The final return code %v should have resulted in a backoff ! ", returnCodes[7]) } } + +func TestBackoffManagerNopContext(t *testing.T) { + mock := NewMockBackoffManager(t) + + sleepDuration := 42 * time.Second + mock.On("Sleep", sleepDuration).Return() + url := &url.URL{} + mock.On("CalculateBackoff", url).Return(time.Second) + err := errors.New("fake error") + responseCode := 404 + mock.On("UpdateBackoff", url, err, responseCode).Return() + + ctx := context.Background() + wrapper := backoffManagerNopContext{BackoffManager: mock} + wrapper.SleepWithContext(ctx, sleepDuration) + wrapper.CalculateBackoffWithContext(ctx, url) + wrapper.UpdateBackoffWithContext(ctx, url, err, responseCode) +} + +func TestNoBackoff(t *testing.T) { + var backoff NoBackoff + assert.Equal(t, 0*time.Second, backoff.CalculateBackoff(nil)) + assert.Equal(t, 0*time.Second, backoff.CalculateBackoffWithContext(context.Background(), nil)) + + start := time.Now() + backoff.Sleep(0 * time.Second) + assert.WithinDuration(t, start, time.Now(), time.Minute /* pretty generous, but we don't want to flake */, time.Since(start), "backoff.Sleep") + + // Cancel right away to prevent sleeping. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + start = time.Now() + backoff.SleepWithContext(ctx, 10*time.Minute) + assert.WithinDuration(t, start, time.Now(), time.Minute /* pretty generous, but we don't want to flake */, time.Since(start), "backoff.SleepWithContext") +} diff --git a/rest/with_retry.go b/rest/with_retry.go index eaaadc6a..eb7eaaf3 100644 --- a/rest/with_retry.go +++ b/rest/with_retry.go @@ -209,18 +209,18 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error { // we do a backoff sleep before the first attempt is made, // (preserving current behavior). if request.backoff != nil { - request.backoff.Sleep(request.backoff.CalculateBackoff(url)) + request.backoff.SleepWithContext(ctx, request.backoff.CalculateBackoffWithContext(ctx, url)) } return nil } // if we are here, we have made attempt(s) at least once before. if request.backoff != nil { - delay := request.backoff.CalculateBackoff(url) + delay := request.backoff.CalculateBackoffWithContext(ctx, url) if r.retryAfter.Wait > delay { delay = r.retryAfter.Wait } - request.backoff.Sleep(delay) + request.backoff.SleepWithContext(ctx, delay) } // We are retrying the request that we already send to @@ -258,9 +258,9 @@ func (r *withRetry) After(ctx context.Context, request *Request, resp *http.Resp if request.c.base != nil { if err != nil { - request.backoff.UpdateBackoff(request.URL(), err, 0) + request.backoff.UpdateBackoffWithContext(ctx, request.URL(), err, 0) } else { - request.backoff.UpdateBackoff(request.URL(), err, resp.StatusCode) + request.backoff.UpdateBackoffWithContext(ctx, request.URL(), err, resp.StatusCode) } } }