diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go index b7c59cfc54d..65cb389e517 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go @@ -96,9 +96,11 @@ func SerializeObject(mediaType string, encoder runtime.Encoder, hw http.Response err := encoder.Encode(object, w) if err == nil { err = w.Close() - if err == nil { - return + if err != nil { + // we cannot write an error to the writer anymore as the Encode call was successful. + utilruntime.HandleError(fmt.Errorf("apiserver was unable to close cleanly the response writer: %v", err)) } + return } // make a best effort to write the object if a failure is detected diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go index 624b6becdff..ace0f0a7096 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go @@ -20,6 +20,7 @@ import ( "bytes" "compress/gzip" "encoding/hex" + "errors" "fmt" "io" "io/ioutil" @@ -28,7 +29,7 @@ import ( "reflect" "testing" - "k8s.io/apimachinery/pkg/api/errors" + kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" @@ -37,6 +38,76 @@ import ( featuregatetesting "k8s.io/component-base/featuregate/testing" ) +func TestSerializeObjectParallel(t *testing.T) { + largePayload := bytes.Repeat([]byte("0123456789abcdef"), defaultGzipThresholdBytes/16+1) + type test struct { + name string + + compressionEnabled bool + + mediaType string + out []byte + outErrs []error + req *http.Request + statusCode int + object runtime.Object + + wantCode int + wantHeaders http.Header + wantBody []byte + } + newTest := func() test { + return test{ + name: "compress on gzip", + compressionEnabled: true, + out: largePayload, + mediaType: "application/json", + req: &http.Request{Header: http.Header{ + "Accept-Encoding": []string{"gzip"}, + }}, + wantCode: http.StatusOK, + wantHeaders: http.Header{ + "Content-Type": []string{"application/json"}, + "Content-Encoding": []string{"gzip"}, + "Vary": []string{"Accept-Encoding"}, + }, + } + } + for i := 0; i < 100; i++ { + ctt := newTest() + t.Run(ctt.name, func(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatalf("recovered from err %v", r) + } + }() + t.Parallel() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIResponseCompression, ctt.compressionEnabled)() + + encoder := &fakeEncoder{ + buf: ctt.out, + errs: ctt.outErrs, + } + if ctt.statusCode == 0 { + ctt.statusCode = http.StatusOK + } + recorder := &fakeResponseRecorder{ + ResponseRecorder: httptest.NewRecorder(), + fe: encoder, + errorAfterEncoding: true, + } + SerializeObject(ctt.mediaType, encoder, recorder, ctt.req, ctt.statusCode, ctt.object) + result := recorder.Result() + if result.StatusCode != ctt.wantCode { + t.Fatalf("unexpected code: %v", result.StatusCode) + } + if !reflect.DeepEqual(result.Header, ctt.wantHeaders) { + t.Fatal(diff.ObjectReflectDiff(ctt.wantHeaders, result.Header)) + } + }) + } +} + func TestSerializeObject(t *testing.T) { smallPayload := []byte("{test-object,test-object}") largePayload := bytes.Repeat([]byte("0123456789abcdef"), defaultGzipThresholdBytes/16+1) @@ -111,7 +182,7 @@ func TestSerializeObject(t *testing.T) { { name: "fail to encode object or status with status code", out: smallPayload, - outErrs: []error{errors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")}, + outErrs: []error{kerrors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")}, mediaType: "application/json", req: &http.Request{Header: http.Header{}}, statusCode: http.StatusOK, @@ -123,7 +194,7 @@ func TestSerializeObject(t *testing.T) { { name: "fail to encode object or status with status code and keeps previous error", out: smallPayload, - outErrs: []error{errors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")}, + outErrs: []error{kerrors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")}, mediaType: "application/json", req: &http.Request{Header: http.Header{}}, statusCode: http.StatusNotAcceptable, @@ -270,10 +341,25 @@ func TestSerializeObject(t *testing.T) { } } +type fakeResponseRecorder struct { + *httptest.ResponseRecorder + fe *fakeEncoder + errorAfterEncoding bool +} + +func (frw *fakeResponseRecorder) Write(buf []byte) (int, error) { + if frw.errorAfterEncoding && frw.fe.encodeCalled { + return 0, errors.New("returning a requested error") + } + return frw.ResponseRecorder.Write(buf) +} + type fakeEncoder struct { obj runtime.Object buf []byte errs []error + + encodeCalled bool } func (e *fakeEncoder) Encode(obj runtime.Object, w io.Writer) error { @@ -284,6 +370,7 @@ func (e *fakeEncoder) Encode(obj runtime.Object, w io.Writer) error { return err } _, err := w.Write(e.buf) + e.encodeCalled = true return err }