Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-24 12:15:52 +00:00)

Merge pull request #7251 from a-robinson/tp
Move load balancer host / target pool reconciliation to the service controller

Commit 0f22b6fc4d
@@ -221,7 +221,7 @@ func (s *CMServer) Run(_ []string) error {
     nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)

     serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
-    if err := serviceController.Run(); err != nil {
+    if err := serviceController.Run(s.NodeSyncPeriod); err != nil {
         glog.Errorf("Failed to start service controller: %v", err)
     }
@@ -130,12 +130,13 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
         },
     }

+    const nodeSyncPeriod = 10 * time.Second
     nodeController := nodecontroller.NewNodeController(
         nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "")
-    nodeController.Run(10*time.Second, true)
+    nodeController.Run(nodeSyncPeriod, true)

     serviceController := servicecontroller.New(nil, cl, "kubernetes")
-    if err := serviceController.Run(); err != nil {
+    if err := serviceController.Run(nodeSyncPeriod); err != nil {
         glog.Warningf("Running without a service controller: %v", err)
     }
@@ -337,27 +337,28 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string)
         toRemove = append(toRemove, &compute.InstanceReference{link})
     }

-    add := &compute.TargetPoolsAddInstanceRequest{
-        Instances: toAdd,
-    }
-    op, err := gce.service.TargetPools.AddInstance(gce.projectID, region, name, add).Do()
-    if err != nil {
-        return err
-    }
-    err = gce.waitForRegionOp(op, region)
-    if err != nil {
-        return err
+    if len(toAdd) > 0 {
+        add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
+        op, err := gce.service.TargetPools.AddInstance(gce.projectID, region, name, add).Do()
+        if err != nil {
+            return err
+        }
+        if err := gce.waitForRegionOp(op, region); err != nil {
+            return err
+        }
     }

-    rm := &compute.TargetPoolsRemoveInstanceRequest{
-        Instances: toRemove,
+    if len(toRemove) > 0 {
+        rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
+        op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, region, name, rm).Do()
+        if err != nil {
+            return err
+        }
+        if err := gce.waitForRegionOp(op, region); err != nil {
+            return err
+        }
     }
-    op, err = gce.service.TargetPools.RemoveInstance(gce.projectID, region, name, rm).Do()
-    if err != nil {
-        return err
-    }
-    err = gce.waitForRegionOp(op, region)
-    return err
+
+    return nil
 }

 // DeleteTCPLoadBalancer is an implementation of TCPLoadBalancer.DeleteTCPLoadBalancer.
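The shape of this change is a common one: compute the difference between the hosts a target pool currently has and the hosts it should have, then skip the add or remove API call entirely when the corresponding set is empty. A minimal standalone sketch of that pattern follows; it is plain Go, and doAdd/doRemove are stand-ins for the real TargetPools calls rather than anything from the source.

package main

import "fmt"

// diffHosts returns which hosts must be added to and removed from a target
// pool so that `current` ends up matching `desired`.
func diffHosts(current, desired []string) (toAdd, toRemove []string) {
    curSet := make(map[string]bool, len(current))
    for _, h := range current {
        curSet[h] = true
    }
    desSet := make(map[string]bool, len(desired))
    for _, h := range desired {
        desSet[h] = true
        if !curSet[h] {
            toAdd = append(toAdd, h)
        }
    }
    for _, h := range current {
        if !desSet[h] {
            toRemove = append(toRemove, h)
        }
    }
    return toAdd, toRemove
}

// syncTargetPool only issues the (stubbed) add/remove calls when there is
// actually something to do, mirroring the len(toAdd) > 0 / len(toRemove) > 0
// guards introduced in the hunk above.
func syncTargetPool(current, desired []string, doAdd, doRemove func([]string) error) error {
    toAdd, toRemove := diffHosts(current, desired)
    if len(toAdd) > 0 {
        if err := doAdd(toAdd); err != nil {
            return err
        }
    }
    if len(toRemove) > 0 {
        if err := doRemove(toRemove); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    err := syncTargetPool(
        []string{"node0", "node1"},
        []string{"node0", "node2"},
        func(hosts []string) error { fmt.Println("add:", hosts); return nil },
        func(hosts []string) error { fmt.Println("remove:", hosts); return nil },
    )
    fmt.Println("err:", err)
}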
@@ -87,8 +87,6 @@ type NodeController struct {
     // TODO: Change node status monitor to watch based.
     nodeMonitorPeriod time.Duration
     clusterName       string
-    // Should external services be reconciled during syncing cloud nodes, even though the nodes were not changed.
-    reconcileServices bool
     // Method for easy mocking in unittest.
     lookupIP func(host string) ([]net.IP, error)
     now      func() util.Time
@@ -221,62 +219,6 @@ func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, ret
     }
 }

-// reconcileExternalServices updates balancers for external services, so that they will match the nodes given.
-// Returns true if something went wrong and we should call reconcile again.
-func (nc *NodeController) reconcileExternalServices(nodes *api.NodeList) (shouldRetry bool) {
-    balancer, ok := nc.cloud.TCPLoadBalancer()
-    if !ok {
-        glog.Error("The cloud provider does not support external TCP load balancers.")
-        return false
-    }
-
-    zones, ok := nc.cloud.Zones()
-    if !ok {
-        glog.Error("The cloud provider does not support zone enumeration.")
-        return false
-    }
-    zone, err := zones.GetZone()
-    if err != nil {
-        glog.Errorf("Error while getting zone: %v", err)
-        return false
-    }
-
-    hosts := []string{}
-    for _, node := range nodes.Items {
-        hosts = append(hosts, node.Name)
-    }
-
-    services, err := nc.kubeClient.Services(api.NamespaceAll).List(labels.Everything())
-    if err != nil {
-        glog.Errorf("Error while listing services: %v", err)
-        return true
-    }
-    shouldRetry = false
-    for _, service := range services.Items {
-        if service.Spec.CreateExternalLoadBalancer {
-            nonTCPPort := false
-            for i := range service.Spec.Ports {
-                if service.Spec.Ports[i].Protocol != api.ProtocolTCP {
-                    nonTCPPort = true
-                    break
-                }
-            }
-            if nonTCPPort {
-                // TODO: Support UDP here.
-                glog.Errorf("External load balancers for non TCP services are not currently supported: %v.", service)
-                continue
-            }
-            name := cloudprovider.GetLoadBalancerName(&service)
-            err := balancer.UpdateTCPLoadBalancer(name, zone.Region, hosts)
-            if err != nil {
-                glog.Errorf("External error while updating TCP load balancer: %v.", err)
-                shouldRetry = true
-            }
-        }
-    }
-    return shouldRetry
-}
-
 // syncCloudNodes synchronizes the list of instances from cloudprovider to master server.
 func (nc *NodeController) syncCloudNodes() error {
     matches, err := nc.getCloudNodesWithSpec()
@@ -295,7 +237,6 @@ func (nc *NodeController) syncCloudNodes() error {

     // Create nodes which have been created in cloud, but not in kubernetes cluster
     // Skip nodes if we hit an error while trying to get their addresses.
-    nodesChanged := false
     for _, node := range matches.Items {
         if _, ok := nodeMap[node.Name]; !ok {
             glog.V(3).Infof("Querying addresses for new node: %s", node.Name)
@@ -313,7 +254,6 @@ func (nc *NodeController) syncCloudNodes() error {
             if err != nil {
                 glog.Errorf("Create node %s error: %v", node.Name, err)
             }
-            nodesChanged = true
         }
         delete(nodeMap, node.Name)
     }
@@ -326,15 +266,6 @@ func (nc *NodeController) syncCloudNodes() error {
             glog.Errorf("Delete node %s error: %v", nodeID, err)
         }
         nc.deletePods(nodeID)
-        nodesChanged = true
     }

-    // Make external services aware of nodes currently present in the cluster.
-    if nodesChanged || nc.reconcileServices {
-        nc.reconcileServices = nc.reconcileExternalServices(matches)
-        if nc.reconcileServices {
-            glog.Error("Reconcilation of external services failed and will be retried during the next sync.")
-        }
-    }
-
     return nil
@@ -32,7 +32,6 @@ import (
     fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
-    "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
 )
@@ -558,7 +557,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
             matchRE:              ".*",
             expectedRequestCount: 2, // List + Delete
             expectedDeleted:      []string{"node1"},
-            expectedActions:      []testclient.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}, {Action: "list-services"}},
+            expectedActions:      []testclient.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
         },
         {
             // Delete node1, but pod0 is running on node0.
@@ -572,7 +571,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
             matchRE:              ".*",
             expectedRequestCount: 2, // List + Delete
             expectedDeleted:      []string{"node1"},
-            expectedActions:      []testclient.FakeAction{{Action: "list-pods"}, {Action: "list-services"}},
+            expectedActions:      []testclient.FakeAction{{Action: "list-pods"}},
         },
     }

@@ -598,71 +597,6 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
     }
 }

-func TestSyncCloudNodesReconcilesExternalService(t *testing.T) {
-    table := []struct {
-        fakeNodeHandler       *FakeNodeHandler
-        fakeCloud             *fake_cloud.FakeCloud
-        matchRE               string
-        expectedClientActions []testclient.FakeAction
-        expectedUpdateCalls   []fake_cloud.FakeUpdateBalancerCall
-    }{
-        {
-            // Set of nodes does not change: do nothing.
-            fakeNodeHandler: &FakeNodeHandler{
-                Existing: []*api.Node{newNode("node0"), newNode("node1")},
-                Fake:     testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", types.UID(""), true), *newService("service1", types.UID(""), false)}})},
-            fakeCloud: &fake_cloud.FakeCloud{
-                Machines: []string{"node0", "node1"},
-            },
-            matchRE:               ".*",
-            expectedClientActions: nil,
-            expectedUpdateCalls:   nil,
-        },
-        {
-            // Delete "node1", target pool for "service0" should shrink.
-            fakeNodeHandler: &FakeNodeHandler{
-                Existing: []*api.Node{newNode("node0"), newNode("node1")},
-                Fake:     testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", types.UID("2c104a7c-e79e-11e4-8187-42010af0068a"), true), *newService("service1", types.UID(""), false)}})},
-            fakeCloud: &fake_cloud.FakeCloud{
-                Machines: []string{"node0"},
-            },
-            matchRE:               ".*",
-            expectedClientActions: []testclient.FakeAction{{Action: "list-pods"}, {Action: "list-services"}},
-            expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{
-                {Name: "a2c104a7ce79e11e4818742010af0068", Hosts: []string{"node0"}},
-            },
-        },
-        {
-            // Add "node1", target pool for "service0" should grow.
-            fakeNodeHandler: &FakeNodeHandler{
-                Existing: []*api.Node{newNode("node0")},
-                Fake:     testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", types.UID("2c104a7c-e79e-11e4-8187-42010af0068a"), true), *newService("service1", types.UID(""), false)}})},
-            fakeCloud: &fake_cloud.FakeCloud{
-                Machines: []string{"node0", "node1"},
-            },
-            matchRE:               ".*",
-            expectedClientActions: []testclient.FakeAction{{Action: "list-services"}},
-            expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{
-                {Name: "a2c104a7ce79e11e4818742010af0068", Hosts: []string{"node0", "node1"}},
-            },
-        },
-    }
-
-    for _, item := range table {
-        nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler,
-            10, time.Minute, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "kubernetes")
-        if err := nodeController.syncCloudNodes(); err != nil {
-            t.Errorf("unexpected error: %v", err)
-        }
-        if !reflect.DeepEqual(item.expectedClientActions, item.fakeNodeHandler.Actions) {
-            t.Errorf("expected client actions mismatch, expected %+v, got %+v", item.expectedClientActions, item.fakeNodeHandler.Actions)
-        }
-        if !reflect.DeepEqual(item.expectedUpdateCalls, item.fakeCloud.UpdateCalls) {
-            t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, item.fakeCloud.UpdateCalls)
-        }
-    }
-}
-
 func TestPopulateNodeAddresses(t *testing.T) {
     table := []struct {
         nodes *api.NodeList
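The expected balancer names in these fixtures ("a2c104a7ce79e11e4818742010af0068" for a service whose UID is "2c104a7c-e79e-11e4-8187-42010af0068a", and "a333" for UID "333" in the service controller test further down) look like the service UID with its dashes stripped, an "a" prefixed, and the result capped at 32 characters. A toy reproduction of that naming, inferred purely from the fixtures rather than taken from the real cloudprovider.GetLoadBalancerName:

package main

import (
    "fmt"
    "strings"
)

// loadBalancerNameFromUID is a hypothetical helper reproducing the naming
// pattern seen in the test fixtures: "a" + UID without dashes, at most 32 chars.
func loadBalancerNameFromUID(uid string) string {
    name := "a" + strings.Replace(uid, "-", "", -1)
    if len(name) > 32 {
        name = name[:32]
    }
    return name
}

func main() {
    fmt.Println(loadBalancerNameFromUID("2c104a7c-e79e-11e4-8187-42010af0068a")) // a2c104a7ce79e11e4818742010af0068
    fmt.Println(loadBalancerNameFromUID("333"))                                  // a333
}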
@@ -1129,10 +1063,6 @@ func newPod(name, host string) *api.Pod {
     return &api.Pod{ObjectMeta: api.ObjectMeta{Name: name}, Spec: api.PodSpec{Host: host}}
 }

-func newService(name string, uid types.UID, external bool) *api.Service {
-    return &api.Service{ObjectMeta: api.ObjectMeta{Name: name, Namespace: "namespace", UID: uid}, Spec: api.ServiceSpec{CreateExternalLoadBalancer: external}}
-}
-
 func sortedNodeNames(nodes []*api.Node) []string {
     nodeNames := []string{}
     for _, node := range nodes {
@@ -79,7 +79,9 @@ func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName
 // Run starts a background goroutine that watches for changes to services that
 // have (or had) externalLoadBalancers=true and ensures that they have external
 // load balancers created and deleted appropriately.
-func (s *ServiceController) Run() error {
+// nodeSyncPeriod controls how often we check the cluster's nodes to determine
+// if external load balancers need to be updated to point to a new set.
+func (s *ServiceController) Run(nodeSyncPeriod time.Duration) error {
     if err := s.init(); err != nil {
         return err
     }
@@ -101,6 +103,11 @@ func (s *ServiceController) Run() error {
     for i := 0; i < workerGoroutines; i++ {
         go s.watchServices(serviceQueue)
     }
+
+    nodeLister := &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}
+    nodeLW := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "nodes", api.NamespaceAll, fields.Everything())
+    cache.NewReflector(nodeLW, &api.Node{}, nodeLister.Store, 0).Run()
+    go s.nodeSyncLoop(nodeLister, nodeSyncPeriod)
     return nil
 }

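The wiring added here can be read independently of the kubernetes client machinery: one goroutine (the reflector) keeps a local store of nodes up to date, while a second goroutine periodically reads that store and reconciles load balancers against whatever it finds. A stripped-down, stdlib-only sketch of the same structure; nodeStore and the simulated event source are illustrative names, not from the source:

package main

import (
    "fmt"
    "sync"
    "time"
)

// nodeStore is a minimal stand-in for the cached node lister: a writer
// replaces the host list, readers take a snapshot.
type nodeStore struct {
    mu    sync.Mutex
    hosts []string
}

func (s *nodeStore) set(hosts []string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.hosts = append([]string(nil), hosts...)
}

func (s *nodeStore) snapshot() []string {
    s.mu.Lock()
    defer s.mu.Unlock()
    return append([]string(nil), s.hosts...)
}

func main() {
    store := &nodeStore{}

    // "Reflector": keeps the store in sync with a simulated source of node changes.
    go func() {
        store.set([]string{"node0"})
        time.Sleep(150 * time.Millisecond)
        store.set([]string{"node0", "node1"})
    }()

    // "nodeSyncLoop": polls the store on a fixed period and reconciles.
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    deadline := time.After(400 * time.Millisecond)
    for {
        select {
        case <-ticker.C:
            fmt.Println("reconcile against hosts:", store.snapshot())
        case <-deadline:
            return
        }
    }
}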
@@ -367,6 +374,18 @@ func (s *serviceCache) ListKeys() []string {
     return keys
 }

+// ListKeys implements the interface required by DeltaFIFO to list the keys we
+// already know about.
+func (s *serviceCache) allServices() []*cachedService {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+    services := make([]*cachedService, 0, len(s.serviceMap))
+    for _, v := range s.serviceMap {
+        services = append(services, v)
+    }
+    return services
+}
+
 func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
     s.mu.Lock()
     defer s.mu.Unlock()
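allServices copies the map's values out while holding the lock, so the node sync loop can iterate over the snapshot without keeping the cache mutex held while it talks to the cloud provider. The same idiom in isolation, using assumed simplified types:

package main

import (
    "fmt"
    "sync"
)

type cache struct {
    mu    sync.Mutex
    items map[string]string
}

// all returns a snapshot of the cached values; callers can range over the
// result without holding the lock.
func (c *cache) all() []string {
    c.mu.Lock()
    defer c.mu.Unlock()
    out := make([]string, 0, len(c.items))
    for _, v := range c.items {
        out = append(out, v)
    }
    return out
}

func main() {
    c := &cache{items: map[string]string{"svc0": "a", "svc1": "b"}}
    fmt.Println(len(c.all()), "cached services")
}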
@@ -445,13 +464,39 @@ func portsEqual(x, y *api.Service) bool {
     if err != nil {
         return false
     }
-    if len(xPorts) != len(yPorts) {
+    return intSlicesEqual(xPorts, yPorts)
+}
+
+func intSlicesEqual(x, y []int) bool {
+    if len(x) != len(y) {
         return false
     }
-    sort.Ints(xPorts)
-    sort.Ints(yPorts)
-    for i := range xPorts {
-        if xPorts[i] != yPorts[i] {
+    if !sort.IntsAreSorted(x) {
+        sort.Ints(x)
+    }
+    if !sort.IntsAreSorted(y) {
+        sort.Ints(y)
+    }
+    for i := range x {
+        if x[i] != y[i] {
+            return false
+        }
+    }
+    return true
+}
+
+func stringSlicesEqual(x, y []string) bool {
+    if len(x) != len(y) {
+        return false
+    }
+    if !sort.StringsAreSorted(x) {
+        sort.Strings(x)
+    }
+    if !sort.StringsAreSorted(y) {
+        sort.Strings(y)
+    }
+    for i := range x {
+        if x[i] != y[i] {
             return false
         }
     }
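Note that these helpers sort their arguments in place before comparing, so callers get order-insensitive equality at the cost of having the slices they pass in reordered; in nodeSyncLoop that appears harmless because the host lists are rebuilt from the lister on every tick. A standalone copy of the string variant plus a tiny usage check, reproduced here only so it can be run on its own:

package main

import (
    "fmt"
    "sort"
)

// stringSlicesEqual reports whether x and y contain the same elements,
// ignoring order. Both slices may be sorted in place as a side effect.
func stringSlicesEqual(x, y []string) bool {
    if len(x) != len(y) {
        return false
    }
    if !sort.StringsAreSorted(x) {
        sort.Strings(x)
    }
    if !sort.StringsAreSorted(y) {
        sort.Strings(y)
    }
    for i := range x {
        if x[i] != y[i] {
            return false
        }
    }
    return true
}

func main() {
    fmt.Println(stringSlicesEqual([]string{"node1", "node0"}, []string{"node0", "node1"})) // true
    fmt.Println(stringSlicesEqual([]string{"node0"}, []string{"node0", "node1"}))          // false
}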
@@ -465,3 +510,78 @@ func hostsFromNodeList(list *api.NodeList) []string {
     }
     return result
 }
+
+// nodeSyncLoop handles updating the hosts pointed to by all external load
+// balancers whenever the set of nodes in the cluster changes.
+func (s *ServiceController) nodeSyncLoop(nodeLister *cache.StoreToNodeLister, period time.Duration) {
+    var prevHosts []string
+    var servicesToUpdate []*cachedService
+    // TODO: Eliminate the unneeded now variable once we stop compiling in go1.3.
+    // It's needed at the moment because go1.3 requires ranges to be assigned to
+    // something to compile, and gofmt1.4 complains about using `_ = range`.
+    for now := range time.Tick(period) {
+        _ = now
+        nodes, err := nodeLister.List()
+        if err != nil {
+            glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
+            continue
+        }
+        newHosts := hostsFromNodeList(&nodes)
+        if stringSlicesEqual(newHosts, prevHosts) {
+            // The set of nodes in the cluster hasn't changed, but we can retry
+            // updating any services that we failed to update last time around.
+            servicesToUpdate = s.updateLoadBalancerHosts(servicesToUpdate, newHosts)
+            continue
+        }
+        glog.Infof("Detected change in list of current cluster nodes. New node set: %v", newHosts)
+
+        // Try updating all services, and save the ones that fail to try again next
+        // round.
+        servicesToUpdate = s.cache.allServices()
+        numServices := len(servicesToUpdate)
+        servicesToUpdate = s.updateLoadBalancerHosts(servicesToUpdate, newHosts)
+        glog.Infof("Successfully updated %d out of %d external load balancers to direct traffic to the updated set of nodes",
+            numServices-len(servicesToUpdate), numServices)
+
+        prevHosts = newHosts
+    }
+}
+
+// updateLoadBalancerHosts updates all existing external load balancers so that
+// they will match the list of hosts provided.
+// Returns the list of services that couldn't be updated.
+func (s *ServiceController) updateLoadBalancerHosts(services []*cachedService, hosts []string) (servicesToRetry []*cachedService) {
+    for _, service := range services {
+        func() {
+            service.mu.Lock()
+            defer service.mu.Unlock()
+            if err := s.lockedUpdateLoadBalancerHosts(service.service, hosts); err != nil {
+                glog.Errorf("External error while updating TCP load balancer: %v.", err)
+                servicesToRetry = append(servicesToRetry, service)
+            }
+        }()
+    }
+    return servicesToRetry
+}
+
+// Updates the external load balancer of a service, assuming we hold the mutex
+// associated with the service.
+func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service, hosts []string) error {
+    if !service.Spec.CreateExternalLoadBalancer {
+        return nil
+    }
+
+    name := cloudprovider.GetLoadBalancerName(service)
+    err := s.balancer.UpdateTCPLoadBalancer(name, s.zone.Region, hosts)
+    if err == nil {
+        return nil
+    }
+
+    // It's only an actual error if the load balancer still exists.
+    if exists, err := s.balancer.TCPLoadBalancerExists(name, s.zone.Region); err != nil {
+        glog.Errorf("External error while checking if TCP load balancer %q exists: name, %v")
+    } else if !exists {
+        return nil
+    }
+    return err
+}
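The retry behaviour is worth calling out: updateLoadBalancerHosts returns the services that failed, and nodeSyncLoop feeds exactly that list back in on the next tick even when the node set has not changed, so transient cloud errors are retried without rebuilding the full service list. A stdlib-only sketch of that loop shape; updateAll and the service type are simplified stand-ins, not the source's API:

package main

import "fmt"

type service struct{ name string }

// updateAll tries to update every service and returns the ones that failed,
// mirroring the "return the retry list" contract of updateLoadBalancerHosts.
func updateAll(services []*service, hosts []string, update func(*service, []string) error) (retry []*service) {
    for _, svc := range services {
        if err := update(svc, hosts); err != nil {
            retry = append(retry, svc)
        }
    }
    return retry
}

func main() {
    all := []*service{{"s0"}, {"s1"}}
    hosts := []string{"node0", "node1"}

    // First pass: s1 fails once, so it is carried over to the next round.
    failOnce := map[string]bool{"s1": true}
    update := func(svc *service, hosts []string) error {
        if failOnce[svc.name] {
            failOnce[svc.name] = false
            return fmt.Errorf("transient cloud error for %s", svc.name)
        }
        return nil
    }

    toRetry := updateAll(all, hosts, update)
    fmt.Println("retrying:", len(toRetry)) // 1
    toRetry = updateAll(toRetry, hosts, update)
    fmt.Println("retrying:", len(toRetry)) // 0
}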
@@ -17,6 +17,7 @@ limitations under the License.
 package servicecontroller

 import (
+    "reflect"
     "testing"

     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -27,6 +28,10 @@ import (

 const region = "us-central"

+func newService(name string, uid types.UID, external bool) *api.Service {
+    return &api.Service{ObjectMeta: api.ObjectMeta{Name: name, Namespace: "namespace", UID: uid}, Spec: api.ServiceSpec{CreateExternalLoadBalancer: external}}
+}
+
 func TestCreateExternalLoadBalancer(t *testing.T) {
     table := []struct {
         service *api.Service
@@ -124,4 +129,82 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
     }
 }

+// TODO: Finish converting and update comments
+func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
+    hosts := []string{"node0", "node1", "node73"}
+    table := []struct {
+        services            []*api.Service
+        expectedUpdateCalls []fake_cloud.FakeUpdateBalancerCall
+    }{
+        {
+            // No services present: no calls should be made.
+            services:            []*api.Service{},
+            expectedUpdateCalls: nil,
+        },
+        {
+            // Services do not have external load balancers: no calls should be made.
+            services: []*api.Service{
+                newService("s0", "111", false),
+                newService("s1", "222", false),
+            },
+            expectedUpdateCalls: nil,
+        },
+        {
+            // Services does have an external load balancer: one call should be made.
+            services: []*api.Service{
+                newService("s0", "333", true),
+            },
+            expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{
+                {Name: "a333", Region: region, Hosts: []string{"node0", "node1", "node73"}},
+            },
+        },
+        {
+            // Three services have an external load balancer: three calls.
+            services: []*api.Service{
+                newService("s0", "444", true),
+                newService("s1", "555", true),
+                newService("s2", "666", true),
+            },
+            expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{
+                {Name: "a444", Region: region, Hosts: []string{"node0", "node1", "node73"}},
+                {Name: "a555", Region: region, Hosts: []string{"node0", "node1", "node73"}},
+                {Name: "a666", Region: region, Hosts: []string{"node0", "node1", "node73"}},
+            },
+        },
+        {
+            // Two services have an external load balancer and two don't: two calls.
+            services: []*api.Service{
+                newService("s0", "777", false),
+                newService("s1", "888", true),
+                newService("s3", "999", true),
+                newService("s4", "123", false),
+            },
+            expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{
+                {Name: "a888", Region: region, Hosts: []string{"node0", "node1", "node73"}},
+                {Name: "a999", Region: region, Hosts: []string{"node0", "node1", "node73"}},
+            },
+        },
+    }
+    for _, item := range table {
+        cloud := &fake_cloud.FakeCloud{}
+        cloud.Region = region
+        client := &testclient.Fake{}
+        controller := New(cloud, client, "test-cluster2")
+        controller.init()
+        cloud.Calls = nil // ignore any cloud calls made in init()
+
+        var services []*cachedService
+        for _, service := range item.services {
+            services = append(services, &cachedService{service: service})
+        }
+        if err := controller.updateLoadBalancerHosts(services, hosts); err != nil {
+            t.Errorf("unexpected error: %v", err)
+        }
+        if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) {
+            t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls)
+        }
+    }
+}
+
 // TODO(a-robinson): Add tests for update/sync/delete.