From 0c1d90bf5f23ff2c8b070e8488659f006381d3ae Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Thu, 5 Nov 2015 12:29:39 +0100 Subject: [PATCH] Add ListWithoutKubelet to mesos cloud provider --- pkg/cloudprovider/providers/mesos/client.go | 38 ++++++++++++++++--- pkg/cloudprovider/providers/mesos/mesos.go | 42 ++++++++++++++++++--- 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/pkg/cloudprovider/providers/mesos/client.go b/pkg/cloudprovider/providers/mesos/client.go index da7d5e98551..70d8ada92c3 100644 --- a/pkg/cloudprovider/providers/mesos/client.go +++ b/pkg/cloudprovider/providers/mesos/client.go @@ -49,13 +49,14 @@ type mesosClient struct { } type slaveNode struct { - hostname string - resources *api.NodeResources + hostname string + kubeletRunning bool + resources *api.NodeResources } type mesosState struct { clusterName string - nodes map[string]*slaveNode + nodes map[string]*slaveNode // by hostname } type stateCache struct { @@ -160,7 +161,7 @@ func unpackIPv4(ip uint32) string { return ipv4.String() } -// listSlaves returns a (possibly cached) list of slave nodes. +// listSlaves returns a (possibly cached) map of slave nodes by hostname. // Callers must not mutate the contents of the returned slice. func (c *mesosClient) listSlaves(ctx context.Context) (map[string]*slaveNode, error) { return c.state.nodes(ctx) @@ -225,12 +226,36 @@ func parseMesosState(blob []byte) (*mesosState, error) { Hostname string `json:"hostname"` // ex: 10.22.211.18, or slave-123.nowhere.com Resources map[string]interface{} `json:"resources"` // ex: {"mem": 123, "ports": "[31000-3200]"} } `json:"slaves"` + Frameworks []*struct { + Id string `json:"id"` // ex: 20151105-093752-3745622208-5050-1-0000 + Pid string `json:"pid"` // ex: scheduler(1)@192.168.65.228:57124 + Executors []*struct { + SlaveId string `json:"slave_id"` // ex: 20151105-093752-3745622208-5050-1-S1 + ExecutorId string `json:"executor_id"` // ex: 6704d375c68fee1e_k8sm-executor + Name string `json:"name"` // ex: Kubelet-Executor + } `json:"executors"` + } `json:"frameworks"` } + state := &State{ClusterName: defaultClusterName} if err := json.Unmarshal(blob, state); err != nil { return nil, err } - nodes := map[string]*slaveNode{} + + executorSlaveIds := map[string]struct{}{} + for _, f := range state.Frameworks { + for _, e := range f.Executors { + // Note that this simple comparison breaks when we support more than one + // k8s instance in a cluster. At the moment this is not possible for + // a number of reasons. + // TODO(sttts): find way to detect executors of this k8s instance + if e.Name == "Kubelet-Executor" { + executorSlaveIds[e.SlaveId] = struct{}{} + } + } + } + + nodes := map[string]*slaveNode{} // by hostname for _, slave := range state.Slaves { if slave.Hostname == "" { continue @@ -264,6 +289,9 @@ func parseMesosState(blob []byte) (*mesosState, error) { } log.V(4).Infof("node %q reporting capacity %v", node.hostname, cap) } + if _, ok := executorSlaveIds[slave.Id]; ok { + node.kubeletRunning = true + } nodes[node.hostname] = node } diff --git a/pkg/cloudprovider/providers/mesos/mesos.go b/pkg/cloudprovider/providers/mesos/mesos.go index e376f1a0a1a..538c3de5a10 100644 --- a/pkg/cloudprovider/providers/mesos/mesos.go +++ b/pkg/cloudprovider/providers/mesos/mesos.go @@ -30,8 +30,16 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" ) +const ( + ProviderName = "mesos" + + // KubernetesExecutorName is shared between contrib/mesos and Mesos cloud provider. + // Because cloud provider -> contrib dependencies are forbidden, this constant + // is defined here, not in contrib. + KubernetesExecutorName = "Kubelet-Executor" +) + var ( - ProviderName = "mesos" CloudProvider *MesosCloud noHostNameSpecified = errors.New("No hostname specified") @@ -208,9 +216,7 @@ func (c *MesosCloud) InstanceID(name string) (string, error) { return "", nil } -// List lists instances that match 'filter' which is a regular expression -// which must match the entire instance name (fqdn). -func (c *MesosCloud) List(filter string) ([]string, error) { +func (c *MesosCloud) listNodes() (map[string]*slaveNode, error) { //TODO(jdef) use a timeout here? 15s? ctx, cancel := context.WithCancel(context.TODO()) defer cancel() @@ -223,6 +229,16 @@ func (c *MesosCloud) List(filter string) ([]string, error) { log.V(2).Info("no slaves found, are any running?") return nil, nil } + return nodes, nil +} + +// List lists instances that match 'filter' which is a regular expression +// which must match the entire instance name (fqdn). +func (c *MesosCloud) List(filter string) ([]string, error) { + nodes, err := c.listNodes() + if err != nil { + return nil, err + } filterRegex, err := regexp.Compile(filter) if err != nil { return nil, err @@ -233,7 +249,23 @@ func (c *MesosCloud) List(filter string) ([]string, error) { addr = append(addr, node.hostname) } } - return addr, err + return addr, nil +} + +// ListWithKubelet list those instance which have no running kubelet, i.e. the +// Kubernetes executor. +func (c *MesosCloud) ListWithoutKubelet() ([]string, error) { + nodes, err := c.listNodes() + if err != nil { + return nil, err + } + addr := make([]string, 0, len(nodes)) + for _, n := range nodes { + if !n.kubeletRunning { + addr = append(addr, n.hostname) + } + } + return addr, nil } // NodeAddresses returns the addresses of the specified instance.