Merge pull request #1997 from ddysher/split-master

Separate minion controller from master.
This commit is contained in:
Brendan Burns 2014-10-31 11:23:58 -07:00
commit 893291d81d
12 changed files with 267 additions and 291 deletions

View File

@ -3,19 +3,13 @@
{% set daemon_args = "" %} {% set daemon_args = "" %}
{% endif %} {% endif %}
{% set machines = ""%}
{% set cloud_provider = "" %} {% set cloud_provider = "" %}
{% set minion_regexp = "-minion_regexp=.*" %}
{% if grains.cloud_provider is defined %} {% if grains.cloud_provider is defined %}
{% set cloud_provider = "-cloud_provider=" + grains.cloud_provider %} {% set cloud_provider = "-cloud_provider=" + grains.cloud_provider %}
{% endif %} {% endif %}
{% set address = "-address=127.0.0.1" %} {% set address = "-address=127.0.0.1" %}
{% if pillar['node_instance_prefix'] is defined %}
{% set minion_regexp = "-minion_regexp='" + pillar['node_instance_prefix'] + ".*'" %}
{% endif %}
{% if grains.etcd_servers is defined %} {% if grains.etcd_servers is defined %}
{% set etcd_servers = "-etcd_servers=http://" + grains.etcd_servers + ":4001" %} {% set etcd_servers = "-etcd_servers=http://" + grains.etcd_servers + ":4001" %}
{% else %} {% else %}
@ -26,28 +20,11 @@
{% if grains.cloud is defined %} {% if grains.cloud is defined %}
{% if grains.cloud == 'gce' %} {% if grains.cloud == 'gce' %}
{% set cloud_provider = "-cloud_provider=gce" %} {% set cloud_provider = "-cloud_provider=gce" %}
{% set machines = "-machines=" + ','.join(salt['mine.get']('roles:kubernetes-pool', 'network.ip_addrs', expr_form='grain').keys()) %}
{% endif %}
{% if grains.cloud == 'azure' %}
MACHINES="{{ salt['mine.get']('roles:kubernetes-pool', 'grains.items', expr_form='grain').values()|join(',', attribute='hostnamef') }}"
{% set machines = "-machines=$MACHINES" %}
{% endif %}
{% if grains.cloud == 'vsphere' %}
# Collect IPs of minions as machines list.
#
# Use a bash array to build the value we need. Jinja 2.7 does support a 'map'
# filter that would simplify this. However, some installations (specifically
# Debian Wheezy) only install Jinja 2.6.
MACHINE_IPS=()
{% for addrs in salt['mine.get']('roles:kubernetes-pool', 'network.ip_addrs', expr_form='grain').values() %}
MACHINE_IPS+=( {{ addrs[0] }} )
{% endfor %}
{% set machines = "-machines=$(echo ${MACHINE_IPS[@]} | xargs -n1 echo | paste -sd,)" %}
{% set minion_regexp = "" %}
{% endif %} {% endif %}
{% endif %} {% endif %}
{% if pillar['portal_net'] is defined %} {% if pillar['portal_net'] is defined %}
{% set portal_net = "-portal_net=" + pillar['portal_net'] %} {% set portal_net = "-portal_net=" + pillar['portal_net'] %}
{% endif %} {% endif %}
DAEMON_ARGS="{{daemon_args}} {{address}} {{machines}} {{etcd_servers}} {{ minion_regexp }} {{ cloud_provider }} --allow_privileged={{pillar['allow_privileged']}} {{portal_net}}" DAEMON_ARGS="{{daemon_args}} {{address}} {{etcd_servers}} {{ cloud_provider }} --allow_privileged={{pillar['allow_privileged']}} {{portal_net}}"

View File

