Initialize API servers with negotiated serializers

Pass down into the server initialization the necessary interface for
handling client/server content type negotiation. Add integration tests
for the negotiation.
This commit is contained in:
Clayton Coleman 2015-12-21 00:27:49 -05:00
parent c49cd4edf9
commit 4d127dc969
11 changed files with 65 additions and 48 deletions

View File

@ -47,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/genericapiserver"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
@ -81,9 +82,9 @@ func verifyClusterIPFlags(s *options.APIServer) {
}
}
type newEtcdFunc func([]string, meta.VersionInterfacesFunc, string, string) (storage.Interface, error)
type newEtcdFunc func([]string, runtime.NegotiatedSerializer, string, string) (storage.Interface, error)
func newEtcd(etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc, storageGroupVersionString, pathPrefix string) (etcdStorage storage.Interface, err error) {
func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGroupVersionString, pathPrefix string) (etcdStorage storage.Interface, err error) {
if storageGroupVersionString == "" {
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
}
@ -95,11 +96,11 @@ func newEtcd(etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc,
var storageConfig etcdstorage.EtcdConfig
storageConfig.ServerList = etcdServerList
storageConfig.Prefix = pathPrefix
versionedInterface, err := interfacesFunc(storageVersion)
if err != nil {
return nil, err
s, ok := ns.SerializerForMediaType("application/json", nil)
if !ok {
return nil, fmt.Errorf("unable to find serializer for JSON")
}
storageConfig.Codec = versionedInterface.Codec
storageConfig.Codec = runtime.NewCodec(ns.EncoderForVersion(s, storageVersion), ns.DecoderToVersion(s, unversioned.GroupVersion{Group: storageVersion.Group, Version: runtime.APIVersionInternal}))
return storageConfig.NewStorage()
}
@ -148,7 +149,7 @@ func updateEtcdOverrides(overrides []string, storageVersions map[string]string,
}
servers := strings.Split(tokens[1], ";")
etcdOverrideStorage, err := newEtcdFn(servers, apigroup.InterfacesFor, storageVersions[apigroup.GroupVersion.Group], prefix)
etcdOverrideStorage, err := newEtcdFn(servers, api.Codecs, storageVersions[apigroup.GroupVersion.Group], prefix)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
}
@ -259,7 +260,7 @@ func Run(s *options.APIServer) error {
if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions)
}
etcdStorage, err := newEtcd(s.EtcdServerList, legacyV1Group.InterfacesFor, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix)
etcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}
@ -273,7 +274,7 @@ func Run(s *options.APIServer) error {
if _, found := storageVersions[expGroup.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions)
}
expEtcdStorage, err := newEtcd(s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix)
expEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix)
if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
}
@ -380,6 +381,7 @@ func Run(s *options.APIServer) error {
ProxyTLSClientConfig: proxyTLSClientConfig,
ServiceNodePortRange: s.ServiceNodePortRange,
KubernetesServiceNodePort: s.KubernetesServiceNodePort,
Serializer: api.Codecs,
},
EnableCoreControllers: true,
EventTTL: s.EventTTL,

View File

