Merge pull request #4240 from thockin/hyperkube

Hyperkube cleanup
This commit is contained in:
Daniel Smith
2015-02-20 11:12:32 -08:00
26 changed files with 434 additions and 345 deletions

View File

@@ -14,32 +14,186 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// A binary that can morph into all of the other kubernetes binaries. You can
// also soft-link to it busybox style.
package main
import (
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controllermanager"
"github.com/GoogleCloudPlatform/kubernetes/pkg/hyperkube"
kubelet "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/server"
apiserver "github.com/GoogleCloudPlatform/kubernetes/pkg/master/server"
proxy "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/server"
sched "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/server"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
"github.com/spf13/pflag"
)
func main() {
hk := hyperkube.HyperKube{
Name: "hyperkube",
Long: "This is an all-in-one binary that can run any of the various Kubernetes servers.",
// HyperKube represents a single binary that can morph/manage into multiple
// servers.
type HyperKube struct {
Name string // The executable name, used for help and soft-link invocation
Long string // A long description of the binary. It will be world wrapped before output.
servers []Server
baseFlags *pflag.FlagSet
out io.Writer
helpFlagVal bool
}
// AddServer adds a server to the HyperKube object.
func (hk *HyperKube) AddServer(s *Server) {
hk.servers = append(hk.servers, *s)
hk.servers[len(hk.servers)-1].hk = hk
}
// FindServer will find a specific server named name.
func (hk *HyperKube) FindServer(name string) (*Server, error) {
for _, s := range hk.servers {
if s.Name() == name {
return &s, nil
}
}
return nil, fmt.Errorf("Server not found: %s", name)
}
// Servers returns a list of all of the registred servers
func (hk *HyperKube) Servers() []Server {
return hk.servers
}
// Flags returns a flagset for "global" flags.
func (hk *HyperKube) Flags() *pflag.FlagSet {
if hk.baseFlags == nil {
hk.baseFlags = pflag.NewFlagSet(hk.Name, pflag.ContinueOnError)
hk.baseFlags.SetOutput(ioutil.Discard)
hk.baseFlags.BoolVarP(&hk.helpFlagVal, "help", "h", false, "help for "+hk.Name)
// These will add all of the "global" flags (defined with both the
// flag and pflag packages) to the new flag set we have.
util.AddFlagSetToPFlagSet(flag.CommandLine, hk.baseFlags)
util.AddPFlagSetToPFlagSet(pflag.CommandLine, hk.baseFlags)
}
return hk.baseFlags
}
// Out returns the io.Writer that is used for all usage/error information
func (hk *HyperKube) Out() io.Writer {
if hk.out == nil {
hk.out = os.Stderr
}
return hk.out
}
// SetOut sets the output writer for all usage/error information
func (hk *HyperKube) SetOut(w io.Writer) {
hk.out = w
}
// Print is a convenience method to Print to the defined output
func (hk *HyperKube) Print(i ...interface{}) {
fmt.Fprint(hk.Out(), i...)
}
// Println is a convenience method to Println to the defined output
func (hk *HyperKube) Println(i ...interface{}) {
fmt.Fprintln(hk.Out(), i...)
}
// Printf is a convenience method to Printf to the defined output
func (hk *HyperKube) Printf(format string, i ...interface{}) {
fmt.Fprintf(hk.Out(), format, i...)
}
// Run the server. This will pick the appropriate server and run it.
func (hk *HyperKube) Run(args []string) error {
// If we are called directly, parse all flags up to the first real
// argument. That should be the server to run.
baseCommand := path.Base(args[0])
serverName := baseCommand
if serverName == hk.Name {
args = args[1:]
baseFlags := hk.Flags()
baseFlags.SetInterspersed(false) // Only parse flags up to the next real command
err := baseFlags.Parse(args)
if err != nil || hk.helpFlagVal {
if err != nil {
hk.Println("Error:", err)
}
hk.Usage()
return err
}
verflag.PrintAndExitIfRequested()
args = baseFlags.Args()
if len(args) > 0 && len(args[0]) > 0 {
serverName = args[0]
baseCommand = baseCommand + " " + serverName
args = args[1:]
} else {
err = errors.New("No server specified")
hk.Printf("Error: %v\n\n", err)
hk.Usage()
return err
}
}
hk.AddServer(apiserver.NewHyperkubeServer())
hk.AddServer(controllermanager.NewHyperkubeServer())
hk.AddServer(sched.NewHyperkubeServer())
hk.AddServer(kubelet.NewHyperkubeServer())
hk.AddServer(proxy.NewHyperkubeServer())
s, err := hk.FindServer(serverName)
if err != nil {
hk.Printf("Error: %v\n\n", err)
hk.Usage()
return err
}
hk.RunToExit(os.Args)
util.AddPFlagSetToPFlagSet(hk.Flags(), s.Flags())
err = s.Flags().Parse(args)
if err != nil || hk.helpFlagVal {
if err != nil {
hk.Printf("Error: %v\n\n", err)
}
s.Usage()
return err
}
verflag.PrintAndExitIfRequested()
util.InitLogs()
defer util.FlushLogs()
err = s.Run(s, s.Flags().Args())
if err != nil {
hk.Println("Error:", err)
}
return err
}
// RunToExit will run the hyperkube and then call os.Exit with an appropriate exit code.
func (hk *HyperKube) RunToExit(args []string) {
err := hk.Run(args)
if err != nil {
os.Exit(1)
}
os.Exit(0)
}
// Usage will write out a summary for all servers that this binary supports.
func (hk *HyperKube) Usage() {
tt := `{{if .Long}}{{.Long | trim | wrap ""}}
{{end}}Usage
{{.Name}} <server> [flags]
Servers
{{range .Servers}}
{{.Name}}
{{.Long | trim | wrap " "}}{{end}}
Call '{{.Name}} <server> --help' for help on a specific server.
`
util.ExecuteTemplate(hk.Out(), tt, hk)
}

View File

@@ -0,0 +1,143 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"bytes"
"errors"
"fmt"
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
type result struct {
err error
output string
}
func testServer(n string) *Server {
return &Server{
SimpleUsage: n,
Long: fmt.Sprintf("A simple server named %s", n),
Run: func(s *Server, args []string) error {
s.hk.Printf("%s Run\n", s.Name())
return nil
},
}
}
func testServerError(n string) *Server {
return &Server{
SimpleUsage: n,
Long: fmt.Sprintf("A simple server named %s that returns an error", n),
Run: func(s *Server, args []string) error {
s.hk.Printf("%s Run\n", s.Name())
return errors.New("Server returning error")
},
}
}
func runFull(t *testing.T, args string) *result {
buf := new(bytes.Buffer)
hk := HyperKube{
Name: "hyperkube",
Long: "hyperkube is an all-in-one server binary.",
}
hk.SetOut(buf)
hk.AddServer(testServer("test1"))
hk.AddServer(testServer("test2"))
hk.AddServer(testServer("test3"))
hk.AddServer(testServerError("test-error"))
a := strings.Split(args, " ")
t.Logf("Running full with args: %q", a)
err := hk.Run(a)
r := &result{err, buf.String()}
t.Logf("Result err: %v, output: %q", r.err, r.output)
return r
}
func TestRun(t *testing.T) {
x := runFull(t, "hyperkube test1")
assert.Contains(t, x.output, "test1 Run")
assert.NoError(t, x.err)
}
func TestLinkRun(t *testing.T) {
x := runFull(t, "test1")
assert.Contains(t, x.output, "test1 Run")
assert.NoError(t, x.err)
}
func TestTopNoArgs(t *testing.T) {
x := runFull(t, "hyperkube")
assert.EqualError(t, x.err, "No server specified")
}
func TestBadServer(t *testing.T) {
x := runFull(t, "hyperkube bad-server")
assert.EqualError(t, x.err, "Server not found: bad-server")
assert.Contains(t, x.output, "Usage")
}
func TestTopHelp(t *testing.T) {
x := runFull(t, "hyperkube --help")
assert.NoError(t, x.err)
assert.Contains(t, x.output, "all-in-one")
assert.Contains(t, x.output, "A simple server named test1")
}
func TestTopFlags(t *testing.T) {
x := runFull(t, "hyperkube --help test1")
assert.NoError(t, x.err)
assert.Contains(t, x.output, "all-in-one")
assert.Contains(t, x.output, "A simple server named test1")
assert.NotContains(t, x.output, "test1 Run")
}
func TestTopFlagsBad(t *testing.T) {
x := runFull(t, "hyperkube --bad-flag")
assert.EqualError(t, x.err, "unknown flag: --bad-flag")
assert.Contains(t, x.output, "all-in-one")
assert.Contains(t, x.output, "A simple server named test1")
}
func TestServerHelp(t *testing.T) {
x := runFull(t, "hyperkube test1 --help")
assert.NoError(t, x.err)
assert.Contains(t, x.output, "A simple server named test1")
assert.Contains(t, x.output, "--help=false: help for hyperkube")
assert.NotContains(t, x.output, "test1 Run")
}
func TestServerFlagsBad(t *testing.T) {
x := runFull(t, "hyperkube test1 --bad-flag")
assert.EqualError(t, x.err, "unknown flag: --bad-flag")
assert.Contains(t, x.output, "A simple server named test1")
assert.Contains(t, x.output, "--help=false: help for hyperkube")
assert.NotContains(t, x.output, "test1 Run")
}
func TestServerError(t *testing.T) {
x := runFull(t, "hyperkube test-error")
assert.Contains(t, x.output, "test-error Run")
assert.EqualError(t, x.err, "Server returning error")
}

View File

@@ -0,0 +1,37 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
kubeapiserver "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-apiserver/app"
)
// NewKubeAPIServer creates a new hyperkube Server object that includes the
// description and flags.
func NewKubeAPIServer() *Server {
s := kubeapiserver.NewAPIServer()
hks := Server{
SimpleUsage: "apiserver",
Long: "The main API entrypoint and interface to the storage system. The API server is also the focal point for all authorization decisions.",
Run: func(_ *Server, args []string) error {
return s.Run(args)
},
}
s.AddFlags(hks.Flags())
return &hks
}

View File

@@ -0,0 +1,37 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
controllermgr "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-controller-manager/app"
)
// NewKubeControllerManager creates a new hyperkube Server object that includes the
// description and flags.
func NewKubeControllerManager() *Server {
s := controllermgr.NewCMServer()
hks := Server{
SimpleUsage: "controller-manager",
Long: "A server that runs a set of active components. This includes replication controllers, service endpoints and nodes.",
Run: func(_ *Server, args []string) error {
return s.Run(args)
},
}
s.AddFlags(hks.Flags())
return &hks
}

