diff --git a/cluster/saltbase/salt/etcd/etcd.manifest b/cluster/saltbase/salt/etcd/etcd.manifest index 5095edd7f9f..09fdb9aeee3 100644 --- a/cluster/saltbase/salt/etcd/etcd.manifest +++ b/cluster/saltbase/salt/etcd/etcd.manifest @@ -22,7 +22,7 @@ "command": [ "/bin/sh", "-c", - "/usr/local/bin/etcd --listen-peer-urls=http://127.0.0.1:{{ server_port }} --addr 127.0.0.1:{{ port }} --bind-addr 127.0.0.1:{{ port }} --data-dir /var/etcd/data{{ suffix }} 1>>/var/log/etcd{{ suffix }}.log 2>&1" + "/usr/local/bin/etcd --listen-peer-urls http://127.0.0.1:{{ server_port }} --addr 127.0.0.1:{{ port }} --bind-addr 127.0.0.1:{{ port }} --data-dir /var/etcd/data{{ suffix }} 1>>/var/log/etcd{{ suffix }}.log 2>&1" ], "livenessProbe": { "httpGet": { diff --git a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest index 0afab9c79c0..ddc013d85d5 100644 --- a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest +++ b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest @@ -44,6 +44,7 @@ {% endif -%} {% set etcd_servers = "--etcd-servers=http://127.0.0.1:4001" -%} +{% set etcd_servers_overrides = "--etcd-servers-overrides=/events#http://127.0.0.1:4002" -%} {% set service_cluster_ip_range = "" -%} {% if pillar['service_cluster_ip_range'] is defined -%} @@ -88,7 +89,7 @@ {% set runtime_config = "--runtime-config=" + grains.runtime_config -%} {% endif -%} -{% set params = address + " " + etcd_servers + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%} +{% set params = address + " " + etcd_servers + " " + etcd_servers_overrides + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%} {% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure-port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " " + proxy_ssh_options -%} # test_args has to be kept at the end, so they'll overwrite any prior configuration diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 815530613a4..2d3a5560696 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -144,10 +144,12 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string } expEtcdStorage, err := master.NewEtcdStorage(etcdClient, latest.GroupOrDie("experimental").InterfacesFor, testapi.Experimental.GroupAndVersion(), etcdtest.PathPrefix()) storageVersions["experimental"] = testapi.Experimental.GroupAndVersion() - if err != nil { glog.Fatalf("Unable to get etcd storage for experimental: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) + storageDestinations.AddAPIGroup("experimental", expEtcdStorage) // Master host, port, err := net.SplitHostPort(strings.TrimLeft(apiServer.URL, "http://")) @@ -166,8 +168,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string // Create a master and install handlers into mux. m := master.New(&master.Config{ - DatabaseStorage: etcdStorage, - ExpDatabaseStorage: expEtcdStorage, + StorageDestinations: storageDestinations, KubeletClient: fakeKubeletClient{}, EnableCoreControllers: true, EnableLogsSupport: false, diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index a761267c58d..a9a580a299a 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -95,6 +95,7 @@ type APIServer struct { AdmissionControlConfigFile string EtcdServerList []string EtcdConfigFile string + EtcdServersOverrides []string EtcdPathPrefix string CorsAllowedOriginList []string AllowPrivileged bool @@ -211,6 +212,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.") fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config") fs.StringVar(&s.EtcdConfigFile, "etcd-config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd-servers.") + fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.") fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.") fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.") @@ -253,6 +255,8 @@ func (s *APIServer) verifyClusterIPFlags() { } } +type newEtcdFunc func(string, []string, meta.VersionInterfacesFunc, string, string) (storage.Interface, error) + func newEtcd(etcdConfigFile string, etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc, storageVersion, pathPrefix string) (etcdStorage storage.Interface, err error) { if storageVersion == "" { return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage") @@ -294,6 +298,45 @@ func generateStorageVersionMap(legacyVersion string, storageVersions string) map return storageVersionMap } +// parse the value of --etcd-servers-overrides and update given storageDestinations. +func updateEtcdOverrides(overrides []string, storageVersions map[string]string, prefix string, storageDestinations *master.StorageDestinations, newEtcdFn newEtcdFunc) { + if len(overrides) == 0 { + return + } + for _, override := range overrides { + tokens := strings.Split(override, "#") + if len(tokens) != 2 { + glog.Errorf("invalid value of etcd server overrides: %s", override) + continue + } + + apiresource := strings.Split(tokens[0], "/") + if len(apiresource) != 2 { + glog.Errorf("invalid resource definition: %s", tokens[0]) + } + group := apiresource[0] + resource := apiresource[1] + + apigroup, err := latest.Group(group) + if err != nil { + glog.Errorf("invalid api group %s: %v", group, err) + continue + } + if _, found := storageVersions[apigroup.Group]; !found { + glog.Errorf("Couldn't find the storage version for group %s", apigroup.Group) + continue + } + + servers := strings.Split(tokens[1], ";") + etcdOverrideStorage, err := newEtcdFn("", servers, apigroup.InterfacesFor, storageVersions[apigroup.Group], prefix) + if err != nil { + glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err) + } + + storageDestinations.AddStorageOverride(group, resource, etcdOverrideStorage) + } +} + // Run runs the specified APIServer. This should never exit. func (s *APIServer) Run(_ []string) error { s.verifyClusterIPFlags() @@ -369,6 +412,8 @@ func (s *APIServer) Run(_ []string) error { return err } + storageDestinations := master.NewStorageDestinations() + storageVersions := generateStorageVersionMap(s.DeprecatedStorageVersion, s.StorageVersions) if _, found := storageVersions[legacyV1Group.Group]; !found { glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.Group, storageVersions) @@ -377,8 +422,8 @@ func (s *APIServer) Run(_ []string) error { if err != nil { glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err) } + storageDestinations.AddAPIGroup("", etcdStorage) - var expEtcdStorage storage.Interface if enableExp { expGroup, err := latest.Group("experimental") if err != nil { @@ -387,12 +432,15 @@ func (s *APIServer) Run(_ []string) error { if _, found := storageVersions[expGroup.Group]; !found { glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.Group, storageVersions) } - expEtcdStorage, err = newEtcd(s.EtcdConfigFile, s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.Group], s.EtcdPathPrefix) + expEtcdStorage, err := newEtcd(s.EtcdConfigFile, s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.Group], s.EtcdPathPrefix) if err != nil { glog.Fatalf("Invalid experimental storage version or misconfigured etcd: %v", err) } + storageDestinations.AddAPIGroup("experimental", expEtcdStorage) } + updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdPathPrefix, &storageDestinations, newEtcd) + n := s.ServiceClusterIPRange // Default to the private server key for service account token signing @@ -460,10 +508,8 @@ func (s *APIServer) Run(_ []string) error { } } config := &master.Config{ - DatabaseStorage: etcdStorage, - ExpDatabaseStorage: expEtcdStorage, - StorageVersions: storageVersions, - + StorageDestinations: storageDestinations, + StorageVersions: storageVersions, EventTTL: s.EventTTL, KubeletClient: kubeletClient, ServiceClusterIPRange: &n, diff --git a/cmd/kube-apiserver/app/server_test.go b/cmd/kube-apiserver/app/server_test.go index 6299892620f..fc69f9efeb5 100644 --- a/cmd/kube-apiserver/app/server_test.go +++ b/cmd/kube-apiserver/app/server_test.go @@ -19,7 +19,12 @@ package app import ( "reflect" "regexp" + "strings" "testing" + + "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/master" + "k8s.io/kubernetes/pkg/storage" ) func TestLongRunningRequestRegexp(t *testing.T) { @@ -98,3 +103,54 @@ func TestGenerateStorageVersionMap(t *testing.T) { } } } + +func TestUpdateEtcdOverrides(t *testing.T) { + storageVersions := generateStorageVersionMap("", "v1,experimental/v1alpha1") + + testCases := []struct { + apigroup string + resource string + servers []string + }{ + { + apigroup: "", + resource: "resource", + servers: []string{"http://127.0.0.1:10000"}, + }, + { + apigroup: "", + resource: "resource", + servers: []string{"http://127.0.0.1:10000", "http://127.0.0.1:20000"}, + }, + { + apigroup: "experimental", + resource: "resource", + servers: []string{"http://127.0.0.1:10000"}, + }, + } + + for _, test := range testCases { + newEtcd := func(_ string, serverList []string, _ meta.VersionInterfacesFunc, _, _ string) (storage.Interface, error) { + if !reflect.DeepEqual(test.servers, serverList) { + t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList) + } + return nil, nil + } + storageDestinations := master.NewStorageDestinations() + override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";") + updateEtcdOverrides([]string{override}, storageVersions, "", &storageDestinations, newEtcd) + apigroup, ok := storageDestinations.APIGroups[test.apigroup] + if !ok { + t.Errorf("apigroup: %s not created", test.apigroup) + continue + } + if apigroup.Overrides == nil { + t.Errorf("Overrides not created for: %s", test.apigroup) + continue + } + if _, ok := apigroup.Overrides[test.resource]; !ok { + t.Errorf("override not created for: %s", test.resource) + continue + } + } +} diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index fb714037e8a..639915e166c 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -77,6 +77,7 @@ etcd-config etcd-prefix etcd-server etcd-servers +etcd-servers-overrides event-burst event-qps event-ttl diff --git a/pkg/master/master.go b/pkg/master/master.go index 67fa7477bcc..21d2e18a088 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -98,10 +98,78 @@ const ( DefaultEtcdPathPrefix = "/registry" ) +// StorageDestinations is a mapping from API group & resource to +// the underlying storage interfaces. +type StorageDestinations struct { + APIGroups map[string]*StorageDestinationsForAPIGroup +} + +type StorageDestinationsForAPIGroup struct { + Default storage.Interface + Overrides map[string]storage.Interface +} + +func NewStorageDestinations() StorageDestinations { + return StorageDestinations{ + APIGroups: map[string]*StorageDestinationsForAPIGroup{}, + } +} + +func (s *StorageDestinations) AddAPIGroup(group string, defaultStorage storage.Interface) { + s.APIGroups[group] = &StorageDestinationsForAPIGroup{ + Default: defaultStorage, + Overrides: map[string]storage.Interface{}, + } +} + +func (s *StorageDestinations) AddStorageOverride(group, resource string, override storage.Interface) { + if _, ok := s.APIGroups[group]; !ok { + s.AddAPIGroup(group, nil) + } + if s.APIGroups[group].Overrides == nil { + s.APIGroups[group].Overrides = map[string]storage.Interface{} + } + s.APIGroups[group].Overrides[resource] = override +} + +func (s *StorageDestinations) get(group, resource string) storage.Interface { + apigroup, ok := s.APIGroups[group] + if !ok { + glog.Errorf("No storage defined for API group: '%s'", apigroup) + return nil + } + if apigroup.Overrides != nil { + if client, exists := apigroup.Overrides[resource]; exists { + return client + } + } + return apigroup.Default +} + +// Get all backends for all registered storage destinations. +// Used for getting all instances for health validations. +func (s *StorageDestinations) backends() []string { + backends := sets.String{} + for _, group := range s.APIGroups { + if group.Default != nil { + for _, backend := range group.Default.Backends() { + backends.Insert(backend) + } + } + if group.Overrides != nil { + for _, storage := range group.Overrides { + for _, backend := range storage.Backends() { + backends.Insert(backend) + } + } + } + } + return backends.List() +} + // Config is a structure used to configure a Master. type Config struct { - DatabaseStorage storage.Interface - ExpDatabaseStorage storage.Interface + StorageDestinations StorageDestinations // StorageVersions is a map between groups and their storage versions StorageVersions map[string]string EventTTL time.Duration @@ -435,35 +503,36 @@ func NewHandlerContainer(mux *http.ServeMux) *restful.Container { func (m *Master) init(c *Config) { healthzChecks := []healthz.HealthzChecker{} m.clock = util.RealClock{} - podStorage := podetcd.NewStorage(c.DatabaseStorage, c.EnableWatchCache, c.KubeletClient) + dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) } + podStorage := podetcd.NewStorage(dbClient("pods"), c.EnableWatchCache, c.KubeletClient) - podTemplateStorage := podtemplateetcd.NewREST(c.DatabaseStorage) + podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates")) - eventStorage := eventetcd.NewREST(c.DatabaseStorage, uint64(c.EventTTL.Seconds())) - limitRangeStorage := limitrangeetcd.NewREST(c.DatabaseStorage) + eventStorage := eventetcd.NewREST(dbClient("events"), uint64(c.EventTTL.Seconds())) + limitRangeStorage := limitrangeetcd.NewREST(dbClient("limitRanges")) - resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(c.DatabaseStorage) - secretStorage := secretetcd.NewREST(c.DatabaseStorage) - serviceAccountStorage := serviceaccountetcd.NewREST(c.DatabaseStorage) - persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(c.DatabaseStorage) - persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(c.DatabaseStorage) + resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(dbClient("resourceQuotas")) + secretStorage := secretetcd.NewREST(dbClient("secrets")) + serviceAccountStorage := serviceaccountetcd.NewREST(dbClient("serviceAccounts")) + persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(dbClient("persistentVolumes")) + persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(dbClient("persistentVolumeClaims")) - namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(c.DatabaseStorage) + namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces")) m.namespaceRegistry = namespace.NewRegistry(namespaceStorage) - endpointsStorage := endpointsetcd.NewREST(c.DatabaseStorage, c.EnableWatchCache) + endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), c.EnableWatchCache) m.endpointRegistry = endpoint.NewRegistry(endpointsStorage) - nodeStorage, nodeStatusStorage := nodeetcd.NewREST(c.DatabaseStorage, c.EnableWatchCache, c.KubeletClient) + nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), c.EnableWatchCache, c.KubeletClient) m.nodeRegistry = node.NewRegistry(nodeStorage) - serviceStorage := serviceetcd.NewREST(c.DatabaseStorage) + serviceStorage := serviceetcd.NewREST(dbClient("services")) m.serviceRegistry = service.NewRegistry(serviceStorage) var serviceClusterIPRegistry service.RangeRegistry serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(m.serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface { mem := allocator.NewAllocationMap(max, rangeSpec) - etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", "serviceipallocation", c.DatabaseStorage) + etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", "serviceipallocation", dbClient("services")) serviceClusterIPRegistry = etcd return etcd }) @@ -472,13 +541,13 @@ func (m *Master) init(c *Config) { var serviceNodePortRegistry service.RangeRegistry serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.serviceNodePortRange, func(max int, rangeSpec string) allocator.Interface { mem := allocator.NewAllocationMap(max, rangeSpec) - etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", "servicenodeportallocation", c.DatabaseStorage) + etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", "servicenodeportallocation", dbClient("services")) serviceNodePortRegistry = etcd return etcd }) m.serviceNodePortAllocator = serviceNodePortRegistry - controllerStorage := controlleretcd.NewREST(c.DatabaseStorage) + controllerStorage := controlleretcd.NewREST(dbClient("replicationControllers")) // TODO: Factor out the core API registration m.storage = map[string]rest.Storage{ @@ -579,7 +648,7 @@ func (m *Master) init(c *Config) { // allGroups records all supported groups at /apis allGroups := []api.APIGroup{} if m.exp { - m.thirdPartyStorage = c.ExpDatabaseStorage + m.thirdPartyStorage = c.StorageDestinations.APIGroups["experimental"].Default m.thirdPartyResources = map[string]*thirdpartyresourcedataetcd.REST{} expVersion := m.experimental(c) @@ -752,7 +821,8 @@ func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server { "controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"}, "scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"}, } - for ix, machine := range c.DatabaseStorage.Backends() { + + for ix, machine := range c.StorageDestinations.backends() { etcdUrl, err := url.Parse(machine) if err != nil { glog.Errorf("Failed to parse etcd url for validation: %v", err) @@ -962,13 +1032,16 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV // experimental returns the resources and codec for the experimental api func (m *Master) experimental(c *Config) *apiserver.APIGroupVersion { - controllerStorage := expcontrolleretcd.NewStorage(c.DatabaseStorage) - autoscalerStorage := horizontalpodautoscaleretcd.NewREST(c.ExpDatabaseStorage) - thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(c.ExpDatabaseStorage) - daemonSetStorage, daemonSetStatusStorage := daemonetcd.NewREST(c.ExpDatabaseStorage) - deploymentStorage := deploymentetcd.NewStorage(c.ExpDatabaseStorage) - jobStorage, jobStatusStorage := jobetcd.NewREST(c.ExpDatabaseStorage) - ingressStorage := ingressetcd.NewREST(c.ExpDatabaseStorage) + controllerStorage := expcontrolleretcd.NewStorage(c.StorageDestinations.get("", "replicationControllers")) + dbClient := func(resource string) storage.Interface { + return c.StorageDestinations.get("experimental", resource) + } + autoscalerStorage := horizontalpodautoscaleretcd.NewREST(dbClient("horizonalpodautoscalers")) + thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(dbClient("thirdpartyresources")) + daemonSetStorage, daemonSetStatusStorage := daemonetcd.NewREST(dbClient("daemonsets")) + deploymentStorage := deploymentetcd.NewStorage(dbClient("deployments")) + jobStorage, jobStatusStorage := jobetcd.NewREST(dbClient("jobs")) + ingressStorage := ingressetcd.NewREST(dbClient("ingress")) thirdPartyControl := ThirdPartyController{ master: m, diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 390a495baa8..022d577f93f 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -65,9 +65,11 @@ func setUp(t *testing.T) (Master, Config, *assert.Assertions) { fakeClient := tools.NewFakeEtcdClient(t) fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"} storageVersions := make(map[string]string) - config.DatabaseStorage = etcdstorage.NewEtcdStorage(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) + storageDestinations := NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdstorage.NewEtcdStorage(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix())) + storageDestinations.AddAPIGroup("experimental", etcdstorage.NewEtcdStorage(fakeClient, testapi.Experimental.Codec(), etcdtest.PathPrefix())) + config.StorageDestinations = storageDestinations storageVersions[""] = testapi.Default.Version() - config.ExpDatabaseStorage = etcdstorage.NewEtcdStorage(fakeClient, testapi.Experimental.Codec(), etcdtest.PathPrefix()) storageVersions["experimental"] = testapi.Experimental.GroupAndVersion() config.StorageVersions = storageVersions master.nodeRegistry = registrytest.NewNodeRegistry([]string{"node1", "node2"}, api.NodeResources{}) diff --git a/test/integration/auth_test.go b/test/integration/auth_test.go index 54b9f3d64ba..6df5651aa0d 100644 --- a/test/integration/auth_test.go +++ b/test/integration/auth_test.go @@ -390,6 +390,8 @@ func TestAuthModeAlwaysAllow(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) var m *master.Master s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { m.Handler.ServeHTTP(w, req) @@ -397,7 +399,7 @@ func TestAuthModeAlwaysAllow(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - DatabaseStorage: etcdStorage, + StorageDestinations: storageDestinations, KubeletClient: client.FakeKubeletClient{}, EnableCoreControllers: true, EnableLogsSupport: false, @@ -506,6 +508,8 @@ func TestAuthModeAlwaysDeny(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) var m *master.Master s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -514,7 +518,7 @@ func TestAuthModeAlwaysDeny(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - DatabaseStorage: etcdStorage, + StorageDestinations: storageDestinations, KubeletClient: client.FakeKubeletClient{}, EnableCoreControllers: true, EnableLogsSupport: false, @@ -574,6 +578,8 @@ func TestAliceNotForbiddenOrUnauthorized(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) var m *master.Master s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -582,7 +588,7 @@ func TestAliceNotForbiddenOrUnauthorized(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - DatabaseStorage: etcdStorage, + StorageDestinations: storageDestinations, KubeletClient: client.FakeKubeletClient{}, EnableCoreControllers: true, EnableLogsSupport: false, @@ -662,6 +668,8 @@ func TestBobIsForbidden(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) var m *master.Master s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -670,7 +678,7 @@ func TestBobIsForbidden(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - DatabaseStorage: etcdStorage, + StorageDestinations: storageDestinations, KubeletClient: client.FakeKubeletClient{}, EnableCoreControllers: true, EnableLogsSupport: false, @@ -724,6 +732,8 @@ func TestUnknownUserIsUnauthorized(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) var m *master.Master s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -732,7 +742,7 @@ func TestUnknownUserIsUnauthorized(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - DatabaseStorage: etcdStorage, + StorageDestinations: storageDestinations, KubeletClient: client.FakeKubeletClient{}, EnableCoreControllers: true, EnableLogsSupport: false, @@ -809,6 +819,8 @@ func TestAuthorizationAttributeDetermination(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) trackingAuthorizer := &trackingAuthorizer{} @@ -819,7 +831,7 @@ func TestAuthorizationAttributeDetermination(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - DatabaseStorage: etcdStorage, + StorageDestinations: storageDestinations, KubeletClient: client.FakeKubeletClient{}, EnableCoreControllers: true, EnableLogsSupport: false, @@ -890,6 +902,8 @@ func TestNamespaceAuthorization(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) a := newAuthorizerWithContents(t, `{"namespace": "foo"} `) @@ -901,7 +915,7 @@ func TestNamespaceAuthorization(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - DatabaseStorage: etcdStorage, + StorageDestinations: storageDestinations, KubeletClient: client.FakeKubeletClient{}, EnableCoreControllers: true, EnableLogsSupport: false, @@ -1006,6 +1020,8 @@ func TestKindAuthorization(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) a := newAuthorizerWithContents(t, `{"resource": "services"} `) @@ -1017,7 +1033,7 @@ func TestKindAuthorization(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - DatabaseStorage: etcdStorage, + StorageDestinations: storageDestinations, KubeletClient: client.FakeKubeletClient{}, EnableCoreControllers: true, EnableLogsSupport: false, @@ -1110,6 +1126,8 @@ func TestReadOnlyAuthorization(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) a := newAuthorizerWithContents(t, `{"readonly": true}`) @@ -1120,7 +1138,7 @@ func TestReadOnlyAuthorization(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - DatabaseStorage: etcdStorage, + StorageDestinations: storageDestinations, KubeletClient: client.FakeKubeletClient{}, EnableCoreControllers: true, EnableLogsSupport: false, diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 13fe661a217..5ce56a7eb1f 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -37,7 +37,6 @@ import ( "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/master" - "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/tools/etcdtest" "k8s.io/kubernetes/plugin/pkg/admission/admit" ) @@ -72,8 +71,6 @@ type MasterComponents struct { rcStopCh chan struct{} // Used to stop master components individually, and via MasterComponents.Stop once sync.Once - // Kubernetes etcd storage, has embedded etcd client - EtcdStorage storage.Interface } // Config is a struct of configuration directives for NewMasterComponents. @@ -92,7 +89,7 @@ type Config struct { // NewMasterComponents creates, initializes and starts master components based on the given config. func NewMasterComponents(c *Config) *MasterComponents { - m, s, e := startMasterOrDie(c.MasterConfig) + m, s := startMasterOrDie(c.MasterConfig) // TODO: Allow callers to pipe through a different master url and create a client/start components using it. glog.Infof("Master %+v", s.URL) if c.DeleteEtcdKeys { @@ -114,24 +111,21 @@ func NewMasterComponents(c *Config) *MasterComponents { RestClient: restClient, ControllerManager: controllerManager, rcStopCh: rcStopCh, - EtcdStorage: e, once: once, } } // startMasterOrDie starts a kubernetes master and an httpserver to handle api requests -func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Server, storage.Interface) { +func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Server) { var m *master.Master s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { m.Handler.ServeHTTP(w, req) })) - var etcdStorage storage.Interface - var err error if masterConfig == nil { etcdClient := NewEtcdClient() storageVersions := make(map[string]string) - etcdStorage, err = master.NewEtcdStorage(etcdClient, latest.GroupOrDie("").InterfacesFor, latest.GroupOrDie("").GroupVersion, etcdtest.PathPrefix()) + etcdStorage, err := master.NewEtcdStorage(etcdClient, latest.GroupOrDie("").InterfacesFor, latest.GroupOrDie("").GroupVersion, etcdtest.PathPrefix()) storageVersions[""] = latest.GroupOrDie("").GroupVersion if err != nil { glog.Fatalf("Failed to create etcd storage for master %v", err) @@ -141,10 +135,12 @@ func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Se if err != nil { glog.Fatalf("Failed to create etcd storage for master %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) + storageDestinations.AddAPIGroup("experimental", expEtcdStorage) masterConfig = &master.Config{ - DatabaseStorage: etcdStorage, - ExpDatabaseStorage: expEtcdStorage, + StorageDestinations: storageDestinations, StorageVersions: storageVersions, KubeletClient: client.FakeKubeletClient{}, EnableExp: true, @@ -157,11 +153,9 @@ func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Se Authorizer: apiserver.NewAlwaysAllowAuthorizer(), AdmissionControl: admit.NewAlwaysAdmit(), } - } else { - etcdStorage = masterConfig.DatabaseStorage } m = master.New(masterConfig) - return m, s, etcdStorage + return m, s } func (m *MasterComponents) stopRCManager() { @@ -285,20 +279,22 @@ func RunAMaster(t *testing.T) (*master.Master, *httptest.Server) { if err != nil { t.Fatalf("unexpected error: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) + storageDestinations.AddAPIGroup("experimental", expEtcdStorage) m := master.New(&master.Config{ - DatabaseStorage: etcdStorage, - ExpDatabaseStorage: expEtcdStorage, - KubeletClient: client.FakeKubeletClient{}, - EnableLogsSupport: false, - EnableProfiling: true, - EnableUISupport: false, - APIPrefix: "/api", - APIGroupPrefix: "/apis", - EnableExp: true, - Authorizer: apiserver.NewAlwaysAllowAuthorizer(), - AdmissionControl: admit.NewAlwaysAdmit(), - StorageVersions: storageVersions, + StorageDestinations: storageDestinations, + KubeletClient: client.FakeKubeletClient{}, + EnableLogsSupport: false, + EnableProfiling: true, + EnableUISupport: false, + APIPrefix: "/api", + APIGroupPrefix: "/apis", + EnableExp: true, + Authorizer: apiserver.NewAlwaysAllowAuthorizer(), + AdmissionControl: admit.NewAlwaysAdmit(), + StorageVersions: storageVersions, }) s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { diff --git a/test/integration/scheduler_test.go b/test/integration/scheduler_test.go index a6bf5934006..04dbe2e93da 100644 --- a/test/integration/scheduler_test.go +++ b/test/integration/scheduler_test.go @@ -58,6 +58,8 @@ func TestUnschedulableNodes(t *testing.T) { if err != nil { t.Fatalf("Couldn't create etcd storage: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) framework.DeleteAllEtcdKeys() var m *master.Master @@ -67,7 +69,7 @@ func TestUnschedulableNodes(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - DatabaseStorage: etcdStorage, + StorageDestinations: storageDestinations, KubeletClient: client.FakeKubeletClient{}, EnableCoreControllers: true, EnableLogsSupport: false, diff --git a/test/integration/secret_test.go b/test/integration/secret_test.go index 680a73e0c68..bd126f9730c 100644 --- a/test/integration/secret_test.go +++ b/test/integration/secret_test.go @@ -51,6 +51,8 @@ func TestSecrets(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) var m *master.Master s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -59,7 +61,7 @@ func TestSecrets(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - DatabaseStorage: etcdStorage, + StorageDestinations: storageDestinations, KubeletClient: client.FakeKubeletClient{}, EnableCoreControllers: true, EnableLogsSupport: false, diff --git a/test/integration/service_account_test.go b/test/integration/service_account_test.go index 43602aea2d1..82b9c9cc8e8 100644 --- a/test/integration/service_account_test.go +++ b/test/integration/service_account_test.go @@ -345,6 +345,8 @@ func startServiceAccountTestServer(t *testing.T) (*client.Client, client.Config, if err != nil { t.Fatalf("unexpected error: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) // Listener var m *master.Master @@ -411,16 +413,16 @@ func startServiceAccountTestServer(t *testing.T) (*client.Client, client.Config, // Create a master and install handlers into mux. m = master.New(&master.Config{ - DatabaseStorage: etcdStorage, - KubeletClient: client.FakeKubeletClient{}, - EnableLogsSupport: false, - EnableUISupport: false, - EnableIndex: true, - APIPrefix: "/api", - Authenticator: authenticator, - Authorizer: authorizer, - AdmissionControl: serviceAccountAdmission, - StorageVersions: map[string]string{"": testapi.Default.Version()}, + StorageDestinations: storageDestinations, + KubeletClient: client.FakeKubeletClient{}, + EnableLogsSupport: false, + EnableUISupport: false, + EnableIndex: true, + APIPrefix: "/api", + Authenticator: authenticator, + Authorizer: authorizer, + AdmissionControl: serviceAccountAdmission, + StorageVersions: map[string]string{"": testapi.Default.Version()}, }) // Start the service account and service account token controllers diff --git a/test/integration/utils.go b/test/integration/utils.go index 874cff321c1..46d857ff3b4 100644 --- a/test/integration/utils.go +++ b/test/integration/utils.go @@ -70,9 +70,11 @@ func runAMaster(t *testing.T) (*master.Master, *httptest.Server) { if err != nil { t.Fatalf("unexpected error: %v", err) } + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) m := master.New(&master.Config{ - DatabaseStorage: etcdStorage, + StorageDestinations: storageDestinations, KubeletClient: client.FakeKubeletClient{}, EnableCoreControllers: true, EnableLogsSupport: false,