Removed mesos as cloud provider from Kubernetes.

This commit is contained in:
Guangya Liu 2017-06-09 17:13:54 +08:00
parent 5ca03d674e
commit 498b034492
11 changed files with 0 additions and 1485 deletions

View File

@ -43,7 +43,6 @@ var cloudproviders = []string{
"azure", "azure",
"cloudstack", "cloudstack",
"gce", "gce",
"mesos",
"openstack", "openstack",
"ovirt", "ovirt",
"photon", "photon",

View File

@ -16,7 +16,6 @@ go_library(
"//pkg/cloudprovider/providers/azure:go_default_library", "//pkg/cloudprovider/providers/azure:go_default_library",
"//pkg/cloudprovider/providers/cloudstack:go_default_library", "//pkg/cloudprovider/providers/cloudstack:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library", "//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/cloudprovider/providers/mesos:go_default_library",
"//pkg/cloudprovider/providers/openstack:go_default_library", "//pkg/cloudprovider/providers/openstack:go_default_library",
"//pkg/cloudprovider/providers/ovirt:go_default_library", "//pkg/cloudprovider/providers/ovirt:go_default_library",
"//pkg/cloudprovider/providers/photon:go_default_library", "//pkg/cloudprovider/providers/photon:go_default_library",
@ -41,7 +40,6 @@ filegroup(
"//pkg/cloudprovider/providers/cloudstack:all-srcs", "//pkg/cloudprovider/providers/cloudstack:all-srcs",
"//pkg/cloudprovider/providers/fake:all-srcs", "//pkg/cloudprovider/providers/fake:all-srcs",
"//pkg/cloudprovider/providers/gce:all-srcs", "//pkg/cloudprovider/providers/gce:all-srcs",
"//pkg/cloudprovider/providers/mesos:all-srcs",
"//pkg/cloudprovider/providers/openstack:all-srcs", "//pkg/cloudprovider/providers/openstack:all-srcs",
"//pkg/cloudprovider/providers/ovirt:all-srcs", "//pkg/cloudprovider/providers/ovirt:all-srcs",
"//pkg/cloudprovider/providers/photon:all-srcs", "//pkg/cloudprovider/providers/photon:all-srcs",

View File

@ -1,67 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = [
"client.go",
"config.go",
"mesos.go",
"plugins.go",
],
tags = ["automanaged"],
deps = [
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/mesos/mesos-go/detector:go_default_library",
"//vendor/github.com/mesos/mesos-go/detector/zoo:go_default_library",
"//vendor/github.com/mesos/mesos-go/mesosproto:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/gopkg.in/gcfg.v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"client_test.go",
"config_test.go",
"mesos_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/cloudprovider:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/mesos/mesos-go/detector:go_default_library",
"//vendor/github.com/mesos/mesos-go/mesosutil:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,375 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"sync"
"time"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/detector"
mesos "github.com/mesos/mesos-go/mesosproto"
"golang.org/x/net/context"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilnet "k8s.io/apimachinery/pkg/util/net"
)
const defaultClusterName = "mesos"
var noLeadingMasterError = errors.New("there is no current leading master available to query")
type mesosClient struct {
masterLock sync.RWMutex
master string // host:port formatted address
httpClient *http.Client
tr *http.Transport
initialMaster <-chan struct{} // signal chan, closes once an initial, non-nil master is found
state *stateCache
}
type slaveNode struct {
hostname string
kubeletRunning bool
resources *v1.NodeResources
}
type mesosState struct {
clusterName string
nodes map[string]*slaveNode // by hostname
}
type stateCache struct {
sync.Mutex
expiresAt time.Time
cached *mesosState
err error
ttl time.Duration
refill func(context.Context) (*mesosState, error)
}
// reloadCache reloads the state cache if it has expired.
func (c *stateCache) reloadCache(ctx context.Context) {
now := time.Now()
c.Lock()
defer c.Unlock()
if c.expiresAt.Before(now) {
log.V(4).Infof("Reloading cached Mesos state")
c.cached, c.err = c.refill(ctx)
c.expiresAt = now.Add(c.ttl)
} else {
log.V(4).Infof("Using cached Mesos state")
}
}
// cachedState returns the cached Mesos state.
func (c *stateCache) cachedState(ctx context.Context) (*mesosState, error) {
c.reloadCache(ctx)
return c.cached, c.err
}
// clusterName returns the cached Mesos cluster name.
func (c *stateCache) clusterName(ctx context.Context) (string, error) {
cached, err := c.cachedState(ctx)
if err != nil {
return "", err
}
return cached.clusterName, nil
}
// nodes returns the cached list of slave nodes.
func (c *stateCache) nodes(ctx context.Context) (map[string]*slaveNode, error) {
cached, err := c.cachedState(ctx)
if err != nil {
return nil, err
}
return cached.nodes, nil
}
func newMesosClient(
md detector.Master,
mesosHttpClientTimeout, stateCacheTTL time.Duration) (*mesosClient, error) {
tr := utilnet.SetTransportDefaults(&http.Transport{})
httpClient := &http.Client{
Transport: tr,
Timeout: mesosHttpClientTimeout,
}
return createMesosClient(md, httpClient, tr, stateCacheTTL)
}
func createMesosClient(
md detector.Master,
httpClient *http.Client,
tr *http.Transport,
stateCacheTTL time.Duration) (*mesosClient, error) {
initialMaster := make(chan struct{})
client := &mesosClient{
httpClient: httpClient,
tr: tr,
initialMaster: initialMaster,
state: &stateCache{
ttl: stateCacheTTL,
},
}
client.state.refill = client.pollMasterForState
first := true
if err := md.Detect(detector.OnMasterChanged(func(info *mesos.MasterInfo) {
host, port := extractMasterAddress(info)
if len(host) > 0 {
client.masterLock.Lock()
defer client.masterLock.Unlock()
client.master = fmt.Sprintf("%s:%d", host, port)
if first {
first = false
close(initialMaster)
}
}
log.Infof("cloud master changed to '%v'", client.master)
})); err != nil {
log.V(1).Infof("detector initialization failed: %v", err)
return nil, err
}
return client, nil
}
func extractMasterAddress(info *mesos.MasterInfo) (host string, port int) {
if info != nil {
host = info.GetAddress().GetHostname()
if host == "" {
host = info.GetAddress().GetIp()
}
if host != "" {
// use port from Address
port = int(info.GetAddress().GetPort())
} else {
// deprecated: get host and port directly from MasterInfo (and not Address)
host = info.GetHostname()
if host == "" {
host = unpackIPv4(info.GetIp())
}
port = int(info.GetPort())
}
}
return
}
func unpackIPv4(ip uint32) string {
octets := make([]byte, 4, 4)
binary.BigEndian.PutUint32(octets, ip)
ipv4 := net.IP(octets)
return ipv4.String()
}
// listSlaves returns a (possibly cached) map of slave nodes by hostname.
// Callers must not mutate the contents of the returned slice.
func (c *mesosClient) listSlaves(ctx context.Context) (map[string]*slaveNode, error) {
return c.state.nodes(ctx)
}
// clusterName returns a (possibly cached) cluster name.
func (c *mesosClient) clusterName(ctx context.Context) (string, error) {
return c.state.clusterName(ctx)
}
// pollMasterForState returns an array of slave nodes
func (c *mesosClient) pollMasterForState(ctx context.Context) (*mesosState, error) {
// wait for initial master detection
select {
case <-c.initialMaster: // noop
case <-ctx.Done():
return nil, ctx.Err()
}
master := func() string {
c.masterLock.RLock()
defer c.masterLock.RUnlock()
return c.master
}()
if master == "" {
return nil, noLeadingMasterError
}
//TODO(jdef) should not assume master uses http (what about https?)
var state *mesosState
successHandler := func(res *http.Response) error {
blob, err1 := ioutil.ReadAll(res.Body)
if err1 != nil {
return err1
}
log.V(3).Infof("Got mesos state, content length %v", len(blob))
state, err1 = parseMesosState(blob)
return err1
}
// thinking here is that we may get some other status codes from mesos at some point:
// - authentication
// - redirection (possibly from http to https)
// ...
for _, tt := range []struct {
uri string
handlers map[int]func(*http.Response) error
}{
{
uri: fmt.Sprintf("http://%s/state", master),
handlers: map[int]func(*http.Response) error{
200: successHandler,
},
},
{
uri: fmt.Sprintf("http://%s/state.json", master),
handlers: map[int]func(*http.Response) error{
200: successHandler,
},
},
} {
req, err := http.NewRequest("GET", tt.uri, nil)
if err != nil {
return nil, err
}
err = c.httpDo(ctx, req, func(res *http.Response, err error) error {
if err != nil {
return err
}
defer res.Body.Close()
if handler, ok := tt.handlers[res.StatusCode]; ok {
if err := handler(res); err != nil {
return err
}
}
// no handler for this error code, proceed to the next connection type
return nil
})
if state != nil || err != nil {
return state, err
}
}
return nil, errors.New("failed to sync with Mesos master")
}
func parseMesosState(blob []byte) (*mesosState, error) {
type State struct {
ClusterName string `json:"cluster"`
Slaves []*struct {
Id string `json:"id"` // ex: 20150106-162714-3815890698-5050-2453-S2
Pid string `json:"pid"` // ex: slave(1)@10.22.211.18:5051
Hostname string `json:"hostname"` // ex: 10.22.211.18, or slave-123.nowhere.com
Resources map[string]interface{} `json:"resources"` // ex: {"mem": 123, "ports": "[31000-3200]"}
} `json:"slaves"`
Frameworks []*struct {
Id string `json:"id"` // ex: 20151105-093752-3745622208-5050-1-0000
Pid string `json:"pid"` // ex: scheduler(1)@192.168.65.228:57124
Executors []*struct {
SlaveId string `json:"slave_id"` // ex: 20151105-093752-3745622208-5050-1-S1
ExecutorId string `json:"executor_id"` // ex: 6704d375c68fee1e_k8sm-executor
Name string `json:"name"` // ex: Kubelet-Executor
} `json:"executors"`
} `json:"frameworks"`
}
state := &State{ClusterName: defaultClusterName}
if err := json.Unmarshal(blob, state); err != nil {
return nil, err
}
executorSlaveIds := map[string]struct{}{}
for _, f := range state.Frameworks {
for _, e := range f.Executors {
// Note that this simple comparison breaks when we support more than one
// k8s instance in a cluster. At the moment this is not possible for
// a number of reasons.
// TODO(sttts): find way to detect executors of this k8s instance
if e.Name == KubernetesExecutorName {
executorSlaveIds[e.SlaveId] = struct{}{}
}
}
}
nodes := map[string]*slaveNode{} // by hostname
for _, slave := range state.Slaves {
if slave.Hostname == "" {
continue
}
node := &slaveNode{hostname: slave.Hostname}
cap := v1.ResourceList{}
if slave.Resources != nil && len(slave.Resources) > 0 {
// attempt to translate CPU (cores) and memory (MB) resources
if cpu, found := slave.Resources["cpus"]; found {
if cpuNum, ok := cpu.(float64); ok {
cap[v1.ResourceCPU] = *resource.NewQuantity(int64(cpuNum), resource.DecimalSI)
} else {
log.Warningf("unexpected slave cpu resource type %T: %v", cpu, cpu)
}
} else {
log.Warningf("slave failed to report cpu resource")
}
if mem, found := slave.Resources["mem"]; found {
if memNum, ok := mem.(float64); ok {
cap[v1.ResourceMemory] = *resource.NewQuantity(int64(memNum), resource.BinarySI)
} else {
log.Warningf("unexpected slave mem resource type %T: %v", mem, mem)
}
} else {
log.Warningf("slave failed to report mem resource")
}
}
if len(cap) > 0 {
node.resources = &v1.NodeResources{
Capacity: cap,
}
log.V(4).Infof("node %q reporting capacity %v", node.hostname, cap)
}
if _, ok := executorSlaveIds[slave.Id]; ok {
node.kubeletRunning = true
}
nodes[node.hostname] = node
}
result := &mesosState{
clusterName: state.ClusterName,
nodes: nodes,
}
return result, nil
}
type responseHandler func(*http.Response, error) error
// httpDo executes an HTTP request in the given context, canceling an ongoing request if the context
// is canceled prior to completion of the request. hacked from https://blog.golang.org/context
func (c *mesosClient) httpDo(ctx context.Context, req *http.Request, f responseHandler) error {
// Run the HTTP request in a goroutine and pass the response to f.
ch := make(chan error, 1)
go func() { ch <- f(c.httpClient.Do(req)) }()
select {
case <-ctx.Done():
c.tr.CancelRequest(req)
<-ch // Wait for f to return.
return ctx.Err()
case err := <-ch:
return err
}
}

View File

@ -1,269 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"testing"
"time"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/detector"
"github.com/mesos/mesos-go/mesosutil"
"golang.org/x/net/context"
utilnet "k8s.io/apimachinery/pkg/util/net"
)
// Test data
const (
TEST_MASTER_ID = "master-12345"
TEST_MASTER_IP = 177048842 // 10.141.141.10
TEST_MASTER_PORT = 5050
TEST_STATE_JSON = `
{
"version": "0.22.0",
"unregistered_frameworks": [],
"started_tasks": 0,
"start_time": 1429456501.61141,
"staged_tasks": 0,
"slaves": [
{
"resources": {
"ports": "[31000-32000]",
"mem": 15360,
"disk": 470842,
"cpus": 8
},
"registered_time": 1429456502.46999,
"pid": "slave(1)@mesos1.internal.example.org.fail:5050",
"id": "20150419-081501-16777343-5050-16383-S2",
"hostname": "mesos1.internal.example.org.fail",
"attributes": {},
"active": true
},
{
"resources": {
"ports": "[31000-32000]",
"mem": 15360,
"disk": 470842,
"cpus": 8
},
"registered_time": 1429456502.4144,
"pid": "slave(1)@mesos2.internal.example.org.fail:5050",
"id": "20150419-081501-16777343-5050-16383-S1",
"hostname": "mesos2.internal.example.org.fail",
"attributes": {},
"active": true
},
{
"resources": {
"ports": "[31000-32000]",
"mem": 15360,
"disk": 470842,
"cpus": 8
},
"registered_time": 1429456502.02879,
"pid": "slave(1)@mesos3.internal.example.org.fail:5050",
"id": "20150419-081501-16777343-5050-16383-S0",
"hostname": "mesos3.internal.example.org.fail",
"attributes": {},
"active": true
}
],
"pid": "master@mesos-master0.internal.example.org.fail:5050",
"orphan_tasks": [],
"lost_tasks": 0,
"leader": "master@mesos-master0.internal.example.org.fail:5050",
"killed_tasks": 0,
"failed_tasks": 0,
"elected_time": 1429456501.61638,
"deactivated_slaves": 0,
"completed_frameworks": [],
"build_user": "buildbot",
"build_time": 1425085311,
"build_date": "2015-02-27 17:01:51",
"activated_slaves": 3,
"finished_tasks": 0,
"flags": {
"zk_session_timeout": "10secs",
"work_dir": "/somepath/mesos/local/Lc9arz",
"webui_dir": "/usr/local/share/mesos/webui",
"version": "false",
"user_sorter": "drf",
"slave_reregister_timeout": "10mins",
"logbufsecs": "0",
"log_auto_initialize": "true",
"initialize_driver_logging": "true",
"framework_sorter": "drf",
"authenticators": "crammd5",
"authenticate_slaves": "false",
"authenticate": "false",
"allocation_interval": "1secs",
"logging_level": "INFO",
"quiet": "false",
"recovery_slave_removal_limit": "100%",
"registry": "replicated_log",
"registry_fetch_timeout": "1mins",
"registry_store_timeout": "5secs",
"registry_strict": "false",
"root_submissions": "true"
},
"frameworks": [],
"git_branch": "refs/heads/0.22.0-rc1",
"git_sha": "46834faca67f877631e1beb7d61be5c080ec3dc2",
"git_tag": "0.22.0-rc1",
"hostname": "localhost",
"id": "20150419-081501-16777343-5050-16383"
}`
)
// Mocks
type FakeMasterDetector struct {
callback detector.MasterChanged
done chan struct{}
}
func newFakeMasterDetector() *FakeMasterDetector {
return &FakeMasterDetector{
done: make(chan struct{}),
}
}
func (md FakeMasterDetector) Cancel() {
close(md.done)
}
func (md FakeMasterDetector) Detect(cb detector.MasterChanged) error {
md.callback = cb
leadingMaster := mesosutil.NewMasterInfo(TEST_MASTER_ID, TEST_MASTER_IP, TEST_MASTER_PORT)
cb.OnMasterChanged(leadingMaster)
return nil
}
func (md FakeMasterDetector) Done() <-chan struct{} {
return md.done
}
// Auxiliary functions
func makeHttpMocks() (*httptest.Server, *http.Client, *http.Transport) {
httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.V(4).Infof("Mocking response for HTTP request: %#v", r)
if r.URL.Path == "/state.json" {
w.WriteHeader(200) // OK
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, TEST_STATE_JSON)
} else {
w.WriteHeader(400)
fmt.Fprintln(w, "Bad Request")
}
}))
// Intercept all client requests and feed them to the test server
transport := utilnet.SetTransportDefaults(&http.Transport{
Proxy: func(req *http.Request) (*url.URL, error) {
return url.Parse(httpServer.URL)
},
})
httpClient := &http.Client{Transport: transport}
return httpServer, httpClient, transport
}
// Tests
// test mesos.parseMesosState
func Test_parseMesosState(t *testing.T) {
state, err := parseMesosState([]byte(TEST_STATE_JSON))
if err != nil {
t.Fatalf("parseMesosState does not yield an error")
}
if state == nil {
t.Fatalf("parseMesosState yields a non-nil state")
}
if len(state.nodes) != 3 {
t.Fatalf("parseMesosState yields a state with 3 nodes")
}
}
// test mesos.listSlaves
func Test_listSlaves(t *testing.T) {
defer log.Flush()
md := FakeMasterDetector{}
httpServer, httpClient, httpTransport := makeHttpMocks()
defer httpServer.Close()
cacheTTL := 500 * time.Millisecond
mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL)
if err != nil {
t.Fatalf("createMesosClient does not yield an error")
}
slaveNodes, err := mesosClient.listSlaves(context.TODO())
if err != nil {
t.Fatalf("listSlaves does not yield an error")
}
if len(slaveNodes) != 3 {
t.Fatalf("listSlaves yields a collection of size 3")
}
expectedHostnames := map[string]struct{}{
"mesos1.internal.example.org.fail": {},
"mesos2.internal.example.org.fail": {},
"mesos3.internal.example.org.fail": {},
}
actualHostnames := make(map[string]struct{})
for _, node := range slaveNodes {
actualHostnames[node.hostname] = struct{}{}
}
if !reflect.DeepEqual(expectedHostnames, actualHostnames) {
t.Fatalf("listSlaves yields a collection with the expected hostnames")
}
}
// test mesos.clusterName
func Test_clusterName(t *testing.T) {
defer log.Flush()
md := FakeMasterDetector{}
httpServer, httpClient, httpTransport := makeHttpMocks()
defer httpServer.Close()
cacheTTL := 500 * time.Millisecond
mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL)
name, err := mesosClient.clusterName(context.TODO())
if err != nil {
t.Fatalf("clusterName does not yield an error")
}
if name != defaultClusterName {
t.Fatalf("clusterName yields the expected (default) value")
}
}

