Configuring scheduler via json configuration file

This commit is contained in:
Abhishek Gupta
2015-02-19 16:18:28 -08:00
parent 445fd56739
commit 548e0da567
9 changed files with 421 additions and 5 deletions

View File

@@ -18,8 +18,11 @@ limitations under the License.
package app
import (
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -30,6 +33,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider"
schedulerapi "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/api"
latestschedulerapi "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
"github.com/golang/glog"
@@ -42,6 +47,7 @@ type SchedulerServer struct {
Address util.IP
ClientConfig client.Config
AlgorithmProvider string
PolicyConfigFile string
}
// NewSchedulerServer creates a new SchedulerServer with default parameters
@@ -60,6 +66,7 @@ func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
fs.Var(&s.Address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
client.BindClientConfigFlags(fs, &s.ClientConfig)
fs.StringVar(&s.AlgorithmProvider, "algorithm_provider", s.AlgorithmProvider, "The scheduling algorithm provider to use")
fs.StringVar(&s.PolicyConfigFile, "policy_config_file", s.PolicyConfigFile, "File with scheduler policy configuration")
}
// Run runs the specified SchedulerServer. This should never exit.
@@ -78,6 +85,7 @@ func (s *SchedulerServer) Run(_ []string) error {
if err != nil {
glog.Fatalf("Failed to create scheduler configuration: %v", err)
}
sched := scheduler.New(config)
sched.Run()
@@ -85,10 +93,29 @@ func (s *SchedulerServer) Run(_ []string) error {
}
func (s *SchedulerServer) createConfig(configFactory *factory.ConfigFactory) (*scheduler.Config, error) {
var policy schedulerapi.Policy
var configData []byte
if _, err := os.Stat(s.PolicyConfigFile); err == nil {
configData, err = ioutil.ReadFile(s.PolicyConfigFile)
if err != nil {
return nil, fmt.Errorf("Unable to read policy config: %v", err)
}
//err = json.Unmarshal(configData, &policy)
err = latestschedulerapi.Codec.DecodeInto(configData, &policy)
if err != nil {
return nil, fmt.Errorf("Invalid configuration: %v", err)
}
return configFactory.CreateFromConfig(policy)
}
// if the config file isn't provided, use the specified (or default) provider
// check of algorithm provider is registered and fail fast
_, err := factory.GetAlgorithmProvider(s.AlgorithmProvider)
if err != nil {
return nil, err
}
return configFactory.CreateFromProvider(s.AlgorithmProvider)
}

View File

@@ -37,9 +37,9 @@ func affinityPredicates() util.StringSet {
"PodFitsResources",
"NoDiskConflict",
// Ensures that all pods within the same service are hosted on minions within the same region as defined by the "region" label
factory.RegisterFitPredicate("ServiceAffinity", algorithm.NewServiceAffinityPredicate(factory.PodLister, factory.ServiceLister, factory.MinionLister, []string{"region"})),
factory.RegisterFitPredicate("RegionAffinity", algorithm.NewServiceAffinityPredicate(factory.PodLister, factory.ServiceLister, factory.MinionLister, []string{"region"})),
// Fit is defined based on the presence of the "region" label on a minion, regardless of value.
factory.RegisterFitPredicate("NodeLabelPredicate", algorithm.NewNodeLabelPredicate(factory.MinionLister, []string{"region"}, true)),
factory.RegisterFitPredicate("RegionRequired", algorithm.NewNodeLabelPredicate(factory.MinionLister, []string{"region"}, true)),
)
}
@@ -48,8 +48,8 @@ func affinityPriorities() util.StringSet {
"LeastRequestedPriority",
"ServiceSpreadingPriority",
// spreads pods belonging to the same service across minions in different zones
factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 2),
factory.RegisterPriorityFunction("ZoneSpread", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 2),
// Prioritize nodes based on the presence of the "zone" label on a minion, regardless of value.
factory.RegisterPriorityFunction("NodeLabelPriority", algorithm.NewNodeLabelPriority("zone", true), 1),
factory.RegisterPriorityFunction("ZonePreferred", algorithm.NewNodeLabelPriority("zone", true), 1),
)
}