View File

@@ -0,0 +1,40 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
kubeproxy "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-proxy/app"
)
// NewKubeProxy creates a new hyperkube Server object that includes the
// description and flags.
func NewKubeProxy() *Server {
s := kubeproxy.NewProxyServer()
hks := Server{
SimpleUsage: "proxy",
Long: `The Kubernetes proxy server is responsible for taking traffic directed at
services and forwarding it to the appropriate pods. It generally runs on
nodes next to the Kubelet and proxies traffic from local pods to remote pods.
It is also used when handling incoming external traffic.`,
Run: func(_ *Server, args []string) error {
return s.Run(args)
},
}
s.AddFlags(hks.Flags())
return &hks
}

View File

@@ -0,0 +1,37 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
scheduler "github.com/GoogleCloudPlatform/kubernetes/plugin/cmd/kube-scheduler/app"
)
// NewScheduler creates a new hyperkube Server object that includes the
// description and flags.
func NewScheduler() *Server {
s := scheduler.NewSchedulerServer()
hks := Server{
SimpleUsage: "scheduler",
Long: "Implements a Kubernetes scheduler. This will assign pods to kubelets based on capacity and constraints.",
Run: func(_ *Server, args []string) error {
return s.Run(args)
},
}
s.AddFlags(hks.Flags())
return &hks
}

41
cmd/hyperkube/kubelet.go Normal file
View File

@@ -0,0 +1,41 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
kubelet "github.com/GoogleCloudPlatform/kubernetes/cmd/kubelet/app"
)
// NewKubelet creates a new hyperkube Server object that includes the
// description and flags.
func NewKubelet() *Server {
s := kubelet.NewKubeletServer()
hks := Server{
SimpleUsage: "kubelet",
Long: `The kubelet binary is responsible for maintaining a set of containers on a
particular node. It syncs data from a variety of sources including a
Kubernetes API server, an etcd cluster, HTTP endpoint or local file. It then
queries Docker to see what is currently running. It synchronizes the
configuration data, with the running set of containers by starting or stopping
Docker containers.`,
Run: func(_ *Server, args []string) error {
return s.Run(args)
},
}
s.AddFlags(hks.Flags())
return &hks
}

38
cmd/hyperkube/main.go Normal file
View File

@@ -0,0 +1,38 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// A binary that can morph into all of the other kubernetes binaries. You can
// also soft-link to it busybox style.
package main
import (
"os"
)
func main() {
hk := HyperKube{
Name: "hyperkube",
Long: "This is an all-in-one binary that can run any of the various Kubernetes servers.",
}
hk.AddServer(NewKubeAPIServer())
hk.AddServer(NewKubeControllerManager())
hk.AddServer(NewScheduler())
hk.AddServer(NewKubelet())
hk.AddServer(NewKubeProxy())
hk.RunToExit(os.Args)
}

73
cmd/hyperkube/server.go Normal file
View File

