From e6f98311d00f083c1b980ed7434d2e9769fa921f Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 7 Sep 2020 12:42:36 +0200 Subject: [PATCH] deferredResponseWriter returns after calling the Close() method previously all sorts of errors including a data race were possible because deferredResponseWriter resets the writer and returns it to the pool. an attempt to write to a nil writer will lead to "invalid memory address or nil pointer dereference" sharing the same instance of deferredResponseWriter might lead to "index out of range [43] with length 30" and "recovered from err index > windowEnd" errors --- .../handlers/responsewriters/writers.go | 6 +- .../handlers/responsewriters/writers_test.go | 93 ++++++++++++++++++- 2 files changed, 94 insertions(+), 5 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go index b7c59cfc54d..65cb389e517 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go @@ -96,9 +96,11 @@ func SerializeObject(mediaType string, encoder runtime.Encoder, hw http.Response err := encoder.Encode(object, w) if err == nil { err = w.Close() - if err == nil { - return + if err != nil { + // we cannot write an error to the writer anymore as the Encode call was successful. + utilruntime.HandleError(fmt.Errorf("apiserver was unable to close cleanly the response writer: %v", err)) } + return } // make a best effort to write the object if a failure is detected diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go index 624b6becdff..ace0f0a7096 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go @@ -20,6 +20,7 @@ import ( "bytes" "compress/gzip" "encoding/hex" + "errors" "fmt" "io" "io/ioutil" @@ -28,7 +29,7 @@ import ( "reflect" "testing" - "k8s.io/apimachinery/pkg/api/errors" + kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" @@ -37,6 +38,76 @@ import ( featuregatetesting "k8s.io/component-base/featuregate/testing" ) +func TestSerializeObjectParallel(t *testing.T) { + largePayload := bytes.Repeat([]byte("0123456789abcdef"), defaultGzipThresholdBytes/16+1) + type test struct { + name string + + compressionEnabled bool + + mediaType string + out []byte + outErrs []error + req *http.Request + statusCode int + object runtime.Object + + wantCode int + wantHeaders http.Header + wantBody []byte + } + newTest := func() test { + return test{ + name: "compress on gzip", + compressionEnabled: true, + out: largePayload, + mediaType: "application/json", + req: &http.Request{Header: http.Header{ + "Accept-Encoding": []string{"gzip"}, + }}, + wantCode: http.StatusOK, + wantHeaders: http.Header{ + "Content-Type": []string{"application/json"}, + "Content-Encoding": []string{"gzip"}, + "Vary": []string{"Accept-Encoding"}, + }, + } + } + for i := 0; i < 100; i++ { + ctt := newTest() + t.Run(ctt.name, func(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatalf("recovered from err %v", r) + } + }() + t.Parallel() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIResponseCompression, ctt.compressionEnabled)() + + encoder := &fakeEncoder{ + buf: ctt.out, + errs: ctt.outErrs, + } + if ctt.statusCode == 0 { + ctt.statusCode = http.StatusOK + } + recorder := &fakeResponseRecorder{ + ResponseRecorder: httptest.NewRecorder(), + fe: encoder, + errorAfterEncoding: true, + } + SerializeObject(ctt.mediaType, encoder, recorder, ctt.req, ctt.statusCode, ctt.object) + result := recorder.Result() + if result.StatusCode != ctt.wantCode { + t.Fatalf("unexpected code: %v", result.StatusCode) + } + if !reflect.DeepEqual(result.Header, ctt.wantHeaders) { + t.Fatal(diff.ObjectReflectDiff(ctt.wantHeaders, result.Header)) + } + }) + } +} + func TestSerializeObject(t *testing.T) { smallPayload := []byte("{test-object,test-object}") largePayload := bytes.Repeat([]byte("0123456789abcdef"), defaultGzipThresholdBytes/16+1) @@ -111,7 +182,7 @@ func TestSerializeObject(t *testing.T) { { name: "fail to encode object or status with status code", out: smallPayload, - outErrs: []error{errors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")}, + outErrs: []error{kerrors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")}, mediaType: "application/json", req: &http.Request{Header: http.Header{}}, statusCode: http.StatusOK, @@ -123,7 +194,7 @@ func TestSerializeObject(t *testing.T) { { name: "fail to encode object or status with status code and keeps previous error", out: smallPayload, - outErrs: []error{errors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")}, + outErrs: []error{kerrors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")}, mediaType: "application/json", req: &http.Request{Header: http.Header{}}, statusCode: http.StatusNotAcceptable, @@ -270,10 +341,25 @@ func TestSerializeObject(t *testing.T) { } } +type fakeResponseRecorder struct { + *httptest.ResponseRecorder + fe *fakeEncoder + errorAfterEncoding bool +} + +func (frw *fakeResponseRecorder) Write(buf []byte) (int, error) { + if frw.errorAfterEncoding && frw.fe.encodeCalled { + return 0, errors.New("returning a requested error") + } + return frw.ResponseRecorder.Write(buf) +} + type fakeEncoder struct { obj runtime.Object buf []byte errs []error + + encodeCalled bool } func (e *fakeEncoder) Encode(obj runtime.Object, w io.Writer) error { @@ -284,6 +370,7 @@ func (e *fakeEncoder) Encode(obj runtime.Object, w io.Writer) error { return err } _, err := w.Write(e.buf) + e.encodeCalled = true return err }