Events in separate etcd

This commit is contained in:
Wojciech Tyczynski
2015-09-30 09:56:51 +02:00
parent b9cfab87e3
commit 0f1cbe37a4
14 changed files with 290 additions and 88 deletions

View File

@@ -98,10 +98,78 @@ const (
DefaultEtcdPathPrefix = "/registry"
)
// StorageDestinations is a mapping from API group & resource to
// the underlying storage interfaces.
type StorageDestinations struct {
APIGroups map[string]*StorageDestinationsForAPIGroup
}
type StorageDestinationsForAPIGroup struct {
Default storage.Interface
Overrides map[string]storage.Interface
}
func NewStorageDestinations() StorageDestinations {
return StorageDestinations{
APIGroups: map[string]*StorageDestinationsForAPIGroup{},
}
}
func (s *StorageDestinations) AddAPIGroup(group string, defaultStorage storage.Interface) {
s.APIGroups[group] = &StorageDestinationsForAPIGroup{
Default: defaultStorage,
Overrides: map[string]storage.Interface{},
}
}
func (s *StorageDestinations) AddStorageOverride(group, resource string, override storage.Interface) {
if _, ok := s.APIGroups[group]; !ok {
s.AddAPIGroup(group, nil)
}
if s.APIGroups[group].Overrides == nil {
s.APIGroups[group].Overrides = map[string]storage.Interface{}
}
s.APIGroups[group].Overrides[resource] = override
}
func (s *StorageDestinations) get(group, resource string) storage.Interface {
apigroup, ok := s.APIGroups[group]
if !ok {
glog.Errorf("No storage defined for API group: '%s'", apigroup)
return nil
}
if apigroup.Overrides != nil {
if client, exists := apigroup.Overrides[resource]; exists {
return client
}
}
return apigroup.Default
}
// Get all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func (s *StorageDestinations) backends() []string {
backends := sets.String{}
for _, group := range s.APIGroups {
if group.Default != nil {
for _, backend := range group.Default.Backends() {
backends.Insert(backend)
}
}
if group.Overrides != nil {
for _, storage := range group.Overrides {
for _, backend := range storage.Backends() {
backends.Insert(backend)
}
}
}
}
return backends.List()
}
// Config is a structure used to configure a Master.
type Config struct {
DatabaseStorage storage.Interface
ExpDatabaseStorage storage.Interface
StorageDestinations StorageDestinations
// StorageVersions is a map between groups and their storage versions
StorageVersions map[string]string
EventTTL time.Duration
@@ -435,35 +503,36 @@ func NewHandlerContainer(mux *http.ServeMux) *restful.Container {
func (m *Master) init(c *Config) {
healthzChecks := []healthz.HealthzChecker{}
m.clock = util.RealClock{}
podStorage := podetcd.NewStorage(c.DatabaseStorage, c.EnableWatchCache, c.KubeletClient)
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
podStorage := podetcd.NewStorage(dbClient("pods"), c.EnableWatchCache, c.KubeletClient)
podTemplateStorage := podtemplateetcd.NewREST(c.DatabaseStorage)
podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"))
eventStorage := eventetcd.NewREST(c.DatabaseStorage, uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(c.DatabaseStorage)
eventStorage := eventetcd.NewREST(dbClient("events"), uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(dbClient("limitRanges"))
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(c.DatabaseStorage)
secretStorage := secretetcd.NewREST(c.DatabaseStorage)
serviceAccountStorage := serviceaccountetcd.NewREST(c.DatabaseStorage)
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(c.DatabaseStorage)
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(c.DatabaseStorage)
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(dbClient("resourceQuotas"))
secretStorage := secretetcd.NewREST(dbClient("secrets"))
serviceAccountStorage := serviceaccountetcd.NewREST(dbClient("serviceAccounts"))
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(dbClient("persistentVolumes"))
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(dbClient("persistentVolumeClaims"))
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(c.DatabaseStorage)
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces"))
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)
endpointsStorage := endpointsetcd.NewREST(c.DatabaseStorage, c.EnableWatchCache)
endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), c.EnableWatchCache)
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(c.DatabaseStorage, c.EnableWatchCache, c.KubeletClient)
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), c.EnableWatchCache, c.KubeletClient)
m.nodeRegistry = node.NewRegistry(nodeStorage)
serviceStorage := serviceetcd.NewREST(c.DatabaseStorage)
serviceStorage := serviceetcd.NewREST(dbClient("services"))
m.serviceRegistry = service.NewRegistry(serviceStorage)
var serviceClusterIPRegistry service.RangeRegistry
serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(m.serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", "serviceipallocation", c.DatabaseStorage)
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", "serviceipallocation", dbClient("services"))
serviceClusterIPRegistry = etcd
return etcd
})
@@ -472,13 +541,13 @@ func (m *Master) init(c *Config) {
var serviceNodePortRegistry service.RangeRegistry
serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.serviceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", "servicenodeportallocation", c.DatabaseStorage)
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", "servicenodeportallocation", dbClient("services"))
serviceNodePortRegistry = etcd
return etcd
})
m.serviceNodePortAllocator = serviceNodePortRegistry
controllerStorage := controlleretcd.NewREST(c.DatabaseStorage)
controllerStorage := controlleretcd.NewREST(dbClient("replicationControllers"))
// TODO: Factor out the core API registration
m.storage = map[string]rest.Storage{
@@ -579,7 +648,7 @@ func (m *Master) init(c *Config) {
// allGroups records all supported groups at /apis
allGroups := []api.APIGroup{}
if m.exp {
m.thirdPartyStorage = c.ExpDatabaseStorage
m.thirdPartyStorage = c.StorageDestinations.APIGroups["experimental"].Default
m.thirdPartyResources = map[string]*thirdpartyresourcedataetcd.REST{}
expVersion := m.experimental(c)
@@ -752,7 +821,8 @@ func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
"controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"},
"scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
}
for ix, machine := range c.DatabaseStorage.Backends() {
for ix, machine := range c.StorageDestinations.backends() {
etcdUrl, err := url.Parse(machine)
if err != nil {
glog.Errorf("Failed to parse etcd url for validation: %v", err)
@@ -962,13 +1032,16 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV
// experimental returns the resources and codec for the experimental api
func (m *Master) experimental(c *Config) *apiserver.APIGroupVersion {
controllerStorage := expcontrolleretcd.NewStorage(c.DatabaseStorage)
autoscalerStorage := horizontalpodautoscaleretcd.NewREST(c.ExpDatabaseStorage)
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(c.ExpDatabaseStorage)
daemonSetStorage, daemonSetStatusStorage := daemonetcd.NewREST(c.ExpDatabaseStorage)
deploymentStorage := deploymentetcd.NewStorage(c.ExpDatabaseStorage)
jobStorage, jobStatusStorage := jobetcd.NewREST(c.ExpDatabaseStorage)
ingressStorage := ingressetcd.NewREST(c.ExpDatabaseStorage)
controllerStorage := expcontrolleretcd.NewStorage(c.StorageDestinations.get("", "replicationControllers"))
dbClient := func(resource string) storage.Interface {
return c.StorageDestinations.get("experimental", resource)
}
autoscalerStorage := horizontalpodautoscaleretcd.NewREST(dbClient("horizonalpodautoscalers"))
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(dbClient("thirdpartyresources"))
daemonSetStorage, daemonSetStatusStorage := daemonetcd.NewREST(dbClient("daemonsets"))
deploymentStorage := deploymentetcd.NewStorage(dbClient("deployments"))
jobStorage, jobStatusStorage := jobetcd.NewREST(dbClient("jobs"))
ingressStorage := ingressetcd.NewREST(dbClient("ingress"))
thirdPartyControl := ThirdPartyController{
master: m,