View File

@ -1,79 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
"io"
"time"
"gopkg.in/gcfg.v1"
)
const (
DefaultMesosMaster = "localhost:5050"
DefaultHttpClientTimeout = time.Duration(10) * time.Second
DefaultStateCacheTTL = time.Duration(5) * time.Second
)
// Example Mesos cloud provider configuration file:
//
// [mesos-cloud]
// mesos-master = leader.mesos:5050
// http-client-timeout = 500ms
// state-cache-ttl = 1h
type ConfigWrapper struct {
Mesos_Cloud Config
}
type Config struct {
MesosMaster string `gcfg:"mesos-master"`
MesosHttpClientTimeout Duration `gcfg:"http-client-timeout"`
StateCacheTTL Duration `gcfg:"state-cache-ttl"`
}
type Duration struct {
Duration time.Duration `gcfg:"duration"`
}
func (d *Duration) UnmarshalText(data []byte) error {
underlying, err := time.ParseDuration(string(data))
if err == nil {
d.Duration = underlying
}
return err
}
func createDefaultConfig() *Config {
return &Config{
MesosMaster: DefaultMesosMaster,
MesosHttpClientTimeout: Duration{Duration: DefaultHttpClientTimeout},
StateCacheTTL: Duration{Duration: DefaultStateCacheTTL},
}
}
func readConfig(configReader io.Reader) (*Config, error) {
config := createDefaultConfig()
wrapper := &ConfigWrapper{Mesos_Cloud: *config}
if configReader != nil {
if err := gcfg.ReadInto(wrapper, configReader); err != nil {
return nil, err
}
config = &(wrapper.Mesos_Cloud)
}
return config, nil
}

