Retool HTTP and UDP e2e utils

This is a prefactoring for followup changes that need to use very
similar but subtly different test.  Now it is more generic, though it
pushes a little logic up the stack.  That makes sense to me.
This commit is contained in:
Tim Hockin 2019-03-07 17:08:44 -08:00
parent 19e333b5cc
commit 382f5c83c0
4 changed files with 290 additions and 188 deletions

View File

@ -17,7 +17,6 @@ limitations under the License.
package framework
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
@ -708,13 +707,131 @@ func CheckReachabilityFromPod(expectToBeReachable bool, timeout time.Duration, n
ExpectNoError(err)
}
// Does an HTTP GET, but does not reuse TCP connections
// This masks problems where the iptables rule has changed, but we don't see it
// This is intended for relatively quick requests (status checks), so we set a short (5 seconds) timeout
func httpGetNoConnectionPool(url string) (*http.Response, error) {
return httpGetNoConnectionPoolTimeout(url, 5*time.Second)
type HTTPPokeParams struct {
Timeout time.Duration
ExpectCode int // default = 200
BodyContains string
RetriableCodes []int
}
type HTTPPokeResult struct {
Status HTTPPokeStatus
Code int // HTTP code: 0 if the connection was not made
Error error // if there was any error
Body []byte // if code != 0
}
type HTTPPokeStatus string
const (
HTTPSuccess HTTPPokeStatus = "Success"
HTTPError HTTPPokeStatus = "UnknownError"
// Any time we add new errors, we should audit all callers of this.
HTTPTimeout HTTPPokeStatus = "TimedOut"
HTTPRefused HTTPPokeStatus = "ConnectionRefused"
HTTPRetryCode HTTPPokeStatus = "RetryCode"
HTTPWrongCode HTTPPokeStatus = "WrongCode"
HTTPBadResponse HTTPPokeStatus = "BadResponse"
)
// PokeHTTP tries to connect to a host on a port for a given URL path. Callers
// can specify additional success parameters, if desired.
//
// The result status will be characterized as precisely as possible, given the
// known users of this.
//
// The result code will be zero in case of any failure to connect, or non-zero
// if the HTTP transaction completed (even if the other test params make this a
// failure).
//
// The result error will be populated for any status other than Success.
//
// The result body will be populated if the HTTP transaction was completed, even
// if the other test params make this a failure).
func PokeHTTP(host string, port int, path string, params *HTTPPokeParams) HTTPPokeResult {
hostPort := net.JoinHostPort(host, strconv.Itoa(port))
url := fmt.Sprintf("http://%s%s", hostPort, path)
ret := HTTPPokeResult{}
// Sanity check inputs, because it has happened. These are the only things
// that should hard fail the test - they are basically ASSERT()s.
if host == "" {
Failf("Got empty host for HTTP poke (%s)", url)
return ret
}
if port == 0 {
Failf("Got port==0 for HTTP poke (%s)", url)
return ret
}
// Set default params.
if params == nil {
params = &HTTPPokeParams{}
}
if params.ExpectCode == 0 {
params.ExpectCode = http.StatusOK
}
Logf("Poking %q", url)
resp, err := httpGetNoConnectionPoolTimeout(url, params.Timeout)
if err != nil {
ret.Error = err
neterr, ok := err.(net.Error)
if ok && neterr.Timeout() {
ret.Status = HTTPTimeout
} else if strings.Contains(err.Error(), "connection refused") {
ret.Status = HTTPRefused
} else {
ret.Status = HTTPError
}
Logf("Poke(%q): %v", url, err)
return ret
}
ret.Code = resp.StatusCode
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
ret.Status = HTTPError
ret.Error = fmt.Errorf("error reading HTTP body: %v", err)
Logf("Poke(%q): %v", url, ret.Error)
return ret
}
ret.Body = make([]byte, len(body))
copy(ret.Body, body)
if resp.StatusCode != params.ExpectCode {
for _, code := range params.RetriableCodes {
if resp.StatusCode == code {
ret.Error = fmt.Errorf("retriable status code: %d", resp.StatusCode)
ret.Status = HTTPRetryCode
Logf("Poke(%q): %v", url, ret.Error)
return ret
}
}
ret.Status = HTTPWrongCode
ret.Error = fmt.Errorf("bad status code: %d", resp.StatusCode)
Logf("Poke(%q): %v", url, ret.Error)
return ret
}
if params.BodyContains != "" && !strings.Contains(string(body), params.BodyContains) {
ret.Status = HTTPBadResponse
ret.Error = fmt.Errorf("response does not contain expected substring: %q", string(body))
Logf("Poke(%q): %v", url, ret.Error)
return ret
}
ret.Status = HTTPSuccess
Logf("Poke(%q): success", url)
return ret
}
// Does an HTTP GET, but does not reuse TCP connections
// This masks problems where the iptables rule has changed, but we don't see it
func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
tr := utilnet.SetTransportDefaults(&http.Transport{
DisableKeepAlives: true,
@ -727,178 +844,126 @@ func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Re
return client.Get(url)
}
func TestReachableHTTP(ip string, port int, request string, expect string) (bool, error) {
return TestReachableHTTPWithContent(ip, port, request, expect, nil)
type UDPPokeParams struct {
Timeout time.Duration
Response string
}
func TestReachableHTTPWithRetriableErrorCodes(ip string, port int, request string, expect string, retriableErrCodes []int) (bool, error) {
return TestReachableHTTPWithContentTimeoutWithRetriableErrorCodes(ip, port, request, expect, nil, retriableErrCodes, time.Second*5)
type UDPPokeResult struct {
Status UDPPokeStatus
Error error // if there was any error
Response []byte // if code != 0
}
func TestReachableHTTPWithContent(ip string, port int, request string, expect string, content *bytes.Buffer) (bool, error) {
return TestReachableHTTPWithContentTimeout(ip, port, request, expect, content, 5*time.Second)
}
type UDPPokeStatus string
func TestReachableHTTPWithContentTimeout(ip string, port int, request string, expect string, content *bytes.Buffer, timeout time.Duration) (bool, error) {
return TestReachableHTTPWithContentTimeoutWithRetriableErrorCodes(ip, port, request, expect, content, []int{}, timeout)
}
const (
UDPSuccess UDPPokeStatus = "Success"
UDPError UDPPokeStatus = "UnknownError"
// Any time we add new errors, we should audit all callers of this.
UDPTimeout UDPPokeStatus = "TimedOut"
UDPRefused UDPPokeStatus = "ConnectionRefused"
UDPBadResponse UDPPokeStatus = "BadResponse"
)
func TestReachableHTTPWithContentTimeoutWithRetriableErrorCodes(ip string, port int, request string, expect string, content *bytes.Buffer, retriableErrCodes []int, timeout time.Duration) (bool, error) {
// PokeUDP tries to connect to a host on a port and send the given request. Callers
// can specify additional success parameters, if desired.
//
// The result status will be characterized as precisely as possible, given the
// known users of this.
//
// The result error will be populated for any status other than Success.
//
// The result response will be populated if the UDP transaction was completed, even
// if the other test params make this a failure).
func PokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPokeResult {
hostPort := net.JoinHostPort(host, strconv.Itoa(port))
url := fmt.Sprintf("udp://%s", hostPort)
ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
url := fmt.Sprintf("http://%s%s", ipPort, request)
if ip == "" {
Failf("Got empty IP for reachability check (%s)", url)
return false, nil
ret := UDPPokeResult{}
// Sanity check inputs, because it has happened. These are the only things
// that should hard fail the test - they are basically ASSERT()s.
if host == "" {
Failf("Got empty host for UDP poke (%s)", url)
return ret
}
if port == 0 {
Failf("Got port==0 for reachability check (%s)", url)
return false, nil
Failf("Got port==0 for UDP poke (%s)", url)
return ret
}
Logf("Testing HTTP reachability of %v", url)
// Set default params.
if params == nil {
params = &UDPPokeParams{}
}
resp, err := httpGetNoConnectionPoolTimeout(url, timeout)
Logf("Poking %v", url)
con, err := net.Dial("udp", hostPort)
if err != nil {
Logf("Got error testing for reachability of %s: %v", url, err)
return false, nil
ret.Status = UDPError
ret.Error = err
Logf("Poke(%q): %v", url, err)
return ret
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
_, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
if err != nil {
Logf("Got error reading response from %s: %v", url, err)
return false, nil
}
if resp.StatusCode != 200 {
for _, code := range retriableErrCodes {
if resp.StatusCode == code {
Logf("Got non-success status %q when trying to access %s, but the error code is retriable", resp.Status, url)
return false, nil
}
ret.Error = err
neterr, ok := err.(net.Error)
if ok && neterr.Timeout() {
ret.Status = UDPTimeout
} else if strings.Contains(err.Error(), "connection refused") {
ret.Status = UDPRefused
} else {
ret.Status = UDPError
}
return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s",
resp.Status, url, string(body))
}
if !strings.Contains(string(body), expect) {
return false, fmt.Errorf("received response body without expected substring %q: %s", expect, string(body))
}
if content != nil {
content.Write(body)
}
return true, nil
}
func TestNotReachableHTTP(ip string, port int) (bool, error) {
return TestNotReachableHTTPTimeout(ip, port, 5*time.Second)
}
func TestNotReachableHTTPTimeout(ip string, port int, timeout time.Duration) (bool, error) {
ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
url := fmt.Sprintf("http://%s", ipPort)
if ip == "" {
Failf("Got empty IP for non-reachability check (%s)", url)
return false, nil
}
if port == 0 {
Failf("Got port==0 for non-reachability check (%s)", url)
return false, nil
Logf("Poke(%q): %v", url, err)
return ret
}
Logf("Testing HTTP non-reachability of %v", url)
if params.Timeout != 0 {
err = con.SetDeadline(time.Now().Add(params.Timeout))
if err != nil {
ret.Status = UDPError
ret.Error = err
Logf("Poke(%q): %v", url, err)
return ret
}
}
resp, err := httpGetNoConnectionPoolTimeout(url, timeout)
bufsize := len(params.Response) + 1
if bufsize == 0 {
bufsize = 4096
}
var buf []byte = make([]byte, bufsize)
n, err := con.Read(buf)
if err != nil {
Logf("Confirmed that %s is not reachable", url)
return true, nil
ret.Error = err
neterr, ok := err.(net.Error)
if ok && neterr.Timeout() {
ret.Status = UDPTimeout
} else if strings.Contains(err.Error(), "connection refused") {
ret.Status = UDPRefused
} else {
ret.Status = UDPError
}
Logf("Poke(%q): %v", url, err)
return ret
}
resp.Body.Close()
return false, nil
}
ret.Response = buf[0:n]
func TestReachableUDP(ip string, port int, request string, expect string) (bool, error) {
ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
uri := fmt.Sprintf("udp://%s", ipPort)
if ip == "" {
Failf("Got empty IP for reachability check (%s)", uri)
return false, nil
}
if port == 0 {
Failf("Got port==0 for reachability check (%s)", uri)
return false, nil
if params.Response != "" && string(ret.Response) != params.Response {
ret.Status = UDPBadResponse
ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response))
Logf("Poke(%q): %v", url, ret.Error)
return ret
}
Logf("Testing UDP reachability of %v", uri)
con, err := net.Dial("udp", ipPort)
if err != nil {
return false, fmt.Errorf("Failed to dial %s: %v", ipPort, err)
}
_, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
if err != nil {
return false, fmt.Errorf("Failed to send request: %v", err)
}
var buf []byte = make([]byte, len(expect)+1)
err = con.SetDeadline(time.Now().Add(3 * time.Second))
if err != nil {
return false, fmt.Errorf("Failed to set deadline: %v", err)
}
_, err = con.Read(buf)
if err != nil {
return false, nil
}
if !strings.Contains(string(buf), expect) {
return false, fmt.Errorf("Failed to retrieve %q, got %q", expect, string(buf))
}
Logf("Successfully reached %v", uri)
return true, nil
}
func TestNotReachableUDP(ip string, port int, request string) (bool, error) {
ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
uri := fmt.Sprintf("udp://%s", ipPort)
if ip == "" {
Failf("Got empty IP for reachability check (%s)", uri)
return false, nil
}
if port == 0 {
Failf("Got port==0 for reachability check (%s)", uri)
return false, nil
}
Logf("Testing UDP non-reachability of %v", uri)
con, err := net.Dial("udp", ipPort)
if err != nil {
Logf("Confirmed that %s is not reachable", uri)
return true, nil
}
_, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
if err != nil {
Logf("Confirmed that %s is not reachable", uri)
return true, nil
}
var buf []byte = make([]byte, 1)
err = con.SetDeadline(time.Now().Add(3 * time.Second))
if err != nil {
return false, fmt.Errorf("Failed to set deadline: %v", err)
}
_, err = con.Read(buf)
if err != nil {
Logf("Confirmed that %s is not reachable", uri)
return true, nil
}
return false, nil
ret.Status = UDPSuccess
Logf("Poke(%q): success", url)
return ret
}
func TestHitNodesFromOutside(externalIP string, httpPort int32, timeout time.Duration, expectedHosts sets.String) error {
@ -911,13 +976,12 @@ func TestHitNodesFromOutsideWithCount(externalIP string, httpPort int32, timeout
hittedHosts := sets.NewString()
count := 0
condition := func() (bool, error) {
var respBody bytes.Buffer
reached, err := TestReachableHTTPWithContentTimeout(externalIP, int(httpPort), "/hostname", "", &respBody,
1*time.Second)
if err != nil || !reached {
result := PokeHTTP(externalIP, int(httpPort), "/hostname", &HTTPPokeParams{Timeout: 1 * time.Second})
if result.Status != HTTPSuccess {
return false, nil
}
hittedHost := strings.TrimSpace(respBody.String())
hittedHost := strings.TrimSpace(string(result.Body))
if !expectedHosts.Has(hittedHost) {
Logf("Error hitting unexpected host: %v, reset counter: %v", hittedHost, count)
count = 0

View File

@ -876,9 +876,19 @@ func (j *ServiceTestJig) TestReachableHTTP(host string, port int, timeout time.D
}
func (j *ServiceTestJig) TestReachableHTTPWithRetriableErrorCodes(host string, port int, retriableErrCodes []int, timeout time.Duration) {
if err := wait.PollImmediate(Poll, timeout, func() (bool, error) {
return TestReachableHTTPWithRetriableErrorCodes(host, port, "/echo?msg=hello", "hello", retriableErrCodes)
}); err != nil {
pollfn := func() (bool, error) {
result := PokeHTTP(host, port, "/echo?msg=hello",
&HTTPPokeParams{
BodyContains: "hello",
RetriableCodes: retriableErrCodes,
})
if result.Status == HTTPSuccess {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
if err == wait.ErrWaitTimeout {
Failf("Could not reach HTTP service through %v:%v after %v", host, port, timeout)
} else {
@ -888,36 +898,60 @@ func (j *ServiceTestJig) TestReachableHTTPWithRetriableErrorCodes(host string, p
}
func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout time.Duration) {
if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestNotReachableHTTP(host, port) }); err != nil {
Failf("Could still reach HTTP service through %v:%v after %v: %v", host, port, timeout, err)
pollfn := func() (bool, error) {
result := PokeHTTP(host, port, "/", nil)
if result.Code == 0 {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
Failf("HTTP service %v:%v reachable after %v: %v", host, port, timeout, err)
}
}
func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) {
if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestReachableUDP(host, port, "echo hello", "hello") }); err != nil {
pollfn := func() (bool, error) {
result := PokeUDP(host, port, "echo hello", &UDPPokeParams{
Timeout: 3 * time.Second,
Response: "hello",
})
if result.Status == UDPSuccess {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
}
}
func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time.Duration) {
if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestNotReachableUDP(host, port, "echo hello") }); err != nil {
Failf("Could still reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
pollfn := func() (bool, error) {
result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
if result.Status != UDPSuccess && result.Status != UDPError {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
Failf("UDP service %v:%v reachable after %v: %v", host, port, timeout, err)
}
}
func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
var body bytes.Buffer
var err error
if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) {
var result bool
result, err = TestReachableHTTPWithContent(host, port, url, "", &body)
if err != nil {
Logf("Error hitting %v:%v%v, retrying: %v", host, port, url, err)
return false, nil
result := PokeHTTP(host, port, url, nil)
if result.Status == HTTPSuccess {
body.Write(result.Body)
return true, nil
}
return result, nil
return false, nil
}); pollErr != nil {
Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, err)
Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, pollErr)
}
return body
}
@ -930,7 +964,7 @@ func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, err
return false, fmt.Errorf("Invalid input ip or port")
}
Logf("Testing HTTP health check on %v", url)
resp, err := httpGetNoConnectionPool(url)
resp, err := httpGetNoConnectionPoolTimeout(url, 5*time.Second)
if err != nil {
Logf("Got error testing for reachability of %s: %v", url, err)
return false, err

View File

@ -188,11 +188,11 @@ var _ = SIGDescribe("Firewall rule", func() {
})
func assertNotReachableHTTPTimeout(ip string, port int, timeout time.Duration) {
unreachable, err := framework.TestNotReachableHTTPTimeout(ip, port, timeout)
if err != nil {
framework.Failf("Unexpected error checking for reachability of %s:%d: %v", ip, port, err)
result := framework.PokeHTTP(ip, port, "/", &framework.HTTPPokeParams{Timeout: timeout})
if result.Status == framework.HTTPError {
framework.Failf("Unexpected error checking for reachability of %s:%d: %v", ip, port, result.Error)
}
if !unreachable {
if result.Code != 0 {
framework.Failf("Was unexpectedly able to reach %s:%d", ip, port)
}
}

View File

@ -2063,14 +2063,18 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
for nodeName, nodeIPs := range endpointNodeMap {
By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIPs[0]))
var body bytes.Buffer
var result bool
var err error
if pollErr := wait.PollImmediate(framework.Poll, framework.ServiceTestTimeout, func() (bool, error) {
result, err = framework.TestReachableHTTPWithContent(nodeIPs[0], healthCheckNodePort, "/healthz", "", &body)
return !result, nil
}); pollErr != nil {
framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. Last err %v, last body %v",
nodeName, healthCheckNodePort, err, body.String())
pollfn := func() (bool, error) {
result := framework.PokeHTTP(nodeIPs[0], healthCheckNodePort, "/healthz", nil)
if result.Code == 0 {
return true, nil
}
body.Reset()
body.Write(result.Body)
return false, nil
}
if pollErr := wait.PollImmediate(framework.Poll, framework.ServiceTestTimeout, pollfn); pollErr != nil {
framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. body %s",
nodeName, healthCheckNodePort, body.String())
}
}