@@ -0,0 +1,73 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"io/ioutil"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/spf13/pflag"
)
type serverRunFunc func(s *Server, args []string) error
// Server describes a server that this binary can morph into.
type Server struct {
SimpleUsage string // One line description of the server.
Long string // Longer free form description of the server
Run serverRunFunc // Run the server. This is not expected to return.
flags *pflag.FlagSet // Flags for the command (and all dependents)
name string
hk *HyperKube
}
// Usage returns the full usage string including all of the flags.
func (s *Server) Usage() error {
tt := `{{if .Long}}{{.Long | trim | wrap ""}}
{{end}}Usage:
{{.SimpleUsage}} [flags]
Available Flags:
{{.Flags.FlagUsages}}`
return util.ExecuteTemplate(s.hk.Out(), tt, s)
}
// Name returns the name of the command as derived from the usage line.
func (s *Server) Name() string {
if s.name != "" {
return s.name
}
name := s.SimpleUsage
i := strings.Index(name, " ")
if i >= 0 {
name = name[:i]
}
return name
}
// Flags returns a flagset for this server
func (s *Server) Flags() *pflag.FlagSet {
if s.flags == nil {
s.flags = pflag.NewFlagSet(s.Name(), pflag.ContinueOnError)
s.flags.SetOutput(ioutil.Discard)
}
return s.flags
}

View File