View File

@ -1,75 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
"bytes"
"testing"
"time"
log "github.com/golang/glog"
)
// test mesos.createDefaultConfig
func Test_createDefaultConfig(t *testing.T) {
defer log.Flush()
config := createDefaultConfig()
if config.MesosMaster != DefaultMesosMaster {
t.Fatalf("Default config has the expected MesosMaster value")
}
if config.MesosHttpClientTimeout.Duration != DefaultHttpClientTimeout {
t.Fatalf("Default config has the expected MesosHttpClientTimeout value")
}
if config.StateCacheTTL.Duration != DefaultStateCacheTTL {
t.Fatalf("Default config has the expected StateCacheTTL value")
}
}
// test mesos.readConfig
func Test_readConfig(t *testing.T) {
defer log.Flush()
configString := `
[mesos-cloud]
mesos-master = leader.mesos:5050
http-client-timeout = 500ms
state-cache-ttl = 1h`
reader := bytes.NewBufferString(configString)
config, err := readConfig(reader)
if err != nil {
t.Fatalf("Reading configuration does not yield an error: %#v", err)
}
if config.MesosMaster != "leader.mesos:5050" {
t.Fatalf("Parsed config has the expected MesosMaster value")
}
if config.MesosHttpClientTimeout.Duration != time.Duration(500)*time.Millisecond {
t.Fatalf("Parsed config has the expected MesosHttpClientTimeout value")
}
if config.StateCacheTTL.Duration != time.Duration(1)*time.Hour {
t.Fatalf("Parsed config has the expected StateCacheTTL value")
}
}

