Upstream controller manager and km binary

Closes mesosphere/kubernetes-mesos#310
Depends on GoogleCloudPlatform/kubernetes#8882

- fix https://github.com/mesosphere/kubernetes-mesos/issues/336
- Fix comment typo
- Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/9265#commitcomment-11559038
- Add warning to k8s modules to also update mesos copies
- Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/9265#commitcomment-11558864
- Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/9265#commitcomment-11558855
- Add comments and TODO that hypercube and controllermanager need refactoring
This commit is contained in:
James DeFelice 2015-06-11 13:13:19 +00:00
parent 52db576617
commit 1820114a2d
27 changed files with 1511 additions and 0 deletions

View File

@ -14,6 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/cmd/km/hyperkube.go
package main
import (

View File

@ -14,6 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/cmd/km/hyperkube_test.go
package main
import (

View File

@ -14,6 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/cmd/km/kube-apiserver.go
package main
import (

View File

@ -14,6 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/cmd/km/k8sm-controllermanager.go
package main
import (

View File

@ -14,6 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/cmd/km/kube-proxy.go
package main
import (

View File

@ -14,6 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/cmd/km/k8sm-scheduler.go
package main
import (

View File

@ -16,6 +16,9 @@ limitations under the License.
// A binary that can morph into all of the other kubernetes binaries. You can
// also soft-link to it busybox style.
//
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/cmd/km/km.go
package main
import (

View File

@ -14,6 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/cmd/km/server.go
package main
import (

View File

@ -17,6 +17,9 @@ limitations under the License.
// Package app implements a server that runs a set of active
// components. This includes replication controllers, service endpoints and
// nodes.
//
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/pkg/controlmanager/controlmanager.go
package app
import (

View File

@ -0,0 +1,23 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This package main implements the executable Kubernetes Mesos controller manager.
//
// It is mainly a clone of the upstream cmd/hyperkube module right now because
// the upstream hyperkube module is not reusable.
//
// TODO(jdef,sttts): refactor upstream cmd/kube-controller-manager to be reusable with the necessary mesos changes
package main

View File

@ -0,0 +1,53 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"fmt"
"os"
"runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/controllermanager"
"github.com/spf13/pflag"
)
func init() {
healthz.DefaultHealthz()
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
s := controllermanager.NewCMServer()
s.AddFlags(pflag.CommandLine)
util.InitFlags()
util.InitLogs()
defer util.FlushLogs()
verflag.PrintAndExitIfRequested()
if err := s.Run(pflag.CommandLine.Args()); err != nil {
fmt.Fprintf(os.Stderr, err.Error())
os.Exit(1)
}
}

View File

@ -0,0 +1,24 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This package main morphs all binaries under cmd/ and several other stock
// Kubernetes binaries into a single executable.
//
// It is mainly a clone of the upstream cmd/hyperkube module right now because
// the upstream hyperkube module is not reusable.
//
// TODO(jdef,sttts): refactor upstream cmd/hyperkube to be reusable with the necessary mesos changes
package main

View File

@ -0,0 +1,203 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// clone of the upstream cmd/hypercube/hyperkube.go
package main
import (
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
"github.com/spf13/pflag"
)
// HyperKube represents a single binary that can morph/manage into multiple
// servers.
type HyperKube struct {
Name string // The executable name, used for help and soft-link invocation
Long string // A long description of the binary. It will be world wrapped before output.
servers []Server
baseFlags *pflag.FlagSet
out io.Writer
helpFlagVal bool
}
// AddServer adds a server to the HyperKube object.
func (hk *HyperKube) AddServer(s *Server) {
hk.servers = append(hk.servers, *s)
hk.servers[len(hk.servers)-1].hk = hk
}
// FindServer will find a specific server named name.
func (hk *HyperKube) FindServer(name string) (*Server, error) {
for _, s := range hk.servers {
if s.Name() == name {
return &s, nil
}
}
return nil, fmt.Errorf("Server not found: %s", name)
}
// Servers returns a list of all of the registred servers
func (hk *HyperKube) Servers() []Server {
return hk.servers
}
// Flags returns a flagset for "global" flags.
func (hk *HyperKube) Flags() *pflag.FlagSet {
if hk.baseFlags == nil {
hk.baseFlags = pflag.NewFlagSet(hk.Name, pflag.ContinueOnError)
hk.baseFlags.SetOutput(ioutil.Discard)
hk.baseFlags.BoolVarP(&hk.helpFlagVal, "help", "h", false, "help for "+hk.Name)
// These will add all of the "global" flags (defined with both the
// flag and pflag packages) to the new flag set we have.
util.AddFlagSetToPFlagSet(flag.CommandLine, hk.baseFlags)
util.AddPFlagSetToPFlagSet(pflag.CommandLine, hk.baseFlags)
}
return hk.baseFlags
}
// Out returns the io.Writer that is used for all usage/error information
func (hk *HyperKube) Out() io.Writer {
if hk.out == nil {
hk.out = os.Stderr
}
return hk.out
}
// SetOut sets the output writer for all usage/error information
func (hk *HyperKube) SetOut(w io.Writer) {
hk.out = w
}
// Print is a convenience method to Print to the defined output
func (hk *HyperKube) Print(i ...interface{}) {
fmt.Fprint(hk.Out(), i...)
}
// Println is a convenience method to Println to the defined output
func (hk *HyperKube) Println(i ...interface{}) {
fmt.Fprintln(hk.Out(), i...)
}
// Printf is a convenience method to Printf to the defined output
func (hk *HyperKube) Printf(format string, i ...interface{}) {
fmt.Fprintf(hk.Out(), format, i...)
}
// Run the server. This will pick the appropriate server and run it.
func (hk *HyperKube) Run(args []string) error {
// If we are called directly, parse all flags up to the first real
// argument. That should be the server to run.
baseCommand := path.Base(args[0])
serverName := baseCommand
if serverName == hk.Name {
args = args[1:]
baseFlags := hk.Flags()
baseFlags.SetInterspersed(false) // Only parse flags up to the next real command
err := baseFlags.Parse(args)
if err != nil || hk.helpFlagVal {
if err != nil {
hk.Println("Error:", err)
}
hk.Usage()
return err
}
verflag.PrintAndExitIfRequested()
args = baseFlags.Args()
if len(args) > 0 && len(args[0]) > 0 {
serverName = args[0]
baseCommand = baseCommand + " " + serverName
args = args[1:]
} else {
err = errors.New("No server specified")
hk.Printf("Error: %v\n\n", err)
hk.Usage()
return err
}
}
s, err := hk.FindServer(serverName)
if err != nil {
hk.Printf("Error: %v\n\n", err)
hk.Usage()
return err
}
util.AddPFlagSetToPFlagSet(hk.Flags(), s.Flags())
err = s.Flags().Parse(args)
if err != nil || hk.helpFlagVal {
if err != nil {
hk.Printf("Error: %v\n\n", err)
}
s.Usage()
return err
}
verflag.PrintAndExitIfRequested()
util.InitLogs()
defer util.FlushLogs()
err = s.Run(s, s.Flags().Args())
if err != nil {
hk.Println("Error:", err)
}
return err
}
// RunToExit will run the hyperkube and then call os.Exit with an appropriate exit code.
func (hk *HyperKube) RunToExit(args []string) {
runtime.GOMAXPROCS(runtime.NumCPU())
err := hk.Run(args)
if err != nil {
fmt.Fprint(os.Stderr, err.Error())
os.Exit(1)
}
os.Exit(0)
}
// Usage will write out a summary for all servers that this binary supports.
func (hk *HyperKube) Usage() {
tt := `{{if .Long}}{{.Long | trim | wrap ""}}
{{end}}Usage
{{.Name}} <server> [flags]
Servers
{{range .Servers}}
{{.Name}}
{{.Long | trim | wrap " "}}{{end}}
Call '{{.Name}} <server> --help' for help on a specific server.
`
util.ExecuteTemplate(hk.Out(), tt, hk)
}

View File

@ -0,0 +1,144 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// clone of the upstream cmd/hypercube/hyperkube_test.go
package main
import (
"bytes"
"errors"
"fmt"
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
type result struct {
err error
output string
}
func testServer(n string) *Server {
return &Server{
SimpleUsage: n,
Long: fmt.Sprintf("A simple server named %s", n),
Run: func(s *Server, args []string) error {
s.hk.Printf("%s Run\n", s.Name())
return nil
},
}
}
func testServerError(n string) *Server {
return &Server{
SimpleUsage: n,
Long: fmt.Sprintf("A simple server named %s that returns an error", n),
Run: func(s *Server, args []string) error {
s.hk.Printf("%s Run\n", s.Name())
return errors.New("Server returning error")
},
}
}
func runFull(t *testing.T, args string) *result {
buf := new(bytes.Buffer)
hk := HyperKube{
Name: "hyperkube",
Long: "hyperkube is an all-in-one server binary.",
}
hk.SetOut(buf)
hk.AddServer(testServer("test1"))
hk.AddServer(testServer("test2"))
hk.AddServer(testServer("test3"))
hk.AddServer(testServerError("test-error"))
a := strings.Split(args, " ")
t.Logf("Running full with args: %q", a)
err := hk.Run(a)
r := &result{err, buf.String()}
t.Logf("Result err: %v, output: %q", r.err, r.output)
return r
}
func TestRun(t *testing.T) {
x := runFull(t, "hyperkube test1")
assert.Contains(t, x.output, "test1 Run")
assert.NoError(t, x.err)
}
func TestLinkRun(t *testing.T) {
x := runFull(t, "test1")
assert.Contains(t, x.output, "test1 Run")
assert.NoError(t, x.err)
}
func TestTopNoArgs(t *testing.T) {
x := runFull(t, "hyperkube")
assert.EqualError(t, x.err, "No server specified")
}
func TestBadServer(t *testing.T) {
x := runFull(t, "hyperkube bad-server")
assert.EqualError(t, x.err, "Server not found: bad-server")
assert.Contains(t, x.output, "Usage")
}
func TestTopHelp(t *testing.T) {
x := runFull(t, "hyperkube --help")
assert.NoError(t, x.err)
assert.Contains(t, x.output, "all-in-one")
assert.Contains(t, x.output, "A simple server named test1")
}
func TestTopFlags(t *testing.T) {
x := runFull(t, "hyperkube --help test1")
assert.NoError(t, x.err)
assert.Contains(t, x.output, "all-in-one")
assert.Contains(t, x.output, "A simple server named test1")
assert.NotContains(t, x.output, "test1 Run")
}
func TestTopFlagsBad(t *testing.T) {
x := runFull(t, "hyperkube --bad-flag")
assert.EqualError(t, x.err, "unknown flag: --bad-flag")
assert.Contains(t, x.output, "all-in-one")
assert.Contains(t, x.output, "A simple server named test1")
}
func TestServerHelp(t *testing.T) {
x := runFull(t, "hyperkube test1 --help")
assert.NoError(t, x.err)
assert.Contains(t, x.output, "A simple server named test1")
assert.Contains(t, x.output, "--help=false: help for hyperkube")
assert.NotContains(t, x.output, "test1 Run")
}
func TestServerFlagsBad(t *testing.T) {
x := runFull(t, "hyperkube test1 --bad-flag")
assert.EqualError(t, x.err, "unknown flag: --bad-flag")
assert.Contains(t, x.output, "A simple server named test1")
assert.Contains(t, x.output, "--help=false: help for hyperkube")
assert.NotContains(t, x.output, "test1 Run")
}
func TestServerError(t *testing.T) {
x := runFull(t, "hyperkube test-error")
assert.Contains(t, x.output, "test-error Run")
assert.EqualError(t, x.err, "Server returning error")
}

View File

@ -0,0 +1,38 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// clone of the upstream cmd/hypercube/kube-controllermanager.go
package main
import (
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/controllermanager"
)
// NewHyperkubeServer creates a new hyperkube Server object that includes the
// description and flags.
func NewControllerManager() *Server {
s := controllermanager.NewCMServer()
hks := Server{
SimpleUsage: "controller-manager",
Long: "A server that runs a set of active components. This includes replication controllers, service endpoints and nodes.",
Run: func(_ *Server, args []string) error {
return s.Run(args)
},
}
s.AddFlags(hks.Flags())
return &hks
}

View File

@ -0,0 +1,40 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/service"
)
// NewHyperkubeServer creates a new hyperkube Server object that includes the
// description and flags.
func NewKubeletExecutor() *Server {
s := service.NewHyperKubeletExecutorServer()
hks := Server{
SimpleUsage: "executor",
Long: `The kubelet-executor binary is responsible for maintaining a set of containers
on a particular node. It syncs data from a specialized Mesos source that tracks
task launches and kills. It then queries Docker to see what is currently
running. It synchronizes the configuration data, with the running set of
containers by starting or stopping Docker containers.`,
Run: func(hks *Server, args []string) error {
return s.Run(hks, args)
},
}
s.AddHyperkubeFlags(hks.Flags())
return &hks
}

View File

@ -0,0 +1,39 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// clone of the upstream cmd/hypercube/k8sm-scheduler.go
package main
import (
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/service"
)
// NewScheduler creates a new hyperkube Server object that includes the
// description and flags.
func NewScheduler() *Server {
s := service.NewSchedulerServer()
hks := Server{
SimpleUsage: "scheduler",
Long: `Implements the Kubernetes-Mesos scheduler. This will launch Mesos tasks which
results in pods assigned to kubelets based on capacity and constraints.`,
Run: func(hks *Server, args []string) error {
return s.Run(hks, args)
},
}
s.AddHyperkubeFlags(hks.Flags())
return &hks
}

View File

@ -0,0 +1,37 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// clone of the upstream cmd/hypercube/main.go
package main
import (
"os"
)
func main() {
hk := HyperKube{
Name: "km",
Long: "This is an all-in-one binary that can run any of the various Kubernetes-Mesos servers.",
}
hk.AddServer(NewKubeAPIServer())
hk.AddServer(NewControllerManager())
hk.AddServer(NewScheduler())
hk.AddServer(NewKubeletExecutor())
hk.AddServer(NewKubeProxy())
hk.RunToExit(os.Args)
}

View File

@ -0,0 +1,38 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// clone of the upstream cmd/hypercube/kube-apiserver.go
package main
import (
kubeapiserver "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-apiserver/app"
)
// NewKubeAPIServer creates a new hyperkube Server object that includes the
// description and flags.
func NewKubeAPIServer() *Server {
s := kubeapiserver.NewAPIServer()
hks := Server{
SimpleUsage: "apiserver",
Long: "The main API entrypoint and interface to the storage system. The API server is also the focal point for all authorization decisions.",
Run: func(_ *Server, args []string) error {
return s.Run(args)
},
}
s.AddFlags(hks.Flags())
return &hks
}

View File

@ -0,0 +1,41 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// clone of the upstream cmd/hypercube/kube-proxy.go
package main
import (
kubeproxy "github.com/GoogleCloudPlatform/kubernetes/cmd/kube-proxy/app"
)
// NewKubeProxy creates a new hyperkube Server object that includes the
// description and flags.
func NewKubeProxy() *Server {
s := kubeproxy.NewProxyServer()
hks := Server{
SimpleUsage: "proxy",
Long: `The Kubernetes proxy server is responsible for taking traffic directed at
services and forwarding it to the appropriate pods. It generally runs on
nodes next to the Kubelet and proxies traffic from local pods to remote pods.
It is also used when handling incoming external traffic.`,
Run: func(_ *Server, args []string) error {
return s.Run(args)
},
}
s.AddFlags(hks.Flags())
return &hks
}

View File

@ -0,0 +1,82 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// clone of the upstream cmd/hypercube/server.go
package main
import (
"io/ioutil"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/spf13/pflag"
)
type serverRunFunc func(s *Server, args []string) error
// Server describes a server that this binary can morph into.
type Server struct {
SimpleUsage string // One line description of the server.
Long string // Longer free form description of the server
Run serverRunFunc // Run the server. This is not expected to return.
flags *pflag.FlagSet // Flags for the command (and all dependents)
name string
hk *HyperKube
}
// Usage returns the full usage string including all of the flags.
func (s *Server) Usage() error {
tt := `{{if .Long}}{{.Long | trim | wrap ""}}
{{end}}Usage:
{{.SimpleUsage}} [flags]
Available Flags:
{{.Flags.FlagUsages}}`
return util.ExecuteTemplate(s.hk.Out(), tt, s)
}
// Name returns the name of the command as derived from the usage line.
func (s *Server) Name() string {
if s.name != "" {
return s.name
}
name := s.SimpleUsage
i := strings.Index(name, " ")
if i >= 0 {
name = name[:i]
}
return name
}
// Flags returns a flagset for this server
func (s *Server) Flags() *pflag.FlagSet {
if s.flags == nil {
s.flags = pflag.NewFlagSet(s.Name(), pflag.ContinueOnError)
s.flags.SetOutput(ioutil.Discard)
}
return s.flags
}
func (s *Server) FindServer(name string) bool {
if s == nil {
return false
}
_, err := s.hk.FindServer(name)
return err == nil
}

View File

@ -0,0 +1,183 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllermanager
import (
"net"
"net/http"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/cmd/kube-controller-manager/app"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd"
clientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/mesos"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/routecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/namespace"
"github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota"
kendpoint "github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/serviceaccount"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/profile"
kmendpoint "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/service"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
)
// CMServer is the main context object for the controller manager.
type CMServer struct {
*app.CMServer
UseHostPortEndpoints bool
}
// NewCMServer creates a new CMServer with a default config.
func NewCMServer() *CMServer {
s := &CMServer{
CMServer: app.NewCMServer(),
}
s.CloudProvider = mesos.ProviderName
s.UseHostPortEndpoints = true
return s
}
// AddFlags adds flags for a specific CMServer to the specified FlagSet
func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
s.CMServer.AddFlags(fs)
fs.BoolVar(&s.UseHostPortEndpoints, "host_port_endpoints", s.UseHostPortEndpoints, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.")
}
func (s *CMServer) Run(_ []string) error {
if s.Kubeconfig == "" && s.Master == "" {
glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.")
}
// This creates a client, first loading any specified kubeconfig
// file, and then overriding the Master flag, if non-empty.
kubeconfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: s.Kubeconfig},
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: s.Master}}).ClientConfig()
if err != nil {
return err
}
kubeconfig.QPS = 20.0
kubeconfig.Burst = 30
kubeClient, err := client.New(kubeconfig)
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
go func() {
mux := http.NewServeMux()
healthz.InstallHandler(mux)
if s.EnableProfiling {
profile.InstallHandler(mux)
}
mux.Handle("/metrics", prometheus.Handler())
server := &http.Server{
Addr: net.JoinHostPort(s.Address.String(), strconv.Itoa(s.Port)),
Handler: mux,
}
glog.Fatal(server.ListenAndServe())
}()
endpoints := s.createEndpointController(kubeClient)
go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop)
controllerManager := controller.NewReplicationManager(kubeClient, controller.BurstReplicas)
go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop)
//TODO(jdef) should eventually support more cloud providers here
if s.CloudProvider != mesos.ProviderName {
glog.Fatalf("Only provider %v is supported, you specified %v", mesos.ProviderName, s.CloudProvider)
}
cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
nodeController := nodecontroller.NewNodeController(cloud, kubeClient, s.RegisterRetryCount,
s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod)
serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
if err := serviceController.Run(s.NodeSyncPeriod); err != nil {
glog.Errorf("Failed to start service controller: %v", err)
}
if s.AllocateNodeCIDRs {
routes, ok := cloud.Routes()
if !ok {
glog.Fatal("Cloud provider must support routes if allocate-node-cidrs is set")
}
routeController := routecontroller.New(routes, kubeClient, s.ClusterName, (*net.IPNet)(&s.ClusterCIDR))
routeController.Run(s.NodeSyncPeriod)
}
resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)
namespaceManager := namespace.NewNamespaceManager(kubeClient, s.NamespaceSyncPeriod)
namespaceManager.Run()
pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
pvclaimBinder.Run()
pvRecycler, err := volumeclaimbinder.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, app.ProbeRecyclableVolumePlugins())
if err != nil {
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
}
pvRecycler.Run()
if len(s.ServiceAccountKeyFile) > 0 {
privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile)
if err != nil {
glog.Errorf("Error reading key for service account token controller: %v", err)
} else {
serviceaccount.NewTokensController(
kubeClient,
serviceaccount.DefaultTokenControllerOptions(
serviceaccount.JWTTokenGenerator(privateKey),
),
).Run()
}
}
serviceaccount.NewServiceAccountsController(
kubeClient,
serviceaccount.DefaultServiceAccountsControllerOptions(),
).Run()
select {}
}
func (s *CMServer) createEndpointController(client *client.Client) kmendpoint.EndpointController {
if s.UseHostPortEndpoints {
glog.V(2).Infof("Creating hostIP:hostPort endpoint controller")
return kmendpoint.NewEndpointController(client)
}
glog.V(2).Infof("Creating podIP:containerPort endpoint controller")
stockEndpointController := kendpoint.NewEndpointController(client)
return stockEndpointController
}

