mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #25894 from brendandburns/thirdparty-watch
Automatic merge from submit-queue Fix third party Fixes https://github.com/kubernetes/kubernetes/issues/25421 Fixes https://github.com/kubernetes/kubernetes/issues/25422 @AdoHe @sjenning @caesarxuchao @lavalamp @kubernetes/sig-api-machinery []()
This commit is contained in:
commit
421c16addd
@ -373,7 +373,8 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory {
|
||||
gv := gvk.GroupVersion()
|
||||
cfg.GroupVersion = &gv
|
||||
cfg.APIPath = "/apis"
|
||||
cfg.Codec = thirdpartyresourcedata.NewCodec(c.ExtensionsClient.RESTClient.Codec(), gvk.Kind)
|
||||
cfg.Codec = thirdpartyresourcedata.NewCodec(c.ExtensionsClient.RESTClient.Codec(), gvk)
|
||||
cfg.NegotiatedSerializer = thirdpartyresourcedata.NewNegotiatedSerializer(api.Codecs, gvk.Kind, gv, gv)
|
||||
return restclient.RESTClientFor(cfg)
|
||||
}
|
||||
},
|
||||
@ -398,10 +399,14 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory {
|
||||
return nil, fmt.Errorf("no description has been implemented for %q", mapping.GroupVersionKind.Kind)
|
||||
},
|
||||
Decoder: func(toInternal bool) runtime.Decoder {
|
||||
var decoder runtime.Decoder
|
||||
if toInternal {
|
||||
return api.Codecs.UniversalDecoder()
|
||||
decoder = api.Codecs.UniversalDecoder()
|
||||
} else {
|
||||
decoder = api.Codecs.UniversalDeserializer()
|
||||
}
|
||||
return api.Codecs.UniversalDeserializer()
|
||||
return thirdpartyresourcedata.NewDecoder(decoder, "")
|
||||
|
||||
},
|
||||
JSONEncoder: func() runtime.Encoder {
|
||||
return api.Codecs.LegacyCodec(registered.EnabledVersions()...)
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
"k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
)
|
||||
@ -57,9 +56,12 @@ func (m *Mapper) InfoForData(data []byte, source string) (*Info, error) {
|
||||
}
|
||||
var obj runtime.Object
|
||||
var versioned runtime.Object
|
||||
if registered.IsThirdPartyAPIGroupVersion(gvk.GroupVersion()) {
|
||||
obj, err = runtime.Decode(thirdpartyresourcedata.NewDecoder(nil, gvk.Kind), data)
|
||||
if isThirdParty, gvkOut, err := thirdpartyresourcedata.IsThirdPartyObject(data, gvk); err != nil {
|
||||
return nil, err
|
||||
} else if isThirdParty {
|
||||
obj, err = runtime.Decode(thirdpartyresourcedata.NewDecoder(nil, gvkOut.Kind), data)
|
||||
versioned = obj
|
||||
gvk = gvkOut
|
||||
} else {
|
||||
obj, versioned = versions.Last(), versions.First()
|
||||
}
|
||||
|
@ -531,7 +531,6 @@ type FooList struct {
|
||||
|
||||
func initThirdParty(t *testing.T, version, name string) (*Master, *etcdtesting.EtcdTestServer, *httptest.Server, *assert.Assertions) {
|
||||
master, etcdserver, _, assert := newMaster(t)
|
||||
|
||||
api := &extensions.ThirdPartyResource{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: name,
|
||||
@ -773,6 +772,8 @@ func testInstallThirdPartyAPIGetVersion(t *testing.T, version string) {
|
||||
}
|
||||
|
||||
func TestInstallThirdPartyAPIPost(t *testing.T) {
|
||||
registered.AddThirdPartyAPIGroupVersions(unversioned.GroupVersion{Group: "company.com", Version: "v1"}, unversioned.GroupVersion{Group: "company.com", Version: "v3"})
|
||||
|
||||
for _, version := range versionsToTest {
|
||||
testInstallThirdPartyAPIPostForVersion(t, version)
|
||||
}
|
||||
|
@ -34,6 +34,8 @@ import (
|
||||
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/yaml"
|
||||
"k8s.io/kubernetes/pkg/watch/versioned"
|
||||
)
|
||||
|
||||
type thirdPartyObjectConverter struct {
|
||||
@ -168,7 +170,7 @@ func NewMapper(mapper meta.RESTMapper, kind, version, group string) meta.RESTMap
|
||||
}
|
||||
|
||||
type thirdPartyResourceDataCodecFactory struct {
|
||||
runtime.NegotiatedSerializer
|
||||
delegate runtime.NegotiatedSerializer
|
||||
kind string
|
||||
encodeGV unversioned.GroupVersion
|
||||
decodeGV unversioned.GroupVersion
|
||||
@ -176,8 +178,7 @@ type thirdPartyResourceDataCodecFactory struct {
|
||||
|
||||
func NewNegotiatedSerializer(s runtime.NegotiatedSerializer, kind string, encodeGV, decodeGV unversioned.GroupVersion) runtime.NegotiatedSerializer {
|
||||
return &thirdPartyResourceDataCodecFactory{
|
||||
NegotiatedSerializer: s,
|
||||
|
||||
delegate: s,
|
||||
kind: kind,
|
||||
encodeGV: encodeGV,
|
||||
decodeGV: decodeGV,
|
||||
@ -185,25 +186,43 @@ func NewNegotiatedSerializer(s runtime.NegotiatedSerializer, kind string, encode
|
||||
}
|
||||
|
||||
func (t *thirdPartyResourceDataCodecFactory) SupportedMediaTypes() []string {
|
||||
supported := sets.NewString(t.NegotiatedSerializer.SupportedMediaTypes()...)
|
||||
supported := sets.NewString(t.delegate.SupportedMediaTypes()...)
|
||||
return supported.Intersection(sets.NewString("application/json", "application/yaml")).List()
|
||||
}
|
||||
|
||||
func (t *thirdPartyResourceDataCodecFactory) SerializerForMediaType(mediaType string, params map[string]string) (runtime.SerializerInfo, bool) {
|
||||
switch mediaType {
|
||||
case "application/json", "application/yaml":
|
||||
return t.delegate.SerializerForMediaType(mediaType, params)
|
||||
default:
|
||||
return runtime.SerializerInfo{}, false
|
||||
}
|
||||
}
|
||||
|
||||
func (t *thirdPartyResourceDataCodecFactory) SupportedStreamingMediaTypes() []string {
|
||||
supported := sets.NewString(t.NegotiatedSerializer.SupportedStreamingMediaTypes()...)
|
||||
supported := sets.NewString(t.delegate.SupportedStreamingMediaTypes()...)
|
||||
return supported.Intersection(sets.NewString("application/json", "application/json;stream=watch")).List()
|
||||
}
|
||||
|
||||
func (t *thirdPartyResourceDataCodecFactory) StreamingSerializerForMediaType(mediaType string, params map[string]string) (runtime.StreamSerializerInfo, bool) {
|
||||
switch mediaType {
|
||||
case "application/json", "application/json;stream=watch":
|
||||
return t.delegate.StreamingSerializerForMediaType(mediaType, params)
|
||||
default:
|
||||
return runtime.StreamSerializerInfo{}, false
|
||||
}
|
||||
}
|
||||
|
||||
func (t *thirdPartyResourceDataCodecFactory) EncoderForVersion(s runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder {
|
||||
return &thirdPartyResourceDataEncoder{delegate: t.NegotiatedSerializer.EncoderForVersion(s, gv), kind: t.kind}
|
||||
return &thirdPartyResourceDataEncoder{delegate: t.delegate.EncoderForVersion(s, gv), gvk: gv.WithKind(t.kind)}
|
||||
}
|
||||
|
||||
func (t *thirdPartyResourceDataCodecFactory) DecoderToVersion(s runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder {
|
||||
return NewDecoder(t.NegotiatedSerializer.DecoderToVersion(s, gv), t.kind)
|
||||
return NewDecoder(t.delegate.DecoderToVersion(s, gv), t.kind)
|
||||
}
|
||||
|
||||
func NewCodec(delegate runtime.Codec, kind string) runtime.Codec {
|
||||
return runtime.NewCodec(NewEncoder(delegate, kind), NewDecoder(delegate, kind))
|
||||
func NewCodec(delegate runtime.Codec, gvk unversioned.GroupVersionKind) runtime.Codec {
|
||||
return runtime.NewCodec(NewEncoder(delegate, gvk), NewDecoder(delegate, gvk.Kind))
|
||||
}
|
||||
|
||||
type thirdPartyResourceDataDecoder struct {
|
||||
@ -220,7 +239,6 @@ var _ runtime.Decoder = &thirdPartyResourceDataDecoder{}
|
||||
func parseObject(data []byte) (map[string]interface{}, error) {
|
||||
var obj interface{}
|
||||
if err := json.Unmarshal(data, &obj); err != nil {
|
||||
fmt.Printf("Invalid JSON:\n%s\n", string(data))
|
||||
return nil, err
|
||||
}
|
||||
mapObj, ok := obj.(map[string]interface{})
|
||||
@ -243,14 +261,15 @@ func (t *thirdPartyResourceDataDecoder) populateFromObject(mapObj map[string]int
|
||||
if err := json.Unmarshal(data, &typeMeta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch typeMeta.Kind {
|
||||
case t.kind:
|
||||
isList := strings.HasSuffix(typeMeta.Kind, "List")
|
||||
switch {
|
||||
case !isList && (len(t.kind) == 0 || typeMeta.Kind == t.kind):
|
||||
result := &extensions.ThirdPartyResourceData{}
|
||||
if err := t.populateResource(result, mapObj, data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
case t.kind + "List":
|
||||
case isList && (len(t.kind) == 0 || typeMeta.Kind == t.kind+"List"):
|
||||
list := &extensions.ThirdPartyResourceDataList{}
|
||||
if err := t.populateListResource(list, mapObj); err != nil {
|
||||
return nil, err
|
||||
@ -283,19 +302,73 @@ func (t *thirdPartyResourceDataDecoder) populateResource(objIn *extensions.Third
|
||||
return nil
|
||||
}
|
||||
|
||||
func IsThirdPartyObject(rawData []byte, gvk *unversioned.GroupVersionKind) (isThirdParty bool, gvkOut *unversioned.GroupVersionKind, err error) {
|
||||
var gv unversioned.GroupVersion
|
||||
if gvk == nil {
|
||||
data, err := yaml.ToJSON(rawData)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
metadata := unversioned.TypeMeta{}
|
||||
if err = json.Unmarshal(data, &metadata); err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
gv, err = unversioned.ParseGroupVersion(metadata.APIVersion)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
gvkOut = &unversioned.GroupVersionKind{
|
||||
Group: gv.Group,
|
||||
Version: gv.Version,
|
||||
Kind: metadata.Kind,
|
||||
}
|
||||
} else {
|
||||
gv = gvk.GroupVersion()
|
||||
gvkOut = gvk
|
||||
}
|
||||
return registered.IsThirdPartyAPIGroupVersion(gv), gvkOut, nil
|
||||
}
|
||||
|
||||
func (t *thirdPartyResourceDataDecoder) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
|
||||
if into == nil {
|
||||
if gvk == nil || gvk.Kind != t.kind {
|
||||
if isThirdParty, _, err := IsThirdPartyObject(data, gvk); err != nil {
|
||||
return nil, nil, err
|
||||
} else if !isThirdParty {
|
||||
return t.delegate.Decode(data, gvk, into)
|
||||
}
|
||||
}
|
||||
obj, err := t.populate(data)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return obj, gvk, nil
|
||||
}
|
||||
thirdParty, ok := into.(*extensions.ThirdPartyResourceData)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("unexpected object: %#v", into)
|
||||
switch o := into.(type) {
|
||||
case *extensions.ThirdPartyResourceData:
|
||||
break
|
||||
case *runtime.VersionedObjects:
|
||||
// We're not sure that it's third party, we need to test
|
||||
if gvk == nil || gvk.Kind != t.kind {
|
||||
if isThirdParty, _, err := IsThirdPartyObject(data, gvk); err != nil {
|
||||
return nil, nil, err
|
||||
} else if !isThirdParty {
|
||||
return t.delegate.Decode(data, gvk, into)
|
||||
}
|
||||
}
|
||||
obj, err := t.populate(data)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
o.Objects = []runtime.Object{
|
||||
obj,
|
||||
}
|
||||
return o, gvk, nil
|
||||
default:
|
||||
return t.delegate.Decode(data, gvk, into)
|
||||
}
|
||||
|
||||
thirdParty := into.(*extensions.ThirdPartyResourceData)
|
||||
var dataObj interface{}
|
||||
if err := json.Unmarshal(data, &dataObj); err != nil {
|
||||
return nil, nil, err
|
||||
@ -320,10 +393,10 @@ func (t *thirdPartyResourceDataDecoder) Decode(data []byte, gvk *unversioned.Gro
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("unexpected object for 'kind': %v", kindObj)
|
||||
}
|
||||
if kindStr != t.kind {
|
||||
if len(t.kind) > 0 && kindStr != t.kind {
|
||||
return nil, nil, fmt.Errorf("kind doesn't match, expecting: %s, got %s", gvk.Kind, kindStr)
|
||||
}
|
||||
actual.Kind = t.kind
|
||||
actual.Kind = kindStr
|
||||
}
|
||||
if versionObj, found := mapObj["apiVersion"]; !found {
|
||||
if gvk == nil {
|
||||
@ -380,16 +453,18 @@ func (t *thirdPartyResourceDataDecoder) populateListResource(objIn *extensions.T
|
||||
|
||||
const template = `{
|
||||
"kind": "%s",
|
||||
"apiVersion": "%s",
|
||||
"metadata": {},
|
||||
"items": [ %s ]
|
||||
}`
|
||||
|
||||
type thirdPartyResourceDataEncoder struct {
|
||||
delegate runtime.Encoder
|
||||
kind string
|
||||
gvk unversioned.GroupVersionKind
|
||||
}
|
||||
|
||||
func NewEncoder(delegate runtime.Encoder, kind string) runtime.Encoder {
|
||||
return &thirdPartyResourceDataEncoder{delegate: delegate, kind: kind}
|
||||
func NewEncoder(delegate runtime.Encoder, gvk unversioned.GroupVersionKind) runtime.Encoder {
|
||||
return &thirdPartyResourceDataEncoder{delegate: delegate, gvk: gvk}
|
||||
}
|
||||
|
||||
var _ runtime.Encoder = &thirdPartyResourceDataEncoder{}
|
||||
@ -410,6 +485,8 @@ func encodeToJSON(obj *extensions.ThirdPartyResourceData, stream io.Writer) erro
|
||||
|
||||
func (t *thirdPartyResourceDataEncoder) EncodeToStream(obj runtime.Object, stream io.Writer, overrides ...unversioned.GroupVersion) (err error) {
|
||||
switch obj := obj.(type) {
|
||||
case *versioned.InternalEvent:
|
||||
return t.delegate.EncodeToStream(obj, stream, overrides...)
|
||||
case *extensions.ThirdPartyResourceData:
|
||||
return encodeToJSON(obj, stream)
|
||||
case *extensions.ThirdPartyResourceDataList:
|
||||
@ -423,7 +500,8 @@ func (t *thirdPartyResourceDataEncoder) EncodeToStream(obj runtime.Object, strea
|
||||
}
|
||||
dataStrings[ix] = buff.String()
|
||||
}
|
||||
fmt.Fprintf(stream, template, t.kind+"List", strings.Join(dataStrings, ","))
|
||||
gv := t.gvk.GroupVersion()
|
||||
fmt.Fprintf(stream, template, t.gvk.Kind+"List", gv.String(), strings.Join(dataStrings, ","))
|
||||
return nil
|
||||
case *unversioned.Status, *unversioned.APIResourceList:
|
||||
return t.delegate.EncodeToStream(obj, stream, overrides...)
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
)
|
||||
@ -38,6 +39,10 @@ type Foo struct {
|
||||
OtherField int `json:"otherField"`
|
||||
}
|
||||
|
||||
func (*Foo) GetObjectKind() unversioned.ObjectKind {
|
||||
return unversioned.EmptyObjectKind
|
||||
}
|
||||
|
||||
type FooList struct {
|
||||
unversioned.TypeMeta `json:",inline"`
|
||||
unversioned.ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
|
||||
@ -47,21 +52,37 @@ type FooList struct {
|
||||
|
||||
func TestCodec(t *testing.T) {
|
||||
tests := []struct {
|
||||
into runtime.Object
|
||||
obj *Foo
|
||||
expectErr bool
|
||||
name string
|
||||
}{
|
||||
{
|
||||
into: &runtime.VersionedObjects{},
|
||||
obj: &Foo{
|
||||
ObjectMeta: api.ObjectMeta{Name: "bar"},
|
||||
TypeMeta: unversioned.TypeMeta{APIVersion: "company.com/v1", Kind: "Foo"},
|
||||
},
|
||||
expectErr: false,
|
||||
name: "versioned objects list",
|
||||
},
|
||||
{
|
||||
obj: &Foo{ObjectMeta: api.ObjectMeta{Name: "bar"}},
|
||||
expectErr: true,
|
||||
name: "missing kind",
|
||||
},
|
||||
{
|
||||
obj: &Foo{ObjectMeta: api.ObjectMeta{Name: "bar"}, TypeMeta: unversioned.TypeMeta{Kind: "Foo"}},
|
||||
obj: &Foo{
|
||||
ObjectMeta: api.ObjectMeta{Name: "bar"},
|
||||
TypeMeta: unversioned.TypeMeta{APIVersion: "company.com/v1", Kind: "Foo"},
|
||||
},
|
||||
name: "basic",
|
||||
},
|
||||
{
|
||||
obj: &Foo{ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "baz"}, TypeMeta: unversioned.TypeMeta{Kind: "Foo"}},
|
||||
obj: &Foo{
|
||||
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "baz"},
|
||||
TypeMeta: unversioned.TypeMeta{APIVersion: "company.com/v1", Kind: "Foo"},
|
||||
},
|
||||
name: "resource version",
|
||||
},
|
||||
{
|
||||
@ -70,7 +91,10 @@ func TestCodec(t *testing.T) {
|
||||
Name: "bar",
|
||||
CreationTimestamp: unversioned.Time{Time: time.Unix(100, 0)},
|
||||
},
|
||||
TypeMeta: unversioned.TypeMeta{Kind: "Foo"},
|
||||
TypeMeta: unversioned.TypeMeta{
|
||||
APIVersion: "company.com/v1",
|
||||
Kind: "Foo",
|
||||
},
|
||||
},
|
||||
name: "creation time",
|
||||
},
|
||||
@ -81,20 +105,31 @@ func TestCodec(t *testing.T) {
|
||||
ResourceVersion: "baz",
|
||||
Labels: map[string]string{"foo": "bar", "baz": "blah"},
|
||||
},
|
||||
TypeMeta: unversioned.TypeMeta{Kind: "Foo"},
|
||||
TypeMeta: unversioned.TypeMeta{APIVersion: "company.com/v1", Kind: "Foo"},
|
||||
},
|
||||
name: "labels",
|
||||
},
|
||||
}
|
||||
registered.AddThirdPartyAPIGroupVersions(unversioned.GroupVersion{Group: "company.com", Version: "v1"})
|
||||
for _, test := range tests {
|
||||
d := &thirdPartyResourceDataDecoder{kind: "Foo", delegate: testapi.Extensions.Codec()}
|
||||
e := &thirdPartyResourceDataEncoder{kind: "Foo", delegate: testapi.Extensions.Codec()}
|
||||
e := &thirdPartyResourceDataEncoder{gvk: unversioned.GroupVersionKind{
|
||||
Group: "company.com",
|
||||
Version: "v1",
|
||||
Kind: "Foo",
|
||||
}, delegate: testapi.Extensions.Codec()}
|
||||
data, err := json.Marshal(test.obj)
|
||||
if err != nil {
|
||||
t.Errorf("[%s] unexpected error: %v", test.name, err)
|
||||
continue
|
||||
}
|
||||
obj, err := runtime.Decode(d, data)
|
||||
var obj runtime.Object
|
||||
if test.into != nil {
|
||||
err = runtime.DecodeInto(d, data, test.into)
|
||||
obj = test.into
|
||||
} else {
|
||||
obj, err = runtime.Decode(d, data)
|
||||
}
|
||||
if err != nil && !test.expectErr {
|
||||
t.Errorf("[%s] unexpected error: %v", test.name, err)
|
||||
continue
|
||||
@ -105,8 +140,13 @@ func TestCodec(t *testing.T) {
|
||||
}
|
||||
continue
|
||||
}
|
||||
rsrcObj, ok := obj.(*extensions.ThirdPartyResourceData)
|
||||
if !ok {
|
||||
var rsrcObj *extensions.ThirdPartyResourceData
|
||||
switch o := obj.(type) {
|
||||
case *extensions.ThirdPartyResourceData:
|
||||
rsrcObj = o
|
||||
case *runtime.VersionedObjects:
|
||||
rsrcObj = o.First().(*extensions.ThirdPartyResourceData)
|
||||
default:
|
||||
t.Errorf("[%s] unexpected object: %v", test.name, obj)
|
||||
continue
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user