separate third party resources from master (moves + consequences)

This commit is contained in:
deads2k
2016-10-17 15:58:12 -04:00
parent cbe65701a2
commit f9cbc42581
4 changed files with 332 additions and 308 deletions

View File

@@ -23,15 +23,11 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/unversioned"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
appsapi "k8s.io/kubernetes/pkg/apis/apps/v1alpha1"
authenticationv1beta1 "k8s.io/kubernetes/pkg/apis/authentication/v1beta1"
"k8s.io/kubernetes/pkg/apis/authorization"
@@ -56,13 +52,12 @@ import (
"k8s.io/kubernetes/pkg/healthz"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/master/thirdparty"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/routes"
"k8s.io/kubernetes/pkg/runtime"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/golang/glog"
@@ -80,10 +75,6 @@ import (
policyrest "k8s.io/kubernetes/pkg/registry/policy/rest"
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
storagerest "k8s.io/kubernetes/pkg/registry/storage/rest"
// direct etcd registry dependencies
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata/etcd"
)
const (
@@ -121,27 +112,12 @@ type EndpointReconcilerConfig struct {
type Master struct {
GenericAPIServer *genericapiserver.GenericAPIServer
deleteCollectionWorkers int
// storage for third party objects
thirdPartyStorageConfig *storagebackend.Config
// map from api path to a tuple of (storage for the objects, APIGroup)
thirdPartyResources map[string]*thirdPartyEntry
// protects the map
thirdPartyResourcesLock sync.RWMutex
thirdPartyResourceServer *thirdparty.ThirdPartyResourceServer
// nodeClient is used to back the tunneler
nodeClient coreclient.NodeInterface
}
// thirdPartyEntry combines objects storage and API group into one struct
// for easy lookup.
type thirdPartyEntry struct {
// Map from plural resource name to entry
storage map[string]*thirdpartyresourcedataetcd.REST
group unversioned.APIGroup
}
type RESTOptionsGetter func(resource unversioned.GroupResource) generic.RESTOptions
type RESTStorageProvider interface {
@@ -199,9 +175,10 @@ func (c completedConfig) New() (*Master, error) {
}
m := &Master{
GenericAPIServer: s,
deleteCollectionWorkers: c.DeleteCollectionWorkers,
nodeClient: coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes(),
GenericAPIServer: s,
nodeClient: coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes(),
thirdPartyResourceServer: thirdparty.NewThirdPartyResourceServer(s),
}
restOptionsFactory := restOptionsFactory{
@@ -241,7 +218,7 @@ func (c completedConfig) New() (*Master, error) {
c.RESTStorageProviders[autoscaling.GroupName] = autoscalingrest.RESTStorageProvider{}
c.RESTStorageProviders[batch.GroupName] = batchrest.RESTStorageProvider{}
c.RESTStorageProviders[certificates.GroupName] = certificatesrest.RESTStorageProvider{}
c.RESTStorageProviders[extensions.GroupName] = extensionsrest.RESTStorageProvider{ResourceInterface: m}
c.RESTStorageProviders[extensions.GroupName] = extensionsrest.RESTStorageProvider{ResourceInterface: m.thirdPartyResourceServer}
c.RESTStorageProviders[policy.GroupName] = policyrest.RESTStorageProvider{}
c.RESTStorageProviders[rbac.GroupName] = &rbacrest.RESTStorageProvider{AuthorizerRBACSuperUser: c.GenericConfig.AuthorizerRBACSuperUser}
c.RESTStorageProviders[storage.GroupName] = storagerest.RESTStorageProvider{}
@@ -300,11 +277,11 @@ func (m *Master) InstallAPIs(c *Config, restOptionsGetter genericapiserver.RESTO
// TODO seems like this bit ought to be unconditional and the REST API is controlled by the config
if c.GenericConfig.APIResourceConfigSource.ResourceEnabled(extensionsapiv1beta1.SchemeGroupVersion.WithResource("thirdpartyresources")) {
var err error
m.thirdPartyStorageConfig, err = c.StorageFactory.NewConfig(extensions.Resource("thirdpartyresources"))
// TODO figure out why this isn't a loopback client
m.thirdPartyResourceServer.ThirdPartyStorageConfig, err = c.StorageFactory.NewConfig(extensions.Resource("thirdpartyresources"))
if err != nil {
glog.Fatalf("Error getting third party storage: %v", err)
}
m.thirdPartyResources = map[string]*thirdPartyEntry{}
}
// stabilize order.
@@ -380,241 +357,6 @@ func getServersToValidate(storageFactory genericapiserver.StorageFactory) map[st
return serversToValidate
}
// HasThirdPartyResource returns true if a particular third party resource currently installed.
func (m *Master) HasThirdPartyResource(rsrc *extensions.ThirdPartyResource) (bool, error) {
kind, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(rsrc)
if err != nil {
return false, err
}
path := extensionsrest.MakeThirdPartyPath(group)
m.thirdPartyResourcesLock.Lock()
defer m.thirdPartyResourcesLock.Unlock()
entry := m.thirdPartyResources[path]
if entry == nil {
return false, nil
}
plural, _ := meta.KindToResource(unversioned.GroupVersionKind{
Group: group,
Version: rsrc.Versions[0].Name,
Kind: kind,
})
_, found := entry.storage[plural.Resource]
return found, nil
}
func (m *Master) removeThirdPartyStorage(path, resource string) error {
m.thirdPartyResourcesLock.Lock()
defer m.thirdPartyResourcesLock.Unlock()
entry, found := m.thirdPartyResources[path]
if !found {
return nil
}
storage, found := entry.storage[resource]
if !found {
return nil
}
if err := m.removeAllThirdPartyResources(storage); err != nil {
return err
}
delete(entry.storage, resource)
if len(entry.storage) == 0 {
delete(m.thirdPartyResources, path)
m.GenericAPIServer.RemoveAPIGroupForDiscovery(extensionsrest.GetThirdPartyGroupName(path))
} else {
m.thirdPartyResources[path] = entry
}
return nil
}
// RemoveThirdPartyResource removes all resources matching `path`. Also deletes any stored data
func (m *Master) RemoveThirdPartyResource(path string) error {
ix := strings.LastIndex(path, "/")
if ix == -1 {
return fmt.Errorf("expected <api-group>/<resource-plural-name>, saw: %s", path)
}
resource := path[ix+1:]
path = path[0:ix]
if err := m.removeThirdPartyStorage(path, resource); err != nil {
return err
}
services := m.GenericAPIServer.HandlerContainer.RegisteredWebServices()
for ix := range services {
root := services[ix].RootPath()
if root == path || strings.HasPrefix(root, path+"/") {
m.GenericAPIServer.HandlerContainer.Remove(services[ix])
}
}
return nil
}
func (m *Master) removeAllThirdPartyResources(registry *thirdpartyresourcedataetcd.REST) error {
ctx := api.NewDefaultContext()
existingData, err := registry.List(ctx, nil)
if err != nil {
return err
}
list, ok := existingData.(*extensions.ThirdPartyResourceDataList)
if !ok {
return fmt.Errorf("expected a *ThirdPartyResourceDataList, got %#v", list)
}
for ix := range list.Items {
item := &list.Items[ix]
if _, err := registry.Delete(ctx, item.Name, nil); err != nil {
return err
}
}
return nil
}
// ListThirdPartyResources lists all currently installed third party resources
// The format is <path>/<resource-plural-name>
func (m *Master) ListThirdPartyResources() []string {
m.thirdPartyResourcesLock.RLock()
defer m.thirdPartyResourcesLock.RUnlock()
result := []string{}
for key := range m.thirdPartyResources {
for rsrc := range m.thirdPartyResources[key].storage {
result = append(result, key+"/"+rsrc)
}
}
return result
}
func (m *Master) getExistingThirdPartyResources(path string) []unversioned.APIResource {
result := []unversioned.APIResource{}
m.thirdPartyResourcesLock.Lock()
defer m.thirdPartyResourcesLock.Unlock()
entry := m.thirdPartyResources[path]
if entry != nil {
for key, obj := range entry.storage {
result = append(result, unversioned.APIResource{
Name: key,
Namespaced: true,
Kind: obj.Kind(),
})
}
}
return result
}
func (m *Master) hasThirdPartyGroupStorage(path string) bool {
m.thirdPartyResourcesLock.Lock()
defer m.thirdPartyResourcesLock.Unlock()
_, found := m.thirdPartyResources[path]
return found
}
func (m *Master) addThirdPartyResourceStorage(path, resource string, storage *thirdpartyresourcedataetcd.REST, apiGroup unversioned.APIGroup) {
m.thirdPartyResourcesLock.Lock()
defer m.thirdPartyResourcesLock.Unlock()
entry, found := m.thirdPartyResources[path]
if entry == nil {
entry = &thirdPartyEntry{
group: apiGroup,
storage: map[string]*thirdpartyresourcedataetcd.REST{},
}
m.thirdPartyResources[path] = entry
}
entry.storage[resource] = storage
if !found {
m.GenericAPIServer.AddAPIGroupForDiscovery(apiGroup)
}
}
// InstallThirdPartyResource installs a third party resource specified by 'rsrc'. When a resource is
// installed a corresponding RESTful resource is added as a valid path in the web service provided by
// the master.
//
// For example, if you install a resource ThirdPartyResource{ Name: "foo.company.com", Versions: {"v1"} }
// then the following RESTful resource is created on the server:
// http://<host>/apis/company.com/v1/foos/...
func (m *Master) InstallThirdPartyResource(rsrc *extensions.ThirdPartyResource) error {
kind, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(rsrc)
if err != nil {
return err
}
plural, _ := meta.KindToResource(unversioned.GroupVersionKind{
Group: group,
Version: rsrc.Versions[0].Name,
Kind: kind,
})
path := extensionsrest.MakeThirdPartyPath(group)
groupVersion := unversioned.GroupVersionForDiscovery{
GroupVersion: group + "/" + rsrc.Versions[0].Name,
Version: rsrc.Versions[0].Name,
}
apiGroup := unversioned.APIGroup{
Name: group,
Versions: []unversioned.GroupVersionForDiscovery{groupVersion},
PreferredVersion: groupVersion,
}
thirdparty := m.thirdpartyapi(group, kind, rsrc.Versions[0].Name, plural.Resource)
// If storage exists, this group has already been added, just update
// the group with the new API
if m.hasThirdPartyGroupStorage(path) {
m.addThirdPartyResourceStorage(path, plural.Resource, thirdparty.Storage[plural.Resource].(*thirdpartyresourcedataetcd.REST), apiGroup)
return thirdparty.UpdateREST(m.GenericAPIServer.HandlerContainer.Container)
}
if err := thirdparty.InstallREST(m.GenericAPIServer.HandlerContainer.Container); err != nil {
glog.Errorf("Unable to setup thirdparty api: %v", err)
}
m.GenericAPIServer.HandlerContainer.Add(apiserver.NewGroupWebService(api.Codecs, path, apiGroup))
m.addThirdPartyResourceStorage(path, plural.Resource, thirdparty.Storage[plural.Resource].(*thirdpartyresourcedataetcd.REST), apiGroup)
return nil
}
func (m *Master) thirdpartyapi(group, kind, version, pluralResource string) *apiserver.APIGroupVersion {
resourceStorage := thirdpartyresourcedataetcd.NewREST(
generic.RESTOptions{
StorageConfig: m.thirdPartyStorageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: m.deleteCollectionWorkers,
},
group,
kind,
)
storage := map[string]rest.Storage{
pluralResource: resourceStorage,
}
optionsExternalVersion := registered.GroupOrDie(api.GroupName).GroupVersion
internalVersion := unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal}
externalVersion := unversioned.GroupVersion{Group: group, Version: version}
apiRoot := extensionsrest.MakeThirdPartyPath("")
return &apiserver.APIGroupVersion{
Root: apiRoot,
GroupVersion: externalVersion,
Creater: thirdpartyresourcedata.NewObjectCreator(group, version, api.Scheme),
Convertor: api.Scheme,
Copier: api.Scheme,
Typer: api.Scheme,
Mapper: thirdpartyresourcedata.NewMapper(registered.GroupOrDie(extensions.GroupName).RESTMapper, kind, version, group),
Linker: registered.GroupOrDie(extensions.GroupName).SelfLinker,
Storage: storage,
OptionsExternalVersion: &optionsExternalVersion,
Serializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.Codecs, kind, externalVersion, internalVersion),
ParameterCodec: thirdpartyresourcedata.NewThirdPartyParameterCodec(api.ParameterCodec),
Context: m.GenericAPIServer.RequestContextMapper(),
MinRequestTimeout: m.GenericAPIServer.MinRequestTimeout(),
ResourceLister: dynamicLister{m, extensionsrest.MakeThirdPartyPath(group)},
}
}
type restOptionsFactory struct {
deleteCollectionWorkers int
enableGarbageCollection bool