View File

@@ -0,0 +1,40 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package latest
import (
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/api/v1"
)
// Version is the string that represents the current external default version.
const Version = "v1"
// OldestVersion is the string that represents the oldest server version supported,
// for client code that wants to hardcode the lowest common denominator.
const OldestVersion = "v1"
// Versions is the list of versions that are recognized in code. The order provided
// may be assumed to be least feature rich to most feature rich, and clients may
// choose to prefer the latter items in the list over the former items when presented
// with a set of versions to choose.
var Versions = []string{"v1"}
// Codec is the default codec for serializing output that should use
// the latest supported version. Use this Codec when writing to
// disk, a data store that is not dynamically versioned, or in tests.
// This codec can decode any object that Kubernetes is aware of.
var Codec = v1.Codec

View File

@@ -0,0 +1,32 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)
// Scheme is the default instance of runtime.Scheme to which types in the Kubernetes API are already registered.
var Scheme = runtime.NewScheme()
func init() {
Scheme.AddKnownTypes("",
&Policy{},
)
}
func (*Policy) IsAnAPIObject() {}

View File

@@ -0,0 +1,73 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// Where possible, json tags match the cli argument names.
// Top level config objects and all values required for proper functioning are not "omitempty". Any truly optional piece of config is allowed to be omitted.
type Policy struct {
api.TypeMeta `json:",inline"`
Predicates []PredicatePolicy `json:"predicates"`
Priorities []PriorityPolicy `json:"priorities"`
}
type PredicatePolicy struct {
Name string `json:"name"`
Argument *PredicateArgument `json:"argument"`
}
type PriorityPolicy struct {
Name string `json:"name"`
Weight int `json:"weight"`
Argument *PriorityArgument `json:"argument"`
}
// PredicateArgument represents the arguments that the different types of predicates take.
// Only one of its members may be specified.
type PredicateArgument struct {
ServiceAffinity *ServiceAffinity `json:"serviceAffinity"`
LabelsPresence *LabelsPresence `json:"labelsPresence"`
}
// PriorityArgument represents the arguments that the different types of priorities take.
// Only one of its members may be specified.
type PriorityArgument struct {
ServiceAntiAffinity *ServiceAntiAffinity `json:"serviceAntiAffinity"`
LabelPreference *LabelPreference `json:"labelPreference"`
}
type ServiceAffinity struct {
Labels []string `json:"labels"`
}
type LabelsPresence struct {
Labels []string `json:"labels"`
Presence bool `json:"presence"`
}
type ServiceAntiAffinity struct {
Label string `json:"label"`
}
type LabelPreference struct {
Label string `json:"label"`
Presence bool `json:"presence"`
}

View File

@@ -0,0 +1,33 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/api"
)
// Codec encodes internal objects to the v1 scheme
var Codec = runtime.CodecFor(api.Scheme, "v1")
func init() {
api.Scheme.AddKnownTypes("v1",
&Policy{},
)
}
func (*Policy) IsAnAPIObject() {}

View File

