mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-07 03:03:59 +00:00
Merge pull request #120300 from wojtek-t/refactor_streaming_watch_encoder
Refactor streaming watch encoder to enable caching
This commit is contained in:
commit
2a4d5c5fd5
@ -33,8 +33,10 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
|
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
|
||||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||||
|
"k8s.io/apiserver/pkg/endpoints/metrics"
|
||||||
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
|
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
|
||||||
klog "k8s.io/klog/v2"
|
klog "k8s.io/klog/v2"
|
||||||
@ -135,6 +137,113 @@ func (e *watchEmbeddedEncoder) embeddedIdentifier() runtime.Identifier {
|
|||||||
return runtime.Identifier(result)
|
return runtime.Identifier(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// watchEncoder performs encoding of the watch events.
|
||||||
|
//
|
||||||
|
// NOTE: watchEncoder is NOT thread-safe.
|
||||||
|
type watchEncoder struct {
|
||||||
|
ctx context.Context
|
||||||
|
kind schema.GroupVersionKind
|
||||||
|
embeddedEncoder runtime.Encoder
|
||||||
|
encoder runtime.Encoder
|
||||||
|
framer io.Writer
|
||||||
|
|
||||||
|
buffer runtime.Splice
|
||||||
|
eventBuffer runtime.Splice
|
||||||
|
|
||||||
|
currentEmbeddedIdentifier runtime.Identifier
|
||||||
|
identifiers map[watch.EventType]runtime.Identifier
|
||||||
|
}
|
||||||
|
|
||||||
|
func newWatchEncoder(ctx context.Context, kind schema.GroupVersionKind, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer) *watchEncoder {
|
||||||
|
return &watchEncoder{
|
||||||
|
ctx: ctx,
|
||||||
|
kind: kind,
|
||||||
|
embeddedEncoder: embeddedEncoder,
|
||||||
|
encoder: encoder,
|
||||||
|
framer: framer,
|
||||||
|
buffer: runtime.NewSpliceBuffer(),
|
||||||
|
eventBuffer: runtime.NewSpliceBuffer(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode encodes a given watch event.
|
||||||
|
// NOTE: if events object is implementing the CacheableObject interface,
|
||||||
|
//
|
||||||
|
// the serialized version is cached in that object [not the event itself].
|
||||||
|
func (e *watchEncoder) Encode(event watch.Event) error {
|
||||||
|
encodeFunc := func(obj runtime.Object, w io.Writer) error {
|
||||||
|
return e.doEncode(obj, event, w)
|
||||||
|
}
|
||||||
|
if co, ok := event.Object.(runtime.CacheableObject); ok {
|
||||||
|
return co.CacheEncode(e.identifier(event.Type), encodeFunc, e.framer)
|
||||||
|
}
|
||||||
|
return encodeFunc(event.Object, e.framer)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *watchEncoder) doEncode(obj runtime.Object, event watch.Event, w io.Writer) error {
|
||||||
|
defer e.buffer.Reset()
|
||||||
|
|
||||||
|
if err := e.embeddedEncoder.Encode(obj, e.buffer); err != nil {
|
||||||
|
return fmt.Errorf("unable to encode watch object %T: %v", obj, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContentType is not required here because we are defaulting to the serializer type.
|
||||||
|
outEvent := &metav1.WatchEvent{
|
||||||
|
Type: string(event.Type),
|
||||||
|
Object: runtime.RawExtension{Raw: e.buffer.Bytes()},
|
||||||
|
}
|
||||||
|
metrics.WatchEventsSizes.WithContext(e.ctx).WithLabelValues(e.kind.Group, e.kind.Version, e.kind.Kind).Observe(float64(len(outEvent.Object.Raw)))
|
||||||
|
|
||||||
|
defer e.eventBuffer.Reset()
|
||||||
|
if err := e.encoder.Encode(outEvent, e.eventBuffer); err != nil {
|
||||||
|
return fmt.Errorf("unable to encode watch object %T: %v (%#v)", outEvent, err, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := w.Write(e.eventBuffer.Bytes())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
type watchEncoderIdentifier struct {
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
EmbeddedEncoder string `json:"embeddedEncoder,omitempty"`
|
||||||
|
Encoder string `json:"encoder,omitempty"`
|
||||||
|
EventType string `json:"eventType,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *watchEncoder) identifier(eventType watch.EventType) runtime.Identifier {
|
||||||
|
// We need to take into account that in embeddedEncoder includes table
|
||||||
|
// transformer, then its identifier is dynamic. As a result, whenever
|
||||||
|
// the identifier of embeddedEncoder changes, we need to invalidate the
|
||||||
|
// whole identifiers cache.
|
||||||
|
// TODO(wojtek-t): Can we optimize it somehow?
|
||||||
|
if e.currentEmbeddedIdentifier != e.embeddedEncoder.Identifier() {
|
||||||
|
e.currentEmbeddedIdentifier = e.embeddedEncoder.Identifier()
|
||||||
|
e.identifiers = map[watch.EventType]runtime.Identifier{}
|
||||||
|
}
|
||||||
|
if _, ok := e.identifiers[eventType]; !ok {
|
||||||
|
e.identifiers[eventType] = e.typeIdentifier(eventType)
|
||||||
|
}
|
||||||
|
return e.identifiers[eventType]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *watchEncoder) typeIdentifier(eventType watch.EventType) runtime.Identifier {
|
||||||
|
// The eventType is a non-standard pattern. This is coming from the fact
|
||||||
|
// that we're effectively serializing the whole watch event, but storing
|
||||||
|
// it in serializations of the Object within the watch event.
|
||||||
|
identifier := watchEncoderIdentifier{
|
||||||
|
Name: "watch",
|
||||||
|
EmbeddedEncoder: string(e.embeddedEncoder.Identifier()),
|
||||||
|
Encoder: string(e.encoder.Identifier()),
|
||||||
|
EventType: string(eventType),
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := json.Marshal(identifier)
|
||||||
|
if err != nil {
|
||||||
|
klog.Fatalf("Failed marshaling identifier for watchEncoder: %v", err)
|
||||||
|
}
|
||||||
|
return runtime.Identifier(result)
|
||||||
|
}
|
||||||
|
|
||||||
// doTransformResponseObject is used for handling all requests, including watch.
|
// doTransformResponseObject is used for handling all requests, including watch.
|
||||||
func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}, target *schema.GroupVersionKind, scope *RequestScope) (runtime.Object, error) {
|
func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}, target *schema.GroupVersionKind, scope *RequestScope) (runtime.Object, error) {
|
||||||
if _, ok := obj.(*metav1.Status); ok {
|
if _, ok := obj.(*metav1.Status); ok {
|
||||||
|
@ -212,3 +212,13 @@ func TestAsPartialObjectMetadataList(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchEncoderIdentifier(t *testing.T) {
|
||||||
|
eventFields := reflect.VisibleFields(reflect.TypeOf(metav1.WatchEvent{}))
|
||||||
|
if len(eventFields) != 2 {
|
||||||
|
t.Error("New field was added to metav1.WatchEvent.")
|
||||||
|
t.Error(" Ensure that the following places are updated accordingly:")
|
||||||
|
t.Error(" - watchEncoder::doEncode method when creating outEvent")
|
||||||
|
t.Error(" - watchEncoder::typeIdentifier to capture all relevant fields in identifier")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -27,7 +27,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
|
|
||||||
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
|
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
@ -213,9 +212,6 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var e streaming.Encoder
|
|
||||||
e = streaming.NewEncoder(framer, s.Encoder)
|
|
||||||
|
|
||||||
// ensure the connection times out
|
// ensure the connection times out
|
||||||
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
|
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
@ -226,10 +222,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
|
|
||||||
var unknown runtime.Unknown
|
watchEncoder := newWatchEncoder(req.Context(), kind, s.EmbeddedEncoder, s.Encoder, framer)
|
||||||
internalEvent := &metav1.InternalEvent{}
|
|
||||||
outEvent := &metav1.WatchEvent{}
|
|
||||||
buf := runtime.NewSpliceBuffer()
|
|
||||||
ch := s.Watching.ResultChan()
|
ch := s.Watching.ResultChan()
|
||||||
done := req.Context().Done()
|
done := req.Context().Done()
|
||||||
|
|
||||||
@ -256,43 +249,18 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
|
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
|
||||||
isWatchListLatencyRecordingRequired := shouldRecordWatchListLatency(event)
|
isWatchListLatencyRecordingRequired := shouldRecordWatchListLatency(event)
|
||||||
|
|
||||||
if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil {
|
if err := watchEncoder.Encode(event); err != nil {
|
||||||
// unexpected error
|
utilruntime.HandleError(err)
|
||||||
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", event.Object, err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// ContentType is not required here because we are defaulting to the serializer
|
|
||||||
// type
|
|
||||||
unknown.Raw = buf.Bytes()
|
|
||||||
event.Object = &unknown
|
|
||||||
metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw)))
|
|
||||||
|
|
||||||
*outEvent = metav1.WatchEvent{}
|
|
||||||
|
|
||||||
// create the external type directly and encode it. Clients will only recognize the serialization we provide.
|
|
||||||
// The internal event is being reused, not reallocated so its just a few extra assignments to do it this way
|
|
||||||
// and we get the benefit of using conversion functions which already have to stay in sync
|
|
||||||
*internalEvent = metav1.InternalEvent(event)
|
|
||||||
err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(internalEvent, outEvent, nil)
|
|
||||||
if err != nil {
|
|
||||||
utilruntime.HandleError(fmt.Errorf("unable to convert watch object: %v", err))
|
|
||||||
// client disconnect.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := e.Encode(outEvent); err != nil {
|
|
||||||
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v (%#v)", outEvent, err, e))
|
|
||||||
// client disconnect.
|
// client disconnect.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(ch) == 0 {
|
if len(ch) == 0 {
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
}
|
}
|
||||||
if isWatchListLatencyRecordingRequired {
|
if isWatchListLatencyRecordingRequired {
|
||||||
metrics.RecordWatchListLatency(req.Context(), s.Scope.Resource, s.metricsScope)
|
metrics.RecordWatchListLatency(req.Context(), s.Scope.Resource, s.metricsScope)
|
||||||
}
|
}
|
||||||
|
|
||||||
buf.Reset()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2361,6 +2361,19 @@ func TestWatchTransformCaching(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer wTableIncludeObject.Close()
|
defer wTableIncludeObject.Close()
|
||||||
|
|
||||||
|
wTableIncludeObjectFiltered, err := clientSet.CoreV1().RESTClient().Get().
|
||||||
|
AbsPath("/api/v1/namespaces/watch-transform/configmaps").
|
||||||
|
SetHeader("Accept", "application/json;as=Table;g=meta.k8s.io;v=v1beta1").
|
||||||
|
VersionedParams(listOptions, metav1.ParameterCodec).
|
||||||
|
Param("includeObject", string(metav1.IncludeObject)).
|
||||||
|
Param("labelSelector", "foo=bar").
|
||||||
|
Timeout(timeout).
|
||||||
|
Stream(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to start table object watch: %v", err)
|
||||||
|
}
|
||||||
|
defer wTableIncludeObjectFiltered.Close()
|
||||||
|
|
||||||
configMap, err := clientSet.CoreV1().ConfigMaps("watch-transform").Create(ctx, &v1.ConfigMap{
|
configMap, err := clientSet.CoreV1().ConfigMaps("watch-transform").Create(ctx, &v1.ConfigMap{
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "test1"},
|
ObjectMeta: metav1.ObjectMeta{Name: "test1"},
|
||||||
Data: map[string]string{
|
Data: map[string]string{
|
||||||
@ -2397,6 +2410,30 @@ func TestWatchTransformCaching(t *testing.T) {
|
|||||||
t.Fatalf("Failed to create a second configMap: %v", err)
|
t.Fatalf("Failed to create a second configMap: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Now update both configmaps so that filtering watch can observe them.
|
||||||
|
// This is needed to validate whether events caching done by apiserver
|
||||||
|
// distinguished objects by type.
|
||||||
|
|
||||||
|
configMapUpdated, err := clientSet.CoreV1().ConfigMaps("watch-transform").Update(ctx, &v1.ConfigMap{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "test1", Labels: map[string]string{"foo": "bar"}},
|
||||||
|
Data: map[string]string{
|
||||||
|
"foo": "baz",
|
||||||
|
},
|
||||||
|
}, metav1.UpdateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to update a configMap: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
configMap2Updated, err := clientSet.CoreV1().ConfigMaps("watch-transform").Update(ctx, &v1.ConfigMap{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "test2", Labels: map[string]string{"foo": "bar"}},
|
||||||
|
Data: map[string]string{
|
||||||
|
"foo": "baz",
|
||||||
|
},
|
||||||
|
}, metav1.UpdateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to update a second configMap: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
metaChecks := []partialObjectMetadataCheck{
|
metaChecks := []partialObjectMetadataCheck{
|
||||||
func(res *metav1beta1.PartialObjectMetadata) {
|
func(res *metav1beta1.PartialObjectMetadata) {
|
||||||
if !apiequality.Semantic.DeepEqual(configMap.ObjectMeta, res.ObjectMeta) {
|
if !apiequality.Semantic.DeepEqual(configMap.ObjectMeta, res.ObjectMeta) {
|
||||||
@ -2408,6 +2445,16 @@ func TestWatchTransformCaching(t *testing.T) {
|
|||||||
t.Errorf("expected object: %#v, got: %#v", configMap2.ObjectMeta, res.ObjectMeta)
|
t.Errorf("expected object: %#v, got: %#v", configMap2.ObjectMeta, res.ObjectMeta)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
func(res *metav1beta1.PartialObjectMetadata) {
|
||||||
|
if !apiequality.Semantic.DeepEqual(configMapUpdated.ObjectMeta, res.ObjectMeta) {
|
||||||
|
t.Errorf("expected object: %#v, got: %#v", configMapUpdated.ObjectMeta, res.ObjectMeta)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
func(res *metav1beta1.PartialObjectMetadata) {
|
||||||
|
if !apiequality.Semantic.DeepEqual(configMap2Updated.ObjectMeta, res.ObjectMeta) {
|
||||||
|
t.Errorf("expected object: %#v, got: %#v", configMap2Updated.ObjectMeta, res.ObjectMeta)
|
||||||
|
}
|
||||||
|
},
|
||||||
}
|
}
|
||||||
expectPartialObjectMetaEventsProtobufChecks(t, wMeta, metaChecks)
|
expectPartialObjectMetaEventsProtobufChecks(t, wMeta, metaChecks)
|
||||||
|
|
||||||
@ -2421,39 +2468,67 @@ func TestWatchTransformCaching(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
objectMetas := expectTableWatchEvents(t, 2, 3, metav1.IncludeMetadata, json.NewDecoder(wTableIncludeMeta))
|
objectMetas := expectTableWatchEvents(t, 4, 3, metav1.IncludeMetadata, json.NewDecoder(wTableIncludeMeta))
|
||||||
tableMetaCheck(configMap, objectMetas[0])
|
tableMetaCheck(configMap, objectMetas[0])
|
||||||
tableMetaCheck(configMap2, objectMetas[1])
|
tableMetaCheck(configMap2, objectMetas[1])
|
||||||
|
tableMetaCheck(configMapUpdated, objectMetas[2])
|
||||||
|
tableMetaCheck(configMap2Updated, objectMetas[3])
|
||||||
|
|
||||||
tableObjectCheck := func(expected *v1.ConfigMap, got []byte) {
|
tableObjectCheck := func(expectedType watch.EventType, expectedObj *v1.ConfigMap, got streamedEvent) {
|
||||||
var obj *v1.ConfigMap
|
var obj *v1.ConfigMap
|
||||||
if err := json.Unmarshal(got, &obj); err != nil {
|
if err := json.Unmarshal(got.rawObject, &obj); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
if expectedType != watch.EventType(got.eventType) {
|
||||||
|
t.Errorf("expected type: %#v, got: %#v", expectedType, got.eventType)
|
||||||
|
}
|
||||||
obj.TypeMeta = metav1.TypeMeta{}
|
obj.TypeMeta = metav1.TypeMeta{}
|
||||||
if !apiequality.Semantic.DeepEqual(expected, obj) {
|
if !apiequality.Semantic.DeepEqual(expectedObj, obj) {
|
||||||
t.Errorf("expected object: %#v, got: %#v", expected, obj)
|
t.Errorf("expected object: %#v, got: %#v", expectedObj, obj)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
objects := expectTableWatchEvents(t, 2, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObject))
|
objects := expectTableWatchEventsWithTypes(t, 4, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObject))
|
||||||
tableObjectCheck(configMap, objects[0])
|
tableObjectCheck(watch.Added, configMap, objects[0])
|
||||||
tableObjectCheck(configMap2, objects[1])
|
tableObjectCheck(watch.Added, configMap2, objects[1])
|
||||||
|
tableObjectCheck(watch.Modified, configMapUpdated, objects[2])
|
||||||
|
tableObjectCheck(watch.Modified, configMap2Updated, objects[3])
|
||||||
|
|
||||||
delayedObjects := expectTableWatchEvents(t, 1, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObjectDelayed))
|
filteredObjects := expectTableWatchEventsWithTypes(t, 2, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObjectFiltered))
|
||||||
tableObjectCheck(configMap2, delayedObjects[0])
|
tableObjectCheck(watch.Added, configMapUpdated, filteredObjects[0])
|
||||||
|
tableObjectCheck(watch.Added, configMap2Updated, filteredObjects[1])
|
||||||
|
|
||||||
|
delayedObjects := expectTableWatchEventsWithTypes(t, 3, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObjectDelayed))
|
||||||
|
tableObjectCheck(watch.Added, configMap2, delayedObjects[0])
|
||||||
|
tableObjectCheck(watch.Modified, configMapUpdated, delayedObjects[1])
|
||||||
|
tableObjectCheck(watch.Modified, configMap2Updated, delayedObjects[2])
|
||||||
}
|
}
|
||||||
|
|
||||||
func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.IncludeObjectPolicy, d *json.Decoder) [][]byte {
|
func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.IncludeObjectPolicy, d *json.Decoder) [][]byte {
|
||||||
|
events := expectTableWatchEventsWithTypes(t, count, columns, policy, d)
|
||||||
|
var objects [][]byte
|
||||||
|
for _, event := range events {
|
||||||
|
objects = append(objects, event.rawObject)
|
||||||
|
}
|
||||||
|
return objects
|
||||||
|
}
|
||||||
|
|
||||||
|
type streamedEvent struct {
|
||||||
|
eventType string
|
||||||
|
rawObject []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func expectTableWatchEventsWithTypes(t *testing.T, count, columns int, policy metav1.IncludeObjectPolicy, d *json.Decoder) []streamedEvent {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
var objects [][]byte
|
var events []streamedEvent
|
||||||
|
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
var evt metav1.WatchEvent
|
var evt metav1.WatchEvent
|
||||||
if err := d.Decode(&evt); err != nil {
|
if err := d.Decode(&evt); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var table metav1beta1.Table
|
var table metav1beta1.Table
|
||||||
if err := json.Unmarshal(evt.Object.Raw, &table); err != nil {
|
if err := json.Unmarshal(evt.Object.Raw, &table); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -2484,7 +2559,7 @@ func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.Incl
|
|||||||
if meta.TypeMeta != partialObj {
|
if meta.TypeMeta != partialObj {
|
||||||
t.Fatalf("expected partial object: %#v", meta)
|
t.Fatalf("expected partial object: %#v", meta)
|
||||||
}
|
}
|
||||||
objects = append(objects, row.Object.Raw)
|
events = append(events, streamedEvent{eventType: evt.Type, rawObject: row.Object.Raw})
|
||||||
case metav1.IncludeNone:
|
case metav1.IncludeNone:
|
||||||
if len(row.Object.Raw) != 0 {
|
if len(row.Object.Raw) != 0 {
|
||||||
t.Fatalf("Expected no object: %s", string(row.Object.Raw))
|
t.Fatalf("Expected no object: %s", string(row.Object.Raw))
|
||||||
@ -2493,10 +2568,10 @@ func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.Incl
|
|||||||
if len(row.Object.Raw) == 0 {
|
if len(row.Object.Raw) == 0 {
|
||||||
t.Fatalf("Expected object: %s", string(row.Object.Raw))
|
t.Fatalf("Expected object: %s", string(row.Object.Raw))
|
||||||
}
|
}
|
||||||
objects = append(objects, row.Object.Raw)
|
events = append(events, streamedEvent{eventType: evt.Type, rawObject: row.Object.Raw})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return objects
|
return events
|
||||||
}
|
}
|
||||||
|
|
||||||
func expectPartialObjectMetaEvents(t *testing.T, d *json.Decoder, values ...string) {
|
func expectPartialObjectMetaEvents(t *testing.T, d *json.Decoder, values ...string) {
|
||||||
|
Loading…
Reference in New Issue
Block a user