Pass versioner to cacher.

This commit is contained in:
Wojciech Tyczynski
2015-11-05 16:04:42 +01:00
parent 0d9f2dc5fd
commit a5a8717539
49 changed files with 218 additions and 156 deletions

View File

@@ -51,6 +51,8 @@ import (
endpointsetcd "k8s.io/kubernetes/pkg/registry/endpoint/etcd"
eventetcd "k8s.io/kubernetes/pkg/registry/event/etcd"
expcontrolleretcd "k8s.io/kubernetes/pkg/registry/experimental/controller/etcd"
"k8s.io/kubernetes/pkg/registry/generic"
genericetcd "k8s.io/kubernetes/pkg/registry/generic/etcd"
ingressetcd "k8s.io/kubernetes/pkg/registry/ingress/etcd"
jobetcd "k8s.io/kubernetes/pkg/registry/job/etcd"
limitrangeetcd "k8s.io/kubernetes/pkg/registry/limitrange/etcd"
@@ -268,11 +270,11 @@ type Config struct {
KubernetesServiceNodePort int
}
func (c *Config) storageFactory() storage.StorageFactory {
func (c *Config) storageDecorator() generic.StorageDecorator {
if c.EnableWatchCache {
return storage.NewCacher
return genericetcd.StorageWithCacher
}
return storage.NoDecoration
return generic.UndecoratedStorage
}
type InstallSSHKey func(user string, data []byte) error
@@ -543,31 +545,31 @@ func (m *Master) init(c *Config) {
healthzChecks := []healthz.HealthzChecker{}
storageFactory := c.storageFactory()
storageDecorator := c.storageDecorator()
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
podStorage := podetcd.NewStorage(dbClient("pods"), storageFactory, c.KubeletClient, m.proxyTransport)
podStorage := podetcd.NewStorage(dbClient("pods"), storageDecorator, c.KubeletClient, m.proxyTransport)
podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"), storageFactory)
podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"), storageDecorator)
eventStorage := eventetcd.NewREST(dbClient("events"), storageFactory, uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(dbClient("limitRanges"), storageFactory)
eventStorage := eventetcd.NewREST(dbClient("events"), storageDecorator, uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(dbClient("limitRanges"), storageDecorator)
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(dbClient("resourceQuotas"), storageFactory)
secretStorage := secretetcd.NewREST(dbClient("secrets"), storageFactory)
serviceAccountStorage := serviceaccountetcd.NewREST(dbClient("serviceAccounts"), storageFactory)
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(dbClient("persistentVolumes"), storageFactory)
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(dbClient("persistentVolumeClaims"), storageFactory)
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(dbClient("resourceQuotas"), storageDecorator)
secretStorage := secretetcd.NewREST(dbClient("secrets"), storageDecorator)
serviceAccountStorage := serviceaccountetcd.NewREST(dbClient("serviceAccounts"), storageDecorator)
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(dbClient("persistentVolumes"), storageDecorator)
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(dbClient("persistentVolumeClaims"), storageDecorator)
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces"), storageFactory)
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces"), storageDecorator)
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)
endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), storageFactory)
endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), storageDecorator)
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), storageFactory, c.KubeletClient, m.proxyTransport)
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), storageDecorator, c.KubeletClient, m.proxyTransport)
m.nodeRegistry = node.NewRegistry(nodeStorage)
serviceStorage := serviceetcd.NewREST(dbClient("services"), storageFactory)
serviceStorage := serviceetcd.NewREST(dbClient("services"), storageDecorator)
m.serviceRegistry = service.NewRegistry(serviceStorage)
var serviceClusterIPRegistry service.RangeRegistry
@@ -588,7 +590,7 @@ func (m *Master) init(c *Config) {
})
m.serviceNodePortAllocator = serviceNodePortRegistry
controllerStorage, controllerStatusStorage := controlleretcd.NewREST(dbClient("replicationControllers"), storageFactory)
controllerStorage, controllerStatusStorage := controlleretcd.NewREST(dbClient("replicationControllers"), storageDecorator)
// TODO: Factor out the core API registration
m.storage = map[string]rest.Storage{
@@ -1008,7 +1010,7 @@ func (m *Master) InstallThirdPartyResource(rsrc *expapi.ThirdPartyResource) erro
}
func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupVersion {
resourceStorage := thirdpartyresourcedataetcd.NewREST(m.thirdPartyStorage, storage.NoDecoration, group, kind)
resourceStorage := thirdpartyresourcedataetcd.NewREST(m.thirdPartyStorage, generic.UndecoratedStorage, group, kind)
apiRoot := makeThirdPartyPath("")
@@ -1050,22 +1052,22 @@ func (m *Master) experimental(c *Config) *apiserver.APIGroupVersion {
}
return enabled
}
storageFactory := c.storageFactory()
storageDecorator := c.storageDecorator()
dbClient := func(resource string) storage.Interface {
return c.StorageDestinations.get("extensions", resource)
}
storage := map[string]rest.Storage{}
if isEnabled("horizontalpodautoscalers") {
autoscalerStorage, autoscalerStatusStorage := horizontalpodautoscaleretcd.NewREST(dbClient("horizontalpodautoscalers"), storageFactory)
autoscalerStorage, autoscalerStatusStorage := horizontalpodautoscaleretcd.NewREST(dbClient("horizontalpodautoscalers"), storageDecorator)
storage["horizontalpodautoscalers"] = autoscalerStorage
storage["horizontalpodautoscalers/status"] = autoscalerStatusStorage
controllerStorage := expcontrolleretcd.NewStorage(c.StorageDestinations.get("", "replicationControllers"), storageFactory)
controllerStorage := expcontrolleretcd.NewStorage(c.StorageDestinations.get("", "replicationControllers"), storageDecorator)
storage["replicationcontrollers"] = controllerStorage.ReplicationController
storage["replicationcontrollers/scale"] = controllerStorage.Scale
}
if isEnabled("thirdpartyresources") {
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(dbClient("thirdpartyresources"), storageFactory)
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(dbClient("thirdpartyresources"), storageDecorator)
thirdPartyControl := ThirdPartyController{
master: m,
thirdPartyResourceRegistry: thirdPartyResourceStorage,
@@ -1082,23 +1084,23 @@ func (m *Master) experimental(c *Config) *apiserver.APIGroupVersion {
}
if isEnabled("daemonsets") {
daemonSetStorage, daemonSetStatusStorage := daemonetcd.NewREST(dbClient("daemonsets"), storageFactory)
daemonSetStorage, daemonSetStatusStorage := daemonetcd.NewREST(dbClient("daemonsets"), storageDecorator)
storage["daemonsets"] = daemonSetStorage
storage["daemonsets/status"] = daemonSetStatusStorage
}
if isEnabled("deployments") {
deploymentStorage := deploymentetcd.NewStorage(dbClient("deployments"), storageFactory)
deploymentStorage := deploymentetcd.NewStorage(dbClient("deployments"), storageDecorator)
storage["deployments"] = deploymentStorage.Deployment
storage["deployments/status"] = deploymentStorage.Status
storage["deployments/scale"] = deploymentStorage.Scale
}
if isEnabled("jobs") {
jobStorage, jobStatusStorage := jobetcd.NewREST(dbClient("jobs"), storageFactory)
jobStorage, jobStatusStorage := jobetcd.NewREST(dbClient("jobs"), storageDecorator)
storage["jobs"] = jobStorage
storage["jobs/status"] = jobStatusStorage
}
if isEnabled("ingresses") {
ingressStorage, ingressStatusStorage := ingressetcd.NewREST(dbClient("ingresses"), storageFactory)
ingressStorage, ingressStatusStorage := ingressetcd.NewREST(dbClient("ingresses"), storageDecorator)
storage["ingresses"] = ingressStorage
storage["ingresses/status"] = ingressStatusStorage
}