diff --git a/cmd/hyperkube/kube-proxy.go b/cmd/hyperkube/kube-proxy.go index 98d8febe72f..460e3403171 100644 --- a/cmd/hyperkube/kube-proxy.go +++ b/cmd/hyperkube/kube-proxy.go @@ -19,8 +19,8 @@ limitations under the License. package main import ( - "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" kubeproxy "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-proxy/app" + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" ) // NewKubeProxy creates a new hyperkube Server object that includes the @@ -29,7 +29,7 @@ func NewKubeProxy() *Server { s := kubeproxy.NewProxyServer() hks := Server{ - SimpleUsage: hyperkube.KM_PROXY, + SimpleUsage: hyperkube.CommandProxy, Long: `The Kubernetes proxy server is responsible for taking traffic directed at services and forwarding it to the appropriate pods. It generally runs on nodes next to the Kubelet and proxies traffic from local pods to remote pods. diff --git a/cmd/hyperkube/kube-scheduler.go b/cmd/hyperkube/kube-scheduler.go index 9af65e14d03..2e9f0316e2b 100644 --- a/cmd/hyperkube/kube-scheduler.go +++ b/cmd/hyperkube/kube-scheduler.go @@ -29,7 +29,7 @@ func NewScheduler() *Server { s := scheduler.NewSchedulerServer() hks := Server{ - SimpleUsage: hyperkube.KM_SCHEDULER, + SimpleUsage: hyperkube.CommandScheduler, Long: "Implements a Kubernetes scheduler. This will assign pods to kubelets based on capacity and constraints.", Run: func(_ *Server, args []string) error { return s.Run(args) diff --git a/contrib/mesos/cmd/k8sm-executor/main.go b/contrib/mesos/cmd/k8sm-executor/main.go index 353f6b448ab..4ca45668c95 100644 --- a/contrib/mesos/cmd/k8sm-executor/main.go +++ b/contrib/mesos/cmd/k8sm-executor/main.go @@ -32,7 +32,7 @@ func main() { runtime.GOMAXPROCS(runtime.NumCPU()) s := service.NewKubeletExecutorServer() - s.AddStandaloneFlags(pflag.CommandLine) + s.AddFlags(pflag.CommandLine) util.InitFlags() util.InitLogs() diff --git a/contrib/mesos/cmd/km/k8sm-controllermanager.go b/contrib/mesos/cmd/km/k8sm-controllermanager.go index f9a8985a17f..7d941cdb237 100644 --- a/contrib/mesos/cmd/km/k8sm-controllermanager.go +++ b/contrib/mesos/cmd/km/k8sm-controllermanager.go @@ -28,7 +28,7 @@ func NewControllerManager() *Server { s := controllermanager.NewCMServer() hks := Server{ - SimpleUsage: hyperkube.KM_CONTROLLER_MANAGER, + SimpleUsage: hyperkube.CommandControllerManager, Long: "A server that runs a set of active components. This includes replication controllers, service endpoints and nodes.", Run: func(_ *Server, args []string) error { return s.Run(args) diff --git a/contrib/mesos/cmd/km/k8sm-executor.go b/contrib/mesos/cmd/km/k8sm-executor.go index 72db942816d..e2b2a2dca8c 100644 --- a/contrib/mesos/cmd/km/k8sm-executor.go +++ b/contrib/mesos/cmd/km/k8sm-executor.go @@ -17,16 +17,16 @@ limitations under the License. package main import ( - "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/service" + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" ) // NewHyperkubeServer creates a new hyperkube Server object that includes the // description and flags. func NewKubeletExecutor() *Server { - s := service.NewHyperKubeletExecutorServer() + s := service.NewKubeletExecutorServer() hks := Server{ - SimpleUsage: hyperkube.KM_EXECUTOR, + SimpleUsage: hyperkube.CommandExecutor, Long: `The kubelet-executor binary is responsible for maintaining a set of containers on a particular node. It syncs data from a specialized Mesos source that tracks task launches and kills. It then queries Docker to see what is currently @@ -36,6 +36,6 @@ containers by starting or stopping Docker containers.`, return s.Run(hks, args) }, } - s.AddHyperkubeFlags(hks.Flags()) + s.AddFlags(hks.Flags()) return &hks } diff --git a/contrib/mesos/cmd/km/k8sm-minion.go b/contrib/mesos/cmd/km/k8sm-minion.go new file mode 100644 index 00000000000..1bda8b14c61 --- /dev/null +++ b/contrib/mesos/cmd/km/k8sm-minion.go @@ -0,0 +1,39 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/minion" +) + +// NewMinion creates a new hyperkube Server object that includes the +// description and flags. +func NewMinion() *Server { + s := minion.NewMinionServer() + hks := Server{ + SimpleUsage: hyperkube.CommandMinion, + Long: `Implements a Kubernetes minion. This will launch the proxy and executor.`, + Run: func(hks *Server, args []string) error { + return s.Run(hks, args) + }, + } + s.AddMinionFlags(hks.Flags()) + s.AddExecutorFlags(hks.Flags()) + + return &hks +} diff --git a/contrib/mesos/cmd/km/k8sm-scheduler.go b/contrib/mesos/cmd/km/k8sm-scheduler.go index 9d291f087fc..c23bd4854aa 100644 --- a/contrib/mesos/cmd/km/k8sm-scheduler.go +++ b/contrib/mesos/cmd/km/k8sm-scheduler.go @@ -28,7 +28,7 @@ func NewScheduler() *Server { s := service.NewSchedulerServer() hks := Server{ - SimpleUsage: hyperkube.KM_SCHEDULER, + SimpleUsage: hyperkube.CommandScheduler, Long: `Implements the Kubernetes-Mesos scheduler. This will launch Mesos tasks which results in pods assigned to kubelets based on capacity and constraints.`, Run: func(hks *Server, args []string) error { diff --git a/contrib/mesos/cmd/km/km.go b/contrib/mesos/cmd/km/km.go index e5a7292cabd..fea332f48b1 100644 --- a/contrib/mesos/cmd/km/km.go +++ b/contrib/mesos/cmd/km/km.go @@ -32,6 +32,7 @@ func main() { hk.AddServer(NewScheduler()) hk.AddServer(NewKubeletExecutor()) hk.AddServer(NewKubeProxy()) + hk.AddServer(NewMinion()) hk.RunToExit(os.Args) } diff --git a/contrib/mesos/cmd/km/kube-apiserver.go b/contrib/mesos/cmd/km/kube-apiserver.go index bc15dc2eae6..971293919ce 100644 --- a/contrib/mesos/cmd/km/kube-apiserver.go +++ b/contrib/mesos/cmd/km/kube-apiserver.go @@ -18,8 +18,8 @@ limitations under the License. package main import ( - "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" kubeapiserver "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-apiserver/app" + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" ) // NewKubeAPIServer creates a new hyperkube Server object that includes the @@ -28,7 +28,7 @@ func NewKubeAPIServer() *Server { s := kubeapiserver.NewAPIServer() hks := Server{ - SimpleUsage: hyperkube.KM_APISERVER, + SimpleUsage: hyperkube.CommandApiserver, Long: "The main API entrypoint and interface to the storage system. The API server is also the focal point for all authorization decisions.", Run: func(_ *Server, args []string) error { return s.Run(args) diff --git a/contrib/mesos/cmd/km/kube-proxy.go b/contrib/mesos/cmd/km/kube-proxy.go index f7f497afef2..b4ee8002501 100644 --- a/contrib/mesos/cmd/km/kube-proxy.go +++ b/contrib/mesos/cmd/km/kube-proxy.go @@ -18,8 +18,8 @@ limitations under the License. package main import ( - "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" kubeproxy "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-proxy/app" + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" ) // NewKubeProxy creates a new hyperkube Server object that includes the @@ -28,7 +28,7 @@ func NewKubeProxy() *Server { s := kubeproxy.NewProxyServer() hks := Server{ - SimpleUsage: hyperkube.KM_PROXY, + SimpleUsage: hyperkube.CommandProxy, Long: `The Kubernetes proxy server is responsible for taking traffic directed at services and forwarding it to the appropriate pods. It generally runs on nodes next to the Kubelet and proxies traffic from local pods to remote pods. diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 021c9482b16..57a00b9e8a7 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -17,14 +17,12 @@ limitations under the License. package service import ( - "bufio" "fmt" "io" "math/rand" "net" "net/http" "os" - "os/exec" "path" "path/filepath" "strconv" @@ -37,7 +35,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/config" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/redirfd" - "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" @@ -49,7 +46,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount" log "github.com/golang/glog" - "github.com/kardianos/osext" bindings "github.com/mesos/mesos-go/executor" "github.com/spf13/pflag" @@ -63,11 +59,6 @@ const ( type KubeletExecutorServer struct { *app.KubeletServer - RunProxy bool - ProxyLogV int - ProxyExec string - ProxyLogfile string - ProxyBindall bool SuicideTimeout time.Duration ShutdownFD int ShutdownFIFO string @@ -96,9 +87,6 @@ func findMesosCgroup(prefix string) string { func NewKubeletExecutorServer() *KubeletExecutorServer { k := &KubeletExecutorServer{ KubeletServer: app.NewKubeletServer(), - RunProxy: true, - ProxyExec: "./kube-proxy", - ProxyLogfile: "./proxy-log", SuicideTimeout: config.DefaultSuicideTimeout, cgroupPrefix: config.DefaultCgroupPrefix, } @@ -113,40 +101,14 @@ func NewKubeletExecutorServer() *KubeletExecutorServer { return k } -func NewHyperKubeletExecutorServer() *KubeletExecutorServer { - s := NewKubeletExecutorServer() - - // cache this for later use - binary, err := osext.Executable() - if err != nil { - log.Fatalf("failed to determine currently running executable: %v", err) - } - - s.ProxyExec = binary - return s -} - -func (s *KubeletExecutorServer) addCoreFlags(fs *pflag.FlagSet) { +func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) { s.KubeletServer.AddFlags(fs) - fs.BoolVar(&s.RunProxy, "run-proxy", s.RunProxy, "Maintain a running kube-proxy instance as a child proc of this kubelet-executor.") - fs.IntVar(&s.ProxyLogV, "proxy-logv", s.ProxyLogV, "Log verbosity of the child kube-proxy.") - fs.StringVar(&s.ProxyLogfile, "proxy-logfile", s.ProxyLogfile, "Path to the kube-proxy log file.") - fs.BoolVar(&s.ProxyBindall, "proxy-bindall", s.ProxyBindall, "When true will cause kube-proxy to bind to 0.0.0.0.") fs.DurationVar(&s.SuicideTimeout, "suicide-timeout", s.SuicideTimeout, "Self-terminate after this period of inactivity. Zero disables suicide watch.") fs.IntVar(&s.ShutdownFD, "shutdown-fd", s.ShutdownFD, "File descriptor used to signal shutdown to external watchers, requires shutdown-fifo flag") fs.StringVar(&s.ShutdownFIFO, "shutdown-fifo", s.ShutdownFIFO, "FIFO used to signal shutdown to external watchers, requires shutdown-fd flag") fs.StringVar(&s.cgroupPrefix, "cgroup-prefix", s.cgroupPrefix, "The cgroup prefix concatenated with MESOS_DIRECTORY must give the executor cgroup set by Mesos") } -func (s *KubeletExecutorServer) AddStandaloneFlags(fs *pflag.FlagSet) { - s.addCoreFlags(fs) - fs.StringVar(&s.ProxyExec, "proxy-exec", s.ProxyExec, "Path to the kube-proxy executable.") -} - -func (s *KubeletExecutorServer) AddHyperkubeFlags(fs *pflag.FlagSet) { - s.addCoreFlags(fs) -} - // returns a Closer that should be closed to signal impending shutdown, but only if ShutdownFD // and ShutdownFIFO were specified. if they are specified, then this func blocks until there's // a reader on the FIFO stream. @@ -429,11 +391,6 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( k := &kubeletExecutor{ Kubelet: klet, - runProxy: ks.RunProxy, - proxyLogV: ks.ProxyLogV, - proxyExec: ks.ProxyExec, - proxyLogfile: ks.ProxyLogfile, - proxyBindall: ks.ProxyBindall, address: ks.Address, dockerClient: kc.DockerClient, hks: hks, @@ -468,11 +425,6 @@ type kubeletExecutor struct { *kubelet.Kubelet initialize sync.Once driver bindings.ExecutorDriver - runProxy bool - proxyLogV int - proxyExec string - proxyLogfile string - proxyBindall bool address util.IP dockerClient dockertools.DockerInterface hks hyperkube.Interface @@ -485,9 +437,6 @@ func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions // this func could be called many times, depending how often the HTTP server crashes, // so only execute certain initialization procs once kl.initialize.Do(func() { - if kl.runProxy { - go runtime.Until(kl.runProxyService, 5*time.Second, kl.executorDone) - } go func() { if _, err := kl.driver.Run(); err != nil { log.Fatalf("executor driver failed: %v", err) @@ -499,101 +448,6 @@ func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions kubelet.ListenAndServeKubeletServer(kl, address, port, tlsOptions, enableDebuggingHandlers) } -// this function blocks as long as the proxy service is running; intended to be -// executed asynchronously. -func (kl *kubeletExecutor) runProxyService() { - log.Infof("Starting proxy process...") - - args := []string{} - - if kl.hks.FindServer(hyperkube.KM_PROXY) { - args = append(args, hyperkube.KM_PROXY) - log.V(1).Infof("attempting to using km proxy service") - } else if _, err := os.Stat(kl.proxyExec); os.IsNotExist(err) { - log.Errorf("failed to locate proxy executable at '%v' and km not present: %v", kl.proxyExec, err) - return - } - - bindAddress := "0.0.0.0" - if !kl.proxyBindall { - bindAddress = kl.address.String() - } - args = append(args, - fmt.Sprintf("--bind-address=%s", bindAddress), - fmt.Sprintf("--v=%d", kl.proxyLogV), - "--logtostderr=true", - ) - - // add client.Config args here. proxy still calls client.BindClientConfigFlags - appendStringArg := func(name, value string) { - if value != "" { - args = append(args, fmt.Sprintf("--%s=%s", name, value)) - } - } - appendStringArg("master", kl.clientConfig.Host) - /* TODO(jdef) move these flags to a config file pointed to by --kubeconfig - appendStringArg("api-version", kl.clientConfig.Version) - appendStringArg("client-certificate", kl.clientConfig.CertFile) - appendStringArg("client-key", kl.clientConfig.KeyFile) - appendStringArg("certificate-authority", kl.clientConfig.CAFile) - args = append(args, fmt.Sprintf("--insecure-skip-tls-verify=%t", kl.clientConfig.Insecure)) - */ - - log.Infof("Spawning process executable %s with args '%+v'", kl.proxyExec, args) - - cmd := exec.Command(kl.proxyExec, args...) - if _, err := cmd.StdoutPipe(); err != nil { - log.Fatal(err) - } - - proxylogs, err := cmd.StderrPipe() - if err != nil { - log.Fatal(err) - } - - //TODO(jdef) append instead of truncate? what if the disk is full? - logfile, err := os.Create(kl.proxyLogfile) - if err != nil { - log.Fatal(err) - } - defer logfile.Close() - - ch := make(chan struct{}) - go func() { - defer func() { - select { - case <-ch: - log.Infof("killing proxy process..") - if err = cmd.Process.Kill(); err != nil { - log.Errorf("failed to kill proxy process: %v", err) - } - default: - } - }() - - writer := bufio.NewWriter(logfile) - defer writer.Flush() - - <-ch - written, err := io.Copy(writer, proxylogs) - if err != nil { - log.Errorf("error writing data to proxy log: %v", err) - } - - log.Infof("wrote %d bytes to proxy log", written) - }() - - // if the proxy fails to start then we exit the executor, otherwise - // wait for the proxy process to end (and release resources after). - if err := cmd.Start(); err != nil { - log.Fatal(err) - } - close(ch) - if err := cmd.Wait(); err != nil { - log.Error(err) - } -} - // runs the main kubelet loop, closing the kubeletFinished chan when the loop exits. // never returns. func (kl *kubeletExecutor) Run(updates <-chan kubelet.PodUpdate) { diff --git a/contrib/mesos/pkg/hyperkube/hyperkube.go b/contrib/mesos/pkg/hyperkube/hyperkube.go index 4de62e378cf..200020a5067 100644 --- a/contrib/mesos/pkg/hyperkube/hyperkube.go +++ b/contrib/mesos/pkg/hyperkube/hyperkube.go @@ -17,9 +17,10 @@ limitations under the License. package hyperkube const ( - KM_APISERVER = "apiserver" - KM_CONTROLLER_MANAGER = "controller-manager" - KM_EXECUTOR = "executor" - KM_PROXY = "proxy" - KM_SCHEDULER = "scheduler" + CommandApiserver = "apiserver" + CommandControllerManager = "controller-manager" + CommandExecutor = "executor" + CommandMinion = "minion" + CommandProxy = "proxy" + CommandScheduler = "scheduler" ) diff --git a/contrib/mesos/pkg/minion/config/config.go b/contrib/mesos/pkg/minion/config/config.go new file mode 100644 index 00000000000..c7815e2316b --- /dev/null +++ b/contrib/mesos/pkg/minion/config/config.go @@ -0,0 +1,31 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" +) + +const ( + DefaultLogMaxBackups = 5 // how many backup to keep + DefaultLogMaxAgeInDays = 7 // after how many days to rotate at most +) + +// DefaultLogMaxSize returns the maximal log file size before rotation +func DefaultLogMaxSize() resource.Quantity { + return *resource.NewQuantity(10*1024*1024, resource.BinarySI) +} diff --git a/contrib/mesos/pkg/minion/config/doc.go b/contrib/mesos/pkg/minion/config/doc.go new file mode 100644 index 00000000000..96f244c0dd3 --- /dev/null +++ b/contrib/mesos/pkg/minion/config/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package config contains minion configuration constants. +package config diff --git a/contrib/mesos/pkg/minion/doc.go b/contrib/mesos/pkg/minion/doc.go new file mode 100644 index 00000000000..11253ede76f --- /dev/null +++ b/contrib/mesos/pkg/minion/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package minion contains the executor and proxy bootstrap code for a Mesos slave +package minion diff --git a/contrib/mesos/pkg/minion/mountns_darwin.go b/contrib/mesos/pkg/minion/mountns_darwin.go new file mode 100644 index 00000000000..aa41ed87927 --- /dev/null +++ b/contrib/mesos/pkg/minion/mountns_darwin.go @@ -0,0 +1,25 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package minion + +import ( + log "github.com/golang/glog" +) + +func enterPrivateMountNamespace() { + log.Info("Skipping mount namespace, only available on Linux") +} diff --git a/contrib/mesos/pkg/minion/mountns_linux.go b/contrib/mesos/pkg/minion/mountns_linux.go new file mode 100644 index 00000000000..17f09313113 --- /dev/null +++ b/contrib/mesos/pkg/minion/mountns_linux.go @@ -0,0 +1,39 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package minion + +import ( + "syscall" + + log "github.com/golang/glog" +) + +func enterPrivateMountNamespace() { + // enter a new mount NS, useful for isolating changes to the mount table + // that are made by the kubelet for storage volumes. + err := syscall.Unshare(syscall.CLONE_NEWNS) + if err != nil { + log.Fatalf("failed to enter private mount NS: %v", err) + } + + // make the rootfs / rslave to the parent mount NS so that we + // pick up on any changes made there + err = syscall.Mount("", "/", "dontcare", syscall.MS_REC|syscall.MS_SLAVE, "") + if err != nil { + log.Fatalf("failed to mark / rslave: %v", err) + } +} diff --git a/contrib/mesos/pkg/minion/server.go b/contrib/mesos/pkg/minion/server.go new file mode 100644 index 00000000000..e93ba3113c5 --- /dev/null +++ b/contrib/mesos/pkg/minion/server.go @@ -0,0 +1,271 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package minion + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "time" + + exservice "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/service" + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/minion/config" + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + + log "github.com/golang/glog" + "github.com/kardianos/osext" + "github.com/spf13/pflag" + "gopkg.in/natefinch/lumberjack.v2" +) + +type MinionServer struct { + // embed the executor server to be able to use its flags + // TODO(sttts): get rid of this mixing of the minion and the executor server with a multiflags implementation for km + KubeletExecutorServer *exservice.KubeletExecutorServer + + privateMountNS bool + hks hyperkube.Interface + clientConfig *client.Config + kmBinary string + done chan struct{} // closed when shutting down + exit chan error // to signal fatal errors + + logMaxSize resource.Quantity + logMaxBackups int + logMaxAgeInDays int + + runProxy bool + proxyLogV int + proxyBindall bool +} + +// NewMinionServer creates the MinionServer struct with default values to be used by hyperkube +func NewMinionServer() *MinionServer { + s := &MinionServer{ + KubeletExecutorServer: exservice.NewKubeletExecutorServer(), + privateMountNS: true, + done: make(chan struct{}), + exit: make(chan error), + + logMaxSize: config.DefaultLogMaxSize(), + logMaxBackups: config.DefaultLogMaxBackups, + logMaxAgeInDays: config.DefaultLogMaxAgeInDays, + + runProxy: true, + } + + // cache this for later use + binary, err := osext.Executable() + if err != nil { + log.Fatalf("failed to determine currently running executable: %v", err) + } + s.kmBinary = binary + + return s +} + +// filterArgsByFlagSet returns a list of args which are parsed by the given flag set +// and another list with those which do not match +func filterArgsByFlagSet(args []string, flags *pflag.FlagSet) ([]string, []string) { + matched := []string{} + notMatched := []string{} + for _, arg := range args { + err := flags.Parse([]string{arg}) + if err != nil { + notMatched = append(notMatched, arg) + } else { + matched = append(matched, arg) + } + } + return matched, notMatched +} + +func (ms *MinionServer) launchProxyServer() { + bindAddress := "0.0.0.0" + if !ms.proxyBindall { + bindAddress = ms.KubeletExecutorServer.Address.String() + } + args := []string{ + fmt.Sprintf("--bind-address=%s", bindAddress), + fmt.Sprintf("--v=%d", ms.proxyLogV), + "--logtostderr=true", + } + + if ms.clientConfig.Host != "" { + args = append(args, fmt.Sprintf("--master=%s", ms.clientConfig.Host)) + } + + ms.launchHyperkubeServer(hyperkube.CommandProxy, &args, "proxy.log") +} + +func (ms *MinionServer) launchExecutorServer() { + allArgs := os.Args[1:] + + // filter out minion flags, leaving those for the executor + executorFlags := pflag.NewFlagSet("executor", pflag.ContinueOnError) + executorFlags.SetOutput(ioutil.Discard) + ms.AddExecutorFlags(executorFlags) + executorArgs, _ := filterArgsByFlagSet(allArgs, executorFlags) + + // run executor and quit minion server when this exits cleanly + err := ms.launchHyperkubeServer(hyperkube.CommandExecutor, &executorArgs, "executor.log") + if err != nil { + // just return, executor will be restarted on error + log.Error(err) + return + } + + log.Info("Executor exited cleanly, stopping the minion") + ms.exit <- nil +} + +func (ms *MinionServer) launchHyperkubeServer(server string, args *[]string, logFileName string) error { + log.V(2).Infof("Spawning hyperkube %v with args '%+v'", server, args) + + // prepare parameters + kmArgs := []string{server} + for _, arg := range *args { + kmArgs = append(kmArgs, arg) + } + + // create command + cmd := exec.Command(ms.kmBinary, kmArgs...) + if _, err := cmd.StdoutPipe(); err != nil { + // fatal error => terminate minion + err = fmt.Errorf("error getting stdout of %v: %v", server, err) + ms.exit <- err + return err + } + stderrLogs, err := cmd.StderrPipe() + if err != nil { + // fatal error => terminate minion + err = fmt.Errorf("error getting stderr of %v: %v", server, err) + ms.exit <- err + return err + } + + ch := make(chan struct{}) + go func() { + defer func() { + select { + case <-ch: + log.Infof("killing %v process...", server) + if err = cmd.Process.Kill(); err != nil { + log.Errorf("failed to kill %v process: %v", server, err) + } + default: + } + }() + + maxSize := ms.logMaxSize.Value() + if maxSize > 0 { + // convert to MB + maxSize = maxSize / 1024 / 1024 + if maxSize == 0 { + log.Warning("maximal log file size is rounded to 1 MB") + maxSize = 1 + } + } + writer := &lumberjack.Logger{ + Filename: logFileName, + MaxSize: int(maxSize), + MaxBackups: ms.logMaxBackups, + MaxAge: ms.logMaxAgeInDays, + } + defer writer.Close() + + log.V(2).Infof("Starting logging for %v: max log file size %d MB, keeping %d backups, for %d days", server, maxSize, ms.logMaxBackups, ms.logMaxAgeInDays) + + <-ch + written, err := io.Copy(writer, stderrLogs) + if err != nil { + log.Errorf("error writing data to %v: %v", logFileName, err) + } + + log.Infof("wrote %d bytes to %v", written, logFileName) + }() + + // if the server fails to start then we exit the executor, otherwise + // wait for the proxy process to end (and release resources after). + if err := cmd.Start(); err != nil { + // fatal error => terminate minion + err = fmt.Errorf("error starting %v: %v", server, err) + ms.exit <- err + return err + } + close(ch) + if err := cmd.Wait(); err != nil { + log.Error("%v exited with error: %v", server, err) + err = fmt.Errorf("%v exited with error: %v", server, err) + return err + } + + return nil +} + +// runs the main kubelet loop, closing the kubeletFinished chan when the loop exits. +// never returns. +func (ms *MinionServer) Run(hks hyperkube.Interface, _ []string) error { + if ms.privateMountNS { + // only the Linux version will do anything + enterPrivateMountNamespace() + } + + // create apiserver client + clientConfig, err := ms.KubeletExecutorServer.CreateAPIServerClientConfig() + if err != nil { + // required for k8sm since we need to send api.Binding information + // back to the apiserver + log.Fatalf("No API client: %v", err) + } + ms.clientConfig = clientConfig + + // run subprocesses until ms.done is closed on return of this function + defer close(ms.done) + if ms.runProxy { + go runtime.Until(ms.launchProxyServer, 5*time.Second, ms.done) + } + go runtime.Until(ms.launchExecutorServer, 5*time.Second, ms.done) + + // wait until minion exit is requested + // don't close ms.exit here to avoid panics of go routines writing an error to it + return <-ms.exit +} + +func (ms *MinionServer) AddExecutorFlags(fs *pflag.FlagSet) { + ms.KubeletExecutorServer.AddFlags(fs) +} + +func (ms *MinionServer) AddMinionFlags(fs *pflag.FlagSet) { + // general minion flags + fs.BoolVar(&ms.privateMountNS, "private-mountns", ms.privateMountNS, "Enter a private mount NS before spawning procs (linux only).") + + // log file flags + fs.Var(resource.NewQuantityFlagValue(&ms.logMaxSize), "max-log-size", "Maximum log file size for the executor and proxy before rotation") + fs.IntVar(&ms.logMaxAgeInDays, "max-log-age", ms.logMaxAgeInDays, "Maximum log file age of the executor and proxy in days") + fs.IntVar(&ms.logMaxBackups, "max-log-backups", ms.logMaxBackups, "Maximum log file backups of the executor and proxy to keep after rotation") + + // proxy flags + fs.BoolVar(&ms.runProxy, "run-proxy", ms.runProxy, "Maintain a running kube-proxy instance as a child proc of this kubelet-executor.") + fs.IntVar(&ms.proxyLogV, "proxy-logv", ms.proxyLogV, "Log verbosity of the child kube-proxy.") + fs.BoolVar(&ms.proxyBindall, "proxy-bindall", ms.proxyBindall, "When true will cause kube-proxy to bind to 0.0.0.0.") +} diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index cfcfe1dd0ab..0e8c556eeb8 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -36,6 +36,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/election" execcfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/config" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube" + minioncfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/minion/config" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/profile" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler" @@ -46,6 +47,7 @@ import ( mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource" "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/uid" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" @@ -79,30 +81,37 @@ const ( ) type SchedulerServer struct { - Port int - Address util.IP - EnableProfiling bool - AuthPath string - APIServerList util.StringList - EtcdServerList util.StringList - EtcdConfigFile string - AllowPrivileged bool - ExecutorPath string - ProxyPath string - MesosMaster string - MesosUser string - MesosRole string - MesosAuthPrincipal string - MesosAuthSecretFile string - Checkpoint bool - FailoverTimeout float64 - ExecutorBindall bool - ExecutorRunProxy bool - ExecutorProxyBindall bool - ExecutorLogV int - ExecutorProxyLogV int - ExecutorSuicideTimeout time.Duration - ExecutorCgroupPrefix string + Port int + Address util.IP + EnableProfiling bool + AuthPath string + APIServerList util.StringList + EtcdServerList util.StringList + EtcdConfigFile string + AllowPrivileged bool + ExecutorPath string + ProxyPath string + MesosMaster string + MesosUser string + MesosRole string + MesosAuthPrincipal string + MesosAuthSecretFile string + Checkpoint bool + FailoverTimeout float64 + + ExecutorLogV int + ExecutorBindall bool + ExecutorSuicideTimeout time.Duration + ExecutorCgroupPrefix string + + RunProxy bool + ProxyBindall bool + ProxyLogV int + + MinionLogMaxSize resource.Quantity + MinionLogMaxBackups int + MinionLogMaxAgeInDays int + MesosAuthProvider string DriverPort uint HostnameOverride string @@ -147,23 +156,29 @@ type schedulerProcessInterface interface { // NewSchedulerServer creates a new SchedulerServer with default parameters func NewSchedulerServer() *SchedulerServer { s := SchedulerServer{ - Port: ports.SchedulerPort, - Address: util.IP(net.ParseIP("127.0.0.1")), - FailoverTimeout: time.Duration((1 << 62) - 1).Seconds(), - ExecutorRunProxy: true, + Port: ports.SchedulerPort, + Address: util.IP(net.ParseIP("127.0.0.1")), + FailoverTimeout: time.Duration((1 << 62) - 1).Seconds(), + + RunProxy: true, ExecutorSuicideTimeout: execcfg.DefaultSuicideTimeout, ExecutorCgroupPrefix: execcfg.DefaultCgroupPrefix, - MesosAuthProvider: sasl.ProviderName, - MesosMaster: defaultMesosMaster, - MesosUser: defaultMesosUser, - ReconcileInterval: defaultReconcileInterval, - ReconcileCooldown: defaultReconcileCooldown, - Checkpoint: true, - FrameworkName: defaultFrameworkName, - HA: false, - mux: http.NewServeMux(), - KubeletCadvisorPort: 4194, // copied from github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kubelet/app/server.go - KubeletSyncFrequency: 10 * time.Second, + + MinionLogMaxSize: minioncfg.DefaultLogMaxSize(), + MinionLogMaxBackups: minioncfg.DefaultLogMaxBackups, + MinionLogMaxAgeInDays: minioncfg.DefaultLogMaxAgeInDays, + + MesosAuthProvider: sasl.ProviderName, + MesosMaster: defaultMesosMaster, + MesosUser: defaultMesosUser, + ReconcileInterval: defaultReconcileInterval, + ReconcileCooldown: defaultReconcileCooldown, + Checkpoint: true, + FrameworkName: defaultFrameworkName, + HA: false, + mux: http.NewServeMux(), + KubeletCadvisorPort: 4194, // copied from github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kubelet/app/server.go + KubeletSyncFrequency: 10 * time.Second, } // cache this for later use. also useful in case the original binary gets deleted, e.g. // during upgrades, development deployments, etc. @@ -212,14 +227,19 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) { fs.Var(&s.DefaultContainerCPULimit, "default-container-cpu-limit", "Containers without a CPU resource limit are admitted this much CPU shares") fs.Var(&s.DefaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB") + fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned minion and executor processes.") fs.BoolVar(&s.ExecutorBindall, "executor-bindall", s.ExecutorBindall, "When true will set -address of the executor to 0.0.0.0.") - fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned executor processes.") - fs.IntVar(&s.ExecutorProxyLogV, "executor-proxy-logv", s.ExecutorProxyLogV, "Logging verbosity of spawned executor proxy processes.") - fs.BoolVar(&s.ExecutorProxyBindall, "executor-proxy-bindall", s.ExecutorProxyBindall, "When true pass -proxy-bindall to the executor.") - fs.BoolVar(&s.ExecutorRunProxy, "executor-run-proxy", s.ExecutorRunProxy, "Run the kube-proxy as a child process of the executor.") fs.DurationVar(&s.ExecutorSuicideTimeout, "executor-suicide-timeout", s.ExecutorSuicideTimeout, "Executor self-terminates after this period of inactivity. Zero disables suicide watch.") fs.StringVar(&s.ExecutorCgroupPrefix, "executor-cgroup-prefix", s.ExecutorCgroupPrefix, "The cgroup prefix concatenated with MESOS_DIRECTORY must give the executor cgroup set by Mesos") + fs.BoolVar(&s.ProxyBindall, "proxy-bindall", s.ProxyBindall, "When true pass -proxy-bindall to the executor.") + fs.BoolVar(&s.RunProxy, "run-proxy", s.RunProxy, "Run the kube-proxy as a side process of the executor.") + fs.IntVar(&s.ProxyLogV, "proxy-logv", s.ProxyLogV, "Logging verbosity of spawned minion proxy processes.") + + fs.Var(resource.NewQuantityFlagValue(&s.MinionLogMaxSize), "minion-max-log-size", "Maximum log file size for the executor and proxy before rotation") + fs.IntVar(&s.MinionLogMaxAgeInDays, "minion-max-log-age", s.MinionLogMaxAgeInDays, "Maximum log file age of the executor and proxy in days") + fs.IntVar(&s.MinionLogMaxBackups, "minion-max-log-backups", s.MinionLogMaxBackups, "Maximum log file backups of the executor and proxy to keep after rotation") + fs.StringVar(&s.KubeletRootDirectory, "kubelet-root-dir", s.KubeletRootDirectory, "Directory path for managing kubelet files (volume mounts,etc). Defaults to executor sandbox.") fs.StringVar(&s.KubeletDockerEndpoint, "kubelet-docker-endpoint", s.KubeletDockerEndpoint, "If non-empty, kubelet will use this for the docker endpoint to communicate with.") fs.StringVar(&s.KubeletPodInfraContainerImage, "kubelet-pod-infra-container-image", s.KubeletPodInfraContainerImage, "The image whose network/ipc namespaces containers in each pod will use.") @@ -235,7 +255,6 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) { func (s *SchedulerServer) AddStandaloneFlags(fs *pflag.FlagSet) { s.addCoreFlags(fs) fs.StringVar(&s.ExecutorPath, "executor-path", s.ExecutorPath, "Location of the kubernetes executor executable") - fs.StringVar(&s.ProxyPath, "proxy-path", s.ProxyPath, "Location of the kubernetes proxy executable") } func (s *SchedulerServer) AddHyperkubeFlags(fs *pflag.FlagSet) { @@ -283,7 +302,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E uri, executorCmd := s.serveFrameworkArtifact(s.ExecutorPath) ci.Uris = append(ci.Uris, &mesos.CommandInfo_URI{Value: proto.String(uri), Executable: proto.Bool(true)}) ci.Value = proto.String(fmt.Sprintf("./%s", executorCmd)) - } else if !hks.FindServer(hyperkube.KM_EXECUTOR) { + } else if !hks.FindServer(hyperkube.CommandMinion) { return nil, nil, fmt.Errorf("either run this scheduler via km or else --executor-path is required") } else { if strings.Index(s.KMPath, "://") > 0 { @@ -301,18 +320,16 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E ci.Uris = append(ci.Uris, &mesos.CommandInfo_URI{Value: proto.String(uri), Executable: proto.Bool(true)}) ci.Value = proto.String(fmt.Sprintf("./%s", kmCmd)) } - ci.Arguments = append(ci.Arguments, hyperkube.KM_MINION) - } + ci.Arguments = append(ci.Arguments, hyperkube.CommandMinion) - if s.ProxyPath != "" { - uri, proxyCmd := s.serveFrameworkArtifact(s.ProxyPath) - ci.Uris = append(ci.Uris, &mesos.CommandInfo_URI{Value: proto.String(uri), Executable: proto.Bool(true)}) - ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-exec=./%s", proxyCmd)) - } else if !hks.FindServer(hyperkube.KM_PROXY) { - return nil, nil, fmt.Errorf("either run this scheduler via km or else --proxy-path is required") - } else if s.ExecutorPath != "" { - return nil, nil, fmt.Errorf("proxy can only use km binary if executor does the same") - } // else, executor is smart enough to know when proxy-path is required, or to use km + ci.Arguments = append(ci.Arguments, fmt.Sprintf("--run-proxy=%v", s.RunProxy)) + ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-bindall=%v", s.ProxyBindall)) + ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-logv=%d", s.ProxyLogV)) + + ci.Arguments = append(ci.Arguments, fmt.Sprintf("--max-log-size=%v", s.MinionLogMaxSize.String())) + ci.Arguments = append(ci.Arguments, fmt.Sprintf("--max-log-backups=%d", s.MinionLogMaxBackups)) + ci.Arguments = append(ci.Arguments, fmt.Sprintf("--max-log-age=%d", s.MinionLogMaxAgeInDays)) + } //TODO(jdef): provide some way (env var?) for users to customize executor config //TODO(jdef): set -address to 127.0.0.1 if `address` is 127.0.0.1 @@ -320,7 +337,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E apiServerArgs := strings.Join(s.APIServerList, ",") ci.Arguments = append(ci.Arguments, fmt.Sprintf("--api-servers=%s", apiServerArgs)) - ci.Arguments = append(ci.Arguments, fmt.Sprintf("--v=%d", s.ExecutorLogV)) + ci.Arguments = append(ci.Arguments, fmt.Sprintf("--v=%d", s.ExecutorLogV)) // this also applies to the minion ci.Arguments = append(ci.Arguments, fmt.Sprintf("--allow-privileged=%t", s.AllowPrivileged)) ci.Arguments = append(ci.Arguments, fmt.Sprintf("--suicide-timeout=%v", s.ExecutorSuicideTimeout)) @@ -332,11 +349,8 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E } ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cgroup-prefix=%v", s.ExecutorCgroupPrefix)) - ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-bindall=%v", s.ExecutorProxyBindall)) - ci.Arguments = append(ci.Arguments, fmt.Sprintf("--run-proxy=%v", s.ExecutorRunProxy)) ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cadvisor-port=%v", s.KubeletCadvisorPort)) ci.Arguments = append(ci.Arguments, fmt.Sprintf("--sync-frequency=%v", s.KubeletSyncFrequency)) - ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-logv=%d", s.ExecutorProxyLogV)) if s.AuthPath != "" { //TODO(jdef) should probably support non-local files, e.g. hdfs:///some/config/file