From fb9efac68c34a0968255b480898490033250e1a7 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Wed, 9 Sep 2015 14:36:19 -0700 Subject: [PATCH] Complete initial third party API support in the master --- pkg/api/meta/restmapper.go | 4 +- pkg/master/master.go | 123 ++++++++++- pkg/master/master_test.go | 219 ++++++++++++++++--- pkg/registry/thirdpartyresourcedata/codec.go | 14 +- 4 files changed, 324 insertions(+), 36 deletions(-) diff --git a/pkg/api/meta/restmapper.go b/pkg/api/meta/restmapper.go index 89680920e50..6ac54b2d8ff 100644 --- a/pkg/api/meta/restmapper.go +++ b/pkg/api/meta/restmapper.go @@ -171,14 +171,14 @@ func (m *DefaultRESTMapper) ResourceSingularizer(resource string) (singular stri func (m *DefaultRESTMapper) VersionAndKindForResource(resource string) (defaultVersion, kind string, err error) { meta, ok := m.mapping[strings.ToLower(resource)] if !ok { - return "", "", fmt.Errorf("no resource %q has been defined", resource) + return "", "", fmt.Errorf("in version and kind for resource, no resource %q has been defined", resource) } return meta.APIVersion, meta.Kind, nil } func (m *DefaultRESTMapper) GroupForResource(resource string) (string, error) { if _, ok := m.mapping[strings.ToLower(resource)]; !ok { - return "", fmt.Errorf("no resource %q has been defined", resource) + return "", fmt.Errorf("in group for resource, no resource %q has been defined", resource) } return m.group, nil } diff --git a/pkg/master/master.go b/pkg/master/master.go index b133de797a5..499279f417c 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -37,7 +37,7 @@ import ( "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/rest" "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/apis/experimental" + expapi "k8s.io/kubernetes/pkg/apis/experimental" "k8s.io/kubernetes/pkg/apiserver" "k8s.io/kubernetes/pkg/auth/authenticator" "k8s.io/kubernetes/pkg/auth/authorizer" @@ -244,6 +244,10 @@ type Master struct { // storage for third party objects thirdPartyStorage storage.Interface + // map from api path to storage for those objects + thirdPartyResources map[string]*thirdpartyresourcedataetcd.REST + // protects the map + thirdPartyResourcesLock sync.RWMutex } // NewEtcdStorage returns a storage.Interface for the provided arguments or an error if the version @@ -575,7 +579,11 @@ func (m *Master) init(c *Config) { // allGroups records all supported groups at /apis allGroups := []api.APIGroup{} if m.exp { + m.thirdPartyStorage = c.ExpDatabaseStorage + m.thirdPartyResources = map[string]*thirdpartyresourcedataetcd.REST{} + expVersion := m.experimental(c) + if err := expVersion.InstallREST(m.handlerContainer); err != nil { glog.Fatalf("Unable to setup experimental api: %v", err) } @@ -802,7 +810,95 @@ func (m *Master) api_v1() *apiserver.APIGroupVersion { return version } -func (m *Master) InstallThirdPartyAPI(rsrc *experimental.ThirdPartyResource) error { +// HasThirdPartyResource returns true if a particular third party resource currently installed. +func (m *Master) HasThirdPartyResource(rsrc *expapi.ThirdPartyResource) (bool, error) { + _, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(rsrc) + if err != nil { + return false, err + } + path := makeThirdPartyPath(group) + services := m.handlerContainer.RegisteredWebServices() + for ix := range services { + if services[ix].RootPath() == path { + return true, nil + } + } + return false, nil +} + +func (m *Master) removeThirdPartyStorage(path string) error { + m.thirdPartyResourcesLock.Lock() + defer m.thirdPartyResourcesLock.Unlock() + storage, found := m.thirdPartyResources[path] + if found { + if err := m.removeAllThirdPartyResources(storage); err != nil { + return err + } + delete(m.thirdPartyResources, path) + } + return nil +} + +// RemoveThirdPartyResource removes all resources matching `path`. Also deletes any stored data +func (m *Master) RemoveThirdPartyResource(path string) error { + if err := m.removeThirdPartyStorage(path); err != nil { + return err + } + + services := m.handlerContainer.RegisteredWebServices() + for ix := range services { + root := services[ix].RootPath() + if root == path || strings.HasPrefix(root, path+"/") { + m.handlerContainer.Remove(services[ix]) + } + } + return nil +} + +func (m *Master) removeAllThirdPartyResources(registry *thirdpartyresourcedataetcd.REST) error { + ctx := api.NewDefaultContext() + existingData, err := registry.List(ctx, labels.Everything(), fields.Everything()) + if err != nil { + return err + } + list, ok := existingData.(*expapi.ThirdPartyResourceDataList) + if !ok { + return fmt.Errorf("expected a *ThirdPartyResourceDataList, got %#v", list) + } + for ix := range list.Items { + item := &list.Items[ix] + if _, err := registry.Delete(ctx, item.Name, nil); err != nil { + return err + } + } + return nil +} + +// ListThirdPartyResources lists all currently installed third party resources +func (m *Master) ListThirdPartyResources() []string { + m.thirdPartyResourcesLock.RLock() + defer m.thirdPartyResourcesLock.RUnlock() + result := []string{} + for key := range m.thirdPartyResources { + result = append(result, key) + } + return result +} + +func (m *Master) addThirdPartyResourceStorage(path string, storage *thirdpartyresourcedataetcd.REST) { + m.thirdPartyResourcesLock.Lock() + defer m.thirdPartyResourcesLock.Unlock() + m.thirdPartyResources[path] = storage +} + +// InstallThirdPartyResource installs a third party resource specified by 'rsrc'. When a resource is +// installed a corresponding RESTful resource is added as a valid path in the web service provided by +// the master. +// +// For example, if you install a resource ThirdPartyResource{ Name: "foo.company.com", Versions: {"v1"} } +// then the following RESTful resource is created on the server: +// http:///apis/company.com/v1/foos/... +func (m *Master) InstallThirdPartyResource(rsrc *expapi.ThirdPartyResource) error { kind, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(rsrc) if err != nil { return err @@ -811,7 +907,7 @@ func (m *Master) InstallThirdPartyAPI(rsrc *experimental.ThirdPartyResource) err if err := thirdparty.InstallREST(m.handlerContainer); err != nil { glog.Fatalf("Unable to setup thirdparty api: %v", err) } - thirdPartyAPIPrefix := makeThirdPartyPath(group) + "/" + path := makeThirdPartyPath(group) groupVersion := api.GroupVersion{ GroupVersion: group + "/" + rsrc.Versions[0].Name, Version: rsrc.Versions[0].Name, @@ -820,7 +916,8 @@ func (m *Master) InstallThirdPartyAPI(rsrc *experimental.ThirdPartyResource) err Name: group, Versions: []api.GroupVersion{groupVersion}, } - apiserver.AddGroupWebService(m.handlerContainer, thirdPartyAPIPrefix, apiGroup) + apiserver.AddGroupWebService(m.handlerContainer, path, apiGroup) + m.addThirdPartyResourceStorage(path, thirdparty.Storage[strings.ToLower(kind)+"s"].(*thirdpartyresourcedataetcd.REST)) thirdPartyRequestInfoResolver := &apiserver.APIRequestInfoResolver{APIPrefixes: sets.NewString(strings.TrimPrefix(group, "/")), RestMapper: thirdparty.Mapper} apiserver.InstallServiceErrorHandler(m.handlerContainer, thirdPartyRequestInfoResolver, []string{thirdparty.Version}) return nil @@ -829,7 +926,7 @@ func (m *Master) InstallThirdPartyAPI(rsrc *experimental.ThirdPartyResource) err func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupVersion { resourceStorage := thirdpartyresourcedataetcd.NewREST(m.thirdPartyStorage, group, kind) - apiRoot := makeThirdPartyPath(group) + "/" + apiRoot := makeThirdPartyPath(group) storage := map[string]rest.Storage{ strings.ToLower(kind) + "s": resourceStorage, @@ -842,13 +939,12 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV Convertor: api.Scheme, Typer: api.Scheme, - Mapper: thirdpartyresourcedata.NewMapper(latest.GroupOrDie("experimental").RESTMapper, kind, version), - Codec: latest.GroupOrDie("experimental").Codec, + Mapper: thirdpartyresourcedata.NewMapper(latest.GroupOrDie("experimental").RESTMapper, kind, version, group), + Codec: thirdpartyresourcedata.NewCodec(latest.GroupOrDie("experimental").Codec, kind), Linker: latest.GroupOrDie("experimental").SelfLinker, Storage: storage, Version: version, - Admit: m.admissionControl, Context: m.requestContextMapper, ProxyDialerFn: m.dialer, @@ -865,6 +961,17 @@ func (m *Master) experimental(c *Config) *apiserver.APIGroupVersion { deploymentStorage := deploymentetcd.NewREST(c.ExpDatabaseStorage) jobStorage := jobetcd.NewREST(c.ExpDatabaseStorage) + thirdPartyControl := ThirdPartyController{ + master: m, + thirdPartyResourceRegistry: thirdPartyResourceStorage, + } + go func() { + util.Forever(func() { + if err := thirdPartyControl.SyncResources(); err != nil { + glog.Warningf("third party resource sync failed: %v", err) + } + }, 10*time.Second) + }() storage := map[string]rest.Storage{ strings.ToLower("replicationControllers"): controllerStorage.ReplicationController, strings.ToLower("replicationControllers/scale"): controllerStorage.Scale, diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 36c59ec7672..86cdcb65894 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -28,11 +28,10 @@ import ( "os" "path/filepath" "reflect" + "strings" "testing" "time" - "github.com/emicklei/go-restful" - "github.com/stretchr/testify/assert" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/latest" "k8s.io/kubernetes/pkg/api/meta" @@ -47,10 +46,15 @@ import ( "k8s.io/kubernetes/pkg/registry/endpoint" "k8s.io/kubernetes/pkg/registry/namespace" "k8s.io/kubernetes/pkg/registry/registrytest" + thirdpartyresourcedatastorage "k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata/etcd" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/tools/etcdtest" "k8s.io/kubernetes/pkg/util" + + "github.com/coreos/go-etcd/etcd" + "github.com/emicklei/go-restful" + "github.com/stretchr/testify/assert" ) // setUp is a convience function for setting up for (most) tests. @@ -440,12 +444,12 @@ type FooList struct { unversioned.TypeMeta `json:",inline"` unversioned.ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"` - items []Foo `json:"items"` + Items []Foo `json:"items"` } -func initThirdParty(t *testing.T, version string) (*tools.FakeEtcdClient, *httptest.Server, *assert.Assertions) { +func initThirdParty(t *testing.T, version string) (*Master, *tools.FakeEtcdClient, *httptest.Server, *assert.Assertions) { master, _, assert := setUp(t) - + master.thirdPartyResources = map[string]*thirdpartyresourcedatastorage.REST{} api := &experimental.ThirdPartyResource{ ObjectMeta: api.ObjectMeta{ Name: "foo.company.com", @@ -463,12 +467,12 @@ func initThirdParty(t *testing.T, version string) (*tools.FakeEtcdClient, *httpt fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"} master.thirdPartyStorage = etcdstorage.NewEtcdStorage(fakeClient, testapi.Experimental.Codec(), etcdtest.PathPrefix()) - if !assert.NoError(master.InstallThirdPartyAPI(api)) { + if !assert.NoError(master.InstallThirdPartyResource(api)) { t.FailNow() } server := httptest.NewServer(master.handlerContainer.ServeMux) - return fakeClient, server, assert + return &master, fakeClient, server, assert } func TestInstallThirdPartyAPIList(t *testing.T) { @@ -478,27 +482,90 @@ func TestInstallThirdPartyAPIList(t *testing.T) { } func testInstallThirdPartyAPIListVersion(t *testing.T, version string) { - fakeClient, server, assert := initThirdParty(t, version) - defer server.Close() - - fakeClient.ExpectNotFoundGet(etcdtest.PathPrefix() + "/ThirdPartyResourceData/company.com/foos/default") - - resp, err := http.Get(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos") - if !assert.NoError(err) { - return + tests := []struct { + items []Foo + }{ + {}, + { + items: []Foo{}, + }, + { + items: []Foo{ + { + ObjectMeta: api.ObjectMeta{ + Name: "test", + }, + TypeMeta: unversioned.TypeMeta{ + Kind: "Foo", + APIVersion: version, + }, + SomeField: "test field", + OtherField: 10, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "bar", + }, + TypeMeta: unversioned.TypeMeta{ + Kind: "Foo", + APIVersion: version, + }, + SomeField: "test field another", + OtherField: 20, + }, + }, + }, } + for _, test := range tests { + _, fakeClient, server, assert := initThirdParty(t, version) + defer server.Close() - defer resp.Body.Close() + if test.items == nil { + fakeClient.ExpectNotFoundGet(etcdtest.PathPrefix() + "/ThirdPartyResourceData/company.com/foos/default") + } else { + setupEtcdList(fakeClient, "/ThirdPartyResourceData/company.com/foos/default", test.items) + } - assert.Equal(http.StatusOK, resp.StatusCode) + resp, err := http.Get(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos") + if !assert.NoError(err) { + return + } + defer resp.Body.Close() - data, err := ioutil.ReadAll(resp.Body) - assert.NoError(err) + assert.Equal(http.StatusOK, resp.StatusCode) - list := FooList{} - err = json.Unmarshal(data, &list) - assert.NoError(err) + data, err := ioutil.ReadAll(resp.Body) + assert.NoError(err) + list := FooList{} + if err = json.Unmarshal(data, &list); err != nil { + t.Errorf("unexpected error: %v", err) + } + + if test.items == nil { + if len(list.Items) != 0 { + t.Errorf("expected no items, saw: %v", list.Items) + } + continue + } + + if len(list.Items) != len(test.items) { + t.Errorf("unexpected length: %d vs %d", len(list.Items), len(test.items)) + continue + } + for ix := range list.Items { + // Copy things that are set dynamically on the server + expectedObj := test.items[ix] + expectedObj.SelfLink = list.Items[ix].SelfLink + expectedObj.Namespace = list.Items[ix].Namespace + expectedObj.UID = list.Items[ix].UID + expectedObj.CreationTimestamp = list.Items[ix].CreationTimestamp + + if !reflect.DeepEqual(list.Items[ix], expectedObj) { + t.Errorf("expected:\n%#v\nsaw:\n%#v\n", expectedObj, list.Items[ix]) + } + } + } } func encodeToThirdParty(name string, obj interface{}) ([]byte, error) { @@ -522,6 +589,23 @@ func storeToEtcd(fakeClient *tools.FakeEtcdClient, path, name string, obj interf return err } +func setupEtcdList(fakeClient *tools.FakeEtcdClient, path string, list []Foo) error { + resp := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{}, + }, + } + for _, obj := range list { + data, err := encodeToThirdParty(obj.Name, obj) + if err != nil { + return err + } + resp.R.Node.Nodes = append(resp.R.Node.Nodes, &etcd.Node{Value: string(data)}) + } + fakeClient.Data[etcdtest.PathPrefix()+path] = resp + return nil +} + func decodeResponse(resp *http.Response, obj interface{}) error { defer resp.Body.Close() @@ -543,7 +627,7 @@ func TestInstallThirdPartyAPIGet(t *testing.T) { } func testInstallThirdPartyAPIGetVersion(t *testing.T, version string) { - fakeClient, server, assert := initThirdParty(t, version) + _, fakeClient, server, assert := initThirdParty(t, version) defer server.Close() expectedObj := Foo{ @@ -588,7 +672,7 @@ func TestInstallThirdPartyAPIPost(t *testing.T) { } func testInstallThirdPartyAPIPostForVersion(t *testing.T, version string) { - fakeClient, server, assert := initThirdParty(t, version) + _, fakeClient, server, assert := initThirdParty(t, version) defer server.Close() inputObj := Foo{ @@ -656,7 +740,7 @@ func TestInstallThirdPartyAPIDelete(t *testing.T) { } func testInstallThirdPartyAPIDeleteVersion(t *testing.T, version string) { - fakeClient, server, assert := initThirdParty(t, version) + _, fakeClient, server, assert := initThirdParty(t, version) defer server.Close() expectedObj := Foo{ @@ -721,3 +805,88 @@ func httpDelete(url string) (*http.Response, error) { client := &http.Client{} return client.Do(req) } + +func TestInstallThirdPartyResourceRemove(t *testing.T) { + for _, version := range versionsToTest { + testInstallThirdPartyResourceRemove(t, version) + } +} + +func testInstallThirdPartyResourceRemove(t *testing.T, version string) { + master, fakeClient, server, assert := initThirdParty(t, version) + defer server.Close() + + expectedObj := Foo{ + ObjectMeta: api.ObjectMeta{ + Name: "test", + }, + TypeMeta: unversioned.TypeMeta{ + Kind: "Foo", + }, + SomeField: "test field", + OtherField: 10, + } + if !assert.NoError(storeToEtcd(fakeClient, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) { + t.FailNow() + return + } + secondObj := expectedObj + secondObj.Name = "bar" + if !assert.NoError(storeToEtcd(fakeClient, "/ThirdPartyResourceData/company.com/foos/default/bar", "bar", secondObj)) { + t.FailNow() + return + } + + setupEtcdList(fakeClient, "/ThirdPartyResourceData/company.com/foos/default", []Foo{expectedObj, secondObj}) + + resp, err := http.Get(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos/test") + if !assert.NoError(err) { + t.FailNow() + return + } + + if resp.StatusCode != http.StatusOK { + t.Errorf("unexpected status: %v", resp) + } + + item := Foo{} + if err := decodeResponse(resp, &item); err != nil { + t.Errorf("unexpected error: %v", err) + } + + // TODO: validate etcd set things here + item.ObjectMeta = expectedObj.ObjectMeta + + if !assert.True(reflect.DeepEqual(item, expectedObj)) { + t.Errorf("expected:\n%v\nsaw:\n%v\n", expectedObj, item) + } + + path := makeThirdPartyPath("company.com") + master.RemoveThirdPartyResource(path) + + resp, err = http.Get(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos/test") + if !assert.NoError(err) { + return + } + + if resp.StatusCode != http.StatusNotFound { + t.Errorf("unexpected status: %v", resp) + } + expectDeletedKeys := []string{ + etcdtest.PathPrefix() + "/ThirdPartyResourceData/company.com/foos/default/test", + etcdtest.PathPrefix() + "/ThirdPartyResourceData/company.com/foos/default/bar", + } + if !assert.True(reflect.DeepEqual(fakeClient.DeletedKeys, expectDeletedKeys)) { + t.Errorf("unexpected deleted keys: %v", fakeClient.DeletedKeys) + } + installed := master.ListThirdPartyResources() + if len(installed) != 0 { + t.Errorf("Resource(s) still installed: %v", installed) + } + services := master.handlerContainer.RegisteredWebServices() + for ix := range services { + if strings.HasPrefix(services[ix].RootPath(), "/apis/company.com") { + t.Errorf("Web service still installed at %s: %#v", services[ix].RootPath(), services[ix]) + } + } +} diff --git a/pkg/registry/thirdpartyresourcedata/codec.go b/pkg/registry/thirdpartyresourcedata/codec.go index 32c2437ef6c..a292641eb2c 100644 --- a/pkg/registry/thirdpartyresourcedata/codec.go +++ b/pkg/registry/thirdpartyresourcedata/codec.go @@ -33,9 +33,17 @@ type thirdPartyResourceDataMapper struct { mapper meta.RESTMapper kind string version string + group string +} + +func (t *thirdPartyResourceDataMapper) isThirdPartyResource(resource string) bool { + return resource == strings.ToLower(t.kind)+"s" } func (t *thirdPartyResourceDataMapper) GroupForResource(resource string) (string, error) { + if t.isThirdPartyResource(resource) { + return t.group, nil + } return t.mapper.GroupForResource(resource) } @@ -66,14 +74,18 @@ func (t *thirdPartyResourceDataMapper) ResourceSingularizer(resource string) (si } func (t *thirdPartyResourceDataMapper) VersionAndKindForResource(resource string) (defaultVersion, kind string, err error) { + if t.isThirdPartyResource(resource) { + return t.version, t.kind, nil + } return t.mapper.VersionAndKindForResource(resource) } -func NewMapper(mapper meta.RESTMapper, kind, version string) meta.RESTMapper { +func NewMapper(mapper meta.RESTMapper, kind, version, group string) meta.RESTMapper { return &thirdPartyResourceDataMapper{ mapper: mapper, kind: kind, version: version, + group: group, } }