From 1748f028d5c478f6f114c1d3815b983b44d812e6 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Fri, 15 Jan 2016 05:22:06 +0000 Subject: [PATCH] experimental support for task discovery-info generation --- .../mesos/pkg/scheduler/meta/annotations.go | 27 +++--- .../mesos/pkg/scheduler/podtask/pod_task.go | 87 +++++++++++++++---- .../pkg/scheduler/podtask/pod_task_test.go | 4 +- .../mesos/pkg/scheduler/service/service.go | 43 ++++----- hack/verify-flags/known-flags.txt | 1 + 5 files changed, 112 insertions(+), 50 deletions(-) diff --git a/contrib/mesos/pkg/scheduler/meta/annotations.go b/contrib/mesos/pkg/scheduler/meta/annotations.go index 85f1745a8c5..7d051e7a67d 100644 --- a/contrib/mesos/pkg/scheduler/meta/annotations.go +++ b/contrib/mesos/pkg/scheduler/meta/annotations.go @@ -18,21 +18,24 @@ package meta // kubernetes api object annotations const ( + // Namespace is the label and annotation namespace for mesos keys + Namespace = "k8s.mesosphere.io" + // the BindingHostKey pod annotation marks a pod as being assigned to a Mesos // slave. It is already or will be launched on the slave as a task. - BindingHostKey = "k8s.mesosphere.io/bindingHost" + BindingHostKey = Namespace + "/bindingHost" - TaskIdKey = "k8s.mesosphere.io/taskId" - SlaveIdKey = "k8s.mesosphere.io/slaveId" - OfferIdKey = "k8s.mesosphere.io/offerId" - ExecutorIdKey = "k8s.mesosphere.io/executorId" - ExecutorResourcesKey = "k8s.mesosphere.io/executorResources" - PortMappingKey = "k8s.mesosphere.io/portMapping" - PortMappingKeyPrefix = "k8s.mesosphere.io/port_" + TaskIdKey = Namespace + "/taskId" + SlaveIdKey = Namespace + "/slaveId" + OfferIdKey = Namespace + "/offerId" + ExecutorIdKey = Namespace + "/executorId" + ExecutorResourcesKey = Namespace + "/executorResources" + PortMappingKey = Namespace + "/portMapping" + PortMappingKeyPrefix = Namespace + "/port_" PortMappingKeyFormat = PortMappingKeyPrefix + "%s_%d" - PortNameMappingKeyPrefix = "k8s.mesosphere.io/portName_" + PortNameMappingKeyPrefix = Namespace + "/portName_" PortNameMappingKeyFormat = PortNameMappingKeyPrefix + "%s_%s" - ContainerPortKeyFormat = "k8s.mesosphere.io/containerPort_%s_%s_%d" - StaticPodFilenameKey = "k8s.mesosphere.io/staticPodFilename" - RolesKey = "k8s.mesosphere.io/roles" + ContainerPortKeyFormat = Namespace + "/containerPort_%s_%s_%d" + StaticPodFilenameKey = Namespace + "/staticPodFilename" + RolesKey = Namespace + "/roles" ) diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task.go b/contrib/mesos/pkg/scheduler/podtask/pod_task.go index 05b0bbf5b64..fb1a912e740 100644 --- a/contrib/mesos/pkg/scheduler/podtask/pod_task.go +++ b/contrib/mesos/pkg/scheduler/podtask/pod_task.go @@ -25,7 +25,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/pborman/uuid" "k8s.io/kubernetes/contrib/mesos/pkg/offers" - annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" + mesosmeta "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/api" @@ -127,7 +127,58 @@ func generateTaskName(pod *api.Pod) string { if ns == "" { ns = api.NamespaceDefault } - return fmt.Sprintf("%s.%s.pods", pod.Name, ns) + return fmt.Sprintf("%s.%s.pod", pod.Name, ns) +} + +// GenerateTaskDiscoveryEnabled turns on/off the generation of DiscoveryInfo for TaskInfo records +var GenerateTaskDiscoveryEnabled = false + +func generateTaskDiscovery(pod *api.Pod) *mesos.DiscoveryInfo { + di := &mesos.DiscoveryInfo{ + Visibility: mesos.DiscoveryInfo_CLUSTER.Enum(), + } + switch visibility := pod.Annotations[mesosmeta.Namespace+"/discovery-visibility"]; visibility { + case "framework": + di.Visibility = mesos.DiscoveryInfo_FRAMEWORK.Enum() + case "external": + di.Visibility = mesos.DiscoveryInfo_EXTERNAL.Enum() + case "", "cluster": + // noop, pick the default we already set + default: + // default to CLUSTER, just warn the user + log.Warningf("unsupported discovery-visibility annotation: %q", visibility) + } + // name should be {{label|annotation}:name}.{pod:namespace}.pod + nameDecorator := func(n string) *string { + ns := pod.Namespace + if ns == "" { + ns = api.NamespaceDefault + } + x := n + "." + ns + "." + "pod" + return &x + } + for _, tt := range []struct { + fieldName string + dest **string + decorator func(string) *string + }{ + {"name", &di.Name, nameDecorator}, + {"environment", &di.Environment, nil}, + {"location", &di.Location, nil}, + {"version", &di.Version, nil}, + } { + d := tt.decorator + if d == nil { + d = func(s string) *string { return &s } + } + if v, ok := pod.Labels[tt.fieldName]; ok && v != "" { + *tt.dest = d(v) + } + if v, ok := pod.Annotations[mesosmeta.Namespace+"/discovery-"+tt.fieldName]; ok && v != "" { + *tt.dest = d(v) + } + } + return di } func (t *T) BuildTaskInfo() (*mesos.TaskInfo, error) { @@ -144,6 +195,10 @@ func (t *T) BuildTaskInfo() (*mesos.TaskInfo, error) { SlaveId: mutil.NewSlaveID(t.Spec.SlaveID), } + if GenerateTaskDiscoveryEnabled { + info.Discovery = generateTaskDiscovery(&t.Pod) + } + return info, nil } @@ -173,7 +228,7 @@ func (t *T) Has(f FlagType) (exists bool) { // If the pod has roles annotations defined they are being used // else default pod roles are being returned. func (t *T) Roles() (result []string) { - if r, ok := t.Pod.ObjectMeta.Annotations[annotation.RolesKey]; ok { + if r, ok := t.Pod.ObjectMeta.Annotations[mesosmeta.RolesKey]; ok { roles := strings.Split(r, ",") for i, r := range roles { @@ -229,10 +284,10 @@ func New(ctx api.Context, id string, pod *api.Pod, prototype *mesos.ExecutorInfo } func (t *T) SaveRecoveryInfo(dict map[string]string) { - dict[annotation.TaskIdKey] = t.ID - dict[annotation.SlaveIdKey] = t.Spec.SlaveID - dict[annotation.OfferIdKey] = t.Offer.Details().Id.GetValue() - dict[annotation.ExecutorIdKey] = t.Spec.Executor.ExecutorId.GetValue() + dict[mesosmeta.TaskIdKey] = t.ID + dict[mesosmeta.SlaveIdKey] = t.Spec.SlaveID + dict[mesosmeta.OfferIdKey] = t.Offer.Details().Id.GetValue() + dict[mesosmeta.ExecutorIdKey] = t.Spec.Executor.ExecutorId.GetValue() } // reconstruct a task from metadata stashed in a pod entry. there are limited pod states that @@ -287,25 +342,25 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) { offerId string ) for _, k := range []string{ - annotation.BindingHostKey, - annotation.TaskIdKey, - annotation.SlaveIdKey, - annotation.OfferIdKey, + mesosmeta.BindingHostKey, + mesosmeta.TaskIdKey, + mesosmeta.SlaveIdKey, + mesosmeta.OfferIdKey, } { v, found := pod.Annotations[k] if !found { return nil, false, fmt.Errorf("incomplete metadata: missing value for pod annotation: %v", k) } switch k { - case annotation.BindingHostKey: + case mesosmeta.BindingHostKey: t.Spec.AssignedSlave = v - case annotation.SlaveIdKey: + case mesosmeta.SlaveIdKey: t.Spec.SlaveID = v - case annotation.OfferIdKey: + case mesosmeta.OfferIdKey: offerId = v - case annotation.TaskIdKey: + case mesosmeta.TaskIdKey: t.ID = v - case annotation.ExecutorIdKey: + case mesosmeta.ExecutorIdKey: // this is nowhere near sufficient to re-launch a task, but we really just // want this for tracking t.Spec.Executor = &mesos.ExecutorInfo{ExecutorId: mutil.NewExecutorID(v)} diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go index ba15499cbfd..82b2daf19b0 100644 --- a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go +++ b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go @@ -299,14 +299,14 @@ func TestGeneratePodName(t *testing.T) { }, } name := generateTaskName(p) - expected := "foo.bar.pods" + expected := "foo.bar.pod" if name != expected { t.Fatalf("expected %q instead of %q", expected, name) } p.Namespace = "" name = generateTaskName(p) - expected = "foo.default.pods" + expected = "foo.default.pod" if name != expected { t.Fatalf("expected %q instead of %q", expected, name) } diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 7fade99abaa..62653adc3bd 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -96,26 +96,27 @@ const ( ) type SchedulerServer struct { - port int - address net.IP - enableProfiling bool - authPath string - apiServerList []string - etcdServerList []string - allowPrivileged bool - executorPath string - proxyPath string - mesosMaster string - mesosUser string - frameworkRoles []string - defaultPodRoles []string - mesosAuthPrincipal string - mesosAuthSecretFile string - mesosCgroupPrefix string - mesosExecutorCPUs mresource.CPUShares - mesosExecutorMem mresource.MegaBytes - checkpoint bool - failoverTimeout float64 + port int + address net.IP + enableProfiling bool + authPath string + apiServerList []string + etcdServerList []string + allowPrivileged bool + executorPath string + proxyPath string + mesosMaster string + mesosUser string + frameworkRoles []string + defaultPodRoles []string + mesosAuthPrincipal string + mesosAuthSecretFile string + mesosCgroupPrefix string + mesosExecutorCPUs mresource.CPUShares + mesosExecutorMem mresource.MegaBytes + checkpoint bool + failoverTimeout float64 + generateTaskDiscovery bool executorLogV int executorBindall bool @@ -262,6 +263,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) { fs.Var(&s.mesosExecutorMem, "mesos-executor-mem", "Initial memory (MB) to allocate for each Mesos executor container.") fs.BoolVar(&s.checkpoint, "checkpoint", s.checkpoint, "Enable/disable checkpointing for the kubernetes-mesos framework.") fs.Float64Var(&s.failoverTimeout, "failover-timeout", s.failoverTimeout, fmt.Sprintf("Framework failover timeout, in sec.")) + fs.BoolVar(&s.generateTaskDiscovery, "mesos-generate-task-discovery", s.generateTaskDiscovery, "Enable/disable generation of DiscoveryInfo for Mesos tasks.") fs.UintVar(&s.driverPort, "driver-port", s.driverPort, "Port that the Mesos scheduler driver process should listen on.") fs.StringVar(&s.hostnameOverride, "hostname-override", s.hostnameOverride, "If non-empty, will use this string as identification instead of the actual hostname.") fs.Int64Var(&s.reconcileInterval, "reconcile-interval", s.reconcileInterval, "Interval at which to execute task reconciliation, in sec. Zero disables.") @@ -553,6 +555,7 @@ func (s *SchedulerServer) getDriver() (driver bindings.SchedulerDriver) { } func (s *SchedulerServer) Run(hks hyperkube.Interface, _ []string) error { + podtask.GenerateTaskDiscoveryEnabled = s.generateTaskDiscovery if n := len(s.frameworkRoles); n == 0 || n > 2 || (n == 2 && s.frameworkRoles[0] != "*" && s.frameworkRoles[1] != "*") { log.Fatalf(`only one custom role allowed in addition to "*"`) } diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 04b9ac72aa9..504d3e0b62b 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -207,6 +207,7 @@ mesos-default-pod-roles mesos-executor-cpus mesos-executor-mem mesos-framework-roles +mesos-generate-task-discovery mesos-launch-grace-period mesos-master mesos-sandbox-overlay