From 1820114a2d6beb7f031658893620cd0c4f1237d7 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Thu, 11 Jun 2015 13:13:19 +0000 Subject: [PATCH 1/2] Upstream controller manager and km binary Closes mesosphere/kubernetes-mesos#310 Depends on GoogleCloudPlatform/kubernetes#8882 - fix https://github.com/mesosphere/kubernetes-mesos/issues/336 - Fix comment typo - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/9265#commitcomment-11559038 - Add warning to k8s modules to also update mesos copies - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/9265#commitcomment-11558864 - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/9265#commitcomment-11558855 - Add comments and TODO that hypercube and controllermanager need refactoring --- cmd/hyperkube/hyperkube.go | 2 + cmd/hyperkube/hyperkube_test.go | 2 + cmd/hyperkube/kube-apiserver.go | 2 + cmd/hyperkube/kube-controller-manager.go | 2 + cmd/hyperkube/kube-proxy.go | 2 + cmd/hyperkube/kube-scheduler.go | 2 + cmd/hyperkube/main.go | 3 + cmd/hyperkube/server.go | 2 + .../app/controllermanager.go | 3 + .../mesos/cmd/k8sm-controller-manager/doc.go | 23 + .../mesos/cmd/k8sm-controller-manager/main.go | 53 +++ contrib/mesos/cmd/km/doc.go | 24 + contrib/mesos/cmd/km/hyperkube.go | 203 ++++++++ contrib/mesos/cmd/km/hyperkube_test.go | 144 ++++++ .../mesos/cmd/km/k8sm-controllermanager.go | 38 ++ contrib/mesos/cmd/km/k8sm-executor.go | 40 ++ contrib/mesos/cmd/km/k8sm-scheduler.go | 39 ++ contrib/mesos/cmd/km/km.go | 37 ++ contrib/mesos/cmd/km/kube-apiserver.go | 38 ++ contrib/mesos/cmd/km/kube-proxy.go | 41 ++ contrib/mesos/cmd/km/server.go | 82 ++++ .../controllermanager/controllermanager.go | 183 ++++++++ contrib/mesos/pkg/controllermanager/doc.go | 20 + contrib/mesos/pkg/service/doc.go | 20 + .../mesos/pkg/service/endpoints_controller.go | 441 ++++++++++++++++++ .../pkg/service/endpoints_controller_test.go | 63 +++ pkg/service/endpoints_controller.go | 2 + 27 files changed, 1511 insertions(+) create mode 100644 contrib/mesos/cmd/k8sm-controller-manager/doc.go create mode 100644 contrib/mesos/cmd/k8sm-controller-manager/main.go create mode 100644 contrib/mesos/cmd/km/doc.go create mode 100644 contrib/mesos/cmd/km/hyperkube.go create mode 100644 contrib/mesos/cmd/km/hyperkube_test.go create mode 100644 contrib/mesos/cmd/km/k8sm-controllermanager.go create mode 100644 contrib/mesos/cmd/km/k8sm-executor.go create mode 100644 contrib/mesos/cmd/km/k8sm-scheduler.go create mode 100644 contrib/mesos/cmd/km/km.go create mode 100644 contrib/mesos/cmd/km/kube-apiserver.go create mode 100644 contrib/mesos/cmd/km/kube-proxy.go create mode 100644 contrib/mesos/cmd/km/server.go create mode 100644 contrib/mesos/pkg/controllermanager/controllermanager.go create mode 100644 contrib/mesos/pkg/controllermanager/doc.go create mode 100644 contrib/mesos/pkg/service/doc.go create mode 100644 contrib/mesos/pkg/service/endpoints_controller.go create mode 100644 contrib/mesos/pkg/service/endpoints_controller_test.go diff --git a/cmd/hyperkube/hyperkube.go b/cmd/hyperkube/hyperkube.go index a43bd0b9d61..ba120032579 100644 --- a/cmd/hyperkube/hyperkube.go +++ b/cmd/hyperkube/hyperkube.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// CAUTION: If you update code in this file, you may need to also update code +// in contrib/mesos/cmd/km/hyperkube.go package main import ( diff --git a/cmd/hyperkube/hyperkube_test.go b/cmd/hyperkube/hyperkube_test.go index 3843083ec79..55f04978df3 100644 --- a/cmd/hyperkube/hyperkube_test.go +++ b/cmd/hyperkube/hyperkube_test.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// CAUTION: If you update code in this file, you may need to also update code +// in contrib/mesos/cmd/km/hyperkube_test.go package main import ( diff --git a/cmd/hyperkube/kube-apiserver.go b/cmd/hyperkube/kube-apiserver.go index f9ef7a92d3b..e6942ef9b5d 100644 --- a/cmd/hyperkube/kube-apiserver.go +++ b/cmd/hyperkube/kube-apiserver.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// CAUTION: If you update code in this file, you may need to also update code +// in contrib/mesos/cmd/km/kube-apiserver.go package main import ( diff --git a/cmd/hyperkube/kube-controller-manager.go b/cmd/hyperkube/kube-controller-manager.go index 2b90f706ef0..f99e5244e14 100644 --- a/cmd/hyperkube/kube-controller-manager.go +++ b/cmd/hyperkube/kube-controller-manager.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// CAUTION: If you update code in this file, you may need to also update code +// in contrib/mesos/cmd/km/k8sm-controllermanager.go package main import ( diff --git a/cmd/hyperkube/kube-proxy.go b/cmd/hyperkube/kube-proxy.go index fc260250145..ffaa1a54c13 100644 --- a/cmd/hyperkube/kube-proxy.go +++ b/cmd/hyperkube/kube-proxy.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// CAUTION: If you update code in this file, you may need to also update code +// in contrib/mesos/cmd/km/kube-proxy.go package main import ( diff --git a/cmd/hyperkube/kube-scheduler.go b/cmd/hyperkube/kube-scheduler.go index a2824e93097..ededc4ed8ae 100644 --- a/cmd/hyperkube/kube-scheduler.go +++ b/cmd/hyperkube/kube-scheduler.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// CAUTION: If you update code in this file, you may need to also update code +// in contrib/mesos/cmd/km/k8sm-scheduler.go package main import ( diff --git a/cmd/hyperkube/main.go b/cmd/hyperkube/main.go index d57be2111c5..3a2b92fa905 100644 --- a/cmd/hyperkube/main.go +++ b/cmd/hyperkube/main.go @@ -16,6 +16,9 @@ limitations under the License. // A binary that can morph into all of the other kubernetes binaries. You can // also soft-link to it busybox style. +// +// CAUTION: If you update code in this file, you may need to also update code +// in contrib/mesos/cmd/km/km.go package main import ( diff --git a/cmd/hyperkube/server.go b/cmd/hyperkube/server.go index f84dd2e3ed7..ece91b803b5 100644 --- a/cmd/hyperkube/server.go +++ b/cmd/hyperkube/server.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// CAUTION: If you update code in this file, you may need to also update code +// in contrib/mesos/cmd/km/server.go package main import ( diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 8d68d457aab..7054dcaccec 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -17,6 +17,9 @@ limitations under the License. // Package app implements a server that runs a set of active // components. This includes replication controllers, service endpoints and // nodes. +// +// CAUTION: If you update code in this file, you may need to also update code +// in contrib/mesos/pkg/controlmanager/controlmanager.go package app import ( diff --git a/contrib/mesos/cmd/k8sm-controller-manager/doc.go b/contrib/mesos/cmd/k8sm-controller-manager/doc.go new file mode 100644 index 00000000000..aa8507a3a64 --- /dev/null +++ b/contrib/mesos/cmd/k8sm-controller-manager/doc.go @@ -0,0 +1,23 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This package main implements the executable Kubernetes Mesos controller manager. +// +// It is mainly a clone of the upstream cmd/hyperkube module right now because +// the upstream hyperkube module is not reusable. +// +// TODO(jdef,sttts): refactor upstream cmd/kube-controller-manager to be reusable with the necessary mesos changes +package main diff --git a/contrib/mesos/cmd/k8sm-controller-manager/main.go b/contrib/mesos/cmd/k8sm-controller-manager/main.go new file mode 100644 index 00000000000..25917f72517 --- /dev/null +++ b/contrib/mesos/cmd/k8sm-controller-manager/main.go @@ -0,0 +1,53 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "os" + "runtime" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" + + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/controllermanager" + + "github.com/spf13/pflag" +) + +func init() { + healthz.DefaultHealthz() +} + +func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + + s := controllermanager.NewCMServer() + s.AddFlags(pflag.CommandLine) + + util.InitFlags() + util.InitLogs() + defer util.FlushLogs() + + verflag.PrintAndExitIfRequested() + + if err := s.Run(pflag.CommandLine.Args()); err != nil { + fmt.Fprintf(os.Stderr, err.Error()) + os.Exit(1) + } +} diff --git a/contrib/mesos/cmd/km/doc.go b/contrib/mesos/cmd/km/doc.go new file mode 100644 index 00000000000..a46a0f8e800 --- /dev/null +++ b/contrib/mesos/cmd/km/doc.go @@ -0,0 +1,24 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This package main morphs all binaries under cmd/ and several other stock +// Kubernetes binaries into a single executable. +// +// It is mainly a clone of the upstream cmd/hyperkube module right now because +// the upstream hyperkube module is not reusable. +// +// TODO(jdef,sttts): refactor upstream cmd/hyperkube to be reusable with the necessary mesos changes +package main diff --git a/contrib/mesos/cmd/km/hyperkube.go b/contrib/mesos/cmd/km/hyperkube.go new file mode 100644 index 00000000000..b8e2e9dffc9 --- /dev/null +++ b/contrib/mesos/cmd/km/hyperkube.go @@ -0,0 +1,203 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// clone of the upstream cmd/hypercube/hyperkube.go +package main + +import ( + "errors" + "flag" + "fmt" + "io" + "io/ioutil" + "os" + "path" + "runtime" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" + + "github.com/spf13/pflag" +) + +// HyperKube represents a single binary that can morph/manage into multiple +// servers. +type HyperKube struct { + Name string // The executable name, used for help and soft-link invocation + Long string // A long description of the binary. It will be world wrapped before output. + + servers []Server + baseFlags *pflag.FlagSet + out io.Writer + helpFlagVal bool +} + +// AddServer adds a server to the HyperKube object. +func (hk *HyperKube) AddServer(s *Server) { + hk.servers = append(hk.servers, *s) + hk.servers[len(hk.servers)-1].hk = hk +} + +// FindServer will find a specific server named name. +func (hk *HyperKube) FindServer(name string) (*Server, error) { + for _, s := range hk.servers { + if s.Name() == name { + return &s, nil + } + } + return nil, fmt.Errorf("Server not found: %s", name) +} + +// Servers returns a list of all of the registred servers +func (hk *HyperKube) Servers() []Server { + return hk.servers +} + +// Flags returns a flagset for "global" flags. +func (hk *HyperKube) Flags() *pflag.FlagSet { + if hk.baseFlags == nil { + hk.baseFlags = pflag.NewFlagSet(hk.Name, pflag.ContinueOnError) + hk.baseFlags.SetOutput(ioutil.Discard) + hk.baseFlags.BoolVarP(&hk.helpFlagVal, "help", "h", false, "help for "+hk.Name) + + // These will add all of the "global" flags (defined with both the + // flag and pflag packages) to the new flag set we have. + util.AddFlagSetToPFlagSet(flag.CommandLine, hk.baseFlags) + util.AddPFlagSetToPFlagSet(pflag.CommandLine, hk.baseFlags) + + } + return hk.baseFlags +} + +// Out returns the io.Writer that is used for all usage/error information +func (hk *HyperKube) Out() io.Writer { + if hk.out == nil { + hk.out = os.Stderr + } + return hk.out +} + +// SetOut sets the output writer for all usage/error information +func (hk *HyperKube) SetOut(w io.Writer) { + hk.out = w +} + +// Print is a convenience method to Print to the defined output +func (hk *HyperKube) Print(i ...interface{}) { + fmt.Fprint(hk.Out(), i...) +} + +// Println is a convenience method to Println to the defined output +func (hk *HyperKube) Println(i ...interface{}) { + fmt.Fprintln(hk.Out(), i...) +} + +// Printf is a convenience method to Printf to the defined output +func (hk *HyperKube) Printf(format string, i ...interface{}) { + fmt.Fprintf(hk.Out(), format, i...) +} + +// Run the server. This will pick the appropriate server and run it. +func (hk *HyperKube) Run(args []string) error { + // If we are called directly, parse all flags up to the first real + // argument. That should be the server to run. + baseCommand := path.Base(args[0]) + serverName := baseCommand + if serverName == hk.Name { + args = args[1:] + + baseFlags := hk.Flags() + baseFlags.SetInterspersed(false) // Only parse flags up to the next real command + err := baseFlags.Parse(args) + if err != nil || hk.helpFlagVal { + if err != nil { + hk.Println("Error:", err) + } + hk.Usage() + return err + } + + verflag.PrintAndExitIfRequested() + + args = baseFlags.Args() + if len(args) > 0 && len(args[0]) > 0 { + serverName = args[0] + baseCommand = baseCommand + " " + serverName + args = args[1:] + } else { + err = errors.New("No server specified") + hk.Printf("Error: %v\n\n", err) + hk.Usage() + return err + } + } + + s, err := hk.FindServer(serverName) + if err != nil { + hk.Printf("Error: %v\n\n", err) + hk.Usage() + return err + } + + util.AddPFlagSetToPFlagSet(hk.Flags(), s.Flags()) + err = s.Flags().Parse(args) + if err != nil || hk.helpFlagVal { + if err != nil { + hk.Printf("Error: %v\n\n", err) + } + s.Usage() + return err + } + + verflag.PrintAndExitIfRequested() + + util.InitLogs() + defer util.FlushLogs() + + err = s.Run(s, s.Flags().Args()) + if err != nil { + hk.Println("Error:", err) + } + + return err +} + +// RunToExit will run the hyperkube and then call os.Exit with an appropriate exit code. +func (hk *HyperKube) RunToExit(args []string) { + runtime.GOMAXPROCS(runtime.NumCPU()) + err := hk.Run(args) + if err != nil { + fmt.Fprint(os.Stderr, err.Error()) + os.Exit(1) + } + os.Exit(0) +} + +// Usage will write out a summary for all servers that this binary supports. +func (hk *HyperKube) Usage() { + tt := `{{if .Long}}{{.Long | trim | wrap ""}} +{{end}}Usage + + {{.Name}} [flags] + +Servers +{{range .Servers}} + {{.Name}} +{{.Long | trim | wrap " "}}{{end}} +Call '{{.Name}} --help' for help on a specific server. +` + util.ExecuteTemplate(hk.Out(), tt, hk) +} diff --git a/contrib/mesos/cmd/km/hyperkube_test.go b/contrib/mesos/cmd/km/hyperkube_test.go new file mode 100644 index 00000000000..a114c57a802 --- /dev/null +++ b/contrib/mesos/cmd/km/hyperkube_test.go @@ -0,0 +1,144 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// clone of the upstream cmd/hypercube/hyperkube_test.go +package main + +import ( + "bytes" + "errors" + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +type result struct { + err error + output string +} + +func testServer(n string) *Server { + return &Server{ + SimpleUsage: n, + Long: fmt.Sprintf("A simple server named %s", n), + Run: func(s *Server, args []string) error { + s.hk.Printf("%s Run\n", s.Name()) + return nil + }, + } +} +func testServerError(n string) *Server { + return &Server{ + SimpleUsage: n, + Long: fmt.Sprintf("A simple server named %s that returns an error", n), + Run: func(s *Server, args []string) error { + s.hk.Printf("%s Run\n", s.Name()) + return errors.New("Server returning error") + }, + } +} + +func runFull(t *testing.T, args string) *result { + buf := new(bytes.Buffer) + hk := HyperKube{ + Name: "hyperkube", + Long: "hyperkube is an all-in-one server binary.", + } + hk.SetOut(buf) + + hk.AddServer(testServer("test1")) + hk.AddServer(testServer("test2")) + hk.AddServer(testServer("test3")) + hk.AddServer(testServerError("test-error")) + + a := strings.Split(args, " ") + t.Logf("Running full with args: %q", a) + err := hk.Run(a) + + r := &result{err, buf.String()} + t.Logf("Result err: %v, output: %q", r.err, r.output) + + return r +} + +func TestRun(t *testing.T) { + x := runFull(t, "hyperkube test1") + assert.Contains(t, x.output, "test1 Run") + assert.NoError(t, x.err) +} + +func TestLinkRun(t *testing.T) { + x := runFull(t, "test1") + assert.Contains(t, x.output, "test1 Run") + assert.NoError(t, x.err) +} + +func TestTopNoArgs(t *testing.T) { + x := runFull(t, "hyperkube") + assert.EqualError(t, x.err, "No server specified") +} + +func TestBadServer(t *testing.T) { + x := runFull(t, "hyperkube bad-server") + assert.EqualError(t, x.err, "Server not found: bad-server") + assert.Contains(t, x.output, "Usage") +} + +func TestTopHelp(t *testing.T) { + x := runFull(t, "hyperkube --help") + assert.NoError(t, x.err) + assert.Contains(t, x.output, "all-in-one") + assert.Contains(t, x.output, "A simple server named test1") +} + +func TestTopFlags(t *testing.T) { + x := runFull(t, "hyperkube --help test1") + assert.NoError(t, x.err) + assert.Contains(t, x.output, "all-in-one") + assert.Contains(t, x.output, "A simple server named test1") + assert.NotContains(t, x.output, "test1 Run") +} + +func TestTopFlagsBad(t *testing.T) { + x := runFull(t, "hyperkube --bad-flag") + assert.EqualError(t, x.err, "unknown flag: --bad-flag") + assert.Contains(t, x.output, "all-in-one") + assert.Contains(t, x.output, "A simple server named test1") +} + +func TestServerHelp(t *testing.T) { + x := runFull(t, "hyperkube test1 --help") + assert.NoError(t, x.err) + assert.Contains(t, x.output, "A simple server named test1") + assert.Contains(t, x.output, "--help=false: help for hyperkube") + assert.NotContains(t, x.output, "test1 Run") +} + +func TestServerFlagsBad(t *testing.T) { + x := runFull(t, "hyperkube test1 --bad-flag") + assert.EqualError(t, x.err, "unknown flag: --bad-flag") + assert.Contains(t, x.output, "A simple server named test1") + assert.Contains(t, x.output, "--help=false: help for hyperkube") + assert.NotContains(t, x.output, "test1 Run") +} + +func TestServerError(t *testing.T) { + x := runFull(t, "hyperkube test-error") + assert.Contains(t, x.output, "test-error Run") + assert.EqualError(t, x.err, "Server returning error") +} diff --git a/contrib/mesos/cmd/km/k8sm-controllermanager.go b/contrib/mesos/cmd/km/k8sm-controllermanager.go new file mode 100644 index 00000000000..db87b84bf9d --- /dev/null +++ b/contrib/mesos/cmd/km/k8sm-controllermanager.go @@ -0,0 +1,38 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// clone of the upstream cmd/hypercube/kube-controllermanager.go +package main + +import ( + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/controllermanager" +) + +// NewHyperkubeServer creates a new hyperkube Server object that includes the +// description and flags. +func NewControllerManager() *Server { + s := controllermanager.NewCMServer() + + hks := Server{ + SimpleUsage: "controller-manager", + Long: "A server that runs a set of active components. This includes replication controllers, service endpoints and nodes.", + Run: func(_ *Server, args []string) error { + return s.Run(args) + }, + } + s.AddFlags(hks.Flags()) + return &hks +} diff --git a/contrib/mesos/cmd/km/k8sm-executor.go b/contrib/mesos/cmd/km/k8sm-executor.go new file mode 100644 index 00000000000..8c90a694e74 --- /dev/null +++ b/contrib/mesos/cmd/km/k8sm-executor.go @@ -0,0 +1,40 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/service" +) + +// NewHyperkubeServer creates a new hyperkube Server object that includes the +// description and flags. +func NewKubeletExecutor() *Server { + s := service.NewHyperKubeletExecutorServer() + hks := Server{ + SimpleUsage: "executor", + Long: `The kubelet-executor binary is responsible for maintaining a set of containers +on a particular node. It syncs data from a specialized Mesos source that tracks +task launches and kills. It then queries Docker to see what is currently +running. It synchronizes the configuration data, with the running set of +containers by starting or stopping Docker containers.`, + Run: func(hks *Server, args []string) error { + return s.Run(hks, args) + }, + } + s.AddHyperkubeFlags(hks.Flags()) + return &hks +} diff --git a/contrib/mesos/cmd/km/k8sm-scheduler.go b/contrib/mesos/cmd/km/k8sm-scheduler.go new file mode 100644 index 00000000000..ba759dc4a00 --- /dev/null +++ b/contrib/mesos/cmd/km/k8sm-scheduler.go @@ -0,0 +1,39 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// clone of the upstream cmd/hypercube/k8sm-scheduler.go +package main + +import ( + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/service" +) + +// NewScheduler creates a new hyperkube Server object that includes the +// description and flags. +func NewScheduler() *Server { + s := service.NewSchedulerServer() + + hks := Server{ + SimpleUsage: "scheduler", + Long: `Implements the Kubernetes-Mesos scheduler. This will launch Mesos tasks which +results in pods assigned to kubelets based on capacity and constraints.`, + Run: func(hks *Server, args []string) error { + return s.Run(hks, args) + }, + } + s.AddHyperkubeFlags(hks.Flags()) + return &hks +} diff --git a/contrib/mesos/cmd/km/km.go b/contrib/mesos/cmd/km/km.go new file mode 100644 index 00000000000..e5a7292cabd --- /dev/null +++ b/contrib/mesos/cmd/km/km.go @@ -0,0 +1,37 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// clone of the upstream cmd/hypercube/main.go +package main + +import ( + "os" +) + +func main() { + hk := HyperKube{ + Name: "km", + Long: "This is an all-in-one binary that can run any of the various Kubernetes-Mesos servers.", + } + + hk.AddServer(NewKubeAPIServer()) + hk.AddServer(NewControllerManager()) + hk.AddServer(NewScheduler()) + hk.AddServer(NewKubeletExecutor()) + hk.AddServer(NewKubeProxy()) + + hk.RunToExit(os.Args) +} diff --git a/contrib/mesos/cmd/km/kube-apiserver.go b/contrib/mesos/cmd/km/kube-apiserver.go new file mode 100644 index 00000000000..81ed2d83336 --- /dev/null +++ b/contrib/mesos/cmd/km/kube-apiserver.go @@ -0,0 +1,38 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// clone of the upstream cmd/hypercube/kube-apiserver.go +package main + +import ( + kubeapiserver "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-apiserver/app" +) + +// NewKubeAPIServer creates a new hyperkube Server object that includes the +// description and flags. +func NewKubeAPIServer() *Server { + s := kubeapiserver.NewAPIServer() + + hks := Server{ + SimpleUsage: "apiserver", + Long: "The main API entrypoint and interface to the storage system. The API server is also the focal point for all authorization decisions.", + Run: func(_ *Server, args []string) error { + return s.Run(args) + }, + } + s.AddFlags(hks.Flags()) + return &hks +} diff --git a/contrib/mesos/cmd/km/kube-proxy.go b/contrib/mesos/cmd/km/kube-proxy.go new file mode 100644 index 00000000000..be7522913bc --- /dev/null +++ b/contrib/mesos/cmd/km/kube-proxy.go @@ -0,0 +1,41 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// clone of the upstream cmd/hypercube/kube-proxy.go +package main + +import ( + kubeproxy "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-proxy/app" +) + +// NewKubeProxy creates a new hyperkube Server object that includes the +// description and flags. +func NewKubeProxy() *Server { + s := kubeproxy.NewProxyServer() + + hks := Server{ + SimpleUsage: "proxy", + Long: `The Kubernetes proxy server is responsible for taking traffic directed at + services and forwarding it to the appropriate pods. It generally runs on + nodes next to the Kubelet and proxies traffic from local pods to remote pods. + It is also used when handling incoming external traffic.`, + Run: func(_ *Server, args []string) error { + return s.Run(args) + }, + } + s.AddFlags(hks.Flags()) + return &hks +} diff --git a/contrib/mesos/cmd/km/server.go b/contrib/mesos/cmd/km/server.go new file mode 100644 index 00000000000..b00599a5cda --- /dev/null +++ b/contrib/mesos/cmd/km/server.go @@ -0,0 +1,82 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// clone of the upstream cmd/hypercube/server.go +package main + +import ( + "io/ioutil" + "strings" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + + "github.com/spf13/pflag" +) + +type serverRunFunc func(s *Server, args []string) error + +// Server describes a server that this binary can morph into. +type Server struct { + SimpleUsage string // One line description of the server. + Long string // Longer free form description of the server + Run serverRunFunc // Run the server. This is not expected to return. + + flags *pflag.FlagSet // Flags for the command (and all dependents) + name string + hk *HyperKube +} + +// Usage returns the full usage string including all of the flags. +func (s *Server) Usage() error { + tt := `{{if .Long}}{{.Long | trim | wrap ""}} +{{end}}Usage: + {{.SimpleUsage}} [flags] + +Available Flags: +{{.Flags.FlagUsages}}` + + return util.ExecuteTemplate(s.hk.Out(), tt, s) +} + +// Name returns the name of the command as derived from the usage line. +func (s *Server) Name() string { + if s.name != "" { + return s.name + } + name := s.SimpleUsage + i := strings.Index(name, " ") + if i >= 0 { + name = name[:i] + } + return name +} + +// Flags returns a flagset for this server +func (s *Server) Flags() *pflag.FlagSet { + if s.flags == nil { + s.flags = pflag.NewFlagSet(s.Name(), pflag.ContinueOnError) + s.flags.SetOutput(ioutil.Discard) + } + return s.flags +} + +func (s *Server) FindServer(name string) bool { + if s == nil { + return false + } + _, err := s.hk.FindServer(name) + return err == nil +} diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go new file mode 100644 index 00000000000..75651f7a26a --- /dev/null +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -0,0 +1,183 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllermanager + +import ( + "net" + "net/http" + "strconv" + + "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-controller-manager/app" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd" + clientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/mesos" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/routecontroller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" + "github.com/GoogleCloudPlatform/kubernetes/pkg/namespace" + "github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota" + kendpoint "github.com/GoogleCloudPlatform/kubernetes/pkg/service" + "github.com/GoogleCloudPlatform/kubernetes/pkg/serviceaccount" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder" + + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/profile" + kmendpoint "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/service" + + "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus" + "github.com/spf13/pflag" +) + +// CMServer is the main context object for the controller manager. +type CMServer struct { + *app.CMServer + UseHostPortEndpoints bool +} + +// NewCMServer creates a new CMServer with a default config. +func NewCMServer() *CMServer { + s := &CMServer{ + CMServer: app.NewCMServer(), + } + s.CloudProvider = mesos.ProviderName + s.UseHostPortEndpoints = true + return s +} + +// AddFlags adds flags for a specific CMServer to the specified FlagSet +func (s *CMServer) AddFlags(fs *pflag.FlagSet) { + s.CMServer.AddFlags(fs) + fs.BoolVar(&s.UseHostPortEndpoints, "host_port_endpoints", s.UseHostPortEndpoints, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.") +} + +func (s *CMServer) Run(_ []string) error { + if s.Kubeconfig == "" && s.Master == "" { + glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.") + } + + // This creates a client, first loading any specified kubeconfig + // file, and then overriding the Master flag, if non-empty. + kubeconfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + &clientcmd.ClientConfigLoadingRules{ExplicitPath: s.Kubeconfig}, + &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: s.Master}}).ClientConfig() + if err != nil { + return err + } + + kubeconfig.QPS = 20.0 + kubeconfig.Burst = 30 + + kubeClient, err := client.New(kubeconfig) + if err != nil { + glog.Fatalf("Invalid API configuration: %v", err) + } + + go func() { + mux := http.NewServeMux() + healthz.InstallHandler(mux) + if s.EnableProfiling { + profile.InstallHandler(mux) + } + mux.Handle("/metrics", prometheus.Handler()) + server := &http.Server{ + Addr: net.JoinHostPort(s.Address.String(), strconv.Itoa(s.Port)), + Handler: mux, + } + glog.Fatal(server.ListenAndServe()) + }() + + endpoints := s.createEndpointController(kubeClient) + go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop) + + controllerManager := controller.NewReplicationManager(kubeClient, controller.BurstReplicas) + go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop) + + //TODO(jdef) should eventually support more cloud providers here + if s.CloudProvider != mesos.ProviderName { + glog.Fatalf("Only provider %v is supported, you specified %v", mesos.ProviderName, s.CloudProvider) + } + cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) + + nodeController := nodecontroller.NewNodeController(cloud, kubeClient, s.RegisterRetryCount, + s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)), + s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs) + nodeController.Run(s.NodeSyncPeriod) + + serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName) + if err := serviceController.Run(s.NodeSyncPeriod); err != nil { + glog.Errorf("Failed to start service controller: %v", err) + } + + if s.AllocateNodeCIDRs { + routes, ok := cloud.Routes() + if !ok { + glog.Fatal("Cloud provider must support routes if allocate-node-cidrs is set") + } + routeController := routecontroller.New(routes, kubeClient, s.ClusterName, (*net.IPNet)(&s.ClusterCIDR)) + routeController.Run(s.NodeSyncPeriod) + } + + resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient) + resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod) + + namespaceManager := namespace.NewNamespaceManager(kubeClient, s.NamespaceSyncPeriod) + namespaceManager.Run() + + pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod) + pvclaimBinder.Run() + pvRecycler, err := volumeclaimbinder.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, app.ProbeRecyclableVolumePlugins()) + if err != nil { + glog.Fatalf("Failed to start persistent volume recycler: %+v", err) + } + pvRecycler.Run() + + if len(s.ServiceAccountKeyFile) > 0 { + privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile) + if err != nil { + glog.Errorf("Error reading key for service account token controller: %v", err) + } else { + serviceaccount.NewTokensController( + kubeClient, + serviceaccount.DefaultTokenControllerOptions( + serviceaccount.JWTTokenGenerator(privateKey), + ), + ).Run() + } + } + + serviceaccount.NewServiceAccountsController( + kubeClient, + serviceaccount.DefaultServiceAccountsControllerOptions(), + ).Run() + + select {} +} + +func (s *CMServer) createEndpointController(client *client.Client) kmendpoint.EndpointController { + if s.UseHostPortEndpoints { + glog.V(2).Infof("Creating hostIP:hostPort endpoint controller") + return kmendpoint.NewEndpointController(client) + } + glog.V(2).Infof("Creating podIP:containerPort endpoint controller") + stockEndpointController := kendpoint.NewEndpointController(client) + return stockEndpointController +} diff --git a/contrib/mesos/pkg/controllermanager/doc.go b/contrib/mesos/pkg/controllermanager/doc.go new file mode 100644 index 00000000000..63c28eed891 --- /dev/null +++ b/contrib/mesos/pkg/controllermanager/doc.go @@ -0,0 +1,20 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package controllermanager is largely a clone of the upstream implementation, +// with additional functionality to select between stock or a customized +// endpoints controller. +package controllermanager diff --git a/contrib/mesos/pkg/service/doc.go b/contrib/mesos/pkg/service/doc.go new file mode 100644 index 00000000000..04b35a7e486 --- /dev/null +++ b/contrib/mesos/pkg/service/doc.go @@ -0,0 +1,20 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package service is largely a clone of the stock Kubernetes endpoints +// controller, extended with some very specific functionality related +// to kubernetes-mesos specific host-pod port mapping. +package service diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go new file mode 100644 index 00000000000..f0eb055023e --- /dev/null +++ b/contrib/mesos/pkg/service/endpoints_controller.go @@ -0,0 +1,441 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "fmt" + "reflect" + "strconv" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/endpoints" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + kservice "github.com/GoogleCloudPlatform/kubernetes/pkg/service" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/workqueue" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + "github.com/golang/glog" +) + +var ( + keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc +) + +type EndpointController interface { + Run(workers int, stopCh <-chan struct{}) +} + +// NewEndpointController returns a new *EndpointController. +func NewEndpointController(client *client.Client) *endpointController { + e := &endpointController{ + client: client, + queue: workqueue.New(), + } + e.serviceStore.Store, e.serviceController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return e.client.Services(api.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Service{}, + kservice.FullServiceResyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: e.enqueueService, + UpdateFunc: func(old, cur interface{}) { + e.enqueueService(cur) + }, + DeleteFunc: e.enqueueService, + }, + ) + + e.podStore.Store, e.podController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return e.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Pod{}, + kservice.PodRelistPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: e.addPod, + UpdateFunc: e.updatePod, + DeleteFunc: e.deletePod, + }, + ) + return e +} + +// EndpointController manages selector-based service endpoints. +type endpointController struct { + client *client.Client + + serviceStore cache.StoreToServiceLister + podStore cache.StoreToPodLister + + // Services that need to be updated. A channel is inappropriate here, + // because it allows services with lots of pods to be serviced much + // more often than services with few pods; it also would cause a + // service that's inserted multiple times to be processed more than + // necessary. + queue *workqueue.Type + + // Since we join two objects, we'll watch both of them with + // controllers. + serviceController *framework.Controller + podController *framework.Controller +} + +// Runs e; will not return until stopCh is closed. workers determines how many +// endpoints will be handled in parallel. +func (e *endpointController) Run(workers int, stopCh <-chan struct{}) { + defer util.HandleCrash() + go e.serviceController.Run(stopCh) + go e.podController.Run(stopCh) + for i := 0; i < workers; i++ { + go util.Until(e.worker, time.Second, stopCh) + } + go func() { + defer util.HandleCrash() + time.Sleep(5 * time.Minute) // give time for our cache to fill + e.checkLeftoverEndpoints() + }() + <-stopCh + e.queue.ShutDown() +} + +func (e *endpointController) getPodServiceMemberships(pod *api.Pod) (util.StringSet, error) { + set := util.StringSet{} + services, err := e.serviceStore.GetPodServices(pod) + if err != nil { + // don't log this error because this function makes pointless + // errors when no services match. + return set, nil + } + for i := range services { + key, err := keyFunc(&services[i]) + if err != nil { + return nil, err + } + set.Insert(key) + } + return set, nil +} + +// When a pod is added, figure out what services it will be a member of and +// enqueue them. obj must have *api.Pod type. +func (e *endpointController) addPod(obj interface{}) { + pod := obj.(*api.Pod) + services, err := e.getPodServiceMemberships(pod) + if err != nil { + glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err) + return + } + for key := range services { + e.queue.Add(key) + } +} + +// When a pod is updated, figure out what services it used to be a member of +// and what services it will be a member of, and enqueue the union of these. +// old and cur must be *api.Pod types. +func (e *endpointController) updatePod(old, cur interface{}) { + if api.Semantic.DeepEqual(old, cur) { + return + } + newPod := old.(*api.Pod) + services, err := e.getPodServiceMemberships(newPod) + if err != nil { + glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err) + return + } + + oldPod := cur.(*api.Pod) + // Only need to get the old services if the labels changed. + if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) { + oldServices, err := e.getPodServiceMemberships(oldPod) + if err != nil { + glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err) + return + } + services = services.Union(oldServices) + } + for key := range services { + e.queue.Add(key) + } +} + +// When a pod is deleted, enqueue the services the pod used to be a member of. +// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item. +func (e *endpointController) deletePod(obj interface{}) { + if _, ok := obj.(*api.Pod); ok { + // Enqueue all the services that the pod used to be a member + // of. This happens to be exactly the same thing we do when a + // pod is added. + e.addPod(obj) + return + } + podKey, err := keyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + } + glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, kservice.FullServiceResyncPeriod) + + // TODO: keep a map of pods to services to handle this condition. +} + +// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item. +func (e *endpointController) enqueueService(obj interface{}) { + key, err := keyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + } + + e.queue.Add(key) +} + +// worker runs a worker thread that just dequeues items, processes them, and +// marks them done. You may run as many of these in parallel as you wish; the +// workqueue guarantees that they will not end up processing the same service +// at the same time. +func (e *endpointController) worker() { + for { + func() { + key, quit := e.queue.Get() + if quit { + return + } + // Use defer: in the unlikely event that there's a + // panic, we'd still like this to get marked done-- + // otherwise the controller will not be able to sync + // this service again until it is restarted. + defer e.queue.Done(key) + e.syncService(key.(string)) + }() + } +} + +func (e *endpointController) syncService(key string) { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime)) + }() + obj, exists, err := e.serviceStore.Store.GetByKey(key) + if err != nil || !exists { + // Delete the corresponding endpoint, as the service has been deleted. + // TODO: Please note that this will delete an endpoint when a + // service is deleted. However, if we're down at the time when + // the service is deleted, we will miss that deletion, so this + // doesn't completely solve the problem. See #6877. + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err) + // Don't retry, as the key isn't going to magically become understandable. + return + } + err = e.client.Endpoints(namespace).Delete(name) + if err != nil && !errors.IsNotFound(err) { + glog.Errorf("Error deleting endpoint %q: %v", key, err) + e.queue.Add(key) // Retry + } + return + } + + service := obj.(*api.Service) + if service.Spec.Selector == nil { + // services without a selector receive no endpoints from this controller; + // these services will receive the endpoints that are created out-of-band via the REST API. + return + } + + glog.V(5).Infof("About to update endpoints for service %q", key) + pods, err := e.podStore.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelector()) + if err != nil { + // Since we're getting stuff from a local cache, it is + // basically impossible to get this error. + glog.Errorf("Error syncing service %q: %v", key, err) + e.queue.Add(key) // Retry + return + } + + subsets := []api.EndpointSubset{} + for i := range pods.Items { + pod := &pods.Items[i] + + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + + portName := servicePort.Name + portProto := servicePort.Protocol + portNum, err := findPort(pod, servicePort) + if err != nil { + glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) + continue + } + // HACK(jdef): use HostIP instead of pod.CurrentState.PodIP for generic mesos compat + if len(pod.Status.HostIP) == 0 { + glog.V(4).Infof("Failed to find a host IP for pod %s/%s", pod.Namespace, pod.Name) + continue + } + if !api.IsPodReady(pod) { + glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name) + continue + } + + // HACK(jdef): use HostIP instead of pod.CurrentState.PodIP for generic mesos compat + epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto} + epa := api.EndpointAddress{IP: pod.Status.HostIP, TargetRef: &api.ObjectReference{ + Kind: "Pod", + Namespace: pod.ObjectMeta.Namespace, + Name: pod.ObjectMeta.Name, + UID: pod.ObjectMeta.UID, + ResourceVersion: pod.ObjectMeta.ResourceVersion, + }} + subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}}) + } + } + subsets = endpoints.RepackSubsets(subsets) + + // See if there's actually an update here. + currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name) + if err != nil { + if errors.IsNotFound(err) { + currentEndpoints = &api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: service.Name, + Labels: service.Labels, + }, + } + } else { + glog.Errorf("Error getting endpoints: %v", err) + e.queue.Add(key) // Retry + return + } + } + if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) { + glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) + return + } + newEndpoints := currentEndpoints + newEndpoints.Subsets = subsets + newEndpoints.Labels = service.Labels + + if len(currentEndpoints.ResourceVersion) == 0 { + // No previous endpoints, create them + _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) + } else { + // Pre-existing + _, err = e.client.Endpoints(service.Namespace).Update(newEndpoints) + } + if err != nil { + glog.Errorf("Error updating endpoints: %v", err) + e.queue.Add(key) // Retry + } +} + +// checkLeftoverEndpoints lists all currently existing endpoints and adds their +// service to the queue. This will detect endpoints that exist with no +// corresponding service; these endpoints need to be deleted. We only need to +// do this once on startup, because in steady-state these are detected (but +// some stragglers could have been left behind if the endpoint controller +// reboots). +func (e *endpointController) checkLeftoverEndpoints() { + list, err := e.client.Endpoints(api.NamespaceAll).List(labels.Everything()) + if err != nil { + glog.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err) + return + } + for i := range list.Items { + ep := &list.Items[i] + key, err := keyFunc(ep) + if err != nil { + glog.Errorf("Unable to get key for endpoint %#v", ep) + continue + } + e.queue.Add(key) + } +} + +// findPort locates the container port for the given pod and portName. If the +// targetPort is a number, use that. If the targetPort is a string, look that +// string up in all named ports in all containers in the target pod. If no +// match is found, fail. +// +// HACK(jdef): return the HostPort instead of the ContainerPort for generic mesos compat. +func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) { + portName := svcPort.TargetPort + switch portName.Kind { + case util.IntstrString: + name := portName.StrVal + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.Name == name && port.Protocol == svcPort.Protocol { + return findMappedPortName(pod, port.Protocol, name) + } + } + } + case util.IntstrInt: + // HACK(jdef): slightly different semantics from upstream here: + // we ensure that if the user spec'd a port in the service that + // it actually maps to a host-port assigned to the pod. upstream + // doesn't check this and happily returns the container port spec'd + // in the service, but that doesn't align w/ mesos port mgmt. + p := portName.IntVal + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.ContainerPort == p && port.Protocol == svcPort.Protocol { + return findMappedPort(pod, port.Protocol, p) + } + } + } + } + return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID) +} + +func findMappedPort(pod *api.Pod, protocol api.Protocol, port int) (int, error) { + if len(pod.Annotations) > 0 { + key := fmt.Sprintf(meta.PortMappingKeyFormat, string(protocol), port) + if value, found := pod.Annotations[key]; found { + return strconv.Atoi(value) + } + } + return 0, fmt.Errorf("failed to find mapped container %s port: %d", protocol, port) +} + +func findMappedPortName(pod *api.Pod, protocol api.Protocol, portName string) (int, error) { + if len(pod.Annotations) > 0 { + key := fmt.Sprintf(meta.PortNameMappingKeyFormat, string(protocol), portName) + if value, found := pod.Annotations[key]; found { + return strconv.Atoi(value) + } + } + return 0, fmt.Errorf("failed to find mapped container %s port name: %q", protocol, portName) +} diff --git a/contrib/mesos/pkg/service/endpoints_controller_test.go b/contrib/mesos/pkg/service/endpoints_controller_test.go new file mode 100644 index 00000000000..8168cd01cd0 --- /dev/null +++ b/contrib/mesos/pkg/service/endpoints_controller_test.go @@ -0,0 +1,63 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +func TestFindMappedPort(t *testing.T) { + pod := &api.Pod{} + port, err := findMappedPort(pod, api.ProtocolTCP, 80) + if err == nil { + t.Fatalf("expected error since port tcp/80 is not mapped") + } + port, err = findMappedPortName(pod, api.ProtocolUDP, "foo") + if err == nil { + t.Fatalf("expected error since port udp/'foo' is not mapped") + } + + pod.Annotations = make(map[string]string) + pod.Annotations["k8s.mesosphere.io/port_TCP_80"] = "123" + pod.Annotations["k8s.mesosphere.io/portName_UDP_foo"] = "456" + + port, err = findMappedPort(pod, api.ProtocolUDP, 80) + if err == nil { + t.Fatalf("expected error since port udp/80 is not mapped") + } + port, err = findMappedPort(pod, api.ProtocolTCP, 80) + if err != nil { + t.Fatalf("expected that port 80 is mapped") + } + if port != 123 { + t.Fatalf("expected mapped port == 123") + } + + port, err = findMappedPortName(pod, api.ProtocolTCP, "foo") + if err == nil { + t.Fatalf("expected error since port tcp/'foo' is not mapped") + } + port, err = findMappedPortName(pod, api.ProtocolUDP, "foo") + if err != nil { + t.Fatalf("expected that port udp/'foo' is mapped") + } + if port != 456 { + t.Fatalf("expected mapped port == 456") + } +} diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index e6d8e30a0d7..f204a16c7e6 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// CAUTION: If you update code in this file, you may need to also update code +// in contrib/mesos/pkg/service/endpoints_manager.go package service import ( From 022ff5196d21fb24cf02fa1e571bb46884c0e571 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Thu, 11 Jun 2015 19:34:04 +0000 Subject: [PATCH 2/2] fix broken file refs --- cmd/kube-controller-manager/app/controllermanager.go | 2 +- pkg/service/endpoints_controller.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 7054dcaccec..9df4b646cbd 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -19,7 +19,7 @@ limitations under the License. // nodes. // // CAUTION: If you update code in this file, you may need to also update code -// in contrib/mesos/pkg/controlmanager/controlmanager.go +// in contrib/mesos/pkg/controllermanager/controllermanager.go package app import ( diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index f204a16c7e6..461e41c8134 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -15,7 +15,7 @@ limitations under the License. */ // CAUTION: If you update code in this file, you may need to also update code -// in contrib/mesos/pkg/service/endpoints_manager.go +// in contrib/mesos/pkg/service/endpoints_controller.go package service import (