View File

@ -1,315 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
"errors"
"fmt"
"io"
"net"
"regexp"
"golang.org/x/net/context"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/detector"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
)
const (
ProviderName = "mesos"
// KubernetesExecutorName is shared between contrib/mesos and Mesos cloud provider.
// Because cloud provider -> contrib dependencies are forbidden, this constant
// is defined here, not in contrib.
KubernetesExecutorName = "Kubelet-Executor"
)
var (
CloudProvider *MesosCloud
noHostNameSpecified = errors.New("No hostname specified")
)
func init() {
cloudprovider.RegisterCloudProvider(
ProviderName,
func(configReader io.Reader) (cloudprovider.Interface, error) {
provider, err := newMesosCloud(configReader)
if err == nil {
CloudProvider = provider
}
return provider, err
})
}
type MesosCloud struct {
client *mesosClient
config *Config
}
func (c *MesosCloud) MasterURI() string {
return c.config.MesosMaster
}
func newMesosCloud(configReader io.Reader) (*MesosCloud, error) {
config, err := readConfig(configReader)
if err != nil {
return nil, err
}
log.V(1).Infof("new mesos cloud, master='%v'", config.MesosMaster)
if d, err := detector.New(config.MesosMaster); err != nil {
log.V(1).Infof("failed to create master detector: %v", err)
return nil, err
} else if cl, err := newMesosClient(d,
config.MesosHttpClientTimeout.Duration,
config.StateCacheTTL.Duration); err != nil {
log.V(1).Infof("failed to create mesos cloud client: %v", err)
return nil, err
} else {
return &MesosCloud{client: cl, config: config}, nil
}
}
// Initialize passes a Kubernetes clientBuilder interface to the cloud provider
func (c *MesosCloud) Initialize(clientBuilder controller.ControllerClientBuilder) {}
// Implementation of Instances.CurrentNodeName
func (c *MesosCloud) CurrentNodeName(hostname string) (types.NodeName, error) {
return types.NodeName(hostname), nil
}
func (c *MesosCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
return errors.New("unimplemented")
}
// Instances returns a copy of the Mesos cloud Instances implementation.
// Mesos natively provides minimal cloud-type resources. More robust cloud
// support requires a combination of Mesos and cloud-specific knowledge.
func (c *MesosCloud) Instances() (cloudprovider.Instances, bool) {
return c, true
}
// LoadBalancer always returns nil, false in this implementation.
// Mesos does not provide any type of native load balancing by default,
// so this implementation always returns (nil, false).
func (c *MesosCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return nil, false
}
// Zones always returns nil, false in this implementation.
// Mesos does not provide any type of native region or zone awareness,
// so this implementation always returns (nil, false).
func (c *MesosCloud) Zones() (cloudprovider.Zones, bool) {
return nil, false
}
// Clusters returns a copy of the Mesos cloud Clusters implementation.
// Mesos does not provide support for multiple clusters.
func (c *MesosCloud) Clusters() (cloudprovider.Clusters, bool) {
return c, true
}
// Routes always returns nil, false in this implementation.
func (c *MesosCloud) Routes() (cloudprovider.Routes, bool) {
return nil, false
}
// ProviderName returns the cloud provider ID.
func (c *MesosCloud) ProviderName() string {
return ProviderName
}
// ScrubDNS filters DNS settings for pods.
func (c *MesosCloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []string) {
return nameservers, searches
}
// ListClusters lists the names of the available Mesos clusters.
func (c *MesosCloud) ListClusters() ([]string, error) {
// Always returns a single cluster (this one!)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
name, err := c.client.clusterName(ctx)
return []string{name}, err
}
// Master gets back the address (either DNS name or IP address) of the leading Mesos master node for the cluster.
func (c *MesosCloud) Master(clusterName string) (string, error) {
clusters, err := c.ListClusters()
if err != nil {
return "", err
}
for _, name := range clusters {
if name == clusterName {
if c.client.master == "" {
return "", errors.New("The currently leading master is unknown.")
}
host, _, err := net.SplitHostPort(c.client.master)
if err != nil {
return "", err
}
return host, nil
}
}
return "", fmt.Errorf("The supplied cluster '%v' does not exist", clusterName)
}
// ipAddress returns an IP address of the specified instance.
func ipAddress(name string) (net.IP, error) {
if name == "" {
return nil, noHostNameSpecified
}
ipaddr := net.ParseIP(name)
if ipaddr != nil {
return ipaddr, nil
}
iplist, err := net.LookupIP(name)
if err != nil {
log.V(2).Infof("failed to resolve IP from host name '%v': %v", name, err)
return nil, err
}
ipaddr = iplist[0]
log.V(2).Infof("resolved host '%v' to '%v'", name, ipaddr)
return ipaddr, nil
}
// mapNodeNameToPrivateDNSName maps a k8s NodeName to an mesos hostname.
// This is a simple string cast
func mapNodeNameToHostname(nodeName types.NodeName) string {
return string(nodeName)
}
// ExternalID returns the cloud provider ID of the instance with the specified nodeName (deprecated).
func (c *MesosCloud) ExternalID(nodeName types.NodeName) (string, error) {
hostname := mapNodeNameToHostname(nodeName)
//TODO(jdef) use a timeout here? 15s?
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
nodes, err := c.client.listSlaves(ctx)
if err != nil {
return "", err
}
node := nodes[hostname]
if node == nil {
return "", cloudprovider.InstanceNotFound
}
ip, err := ipAddress(node.hostname)
if err != nil {
return "", err
}
return ip.String(), nil
}
// InstanceID returns the cloud provider ID of the instance with the specified nodeName.
func (c *MesosCloud) InstanceID(nodeName types.NodeName) (string, error) {
return "", nil
}
// InstanceTypeByProviderID returns the cloudprovider instance type of the node with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (c *MesosCloud) InstanceTypeByProviderID(providerID string) (string, error) {
return "", errors.New("unimplemented")
}
// InstanceType returns the type of the instance with the specified nodeName.
func (c *MesosCloud) InstanceType(nodeName types.NodeName) (string, error) {
return "", nil
}
func (c *MesosCloud) listNodes() (map[string]*slaveNode, error) {
//TODO(jdef) use a timeout here? 15s?
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
nodes, err := c.client.listSlaves(ctx)
if err != nil {
return nil, err
}
if len(nodes) == 0 {
log.V(2).Info("no slaves found, are any running?")
return nil, nil
}
return nodes, nil
}
// List lists instances that match 'filter' which is a regular expression
// which must match the entire instance name (fqdn).
func (c *MesosCloud) List(filter string) ([]types.NodeName, error) {
nodes, err := c.listNodes()
if err != nil {
return nil, err
}
filterRegex, err := regexp.Compile(filter)
if err != nil {
return nil, err
}
names := []types.NodeName{}
for _, node := range nodes {
if filterRegex.MatchString(node.hostname) {
names = append(names, types.NodeName(node.hostname))
}
}
return names, nil
}
// ListWithKubelet list those instance which have no running kubelet, i.e. the
// Kubernetes executor.
func (c *MesosCloud) ListWithoutKubelet() ([]string, error) {
nodes, err := c.listNodes()
if err != nil {
return nil, err
}
addr := make([]string, 0, len(nodes))
for _, n := range nodes {
if !n.kubeletRunning {
addr = append(addr, n.hostname)
}
}
return addr, nil
}
// NodeAddresses returns the addresses of the instance with the specified nodeName.
func (c *MesosCloud) NodeAddresses(nodeName types.NodeName) ([]v1.NodeAddress, error) {
name := mapNodeNameToHostname(nodeName)
ip, err := ipAddress(name)
if err != nil {
return nil, err
}
return []v1.NodeAddress{
{Type: v1.NodeInternalIP, Address: ip.String()},
{Type: v1.NodeExternalIP, Address: ip.String()},
}, nil
}
// NodeAddressesByProviderID returns the node addresses of an instances with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (c *MesosCloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
return []v1.NodeAddress{}, errors.New("unimplemented")
}

