diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/read_write_deadline_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/read_write_deadline_test.go index 656d04302d4..4ce42ec71d2 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/read_write_deadline_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/read_write_deadline_test.go @@ -33,7 +33,12 @@ import ( "k8s.io/apimachinery/pkg/util/wait" ) -func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testing.T) { +// NOTE: The following tests assert on low-level error returned from net/http, if +// a new version of Go changes the error type returned, then some of these tests +// might fail during GoLang update, we can fix the broken test(s) by changing the +// wanted error in the test setup to the new observed error. + +func TestPerRequestWithWriteEventuallyReturnsDeadlineError(t *testing.T) { // This test documents the behavior of the per request write deadline // using a standard net http server. // @@ -46,6 +51,17 @@ func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testin // - d) client: expected to receive an error from the server // - e) server: the Write method of the ResponseWriter object should // return an "i/o timeout" error once its internal buffer is full + // + // The size of the internal buffer is 4kB bytes: + // + // http/1x: the net.Conn is written to by a buffered Writer + // (*bufio.Writer) of default size of 4kB bytes: + // a) https://github.com/golang/go/blob/ffb3e574012ce9d3d5193d7b8df135189b8a6671/src/net/http/server.go#L1650 + // b) https://github.com/golang/go/blob/ffb3e574012ce9d3d5193d7b8df135189b8a6671/src/net/http/server.go#L2014 + // + // http/2.0: a buffered writer (4kB bytes) writes to the unerlying net.Conn + // a) https://github.com/golang/go/blob/ffb3e574012ce9d3d5193d7b8df135189b8a6671/src/net/http/h2_bundle.go#L3568 + // b) https://github.com/golang/go/blob/ffb3e574012ce9d3d5193d7b8df135189b8a6671/src/net/http/h2_bundle.go#L4279 t.Parallel() const deadline = 100 * time.Millisecond @@ -57,6 +73,10 @@ func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testin }{ { protoMajor: 1, // http/1x + // NOTE: we can't use channel based waiiter for http/1x, + // since the request handler is executed in the same + // goroutine as the connection serving goroutine + // https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4460-per-request-deadline#client-hanging-indefinitely // write timeout is set to 100ms, a wait of 5s should be // enough to withstand flakes in CI. waiter: &waitWithDuration{after: 5 * time.Second}, @@ -65,7 +85,7 @@ func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testin }, { protoMajor: 2, // http/2.0 - waiter: &waitWithChannelClose{after: make(chan time.Time)}, + waiter: &waitForClose{after: make(chan time.Time)}, clientErrVerifier: wantContains{"stream error: stream ID 1; INTERNAL_ERROR; received from peer"}, handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded}, }, @@ -90,8 +110,8 @@ func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testin // b) wait until the write deadline exceeds <-test.waiter.wait() - // c) keep writing 1kB of data at a time until - // Write returns an error + // c) keep writing 1kB of data at a time, Write + // will eventually return an i/o timeout error func() { now := time.Now() written := 0 @@ -122,6 +142,9 @@ func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testin _, err := client.Get(server.URL) // d) verify that the client receives the appropriate error + // the client should not see a response body since the timeout + // exceeded before the handler returned, and the handler + // never invoked flush before the timeout. test.clientErrVerifier.verify(t, err) }() @@ -132,7 +155,7 @@ func TestPerRequestWriteDeadlineWithWriteShouldReturnErrorAfterTimeout(t *testin } } -func TestPerRequestWriteDeadlineWithTimeoutOccursBeforeHandlerWrites(t *testing.T) { +func TestPerRequestWithFlushReturnsErrorAfterDeadline(t *testing.T) { // This test documents the behavior of the per request write deadline // using a standard net http server. // @@ -166,7 +189,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursBeforeHandlerWrites(t *testing. }, { protoMajor: 2, - waiter: &waitWithChannelClose{after: make(chan time.Time)}, + waiter: &waitForClose{after: make(chan time.Time)}, clientErrVerifier: wantContains{"stream error: stream ID 1; INTERNAL_ERROR; received from peer"}, handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded}, }, @@ -198,6 +221,8 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursBeforeHandlerWrites(t *testing. // c) write a few bytes so there is something in the buffer to // flush, the client should never see these bytes in the response. + // NOTE: since the intenal buffered Writer has a size of 4kB, this + // Write operation should not cause an actual write to the net.Conn if _, err := w.Write([]byte("hello")); err != nil { t.Errorf("server: unexpected error from Write after timeout: %v", err) return @@ -235,7 +260,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursBeforeHandlerWrites(t *testing. } } -func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerWrites(t *testing.T) { +func TestPerRequestWithClientNeverReceivesContentFlushedAfterDeadline(t *testing.T) { // This test documents the behavior of the per handler write // deadline using a standard net http server. // @@ -269,7 +294,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerWrites(t *testing.T }, { protoMajor: 2, - waiter: &waitWithChannelClose{after: make(chan time.Time)}, + waiter: &waitForClose{after: make(chan time.Time)}, clientErrVerifier: wantContains{"stream error: stream ID 1; INTERNAL_ERROR; received from peer"}, handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded}, }, @@ -323,6 +348,9 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerWrites(t *testing.T _, err := server.Client().Get(server.URL) // f) verify that the client receives the appropriate error + // NOTE: due to 'a' (the handler wrote but did not flush), and + // since the handler did not return before the timeout, the + // client will not receive an http.Response with the data written. test.clientErrVerifier.verify(t, err) }() @@ -333,7 +361,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerWrites(t *testing.T } } -func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerFlushes(t *testing.T) { +func TestPerRequestWithClientReceivesContentFlushedBeforeDeadline(t *testing.T) { // This test documents the behavior of the per handler write // deadline using a standard net http server. // @@ -372,7 +400,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerFlushes(t *testing. }, { protoMajor: 2, - waiter: &waitWithChannelClose{after: make(chan time.Time)}, + waiter: &waitForClose{after: make(chan time.Time)}, clientErrVerifier: wantContains{"stream error: stream ID 1; INTERNAL_ERROR; received from peer"}, handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded}, }, @@ -398,7 +426,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerFlushes(t *testing. t.Errorf("server: unexpected error from Write: %v", err) return } - // b) flush the payload in the buffer + // b) flush the payload that has been written to the buffer if err := flusher.FlushError(); err != nil { t.Errorf("server: unexpected error from FlushError: %v", err) return @@ -415,7 +443,8 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerFlushes(t *testing. <-test.waiter.wait() // e) write message 2 (the client should never - // see the following message) + // see the following message, and the test + // verifies that in step g) if _, err := w.Write([]byte(msg2)); err != nil { t.Errorf("server: unexpected error from Write after timeout: %v", err) } @@ -465,7 +494,7 @@ func TestPerRequestWriteDeadlineWithTimeoutOccursAfterHandlerFlushes(t *testing. } } -func TestPerRequestWriteDeadlineWithHandlerWritingIndefinitely(t *testing.T) { +func TestPerRequestWithHandlerShouldAbortWriteAfterDeadline(t *testing.T) { // This test documents the behavior of the per handler write // deadline using a standard net http server. // @@ -481,7 +510,7 @@ func TestPerRequestWriteDeadlineWithHandlerWritingIndefinitely(t *testing.T) { // expected to terminate as a result t.Parallel() - const deadline = 300 * time.Millisecond + const deadline = 100 * time.Millisecond tests := []struct { protoMajor int clientErrVerifier verifier @@ -557,7 +586,7 @@ func TestPerRequestWriteDeadlineWithHandlerWritingIndefinitely(t *testing.T) { } } -func TestPerRequestReadDeadlineWithClientNotWritingToRequestBody(t *testing.T) { +func TestPerRequestWithBodyReadShouldTimeoutAfterDeadline(t *testing.T) { // This test documents the behavior of the per handler read // deadline using a standard net http server. // @@ -574,22 +603,17 @@ func TestPerRequestReadDeadlineWithClientNotWritingToRequestBody(t *testing.T) { // an "i/o timeout" error t.Parallel() - const deadline = 500 * time.Millisecond + const deadline = 100 * time.Millisecond tests := []struct { protoMajor int - waiter waiter handlerErrVerifier verifier }{ { - protoMajor: 1, - // write timeout is set to 500ms, a wait of 5s should - // be enough to withstand flakes in CI. - waiter: &waitWithDuration{after: 5 * time.Second}, + protoMajor: 1, handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded}, }, { protoMajor: 2, - waiter: &waitWithChannelClose{after: make(chan time.Time)}, handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded}, }, } @@ -675,7 +699,7 @@ func TestPerRequestReadDeadlineWithClientNotWritingToRequestBody(t *testing.T) { } } -func TestPerRequestReadDeadlineWithTimeoutOccursWhileClientIsSendingContent(t *testing.T) { +func TestPerRequestWithBodyReadShouldYieldPartialContentBeforeDeadline(t *testing.T) { // This test documents the behavior of the per request read // deadline using a standard net http server. // @@ -696,7 +720,7 @@ func TestPerRequestReadDeadlineWithTimeoutOccursWhileClientIsSendingContent(t *t // the Body of the request t.Parallel() - const deadline = 300 * time.Millisecond + const deadline = 100 * time.Millisecond tests := []struct { protoMajor int handlerErrVerifier verifier @@ -808,7 +832,7 @@ func TestPerRequestReadDeadlineWithTimeoutOccursWhileClientIsSendingContent(t *t } } -func TestPerRequestReadDeadlineWithNoRequestBody(t *testing.T) { +func TestPerRequestWithReadingEmptyBodyShouldNotYieldErrorAfterDeadline(t *testing.T) { // This test documents the behavior of the per request read // deadline using a standard net http server. // @@ -897,7 +921,7 @@ func TestPerRequestReadDeadlineWithNoRequestBody(t *testing.T) { } -func TestPerRequestReadWriteDeadlineWithHijack(t *testing.T) { +func TestPerRequestWithHijackedConnectionShouldResetDeadline(t *testing.T) { // This test documents the behavior of the per handler read/write // deadline using a standard net http server. // @@ -984,7 +1008,7 @@ func TestPerRequestReadWriteDeadlineWithHijack(t *testing.T) { wantNoError{}.verify(t, err) } -func TestPerRequestWriteDeadlineWithConnectionReuse(t *testing.T) { +func TestPerRequestWithConnectionIsReused(t *testing.T) { // This test documents the behavior of the per request write deadline // using a standard net http server. // @@ -1020,7 +1044,7 @@ func TestPerRequestWriteDeadlineWithConnectionReuse(t *testing.T) { }, { protoMajor: 2, // http/2.0 - waiter: &waitWithChannelClose{after: make(chan time.Time)}, + waiter: &waitForClose{after: make(chan time.Time)}, clientErrVerifier: wantContains{"stream error: stream ID 1; INTERNAL_ERROR; received from peer"}, handlerErrVerifier: wantError{err: os.ErrDeadlineExceeded}, connReuseFn: shouldUseExistingConnection, @@ -1125,7 +1149,7 @@ func TestPerRequestWriteDeadlineWithConnectionReuse(t *testing.T) { } } -func TestPerRequestWriteDeadlineWithSlowReader(t *testing.T) { +func TestPerRequestWithSlowReader(t *testing.T) { // This test documents the behavior of the per handler write // deadline using a standard net http server. // @@ -1339,6 +1363,9 @@ type waiter interface { close() } +// sleep based waiter implementation, the request handler sleeps for certain +// duration before it returns, we need to choose the sleep duration wisely +// in order to avoid flakes in CI. type waitWithDuration struct { after time.Duration } @@ -1346,22 +1373,33 @@ type waitWithDuration struct { func (w waitWithDuration) wait() <-chan time.Time { return time.After(w.after) } func (w waitWithDuration) close() {} -type waitWithChannelClose struct { +// channel based waiter implementation, the request handler waits on a channel +// to close, these are the steps: +// a) the client sends a request to the http/2.0 server +// a) the request handler sets per-request write timeout, and then +// b) the request handler blocks indefinitely on this channel to close +// c) write timeout elapses, and http/2.0 server asynchronously resets the stream +// d) the client receives a stream reset error immediately +// after the write timeout occurs. +// e) the client then closes this channel +// f) the request handler unblocks and terminates +// +// This waiter can be used for http/2.0 only, since the request handler executes +// on a separate goroutine than the tcp connection serving gorutine, this allows +// the connection serving loop to asynchronously reset the http2 stream. On the +// other hand, http/1x executes the request handler in the same goroutine as the +// connection serving goroutine, this forces the connection serving goroutine to +// wait for he handler to return. +// See https://github.com/golang/go/blob/b8ac61e6e64c92f23d8cf868a92a70d13e20a124/src/net/http/server.go#L3285 +type waitForClose struct { after chan time.Time } -func (w waitWithChannelClose) wait() <-chan time.Time { - // for http/2, we do the following: - // a) let the handler block indefinitely - // b) this forces the write timeout to occur on the server side - // c) the http2 client receives a stream reset error immediately - // after the write timeout occurs. - // d) the client then closes the channel by calling close - // e) the handler unblocks and terminates +func (w waitForClose) wait() <-chan time.Time { return w.after } -func (w waitWithChannelClose) close() { close(w.after) } +func (w waitForClose) close() { close(w.after) } type chanErr chan error