mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-17 23:19:26 +00:00
Refactor transformers for watch to implement Encoder interface
This commit is contained in:
@@ -18,8 +18,11 @@ package handlers
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
@@ -29,30 +32,107 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation"
|
"k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
|
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
|
||||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||||
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
|
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
|
||||||
|
klog "k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// transformObject takes the object as returned by storage and ensures it is in
|
// watchEmbeddedEncoder performs encoding of the embedded object.
|
||||||
// the client's desired form, as well as ensuring any API level fields like self-link
|
|
||||||
// are properly set.
|
|
||||||
func transformObject(ctx context.Context, obj runtime.Object, opts interface{}, target *schema.GroupVersionKind, scope *RequestScope) (runtime.Object, error) {
|
|
||||||
if co, ok := obj.(runtime.CacheableObject); ok {
|
|
||||||
if target != nil {
|
|
||||||
// Non-nil mediaType.Convert means that some conversion of the object
|
|
||||||
// has to happen. Currently conversion may potentially modify the
|
|
||||||
// object or assume something about it (e.g. asTable operates on
|
|
||||||
// reflection, which won't work for any wrapper).
|
|
||||||
// To ensure it will work correctly, let's operate on base objects
|
|
||||||
// and not cache it for now.
|
|
||||||
//
|
//
|
||||||
// TODO: Long-term, transformObject should be changed so that it
|
// NOTE: watchEmbeddedEncoder is NOT thread-safe.
|
||||||
// implements runtime.Encoder interface.
|
type watchEmbeddedEncoder struct {
|
||||||
return doTransformObject(ctx, co.GetObject(), opts, target, scope)
|
encoder runtime.Encoder
|
||||||
|
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
|
// target, if non-nil, configures transformation type.
|
||||||
|
// The other options are ignored if target is nil.
|
||||||
|
target *schema.GroupVersionKind
|
||||||
|
tableOptions *metav1.TableOptions
|
||||||
|
scope *RequestScope
|
||||||
|
|
||||||
|
// identifier of the encoder, computed lazily
|
||||||
|
identifier runtime.Identifier
|
||||||
|
}
|
||||||
|
|
||||||
|
func newWatchEmbeddedEncoder(ctx context.Context, encoder runtime.Encoder, target *schema.GroupVersionKind, tableOptions *metav1.TableOptions, scope *RequestScope) *watchEmbeddedEncoder {
|
||||||
|
return &watchEmbeddedEncoder{
|
||||||
|
encoder: encoder,
|
||||||
|
ctx: ctx,
|
||||||
|
target: target,
|
||||||
|
tableOptions: tableOptions,
|
||||||
|
scope: scope,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return doTransformObject(ctx, obj, opts, target, scope)
|
|
||||||
|
// Encode implements runtime.Encoder interface.
|
||||||
|
func (e *watchEmbeddedEncoder) Encode(obj runtime.Object, w io.Writer) error {
|
||||||
|
if co, ok := obj.(runtime.CacheableObject); ok {
|
||||||
|
return co.CacheEncode(e.Identifier(), e.doEncode, w)
|
||||||
|
}
|
||||||
|
return e.doEncode(obj, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *watchEmbeddedEncoder) doEncode(obj runtime.Object, w io.Writer) error {
|
||||||
|
result, err := doTransformObject(e.ctx, obj, e.tableOptions, e.target, e.scope)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err))
|
||||||
|
result = obj
|
||||||
|
}
|
||||||
|
|
||||||
|
// When we are tranforming to a table, use the original table options when
|
||||||
|
// we should print headers only on the first object - headers should be
|
||||||
|
// omitted on subsequent events.
|
||||||
|
if e.tableOptions != nil && !e.tableOptions.NoHeaders {
|
||||||
|
e.tableOptions.NoHeaders = true
|
||||||
|
// With options change, we should recompute the identifier.
|
||||||
|
// Clearing this will trigger lazy recompute when needed.
|
||||||
|
e.identifier = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.encoder.Encode(result, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Identifier implements runtime.Encoder interface.
|
||||||
|
func (e *watchEmbeddedEncoder) Identifier() runtime.Identifier {
|
||||||
|
if e.identifier == "" {
|
||||||
|
e.identifier = e.embeddedIdentifier()
|
||||||
|
}
|
||||||
|
return e.identifier
|
||||||
|
}
|
||||||
|
|
||||||
|
type watchEmbeddedEncoderIdentifier struct {
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
Encoder string `json:"encoder,omitempty"`
|
||||||
|
Target string `json:"target,omitempty"`
|
||||||
|
Options metav1.TableOptions `json:"options,omitempty"`
|
||||||
|
NoHeaders bool `json:"noHeaders,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *watchEmbeddedEncoder) embeddedIdentifier() runtime.Identifier {
|
||||||
|
if e.target == nil {
|
||||||
|
// If no conversion is performed, we effective only use
|
||||||
|
// the embedded identifier.
|
||||||
|
return e.encoder.Identifier()
|
||||||
|
}
|
||||||
|
identifier := watchEmbeddedEncoderIdentifier{
|
||||||
|
Name: "watch-embedded",
|
||||||
|
Encoder: string(e.encoder.Identifier()),
|
||||||
|
Target: e.target.String(),
|
||||||
|
}
|
||||||
|
if e.target.Kind == "Table" && e.tableOptions != nil {
|
||||||
|
identifier.Options = *e.tableOptions
|
||||||
|
identifier.NoHeaders = e.tableOptions.NoHeaders
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := json.Marshal(identifier)
|
||||||
|
if err != nil {
|
||||||
|
klog.Fatalf("Failed marshaling identifier for watchEmbeddedEncoder: %v", err)
|
||||||
|
}
|
||||||
|
return runtime.Identifier(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
// doTransformResponseObject is used for handling all requests, including watch.
|
// doTransformResponseObject is used for handling all requests, including watch.
|
||||||
@@ -63,6 +143,8 @@ func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}
|
|||||||
|
|
||||||
switch {
|
switch {
|
||||||
case target == nil:
|
case target == nil:
|
||||||
|
// If we ever change that from a no-op, the identifier of
|
||||||
|
// the watchEmbeddedEncoder has to be adjusted accordingly.
|
||||||
return obj, nil
|
return obj, nil
|
||||||
|
|
||||||
case target.Kind == "PartialObjectMetadata":
|
case target.Kind == "PartialObjectMetadata":
|
||||||
@@ -140,7 +222,7 @@ func transformResponseObject(ctx context.Context, scope *RequestScope, req *http
|
|||||||
|
|
||||||
var obj runtime.Object
|
var obj runtime.Object
|
||||||
do := func() {
|
do := func() {
|
||||||
obj, err = transformObject(ctx, result, options, mediaType.Convert, scope)
|
obj, err = doTransformObject(ctx, result, options, mediaType.Convert, scope)
|
||||||
}
|
}
|
||||||
endpointsrequest.TrackTransformResponseObjectLatency(ctx, do)
|
endpointsrequest.TrackTransformResponseObjectLatency(ctx, do)
|
||||||
|
|
||||||
|
@@ -63,7 +63,7 @@ func (m *mockCacheableObject) SetGroupVersionKind(gvk schema.GroupVersionKind) {
|
|||||||
|
|
||||||
// CacheEncode implements runtime.CacheableObject interface.
|
// CacheEncode implements runtime.CacheableObject interface.
|
||||||
func (m *mockCacheableObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
|
func (m *mockCacheableObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
|
||||||
return fmt.Errorf("unimplemented")
|
return encode(m.obj.DeepCopyObject(), w)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObject implements runtime.CacheableObject interface.
|
// GetObject implements runtime.CacheableObject interface.
|
||||||
@@ -77,6 +77,19 @@ func (*mockNamer) Namespace(_ *http.Request) (string, error) { return
|
|||||||
func (*mockNamer) Name(_ *http.Request) (string, string, error) { return "", "", nil }
|
func (*mockNamer) Name(_ *http.Request) (string, string, error) { return "", "", nil }
|
||||||
func (*mockNamer) ObjectName(_ runtime.Object) (string, string, error) { return "", "", nil }
|
func (*mockNamer) ObjectName(_ runtime.Object) (string, string, error) { return "", "", nil }
|
||||||
|
|
||||||
|
type mockEncoder struct {
|
||||||
|
obj runtime.Object
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *mockEncoder) Encode(obj runtime.Object, _ io.Writer) error {
|
||||||
|
e.obj = obj
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *mockEncoder) Identifier() runtime.Identifier {
|
||||||
|
return runtime.Identifier("")
|
||||||
|
}
|
||||||
|
|
||||||
func TestCacheableObject(t *testing.T) {
|
func TestCacheableObject(t *testing.T) {
|
||||||
pomGVK := metav1.SchemeGroupVersion.WithKind("PartialObjectMetadata")
|
pomGVK := metav1.SchemeGroupVersion.WithKind("PartialObjectMetadata")
|
||||||
tableGVK := metav1.SchemeGroupVersion.WithKind("Table")
|
tableGVK := metav1.SchemeGroupVersion.WithKind("Table")
|
||||||
@@ -125,7 +138,7 @@ func TestCacheableObject(t *testing.T) {
|
|||||||
desc: "cacheableObject nil convert",
|
desc: "cacheableObject nil convert",
|
||||||
object: &mockCacheableObject{obj: pod},
|
object: &mockCacheableObject{obj: pod},
|
||||||
target: nil,
|
target: nil,
|
||||||
expectedObj: &mockCacheableObject{obj: pod},
|
expectedObj: pod,
|
||||||
expectedErr: nil,
|
expectedErr: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -147,20 +160,22 @@ func TestCacheableObject(t *testing.T) {
|
|||||||
|
|
||||||
for _, test := range testCases {
|
for _, test := range testCases {
|
||||||
t.Run(test.desc, func(t *testing.T) {
|
t.Run(test.desc, func(t *testing.T) {
|
||||||
result, err := transformObject(
|
internalEncoder := &mockEncoder{}
|
||||||
|
watchEncoder := newWatchEmbeddedEncoder(
|
||||||
request.WithRequestInfo(context.TODO(), &request.RequestInfo{}),
|
request.WithRequestInfo(context.TODO(), &request.RequestInfo{}),
|
||||||
test.object, test.opts, test.target,
|
internalEncoder, test.target, test.opts,
|
||||||
&RequestScope{
|
&RequestScope{
|
||||||
Namer: &mockNamer{},
|
Namer: &mockNamer{},
|
||||||
TableConvertor: tableConvertor,
|
TableConvertor: tableConvertor,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
err := watchEncoder.Encode(test.object, nil)
|
||||||
if err != test.expectedErr {
|
if err != test.expectedErr {
|
||||||
t.Errorf("unexpected error: %v, expected: %v", err, test.expectedErr)
|
t.Errorf("unexpected error: %v, expected: %v", err, test.expectedErr)
|
||||||
}
|
}
|
||||||
if a, e := result, test.expectedObj; !reflect.DeepEqual(a, e) {
|
if a, e := internalEncoder.obj, test.expectedObj; !reflect.DeepEqual(a, e) {
|
||||||
t.Errorf("unexpected result: %v, expected: %v", a, e)
|
t.Errorf("unexpected result: %#v, expected: %#v", a, e)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@@ -20,7 +20,6 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"reflect"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
@@ -91,6 +90,8 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
|
|||||||
mediaType += ";stream=watch"
|
mediaType += ";stream=watch"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx := req.Context()
|
||||||
|
|
||||||
// locate the appropriate embedded encoder based on the transform
|
// locate the appropriate embedded encoder based on the transform
|
||||||
var embeddedEncoder runtime.Encoder
|
var embeddedEncoder runtime.Encoder
|
||||||
contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions, req)
|
contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions, req)
|
||||||
@@ -114,6 +115,16 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
|
|||||||
defer runtime.AllocatorPool.Put(memoryAllocator)
|
defer runtime.AllocatorPool.Put(memoryAllocator)
|
||||||
embeddedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator)
|
embeddedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator)
|
||||||
}
|
}
|
||||||
|
var tableOptions *metav1.TableOptions
|
||||||
|
if options != nil {
|
||||||
|
if passedOptions, ok := options.(*metav1.TableOptions); ok {
|
||||||
|
tableOptions = passedOptions
|
||||||
|
} else {
|
||||||
|
scope.err(fmt.Errorf("unexpected options type: %T", options), w, req)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
embeddedEncoder = newWatchEmbeddedEncoder(ctx, embeddedEncoder, mediaTypeOptions.Convert, tableOptions, scope)
|
||||||
|
|
||||||
if encoderWithAllocator, supportsAllocator := encoder.(runtime.EncoderWithAllocator); supportsAllocator {
|
if encoderWithAllocator, supportsAllocator := encoder.(runtime.EncoderWithAllocator); supportsAllocator {
|
||||||
if memoryAllocator == nil {
|
if memoryAllocator == nil {
|
||||||
@@ -130,8 +141,6 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
|
|||||||
serverShuttingDownCh = signals.ShuttingDown()
|
serverShuttingDownCh = signals.ShuttingDown()
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := req.Context()
|
|
||||||
|
|
||||||
server := &WatchServer{
|
server := &WatchServer{
|
||||||
Watching: watcher,
|
Watching: watcher,
|
||||||
Scope: scope,
|
Scope: scope,
|
||||||
@@ -142,21 +151,6 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
|
|||||||
Encoder: encoder,
|
Encoder: encoder,
|
||||||
EmbeddedEncoder: embeddedEncoder,
|
EmbeddedEncoder: embeddedEncoder,
|
||||||
|
|
||||||
Fixup: func(obj runtime.Object) runtime.Object {
|
|
||||||
result, err := transformObject(ctx, obj, options, mediaTypeOptions.Convert, scope)
|
|
||||||
if err != nil {
|
|
||||||
utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err))
|
|
||||||
return obj
|
|
||||||
}
|
|
||||||
// When we are transformed to a table, use the table options as the state for whether we
|
|
||||||
// should print headers - on watch, we only want to print table headers on the first object
|
|
||||||
// and omit them on subsequent events.
|
|
||||||
if tableOptions, ok := options.(*metav1.TableOptions); ok {
|
|
||||||
tableOptions.NoHeaders = true
|
|
||||||
}
|
|
||||||
return result
|
|
||||||
},
|
|
||||||
|
|
||||||
TimeoutFactory: &realTimeoutFactory{timeout},
|
TimeoutFactory: &realTimeoutFactory{timeout},
|
||||||
ServerShuttingDownCh: serverShuttingDownCh,
|
ServerShuttingDownCh: serverShuttingDownCh,
|
||||||
}
|
}
|
||||||
@@ -179,8 +173,6 @@ type WatchServer struct {
|
|||||||
Encoder runtime.Encoder
|
Encoder runtime.Encoder
|
||||||
// used to encode the nested object in the watch stream
|
// used to encode the nested object in the watch stream
|
||||||
EmbeddedEncoder runtime.Encoder
|
EmbeddedEncoder runtime.Encoder
|
||||||
// used to correct the object before we send it to the serializer
|
|
||||||
Fixup func(runtime.Object) runtime.Object
|
|
||||||
|
|
||||||
TimeoutFactory TimeoutFactory
|
TimeoutFactory TimeoutFactory
|
||||||
ServerShuttingDownCh <-chan struct{}
|
ServerShuttingDownCh <-chan struct{}
|
||||||
@@ -256,10 +248,9 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
|
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
|
||||||
|
|
||||||
obj := s.Fixup(event.Object)
|
if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil {
|
||||||
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
|
|
||||||
// unexpected error
|
// unexpected error
|
||||||
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
|
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", event.Object, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -324,10 +315,10 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
|
|||||||
// End of results.
|
// End of results.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
obj := s.Fixup(event.Object)
|
|
||||||
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
|
if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil {
|
||||||
// unexpected error
|
// unexpected error
|
||||||
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
|
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", event.Object, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -647,7 +647,6 @@ func TestWatchHTTPErrors(t *testing.T) {
|
|||||||
Encoder: newCodec,
|
Encoder: newCodec,
|
||||||
EmbeddedEncoder: newCodec,
|
EmbeddedEncoder: newCodec,
|
||||||
|
|
||||||
Fixup: func(obj runtime.Object) runtime.Object { return obj },
|
|
||||||
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -712,7 +711,6 @@ func TestWatchHTTPErrorsBeforeServe(t *testing.T) {
|
|||||||
Encoder: newCodec,
|
Encoder: newCodec,
|
||||||
EmbeddedEncoder: newCodec,
|
EmbeddedEncoder: newCodec,
|
||||||
|
|
||||||
Fixup: func(obj runtime.Object) runtime.Object { return obj },
|
|
||||||
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -771,7 +769,6 @@ func TestWatchHTTPDynamicClientErrors(t *testing.T) {
|
|||||||
Encoder: newCodec,
|
Encoder: newCodec,
|
||||||
EmbeddedEncoder: newCodec,
|
EmbeddedEncoder: newCodec,
|
||||||
|
|
||||||
Fixup: func(obj runtime.Object) runtime.Object { return obj },
|
|
||||||
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -814,7 +811,6 @@ func TestWatchHTTPTimeout(t *testing.T) {
|
|||||||
Encoder: newCodec,
|
Encoder: newCodec,
|
||||||
EmbeddedEncoder: newCodec,
|
EmbeddedEncoder: newCodec,
|
||||||
|
|
||||||
Fixup: func(obj runtime.Object) runtime.Object { return obj },
|
|
||||||
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user