Complete initial third party API support in the master

This commit is contained in:
Brendan Burns 2015-09-09 14:36:19 -07:00
parent 8fda0969e6
commit fb9efac68c
4 changed files with 324 additions and 36 deletions

View File

@ -171,14 +171,14 @@ func (m *DefaultRESTMapper) ResourceSingularizer(resource string) (singular stri
func (m *DefaultRESTMapper) VersionAndKindForResource(resource string) (defaultVersion, kind string, err error) {
meta, ok := m.mapping[strings.ToLower(resource)]
if !ok {
return "", "", fmt.Errorf("no resource %q has been defined", resource)
return "", "", fmt.Errorf("in version and kind for resource, no resource %q has been defined", resource)
}
return meta.APIVersion, meta.Kind, nil
}
func (m *DefaultRESTMapper) GroupForResource(resource string) (string, error) {
if _, ok := m.mapping[strings.ToLower(resource)]; !ok {
return "", fmt.Errorf("no resource %q has been defined", resource)
return "", fmt.Errorf("in group for resource, no resource %q has been defined", resource)
}
return m.group, nil
}

View File

@ -37,7 +37,7 @@ import (
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/experimental"
expapi "k8s.io/kubernetes/pkg/apis/experimental"
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/auth/authenticator"
"k8s.io/kubernetes/pkg/auth/authorizer"
@ -244,6 +244,10 @@ type Master struct {
// storage for third party objects
thirdPartyStorage storage.Interface
// map from api path to storage for those objects
thirdPartyResources map[string]*thirdpartyresourcedataetcd.REST
// protects the map
thirdPartyResourcesLock sync.RWMutex
}
// NewEtcdStorage returns a storage.Interface for the provided arguments or an error if the version
@ -575,7 +579,11 @@ func (m *Master) init(c *Config) {
// allGroups records all supported groups at /apis
allGroups := []api.APIGroup{}
if m.exp {
m.thirdPartyStorage = c.ExpDatabaseStorage
m.thirdPartyResources = map[string]*thirdpartyresourcedataetcd.REST{}
expVersion := m.experimental(c)
if err := expVersion.InstallREST(m.handlerContainer); err != nil {
glog.Fatalf("Unable to setup experimental api: %v", err)
}
@ -802,7 +810,95 @@ func (m *Master) api_v1() *apiserver.APIGroupVersion {
return version
}
func (m *Master) InstallThirdPartyAPI(rsrc *experimental.ThirdPartyResource) error {
// HasThirdPartyResource returns true if a particular third party resource currently installed.
func (m *Master) HasThirdPartyResource(rsrc *expapi.ThirdPartyResource) (bool, error) {
_, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(rsrc)
if err != nil {
return false, err
}
path := makeThirdPartyPath(group)
services := m.handlerContainer.RegisteredWebServices()
for ix := range services {
if services[ix].RootPath() == path {
return true, nil
}
}
return false, nil
}
func (m *Master) removeThirdPartyStorage(path string) error {
m.thirdPartyResourcesLock.Lock()
defer m.thirdPartyResourcesLock.Unlock()
storage, found := m.thirdPartyResources[path]
if found {
if err := m.removeAllThirdPartyResources(storage); err != nil {
return err
}
delete(m.thirdPartyResources, path)
}
return nil
}
// RemoveThirdPartyResource removes all resources matching `path`. Also deletes any stored data
func (m *Master) RemoveThirdPartyResource(path string) error {
if err := m.removeThirdPartyStorage(path); err != nil {
return err
}
services := m.handlerContainer.RegisteredWebServices()
for ix := range services {
root := services[ix].RootPath()
if root == path || strings.HasPrefix(root, path+"/") {
m.handlerContainer.Remove(services[ix])
}
}
return nil
}
func (m *Master) removeAllThirdPartyResources(registry *thirdpartyresourcedataetcd.REST) error {
ctx := api.NewDefaultContext()
existingData, err := registry.List(ctx, labels.Everything(), fields.Everything())
if err != nil {
return err
}
list, ok := existingData.(*expapi.ThirdPartyResourceDataList)
if !ok {
return fmt.Errorf("expected a *ThirdPartyResourceDataList, got %#v", list)
}
for ix := range list.Items {
item := &list.Items[ix]
if _, err := registry.Delete(ctx, item.Name, nil); err != nil {
return err
}
}
return nil
}
// ListThirdPartyResources lists all currently installed third party resources
func (m *Master) ListThirdPartyResources() []string {
m.thirdPartyResourcesLock.RLock()
defer m.thirdPartyResourcesLock.RUnlock()
result := []string{}
for key := range m.thirdPartyResources {
result = append(result, key)
}
return result
}
func (m *Master) addThirdPartyResourceStorage(path string, storage *thirdpartyresourcedataetcd.REST) {
m.thirdPartyResourcesLock.Lock()
defer m.thirdPartyResourcesLock.Unlock()
m.thirdPartyResources[path] = storage
}
// InstallThirdPartyResource installs a third party resource specified by 'rsrc'. When a resource is
// installed a corresponding RESTful resource is added as a valid path in the web service provided by
// the master.
//
// For example, if you install a resource ThirdPartyResource{ Name: "foo.company.com", Versions: {"v1"} }
// then the following RESTful resource is created on the server:
// http://<host>/apis/company.com/v1/foos/...
func (m *Master) InstallThirdPartyResource(rsrc *expapi.ThirdPartyResource) error {
kind, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(rsrc)
if err != nil {
return err
@ -811,7 +907,7 @@ func (m *Master) InstallThirdPartyAPI(rsrc *experimental.ThirdPartyResource) err
if err := thirdparty.InstallREST(m.handlerContainer); err != nil {
glog.Fatalf("Unable to setup thirdparty api: %v", err)
}
thirdPartyAPIPrefix := makeThirdPartyPath(group) + "/"
path := makeThirdPartyPath(group)
groupVersion := api.GroupVersion{
GroupVersion: group + "/" + rsrc.Versions[0].Name,
Version: rsrc.Versions[0].Name,
@ -820,7 +916,8 @@ func (m *Master) InstallThirdPartyAPI(rsrc *experimental.ThirdPartyResource) err
Name: group,
Versions: []api.GroupVersion{groupVersion},
}
apiserver.AddGroupWebService(m.handlerContainer, thirdPartyAPIPrefix, apiGroup)
apiserver.AddGroupWebService(m.handlerContainer, path, apiGroup)
m.addThirdPartyResourceStorage(path, thirdparty.Storage[strings.ToLower(kind)+"s"].(*thirdpartyresourcedataetcd.REST))
thirdPartyRequestInfoResolver := &apiserver.APIRequestInfoResolver{APIPrefixes: sets.NewString(strings.TrimPrefix(group, "/")), RestMapper: thirdparty.Mapper}
apiserver.InstallServiceErrorHandler(m.handlerContainer, thirdPartyRequestInfoResolver, []string{thirdparty.Version})
return nil
@ -829,7 +926,7 @@ func (m *Master) InstallThirdPartyAPI(rsrc *experimental.ThirdPartyResource) err
func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupVersion {
resourceStorage := thirdpartyresourcedataetcd.NewREST(m.thirdPartyStorage, group, kind)
apiRoot := makeThirdPartyPath(group) + "/"
apiRoot := makeThirdPartyPath(group)
storage := map[string]rest.Storage{
strings.ToLower(kind) + "s": resourceStorage,
@ -842,13 +939,12 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV
Convertor: api.Scheme,
Typer: api.Scheme,
Mapper: thirdpartyresourcedata.NewMapper(latest.GroupOrDie("experimental").RESTMapper, kind, version),
Codec: latest.GroupOrDie("experimental").Codec,
Mapper: thirdpartyresourcedata.NewMapper(latest.GroupOrDie("experimental").RESTMapper, kind, version, group),
Codec: thirdpartyresourcedata.NewCodec(latest.GroupOrDie("experimental").Codec, kind),
Linker: latest.GroupOrDie("experimental").SelfLinker,
Storage: storage,
Version: version,
Admit: m.admissionControl,
Context: m.requestContextMapper,
ProxyDialerFn: m.dialer,
@ -865,6 +961,17 @@ func (m *Master) experimental(c *Config) *apiserver.APIGroupVersion {
deploymentStorage := deploymentetcd.NewREST(c.ExpDatabaseStorage)
jobStorage := jobetcd.NewREST(c.ExpDatabaseStorage)
thirdPartyControl := ThirdPartyController{
master: m,
thirdPartyResourceRegistry: thirdPartyResourceStorage,
}
go func() {
util.Forever(func() {
if err := thirdPartyControl.SyncResources(); err != nil {
glog.Warningf("third party resource sync failed: %v", err)
}
}, 10*time.Second)
}()
storage := map[string]rest.Storage{
strings.ToLower("replicationControllers"): controllerStorage.ReplicationController,
strings.ToLower("replicationControllers/scale"): controllerStorage.Scale,

View File

@ -28,11 +28,10 @@ import (
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"time"
"github.com/emicklei/go-restful"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/latest"
"k8s.io/kubernetes/pkg/api/meta"
@ -47,10 +46,15 @@ import (
"k8s.io/kubernetes/pkg/registry/endpoint"
"k8s.io/kubernetes/pkg/registry/namespace"
"k8s.io/kubernetes/pkg/registry/registrytest"
thirdpartyresourcedatastorage "k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata/etcd"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/tools/etcdtest"
"k8s.io/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/emicklei/go-restful"
"github.com/stretchr/testify/assert"
)
// setUp is a convience function for setting up for (most) tests.
@ -440,12 +444,12 @@ type FooList struct {
unversioned.TypeMeta `json:",inline"`
unversioned.ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
items []Foo `json:"items"`
Items []Foo `json:"items"`
}
func initThirdParty(t *testing.T, version string) (*tools.FakeEtcdClient, *httptest.Server, *assert.Assertions) {
func initThirdParty(t *testing.T, version string) (*Master, *tools.FakeEtcdClient, *httptest.Server, *assert.Assertions) {
master, _, assert := setUp(t)
master.thirdPartyResources = map[string]*thirdpartyresourcedatastorage.REST{}
api := &experimental.ThirdPartyResource{
ObjectMeta: api.ObjectMeta{
Name: "foo.company.com",
@ -463,12 +467,12 @@ func initThirdParty(t *testing.T, version string) (*tools.FakeEtcdClient, *httpt
fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"}
master.thirdPartyStorage = etcdstorage.NewEtcdStorage(fakeClient, testapi.Experimental.Codec(), etcdtest.PathPrefix())
if !assert.NoError(master.InstallThirdPartyAPI(api)) {
if !assert.NoError(master.InstallThirdPartyResource(api)) {
t.FailNow()
}
server := httptest.NewServer(master.handlerContainer.ServeMux)
return fakeClient, server, assert
return &master, fakeClient, server, assert
}
func TestInstallThirdPartyAPIList(t *testing.T) {
@ -478,27 +482,90 @@ func TestInstallThirdPartyAPIList(t *testing.T) {
}
func testInstallThirdPartyAPIListVersion(t *testing.T, version string) {
fakeClient, server, assert := initThirdParty(t, version)
defer server.Close()
fakeClient.ExpectNotFoundGet(etcdtest.PathPrefix() + "/ThirdPartyResourceData/company.com/foos/default")
resp, err := http.Get(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos")
if !assert.NoError(err) {
return
tests := []struct {
items []Foo
}{
{},
{
items: []Foo{},
},
{
items: []Foo{
{
ObjectMeta: api.ObjectMeta{
Name: "test",
},
TypeMeta: unversioned.TypeMeta{
Kind: "Foo",
APIVersion: version,
},
SomeField: "test field",
OtherField: 10,
},
{
ObjectMeta: api.ObjectMeta{
Name: "bar",
},
TypeMeta: unversioned.TypeMeta{
Kind: "Foo",
APIVersion: version,
},
SomeField: "test field another",
OtherField: 20,
},
},
},
}
for _, test := range tests {
_, fakeClient, server, assert := initThirdParty(t, version)
defer server.Close()
defer resp.Body.Close()
if test.items == nil {
fakeClient.ExpectNotFoundGet(etcdtest.PathPrefix() + "/ThirdPartyResourceData/company.com/foos/default")
} else {
setupEtcdList(fakeClient, "/ThirdPartyResourceData/company.com/foos/default", test.items)
}
assert.Equal(http.StatusOK, resp.StatusCode)
resp, err := http.Get(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos")
if !assert.NoError(err) {
return
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
assert.NoError(err)
assert.Equal(http.StatusOK, resp.StatusCode)
list := FooList{}
err = json.Unmarshal(data, &list)
assert.NoError(err)
data, err := ioutil.ReadAll(resp.Body)
assert.NoError(err)
list := FooList{}
if err = json.Unmarshal(data, &list); err != nil {
t.Errorf("unexpected error: %v", err)
}
if test.items == nil {
if len(list.Items) != 0 {
t.Errorf("expected no items, saw: %v", list.Items)
}
continue
}
if len(list.Items) != len(test.items) {
t.Errorf("unexpected length: %d vs %d", len(list.Items), len(test.items))
continue
}
for ix := range list.Items {
// Copy things that are set dynamically on the server
expectedObj := test.items[ix]
expectedObj.SelfLink = list.Items[ix].SelfLink
expectedObj.Namespace = list.Items[ix].Namespace
expectedObj.UID = list.Items[ix].UID
expectedObj.CreationTimestamp = list.Items[ix].CreationTimestamp
if !reflect.DeepEqual(list.Items[ix], expectedObj) {
t.Errorf("expected:\n%#v\nsaw:\n%#v\n", expectedObj, list.Items[ix])
}
}
}
}
func encodeToThirdParty(name string, obj interface{}) ([]byte, error) {
@ -522,6 +589,23 @@ func storeToEtcd(fakeClient *tools.FakeEtcdClient, path, name string, obj interf
return err
}
func setupEtcdList(fakeClient *tools.FakeEtcdClient, path string, list []Foo) error {
resp := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{},
},
}
for _, obj := range list {
data, err := encodeToThirdParty(obj.Name, obj)
if err != nil {
return err
}
resp.R.Node.Nodes = append(resp.R.Node.Nodes, &etcd.Node{Value: string(data)})
}
fakeClient.Data[etcdtest.PathPrefix()+path] = resp
return nil
}
func decodeResponse(resp *http.Response, obj interface{}) error {
defer resp.Body.Close()
@ -543,7 +627,7 @@ func TestInstallThirdPartyAPIGet(t *testing.T) {
}
func testInstallThirdPartyAPIGetVersion(t *testing.T, version string) {
fakeClient, server, assert := initThirdParty(t, version)
_, fakeClient, server, assert := initThirdParty(t, version)
defer server.Close()
expectedObj := Foo{
@ -588,7 +672,7 @@ func TestInstallThirdPartyAPIPost(t *testing.T) {
}
func testInstallThirdPartyAPIPostForVersion(t *testing.T, version string) {
fakeClient, server, assert := initThirdParty(t, version)
_, fakeClient, server, assert := initThirdParty(t, version)
defer server.Close()
inputObj := Foo{
@ -656,7 +740,7 @@ func TestInstallThirdPartyAPIDelete(t *testing.T) {
}
func testInstallThirdPartyAPIDeleteVersion(t *testing.T, version string) {
fakeClient, server, assert := initThirdParty(t, version)
_, fakeClient, server, assert := initThirdParty(t, version)
defer server.Close()
expectedObj := Foo{
@ -721,3 +805,88 @@ func httpDelete(url string) (*http.Response, error) {
client := &http.Client{}
return client.Do(req)
}
func TestInstallThirdPartyResourceRemove(t *testing.T) {
for _, version := range versionsToTest {
testInstallThirdPartyResourceRemove(t, version)
}
}
func testInstallThirdPartyResourceRemove(t *testing.T, version string) {
master, fakeClient, server, assert := initThirdParty(t, version)
defer server.Close()
expectedObj := Foo{
ObjectMeta: api.ObjectMeta{
Name: "test",
},
TypeMeta: unversioned.TypeMeta{
Kind: "Foo",
},
SomeField: "test field",
OtherField: 10,
}
if !assert.NoError(storeToEtcd(fakeClient, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) {
t.FailNow()
return
}
secondObj := expectedObj
secondObj.Name = "bar"
if !assert.NoError(storeToEtcd(fakeClient, "/ThirdPartyResourceData/company.com/foos/default/bar", "bar", secondObj)) {
t.FailNow()
return
}
setupEtcdList(fakeClient, "/ThirdPartyResourceData/company.com/foos/default", []Foo{expectedObj, secondObj})
resp, err := http.Get(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos/test")
if !assert.NoError(err) {
t.FailNow()
return
}
if resp.StatusCode != http.StatusOK {
t.Errorf("unexpected status: %v", resp)
}
item := Foo{}
if err := decodeResponse(resp, &item); err != nil {
t.Errorf("unexpected error: %v", err)
}
// TODO: validate etcd set things here
item.ObjectMeta = expectedObj.ObjectMeta
if !assert.True(reflect.DeepEqual(item, expectedObj)) {
t.Errorf("expected:\n%v\nsaw:\n%v\n", expectedObj, item)
}
path := makeThirdPartyPath("company.com")
master.RemoveThirdPartyResource(path)
resp, err = http.Get(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos/test")
if !assert.NoError(err) {
return
}
if resp.StatusCode != http.StatusNotFound {
t.Errorf("unexpected status: %v", resp)
}
expectDeletedKeys := []string{
etcdtest.PathPrefix() + "/ThirdPartyResourceData/company.com/foos/default/test",
etcdtest.PathPrefix() + "/ThirdPartyResourceData/company.com/foos/default/bar",
}
if !assert.True(reflect.DeepEqual(fakeClient.DeletedKeys, expectDeletedKeys)) {
t.Errorf("unexpected deleted keys: %v", fakeClient.DeletedKeys)
}
installed := master.ListThirdPartyResources()
if len(installed) != 0 {
t.Errorf("Resource(s) still installed: %v", installed)
}
services := master.handlerContainer.RegisteredWebServices()
for ix := range services {
if strings.HasPrefix(services[ix].RootPath(), "/apis/company.com") {
t.Errorf("Web service still installed at %s: %#v", services[ix].RootPath(), services[ix])
}
}
}

View File

@ -33,9 +33,17 @@ type thirdPartyResourceDataMapper struct {
mapper meta.RESTMapper
kind string
version string
group string
}
func (t *thirdPartyResourceDataMapper) isThirdPartyResource(resource string) bool {
return resource == strings.ToLower(t.kind)+"s"
}
func (t *thirdPartyResourceDataMapper) GroupForResource(resource string) (string, error) {
if t.isThirdPartyResource(resource) {
return t.group, nil
}
return t.mapper.GroupForResource(resource)
}
@ -66,14 +74,18 @@ func (t *thirdPartyResourceDataMapper) ResourceSingularizer(resource string) (si
}
func (t *thirdPartyResourceDataMapper) VersionAndKindForResource(resource string) (defaultVersion, kind string, err error) {
if t.isThirdPartyResource(resource) {
return t.version, t.kind, nil
}
return t.mapper.VersionAndKindForResource(resource)
}
func NewMapper(mapper meta.RESTMapper, kind, version string) meta.RESTMapper {
func NewMapper(mapper meta.RESTMapper, kind, version, group string) meta.RESTMapper {
return &thirdPartyResourceDataMapper{
mapper: mapper,
kind: kind,
version: version,
group: group,
}
}