Add ListWithoutKubelet to mesos cloud provider

This commit is contained in:
Dr. Stefan Schimanski 2015-11-05 12:29:39 +01:00
parent bc7523a775
commit 0c1d90bf5f
2 changed files with 70 additions and 10 deletions

View File

@ -49,13 +49,14 @@ type mesosClient struct {
}
type slaveNode struct {
hostname string
resources *api.NodeResources
hostname string
kubeletRunning bool
resources *api.NodeResources
}
type mesosState struct {
clusterName string
nodes map[string]*slaveNode
nodes map[string]*slaveNode // by hostname
}
type stateCache struct {
@ -160,7 +161,7 @@ func unpackIPv4(ip uint32) string {
return ipv4.String()
}
// listSlaves returns a (possibly cached) list of slave nodes.
// listSlaves returns a (possibly cached) map of slave nodes by hostname.
// Callers must not mutate the contents of the returned slice.
func (c *mesosClient) listSlaves(ctx context.Context) (map[string]*slaveNode, error) {
return c.state.nodes(ctx)
@ -225,12 +226,36 @@ func parseMesosState(blob []byte) (*mesosState, error) {
Hostname string `json:"hostname"` // ex: 10.22.211.18, or slave-123.nowhere.com
Resources map[string]interface{} `json:"resources"` // ex: {"mem": 123, "ports": "[31000-3200]"}
} `json:"slaves"`
Frameworks []*struct {
Id string `json:"id"` // ex: 20151105-093752-3745622208-5050-1-0000
Pid string `json:"pid"` // ex: scheduler(1)@192.168.65.228:57124
Executors []*struct {
SlaveId string `json:"slave_id"` // ex: 20151105-093752-3745622208-5050-1-S1
ExecutorId string `json:"executor_id"` // ex: 6704d375c68fee1e_k8sm-executor
Name string `json:"name"` // ex: Kubelet-Executor
} `json:"executors"`
} `json:"frameworks"`
}
state := &State{ClusterName: defaultClusterName}
if err := json.Unmarshal(blob, state); err != nil {
return nil, err
}
nodes := map[string]*slaveNode{}
executorSlaveIds := map[string]struct{}{}
for _, f := range state.Frameworks {
for _, e := range f.Executors {
// Note that this simple comparison breaks when we support more than one
// k8s instance in a cluster. At the moment this is not possible for
// a number of reasons.
// TODO(sttts): find way to detect executors of this k8s instance
if e.Name == "Kubelet-Executor" {
executorSlaveIds[e.SlaveId] = struct{}{}
}
}
}
nodes := map[string]*slaveNode{} // by hostname
for _, slave := range state.Slaves {
if slave.Hostname == "" {
continue
@ -264,6 +289,9 @@ func parseMesosState(blob []byte) (*mesosState, error) {
}
log.V(4).Infof("node %q reporting capacity %v", node.hostname, cap)
}
if _, ok := executorSlaveIds[slave.Id]; ok {
node.kubeletRunning = true
}
nodes[node.hostname] = node
}

View File

@ -30,8 +30,16 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
)
const (
ProviderName = "mesos"
// KubernetesExecutorName is shared between contrib/mesos and Mesos cloud provider.
// Because cloud provider -> contrib dependencies are forbidden, this constant
// is defined here, not in contrib.
KubernetesExecutorName = "Kubelet-Executor"
)
var (
ProviderName = "mesos"
CloudProvider *MesosCloud
noHostNameSpecified = errors.New("No hostname specified")
@ -208,9 +216,7 @@ func (c *MesosCloud) InstanceID(name string) (string, error) {
return "", nil
}
// List lists instances that match 'filter' which is a regular expression
// which must match the entire instance name (fqdn).
func (c *MesosCloud) List(filter string) ([]string, error) {
func (c *MesosCloud) listNodes() (map[string]*slaveNode, error) {
//TODO(jdef) use a timeout here? 15s?
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
@ -223,6 +229,16 @@ func (c *MesosCloud) List(filter string) ([]string, error) {
log.V(2).Info("no slaves found, are any running?")
return nil, nil
}
return nodes, nil
}
// List lists instances that match 'filter' which is a regular expression
// which must match the entire instance name (fqdn).
func (c *MesosCloud) List(filter string) ([]string, error) {
nodes, err := c.listNodes()
if err != nil {
return nil, err
}
filterRegex, err := regexp.Compile(filter)
if err != nil {
return nil, err
@ -233,7 +249,23 @@ func (c *MesosCloud) List(filter string) ([]string, error) {
addr = append(addr, node.hostname)
}
}
return addr, err
return addr, nil
}
// ListWithKubelet list those instance which have no running kubelet, i.e. the
// Kubernetes executor.
func (c *MesosCloud) ListWithoutKubelet() ([]string, error) {
nodes, err := c.listNodes()
if err != nil {
return nil, err
}
addr := make([]string, 0, len(nodes))
for _, n := range nodes {
if !n.kubeletRunning {
addr = append(addr, n.hostname)
}
}
return addr, nil
}
// NodeAddresses returns the addresses of the specified instance.