mesos cloud provider implementation

This commit is contained in:
James DeFelice 2015-05-12 16:02:26 +00:00
parent db9c6373df
commit eca9fd58c7
7 changed files with 1260 additions and 0 deletions

View File

@ -0,0 +1,293 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/detector"
mesos "github.com/mesos/mesos-go/mesosproto"
"golang.org/x/net/context"
)
const defaultClusterName = "mesos"
var noLeadingMasterError = fmt.Errorf("there is no current leading master available to query")
type mesosClient struct {
masterLock sync.RWMutex
master string // host:port formatted address
httpClient *http.Client
tr *http.Transport
initialMaster <-chan struct{} // signal chan, closes once an initial, non-nil master is found
state *stateCache
}
type slaveNode struct {
hostname string
resources *api.NodeResources
}
type mesosState struct {
clusterName string
nodes []*slaveNode
}
type stateCache struct {
sync.Mutex
expiresAt time.Time
cached *mesosState
err error
ttl time.Duration
refill func(context.Context) (*mesosState, error)
}
// reloadCache reloads the state cache if it has expired.
func (c *stateCache) reloadCache(ctx context.Context) {
now := time.Now()
c.Lock()
defer c.Unlock()
if c.expiresAt.Before(now) {
log.V(4).Infof("Reloading cached Mesos state")
c.cached, c.err = c.refill(ctx)
c.expiresAt = now.Add(c.ttl)
} else {
log.V(4).Infof("Using cached Mesos state")
}
}
// cachedState returns the cached Mesos state.
func (c *stateCache) cachedState(ctx context.Context) (*mesosState, error) {
c.reloadCache(ctx)
return c.cached, c.err
}
// clusterName returns the cached Mesos cluster name.
func (c *stateCache) clusterName(ctx context.Context) (string, error) {
cached, err := c.cachedState(ctx)
return cached.clusterName, err
}
// nodes returns the cached list of slave nodes.
func (c *stateCache) nodes(ctx context.Context) ([]*slaveNode, error) {
cached, err := c.cachedState(ctx)
return cached.nodes, err
}
func newMesosClient(
md detector.Master,
mesosHttpClientTimeout, stateCacheTTL time.Duration) (*mesosClient, error) {
tr := &http.Transport{}
httpClient := &http.Client{
Transport: tr,
Timeout: mesosHttpClientTimeout,
}
return createMesosClient(md, httpClient, tr, stateCacheTTL)
}
func createMesosClient(
md detector.Master,
httpClient *http.Client,
tr *http.Transport,
stateCacheTTL time.Duration) (*mesosClient, error) {
initialMaster := make(chan struct{})
client := &mesosClient{
httpClient: httpClient,
tr: tr,
initialMaster: initialMaster,
state: &stateCache{
ttl: stateCacheTTL,
},
}
client.state.refill = client.pollMasterForState
first := true
if err := md.Detect(detector.OnMasterChanged(func(info *mesos.MasterInfo) {
client.masterLock.Lock()
defer client.masterLock.Unlock()
if info == nil {
client.master = ""
} else if host := info.GetHostname(); host != "" {
client.master = host
} else {
client.master = unpackIPv4(info.GetIp())
}
if len(client.master) > 0 {
client.master = fmt.Sprintf("%s:%d", client.master, info.GetPort())
if first {
first = false
close(initialMaster)
}
}
log.Infof("cloud master changed to '%v'", client.master)
})); err != nil {
log.V(1).Infof("detector initialization failed: %v", err)
return nil, err
}
return client, nil
}
func unpackIPv4(ip uint32) string {
octets := make([]byte, 4, 4)
binary.BigEndian.PutUint32(octets, ip)
ipv4 := net.IP(octets)
return ipv4.String()
}
// listSlaves returns a (possibly cached) list of slave nodes.
// Callers must not mutate the contents of the returned slice.
func (c *mesosClient) listSlaves(ctx context.Context) ([]*slaveNode, error) {
return c.state.nodes(ctx)
}
// clusterName returns a (possibly cached) cluster name.
func (c *mesosClient) clusterName(ctx context.Context) (string, error) {
return c.state.clusterName(ctx)
}
// pollMasterForState returns an array of slave nodes
func (c *mesosClient) pollMasterForState(ctx context.Context) (*mesosState, error) {
// wait for initial master detection
select {
case <-c.initialMaster: // noop
case <-ctx.Done():
return nil, ctx.Err()
}
master := func() string {
c.masterLock.RLock()
defer c.masterLock.RUnlock()
return c.master
}()
if master == "" {
return nil, noLeadingMasterError
}
//TODO(jdef) should not assume master uses http (what about https?)
uri := fmt.Sprintf("http://%s/state.json", master)
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, err
}
var state *mesosState
err = c.httpDo(ctx, req, func(res *http.Response, err error) error {
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("HTTP request failed with code %d: %v", res.StatusCode, res.Status)
}
blob, err1 := ioutil.ReadAll(res.Body)
if err1 != nil {
return err1
}
log.V(3).Infof("Got mesos state, content length %v", len(blob))
state, err1 = parseMesosState(blob)
return err1
})
return state, err
}
func parseMesosState(blob []byte) (*mesosState, error) {
type State struct {
ClusterName string `json:"cluster"`
Slaves []*struct {
Id string `json:"id"` // ex: 20150106-162714-3815890698-5050-2453-S2
Pid string `json:"pid"` // ex: slave(1)@10.22.211.18:5051
Hostname string `json:"hostname"` // ex: 10.22.211.18, or slave-123.nowhere.com
Resources map[string]interface{} `json:"resources"` // ex: {"mem": 123, "ports": "[31000-3200]"}
} `json:"slaves"`
}
state := &State{ClusterName: defaultClusterName}
if err := json.Unmarshal(blob, state); err != nil {
return nil, err
}
nodes := []*slaveNode{}
for _, slave := range state.Slaves {
if slave.Hostname == "" {
continue
}
node := &slaveNode{hostname: slave.Hostname}
cap := api.ResourceList{}
if slave.Resources != nil && len(slave.Resources) > 0 {
// attempt to translate CPU (cores) and memory (MB) resources
if cpu, found := slave.Resources["cpus"]; found {
if cpuNum, ok := cpu.(float64); ok {
cap[api.ResourceCPU] = *resource.NewQuantity(int64(cpuNum), resource.DecimalSI)
} else {
log.Warningf("unexpected slave cpu resource type %T: %v", cpu, cpu)
}
} else {
log.Warningf("slave failed to report cpu resource")
}
if mem, found := slave.Resources["mem"]; found {
if memNum, ok := mem.(float64); ok {
cap[api.ResourceMemory] = *resource.NewQuantity(int64(memNum), resource.BinarySI)
} else {
log.Warningf("unexpected slave mem resource type %T: %v", mem, mem)
}
} else {
log.Warningf("slave failed to report mem resource")
}
}
if len(cap) > 0 {
node.resources = &api.NodeResources{
Capacity: cap,
}
log.V(4).Infof("node %q reporting capacity %v", node.hostname, cap)
}
nodes = append(nodes, node)
}
result := &mesosState{
clusterName: state.ClusterName,
nodes: nodes,
}
return result, nil
}
type responseHandler func(*http.Response, error) error
// httpDo executes an HTTP request in the given context, canceling an ongoing request if the context
// is canceled prior to completion of the request. hacked from https://blog.golang.org/context
func (c *mesosClient) httpDo(ctx context.Context, req *http.Request, f responseHandler) error {
// Run the HTTP request in a goroutine and pass the response to f.
ch := make(chan error, 1)
go func() { ch <- f(c.httpClient.Do(req)) }()
select {
case <-ctx.Done():
c.tr.CancelRequest(req)
<-ch // Wait for f to return.
return ctx.Err()
case err := <-ch:
return err
}
}

