Enable cacher for all resources.

This commit is contained in:
Wojciech Tyczynski
2015-10-29 10:51:32 +01:00
parent 25b7c79860
commit 030a272db5
40 changed files with 226 additions and 127 deletions

View File

@@ -268,6 +268,13 @@ type Config struct {
KubernetesServiceNodePort int
}
func (c *Config) storageFactory() storage.StorageFactory {
if c.EnableWatchCache {
return storage.NewCacher
}
return storage.NoDecoration
}
type InstallSSHKey func(user string, data []byte) error
// Master contains state for a Kubernetes cluster master/api server.
@@ -535,27 +542,22 @@ func (m *Master) init(c *Config) {
healthzChecks := []healthz.HealthzChecker{}
var storageFactory storage.StorageFactory
if c.EnableWatchCache {
storageFactory = storage.NewCacher
} else {
storageFactory = storage.NoDecoration
}
storageFactory := c.storageFactory()
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
podStorage := podetcd.NewStorage(dbClient("pods"), storageFactory, c.KubeletClient, m.proxyTransport)
podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"))
podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"), storageFactory)
eventStorage := eventetcd.NewREST(dbClient("events"), uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(dbClient("limitRanges"))
eventStorage := eventetcd.NewREST(dbClient("events"), storageFactory, uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(dbClient("limitRanges"), storageFactory)
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(dbClient("resourceQuotas"))
secretStorage := secretetcd.NewREST(dbClient("secrets"))
serviceAccountStorage := serviceaccountetcd.NewREST(dbClient("serviceAccounts"))
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(dbClient("persistentVolumes"))
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(dbClient("persistentVolumeClaims"))
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(dbClient("resourceQuotas"), storageFactory)
secretStorage := secretetcd.NewREST(dbClient("secrets"), storageFactory)
serviceAccountStorage := serviceaccountetcd.NewREST(dbClient("serviceAccounts"), storageFactory)
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(dbClient("persistentVolumes"), storageFactory)
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(dbClient("persistentVolumeClaims"), storageFactory)
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces"))
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces"), storageFactory)
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)
endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), storageFactory)
@@ -564,7 +566,7 @@ func (m *Master) init(c *Config) {
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), storageFactory, c.KubeletClient, m.proxyTransport)
m.nodeRegistry = node.NewRegistry(nodeStorage)
serviceStorage := serviceetcd.NewREST(dbClient("services"))
serviceStorage := serviceetcd.NewREST(dbClient("services"), storageFactory)
m.serviceRegistry = service.NewRegistry(serviceStorage)
var serviceClusterIPRegistry service.RangeRegistry
@@ -585,7 +587,7 @@ func (m *Master) init(c *Config) {
})
m.serviceNodePortAllocator = serviceNodePortRegistry
controllerStorage, controllerStatusStorage := controlleretcd.NewREST(dbClient("replicationControllers"))
controllerStorage, controllerStatusStorage := controlleretcd.NewREST(dbClient("replicationControllers"), storageFactory)
// TODO: Factor out the core API registration
m.storage = map[string]rest.Storage{
@@ -1005,7 +1007,7 @@ func (m *Master) InstallThirdPartyResource(rsrc *expapi.ThirdPartyResource) erro
}
func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupVersion {
resourceStorage := thirdpartyresourcedataetcd.NewREST(m.thirdPartyStorage, group, kind)
resourceStorage := thirdpartyresourcedataetcd.NewREST(m.thirdPartyStorage, storage.NoDecoration, group, kind)
apiRoot := makeThirdPartyPath("")
@@ -1047,21 +1049,22 @@ func (m *Master) experimental(c *Config) *apiserver.APIGroupVersion {
}
return enabled
}
storageFactory := c.storageFactory()
dbClient := func(resource string) storage.Interface {
return c.StorageDestinations.get("extensions", resource)
}
storage := map[string]rest.Storage{}
if isEnabled("horizontalpodautoscalers") {
autoscalerStorage, autoscalerStatusStorage := horizontalpodautoscaleretcd.NewREST(dbClient("horizontalpodautoscalers"))
autoscalerStorage, autoscalerStatusStorage := horizontalpodautoscaleretcd.NewREST(dbClient("horizontalpodautoscalers"), storageFactory)
storage["horizontalpodautoscalers"] = autoscalerStorage
storage["horizontalpodautoscalers/status"] = autoscalerStatusStorage
controllerStorage := expcontrolleretcd.NewStorage(c.StorageDestinations.get("", "replicationControllers"))
controllerStorage := expcontrolleretcd.NewStorage(c.StorageDestinations.get("", "replicationControllers"), storageFactory)
storage["replicationcontrollers"] = controllerStorage.ReplicationController
storage["replicationcontrollers/scale"] = controllerStorage.Scale
}
if isEnabled("thirdpartyresources") {
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(dbClient("thirdpartyresources"))
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(dbClient("thirdpartyresources"), storageFactory)
thirdPartyControl := ThirdPartyController{
master: m,
thirdPartyResourceRegistry: thirdPartyResourceStorage,
@@ -1078,23 +1081,23 @@ func (m *Master) experimental(c *Config) *apiserver.APIGroupVersion {
}
if isEnabled("daemonsets") {
daemonSetStorage, daemonSetStatusStorage := daemonetcd.NewREST(dbClient("daemonsets"))
daemonSetStorage, daemonSetStatusStorage := daemonetcd.NewREST(dbClient("daemonsets"), storageFactory)
storage["daemonsets"] = daemonSetStorage
storage["daemonsets/status"] = daemonSetStatusStorage
}
if isEnabled("deployments") {
deploymentStorage := deploymentetcd.NewStorage(dbClient("deployments"))
deploymentStorage := deploymentetcd.NewStorage(dbClient("deployments"), storageFactory)
storage["deployments"] = deploymentStorage.Deployment
storage["deployments/status"] = deploymentStorage.Status
storage["deployments/scale"] = deploymentStorage.Scale
}
if isEnabled("jobs") {
jobStorage, jobStatusStorage := jobetcd.NewREST(dbClient("jobs"))
jobStorage, jobStatusStorage := jobetcd.NewREST(dbClient("jobs"), storageFactory)
storage["jobs"] = jobStorage
storage["jobs/status"] = jobStatusStorage
}
if isEnabled("ingresses") {
ingressStorage, ingressStatusStorage := ingressetcd.NewREST(dbClient("ingresses"))
ingressStorage, ingressStatusStorage := ingressetcd.NewREST(dbClient("ingresses"), storageFactory)
storage["ingresses"] = ingressStorage
storage["ingresses/status"] = ingressStatusStorage
}