@@ -32,6 +32,7 @@ import (
"sync"
"time"
kubeletapp "github.com/GoogleCloudPlatform/kubernetes/cmd/kubelet/app"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
@@ -41,7 +42,6 @@ import (
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
kubeletServer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/server"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
@@ -211,13 +211,13 @@ func startComponents(manifestURL string) (apiServerURL string) {
// Kubelet (localhost)
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
kubeletServer.SimpleRunKubelet(cl, nil, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins())
kubeletapp.SimpleRunKubelet(cl, nil, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins())
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
testRootDir = makeTempDirOrDie("kubelet_integ_2.")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
kubeletServer.SimpleRunKubelet(cl, nil, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins())
kubeletapp.SimpleRunKubelet(cl, nil, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins())
return apiServer.URL
}

View File

@@ -23,7 +23,7 @@ import (
"os"
"runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/server"
"github.com/GoogleCloudPlatform/kubernetes/cmd/kube-apiserver/app"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
@@ -32,7 +32,7 @@ import (
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
s := server.NewAPIServer()
s := app.NewAPIServer()
s.AddFlags(pflag.CommandLine)
util.InitFlags()

View File

@@ -0,0 +1,39 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
// This file exists to force the desired plugin implementations to be linked.
// This should probably be part of some configuration fed into the build for a
// given binary target.
import (
// Cloud providers
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/gce"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/openstack"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/ovirt"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/rackspace"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/vagrant"
// Admission policies
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/deny"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/limitranger"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/namespace/autoprovision"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/namespace/exists"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/resourcedefaults"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/resourcequota"
)

View File

@@ -0,0 +1,328 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app does all of the work necessary to create a Kubernetes
// APIServer by binding together the API, master and APIServer infrastructure.
// It can be configured and called directly or via the hyperkube framework.
package app
import (
"crypto/tls"
"net"
"net/http"
"strconv"
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/admission"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
"github.com/spf13/pflag"
)
// APIServer runs a kubernetes api server.
type APIServer struct {
WideOpenPort int
Address util.IP
PublicAddressOverride util.IP
ReadOnlyPort int
APIRate float32
APIBurst int
SecurePort int
TLSCertFile string
TLSPrivateKeyFile string
APIPrefix string
StorageVersion string
CloudProvider string
CloudConfigFile string
EventTTL time.Duration
TokenAuthFile string
AuthorizationMode string
AuthorizationPolicyFile string
AdmissionControl string
AdmissionControlConfigFile string
EtcdServerList util.StringList
EtcdConfigFile string
CorsAllowedOriginList util.StringList
AllowPrivileged bool
PortalNet util.IPNet // TODO: make this a list
EnableLogsSupport bool
MasterServiceNamespace string
RuntimeConfig util.ConfigurationMap
KubeletConfig client.KubeletConfig
}
// NewAPIServer creates a new APIServer object with default parameters
func NewAPIServer() *APIServer {
s := APIServer{
WideOpenPort: 8080,
Address: util.IP(net.ParseIP("127.0.0.1")),
PublicAddressOverride: util.IP(net.ParseIP("")),
ReadOnlyPort: 7080,
APIRate: 10.0,
APIBurst: 200,
SecurePort: 6443,
APIPrefix: "/api",
EventTTL: 48 * time.Hour,
AuthorizationMode: "AlwaysAllow",
AdmissionControl: "AlwaysAdmit",
EnableLogsSupport: true,
MasterServiceNamespace: api.NamespaceDefault,
RuntimeConfig: make(util.ConfigurationMap),
KubeletConfig: client.KubeletConfig{
Port: 10250,
EnableHttps: false,
},
}
return &s
}
// AddFlags adds flags for a specific APIServer to the specified FlagSet
func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
// Note: the weird ""+ in below lines seems to be the only way to get gofmt to
// arrange these text blocks sensibly. Grrr.
fs.IntVar(&s.WideOpenPort, "port", s.WideOpenPort, ""+
"The port to listen on. Default 8080. It is assumed that firewall rules are "+
"set up such that this port is not reachable from outside of the cluster. It is "+
"further assumed that port 443 on the cluster's public address is proxied to this "+
"port. This is performed by nginx in the default setup.")
fs.Var(&s.Address, "address", "The IP address on to serve on (set to 0.0.0.0 for all interfaces)")
fs.Var(&s.PublicAddressOverride, "public_address_override", "Public serving address."+
"Read only port will be opened on this address, and it is assumed that port "+
"443 at this address will be proxied/redirected to '-address':'-port'. If "+
"blank, the address in the first listed interface will be used.")
fs.IntVar(&s.ReadOnlyPort, "read_only_port", s.ReadOnlyPort, ""+
"The port from which to serve read-only resources. If 0, don't serve on a "+
"read-only address. It is assumed that firewall rules are set up such that "+
"this port is not reachable from outside of the cluster.")
fs.Float32Var(&s.APIRate, "api_rate", s.APIRate, "API rate limit as QPS for the read only port")
fs.IntVar(&s.APIBurst, "api_burst", s.APIBurst, "API burst amount for the read only port")
fs.IntVar(&s.SecurePort, "secure_port", s.SecurePort,
"The port from which to serve HTTPS with authentication and authorization. If 0, don't serve HTTPS ")
fs.StringVar(&s.TLSCertFile, "tls_cert_file", s.TLSCertFile, ""+
"File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). "+
"If HTTPS serving is enabled, and --tls_cert_file and --tls_private_key_file are not provided, "+
"a self-signed certificate and key are generated for the public address and saved to /var/run/kubernetes.")
fs.StringVar(&s.TLSPrivateKeyFile, "tls_private_key_file", s.TLSPrivateKeyFile, "File containing x509 private key matching --tls_cert_file.")
fs.StringVar(&s.APIPrefix, "api_prefix", s.APIPrefix, "The prefix for API requests on the server. Default '/api'.")
fs.StringVar(&s.StorageVersion, "storage_version", s.StorageVersion, "The version to store resources with. Defaults to server preferred")
fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.")
fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
fs.DurationVar(&s.EventTTL, "event_ttl", s.EventTTL, "Amount of time to retain events. Default 2 days.")
fs.StringVar(&s.TokenAuthFile, "token_auth_file", s.TokenAuthFile, "If set, the file that will be used to secure the secure port of the API server via token authentication.")
fs.StringVar(&s.AuthorizationMode, "authorization_mode", s.AuthorizationMode, "Selects how to do authorization on the secure port. One of: "+strings.Join(apiserver.AuthorizationModeChoices, ","))
fs.StringVar(&s.AuthorizationPolicyFile, "authorization_policy_file", s.AuthorizationPolicyFile, "File with authorization policy in csv format, used with --authorization_mode=ABAC, on the secure port.")
fs.StringVar(&s.AdmissionControl, "admission_control", s.AdmissionControl, "Ordered list of plug-ins to do admission control of resources into cluster. Comma-delimited list of: "+strings.Join(admission.GetPlugins(), ", "))
fs.StringVar(&s.AdmissionControlConfigFile, "admission_control_config_file", s.AdmissionControlConfigFile, "File with admission control configuration.")
fs.Var(&s.EtcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config")
fs.StringVar(&s.EtcdConfigFile, "etcd_config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd_servers.")
fs.Var(&s.CorsAllowedOriginList, "cors_allowed_origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
fs.BoolVar(&s.AllowPrivileged, "allow_privileged", s.AllowPrivileged, "If true, allow privileged containers.")
fs.Var(&s.PortalNet, "portal_net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.")
fs.StringVar(&s.MasterServiceNamespace, "master_service_namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods")
fs.Var(&s.RuntimeConfig, "runtime_config", "A set of key=value pairs that describe runtime configuration that may be passed to the apiserver.")
client.BindKubeletClientConfigFlags(fs, &s.KubeletConfig)
}
// TODO: Longer term we should read this from some config store, rather than a flag.
func (s *APIServer) verifyPortalFlags() {
if s.PortalNet.IP == nil {
glog.Fatal("No --portal_net specified")
}
}
func newEtcd(etcdConfigFile string, etcdServerList util.StringList, storageVersion string) (helper tools.EtcdHelper, err error) {
var client tools.EtcdGetSet
if etcdConfigFile != "" {
client, err = etcd.NewClientFromFile(etcdConfigFile)
if err != nil {
return helper, err
}
} else {
client = etcd.NewClient(etcdServerList)
}
return master.NewEtcdHelper(client, storageVersion)
}
// Run runs the specified APIServer. This should never exit.
func (s *APIServer) Run(_ []string) error {
s.verifyPortalFlags()
if (s.EtcdConfigFile != "" && len(s.EtcdServerList) != 0) || (s.EtcdConfigFile == "" && len(s.EtcdServerList) == 0) {
glog.Fatalf("specify either --etcd_servers or --etcd_config")
}
capabilities.Initialize(capabilities.Capabilities{
AllowPrivileged: s.AllowPrivileged,
})
cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
kubeletClient, err := client.NewKubeletClient(&s.KubeletConfig)
if err != nil {
glog.Fatalf("Failure to start kubelet client: %v", err)
}
_, v1beta3 := s.RuntimeConfig["api/v1beta3"]
// TODO: expose same flags as client.BindClientConfigFlags but for a server
clientConfig := &client.Config{
Host: net.JoinHostPort(s.Address.String(), strconv.Itoa(s.WideOpenPort)),
Version: s.StorageVersion,
}
client, err := client.New(clientConfig)
if err != nil {
glog.Fatalf("Invalid server address: %v", err)
}
helper, err := newEtcd(s.EtcdConfigFile, s.EtcdServerList, s.StorageVersion)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}
n := net.IPNet(s.PortalNet)
authenticator, err := apiserver.NewAuthenticatorFromTokenFile(s.TokenAuthFile)
if err != nil {
glog.Fatalf("Invalid Authentication Config: %v", err)
}
authorizer, err := apiserver.NewAuthorizerFromAuthorizationConfig(s.AuthorizationMode, s.AuthorizationPolicyFile)
if err != nil {
glog.Fatalf("Invalid Authorization Config: %v", err)
}
admissionControlPluginNames := strings.Split(s.AdmissionControl, ",")
admissionController := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile)
config := &master.Config{
Client: client,
Cloud: cloud,
EtcdHelper: helper,
EventTTL: s.EventTTL,
KubeletClient: kubeletClient,
PortalNet: &n,
EnableLogsSupport: s.EnableLogsSupport,
EnableUISupport: true,
EnableSwaggerSupport: true,
EnableIndex: true,
APIPrefix: s.APIPrefix,
CorsAllowedOriginList: s.CorsAllowedOriginList,
ReadOnlyPort: s.ReadOnlyPort,
ReadWritePort: s.SecurePort,
PublicAddress: net.IP(s.PublicAddressOverride),
Authenticator: authenticator,
Authorizer: authorizer,
AdmissionControl: admissionController,
EnableV1Beta3: v1beta3,
MasterServiceNamespace: s.MasterServiceNamespace,
}
m := master.New(config)
// We serve on 3 ports. See docs/accessing_the_api.md
roLocation := ""
if s.ReadOnlyPort != 0 {
roLocation = net.JoinHostPort(config.PublicAddress.String(), strconv.Itoa(s.ReadOnlyPort))
}
secureLocation := ""
if s.SecurePort != 0 {
secureLocation = net.JoinHostPort(config.PublicAddress.String(), strconv.Itoa(s.SecurePort))
}
wideOpenLocation := net.JoinHostPort(s.Address.String(), strconv.Itoa(s.WideOpenPort))
// See the flag commentary to understand our assumptions when opening the read-only and read-write ports.
if roLocation != "" {
// Default settings allow 1 read-only request per second, allow up to 20 in a burst before enforcing.
rl := util.NewTokenBucketRateLimiter(s.APIRate, s.APIBurst)
readOnlyServer := &http.Server{
Addr: roLocation,
Handler: apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.InsecureHandler))),
ReadTimeout: 5 * time.Minute,
WriteTimeout: 5 * time.Minute,
MaxHeaderBytes: 1 << 20,
}
glog.Infof("Serving read-only insecurely on %s", roLocation)
go func() {
defer util.HandleCrash()
for {
if err := readOnlyServer.ListenAndServe(); err != nil {
glog.Errorf("Unable to listen for read only traffic (%v); will try again.", err)
}
time.Sleep(15 * time.Second)
}
}()
}
if secureLocation != "" {
secureServer := &http.Server{
Addr: secureLocation,
Handler: apiserver.RecoverPanics(m.Handler),
ReadTimeout: 5 * time.Minute,
WriteTimeout: 5 * time.Minute,
MaxHeaderBytes: 1 << 20,
TLSConfig: &tls.Config{
// Change default from SSLv3 to TLSv1.0 (because of POODLE vulnerability)
MinVersion: tls.VersionTLS10,
// Populate PeerCertificates in requests, but don't reject connections without certificates
// This allows certificates to be validated by authenticators, while still allowing other auth types
ClientAuth: tls.RequestClientCert,
},
}
glog.Infof("Serving securely on %s", secureLocation)
go func() {
defer util.HandleCrash()
for {
if s.TLSCertFile == "" && s.TLSPrivateKeyFile == "" {
s.TLSCertFile = "/var/run/kubernetes/apiserver.crt"
s.TLSPrivateKeyFile = "/var/run/kubernetes/apiserver.key"
if err := util.GenerateSelfSignedCert(config.PublicAddress.String(), s.TLSCertFile, s.TLSPrivateKeyFile); err != nil {
glog.Errorf("Unable to generate self signed cert: %v", err)
} else {
glog.Infof("Using self-signed cert (%s, %s)", s.TLSCertFile, s.TLSPrivateKeyFile)
}
}
if err := secureServer.ListenAndServeTLS(s.TLSCertFile, s.TLSPrivateKeyFile); err != nil {
glog.Errorf("Unable to listen for secure (%v); will try again.", err)
}
time.Sleep(15 * time.Second)
}
}()
}
http := &http.Server{
Addr: wideOpenLocation,
Handler: apiserver.RecoverPanics(m.InsecureHandler),
ReadTimeout: 5 * time.Minute,
WriteTimeout: 5 * time.Minute,
MaxHeaderBytes: 1 << 20,
}
glog.Infof("Serving insecurely on %s", wideOpenLocation)
glog.Fatal(http.ListenAndServe())
return nil
}