View File

@ -0,0 +1,266 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"testing"
"time"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/detector"
"github.com/mesos/mesos-go/mesosutil"
"golang.org/x/net/context"
)
// Test data
const (
TEST_MASTER_ID = "master-12345"
TEST_MASTER_IP = 177048842 // 10.141.141.10
TEST_MASTER_PORT = 5050
TEST_STATE_JSON = `
{
"version": "0.22.0",
"unregistered_frameworks": [],
"started_tasks": 0,
"start_time": 1429456501.61141,
"staged_tasks": 0,
"slaves": [
{
"resources": {
"ports": "[31000-32000]",
"mem": 15360,
"disk": 470842,
"cpus": 8
},
"registered_time": 1429456502.46999,
"pid": "slave(1)@mesos1.internal.company.com:5050",
"id": "20150419-081501-16777343-5050-16383-S2",
"hostname": "mesos1.internal.company.com",
"attributes": {},
"active": true
},
{
"resources": {
"ports": "[31000-32000]",
"mem": 15360,
"disk": 470842,
"cpus": 8
},
"registered_time": 1429456502.4144,
"pid": "slave(1)@mesos2.internal.company.com:5050",
"id": "20150419-081501-16777343-5050-16383-S1",
"hostname": "mesos2.internal.company.com",
"attributes": {},
"active": true
},
{
"resources": {
"ports": "[31000-32000]",
"mem": 15360,
"disk": 470842,
"cpus": 8
},
"registered_time": 1429456502.02879,
"pid": "slave(1)@mesos3.internal.company.com:5050",
"id": "20150419-081501-16777343-5050-16383-S0",
"hostname": "mesos3.internal.company.com",
"attributes": {},
"active": true
}
],
"pid": "master@mesos-master0.internal.company.com:5050",
"orphan_tasks": [],
"lost_tasks": 0,
"leader": "master@mesos-master0.internal.company.com:5050",
"killed_tasks": 0,
"failed_tasks": 0,
"elected_time": 1429456501.61638,
"deactivated_slaves": 0,
"completed_frameworks": [],
"build_user": "buildbot",
"build_time": 1425085311,
"build_date": "2015-02-27 17:01:51",
"activated_slaves": 3,
"finished_tasks": 0,
"flags": {
"zk_session_timeout": "10secs",
"work_dir": "/tmp/mesos/local/Lc9arz",
"webui_dir": "/usr/local/share/mesos/webui",
"version": "false",
"user_sorter": "drf",
"slave_reregister_timeout": "10mins",
"logbufsecs": "0",
"log_auto_initialize": "true",
"initialize_driver_logging": "true",
"framework_sorter": "drf",
"authenticators": "crammd5",
"authenticate_slaves": "false",
"authenticate": "false",
"allocation_interval": "1secs",
"logging_level": "INFO",
"quiet": "false",
"recovery_slave_removal_limit": "100%",
"registry": "replicated_log",
"registry_fetch_timeout": "1mins",
"registry_store_timeout": "5secs",
"registry_strict": "false",
"root_submissions": "true"
},
"frameworks": [],
"git_branch": "refs/heads/0.22.0-rc1",
"git_sha": "46834faca67f877631e1beb7d61be5c080ec3dc2",
"git_tag": "0.22.0-rc1",
"hostname": "localhost",
"id": "20150419-081501-16777343-5050-16383"
}`
)
// Mocks
type FakeMasterDetector struct {
callback detector.MasterChanged
done chan struct{}
}
func newFakeMasterDetector() *FakeMasterDetector {
return &FakeMasterDetector{
done: make(chan struct{}),
}
}
func (md FakeMasterDetector) Cancel() {
close(md.done)
}
func (md FakeMasterDetector) Detect(cb detector.MasterChanged) error {
md.callback = cb
leadingMaster := mesosutil.NewMasterInfo(TEST_MASTER_ID, TEST_MASTER_IP, TEST_MASTER_PORT)
cb.OnMasterChanged(leadingMaster)
return nil
}
func (md FakeMasterDetector) Done() <-chan struct{} {
return md.done
}
// Auxilliary functions
func makeHttpMocks() (*httptest.Server, *http.Client, *http.Transport) {
httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.V(4).Infof("Mocking response for HTTP request: %#v", r)
if r.URL.Path == "/state.json" {
w.WriteHeader(200) // OK
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, TEST_STATE_JSON)
} else {
w.WriteHeader(400)
fmt.Fprintln(w, "Bad Request")
}
}))
// Intercept all client requests and feed them to the test server
transport := &http.Transport{
Proxy: func(req *http.Request) (*url.URL, error) {
return url.Parse(httpServer.URL)
},
}
httpClient := &http.Client{Transport: transport}
return httpServer, httpClient, transport
}
// Tests
// test mesos.parseMesosState
func Test_parseMesosState(t *testing.T) {
state, err := parseMesosState([]byte(TEST_STATE_JSON))
if err != nil {
t.Fatalf("parseMesosState does not yield an error")
}
if state == nil {
t.Fatalf("parseMesosState yields a non-nil state")
}
if len(state.nodes) != 3 {
t.Fatalf("parseMesosState yields a state with 3 nodes")
}
}
// test mesos.listSlaves
func Test_listSlaves(t *testing.T) {
defer log.Flush()
md := FakeMasterDetector{}
httpServer, httpClient, httpTransport := makeHttpMocks()
defer httpServer.Close()
cacheTTL := 500 * time.Millisecond
mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL)
if err != nil {
t.Fatalf("createMesosClient does not yield an error")
}
slaveNodes, err := mesosClient.listSlaves(context.TODO())
if err != nil {
t.Fatalf("listSlaves does not yield an error")
}
if len(slaveNodes) != 3 {
t.Fatalf("listSlaves yields a collection of size 3")
}
expectedHostnames := map[string]struct{}{
"mesos1.internal.company.com": {},
"mesos2.internal.company.com": {},
"mesos3.internal.company.com": {},
}
actualHostnames := make(map[string]struct{})
for _, node := range slaveNodes {
actualHostnames[node.hostname] = struct{}{}
}
if !reflect.DeepEqual(expectedHostnames, actualHostnames) {
t.Fatalf("listSlaves yields a collection with the expected hostnames")
}
}
// test mesos.clusterName
func Test_clusterName(t *testing.T) {
defer log.Flush()
md := FakeMasterDetector{}
httpServer, httpClient, httpTransport := makeHttpMocks()
defer httpServer.Close()
cacheTTL := 500 * time.Millisecond
mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL)
name, err := mesosClient.clusterName(context.TODO())
if err != nil {
t.Fatalf("clusterName does not yield an error")
}
if name != defaultClusterName {
t.Fatalf("clusterName yields the expected (default) value")
}
}