View File

@ -0,0 +1,20 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package controllermanager is largely a clone of the upstream implementation,
// with additional functionality to select between stock or a customized
// endpoints controller.
package controllermanager

View File

@ -0,0 +1,20 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package service is largely a clone of the stock Kubernetes endpoints
// controller, extended with some very specific functionality related
// to kubernetes-mesos specific host-pod port mapping.
package service

View File

@ -0,0 +1,441 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"fmt"
"reflect"
"strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/endpoints"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
kservice "github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/workqueue"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
var (
keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
)
type EndpointController interface {
Run(workers int, stopCh <-chan struct{})
}
// NewEndpointController returns a new *EndpointController.
func NewEndpointController(client *client.Client) *endpointController {
e := &endpointController{
client: client,
queue: workqueue.New(),
}
e.serviceStore.Store, e.serviceController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return e.client.Services(api.NamespaceAll).List(labels.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
},
},
&api.Service{},
kservice.FullServiceResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: e.enqueueService,
UpdateFunc: func(old, cur interface{}) {
e.enqueueService(cur)
},
DeleteFunc: e.enqueueService,
},
)
e.podStore.Store, e.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return e.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
},
},
&api.Pod{},
kservice.PodRelistPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: e.addPod,
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
},
)
return e
}
// EndpointController manages selector-based service endpoints.
type endpointController struct {
client *client.Client
serviceStore cache.StoreToServiceLister
podStore cache.StoreToPodLister
// Services that need to be updated. A channel is inappropriate here,
// because it allows services with lots of pods to be serviced much
// more often than services with few pods; it also would cause a
// service that's inserted multiple times to be processed more than
// necessary.
queue *workqueue.Type
// Since we join two objects, we'll watch both of them with
// controllers.
serviceController *framework.Controller
podController *framework.Controller
}
// Runs e; will not return until stopCh is closed. workers determines how many
// endpoints will be handled in parallel.
func (e *endpointController) Run(workers int, stopCh <-chan struct{}) {
defer util.HandleCrash()
go e.serviceController.Run(stopCh)
go e.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(e.worker, time.Second, stopCh)
}
go func() {
defer util.HandleCrash()
time.Sleep(5 * time.Minute) // give time for our cache to fill
e.checkLeftoverEndpoints()
}()
<-stopCh
e.queue.ShutDown()
}
func (e *endpointController) getPodServiceMemberships(pod *api.Pod) (util.StringSet, error) {
set := util.StringSet{}
services, err := e.serviceStore.GetPodServices(pod)
if err != nil {
// don't log this error because this function makes pointless
// errors when no services match.
return set, nil
}
for i := range services {
key, err := keyFunc(&services[i])
if err != nil {
return nil, err
}
set.Insert(key)
}
return set, nil
}
// When a pod is added, figure out what services it will be a member of and
// enqueue them. obj must have *api.Pod type.
func (e *endpointController) addPod(obj interface{}) {
pod := obj.(*api.Pod)
services, err := e.getPodServiceMemberships(pod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err)
return
}
for key := range services {
e.queue.Add(key)
}
}
// When a pod is updated, figure out what services it used to be a member of
// and what services it will be a member of, and enqueue the union of these.
// old and cur must be *api.Pod types.
func (e *endpointController) updatePod(old, cur interface{}) {
if api.Semantic.DeepEqual(old, cur) {
return
}
newPod := old.(*api.Pod)
services, err := e.getPodServiceMemberships(newPod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)
return
}
oldPod := cur.(*api.Pod)
// Only need to get the old services if the labels changed.
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) {
oldServices, err := e.getPodServiceMemberships(oldPod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
return
}
services = services.Union(oldServices)
}
for key := range services {
e.queue.Add(key)
}
}
// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (e *endpointController) deletePod(obj interface{}) {
if _, ok := obj.(*api.Pod); ok {
// Enqueue all the services that the pod used to be a member
// of. This happens to be exactly the same thing we do when a
// pod is added.
e.addPod(obj)
return
}
podKey, err := keyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
}
glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, kservice.FullServiceResyncPeriod)
// TODO: keep a map of pods to services to handle this condition.
}
// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
func (e *endpointController) enqueueService(obj interface{}) {
key, err := keyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
}
e.queue.Add(key)
}
// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func (e *endpointController) worker() {
for {
func() {
key, quit := e.queue.Get()
if quit {
return
}
// Use defer: in the unlikely event that there's a
// panic, we'd still like this to get marked done--
// otherwise the controller will not be able to sync
// this service again until it is restarted.
defer e.queue.Done(key)
e.syncService(key.(string))
}()
}
}
func (e *endpointController) syncService(key string) {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := e.serviceStore.Store.GetByKey(key)
if err != nil || !exists {
// Delete the corresponding endpoint, as the service has been deleted.
// TODO: Please note that this will delete an endpoint when a
// service is deleted. However, if we're down at the time when
// the service is deleted, we will miss that deletion, so this
// doesn't completely solve the problem. See #6877.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err)
// Don't retry, as the key isn't going to magically become understandable.
return
}
err = e.client.Endpoints(namespace).Delete(name)
if err != nil && !errors.IsNotFound(err) {
glog.Errorf("Error deleting endpoint %q: %v", key, err)
e.queue.Add(key) // Retry
}
return
}
service := obj.(*api.Service)
if service.Spec.Selector == nil {
// services without a selector receive no endpoints from this controller;
// these services will receive the endpoints that are created out-of-band via the REST API.
return
}
glog.V(5).Infof("About to update endpoints for service %q", key)
pods, err := e.podStore.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelector())
if err != nil {
// Since we're getting stuff from a local cache, it is
// basically impossible to get this error.
glog.Errorf("Error syncing service %q: %v", key, err)
e.queue.Add(key) // Retry
return
}
subsets := []api.EndpointSubset{}
for i := range pods.Items {
pod := &pods.Items[i]
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
portName := servicePort.Name
portProto := servicePort.Protocol
portNum, err := findPort(pod, servicePort)
if err != nil {
glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
continue
}
// HACK(jdef): use HostIP instead of pod.CurrentState.PodIP for generic mesos compat
if len(pod.Status.HostIP) == 0 {
glog.V(4).Infof("Failed to find a host IP for pod %s/%s", pod.Namespace, pod.Name)
continue
}
if !api.IsPodReady(pod) {
glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
continue
}
// HACK(jdef): use HostIP instead of pod.CurrentState.PodIP for generic mesos compat
epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto}
epa := api.EndpointAddress{IP: pod.Status.HostIP, TargetRef: &api.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}})
}
}
subsets = endpoints.RepackSubsets(subsets)
// See if there's actually an update here.
currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name)
if err != nil {
if errors.IsNotFound(err) {
currentEndpoints = &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: service.Name,
Labels: service.Labels,
},
}
} else {
glog.Errorf("Error getting endpoints: %v", err)
e.queue.Add(key) // Retry
return
}
}
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return
}
newEndpoints := currentEndpoints
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels
if len(currentEndpoints.ResourceVersion) == 0 {
// No previous endpoints, create them
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
} else {
// Pre-existing
_, err = e.client.Endpoints(service.Namespace).Update(newEndpoints)
}
if err != nil {
glog.Errorf("Error updating endpoints: %v", err)
e.queue.Add(key) // Retry
}
}
// checkLeftoverEndpoints lists all currently existing endpoints and adds their
// service to the queue. This will detect endpoints that exist with no
// corresponding service; these endpoints need to be deleted. We only need to
// do this once on startup, because in steady-state these are detected (but
// some stragglers could have been left behind if the endpoint controller
// reboots).
func (e *endpointController) checkLeftoverEndpoints() {
list, err := e.client.Endpoints(api.NamespaceAll).List(labels.Everything())
if err != nil {
glog.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err)
return
}
for i := range list.Items {
ep := &list.Items[i]
key, err := keyFunc(ep)
if err != nil {
glog.Errorf("Unable to get key for endpoint %#v", ep)
continue
}
e.queue.Add(key)
}
}
// findPort locates the container port for the given pod and portName. If the
// targetPort is a number, use that. If the targetPort is a string, look that
// string up in all named ports in all containers in the target pod. If no
// match is found, fail.
//
// HACK(jdef): return the HostPort instead of the ContainerPort for generic mesos compat.
func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) {
portName := svcPort.TargetPort
switch portName.Kind {
case util.IntstrString:
name := portName.StrVal
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.Name == name && port.Protocol == svcPort.Protocol {
return findMappedPortName(pod, port.Protocol, name)
}
}
}
case util.IntstrInt:
// HACK(jdef): slightly different semantics from upstream here:
// we ensure that if the user spec'd a port in the service that
// it actually maps to a host-port assigned to the pod. upstream
// doesn't check this and happily returns the container port spec'd
// in the service, but that doesn't align w/ mesos port mgmt.
p := portName.IntVal
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.ContainerPort == p && port.Protocol == svcPort.Protocol {
return findMappedPort(pod, port.Protocol, p)
}
}
}
}
return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID)
}
func findMappedPort(pod *api.Pod, protocol api.Protocol, port int) (int, error) {
if len(pod.Annotations) > 0 {
key := fmt.Sprintf(meta.PortMappingKeyFormat, string(protocol), port)
if value, found := pod.Annotations[key]; found {
return strconv.Atoi(value)
}
}
return 0, fmt.Errorf("failed to find mapped container %s port: %d", protocol, port)
}
func findMappedPortName(pod *api.Pod, protocol api.Protocol, portName string) (int, error) {
if len(pod.Annotations) > 0 {
key := fmt.Sprintf(meta.PortNameMappingKeyFormat, string(protocol), portName)
if value, found := pod.Annotations[key]; found {
return strconv.Atoi(value)
}
}
return 0, fmt.Errorf("failed to find mapped container %s port name: %q", protocol, portName)
}

