From 83e7a85ecc3bc078141238b62bdfc5ad4dbbc8af Mon Sep 17 00:00:00 2001
From: Angus Lees
Date: Tue, 23 Aug 2016 13:52:23 +1000
Subject: [PATCH] provider: Pass full node objects to *LoadBalancer

Many providers need to do some sort of node name -> IP or instanceID
lookup before they can use the list of hostnames passed to
EnsureLoadBalancer/UpdateLoadBalancer.

This change passes the full Node object instead of just the node name,
allowing providers to use the node's provider ID and cached addresses
without additional lookups. Using `node.Name` reproduces the old
behaviour.
---
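Notes (below the cut line, so git am drops this): a sketch of what the
new signature enables on the provider side. This is illustrative only
and not part of the patch; collectInternalIPs is a hypothetical helper,
using only the standard v1.Node fields (Status.Addresses and the
NodeInternalIP address type), with the v1 import path as used elsewhere
in this tree.

    package example

    import "k8s.io/kubernetes/pkg/api/v1"

    // collectInternalIPs reads the cached internal IPs directly off the
    // Node objects passed to EnsureLoadBalancer/UpdateLoadBalancer,
    // instead of re-resolving each node name as providers previously
    // had to do themselves.
    func collectInternalIPs(nodes []*v1.Node) []string {
        ips := []string{}
        for _, node := range nodes {
            for _, addr := range node.Status.Addresses {
                if addr.Type == v1.NodeInternalIP {
                    ips = append(ips, addr.Address)
                }
            }
        }
        return ips
    }

A provider that wants the old hostname-based behaviour simply collects
node.Name from each node in the slice instead.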
 pkg/cloudprovider/cloud.go                  | 10 ++-
 pkg/cloudprovider/providers/fake/fake.go    | 12 +--
 pkg/controller/service/servicecontroller.go | 63 +++++++--------
 .../service/servicecontroller_test.go       | 76 ++++---------------
 4 files changed, 58 insertions(+), 103 deletions(-)

diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go
index 06c9674bfc4..2b03ac8c654 100644
--- a/pkg/cloudprovider/cloud.go
+++ b/pkg/cloudprovider/cloud.go
@@ -85,13 +85,15 @@ type LoadBalancer interface {
     // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
     GetLoadBalancer(clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error)
     // EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
-    // Implementations must treat the *v1.Service parameter as read-only and not modify it.
+    // Implementations must treat the *v1.Service and *v1.Node
+    // parameters as read-only and not modify them.
     // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
-    EnsureLoadBalancer(clusterName string, service *v1.Service, nodeNames []string) (*v1.LoadBalancerStatus, error)
+    EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error)
     // UpdateLoadBalancer updates hosts under the specified load balancer.
-    // Implementations must treat the *v1.Service parameter as read-only and not modify it.
+    // Implementations must treat the *v1.Service and *v1.Node
+    // parameters as read-only and not modify them.
     // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
-    UpdateLoadBalancer(clusterName string, service *v1.Service, nodeNames []string) error
+    UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error
     // EnsureLoadBalancerDeleted deletes the specified load balancer if it
     // exists, returning nil if the load balancer specified either didn't exist or
     // was successfully deleted.
diff --git a/pkg/cloudprovider/providers/fake/fake.go b/pkg/cloudprovider/providers/fake/fake.go
index e4bed45df32..ec4e2bc5726 100644
--- a/pkg/cloudprovider/providers/fake/fake.go
+++ b/pkg/cloudprovider/providers/fake/fake.go
@@ -36,12 +36,12 @@ type FakeBalancer struct {
     Region         string
     LoadBalancerIP string
     Ports          []v1.ServicePort
-    Hosts          []string
+    Hosts          []*v1.Node
 }
 
 type FakeUpdateBalancerCall struct {
     Service *v1.Service
-    Hosts   []string
+    Hosts   []*v1.Node
 }
 
 // FakeCloud is a test-double implementation of Interface, LoadBalancer, Instances, and Routes. It is useful for testing.
@@ -131,7 +131,7 @@ func (f *FakeCloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v
 
 // EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
 // It adds an entry "create" into the internal method call record.
-func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service, hosts []string) (*v1.LoadBalancerStatus, error) {
+func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
     f.addCall("create")
     if f.Balancers == nil {
         f.Balancers = make(map[string]FakeBalancer)
@@ -146,7 +146,7 @@ func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service,
     }
     region := zone.Region
 
-    f.Balancers[name] = FakeBalancer{name, region, spec.LoadBalancerIP, spec.Ports, hosts}
+    f.Balancers[name] = FakeBalancer{name, region, spec.LoadBalancerIP, spec.Ports, nodes}
 
     status := &v1.LoadBalancerStatus{}
     status.Ingress = []v1.LoadBalancerIngress{{IP: f.ExternalIP.String()}}
@@ -156,9 +156,9 @@ func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service,
 
 // UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer.
 // It adds an entry "update" into the internal method call record.
-func (f *FakeCloud) UpdateLoadBalancer(clusterName string, service *v1.Service, hosts []string) error {
+func (f *FakeCloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
     f.addCall("update")
-    f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{service, hosts})
+    f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{service, nodes})
     return f.Err
 }
 
diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go
index 772aa0fad80..1069c476529 100644
--- a/pkg/controller/service/servicecontroller.go
+++ b/pkg/controller/service/servicecontroller.go
@@ -77,7 +77,7 @@ type serviceCache struct {
 
 type ServiceController struct {
     cloud            cloudprovider.Interface
-    knownHosts       []string
+    knownHosts       []*v1.Node
     servicesToUpdate []*v1.Service
     kubeClient       clientset.Interface
     clusterName      string
@@ -108,7 +108,7 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
 
     s := &ServiceController{
         cloud:            cloud,
-        knownHosts:       []string{},
+        knownHosts:       []*v1.Node{},
         kubeClient:       kubeClient,
         clusterName:      clusterName,
         cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
@@ -344,10 +344,17 @@ func (s *ServiceController) createLoadBalancer(service *v1.Service) error {
         return err
     }
 
+    lbNodes := []*v1.Node{}
+    for ix := range nodes.Items {
+        if includeNodeFromNodeList(&nodes.Items[ix]) {
+            lbNodes = append(lbNodes, &nodes.Items[ix])
+        }
+    }
+
     // - Only one protocol supported per service
     // - Not all cloud providers support all protocols and the next step is expected to return
     // an error for unsupported protocols
-    status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, hostsFromNodeList(&nodes))
+    status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, lbNodes)
     if err != nil {
         return err
     } else {
@@ -533,6 +540,21 @@ func portEqualForLB(x, y *v1.ServicePort) bool {
     return true
 }
 
+func nodeNames(nodes []*v1.Node) []string {
+    ret := make([]string, len(nodes))
+    for i, node := range nodes {
+        ret[i] = node.Name
+    }
+    return ret
+}
+
+func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
+    if len(x) != len(y) {
+        return false
+    }
+    return stringSlicesEqual(nodeNames(x), nodeNames(y))
+}
+
 func intSlicesEqual(x, y []int) bool {
     if len(x) != len(y) {
         return false
@@ -573,26 +595,6 @@ func includeNodeFromNodeList(node *v1.Node) bool {
     return !node.Spec.Unschedulable
 }
 
-func hostsFromNodeList(list *v1.NodeList) []string {
-    result := []string{}
-    for ix := range list.Items {
-        if includeNodeFromNodeList(&list.Items[ix]) {
-            result = append(result, list.Items[ix].Name)
-        }
-    }
-    return result
-}
-
-func hostsFromNodeSlice(nodes []*v1.Node) []string {
-    result := []string{}
-    for _, node := range nodes {
-        if includeNodeFromNodeList(node) {
-            result = append(result, node.Name)
-        }
-    }
-    return result
-}
-
 func getNodeConditionPredicate() cache.NodeConditionPredicate {
     return func(node *v1.Node) bool {
         // We add the master to the node list, but its unschedulable. So we use this to filter
@@ -620,19 +622,20 @@ func getNodeConditionPredicate() cache.NodeConditionPredicate {
 // nodeSyncLoop handles updating the hosts pointed to by all load
 // balancers whenever the set of nodes in the cluster changes.
 func (s *ServiceController) nodeSyncLoop() {
-    nodes, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
+    newHosts, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
     if err != nil {
         glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
         return
     }
-    newHosts := hostsFromNodeSlice(nodes)
-    if stringSlicesEqual(newHosts, s.knownHosts) {
+    if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
         // The set of nodes in the cluster hasn't changed, but we can retry
         // updating any services that we failed to update last time around.
         s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
         return
     }
-    glog.Infof("Detected change in list of current cluster nodes. New node set: %v", newHosts)
+
+    glog.Infof("Detected change in list of current cluster nodes. New node set: %v",
+        nodeNames(newHosts))
 
     // Try updating all services, and save the ones that fail to try again next
     // round.
@@ -648,7 +651,7 @@ func (s *ServiceController) nodeSyncLoop() {
 // updateLoadBalancerHosts updates all existing load balancers so that
 // they will match the list of hosts provided.
 // Returns the list of services that couldn't be updated.
-func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []string) (servicesToRetry []*v1.Service) {
+func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
     for _, service := range services {
         func() {
             if service == nil {
@@ -665,7 +668,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, host
 
 // Updates the load balancer of a service, assuming we hold the mutex
 // associated with the service.
-func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []string) error {
+func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
     if !wantsLoadBalancer(service) {
         return nil
     }
@@ -684,7 +687,7 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, h
         return nil
     }
 
-    s.eventRecorder.Eventf(service, v1.EventTypeWarning, "LoadBalancerUpdateFailed", "Error updating load balancer with new hosts %v: %v", hosts, err)
+    s.eventRecorder.Eventf(service, v1.EventTypeWarning, "LoadBalancerUpdateFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err)
     return err
 }
 
diff --git a/pkg/controller/service/servicecontroller_test.go b/pkg/controller/service/servicecontroller_test.go
index 7d8d3d674cc..324a58a356b 100644
--- a/pkg/controller/service/servicecontroller_test.go
+++ b/pkg/controller/service/servicecontroller_test.go
@@ -145,7 +145,11 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
 
 // TODO: Finish converting and update comments
 func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
-    hosts := []string{"node0", "node1", "node73"}
+    nodes := []*v1.Node{
+        {ObjectMeta: v1.ObjectMeta{Name: "node0"}},
+        {ObjectMeta: v1.ObjectMeta{Name: "node1"}},
+        {ObjectMeta: v1.ObjectMeta{Name: "node73"}},
+    }
     table := []struct {
         services            []*v1.Service
         expectedUpdateCalls []fakecloud.FakeUpdateBalancerCall
@@ -169,7 +173,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
                 newService("s0", "333", v1.ServiceTypeLoadBalancer),
             },
             expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
-                {newService("s0", "333", v1.ServiceTypeLoadBalancer), hosts},
+                {newService("s0", "333", v1.ServiceTypeLoadBalancer), nodes},
             },
         },
         {
@@ -180,9 +184,9 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
                 newService("s2", "666", v1.ServiceTypeLoadBalancer),
             },
             expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
-                {newService("s0", "444", v1.ServiceTypeLoadBalancer), hosts},
-                {newService("s1", "555", v1.ServiceTypeLoadBalancer), hosts},
-                {newService("s2", "666", v1.ServiceTypeLoadBalancer), hosts},
+                {newService("s0", "444", v1.ServiceTypeLoadBalancer), nodes},
+                {newService("s1", "555", v1.ServiceTypeLoadBalancer), nodes},
+                {newService("s2", "666", v1.ServiceTypeLoadBalancer), nodes},
             },
         },
         {
@@ -194,8 +198,8 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
                 newService("s4", "123", v1.ServiceTypeClusterIP),
             },
             expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
-                {newService("s1", "888", v1.ServiceTypeLoadBalancer), hosts},
-                {newService("s3", "999", v1.ServiceTypeLoadBalancer), hosts},
+                {newService("s1", "888", v1.ServiceTypeLoadBalancer), nodes},
+                {newService("s3", "999", v1.ServiceTypeLoadBalancer), nodes},
             },
         },
         {
@@ -205,7 +209,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
                 nil,
             },
             expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
-                {newService("s0", "234", v1.ServiceTypeLoadBalancer), hosts},
+                {newService("s0", "234", v1.ServiceTypeLoadBalancer), nodes},
             },
         },
     }
@@ -222,7 +226,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
         for _, service := range item.services {
             services = append(services, service)
         }
-        if err := controller.updateLoadBalancerHosts(services, hosts); err != nil {
+        if err := controller.updateLoadBalancerHosts(services, nodes); err != nil {
             t.Errorf("unexpected error: %v", err)
         }
         if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) {
@@ -231,60 +235,6 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
     }
 }
 
-func TestHostsFromNodeList(t *testing.T) {
-    tests := []struct {
-        nodes         *v1.NodeList
-        expectedHosts []string
-    }{
-        {
-            nodes:         &v1.NodeList{},
-            expectedHosts: []string{},
-        },
-        {
-            nodes: &v1.NodeList{
-                Items: []v1.Node{
-                    {
-                        ObjectMeta: v1.ObjectMeta{Name: "foo"},
-                        Status:     v1.NodeStatus{Phase: v1.NodeRunning},
-                    },
-                    {
-                        ObjectMeta: v1.ObjectMeta{Name: "bar"},
-                        Status:     v1.NodeStatus{Phase: v1.NodeRunning},
-                    },
-                },
-            },
-            expectedHosts: []string{"foo", "bar"},
-        },
-        {
-            nodes: &v1.NodeList{
-                Items: []v1.Node{
-                    {
-                        ObjectMeta: v1.ObjectMeta{Name: "foo"},
-                        Status:     v1.NodeStatus{Phase: v1.NodeRunning},
-                    },
-                    {
-                        ObjectMeta: v1.ObjectMeta{Name: "bar"},
-                        Status:     v1.NodeStatus{Phase: v1.NodeRunning},
-                    },
-                    {
-                        ObjectMeta: v1.ObjectMeta{Name: "unschedulable"},
-                        Spec:       v1.NodeSpec{Unschedulable: true},
-                        Status:     v1.NodeStatus{Phase: v1.NodeRunning},
-                    },
-                },
-            },
-            expectedHosts: []string{"foo", "bar"},
-        },
-    }
-
-    for _, test := range tests {
-        hosts := hostsFromNodeList(test.nodes)
-        if !reflect.DeepEqual(hosts, test.expectedHosts) {
-            t.Errorf("expected: %v, saw: %v", test.expectedHosts, hosts)
-        }
-    }
-}
-
 func TestGetNodeConditionPredicate(t *testing.T) {
     tests := []struct {
         node v1.Node