View File

@ -0,0 +1,79 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
"io"
"time"
"code.google.com/p/gcfg"
)
const (
DefaultMesosMaster = "localhost:5050"
DefaultHttpClientTimeout = time.Duration(10) * time.Second
DefaultStateCacheTTL = time.Duration(5) * time.Second
)
// Example Mesos cloud provider configuration file:
//
// [mesos-cloud]
// mesos-master = leader.mesos:5050
// http-client-timeout = 500ms
// state-cache-ttl = 1h
type ConfigWrapper struct {
Mesos_Cloud Config
}
type Config struct {
MesosMaster string `gcfg:"mesos-master"`
MesosHttpClientTimeout Duration `gcfg:"http-client-timeout"`
StateCacheTTL Duration `gcfg:"state-cache-ttl"`
}
type Duration struct {
Duration time.Duration `gcfg:"duration"`
}
func (d *Duration) UnmarshalText(data []byte) error {
underlying, err := time.ParseDuration(string(data))
if err == nil {
d.Duration = underlying
}
return err
}
func createDefaultConfig() *Config {
return &Config{
MesosMaster: DefaultMesosMaster,
MesosHttpClientTimeout: Duration{Duration: DefaultHttpClientTimeout},
StateCacheTTL: Duration{Duration: DefaultStateCacheTTL},
}
}
func readConfig(configReader io.Reader) (*Config, error) {
config := createDefaultConfig()
wrapper := &ConfigWrapper{Mesos_Cloud: *config}
if configReader != nil {
if err := gcfg.ReadInto(wrapper, configReader); err != nil {
return nil, err
}
config = &(wrapper.Mesos_Cloud)
}
return config, nil
}

