Merge pull request #20845 from mesosphere/jdef_mesos_env_injection

Auto commit by PR queue bot
k8s-merge-robot 2016-02-10 12:13:56 -08:00
commit c70c7fde4d
9 changed files with 470 additions and 234 deletions

View File

@ -103,6 +103,7 @@ type Executor struct {
kubeletFinished <-chan struct{} // signals that kubelet Run() died
exitFunc func(int)
staticPodsConfigPath string
staticPodsFilters podutil.Filters
launchGracePeriod time.Duration
nodeInfos chan<- NodeInfo
initCompleted chan struct{} // closes upon completion of Init()
@ -113,18 +114,21 @@ type Executor struct {
}
type Config struct {
APIClient *clientset.Clientset
Docker dockertools.DockerInterface
ShutdownAlert func()
SuicideTimeout time.Duration
KubeletFinished <-chan struct{} // signals that kubelet Run() died
ExitFunc func(int)
StaticPodsConfigPath string
LaunchGracePeriod time.Duration
NodeInfos chan<- NodeInfo
Registry Registry
APIClient *clientset.Clientset
Docker dockertools.DockerInterface
ShutdownAlert func()
SuicideTimeout time.Duration
KubeletFinished <-chan struct{} // signals that kubelet Run() died
ExitFunc func(int)
LaunchGracePeriod time.Duration
NodeInfos chan<- NodeInfo
Registry Registry
Options []Option // functional options
}
// Option is a functional option type for Executor
type Option func(*Executor)
func (k *Executor) isConnected() bool {
return connectedState == (&k.state).get()
}
@ -139,22 +143,26 @@ func New(config Config) *Executor {
launchGracePeriod = time.Duration(math.MaxInt64)
}
k := &Executor{
state: disconnectedState,
terminate: make(chan struct{}),
outgoing: make(chan func() (mesos.Status, error), 1024),
dockerClient: config.Docker,
suicideTimeout: config.SuicideTimeout,
kubeletFinished: config.KubeletFinished,
suicideWatch: &suicideTimer{},
shutdownAlert: config.ShutdownAlert,
exitFunc: config.ExitFunc,
staticPodsConfigPath: config.StaticPodsConfigPath,
launchGracePeriod: launchGracePeriod,
nodeInfos: config.NodeInfos,
initCompleted: make(chan struct{}),
registry: config.Registry,
kubeAPI: &clientAPIWrapper{config.APIClient},
nodeAPI: &clientAPIWrapper{config.APIClient},
state: disconnectedState,
terminate: make(chan struct{}),
outgoing: make(chan func() (mesos.Status, error), 1024),
dockerClient: config.Docker,
suicideTimeout: config.SuicideTimeout,
kubeletFinished: config.KubeletFinished,
suicideWatch: &suicideTimer{},
shutdownAlert: config.ShutdownAlert,
exitFunc: config.ExitFunc,
launchGracePeriod: launchGracePeriod,
nodeInfos: config.NodeInfos,
initCompleted: make(chan struct{}),
registry: config.Registry,
kubeAPI: &clientAPIWrapper{config.APIClient},
nodeAPI: &clientAPIWrapper{config.APIClient},
}
// apply functional options
for _, opt := range config.Options {
opt(k)
}
runtime.On(k.initCompleted, k.runSendLoop)
@ -165,6 +173,14 @@ func New(config Config) *Executor {
return k
}
// StaticPods creates a static pods Option for an Executor
func StaticPods(configPath string, f podutil.Filters) Option {
return func(k *Executor) {
k.staticPodsFilters = f
k.staticPodsConfigPath = configPath
}
}
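
For reference, the kubelet-executor service later in this diff passes this option through Config.Options; a condensed sketch of that call site (most Config fields elided):

    exec := executor.New(executor.Config{
        Registry:  registry,
        APIClient: apiclient,
        // ...remaining Config fields elided...
        Options: []executor.Option{
            executor.StaticPods(staticPodsConfigPath, staticPodFilters),
        },
    })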
// Done returns a chan that closes when the executor is shutting down
func (k *Executor) Done() <-chan struct{} {
return k.terminate
@ -226,12 +242,7 @@ func (k *Executor) Registered(
log.Errorf("failed to register/transition to a connected state")
}
if executorInfo != nil && executorInfo.Data != nil {
err := k.initializeStaticPodsSource(slaveInfo.GetHostname(), executorInfo.Data)
if err != nil {
log.Errorf("failed to initialize static pod configuration: %v", err)
}
}
k.initializeStaticPodsSource(executorInfo)
annotations, err := annotationsFor(executorInfo)
if err != nil {
@ -296,15 +307,17 @@ func (k *Executor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos
}
// initializeStaticPodsSource gunzips the data slice into the static-pods directory
func (k *Executor) initializeStaticPodsSource(hostname string, data []byte) error {
log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath)
// annotate the pod with BindingHostKey so that the scheduler will ignore the pod
// once it appears in the pod registry. the stock kubelet sets the pod host in order
// to accomplish the same; we do this because the k8sm scheduler works differently.
annotator := podutil.Annotator(map[string]string{
meta.BindingHostKey: hostname,
})
return podutil.WriteToDir(annotator.Do(podutil.Gunzip(data)), k.staticPodsConfigPath)
func (k *Executor) initializeStaticPodsSource(executorInfo *mesos.ExecutorInfo) {
if data := executorInfo.GetData(); len(data) > 0 && k.staticPodsConfigPath != "" {
log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath)
err := podutil.WriteToDir(
k.staticPodsFilters.Do(podutil.Gunzip(executorInfo.Data)),
k.staticPodsConfigPath,
)
if err != nil {
log.Errorf("failed to initialize static pod configuration: %v", err)
}
}
}
// Disconnected is called when the executor is disconnected from the slave.

View File

@ -23,6 +23,7 @@ import (
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"sync"
"sync/atomic"
"testing"
@ -349,9 +350,7 @@ func TestExecutorInitializeStaticPodsSource(t *testing.T) {
}
// extract the pods into staticPodsConfigPath
hostname := "h1"
err = executor.initializeStaticPodsSource(hostname, gzipped)
assert.NoError(t, err)
executor.initializeStaticPodsSource(&mesosproto.ExecutorInfo{Data: gzipped})
actualpods, errs := podutil.ReadFromDir(staticPodsConfigPath)
reportErrors(errs)
@ -359,6 +358,19 @@ func TestExecutorInitializeStaticPodsSource(t *testing.T) {
list := podutil.List(actualpods)
assert.NotNil(t, list)
assert.Equal(t, expectedStaticPodsNum, len(list.Items))
var (
expectedNames = map[string]struct{}{
"spod-01": {},
"spod-02": {},
}
actualNames = map[string]struct{}{}
)
for _, pod := range list.Items {
actualNames[pod.Name] = struct{}{}
}
assert.True(t, reflect.DeepEqual(expectedNames, actualNames), "expected %v instead of %v", expectedNames, actualNames)
wg.Wait()
}

View File

@ -1,126 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"k8s.io/kubernetes/contrib/mesos/pkg/executor"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
log "github.com/golang/glog"
)
const (
// if we don't use this source then the kubelet will do funny mirror-pod things. we alias
// this here for convenience. see the docs for sourceMesos for additional explanation.
// @see ConfigSourceAnnotationKey
mesosSource = kubetypes.ApiserverSource
)
type (
podName struct {
namespace, name string
}
sourceMesos struct {
stop <-chan struct{}
out chan<- interface{} // never close this because pkg/util/config.mux doesn't handle that very well
registry executor.Registry
priorPodNames map[podName]string // map podName to taskID
}
)
func newSourceMesos(
stop <-chan struct{},
out chan<- interface{},
podWatch *cache.ListWatch,
registry executor.Registry,
) {
source := &sourceMesos{
stop: stop,
out: out,
registry: registry,
priorPodNames: make(map[podName]string),
}
// reflect changes from the watch into a chan, filtered to include only mirror pods
// (those with a ConfigMirrorAnnotationKey attr)
cache.NewReflector(
podWatch,
&api.Pod{},
cache.NewUndeltaStore(source.send, cache.MetaNamespaceKeyFunc),
0,
).RunUntil(stop)
}
// send is an update callback invoked by NewUndeltaStore
func (source *sourceMesos) send(objs []interface{}) {
var (
pods = make([]*api.Pod, 0, len(objs))
podNames = make(map[podName]string, len(objs))
)
for _, o := range objs {
p := o.(*api.Pod)
addPod := false
if _, ok := p.Annotations[kubetypes.ConfigMirrorAnnotationKey]; ok {
// pass through all mirror pods
addPod = true
} else if rpod, err := source.registry.Update(p); err == nil {
// pod is bound to a task, and the update is compatible
// so we'll allow it through
addPod = true
p = rpod.Pod() // use the (possibly) updated pod spec!
podNames[podName{p.Namespace, p.Name}] = rpod.Task()
} else if rpod != nil {
// we were able to ID the pod but the update still failed...
log.Warningf("failed to update registry for task %v pod %v/%v: %v",
rpod.Task(), p.Namespace, p.Name, err)
} else {
// unrecognized pod, skip!
log.V(2).Infof("skipping pod %v/%v", p.Namespace, p.Name)
}
if addPod {
pods = append(pods, p)
}
}
// detect when pods are deleted and notify the registry
for k, taskID := range source.priorPodNames {
if _, found := podNames[k]; !found {
source.registry.Remove(taskID)
}
}
source.priorPodNames = podNames
u := kubetypes.PodUpdate{
Op: kubetypes.SET,
Pods: pods,
Source: mesosSource,
}
select {
case <-source.stop:
default:
select {
case <-source.stop:
case source.out <- u:
}
}
log.V(2).Infof("sent %d pod updates", len(pods))
}

View File

@ -0,0 +1,200 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podsource
import (
"k8s.io/kubernetes/contrib/mesos/pkg/executor"
"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
log "github.com/golang/glog"
)
type (
filterType int
podName struct {
namespace, name string
}
// Filter is invoked for each snapshot of pod state that passes through this source
Filter interface {
// Before is invoked before any pods are evaluated
Before(podCount int)
// Accept returns true if this pod should be accepted by the source; a value
// of false results in the pod appearing to have been removed from apiserver.
// If true, the caller should use the output pod value for the remainder of
// the processing task. If false then the output pod value may be nil.
Accept(*api.Pod) (*api.Pod, bool)
// After is invoked after all pods have been evaluated
After()
}
// FilterFunc is a simplified Filter implementation that only implements Filter.Accept; its
// Before and After implementations are no-ops.
FilterFunc func(*api.Pod) (*api.Pod, bool)
Source struct {
stop <-chan struct{}
out chan<- interface{} // never close this because pkg/util/config.mux doesn't handle that very well
filters []Filter // additional filters to apply to pod objects
}
Option func(*Source)
)
const (
// if we don't use this source then the kubelet will do funny mirror-pod things. we alias
// this here for convenience. see the docs for Source for additional explanation.
// @see ConfigSourceAnnotationKey
MesosSource = kubetypes.ApiserverSource
)
func (f FilterFunc) Before(_ int) {}
func (f FilterFunc) After() {}
func (f FilterFunc) Accept(pod *api.Pod) (*api.Pod, bool) { return f(pod) }
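
A minimal sketch of a custom filter built from FilterFunc; the namespace rule here is hypothetical, purely illustrative. Note that returning false does not immediately drop the pod: the source tries the remaining filters, and only pods accepted by no filter are skipped.

    // hypothetical: accept only pods from the kube-system namespace
    onlySystem := podsource.FilterFunc(func(p *api.Pod) (*api.Pod, bool) {
        if p.Namespace == "kube-system" {
            return p, true // accepted; the caller uses the returned pod
        }
        return nil, false // not accepted here; later filters may still accept it
    })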
// Mesos spawns a new pod source that watches the API server for changes and collaborates with
// executor.Registry to generate api.Pod objects in a fashion that's very Mesos-aware.
func Mesos(
stop <-chan struct{},
out chan<- interface{},
podWatch *cache.ListWatch,
registry executor.Registry,
options ...Option,
) {
source := &Source{
stop: stop,
out: out,
filters: []Filter{
FilterFunc(filterMirrorPod),
&registeredPodFilter{registry: registry},
},
}
// note: any filters added by options should be applied after the defaults
for _, opt := range options {
opt(source)
}
// reflect changes from the watch into a chan; each snapshot of pod state is run
// through source.filters (the defaults accept mirror pods and registered pods)
cache.NewReflector(
podWatch,
&api.Pod{},
cache.NewUndeltaStore(source.send, cache.MetaNamespaceKeyFunc),
0,
).RunUntil(stop)
}
func filterMirrorPod(p *api.Pod) (*api.Pod, bool) {
_, ok := (*p).Annotations[kubetypes.ConfigMirrorAnnotationKey]
return p, ok
}
type registeredPodFilter struct {
priorPodNames, podNames map[podName]string // maps a podName to a taskID
registry executor.Registry
}
func (rpf *registeredPodFilter) Before(podCount int) {
rpf.priorPodNames = rpf.podNames
rpf.podNames = make(map[podName]string, podCount)
}
func (rpf *registeredPodFilter) After() {
// detect when pods are deleted and notify the registry
for k, taskID := range rpf.priorPodNames {
if _, found := rpf.podNames[k]; !found {
rpf.registry.Remove(taskID)
}
}
}
func (rpf *registeredPodFilter) Accept(p *api.Pod) (*api.Pod, bool) {
rpod, err := rpf.registry.Update(p)
if err == nil {
// pod is bound to a task, and the update is compatible
// so we'll allow it through
p = rpod.Pod() // use the (possibly) updated pod spec!
rpf.podNames[podName{p.Namespace, p.Name}] = rpod.Task()
return p, true
}
if rpod != nil {
// we were able to ID the pod but the update still failed...
log.Warningf("failed to update registry for task %v pod %v/%v: %v",
rpod.Task(), p.Namespace, p.Name, err)
}
return nil, false
}
// send is an update callback invoked by NewUndeltaStore; it applies all of source.filters
// to the incoming pod snapshot and forwards a PodUpdate that contains a snapshot of all
// the pods that were accepted by the filters.
func (source *Source) send(objs []interface{}) {
var (
podCount = len(objs)
pods = make([]*api.Pod, 0, podCount)
)
for _, f := range source.filters {
f.Before(podCount)
}
foreachPod:
for _, o := range objs {
p := o.(*api.Pod)
for _, f := range source.filters {
if p, ok := f.Accept(p); ok {
pods = append(pods, p)
continue foreachPod
}
}
// unrecognized pod
log.V(2).Infof("skipping pod %v/%v", p.Namespace, p.Name)
}
// TODO(jdef) should these be applied in reverse order instead?
for _, f := range source.filters {
f.After()
}
u := kubetypes.PodUpdate{
Op: kubetypes.SET,
Pods: pods,
Source: MesosSource,
}
select {
case <-source.stop:
case source.out <- u:
log.V(2).Infof("sent %d pod updates", len(pods))
}
}
func ContainerEnvOverlay(env []api.EnvVar) Option {
return func(s *Source) {
// prepend this filter so that it impacts *all* pods running on the slave
s.filters = append([]Filter{filterContainerEnvOverlay(env)}, s.filters...)
}
}
func filterContainerEnvOverlay(env []api.EnvVar) FilterFunc {
f := podutil.Environment(env)
return func(pod *api.Pod) (*api.Pod, bool) {
f(pod)
// we shouldn't vote; let someone else decide whether the pod gets accepted
return pod, false
}
}
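
A sketch of the overlay's effect on a single pod (the UUID value is illustrative, and pod is assumed to be an existing *api.Pod): the filter mutates the pod in place and then declines to vote, leaving acceptance to the mirror-pod and registry filters.

    overlay := filterContainerEnvOverlay([]api.EnvVar{
        {Name: "MESOS_EXECUTOR_CONTAINER_UUID", Value: "abc-123"},
    })
    pod, accepted := overlay(pod)
    // every container in pod now carries MESOS_EXECUTOR_CONTAINER_UUID=abc-123;
    // accepted is false, so the remaining filters still decide the pod's fate.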

View File

@ -30,7 +30,10 @@ import (
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/contrib/mesos/pkg/executor"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/config"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/service/podsource"
"k8s.io/kubernetes/contrib/mesos/pkg/hyperkube"
"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@ -43,10 +46,20 @@ import (
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
// TODO(jdef): passing the value of envContainerID to all docker containers instantiated
// through the kubelet is part of a strategy to enable orphan container GC; this can all
// be ripped out once we have a kubelet runtime that leverages Mesos native containerization.
// envContainerID is the name of the environment variable that contains the
// Mesos-assigned container ID of the Executor.
const envContainerID = "MESOS_EXECUTOR_CONTAINER_UUID"
type KubeletExecutorServer struct {
*options.KubeletServer
SuicideTimeout time.Duration
LaunchGracePeriod time.Duration
containerID string
}
func NewKubeletExecutorServer() *KubeletExecutorServer {
@ -78,15 +91,31 @@ func (s *KubeletExecutorServer) runExecutor(
apiclient *clientset.Clientset,
registry executor.Registry,
) (<-chan struct{}, error) {
staticPodFilters := podutil.Filters{
// annotate the pod with BindingHostKey so that the scheduler will ignore the pod
// once it appears in the pod registry. the stock kubelet sets the pod host in order
// to accomplish the same; we do this because the k8sm scheduler works differently.
podutil.Annotator(map[string]string{
meta.BindingHostKey: s.HostnameOverride,
}),
}
if s.containerID != "" {
// tag all pod containers with the containerID so that they can be properly GC'd by Mesos
staticPodFilters = append(staticPodFilters, podutil.Environment([]api.EnvVar{
{Name: envContainerID, Value: s.containerID},
}))
}
exec := executor.New(executor.Config{
Registry: registry,
APIClient: apiclient,
Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
SuicideTimeout: s.SuicideTimeout,
KubeletFinished: kubeletFinished,
ExitFunc: os.Exit,
StaticPodsConfigPath: staticPodsConfigPath,
NodeInfos: nodeInfos,
Registry: registry,
APIClient: apiclient,
Docker: dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
SuicideTimeout: s.SuicideTimeout,
KubeletFinished: kubeletFinished,
ExitFunc: os.Exit,
NodeInfos: nodeInfos,
Options: []executor.Option{
executor.StaticPods(staticPodsConfigPath, staticPodFilters),
},
})
// initialize driver and initialize the executor with it
@ -200,7 +229,19 @@ func (s *KubeletExecutorServer) runKubelet(
}()
// create main pod source, it will stop generating events once executorDone is closed
newSourceMesos(executorDone, kcfg.PodConfig.Channel(mesosSource), podLW, registry)
var containerOptions []podsource.Option
if s.containerID != "" {
// tag all pod containers with the containerID so that they can be properly GC'd by Mesos
containerOptions = append(containerOptions, podsource.ContainerEnvOverlay([]api.EnvVar{
{Name: envContainerID, Value: s.containerID},
}))
kcfg.ContainerRuntimeOptions = append(kcfg.ContainerRuntimeOptions,
dockertools.PodInfraContainerEnv(map[string]string{
envContainerID: s.containerID,
}))
}
podsource.Mesos(executorDone, kcfg.PodConfig.Channel(podsource.MesosSource), podLW, registry, containerOptions...)
// create static-pods directory file source
log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
@ -229,6 +270,12 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
return err
}
// we're expecting that either Mesos or the minion process will set this for us
s.containerID = os.Getenv(envContainerID)
if s.containerID == "" {
log.Warningf("missing expected environment variable %q", envContainerID)
}
// create apiserver client
var apiclient *clientset.Clientset
clientConfig, err := kubeletapp.CreateAPIServerClientConfig(s.KubeletServer)

View File

@ -116,20 +116,22 @@ func filterArgsByFlagSet(args []string, flags *pflag.FlagSet) ([]string, []strin
return matched, notMatched
}
func findMesosCgroup(prefix string) string {
func findMesosCgroup(prefix string) (cgroupPath string, containerID string) {
// derive our cgroup from MESOS_DIRECTORY environment
mesosDir := os.Getenv("MESOS_DIRECTORY")
if mesosDir == "" {
log.V(2).Infof("cannot derive executor's cgroup because MESOS_DIRECTORY is empty")
return ""
return
}
containerId := path.Base(mesosDir)
if containerId == "" {
containerID = path.Base(mesosDir)
if containerID == "" {
log.V(2).Infof("cannot derive executor's cgroup from MESOS_DIRECTORY=%q", mesosDir)
return ""
return
}
return path.Join("/", prefix, containerId)
cgroupPath = path.Join("/", prefix, containerID)
return
}
func (ms *MinionServer) launchProxyServer() {
@ -154,13 +156,13 @@ func (ms *MinionServer) launchProxyServer() {
args = append(args, fmt.Sprintf("--hostname-override=%s", ms.KubeletExecutorServer.HostnameOverride))
}
ms.launchHyperkubeServer(hyperkube.CommandProxy, args, proxyLogFilename, nil)
ms.launchHyperkubeServer(hyperkube.CommandProxy, args, proxyLogFilename)
}
// launchExecutorServer returns a chan that closes upon kubelet-executor death. since the kubelet-
// executor doesn't support failover right now, the right thing to do is to fail completely: all
// pods will be lost upon restart, and we want mesos to recover their resources.
func (ms *MinionServer) launchExecutorServer() <-chan struct{} {
func (ms *MinionServer) launchExecutorServer(containerID string) <-chan struct{} {
allArgs := os.Args[1:]
// filter out minion flags, leaving those for the executor
@ -174,25 +176,24 @@ func (ms *MinionServer) launchExecutorServer() <-chan struct{} {
executorArgs = append(executorArgs, "--cgroup-root="+ms.cgroupRoot)
}
// forward containerID so that the executor may pass it along to containers that it launches
var ctidOpt tasks.Option
ctidOpt = func(t *tasks.Task) tasks.Option {
oldenv := t.Env[:]
t.Env = append(t.Env, "MESOS_EXECUTOR_CONTAINER_UUID="+containerID)
return func(t2 *tasks.Task) tasks.Option {
t2.Env = oldenv
return ctidOpt
}
}
// run executor and quit minion server when this exits cleanly
execDied := make(chan struct{})
decorator := func(t *tasks.Task) *tasks.Task {
t.Finished = func(_ bool) bool {
// this func implements the task.finished spec, so when the executor exits
// we return false to indicate that it should not be restarted. we also
// close execDied to signal interested listeners.
close(execDied)
return false
}
// since we only expect to die once and there is no restart, don't delay any longer than needed
t.RestartDelay = 0
return t
}
ms.launchHyperkubeServer(hyperkube.CommandExecutor, executorArgs, executorLogFilename, decorator)
ms.launchHyperkubeServer(hyperkube.CommandExecutor, executorArgs, executorLogFilename, tasks.NoRespawn(execDied), ctidOpt)
return execDied
}
func (ms *MinionServer) launchHyperkubeServer(server string, args []string, logFileName string, decorator func(*tasks.Task) *tasks.Task) {
func (ms *MinionServer) launchHyperkubeServer(server string, args []string, logFileName string, options ...tasks.Option) {
log.V(2).Infof("Spawning hyperkube %v with args '%+v'", server, args)
kmArgs := append([]string{server}, args...)
@ -215,33 +216,39 @@ func (ms *MinionServer) launchHyperkubeServer(server string, args []string, logF
}
}
// use given environment, but add /usr/sbin and $SANDBOX/bin to the path for the iptables binary used in kube-proxy
var kmEnv []string
env := os.Environ()
kmEnv = make([]string, 0, len(env))
for _, e := range env {
if !strings.HasPrefix(e, "PATH=") {
kmEnv = append(kmEnv, e)
} else {
if ms.pathOverride != "" {
e = "PATH=" + ms.pathOverride
}
pwd, err := os.Getwd()
if err != nil {
panic(fmt.Errorf("Cannot get current directory: %v", err))
}
kmEnv = append(kmEnv, fmt.Sprintf("%s:%s", e, path.Join(pwd, "bin")))
}
}
// prepend env, allow later options to customize further
options = append([]tasks.Option{tasks.Environment(os.Environ()), ms.applyPathOverride()}, options...)
t := tasks.New(server, ms.kmBinary, kmArgs, kmEnv, writerFunc)
if decorator != nil {
t = decorator(t)
}
t := tasks.New(server, ms.kmBinary, kmArgs, writerFunc, options...)
go t.Start()
ms.tasks = append(ms.tasks, t)
}
// applyPathOverride overrides PATH and also adds $SANDBOX/bin (needed for locating bundled binary deps
// as well as external deps like iptables)
func (ms *MinionServer) applyPathOverride() tasks.Option {
return func(t *tasks.Task) tasks.Option {
kmEnv := make([]string, 0, len(t.Env))
for _, e := range t.Env {
if !strings.HasPrefix(e, "PATH=") {
kmEnv = append(kmEnv, e)
} else {
if ms.pathOverride != "" {
e = "PATH=" + ms.pathOverride
}
pwd, err := os.Getwd()
if err != nil {
panic(fmt.Errorf("Cannot get current directory: %v", err))
}
kmEnv = append(kmEnv, fmt.Sprintf("%s:%s", e, path.Join(pwd, "bin")))
}
}
oldenv := t.Env
t.Env = kmEnv
return tasks.Environment(oldenv)
}
}
// runs the main kubelet loop, closing the kubeletFinished chan when the loop exits.
// never returns.
func (ms *MinionServer) Run(hks hyperkube.Interface, _ []string) error {
@ -263,7 +270,8 @@ func (ms *MinionServer) Run(hks hyperkube.Interface, _ []string) error {
// - pod container cgroup root (e.g. docker cgroup-parent, optionally; see comments below)
// - parent of kubelet container
// - parent of kube-proxy container
ms.mesosCgroup = findMesosCgroup(ms.cgroupPrefix)
containerID := ""
ms.mesosCgroup, containerID = findMesosCgroup(ms.cgroupPrefix)
log.Infof("discovered mesos cgroup at %q", ms.mesosCgroup)
// hack alert, this helps to work around systemd+docker+mesos integration problems
@ -285,7 +293,7 @@ func (ms *MinionServer) Run(hks hyperkube.Interface, _ []string) error {
}
// abort closes when the kubelet-executor dies
abort := ms.launchExecutorServer()
abort := ms.launchExecutorServer(containerID)
shouldQuit := termSignalListener(abort)
te := tasks.MergeOutput(ms.tasks, shouldQuit)

View File

@ -88,13 +88,13 @@ func (cp *cmdProcess) Kill(force bool) (int, error) {
// logging and restart handling as well as provides event channels for communicating process
// termination and errors related to process management.
type Task struct {
Env []string // optional: process environment override
Finished func(restarting bool) bool // callback invoked when a task process has completed; if `restarting` then it will be restarted if it returns true
RestartDelay time.Duration // interval between repeated task restarts
name string // required: unique name for this task
bin string // required: path to executable
args []string // optional: process arguments
env []string // optional: process environment override
createLogger func() io.WriteCloser // factory func that builds a log writer
cmd systemProcess // process that we started
completedCh chan *Completion // reports exit codes encountered when task processes exit, or errors during process management
@ -107,12 +107,11 @@ type Task struct {
// New builds a newly initialized task object but does not start any processes for it. callers
// are expected to invoke task.run(...) on their own.
func New(name, bin string, args, env []string, cl func() io.WriteCloser) *Task {
func New(name, bin string, args []string, cl func() io.WriteCloser, options ...Option) *Task {
t := &Task{
name: name,
bin: bin,
args: args,
env: env,
createLogger: cl,
completedCh: make(chan *Completion),
shouldQuit: make(chan struct{}),
@ -121,6 +120,9 @@ func New(name, bin string, args, env []string, cl func() io.WriteCloser) *Task {
Finished: func(restarting bool) bool { return restarting },
}
t.killFunc = func(force bool) (int, error) { return t.cmd.Kill(force) }
for _, opt := range options {
opt(t)
}
return t
}
@ -235,8 +237,8 @@ func notStartedTask(t *Task) taskStateFn {
}
t.initLogging(stderrLogs)
if len(t.env) > 0 {
cmd.Env = t.env
if len(t.Env) > 0 {
cmd.Env = t.Env
}
cmd.SysProcAttr = sysProcAttr()
@ -389,3 +391,41 @@ func MergeOutput(tasks []*Task, shouldQuit <-chan struct{}) Events {
ei := newEventsImpl(tclistener, done)
return ei
}
// Option is a functional option type for a Task that returns an "undo" Option upon modifying the Task
type Option func(*Task) Option
// NoRespawn configures the Task lifecycle such that it will not respawn upon termination
func NoRespawn(listener chan<- struct{}) Option {
return func(t *Task) Option {
finished, restartDelay := t.Finished, t.RestartDelay
t.Finished = func(_ bool) bool {
// this func implements the task.finished spec, so when the task exits
// we return false to indicate that it should not be restarted. we also
// close the listener chan to signal interested parties.
if listener != nil {
close(listener)
listener = nil
}
return false
}
// since we only expect to die once and there is no restart, don't delay any longer than needed
t.RestartDelay = 0
return func(t2 *Task) Option {
t2.Finished, t2.RestartDelay = finished, restartDelay
return NoRespawn(listener)
}
}
}
// Environment customizes the process runtime environment for a Task
func Environment(env []string) Option {
return func(t *Task) Option {
oldenv := t.Env
t.Env = env[:]
return Environment(oldenv)
}
}
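
A minimal sketch of the undo contract, assuming an existing *Task t: applying an Option mutates the Task and hands back its inverse.

    setEnv := Environment([]string{"FOO=bar"})
    undo := setEnv(t) // overrides t.Env, returns an Option that restores the old value
    // ...
    redo := undo(t)   // t.Env is back to its prior value; redo would re-apply FOO=bar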

View File

@ -77,7 +77,7 @@ func newFakeProcess() *fakeProcess {
func TestBadLogger(t *testing.T) {
err := errors.New("qux")
fp := newFakeProcess()
tt := New("foo", "bar", nil, nil, func() io.WriteCloser {
tt := New("foo", "bar", nil, func() io.WriteCloser {
defer func() {
fp.pid = 123 // sanity check
fp.Kill(false) // this causes Wait() to return
@ -126,7 +126,7 @@ func TestMergeOutput(t *testing.T) {
tasksDone.Add(2)
tasksStarted.Add(2)
t1 := New("foo", "", nil, nil, devNull)
t1 := New("foo", "", nil, devNull)
t1exited := make(chan struct{})
t1.RestartDelay = 0 // don't slow the test down for no good reason
t1.Finished = func(ok bool) bool {
@ -145,7 +145,7 @@ func TestMergeOutput(t *testing.T) {
return taskRunning
})
t2 := New("bar", "", nil, nil, devNull)
t2 := New("bar", "", nil, devNull)
t2exited := make(chan struct{})
t2.RestartDelay = 0 // don't slow the test down for no good reason
t2.Finished = func(ok bool) bool {
@ -235,7 +235,7 @@ func (t *fakeTimer) reset() { t.ch = nil }
func TestAfterDeath(t *testing.T) {
// test kill escalation since that's not covered by other unit tests
t1 := New("foo", "", nil, nil, devNull)
t1 := New("foo", "", nil, devNull)
kills := 0
waitCh := make(chan *Completion, 1)
timer := &fakeTimer{}

View File

@ -23,6 +23,7 @@ import (
type defaultFunc func(pod *api.Pod) error
// return true if the pod passes the filter
type FilterFunc func(pod *api.Pod) (bool, error)
type Filters []FilterFunc
@ -47,6 +48,39 @@ func Annotator(m map[string]string) FilterFunc {
})
}
// Environment returns a filter that writes environment variables into pod containers
func Environment(env []api.EnvVar) FilterFunc {
// index the envvar names
var (
envcount = len(env)
m = make(map[string]int, envcount)
)
for j := range env {
m[env[j].Name] = j
}
return func(pod *api.Pod) (bool, error) {
for i := range pod.Spec.Containers {
ct := &pod.Spec.Containers[i]
dup := make(map[string]struct{}, envcount)
// overwrite dups (and remember them for later)
for j := range ct.Env {
name := ct.Env[j].Name
if k, ok := m[name]; ok {
ct.Env[j] = env[k]
dup[name] = struct{}{}
}
}
// append non-dups into ct.Env
for name, k := range m {
if _, ok := dup[name]; !ok {
ct.Env = append(ct.Env, env[k])
}
}
}
return true, nil
}
}
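
A short usage sketch (pod and the UUID value are illustrative): variables already present on a container are overwritten in place, the rest are appended, and the filter always accepts.

    inject := podutil.Environment([]api.EnvVar{
        {Name: "MESOS_EXECUTOR_CONTAINER_UUID", Value: "abc-123"},
    })
    if ok, err := inject(pod); ok && err == nil {
        // every container in pod now has the UUID var; entries with the same
        // name were overwritten rather than duplicated.
    }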
// Stream returns a chan of pods that yields each pod from the given list.
// No pods are yielded if err is non-nil.
func Stream(list *api.PodList, err error) <-chan *api.Pod {
@ -65,6 +99,14 @@ func Stream(list *api.PodList, err error) <-chan *api.Pod {
return out
}
func (filters Filters) Do(in <-chan *api.Pod) (out <-chan *api.Pod) {
out = in
for _, f := range filters {
out = f.Do(out)
}
return
}
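
Each FilterFunc wraps the previous channel, so the filters run lazily and in slice order. The executor service earlier in this diff builds exactly such a chain; a condensed sketch, where hostname and env stand in for the service's actual values:

    staticPodFilters := podutil.Filters{
        podutil.Annotator(map[string]string{meta.BindingHostKey: hostname}),
        podutil.Environment(env), // added only when a container ID is known
    }
    out := staticPodFilters.Do(podutil.Stream(list, nil)) // annotate first, then inject env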
func (filter FilterFunc) Do(in <-chan *api.Pod) <-chan *api.Pod {
out := make(chan *api.Pod)
go func() {