mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-14 13:45:06 +00:00
Exposing http.Client for configurable timeouts
This commit is contained in:
@@ -28,15 +28,21 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
type HTTPClientFunc func(*http.Request) (*http.Response, error)
|
func HTTPClientFunc(f func(*http.Request) (*http.Response, error)) *http.Client {
|
||||||
|
return &http.Client{
|
||||||
|
Transport: roundTripperFunc(f),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (f HTTPClientFunc) Do(req *http.Request) (*http.Response, error) {
|
type roundTripperFunc func(*http.Request) (*http.Response, error)
|
||||||
|
|
||||||
|
func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||||
return f(req)
|
return f(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RESTClient provides a fake RESTClient interface.
|
// RESTClient provides a fake RESTClient interface.
|
||||||
type RESTClient struct {
|
type RESTClient struct {
|
||||||
Client unversioned.HTTPClient
|
Client *http.Client
|
||||||
Codec runtime.Codec
|
Codec runtime.Codec
|
||||||
Req *http.Request
|
Req *http.Request
|
||||||
Resp *http.Response
|
Resp *http.Response
|
||||||
@@ -68,7 +74,7 @@ func (c *RESTClient) Do(req *http.Request) (*http.Response, error) {
|
|||||||
return nil, c.Err
|
return nil, c.Err
|
||||||
}
|
}
|
||||||
c.Req = req
|
c.Req = req
|
||||||
if c.Client != unversioned.HTTPClient(nil) {
|
if c.Client != nil {
|
||||||
return c.Client.Do(req)
|
return c.Client.Do(req)
|
||||||
}
|
}
|
||||||
return c.Resp, nil
|
return c.Resp, nil
|
||||||
|
@@ -19,7 +19,7 @@ package unversioned_test
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -41,7 +41,6 @@ func objBody(object interface{}) io.ReadCloser {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestNegotiateVersion(t *testing.T) {
|
func TestNegotiateVersion(t *testing.T) {
|
||||||
refusedErr := fmt.Errorf("connection refused")
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name, version, expectedVersion string
|
name, version, expectedVersion string
|
||||||
serverVersions []string
|
serverVersions []string
|
||||||
@@ -87,8 +86,8 @@ func TestNegotiateVersion(t *testing.T) {
|
|||||||
config: &unversioned.Config{Version: testapi.Default.Version()},
|
config: &unversioned.Config{Version: testapi.Default.Version()},
|
||||||
serverVersions: []string{"version1"},
|
serverVersions: []string{"version1"},
|
||||||
clientVersions: []string{"version1", testapi.Default.Version()},
|
clientVersions: []string{"version1", testapi.Default.Version()},
|
||||||
sendErr: refusedErr,
|
sendErr: errors.New("connection refused"),
|
||||||
expectErr: func(err error) bool { return err == refusedErr },
|
expectErr: func(err error) bool { return strings.Contains(err.Error(), "connection refused") },
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
codec := testapi.Default.Codec()
|
codec := testapi.Default.Codec()
|
||||||
|
@@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package unversioned
|
package unversioned
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
@@ -43,7 +44,7 @@ type RESTClient struct {
|
|||||||
|
|
||||||
// Set specific behavior of the client. If not set http.DefaultClient will be
|
// Set specific behavior of the client. If not set http.DefaultClient will be
|
||||||
// used.
|
// used.
|
||||||
Client HTTPClient
|
Client *http.Client
|
||||||
|
|
||||||
// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
|
// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
|
||||||
Throttle util.RateLimiter
|
Throttle util.RateLimiter
|
||||||
@@ -88,6 +89,9 @@ func (c *RESTClient) Verb(verb string) *Request {
|
|||||||
if c.Throttle != nil {
|
if c.Throttle != nil {
|
||||||
c.Throttle.Accept()
|
c.Throttle.Accept()
|
||||||
}
|
}
|
||||||
|
if c.Client == nil {
|
||||||
|
return NewRequest(nil, verb, c.baseURL, c.apiVersion, c.Codec)
|
||||||
|
}
|
||||||
return NewRequest(c.Client, verb, c.baseURL, c.apiVersion, c.Codec)
|
return NewRequest(c.Client, verb, c.baseURL, c.apiVersion, c.Codec)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -28,7 +28,6 @@ import (
|
|||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/errors"
|
|
||||||
"k8s.io/kubernetes/pkg/client/unversioned/fake"
|
"k8s.io/kubernetes/pkg/client/unversioned/fake"
|
||||||
"k8s.io/kubernetes/pkg/kubectl"
|
"k8s.io/kubernetes/pkg/kubectl"
|
||||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||||
@@ -220,7 +219,7 @@ func TestApplyNonExistObject(t *testing.T) {
|
|||||||
Client: fake.HTTPClientFunc(func(req *http.Request) (*http.Response, error) {
|
Client: fake.HTTPClientFunc(func(req *http.Request) (*http.Response, error) {
|
||||||
switch p, m := req.URL.Path, req.Method; {
|
switch p, m := req.URL.Path, req.Method; {
|
||||||
case p == pathNameRC && m == "GET":
|
case p == pathNameRC && m == "GET":
|
||||||
return &http.Response{StatusCode: 404}, errors.NewNotFound("ReplicationController", "")
|
return &http.Response{StatusCode: 404, Body: ioutil.NopCloser(bytes.NewReader(nil))}, nil
|
||||||
case p == pathRC && m == "POST":
|
case p == pathRC && m == "POST":
|
||||||
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
|
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
|
||||||
return &http.Response{StatusCode: 201, Body: bodyRC}, nil
|
return &http.Response{StatusCode: 201, Body: bodyRC}, nil
|
||||||
|
@@ -131,11 +131,10 @@ func TestHelperCreate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
Resp *http.Response
|
Resp *http.Response
|
||||||
RespFunc fake.HTTPClientFunc
|
HttpErr error
|
||||||
HttpErr error
|
Modify bool
|
||||||
Modify bool
|
Object runtime.Object
|
||||||
Object runtime.Object
|
|
||||||
|
|
||||||
ExpectObject runtime.Object
|
ExpectObject runtime.Object
|
||||||
Err bool
|
Err bool
|
||||||
@@ -188,9 +187,6 @@ func TestHelperCreate(t *testing.T) {
|
|||||||
Resp: test.Resp,
|
Resp: test.Resp,
|
||||||
Err: test.HttpErr,
|
Err: test.HttpErr,
|
||||||
}
|
}
|
||||||
if test.RespFunc != nil {
|
|
||||||
client.Client = test.RespFunc
|
|
||||||
}
|
|
||||||
modifier := &Helper{
|
modifier := &Helper{
|
||||||
RESTClient: client,
|
RESTClient: client,
|
||||||
Codec: testapi.Default.Codec(),
|
Codec: testapi.Default.Codec(),
|
||||||
@@ -380,11 +376,11 @@ func TestHelperReplace(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
Resp *http.Response
|
Resp *http.Response
|
||||||
RespFunc fake.HTTPClientFunc
|
HTTPClient *http.Client
|
||||||
HttpErr error
|
HttpErr error
|
||||||
Overwrite bool
|
Overwrite bool
|
||||||
Object runtime.Object
|
Object runtime.Object
|
||||||
|
|
||||||
ExpectObject runtime.Object
|
ExpectObject runtime.Object
|
||||||
Err bool
|
Err bool
|
||||||
@@ -421,12 +417,12 @@ func TestHelperReplace(t *testing.T) {
|
|||||||
Spec: apitesting.DeepEqualSafePodSpec(),
|
Spec: apitesting.DeepEqualSafePodSpec(),
|
||||||
},
|
},
|
||||||
Overwrite: true,
|
Overwrite: true,
|
||||||
RespFunc: func(req *http.Request) (*http.Response, error) {
|
HTTPClient: fake.HTTPClientFunc(func(req *http.Request) (*http.Response, error) {
|
||||||
if req.Method == "PUT" {
|
if req.Method == "PUT" {
|
||||||
return &http.Response{StatusCode: http.StatusOK, Body: objBody(&unversioned.Status{Status: unversioned.StatusSuccess})}, nil
|
return &http.Response{StatusCode: http.StatusOK, Body: objBody(&unversioned.Status{Status: unversioned.StatusSuccess})}, nil
|
||||||
}
|
}
|
||||||
return &http.Response{StatusCode: http.StatusOK, Body: objBody(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "10"}})}, nil
|
return &http.Response{StatusCode: http.StatusOK, Body: objBody(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "10"}})}, nil
|
||||||
},
|
}),
|
||||||
Req: expectPut,
|
Req: expectPut,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -438,12 +434,10 @@ func TestHelperReplace(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for i, test := range tests {
|
for i, test := range tests {
|
||||||
client := &fake.RESTClient{
|
client := &fake.RESTClient{
|
||||||
Codec: testapi.Default.Codec(),
|
Client: test.HTTPClient,
|
||||||
Resp: test.Resp,
|
Codec: testapi.Default.Codec(),
|
||||||
Err: test.HttpErr,
|
Resp: test.Resp,
|
||||||
}
|
Err: test.HttpErr,
|
||||||
if test.RespFunc != nil {
|
|
||||||
client.Client = test.RespFunc
|
|
||||||
}
|
}
|
||||||
modifier := &Helper{
|
modifier := &Helper{
|
||||||
RESTClient: client,
|
RESTClient: client,
|
||||||
|
@@ -18,7 +18,6 @@ package e2e
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -133,10 +132,6 @@ func getAllNodesInCluster(c *client.Client) ([]string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func getInfluxdbClient(c *client.Client) (*influxdb.Client, error) {
|
func getInfluxdbClient(c *client.Client) (*influxdb.Client, error) {
|
||||||
kubeMasterHttpClient, ok := c.Client.(*http.Client)
|
|
||||||
if !ok {
|
|
||||||
Failf("failed to get master http client")
|
|
||||||
}
|
|
||||||
proxyUrl := fmt.Sprintf("%s/api/v1/proxy/namespaces/%s/services/%s:api/", getMasterHost(), api.NamespaceSystem, influxdbService)
|
proxyUrl := fmt.Sprintf("%s/api/v1/proxy/namespaces/%s/services/%s:api/", getMasterHost(), api.NamespaceSystem, influxdbService)
|
||||||
config := &influxdb.ClientConfig{
|
config := &influxdb.ClientConfig{
|
||||||
Host: proxyUrl,
|
Host: proxyUrl,
|
||||||
@@ -144,7 +139,7 @@ func getInfluxdbClient(c *client.Client) (*influxdb.Client, error) {
|
|||||||
Username: influxdbUser,
|
Username: influxdbUser,
|
||||||
Password: influxdbPW,
|
Password: influxdbPW,
|
||||||
Database: influxdbDatabaseName,
|
Database: influxdbDatabaseName,
|
||||||
HttpClient: kubeMasterHttpClient,
|
HttpClient: c.Client,
|
||||||
IsSecure: true,
|
IsSecure: true,
|
||||||
}
|
}
|
||||||
return influxdb.NewClient(config)
|
return influxdb.NewClient(config)
|
||||||
|
Reference in New Issue
Block a user