Move inmemoryresponsewriter into own package

This commit is contained in:
Jefftree 2024-09-30 21:11:53 +00:00
parent 5e65529ca9
commit f67e15a26f
5 changed files with 96 additions and 145 deletions

View File

@ -0,0 +1,70 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package responsewriter
import (
"fmt"
"net/http"
)
// inMemoryResponseWriter is a http.Writer that keep the response in memory.
type inMemoryResponseWriter struct {
writeHeaderCalled bool
header http.Header
respCode int
data []byte
}
func NewInMemoryResponseWriter() *inMemoryResponseWriter {
return &inMemoryResponseWriter{header: http.Header{}}
}
func (r *inMemoryResponseWriter) Header() http.Header {
return r.header
}
func (r *inMemoryResponseWriter) RespCode() int {
return r.respCode
}
func (r *inMemoryResponseWriter) Data() []byte {
return r.data
}
func (r *inMemoryResponseWriter) WriteHeader(code int) {
r.writeHeaderCalled = true
r.respCode = code
}
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
if !r.writeHeaderCalled {
r.WriteHeader(http.StatusOK)
}
r.data = append(r.data, in...)
return len(in), nil
}
func (r *inMemoryResponseWriter) String() string {
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
if r.data != nil {
s += fmt.Sprintf(", Body: %s", string(r.data))
}
if r.header != nil {
s += fmt.Sprintf(", Header: %s", r.header)
}
return s
}

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/endpoints" "k8s.io/apiserver/pkg/endpoints"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
"k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/responsewriter"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -249,14 +250,14 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion
// from BEFORE the request is dispatched so that lastUpdated can be used to // from BEFORE the request is dispatched so that lastUpdated can be used to
// de-duplicate requests. // de-duplicate requests.
now := time.Now() now := time.Now()
writer := newInMemoryResponseWriter() writer := responsewriter.NewInMemoryResponseWriter()
handler.ServeHTTP(writer, req) handler.ServeHTTP(writer, req)
isV2Beta1GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2Beta1GVK) isV2Beta1GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2Beta1GVK)
isV2GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2GVK) isV2GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2GVK)
switch { switch {
case writer.respCode == http.StatusNotModified: case writer.RespCode() == http.StatusNotModified:
// Keep old entry, update timestamp // Keep old entry, update timestamp
cached = cachedResult{ cached = cachedResult{
discovery: cached.discovery, discovery: cached.discovery,
@ -266,12 +267,12 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion
dm.setCacheEntryForService(info.service, cached) dm.setCacheEntryForService(info.service, cached)
return &cached, nil return &cached, nil
case writer.respCode == http.StatusServiceUnavailable: case writer.RespCode() == http.StatusServiceUnavailable:
return nil, fmt.Errorf("service %s returned non-success response code: %v", return nil, fmt.Errorf("service %s returned non-success response code: %v",
info.service.String(), writer.respCode) info.service.String(), writer.RespCode())
case writer.respCode == http.StatusOK && (isV2GVK || isV2Beta1GVK): case writer.RespCode() == http.StatusOK && (isV2GVK || isV2Beta1GVK):
parsed := &apidiscoveryv2.APIGroupDiscoveryList{} parsed := &apidiscoveryv2.APIGroupDiscoveryList{}
if err := runtime.DecodeInto(dm.codecs.UniversalDecoder(), writer.data, parsed); err != nil { if err := runtime.DecodeInto(dm.codecs.UniversalDecoder(), writer.Data(), parsed); err != nil {
return nil, err return nil, err
} }
@ -338,15 +339,15 @@ func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion
req.Header.Add("If-None-Match", cached.etag) req.Header.Add("If-None-Match", cached.etag)
} }
writer := newInMemoryResponseWriter() writer := responsewriter.NewInMemoryResponseWriter()
handler.ServeHTTP(writer, req) handler.ServeHTTP(writer, req)
if writer.respCode != http.StatusOK { if writer.RespCode() != http.StatusOK {
return nil, fmt.Errorf("failed to download legacy discovery for %s: %v", path, writer.String()) return nil, fmt.Errorf("failed to download legacy discovery for %s: %v", path, writer.String())
} }
parsed := &metav1.APIResourceList{} parsed := &metav1.APIResourceList{}
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil { if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.Data(), parsed); err != nil {
return nil, err return nil, err
} }
@ -619,46 +620,3 @@ func (dm *discoveryManager) setInfoForAPIService(name string, result *groupVersi
return oldValueIfExisted return oldValueIfExisted
} }
// !TODO: This was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go
// which was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go
// so we should find a home for this
// inMemoryResponseWriter is a http.Writer that keep the response in memory.
type inMemoryResponseWriter struct {
writeHeaderCalled bool
header http.Header
respCode int
data []byte
}
func newInMemoryResponseWriter() *inMemoryResponseWriter {
return &inMemoryResponseWriter{header: http.Header{}}
}
func (r *inMemoryResponseWriter) Header() http.Header {
return r.header
}
func (r *inMemoryResponseWriter) WriteHeader(code int) {
r.writeHeaderCalled = true
r.respCode = code
}
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
if !r.writeHeaderCalled {
r.WriteHeader(http.StatusOK)
}
r.data = append(r.data, in...)
return len(in), nil
}
func (r *inMemoryResponseWriter) String() string {
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
if r.data != nil {
s += fmt.Sprintf(", Body: %s", string(r.data))
}
if r.header != nil {
s += fmt.Sprintf(", Header: %s", r.header)
}
return s
}

