diff --git a/pkg/cloudprovider/mesos/client.go b/pkg/cloudprovider/mesos/client.go new file mode 100644 index 00000000000..6fa82373729 --- /dev/null +++ b/pkg/cloudprovider/mesos/client.go @@ -0,0 +1,293 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mesos + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "sync" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + log "github.com/golang/glog" + "github.com/mesos/mesos-go/detector" + mesos "github.com/mesos/mesos-go/mesosproto" + "golang.org/x/net/context" +) + +const defaultClusterName = "mesos" + +var noLeadingMasterError = fmt.Errorf("there is no current leading master available to query") + +type mesosClient struct { + masterLock sync.RWMutex + master string // host:port formatted address + httpClient *http.Client + tr *http.Transport + initialMaster <-chan struct{} // signal chan, closes once an initial, non-nil master is found + state *stateCache +} + +type slaveNode struct { + hostname string + resources *api.NodeResources +} + +type mesosState struct { + clusterName string + nodes []*slaveNode +} + +type stateCache struct { + sync.Mutex + expiresAt time.Time + cached *mesosState + err error + ttl time.Duration + refill func(context.Context) (*mesosState, error) +} + +// reloadCache reloads the state cache if it has expired. +func (c *stateCache) reloadCache(ctx context.Context) { + now := time.Now() + c.Lock() + defer c.Unlock() + if c.expiresAt.Before(now) { + log.V(4).Infof("Reloading cached Mesos state") + c.cached, c.err = c.refill(ctx) + c.expiresAt = now.Add(c.ttl) + } else { + log.V(4).Infof("Using cached Mesos state") + } +} + +// cachedState returns the cached Mesos state. +func (c *stateCache) cachedState(ctx context.Context) (*mesosState, error) { + c.reloadCache(ctx) + return c.cached, c.err +} + +// clusterName returns the cached Mesos cluster name. +func (c *stateCache) clusterName(ctx context.Context) (string, error) { + cached, err := c.cachedState(ctx) + return cached.clusterName, err +} + +// nodes returns the cached list of slave nodes. +func (c *stateCache) nodes(ctx context.Context) ([]*slaveNode, error) { + cached, err := c.cachedState(ctx) + return cached.nodes, err +} + +func newMesosClient( + md detector.Master, + mesosHttpClientTimeout, stateCacheTTL time.Duration) (*mesosClient, error) { + + tr := &http.Transport{} + httpClient := &http.Client{ + Transport: tr, + Timeout: mesosHttpClientTimeout, + } + return createMesosClient(md, httpClient, tr, stateCacheTTL) +} + +func createMesosClient( + md detector.Master, + httpClient *http.Client, + tr *http.Transport, + stateCacheTTL time.Duration) (*mesosClient, error) { + + initialMaster := make(chan struct{}) + client := &mesosClient{ + httpClient: httpClient, + tr: tr, + initialMaster: initialMaster, + state: &stateCache{ + ttl: stateCacheTTL, + }, + } + client.state.refill = client.pollMasterForState + first := true + if err := md.Detect(detector.OnMasterChanged(func(info *mesos.MasterInfo) { + client.masterLock.Lock() + defer client.masterLock.Unlock() + if info == nil { + client.master = "" + } else if host := info.GetHostname(); host != "" { + client.master = host + } else { + client.master = unpackIPv4(info.GetIp()) + } + if len(client.master) > 0 { + client.master = fmt.Sprintf("%s:%d", client.master, info.GetPort()) + if first { + first = false + close(initialMaster) + } + } + log.Infof("cloud master changed to '%v'", client.master) + })); err != nil { + log.V(1).Infof("detector initialization failed: %v", err) + return nil, err + } + return client, nil +} + +func unpackIPv4(ip uint32) string { + octets := make([]byte, 4, 4) + binary.BigEndian.PutUint32(octets, ip) + ipv4 := net.IP(octets) + return ipv4.String() +} + +// listSlaves returns a (possibly cached) list of slave nodes. +// Callers must not mutate the contents of the returned slice. +func (c *mesosClient) listSlaves(ctx context.Context) ([]*slaveNode, error) { + return c.state.nodes(ctx) +} + +// clusterName returns a (possibly cached) cluster name. +func (c *mesosClient) clusterName(ctx context.Context) (string, error) { + return c.state.clusterName(ctx) +} + +// pollMasterForState returns an array of slave nodes +func (c *mesosClient) pollMasterForState(ctx context.Context) (*mesosState, error) { + // wait for initial master detection + select { + case <-c.initialMaster: // noop + case <-ctx.Done(): + return nil, ctx.Err() + } + + master := func() string { + c.masterLock.RLock() + defer c.masterLock.RUnlock() + return c.master + }() + if master == "" { + return nil, noLeadingMasterError + } + + //TODO(jdef) should not assume master uses http (what about https?) + + uri := fmt.Sprintf("http://%s/state.json", master) + req, err := http.NewRequest("GET", uri, nil) + if err != nil { + return nil, err + } + var state *mesosState + err = c.httpDo(ctx, req, func(res *http.Response, err error) error { + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode != 200 { + return fmt.Errorf("HTTP request failed with code %d: %v", res.StatusCode, res.Status) + } + blob, err1 := ioutil.ReadAll(res.Body) + if err1 != nil { + return err1 + } + log.V(3).Infof("Got mesos state, content length %v", len(blob)) + state, err1 = parseMesosState(blob) + return err1 + }) + return state, err +} + +func parseMesosState(blob []byte) (*mesosState, error) { + type State struct { + ClusterName string `json:"cluster"` + Slaves []*struct { + Id string `json:"id"` // ex: 20150106-162714-3815890698-5050-2453-S2 + Pid string `json:"pid"` // ex: slave(1)@10.22.211.18:5051 + Hostname string `json:"hostname"` // ex: 10.22.211.18, or slave-123.nowhere.com + Resources map[string]interface{} `json:"resources"` // ex: {"mem": 123, "ports": "[31000-3200]"} + } `json:"slaves"` + } + state := &State{ClusterName: defaultClusterName} + if err := json.Unmarshal(blob, state); err != nil { + return nil, err + } + nodes := []*slaveNode{} + for _, slave := range state.Slaves { + if slave.Hostname == "" { + continue + } + node := &slaveNode{hostname: slave.Hostname} + cap := api.ResourceList{} + if slave.Resources != nil && len(slave.Resources) > 0 { + // attempt to translate CPU (cores) and memory (MB) resources + if cpu, found := slave.Resources["cpus"]; found { + if cpuNum, ok := cpu.(float64); ok { + cap[api.ResourceCPU] = *resource.NewQuantity(int64(cpuNum), resource.DecimalSI) + } else { + log.Warningf("unexpected slave cpu resource type %T: %v", cpu, cpu) + } + } else { + log.Warningf("slave failed to report cpu resource") + } + if mem, found := slave.Resources["mem"]; found { + if memNum, ok := mem.(float64); ok { + cap[api.ResourceMemory] = *resource.NewQuantity(int64(memNum), resource.BinarySI) + } else { + log.Warningf("unexpected slave mem resource type %T: %v", mem, mem) + } + } else { + log.Warningf("slave failed to report mem resource") + } + } + if len(cap) > 0 { + node.resources = &api.NodeResources{ + Capacity: cap, + } + log.V(4).Infof("node %q reporting capacity %v", node.hostname, cap) + } + nodes = append(nodes, node) + } + + result := &mesosState{ + clusterName: state.ClusterName, + nodes: nodes, + } + + return result, nil +} + +type responseHandler func(*http.Response, error) error + +// httpDo executes an HTTP request in the given context, canceling an ongoing request if the context +// is canceled prior to completion of the request. hacked from https://blog.golang.org/context +func (c *mesosClient) httpDo(ctx context.Context, req *http.Request, f responseHandler) error { + // Run the HTTP request in a goroutine and pass the response to f. + ch := make(chan error, 1) + go func() { ch <- f(c.httpClient.Do(req)) }() + select { + case <-ctx.Done(): + c.tr.CancelRequest(req) + <-ch // Wait for f to return. + return ctx.Err() + case err := <-ch: + return err + } +} diff --git a/pkg/cloudprovider/mesos/client_test.go b/pkg/cloudprovider/mesos/client_test.go new file mode 100644 index 00000000000..eaf5d706b62 --- /dev/null +++ b/pkg/cloudprovider/mesos/client_test.go @@ -0,0 +1,266 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mesos + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "reflect" + "testing" + "time" + + log "github.com/golang/glog" + "github.com/mesos/mesos-go/detector" + "github.com/mesos/mesos-go/mesosutil" + "golang.org/x/net/context" +) + +// Test data + +const ( + TEST_MASTER_ID = "master-12345" + TEST_MASTER_IP = 177048842 // 10.141.141.10 + TEST_MASTER_PORT = 5050 + + TEST_STATE_JSON = ` + { + "version": "0.22.0", + "unregistered_frameworks": [], + "started_tasks": 0, + "start_time": 1429456501.61141, + "staged_tasks": 0, + "slaves": [ + { + "resources": { + "ports": "[31000-32000]", + "mem": 15360, + "disk": 470842, + "cpus": 8 + }, + "registered_time": 1429456502.46999, + "pid": "slave(1)@mesos1.internal.company.com:5050", + "id": "20150419-081501-16777343-5050-16383-S2", + "hostname": "mesos1.internal.company.com", + "attributes": {}, + "active": true + }, + { + "resources": { + "ports": "[31000-32000]", + "mem": 15360, + "disk": 470842, + "cpus": 8 + }, + "registered_time": 1429456502.4144, + "pid": "slave(1)@mesos2.internal.company.com:5050", + "id": "20150419-081501-16777343-5050-16383-S1", + "hostname": "mesos2.internal.company.com", + "attributes": {}, + "active": true + }, + { + "resources": { + "ports": "[31000-32000]", + "mem": 15360, + "disk": 470842, + "cpus": 8 + }, + "registered_time": 1429456502.02879, + "pid": "slave(1)@mesos3.internal.company.com:5050", + "id": "20150419-081501-16777343-5050-16383-S0", + "hostname": "mesos3.internal.company.com", + "attributes": {}, + "active": true + } + ], + "pid": "master@mesos-master0.internal.company.com:5050", + "orphan_tasks": [], + "lost_tasks": 0, + "leader": "master@mesos-master0.internal.company.com:5050", + "killed_tasks": 0, + "failed_tasks": 0, + "elected_time": 1429456501.61638, + "deactivated_slaves": 0, + "completed_frameworks": [], + "build_user": "buildbot", + "build_time": 1425085311, + "build_date": "2015-02-27 17:01:51", + "activated_slaves": 3, + "finished_tasks": 0, + "flags": { + "zk_session_timeout": "10secs", + "work_dir": "/tmp/mesos/local/Lc9arz", + "webui_dir": "/usr/local/share/mesos/webui", + "version": "false", + "user_sorter": "drf", + "slave_reregister_timeout": "10mins", + "logbufsecs": "0", + "log_auto_initialize": "true", + "initialize_driver_logging": "true", + "framework_sorter": "drf", + "authenticators": "crammd5", + "authenticate_slaves": "false", + "authenticate": "false", + "allocation_interval": "1secs", + "logging_level": "INFO", + "quiet": "false", + "recovery_slave_removal_limit": "100%", + "registry": "replicated_log", + "registry_fetch_timeout": "1mins", + "registry_store_timeout": "5secs", + "registry_strict": "false", + "root_submissions": "true" + }, + "frameworks": [], + "git_branch": "refs/heads/0.22.0-rc1", + "git_sha": "46834faca67f877631e1beb7d61be5c080ec3dc2", + "git_tag": "0.22.0-rc1", + "hostname": "localhost", + "id": "20150419-081501-16777343-5050-16383" + }` +) + +// Mocks + +type FakeMasterDetector struct { + callback detector.MasterChanged + done chan struct{} +} + +func newFakeMasterDetector() *FakeMasterDetector { + return &FakeMasterDetector{ + done: make(chan struct{}), + } +} + +func (md FakeMasterDetector) Cancel() { + close(md.done) +} + +func (md FakeMasterDetector) Detect(cb detector.MasterChanged) error { + md.callback = cb + leadingMaster := mesosutil.NewMasterInfo(TEST_MASTER_ID, TEST_MASTER_IP, TEST_MASTER_PORT) + cb.OnMasterChanged(leadingMaster) + return nil +} + +func (md FakeMasterDetector) Done() <-chan struct{} { + return md.done +} + +// Auxilliary functions + +func makeHttpMocks() (*httptest.Server, *http.Client, *http.Transport) { + httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + log.V(4).Infof("Mocking response for HTTP request: %#v", r) + if r.URL.Path == "/state.json" { + w.WriteHeader(200) // OK + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, TEST_STATE_JSON) + } else { + w.WriteHeader(400) + fmt.Fprintln(w, "Bad Request") + } + })) + + // Intercept all client requests and feed them to the test server + transport := &http.Transport{ + Proxy: func(req *http.Request) (*url.URL, error) { + return url.Parse(httpServer.URL) + }, + } + + httpClient := &http.Client{Transport: transport} + + return httpServer, httpClient, transport +} + +// Tests + +// test mesos.parseMesosState +func Test_parseMesosState(t *testing.T) { + state, err := parseMesosState([]byte(TEST_STATE_JSON)) + + if err != nil { + t.Fatalf("parseMesosState does not yield an error") + } + if state == nil { + t.Fatalf("parseMesosState yields a non-nil state") + } + if len(state.nodes) != 3 { + t.Fatalf("parseMesosState yields a state with 3 nodes") + } +} + +// test mesos.listSlaves +func Test_listSlaves(t *testing.T) { + defer log.Flush() + md := FakeMasterDetector{} + httpServer, httpClient, httpTransport := makeHttpMocks() + defer httpServer.Close() + cacheTTL := 500 * time.Millisecond + mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL) + + if err != nil { + t.Fatalf("createMesosClient does not yield an error") + } + + slaveNodes, err := mesosClient.listSlaves(context.TODO()) + + if err != nil { + t.Fatalf("listSlaves does not yield an error") + } + if len(slaveNodes) != 3 { + t.Fatalf("listSlaves yields a collection of size 3") + } + + expectedHostnames := map[string]struct{}{ + "mesos1.internal.company.com": {}, + "mesos2.internal.company.com": {}, + "mesos3.internal.company.com": {}, + } + + actualHostnames := make(map[string]struct{}) + for _, node := range slaveNodes { + actualHostnames[node.hostname] = struct{}{} + } + + if !reflect.DeepEqual(expectedHostnames, actualHostnames) { + t.Fatalf("listSlaves yields a collection with the expected hostnames") + } +} + +// test mesos.clusterName +func Test_clusterName(t *testing.T) { + defer log.Flush() + md := FakeMasterDetector{} + httpServer, httpClient, httpTransport := makeHttpMocks() + defer httpServer.Close() + cacheTTL := 500 * time.Millisecond + mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL) + + name, err := mesosClient.clusterName(context.TODO()) + + if err != nil { + t.Fatalf("clusterName does not yield an error") + } + if name != defaultClusterName { + t.Fatalf("clusterName yields the expected (default) value") + } +} diff --git a/pkg/cloudprovider/mesos/config.go b/pkg/cloudprovider/mesos/config.go new file mode 100644 index 00000000000..bd62aca588b --- /dev/null +++ b/pkg/cloudprovider/mesos/config.go @@ -0,0 +1,79 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mesos + +import ( + "io" + "time" + + "code.google.com/p/gcfg" +) + +const ( + DefaultMesosMaster = "localhost:5050" + DefaultHttpClientTimeout = time.Duration(10) * time.Second + DefaultStateCacheTTL = time.Duration(5) * time.Second +) + +// Example Mesos cloud provider configuration file: +// +// [mesos-cloud] +// mesos-master = leader.mesos:5050 +// http-client-timeout = 500ms +// state-cache-ttl = 1h + +type ConfigWrapper struct { + Mesos_Cloud Config +} + +type Config struct { + MesosMaster string `gcfg:"mesos-master"` + MesosHttpClientTimeout Duration `gcfg:"http-client-timeout"` + StateCacheTTL Duration `gcfg:"state-cache-ttl"` +} + +type Duration struct { + Duration time.Duration `gcfg:"duration"` +} + +func (d *Duration) UnmarshalText(data []byte) error { + underlying, err := time.ParseDuration(string(data)) + if err == nil { + d.Duration = underlying + } + return err +} + +func createDefaultConfig() *Config { + return &Config{ + MesosMaster: DefaultMesosMaster, + MesosHttpClientTimeout: Duration{Duration: DefaultHttpClientTimeout}, + StateCacheTTL: Duration{Duration: DefaultStateCacheTTL}, + } +} + +func readConfig(configReader io.Reader) (*Config, error) { + config := createDefaultConfig() + wrapper := &ConfigWrapper{Mesos_Cloud: *config} + if configReader != nil { + if err := gcfg.ReadInto(wrapper, configReader); err != nil { + return nil, err + } + config = &(wrapper.Mesos_Cloud) + } + return config, nil +} diff --git a/pkg/cloudprovider/mesos/config_test.go b/pkg/cloudprovider/mesos/config_test.go new file mode 100644 index 00000000000..d1013471c71 --- /dev/null +++ b/pkg/cloudprovider/mesos/config_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mesos + +import ( + "bytes" + "testing" + "time" + + log "github.com/golang/glog" +) + +// test mesos.createDefaultConfig +func Test_createDefaultConfig(t *testing.T) { + defer log.Flush() + + config := createDefaultConfig() + + if config.MesosMaster != DefaultMesosMaster { + t.Fatalf("Default config has the expected MesosMaster value") + } + + if config.MesosHttpClientTimeout.Duration != DefaultHttpClientTimeout { + t.Fatalf("Default config has the expected MesosHttpClientTimeout value") + } + + if config.StateCacheTTL.Duration != DefaultStateCacheTTL { + t.Fatalf("Default config has the expected StateCacheTTL value") + } +} + +// test mesos.readConfig +func Test_readConfig(t *testing.T) { + defer log.Flush() + + configString := ` +[mesos-cloud] + mesos-master = leader.mesos:5050 + http-client-timeout = 500ms + state-cache-ttl = 1h` + + reader := bytes.NewBufferString(configString) + + config, err := readConfig(reader) + + if err != nil { + t.Fatalf("Reading configuration does not yield an error: %#v", err) + } + + if config.MesosMaster != "leader.mesos:5050" { + t.Fatalf("Parsed config has the expected MesosMaster value") + } + + if config.MesosHttpClientTimeout.Duration != time.Duration(500)*time.Millisecond { + t.Fatalf("Parsed config has the expected MesosHttpClientTimeout value") + } + + if config.StateCacheTTL.Duration != time.Duration(1)*time.Hour { + t.Fatalf("Parsed config has the expected StateCacheTTL value") + } +} diff --git a/pkg/cloudprovider/mesos/mesos.go b/pkg/cloudprovider/mesos/mesos.go new file mode 100644 index 00000000000..8df50beefde --- /dev/null +++ b/pkg/cloudprovider/mesos/mesos.go @@ -0,0 +1,238 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mesos + +import ( + "errors" + "fmt" + "io" + "net" + "regexp" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + log "github.com/golang/glog" + "github.com/mesos/mesos-go/detector" + "golang.org/x/net/context" +) + +var ( + PluginName = "mesos" + CloudProvider *MesosCloud + + noHostNameSpecified = errors.New("No hostname specified") +) + +func init() { + cloudprovider.RegisterCloudProvider( + PluginName, + func(configReader io.Reader) (cloudprovider.Interface, error) { + provider, err := newMesosCloud(configReader) + if err == nil { + CloudProvider = provider + } + return provider, err + }) +} + +type MesosCloud struct { + client *mesosClient + config *Config +} + +func (c *MesosCloud) MasterURI() string { + return c.config.MesosMaster +} + +func newMesosCloud(configReader io.Reader) (*MesosCloud, error) { + config, err := readConfig(configReader) + if err != nil { + return nil, err + } + + log.V(1).Infof("new mesos cloud, master='%v'", config.MesosMaster) + if d, err := detector.New(config.MesosMaster); err != nil { + log.V(1).Infof("failed to create master detector: %v", err) + return nil, err + } else if cl, err := newMesosClient(d, + config.MesosHttpClientTimeout.Duration, + config.StateCacheTTL.Duration); err != nil { + log.V(1).Infof("failed to create mesos cloud client: %v", err) + return nil, err + } else { + return &MesosCloud{client: cl, config: config}, nil + } +} + +// Instances returns a copy of the Mesos cloud Instances implementation. +// Mesos natively provides minimal cloud-type resources. More robust cloud +// support requires a combination of Mesos and cloud-specific knowledge. +func (c *MesosCloud) Instances() (cloudprovider.Instances, bool) { + return c, true +} + +// TCPLoadBalancer always returns nil, false in this implementation. +// Mesos does not provide any type of native load balancing by default, +// so this implementation always returns (nil, false). +func (c *MesosCloud) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { + return nil, false +} + +// Zones always returns nil, false in this implementation. +// Mesos does not provide any type of native region or zone awareness, +// so this implementation always returns (nil, false). +func (c *MesosCloud) Zones() (cloudprovider.Zones, bool) { + return nil, false +} + +// Clusters returns a copy of the Mesos cloud Clusters implementation. +// Mesos does not provide support for multiple clusters. +func (c *MesosCloud) Clusters() (cloudprovider.Clusters, bool) { + return c, true +} + +// ListClusters lists the names of the available Mesos clusters. +func (c *MesosCloud) ListClusters() ([]string, error) { + // Always returns a single cluster (this one!) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + name, err := c.client.clusterName(ctx) + return []string{name}, err +} + +// Master gets back the address (either DNS name or IP address) of the master node for the cluster. +func (c *MesosCloud) Master(clusterName string) (string, error) { + clusters, err := c.ListClusters() + if err != nil { + return "", err + } + for _, name := range clusters { + if name == clusterName { + if c.client.master == "" { + return "", errors.New("The currently leading master is unknown.") + } + + host, _, err := net.SplitHostPort(c.client.master) + if err != nil { + return "", err + } + + return host, nil + } + } + return "", errors.New(fmt.Sprintf("The supplied cluster '%v' does not exist", clusterName)) +} + +// ipAddress returns an IP address of the specified instance. +func (c *MesosCloud) ipAddress(name string) (net.IP, error) { + if name == "" { + return nil, noHostNameSpecified + } + ipaddr := net.ParseIP(name) + if ipaddr != nil { + return ipaddr, nil + } + iplist, err := net.LookupIP(name) + if err != nil { + log.V(2).Infof("failed to resolve IP from host name '%v': %v", name, err) + return nil, err + } + ipaddr = iplist[0] + log.V(2).Infof("resolved host '%v' to '%v'", name, ipaddr) + return ipaddr, nil +} + +// ExternalID returns the cloud provider ID of the specified instance. +func (c *MesosCloud) ExternalID(instance string) (string, error) { + ip, err := c.ipAddress(instance) + if err != nil { + return "", err + } + return ip.String(), nil +} + +// List lists instances that match 'filter' which is a regular expression +// which must match the entire instance name (fqdn). +func (c *MesosCloud) List(filter string) ([]string, error) { + //TODO(jdef) use a timeout here? 15s? + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + nodes, err := c.client.listSlaves(ctx) + if err != nil { + return nil, err + } + if len(nodes) == 0 { + log.V(2).Info("no slaves found, are any running?") + return nil, nil + } + filterRegex, err := regexp.Compile(filter) + if err != nil { + return nil, err + } + addr := []string{} + for _, node := range nodes { + if filterRegex.MatchString(node.hostname) { + addr = append(addr, node.hostname) + } + } + return addr, err +} + +// GetNodeResources gets the resources for a particular node +func (c *MesosCloud) GetNodeResources(name string) (*api.NodeResources, error) { + //TODO(jdef) use a timeout here? 15s? + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + nodes, err := c.client.listSlaves(ctx) + if err != nil { + return nil, err + } + if len(nodes) == 0 { + log.V(2).Info("no slaves found, are any running?") + } else { + for _, node := range nodes { + if name == node.hostname { + return node.resources, nil + } + } + } + log.Warningf("failed to locate node spec for %q", name) + return nil, nil +} + +// NodeAddresses returns the addresses of the specified instance. +func (c *MesosCloud) NodeAddresses(name string) ([]api.NodeAddress, error) { + ip, err := c.ipAddress(name) + if err != nil { + return nil, err + } + return []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: ip.String()}}, nil +} + +// Configure the specified instance using the spec. +// Ths implementation is a noop. +func (c *MesosCloud) Configure(name string, spec *api.NodeSpec) error { + return nil +} + +// Release deletes all the configuration related to the instance, including other cloud resources. +// Ths implementation is a noop. +func (c *MesosCloud) Release(name string) error { + return nil +} diff --git a/pkg/cloudprovider/mesos/mesos_test.go b/pkg/cloudprovider/mesos/mesos_test.go new file mode 100644 index 00000000000..b6503faff8f --- /dev/null +++ b/pkg/cloudprovider/mesos/mesos_test.go @@ -0,0 +1,288 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mesos + +import ( + "bytes" + "net" + "reflect" + "testing" + "time" + + log "github.com/golang/glog" + "speter.net/go/exp/math/dec/inf" +) + +func TestIPAddress(t *testing.T) { + c := &MesosCloud{} + expected4 := net.IPv4(127, 0, 0, 1) + ip, err := c.ipAddress("127.0.0.1") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !reflect.DeepEqual(ip, expected4) { + t.Fatalf("expected %#v instead of %#v", expected4, ip) + } + + expected6 := net.ParseIP("::1") + if expected6 == nil { + t.Fatalf("failed to parse ipv6 ::1") + } + ip, err = c.ipAddress("::1") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !reflect.DeepEqual(ip, expected6) { + t.Fatalf("expected %#v instead of %#v", expected6, ip) + } + + ip, err = c.ipAddress("localhost") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !reflect.DeepEqual(ip, expected4) && !reflect.DeepEqual(ip, expected6) { + t.Fatalf("expected %#v or %#v instead of %#v", expected4, expected6, ip) + } + + _, err = c.ipAddress("") + if err != noHostNameSpecified { + t.Fatalf("expected error noHostNameSpecified but got none") + } +} + +// test mesos.newMesosCloud with no config +func Test_newMesosCloud_NoConfig(t *testing.T) { + defer log.Flush() + mesosCloud, err := newMesosCloud(nil) + + if err != nil { + t.Fatalf("Creating a new Mesos cloud provider without config does not yield an error: %#v", err) + } + + if mesosCloud.client.httpClient.Timeout != DefaultHttpClientTimeout { + t.Fatalf("Creating a new Mesos cloud provider without config does not yield an error: %#v", err) + } + + if mesosCloud.client.state.ttl != DefaultStateCacheTTL { + t.Fatalf("Mesos client with default config has the expected state cache TTL value") + } +} + +// test mesos.newMesosCloud with custom config +func Test_newMesosCloud_WithConfig(t *testing.T) { + defer log.Flush() + + configString := ` +[mesos-cloud] + http-client-timeout = 500ms + state-cache-ttl = 1h` + + reader := bytes.NewBufferString(configString) + + mesosCloud, err := newMesosCloud(reader) + + if err != nil { + t.Fatalf("Creating a new Mesos cloud provider with a custom config does not yield an error: %#v", err) + } + + if mesosCloud.client.httpClient.Timeout != time.Duration(500)*time.Millisecond { + t.Fatalf("Mesos client with a custom config has the expected HTTP client timeout value") + } + + if mesosCloud.client.state.ttl != time.Duration(1)*time.Hour { + t.Fatalf("Mesos client with a custom config has the expected state cache TTL value") + } +} + +// tests for capability reporting functions + +// test mesos.Instances +func Test_Instances(t *testing.T) { + defer log.Flush() + mesosCloud, _ := newMesosCloud(nil) + + instances, supports_instances := mesosCloud.Instances() + + if !supports_instances || instances == nil { + t.Fatalf("MesosCloud provides an implementation of Instances") + } +} + +// test mesos.TCPLoadBalancer +func Test_TcpLoadBalancer(t *testing.T) { + defer log.Flush() + mesosCloud, _ := newMesosCloud(nil) + + lb, supports_lb := mesosCloud.TCPLoadBalancer() + + if supports_lb || lb != nil { + t.Fatalf("MesosCloud does not provide an implementation of TCPLoadBalancer") + } +} + +// test mesos.Zones +func Test_Zones(t *testing.T) { + defer log.Flush() + mesosCloud, _ := newMesosCloud(nil) + + zones, supports_zones := mesosCloud.Zones() + + if supports_zones || zones != nil { + t.Fatalf("MesosCloud does not provide an implementation of Zones") + } +} + +// test mesos.Clusters +func Test_Clusters(t *testing.T) { + defer log.Flush() + mesosCloud, _ := newMesosCloud(nil) + + clusters, supports_clusters := mesosCloud.Clusters() + + if !supports_clusters || clusters == nil { + t.Fatalf("MesosCloud does not provide an implementation of Clusters") + } +} + +// test mesos.MasterURI +func Test_MasterURI(t *testing.T) { + defer log.Flush() + mesosCloud, _ := newMesosCloud(nil) + + uri := mesosCloud.MasterURI() + + if uri != DefaultMesosMaster { + t.Fatalf("MasterURI returns the expected master URI (expected \"localhost\", actual \"%s\"", uri) + } +} + +// test mesos.ListClusters +func Test_ListClusters(t *testing.T) { + defer log.Flush() + md := FakeMasterDetector{} + httpServer, httpClient, httpTransport := makeHttpMocks() + defer httpServer.Close() + cacheTTL := 500 * time.Millisecond + mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL) + mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()} + + clusters, err := mesosCloud.ListClusters() + + if err != nil { + t.Fatalf("ListClusters does not yield an error: %#v", err) + } + + if len(clusters) != 1 { + t.Fatalf("ListClusters should return a list of size 1: (actual: %#v)", clusters) + } + + expectedClusterNames := []string{"mesos"} + + if !reflect.DeepEqual(clusters, expectedClusterNames) { + t.Fatalf("ListClusters should return the expected list of names: (expected: %#v, actual: %#v)", + expectedClusterNames, + clusters) + } +} + +// test mesos.Master +func Test_Master(t *testing.T) { + defer log.Flush() + md := FakeMasterDetector{} + httpServer, httpClient, httpTransport := makeHttpMocks() + defer httpServer.Close() + cacheTTL := 500 * time.Millisecond + mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL) + mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()} + + clusters, err := mesosCloud.ListClusters() + clusterName := clusters[0] + master, err := mesosCloud.Master(clusterName) + + if err != nil { + t.Fatalf("Master does not yield an error: %#v", err) + } + + expectedMaster := unpackIPv4(TEST_MASTER_IP) + + if master != expectedMaster { + t.Fatalf("Master returns the expected value: (expected: %#v, actual: %#v", expectedMaster, master) + } +} + +// test mesos.List +func Test_List(t *testing.T) { + defer log.Flush() + md := FakeMasterDetector{} + httpServer, httpClient, httpTransport := makeHttpMocks() + defer httpServer.Close() + cacheTTL := 500 * time.Millisecond + mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL) + mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()} + + clusters, err := mesosCloud.List(".*") // recognizes the language of all strings + + if err != nil { + t.Fatalf("List does not yield an error: %#v", err) + } + + if len(clusters) != 3 { + t.Fatalf("List with a catch-all filter should return a list of size 3: (actual: %#v)", clusters) + } + + clusters, err = mesosCloud.List("$^") // end-of-string followed by start-of-string: recognizes the empty language + + if err != nil { + t.Fatalf("List does not yield an error: %#v", err) + } + + if len(clusters) != 0 { + t.Fatalf("List with a reject-all filter should return a list of size 0: (actual: %#v)", clusters) + } +} + +// test mesos.GetNodeResources +func Test_GetNodeResources(t *testing.T) { + defer log.Flush() + md := FakeMasterDetector{} + httpServer, httpClient, httpTransport := makeHttpMocks() + defer httpServer.Close() + cacheTTL := 500 * time.Millisecond + mesosClient, err := createMesosClient(md, httpClient, httpTransport, cacheTTL) + mesosCloud := &MesosCloud{client: mesosClient, config: createDefaultConfig()} + + resources, err := mesosCloud.GetNodeResources("mesos1.internal.company.com") + + if err != nil { + t.Fatalf("GetNodeResources does not yield an error: %#v", err) + } + + expectedCpu := inf.NewDec(8, 0) + expectedMem := inf.NewDec(15360, 0) + + actualCpu := resources.Capacity["cpu"].Amount + actualMem := resources.Capacity["memory"].Amount + + if actualCpu.Cmp(expectedCpu) != 0 { + t.Fatalf("GetNodeResources should return the expected amount of cpu: (expected: %#v, vactual: %#v)", expectedCpu, actualCpu) + } + + if actualMem.Cmp(expectedMem) != 0 { + t.Fatalf("GetNodeResources should return the expected amount of memory: (expected: %#v, vactual: %#v)", expectedMem, actualMem) + } + +} diff --git a/pkg/cloudprovider/mesos/plugins.go b/pkg/cloudprovider/mesos/plugins.go new file mode 100644 index 00000000000..2baf7b47f50 --- /dev/null +++ b/pkg/cloudprovider/mesos/plugins.go @@ -0,0 +1,21 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mesos + +import ( + _ "github.com/mesos/mesos-go/detector/zoo" +)