Add Mesos hyperkube minion server

The minion server will
- launch the proxy and executor
- relaunch them when they terminate uncleanly
- logrotate their logs.

It is a replacement for a full-blown init process like s6 which is not necessary
in this case.
This commit is contained in:
Dr. Stefan Schimanski 2015-07-28 17:15:48 +02:00
parent e0eb0397b8
commit 2b1ecd28f0
19 changed files with 537 additions and 226 deletions

View File

@ -19,8 +19,8 @@ limitations under the License.
package main package main
import ( import (
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube"
kubeproxy "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-proxy/app" kubeproxy "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-proxy/app"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube"
) )
// NewKubeProxy creates a new hyperkube Server object that includes the // NewKubeProxy creates a new hyperkube Server object that includes the
@ -29,7 +29,7 @@ func NewKubeProxy() *Server {
s := kubeproxy.NewProxyServer() s := kubeproxy.NewProxyServer()
hks := Server{ hks := Server{
SimpleUsage: hyperkube.KM_PROXY, SimpleUsage: hyperkube.CommandProxy,
Long: `The Kubernetes proxy server is responsible for taking traffic directed at Long: `The Kubernetes proxy server is responsible for taking traffic directed at
services and forwarding it to the appropriate pods. It generally runs on services and forwarding it to the appropriate pods. It generally runs on
nodes next to the Kubelet and proxies traffic from local pods to remote pods. nodes next to the Kubelet and proxies traffic from local pods to remote pods.

View File

@ -29,7 +29,7 @@ func NewScheduler() *Server {
s := scheduler.NewSchedulerServer() s := scheduler.NewSchedulerServer()
hks := Server{ hks := Server{
SimpleUsage: hyperkube.KM_SCHEDULER, SimpleUsage: hyperkube.CommandScheduler,
Long: "Implements a Kubernetes scheduler. This will assign pods to kubelets based on capacity and constraints.", Long: "Implements a Kubernetes scheduler. This will assign pods to kubelets based on capacity and constraints.",
Run: func(_ *Server, args []string) error { Run: func(_ *Server, args []string) error {
return s.Run(args) return s.Run(args)

View File

@ -32,7 +32,7 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
s := service.NewKubeletExecutorServer() s := service.NewKubeletExecutorServer()
s.AddStandaloneFlags(pflag.CommandLine) s.AddFlags(pflag.CommandLine)
util.InitFlags() util.InitFlags()
util.InitLogs() util.InitLogs()

View File

@ -28,7 +28,7 @@ func NewControllerManager() *Server {
s := controllermanager.NewCMServer() s := controllermanager.NewCMServer()
hks := Server{ hks := Server{
SimpleUsage: hyperkube.KM_CONTROLLER_MANAGER, SimpleUsage: hyperkube.CommandControllerManager,
Long: "A server that runs a set of active components. This includes replication controllers, service endpoints and nodes.", Long: "A server that runs a set of active components. This includes replication controllers, service endpoints and nodes.",
Run: func(_ *Server, args []string) error { Run: func(_ *Server, args []string) error {
return s.Run(args) return s.Run(args)

View File

@ -17,16 +17,16 @@ limitations under the License.
package main package main
import ( import (
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/service" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/service"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube"
) )
// NewHyperkubeServer creates a new hyperkube Server object that includes the // NewHyperkubeServer creates a new hyperkube Server object that includes the
// description and flags. // description and flags.
func NewKubeletExecutor() *Server { func NewKubeletExecutor() *Server {
s := service.NewHyperKubeletExecutorServer() s := service.NewKubeletExecutorServer()
hks := Server{ hks := Server{
SimpleUsage: hyperkube.KM_EXECUTOR, SimpleUsage: hyperkube.CommandExecutor,
Long: `The kubelet-executor binary is responsible for maintaining a set of containers Long: `The kubelet-executor binary is responsible for maintaining a set of containers
on a particular node. It syncs data from a specialized Mesos source that tracks on a particular node. It syncs data from a specialized Mesos source that tracks
task launches and kills. It then queries Docker to see what is currently task launches and kills. It then queries Docker to see what is currently
@ -36,6 +36,6 @@ containers by starting or stopping Docker containers.`,
return s.Run(hks, args) return s.Run(hks, args)
}, },
} }
s.AddHyperkubeFlags(hks.Flags()) s.AddFlags(hks.Flags())
return &hks return &hks
} }

View File

@ -0,0 +1,39 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/minion"
)
// NewMinion creates a new hyperkube Server object that includes the
// description and flags.
func NewMinion() *Server {
s := minion.NewMinionServer()
hks := Server{
SimpleUsage: hyperkube.CommandMinion,
Long: `Implements a Kubernetes minion. This will launch the proxy and executor.`,
Run: func(hks *Server, args []string) error {
return s.Run(hks, args)
},
}
s.AddMinionFlags(hks.Flags())
s.AddExecutorFlags(hks.Flags())
return &hks
}

View File

@ -28,7 +28,7 @@ func NewScheduler() *Server {
s := service.NewSchedulerServer() s := service.NewSchedulerServer()
hks := Server{ hks := Server{
SimpleUsage: hyperkube.KM_SCHEDULER, SimpleUsage: hyperkube.CommandScheduler,
Long: `Implements the Kubernetes-Mesos scheduler. This will launch Mesos tasks which Long: `Implements the Kubernetes-Mesos scheduler. This will launch Mesos tasks which
results in pods assigned to kubelets based on capacity and constraints.`, results in pods assigned to kubelets based on capacity and constraints.`,
Run: func(hks *Server, args []string) error { Run: func(hks *Server, args []string) error {

View File

@ -32,6 +32,7 @@ func main() {
hk.AddServer(NewScheduler()) hk.AddServer(NewScheduler())
hk.AddServer(NewKubeletExecutor()) hk.AddServer(NewKubeletExecutor())
hk.AddServer(NewKubeProxy()) hk.AddServer(NewKubeProxy())
hk.AddServer(NewMinion())
hk.RunToExit(os.Args) hk.RunToExit(os.Args)
} }

View File

@ -18,8 +18,8 @@ limitations under the License.
package main package main
import ( import (
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube"
kubeapiserver "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-apiserver/app" kubeapiserver "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-apiserver/app"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube"
) )
// NewKubeAPIServer creates a new hyperkube Server object that includes the // NewKubeAPIServer creates a new hyperkube Server object that includes the
@ -28,7 +28,7 @@ func NewKubeAPIServer() *Server {
s := kubeapiserver.NewAPIServer() s := kubeapiserver.NewAPIServer()
hks := Server{ hks := Server{
SimpleUsage: hyperkube.KM_APISERVER, SimpleUsage: hyperkube.CommandApiserver,
Long: "The main API entrypoint and interface to the storage system. The API server is also the focal point for all authorization decisions.", Long: "The main API entrypoint and interface to the storage system. The API server is also the focal point for all authorization decisions.",
Run: func(_ *Server, args []string) error { Run: func(_ *Server, args []string) error {
return s.Run(args) return s.Run(args)

View File

@ -18,8 +18,8 @@ limitations under the License.
package main package main
import ( import (
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube"
kubeproxy "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-proxy/app" kubeproxy "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-proxy/app"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube"
) )
// NewKubeProxy creates a new hyperkube Server object that includes the // NewKubeProxy creates a new hyperkube Server object that includes the
@ -28,7 +28,7 @@ func NewKubeProxy() *Server {
s := kubeproxy.NewProxyServer() s := kubeproxy.NewProxyServer()
hks := Server{ hks := Server{
SimpleUsage: hyperkube.KM_PROXY, SimpleUsage: hyperkube.CommandProxy,
Long: `The Kubernetes proxy server is responsible for taking traffic directed at Long: `The Kubernetes proxy server is responsible for taking traffic directed at
services and forwarding it to the appropriate pods. It generally runs on services and forwarding it to the appropriate pods. It generally runs on
nodes next to the Kubelet and proxies traffic from local pods to remote pods. nodes next to the Kubelet and proxies traffic from local pods to remote pods.

View File

@ -17,14 +17,12 @@ limitations under the License.
package service package service
import ( import (
"bufio"
"fmt" "fmt"
"io" "io"
"math/rand" "math/rand"
"net" "net"
"net/http" "net/http"
"os" "os"
"os/exec"
"path" "path"
"path/filepath" "path/filepath"
"strconv" "strconv"
@ -37,7 +35,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/config" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/config"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/redirfd" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/redirfd"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
@ -49,7 +46,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
log "github.com/golang/glog" log "github.com/golang/glog"
"github.com/kardianos/osext"
bindings "github.com/mesos/mesos-go/executor" bindings "github.com/mesos/mesos-go/executor"
"github.com/spf13/pflag" "github.com/spf13/pflag"
@ -63,11 +59,6 @@ const (
type KubeletExecutorServer struct { type KubeletExecutorServer struct {
*app.KubeletServer *app.KubeletServer
RunProxy bool
ProxyLogV int
ProxyExec string
ProxyLogfile string
ProxyBindall bool
SuicideTimeout time.Duration SuicideTimeout time.Duration
ShutdownFD int ShutdownFD int
ShutdownFIFO string ShutdownFIFO string
@ -96,9 +87,6 @@ func findMesosCgroup(prefix string) string {
func NewKubeletExecutorServer() *KubeletExecutorServer { func NewKubeletExecutorServer() *KubeletExecutorServer {
k := &KubeletExecutorServer{ k := &KubeletExecutorServer{
KubeletServer: app.NewKubeletServer(), KubeletServer: app.NewKubeletServer(),
RunProxy: true,
ProxyExec: "./kube-proxy",
ProxyLogfile: "./proxy-log",
SuicideTimeout: config.DefaultSuicideTimeout, SuicideTimeout: config.DefaultSuicideTimeout,
cgroupPrefix: config.DefaultCgroupPrefix, cgroupPrefix: config.DefaultCgroupPrefix,
} }
@ -113,40 +101,14 @@ func NewKubeletExecutorServer() *KubeletExecutorServer {
return k return k
} }
func NewHyperKubeletExecutorServer() *KubeletExecutorServer { func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) {
s := NewKubeletExecutorServer()
// cache this for later use
binary, err := osext.Executable()
if err != nil {
log.Fatalf("failed to determine currently running executable: %v", err)
}
s.ProxyExec = binary
return s
}
func (s *KubeletExecutorServer) addCoreFlags(fs *pflag.FlagSet) {
s.KubeletServer.AddFlags(fs) s.KubeletServer.AddFlags(fs)
fs.BoolVar(&s.RunProxy, "run-proxy", s.RunProxy, "Maintain a running kube-proxy instance as a child proc of this kubelet-executor.")
fs.IntVar(&s.ProxyLogV, "proxy-logv", s.ProxyLogV, "Log verbosity of the child kube-proxy.")
fs.StringVar(&s.ProxyLogfile, "proxy-logfile", s.ProxyLogfile, "Path to the kube-proxy log file.")
fs.BoolVar(&s.ProxyBindall, "proxy-bindall", s.ProxyBindall, "When true will cause kube-proxy to bind to 0.0.0.0.")
fs.DurationVar(&s.SuicideTimeout, "suicide-timeout", s.SuicideTimeout, "Self-terminate after this period of inactivity. Zero disables suicide watch.") fs.DurationVar(&s.SuicideTimeout, "suicide-timeout", s.SuicideTimeout, "Self-terminate after this period of inactivity. Zero disables suicide watch.")
fs.IntVar(&s.ShutdownFD, "shutdown-fd", s.ShutdownFD, "File descriptor used to signal shutdown to external watchers, requires shutdown-fifo flag") fs.IntVar(&s.ShutdownFD, "shutdown-fd", s.ShutdownFD, "File descriptor used to signal shutdown to external watchers, requires shutdown-fifo flag")
fs.StringVar(&s.ShutdownFIFO, "shutdown-fifo", s.ShutdownFIFO, "FIFO used to signal shutdown to external watchers, requires shutdown-fd flag") fs.StringVar(&s.ShutdownFIFO, "shutdown-fifo", s.ShutdownFIFO, "FIFO used to signal shutdown to external watchers, requires shutdown-fd flag")
fs.StringVar(&s.cgroupPrefix, "cgroup-prefix", s.cgroupPrefix, "The cgroup prefix concatenated with MESOS_DIRECTORY must give the executor cgroup set by Mesos") fs.StringVar(&s.cgroupPrefix, "cgroup-prefix", s.cgroupPrefix, "The cgroup prefix concatenated with MESOS_DIRECTORY must give the executor cgroup set by Mesos")
} }
func (s *KubeletExecutorServer) AddStandaloneFlags(fs *pflag.FlagSet) {
s.addCoreFlags(fs)
fs.StringVar(&s.ProxyExec, "proxy-exec", s.ProxyExec, "Path to the kube-proxy executable.")
}
func (s *KubeletExecutorServer) AddHyperkubeFlags(fs *pflag.FlagSet) {
s.addCoreFlags(fs)
}
// returns a Closer that should be closed to signal impending shutdown, but only if ShutdownFD // returns a Closer that should be closed to signal impending shutdown, but only if ShutdownFD
// and ShutdownFIFO were specified. if they are specified, then this func blocks until there's // and ShutdownFIFO were specified. if they are specified, then this func blocks until there's
// a reader on the FIFO stream. // a reader on the FIFO stream.
@ -429,11 +391,6 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
k := &kubeletExecutor{ k := &kubeletExecutor{
Kubelet: klet, Kubelet: klet,
runProxy: ks.RunProxy,
proxyLogV: ks.ProxyLogV,
proxyExec: ks.ProxyExec,
proxyLogfile: ks.ProxyLogfile,
proxyBindall: ks.ProxyBindall,
address: ks.Address, address: ks.Address,
dockerClient: kc.DockerClient, dockerClient: kc.DockerClient,
hks: hks, hks: hks,
@ -468,11 +425,6 @@ type kubeletExecutor struct {
*kubelet.Kubelet *kubelet.Kubelet
initialize sync.Once initialize sync.Once
driver bindings.ExecutorDriver driver bindings.ExecutorDriver
runProxy bool
proxyLogV int
proxyExec string
proxyLogfile string
proxyBindall bool
address util.IP address util.IP
dockerClient dockertools.DockerInterface dockerClient dockertools.DockerInterface
hks hyperkube.Interface hks hyperkube.Interface
@ -485,9 +437,6 @@ func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions
// this func could be called many times, depending how often the HTTP server crashes, // this func could be called many times, depending how often the HTTP server crashes,
// so only execute certain initialization procs once // so only execute certain initialization procs once
kl.initialize.Do(func() { kl.initialize.Do(func() {
if kl.runProxy {
go runtime.Until(kl.runProxyService, 5*time.Second, kl.executorDone)
}
go func() { go func() {
if _, err := kl.driver.Run(); err != nil { if _, err := kl.driver.Run(); err != nil {
log.Fatalf("executor driver failed: %v", err) log.Fatalf("executor driver failed: %v", err)
@ -499,101 +448,6 @@ func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions
kubelet.ListenAndServeKubeletServer(kl, address, port, tlsOptions, enableDebuggingHandlers) kubelet.ListenAndServeKubeletServer(kl, address, port, tlsOptions, enableDebuggingHandlers)
} }
// this function blocks as long as the proxy service is running; intended to be
// executed asynchronously.
func (kl *kubeletExecutor) runProxyService() {
log.Infof("Starting proxy process...")
args := []string{}
if kl.hks.FindServer(hyperkube.KM_PROXY) {
args = append(args, hyperkube.KM_PROXY)
log.V(1).Infof("attempting to using km proxy service")
} else if _, err := os.Stat(kl.proxyExec); os.IsNotExist(err) {
log.Errorf("failed to locate proxy executable at '%v' and km not present: %v", kl.proxyExec, err)
return
}
bindAddress := "0.0.0.0"
if !kl.proxyBindall {
bindAddress = kl.address.String()
}
args = append(args,
fmt.Sprintf("--bind-address=%s", bindAddress),
fmt.Sprintf("--v=%d", kl.proxyLogV),
"--logtostderr=true",
)
// add client.Config args here. proxy still calls client.BindClientConfigFlags
appendStringArg := func(name, value string) {
if value != "" {
args = append(args, fmt.Sprintf("--%s=%s", name, value))
}
}
appendStringArg("master", kl.clientConfig.Host)
/* TODO(jdef) move these flags to a config file pointed to by --kubeconfig
appendStringArg("api-version", kl.clientConfig.Version)
appendStringArg("client-certificate", kl.clientConfig.CertFile)
appendStringArg("client-key", kl.clientConfig.KeyFile)
appendStringArg("certificate-authority", kl.clientConfig.CAFile)
args = append(args, fmt.Sprintf("--insecure-skip-tls-verify=%t", kl.clientConfig.Insecure))
*/
log.Infof("Spawning process executable %s with args '%+v'", kl.proxyExec, args)
cmd := exec.Command(kl.proxyExec, args...)
if _, err := cmd.StdoutPipe(); err != nil {
log.Fatal(err)
}
proxylogs, err := cmd.StderrPipe()
if err != nil {
log.Fatal(err)
}
//TODO(jdef) append instead of truncate? what if the disk is full?
logfile, err := os.Create(kl.proxyLogfile)
if err != nil {
log.Fatal(err)
}
defer logfile.Close()
ch := make(chan struct{})
go func() {
defer func() {
select {
case <-ch:
log.Infof("killing proxy process..")
if err = cmd.Process.Kill(); err != nil {
log.Errorf("failed to kill proxy process: %v", err)
}
default:
}
}()
writer := bufio.NewWriter(logfile)
defer writer.Flush()
<-ch
written, err := io.Copy(writer, proxylogs)
if err != nil {
log.Errorf("error writing data to proxy log: %v", err)
}
log.Infof("wrote %d bytes to proxy log", written)
}()
// if the proxy fails to start then we exit the executor, otherwise
// wait for the proxy process to end (and release resources after).
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
close(ch)
if err := cmd.Wait(); err != nil {
log.Error(err)
}
}
// runs the main kubelet loop, closing the kubeletFinished chan when the loop exits. // runs the main kubelet loop, closing the kubeletFinished chan when the loop exits.
// never returns. // never returns.
func (kl *kubeletExecutor) Run(updates <-chan kubelet.PodUpdate) { func (kl *kubeletExecutor) Run(updates <-chan kubelet.PodUpdate) {

View File

@ -17,9 +17,10 @@ limitations under the License.
package hyperkube package hyperkube
const ( const (
KM_APISERVER = "apiserver" CommandApiserver = "apiserver"
KM_CONTROLLER_MANAGER = "controller-manager" CommandControllerManager = "controller-manager"
KM_EXECUTOR = "executor" CommandExecutor = "executor"
KM_PROXY = "proxy" CommandMinion = "minion"
KM_SCHEDULER = "scheduler" CommandProxy = "proxy"
CommandScheduler = "scheduler"
) )

View File

@ -0,0 +1,31 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
)
const (
DefaultLogMaxBackups = 5 // how many backup to keep
DefaultLogMaxAgeInDays = 7 // after how many days to rotate at most
)
// DefaultLogMaxSize returns the maximal log file size before rotation
func DefaultLogMaxSize() resource.Quantity {
return *resource.NewQuantity(10*1024*1024, resource.BinarySI)
}

View File

@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package config contains minion configuration constants.
package config

View File

@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package minion contains the executor and proxy bootstrap code for a Mesos slave
package minion

View File

@ -0,0 +1,25 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package minion
import (
log "github.com/golang/glog"
)
func enterPrivateMountNamespace() {
log.Info("Skipping mount namespace, only available on Linux")
}

View File

@ -0,0 +1,39 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package minion
import (
"syscall"
log "github.com/golang/glog"
)
func enterPrivateMountNamespace() {
// enter a new mount NS, useful for isolating changes to the mount table
// that are made by the kubelet for storage volumes.
err := syscall.Unshare(syscall.CLONE_NEWNS)
if err != nil {
log.Fatalf("failed to enter private mount NS: %v", err)
}
// make the rootfs / rslave to the parent mount NS so that we
// pick up on any changes made there
err = syscall.Mount("", "/", "dontcare", syscall.MS_REC|syscall.MS_SLAVE, "")
if err != nil {
log.Fatalf("failed to mark / rslave: %v", err)
}
}

View File

@ -0,0 +1,271 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package minion
import (
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"time"
exservice "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/service"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/minion/config"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
log "github.com/golang/glog"
"github.com/kardianos/osext"
"github.com/spf13/pflag"
"gopkg.in/natefinch/lumberjack.v2"
)
type MinionServer struct {
// embed the executor server to be able to use its flags
// TODO(sttts): get rid of this mixing of the minion and the executor server with a multiflags implementation for km
KubeletExecutorServer *exservice.KubeletExecutorServer
privateMountNS bool
hks hyperkube.Interface
clientConfig *client.Config
kmBinary string
done chan struct{} // closed when shutting down
exit chan error // to signal fatal errors
logMaxSize resource.Quantity
logMaxBackups int
logMaxAgeInDays int
runProxy bool
proxyLogV int
proxyBindall bool
}
// NewMinionServer creates the MinionServer struct with default values to be used by hyperkube
func NewMinionServer() *MinionServer {
s := &MinionServer{
KubeletExecutorServer: exservice.NewKubeletExecutorServer(),
privateMountNS: true,
done: make(chan struct{}),
exit: make(chan error),
logMaxSize: config.DefaultLogMaxSize(),
logMaxBackups: config.DefaultLogMaxBackups,
logMaxAgeInDays: config.DefaultLogMaxAgeInDays,
runProxy: true,
}
// cache this for later use
binary, err := osext.Executable()
if err != nil {
log.Fatalf("failed to determine currently running executable: %v", err)
}
s.kmBinary = binary
return s
}
// filterArgsByFlagSet returns a list of args which are parsed by the given flag set
// and another list with those which do not match
func filterArgsByFlagSet(args []string, flags *pflag.FlagSet) ([]string, []string) {
matched := []string{}
notMatched := []string{}
for _, arg := range args {
err := flags.Parse([]string{arg})
if err != nil {
notMatched = append(notMatched, arg)
} else {
matched = append(matched, arg)
}
}
return matched, notMatched
}
func (ms *MinionServer) launchProxyServer() {
bindAddress := "0.0.0.0"
if !ms.proxyBindall {
bindAddress = ms.KubeletExecutorServer.Address.String()
}
args := []string{
fmt.Sprintf("--bind-address=%s", bindAddress),
fmt.Sprintf("--v=%d", ms.proxyLogV),
"--logtostderr=true",
}
if ms.clientConfig.Host != "" {
args = append(args, fmt.Sprintf("--master=%s", ms.clientConfig.Host))
}
ms.launchHyperkubeServer(hyperkube.CommandProxy, &args, "proxy.log")
}
func (ms *MinionServer) launchExecutorServer() {
allArgs := os.Args[1:]
// filter out minion flags, leaving those for the executor
executorFlags := pflag.NewFlagSet("executor", pflag.ContinueOnError)
executorFlags.SetOutput(ioutil.Discard)
ms.AddExecutorFlags(executorFlags)
executorArgs, _ := filterArgsByFlagSet(allArgs, executorFlags)
// run executor and quit minion server when this exits cleanly
err := ms.launchHyperkubeServer(hyperkube.CommandExecutor, &executorArgs, "executor.log")
if err != nil {
// just return, executor will be restarted on error
log.Error(err)
return
}
log.Info("Executor exited cleanly, stopping the minion")
ms.exit <- nil
}
func (ms *MinionServer) launchHyperkubeServer(server string, args *[]string, logFileName string) error {
log.V(2).Infof("Spawning hyperkube %v with args '%+v'", server, args)
// prepare parameters
kmArgs := []string{server}
for _, arg := range *args {
kmArgs = append(kmArgs, arg)
}
// create command
cmd := exec.Command(ms.kmBinary, kmArgs...)
if _, err := cmd.StdoutPipe(); err != nil {
// fatal error => terminate minion
err = fmt.Errorf("error getting stdout of %v: %v", server, err)
ms.exit <- err
return err
}
stderrLogs, err := cmd.StderrPipe()
if err != nil {
// fatal error => terminate minion
err = fmt.Errorf("error getting stderr of %v: %v", server, err)
ms.exit <- err
return err
}
ch := make(chan struct{})
go func() {
defer func() {
select {
case <-ch:
log.Infof("killing %v process...", server)
if err = cmd.Process.Kill(); err != nil {
log.Errorf("failed to kill %v process: %v", server, err)
}
default:
}
}()
maxSize := ms.logMaxSize.Value()
if maxSize > 0 {
// convert to MB
maxSize = maxSize / 1024 / 1024
if maxSize == 0 {
log.Warning("maximal log file size is rounded to 1 MB")
maxSize = 1
}
}
writer := &lumberjack.Logger{
Filename: logFileName,
MaxSize: int(maxSize),
MaxBackups: ms.logMaxBackups,
MaxAge: ms.logMaxAgeInDays,
}
defer writer.Close()
log.V(2).Infof("Starting logging for %v: max log file size %d MB, keeping %d backups, for %d days", server, maxSize, ms.logMaxBackups, ms.logMaxAgeInDays)
<-ch
written, err := io.Copy(writer, stderrLogs)
if err != nil {
log.Errorf("error writing data to %v: %v", logFileName, err)
}
log.Infof("wrote %d bytes to %v", written, logFileName)
}()
// if the server fails to start then we exit the executor, otherwise
// wait for the proxy process to end (and release resources after).
if err := cmd.Start(); err != nil {
// fatal error => terminate minion
err = fmt.Errorf("error starting %v: %v", server, err)
ms.exit <- err
return err
}
close(ch)
if err := cmd.Wait(); err != nil {
log.Error("%v exited with error: %v", server, err)
err = fmt.Errorf("%v exited with error: %v", server, err)
return err
}
return nil
}
// runs the main kubelet loop, closing the kubeletFinished chan when the loop exits.
// never returns.
func (ms *MinionServer) Run(hks hyperkube.Interface, _ []string) error {
if ms.privateMountNS {
// only the Linux version will do anything
enterPrivateMountNamespace()
}
// create apiserver client
clientConfig, err := ms.KubeletExecutorServer.CreateAPIServerClientConfig()
if err != nil {
// required for k8sm since we need to send api.Binding information
// back to the apiserver
log.Fatalf("No API client: %v", err)
}
ms.clientConfig = clientConfig
// run subprocesses until ms.done is closed on return of this function
defer close(ms.done)
if ms.runProxy {
go runtime.Until(ms.launchProxyServer, 5*time.Second, ms.done)
}
go runtime.Until(ms.launchExecutorServer, 5*time.Second, ms.done)
// wait until minion exit is requested
// don't close ms.exit here to avoid panics of go routines writing an error to it
return <-ms.exit
}
func (ms *MinionServer) AddExecutorFlags(fs *pflag.FlagSet) {
ms.KubeletExecutorServer.AddFlags(fs)
}
func (ms *MinionServer) AddMinionFlags(fs *pflag.FlagSet) {
// general minion flags
fs.BoolVar(&ms.privateMountNS, "private-mountns", ms.privateMountNS, "Enter a private mount NS before spawning procs (linux only).")
// log file flags
fs.Var(resource.NewQuantityFlagValue(&ms.logMaxSize), "max-log-size", "Maximum log file size for the executor and proxy before rotation")
fs.IntVar(&ms.logMaxAgeInDays, "max-log-age", ms.logMaxAgeInDays, "Maximum log file age of the executor and proxy in days")
fs.IntVar(&ms.logMaxBackups, "max-log-backups", ms.logMaxBackups, "Maximum log file backups of the executor and proxy to keep after rotation")
// proxy flags
fs.BoolVar(&ms.runProxy, "run-proxy", ms.runProxy, "Maintain a running kube-proxy instance as a child proc of this kubelet-executor.")
fs.IntVar(&ms.proxyLogV, "proxy-logv", ms.proxyLogV, "Log verbosity of the child kube-proxy.")
fs.BoolVar(&ms.proxyBindall, "proxy-bindall", ms.proxyBindall, "When true will cause kube-proxy to bind to 0.0.0.0.")
}

View File

@ -36,6 +36,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/election" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/election"
execcfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/config" execcfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/config"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube"
minioncfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/minion/config"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/profile" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/profile"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler"
@ -46,6 +47,7 @@ import (
mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource" mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/uid" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/uid"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth" "github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
@ -79,30 +81,37 @@ const (
) )
type SchedulerServer struct { type SchedulerServer struct {
Port int Port int
Address util.IP Address util.IP
EnableProfiling bool EnableProfiling bool
AuthPath string AuthPath string
APIServerList util.StringList APIServerList util.StringList
EtcdServerList util.StringList EtcdServerList util.StringList
EtcdConfigFile string EtcdConfigFile string
AllowPrivileged bool AllowPrivileged bool
ExecutorPath string ExecutorPath string
ProxyPath string ProxyPath string
MesosMaster string MesosMaster string
MesosUser string MesosUser string
MesosRole string MesosRole string
MesosAuthPrincipal string MesosAuthPrincipal string
MesosAuthSecretFile string MesosAuthSecretFile string
Checkpoint bool Checkpoint bool
FailoverTimeout float64 FailoverTimeout float64
ExecutorBindall bool
ExecutorRunProxy bool ExecutorLogV int
ExecutorProxyBindall bool ExecutorBindall bool
ExecutorLogV int ExecutorSuicideTimeout time.Duration
ExecutorProxyLogV int ExecutorCgroupPrefix string
ExecutorSuicideTimeout time.Duration
ExecutorCgroupPrefix string RunProxy bool
ProxyBindall bool
ProxyLogV int
MinionLogMaxSize resource.Quantity
MinionLogMaxBackups int
MinionLogMaxAgeInDays int
MesosAuthProvider string MesosAuthProvider string
DriverPort uint DriverPort uint
HostnameOverride string HostnameOverride string
@ -147,23 +156,29 @@ type schedulerProcessInterface interface {
// NewSchedulerServer creates a new SchedulerServer with default parameters // NewSchedulerServer creates a new SchedulerServer with default parameters
func NewSchedulerServer() *SchedulerServer { func NewSchedulerServer() *SchedulerServer {
s := SchedulerServer{ s := SchedulerServer{
Port: ports.SchedulerPort, Port: ports.SchedulerPort,
Address: util.IP(net.ParseIP("127.0.0.1")), Address: util.IP(net.ParseIP("127.0.0.1")),
FailoverTimeout: time.Duration((1 << 62) - 1).Seconds(), FailoverTimeout: time.Duration((1 << 62) - 1).Seconds(),
ExecutorRunProxy: true,
RunProxy: true,
ExecutorSuicideTimeout: execcfg.DefaultSuicideTimeout, ExecutorSuicideTimeout: execcfg.DefaultSuicideTimeout,
ExecutorCgroupPrefix: execcfg.DefaultCgroupPrefix, ExecutorCgroupPrefix: execcfg.DefaultCgroupPrefix,
MesosAuthProvider: sasl.ProviderName,
MesosMaster: defaultMesosMaster, MinionLogMaxSize: minioncfg.DefaultLogMaxSize(),
MesosUser: defaultMesosUser, MinionLogMaxBackups: minioncfg.DefaultLogMaxBackups,
ReconcileInterval: defaultReconcileInterval, MinionLogMaxAgeInDays: minioncfg.DefaultLogMaxAgeInDays,
ReconcileCooldown: defaultReconcileCooldown,
Checkpoint: true, MesosAuthProvider: sasl.ProviderName,
FrameworkName: defaultFrameworkName, MesosMaster: defaultMesosMaster,
HA: false, MesosUser: defaultMesosUser,
mux: http.NewServeMux(), ReconcileInterval: defaultReconcileInterval,
KubeletCadvisorPort: 4194, // copied from github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kubelet/app/server.go ReconcileCooldown: defaultReconcileCooldown,
KubeletSyncFrequency: 10 * time.Second, Checkpoint: true,
FrameworkName: defaultFrameworkName,
HA: false,
mux: http.NewServeMux(),
KubeletCadvisorPort: 4194, // copied from github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kubelet/app/server.go
KubeletSyncFrequency: 10 * time.Second,
} }
// cache this for later use. also useful in case the original binary gets deleted, e.g. // cache this for later use. also useful in case the original binary gets deleted, e.g.
// during upgrades, development deployments, etc. // during upgrades, development deployments, etc.
@ -212,14 +227,19 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
fs.Var(&s.DefaultContainerCPULimit, "default-container-cpu-limit", "Containers without a CPU resource limit are admitted this much CPU shares") fs.Var(&s.DefaultContainerCPULimit, "default-container-cpu-limit", "Containers without a CPU resource limit are admitted this much CPU shares")
fs.Var(&s.DefaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB") fs.Var(&s.DefaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB")
fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned minion and executor processes.")
fs.BoolVar(&s.ExecutorBindall, "executor-bindall", s.ExecutorBindall, "When true will set -address of the executor to 0.0.0.0.") fs.BoolVar(&s.ExecutorBindall, "executor-bindall", s.ExecutorBindall, "When true will set -address of the executor to 0.0.0.0.")
fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned executor processes.")
fs.IntVar(&s.ExecutorProxyLogV, "executor-proxy-logv", s.ExecutorProxyLogV, "Logging verbosity of spawned executor proxy processes.")
fs.BoolVar(&s.ExecutorProxyBindall, "executor-proxy-bindall", s.ExecutorProxyBindall, "When true pass -proxy-bindall to the executor.")
fs.BoolVar(&s.ExecutorRunProxy, "executor-run-proxy", s.ExecutorRunProxy, "Run the kube-proxy as a child process of the executor.")
fs.DurationVar(&s.ExecutorSuicideTimeout, "executor-suicide-timeout", s.ExecutorSuicideTimeout, "Executor self-terminates after this period of inactivity. Zero disables suicide watch.") fs.DurationVar(&s.ExecutorSuicideTimeout, "executor-suicide-timeout", s.ExecutorSuicideTimeout, "Executor self-terminates after this period of inactivity. Zero disables suicide watch.")
fs.StringVar(&s.ExecutorCgroupPrefix, "executor-cgroup-prefix", s.ExecutorCgroupPrefix, "The cgroup prefix concatenated with MESOS_DIRECTORY must give the executor cgroup set by Mesos") fs.StringVar(&s.ExecutorCgroupPrefix, "executor-cgroup-prefix", s.ExecutorCgroupPrefix, "The cgroup prefix concatenated with MESOS_DIRECTORY must give the executor cgroup set by Mesos")
fs.BoolVar(&s.ProxyBindall, "proxy-bindall", s.ProxyBindall, "When true pass -proxy-bindall to the executor.")
fs.BoolVar(&s.RunProxy, "run-proxy", s.RunProxy, "Run the kube-proxy as a side process of the executor.")
fs.IntVar(&s.ProxyLogV, "proxy-logv", s.ProxyLogV, "Logging verbosity of spawned minion proxy processes.")
fs.Var(resource.NewQuantityFlagValue(&s.MinionLogMaxSize), "minion-max-log-size", "Maximum log file size for the executor and proxy before rotation")
fs.IntVar(&s.MinionLogMaxAgeInDays, "minion-max-log-age", s.MinionLogMaxAgeInDays, "Maximum log file age of the executor and proxy in days")
fs.IntVar(&s.MinionLogMaxBackups, "minion-max-log-backups", s.MinionLogMaxBackups, "Maximum log file backups of the executor and proxy to keep after rotation")
fs.StringVar(&s.KubeletRootDirectory, "kubelet-root-dir", s.KubeletRootDirectory, "Directory path for managing kubelet files (volume mounts,etc). Defaults to executor sandbox.") fs.StringVar(&s.KubeletRootDirectory, "kubelet-root-dir", s.KubeletRootDirectory, "Directory path for managing kubelet files (volume mounts,etc). Defaults to executor sandbox.")
fs.StringVar(&s.KubeletDockerEndpoint, "kubelet-docker-endpoint", s.KubeletDockerEndpoint, "If non-empty, kubelet will use this for the docker endpoint to communicate with.") fs.StringVar(&s.KubeletDockerEndpoint, "kubelet-docker-endpoint", s.KubeletDockerEndpoint, "If non-empty, kubelet will use this for the docker endpoint to communicate with.")
fs.StringVar(&s.KubeletPodInfraContainerImage, "kubelet-pod-infra-container-image", s.KubeletPodInfraContainerImage, "The image whose network/ipc namespaces containers in each pod will use.") fs.StringVar(&s.KubeletPodInfraContainerImage, "kubelet-pod-infra-container-image", s.KubeletPodInfraContainerImage, "The image whose network/ipc namespaces containers in each pod will use.")
@ -235,7 +255,6 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
func (s *SchedulerServer) AddStandaloneFlags(fs *pflag.FlagSet) { func (s *SchedulerServer) AddStandaloneFlags(fs *pflag.FlagSet) {
s.addCoreFlags(fs) s.addCoreFlags(fs)
fs.StringVar(&s.ExecutorPath, "executor-path", s.ExecutorPath, "Location of the kubernetes executor executable") fs.StringVar(&s.ExecutorPath, "executor-path", s.ExecutorPath, "Location of the kubernetes executor executable")
fs.StringVar(&s.ProxyPath, "proxy-path", s.ProxyPath, "Location of the kubernetes proxy executable")
} }
func (s *SchedulerServer) AddHyperkubeFlags(fs *pflag.FlagSet) { func (s *SchedulerServer) AddHyperkubeFlags(fs *pflag.FlagSet) {
@ -283,7 +302,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
uri, executorCmd := s.serveFrameworkArtifact(s.ExecutorPath) uri, executorCmd := s.serveFrameworkArtifact(s.ExecutorPath)
ci.Uris = append(ci.Uris, &mesos.CommandInfo_URI{Value: proto.String(uri), Executable: proto.Bool(true)}) ci.Uris = append(ci.Uris, &mesos.CommandInfo_URI{Value: proto.String(uri), Executable: proto.Bool(true)})
ci.Value = proto.String(fmt.Sprintf("./%s", executorCmd)) ci.Value = proto.String(fmt.Sprintf("./%s", executorCmd))
} else if !hks.FindServer(hyperkube.KM_EXECUTOR) { } else if !hks.FindServer(hyperkube.CommandMinion) {
return nil, nil, fmt.Errorf("either run this scheduler via km or else --executor-path is required") return nil, nil, fmt.Errorf("either run this scheduler via km or else --executor-path is required")
} else { } else {
if strings.Index(s.KMPath, "://") > 0 { if strings.Index(s.KMPath, "://") > 0 {
@ -301,18 +320,16 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
ci.Uris = append(ci.Uris, &mesos.CommandInfo_URI{Value: proto.String(uri), Executable: proto.Bool(true)}) ci.Uris = append(ci.Uris, &mesos.CommandInfo_URI{Value: proto.String(uri), Executable: proto.Bool(true)})
ci.Value = proto.String(fmt.Sprintf("./%s", kmCmd)) ci.Value = proto.String(fmt.Sprintf("./%s", kmCmd))
} }
ci.Arguments = append(ci.Arguments, hyperkube.KM_MINION) ci.Arguments = append(ci.Arguments, hyperkube.CommandMinion)
}
if s.ProxyPath != "" { ci.Arguments = append(ci.Arguments, fmt.Sprintf("--run-proxy=%v", s.RunProxy))
uri, proxyCmd := s.serveFrameworkArtifact(s.ProxyPath) ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-bindall=%v", s.ProxyBindall))
ci.Uris = append(ci.Uris, &mesos.CommandInfo_URI{Value: proto.String(uri), Executable: proto.Bool(true)}) ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-logv=%d", s.ProxyLogV))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-exec=./%s", proxyCmd))
} else if !hks.FindServer(hyperkube.KM_PROXY) { ci.Arguments = append(ci.Arguments, fmt.Sprintf("--max-log-size=%v", s.MinionLogMaxSize.String()))
return nil, nil, fmt.Errorf("either run this scheduler via km or else --proxy-path is required") ci.Arguments = append(ci.Arguments, fmt.Sprintf("--max-log-backups=%d", s.MinionLogMaxBackups))
} else if s.ExecutorPath != "" { ci.Arguments = append(ci.Arguments, fmt.Sprintf("--max-log-age=%d", s.MinionLogMaxAgeInDays))
return nil, nil, fmt.Errorf("proxy can only use km binary if executor does the same") }
} // else, executor is smart enough to know when proxy-path is required, or to use km
//TODO(jdef): provide some way (env var?) for users to customize executor config //TODO(jdef): provide some way (env var?) for users to customize executor config
//TODO(jdef): set -address to 127.0.0.1 if `address` is 127.0.0.1 //TODO(jdef): set -address to 127.0.0.1 if `address` is 127.0.0.1
@ -320,7 +337,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
apiServerArgs := strings.Join(s.APIServerList, ",") apiServerArgs := strings.Join(s.APIServerList, ",")
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--api-servers=%s", apiServerArgs)) ci.Arguments = append(ci.Arguments, fmt.Sprintf("--api-servers=%s", apiServerArgs))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--v=%d", s.ExecutorLogV)) ci.Arguments = append(ci.Arguments, fmt.Sprintf("--v=%d", s.ExecutorLogV)) // this also applies to the minion
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--allow-privileged=%t", s.AllowPrivileged)) ci.Arguments = append(ci.Arguments, fmt.Sprintf("--allow-privileged=%t", s.AllowPrivileged))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--suicide-timeout=%v", s.ExecutorSuicideTimeout)) ci.Arguments = append(ci.Arguments, fmt.Sprintf("--suicide-timeout=%v", s.ExecutorSuicideTimeout))
@ -332,11 +349,8 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
} }
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cgroup-prefix=%v", s.ExecutorCgroupPrefix)) ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cgroup-prefix=%v", s.ExecutorCgroupPrefix))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-bindall=%v", s.ExecutorProxyBindall))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--run-proxy=%v", s.ExecutorRunProxy))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cadvisor-port=%v", s.KubeletCadvisorPort)) ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cadvisor-port=%v", s.KubeletCadvisorPort))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--sync-frequency=%v", s.KubeletSyncFrequency)) ci.Arguments = append(ci.Arguments, fmt.Sprintf("--sync-frequency=%v", s.KubeletSyncFrequency))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-logv=%d", s.ExecutorProxyLogV))
if s.AuthPath != "" { if s.AuthPath != "" {
//TODO(jdef) should probably support non-local files, e.g. hdfs:///some/config/file //TODO(jdef) should probably support non-local files, e.g. hdfs:///some/config/file