@@ -0,0 +1,106 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta3"
)
// Where possible, json tags match the cli argument names.
// Top level config objects and all values required for proper functioning are not "omitempty". Any truly optional piece of config is allowed to be omitted.
type Policy struct {
v1beta3.TypeMeta `json:",inline"`
// Predicates holds the information to configure the fit predicate functions
Predicates []PredicatePolicy `json:"predicates"`
// Priorities holds the information to configure the priority functions
Priorities []PriorityPolicy `json:"priorities"`
}
type PredicatePolicy struct {
// Name is the identifier of the predicate policy
// For a custom predicate, the name can be user-defined
// For the Kubernetes provided predicates, the name is the identifier of the pre-defined predicate
Name string `json:"name"`
// Argument holds the parameters to configure the given predicate
Argument *PredicateArgument `json:"argument"`
}
type PriorityPolicy struct {
// Name is the identifier of the priority policy
// For a custom priority, the name can be user-defined
// For the Kubernetes provided priority functions, the name is the identifier of the pre-defined priority function
Name string `json:"name"`
// Weight is the numeric multiplier for the minion scores that the priority function generates
Weight int `json:"weight"`
// Argument holds the parameters to configure the given priority function
Argument *PriorityArgument `json:"argument"`
}
// PredicateArgument represents the arguments that the different types of predicates take
// Only one of its members may be specified
type PredicateArgument struct {
// ServiceAffinity is the predicate that provides affinity for pods belonging to a service
// It uses a label to identify minions that belong to the same "group"
ServiceAffinity *ServiceAffinity `json:"serviceAffinity"`
// LabelsPresence is the predicate that checks whether a particular minion has a certain label
// defined or not, regardless of value
LabelsPresence *LabelsPresence `json:"labelsPresence"`
}
// PriorityArgument represents the arguments that the different types of priorities take.
// Only one of its members may be specified
type PriorityArgument struct {
// ServiceAntiAffinity is the priority function that ensures a good spread (anti-affinity) for pods belonging to a service
// It uses a label to identify minions that belong to the same "group"
ServiceAntiAffinity *ServiceAntiAffinity `json:"serviceAntiAffinity"`
// LabelPreference is the priority function that checks whether a particular minion has a certain label
// defined or not, regardless of value
LabelPreference *LabelPreference `json:"labelPreference"`
}
// ServiceAffinity holds the parameters that are used to configure the corresponding predicate
type ServiceAffinity struct {
// Labels is the list of labels that identify minion "groups"
// All of the labels should match for the minion to be considered a fit for hosting the pod
Labels []string `json:"labels"`
}
// LabelsPresence holds the parameters that are used to configure the corresponding predicate
type LabelsPresence struct {
// Labels is the list of labels that identify minion "groups"
// All of the labels should be either present (or absent) for the minion to be considered a fit for hosting the pod
Labels []string `json:"labels"`
// Presence is the boolean flag that indicates whether the labels should be present or absent from the minion
Presence bool `json:"presence"`
}
// ServiceAntiAffinity holds the parameters that are used to configure the corresponding priority function
type ServiceAntiAffinity struct {
// Label is used to identify minion "groups"
Label string `json:"label"`
}
// LabelPreference holds the parameters that are used to configure the corresponding priority function
type LabelPreference struct {
// Label is used to identify minion "groups"
Label string `json:"label"`
// Presence is a boolean flag
// If true, higher priority is given to minions that have the label
// If false, higher priority is given to minions that do not have the label
Presence bool `json:"presence"`
}

View File

@@ -30,6 +30,7 @@ import (
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
schedulerapi "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/api"
"github.com/golang/glog"
)
@@ -80,6 +81,25 @@ func (f *ConfigFactory) CreateFromProvider(providerName string) (*scheduler.Conf
return f.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys)
}
// CreateFromConfig creates a scheduler from the configuration file
func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) {
glog.V(2).Infof("creating scheduler from configuration: %v", policy)
predicateKeys := util.NewStringSet()
for _, predicate := range policy.Predicates {
glog.V(2).Infof("Registering predicate: %s", predicate.Name)
predicateKeys.Insert(RegisterCustomPredicate(predicate))
}
priorityKeys := util.NewStringSet()
for _, priority := range policy.Priorities {
glog.V(2).Infof("Registering priority: %s", priority.Name)
priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
}
return f.CreateFromKeys(predicateKeys, priorityKeys)
}
// CreateFromKeys creates a scheduler from a set of registered fit predicate keys and priority keys.
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) {
glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)

View File

