From f67e15a26f5958b1ebf07a2590e98889a987677d Mon Sep 17 00:00:00 2001 From: Jefftree Date: Mon, 30 Sep 2024 21:11:53 +0000 Subject: [PATCH] Move inmemoryresponsewriter into own package --- .../responsewriter/inmemoryresponsewriter.go | 70 +++++++++++++++++++ .../pkg/apiserver/handler_discovery.go | 62 +++------------- .../openapi/aggregator/downloader.go | 49 ++----------- .../openapiv3/aggregator/aggregator_test.go | 5 +- .../openapiv3/aggregator/downloader.go | 55 +++------------ 5 files changed, 96 insertions(+), 145 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/responsewriter/inmemoryresponsewriter.go diff --git a/staging/src/k8s.io/apiserver/pkg/util/responsewriter/inmemoryresponsewriter.go b/staging/src/k8s.io/apiserver/pkg/util/responsewriter/inmemoryresponsewriter.go new file mode 100644 index 00000000000..1d5705bb43e --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/responsewriter/inmemoryresponsewriter.go @@ -0,0 +1,70 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package responsewriter + +import ( + "fmt" + "net/http" +) + +// inMemoryResponseWriter is a http.Writer that keep the response in memory. +type inMemoryResponseWriter struct { + writeHeaderCalled bool + header http.Header + respCode int + data []byte +} + +func NewInMemoryResponseWriter() *inMemoryResponseWriter { + return &inMemoryResponseWriter{header: http.Header{}} +} + +func (r *inMemoryResponseWriter) Header() http.Header { + return r.header +} + +func (r *inMemoryResponseWriter) RespCode() int { + return r.respCode +} + +func (r *inMemoryResponseWriter) Data() []byte { + return r.data +} + +func (r *inMemoryResponseWriter) WriteHeader(code int) { + r.writeHeaderCalled = true + r.respCode = code +} + +func (r *inMemoryResponseWriter) Write(in []byte) (int, error) { + if !r.writeHeaderCalled { + r.WriteHeader(http.StatusOK) + } + r.data = append(r.data, in...) + return len(in), nil +} + +func (r *inMemoryResponseWriter) String() string { + s := fmt.Sprintf("ResponseCode: %d", r.respCode) + if r.data != nil { + s += fmt.Sprintf(", Body: %s", string(r.data)) + } + if r.header != nil { + s += fmt.Sprintf(", Header: %s", r.header) + } + return s +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go index 68199b6b30c..b6ad297a9c6 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go @@ -38,6 +38,7 @@ import ( "k8s.io/apiserver/pkg/endpoints" discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/util/responsewriter" "k8s.io/client-go/discovery" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -249,14 +250,14 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion // from BEFORE the request is dispatched so that lastUpdated can be used to // de-duplicate requests. now := time.Now() - writer := newInMemoryResponseWriter() + writer := responsewriter.NewInMemoryResponseWriter() handler.ServeHTTP(writer, req) isV2Beta1GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2Beta1GVK) isV2GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2GVK) switch { - case writer.respCode == http.StatusNotModified: + case writer.RespCode() == http.StatusNotModified: // Keep old entry, update timestamp cached = cachedResult{ discovery: cached.discovery, @@ -266,12 +267,12 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion dm.setCacheEntryForService(info.service, cached) return &cached, nil - case writer.respCode == http.StatusServiceUnavailable: + case writer.RespCode() == http.StatusServiceUnavailable: return nil, fmt.Errorf("service %s returned non-success response code: %v", - info.service.String(), writer.respCode) - case writer.respCode == http.StatusOK && (isV2GVK || isV2Beta1GVK): + info.service.String(), writer.RespCode()) + case writer.RespCode() == http.StatusOK && (isV2GVK || isV2Beta1GVK): parsed := &apidiscoveryv2.APIGroupDiscoveryList{} - if err := runtime.DecodeInto(dm.codecs.UniversalDecoder(), writer.data, parsed); err != nil { + if err := runtime.DecodeInto(dm.codecs.UniversalDecoder(), writer.Data(), parsed); err != nil { return nil, err } @@ -338,15 +339,15 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion req.Header.Add("If-None-Match", cached.etag) } - writer := newInMemoryResponseWriter() + writer := responsewriter.NewInMemoryResponseWriter() handler.ServeHTTP(writer, req) - if writer.respCode != http.StatusOK { + if writer.RespCode() != http.StatusOK { return nil, fmt.Errorf("failed to download legacy discovery for %s: %v", path, writer.String()) } parsed := &metav1.APIResourceList{} - if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil { + if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.Data(), parsed); err != nil { return nil, err } @@ -619,46 +620,3 @@ func (dm *discoveryManager) setInfoForAPIService(name string, result *groupVersi return oldValueIfExisted } - -// !TODO: This was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go -// which was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go -// so we should find a home for this -// inMemoryResponseWriter is a http.Writer that keep the response in memory. -type inMemoryResponseWriter struct { - writeHeaderCalled bool - header http.Header - respCode int - data []byte -} - -func newInMemoryResponseWriter() *inMemoryResponseWriter { - return &inMemoryResponseWriter{header: http.Header{}} -} - -func (r *inMemoryResponseWriter) Header() http.Header { - return r.header -} - -func (r *inMemoryResponseWriter) WriteHeader(code int) { - r.writeHeaderCalled = true - r.respCode = code -} - -func (r *inMemoryResponseWriter) Write(in []byte) (int, error) { - if !r.writeHeaderCalled { - r.WriteHeader(http.StatusOK) - } - r.data = append(r.data, in...) - return len(in), nil -} - -func (r *inMemoryResponseWriter) String() string { - s := fmt.Sprintf("ResponseCode: %d", r.respCode) - if r.data != nil { - s += fmt.Sprintf(", Body: %s", string(r.data)) - } - if r.header != nil { - s += fmt.Sprintf(", Header: %s", r.header) - } - return s -} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go index 03721365805..fd1a21201d3 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go @@ -25,6 +25,7 @@ import ( "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/util/responsewriter" "k8s.io/kube-openapi/pkg/validation/spec" ) @@ -126,10 +127,10 @@ func (s *Downloader) Download(handler http.Handler, etag string) (returnSpec *sp req.Header.Add("If-None-Match", etag) } - writer := newInMemoryResponseWriter() + writer := responsewriter.NewInMemoryResponseWriter() handler.ServeHTTP(writer, req) - switch writer.respCode { + switch writer.RespCode() { case http.StatusNotModified: if len(etag) == 0 { return nil, etag, http.StatusNotModified, fmt.Errorf("http.StatusNotModified is not allowed in absence of etag") @@ -140,12 +141,12 @@ func (s *Downloader) Download(handler http.Handler, etag string) (returnSpec *sp return nil, "", http.StatusNotFound, nil case http.StatusOK: openAPISpec := &spec.Swagger{} - if err := openAPISpec.UnmarshalJSON(writer.data); err != nil { + if err := openAPISpec.UnmarshalJSON(writer.Data()); err != nil { return nil, "", 0, err } newEtag = writer.Header().Get("Etag") if len(newEtag) == 0 { - newEtag = etagFor(writer.data) + newEtag = etagFor(writer.Data()) if len(etag) > 0 && strings.HasPrefix(etag, locallyGeneratedEtagPrefix) { // The function call with an etag and server does not report an etag. // That means this server does not support etag and the etag that passed @@ -161,43 +162,3 @@ func (s *Downloader) Download(handler http.Handler, etag string) (returnSpec *sp return nil, "", 0, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String()) } } - -// inMemoryResponseWriter is a http.Writer that keep the response in memory. -type inMemoryResponseWriter struct { - writeHeaderCalled bool - header http.Header - respCode int - data []byte -} - -func newInMemoryResponseWriter() *inMemoryResponseWriter { - return &inMemoryResponseWriter{header: http.Header{}} -} - -func (r *inMemoryResponseWriter) Header() http.Header { - return r.header -} - -func (r *inMemoryResponseWriter) WriteHeader(code int) { - r.writeHeaderCalled = true - r.respCode = code -} - -func (r *inMemoryResponseWriter) Write(in []byte) (int, error) { - if !r.writeHeaderCalled { - r.WriteHeader(http.StatusOK) - } - r.data = append(r.data, in...) - return len(in), nil -} - -func (r *inMemoryResponseWriter) String() string { - s := fmt.Sprintf("ResponseCode: %d", r.respCode) - if r.data != nil { - s += fmt.Sprintf(", Body: %s", string(r.data)) - } - if r.header != nil { - s += fmt.Sprintf(", Header: %s", r.header) - } - return s -} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator_test.go index eb0b090e23a..fdd250dd6a7 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator_test.go @@ -31,6 +31,7 @@ import ( openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/mux" + "k8s.io/apiserver/pkg/util/responsewriter" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" @@ -301,9 +302,9 @@ func sendReq(t *testing.T, handler http.Handler, path string) []byte { if err != nil { t.Fatal(err) } - writer := newInMemoryResponseWriter() + writer := responsewriter.NewInMemoryResponseWriter() handler.ServeHTTP(writer, req) - return writer.data + return writer.Data() } func getTestAPIServiceOpenAPIDefinitions(_ openapicommon.ReferenceCallback) map[string]openapicommon.OpenAPIDefinition { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go index 45b2467980a..83646456062 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go @@ -23,6 +23,7 @@ import ( "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/util/responsewriter" "k8s.io/kube-openapi/pkg/handler3" ) @@ -58,58 +59,18 @@ func (s *Downloader) OpenAPIV3Root(handler http.Handler) (*handler3.OpenAPIV3Dis if err != nil { return nil, 0, err } - writer := newInMemoryResponseWriter() + writer := responsewriter.NewInMemoryResponseWriter() handler.ServeHTTP(writer, req) - switch writer.respCode { + switch writer.RespCode() { case http.StatusNotFound: - return nil, writer.respCode, nil + return nil, writer.RespCode(), nil case http.StatusOK: groups := handler3.OpenAPIV3Discovery{} - if err := json.Unmarshal(writer.data, &groups); err != nil { - return nil, writer.respCode, err + if err := json.Unmarshal(writer.Data(), &groups); err != nil { + return nil, writer.RespCode(), err } - return &groups, writer.respCode, nil + return &groups, writer.RespCode(), nil } - return nil, writer.respCode, fmt.Errorf("Error, could not get list of group versions for APIService") -} - -// inMemoryResponseWriter is a http.Writer that keep the response in memory. -type inMemoryResponseWriter struct { - writeHeaderCalled bool - header http.Header - respCode int - data []byte -} - -func newInMemoryResponseWriter() *inMemoryResponseWriter { - return &inMemoryResponseWriter{header: http.Header{}} -} - -func (r *inMemoryResponseWriter) Header() http.Header { - return r.header -} - -func (r *inMemoryResponseWriter) WriteHeader(code int) { - r.writeHeaderCalled = true - r.respCode = code -} - -func (r *inMemoryResponseWriter) Write(in []byte) (int, error) { - if !r.writeHeaderCalled { - r.WriteHeader(http.StatusOK) - } - r.data = append(r.data, in...) - return len(in), nil -} - -func (r *inMemoryResponseWriter) String() string { - s := fmt.Sprintf("ResponseCode: %d", r.respCode) - if r.data != nil { - s += fmt.Sprintf(", Body: %s", string(r.data)) - } - if r.header != nil { - s += fmt.Sprintf(", Header: %s", r.header) - } - return s + return nil, writer.RespCode(), fmt.Errorf("Error, could not get list of group versions for APIService") }