View File

@ -0,0 +1,75 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
"bytes"
"testing"
"time"
log "github.com/golang/glog"
)
// test mesos.createDefaultConfig
func Test_createDefaultConfig(t *testing.T) {
defer log.Flush()
config := createDefaultConfig()
if config.MesosMaster != DefaultMesosMaster {
t.Fatalf("Default config has the expected MesosMaster value")
}
if config.MesosHttpClientTimeout.Duration != DefaultHttpClientTimeout {
t.Fatalf("Default config has the expected MesosHttpClientTimeout value")
}
if config.StateCacheTTL.Duration != DefaultStateCacheTTL {
t.Fatalf("Default config has the expected StateCacheTTL value")
}
}
// test mesos.readConfig
func Test_readConfig(t *testing.T) {
defer log.Flush()
configString := `
[mesos-cloud]
mesos-master = leader.mesos:5050
http-client-timeout = 500ms
state-cache-ttl = 1h`
reader := bytes.NewBufferString(configString)
config, err := readConfig(reader)
if err != nil {
t.Fatalf("Reading configuration does not yield an error: %#v", err)
}
if config.MesosMaster != "leader.mesos:5050" {
t.Fatalf("Parsed config has the expected MesosMaster value")
}
if config.MesosHttpClientTimeout.Duration != time.Duration(500)*time.Millisecond {
t.Fatalf("Parsed config has the expected MesosHttpClientTimeout value")
}
if config.StateCacheTTL.Duration != time.Duration(1)*time.Hour {
t.Fatalf("Parsed config has the expected StateCacheTTL value")
}
}

