mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-17 23:57:49 +00:00
fixup! add test to document behavior of net/http read/write deadline
This commit is contained in:
parent
2abe3a5dfa
commit
f91cdf768d
@ -33,7 +33,12 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
)
|
||||
|
||||
func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testing.T) {
|
||||
// NOTE: The following tests assert on low-level error returned from net/http, if
|
||||
// a new version of Go changes the error type returned, then some of these tests
|
||||
// might fail during GoLang update, we can fix the broken test(s) by changing the
|
||||
// wanted error in the test setup to the new observed error.
|
||||
|
||||
func TestPerRequestWithWriteEventuallyReturnsDeadlineError(t *testing.T) {
|
||||
// This test documents the behavior of the per request write deadline
|
||||
// using a standard net http server.
|
||||
//
|
||||
@ -46,6 +51,17 @@ func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testin
|
||||
// - d) client: expected to receive an error from the server
|
||||
// - e) server: the Write method of the ResponseWriter object should
|
||||
// return an "i/o timeout" error once its internal buffer is full
|
||||
//
|
||||
// The size of the internal buffer is 4kB bytes:
|
||||
//
|
||||
// http/1x: the net.Conn is written to by a buffered Writer
|
||||
// (*bufio.Writer) of default size of 4kB bytes:
|
||||
// a) https://github.com/golang/go/blob/ffb3e574012ce9d3d5193d7b8df135189b8a6671/src/net/http/server.go#L1650
|
||||
// b) https://github.com/golang/go/blob/ffb3e574012ce9d3d5193d7b8df135189b8a6671/src/net/http/server.go#L2014
|
||||
//
|
||||
// http/2.0: a buffered writer (4kB bytes) writes to the unerlying net.Conn
|
||||
// a) https://github.com/golang/go/blob/ffb3e574012ce9d3d5193d7b8df135189b8a6671/src/net/http/h2_bundle.go#L3568
|
||||
// b) https://github.com/golang/go/blob/ffb3e574012ce9d3d5193d7b8df135189b8a6671/src/net/http/h2_bundle.go#L4279
|
||||
t.Parallel()
|
||||
|
||||
const deadline = 100 * time.Millisecond
|
||||
@ -57,6 +73,10 @@ func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testin
|
||||
}{
|
||||
{
|
||||
protoMajor: 1, // http/1x
|
||||
// NOTE: we can't use channel based waiiter for http/1x,
|
||||
// since the request handler is executed in the same
|
||||
// goroutine as the connection serving goroutine
|
||||
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4460-per-request-deadline#client-hanging-indefinitely
|
||||
// write timeout is set to 100ms, a wait of 5s should be
|
||||
// enough to withstand flakes in CI.
|
||||
waiter: &waitWithDuration{after: 5 * time.Second},
|
||||
@ -65,7 +85,7 @@ func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testin
|
||||
},
|
||||
{
|
||||
protoMajor: 2, // http/2.0
|
||||
waiter: &waitWithChannelClose{after: make(chan time.Time)},
|
||||
waiter: &waitForClose{after: make(chan time.Time)},
|
||||
clientErrVerifier: wantContains{"stream error: stream ID 1; INTERNAL_ERROR; received from peer"},
|
||||
handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded},
|
||||
},
|
||||
@ -90,8 +110,8 @@ func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testin
|
||||
// b) wait until the write deadline exceeds
|
||||
<-test.waiter.wait()
|
||||
|
||||
// c) keep writing 1kB of data at a time until
|
||||
// Write returns an error
|
||||
// c) keep writing 1kB of data at a time, Write
|
||||
// will eventually return an i/o timeout error
|
||||
func() {
|
||||
now := time.Now()
|
||||
written := 0
|
||||
@ -122,6 +142,9 @@ func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testin
|
||||
|
||||
_, err := client.Get(server.URL)
|
||||
// d) verify that the client receives the appropriate error
|
||||
// the client should not see a response body since the timeout
|
||||
// exceeded before the handler returned, and the handler
|
||||
// never invoked flush before the timeout.
|
||||
test.clientErrVerifier.verify(t, err)
|
||||
}()
|
||||
|
||||
@ -132,7 +155,7 @@ func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testin
|
||||
}
|
||||
}
|
||||
|
||||
func TestPerRequestWriteDeadlineWithTimeoutOccursBeforeHandlerWrites(t *testing.T) {
|
||||
func TestPerRequestWithFlushReturnsErrorAfterDeadline(t *testing.T) {
|
||||
// This test documents the behavior of the per request write deadline
|
||||
// using a standard net http server.
|
||||
//
|
||||
@ -166,7 +189,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursBeforeHandlerWrites(t *testing.
|
||||
},
|
||||
{
|
||||
protoMajor: 2,
|
||||
waiter: &waitWithChannelClose{after: make(chan time.Time)},
|
||||
waiter: &waitForClose{after: make(chan time.Time)},
|
||||
clientErrVerifier: wantContains{"stream error: stream ID 1; INTERNAL_ERROR; received from peer"},
|
||||
handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded},
|
||||
},
|
||||
@ -198,6 +221,8 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursBeforeHandlerWrites(t *testing.
|
||||
|
||||
// c) write a few bytes so there is something in the buffer to
|
||||
// flush, the client should never see these bytes in the response.
|
||||
// NOTE: since the intenal buffered Writer has a size of 4kB, this
|
||||
// Write operation should not cause an actual write to the net.Conn
|
||||
if _, err := w.Write([]byte("hello")); err != nil {
|
||||
t.Errorf("server: unexpected error from Write after timeout: %v", err)
|
||||
return
|
||||
@ -235,7 +260,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursBeforeHandlerWrites(t *testing.
|
||||
}
|
||||
}
|
||||
|
||||
func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerWrites(t *testing.T) {
|
||||
func TestPerRequestWithClientNeverReceivesContentFlushedAfterDeadline(t *testing.T) {
|
||||
// This test documents the behavior of the per handler write
|
||||
// deadline using a standard net http server.
|
||||
//
|
||||
@ -269,7 +294,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerWrites(t *testing.T
|
||||
},
|
||||
{
|
||||
protoMajor: 2,
|
||||
waiter: &waitWithChannelClose{after: make(chan time.Time)},
|
||||
waiter: &waitForClose{after: make(chan time.Time)},
|
||||
clientErrVerifier: wantContains{"stream error: stream ID 1; INTERNAL_ERROR; received from peer"},
|
||||
handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded},
|
||||
},
|
||||
@ -323,6 +348,9 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerWrites(t *testing.T
|
||||
|
||||
_, err := server.Client().Get(server.URL)
|
||||
// f) verify that the client receives the appropriate error
|
||||
// NOTE: due to 'a' (the handler wrote but did not flush), and
|
||||
// since the handler did not return before the timeout, the
|
||||
// client will not receive an http.Response with the data written.
|
||||
test.clientErrVerifier.verify(t, err)
|
||||
}()
|
||||
|
||||
@ -333,7 +361,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerWrites(t *testing.T
|
||||
}
|
||||
}
|
||||
|
||||
func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerFlushes(t *testing.T) {
|
||||
func TestPerRequestWithClientReceivesContentFlushedBeforeDeadline(t *testing.T) {
|
||||
// This test documents the behavior of the per handler write
|
||||
// deadline using a standard net http server.
|
||||
//
|
||||
@ -372,7 +400,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerFlushes(t *testing.
|
||||
},
|
||||
{
|
||||
protoMajor: 2,
|
||||
waiter: &waitWithChannelClose{after: make(chan time.Time)},
|
||||
waiter: &waitForClose{after: make(chan time.Time)},
|
||||
clientErrVerifier: wantContains{"stream error: stream ID 1; INTERNAL_ERROR; received from peer"},
|
||||
handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded},
|
||||
},
|
||||
@ -398,7 +426,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerFlushes(t *testing.
|
||||
t.Errorf("server: unexpected error from Write: %v", err)
|
||||
return
|
||||
}
|
||||
// b) flush the payload in the buffer
|
||||
// b) flush the payload that has been written to the buffer
|
||||
if err := flusher.FlushError(); err != nil {
|
||||
t.Errorf("server: unexpected error from FlushError: %v", err)
|
||||
return
|
||||
@ -415,7 +443,8 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerFlushes(t *testing.
|
||||
<-test.waiter.wait()
|
||||
|
||||
// e) write message 2 (the client should never
|
||||
// see the following message)
|
||||
// see the following message, and the test
|
||||
// verifies that in step g)
|
||||
if _, err := w.Write([]byte(msg2)); err != nil {
|
||||
t.Errorf("server: unexpected error from Write after timeout: %v", err)
|
||||
}
|
||||
@ -465,7 +494,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerFlushes(t *testing.
|
||||
}
|
||||
}
|
||||
|
||||
func TestPerRequestWriteDeadlineWithHandlerWritingIndefinitely(t *testing.T) {
|
||||
func TestPerRequestWithHandlerShouldAbortWriteAfterDeadline(t *testing.T) {
|
||||
// This test documents the behavior of the per handler write
|
||||
// deadline using a standard net http server.
|
||||
//
|
||||
@ -481,7 +510,7 @@ func TestPerRequestWriteDeadlineWithHandlerWritingIndefinitely(t *testing.T) {
|
||||
// expected to terminate as a result
|
||||
t.Parallel()
|
||||
|
||||
const deadline = 300 * time.Millisecond
|
||||
const deadline = 100 * time.Millisecond
|
||||
tests := []struct {
|
||||
protoMajor int
|
||||
clientErrVerifier verifier
|
||||
@ -557,7 +586,7 @@ func TestPerRequestWriteDeadlineWithHandlerWritingIndefinitely(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPerRequestReadDeadlineWithClientNotWritingToRequestBody(t *testing.T) {
|
||||
func TestPerRequestWithBodyReadShouldTimeoutAfterDeadline(t *testing.T) {
|
||||
// This test documents the behavior of the per handler read
|
||||
// deadline using a standard net http server.
|
||||
//
|
||||
@ -574,22 +603,17 @@ func TestPerRequestReadDeadlineWithClientNotWritingToRequestBody(t *testing.T) {
|
||||
// an "i/o timeout" error
|
||||
t.Parallel()
|
||||
|
||||
const deadline = 500 * time.Millisecond
|
||||
const deadline = 100 * time.Millisecond
|
||||
tests := []struct {
|
||||
protoMajor int
|
||||
waiter waiter
|
||||
handlerErrVerifier verifier
|
||||
}{
|
||||
{
|
||||
protoMajor: 1,
|
||||
// write timeout is set to 500ms, a wait of 5s should
|
||||
// be enough to withstand flakes in CI.
|
||||
waiter: &waitWithDuration{after: 5 * time.Second},
|
||||
protoMajor: 1,
|
||||
handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded},
|
||||
},
|
||||
{
|
||||
protoMajor: 2,
|
||||
waiter: &waitWithChannelClose{after: make(chan time.Time)},
|
||||
handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded},
|
||||
},
|
||||
}
|
||||
@ -675,7 +699,7 @@ func TestPerRequestReadDeadlineWithClientNotWritingToRequestBody(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPerRequestReadDeadlineWithTimeoutOccursWhileClientIsSendingContent(t *testing.T) {
|
||||
func TestPerRequestWithBodyReadShouldYieldPartialContentBeforeDeadline(t *testing.T) {
|
||||
// This test documents the behavior of the per request read
|
||||
// deadline using a standard net http server.
|
||||
//
|
||||
@ -696,7 +720,7 @@ func TestPerRequestReadDeadlineWithTimeoutOccursWhileClientIsSendingContent(t *t
|
||||
// the Body of the request
|
||||
t.Parallel()
|
||||
|
||||
const deadline = 300 * time.Millisecond
|
||||
const deadline = 100 * time.Millisecond
|
||||
tests := []struct {
|
||||
protoMajor int
|
||||
handlerErrVerifier verifier
|
||||
@ -808,7 +832,7 @@ func TestPerRequestReadDeadlineWithTimeoutOccursWhileClientIsSendingContent(t *t
|
||||
}
|
||||
}
|
||||
|
||||
func TestPerRequestReadDeadlineWithNoRequestBody(t *testing.T) {
|
||||
func TestPerRequestWithReadingEmptyBodyShouldNotYieldErrorAfterDeadline(t *testing.T) {
|
||||
// This test documents the behavior of the per request read
|
||||
// deadline using a standard net http server.
|
||||
//
|
||||
@ -897,7 +921,7 @@ func TestPerRequestReadDeadlineWithNoRequestBody(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestPerRequestReadWriteDeadlineWithHijack(t *testing.T) {
|
||||
func TestPerRequestWithHijackedConnectionShouldResetDeadline(t *testing.T) {
|
||||
// This test documents the behavior of the per handler read/write
|
||||
// deadline using a standard net http server.
|
||||
//
|
||||
@ -984,7 +1008,7 @@ func TestPerRequestReadWriteDeadlineWithHijack(t *testing.T) {
|
||||
wantNoError{}.verify(t, err)
|
||||
}
|
||||
|
||||
func TestPerRequestWriteDeadlineWithConnectionReuse(t *testing.T) {
|
||||
func TestPerRequestWithConnectionIsReused(t *testing.T) {
|
||||
// This test documents the behavior of the per request write deadline
|
||||
// using a standard net http server.
|
||||
//
|
||||
@ -1020,7 +1044,7 @@ func TestPerRequestWriteDeadlineWithConnectionReuse(t *testing.T) {
|
||||
},
|
||||
{
|
||||
protoMajor: 2, // http/2.0
|
||||
waiter: &waitWithChannelClose{after: make(chan time.Time)},
|
||||
waiter: &waitForClose{after: make(chan time.Time)},
|
||||
clientErrVerifier: wantContains{"stream error: stream ID 1; INTERNAL_ERROR; received from peer"},
|
||||
handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded},
|
||||
connReuseFn: shouldUseExistingConnection,
|
||||
@ -1125,7 +1149,7 @@ func TestPerRequestWriteDeadlineWithConnectionReuse(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPerRequestWriteDeadlineWithSlowReader(t *testing.T) {
|
||||
func TestPerRequestWithSlowReader(t *testing.T) {
|
||||
// This test documents the behavior of the per handler write
|
||||
// deadline using a standard net http server.
|
||||
//
|
||||
@ -1339,6 +1363,9 @@ type waiter interface {
|
||||
close()
|
||||
}
|
||||
|
||||
// sleep based waiter implementation, the request handler sleeps for certain
|
||||
// duration before it returns, we need to choose the sleep duration wisely
|
||||
// in order to avoid flakes in CI.
|
||||
type waitWithDuration struct {
|
||||
after time.Duration
|
||||
}
|
||||
@ -1346,22 +1373,33 @@ type waitWithDuration struct {
|
||||
func (w waitWithDuration) wait() <-chan time.Time { return time.After(w.after) }
|
||||
func (w waitWithDuration) close() {}
|
||||
|
||||
type waitWithChannelClose struct {
|
||||
// channel based waiter implementation, the request handler waits on a channel
|
||||
// to close, these are the steps:
|
||||
// a) the client sends a request to the http/2.0 server
|
||||
// a) the request handler sets per-request write timeout, and then
|
||||
// b) the request handler blocks indefinitely on this channel to close
|
||||
// c) write timeout elapses, and http/2.0 server asynchronously resets the stream
|
||||
// d) the client receives a stream reset error immediately
|
||||
// after the write timeout occurs.
|
||||
// e) the client then closes this channel
|
||||
// f) the request handler unblocks and terminates
|
||||
//
|
||||
// This waiter can be used for http/2.0 only, since the request handler executes
|
||||
// on a separate goroutine than the tcp connection serving gorutine, this allows
|
||||
// the connection serving loop to asynchronously reset the http2 stream. On the
|
||||
// other hand, http/1x executes the request handler in the same goroutine as the
|
||||
// connection serving goroutine, this forces the connection serving goroutine to
|
||||
// wait for he handler to return.
|
||||
// See https://github.com/golang/go/blob/b8ac61e6e64c92f23d8cf868a92a70d13e20a124/src/net/http/server.go#L3285
|
||||
type waitForClose struct {
|
||||
after chan time.Time
|
||||
}
|
||||
|
||||
func (w waitWithChannelClose) wait() <-chan time.Time {
|
||||
// for http/2, we do the following:
|
||||
// a) let the handler block indefinitely
|
||||
// b) this forces the write timeout to occur on the server side
|
||||
// c) the http2 client receives a stream reset error immediately
|
||||
// after the write timeout occurs.
|
||||
// d) the client then closes the channel by calling close
|
||||
// e) the handler unblocks and terminates
|
||||
func (w waitForClose) wait() <-chan time.Time {
|
||||
return w.after
|
||||
}
|
||||
|
||||
func (w waitWithChannelClose) close() { close(w.after) }
|
||||
func (w waitForClose) close() { close(w.after) }
|
||||
|
||||
type chanErr chan error
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user