View File

@ -25,6 +25,7 @@ import (
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/responsewriter"
"k8s.io/kube-openapi/pkg/validation/spec" "k8s.io/kube-openapi/pkg/validation/spec"
) )
@ -126,10 +127,10 @@ func (s *Downloader) Download(handler http.Handler, etag string) (returnSpec *sp
req.Header.Add("If-None-Match", etag) req.Header.Add("If-None-Match", etag)
} }
writer := newInMemoryResponseWriter() writer := responsewriter.NewInMemoryResponseWriter()
handler.ServeHTTP(writer, req) handler.ServeHTTP(writer, req)
switch writer.respCode { switch writer.RespCode() {
case http.StatusNotModified: case http.StatusNotModified:
if len(etag) == 0 { if len(etag) == 0 {
return nil, etag, http.StatusNotModified, fmt.Errorf("http.StatusNotModified is not allowed in absence of etag") return nil, etag, http.StatusNotModified, fmt.Errorf("http.StatusNotModified is not allowed in absence of etag")
@ -140,12 +141,12 @@ func (s *Downloader) Download(handler http.Handler, etag string) (returnSpec *sp
return nil, "", http.StatusNotFound, nil return nil, "", http.StatusNotFound, nil
case http.StatusOK: case http.StatusOK:
openAPISpec := &spec.Swagger{} openAPISpec := &spec.Swagger{}
if err := openAPISpec.UnmarshalJSON(writer.data); err != nil { if err := openAPISpec.UnmarshalJSON(writer.Data()); err != nil {
return nil, "", 0, err return nil, "", 0, err
} }
newEtag = writer.Header().Get("Etag") newEtag = writer.Header().Get("Etag")
if len(newEtag) == 0 { if len(newEtag) == 0 {
newEtag = etagFor(writer.data) newEtag = etagFor(writer.Data())
if len(etag) > 0 && strings.HasPrefix(etag, locallyGeneratedEtagPrefix) { if len(etag) > 0 && strings.HasPrefix(etag, locallyGeneratedEtagPrefix) {
// The function call with an etag and server does not report an etag. // The function call with an etag and server does not report an etag.
// That means this server does not support etag and the etag that passed // That means this server does not support etag and the etag that passed
@ -161,43 +162,3 @@ func (s *Downloader) Download(handler http.Handler, etag string) (returnSpec *sp
return nil, "", 0, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String()) return nil, "", 0, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String())
} }
} }
// inMemoryResponseWriter is a http.Writer that keep the response in memory.
type inMemoryResponseWriter struct {
writeHeaderCalled bool
header http.Header
respCode int
data []byte
}
func newInMemoryResponseWriter() *inMemoryResponseWriter {
return &inMemoryResponseWriter{header: http.Header{}}
}
func (r *inMemoryResponseWriter) Header() http.Header {
return r.header
}
func (r *inMemoryResponseWriter) WriteHeader(code int) {
r.writeHeaderCalled = true
r.respCode = code
}
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
if !r.writeHeaderCalled {
r.WriteHeader(http.StatusOK)
}
r.data = append(r.data, in...)
return len(in), nil
}
func (r *inMemoryResponseWriter) String() string {
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
if r.data != nil {
s += fmt.Sprintf(", Body: %s", string(r.data))
}
if r.header != nil {
s += fmt.Sprintf(", Header: %s", r.header)
}
return s
}

View File

@ -31,6 +31,7 @@ import (
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericapiserver "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/mux" "k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/util/responsewriter"
"k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil" "k8s.io/component-base/metrics/testutil"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
@ -301,9 +302,9 @@ func sendReq(t *testing.T, handler http.Handler, path string) []byte {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
writer := newInMemoryResponseWriter() writer := responsewriter.NewInMemoryResponseWriter()
handler.ServeHTTP(writer, req) handler.ServeHTTP(writer, req)
return writer.data return writer.Data()
} }
func getTestAPIServiceOpenAPIDefinitions(_ openapicommon.ReferenceCallback) map[string]openapicommon.OpenAPIDefinition { func getTestAPIServiceOpenAPIDefinitions(_ openapicommon.ReferenceCallback) map[string]openapicommon.OpenAPIDefinition {

View File

@ -23,6 +23,7 @@ import (
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/responsewriter"
"k8s.io/kube-openapi/pkg/handler3" "k8s.io/kube-openapi/pkg/handler3"
) )
@ -58,58 +59,18 @@ func (s *Downloader) OpenAPIV3Root(handler http.Handler) (*handler3.OpenAPIV3Dis
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
writer := newInMemoryResponseWriter() writer := responsewriter.NewInMemoryResponseWriter()
handler.ServeHTTP(writer, req) handler.ServeHTTP(writer, req)
switch writer.respCode { switch writer.RespCode() {
case http.StatusNotFound: case http.StatusNotFound:
return nil, writer.respCode, nil return nil, writer.RespCode(), nil
case http.StatusOK: case http.StatusOK:
groups := handler3.OpenAPIV3Discovery{} groups := handler3.OpenAPIV3Discovery{}
if err := json.Unmarshal(writer.data, &groups); err != nil { if err := json.Unmarshal(writer.Data(), &groups); err != nil {
return nil, writer.respCode, err return nil, writer.RespCode(), err
} }
return &groups, writer.respCode, nil return &groups, writer.RespCode(), nil
} }
return nil, writer.respCode, fmt.Errorf("Error, could not get list of group versions for APIService") return nil, writer.RespCode(), fmt.Errorf("Error, could not get list of group versions for APIService")
}
// inMemoryResponseWriter is a http.Writer that keep the response in memory.
type inMemoryResponseWriter struct {
writeHeaderCalled bool
header http.Header
respCode int
data []byte
}
func newInMemoryResponseWriter() *inMemoryResponseWriter {
return &inMemoryResponseWriter{header: http.Header{}}
}
func (r *inMemoryResponseWriter) Header() http.Header {
return r.header
}
func (r *inMemoryResponseWriter) WriteHeader(code int) {
r.writeHeaderCalled = true
r.respCode = code
}
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
if !r.writeHeaderCalled {
r.WriteHeader(http.StatusOK)
}
r.data = append(r.data, in...)
return len(in), nil
}
func (r *inMemoryResponseWriter) String() string {
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
if r.data != nil {
s += fmt.Sprintf(", Body: %s", string(r.data))
}
if r.header != nil {
s += fmt.Sprintf(", Header: %s", r.header)
}
return s
} }