View File

@ -0,0 +1,63 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func TestFindMappedPort(t *testing.T) {
pod := &api.Pod{}
port, err := findMappedPort(pod, api.ProtocolTCP, 80)
if err == nil {
t.Fatalf("expected error since port tcp/80 is not mapped")
}
port, err = findMappedPortName(pod, api.ProtocolUDP, "foo")
if err == nil {
t.Fatalf("expected error since port udp/'foo' is not mapped")
}
pod.Annotations = make(map[string]string)
pod.Annotations["k8s.mesosphere.io/port_TCP_80"] = "123"
pod.Annotations["k8s.mesosphere.io/portName_UDP_foo"] = "456"
port, err = findMappedPort(pod, api.ProtocolUDP, 80)
if err == nil {
t.Fatalf("expected error since port udp/80 is not mapped")
}
port, err = findMappedPort(pod, api.ProtocolTCP, 80)
if err != nil {
t.Fatalf("expected that port 80 is mapped")
}
if port != 123 {
t.Fatalf("expected mapped port == 123")
}
port, err = findMappedPortName(pod, api.ProtocolTCP, "foo")
if err == nil {
t.Fatalf("expected error since port tcp/'foo' is not mapped")
}
port, err = findMappedPortName(pod, api.ProtocolUDP, "foo")
if err != nil {
t.Fatalf("expected that port udp/'foo' is mapped")
}
if port != 456 {
t.Fatalf("expected mapped port == 456")
}
}

View File

@ -14,6 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// CAUTION: If you update code in this file, you may need to also update code
// in contrib/mesos/pkg/service/endpoints_manager.go
package service
import (