From ab8cfb968ffa3d3b174ed980af1e5674cafb26e7 Mon Sep 17 00:00:00 2001 From: Eric Tune Date: Wed, 17 Feb 2016 09:11:31 -0800 Subject: [PATCH] Enabled batch API group in apiserver --- cmd/kube-apiserver/app/server.go | 41 ++++++++++++++++++ pkg/master/master.go | 71 ++++++++++++++++++++++++++++++-- 2 files changed, 109 insertions(+), 3 deletions(-) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 69604b3b8f6..3f6ee3a703e 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/autoscaling" + "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apiserver" "k8s.io/kubernetes/pkg/apiserver/authenticator" @@ -305,6 +306,11 @@ func Run(s *options.APIServer) error { // sure autoscaling has a storage destination. If the autoscaling group // itself is on, it will overwrite this decision below. storageDestinations.AddAPIGroup(autoscaling.GroupName, expEtcdStorage) + + // Since Job has been moved to the batch group, we need to make + // sure batch has a storage destination. If the batch group + // itself is on, it will overwrite this decision below. + storageDestinations.AddAPIGroup(batch.GroupName, expEtcdStorage) } // autoscaling/v1/horizontalpodautoscalers is a move from extensions/v1beta1/horizontalpodautoscalers. @@ -333,6 +339,33 @@ func Run(s *options.APIServer) error { storageDestinations.AddAPIGroup(autoscaling.GroupName, autoscalingEtcdStorage) } + // batch/v1/job is a move from extensions/v1beta1/job. The storage + // version needs to be either extensions/v1beta1 or batch/v1. Users + // must roll forward while using 1.2, because we will require the + // latter for 1.3. + if !apiGroupVersionOverrides["batch/v1"].Disable { + glog.Infof("Configuring batch/v1 storage destination") + batchGroup, err := registered.Group(batch.GroupName) + if err != nil { + glog.Fatalf("Batch API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err) + } + // Figure out what storage group/version we should use. + storageGroupVersion, found := storageVersions[batchGroup.GroupVersion.Group] + if !found { + glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", batchGroup.GroupVersion.Group, storageVersions) + } + + if storageGroupVersion != "batch/v1" && storageGroupVersion != "extensions/v1beta1" { + glog.Fatalf("The storage version for batch must be either 'batch/v1' or 'extensions/v1beta1'") + } + glog.Infof("Using %v for batch group storage version", storageGroupVersion) + batchEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdPathPrefix, s.EtcdQuorumRead) + if err != nil { + glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err) + } + storageDestinations.AddAPIGroup(batch.GroupName, batchEtcdStorage) + } + updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdPathPrefix, s.EtcdQuorumRead, &storageDestinations, newEtcd) n := s.ServiceClusterIPRange @@ -520,6 +553,14 @@ func parseRuntimeConfig(s *options.APIServer) (map[string]genericapiserver.APIGr Disable: true, } } + disableBatch := disableAllAPIs + batchGroupVersion := "batch/v1" + disableBatch = !getRuntimeConfigValue(s, batchGroupVersion, !disableBatch) + if disableBatch { + apiGroupVersionOverrides[batchGroupVersion] = genericapiserver.APIGroupVersionOverride{ + Disable: true, + } + } for key := range s.RuntimeConfig { if strings.HasPrefix(key, v1GroupVersion+"/") { diff --git a/pkg/master/master.go b/pkg/master/master.go index 825f858e039..39d76bbfa79 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/autoscaling" + "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apiserver" apiservermetrics "k8s.io/kubernetes/pkg/apiserver/metrics" @@ -285,6 +286,38 @@ func (m *Master) InstallAPIs(c *Config) { allGroups = append(allGroups, group) } + // Install batch unless disabled. + if !m.ApiGroupVersionOverrides["batch/v1"].Disable { + batchResources := m.getBatchResources(c) + batchGroupMeta := registered.GroupOrDie(batch.GroupName) + + // Hard code preferred group version to batch/v1 + batchGroupMeta.GroupVersion = unversioned.GroupVersion{Group: "batch", Version: "v1"} + + apiGroupInfo := genericapiserver.APIGroupInfo{ + GroupMeta: *batchGroupMeta, + VersionedResourcesStorageMap: map[string]map[string]rest.Storage{ + "v1": batchResources, + }, + OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, + Scheme: api.Scheme, + ParameterCodec: api.ParameterCodec, + NegotiatedSerializer: api.Codecs, + } + apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo) + + batchGVForDiscovery := unversioned.GroupVersionForDiscovery{ + GroupVersion: batchGroupMeta.GroupVersion.String(), + Version: batchGroupMeta.GroupVersion.Version, + } + group := unversioned.APIGroup{ + Name: batchGroupMeta.GroupVersion.Group, + Versions: []unversioned.GroupVersionForDiscovery{batchGVForDiscovery}, + PreferredVersion: batchGVForDiscovery, + } + allGroups = append(allGroups, group) + } + if err := m.InstallAPIGroups(apiGroupsInfo); err != nil { glog.Fatalf("Error in registering group versions: %v", err) } @@ -666,9 +699,7 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage { storage["deployments/rollback"] = deploymentStorage.Rollback } if isEnabled("jobs") { - jobStorage, jobStatusStorage := jobetcd.NewREST(dbClient("jobs"), storageDecorator) - storage["jobs"] = jobStorage - storage["jobs/status"] = jobStatusStorage + m.constructJobResources(c, storage) } if isEnabled("ingresses") { ingressStorage, ingressStatusStorage := ingressetcd.NewREST(dbClient("ingresses"), storageDecorator) @@ -722,6 +753,40 @@ func (m *Master) getAutoscalingResources(c *Config) map[string]rest.Storage { return storage } +// constructJobResources makes Job resources and adds them to the storage map. +// They're installed in both batch and extensions. It's assumed that you've +// already done the check that they should be on. +func (m *Master) constructJobResources(c *Config, restStorage map[string]rest.Storage) { + // Note that job's storage settings are changed by changing the batch + // group. Clearly we want all jobs to be stored in the same place no + // matter where they're accessed from. + storageDecorator := m.StorageDecorator() + dbClient := func(resource string) storage.Interface { + return c.StorageDestinations.Search([]string{batch.GroupName, extensions.GroupName}, resource) + } + jobStorage, jobStatusStorage := jobetcd.NewREST(dbClient("jobs"), storageDecorator) + restStorage["jobs"] = jobStorage + restStorage["jobs/status"] = jobStatusStorage +} + +// getBatchResources returns the resources for batch api +func (m *Master) getBatchResources(c *Config) map[string]rest.Storage { + resourceOverrides := m.ApiGroupVersionOverrides["batch/v1"].ResourceOverrides + isEnabled := func(resource string) bool { + // Check if the resource has been overriden. + if enabled, ok := resourceOverrides[resource]; ok { + return enabled + } + return !m.ApiGroupVersionOverrides["batch/v1"].Disable + } + + storage := map[string]rest.Storage{} + if isEnabled("jobs") { + m.constructJobResources(c, storage) + } + return storage +} + // findExternalAddress returns ExternalIP of provided node with fallback to LegacyHostIP. func findExternalAddress(node *api.Node) (string, error) { var fallback string