@@ -23,6 +23,7 @@ import (
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
schedulerapi "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/api"
"github.com/golang/glog"
)
@@ -55,6 +56,33 @@ func RegisterFitPredicate(name string, predicate algorithm.FitPredicate) string
return name
}
// RegisterCustomPredicate registers a custom fit predicate with the algorithm registry.
// Returns the name, with which the predicate was registered.
func RegisterCustomPredicate(policy schedulerapi.PredicatePolicy) string {
var predicate algorithm.FitPredicate
var ok bool
validatePredicateOrDie(policy)
// generate the predicate function, if a custom type is requested
if policy.Argument != nil {
if policy.Argument.ServiceAffinity != nil {
predicate = algorithm.NewServiceAffinityPredicate(PodLister, ServiceLister, MinionLister, policy.Argument.ServiceAffinity.Labels)
} else if policy.Argument.LabelsPresence != nil {
predicate = algorithm.NewNodeLabelPredicate(MinionLister, policy.Argument.LabelsPresence.Labels, policy.Argument.LabelsPresence.Presence)
}
// check to see if a pre-defined predicate is requested
} else if predicate, ok = fitPredicateMap[policy.Name]; ok {
glog.V(2).Infof("Predicate type %s already registered, reusing.", policy.Name)
}
if predicate == nil {
glog.Fatalf("Invalid configuration: Predicate type not found for %s", policy.Name)
}
return RegisterFitPredicate(policy.Name, predicate)
}
// IsFitPredicateRegistered check is useful for testing providers.
func IsFitPredicateRegistered(name string) bool {
schedulerFactoryMutex.Lock()
@@ -63,7 +91,7 @@ func IsFitPredicateRegistered(name string) bool {
return ok
}
// RegisterFitPredicate registers a priority function with the algorithm registry. Returns the name,
// RegisterPriorityFunction registers a priority function with the algorithm registry. Returns the name,
// with which the function was registered.
func RegisterPriorityFunction(name string, function algorithm.PriorityFunction, weight int) string {
schedulerFactoryMutex.Lock()
@@ -73,6 +101,32 @@ func RegisterPriorityFunction(name string, function algorithm.PriorityFunction,
return name
}
// RegisterCustomPriority registers a custom priority function with the algorithm registry.
// Returns the name, with which the priority function was registered.
func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
var priority algorithm.PriorityFunction
validatePriorityOrDie(policy)
// generate the priority function, if a custom priority is requested
if policy.Argument != nil {
if policy.Argument.ServiceAntiAffinity != nil {
priority = algorithm.NewServiceAntiAffinityPriority(ServiceLister, policy.Argument.ServiceAntiAffinity.Label)
} else if policy.Argument.LabelPreference != nil {
priority = algorithm.NewNodeLabelPriority(policy.Argument.LabelPreference.Label, policy.Argument.LabelPreference.Presence)
}
} else if priorityConfig, ok := priorityFunctionMap[policy.Name]; ok {
glog.V(2).Infof("Priority type %s already registered, reusing.", policy.Name)
priority = priorityConfig.Function
}
if priority == nil {
glog.Fatalf("Invalid configuration: Priority type not found for %s", policy.Name)
}
return RegisterPriorityFunction(policy.Name, priority, policy.Weight)
}
// IsPriorityFunctionRegistered check is useful for testing providers.
func IsPriorityFunctionRegistered(name string) bool {
schedulerFactoryMutex.Lock()
@@ -91,6 +145,7 @@ func SetPriorityFunctionWeight(name string, weight int) {
return
}
config.Weight = weight
priorityFunctionMap[name] = config
}
// RegisterAlgorithmProvider registers a new algorithm provider with the algorithm registry. This should
@@ -157,3 +212,33 @@ func validateAlgorithmNameOrDie(name string) {
glog.Fatalf("algorithm name %v does not match the name validation regexp \"%v\".", name, validName)
}
}
func validatePredicateOrDie(predicate schedulerapi.PredicatePolicy) {
if predicate.Argument != nil {
numArgs := 0
if predicate.Argument.ServiceAffinity != nil {
numArgs++
}
if predicate.Argument.LabelsPresence != nil {
numArgs++
}
if numArgs != 1 {
glog.Fatalf("Exactly 1 predicate argument is required")
}
}
}
func validatePriorityOrDie(priority schedulerapi.PriorityPolicy) {
if priority.Argument != nil {
numArgs := 0
if priority.Argument.ServiceAntiAffinity != nil {
numArgs++
}
if priority.Argument.LabelPreference != nil {
numArgs++
}
if numArgs != 1 {
glog.Fatalf("Exactly 1 priority argument is required")
}
}
}