View File

@ -0,0 +1,238 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
"errors"
"fmt"
"io"
"net"
"regexp"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/detector"
"golang.org/x/net/context"
)
var (
PluginName = "mesos"
CloudProvider *MesosCloud
noHostNameSpecified = errors.New("No hostname specified")
)
func init() {
cloudprovider.RegisterCloudProvider(
PluginName,
func(configReader io.Reader) (cloudprovider.Interface, error) {
provider, err := newMesosCloud(configReader)
if err == nil {
CloudProvider = provider
}
return provider, err
})
}
type MesosCloud struct {
client *mesosClient
config *Config
}
func (c *MesosCloud) MasterURI() string {
return c.config.MesosMaster
}
func newMesosCloud(configReader io.Reader) (*MesosCloud, error) {
config, err := readConfig(configReader)
if err != nil {
return nil, err
}
log.V(1).Infof("new mesos cloud, master='%v'", config.MesosMaster)
if d, err := detector.New(config.MesosMaster); err != nil {
log.V(1).Infof("failed to create master detector: %v", err)
return nil, err
} else if cl, err := newMesosClient(d,
config.MesosHttpClientTimeout.Duration,
config.StateCacheTTL.Duration); err != nil {
log.V(1).Infof("failed to create mesos cloud client: %v", err)
return nil, err
} else {
return &MesosCloud{client: cl, config: config}, nil
}
}
// Instances returns a copy of the Mesos cloud Instances implementation.
// Mesos natively provides minimal cloud-type resources. More robust cloud
// support requires a combination of Mesos and cloud-specific knowledge.
func (c *MesosCloud) Instances() (cloudprovider.Instances, bool) {
return c, true
}
// TCPLoadBalancer always returns nil, false in this implementation.
// Mesos does not provide any type of native load balancing by default,
// so this implementation always returns (nil, false).
func (c *MesosCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
return nil, false
}
// Zones always returns nil, false in this implementation.
// Mesos does not provide any type of native region or zone awareness,
// so this implementation always returns (nil, false).
func (c *MesosCloud) Zones() (cloudprovider.Zones, bool) {
return nil, false
}
// Clusters returns a copy of the Mesos cloud Clusters implementation.
// Mesos does not provide support for multiple clusters.
func (c *MesosCloud) Clusters() (cloudprovider.Clusters, bool) {
return c, true
}
// ListClusters lists the names of the available Mesos clusters.
func (c *MesosCloud) ListClusters() ([]string, error) {
// Always returns a single cluster (this one!)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
name, err := c.client.clusterName(ctx)
return []string{name}, err
}
// Master gets back the address (either DNS name or IP address) of the master node for the cluster.
func (c *MesosCloud) Master(clusterName string) (string, error) {
clusters, err := c.ListClusters()
if err != nil {
return "", err
}
for _, name := range clusters {
if name == clusterName {
if c.client.master == "" {
return "", errors.New("The currently leading master is unknown.")
}
host, _, err := net.SplitHostPort(c.client.master)
if err != nil {
return "", err
}
return host, nil
}
}
return "", errors.New(fmt.Sprintf("The supplied cluster '%v' does not exist", clusterName))
}
// ipAddress returns an IP address of the specified instance.
func (c *MesosCloud) ipAddress(name string) (net.IP, error) {
if name == "" {
return nil, noHostNameSpecified
}
ipaddr := net.ParseIP(name)
if ipaddr != nil {
return ipaddr, nil
}
iplist, err := net.LookupIP(name)
if err != nil {
log.V(2).Infof("failed to resolve IP from host name '%v': %v", name, err)
return nil, err
}
ipaddr = iplist[0]
log.V(2).Infof("resolved host '%v' to '%v'", name, ipaddr)
return ipaddr, nil
}
// ExternalID returns the cloud provider ID of the specified instance.
func (c *MesosCloud) ExternalID(instance string) (string, error) {
ip, err := c.ipAddress(instance)
if err != nil {
return "", err
}
return ip.String(), nil
}
// List lists instances that match 'filter' which is a regular expression
// which must match the entire instance name (fqdn).
func (c *MesosCloud) List(filter string) ([]string, error) {
//TODO(jdef) use a timeout here? 15s?
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
nodes, err := c.client.listSlaves(ctx)
if err != nil {
return nil, err
}
if len(nodes) == 0 {
log.V(2).Info("no slaves found, are any running?")
return nil, nil
}
filterRegex, err := regexp.Compile(filter)
if err != nil {
return nil, err
}
addr := []string{}
for _, node := range nodes {
if filterRegex.MatchString(node.hostname) {
addr = append(addr, node.hostname)
}
}
return addr, err
}
// GetNodeResources gets the resources for a particular node
func (c *MesosCloud) GetNodeResources(name string) (*api.NodeResources, error) {
//TODO(jdef) use a timeout here? 15s?
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
nodes, err := c.client.listSlaves(ctx)
if err != nil {
return nil, err
}
if len(nodes) == 0 {
log.V(2).Info("no slaves found, are any running?")
} else {
for _, node := range nodes {
if name == node.hostname {
return node.resources, nil
}
}
}
log.Warningf("failed to locate node spec for %q", name)
return nil, nil
}
// NodeAddresses returns the addresses of the specified instance.
func (c *MesosCloud) NodeAddresses(name string) ([]api.NodeAddress, error) {
ip, err := c.ipAddress(name)
if err != nil {
return nil, err
}
return []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: ip.String()}}, nil
}
// Configure the specified instance using the spec.
// Ths implementation is a noop.
func (c *MesosCloud) Configure(name string, spec *api.NodeSpec) error {
return nil
}
// Release deletes all the configuration related to the instance, including other cloud resources.
// Ths implementation is a noop.
func (c *MesosCloud) Release(name string) error {
return nil
}

