diff --git a/cluster/saltbase/salt/apiserver/default b/cluster/saltbase/salt/apiserver/default index 6e7b66f7959..e8671d7a979 100644 --- a/cluster/saltbase/salt/apiserver/default +++ b/cluster/saltbase/salt/apiserver/default @@ -3,19 +3,13 @@ {% set daemon_args = "" %} {% endif %} -{% set machines = ""%} {% set cloud_provider = "" %} -{% set minion_regexp = "-minion_regexp=.*" %} {% if grains.cloud_provider is defined %} {% set cloud_provider = "-cloud_provider=" + grains.cloud_provider %} {% endif %} {% set address = "-address=127.0.0.1" %} -{% if pillar['node_instance_prefix'] is defined %} - {% set minion_regexp = "-minion_regexp='" + pillar['node_instance_prefix'] + ".*'" %} -{% endif %} - {% if grains.etcd_servers is defined %} {% set etcd_servers = "-etcd_servers=http://" + grains.etcd_servers + ":4001" %} {% else %} @@ -26,28 +20,11 @@ {% if grains.cloud is defined %} {% if grains.cloud == 'gce' %} {% set cloud_provider = "-cloud_provider=gce" %} - {% set machines = "-machines=" + ','.join(salt['mine.get']('roles:kubernetes-pool', 'network.ip_addrs', expr_form='grain').keys()) %} -{% endif %} -{% if grains.cloud == 'azure' %} - MACHINES="{{ salt['mine.get']('roles:kubernetes-pool', 'grains.items', expr_form='grain').values()|join(',', attribute='hostnamef') }}" - {% set machines = "-machines=$MACHINES" %} -{% endif %} -{% if grains.cloud == 'vsphere' %} - # Collect IPs of minions as machines list. - # - # Use a bash array to build the value we need. Jinja 2.7 does support a 'map' - # filter that would simplify this. However, some installations (specifically - # Debian Wheezy) only install Jinja 2.6. - MACHINE_IPS=() - {% for addrs in salt['mine.get']('roles:kubernetes-pool', 'network.ip_addrs', expr_form='grain').values() %} - MACHINE_IPS+=( {{ addrs[0] }} ) - {% endfor %} - {% set machines = "-machines=$(echo ${MACHINE_IPS[@]} | xargs -n1 echo | paste -sd,)" %} - {% set minion_regexp = "" %} {% endif %} {% endif %} + {% if pillar['portal_net'] is defined %} {% set portal_net = "-portal_net=" + pillar['portal_net'] %} {% endif %} -DAEMON_ARGS="{{daemon_args}} {{address}} {{machines}} {{etcd_servers}} {{ minion_regexp }} {{ cloud_provider }} --allow_privileged={{pillar['allow_privileged']}} {{portal_net}}" +DAEMON_ARGS="{{daemon_args}} {{address}} {{etcd_servers}} {{ cloud_provider }} --allow_privileged={{pillar['allow_privileged']}} {{portal_net}}" diff --git a/cluster/saltbase/salt/controller-manager/default b/cluster/saltbase/salt/controller-manager/default index e4a89682e62..9e65fbc4dd4 100644 --- a/cluster/saltbase/salt/controller-manager/default +++ b/cluster/saltbase/salt/controller-manager/default @@ -2,5 +2,42 @@ {% if grains['os_family'] == 'RedHat' %} {% set daemon_args = "" %} {% endif %} + {% set master="-master=127.0.0.1:8080" %} -DAEMON_ARGS="{{daemon_args}} {{master}}" + +{% set machines = ""%} +{% set cloud_provider = "" %} +{% set minion_regexp = "-minion_regexp=.*" %} +{% if grains.cloud_provider is defined %} + {% set cloud_provider = "-cloud_provider=" + grains.cloud_provider %} +{% endif %} + +{% if pillar['node_instance_prefix'] is defined %} + {% set minion_regexp = "-minion_regexp='" + pillar['node_instance_prefix'] + ".*'" %} +{% endif %} + +{% if grains.cloud is defined %} +{% if grains.cloud == 'gce' %} + {% set cloud_provider = "-cloud_provider=gce" %} + {% set machines = "-machines=" + ','.join(salt['mine.get']('roles:kubernetes-pool', 'network.ip_addrs', expr_form='grain').keys()) %} +{% endif %} +{% if grains.cloud == 'azure' %} + MACHINES="{{ salt['mine.get']('roles:kubernetes-pool', 'grains.items', expr_form='grain').values()|join(',', attribute='hostnamef') }}" + {% set machines = "-machines=$MACHINES" %} +{% endif %} +{% if grains.cloud == 'vsphere' %} + # Collect IPs of minions as machines list. + # + # Use a bash array to build the value we need. Jinja 2.7 does support a 'map' + # filter that would simplify this. However, some installations (specifically + # Debian Wheezy) only install Jinja 2.6. + MACHINE_IPS=() + {% for addrs in salt['mine.get']('roles:kubernetes-pool', 'network.ip_addrs', expr_form='grain').values() %} + MACHINE_IPS+=( {{ addrs[0] }} ) + {% endfor %} + {% set machines = "-machines=$(echo ${MACHINE_IPS[@]} | xargs -n1 echo | paste -sd,)" %} + {% set minion_regexp = "" %} +{% endif %} +{% endif %} + +DAEMON_ARGS="{{daemon_args}} {{master}} {{machines}} {{ minion_regexp }} {{ cloud_provider }}" diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index 083e73ff756..373889f5e22 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -22,17 +22,14 @@ import ( "flag" "net" "net/http" - "os" "strconv" "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" - "github.com/GoogleCloudPlatform/kubernetes/pkg/resources" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" @@ -63,22 +60,16 @@ var ( storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred") cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.") cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.") - minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs.") healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true.") - minionCacheTTL = flag.Duration("minion_cache_ttl", 30*time.Second, "Duration of time to cache minion information. Default 30 seconds.") eventTTL = flag.Duration("event_ttl", 48*time.Hour, "Amount of time to retain events. Default 2 days.") tokenAuthFile = flag.String("token_auth_file", "", "If set, the file that will be used to secure the API server via token authentication.") etcdServerList util.StringList etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers.") - machineList util.StringList corsAllowedOriginList util.StringList allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.") portalNet util.IPNet // TODO: make this a list - // TODO: Discover these by pinging the host machines, and rip out these flags. - nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node") - nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node") - enableLogsSupport = flag.Bool("enable_logs_support", true, "Enables server endpoint for log collection") - kubeletConfig = client.KubeletConfig{ + enableLogsSupport = flag.Bool("enable_logs_support", true, "Enables server endpoint for log collection") + kubeletConfig = client.KubeletConfig{ Port: 10250, EnableHttps: false, } @@ -87,24 +78,11 @@ var ( func init() { flag.Var(&address, "address", "The IP address on to serve on (set to 0.0.0.0 for all interfaces)") flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config") - flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.") flag.Var(&corsAllowedOriginList, "cors_allowed_origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") flag.Var(&portalNet, "portal_net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.") client.BindKubeletClientConfigFlags(flag.CommandLine, &kubeletConfig) } -func verifyMinionFlags() { - if *cloudProvider == "" || *minionRegexp == "" { - if len(machineList) == 0 { - glog.Info("No machines specified!") - } - return - } - if len(machineList) != 0 { - glog.Info("-machines is overwritten by -minion_regexp") - } -} - // TODO: Longer term we should read this from some config store, rather than a flag. func verifyPortalFlags() { if portalNet.IP == nil { @@ -112,37 +90,6 @@ func verifyPortalFlags() { } } -func initCloudProvider(name string, configFilePath string) cloudprovider.Interface { - var config *os.File - - if name == "" { - glog.Info("No cloud provider specified.") - return nil - } - - if configFilePath != "" { - var err error - - config, err = os.Open(configFilePath) - if err != nil { - glog.Fatalf("Couldn't open cloud provider configuration %s: %#v", - configFilePath, err) - } - - defer config.Close() - } - - cloud, err := cloudprovider.GetCloudProvider(name, config) - if err != nil { - glog.Fatalf("Couldn't init cloud provider %q: %#v", name, err) - } - if cloud == nil { - glog.Fatalf("Unknown cloud provider: %s", name) - } - - return cloud -} - func newEtcd(etcdConfigFile string, etcdServerList util.StringList) (helper tools.EtcdHelper, err error) { var client tools.EtcdGetSet if etcdConfigFile != "" { @@ -163,7 +110,6 @@ func main() { defer util.FlushLogs() verflag.PrintAndExitIfRequested() - verifyMinionFlags() verifyPortalFlags() if (*etcdConfigFile != "" && len(etcdServerList) != 0) || (*etcdConfigFile == "" && len(etcdServerList) == 0) { @@ -174,7 +120,7 @@ func main() { AllowPrivileged: *allowPrivileged, }) - cloud := initCloudProvider(*cloudProvider, *cloudConfigFile) + cloud := cloudprovider.InitCloudProvider(*cloudProvider, *cloudConfigFile) kubeletClient, err := client.NewKubeletClient(&kubeletConfig) if err != nil { @@ -198,31 +144,21 @@ func main() { n := net.IPNet(portalNet) config := &master.Config{ - Client: client, - Cloud: cloud, - EtcdHelper: helper, - HealthCheckMinions: *healthCheckMinions, - Minions: machineList, - MinionCacheTTL: *minionCacheTTL, - EventTTL: *eventTTL, - MinionRegexp: *minionRegexp, - KubeletClient: kubeletClient, - NodeResources: api.NodeResources{ - Capacity: api.ResourceList{ - resources.CPU: util.NewIntOrStringFromInt(*nodeMilliCPU), - resources.Memory: util.NewIntOrStringFromInt(*nodeMemory), - }, - }, + Client: client, + Cloud: cloud, + EtcdHelper: helper, + HealthCheckMinions: *healthCheckMinions, + EventTTL: *eventTTL, + KubeletClient: kubeletClient, PortalNet: &n, EnableLogsSupport: *enableLogsSupport, EnableUISupport: true, APIPrefix: *apiPrefix, CorsAllowedOriginList: corsAllowedOriginList, TokenAuthFile: *tokenAuthFile, - - ReadOnlyPort: *readOnlyPort, - ReadWritePort: *port, - PublicAddress: *publicAddressOverride, + ReadOnlyPort: *readOnlyPort, + ReadWritePort: *port, + PublicAddress: *publicAddressOverride, } m := master.New(config) diff --git a/cmd/controller-manager/controller-manager.go b/cmd/controller-manager/controller-manager.go index 8a2d7ca03e2..8d523ec0faa 100644 --- a/cmd/controller-manager/controller-manager.go +++ b/cmd/controller-manager/controller-manager.go @@ -27,10 +27,14 @@ import ( "strconv" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + minionControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" + replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" + "github.com/GoogleCloudPlatform/kubernetes/pkg/resources" "github.com/GoogleCloudPlatform/kubernetes/pkg/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" @@ -38,22 +42,43 @@ import ( ) var ( - port = flag.Int("port", ports.ControllerManagerPort, "The port that the controller-manager's http service runs on") - address = util.IP(net.ParseIP("127.0.0.1")) - clientConfig = &client.Config{} + port = flag.Int("port", ports.ControllerManagerPort, "The port that the controller-manager's http service runs on") + address = util.IP(net.ParseIP("127.0.0.1")) + clientConfig = &client.Config{} + cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.") + cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.") + minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs.") + machineList util.StringList + // TODO: Discover these by pinging the host machines, and rip out these flags. + nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node") + nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node") ) func init() { flag.Var(&address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)") + flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.") client.BindClientConfigFlags(flag.CommandLine, clientConfig) } +func verifyMinionFlags() { + if *cloudProvider == "" || *minionRegexp == "" { + if len(machineList) == 0 { + glog.Info("No machines specified!") + } + return + } + if len(machineList) != 0 { + glog.Info("-machines is overwritten by -minion_regexp") + } +} + func main() { flag.Parse() util.InitLogs() defer util.FlushLogs() verflag.PrintAndExitIfRequested() + verifyMinionFlags() if len(clientConfig.Host) == 0 { glog.Fatal("usage: controller-manager -master ") @@ -69,8 +94,18 @@ func main() { endpoints := service.NewEndpointController(kubeClient) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) - controllerManager := controller.NewReplicationManager(kubeClient) + controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient) controllerManager.Run(10 * time.Second) + cloud := cloudprovider.InitCloudProvider(*cloudProvider, *cloudConfigFile) + nodeResources := &api.NodeResources{ + Capacity: api.ResourceList{ + resources.CPU: util.NewIntOrStringFromInt(*nodeMilliCPU), + resources.Memory: util.NewIntOrStringFromInt(*nodeMemory), + }, + } + minionController := minionControllerPkg.NewMinionController(cloud, *minionRegexp, machineList, nodeResources, kubeClient) + minionController.Run(10 * time.Second) + select {} } diff --git a/cmd/controller-manager/plugins.go b/cmd/controller-manager/plugins.go new file mode 100644 index 00000000000..64908e509a3 --- /dev/null +++ b/cmd/controller-manager/plugins.go @@ -0,0 +1,28 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +// This file exists to force the desired plugin implementations to be linked. +// This should probably be part of some configuration fed into the build for a +// given binary target. +import ( + _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws" + _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/gce" + _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/openstack" + _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/ovirt" + _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/vagrant" +) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index a22039d0e6c..bbb9be4e481 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -37,7 +37,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" + minionControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" + replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config" @@ -137,11 +138,11 @@ func startComponents(manifestURL string) (apiServerURL string) { if err != nil { glog.Fatalf("Nonnumeric port? %v", err) } + // Create a master and install handlers into mux. m := master.New(&master.Config{ Client: cl, EtcdHelper: helper, - Minions: machineList, KubeletClient: fakeKubeletClient{}, EnableLogsSupport: false, APIPrefix: "/api", @@ -160,12 +161,16 @@ func startComponents(manifestURL string) (apiServerURL string) { endpoints := service.NewEndpointController(cl) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) - controllerManager := controller.NewReplicationManager(cl) + controllerManager := replicationControllerPkg.NewReplicationManager(cl) // Prove that controllerManager's watch works by making it not sync until after this // test is over. (Hopefully we don't take 10 minutes!) controllerManager.Run(10 * time.Minute) + nodeResources := &api.NodeResources{} + minionController := minionControllerPkg.NewMinionController(nil, "", machineList, nodeResources, cl) + minionController.Run(10 * time.Second) + // Kubelet (localhost) os.MkdirAll(testRootDir, 0750) cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index 7016344acd8..e1f8244a7d0 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -49,6 +49,7 @@ ETCD_PORT=${ETCD_PORT:-4001} API_PORT=${API_PORT:-8080} API_HOST=${API_HOST:-127.0.0.1} KUBELET_PORT=${KUBELET_PORT:-10250} +CTLRMGR_PORT=${CTLRMGR_PORT:-10252} GO_OUT=${KUBE_TARGET}/bin # Check kubectl @@ -70,14 +71,20 @@ ${GO_OUT}/apiserver \ --address="127.0.0.1" \ --port="${API_PORT}" \ --etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \ - --machines="127.0.0.1" \ --kubelet_port=${KUBELET_PORT} \ --portal_net="10.0.0.0/24" 1>&2 & - APISERVER_PID=$! wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver: " +# Start controller manager +${GO_OUT}/controller-manager \ + --machines="127.0.0.1" \ + --master="127.0.0.1:${API_PORT}" 1>&2 & +CTLRMGR_PID=$! + +wait_for_url "http://127.0.0.1:${CTLRMGR_PORT}/healthz" "controller-manager: " + KUBE_CMD="${GO_OUT}/kubectl" KUBE_FLAGS="-s http://127.0.0.1:${API_PORT} --match-server-version" @@ -93,12 +100,6 @@ ${KUBE_CMD} get minions ${KUBE_FLAGS} ${KUBE_CMD} get minions 127.0.0.1 ${KUBE_FLAGS} echo "kubectl(minions): ok" -# Start controller manager -#${GO_OUT}/controller-manager \ -# --etcd_servers="http://127.0.0.1:${ETCD_PORT}" \ -# --master="127.0.0.1:${API_PORT}" 1>&2 & -#CTLRMGR_PID=$! - # Start proxy #PROXY_LOG=/tmp/kube-proxy.log #${GO_OUT}/proxy \ diff --git a/pkg/client/minions.go b/pkg/client/minions.go index 945a1ba4144..e61d8a64c93 100644 --- a/pkg/client/minions.go +++ b/pkg/client/minions.go @@ -16,9 +16,7 @@ limitations under the License. package client -import ( - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" -) +import "github.com/GoogleCloudPlatform/kubernetes/pkg/api" type MinionsInterface interface { Minions() MinionInterface @@ -49,17 +47,17 @@ func (c *minions) Create(minion *api.Minion) (*api.Minion, error) { } // List lists all the minions in the cluster. -func (c *minions) List() (result *api.MinionList, err error) { - result = &api.MinionList{} - err = c.r.Get().Path("minions").Do().Into(result) - return +func (c *minions) List() (*api.MinionList, error) { + result := &api.MinionList{} + err := c.r.Get().Path("minions").Do().Into(result) + return result, err } // Get gets an existing minion -func (c *minions) Get(id string) (result *api.Minion, err error) { - result = &api.Minion{} - err = c.r.Get().Path("minions").Path(id).Do().Into(result) - return +func (c *minions) Get(id string) (*api.Minion, error) { + result := &api.Minion{} + err := c.r.Get().Path("minions").Path(id).Do().Into(result) + return result, err } // Delete deletes an existing minion. diff --git a/pkg/cloudprovider/controller/minioncontroller.go b/pkg/cloudprovider/controller/minioncontroller.go index a0fbadb3cec..4deede3b247 100644 --- a/pkg/cloudprovider/controller/minioncontroller.go +++ b/pkg/cloudprovider/controller/minioncontroller.go @@ -21,8 +21,8 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) @@ -31,31 +31,38 @@ type MinionController struct { cloud cloudprovider.Interface matchRE string staticResources *api.NodeResources - registry minion.Registry - period time.Duration + minions []string + kubeClient client.Interface } // NewMinionController returns a new minion controller to sync instances from cloudprovider. func NewMinionController( cloud cloudprovider.Interface, matchRE string, + minions []string, staticResources *api.NodeResources, - registry minion.Registry, - period time.Duration) *MinionController { + kubeClient client.Interface) *MinionController { return &MinionController{ cloud: cloud, matchRE: matchRE, + minions: minions, staticResources: staticResources, - registry: registry, - period: period, + kubeClient: kubeClient, } } -// Run starts syncing instances from cloudprovider periodically. -func (s *MinionController) Run() { - // Call Sync() first to warm up minion registry. - s.Sync() - go util.Forever(func() { s.Sync() }, s.period) +// Run starts syncing instances from cloudprovider periodically, or create initial minion list. +func (s *MinionController) Run(period time.Duration) { + if s.cloud != nil && len(s.matchRE) > 0 { + go util.Forever(func() { s.Sync() }, period) + } else { + for _, minionID := range s.minions { + s.kubeClient.Minions().Create(&api.Minion{ + ObjectMeta: api.ObjectMeta{Name: minionID}, + NodeResources: *s.staticResources, + }) + } + } } // Sync syncs list of instances from cloudprovider to master etcd registry. @@ -64,7 +71,7 @@ func (s *MinionController) Sync() error { if err != nil { return err } - minions, err := s.registry.ListMinions(nil) + minions, err := s.kubeClient.Minions().List() if err != nil { return err } @@ -77,20 +84,14 @@ func (s *MinionController) Sync() error { for _, minion := range matches.Items { if _, ok := minionMap[minion.Name]; !ok { glog.Infof("Create minion in registry: %s", minion.Name) - err = s.registry.CreateMinion(nil, &minion) - if err != nil { - return err - } + s.kubeClient.Minions().Create(&minion) } delete(minionMap, minion.Name) } for minionID := range minionMap { glog.Infof("Delete minion from registry: %s", minionID) - err = s.registry.DeleteMinion(nil, minionID) - if err != nil { - return err - } + s.kubeClient.Minions().Delete(minionID) } return nil } diff --git a/pkg/cloudprovider/controller/minioncontroller_test.go b/pkg/cloudprovider/controller/minioncontroller_test.go index ef65cfb9f3e..1bb82eb0f87 100644 --- a/pkg/cloudprovider/controller/minioncontroller_test.go +++ b/pkg/cloudprovider/controller/minioncontroller_test.go @@ -17,161 +17,103 @@ limitations under the License. package controller import ( + "fmt" + "net/http" + "net/http/httptest" "testing" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" - fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" - etcdregistry "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - - etcd "github.com/coreos/go-etcd/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -func NewTestEtcdRegistry(client tools.EtcdClient) *etcdregistry.Registry { - registry := etcdregistry.NewRegistry( - tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, - &pod.BasicBoundPodFactory{ - ServiceRegistry: ®istrytest.ServiceRegistry{}, - }, - ) - return registry +func newMinionList(count int) api.MinionList { + minions := []api.Minion{} + for i := 0; i < count; i++ { + minions = append(minions, api.Minion{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("minion%d", i), + }, + }) + } + return api.MinionList{ + Items: minions, + } +} + +type serverResponse struct { + statusCode int + obj interface{} +} + +func makeTestServer(t *testing.T, minionResponse serverResponse) (*httptest.Server, *util.FakeHandler) { + fakeMinionHandler := util.FakeHandler{ + StatusCode: minionResponse.statusCode, + ResponseBody: util.EncodeJSON(minionResponse.obj), + } + mux := http.NewServeMux() + mux.Handle("/api/"+testapi.Version()+"/minions", &fakeMinionHandler) + mux.Handle("/api/"+testapi.Version()+"/minions/", &fakeMinionHandler) + mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { + t.Errorf("unexpected request: %v", req.RequestURI) + res.WriteHeader(http.StatusNotFound) + }) + return httptest.NewServer(mux), &fakeMinionHandler } func TestSyncCreateMinion(t *testing.T) { - ctx := api.NewContext() - fakeClient := tools.NewFakeEtcdClient(t) - m1 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m1"}}) - m2 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m2"}}) - fakeClient.Set("/registry/minions/m1", m1, 0) - fakeClient.Set("/registry/minions/m2", m2, 0) - fakeClient.ExpectNotFoundGet("/registry/minions/m3") - fakeClient.Data["/registry/minions"] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Nodes: []*etcd.Node{ - {Value: m1}, - {Value: m2}, - }, - }, - }, - E: nil, - } - - registry := NewTestEtcdRegistry(fakeClient) - instances := []string{"m1", "m2", "m3"} + testServer, minionHandler := makeTestServer(t, + serverResponse{http.StatusOK, newMinionList(1)}) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + instances := []string{"minion0", "minion1"} fakeCloud := fake_cloud.FakeCloud{ Machines: instances, } - minionController := NewMinionController(&fakeCloud, ".*", nil, registry, time.Second) - - minion, err := registry.GetMinion(ctx, "m3") - if minion != nil { - t.Errorf("Unexpected contains") - } - - err = minionController.Sync() - if err != nil { + minionController := NewMinionController(&fakeCloud, ".*", nil, nil, client) + if err := minionController.Sync(); err != nil { t.Errorf("unexpected error: %v", err) } - minion, err = registry.GetMinion(ctx, "m3") - if minion == nil { - t.Errorf("Unexpected !contains") - } + data := runtime.EncodeOrDie(testapi.Codec(), &api.Minion{ObjectMeta: api.ObjectMeta{Name: "minion1"}}) + minionHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/minions", "POST", &data) } func TestSyncDeleteMinion(t *testing.T) { - ctx := api.NewContext() - fakeClient := tools.NewFakeEtcdClient(t) - m1 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m1"}}) - m2 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m2"}}) - m3 := runtime.EncodeOrDie(latest.Codec, &api.Minion{ObjectMeta: api.ObjectMeta{Name: "m3"}}) - fakeClient.Set("/registry/minions/m1", m1, 0) - fakeClient.Set("/registry/minions/m2", m2, 0) - fakeClient.Set("/registry/minions/m3", m3, 0) - fakeClient.Data["/registry/minions"] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Nodes: []*etcd.Node{ - {Value: m1}, - {Value: m2}, - {Value: m3}, - }, - }, - }, - E: nil, - } - - registry := NewTestEtcdRegistry(fakeClient) - instances := []string{"m1", "m2"} + testServer, minionHandler := makeTestServer(t, + serverResponse{http.StatusOK, newMinionList(2)}) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + instances := []string{"minion0"} fakeCloud := fake_cloud.FakeCloud{ Machines: instances, } - minionController := NewMinionController(&fakeCloud, ".*", nil, registry, time.Second) - - minion, err := registry.GetMinion(ctx, "m3") - if minion == nil { - t.Errorf("Unexpected !contains") - } - - err = minionController.Sync() - if err != nil { + minionController := NewMinionController(&fakeCloud, ".*", nil, nil, client) + if err := minionController.Sync(); err != nil { t.Errorf("unexpected error: %v", err) } - minion, err = registry.GetMinion(ctx, "m3") - if minion != nil { - t.Errorf("Unexpected contains") - } + minionHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/minions/minion1", "DELETE", nil) } func TestSyncMinionRegexp(t *testing.T) { - ctx := api.NewContext() - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.Data["/registry/minions"] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Nodes: []*etcd.Node{}, - }, - }, - E: nil, - } - - registry := NewTestEtcdRegistry(fakeClient) - instances := []string{"m1", "m2", "n1", "n2"} + testServer, minionHandler := makeTestServer(t, + serverResponse{http.StatusOK, newMinionList(1)}) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + instances := []string{"minion0", "minion1", "node0"} fakeCloud := fake_cloud.FakeCloud{ Machines: instances, } - minionController := NewMinionController(&fakeCloud, "m[0-9]+", nil, registry, time.Second) - - err := minionController.Sync() - if err != nil { + minionController := NewMinionController(&fakeCloud, "minion[0-9]+", nil, nil, client) + if err := minionController.Sync(); err != nil { t.Errorf("unexpected error: %v", err) } - var minion *api.Minion - fakeClient.ExpectNotFoundGet("/registry/minions/n1") - fakeClient.ExpectNotFoundGet("/registry/minions/n2") - - minion, err = registry.GetMinion(ctx, "m1") - if minion == nil { - t.Errorf("Unexpected !contains") - } - minion, err = registry.GetMinion(ctx, "m2") - if minion == nil { - t.Errorf("Unexpected !contains") - } - minion, err = registry.GetMinion(ctx, "n1") - if minion != nil { - t.Errorf("Unexpected !contains") - } - minion, err = registry.GetMinion(ctx, "n2") - if minion != nil { - t.Errorf("Unexpected !contains") - } + // Only minion1 is created. + data := runtime.EncodeOrDie(testapi.Codec(), &api.Minion{ObjectMeta: api.ObjectMeta{Name: "minion1"}}) + minionHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/minions", "POST", &data) } diff --git a/pkg/cloudprovider/plugins.go b/pkg/cloudprovider/plugins.go index c7387ca0466..f982069a255 100644 --- a/pkg/cloudprovider/plugins.go +++ b/pkg/cloudprovider/plugins.go @@ -18,6 +18,7 @@ package cloudprovider import ( "io" + "os" "sync" "github.com/golang/glog" @@ -60,3 +61,35 @@ func GetCloudProvider(name string, config io.Reader) (Interface, error) { } return f(config) } + +// InitCloudProvider creates an instance of the named cloud provider. +func InitCloudProvider(name string, configFilePath string) Interface { + var config *os.File + + if name == "" { + glog.Info("No cloud provider specified.") + return nil + } + + if configFilePath != "" { + var err error + + config, err = os.Open(configFilePath) + if err != nil { + glog.Fatalf("Couldn't open cloud provider configuration %s: %#v", + configFilePath, err) + } + + defer config.Close() + } + + cloud, err := GetCloudProvider(name, config) + if err != nil { + glog.Fatalf("Couldn't init cloud provider %q: %#v", name, err) + } + if cloud == nil { + glog.Fatalf("Unknown cloud provider: %s", name) + } + + return cloud +} diff --git a/pkg/master/master.go b/pkg/master/master.go index 6aa6ccb0ca2..f085fc2a7fd 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -23,7 +23,6 @@ import ( "strings" "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" @@ -34,7 +33,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/handlers" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" - cloudcontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" @@ -58,12 +56,9 @@ type Config struct { Cloud cloudprovider.Interface EtcdHelper tools.EtcdHelper HealthCheckMinions bool - Minions []string - MinionCacheTTL time.Duration EventTTL time.Duration MinionRegexp string KubeletClient client.KubeletClient - NodeResources api.NodeResources PortalNet *net.IPNet Mux apiserver.Mux EnableLogsSupport bool @@ -265,18 +260,6 @@ func (m *Master) init(c *Config) { podCache := NewPodCache(c.KubeletClient, m.podRegistry) go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30) - if c.Cloud != nil && len(c.MinionRegexp) > 0 { - // TODO: Move minion controller to its own code. - cloudcontroller.NewMinionController(c.Cloud, c.MinionRegexp, &c.NodeResources, m.minionRegistry, c.MinionCacheTTL).Run() - } else { - for _, minionID := range c.Minions { - m.minionRegistry.CreateMinion(nil, &api.Minion{ - ObjectMeta: api.ObjectMeta{Name: minionID}, - NodeResources: c.NodeResources, - }) - } - } - var userContexts = handlers.NewUserRequestContext() var authenticator authenticator.Request if len(c.TokenAuthFile) != 0 {