@ -24,9 +24,9 @@ import (
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
@ -133,7 +133,7 @@ func TestUpdateEtcdOverrides(t *testing.T) {
}
for _, test := range testCases {
newEtcd := func(serverList []string, _ meta.VersionInterfacesFunc, _, _ string) (storage.Interface, error) {
newEtcd := func(serverList []string, _ runtime.NegotiatedSerializer, _, _ string) (storage.Interface, error) {
if !reflect.DeepEqual(test.servers, serverList) {
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList)
}

View File

@ -102,15 +102,9 @@ func TestRESTMapper(t *testing.T) {
t.Errorf("incorrect groupVersion: %v", mapping)
}
<<<<<<< HEAD
interfaces, _ := registered.GroupOrDie(extensions.GroupName).InterfacesFor(version)
if mapping.Codec != interfaces.Codec {
t.Errorf("unexpected codec: %#v, expected: %#v", mapping, interfaces)
=======
interfaces, _ := latest.GroupOrDie(extensions.GroupName).InterfacesFor(version)
if mapping.ObjectConvertor != interfaces.ObjectConvertor {
t.Errorf("unexpected: %#v, expected: %#v", mapping, interfaces)
>>>>>>> e776ada... Switch API objects to not register per version codecs
}
rc := &extensions.HorizontalPodAutoscaler{ObjectMeta: api.ObjectMeta{Name: "foo"}}

View File

@ -23,7 +23,6 @@ import (
// TODO: Ideally we should create the necessary package structure in e.g.,
// pkg/conversion/test/... instead of importing pkg/api here.
"k8s.io/kubernetes/pkg/api/latest"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"

View File

@ -40,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
genericetcd "k8s.io/kubernetes/pkg/registry/generic/etcd"
ipallocator "k8s.io/kubernetes/pkg/registry/service/ipallocator"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/ui"
"k8s.io/kubernetes/pkg/util"
@ -188,6 +189,9 @@ type Config struct {
// Map requests to contexts. Exported so downstream consumers can provider their own mappers
RequestContextMapper api.RequestContextMapper
// Required, the interface for serializing and converting objects to and from the wire
Serializer runtime.NegotiatedSerializer
// If specified, all web services will be registered into this container
RestfulContainer *restful.Container
@ -394,6 +398,7 @@ func New(c *Config) *GenericAPIServer {
AdmissionControl: c.AdmissionControl,
ApiGroupVersionOverrides: c.APIGroupVersionOverrides,
RequestContextMapper: c.RequestContextMapper,
Serializer: c.Serializer,
cacheTimeout: c.CacheTimeout,
MinRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
@ -418,7 +423,7 @@ func New(c *Config) *GenericAPIServer {
} else {
mux := http.NewServeMux()
s.mux = mux
handlerContainer = NewHandlerContainer(mux)
handlerContainer = NewHandlerContainer(mux, c.Serializer)
}
s.HandlerContainer = handlerContainer
// Use CurlyRouter to be able to use regular expressions in paths. Regular expressions are required in paths for example for proxy (where the path is proxy/{kind}/{name}/{*})
@ -457,10 +462,10 @@ func (s *GenericAPIServer) HandleFuncWithAuth(pattern string, handler func(http.
s.MuxHelper.HandleFunc(pattern, handler)
}
func NewHandlerContainer(mux *http.ServeMux) *restful.Container {
func NewHandlerContainer(mux *http.ServeMux, s runtime.NegotiatedSerializer) *restful.Container {
container := restful.NewContainer()
container.ServeMux = mux
apiserver.InstallRecoverHandler(container)
apiserver.InstallRecoverHandler(s, container)
return container
}
@ -667,7 +672,7 @@ func (s *GenericAPIServer) installAPIGroup(apiGroupInfo *APIGroupInfo) error {
// Install the version handler.
if apiGroupInfo.IsLegacyGroup {
// Add a handler at /api to enumerate the supported api versions.
apiserver.AddApiWebService(s.HandlerContainer, apiPrefix, apiVersions)
apiserver.AddApiWebService(s.Serializer, s.HandlerContainer, apiPrefix, apiVersions)
} else {
// Add a handler at /apis/<groupName> to enumerate all versions supported by this group.
apiVersionsForDiscovery := []unversioned.GroupVersionForDiscovery{}
@ -686,9 +691,9 @@ func (s *GenericAPIServer) installAPIGroup(apiGroupInfo *APIGroupInfo) error {
Versions: apiVersionsForDiscovery,
PreferredVersion: preferedVersionForDiscovery,
}
apiserver.AddGroupWebService(s.HandlerContainer, apiPrefix+"/"+apiGroup.Name, apiGroup)
apiserver.AddGroupWebService(s.Serializer, s.HandlerContainer, apiPrefix+"/"+apiGroup.Name, apiGroup)
}
apiserver.InstallServiceErrorHandler(s.HandlerContainer, s.NewRequestInfoResolver(), apiVersions)
apiserver.InstallServiceErrorHandler(s.Serializer, s.HandlerContainer, s.NewRequestInfoResolver(), apiVersions)
return nil
}
@ -700,25 +705,21 @@ func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
version, err := s.newAPIGroupVersion(apiGroupInfo.GroupMeta, groupVersion)
version.Root = apiPrefix
version.Storage = storage
version.ParameterCodec = apiGroupInfo.ParameterCodec
version.Serializer = apiGroupInfo.NegotiatedSerializer
version.Creater = apiGroupInfo.Scheme
version.Convertor = apiGroupInfo.Scheme
version.Typer = apiGroupInfo.Scheme
return version, err
}
func (s *GenericAPIServer) newAPIGroupVersion(groupMeta apimachinery.GroupMeta, groupVersion unversioned.GroupVersion) (*apiserver.APIGroupVersion, error) {
versionInterface, err := groupMeta.InterfacesFor(groupVersion)
if err != nil {
return nil, err
}
return &apiserver.APIGroupVersion{
RequestInfoResolver: s.NewRequestInfoResolver(),
Creater: api.Scheme,
Convertor: api.Scheme,
Typer: api.Scheme,
GroupVersion: groupVersion,
Linker: groupMeta.SelfLinker,
Mapper: groupMeta.RESTMapper,
Codec: versionInterface.Codec,
Admit: s.AdmissionControl,
Context: s.RequestContextMapper,

View File

@ -105,12 +105,16 @@ func TestInstallAPIGroups(t *testing.T) {
GroupMeta: *apiGroupMeta,
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
IsLegacyGroup: true,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
},
{
// extensions group version
GroupMeta: *extensionsGroupMeta,
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
OptionsExternalVersion: &apiGroupMeta.GroupVersion,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
},
}
s.InstallAPIGroups(apiGroupsInfo)
@ -140,7 +144,7 @@ func TestInstallAPIGroups(t *testing.T) {
func TestNewHandlerContainer(t *testing.T) {
assert := assert.New(t)
mux := http.NewServeMux()
container := NewHandlerContainer(mux)
container := NewHandlerContainer(mux, nil)
assert.Equal(mux, container.ServeMux, "ServerMux's do not match")
}
@ -179,7 +183,7 @@ func TestInstallSwaggerAPI(t *testing.T) {
defer etcdserver.Terminate(t)
mux := http.NewServeMux()
server.HandlerContainer = NewHandlerContainer(mux)
server.HandlerContainer = NewHandlerContainer(mux, nil)
// Ensure swagger isn't installed without the call
ws := server.HandlerContainer.RegisteredWebServices()
@ -198,7 +202,7 @@ func TestInstallSwaggerAPI(t *testing.T) {
// Empty externalHost verification
mux = http.NewServeMux()
server.HandlerContainer = NewHandlerContainer(mux)
server.HandlerContainer = NewHandlerContainer(mux, nil)
server.externalHost = ""
server.ClusterIP = net.IPv4(10, 10, 10, 10)
server.PublicReadWritePort = 1010

View File

@ -66,6 +66,7 @@ import (
thirdpartyresourceetcd "k8s.io/kubernetes/pkg/registry/thirdpartyresource/etcd"
"k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata"
thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata/etcd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util"
@ -183,7 +184,10 @@ func (m *Master) InstallAPIs(c *Config) {
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
"v1": m.v1ResourcesStorage,
},
IsLegacyGroup: true,
IsLegacyGroup: true,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
}
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
}
@ -217,6 +221,9 @@ func (m *Master) InstallAPIs(c *Config) {
"v1beta1": extensionResources,
},
OptionsExternalVersion: &registered.GroupOrDie(api.GroupName).GroupVersion,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
}
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
@ -237,7 +244,7 @@ func (m *Master) InstallAPIs(c *Config) {
// This should be done after all groups are registered
// TODO: replace the hardcoded "apis".
apiserver.AddApisWebService(m.HandlerContainer, "/apis", func() []unversioned.APIGroup {
apiserver.AddApisWebService(m.Serializer, m.HandlerContainer, "/apis", func() []unversioned.APIGroup {
groups := []unversioned.APIGroup{}
for ix := range allGroups {
groups = append(groups, allGroups[ix])
@ -517,9 +524,9 @@ func (m *Master) InstallThirdPartyResource(rsrc *extensions.ThirdPartyResource)
Name: group,
Versions: []unversioned.GroupVersionForDiscovery{groupVersion},
}
apiserver.AddGroupWebService(m.HandlerContainer, path, apiGroup)
apiserver.AddGroupWebService(api.Codecs, m.HandlerContainer, path, apiGroup)
m.addThirdPartyResourceStorage(path, thirdparty.Storage[strings.ToLower(kind)+"s"].(*thirdpartyresourcedataetcd.REST), apiGroup)
apiserver.InstallServiceErrorHandler(m.HandlerContainer, m.NewRequestInfoResolver(), []string{thirdparty.GroupVersion.String()})
apiserver.InstallServiceErrorHandler(api.Codecs, m.HandlerContainer, m.NewRequestInfoResolver(), []string{thirdparty.GroupVersion.String()})
return nil
}
@ -533,10 +540,12 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV
}
optionsExternalVersion := registered.GroupOrDie(api.GroupName).GroupVersion
internalVersion := unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal}
externalVersion := unversioned.GroupVersion{Group: group, Version: version}
return &apiserver.APIGroupVersion{
Root: apiRoot,
GroupVersion: unversioned.GroupVersion{Group: group, Version: version},
GroupVersion: externalVersion,
RequestInfoResolver: m.NewRequestInfoResolver(),
Creater: thirdpartyresourcedata.NewObjectCreator(group, version, api.Scheme),
@ -544,11 +553,13 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV
Typer: api.Scheme,
Mapper: thirdpartyresourcedata.NewMapper(registered.GroupOrDie(extensions.GroupName).RESTMapper, kind, version, group),
Codec: thirdpartyresourcedata.NewCodec(registered.GroupOrDie(extensions.GroupName).Codec, kind),
Linker: registered.GroupOrDie(extensions.GroupName).SelfLinker,
Storage: storage,
OptionsExternalVersion: &optionsExternalVersion,
Serializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.Codecs, kind, externalVersion, internalVersion),
ParameterCodec: api.ParameterCodec,
Context: m.RequestContextMapper,
MinRequestTimeout: m.MinRequestTimeout,

View File

@ -83,6 +83,7 @@ func setUp(t *testing.T) (Master, *etcdtesting.EtcdTestServer, Config, *assert.A
func newMaster(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
_, etcdserver, config, assert := setUp(t)
config.Serializer = api.Codecs
config.KubeletClient = client.FakeKubeletClient{}
config.ProxyDialer = func(network, addr string) (net.Conn, error) { return nil, nil }
@ -496,7 +497,6 @@ func decodeResponse(resp *http.Response, obj interface{}) error {
if err != nil {
return err
}
if err := json.Unmarshal(data, obj); err != nil {
return err
}

View File

@ -49,7 +49,7 @@ func TestSet(t *testing.T) {
if err != nil || resp.Node == nil {
t.Fatalf("unexpected error: %v %v", err, resp)
}
decoded, err := testapi.Default.Codec().Decode([]byte(resp.Node.Value))
decoded, err := runtime.Decode(testapi.Default.Codec(), []byte(resp.Node.Value))
if err != nil {
t.Fatalf("unexpected response: %#v", resp.Node)
}
@ -67,7 +67,7 @@ func TestGet(t *testing.T) {
ctx := context.TODO()
framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
coded, err := testapi.Default.Codec().Encode(&testObject)
coded, err := runtime.Encode(testapi.Default.Codec(), &testObject)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View File

@ -21,7 +21,7 @@ import (
"net"
"net/http"
"net/http/httptest"
"runtime"
goruntime "runtime"
"sync"
"testing"
"time"
@ -40,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/kubectl"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/runtime"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
"k8s.io/kubernetes/plugin/pkg/admission/admit"
@ -106,7 +107,7 @@ func NewMasterComponents(c *Config) *MasterComponents {
// TODO: Support events once we can cleanly shutdown an event recorder.
controllerManager.SetEventRecorder(&record.FakeRecorder{})
if c.StartReplicationManager {
go controllerManager.Run(runtime.NumCPU(), rcStopCh)
go controllerManager.Run(goruntime.NumCPU(), rcStopCh)
}
var once sync.Once
return &MasterComponents{
@ -157,6 +158,7 @@ func NewMasterConfig() *master.Config {
APIGroupPrefix: "/apis",
Authorizer: apiserver.NewAlwaysAllowAuthorizer(),
AdmissionControl: admit.NewAlwaysAdmit(),
Serializer: api.Codecs,
},
KubeletClient: kubeletclient.FakeKubeletClient{},
}
@ -195,7 +197,7 @@ func RCFromManifest(fileName string) *api.ReplicationController {
glog.Fatalf("Unexpected error reading rc manifest %v", err)
}
var controller api.ReplicationController
if err := api.Scheme.DecodeInto(data, &controller); err != nil {
if err := runtime.DecodeInto(testapi.Default.Codec(), data, &controller); err != nil {
glog.Fatalf("Unexpected error reading rc manifest %v", err)
}
return &controller

View File

@ -19,9 +19,13 @@ limitations under the License.
package integration
import (
"encoding/json"
"io/ioutil"
"net/http"
"testing"
"github.com/ghodss/yaml"
"k8s.io/kubernetes/test/integration/framework"
)