From 3085b57869a2a7bf5290ab97facaf17fedfa88a0 Mon Sep 17 00:00:00 2001 From: Eric Lin Date: Wed, 31 May 2023 19:12:32 +0000 Subject: [PATCH] Do not copy bytes for cached serializations Since cachingObject has the encoded data cached and they are not supposed to change. It's memory efficient to just copy the slice references. Signed-off-by: Eric Lin --- .../k8s.io/apimachinery/pkg/runtime/splice.go | 76 +++++++++++ .../apimachinery/pkg/runtime/splice_test.go | 121 ++++++++++++++++++ .../apiserver/pkg/endpoints/handlers/watch.go | 2 +- .../pkg/storage/cacher/caching_object.go | 4 + 4 files changed, 202 insertions(+), 1 deletion(-) create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/splice.go create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/splice_test.go diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/splice.go b/staging/src/k8s.io/apimachinery/pkg/runtime/splice.go new file mode 100644 index 00000000000..2badb7b97f3 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/splice.go @@ -0,0 +1,76 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "bytes" + "io" +) + +// Splice is the interface that wraps the Splice method. +// +// Splice moves data from given slice without copying the underlying data for +// efficiency purpose. Therefore, the caller should make sure the underlying +// data is not changed later. +type Splice interface { + Splice([]byte) + io.Writer + Reset() + Bytes() []byte +} + +// A spliceBuffer implements Splice and io.Writer interfaces. +type spliceBuffer struct { + raw []byte + buf *bytes.Buffer +} + +func NewSpliceBuffer() Splice { + return &spliceBuffer{} +} + +// Splice implements the Splice interface. +func (sb *spliceBuffer) Splice(raw []byte) { + sb.raw = raw +} + +// Write implements the io.Writer interface. +func (sb *spliceBuffer) Write(p []byte) (n int, err error) { + if sb.buf == nil { + sb.buf = &bytes.Buffer{} + } + return sb.buf.Write(p) +} + +// Reset resets the buffer to be empty. +func (sb *spliceBuffer) Reset() { + if sb.buf != nil { + sb.buf.Reset() + } + sb.raw = nil +} + +// Bytes returns the data held by the buffer. +func (sb *spliceBuffer) Bytes() []byte { + if sb.buf != nil && len(sb.buf.Bytes()) > 0 { + return sb.buf.Bytes() + } + if sb.raw != nil { + return sb.raw + } + return []byte{} +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/splice_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/splice_test.go new file mode 100644 index 00000000000..9d8ca5102dd --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/splice_test.go @@ -0,0 +1,121 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime_test + +import ( + "bytes" + "testing" + + "k8s.io/apimachinery/pkg/runtime" +) + +func TestSpliceBuffer(t *testing.T) { + testBytes0 := []byte{0x01, 0x02, 0x03, 0x04} + testBytes1 := []byte{0x04, 0x03, 0x02, 0x02} + + testCases := []struct { + name string + run func(sb runtime.Splice, buf *bytes.Buffer) + }{ + { + name: "Basic Write", + run: func(sb runtime.Splice, buf *bytes.Buffer) { + sb.Write(testBytes0) + buf.Write(testBytes0) + }, + }, + { + name: "Multiple Writes", + run: func(sb runtime.Splice, buf *bytes.Buffer) { + for _, b := range testBytes0 { + sb.Write([]byte{b}) + buf.Write([]byte{b}) + } + }, + }, + { + name: "Write and Reset", + run: func(sb runtime.Splice, buf *bytes.Buffer) { + sb.Write(testBytes0) + buf.Write(testBytes0) + + sb.Reset() + buf.Reset() + }, + }, + { + name: "Write/Splice", + run: func(sb runtime.Splice, buf *bytes.Buffer) { + sb.Splice(testBytes0) + buf.Write(testBytes0) + }, + }, + { + name: "Write/Splice and Reset", + run: func(sb runtime.Splice, buf *bytes.Buffer) { + sb.Splice(testBytes0) + buf.Write(testBytes0) + + sb.Reset() + buf.Reset() + }, + }, + { + name: "Write/Splice, Reset, Write/Splice", + run: func(sb runtime.Splice, buf *bytes.Buffer) { + sb.Splice(testBytes0) + buf.Write(testBytes0) + + sb.Reset() + buf.Reset() + + sb.Splice(testBytes1) + buf.Write(testBytes1) + }, + }, + { + name: "Write, Reset, Splice", + run: func(sb runtime.Splice, buf *bytes.Buffer) { + sb.Write(testBytes0) + buf.Write(testBytes0) + + sb.Reset() + buf.Reset() + + sb.Splice(testBytes1) + buf.Write(testBytes1) + }, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + sb := runtime.NewSpliceBuffer() + buf := &bytes.Buffer{} + tt.run(sb, buf) + + if sb.Bytes() == nil { + t.Errorf("Unexpected nil") + } + if string(sb.Bytes()) != string(buf.Bytes()) { + t.Errorf("Expected sb.Bytes() == %q, buf.Bytes() == %q", sb.Bytes(), buf.Bytes()) + } + }) + + } + +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index c76cc194a2c..7d9372d967a 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -219,7 +219,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { var unknown runtime.Unknown internalEvent := &metav1.InternalEvent{} outEvent := &metav1.WatchEvent{} - buf := &bytes.Buffer{} + buf := runtime.NewSpliceBuffer() ch := s.Watching.ResultChan() done := req.Context().Done() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go index 258efed8425..e2e2aa5e79d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/caching_object.go @@ -148,6 +148,10 @@ func (o *cachingObject) CacheEncode(id runtime.Identifier, encode func(runtime.O if result.err != nil { return result.err } + if b, support := w.(runtime.Splice); support { + b.Splice(result.raw) + return nil + } _, err := w.Write(result.raw) return err }