From 8c776b7eb950730ffec5de04cfd4f3b65789ff83 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 15 Feb 2016 19:26:32 +0000 Subject: [PATCH] host port mapping should be more consistent between scheduler and controller-manager in k8sm: - rename host_port_endpoints flag to host-port-endpoints - host port mapping strategy in scheduler should be driven by host-port-endpoints flag - added host-port-endpoints to known flags - docs: scheduler should also be configured with host-port-endpoints - task recovery: be explicit about excluding mirror pods --- contrib/mesos/docs/issues.md | 3 +- .../controllermanager/controllermanager.go | 2 +- contrib/mesos/pkg/executor/executor_test.go | 18 +-- .../components/algorithm/algorithm.go | 44 +++---- .../components/deleter/deleter_test.go | 19 +-- .../pkg/scheduler/components/scheduler.go | 11 +- .../scheduler/integration/integration_test.go | 16 ++- .../{port_mapping.go => hostport/mapper.go} | 120 ++++++++++-------- .../mapper_test.go} | 67 ++++------ .../mesos/pkg/scheduler/podtask/pod_task.go | 92 ++++++++------ .../pkg/scheduler/podtask/pod_task_test.go | 23 ++-- .../pkg/scheduler/podtask/procurement.go | 28 ++-- .../pkg/scheduler/podtask/procurement_test.go | 14 +- contrib/mesos/pkg/scheduler/podtask/roles.go | 21 --- .../scheduler/{resource => resources}/doc.go | 4 +- .../{resource => resources}/resource.go | 2 +- .../{resource => resources}/resource_test.go | 2 +- .../{podtask => resources}/resources.go | 109 +++++++++++----- .../{resource => resources}/types.go | 2 +- .../mesos/pkg/scheduler/service/service.go | 41 ++++-- .../pkg/scheduler/service/service_test.go | 6 +- .../mesos/pkg/scheduler/service/validation.go | 10 +- .../pkg/scheduler/service/validation_test.go | 4 +- hack/verify-flags/excluded-flags.txt | 1 - hack/verify-flags/known-flags.txt | 1 + 25 files changed, 355 insertions(+), 305 deletions(-) rename contrib/mesos/pkg/scheduler/podtask/{port_mapping.go => hostport/mapper.go} (59%) rename contrib/mesos/pkg/scheduler/podtask/{port_mapping_test.go => hostport/mapper_test.go} (76%) rename contrib/mesos/pkg/scheduler/{resource => resources}/doc.go (86%) rename contrib/mesos/pkg/scheduler/{resource => resources}/resource.go (99%) rename contrib/mesos/pkg/scheduler/{resource => resources}/resource_test.go (99%) rename contrib/mesos/pkg/scheduler/{podtask => resources}/resources.go (50%) rename contrib/mesos/pkg/scheduler/{resource => resources}/types.go (98%) diff --git a/contrib/mesos/docs/issues.md b/contrib/mesos/docs/issues.md index 5730127de60..ed144215708 100644 --- a/contrib/mesos/docs/issues.md +++ b/contrib/mesos/docs/issues.md @@ -137,7 +137,8 @@ Host ports that are not defined, or else defined as zero, will automatically be To disable the work-around and revert to vanilla Kubernetes service endpoint termination: -- execute the k8sm controller-manager with `-host_port_endpoints=false`; +- execute the k8sm scheduler with `-host-port-endpoints=false` +- execute the k8sm controller-manager with `-host-port-endpoints=false` Then the usual Kubernetes network assumptions must be fulfilled for Kubernetes to work with Mesos, i.e. each container must get a cluster-wide routable IP (compare [Kubernetes Networking documentation](../../../docs/design/networking.md#container-to-container)). 
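The hunk above describes the flag behavior in prose; the following is an illustrative Go sketch (not part of the patch) of how the renamed `-host-port-endpoints` flag is expected to select the scheduler's default host-port mapping strategy, mirroring the bootstrap wiring added to service.go further down. The standalone flag parsing here is hypothetical; `hostport.StrategyFixed` and `hostport.StrategyWildcard` are the constants introduced by this patch.

```go
package main

import (
	"flag"
	"fmt"

	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport"
)

func main() {
	// Mirrors the scheduler default: endpoints map to hostIP:hostPort unless disabled.
	useHostPortEndpoints := flag.Bool("host-port-endpoints", true,
		"Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.")
	flag.Parse()

	// -host-port-endpoints=false reverts to the fixed strategy: only explicitly
	// requested host ports are mapped, and HostPort==0 stays pod-private.
	strategy := hostport.StrategyFixed
	if *useHostPortEndpoints {
		strategy = hostport.StrategyWildcard
	}
	fmt.Println("default host-port strategy:", strategy)
}
```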
diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 2120d9dd71a..1712fdb03e2 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -84,7 +84,7 @@ func NewCMServer() *CMServer { // AddFlags adds flags for a specific CMServer to the specified FlagSet func (s *CMServer) AddFlags(fs *pflag.FlagSet) { s.CMServer.AddFlags(fs) - fs.BoolVar(&s.UseHostPortEndpoints, "host_port_endpoints", s.UseHostPortEndpoints, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.") + fs.BoolVar(&s.UseHostPortEndpoints, "host-port-endpoints", s.UseHostPortEndpoints, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.") } func (s *CMServer) resyncPeriod() time.Duration { diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index cc0a5e9c8f0..89e126e1c6d 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/podutil" kmruntime "k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" @@ -197,11 +198,11 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { podTask, err := podtask.New( api.NewDefaultContext(), - "", + podtask.Config{ + Prototype: executorinfo, + HostPortStrategy: hostport.StrategyWildcard, + }, pod, - executorinfo, - nil, - nil, ) assert.Equal(t, nil, err, "must be able to create a task from a pod") @@ -407,11 +408,12 @@ func TestExecutorFrameworkMessage(t *testing.T) { executorinfo := &mesosproto.ExecutorInfo{} podTask, _ := podtask.New( api.NewDefaultContext(), - "foo", + podtask.Config{ + ID: "foo", + Prototype: executorinfo, + HostPortStrategy: hostport.StrategyWildcard, + }, pod, - executorinfo, - nil, - nil, ) pod.Annotations = map[string]string{ "k8s.mesosphere.io/taskId": podTask.ID, diff --git a/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go b/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go index 2383a6316a6..7fe8adaa5e1 100644 --- a/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go +++ b/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go @@ -20,14 +20,13 @@ import ( "fmt" log "github.com/golang/glog" - "github.com/mesos/mesos-go/mesosproto" "k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" - mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" ) @@ -42,14 +41,12 @@ type SchedulerAlgorithm interface { // SchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface type schedulerAlgorithm struct { - sched scheduler.Scheduler - podUpdates queue.FIFO - podScheduler podschedulers.PodScheduler - prototype *mesosproto.ExecutorInfo - frameworkRoles []string - defaultPodRoles []string - defaultCpus mresource.CPUShares - 
defaultMem mresource.MegaBytes + sched scheduler.Scheduler + podUpdates queue.FIFO + podScheduler podschedulers.PodScheduler + taskConfig podtask.Config + defaultCpus resources.CPUShares + defaultMem resources.MegaBytes } // New returns a new SchedulerAlgorithm @@ -58,20 +55,17 @@ func New( sched scheduler.Scheduler, podUpdates queue.FIFO, podScheduler podschedulers.PodScheduler, - prototype *mesosproto.ExecutorInfo, - frameworkRoles, defaultPodRoles []string, - defaultCpus mresource.CPUShares, - defaultMem mresource.MegaBytes, + taskConfig podtask.Config, + defaultCpus resources.CPUShares, + defaultMem resources.MegaBytes, ) SchedulerAlgorithm { return &schedulerAlgorithm{ - sched: sched, - podUpdates: podUpdates, - podScheduler: podScheduler, - frameworkRoles: frameworkRoles, - defaultPodRoles: defaultPodRoles, - prototype: prototype, - defaultCpus: defaultCpus, - defaultMem: defaultMem, + sched: sched, + podUpdates: podUpdates, + podScheduler: podScheduler, + taskConfig: taskConfig, + defaultCpus: defaultCpus, + defaultMem: defaultMem, } } @@ -109,7 +103,7 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) { // From here on we can expect that the pod spec of a task has proper limits for CPU and memory. k.limitPod(pod) - podTask, err := podtask.New(ctx, "", pod, k.prototype, k.frameworkRoles, k.defaultPodRoles) + podTask, err := podtask.New(ctx, k.taskConfig, pod) if err != nil { log.Warningf("aborting Schedule, unable to create podtask object %+v: %v", pod, err) return "", err @@ -146,12 +140,12 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) { // limitPod limits the given pod based on the scheduler's default limits. func (k *schedulerAlgorithm) limitPod(pod *api.Pod) error { - cpuRequest, cpuLimit, _, err := mresource.LimitPodCPU(pod, k.defaultCpus) + cpuRequest, cpuLimit, _, err := resources.LimitPodCPU(pod, k.defaultCpus) if err != nil { return err } - memRequest, memLimit, _, err := mresource.LimitPodMem(pod, k.defaultMem) + memRequest, memLimit, _, err := resources.LimitPodMem(pod, k.defaultMem) if err != nil { return err } diff --git a/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go b/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go index 044fdf56452..3c006307931 100644 --- a/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go +++ b/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go @@ -25,6 +25,7 @@ import ( types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/pkg/api" ) @@ -63,11 +64,12 @@ func TestDeleteOne_PendingPod(t *testing.T) { }}} task, err := podtask.New( api.NewDefaultContext(), - "bar", + podtask.Config{ + ID: "bar", + Prototype: &mesosproto.ExecutorInfo{}, + HostPortStrategy: hostport.StrategyWildcard, + }, pod.Pod, - &mesosproto.ExecutorInfo{}, - nil, - nil, ) if err != nil { t.Fatalf("failed to create task: %v", err) @@ -110,11 +112,12 @@ func TestDeleteOne_Running(t *testing.T) { }}} task, err := podtask.New( api.NewDefaultContext(), - "bar", + podtask.Config{ + ID: "bar", + Prototype: &mesosproto.ExecutorInfo{}, + HostPortStrategy: hostport.StrategyWildcard, + }, pod.Pod, - &mesosproto.ExecutorInfo{}, - nil, - nil, ) if err != nil { t.Fatalf("unexpected error: %v", err) diff --git 
a/contrib/mesos/pkg/scheduler/components/scheduler.go b/contrib/mesos/pkg/scheduler/components/scheduler.go index 2cffea9bd77..30e6769c8a9 100644 --- a/contrib/mesos/pkg/scheduler/components/scheduler.go +++ b/contrib/mesos/pkg/scheduler/components/scheduler.go @@ -39,7 +39,7 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" - mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" @@ -65,10 +65,9 @@ func New( terminate <-chan struct{}, mux *http.ServeMux, lw *cache.ListWatch, - prototype *mesos.ExecutorInfo, - frameworkRoles, defaultPodRoles []string, - defaultCpus mresource.CPUShares, - defaultMem mresource.MegaBytes, + taskConfig podtask.Config, + defaultCpus resources.CPUShares, + defaultMem resources.MegaBytes, ) scheduler.Scheduler { core := &sched{ framework: fw, @@ -82,7 +81,7 @@ func New( q := queuer.New(queue.NewDelayFIFO(), podUpdates) - algorithm := algorithm.New(core, podUpdates, ps, prototype, frameworkRoles, defaultPodRoles, defaultCpus, defaultMem) + algorithm := algorithm.New(core, podUpdates, ps, taskConfig, defaultCpus, defaultMem) podDeleter := deleter.New(core, q) diff --git a/contrib/mesos/pkg/scheduler/integration/integration_test.go b/contrib/mesos/pkg/scheduler/integration/integration_test.go index 239856e4d17..dbb6edd2311 100644 --- a/contrib/mesos/pkg/scheduler/integration/integration_test.go +++ b/contrib/mesos/pkg/scheduler/integration/integration_test.go @@ -43,7 +43,8 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" - mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" @@ -524,11 +525,14 @@ func newLifecycleTest(t *testing.T) lifecycleTest { schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch, - ei, - []string{"*"}, - []string{"*"}, - mresource.DefaultDefaultContainerCPULimit, - mresource.DefaultDefaultContainerMemLimit, + podtask.Config{ + Prototype: ei, + FrameworkRoles: []string{"*"}, + DefaultPodRoles: []string{"*"}, + HostPortStrategy: hostport.StrategyWildcard, + }, + resources.DefaultDefaultContainerCPULimit, + resources.DefaultDefaultContainerMemLimit, ) assert.NotNil(scheduler) diff --git a/contrib/mesos/pkg/scheduler/podtask/port_mapping.go b/contrib/mesos/pkg/scheduler/podtask/hostport/mapper.go similarity index 59% rename from contrib/mesos/pkg/scheduler/podtask/port_mapping.go rename to contrib/mesos/pkg/scheduler/podtask/hostport/mapper.go index deae79821eb..5484aff3614 100644 --- a/contrib/mesos/pkg/scheduler/podtask/port_mapping.go +++ b/contrib/mesos/pkg/scheduler/podtask/hostport/mapper.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package podtask +package hostport import ( "fmt" @@ -22,38 +22,30 @@ import ( log "github.com/golang/glog" mesos "github.com/mesos/mesos-go/mesosproto" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/labels" ) -const ( - // maps a Container.HostPort to the same exact offered host port, ignores .HostPort = 0 - HostPortMappingFixed = "fixed" - // same as HostPortMappingFixed, except that .HostPort of 0 are mapped to any port offered - HostPortMappingWildcard = "wildcard" -) - -// Objects implementing the HostPortMapper interface generate port mappings +// Objects implementing the Mapper interface generate port mappings // from k8s container ports to ports offered by mesos -type HostPortMapper interface { - // Map maps the given pod task and the given mesos offer - // and returns a slice of port mappings - // or an error if the mapping failed - Map(t *T, offer *mesos.Offer) ([]HostPortMapping, error) +type Mapper interface { + // Map maps the given pod and the given mesos offer and returns a + // slice of port mappings or an error if the mapping failed + Map(pod *api.Pod, roles []string, offer *mesos.Offer) ([]Mapping, error) } -// HostPortMapperFunc is a function adapter to the HostPortMapper interface -type HostPortMapperFunc func(*T, *mesos.Offer) ([]HostPortMapping, error) +// MapperFunc is a function adapter to the Mapper interface +type MapperFunc func(*api.Pod, []string, *mesos.Offer) ([]Mapping, error) // Map calls f(t, offer) -func (f HostPortMapperFunc) Map(t *T, offer *mesos.Offer) ([]HostPortMapping, error) { - return f(t, offer) +func (f MapperFunc) Map(pod *api.Pod, roles []string, offer *mesos.Offer) ([]Mapping, error) { + return f(pod, roles, offer) } -// A HostPortMapping represents the mapping between k8s container ports +// A Mapping represents the mapping between k8s container ports // ports offered by mesos. 
It references the k8s' container and port // and specifies the offered mesos port and the offered port's role -type HostPortMapping struct { +type Mapping struct { ContainerIdx int // index of the container in the pod spec PortIdx int // index of the port in a container's port spec OfferPort uint64 // the port offered by mesos @@ -61,27 +53,27 @@ type HostPortMapping struct { } type PortAllocationError struct { - PodId string + PodID string Ports []uint64 } func (err *PortAllocationError) Error() string { - return fmt.Sprintf("Could not schedule pod %s: %d port(s) could not be allocated", err.PodId, len(err.Ports)) + return fmt.Sprintf("Could not schedule pod %s: %d port(s) could not be allocated", err.PodID, len(err.Ports)) } -type DuplicateHostPortError struct { - m1, m2 HostPortMapping +type DuplicateError struct { + m1, m2 Mapping } -func (err *DuplicateHostPortError) Error() string { +func (err *DuplicateError) Error() string { return fmt.Sprintf( "Host port %d is specified for container %d, pod %d and container %d, pod %d", err.m1.OfferPort, err.m1.ContainerIdx, err.m1.PortIdx, err.m2.ContainerIdx, err.m2.PortIdx) } // WildcardMapper maps k8s wildcard ports (hostPort == 0) to any available offer port -func WildcardMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) { - mapping, err := FixedMapper(t, offer) +func WildcardMapper(pod *api.Pod, roles []string, offer *mesos.Offer) ([]Mapping, error) { + mapping, err := FixedMapper(pod, roles, offer) if err != nil { return nil, err } @@ -91,11 +83,11 @@ func WildcardMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) { taken[entry.OfferPort] = struct{}{} } - wildports := []HostPortMapping{} - for i, container := range t.Pod.Spec.Containers { + wildports := []Mapping{} + for i, container := range pod.Spec.Containers { for pi, port := range container.Ports { if port.HostPort == 0 { - wildports = append(wildports, HostPortMapping{ + wildports = append(wildports, Mapping{ ContainerIdx: i, PortIdx: pi, }) @@ -104,7 +96,7 @@ func WildcardMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) { } remaining := len(wildports) - foreachPortsRange(offer.GetResources(), t.Roles(), func(bp, ep uint64, role string) { + resources.ForeachPortsRange(offer.GetResources(), roles, func(bp, ep uint64, role string) { log.V(3).Infof("Searching for wildcard port in range {%d:%d}", bp, ep) for i := range wildports { if wildports[i].OfferPort != 0 { @@ -115,7 +107,7 @@ func WildcardMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) { continue } wildports[i].OfferPort = port - wildports[i].Role = starredRole(role) + wildports[i].Role = resources.CanonicalRole(role) mapping = append(mapping, wildports[i]) remaining-- taken[port] = struct{}{} @@ -126,7 +118,7 @@ func WildcardMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) { if remaining > 0 { err := &PortAllocationError{ - PodId: t.Pod.Name, + PodID: pod.Namespace + "/" + pod.Name, } // it doesn't make sense to include a port list here because they were all zero (wildcards) return nil, err @@ -136,10 +128,10 @@ func WildcardMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) { } // FixedMapper maps k8s host ports to offered ports ignoring hostPorts == 0 (remaining pod-private) -func FixedMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) { - requiredPorts := make(map[uint64]HostPortMapping) - mapping := []HostPortMapping{} - for i, container := range t.Pod.Spec.Containers { +func FixedMapper(pod *api.Pod, roles []string, offer *mesos.Offer) ([]Mapping, error) 
{ + requiredPorts := make(map[uint64]Mapping) + mapping := []Mapping{} + for i, container := range pod.Spec.Containers { // strip all port==0 from this array; k8s already knows what to do with zero- // ports (it does not create 'port bindings' on the minion-host); we need to // remove the wildcards from this array since they don't consume host resources @@ -147,24 +139,24 @@ func FixedMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) { if port.HostPort == 0 { continue // ignore } - m := HostPortMapping{ + m := Mapping{ ContainerIdx: i, PortIdx: pi, OfferPort: uint64(port.HostPort), } if entry, inuse := requiredPorts[uint64(port.HostPort)]; inuse { - return nil, &DuplicateHostPortError{entry, m} + return nil, &DuplicateError{entry, m} } requiredPorts[uint64(port.HostPort)] = m } } - foreachPortsRange(offer.GetResources(), t.Roles(), func(bp, ep uint64, role string) { + resources.ForeachPortsRange(offer.GetResources(), roles, func(bp, ep uint64, role string) { for port := range requiredPorts { log.V(3).Infof("evaluating port range {%d:%d} %d", bp, ep, port) if (bp <= port) && (port <= ep) { m := requiredPorts[port] - m.Role = starredRole(role) + m.Role = resources.CanonicalRole(role) mapping = append(mapping, m) delete(requiredPorts, port) } @@ -174,7 +166,7 @@ func FixedMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) { unsatisfiedPorts := len(requiredPorts) if unsatisfiedPorts > 0 { err := &PortAllocationError{ - PodId: t.Pod.Name, + PodID: pod.Namespace + "/" + pod.Name, } for p := range requiredPorts { err.Ports = append(err.Ports, p) } @@ -185,15 +177,35 @@ func FixedMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) { return mapping, nil } -// NewHostPortMapper returns a new mapper based -// based on the port mapping key value -func NewHostPortMapper(pod *api.Pod) HostPortMapper { - filter := map[string]string{ - meta.PortMappingKey: HostPortMappingFixed, - } - selector := labels.Set(filter).AsSelector() - if selector.Matches(labels.Set(pod.Labels)) { - return HostPortMapperFunc(FixedMapper) - } - return HostPortMapperFunc(WildcardMapper) +type Strategy string + +const ( + // maps a Container.HostPort to the same exact offered host port, ignores .HostPort = 0 + StrategyFixed = Strategy("fixed") + // same as StrategyFixed, except that .HostPort of 0 are mapped to any port offered + StrategyWildcard = Strategy("wildcard") +) + +var validStrategies = map[Strategy]MapperFunc{ + StrategyFixed: MapperFunc(FixedMapper), + StrategyWildcard: MapperFunc(WildcardMapper), +} + +// NewMapper returns a new mapper based on the pod's port-mapping label, falling back to the given default strategy +func (defaultStrategy Strategy) NewMapper(pod *api.Pod) Mapper { + strategy, ok := pod.Labels[meta.PortMappingKey] + if ok { + f, ok := validStrategies[Strategy(strategy)] + if ok { + return f + } + log.Warningf("invalid port mapping strategy %q, reverting to default %q", strategy, defaultStrategy) + } + + f, ok := validStrategies[defaultStrategy] + if ok { + return f + } + + panic("scheduler is misconfigured, unrecognized default strategy \"" + defaultStrategy + "\"") } diff --git a/contrib/mesos/pkg/scheduler/podtask/port_mapping_test.go b/contrib/mesos/pkg/scheduler/podtask/hostport/mapper_test.go similarity index 76% rename from contrib/mesos/pkg/scheduler/podtask/port_mapping_test.go rename to contrib/mesos/pkg/scheduler/podtask/hostport/mapper_test.go index bd9730c1e7a..ef153fdffa4 100644 --- a/contrib/mesos/pkg/scheduler/podtask/port_mapping_test.go +++ b/contrib/mesos/pkg/scheduler/podtask/hostport/mapper_test.go @@
-14,27 +14,31 @@ See the License for the specific language governing permissions and limitations under the License. */ -package podtask +package hostport import ( "testing" mesos "github.com/mesos/mesos-go/mesosproto" "github.com/mesos/mesos-go/mesosutil" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources" "k8s.io/kubernetes/pkg/api" ) func TestDefaultHostPortMatching(t *testing.T) { - t.Parallel() - task := fakePodTask("foo", nil, nil) - pod := &task.Pod + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "default", + }, + } offer := &mesos.Offer{ Resources: []*mesos.Resource{ - newPortsResource("*", 1, 1), + resources.NewPorts("*", 1, 1), }, } - mapping, err := FixedMapper(task, offer) + mapping, err := FixedMapper(pod, []string{"*"}, offer) if err != nil { t.Fatal(err) } @@ -52,12 +56,8 @@ func TestDefaultHostPortMatching(t *testing.T) { }}, }}, } - task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil) - if err != nil { - t.Fatal(err) - } - _, err = FixedMapper(task, offer) - if err, _ := err.(*DuplicateHostPortError); err == nil { + _, err = FixedMapper(pod, []string{"*"}, offer) + if err, _ := err.(*DuplicateError); err == nil { t.Fatal("Expected duplicate port error") } else if err.m1.OfferPort != 123 { t.Fatal("Expected duplicate host port 123") @@ -65,12 +65,15 @@ func TestDefaultHostPortMatching(t *testing.T) { } func TestWildcardHostPortMatching(t *testing.T) { - t.Parallel() - task := fakePodTask("foo", nil, nil) - pod := &task.Pod + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "default", + }, + } offer := &mesos.Offer{} - mapping, err := WildcardMapper(task, offer) + mapping, err := WildcardMapper(pod, []string{"*"}, offer) if err != nil { t.Fatal(err) } @@ -81,10 +84,10 @@ func TestWildcardHostPortMatching(t *testing.T) { //-- offer = &mesos.Offer{ Resources: []*mesos.Resource{ - newPortsResource("*", 1, 1), + resources.NewPorts("*", 1, 1), }, } - mapping, err = WildcardMapper(task, offer) + mapping, err = WildcardMapper(pod, []string{"*"}, offer) if err != nil { t.Fatal(err) } @@ -100,11 +103,7 @@ func TestWildcardHostPortMatching(t *testing.T) { }}, }}, } - task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil) - if err != nil { - t.Fatal(err) - } - mapping, err = WildcardMapper(task, offer) + mapping, err = WildcardMapper(pod, []string{"*"}, offer) if err == nil { t.Fatalf("expected error instead of mappings: %#v", mapping) } else if err, _ := err.(*PortAllocationError); err == nil { @@ -123,11 +122,7 @@ func TestWildcardHostPortMatching(t *testing.T) { }}, }}, } - task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil) - if err != nil { - t.Fatal(err) - } - mapping, err = WildcardMapper(task, offer) + mapping, err = WildcardMapper(pod, []string{"*"}, offer) if err, _ := err.(*PortAllocationError); err == nil { t.Fatal("Expected port allocation error") } else if !(len(err.Ports) == 1 && err.Ports[0] == 123) { @@ -144,11 +139,7 @@ func TestWildcardHostPortMatching(t *testing.T) { }}, }}, } - task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil) - if err != nil { - t.Fatal(err) - } - mapping, err = WildcardMapper(task, offer) + mapping, err = WildcardMapper(pod, []string{"*"}, offer) if err, _ := err.(*PortAllocationError); err == nil { t.Fatal("Expected port allocation error") } else if len(err.Ports) != 0 { @@ -158,10 +149,10 @@ func TestWildcardHostPortMatching(t *testing.T) { //-- offer = 
&mesos.Offer{ Resources: []*mesos.Resource{ - newPortsResource("*", 1, 2), + resources.NewPorts("*", 1, 2), }, } - mapping, err = WildcardMapper(task, offer) + mapping, err = WildcardMapper(pod, []string{"*"}, offer) if err != nil { t.Fatal(err) } else if len(mapping) != 2 { @@ -190,16 +181,12 @@ func TestWildcardHostPortMatching(t *testing.T) { }}, }}, } - task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil, nil) - if err != nil { - t.Fatal(err) - } offer = &mesos.Offer{ Resources: []*mesos.Resource{ mesosutil.NewRangesResource("ports", []*mesos.Value_Range{mesosutil.NewValueRange(1, 1), mesosutil.NewValueRange(3, 5)}), }, } - mapping, err = WildcardMapper(task, offer) + mapping, err = WildcardMapper(pod, []string{"*"}, offer) if err != nil { t.Fatal(err) } else if len(mapping) != 2 { diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task.go b/contrib/mesos/pkg/scheduler/podtask/pod_task.go index fb1a912e740..362169b9d70 100644 --- a/contrib/mesos/pkg/scheduler/podtask/pod_task.go +++ b/contrib/mesos/pkg/scheduler/podtask/pod_task.go @@ -27,7 +27,9 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/offers" mesosmeta "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport" "k8s.io/kubernetes/pkg/api" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" log "github.com/golang/glog" mesos "github.com/mesos/mesos-go/mesosproto" @@ -53,9 +55,22 @@ const ( var starRole = []string{"*"} +// Config represents elements that are used or required in order to +// create a pod task that may be scheduled. +type Config struct { + ID string // ID is an optional, unique task ID; auto-generated if not specified + DefaultPodRoles []string // DefaultPodRoles lists preferred resource groups, prioritized in order + FrameworkRoles []string // FrameworkRoles identify resource groups from which the framework may consume + Prototype *mesos.ExecutorInfo // Prototype is required + HostPortStrategy hostport.Strategy // HostPortStrategy is used as the port mapping strategy, unless overridden by the pod + GenerateTaskDiscoveryEnabled bool + mapper hostport.Mapper // host-port mapping func, derived from pod and default strategy + podKey string // k8s key for this pod; managed internally +} + // A struct that describes a pod task. type T struct { - ID string + Config Pod api.Pod // Stores the final procurement result, once set read-only. 
@@ -68,26 +83,16 @@ type T struct { CreateTime time.Time UpdatedTime time.Time // time of the most recent StatusUpdate we've seen from the mesos master - podStatus api.PodStatus - prototype *mesos.ExecutorInfo // readonly - frameworkRoles []string // Mesos framework roles, pods are allowed to be launched with those - defaultPodRoles []string // roles under which pods are scheduled if none are specified in labels - podKey string - launchTime time.Time - bindTime time.Time - mapper HostPortMapper -} - -type Port struct { - Port uint64 - Role string + podStatus api.PodStatus + launchTime time.Time + bindTime time.Time } type Spec struct { SlaveID string AssignedSlave string Resources []*mesos.Resource - PortMap []HostPortMapping + PortMap []hostport.Mapping Data []byte Executor *mesos.ExecutorInfo } @@ -130,9 +135,6 @@ func generateTaskName(pod *api.Pod) string { return fmt.Sprintf("%s.%s.pod", pod.Name, ns) } -// GenerateTaskDiscoveryEnabled turns on/off the generation of DiscoveryInfo for TaskInfo records -var GenerateTaskDiscoveryEnabled = false - func generateTaskDiscovery(pod *api.Pod) *mesos.DiscoveryInfo { di := &mesos.DiscoveryInfo{ Visibility: mesos.DiscoveryInfo_CLUSTER.Enum(), @@ -195,7 +197,7 @@ func (t *T) BuildTaskInfo() (*mesos.TaskInfo, error) { SlaveId: mutil.NewSlaveID(t.Spec.SlaveID), } - if GenerateTaskDiscoveryEnabled { + if t.GenerateTaskDiscoveryEnabled { info.Discovery = generateTaskDiscovery(&t.Pod) } @@ -237,46 +239,45 @@ func (t *T) Roles() (result []string) { return filterRoles( roles, - not(emptyRole), not(seenRole()), inRoles(t.frameworkRoles...), + not(emptyRole), not(seenRole()), inRoles(t.FrameworkRoles...), ) } // no roles label defined, return defaults - return t.defaultPodRoles + return t.DefaultPodRoles } -func New(ctx api.Context, id string, pod *api.Pod, prototype *mesos.ExecutorInfo, frameworkRoles, defaultPodRoles []string) (*T, error) { - if prototype == nil { - return nil, fmt.Errorf("illegal argument: executor is nil") +func New(ctx api.Context, config Config, pod *api.Pod) (*T, error) { + if config.Prototype == nil { + return nil, fmt.Errorf("illegal argument: executor-info prototype is nil") } - if len(frameworkRoles) == 0 { - frameworkRoles = starRole + if len(config.FrameworkRoles) == 0 { + config.FrameworkRoles = starRole } - if len(defaultPodRoles) == 0 { - defaultPodRoles = starRole + if len(config.DefaultPodRoles) == 0 { + config.DefaultPodRoles = starRole } key, err := MakePodKey(ctx, pod.Name) if err != nil { return nil, err } + config.podKey = key - if id == "" { - id = "pod." + uuid.NewUUID().String() + if config.ID == "" { + config.ID = "pod." 
+ uuid.NewUUID().String() } + // the scheduler better get the fallback strategy right, otherwise we panic here + config.mapper = config.HostPortStrategy.NewMapper(pod) + task := &T{ - ID: id, - Pod: *pod, - State: StatePending, - podKey: key, - mapper: NewHostPortMapper(pod), - Flags: make(map[FlagType]struct{}), - prototype: prototype, - frameworkRoles: frameworkRoles, - defaultPodRoles: defaultPodRoles, + Pod: *pod, + Config: config, + State: StatePending, + Flags: make(map[FlagType]struct{}), } task.CreateTime = time.Now() @@ -305,11 +306,17 @@ func (t *T) SaveRecoveryInfo(dict map[string]string) { func RecoverFrom(pod api.Pod) (*T, bool, error) { // we only expect annotations if pod has been bound, which implies that it has already // been scheduled and launched - if pod.Spec.NodeName == "" && len(pod.Annotations) == 0 { + if len(pod.Annotations) == 0 { log.V(1).Infof("skipping recovery for unbound pod %v/%v", pod.Namespace, pod.Name) return nil, false, nil } + // we don't track mirror pods, they're considered part of the executor + if _, isMirrorPod := pod.Annotations[kubetypes.ConfigMirrorAnnotationKey]; isMirrorPod { + log.V(1).Infof("skipping recovery for mirror pod %v/%v", pod.Namespace, pod.Name) + return nil, false, nil + } + // only process pods that are not in a terminal state switch pod.Status.Phase { case api.PodPending, api.PodRunning, api.PodUnknown: // continue @@ -328,12 +335,13 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) { now := time.Now() t := &T{ + Config: Config{ + podKey: key, + }, Pod: pod, CreateTime: now, - podKey: key, State: StatePending, // possibly running? mesos will tell us during reconciliation Flags: make(map[FlagType]struct{}), - mapper: NewHostPortMapper(&pod), launchTime: now, bindTime: now, Spec: &Spec{}, diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go index 82b2daf19b0..1d3806d8eb8 100644 --- a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go +++ b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go @@ -26,6 +26,8 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/kubernetes/contrib/mesos/pkg/node" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources" "k8s.io/kubernetes/pkg/api" ) @@ -37,16 +39,18 @@ const ( func fakePodTask(id string, allowedRoles, defaultRoles []string) *T { t, _ := New( api.NewDefaultContext(), - "", + Config{ + Prototype: &mesos.ExecutorInfo{}, + FrameworkRoles: allowedRoles, + DefaultPodRoles: defaultRoles, + HostPortStrategy: hostport.StrategyWildcard, + }, &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: id, Namespace: api.NamespaceDefault, }, }, - &mesos.ExecutorInfo{}, - allowedRoles, - defaultRoles, ) return t @@ -219,7 +223,7 @@ func TestAcceptOfferPorts(t *testing.T) { Resources: []*mesos.Resource{ mutil.NewScalarResource("cpus", t_min_cpu), mutil.NewScalarResource("mem", t_min_mem), - newPortsResource("*", 1, 1), + resources.NewPorts("*", 1, 1), }, } @@ -415,12 +419,3 @@ func newScalarAttribute(name string, val float64) *mesos.Attribute { Scalar: &mesos.Value_Scalar{Value: proto.Float64(val)}, } } - -func newPortsResource(role string, ports ...uint64) *mesos.Resource { - return &mesos.Resource{ - Name: proto.String("ports"), - Type: mesos.Value_RANGES.Enum(), - Ranges: newRanges(ports), - Role: stringPtrTo(role), - } -} diff --git a/contrib/mesos/pkg/scheduler/podtask/procurement.go 
b/contrib/mesos/pkg/scheduler/podtask/procurement.go index 652c2233cf6..78f482bebe5 100644 --- a/contrib/mesos/pkg/scheduler/podtask/procurement.go +++ b/contrib/mesos/pkg/scheduler/podtask/procurement.go @@ -25,7 +25,7 @@ import ( mesos "github.com/mesos/mesos-go/mesosproto" "github.com/mesos/mesos-go/mesosutil" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/executorinfo" - mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/labels" ) @@ -164,8 +164,8 @@ func NewPodResourcesProcurement() Procurement { return err } - wantedCpus := float64(mresource.NewCPUShares(limits[api.ResourceCPU])) - wantedMem := float64(mresource.NewMegaBytes(limits[api.ResourceMemory])) + wantedCpus := float64(resources.NewCPUShares(limits[api.ResourceCPU])) + wantedMem := float64(resources.NewMegaBytes(limits[api.ResourceMemory])) log.V(4).Infof( "trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", @@ -199,18 +199,18 @@ func NewPodResourcesProcurement() Procurement { func NewPortsProcurement() Procurement { return ProcurementFunc(func(t *T, _ *api.Node, ps *ProcureState) error { // fill in port mapping - if mapping, err := t.mapper.Map(t, ps.offer); err != nil { + if mapping, err := t.mapper.Map(&t.Pod, t.Roles(), ps.offer); err != nil { return err } else { - ports := []Port{} + ports := []resources.Port{} for _, entry := range mapping { - ports = append(ports, Port{ + ports = append(ports, resources.Port{ Port: entry.OfferPort, Role: entry.Role, }) } ps.spec.PortMap = mapping - ps.spec.Resources = append(ps.spec.Resources, portRangeResources(ports)...) + ps.spec.Resources = append(ps.spec.Resources, resources.PortRanges(ports)...) } return nil }) @@ -220,20 +220,20 @@ func NewPortsProcurement() Procurement { // If a given offer has no executor IDs set, the given prototype executor resources are considered for procurement. // If a given offer has one executor ID set, only pod resources are being procured. // An offer with more than one executor ID implies an invariant violation and the first executor ID is being considered. 
-func NewExecutorResourceProcurer(resources []*mesos.Resource, registry executorinfo.Registry) Procurement { +func NewExecutorResourceProcurer(rs []*mesos.Resource, registry executorinfo.Registry) Procurement { return ProcurementFunc(func(t *T, _ *api.Node, ps *ProcureState) error { eids := len(ps.offer.GetExecutorIds()) switch { case eids == 0: - wantedCpus := sumResources(filterResources(resources, isScalar, hasName("cpus"))) - wantedMem := sumResources(filterResources(resources, isScalar, hasName("mem"))) + wantedCpus := resources.Sum(resources.Filter(rs, resources.IsScalar, resources.HasName("cpus"))) + wantedMem := resources.Sum(resources.Filter(rs, resources.IsScalar, resources.HasName("mem"))) - procuredCpu, remaining := procureScalarResources("cpus", wantedCpus, t.frameworkRoles, ps.offer.GetResources()) + procuredCpu, remaining := procureScalarResources("cpus", wantedCpus, t.FrameworkRoles, ps.offer.GetResources()) if procuredCpu == nil { return fmt.Errorf("not enough cpu resources for executor: want=%v", wantedCpus) } - procuredMem, remaining := procureScalarResources("mem", wantedMem, t.frameworkRoles, remaining) + procuredMem, remaining := procureScalarResources("mem", wantedMem, t.FrameworkRoles, remaining) if procuredMem == nil { return fmt.Errorf("not enough mem resources for executor: want=%v", wantedMem) } @@ -273,12 +273,12 @@ func procureScalarResources( roles []string, offered []*mesos.Resource, ) (procured, remaining []*mesos.Resource) { - sorted := byRoles(roles...).sort(offered) + sorted := resources.ByRoles(roles...).Sort(offered) procured = make([]*mesos.Resource, 0, len(sorted)) remaining = make([]*mesos.Resource, 0, len(sorted)) for _, r := range sorted { - if want >= epsilon && resourceMatchesAll(r, hasName(name), isScalar) { + if want >= epsilon && resources.MatchesAll(r, resources.HasName(name), resources.IsScalar) { left, role := r.GetScalar().GetValue(), r.Role consumed := math.Min(want, left) diff --git a/contrib/mesos/pkg/scheduler/podtask/procurement_test.go b/contrib/mesos/pkg/scheduler/podtask/procurement_test.go index 910493dc505..cb01f23e4d6 100644 --- a/contrib/mesos/pkg/scheduler/podtask/procurement_test.go +++ b/contrib/mesos/pkg/scheduler/podtask/procurement_test.go @@ -23,6 +23,8 @@ import ( "github.com/mesos/mesos-go/mesosutil" mesos "github.com/mesos/mesos-go/mesosproto" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "reflect" @@ -51,7 +53,12 @@ func TestNewPodResourcesProcurement(t *testing.T) { task, _ := New( api.NewDefaultContext(), - "", + Config{ + Prototype: executor, + FrameworkRoles: []string{"*"}, + DefaultPodRoles: []string{"*"}, + HostPortStrategy: hostport.StrategyWildcard, + }, &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "test", @@ -76,9 +83,6 @@ func TestNewPodResourcesProcurement(t *testing.T) { }, }, }, - executor, - []string{"*"}, - []string{"*"}, ) procurement := NewPodResourcesProcurement() @@ -214,6 +218,6 @@ func TestProcureRoleResources(t *testing.T) { func scalar(name string, value float64, role string) *mesos.Resource { res := mesosutil.NewScalarResource(name, value) - res.Role = stringPtrTo(role) + res.Role = resources.StringPtrTo(role) return res } diff --git a/contrib/mesos/pkg/scheduler/podtask/roles.go b/contrib/mesos/pkg/scheduler/podtask/roles.go index b59d0679333..04673d30ea2 100644 --- a/contrib/mesos/pkg/scheduler/podtask/roles.go +++ 
b/contrib/mesos/pkg/scheduler/podtask/roles.go @@ -81,24 +81,3 @@ func inRoles(roles ...string) rolePredicate { return ok } } - -// starredRole returns a "*" if the given role is empty else the role itself -func starredRole(name string) string { - if name == "" { - return "*" - } - - return name -} - -// stringPtrTo returns a pointer to the given string -// or nil if it is empty string. -func stringPtrTo(s string) *string { - var protos *string - - if s != "" { - protos = &s - } - - return protos -} diff --git a/contrib/mesos/pkg/scheduler/resource/doc.go b/contrib/mesos/pkg/scheduler/resources/doc.go similarity index 86% rename from contrib/mesos/pkg/scheduler/resource/doc.go rename to contrib/mesos/pkg/scheduler/resources/doc.go index f1595050a3c..8ef318d2ddb 100644 --- a/contrib/mesos/pkg/scheduler/resource/doc.go +++ b/contrib/mesos/pkg/scheduler/resources/doc.go @@ -14,5 +14,5 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package resource contains the Mesos scheduler specific resource functions -package resource +// Package resources contains the Mesos scheduler specific resource functions +package resources diff --git a/contrib/mesos/pkg/scheduler/resource/resource.go b/contrib/mesos/pkg/scheduler/resources/resource.go similarity index 99% rename from contrib/mesos/pkg/scheduler/resource/resource.go rename to contrib/mesos/pkg/scheduler/resources/resource.go index a32bd629eec..792ab0cf413 100644 --- a/contrib/mesos/pkg/scheduler/resource/resource.go +++ b/contrib/mesos/pkg/scheduler/resources/resource.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package resource +package resources import ( "k8s.io/kubernetes/pkg/api" diff --git a/contrib/mesos/pkg/scheduler/resource/resource_test.go b/contrib/mesos/pkg/scheduler/resources/resource_test.go similarity index 99% rename from contrib/mesos/pkg/scheduler/resource/resource_test.go rename to contrib/mesos/pkg/scheduler/resources/resource_test.go index 04f0141d9ae..c2b79bfee03 100644 --- a/contrib/mesos/pkg/scheduler/resource/resource_test.go +++ b/contrib/mesos/pkg/scheduler/resources/resource_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package resource +package resources import ( "math" diff --git a/contrib/mesos/pkg/scheduler/podtask/resources.go b/contrib/mesos/pkg/scheduler/resources/resources.go similarity index 50% rename from contrib/mesos/pkg/scheduler/podtask/resources.go rename to contrib/mesos/pkg/scheduler/resources/resources.go index 875ce91822e..96cfe74cc8c 100644 --- a/contrib/mesos/pkg/scheduler/podtask/resources.go +++ b/contrib/mesos/pkg/scheduler/resources/resources.go @@ -14,15 +14,20 @@ See the License for the specific language governing permissions and limitations under the License. */ -package podtask +package resources import ( "github.com/gogo/protobuf/proto" mesos "github.com/mesos/mesos-go/mesosproto" ) -// portRangeResources creates a range resource for the spec ports. -func portRangeResources(Ports []Port) []*mesos.Resource { +type Port struct { + Port uint64 + Role string +} + +// PortRanges creates a range resource for the spec ports. 
+func PortRanges(Ports []Port) []*mesos.Resource { rolePorts := make(map[string][]uint64, len(Ports)) for _, p := range Ports { @@ -36,8 +41,8 @@ func portRangeResources(Ports []Port) []*mesos.Resource { &mesos.Resource{ Name: proto.String("ports"), Type: mesos.Value_RANGES.Enum(), - Ranges: newRanges(ports), - Role: stringPtrTo(role), + Ranges: NewRanges(ports), + Role: StringPtrTo(role), }, ) } @@ -45,8 +50,8 @@ func portRangeResources(Ports []Port) []*mesos.Resource { return resources } -// newRanges generates port ranges from the given list of ports. (naive implementation) -func newRanges(ports []uint64) *mesos.Value_Ranges { +// NewRanges generates port ranges from the given list of ports. (naive implementation) +func NewRanges(ports []uint64) *mesos.Value_Ranges { r := make([]*mesos.Value_Range, 0, len(ports)) for _, port := range ports { x := proto.Uint64(port) @@ -55,11 +60,11 @@ func newRanges(ports []uint64) *mesos.Value_Ranges { return &mesos.Value_Ranges{Range: r} } -// foreachPortsRange calls f for each resource that matches the given roles +// ForeachPortsRange calls f for each resource that matches the given roles // in the order of the given roles. -func foreachPortsRange(rs []*mesos.Resource, roles []string, f func(begin, end uint64, role string)) { - rs = filterResources(rs, hasName("ports")) - rs = byRoles(roles...).sort(rs) +func ForeachPortsRange(rs []*mesos.Resource, roles []string, f func(begin, end uint64, role string)) { + rs = Filter(rs, HasName("ports")) + rs = ByRoles(roles...).Sort(rs) for _, resource := range rs { for _, r := range (*resource).GetRanges().Range { @@ -70,22 +75,22 @@ func foreachPortsRange(rs []*mesos.Resource, roles []string, f func(begin, end u } } -// byRolesSorter sorts resources according to the ordering of roles. -type byRolesSorter struct { +// ByRolesSorter sorts resources according to the ordering of roles. +type ByRolesSorter struct { roles []string } -// byRoles returns a byRolesSorter with the given roles. -func byRoles(roles ...string) *byRolesSorter { - return &byRolesSorter{roles: roles} +// ByRoles returns a ByRolesSorter with the given roles. +func ByRoles(roles ...string) *ByRolesSorter { + return &ByRolesSorter{roles: roles} } -// sort sorts the given resources according to the order of roles in the byRolesSorter +// sort sorts the given resources according to the order of roles in the ByRolesSorter // and returns the sorted resources. -func (sorter *byRolesSorter) sort(resources []*mesos.Resource) []*mesos.Resource { +func (sorter *ByRolesSorter) Sort(resources []*mesos.Resource) []*mesos.Resource { rolesMap := map[string][]*mesos.Resource{} // maps roles to resources for _, res := range resources { - role := starredRole(res.GetRole()) + role := CanonicalRole(res.GetRole()) rolesMap[role] = append(rolesMap[role], res) } @@ -99,12 +104,21 @@ func (sorter *byRolesSorter) sort(resources []*mesos.Resource) []*mesos.Resource return result } -// resourcePredicate is a predicate function on *mesos.Resource structs. -type resourcePredicate func(*mesos.Resource) bool +// ResourcePredicate is a predicate function on *mesos.Resource structs. +type ( + ResourcePredicate func(*mesos.Resource) bool + ResourcePredicates []ResourcePredicate +) -// filter filters the given slice of resources and returns a slice of resources +// Filter filters the given slice of resources and returns a slice of resources // matching all given predicates. 
-func filterResources(res []*mesos.Resource, ps ...resourcePredicate) []*mesos.Resource { +func Filter(res []*mesos.Resource, ps ...ResourcePredicate) []*mesos.Resource { + return ResourcePredicates(ps).Filter(res) +} + +// Filter filters the given slice of resources and returns a slice of resources +// matching all given predicates. +func (ps ResourcePredicates) Filter(res []*mesos.Resource) []*mesos.Resource { filtered := make([]*mesos.Resource, 0, len(res)) next: @@ -121,8 +135,13 @@ next: return filtered } -// resourceMatchesAll returns true if the given resource matches all given predicates ps. -func resourceMatchesAll(res *mesos.Resource, ps ...resourcePredicate) bool { +// MatchesAll returns true if the given resource matches all given predicates ps. +func MatchesAll(res *mesos.Resource, ps ...ResourcePredicate) bool { + return ResourcePredicates(ps).MatchesAll(res) +} + +// MatchesAll returns true if the given resource matches all given predicates ps. +func (ps ResourcePredicates) MatchesAll(res *mesos.Resource) bool { for _, p := range ps { if !p(res) { return false @@ -132,7 +151,7 @@ func resourceMatchesAll(res *mesos.Resource, ps ...resourcePredicate) bool { return true } -func sumResources(res []*mesos.Resource) float64 { +func Sum(res []*mesos.Resource) float64 { var sum float64 for _, r := range res { @@ -142,15 +161,45 @@ func sumResources(res []*mesos.Resource) float64 { return sum } -// isScalar returns true if the given resource is a scalar type. -func isScalar(r *mesos.Resource) bool { +// IsScalar returns true if the given resource is a scalar type. +func IsScalar(r *mesos.Resource) bool { return r.GetType() == mesos.Value_SCALAR } -// hasName returns a resourcePredicate which returns true +// HasName returns a ResourcePredicate which returns true // if the given resource has the given name. -func hasName(name string) resourcePredicate { +func HasName(name string) ResourcePredicate { return func(r *mesos.Resource) bool { return r.GetName() == name } } + +// StringPtrTo returns a pointer to the given string +// or nil if it is empty string. +func StringPtrTo(s string) *string { + var protos *string + + if s != "" { + protos = &s + } + + return protos +} + +// CanonicalRole returns a "*" if the given role is empty else the role itself +func CanonicalRole(name string) string { + if name == "" { + return "*" + } + + return name +} + +func NewPorts(role string, ports ...uint64) *mesos.Resource { + return &mesos.Resource{ + Name: proto.String("ports"), + Type: mesos.Value_RANGES.Enum(), + Ranges: NewRanges(ports), + Role: StringPtrTo(role), + } +} diff --git a/contrib/mesos/pkg/scheduler/resource/types.go b/contrib/mesos/pkg/scheduler/resources/types.go similarity index 98% rename from contrib/mesos/pkg/scheduler/resource/types.go rename to contrib/mesos/pkg/scheduler/resources/types.go index 06aae14fb26..743853d805c 100644 --- a/contrib/mesos/pkg/scheduler/resource/types.go +++ b/contrib/mesos/pkg/scheduler/resources/types.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package resource +package resources import ( "fmt" diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 362b67977c8..d691f5dfbee 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -66,7 +66,8 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" - mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/client/cache" @@ -95,8 +96,8 @@ const ( defaultReconcileCooldown = 15 * time.Second defaultNodeRelistPeriod = 5 * time.Minute defaultFrameworkName = "Kubernetes" - defaultExecutorCPUs = mresource.CPUShares(0.25) // initial CPU allocated for executor - defaultExecutorMem = mresource.MegaBytes(128.0) // initial memory allocated for executor + defaultExecutorCPUs = resources.CPUShares(0.25) // initial CPU allocated for executor + defaultExecutorMem = resources.MegaBytes(128.0) // initial memory allocated for executor defaultExecutorInfoCacheSize = 10000 ) @@ -119,8 +120,8 @@ type SchedulerServer struct { mesosAuthPrincipal string mesosAuthSecretFile string mesosCgroupPrefix string - mesosExecutorCPUs mresource.CPUShares - mesosExecutorMem mresource.MegaBytes + mesosExecutorCPUs resources.CPUShares + mesosExecutorMem resources.MegaBytes checkpoint bool failoverTimeout float64 generateTaskDiscovery bool @@ -147,8 +148,8 @@ type SchedulerServer struct { hostnameOverride string reconcileInterval int64 reconcileCooldown time.Duration - defaultContainerCPULimit mresource.CPUShares - defaultContainerMemLimit mresource.MegaBytes + defaultContainerCPULimit resources.CPUShares + defaultContainerMemLimit resources.MegaBytes schedulerConfigFileName string graceful bool frameworkName string @@ -174,6 +175,7 @@ type SchedulerServer struct { sandboxOverlay string conntrackMax int conntrackTCPTimeoutEstablished int + useHostPortEndpoints bool executable string // path to the binary running this service client *clientset.Clientset @@ -202,8 +204,8 @@ func NewSchedulerServer() *SchedulerServer { runProxy: true, executorSuicideTimeout: execcfg.DefaultSuicideTimeout, launchGracePeriod: execcfg.DefaultLaunchGracePeriod, - defaultContainerCPULimit: mresource.DefaultDefaultContainerCPULimit, - defaultContainerMemLimit: mresource.DefaultDefaultContainerMemLimit, + defaultContainerCPULimit: resources.DefaultDefaultContainerCPULimit, + defaultContainerMemLimit: resources.DefaultDefaultContainerMemLimit, proxyMode: "userspace", // upstream default is "iptables" post-v1.1 @@ -231,6 +233,7 @@ func NewSchedulerServer() *SchedulerServer { containPodResources: true, nodeRelistPeriod: defaultNodeRelistPeriod, conntrackTCPTimeoutEstablished: 0, // non-zero values may require hand-tuning other sysctl's on the host; do so with caution + useHostPortEndpoints: true, // non-zero values can trigger failures when updating /sys/module/nf_conntrack/parameters/hashsize // when kube-proxy is running in a non-root netns (init_net); setting this to a non-zero value will @@ -293,6 +296,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) { fs.Var(&s.defaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted 
this much amount of memory in MB") fs.BoolVar(&s.containPodResources, "contain-pod-resources", s.containPodResources, "Reparent pod containers into mesos cgroups; disable if you're having strange mesos/docker/systemd interactions.") fs.DurationVar(&s.nodeRelistPeriod, "node-monitor-period", s.nodeRelistPeriod, "Period between relisting of all nodes from the apiserver.") + fs.BoolVar(&s.useHostPortEndpoints, "host-port-endpoints", s.useHostPortEndpoints, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.") fs.IntVar(&s.executorLogV, "executor-logv", s.executorLogV, "Logging verbosity of spawned minion and executor processes.") fs.BoolVar(&s.executorBindall, "executor-bindall", s.executorBindall, "When true will set -address of the executor to 0.0.0.0.") @@ -552,7 +556,6 @@ func (s *SchedulerServer) getDriver() (driver bindings.SchedulerDriver) { } func (s *SchedulerServer) Run(hks hyperkube.Interface, _ []string) error { - podtask.GenerateTaskDiscoveryEnabled = s.generateTaskDiscovery if n := len(s.frameworkRoles); n == 0 || n > 2 || (n == 2 && s.frameworkRoles[0] != "*" && s.frameworkRoles[1] != "*") { log.Fatalf(`only one custom role allowed in addition to "*"`) } @@ -794,8 +797,14 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config broadcaster.StartLogging(log.Infof) broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{eventsClient.Events("")}) - // create scheduler core with all components arranged around it lw := cache.NewListWatchFromClient(s.client.CoreClient, "pods", api.NamespaceAll, fields.Everything()) + + hostPortStrategy := hostport.StrategyFixed + if s.useHostPortEndpoints { + hostPortStrategy = hostport.StrategyWildcard + } + + // create scheduler core with all components arranged around it sched := components.New( sc, framework, @@ -805,9 +814,13 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config schedulerProcess.Terminal(), s.mux, lw, - eiPrototype, - s.frameworkRoles, - s.defaultPodRoles, + podtask.Config{ + DefaultPodRoles: s.defaultPodRoles, + FrameworkRoles: s.frameworkRoles, + GenerateTaskDiscoveryEnabled: s.generateTaskDiscovery, + HostPortStrategy: hostPortStrategy, + Prototype: eiPrototype, + }, s.defaultContainerCPULimit, s.defaultContainerMemLimit, ) diff --git a/contrib/mesos/pkg/scheduler/service/service_test.go b/contrib/mesos/pkg/scheduler/service/service_test.go index 24c74e2adc4..ee4353ed05f 100644 --- a/contrib/mesos/pkg/scheduler/service/service_test.go +++ b/contrib/mesos/pkg/scheduler/service/service_test.go @@ -20,7 +20,7 @@ import ( "testing" "time" - mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources" "github.com/stretchr/testify/assert" ) @@ -113,6 +113,6 @@ func Test_DefaultResourceLimits(t *testing.T) { assert := assert.New(t) s := NewSchedulerServer() - assert.Equal(s.defaultContainerCPULimit, mresource.DefaultDefaultContainerCPULimit) - assert.Equal(s.defaultContainerMemLimit, mresource.DefaultDefaultContainerMemLimit) + assert.Equal(s.defaultContainerCPULimit, resources.DefaultDefaultContainerCPULimit) + assert.Equal(s.defaultContainerMemLimit, resources.DefaultDefaultContainerMemLimit) } diff --git a/contrib/mesos/pkg/scheduler/service/validation.go b/contrib/mesos/pkg/scheduler/service/validation.go index 4b58532ce6e..b411fb9b8f0 100644 --- a/contrib/mesos/pkg/scheduler/service/validation.go +++ b/contrib/mesos/pkg/scheduler/service/validation.go @@ -19,23 
+19,23 @@ package service import ( log "github.com/golang/glog" "k8s.io/kubernetes/contrib/mesos/pkg/podutil" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources" "k8s.io/kubernetes/pkg/api" ) // StaticPodValidator discards a pod if we can't calculate resource limits for it. func StaticPodValidator( - defaultContainerCPULimit resource.CPUShares, - defaultContainerMemLimit resource.MegaBytes, + defaultContainerCPULimit resources.CPUShares, + defaultContainerMemLimit resources.MegaBytes, accumCPU, accumMem *float64, ) podutil.FilterFunc { return podutil.FilterFunc(func(pod *api.Pod) (bool, error) { - _, cpu, _, err := resource.LimitPodCPU(pod, defaultContainerCPULimit) + _, cpu, _, err := resources.LimitPodCPU(pod, defaultContainerCPULimit) if err != nil { return false, err } - _, mem, _, err := resource.LimitPodMem(pod, defaultContainerMemLimit) + _, mem, _, err := resources.LimitPodMem(pod, defaultContainerMemLimit) if err != nil { return false, err } diff --git a/contrib/mesos/pkg/scheduler/service/validation_test.go b/contrib/mesos/pkg/scheduler/service/validation_test.go index 744e5b421c5..5de45cb64c0 100644 --- a/contrib/mesos/pkg/scheduler/service/validation_test.go +++ b/contrib/mesos/pkg/scheduler/service/validation_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/kubernetes/contrib/mesos/pkg/podutil" - mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/service" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" @@ -143,7 +143,7 @@ func containers(ct ...api.Container) podOpt { }) } -func resourceLimits(cpu mresource.CPUShares, mem mresource.MegaBytes) ctOpt { +func resourceLimits(cpu resources.CPUShares, mem resources.MegaBytes) ctOpt { return ctOpt(func(c *api.Container) { if c.Resources.Limits == nil { c.Resources.Limits = make(api.ResourceList) diff --git a/hack/verify-flags/excluded-flags.txt b/hack/verify-flags/excluded-flags.txt index 3413ccd4d35..fd9a091b1bc 100644 --- a/hack/verify-flags/excluded-flags.txt +++ b/hack/verify-flags/excluded-flags.txt @@ -6,7 +6,6 @@ file_owner file_perm fs_type gke_context -host_port_endpoints max_in_flight max_par new_file_0644 diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 4653a395c15..e0ab79aa23e 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -138,6 +138,7 @@ horizontal-pod-autoscaler-sync-period host-ipc-sources host-network-sources host-pid-sources +host-port-endpoints hostname-override http-check-frequency http-port
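As a closing aside (not part of the patch), here is a hedged Go sketch of how the new `hostport` API introduced above fits together: a default `Strategy` (normally derived from `-host-port-endpoints`) yields a `Mapper` for a pod via `NewMapper`, and `Map` turns a Mesos offer into concrete port mappings. The pod, roles, and offered port values are made up for illustration; the types and functions come from the `hostport` and `resources` packages in this patch.

```go
package main

import (
	"fmt"

	mesos "github.com/mesos/mesos-go/mesosproto"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
	"k8s.io/kubernetes/pkg/api"
)

func main() {
	// One container asking for a fixed host port (8080) and one wildcard port (0).
	pod := &api.Pod{
		ObjectMeta: api.ObjectMeta{Name: "web", Namespace: "default"},
		Spec: api.PodSpec{
			Containers: []api.Container{{
				Ports: []api.ContainerPort{{HostPort: 8080}, {HostPort: 0}},
			}},
		},
	}

	// An offer carrying two host ports for the default "*" role.
	offer := &mesos.Offer{
		Resources: []*mesos.Resource{resources.NewPorts("*", 8080, 31000)},
	}

	// The default strategy may be overridden per pod via the meta.PortMappingKey
	// label ("fixed" or "wildcard"); NewMapper resolves that to a Mapper.
	mapper := hostport.StrategyWildcard.NewMapper(pod)

	mappings, err := mapper.Map(pod, []string{"*"}, offer)
	if err != nil {
		// Typed failures: *hostport.PortAllocationError, *hostport.DuplicateError.
		fmt.Println("mapping failed:", err)
		return
	}
	for _, m := range mappings {
		fmt.Printf("container %d, port %d -> offered port %d (role %q)\n",
			m.ContainerIdx, m.PortIdx, m.OfferPort, m.Role)
	}
}
```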