View File

@@ -0,0 +1,168 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a server that runs a set of active
// components. This includes replication controllers, service endpoints and
// nodes.
package app
import (
"net"
"net/http"
"strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
"github.com/spf13/pflag"
)
// CMServer is the mail context object for the controller manager.
type CMServer struct {
Port int
Address util.IP
ClientConfig client.Config
CloudProvider string
CloudConfigFile string
MinionRegexp string
NodeSyncPeriod time.Duration
ResourceQuotaSyncPeriod time.Duration
RegisterRetryCount int
MachineList util.StringList
SyncNodeList bool
PodEvictionTimeout time.Duration
// TODO: Discover these by pinging the host machines, and rip out these params.
NodeMilliCPU int64
NodeMemory resource.Quantity
KubeletConfig client.KubeletConfig
}
// NewCMServer creates a new CMServer with a default config.
func NewCMServer() *CMServer {
s := CMServer{
Port: ports.ControllerManagerPort,
Address: util.IP(net.ParseIP("127.0.0.1")),
NodeSyncPeriod: 10 * time.Second,
ResourceQuotaSyncPeriod: 10 * time.Second,
RegisterRetryCount: 10,
PodEvictionTimeout: 5 * time.Minute,
NodeMilliCPU: 1000,
NodeMemory: resource.MustParse("3Gi"),
SyncNodeList: true,
KubeletConfig: client.KubeletConfig{
Port: ports.KubeletPort,
EnableHttps: false,
},
}
return &s
}
// AddFlags adds flags for a specific CMServer to the specified FlagSet
func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on")
fs.Var(&s.Address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
client.BindClientConfigFlags(fs, &s.ClientConfig)
fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.")
fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and --cloud_provider is specified, a regular expression for matching minion VMs.")
fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+
"The period for syncing nodes from cloudprovider. Longer periods will result in "+
"fewer calls to cloud provider, but may delay addition of new nodes to cluster.")
fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace peroid for deleting pods on failed nodes.")
fs.IntVar(&s.RegisterRetryCount, "register_retry_count", s.RegisterRetryCount, ""+
"The number of retries for initial node registration. Retry interval equals node_sync_period.")
fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.")
fs.BoolVar(&s.SyncNodeList, "sync_nodes", s.SyncNodeList, "If true, and --cloud_provider is specified, sync nodes from the cloud provider. Default true.")
// TODO: Discover these by pinging the host machines, and rip out these flags.
// TODO: in the meantime, use resource.QuantityFlag() instead of these
fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node")
fs.Var(resource.NewQuantityFlagValue(&s.NodeMemory), "node_memory", "The amount of memory (in bytes) provisioned on each node")
client.BindKubeletClientConfigFlags(fs, &s.KubeletConfig)
}
func (s *CMServer) verifyMinionFlags() {
if !s.SyncNodeList && s.MinionRegexp != "" {
glog.Info("--minion_regexp is ignored by --sync_nodes=false")
}
if s.CloudProvider == "" || s.MinionRegexp == "" {
if len(s.MachineList) == 0 {
glog.Info("No machines specified!")
}
return
}
if len(s.MachineList) != 0 {
glog.Info("--machines is overwritten by --minion_regexp")
}
}
// Run runs the CMServer. This should never exit.
func (s *CMServer) Run(_ []string) error {
s.verifyMinionFlags()
if len(s.ClientConfig.Host) == 0 {
glog.Fatal("usage: controller-manager --master <master>")
}
kubeClient, err := client.New(&s.ClientConfig)
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
go http.ListenAndServe(net.JoinHostPort(s.Address.String(), strconv.Itoa(s.Port)), nil)
endpoints := service.NewEndpointController(kubeClient)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient)
controllerManager.Run(10 * time.Second)
kubeletClient, err := client.NewKubeletClient(&s.KubeletConfig)
if err != nil {
glog.Fatalf("Failure to start kubelet client: %v", err)
}
cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
nodeResources := &api.NodeResources{
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(s.NodeMilliCPU, resource.DecimalSI),
api.ResourceMemory: s.NodeMemory,
},
}
nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout)
nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)
resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)
select {}
return nil
}

View File

@@ -0,0 +1,29 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
// This file exists to force the desired plugin implementations to be linked.
// This should probably be part of some configuration fed into the build for a
// given binary target.
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/gce"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/openstack"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/ovirt"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/rackspace"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/vagrant"
)

View File

