mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-12 21:36:24 +00:00
Add Context() to enable per-request cancellation
This commit is contained in:
parent
79f497bca7
commit
8070548ebe
@ -18,6 +18,7 @@ package restclient
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -105,16 +106,14 @@ type Request struct {
|
|||||||
resource string
|
resource string
|
||||||
resourceName string
|
resourceName string
|
||||||
subresource string
|
subresource string
|
||||||
selector labels.Selector
|
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
|
|
||||||
// output
|
// output
|
||||||
err error
|
err error
|
||||||
body io.Reader
|
body io.Reader
|
||||||
|
|
||||||
// The constructed request and the response
|
// This is only used for per-request timeouts, deadlines, and cancellations.
|
||||||
req *http.Request
|
ctx context.Context
|
||||||
resp *http.Response
|
|
||||||
|
|
||||||
backoffMgr BackoffManager
|
backoffMgr BackoffManager
|
||||||
throttle flowcontrol.RateLimiter
|
throttle flowcontrol.RateLimiter
|
||||||
@ -566,6 +565,13 @@ func (r *Request) Body(obj interface{}) *Request {
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Context adds a context to the request. Contexts are only used for
|
||||||
|
// timeouts, deadlines, and cancellations.
|
||||||
|
func (r *Request) Context(ctx context.Context) *Request {
|
||||||
|
r.ctx = ctx
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
// URL returns the current working URL.
|
// URL returns the current working URL.
|
||||||
func (r *Request) URL() *url.URL {
|
func (r *Request) URL() *url.URL {
|
||||||
p := r.pathPrefix
|
p := r.pathPrefix
|
||||||
@ -651,6 +657,9 @@ func (r *Request) Watch() (watch.Interface, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if r.ctx != nil {
|
||||||
|
req = req.WithContext(r.ctx)
|
||||||
|
}
|
||||||
req.Header = r.headers
|
req.Header = r.headers
|
||||||
client := r.client
|
client := r.client
|
||||||
if client == nil {
|
if client == nil {
|
||||||
@ -720,6 +729,9 @@ func (r *Request) Stream() (io.ReadCloser, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if r.ctx != nil {
|
||||||
|
req = req.WithContext(r.ctx)
|
||||||
|
}
|
||||||
req.Header = r.headers
|
req.Header = r.headers
|
||||||
client := r.client
|
client := r.client
|
||||||
if client == nil {
|
if client == nil {
|
||||||
@ -794,6 +806,9 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if r.ctx != nil {
|
||||||
|
req = req.WithContext(r.ctx)
|
||||||
|
}
|
||||||
req.Header = r.headers
|
req.Header = r.headers
|
||||||
|
|
||||||
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
|
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
|
||||||
|
@ -18,6 +18,7 @@ package restclient
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -1621,3 +1622,32 @@ func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient {
|
|||||||
}
|
}
|
||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDoContext(t *testing.T) {
|
||||||
|
receivedCh := make(chan struct{})
|
||||||
|
block := make(chan struct{})
|
||||||
|
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
close(receivedCh)
|
||||||
|
<-block
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}))
|
||||||
|
defer testServer.Close()
|
||||||
|
defer close(block)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-receivedCh
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
c := testRESTClient(t, testServer)
|
||||||
|
_, err := c.Verb("GET").
|
||||||
|
Context(ctx).
|
||||||
|
Prefix("foo").
|
||||||
|
DoRaw()
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("Expected context cancellation error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user