diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go index 3ca5cba8cb6..ec5ff956320 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go @@ -157,6 +157,9 @@ const ( // (usually the entire object), and if the size is smaller no gzipping will be performed // if the client requests it. defaultGzipThresholdBytes = 128 * 1024 + // Use the length of the first write of streaming implementations. + // TODO: Update when streaming proto is implemented + firstWriteStreamingThresholdBytes = 1 ) // negotiateContentEncoding returns a supported client-requested content encoding for the @@ -192,14 +195,53 @@ type deferredResponseWriter struct { statusCode int contentEncoding string - hasWritten bool - hw http.ResponseWriter - w io.Writer + hasBuffered bool + buffer []byte + hasWritten bool + hw http.ResponseWriter + w io.Writer ctx context.Context } func (w *deferredResponseWriter) Write(p []byte) (n int, err error) { + switch { + case w.hasWritten: + // already written, cannot buffer + return w.unbufferedWrite(p) + + case w.contentEncoding != "gzip": + // non-gzip, no need to buffer + return w.unbufferedWrite(p) + + case !w.hasBuffered && len(p) > defaultGzipThresholdBytes: + // not yet buffered, first write is long enough to trigger gzip, no need to buffer + return w.unbufferedWrite(p) + + case !w.hasBuffered && len(p) > firstWriteStreamingThresholdBytes: + // not yet buffered, first write is longer than expected for streaming scenarios that would require buffering, no need to buffer + return w.unbufferedWrite(p) + + default: + if !w.hasBuffered { + w.hasBuffered = true + // Start at 80 bytes to avoid rapid reallocation of the buffer. + // The minimum size of a 0-item serialized list object is 80 bytes: + // {"kind":"List","apiVersion":"v1","metadata":{"resourceVersion":"1"},"items":[]}\n + w.buffer = make([]byte, 0, max(80, len(p))) + } + w.buffer = append(w.buffer, p...) + var err error + if len(w.buffer) > defaultGzipThresholdBytes { + // we've accumulated enough to trigger gzip, write and clear buffer + _, err = w.unbufferedWrite(w.buffer) + w.buffer = nil + } + return len(p), err + } +} + +func (w *deferredResponseWriter) unbufferedWrite(p []byte) (n int, err error) { ctx := w.ctx span := tracing.SpanFromContext(ctx) // This Step usually wraps in-memory object serialization. @@ -245,11 +287,17 @@ func (w *deferredResponseWriter) Write(p []byte) (n int, err error) { return w.w.Write(p) } -func (w *deferredResponseWriter) Close() error { +func (w *deferredResponseWriter) Close() (err error) { if !w.hasWritten { - return nil + if !w.hasBuffered { + return nil + } + // never reached defaultGzipThresholdBytes, no need to do the gzip writer cleanup + _, err := w.unbufferedWrite(w.buffer) + w.buffer = nil + return err } - var err error + switch t := w.w.(type) { case *gzip.Writer: err = t.Close() diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go index 6ec17847e83..195b161fe0a 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers_test.go @@ -33,7 +33,6 @@ import ( "os" "reflect" "strconv" - "strings" "testing" "time" @@ -42,6 +41,7 @@ import ( kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + rand2 "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apiserver/pkg/features" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -378,29 +378,94 @@ func TestDeferredResponseWriter_Write(t *testing.T) { largeChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes+1) tests := []struct { - name string - chunks [][]byte - expectGzip bool + name string + chunks [][]byte + expectGzip bool + expectHeaders http.Header }{ + { + name: "no writes", + chunks: nil, + expectGzip: false, + expectHeaders: http.Header{}, + }, + { + name: "one empty write", + chunks: [][]byte{{}}, + expectGzip: false, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + }, + }, + { + name: "one single byte write", + chunks: [][]byte{{'{'}}, + expectGzip: false, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + }, + }, { name: "one small chunk write", chunks: [][]byte{smallChunk}, expectGzip: false, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + }, }, { name: "two small chunk writes", chunks: [][]byte{smallChunk, smallChunk}, expectGzip: false, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + }, + }, + { + name: "one single byte and one small chunk write", + chunks: [][]byte{{'{'}, smallChunk}, + expectGzip: false, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + }, + }, + { + name: "two single bytes and one small chunk write", + chunks: [][]byte{{'{'}, {'{'}, smallChunk}, + expectGzip: true, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + "Content-Encoding": []string{"gzip"}, + "Vary": []string{"Accept-Encoding"}, + }, }, { name: "one large chunk writes", chunks: [][]byte{largeChunk}, expectGzip: true, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + "Content-Encoding": []string{"gzip"}, + "Vary": []string{"Accept-Encoding"}, + }, }, { name: "two large chunk writes", chunks: [][]byte{largeChunk, largeChunk}, expectGzip: true, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + "Content-Encoding": []string{"gzip"}, + "Vary": []string{"Accept-Encoding"}, + }, + }, + { + name: "one small chunk and one large chunk write", + chunks: [][]byte{smallChunk, largeChunk}, + expectGzip: false, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + }, }, } @@ -441,8 +506,9 @@ func TestDeferredResponseWriter_Write(t *testing.T) { if res.StatusCode != http.StatusOK { t.Fatalf("status code is not writtend properly, expected: 200, got: %d", res.StatusCode) } - contentEncoding := res.Header.Get("Content-Encoding") - varyHeader := res.Header.Get("Vary") + if !reflect.DeepEqual(res.Header, tt.expectHeaders) { + t.Fatal(cmp.Diff(tt.expectHeaders, res.Header)) + } resBytes, err := io.ReadAll(res.Body) if err != nil { @@ -450,14 +516,6 @@ func TestDeferredResponseWriter_Write(t *testing.T) { } if tt.expectGzip { - if contentEncoding != "gzip" { - t.Fatalf("content-encoding is not set properly, expected: gzip, got: %s", contentEncoding) - } - - if !strings.Contains(varyHeader, "Accept-Encoding") { - t.Errorf("vary header doesn't have Accept-Encoding") - } - gr, err := gzip.NewReader(bytes.NewReader(resBytes)) if err != nil { t.Fatalf("failed to create gzip reader: %v", err) @@ -471,22 +529,101 @@ func TestDeferredResponseWriter_Write(t *testing.T) { if !bytes.Equal(fullPayload, decompressed) { t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, decompressed) } - } else { - if contentEncoding != "" { - t.Errorf("content-encoding is set unexpectedly") - } - - if strings.Contains(varyHeader, "Accept-Encoding") { - t.Errorf("accept encoding is set unexpectedly") - } - if !bytes.Equal(fullPayload, resBytes) { t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, resBytes) } - } + }) + } +} +func benchmarkChunkingGzip(b *testing.B, count int, chunk []byte) { + mockResponseWriter := httptest.NewRecorder() + mockResponseWriter.Body = nil + + drw := &deferredResponseWriter{ + mediaType: "text/plain", + statusCode: 200, + contentEncoding: "gzip", + hw: mockResponseWriter, + ctx: context.Background(), + } + b.ResetTimer() + for i := 0; i < count; i++ { + n, err := drw.Write(chunk) + if err != nil { + b.Fatalf("unexpected error while writing chunk: %v", err) + } + if n != len(chunk) { + b.Errorf("write is not complete, expected: %d bytes, written: %d bytes", len(chunk), n) + } + } + err := drw.Close() + if err != nil { + b.Fatalf("unexpected error when closing deferredResponseWriter: %v", err) + } + res := mockResponseWriter.Result() + if res.StatusCode != http.StatusOK { + b.Fatalf("status code is not writtend properly, expected: 200, got: %d", res.StatusCode) + } +} + +func BenchmarkChunkingGzip(b *testing.B) { + tests := []struct { + count int + size int + }{ + { + count: 100, + size: 1_000, + }, + { + count: 100, + size: 100_000, + }, + { + count: 1_000, + size: 100_000, + }, + { + count: 1_000, + size: 1_000_000, + }, + { + count: 10_000, + size: 100_000, + }, + { + count: 100_000, + size: 10_000, + }, + { + count: 1, + size: 100_000, + }, + { + count: 1, + size: 1_000_000, + }, + { + count: 1, + size: 10_000_000, + }, + { + count: 1, + size: 100_000_000, + }, + { + count: 1, + size: 1_000_000_000, + }, + } + + for _, t := range tests { + b.Run(fmt.Sprintf("Count=%d/Size=%d", t.count, t.size), func(b *testing.B) { + chunk := []byte(rand2.String(t.size)) + benchmarkChunkingGzip(b, t.count, chunk) }) } }