@ -2,5 +2,42 @@
{% if grains['os_family'] == 'RedHat' %} {% if grains['os_family'] == 'RedHat' %}
{% set daemon_args = "" %} {% set daemon_args = "" %}
{% endif %} {% endif %}
{% set master="-master=127.0.0.1:8080" %} {% set master="-master=127.0.0.1:8080" %}
DAEMON_ARGS="{{daemon_args}} {{master}}"
{% set machines = ""%}
{% set cloud_provider = "" %}
{% set minion_regexp = "-minion_regexp=.*" %}
{% if grains.cloud_provider is defined %}
{% set cloud_provider = "-cloud_provider=" + grains.cloud_provider %}
{% endif %}
{% if pillar['node_instance_prefix'] is defined %}
{% set minion_regexp = "-minion_regexp='" + pillar['node_instance_prefix'] + ".*'" %}
{% endif %}
{% if grains.cloud is defined %}
{% if grains.cloud == 'gce' %}
{% set cloud_provider = "-cloud_provider=gce" %}
{% set machines = "-machines=" + ','.join(salt['mine.get']('roles:kubernetes-pool', 'network.ip_addrs', expr_form='grain').keys()) %}
{% endif %}
{% if grains.cloud == 'azure' %}
MACHINES="{{ salt['mine.get']('roles:kubernetes-pool', 'grains.items', expr_form='grain').values()|join(',', attribute='hostnamef') }}"
{% set machines = "-machines=$MACHINES" %}
{% endif %}
{% if grains.cloud == 'vsphere' %}
# Collect IPs of minions as machines list.
#
# Use a bash array to build the value we need. Jinja 2.7 does support a 'map'
# filter that would simplify this. However, some installations (specifically
# Debian Wheezy) only install Jinja 2.6.
MACHINE_IPS=()
{% for addrs in salt['mine.get']('roles:kubernetes-pool', 'network.ip_addrs', expr_form='grain').values() %}
MACHINE_IPS+=( {{ addrs[0] }} )
{% endfor %}
{% set machines = "-machines=$(echo ${MACHINE_IPS[@]} | xargs -n1 echo | paste -sd,)" %}
{% set minion_regexp = "" %}
{% endif %}
{% endif %}
DAEMON_ARGS="{{daemon_args}} {{master}} {{machines}} {{ minion_regexp }} {{ cloud_provider }}"

View File

