diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index d0e85086b2c..e8eb5494ef4 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -103,6 +103,7 @@ type Executor struct { kubeletFinished <-chan struct{} // signals that kubelet Run() died exitFunc func(int) staticPodsConfigPath string + staticPodsFilters podutil.Filters launchGracePeriod time.Duration nodeInfos chan<- NodeInfo initCompleted chan struct{} // closes upon completion of Init() @@ -113,18 +114,21 @@ type Executor struct { } type Config struct { - APIClient *clientset.Clientset - Docker dockertools.DockerInterface - ShutdownAlert func() - SuicideTimeout time.Duration - KubeletFinished <-chan struct{} // signals that kubelet Run() died - ExitFunc func(int) - StaticPodsConfigPath string - LaunchGracePeriod time.Duration - NodeInfos chan<- NodeInfo - Registry Registry + APIClient *clientset.Clientset + Docker dockertools.DockerInterface + ShutdownAlert func() + SuicideTimeout time.Duration + KubeletFinished <-chan struct{} // signals that kubelet Run() died + ExitFunc func(int) + LaunchGracePeriod time.Duration + NodeInfos chan<- NodeInfo + Registry Registry + Options []Option // functional options } +// Option is a functional option type for Executor +type Option func(*Executor) + func (k *Executor) isConnected() bool { return connectedState == (&k.state).get() } @@ -139,22 +143,26 @@ func New(config Config) *Executor { launchGracePeriod = time.Duration(math.MaxInt64) } k := &Executor{ - state: disconnectedState, - terminate: make(chan struct{}), - outgoing: make(chan func() (mesos.Status, error), 1024), - dockerClient: config.Docker, - suicideTimeout: config.SuicideTimeout, - kubeletFinished: config.KubeletFinished, - suicideWatch: &suicideTimer{}, - shutdownAlert: config.ShutdownAlert, - exitFunc: config.ExitFunc, - staticPodsConfigPath: config.StaticPodsConfigPath, - launchGracePeriod: launchGracePeriod, - 
nodeInfos: config.NodeInfos, - initCompleted: make(chan struct{}), - registry: config.Registry, - kubeAPI: &clientAPIWrapper{config.APIClient}, - nodeAPI: &clientAPIWrapper{config.APIClient}, + state: disconnectedState, + terminate: make(chan struct{}), + outgoing: make(chan func() (mesos.Status, error), 1024), + dockerClient: config.Docker, + suicideTimeout: config.SuicideTimeout, + kubeletFinished: config.KubeletFinished, + suicideWatch: &suicideTimer{}, + shutdownAlert: config.ShutdownAlert, + exitFunc: config.ExitFunc, + launchGracePeriod: launchGracePeriod, + nodeInfos: config.NodeInfos, + initCompleted: make(chan struct{}), + registry: config.Registry, + kubeAPI: &clientAPIWrapper{config.APIClient}, + nodeAPI: &clientAPIWrapper{config.APIClient}, + } + + // apply functional options + for _, opt := range config.Options { + opt(k) } runtime.On(k.initCompleted, k.runSendLoop) @@ -165,6 +173,14 @@ func New(config Config) *Executor { return k } +// StaticPods creates a static pods Option for an Executor +func StaticPods(configPath string, f podutil.Filters) Option { + return func(k *Executor) { + k.staticPodsFilters = f + k.staticPodsConfigPath = configPath + } +} + // Done returns a chan that closes when the executor is shutting down func (k *Executor) Done() <-chan struct{} { return k.terminate @@ -226,12 +242,7 @@ func (k *Executor) Registered( log.Errorf("failed to register/transition to a connected state") } - if executorInfo != nil && executorInfo.Data != nil { - err := k.initializeStaticPodsSource(slaveInfo.GetHostname(), executorInfo.Data) - if err != nil { - log.Errorf("failed to initialize static pod configuration: %v", err) - } - } + k.initializeStaticPodsSource(executorInfo) annotations, err := annotationsFor(executorInfo) if err != nil { @@ -296,15 +307,17 @@ func (k *Executor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos } // initializeStaticPodsSource unzips the data slice into the static-pods directory -func (k *Executor) 
initializeStaticPodsSource(hostname string, data []byte) error { - log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath) - // annotate the pod with BindingHostKey so that the scheduler will ignore the pod - // once it appears in the pod registry. the stock kubelet sets the pod host in order - // to accomplish the same; we do this because the k8sm scheduler works differently. - annotator := podutil.Annotator(map[string]string{ - meta.BindingHostKey: hostname, - }) - return podutil.WriteToDir(annotator.Do(podutil.Gunzip(data)), k.staticPodsConfigPath) +func (k *Executor) initializeStaticPodsSource(executorInfo *mesos.ExecutorInfo) { + if data := executorInfo.GetData(); len(data) > 0 && k.staticPodsConfigPath != "" { + log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath) + err := podutil.WriteToDir( + k.staticPodsFilters.Do(podutil.Gunzip(executorInfo.Data)), + k.staticPodsConfigPath, + ) + if err != nil { + log.Errorf("failed to initialize static pod configuration: %v", err) + } + } } // Disconnected is called when the executor is disconnected from the slave. 
diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index 47a20421578..cc0a5e9c8f0 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -23,6 +23,7 @@ import ( "net/http/httptest" "os" "path/filepath" + "reflect" "sync" "sync/atomic" "testing" @@ -349,9 +350,7 @@ func TestExecutorInitializeStaticPodsSource(t *testing.T) { } // extract the pods into staticPodsConfigPath - hostname := "h1" - err = executor.initializeStaticPodsSource(hostname, gzipped) - assert.NoError(t, err) + executor.initializeStaticPodsSource(&mesosproto.ExecutorInfo{Data: gzipped}) actualpods, errs := podutil.ReadFromDir(staticPodsConfigPath) reportErrors(errs) @@ -359,6 +358,19 @@ func TestExecutorInitializeStaticPodsSource(t *testing.T) { list := podutil.List(actualpods) assert.NotNil(t, list) assert.Equal(t, expectedStaticPodsNum, len(list.Items)) + + var ( + expectedNames = map[string]struct{}{ + "spod-01": {}, + "spod-02": {}, + } + actualNames = map[string]struct{}{} + ) + for _, pod := range list.Items { + actualNames[pod.Name] = struct{}{} + } + assert.True(t, reflect.DeepEqual(expectedNames, actualNames), "expected %v instead of %v", expectedNames, actualNames) + wg.Wait() } diff --git a/contrib/mesos/pkg/executor/service/podsource.go b/contrib/mesos/pkg/executor/service/podsource.go deleted file mode 100644 index 5a546c8a0c7..00000000000 --- a/contrib/mesos/pkg/executor/service/podsource.go +++ /dev/null @@ -1,126 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package service - -import ( - "k8s.io/kubernetes/contrib/mesos/pkg/executor" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" - kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - - log "github.com/golang/glog" -) - -const ( - // if we don't use this source then the kubelet will do funny, mirror things. we alias - // this here for convenience. see the docs for sourceMesos for additional explanation. - // @see ConfigSourceAnnotationKey - mesosSource = kubetypes.ApiserverSource -) - -type ( - podName struct { - namespace, name string - } - - sourceMesos struct { - stop <-chan struct{} - out chan<- interface{} // never close this because pkg/util/config.mux doesn't handle that very well - registry executor.Registry - priorPodNames map[podName]string // map podName to taskID - } -) - -func newSourceMesos( - stop <-chan struct{}, - out chan<- interface{}, - podWatch *cache.ListWatch, - registry executor.Registry, -) { - source := &sourceMesos{ - stop: stop, - out: out, - registry: registry, - priorPodNames: make(map[podName]string), - } - // reflect changes from the watch into a chan, filtered to include only mirror pods - // (have an ConfigMirrorAnnotationKey attr) - cache.NewReflector( - podWatch, - &api.Pod{}, - cache.NewUndeltaStore(source.send, cache.MetaNamespaceKeyFunc), - 0, - ).RunUntil(stop) -} - -// send is an update callback invoked by NewUndeltaStore -func (source *sourceMesos) send(objs []interface{}) { - var ( - pods = make([]*api.Pod, 0, len(objs)) - podNames = make(map[podName]string, len(objs)) - ) - - for _, o := range objs { - p := 
o.(*api.Pod) - addPod := false - if _, ok := p.Annotations[kubetypes.ConfigMirrorAnnotationKey]; ok { - // pass through all mirror pods - addPod = true - } else if rpod, err := source.registry.Update(p); err == nil { - // pod is bound to a task, and the update is compatible - // so we'll allow it through - addPod = true - p = rpod.Pod() // use the (possibly) updated pod spec! - podNames[podName{p.Namespace, p.Name}] = rpod.Task() - } else if rpod != nil { - // we were able to ID the pod but the update still failed... - log.Warningf("failed to update registry for task %v pod %v/%v: %v", - rpod.Task(), p.Namespace, p.Name, err) - } else { - // unrecognized pod, skip! - log.V(2).Infof("skipping pod %v/%v", p.Namespace, p.Name) - } - - if addPod { - pods = append(pods, p) - } - } - - // detect when pods are deleted and notify the registry - for k, taskID := range source.priorPodNames { - if _, found := podNames[k]; !found { - source.registry.Remove(taskID) - } - } - - source.priorPodNames = podNames - - u := kubetypes.PodUpdate{ - Op: kubetypes.SET, - Pods: pods, - Source: mesosSource, - } - select { - case <-source.stop: - default: - select { - case <-source.stop: - case source.out <- u: - } - } - log.V(2).Infof("sent %d pod updates", len(pods)) -} diff --git a/contrib/mesos/pkg/executor/service/podsource/podsource.go b/contrib/mesos/pkg/executor/service/podsource/podsource.go new file mode 100644 index 00000000000..3d0c5388fcf --- /dev/null +++ b/contrib/mesos/pkg/executor/service/podsource/podsource.go @@ -0,0 +1,200 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podsource + +import ( + "k8s.io/kubernetes/contrib/mesos/pkg/executor" + "k8s.io/kubernetes/contrib/mesos/pkg/podutil" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + + log "github.com/golang/glog" +) + +type ( + filterType int + + podName struct { + namespace, name string + } + + // Filter is invoked for each snapshot of pod state that passes through this source + Filter interface { + // Before is invoked before any pods are evaluated + Before(podCount int) + // Accept returns true if this pod should be accepted by the source; a value + // of false results in the pod appearing to have been removed from apiserver. + // If true, the caller should use the output pod value for the remainder of + // the processing task. If false then the output pod value may be nil. + Accept(*api.Pod) (*api.Pod, bool) + // After is invoked after all pods have been evaluated + After() + } + + // FilterFunc is a simplified Filter implementation that only implements Filter.Accept, its + // Before and After implementations are noop. + FilterFunc func(*api.Pod) (*api.Pod, bool) + + Source struct { + stop <-chan struct{} + out chan<- interface{} // never close this because pkg/util/config.mux doesn't handle that very well + filters []Filter // additional filters to apply to pod objects + } + + Option func(*Source) +) + +const ( + // if we don't use this source then the kubelet will do funny, mirror things. we alias + // this here for convenience. see the docs for Source for additional explanation. 
+ // @see ConfigSourceAnnotationKey + MesosSource = kubetypes.ApiserverSource +) + +func (f FilterFunc) Before(_ int) {} +func (f FilterFunc) After() {} +func (f FilterFunc) Accept(pod *api.Pod) (*api.Pod, bool) { return f(pod) } + +// Mesos spawns a new pod source that watches API server for changes and collaborates with +// executor.Registry to generate api.Pod objects in a fashion that's very Mesos-aware. +func Mesos( + stop <-chan struct{}, + out chan<- interface{}, + podWatch *cache.ListWatch, + registry executor.Registry, + options ...Option, +) { + source := &Source{ + stop: stop, + out: out, + filters: []Filter{ + FilterFunc(filterMirrorPod), + &registeredPodFilter{registry: registry}, + }, + } + // note: any filters added by options should be applied after the defaults + for _, opt := range options { + opt(source) + } + // reflect changes from the watch into a chan, filtered to include only mirror pods + // (have an ConfigMirrorAnnotationKey attr) + cache.NewReflector( + podWatch, + &api.Pod{}, + cache.NewUndeltaStore(source.send, cache.MetaNamespaceKeyFunc), + 0, + ).RunUntil(stop) +} + +func filterMirrorPod(p *api.Pod) (*api.Pod, bool) { + _, ok := (*p).Annotations[kubetypes.ConfigMirrorAnnotationKey] + return p, ok +} + +type registeredPodFilter struct { + priorPodNames, podNames map[podName]string // maps a podName to a taskID + registry executor.Registry +} + +func (rpf *registeredPodFilter) Before(podCount int) { + rpf.priorPodNames = rpf.podNames + rpf.podNames = make(map[podName]string, podCount) +} + +func (rpf *registeredPodFilter) After() { + // detect when pods are deleted and notify the registry + for k, taskID := range rpf.priorPodNames { + if _, found := rpf.podNames[k]; !found { + rpf.registry.Remove(taskID) + } + } +} + +func (rpf *registeredPodFilter) Accept(p *api.Pod) (*api.Pod, bool) { + rpod, err := rpf.registry.Update(p) + if err == nil { + // pod is bound to a task, and the update is compatible + // so we'll allow it through + p = 
rpod.Pod() // use the (possibly) updated pod spec! + rpf.podNames[podName{p.Namespace, p.Name}] = rpod.Task() + return p, true + } + if rpod != nil { + // we were able to ID the pod but the update still failed... + log.Warningf("failed to update registry for task %v pod %v/%v: %v", + rpod.Task(), p.Namespace, p.Name, err) + } + return nil, false +} + +// send is an update callback invoked by NewUndeltaStore; it applies all of source.filters +// to the incoming pod snapshot and forwards a PodUpdate that contains a snapshot of all +// the pods that were accepted by the filters. +func (source *Source) send(objs []interface{}) { + var ( + podCount = len(objs) + pods = make([]*api.Pod, 0, podCount) + ) + + for _, f := range source.filters { + f.Before(podCount) + } +foreachPod: + for _, o := range objs { + p := o.(*api.Pod) + for _, f := range source.filters { + if p, ok := f.Accept(p); ok { + pods = append(pods, p) + continue foreachPod + } + } + // unrecognized pod + log.V(2).Infof("skipping pod %v/%v", p.Namespace, p.Name) + } + // TODO(jdef) should these be applied in reverse order instead? + for _, f := range source.filters { + f.After() + } + + u := kubetypes.PodUpdate{ + Op: kubetypes.SET, + Pods: pods, + Source: MesosSource, + } + select { + case <-source.stop: + case source.out <- u: + log.V(2).Infof("sent %d pod updates", len(pods)) + } +} + +func ContainerEnvOverlay(env []api.EnvVar) Option { + return func(s *Source) { + // prepend this filter so that it impacts *all* pods running on the slave + s.filters = append([]Filter{filterContainerEnvOverlay(env)}, s.filters...) 
+ } +} + +func filterContainerEnvOverlay(env []api.EnvVar) FilterFunc { + f := podutil.Environment(env) + return func(pod *api.Pod) (*api.Pod, bool) { + f(pod) + // we shouldn't vote, let someone else decide whether the pod gets accepted + return pod, false + } +} diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 85d8de26e03..23b1b626936 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -30,7 +30,10 @@ import ( "k8s.io/kubernetes/cmd/kubelet/app/options" "k8s.io/kubernetes/contrib/mesos/pkg/executor" "k8s.io/kubernetes/contrib/mesos/pkg/executor/config" + "k8s.io/kubernetes/contrib/mesos/pkg/executor/service/podsource" "k8s.io/kubernetes/contrib/mesos/pkg/hyperkube" + "k8s.io/kubernetes/contrib/mesos/pkg/podutil" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" @@ -43,10 +46,20 @@ import ( kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) +// TODO(jdef): passing the value of envContainerID to all docker containers instantiated +// through the kubelet is part of a strategy to enable orphan container GC; this can all +// be ripped out once we have a kubelet runtime that leverages Mesos native containerization. + +// envContainerID is the name of the environment variable that contains the +// Mesos-assigned container ID of the Executor. 
+const envContainerID = "MESOS_EXECUTOR_CONTAINER_UUID" + type KubeletExecutorServer struct { *options.KubeletServer SuicideTimeout time.Duration LaunchGracePeriod time.Duration + + containerID string } func NewKubeletExecutorServer() *KubeletExecutorServer { @@ -78,15 +91,31 @@ func (s *KubeletExecutorServer) runExecutor( apiclient *clientset.Clientset, registry executor.Registry, ) (<-chan struct{}, error) { + staticPodFilters := podutil.Filters{ + // annotate the pod with BindingHostKey so that the scheduler will ignore the pod + // once it appears in the pod registry. the stock kubelet sets the pod host in order + // to accomplish the same; we do this because the k8sm scheduler works differently. + podutil.Annotator(map[string]string{ + meta.BindingHostKey: s.HostnameOverride, + }), + } + if s.containerID != "" { + // tag all pod containers with the containerID so that they can be properly GC'd by Mesos + staticPodFilters = append(staticPodFilters, podutil.Environment([]api.EnvVar{ + {Name: envContainerID, Value: s.containerID}, + })) + } exec := executor.New(executor.Config{ - Registry: registry, - APIClient: apiclient, - Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint), - SuicideTimeout: s.SuicideTimeout, - KubeletFinished: kubeletFinished, - ExitFunc: os.Exit, - StaticPodsConfigPath: staticPodsConfigPath, - NodeInfos: nodeInfos, + Registry: registry, + APIClient: apiclient, + Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint), + SuicideTimeout: s.SuicideTimeout, + KubeletFinished: kubeletFinished, + ExitFunc: os.Exit, + NodeInfos: nodeInfos, + Options: []executor.Option{ + executor.StaticPods(staticPodsConfigPath, staticPodFilters), + }, }) // initialize driver and initialize the executor with it @@ -200,7 +229,19 @@ func (s *KubeletExecutorServer) runKubelet( }() // create main pod source, it will stop generating events once executorDone is closed - newSourceMesos(executorDone, kcfg.PodConfig.Channel(mesosSource), podLW, registry) + var 
containerOptions []podsource.Option + if s.containerID != "" { + // tag all pod containers with the containerID so that they can be properly GC'd by Mesos + containerOptions = append(containerOptions, podsource.ContainerEnvOverlay([]api.EnvVar{ + {Name: envContainerID, Value: s.containerID}, + })) + kcfg.ContainerRuntimeOptions = append(kcfg.ContainerRuntimeOptions, + dockertools.PodInfraContainerEnv(map[string]string{ + envContainerID: s.containerID, + })) + } + + podsource.Mesos(executorDone, kcfg.PodConfig.Channel(podsource.MesosSource), podLW, registry, containerOptions...) // create static-pods directory file source log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) @@ -229,6 +270,12 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { return err } + // we're expecting that either Mesos or the minion process will set this for us + s.containerID = os.Getenv(envContainerID) + if s.containerID == "" { + log.Warningf("missing expected environment variable %q", envContainerID) + } + // create apiserver client var apiclient *clientset.Clientset clientConfig, err := kubeletapp.CreateAPIServerClientConfig(s.KubeletServer) diff --git a/contrib/mesos/pkg/minion/server.go b/contrib/mesos/pkg/minion/server.go index 7654b9d250a..2498020f4fa 100644 --- a/contrib/mesos/pkg/minion/server.go +++ b/contrib/mesos/pkg/minion/server.go @@ -116,20 +116,22 @@ func filterArgsByFlagSet(args []string, flags *pflag.FlagSet) ([]string, []strin return matched, notMatched } -func findMesosCgroup(prefix string) string { +func findMesosCgroup(prefix string) (cgroupPath string, containerID string) { // derive our cgroup from MESOS_DIRECTORY environment mesosDir := os.Getenv("MESOS_DIRECTORY") if mesosDir == "" { log.V(2).Infof("cannot derive executor's cgroup because MESOS_DIRECTORY is empty") - return "" + return } - containerId := path.Base(mesosDir) - if containerId == "" { + containerID = 
path.Base(mesosDir) + if containerID == "" { log.V(2).Infof("cannot derive executor's cgroup from MESOS_DIRECTORY=%q", mesosDir) - return "" + return } - return path.Join("/", prefix, containerId) + + cgroupPath = path.Join("/", prefix, containerID) + return } func (ms *MinionServer) launchProxyServer() { @@ -154,13 +156,13 @@ func (ms *MinionServer) launchProxyServer() { args = append(args, fmt.Sprintf("--hostname-override=%s", ms.KubeletExecutorServer.HostnameOverride)) } - ms.launchHyperkubeServer(hyperkube.CommandProxy, args, proxyLogFilename, nil) + ms.launchHyperkubeServer(hyperkube.CommandProxy, args, proxyLogFilename) } // launchExecutorServer returns a chan that closes upon kubelet-executor death. since the kubelet- // executor doesn't support failover right now, the right thing to do is to fail completely since all // pods will be lost upon restart and we want mesos to recover the resources from them. -func (ms *MinionServer) launchExecutorServer() <-chan struct{} { +func (ms *MinionServer) launchExecutorServer(containerID string) <-chan struct{} { allArgs := os.Args[1:] // filter out minion flags, leaving those for the executor @@ -174,25 +176,24 @@ func (ms *MinionServer) launchExecutorServer() <-chan struct{} { executorArgs = append(executorArgs, "--cgroup-root="+ms.cgroupRoot) } + // forward containerID so that the executor may pass it along to containers that it launches + var ctidOpt tasks.Option + ctidOpt = func(t *tasks.Task) tasks.Option { + oldenv := t.Env[:] + t.Env = append(t.Env, "MESOS_EXECUTOR_CONTAINER_UUID="+containerID) + return func(t2 *tasks.Task) tasks.Option { + t2.Env = oldenv + return ctidOpt + } + } + // run executor and quit minion server when this exits cleanly execDied := make(chan struct{}) - decorator := func(t *tasks.Task) *tasks.Task { - t.Finished = func(_ bool) bool { - // this func implements the task.finished spec, so when the executor exits - // we return false to indicate that it should not be restarted. 
we also - // close execDied to signal interested listeners. - close(execDied) - return false - } - // since we only expect to die once, and there is no restart; don't delay any longer than needed - t.RestartDelay = 0 - return t - } - ms.launchHyperkubeServer(hyperkube.CommandExecutor, executorArgs, executorLogFilename, decorator) + ms.launchHyperkubeServer(hyperkube.CommandExecutor, executorArgs, executorLogFilename, tasks.NoRespawn(execDied), ctidOpt) return execDied } -func (ms *MinionServer) launchHyperkubeServer(server string, args []string, logFileName string, decorator func(*tasks.Task) *tasks.Task) { +func (ms *MinionServer) launchHyperkubeServer(server string, args []string, logFileName string, options ...tasks.Option) { log.V(2).Infof("Spawning hyperkube %v with args '%+v'", server, args) kmArgs := append([]string{server}, args...) @@ -215,33 +216,39 @@ func (ms *MinionServer) launchHyperkubeServer(server string, args []string, logF } } - // use given environment, but add /usr/sbin and $SANDBOX/bin to the path for the iptables binary used in kube-proxy - var kmEnv []string - env := os.Environ() - kmEnv = make([]string, 0, len(env)) - for _, e := range env { - if !strings.HasPrefix(e, "PATH=") { - kmEnv = append(kmEnv, e) - } else { - if ms.pathOverride != "" { - e = "PATH=" + ms.pathOverride - } - pwd, err := os.Getwd() - if err != nil { - panic(fmt.Errorf("Cannot get current directory: %v", err)) - } - kmEnv = append(kmEnv, fmt.Sprintf("%s:%s", e, path.Join(pwd, "bin"))) - } - } + // prepend env, allow later options to customize further + options = append([]tasks.Option{tasks.Environment(os.Environ()), ms.applyPathOverride()}, options...) - t := tasks.New(server, ms.kmBinary, kmArgs, kmEnv, writerFunc) - if decorator != nil { - t = decorator(t) - } + t := tasks.New(server, ms.kmBinary, kmArgs, writerFunc, options...) 
go t.Start() ms.tasks = append(ms.tasks, t) } +// applyPathOverride overrides PATH and also adds $SANDBOX/bin (needed for locating bundled binary deps +// as well as external deps like iptables) +func (ms *MinionServer) applyPathOverride() tasks.Option { + return func(t *tasks.Task) tasks.Option { + kmEnv := make([]string, 0, len(t.Env)) + for _, e := range t.Env { + if !strings.HasPrefix(e, "PATH=") { + kmEnv = append(kmEnv, e) + } else { + if ms.pathOverride != "" { + e = "PATH=" + ms.pathOverride + } + pwd, err := os.Getwd() + if err != nil { + panic(fmt.Errorf("Cannot get current directory: %v", err)) + } + kmEnv = append(kmEnv, fmt.Sprintf("%s:%s", e, path.Join(pwd, "bin"))) + } + } + oldenv := t.Env + t.Env = kmEnv + return tasks.Environment(oldenv) + } +} + // runs the main kubelet loop, closing the kubeletFinished chan when the loop exits. // never returns. func (ms *MinionServer) Run(hks hyperkube.Interface, _ []string) error { @@ -263,7 +270,8 @@ func (ms *MinionServer) Run(hks hyperkube.Interface, _ []string) error { // - pod container cgroup root (e.g. 
docker cgroup-parent, optionally; see comments below) // - parent of kubelet container // - parent of kube-proxy container - ms.mesosCgroup = findMesosCgroup(ms.cgroupPrefix) + containerID := "" + ms.mesosCgroup, containerID = findMesosCgroup(ms.cgroupPrefix) log.Infof("discovered mesos cgroup at %q", ms.mesosCgroup) // hack alert, this helps to work around systemd+docker+mesos integration problems @@ -285,7 +293,7 @@ func (ms *MinionServer) Run(hks hyperkube.Interface, _ []string) error { } // abort closes when the kubelet-executor dies - abort := ms.launchExecutorServer() + abort := ms.launchExecutorServer(containerID) shouldQuit := termSignalListener(abort) te := tasks.MergeOutput(ms.tasks, shouldQuit) diff --git a/contrib/mesos/pkg/minion/tasks/task.go b/contrib/mesos/pkg/minion/tasks/task.go index 9748eaa03d7..4650684a76d 100644 --- a/contrib/mesos/pkg/minion/tasks/task.go +++ b/contrib/mesos/pkg/minion/tasks/task.go @@ -88,13 +88,13 @@ func (cp *cmdProcess) Kill(force bool) (int, error) { // logging and restart handling as well as provides event channels for communicating process // termination and errors related to process management. 
type Task struct { + Env []string // optional: process environment override Finished func(restarting bool) bool // callback invoked when a task process has completed; if `restarting` then it will be restarted if it returns true RestartDelay time.Duration // interval between repeated task restarts name string // required: unique name for this task bin string // required: path to executable args []string // optional: process arguments - env []string // optional: process environment override createLogger func() io.WriteCloser // factory func that builds a log writer cmd systemProcess // process that we started completedCh chan *Completion // reports exit codes encountered when task processes exit, or errors during process management @@ -107,12 +107,11 @@ type Task struct { // New builds a newly initialized task object but does not start any processes for it. callers // are expected to invoke task.run(...) on their own. -func New(name, bin string, args, env []string, cl func() io.WriteCloser) *Task { +func New(name, bin string, args []string, cl func() io.WriteCloser, options ...Option) *Task { t := &Task{ name: name, bin: bin, args: args, - env: env, createLogger: cl, completedCh: make(chan *Completion), shouldQuit: make(chan struct{}), @@ -121,6 +120,9 @@ func New(name, bin string, args, env []string, cl func() io.WriteCloser) *Task { Finished: func(restarting bool) bool { return restarting }, } t.killFunc = func(force bool) (int, error) { return t.cmd.Kill(force) } + for _, opt := range options { + opt(t) + } return t } @@ -235,8 +237,8 @@ func notStartedTask(t *Task) taskStateFn { } t.initLogging(stderrLogs) - if len(t.env) > 0 { - cmd.Env = t.env + if len(t.Env) > 0 { + cmd.Env = t.Env } cmd.SysProcAttr = sysProcAttr() @@ -389,3 +391,41 @@ func MergeOutput(tasks []*Task, shouldQuit <-chan struct{}) Events { ei := newEventsImpl(tclistener, done) return ei } + +// Option is a functional option type for a Task that returns an "undo" Option upon modifying the 
Task +type Option func(*Task) Option + +// NoRespawn configures the Task lifecycle such that it will not respawn upon termination +func NoRespawn(listener chan<- struct{}) Option { + return func(t *Task) Option { + finished, restartDelay := t.Finished, t.RestartDelay + + t.Finished = func(_ bool) bool { + // this func implements the task.finished spec, so when the task exits + // we return false to indicate that it should not be restarted. we also + // close listener (at most once) to signal interested listeners. + if listener != nil { + close(listener) + listener = nil + } + return false + } + + // since we only expect to die once, and there is no restart; don't delay any longer than needed + t.RestartDelay = 0 + + return func(t2 *Task) Option { + t2.Finished, t2.RestartDelay = finished, restartDelay + return NoRespawn(listener) + } + } +} + +// Environment customizes the process runtime environment for a Task +func Environment(env []string) Option { + return func(t *Task) Option { + oldenv := t.Env + t.Env = env[:] + return Environment(oldenv) + } +} diff --git a/contrib/mesos/pkg/minion/tasks/task_test.go b/contrib/mesos/pkg/minion/tasks/task_test.go index a5717f391a9..0df577444f1 100644 --- a/contrib/mesos/pkg/minion/tasks/task_test.go +++ b/contrib/mesos/pkg/minion/tasks/task_test.go @@ -77,7 +77,7 @@ func newFakeProcess() *fakeProcess { func TestBadLogger(t *testing.T) { err := errors.New("qux") fp := newFakeProcess() - tt := New("foo", "bar", nil, nil, func() io.WriteCloser { + tt := New("foo", "bar", nil, func() io.WriteCloser { defer func() { fp.pid = 123 // sanity check fp.Kill(false) // this causes Wait() to return @@ -126,7 +126,7 @@ func TestMergeOutput(t *testing.T) { tasksDone.Add(2) tasksStarted.Add(2) - t1 := New("foo", "", nil, nil, devNull) + t1 := New("foo", "", nil, devNull) t1exited := make(chan struct{}) t1.RestartDelay = 0 // don't slow the test down for no good reason t1.Finished = func(ok bool) bool { @@ -145,7 +145,7 @@ func TestMergeOutput(t 
*testing.T) { return taskRunning }) - t2 := New("bar", "", nil, nil, devNull) + t2 := New("bar", "", nil, devNull) t2exited := make(chan struct{}) t2.RestartDelay = 0 // don't slow the test down for no good reason t2.Finished = func(ok bool) bool { @@ -235,7 +235,7 @@ func (t *fakeTimer) reset() { t.ch = nil } func TestAfterDeath(t *testing.T) { // test kill escalation since that's not covered by other unit tests - t1 := New("foo", "", nil, nil, devNull) + t1 := New("foo", "", nil, devNull) kills := 0 waitCh := make(chan *Completion, 1) timer := &fakeTimer{} diff --git a/contrib/mesos/pkg/podutil/filters.go b/contrib/mesos/pkg/podutil/filters.go index f90009b5d91..e44e2d169bd 100644 --- a/contrib/mesos/pkg/podutil/filters.go +++ b/contrib/mesos/pkg/podutil/filters.go @@ -23,6 +23,7 @@ import ( type defaultFunc func(pod *api.Pod) error +// return true if the pod passes the filter type FilterFunc func(pod *api.Pod) (bool, error) type Filters []FilterFunc @@ -47,6 +48,39 @@ func Annotator(m map[string]string) FilterFunc { }) } +// Environment returns a filter that writes environment variables into pod containers +func Environment(env []api.EnvVar) FilterFunc { + // index the envvar names + var ( + envcount = len(env) + m = make(map[string]int, envcount) + ) + for j := range env { + m[env[j].Name] = j + } + return func(pod *api.Pod) (bool, error) { + for i := range pod.Spec.Containers { + ct := &pod.Spec.Containers[i] + dup := make(map[string]struct{}, envcount) + // overwrite dups (and remember them for later) + for j := range ct.Env { + name := ct.Env[j].Name + if k, ok := m[name]; ok { + ct.Env[j] = env[k] + dup[name] = struct{}{} + } + } + // append non-dups into ct.Env + for name, k := range m { + if _, ok := dup[name]; !ok { + ct.Env = append(ct.Env, env[k]) + } + } + } + return true, nil + } +} + // Stream returns a chan of pods that yields each pod from the given list. // No pods are yielded if err is non-nil. 
func Stream(list *api.PodList, err error) <-chan *api.Pod { @@ -65,6 +99,14 @@ func Stream(list *api.PodList, err error) <-chan *api.Pod { return out } +func (filters Filters) Do(in <-chan *api.Pod) (out <-chan *api.Pod) { + out = in + for _, f := range filters { + out = f.Do(out) + } + return +} + func (filter FilterFunc) Do(in <-chan *api.Pod) <-chan *api.Pod { out := make(chan *api.Pod) go func() {