View File

@ -0,0 +1,288 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
"bytes"
"net"
"reflect"
"testing"
"time"
log "github.com/golang/glog"
"speter.net/go/exp/math/dec/inf"
)
func TestIPAddress(t *testing.T) {
c := &MesosCloud{}
expected4 := net.IPv4(127, 0, 0, 1)
ip, err := c.ipAddress("127.0.0.1")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(ip, expected4) {
t.Fatalf("expected %#v instead of %#v", expected4, ip)
}
expected6 := net.ParseIP("::1")
if expected6 == nil {
t.Fatalf("failed to parse ipv6 ::1")
}
ip, err = c.ipAddress("::1")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(ip, expected6) {
t.Fatalf("expected %#v instead of %#v", expected6, ip)
}
ip, err = c.ipAddress("localhost")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(ip, expected4) && !reflect.DeepEqual(ip, expected6) {
t.Fatalf("expected %#v or %#v instead of %#v", expected4, expected6, ip)
}
_, err = c.ipAddress("")
if err != noHostNameSpecified {
t.Fatalf("expected error noHostNameSpecified but got none")
}
}
// test mesos.newMesosCloud with no config
func Test_newMesosCloud_NoConfig(t *testing.T) {
defer log.Flush()
mesosCloud, err := newMesosCloud(nil)
if err != nil {
t.Fatalf("Creating a new Mesos cloud provider without config does not yield an error: %#v", err)
}
if mesosCloud.client.httpClient.Timeout != DefaultHttpClientTimeout {
t.Fatalf("Creating a new Mesos cloud provider without config does not yield an error: %#v", err)
}
if mesosCloud.client.state.ttl != DefaultStateCacheTTL {
t.Fatalf("Mesos client with default config has the expected state cache TTL value")
}
}
// test mesos.newMesosCloud with custom config
func Test_newMesosCloud_WithConfig(t *testing.T) {
defer log.Flush()
configString := `
[mesos-cloud]
http-client-timeout = 500ms
state-cache-ttl = 1h`
reader := bytes.NewBufferString(configString)
mesosCloud, err := newMesosCloud(reader)
if err != nil {
t.Fatalf("Creating a new Mesos cloud provider with a custom config does not yield an error: %#v", err)
}
if mesosCloud.client.httpClient.Timeout != time.Duration(500)*time.Millisecond {
t.Fatalf("Mesos client with a custom config has the expected HTTP client timeout value")
}
if mesosCloud.client.state.ttl != time.Duration(1)*time.Hour {
t.Fatalf("Mesos client with a custom config has the expected state cache TTL value")
}
}
// tests for capability reporting functions
// test mesos.Instances
func Test_Instances(t *testing.T) {
defer log.Flush()
mesosCloud, _ := newMesosCloud(nil)
instances, supports_instances := mesosCloud.Instances()
if !supports_instances || instances == nil {
t.Fatalf("MesosCloud provides an implementation of Instances")
}
}
// test mesos.TCPLoadBalancer
func Test_TcpLoadBalancer(t *testing.T) {
defer log.Flush()
mesosCloud, _ := newMesosCloud(nil)
lb, supports_lb := mesosCloud.TCPLoadBalancer()
if supports_lb || lb != nil {
t.Fatalf("MesosCloud does not provide an implementation of TCPLoadBalancer")
}
}
// test mesos.Zones
func Test_Zones(t *testing.T) {
defer log.Flush()
mesosCloud, _ := newMesosCloud(nil)
zones, supports_zones := mesosCloud.Zones()
if supports_zones || zones != nil {
t.Fatalf("MesosCloud does not provide an implementation of Zones")
}
}
// test mesos.Clusters
func Test_Clusters(t *testing.T) {
defer log.Flush()
mesosCloud, _ := newMesosCloud(nil)
clusters, supports_clusters := mesosCloud.Clusters()
if !supports_clusters || clusters == nil {
t.Fatalf("MesosCloud does not provide an implementation of Clusters")
}
}
// test mesos.MasterURI
func Test_MasterURI(t *testing.T) {
defer log.Flush()
mesosCloud, _ := newMesosCloud(nil)
uri := mesosCloud.MasterURI()
if uri != DefaultMesosMaster {
t.Fatalf("MasterURI returns the expected master URI (expected \"localhost\", actual \"%s\"", uri)
}
}
// test mesos.ListClusters
func Test_ListClusters(t *testing.T) {
defer log.Flush()
md := FakeMasterDetector{}
httpServer, httpClient, httpTransport := makeHttpMocks()
defer httpServer.Close()
cacheTTL := 500 * time.Millisecond
mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL)
mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()}
clusters, err := mesosCloud.ListClusters()
if err != nil {
t.Fatalf("ListClusters does not yield an error: %#v", err)
}
if len(clusters) != 1 {
t.Fatalf("ListClusters should return a list of size 1: (actual: %#v)", clusters)
}
expectedClusterNames := []string{"mesos"}
if !reflect.DeepEqual(clusters, expectedClusterNames) {
t.Fatalf("ListClusters should return the expected list of names: (expected: %#v, actual: %#v)",
expectedClusterNames,
clusters)
}
}
// test mesos.Master
func Test_Master(t *testing.T) {
defer log.Flush()
md := FakeMasterDetector{}
httpServer, httpClient, httpTransport := makeHttpMocks()
defer httpServer.Close()
cacheTTL := 500 * time.Millisecond
mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL)
mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()}
clusters, err := mesosCloud.ListClusters()
clusterName := clusters[0]
master, err := mesosCloud.Master(clusterName)
if err != nil {
t.Fatalf("Master does not yield an error: %#v", err)
}
expectedMaster := unpackIPv4(TEST_MASTER_IP)
if master != expectedMaster {
t.Fatalf("Master returns the expected value: (expected: %#v, actual: %#v", expectedMaster, master)
}
}
// test mesos.List
func Test_List(t *testing.T) {
defer log.Flush()
md := FakeMasterDetector{}
httpServer, httpClient, httpTransport := makeHttpMocks()
defer httpServer.Close()
cacheTTL := 500 * time.Millisecond
mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL)
mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()}
clusters, err := mesosCloud.List(".*") // recognizes the language of all strings
if err != nil {
t.Fatalf("List does not yield an error: %#v", err)
}
if len(clusters) != 3 {
t.Fatalf("List with a catch-all filter should return a list of size 3: (actual: %#v)", clusters)
}
clusters, err = mesosCloud.List("$^") // end-of-string followed by start-of-string: recognizes the empty language
if err != nil {
t.Fatalf("List does not yield an error: %#v", err)
}
if len(clusters) != 0 {
t.Fatalf("List with a reject-all filter should return a list of size 0: (actual: %#v)", clusters)
}
}
// test mesos.GetNodeResources
func Test_GetNodeResources(t *testing.T) {
defer log.Flush()
md := FakeMasterDetector{}
httpServer, httpClient, httpTransport := makeHttpMocks()
defer httpServer.Close()
cacheTTL := 500 * time.Millisecond
mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL)
mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()}
resources, err := mesosCloud.GetNodeResources("mesos1.internal.company.com")
if err != nil {
t.Fatalf("GetNodeResources does not yield an error: %#v", err)
}
expectedCpu := inf.NewDec(8, 0)
expectedMem := inf.NewDec(15360, 0)
actualCpu := resources.Capacity["cpu"].Amount
actualMem := resources.Capacity["memory"].Amount
if actualCpu.Cmp(expectedCpu) != 0 {
t.Fatalf("GetNodeResources should return the expected amount of cpu: (expected: %#v, vactual: %#v)", expectedCpu, actualCpu)
}
if actualMem.Cmp(expectedMem) != 0 {
t.Fatalf("GetNodeResources should return the expected amount of memory: (expected: %#v, vactual: %#v)", expectedMem, actualMem)
}
}

View File

@ -0,0 +1,21 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mesos
import (
_ "github.com/mesos/mesos-go/detector/zoo"
)