Merge pull request #21276 from mesosphere/jdef_host_port_endpoints_refactor

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-02-16 10:59:11 -08:00
commit eb44520f5f
25 changed files with 355 additions and 305 deletions

View File

@ -137,7 +137,8 @@ Host ports that are not defined, or else defined as zero, will automatically be
To disable the work-around and revert to vanilla Kubernetes service endpoint termination:
- execute the k8sm controller-manager with `-host_port_endpoints=false`;
- execute the k8sm scheduler with `-host-port-endpoints=false`
- execute the k8sm controller-manager with `-host-port-endpoints=false`
Then the usual Kubernetes network assumptions must be fulfilled for Kubernetes to work with Mesos, i.e. each container must get a cluster-wide routable IP (compare [Kubernetes Networking documentation](../../../docs/design/networking.md#container-to-container)).

View File

@ -84,7 +84,7 @@ func NewCMServer() *CMServer {
// AddFlags adds flags for a specific CMServer to the specified FlagSet
func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
s.CMServer.AddFlags(fs)
fs.BoolVar(&s.UseHostPortEndpoints, "host_port_endpoints", s.UseHostPortEndpoints, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.")
fs.BoolVar(&s.UseHostPortEndpoints, "host-port-endpoints", s.UseHostPortEndpoints, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.")
}
func (s *CMServer) resyncPeriod() time.Duration {

View File

@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
kmruntime "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
@ -197,11 +198,11 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
podTask, err := podtask.New(
api.NewDefaultContext(),
"",
podtask.Config{
Prototype: executorinfo,
HostPortStrategy: hostport.StrategyWildcard,
},
pod,
executorinfo,
nil,
nil,
)
assert.Equal(t, nil, err, "must be able to create a task from a pod")
@ -407,11 +408,12 @@ func TestExecutorFrameworkMessage(t *testing.T) {
executorinfo := &mesosproto.ExecutorInfo{}
podTask, _ := podtask.New(
api.NewDefaultContext(),
"foo",
podtask.Config{
ID: "foo",
Prototype: executorinfo,
HostPortStrategy: hostport.StrategyWildcard,
},
pod,
executorinfo,
nil,
nil,
)
pod.Annotations = map[string]string{
"k8s.mesosphere.io/taskId": podTask.ID,

View File

@ -20,14 +20,13 @@ import (
"fmt"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
)
@ -42,14 +41,12 @@ type SchedulerAlgorithm interface {
// SchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface
type schedulerAlgorithm struct {
sched scheduler.Scheduler
podUpdates queue.FIFO
podScheduler podschedulers.PodScheduler
prototype *mesosproto.ExecutorInfo
frameworkRoles []string
defaultPodRoles []string
defaultCpus mresource.CPUShares
defaultMem mresource.MegaBytes
sched scheduler.Scheduler
podUpdates queue.FIFO
podScheduler podschedulers.PodScheduler
taskConfig podtask.Config
defaultCpus resources.CPUShares
defaultMem resources.MegaBytes
}
// New returns a new SchedulerAlgorithm
@ -58,20 +55,17 @@ func New(
sched scheduler.Scheduler,
podUpdates queue.FIFO,
podScheduler podschedulers.PodScheduler,
prototype *mesosproto.ExecutorInfo,
frameworkRoles, defaultPodRoles []string,
defaultCpus mresource.CPUShares,
defaultMem mresource.MegaBytes,
taskConfig podtask.Config,
defaultCpus resources.CPUShares,
defaultMem resources.MegaBytes,
) SchedulerAlgorithm {
return &schedulerAlgorithm{
sched: sched,
podUpdates: podUpdates,
podScheduler: podScheduler,
frameworkRoles: frameworkRoles,
defaultPodRoles: defaultPodRoles,
prototype: prototype,
defaultCpus: defaultCpus,
defaultMem: defaultMem,
sched: sched,
podUpdates: podUpdates,
podScheduler: podScheduler,
taskConfig: taskConfig,
defaultCpus: defaultCpus,
defaultMem: defaultMem,
}
}
@ -109,7 +103,7 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
// From here on we can expect that the pod spec of a task has proper limits for CPU and memory.
k.limitPod(pod)
podTask, err := podtask.New(ctx, "", pod, k.prototype, k.frameworkRoles, k.defaultPodRoles)
podTask, err := podtask.New(ctx, k.taskConfig, pod)
if err != nil {
log.Warningf("aborting Schedule, unable to create podtask object %+v: %v", pod, err)
return "", err
@ -146,12 +140,12 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
// limitPod limits the given pod based on the scheduler's default limits.
func (k *schedulerAlgorithm) limitPod(pod *api.Pod) error {
cpuRequest, cpuLimit, _, err := mresource.LimitPodCPU(pod, k.defaultCpus)
cpuRequest, cpuLimit, _, err := resources.LimitPodCPU(pod, k.defaultCpus)
if err != nil {
return err
}
memRequest, memLimit, _, err := mresource.LimitPodMem(pod, k.defaultMem)
memRequest, memLimit, _, err := resources.LimitPodMem(pod, k.defaultMem)
if err != nil {
return err
}

View File

@ -25,6 +25,7 @@ import (
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
"k8s.io/kubernetes/pkg/api"
)
@ -63,11 +64,12 @@ func TestDeleteOne_PendingPod(t *testing.T) {
}}}
task, err := podtask.New(
api.NewDefaultContext(),
"bar",
podtask.Config{
ID: "bar",
Prototype: &mesosproto.ExecutorInfo{},
HostPortStrategy: hostport.StrategyWildcard,
},
pod.Pod,
&mesosproto.ExecutorInfo{},
nil,
nil,
)
if err != nil {
t.Fatalf("failed to create task: %v", err)
@ -110,11 +112,12 @@ func TestDeleteOne_Running(t *testing.T) {
}}}
task, err := podtask.New(
api.NewDefaultContext(),
"bar",
podtask.Config{
ID: "bar",
Prototype: &mesosproto.ExecutorInfo{},
HostPortStrategy: hostport.StrategyWildcard,
},
pod.Pod,
&mesosproto.ExecutorInfo{},
nil,
nil,
)
if err != nil {
t.Fatalf("unexpected error: %v", err)

View File

@ -39,7 +39,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
@ -65,10 +65,9 @@ func New(
terminate <-chan struct{},
mux *http.ServeMux,
lw *cache.ListWatch,
prototype *mesos.ExecutorInfo,
frameworkRoles, defaultPodRoles []string,
defaultCpus mresource.CPUShares,
defaultMem mresource.MegaBytes,
taskConfig podtask.Config,
defaultCpus resources.CPUShares,
defaultMem resources.MegaBytes,
) scheduler.Scheduler {
core := &sched{
framework: fw,
@ -82,7 +81,7 @@ func New(
q := queuer.New(queue.NewDelayFIFO(), podUpdates)
algorithm := algorithm.New(core, podUpdates, ps, prototype, frameworkRoles, defaultPodRoles, defaultCpus, defaultMem)
algorithm := algorithm.New(core, podUpdates, ps, taskConfig, defaultCpus, defaultMem)
podDeleter := deleter.New(core, q)

View File

@ -43,7 +43,8 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
@ -524,11 +525,14 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
schedulerProc.Terminal(),
http.DefaultServeMux,
&podsListWatch.ListWatch,
ei,
[]string{"*"},
[]string{"*"},
mresource.DefaultDefaultContainerCPULimit,
mresource.DefaultDefaultContainerMemLimit,
podtask.Config{
Prototype: ei,
FrameworkRoles: []string{"*"},
DefaultPodRoles: []string{"*"},
HostPortStrategy: hostport.StrategyWildcard,
},
resources.DefaultDefaultContainerCPULimit,
resources.DefaultDefaultContainerMemLimit,
)
assert.NotNil(scheduler)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package podtask
package hostport
import (
"fmt"
@ -22,38 +22,30 @@ import (
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
)
const (
// maps a Container.HostPort to the same exact offered host port, ignores .HostPort = 0
HostPortMappingFixed = "fixed"
// same as HostPortMappingFixed, except that .HostPort of 0 are mapped to any port offered
HostPortMappingWildcard = "wildcard"
)
// Objects implementing the HostPortMapper interface generate port mappings
// Objects implementing the Mapper interface generate port mappings
// from k8s container ports to ports offered by mesos
type HostPortMapper interface {
// Map maps the given pod task and the given mesos offer
// and returns a slice of port mappings
// or an error if the mapping failed
Map(t *T, offer *mesos.Offer) ([]HostPortMapping, error)
type Mapper interface {
// Map maps the given pod and the given mesos offer and returns a
// slice of port mappings or an error if the mapping failed
Map(pod *api.Pod, roles []string, offer *mesos.Offer) ([]Mapping, error)
}
// HostPortMapperFunc is a function adapter to the HostPortMapper interface
type HostPortMapperFunc func(*T, *mesos.Offer) ([]HostPortMapping, error)
// MapperFunc is a function adapter to the Mapper interface
type MapperFunc func(*api.Pod, []string, *mesos.Offer) ([]Mapping, error)
// Map calls f(t, offer)
func (f HostPortMapperFunc) Map(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
return f(t, offer)
func (f MapperFunc) Map(pod *api.Pod, roles []string, offer *mesos.Offer) ([]Mapping, error) {
return f(pod, roles, offer)
}
// A HostPortMapping represents the mapping between k8s container ports
// A Mapping represents the mapping between k8s container ports
// ports offered by mesos. It references the k8s' container and port
// and specifies the offered mesos port and the offered port's role
type HostPortMapping struct {
type Mapping struct {
ContainerIdx int // index of the container in the pod spec
PortIdx int // index of the port in a container's port spec
OfferPort uint64 // the port offered by mesos
@ -61,27 +53,27 @@ type HostPortMapping struct {
}
type PortAllocationError struct {
PodId string
PodID string
Ports []uint64
}
func (err *PortAllocationError) Error() string {
return fmt.Sprintf("Could not schedule pod %s: %d port(s) could not be allocated", err.PodId, len(err.Ports))
return fmt.Sprintf("Could not schedule pod %s: %d port(s) could not be allocated", err.PodID, len(err.Ports))
}
type DuplicateHostPortError struct {
m1, m2 HostPortMapping
type DuplicateError struct {
m1, m2 Mapping
}
func (err *DuplicateHostPortError) Error() string {
func (err *DuplicateError) Error() string {
return fmt.Sprintf(
"Host port %d is specified for container %d, pod %d and container %d, pod %d",
err.m1.OfferPort, err.m1.ContainerIdx, err.m1.PortIdx, err.m2.ContainerIdx, err.m2.PortIdx)
}
// WildcardMapper maps k8s wildcard ports (hostPort == 0) to any available offer port
func WildcardMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
mapping, err := FixedMapper(t, offer)
func WildcardMapper(pod *api.Pod, roles []string, offer *mesos.Offer) ([]Mapping, error) {
mapping, err := FixedMapper(pod, roles, offer)
if err != nil {
return nil, err
}
@ -91,11 +83,11 @@ func WildcardMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
taken[entry.OfferPort] = struct{}{}
}
wildports := []HostPortMapping{}
for i, container := range t.Pod.Spec.Containers {
wildports := []Mapping{}
for i, container := range pod.Spec.Containers {
for pi, port := range container.Ports {
if port.HostPort == 0 {
wildports = append(wildports, HostPortMapping{
wildports = append(wildports, Mapping{
ContainerIdx: i,
PortIdx: pi,
})
@ -104,7 +96,7 @@ func WildcardMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
}
remaining := len(wildports)
foreachPortsRange(offer.GetResources(), t.Roles(), func(bp, ep uint64, role string) {
resources.ForeachPortsRange(offer.GetResources(), roles, func(bp, ep uint64, role string) {
log.V(3).Infof("Searching for wildcard port in range {%d:%d}", bp, ep)
for i := range wildports {
if wildports[i].OfferPort != 0 {
@ -115,7 +107,7 @@ func WildcardMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
continue
}
wildports[i].OfferPort = port
wildports[i].Role = starredRole(role)
wildports[i].Role = resources.CanonicalRole(role)
mapping = append(mapping, wildports[i])
remaining--
taken[port] = struct{}{}
@ -126,7 +118,7 @@ func WildcardMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
if remaining > 0 {
err := &PortAllocationError{
PodId: t.Pod.Name,
PodID: pod.Namespace + "/" + pod.Name,
}
// it doesn't make sense to include a port list here because they were all zero (wildcards)
return nil, err
@ -136,10 +128,10 @@ func WildcardMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
}
// FixedMapper maps k8s host ports to offered ports ignoring hostPorts == 0 (remaining pod-private)
func FixedMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
requiredPorts := make(map[uint64]HostPortMapping)
mapping := []HostPortMapping{}
for i, container := range t.Pod.Spec.Containers {
func FixedMapper(pod *api.Pod, roles []string, offer *mesos.Offer) ([]Mapping, error) {
requiredPorts := make(map[uint64]Mapping)
mapping := []Mapping{}
for i, container := range pod.Spec.Containers {
// strip all port==0 from this array; k8s already knows what to do with zero-
// ports (it does not create 'port bindings' on the minion-host); we need to
// remove the wildcards from this array since they don't consume host resources
@ -147,24 +139,24 @@ func FixedMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
if port.HostPort == 0 {
continue // ignore
}
m := HostPortMapping{
m := Mapping{
ContainerIdx: i,
PortIdx: pi,
OfferPort: uint64(port.HostPort),
}
if entry, inuse := requiredPorts[uint64(port.HostPort)]; inuse {
return nil, &DuplicateHostPortError{entry, m}
return nil, &DuplicateError{entry, m}
}
requiredPorts[uint64(port.HostPort)] = m
}
}
foreachPortsRange(offer.GetResources(), t.Roles(), func(bp, ep uint64, role string) {
resources.ForeachPortsRange(offer.GetResources(), roles, func(bp, ep uint64, role string) {
for port := range requiredPorts {
log.V(3).Infof("evaluating port range {%d:%d} %d", bp, ep, port)
if (bp <= port) && (port <= ep) {
m := requiredPorts[port]
m.Role = starredRole(role)
m.Role = resources.CanonicalRole(role)
mapping = append(mapping, m)
delete(requiredPorts, port)
}
@ -174,7 +166,7 @@ func FixedMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
unsatisfiedPorts := len(requiredPorts)
if unsatisfiedPorts > 0 {
err := &PortAllocationError{
PodId: t.Pod.Name,
PodID: pod.Namespace + "/" + pod.Name,
}
for p := range requiredPorts {
err.Ports = append(err.Ports, p)
@ -185,15 +177,35 @@ func FixedMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
return mapping, nil
}
// NewHostPortMapper returns a new mapper based
// based on the port mapping key value
func NewHostPortMapper(pod *api.Pod) HostPortMapper {
filter := map[string]string{
meta.PortMappingKey: HostPortMappingFixed,
}
selector := labels.Set(filter).AsSelector()
if selector.Matches(labels.Set(pod.Labels)) {
return HostPortMapperFunc(FixedMapper)
}
return HostPortMapperFunc(WildcardMapper)
type Strategy string
const (
// maps a Container.HostPort to the same exact offered host port, ignores .HostPort = 0
StrategyFixed = Strategy("fixed")
// same as MappingFixed, except that .HostPort of 0 are mapped to any port offered
StrategyWildcard = Strategy("wildcard")
)
var validStrategies = map[Strategy]MapperFunc{
StrategyFixed: MapperFunc(FixedMapper),
StrategyWildcard: MapperFunc(WildcardMapper),
}
// NewMapper returns a new mapper based on the port mapping key value
func (defaultStrategy Strategy) NewMapper(pod *api.Pod) Mapper {
strategy, ok := pod.Labels[meta.PortMappingKey]
if ok {
f, ok := validStrategies[Strategy(strategy)]
if ok {
return f
}
log.Warningf("invalid port mapping strategy %q, reverting to default %q", strategy, defaultStrategy)
}
f, ok := validStrategies[defaultStrategy]
if ok {
return f
}
panic("scheduler is misconfigured, unrecognized default strategy \"" + defaultStrategy + "\"")
}

View File

@ -14,27 +14,31 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package podtask
package hostport
import (
"testing"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/pkg/api"
)
func TestDefaultHostPortMatching(t *testing.T) {
t.Parallel()
task := fakePodTask("foo", nil, nil)
pod := &task.Pod
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: "default",
},
}
offer := &mesos.Offer{
Resources: []*mesos.Resource{
newPortsResource("*", 1, 1),
resources.NewPorts("*", 1, 1),
},
}
mapping, err := FixedMapper(task, offer)
mapping, err := FixedMapper(pod, []string{"*"}, offer)
if err != nil {
t.Fatal(err)
}
@ -52,12 +56,8 @@ func TestDefaultHostPortMatching(t *testing.T) {
}},
}},
}
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
if err != nil {
t.Fatal(err)
}
_, err = FixedMapper(task, offer)
if err, _ := err.(*DuplicateHostPortError); err == nil {
_, err = FixedMapper(pod, []string{"*"}, offer)
if err, _ := err.(*DuplicateError); err == nil {
t.Fatal("Expected duplicate port error")
} else if err.m1.OfferPort != 123 {
t.Fatal("Expected duplicate host port 123")
@ -65,12 +65,15 @@ func TestDefaultHostPortMatching(t *testing.T) {
}
func TestWildcardHostPortMatching(t *testing.T) {
t.Parallel()
task := fakePodTask("foo", nil, nil)
pod := &task.Pod
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: "default",
},
}
offer := &mesos.Offer{}
mapping, err := WildcardMapper(task, offer)
mapping, err := WildcardMapper(pod, []string{"*"}, offer)
if err != nil {
t.Fatal(err)
}
@ -81,10 +84,10 @@ func TestWildcardHostPortMatching(t *testing.T) {
//--
offer = &mesos.Offer{
Resources: []*mesos.Resource{
newPortsResource("*", 1, 1),
resources.NewPorts("*", 1, 1),
},
}
mapping, err = WildcardMapper(task, offer)
mapping, err = WildcardMapper(pod, []string{"*"}, offer)
if err != nil {
t.Fatal(err)
}
@ -100,11 +103,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
}},
}},
}
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
if err != nil {
t.Fatal(err)
}
mapping, err = WildcardMapper(task, offer)
mapping, err = WildcardMapper(pod, []string{"*"}, offer)
if err == nil {
t.Fatalf("expected error instead of mappings: %#v", mapping)
} else if err, _ := err.(*PortAllocationError); err == nil {
@ -123,11 +122,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
}},
}},
}
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
if err != nil {
t.Fatal(err)
}
mapping, err = WildcardMapper(task, offer)
mapping, err = WildcardMapper(pod, []string{"*"}, offer)
if err, _ := err.(*PortAllocationError); err == nil {
t.Fatal("Expected port allocation error")
} else if !(len(err.Ports) == 1 && err.Ports[0] == 123) {
@ -144,11 +139,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
}},
}},
}
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
if err != nil {
t.Fatal(err)
}
mapping, err = WildcardMapper(task, offer)
mapping, err = WildcardMapper(pod, []string{"*"}, offer)
if err, _ := err.(*PortAllocationError); err == nil {
t.Fatal("Expected port allocation error")
} else if len(err.Ports) != 0 {
@ -158,10 +149,10 @@ func TestWildcardHostPortMatching(t *testing.T) {
//--
offer = &mesos.Offer{
Resources: []*mesos.Resource{
newPortsResource("*", 1, 2),
resources.NewPorts("*", 1, 2),
},
}
mapping, err = WildcardMapper(task, offer)
mapping, err = WildcardMapper(pod, []string{"*"}, offer)
if err != nil {
t.Fatal(err)
} else if len(mapping) != 2 {
@ -190,16 +181,12 @@ func TestWildcardHostPortMatching(t *testing.T) {
}},
}},
}
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil)
if err != nil {
t.Fatal(err)
}
offer = &mesos.Offer{
Resources: []*mesos.Resource{
mesosutil.NewRangesResource("ports", []*mesos.Value_Range{mesosutil.NewValueRange(1, 1), mesosutil.NewValueRange(3, 5)}),
},
}
mapping, err = WildcardMapper(task, offer)
mapping, err = WildcardMapper(pod, []string{"*"}, offer)
if err != nil {
t.Fatal(err)
} else if len(mapping) != 2 {

View File

@ -27,7 +27,9 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
mesosmeta "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport"
"k8s.io/kubernetes/pkg/api"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -53,9 +55,22 @@ const (
var starRole = []string{"*"}
// Config represents elements that are used or required in order to
// create a pod task that may be scheduled.
type Config struct {
ID string // ID is an optional, unique task ID; auto-generated if not specified
DefaultPodRoles []string // DefaultPodRoles lists preferred resource groups, prioritized in order
FrameworkRoles []string // FrameworkRoles identify resource groups from which the framework may consume
Prototype *mesos.ExecutorInfo // Prototype is required
HostPortStrategy hostport.Strategy // HostPortStrategy is used as the port mapping strategy, unless overridden by the pod
GenerateTaskDiscoveryEnabled bool
mapper hostport.Mapper // host-port mapping func, derived from pod and default strategy
podKey string // k8s key for this pod; managed internally
}
// A struct that describes a pod task.
type T struct {
ID string
Config
Pod api.Pod
// Stores the final procurement result, once set read-only.
@ -68,26 +83,16 @@ type T struct {
CreateTime time.Time
UpdatedTime time.Time // time of the most recent StatusUpdate we've seen from the mesos master
podStatus api.PodStatus
prototype *mesos.ExecutorInfo // readonly
frameworkRoles []string // Mesos framework roles, pods are allowed to be launched with those
defaultPodRoles []string // roles under which pods are scheduled if none are specified in labels
podKey string
launchTime time.Time
bindTime time.Time
mapper HostPortMapper
}
type Port struct {
Port uint64
Role string
podStatus api.PodStatus
launchTime time.Time
bindTime time.Time
}
type Spec struct {
SlaveID string
AssignedSlave string
Resources []*mesos.Resource
PortMap []HostPortMapping
PortMap []hostport.Mapping
Data []byte
Executor *mesos.ExecutorInfo
}
@ -130,9 +135,6 @@ func generateTaskName(pod *api.Pod) string {
return fmt.Sprintf("%s.%s.pod", pod.Name, ns)
}
// GenerateTaskDiscoveryEnabled turns on/off the generation of DiscoveryInfo for TaskInfo records
var GenerateTaskDiscoveryEnabled = false
func generateTaskDiscovery(pod *api.Pod) *mesos.DiscoveryInfo {
di := &mesos.DiscoveryInfo{
Visibility: mesos.DiscoveryInfo_CLUSTER.Enum(),
@ -195,7 +197,7 @@ func (t *T) BuildTaskInfo() (*mesos.TaskInfo, error) {
SlaveId: mutil.NewSlaveID(t.Spec.SlaveID),
}
if GenerateTaskDiscoveryEnabled {
if t.GenerateTaskDiscoveryEnabled {
info.Discovery = generateTaskDiscovery(&t.Pod)
}
@ -237,46 +239,45 @@ func (t *T) Roles() (result []string) {
return filterRoles(
roles,
not(emptyRole), not(seenRole()), inRoles(t.frameworkRoles...),
not(emptyRole), not(seenRole()), inRoles(t.FrameworkRoles...),
)
}
// no roles label defined, return defaults
return t.defaultPodRoles
return t.DefaultPodRoles
}
func New(ctx api.Context, id string, pod *api.Pod, prototype *mesos.ExecutorInfo, frameworkRoles, defaultPodRoles []string) (*T, error) {
if prototype == nil {
return nil, fmt.Errorf("illegal argument: executor is nil")
func New(ctx api.Context, config Config, pod *api.Pod) (*T, error) {
if config.Prototype == nil {
return nil, fmt.Errorf("illegal argument: executor-info prototype is nil")
}
if len(frameworkRoles) == 0 {
frameworkRoles = starRole
if len(config.FrameworkRoles) == 0 {
config.FrameworkRoles = starRole
}
if len(defaultPodRoles) == 0 {
defaultPodRoles = starRole
if len(config.DefaultPodRoles) == 0 {
config.DefaultPodRoles = starRole
}
key, err := MakePodKey(ctx, pod.Name)
if err != nil {
return nil, err
}
config.podKey = key
if id == "" {
id = "pod." + uuid.NewUUID().String()
if config.ID == "" {
config.ID = "pod." + uuid.NewUUID().String()
}
// the scheduler better get the fallback strategy right, otherwise we panic here
config.mapper = config.HostPortStrategy.NewMapper(pod)
task := &T{
ID: id,
Pod: *pod,
State: StatePending,
podKey: key,
mapper: NewHostPortMapper(pod),
Flags: make(map[FlagType]struct{}),
prototype: prototype,
frameworkRoles: frameworkRoles,
defaultPodRoles: defaultPodRoles,
Pod: *pod,
Config: config,
State: StatePending,
Flags: make(map[FlagType]struct{}),
}
task.CreateTime = time.Now()
@ -305,11 +306,17 @@ func (t *T) SaveRecoveryInfo(dict map[string]string) {
func RecoverFrom(pod api.Pod) (*T, bool, error) {
// we only expect annotations if pod has been bound, which implies that it has already
// been scheduled and launched
if pod.Spec.NodeName == "" && len(pod.Annotations) == 0 {
if len(pod.Annotations) == 0 {
log.V(1).Infof("skipping recovery for unbound pod %v/%v", pod.Namespace, pod.Name)
return nil, false, nil
}
// we don't track mirror pods, they're considered part of the executor
if _, isMirrorPod := pod.Annotations[kubetypes.ConfigMirrorAnnotationKey]; isMirrorPod {
log.V(1).Infof("skipping recovery for mirror pod %v/%v", pod.Namespace, pod.Name)
return nil, false, nil
}
// only process pods that are not in a terminal state
switch pod.Status.Phase {
case api.PodPending, api.PodRunning, api.PodUnknown: // continue
@ -328,12 +335,13 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
now := time.Now()
t := &T{
Config: Config{
podKey: key,
},
Pod: pod,
CreateTime: now,
podKey: key,
State: StatePending, // possibly running? mesos will tell us during reconciliation
Flags: make(map[FlagType]struct{}),
mapper: NewHostPortMapper(&pod),
launchTime: now,
bindTime: now,
Spec: &Spec{},

View File

@ -26,6 +26,8 @@ import (
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/pkg/api"
)
@ -37,16 +39,18 @@ const (
func fakePodTask(id string, allowedRoles, defaultRoles []string) *T {
t, _ := New(
api.NewDefaultContext(),
"",
Config{
Prototype: &mesos.ExecutorInfo{},
FrameworkRoles: allowedRoles,
DefaultPodRoles: defaultRoles,
HostPortStrategy: hostport.StrategyWildcard,
},
&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: id,
Namespace: api.NamespaceDefault,
},
},
&mesos.ExecutorInfo{},
allowedRoles,
defaultRoles,
)
return t
@ -219,7 +223,7 @@ func TestAcceptOfferPorts(t *testing.T) {
Resources: []*mesos.Resource{
mutil.NewScalarResource("cpus", t_min_cpu),
mutil.NewScalarResource("mem", t_min_mem),
newPortsResource("*", 1, 1),
resources.NewPorts("*", 1, 1),
},
}
@ -415,12 +419,3 @@ func newScalarAttribute(name string, val float64) *mesos.Attribute {
Scalar: &mesos.Value_Scalar{Value: proto.Float64(val)},
}
}
func newPortsResource(role string, ports ...uint64) *mesos.Resource {
return &mesos.Resource{
Name: proto.String("ports"),
Type: mesos.Value_RANGES.Enum(),
Ranges: newRanges(ports),
Role: stringPtrTo(role),
}
}

View File

@ -25,7 +25,7 @@ import (
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/executorinfo"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
)
@ -164,8 +164,8 @@ func NewPodResourcesProcurement() Procurement {
return err
}
wantedCpus := float64(mresource.NewCPUShares(limits[api.ResourceCPU]))
wantedMem := float64(mresource.NewMegaBytes(limits[api.ResourceMemory]))
wantedCpus := float64(resources.NewCPUShares(limits[api.ResourceCPU]))
wantedMem := float64(resources.NewMegaBytes(limits[api.ResourceMemory]))
log.V(4).Infof(
"trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB",
@ -199,18 +199,18 @@ func NewPodResourcesProcurement() Procurement {
func NewPortsProcurement() Procurement {
return ProcurementFunc(func(t *T, _ *api.Node, ps *ProcureState) error {
// fill in port mapping
if mapping, err := t.mapper.Map(t, ps.offer); err != nil {
if mapping, err := t.mapper.Map(&t.Pod, t.Roles(), ps.offer); err != nil {
return err
} else {
ports := []Port{}
ports := []resources.Port{}
for _, entry := range mapping {
ports = append(ports, Port{
ports = append(ports, resources.Port{
Port: entry.OfferPort,
Role: entry.Role,
})
}
ps.spec.PortMap = mapping
ps.spec.Resources = append(ps.spec.Resources, portRangeResources(ports)...)
ps.spec.Resources = append(ps.spec.Resources, resources.PortRanges(ports)...)
}
return nil
})
@ -220,20 +220,20 @@ func NewPortsProcurement() Procurement {
// If a given offer has no executor IDs set, the given prototype executor resources are considered for procurement.
// If a given offer has one executor ID set, only pod resources are being procured.
// An offer with more than one executor ID implies an invariant violation and the first executor ID is being considered.
func NewExecutorResourceProcurer(resources []*mesos.Resource, registry executorinfo.Registry) Procurement {
func NewExecutorResourceProcurer(rs []*mesos.Resource, registry executorinfo.Registry) Procurement {
return ProcurementFunc(func(t *T, _ *api.Node, ps *ProcureState) error {
eids := len(ps.offer.GetExecutorIds())
switch {
case eids == 0:
wantedCpus := sumResources(filterResources(resources, isScalar, hasName("cpus")))
wantedMem := sumResources(filterResources(resources, isScalar, hasName("mem")))
wantedCpus := resources.Sum(resources.Filter(rs, resources.IsScalar, resources.HasName("cpus")))
wantedMem := resources.Sum(resources.Filter(rs, resources.IsScalar, resources.HasName("mem")))
procuredCpu, remaining := procureScalarResources("cpus", wantedCpus, t.frameworkRoles, ps.offer.GetResources())
procuredCpu, remaining := procureScalarResources("cpus", wantedCpus, t.FrameworkRoles, ps.offer.GetResources())
if procuredCpu == nil {
return fmt.Errorf("not enough cpu resources for executor: want=%v", wantedCpus)
}
procuredMem, remaining := procureScalarResources("mem", wantedMem, t.frameworkRoles, remaining)
procuredMem, remaining := procureScalarResources("mem", wantedMem, t.FrameworkRoles, remaining)
if procuredMem == nil {
return fmt.Errorf("not enough mem resources for executor: want=%v", wantedMem)
}
@ -273,12 +273,12 @@ func procureScalarResources(
roles []string,
offered []*mesos.Resource,
) (procured, remaining []*mesos.Resource) {
sorted := byRoles(roles...).sort(offered)
sorted := resources.ByRoles(roles...).Sort(offered)
procured = make([]*mesos.Resource, 0, len(sorted))
remaining = make([]*mesos.Resource, 0, len(sorted))
for _, r := range sorted {
if want >= epsilon && resourceMatchesAll(r, hasName(name), isScalar) {
if want >= epsilon && resources.MatchesAll(r, resources.HasName(name), resources.IsScalar) {
left, role := r.GetScalar().GetValue(), r.Role
consumed := math.Min(want, left)

View File

@ -23,6 +23,8 @@ import (
"github.com/mesos/mesos-go/mesosutil"
mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"reflect"
@ -51,7 +53,12 @@ func TestNewPodResourcesProcurement(t *testing.T) {
task, _ := New(
api.NewDefaultContext(),
"",
Config{
Prototype: executor,
FrameworkRoles: []string{"*"},
DefaultPodRoles: []string{"*"},
HostPortStrategy: hostport.StrategyWildcard,
},
&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "test",
@ -76,9 +83,6 @@ func TestNewPodResourcesProcurement(t *testing.T) {
},
},
},
executor,
[]string{"*"},
[]string{"*"},
)
procurement := NewPodResourcesProcurement()
@ -214,6 +218,6 @@ func TestProcureRoleResources(t *testing.T) {
func scalar(name string, value float64, role string) *mesos.Resource {
res := mesosutil.NewScalarResource(name, value)
res.Role = stringPtrTo(role)
res.Role = resources.StringPtrTo(role)
return res
}

View File

@ -81,24 +81,3 @@ func inRoles(roles ...string) rolePredicate {
return ok
}
}
// starredRole returns a "*" if the given role is empty else the role itself
func starredRole(name string) string {
if name == "" {
return "*"
}
return name
}
// stringPtrTo returns a pointer to the given string
// or nil if it is empty string.
func stringPtrTo(s string) *string {
var protos *string
if s != "" {
protos = &s
}
return protos
}

View File

@ -14,5 +14,5 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package resource contains the Mesos scheduler specific resource functions
package resource
// Package resources contains the Mesos scheduler specific resource functions
package resources

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package resource
package resources
import (
"k8s.io/kubernetes/pkg/api"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package resource
package resources
import (
"math"

View File

@ -14,15 +14,20 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package podtask
package resources
import (
"github.com/gogo/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
)
// portRangeResources creates a range resource for the spec ports.
func portRangeResources(Ports []Port) []*mesos.Resource {
type Port struct {
Port uint64
Role string
}
// PortRanges creates a range resource for the spec ports.
func PortRanges(Ports []Port) []*mesos.Resource {
rolePorts := make(map[string][]uint64, len(Ports))
for _, p := range Ports {
@ -36,8 +41,8 @@ func portRangeResources(Ports []Port) []*mesos.Resource {
&mesos.Resource{
Name: proto.String("ports"),
Type: mesos.Value_RANGES.Enum(),
Ranges: newRanges(ports),
Role: stringPtrTo(role),
Ranges: NewRanges(ports),
Role: StringPtrTo(role),
},
)
}
@ -45,8 +50,8 @@ func portRangeResources(Ports []Port) []*mesos.Resource {
return resources
}
// newRanges generates port ranges from the given list of ports. (naive implementation)
func newRanges(ports []uint64) *mesos.Value_Ranges {
// NewRanges generates port ranges from the given list of ports. (naive implementation)
func NewRanges(ports []uint64) *mesos.Value_Ranges {
r := make([]*mesos.Value_Range, 0, len(ports))
for _, port := range ports {
x := proto.Uint64(port)
@ -55,11 +60,11 @@ func newRanges(ports []uint64) *mesos.Value_Ranges {
return &mesos.Value_Ranges{Range: r}
}
// foreachPortsRange calls f for each resource that matches the given roles
// ForeachPortsRange calls f for each resource that matches the given roles
// in the order of the given roles.
func foreachPortsRange(rs []*mesos.Resource, roles []string, f func(begin, end uint64, role string)) {
rs = filterResources(rs, hasName("ports"))
rs = byRoles(roles...).sort(rs)
func ForeachPortsRange(rs []*mesos.Resource, roles []string, f func(begin, end uint64, role string)) {
rs = Filter(rs, HasName("ports"))
rs = ByRoles(roles...).Sort(rs)
for _, resource := range rs {
for _, r := range (*resource).GetRanges().Range {
@ -70,22 +75,22 @@ func foreachPortsRange(rs []*mesos.Resource, roles []string, f func(begin, end u
}
}
// byRolesSorter sorts resources according to the ordering of roles.
type byRolesSorter struct {
// ByRolesSorter sorts resources according to the ordering of roles.
type ByRolesSorter struct {
roles []string
}
// byRoles returns a byRolesSorter with the given roles.
func byRoles(roles ...string) *byRolesSorter {
return &byRolesSorter{roles: roles}
// ByRoles returns a ByRolesSorter with the given roles.
func ByRoles(roles ...string) *ByRolesSorter {
return &ByRolesSorter{roles: roles}
}
// sort sorts the given resources according to the order of roles in the byRolesSorter
// sort sorts the given resources according to the order of roles in the ByRolesSorter
// and returns the sorted resources.
func (sorter *byRolesSorter) sort(resources []*mesos.Resource) []*mesos.Resource {
func (sorter *ByRolesSorter) Sort(resources []*mesos.Resource) []*mesos.Resource {
rolesMap := map[string][]*mesos.Resource{} // maps roles to resources
for _, res := range resources {
role := starredRole(res.GetRole())
role := CanonicalRole(res.GetRole())
rolesMap[role] = append(rolesMap[role], res)
}
@ -99,12 +104,21 @@ func (sorter *byRolesSorter) sort(resources []*mesos.Resource) []*mesos.Resource
return result
}
// resourcePredicate is a predicate function on *mesos.Resource structs.
type resourcePredicate func(*mesos.Resource) bool
// ResourcePredicate is a predicate function on *mesos.Resource structs.
type (
ResourcePredicate func(*mesos.Resource) bool
ResourcePredicates []ResourcePredicate
)
// filter filters the given slice of resources and returns a slice of resources
// Filter filters the given slice of resources and returns a slice of resources
// matching all given predicates.
func filterResources(res []*mesos.Resource, ps ...resourcePredicate) []*mesos.Resource {
func Filter(res []*mesos.Resource, ps ...ResourcePredicate) []*mesos.Resource {
return ResourcePredicates(ps).Filter(res)
}
// Filter filters the given slice of resources and returns a slice of resources
// matching all given predicates.
func (ps ResourcePredicates) Filter(res []*mesos.Resource) []*mesos.Resource {
filtered := make([]*mesos.Resource, 0, len(res))
next:
@ -121,8 +135,13 @@ next:
return filtered
}
// resourceMatchesAll returns true if the given resource matches all given predicates ps.
func resourceMatchesAll(res *mesos.Resource, ps ...resourcePredicate) bool {
// MatchesAll returns true if the given resource matches all given predicates ps.
func MatchesAll(res *mesos.Resource, ps ...ResourcePredicate) bool {
return ResourcePredicates(ps).MatchesAll(res)
}
// MatchesAll returns true if the given resource matches all given predicates ps.
func (ps ResourcePredicates) MatchesAll(res *mesos.Resource) bool {
for _, p := range ps {
if !p(res) {
return false
@ -132,7 +151,7 @@ func resourceMatchesAll(res *mesos.Resource, ps ...resourcePredicate) bool {
return true
}
func sumResources(res []*mesos.Resource) float64 {
func Sum(res []*mesos.Resource) float64 {
var sum float64
for _, r := range res {
@ -142,15 +161,45 @@ func sumResources(res []*mesos.Resource) float64 {
return sum
}
// isScalar returns true if the given resource is a scalar type.
func isScalar(r *mesos.Resource) bool {
// IsScalar returns true if the given resource is a scalar type.
func IsScalar(r *mesos.Resource) bool {
return r.GetType() == mesos.Value_SCALAR
}
// hasName returns a resourcePredicate which returns true
// HasName returns a ResourcePredicate which returns true
// if the given resource has the given name.
func hasName(name string) resourcePredicate {
func HasName(name string) ResourcePredicate {
return func(r *mesos.Resource) bool {
return r.GetName() == name
}
}
// StringPtrTo returns a pointer to the given string
// or nil if it is empty string.
func StringPtrTo(s string) *string {
var protos *string
if s != "" {
protos = &s
}
return protos
}
// CanonicalRole returns a "*" if the given role is empty else the role itself
func CanonicalRole(name string) string {
if name == "" {
return "*"
}
return name
}
func NewPorts(role string, ports ...uint64) *mesos.Resource {
return &mesos.Resource{
Name: proto.String("ports"),
Type: mesos.Value_RANGES.Enum(),
Ranges: NewRanges(ports),
Role: StringPtrTo(role),
}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package resource
package resources
import (
"fmt"

View File

@ -66,7 +66,8 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/cache"
@ -95,8 +96,8 @@ const (
defaultReconcileCooldown = 15 * time.Second
defaultNodeRelistPeriod = 5 * time.Minute
defaultFrameworkName = "Kubernetes"
defaultExecutorCPUs = mresource.CPUShares(0.25) // initial CPU allocated for executor
defaultExecutorMem = mresource.MegaBytes(128.0) // initial memory allocated for executor
defaultExecutorCPUs = resources.CPUShares(0.25) // initial CPU allocated for executor
defaultExecutorMem = resources.MegaBytes(128.0) // initial memory allocated for executor
defaultExecutorInfoCacheSize = 10000
)
@ -119,8 +120,8 @@ type SchedulerServer struct {
mesosAuthPrincipal string
mesosAuthSecretFile string
mesosCgroupPrefix string
mesosExecutorCPUs mresource.CPUShares
mesosExecutorMem mresource.MegaBytes
mesosExecutorCPUs resources.CPUShares
mesosExecutorMem resources.MegaBytes
checkpoint bool
failoverTimeout float64
generateTaskDiscovery bool
@ -147,8 +148,8 @@ type SchedulerServer struct {
hostnameOverride string
reconcileInterval int64
reconcileCooldown time.Duration
defaultContainerCPULimit mresource.CPUShares
defaultContainerMemLimit mresource.MegaBytes
defaultContainerCPULimit resources.CPUShares
defaultContainerMemLimit resources.MegaBytes
schedulerConfigFileName string
graceful bool
frameworkName string
@ -174,6 +175,7 @@ type SchedulerServer struct {
sandboxOverlay string
conntrackMax int
conntrackTCPTimeoutEstablished int
useHostPortEndpoints bool
executable string // path to the binary running this service
client *clientset.Clientset
@ -202,8 +204,8 @@ func NewSchedulerServer() *SchedulerServer {
runProxy: true,
executorSuicideTimeout: execcfg.DefaultSuicideTimeout,
launchGracePeriod: execcfg.DefaultLaunchGracePeriod,
defaultContainerCPULimit: mresource.DefaultDefaultContainerCPULimit,
defaultContainerMemLimit: mresource.DefaultDefaultContainerMemLimit,
defaultContainerCPULimit: resources.DefaultDefaultContainerCPULimit,
defaultContainerMemLimit: resources.DefaultDefaultContainerMemLimit,
proxyMode: "userspace", // upstream default is "iptables" post-v1.1
@ -231,6 +233,7 @@ func NewSchedulerServer() *SchedulerServer {
containPodResources: true,
nodeRelistPeriod: defaultNodeRelistPeriod,
conntrackTCPTimeoutEstablished: 0, // non-zero values may require hand-tuning other sysctl's on the host; do so with caution
useHostPortEndpoints: true,
// non-zero values can trigger failures when updating /sys/module/nf_conntrack/parameters/hashsize
// when kube-proxy is running in a non-root netns (init_net); setting this to a non-zero value will
@ -293,6 +296,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
fs.Var(&s.defaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB")
fs.BoolVar(&s.containPodResources, "contain-pod-resources", s.containPodResources, "Reparent pod containers into mesos cgroups; disable if you're having strange mesos/docker/systemd interactions.")
fs.DurationVar(&s.nodeRelistPeriod, "node-monitor-period", s.nodeRelistPeriod, "Period between relisting of all nodes from the apiserver.")
fs.BoolVar(&s.useHostPortEndpoints, "host-port-endpoints", s.useHostPortEndpoints, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.")
fs.IntVar(&s.executorLogV, "executor-logv", s.executorLogV, "Logging verbosity of spawned minion and executor processes.")
fs.BoolVar(&s.executorBindall, "executor-bindall", s.executorBindall, "When true will set -address of the executor to 0.0.0.0.")
@ -552,7 +556,6 @@ func (s *SchedulerServer) getDriver() (driver bindings.SchedulerDriver) {
}
func (s *SchedulerServer) Run(hks hyperkube.Interface, _ []string) error {
podtask.GenerateTaskDiscoveryEnabled = s.generateTaskDiscovery
if n := len(s.frameworkRoles); n == 0 || n > 2 || (n == 2 && s.frameworkRoles[0] != "*" && s.frameworkRoles[1] != "*") {
log.Fatalf(`only one custom role allowed in addition to "*"`)
}
@ -794,8 +797,14 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
broadcaster.StartLogging(log.Infof)
broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{eventsClient.Events("")})
// create scheduler core with all components arranged around it
lw := cache.NewListWatchFromClient(s.client.CoreClient, "pods", api.NamespaceAll, fields.Everything())
hostPortStrategy := hostport.StrategyFixed
if s.useHostPortEndpoints {
hostPortStrategy = hostport.StrategyWildcard
}
// create scheduler core with all components arranged around it
sched := components.New(
sc,
framework,
@ -805,9 +814,13 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
schedulerProcess.Terminal(),
s.mux,
lw,
eiPrototype,
s.frameworkRoles,
s.defaultPodRoles,
podtask.Config{
DefaultPodRoles: s.defaultPodRoles,
FrameworkRoles: s.frameworkRoles,
GenerateTaskDiscoveryEnabled: s.generateTaskDiscovery,
HostPortStrategy: hostPortStrategy,
Prototype: eiPrototype,
},
s.defaultContainerCPULimit,
s.defaultContainerMemLimit,
)

View File

@ -20,7 +20,7 @@ import (
"testing"
"time"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"github.com/stretchr/testify/assert"
)
@ -113,6 +113,6 @@ func Test_DefaultResourceLimits(t *testing.T) {
assert := assert.New(t)
s := NewSchedulerServer()
assert.Equal(s.defaultContainerCPULimit, mresource.DefaultDefaultContainerCPULimit)
assert.Equal(s.defaultContainerMemLimit, mresource.DefaultDefaultContainerMemLimit)
assert.Equal(s.defaultContainerCPULimit, resources.DefaultDefaultContainerCPULimit)
assert.Equal(s.defaultContainerMemLimit, resources.DefaultDefaultContainerMemLimit)
}

View File

@ -19,23 +19,23 @@ package service
import (
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/pkg/api"
)
// StaticPodValidator discards a pod if we can't calculate resource limits for it.
func StaticPodValidator(
defaultContainerCPULimit resource.CPUShares,
defaultContainerMemLimit resource.MegaBytes,
defaultContainerCPULimit resources.CPUShares,
defaultContainerMemLimit resources.MegaBytes,
accumCPU, accumMem *float64,
) podutil.FilterFunc {
return podutil.FilterFunc(func(pod *api.Pod) (bool, error) {
_, cpu, _, err := resource.LimitPodCPU(pod, defaultContainerCPULimit)
_, cpu, _, err := resources.LimitPodCPU(pod, defaultContainerCPULimit)
if err != nil {
return false, err
}
_, mem, _, err := resource.LimitPodMem(pod, defaultContainerMemLimit)
_, mem, _, err := resources.LimitPodMem(pod, defaultContainerMemLimit)
if err != nil {
return false, err
}

View File

@ -21,7 +21,7 @@ import (
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/service"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
@ -143,7 +143,7 @@ func containers(ct ...api.Container) podOpt {
})
}
func resourceLimits(cpu mresource.CPUShares, mem mresource.MegaBytes) ctOpt {
func resourceLimits(cpu resources.CPUShares, mem resources.MegaBytes) ctOpt {
return ctOpt(func(c *api.Container) {
if c.Resources.Limits == nil {
c.Resources.Limits = make(api.ResourceList)

View File

@ -6,7 +6,6 @@ file_owner
file_perm
fs_type
gke_context
host_port_endpoints
max_in_flight
max_par
new_file_0644

View File

@ -138,6 +138,7 @@ horizontal-pod-autoscaler-sync-period
host-ipc-sources
host-network-sources
host-pid-sources
host-port-endpoints
hostname-override
http-check-frequency
http-port