View File

@ -1,280 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
"bytes"
"net"
"reflect"
"testing"
"time"
log "github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider"
)
func TestIPAddress(t *testing.T) {
expected4 := net.IPv4(127, 0, 0, 1)
ip, err := ipAddress("127.0.0.1")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(ip, expected4) {
t.Fatalf("expected %#v instead of %#v", expected4, ip)
}
expected6 := net.ParseIP("::1")
if expected6 == nil {
t.Fatalf("failed to parse ipv6 ::1")
}
ip, err = ipAddress("::1")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(ip, expected6) {
t.Fatalf("expected %#v instead of %#v", expected6, ip)
}
ip, err = ipAddress("localhost")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(ip, expected4) && !reflect.DeepEqual(ip, expected6) {
t.Fatalf("expected %#v or %#v instead of %#v", expected4, expected6, ip)
}
_, err = ipAddress("")
if err != noHostNameSpecified {
t.Fatalf("expected error noHostNameSpecified but got none")
}
}
// test mesos.newMesosCloud with no config
func Test_newMesosCloud_NoConfig(t *testing.T) {
defer log.Flush()
mesosCloud, err := newMesosCloud(nil)
if err != nil {
t.Fatalf("Creating a new Mesos cloud provider without config does not yield an error: %#v", err)
}
if mesosCloud.client.httpClient.Timeout != DefaultHttpClientTimeout {
t.Fatalf("Creating a new Mesos cloud provider without config does not yield an error: %#v", err)
}
if mesosCloud.client.state.ttl != DefaultStateCacheTTL {
t.Fatalf("Mesos client with default config has the expected state cache TTL value")
}
}
// test mesos.newMesosCloud with custom config
func Test_newMesosCloud_WithConfig(t *testing.T) {
defer log.Flush()
configString := `
[mesos-cloud]
http-client-timeout = 500ms
state-cache-ttl = 1h`
reader := bytes.NewBufferString(configString)
mesosCloud, err := newMesosCloud(reader)
if err != nil {
t.Fatalf("Creating a new Mesos cloud provider with a custom config does not yield an error: %#v", err)
}
if mesosCloud.client.httpClient.Timeout != time.Duration(500)*time.Millisecond {
t.Fatalf("Mesos client with a custom config has the expected HTTP client timeout value")
}
if mesosCloud.client.state.ttl != time.Duration(1)*time.Hour {
t.Fatalf("Mesos client with a custom config has the expected state cache TTL value")
}
}
// tests for capability reporting functions
// test mesos.Instances
func Test_Instances(t *testing.T) {
defer log.Flush()
mesosCloud, _ := newMesosCloud(nil)
instances, supports_instances := mesosCloud.Instances()
if !supports_instances || instances == nil {
t.Fatalf("MesosCloud provides an implementation of Instances")
}
}
// test mesos.LoadBalancer
func Test_TcpLoadBalancer(t *testing.T) {
defer log.Flush()
mesosCloud, _ := newMesosCloud(nil)
lb, supports_lb := mesosCloud.LoadBalancer()
if supports_lb || lb != nil {
t.Fatalf("MesosCloud does not provide an implementation of LoadBalancer")
}
}
// test mesos.Zones
func Test_Zones(t *testing.T) {
defer log.Flush()
mesosCloud, _ := newMesosCloud(nil)
zones, supports_zones := mesosCloud.Zones()
if supports_zones || zones != nil {
t.Fatalf("MesosCloud does not provide an implementation of Zones")
}
}
// test mesos.Clusters
func Test_Clusters(t *testing.T) {
defer log.Flush()
mesosCloud, _ := newMesosCloud(nil)
clusters, supports_clusters := mesosCloud.Clusters()
if !supports_clusters || clusters == nil {
t.Fatalf("MesosCloud does not provide an implementation of Clusters")
}
}
// test mesos.MasterURI
func Test_MasterURI(t *testing.T) {
defer log.Flush()
mesosCloud, _ := newMesosCloud(nil)
uri := mesosCloud.MasterURI()
if uri != DefaultMesosMaster {
t.Fatalf("MasterURI returns the expected master URI (expected \"localhost\", actual \"%s\"", uri)
}
}
// test mesos.ListClusters
func Test_ListClusters(t *testing.T) {
defer log.Flush()
md := FakeMasterDetector{}
httpServer, httpClient, httpTransport := makeHttpMocks()
defer httpServer.Close()
cacheTTL := 500 * time.Millisecond
mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL)
mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()}
clusters, err := mesosCloud.ListClusters()
if err != nil {
t.Fatalf("ListClusters does not yield an error: %#v", err)
}
if len(clusters) != 1 {
t.Fatalf("ListClusters should return a list of size 1: (actual: %#v)", clusters)
}
expectedClusterNames := []string{"mesos"}
if !reflect.DeepEqual(clusters, expectedClusterNames) {
t.Fatalf("ListClusters should return the expected list of names: (expected: %#v, actual: %#v)",
expectedClusterNames,
clusters)
}
}
// test mesos.Master
func Test_Master(t *testing.T) {
defer log.Flush()
md := FakeMasterDetector{}
httpServer, httpClient, httpTransport := makeHttpMocks()
defer httpServer.Close()
cacheTTL := 500 * time.Millisecond
mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL)
mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()}
clusters, err := mesosCloud.ListClusters()
clusterName := clusters[0]
master, err := mesosCloud.Master(clusterName)
if err != nil {
t.Fatalf("Master does not yield an error: %#v", err)
}
expectedMaster := unpackIPv4(TEST_MASTER_IP)
if master != expectedMaster {
t.Fatalf("Master returns the unexpected value: (expected: %#v, actual: %#v", expectedMaster, master)
}
}
// test mesos.List
func Test_List(t *testing.T) {
defer log.Flush()
md := FakeMasterDetector{}
httpServer, httpClient, httpTransport := makeHttpMocks()
defer httpServer.Close()
cacheTTL := 500 * time.Millisecond
mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL)
mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()}
clusters, err := mesosCloud.List(".*") // recognizes the language of all strings
if err != nil {
t.Fatalf("List does not yield an error: %#v", err)
}
if len(clusters) != 3 {
t.Fatalf("List with a catch-all filter should return a list of size 3: (actual: %#v)", clusters)
}
clusters, err = mesosCloud.List("$^") // end-of-string followed by start-of-string: recognizes the empty language
if err != nil {
t.Fatalf("List does not yield an error: %#v", err)
}
if len(clusters) != 0 {
t.Fatalf("List with a reject-all filter should return a list of size 0: (actual: %#v)", clusters)
}
}
func Test_ExternalID(t *testing.T) {
defer log.Flush()
md := FakeMasterDetector{}
httpServer, httpClient, httpTransport := makeHttpMocks()
defer httpServer.Close()
cacheTTL := 500 * time.Millisecond
mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL)
mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()}
_, err = mesosCloud.ExternalID("unknown")
if err != cloudprovider.InstanceNotFound {
t.Fatalf("ExternalID did not return InstanceNotFound on an unknown instance")
}
slaveName := types.NodeName("mesos3.internal.example.org.fail")
id, err := mesosCloud.ExternalID(slaveName)
if id != "" {
t.Fatalf("ExternalID should not be able to resolve %q", slaveName)
}
if err == cloudprovider.InstanceNotFound {
t.Fatalf("ExternalID should find %q", slaveName)
}
}

View File

@ -1,21 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
_ "github.com/mesos/mesos-go/detector/zoo"
)

View File

@ -22,7 +22,6 @@ import (
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/azure" _ "k8s.io/kubernetes/pkg/cloudprovider/providers/azure"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/cloudstack" _ "k8s.io/kubernetes/pkg/cloudprovider/providers/cloudstack"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" _ "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/openstack" _ "k8s.io/kubernetes/pkg/cloudprovider/providers/openstack"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/ovirt" _ "k8s.io/kubernetes/pkg/cloudprovider/providers/ovirt"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/photon" _ "k8s.io/kubernetes/pkg/cloudprovider/providers/photon"