@ -22,17 +22,14 @@ import (
"flag" "flag"
"net" "net"
"net/http" "net/http"
"os"
"strconv" "strconv"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/resources"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
@ -63,22 +60,16 @@ var (
storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred") storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred")
cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.") cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.")
cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.") cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs.")
healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true.") healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true.")
minionCacheTTL = flag.Duration("minion_cache_ttl", 30*time.Second, "Duration of time to cache minion information. Default 30 seconds.")
eventTTL = flag.Duration("event_ttl", 48*time.Hour, "Amount of time to retain events. Default 2 days.") eventTTL = flag.Duration("event_ttl", 48*time.Hour, "Amount of time to retain events. Default 2 days.")
tokenAuthFile = flag.String("token_auth_file", "", "If set, the file that will be used to secure the API server via token authentication.") tokenAuthFile = flag.String("token_auth_file", "", "If set, the file that will be used to secure the API server via token authentication.")
etcdServerList util.StringList etcdServerList util.StringList
etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers.") etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers.")
machineList util.StringList
corsAllowedOriginList util.StringList corsAllowedOriginList util.StringList
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.") allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.")
portalNet util.IPNet // TODO: make this a list portalNet util.IPNet // TODO: make this a list
// TODO: Discover these by pinging the host machines, and rip out these flags. enableLogsSupport = flag.Bool("enable_logs_support", true, "Enables server endpoint for log collection")
nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node") kubeletConfig = client.KubeletConfig{
nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node")
enableLogsSupport = flag.Bool("enable_logs_support", true, "Enables server endpoint for log collection")
kubeletConfig = client.KubeletConfig{
Port: 10250, Port: 10250,
EnableHttps: false, EnableHttps: false,
} }
@ -87,24 +78,11 @@ var (
func init() { func init() {
flag.Var(&address, "address", "The IP address on to serve on (set to 0.0.0.0 for all interfaces)") flag.Var(&address, "address", "The IP address on to serve on (set to 0.0.0.0 for all interfaces)")
flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config") flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config")
flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.")
flag.Var(&corsAllowedOriginList, "cors_allowed_origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") flag.Var(&corsAllowedOriginList, "cors_allowed_origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
flag.Var(&portalNet, "portal_net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.") flag.Var(&portalNet, "portal_net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.")
client.BindKubeletClientConfigFlags(flag.CommandLine, &kubeletConfig) client.BindKubeletClientConfigFlags(flag.CommandLine, &kubeletConfig)
} }
func verifyMinionFlags() {
if *cloudProvider == "" || *minionRegexp == "" {
if len(machineList) == 0 {
glog.Info("No machines specified!")
}
return
}
if len(machineList) != 0 {
glog.Info("-machines is overwritten by -minion_regexp")
}
}
// TODO: Longer term we should read this from some config store, rather than a flag. // TODO: Longer term we should read this from some config store, rather than a flag.
func verifyPortalFlags() { func verifyPortalFlags() {
if portalNet.IP == nil { if portalNet.IP == nil {
@ -112,37 +90,6 @@ func verifyPortalFlags() {
} }
} }
func initCloudProvider(name string, configFilePath string) cloudprovider.Interface {
var config *os.File
if name == "" {
glog.Info("No cloud provider specified.")
return nil
}
if configFilePath != "" {
var err error
config, err = os.Open(configFilePath)
if err != nil {
glog.Fatalf("Couldn't open cloud provider configuration %s: %#v",
configFilePath, err)
}
defer config.Close()
}
cloud, err := cloudprovider.GetCloudProvider(name, config)
if err != nil {
glog.Fatalf("Couldn't init cloud provider %q: %#v", name, err)
}
if cloud == nil {
glog.Fatalf("Unknown cloud provider: %s", name)
}
return cloud
}
func newEtcd(etcdConfigFile string, etcdServerList util.StringList) (helper tools.EtcdHelper, err error) { func newEtcd(etcdConfigFile string, etcdServerList util.StringList) (helper tools.EtcdHelper, err error) {
var client tools.EtcdGetSet var client tools.EtcdGetSet
if etcdConfigFile != "" { if etcdConfigFile != "" {
@ -163,7 +110,6 @@ func main() {
defer util.FlushLogs() defer util.FlushLogs()
verflag.PrintAndExitIfRequested() verflag.PrintAndExitIfRequested()
verifyMinionFlags()
verifyPortalFlags() verifyPortalFlags()
if (*etcdConfigFile != "" && len(etcdServerList) != 0) || (*etcdConfigFile == "" && len(etcdServerList) == 0) { if (*etcdConfigFile != "" && len(etcdServerList) != 0) || (*etcdConfigFile == "" && len(etcdServerList) == 0) {
@ -174,7 +120,7 @@ func main() {
AllowPrivileged: *allowPrivileged, AllowPrivileged: *allowPrivileged,
}) })
cloud := initCloudProvider(*cloudProvider, *cloudConfigFile) cloud := cloudprovider.InitCloudProvider(*cloudProvider, *cloudConfigFile)
kubeletClient, err := client.NewKubeletClient(&kubeletConfig) kubeletClient, err := client.NewKubeletClient(&kubeletConfig)
if err != nil { if err != nil {
@ -198,31 +144,21 @@ func main() {
n := net.IPNet(portalNet) n := net.IPNet(portalNet)
config := &master.Config{ config := &master.Config{
Client: client, Client: client,
Cloud: cloud, Cloud: cloud,
EtcdHelper: helper, EtcdHelper: helper,
HealthCheckMinions: *healthCheckMinions, HealthCheckMinions: *healthCheckMinions,
Minions: machineList, EventTTL: *eventTTL,
MinionCacheTTL: *minionCacheTTL, KubeletClient: kubeletClient,
EventTTL: *eventTTL,
MinionRegexp: *minionRegexp,
KubeletClient: kubeletClient,
NodeResources: api.NodeResources{
Capacity: api.ResourceList{
resources.CPU: util.NewIntOrStringFromInt(*nodeMilliCPU),
resources.Memory: util.NewIntOrStringFromInt(*nodeMemory),
},
},
PortalNet: &n, PortalNet: &n,
EnableLogsSupport: *enableLogsSupport, EnableLogsSupport: *enableLogsSupport,
EnableUISupport: true, EnableUISupport: true,
APIPrefix: *apiPrefix, APIPrefix: *apiPrefix,
CorsAllowedOriginList: corsAllowedOriginList, CorsAllowedOriginList: corsAllowedOriginList,
TokenAuthFile: *tokenAuthFile, TokenAuthFile: *tokenAuthFile,
ReadOnlyPort: *readOnlyPort,
ReadOnlyPort: *readOnlyPort, ReadWritePort: *port,
ReadWritePort: *port, PublicAddress: *publicAddressOverride,
PublicAddress: *publicAddressOverride,
} }
m := master.New(config) m := master.New(config)

View File

@ -27,10 +27,14 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
minionControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/resources"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
@ -38,22 +42,43 @@ import (
) )
var ( var (
port = flag.Int("port", ports.ControllerManagerPort, "The port that the controller-manager's http service runs on") port = flag.Int("port", ports.ControllerManagerPort, "The port that the controller-manager's http service runs on")
address = util.IP(net.ParseIP("127.0.0.1")) address = util.IP(net.ParseIP("127.0.0.1"))
clientConfig = &client.Config{} clientConfig = &client.Config{}
cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.")
cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs.")
machineList util.StringList
// TODO: Discover these by pinging the host machines, and rip out these flags.
nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node")
nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node")
) )
func init() { func init() {
flag.Var(&address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)") flag.Var(&address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.")
client.BindClientConfigFlags(flag.CommandLine, clientConfig) client.BindClientConfigFlags(flag.CommandLine, clientConfig)
} }
func verifyMinionFlags() {
if *cloudProvider == "" || *minionRegexp == "" {
if len(machineList) == 0 {
glog.Info("No machines specified!")
}
return
}
if len(machineList) != 0 {
glog.Info("-machines is overwritten by -minion_regexp")
}
}
func main() { func main() {
flag.Parse() flag.Parse()
util.InitLogs() util.InitLogs()
defer util.FlushLogs() defer util.FlushLogs()
verflag.PrintAndExitIfRequested() verflag.PrintAndExitIfRequested()
verifyMinionFlags()
if len(clientConfig.Host) == 0 { if len(clientConfig.Host) == 0 {
glog.Fatal("usage: controller-manager -master <master>") glog.Fatal("usage: controller-manager -master <master>")
@ -69,8 +94,18 @@ func main() {
endpoints := service.NewEndpointController(kubeClient) endpoints := service.NewEndpointController(kubeClient)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
controllerManager := controller.NewReplicationManager(kubeClient) controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient)
controllerManager.Run(10 * time.Second) controllerManager.Run(10 * time.Second)
cloud := cloudprovider.InitCloudProvider(*cloudProvider, *cloudConfigFile)
nodeResources := &api.NodeResources{
Capacity: api.ResourceList{
resources.CPU: util.NewIntOrStringFromInt(*nodeMilliCPU),
resources.Memory: util.NewIntOrStringFromInt(*nodeMemory),
},
}
minionController := minionControllerPkg.NewMinionController(cloud, *minionRegexp, machineList, nodeResources, kubeClient)
minionController.Run(10 * time.Second)
select {} select {}
} }

View File

@ -0,0 +1,28 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
// This file exists to force the desired plugin implementations to be linked.
// This should probably be part of some configuration fed into the build for a
// given binary target.
import (
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/gce"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/openstack"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/ovirt"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/vagrant"
)

View File

@ -37,7 +37,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller" minionControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
@ -137,11 +138,11 @@ func startComponents(manifestURL string) (apiServerURL string) {
if err != nil { if err != nil {
glog.Fatalf("Nonnumeric port? %v", err) glog.Fatalf("Nonnumeric port? %v", err)
} }
// Create a master and install handlers into mux. // Create a master and install handlers into mux.
m := master.New(&master.Config{ m := master.New(&master.Config{
Client: cl, Client: cl,
EtcdHelper: helper, EtcdHelper: helper,
Minions: machineList,
KubeletClient: fakeKubeletClient{}, KubeletClient: fakeKubeletClient{},
EnableLogsSupport: false, EnableLogsSupport: false,
APIPrefix: "/api", APIPrefix: "/api",
@ -160,12 +161,16 @@ func startComponents(manifestURL string) (apiServerURL string) {
endpoints := service.NewEndpointController(cl) endpoints := service.NewEndpointController(cl)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
controllerManager := controller.NewReplicationManager(cl) controllerManager := replicationControllerPkg.NewReplicationManager(cl)
// Prove that controllerManager's watch works by making it not sync until after this // Prove that controllerManager's watch works by making it not sync until after this
// test is over. (Hopefully we don't take 10 minutes!) // test is over. (Hopefully we don't take 10 minutes!)
controllerManager.Run(10 * time.Minute) controllerManager.Run(10 * time.Minute)
nodeResources := &api.NodeResources{}
minionController := minionControllerPkg.NewMinionController(nil, "", machineList, nodeResources, cl)
minionController.Run(10 * time.Second)
// Kubelet (localhost) // Kubelet (localhost)
os.MkdirAll(testRootDir, 0750) os.MkdirAll(testRootDir, 0750)
cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)

View File

@ -49,6 +49,7 @@ ETCD_PORT=${ETCD_PORT:-4001}
API_PORT=${API_PORT:-8080} API_PORT=${API_PORT:-8080}
API_HOST=${API_HOST:-127.0.0.1} API_HOST=${API_HOST:-127.0.0.1}
KUBELET_PORT=${KUBELET_PORT:-10250} KUBELET_PORT=${KUBELET_PORT:-10250}
CTLRMGR_PORT=${CTLRMGR_PORT:-10252}
GO_OUT=${KUBE_TARGET}/bin GO_OUT=${KUBE_TARGET}/bin
# Check kubectl # Check kubectl
@ -70,14 +71,20 @@ ${GO_OUT}/apiserver \
--address="127.0.0.1" \ --address="127.0.0.1" \
--port="${API_PORT}" \ --port="${API_PORT}" \
--etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \ --etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \
--machines="127.0.0.1" \
--kubelet_port=${KUBELET_PORT} \ --kubelet_port=${KUBELET_PORT} \
--portal_net="10.0.0.0/24" 1>&2 & --portal_net="10.0.0.0/24" 1>&2 &
APISERVER_PID=$! APISERVER_PID=$!
wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver: " wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver: "
# Start controller manager
${GO_OUT}/controller-manager \
--machines="127.0.0.1" \
--master="127.0.0.1:${API_PORT}" 1>&2 &
CTLRMGR_PID=$!
wait_for_url "http://127.0.0.1:${CTLRMGR_PORT}/healthz" "controller-manager: "
KUBE_CMD="${GO_OUT}/kubectl" KUBE_CMD="${GO_OUT}/kubectl"
KUBE_FLAGS="-s http://127.0.0.1:${API_PORT} --match-server-version" KUBE_FLAGS="-s http://127.0.0.1:${API_PORT} --match-server-version"
@ -93,12 +100,6 @@ ${KUBE_CMD} get minions ${KUBE_FLAGS}
${KUBE_CMD} get minions 127.0.0.1 ${KUBE_FLAGS} ${KUBE_CMD} get minions 127.0.0.1 ${KUBE_FLAGS}
echo "kubectl(minions): ok" echo "kubectl(minions): ok"
# Start controller manager
#${GO_OUT}/controller-manager \
# --etcd_servers="http://127.0.0.1:${ETCD_PORT}" \
# --master="127.0.0.1:${API_PORT}" 1>&2 &
#CTLRMGR_PID=$!
# Start proxy # Start proxy
#PROXY_LOG=/tmp/kube-proxy.log #PROXY_LOG=/tmp/kube-proxy.log
#${GO_OUT}/proxy \ #${GO_OUT}/proxy \

View File

@ -16,9 +16,7 @@ limitations under the License.
package client package client
import ( import "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
type MinionsInterface interface { type MinionsInterface interface {
Minions() MinionInterface Minions() MinionInterface
@ -49,17 +47,17 @@ func (c *minions) Create(minion *api.Minion) (*api.Minion, error) {
} }
// List lists all the minions in the cluster. // List lists all the minions in the cluster.
func (c *minions) List() (result *api.MinionList, err error) { func (c *minions) List() (*api.MinionList, error) {
result = &api.MinionList{} result := &api.MinionList{}
err = c.r.Get().Path("minions").Do().Into(result) err := c.r.Get().Path("minions").Do().Into(result)
return return result, err
} }
// Get gets an existing minion // Get gets an existing minion
func (c *minions) Get(id string) (result *api.Minion, err error) { func (c *minions) Get(id string) (*api.Minion, error) {
result = &api.Minion{} result := &api.Minion{}
err = c.r.Get().Path("minions").Path(id).Do().Into(result) err := c.r.Get().Path("minions").Path(id).Do().Into(result)
return return result, err
} }
// Delete deletes an existing minion. // Delete deletes an existing minion.

View File

@ -21,8 +21,8 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -31,31 +31,38 @@ type MinionController struct {
cloud cloudprovider.Interface cloud cloudprovider.Interface
matchRE string matchRE string
staticResources *api.NodeResources staticResources *api.NodeResources
registry minion.Registry minions []string
period time.Duration kubeClient client.Interface
} }
// NewMinionController returns a new minion controller to sync instances from cloudprovider. // NewMinionController returns a new minion controller to sync instances from cloudprovider.
func NewMinionController( func NewMinionController(
cloud cloudprovider.Interface, cloud cloudprovider.Interface,
matchRE string, matchRE string,
minions []string,
staticResources *api.NodeResources, staticResources *api.NodeResources,
registry minion.Registry, kubeClient client.Interface) *MinionController {
period time.Duration) *MinionController {
return &MinionController{ return &MinionController{
cloud: cloud, cloud: cloud,
matchRE: matchRE, matchRE: matchRE,
minions: minions,
staticResources: staticResources, staticResources: staticResources,
registry: registry, kubeClient: kubeClient,
period: period,
} }
} }
// Run starts syncing instances from cloudprovider periodically. // Run starts syncing instances from cloudprovider periodically, or create initial minion list.
func (s *MinionController) Run() { func (s *MinionController) Run(period time.Duration) {
// Call Sync() first to warm up minion registry. if s.cloud != nil && len(s.matchRE) > 0 {
s.Sync() go util.Forever(func() { s.Sync() }, period)
go util.Forever(func() { s.Sync() }, s.period) } else {
for _, minionID := range s.minions {
s.kubeClient.Minions().Create(&api.Minion{
ObjectMeta: api.ObjectMeta{Name: minionID},
NodeResources: *s.staticResources,
})
}
}
} }
// Sync syncs list of instances from cloudprovider to master etcd registry. // Sync syncs list of instances from cloudprovider to master etcd registry.
@ -64,7 +71,7 @@ func (s *MinionController) Sync() error {
if err != nil { if err != nil {
return err return err
} }
minions, err := s.registry.ListMinions(nil) minions, err := s.kubeClient.Minions().List()
if err != nil { if err != nil {
return err return err
} }
@ -77,20 +84,14 @@ func (s *MinionController) Sync() error {
for _, minion := range matches.Items { for _, minion := range matches.Items {
if _, ok := minionMap[minion.Name]; !ok { if _, ok := minionMap[minion.Name]; !ok {
glog.Infof("Create minion in registry: %s", minion.Name) glog.Infof("Create minion in registry: %s", minion.Name)
err = s.registry.CreateMinion(nil, &minion) s.kubeClient.Minions().Create(&minion)
if err != nil {
return err
}
} }
delete(minionMap, minion.Name) delete(minionMap, minion.Name)
} }
for minionID := range minionMap { for minionID := range minionMap {
glog.Infof("Delete minion from registry: %s", minionID) glog.Infof("Delete minion from registry: %s", minionID)
err = s.registry.DeleteMinion(nil, minionID) s.kubeClient.Minions().Delete(minionID)
if err != nil {
return err
}
} }
return nil return nil
} }

View File

@ -17,161 +17,103 @@ limitations under the License.
package controller package controller
import ( import (
"fmt"
"net/http"
"net/http/httptest"
"testing" "testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
etcdregistry "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
etcd "github.com/coreos/go-etcd/etcd"
) )
func NewTestEtcdRegistry(client tools.EtcdClient) *etcdregistry.Registry { func newMinionList(count int) api.MinionList {
registry := etcdregistry.NewRegistry( minions := []api.Minion{}
tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, for i := 0; i < count; i++ {
&pod.BasicBoundPodFactory{ minions = append(minions, api.Minion{
ServiceRegistry: &registrytest.ServiceRegistry{}, ObjectMeta: api.ObjectMeta{
}, Name: fmt.Sprintf("minion%d", i),
) },
return registry })
}
return api.MinionList{
Items: minions,
}
}
type serverResponse struct {
statusCode int
obj interface{}
}
func makeTestServer(t *testing.T, minionResponse serverResponse) (*httptest.Server, *util.FakeHandler) {
fakeMinionHandler := util.FakeHandler{
StatusCode: minionResponse.statusCode,
ResponseBody: util.EncodeJSON(minionResponse.obj),
}
mux := http.NewServeMux()
mux.Handle("/api/"+testapi.Version()+"/minions", &fakeMinionHandler)
mux.Handle("/api/"+testapi.Version()+"/minions/", &fakeMinionHandler)
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI)
res.WriteHeader(http.StatusNotFound)
})
return httptest.NewServer(mux), &fakeMinionHandler
} }
func TestSyncCreateMinion(t *testing.T) { func TestSyncCreateMinion(t *testing.T) {
ctx := api.NewContext() testServer, minionHandler := makeTestServer(t,
fakeClient := tools.NewFakeEtcdClient(t) serverResponse{http.StatusOK, newMinionList(1)})
m1 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m1"}}) defer testServer.Close()
m2 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m2"}}) client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
fakeClient.Set("/registry/minions/m1", m1, 0) instances := []string{"minion0", "minion1"}
fakeClient.Set("/registry/minions/m2", m2, 0)
fakeClient.ExpectNotFoundGet("/registry/minions/m3")
fakeClient.Data["/registry/minions"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{Value: m1},
{Value: m2},
},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
instances := []string{"m1", "m2", "m3"}
fakeCloud := fake_cloud.FakeCloud{ fakeCloud := fake_cloud.FakeCloud{
Machines: instances, Machines: instances,
} }
minionController := NewMinionController(&fakeCloud, ".*", nil, registry, time.Second) minionController := NewMinionController(&fakeCloud, ".*", nil, nil, client)
if err := minionController.Sync(); err != nil {
minion, err := registry.GetMinion(ctx, "m3")
if minion != nil {
t.Errorf("Unexpected contains")
}
err = minionController.Sync()
if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
minion, err = registry.GetMinion(ctx, "m3") data := runtime.EncodeOrDie(testapi.Codec(), &api.Minion{ObjectMeta: api.ObjectMeta{Name: "minion1"}})
if minion == nil { minionHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/minions", "POST", &data)
t.Errorf("Unexpected !contains")
}
} }
func TestSyncDeleteMinion(t *testing.T) { func TestSyncDeleteMinion(t *testing.T) {
ctx := api.NewContext() testServer, minionHandler := makeTestServer(t,
fakeClient := tools.NewFakeEtcdClient(t) serverResponse{http.StatusOK, newMinionList(2)})
m1 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m1"}}) defer testServer.Close()
m2 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m2"}}) client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
m3 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m3"}}) instances := []string{"minion0"}
fakeClient.Set("/registry/minions/m1", m1, 0)
fakeClient.Set("/registry/minions/m2", m2, 0)
fakeClient.Set("/registry/minions/m3", m3, 0)
fakeClient.Data["/registry/minions"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{Value: m1},
{Value: m2},
{Value: m3},
},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
instances := []string{"m1", "m2"}
fakeCloud := fake_cloud.FakeCloud{ fakeCloud := fake_cloud.FakeCloud{
Machines: instances, Machines: instances,
} }
minionController := NewMinionController(&fakeCloud, ".*", nil, registry, time.Second) minionController := NewMinionController(&fakeCloud, ".*", nil, nil, client)
if err := minionController.Sync(); err != nil {
minion, err := registry.GetMinion(ctx, "m3")
if minion == nil {
t.Errorf("Unexpected !contains")
}
err = minionController.Sync()
if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
minion, err = registry.GetMinion(ctx, "m3") minionHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/minions/minion1", "DELETE", nil)
if minion != nil {
t.Errorf("Unexpected contains")
}
} }
func TestSyncMinionRegexp(t *testing.T) { func TestSyncMinionRegexp(t *testing.T) {
ctx := api.NewContext() testServer, minionHandler := makeTestServer(t,
fakeClient := tools.NewFakeEtcdClient(t) serverResponse{http.StatusOK, newMinionList(1)})
fakeClient.Data["/registry/minions"] = tools.EtcdResponseWithError{ defer testServer.Close()
R: &etcd.Response{ client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
Node: &etcd.Node{ instances := []string{"minion0", "minion1", "node0"}
Nodes: []*etcd.Node{},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
instances := []string{"m1", "m2", "n1", "n2"}
fakeCloud := fake_cloud.FakeCloud{ fakeCloud := fake_cloud.FakeCloud{
Machines: instances, Machines: instances,
} }
minionController := NewMinionController(&fakeCloud, "m[0-9]+", nil, registry, time.Second) minionController := NewMinionController(&fakeCloud, "minion[0-9]+", nil, nil, client)
if err := minionController.Sync(); err != nil {
err := minionController.Sync()
if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
var minion *api.Minion // Only minion1 is created.
fakeClient.ExpectNotFoundGet("/registry/minions/n1") data := runtime.EncodeOrDie(testapi.Codec(), &api.Minion{ObjectMeta: api.ObjectMeta{Name: "minion1"}})
fakeClient.ExpectNotFoundGet("/registry/minions/n2") minionHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/minions", "POST", &data)
minion, err = registry.GetMinion(ctx, "m1")
if minion == nil {
t.Errorf("Unexpected !contains")
}
minion, err = registry.GetMinion(ctx, "m2")
if minion == nil {
t.Errorf("Unexpected !contains")
}
minion, err = registry.GetMinion(ctx, "n1")
if minion != nil {
t.Errorf("Unexpected !contains")
}
minion, err = registry.GetMinion(ctx, "n2")
if minion != nil {
t.Errorf("Unexpected !contains")
}
} }

View File

@ -18,6 +18,7 @@ package cloudprovider
import ( import (
"io" "io"
"os"
"sync" "sync"
"github.com/golang/glog" "github.com/golang/glog"
@ -60,3 +61,35 @@ func GetCloudProvider(name string, config io.Reader) (Interface, error) {
} }
return f(config) return f(config)
} }
// InitCloudProvider creates an instance of the named cloud provider.
func InitCloudProvider(name string, configFilePath string) Interface {
var config *os.File
if name == "" {
glog.Info("No cloud provider specified.")
return nil
}
if configFilePath != "" {
var err error
config, err = os.Open(configFilePath)
if err != nil {
glog.Fatalf("Couldn't open cloud provider configuration %s: %#v",
configFilePath, err)
}
defer config.Close()
}
cloud, err := GetCloudProvider(name, config)
if err != nil {
glog.Fatalf("Couldn't init cloud provider %q: %#v", name, err)
}
if cloud == nil {
glog.Fatalf("Unknown cloud provider: %s", name)
}
return cloud
}

View File

@ -23,7 +23,6 @@ import (
"strings" "strings"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
@ -34,7 +33,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/auth/handlers" "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/handlers"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
cloudcontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
@ -58,12 +56,9 @@ type Config struct {
Cloud cloudprovider.Interface Cloud cloudprovider.Interface
EtcdHelper tools.EtcdHelper EtcdHelper tools.EtcdHelper
HealthCheckMinions bool HealthCheckMinions bool
Minions []string
MinionCacheTTL time.Duration
EventTTL time.Duration EventTTL time.Duration
MinionRegexp string MinionRegexp string
KubeletClient client.KubeletClient KubeletClient client.KubeletClient
NodeResources api.NodeResources
PortalNet *net.IPNet PortalNet *net.IPNet
Mux apiserver.Mux Mux apiserver.Mux
EnableLogsSupport bool EnableLogsSupport bool
@ -265,18 +260,6 @@ func (m *Master) init(c *Config) {
podCache := NewPodCache(c.KubeletClient, m.podRegistry) podCache := NewPodCache(c.KubeletClient, m.podRegistry)
go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30) go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30)
if c.Cloud != nil && len(c.MinionRegexp) > 0 {
// TODO: Move minion controller to its own code.
cloudcontroller.NewMinionController(c.Cloud, c.MinionRegexp, &c.NodeResources, m.minionRegistry, c.MinionCacheTTL).Run()
} else {
for _, minionID := range c.Minions {
m.minionRegistry.CreateMinion(nil, &api.Minion{
ObjectMeta: api.ObjectMeta{Name: minionID},
NodeResources: c.NodeResources,
})
}
}
var userContexts = handlers.NewUserRequestContext() var userContexts = handlers.NewUserRequestContext()
var authenticator authenticator.Request var authenticator authenticator.Request
if len(c.TokenAuthFile) != 0 { if len(c.TokenAuthFile) != 0 {