@@ -25,7 +25,7 @@ import (
"os"
"runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controllermanager"
"github.com/GoogleCloudPlatform/kubernetes/cmd/kube-controller-manager/app"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
@@ -34,7 +34,7 @@ import (
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
s := controllermanager.NewCMServer()
s := app.NewCMServer()
s.AddFlags(pflag.CommandLine)
util.InitFlags()

View File

@@ -0,0 +1,153 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app does all of the work necessary to configure and run a
// Kubernetes app process.
package app
import (
"net"
"net/http"
"strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/proxy"
"github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
"github.com/spf13/pflag"
)
// ProxyServer contains configures and runs a Kubernetes proxy server
type ProxyServer struct {
EtcdServerList util.StringList
EtcdConfigFile string
BindAddress util.IP
ClientConfig client.Config
HealthzPort int
OOMScoreAdj int
}
// NewProxyServer creates a new ProxyServer object with default parameters
func NewProxyServer() *ProxyServer {
return &ProxyServer{
BindAddress: util.IP(net.ParseIP("0.0.0.0")),
HealthzPort: 10249,
OOMScoreAdj: -899,
}
}
// AddFlags adds flags for a specific ProxyServer to the specified FlagSet
func (s *ProxyServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.EtcdConfigFile, "etcd_config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd_servers")
fs.Var(&s.EtcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated (optional). Mutually exclusive with -etcd_config")
fs.Var(&s.BindAddress, "bind_address", "The IP address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)")
client.BindClientConfigFlags(fs, &s.ClientConfig)
fs.IntVar(&s.HealthzPort, "healthz_port", s.HealthzPort, "The port to bind the health check server. Use 0 to disable.")
fs.IntVar(&s.OOMScoreAdj, "oom_score_adj", s.OOMScoreAdj, "The oom_score_adj value for kube-proxy process. Values must be within the range [-1000, 1000]")
}
// Run runs the specified ProxyServer. This should never exit.
func (s *ProxyServer) Run(_ []string) error {
if err := util.ApplyOomScoreAdj(0, s.OOMScoreAdj); err != nil {
glog.Info(err)
}
serviceConfig := config.NewServiceConfig()
endpointsConfig := config.NewEndpointsConfig()
protocol := iptables.ProtocolIpv4
if net.IP(s.BindAddress).To4() == nil {
protocol = iptables.ProtocolIpv6
}
loadBalancer := proxy.NewLoadBalancerRR()
proxier := proxy.NewProxier(loadBalancer, net.IP(s.BindAddress), iptables.New(exec.New(), protocol))
if proxier == nil {
glog.Fatalf("failed to create proxier, aborting")
}
// Wire proxier to handle changes to services
serviceConfig.RegisterHandler(proxier)
// And wire loadBalancer to handle changes to endpoints to services
endpointsConfig.RegisterHandler(loadBalancer)
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
// define api config source
if s.ClientConfig.Host != "" {
glog.Infof("Using API calls to get config %v", s.ClientConfig.Host)
client, err := client.New(&s.ClientConfig)
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
config.NewSourceAPI(
client.Services(api.NamespaceAll),
client.Endpoints(api.NamespaceAll),
30*time.Second,
serviceConfig.Channel("api"),
endpointsConfig.Channel("api"),
)
} else {
var etcdClient *etcd.Client
// Set up etcd client
if len(s.EtcdServerList) > 0 {
// Set up logger for etcd client
etcd.SetLogger(util.NewLogger("etcd "))
etcdClient = etcd.NewClient(s.EtcdServerList)
} else if s.EtcdConfigFile != "" {
// Set up logger for etcd client
etcd.SetLogger(util.NewLogger("etcd "))
var err error
etcdClient, err = etcd.NewClientFromFile(s.EtcdConfigFile)
if err != nil {
glog.Fatalf("Error with etcd config file: %v", err)
}
}
// Create a configuration source that handles configuration from etcd.
if etcdClient != nil {
glog.Infof("Using etcd servers %v", etcdClient.GetCluster())
config.NewConfigSourceEtcd(etcdClient,
serviceConfig.Channel("etcd"),
endpointsConfig.Channel("etcd"))
}
}
if s.HealthzPort > 0 {
go util.Forever(func() {
err := http.ListenAndServe(s.BindAddress.String()+":"+strconv.Itoa(s.HealthzPort), nil)
if err != nil {
glog.Errorf("Starting health server failed: %v", err)
}
}, 5*time.Second)
}
// Just loop forever for now...
proxier.SyncLoop()
return nil
}

View File

@@ -21,7 +21,7 @@ import (
"os"
"runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/server"
"github.com/GoogleCloudPlatform/kubernetes/cmd/kube-proxy/app"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
@@ -30,7 +30,7 @@ import (
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
s := server.NewProxyServer()
s := app.NewProxyServer()
s.AddFlags(pflag.CommandLine)
util.InitFlags()

View File

@@ -0,0 +1,46 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
// This file exists to force the desired plugin implementations to be linked.
import (
// Credential providers
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider/gcp"
// Volume plugins
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/gce_pd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/git_repo"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/host_path"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/secret"
)
// ProbeVolumePlugins collects all volume plugins into an easy to use list.
func ProbeVolumePlugins() []volume.Plugin {
allPlugins := []volume.Plugin{}
// The list of plugins to probe is decided by the kubelet binary, not
// by dynamic linking or other "magic". Plugins will be analyzed and
// initialized later.
allPlugins = append(allPlugins, empty_dir.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, git_repo.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, host_path.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, secret.ProbeVolumePlugins()...)
return allPlugins
}

416
cmd/kubelet/app/server.go Normal file
View File

@@ -0,0 +1,416 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app makes it easy to create a kubelet server for various contexts.
package app
import (
"fmt"
"math/rand"
"net"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
"github.com/spf13/pflag"
)
const defaultRootDir = "/var/lib/kubelet"
// KubeletServer encapsulates all of the parameters necessary for starting up
// a kubelet. These can either be set via command line or directly.
type KubeletServer struct {
Config string
SyncFrequency time.Duration
FileCheckFrequency time.Duration
HTTPCheckFrequency time.Duration
ManifestURL string
EnableServer bool
Address util.IP
Port uint
HostnameOverride string
PodInfraContainerImage string
DockerEndpoint string
EtcdServerList util.StringList
EtcdConfigFile string
RootDirectory string
AllowPrivileged bool
RegistryPullQPS float64
RegistryBurst int
RunOnce bool
EnableDebuggingHandlers bool
MinimumGCAge time.Duration
MaxContainerCount int
AuthPath string
CAdvisorPort uint
OOMScoreAdj int
APIServerList util.StringList
ClusterDomain string
MasterServiceNamespace string
ClusterDNS util.IP
ReallyCrashForTesting bool
StreamingConnectionIdleTimeout time.Duration
}
// NewKubeletServer will create a new KubeletServer with default values.
func NewKubeletServer() *KubeletServer {
return &KubeletServer{
SyncFrequency: 10 * time.Second,
FileCheckFrequency: 20 * time.Second,
HTTPCheckFrequency: 20 * time.Second,
EnableServer: true,
Address: util.IP(net.ParseIP("127.0.0.1")),
Port: ports.KubeletPort,
PodInfraContainerImage: kubelet.PodInfraContainerImage,
RootDirectory: defaultRootDir,
RegistryBurst: 10,
EnableDebuggingHandlers: true,
MinimumGCAge: 1 * time.Minute,
MaxContainerCount: 5,
CAdvisorPort: 4194,
OOMScoreAdj: -900,
MasterServiceNamespace: api.NamespaceDefault,
}
}
// AddFlags adds flags for a specific KubeletServer to the specified FlagSet
func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files")
fs.DurationVar(&s.SyncFrequency, "sync_frequency", s.SyncFrequency, "Max period between synchronizing running containers and config")
fs.DurationVar(&s.FileCheckFrequency, "file_check_frequency", s.FileCheckFrequency, "Duration between checking config files for new data")
fs.DurationVar(&s.HTTPCheckFrequency, "http_check_frequency", s.HTTPCheckFrequency, "Duration between checking http for new data")
fs.StringVar(&s.ManifestURL, "manifest_url", s.ManifestURL, "URL for accessing the container manifest")
fs.BoolVar(&s.EnableServer, "enable_server", s.EnableServer, "Enable the info server")
fs.Var(&s.Address, "address", "The IP address for the info server to serve on (set to 0.0.0.0 for all interfaces)")
fs.UintVar(&s.Port, "port", s.Port, "The port for the info server to serve on")
fs.StringVar(&s.HostnameOverride, "hostname_override", s.HostnameOverride, "If non-empty, will use this string as identification instead of the actual hostname.")
fs.StringVar(&s.PodInfraContainerImage, "pod_infra_container_image", s.PodInfraContainerImage, "The image whose network/ipc namespaces containers in each pod will use.")
fs.StringVar(&s.DockerEndpoint, "docker_endpoint", s.DockerEndpoint, "If non-empty, use this for the docker endpoint to communicate with")
fs.Var(&s.EtcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config")
fs.StringVar(&s.EtcdConfigFile, "etcd_config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd_servers")
fs.StringVar(&s.RootDirectory, "root_dir", s.RootDirectory, "Directory path for managing kubelet files (volume mounts,etc).")
fs.BoolVar(&s.AllowPrivileged, "allow_privileged", s.AllowPrivileged, "If true, allow containers to request privileged mode. [default=false]")
fs.Float64Var(&s.RegistryPullQPS, "registry_qps", s.RegistryPullQPS, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]")
fs.IntVar(&s.RegistryBurst, "registry_burst", s.RegistryBurst, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0")
fs.BoolVar(&s.RunOnce, "runonce", s.RunOnce, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers, --api_servers, and --enable-server")
fs.BoolVar(&s.EnableDebuggingHandlers, "enable_debugging_handlers", s.EnableDebuggingHandlers, "Enables server endpoints for log collection and local running of containers and commands")
fs.DurationVar(&s.MinimumGCAge, "minimum_container_ttl_duration", s.MinimumGCAge, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'")
fs.IntVar(&s.MaxContainerCount, "maximum_dead_containers_per_container", s.MaxContainerCount, "Maximum number of old instances of a container to retain per container. Each container takes up some disk space. Default: 5.")
fs.StringVar(&s.AuthPath, "auth_path", s.AuthPath, "Path to .kubernetes_auth file, specifying how to authenticate to API server.")
fs.UintVar(&s.CAdvisorPort, "cadvisor_port", s.CAdvisorPort, "The port of the localhost cAdvisor endpoint")
fs.IntVar(&s.OOMScoreAdj, "oom_score_adj", s.OOMScoreAdj, "The oom_score_adj value for kubelet process. Values must be within the range [-1000, 1000]")
fs.Var(&s.APIServerList, "api_servers", "List of Kubernetes API servers for publishing events, and reading pods and services. (ip:port), comma separated.")
fs.StringVar(&s.ClusterDomain, "cluster_domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains")
fs.StringVar(&s.MasterServiceNamespace, "master_service_namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods")
fs.Var(&s.ClusterDNS, "cluster_dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers")
fs.BoolVar(&s.ReallyCrashForTesting, "really_crash_for_testing", s.ReallyCrashForTesting, "If true, crash with panics more often.")
fs.DurationVar(&s.StreamingConnectionIdleTimeout, "streaming_connection_idle_timeout", 0, "Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m'")
}
// Run runs the specified KubeletServer. This should never exit.
func (s *KubeletServer) Run(_ []string) error {
util.ReallyCrash = s.ReallyCrashForTesting
rand.Seed(time.Now().UTC().UnixNano())
// Cluster creation scripts support both kubernetes versions that 1)
// support kublet watching apiserver for pods, and 2) ones that don't. So
// they can set both --etcd_servers and --api_servers. The current code
// will ignore the --etcd_servers flag, while older kubelet code will use
// the --etcd_servers flag for pods, and use --api_servers for event
// publising.
//
// TODO(erictune): convert all cloud provider scripts and Google Container Engine to
// use only --api_servers, then delete --etcd_servers flag and the resulting dead code.
if len(s.EtcdServerList) > 0 && len(s.APIServerList) > 0 {
glog.Infof("Both --etcd_servers and --api_servers are set. Not using etcd source.")
s.EtcdServerList = util.StringList{}
}
if err := util.ApplyOomScoreAdj(0, s.OOMScoreAdj); err != nil {
glog.Info(err)
}
client, err := s.createAPIServerClient()
if err != nil && len(s.APIServerList) > 0 {
glog.Warningf("No API client: %v", err)
}
glog.Infof("Using root directory: %v", s.RootDirectory)
credentialprovider.SetPreferredDockercfgPath(s.RootDirectory)
kcfg := KubeletConfig{
Address: s.Address,
AllowPrivileged: s.AllowPrivileged,
HostnameOverride: s.HostnameOverride,
RootDirectory: s.RootDirectory,
ConfigFile: s.Config,
ManifestURL: s.ManifestURL,
FileCheckFrequency: s.FileCheckFrequency,
HTTPCheckFrequency: s.HTTPCheckFrequency,
PodInfraContainerImage: s.PodInfraContainerImage,
SyncFrequency: s.SyncFrequency,
RegistryPullQPS: s.RegistryPullQPS,
RegistryBurst: s.RegistryBurst,
MinimumGCAge: s.MinimumGCAge,
MaxContainerCount: s.MaxContainerCount,
ClusterDomain: s.ClusterDomain,
ClusterDNS: s.ClusterDNS,
Runonce: s.RunOnce,
Port: s.Port,
CAdvisorPort: s.CAdvisorPort,
EnableServer: s.EnableServer,
EnableDebuggingHandlers: s.EnableDebuggingHandlers,
DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
KubeClient: client,
EtcdClient: kubelet.EtcdClientOrDie(s.EtcdServerList, s.EtcdConfigFile),
MasterServiceNamespace: s.MasterServiceNamespace,
VolumePlugins: ProbeVolumePlugins(),
StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout,
}
RunKubelet(&kcfg)
// runs forever
select {}
}
func (s *KubeletServer) setupRunOnce() {
if s.RunOnce {
// Don't use remote (etcd or apiserver) sources
if len(s.EtcdServerList) > 0 {
glog.Fatalf("invalid option: --runonce and --etcd_servers are mutually exclusive")
}
if len(s.APIServerList) > 0 {
glog.Fatalf("invalid option: --runonce and --api_servers are mutually exclusive")
}
if s.EnableServer {
glog.Infof("--runonce is set, disabling server")
s.EnableServer = false
}
}
}
// TODO: replace this with clientcmd
func (s *KubeletServer) createAPIServerClient() (*client.Client, error) {
authInfo, err := clientauth.LoadFromFile(s.AuthPath)
if err != nil {
glog.Warningf("Could not load kubernetes auth path: %v. Continuing with defaults.", err)
}
if authInfo == nil {
// authInfo didn't load correctly - continue with defaults.
authInfo = &clientauth.Info{}
}
clientConfig, err := authInfo.MergeWithConfig(client.Config{})
if err != nil {
return nil, err
}
if len(s.APIServerList) < 1 {
return nil, fmt.Errorf("no api servers specified")
}
// TODO: adapt Kube client to support LB over several servers
if len(s.APIServerList) > 1 {
glog.Infof("Multiple api servers specified. Picking first one")
}
clientConfig.Host = s.APIServerList[0]
c, err := client.New(&clientConfig)
if err != nil {
return nil, err
}
return c, nil
}
// SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an etcdClient.
// Under the hood it calls RunKubelet (below)
func SimpleRunKubelet(client *client.Client,
etcdClient tools.EtcdClient,
dockerClient dockertools.DockerInterface,
hostname, rootDir, manifestURL, address string,
port uint,
masterServiceNamespace string,
volumePlugins []volume.Plugin) {
kcfg := KubeletConfig{
KubeClient: client,
EtcdClient: etcdClient,
DockerClient: dockerClient,
HostnameOverride: hostname,
RootDirectory: rootDir,
ManifestURL: manifestURL,
PodInfraContainerImage: kubelet.PodInfraContainerImage,
Port: port,
Address: util.IP(net.ParseIP(address)),
EnableServer: true,
EnableDebuggingHandlers: true,
SyncFrequency: 3 * time.Second,
MinimumGCAge: 10 * time.Second,
MaxContainerCount: 5,
MasterServiceNamespace: masterServiceNamespace,
VolumePlugins: volumePlugins,
}
RunKubelet(&kcfg)
}
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
// 1 Integration tests
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kcfg *KubeletConfig) {
kcfg.Hostname = util.GetHostname(kcfg.HostnameOverride)
if kcfg.KubeClient != nil {
kubelet.SetupEventSending(kcfg.KubeClient, kcfg.Hostname)
} else {
glog.Infof("No api server defined - no events will be sent.")
}
kubelet.SetupLogging()
kubelet.SetupCapabilities(kcfg.AllowPrivileged)
credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory)
podCfg := makePodSourceConfig(kcfg)
k, err := createAndInitKubelet(kcfg, podCfg)
if err != nil {
glog.Errorf("Failed to create kubelet: %s", err)
return
}
// process pods and exit.
if kcfg.Runonce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
glog.Errorf("--runonce failed: %v", err)
}
} else {
startKubelet(k, podCfg, kcfg)
}
}
func startKubelet(k *kubelet.Kubelet, podCfg *config.PodConfig, kc *KubeletConfig) {
// start the kubelet
go util.Forever(func() { k.Run(podCfg.Updates()) }, 0)
// start the kubelet server
if kc.EnableServer {
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers)
}, 0)
}
}
func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
// define file config source
if kc.ConfigFile != "" {
glog.Infof("Adding manifest file: %v", kc.ConfigFile)
config.NewSourceFile(kc.ConfigFile, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource))
}
// define url config source
if kc.ManifestURL != "" {
glog.Infof("Adding manifest url: %v", kc.ManifestURL)
config.NewSourceURL(kc.ManifestURL, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource))
}
if kc.EtcdClient != nil {
glog.Infof("Watching for etcd configs at %v", kc.EtcdClient.GetCluster())
config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel(kubelet.EtcdSource))
}
if kc.KubeClient != nil {
glog.Infof("Watching apiserver")
config.NewSourceApiserver(kc.KubeClient, kc.Hostname, cfg.Channel(kubelet.ApiserverSource))
}
return cfg
}
// KubeletConfig is all of the parameters necessary for running a kubelet.
// TODO: This should probably be merged with KubeletServer. The extra object is a consequence of refactoring.
type KubeletConfig struct {
EtcdClient tools.EtcdClient
KubeClient *client.Client
DockerClient dockertools.DockerInterface
CAdvisorPort uint
Address util.IP
AllowPrivileged bool
HostnameOverride string
RootDirectory string
ConfigFile string
ManifestURL string
FileCheckFrequency time.Duration
HTTPCheckFrequency time.Duration
Hostname string
PodInfraContainerImage string
SyncFrequency time.Duration
RegistryPullQPS float64
RegistryBurst int
MinimumGCAge time.Duration
MaxContainerCount int
ClusterDomain string
ClusterDNS util.IP
EnableServer bool
EnableDebuggingHandlers bool
Port uint
Runonce bool
MasterServiceNamespace string
VolumePlugins []volume.Plugin
StreamingConnectionIdleTimeout time.Duration
}
func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kubelet, error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
k, err := kubelet.NewMainKubelet(
kc.Hostname,
kc.DockerClient,
kc.EtcdClient,
kc.KubeClient,
kc.RootDirectory,
kc.PodInfraContainerImage,
kc.SyncFrequency,
float32(kc.RegistryPullQPS),
kc.RegistryBurst,
kc.MinimumGCAge,
kc.MaxContainerCount,
pc.IsSourceSeen,
kc.ClusterDomain,
net.IP(kc.ClusterDNS),
kc.MasterServiceNamespace,
kc.VolumePlugins,
kc.StreamingConnectionIdleTimeout)
if err != nil {
return nil, err
}
k.BirthCry()
go k.GarbageCollectLoop()
go kubelet.MonitorCAdvisor(k, kc.CAdvisorPort)
return k, nil
}

View File

@@ -25,7 +25,7 @@ import (
"os"
"runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/server"
"github.com/GoogleCloudPlatform/kubernetes/cmd/kubelet/app"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
@@ -34,7 +34,7 @@ import (
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
s := server.NewKubeletServer()
s := app.NewKubeletServer()
s.AddFlags(pflag.CommandLine)
util.InitFlags()

View File

@@ -28,6 +28,7 @@ import (
"runtime"
"time"
kubeletapp "github.com/GoogleCloudPlatform/kubernetes/cmd/kubelet/app"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
@@ -36,7 +37,6 @@ import (
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
kubeletServer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/server"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
@@ -144,7 +144,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP
runControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory)
dockerClient := dockertools.ConnectToDockerOrDie(*dockerEndpoint)
kubeletServer.SimpleRunKubelet(cl, nil, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletServer.ProbeVolumePlugins())
kubeletapp.SimpleRunKubelet(cl, nil, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins())
}
func newApiClient(addr net.IP, port int) *client.Client {