provider: Pass full node objects to *LoadBalancer

Many providers need to do some sort of node name -> IP or instanceID
lookup before they can use the list of hostnames passed to
EnsureLoadBalancer/UpdateLoadBalancer.

This change just passes the full Node object instead of simply the node
name, allowing providers to use the node's provider ID and cached
addresses without additional lookups.  Using `node.Name` reproduces the
old behaviour.
This commit is contained in:
Angus Lees 2016-08-23 13:52:23 +10:00
parent ec1371b2b1
commit 83e7a85ecc
4 changed files with 58 additions and 103 deletions

View File

@ -85,13 +85,15 @@ type LoadBalancer interface {
// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
GetLoadBalancer(clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error) GetLoadBalancer(clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error)
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer // EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
// Implementations must treat the *v1.Service parameter as read-only and not modify it. // Implementations must treat the *v1.Service and *v1.Node
// parameters as read-only and not modify them.
// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
EnsureLoadBalancer(clusterName string, service *v1.Service, nodeNames []string) (*v1.LoadBalancerStatus, error) EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error)
// UpdateLoadBalancer updates hosts under the specified load balancer. // UpdateLoadBalancer updates hosts under the specified load balancer.
// Implementations must treat the *v1.Service parameter as read-only and not modify it. // Implementations must treat the *v1.Service and *v1.Node
// parameters as read-only and not modify them.
// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
UpdateLoadBalancer(clusterName string, service *v1.Service, nodeNames []string) error UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error
// EnsureLoadBalancerDeleted deletes the specified load balancer if it // EnsureLoadBalancerDeleted deletes the specified load balancer if it
// exists, returning nil if the load balancer specified either didn't exist or // exists, returning nil if the load balancer specified either didn't exist or
// was successfully deleted. // was successfully deleted.

View File

@ -36,12 +36,12 @@ type FakeBalancer struct {
Region string Region string
LoadBalancerIP string LoadBalancerIP string
Ports []v1.ServicePort Ports []v1.ServicePort
Hosts []string Hosts []*v1.Node
} }
type FakeUpdateBalancerCall struct { type FakeUpdateBalancerCall struct {
Service *v1.Service Service *v1.Service
Hosts []string Hosts []*v1.Node
} }
// FakeCloud is a test-double implementation of Interface, LoadBalancer, Instances, and Routes. It is useful for testing. // FakeCloud is a test-double implementation of Interface, LoadBalancer, Instances, and Routes. It is useful for testing.
@ -131,7 +131,7 @@ func (f *FakeCloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v
// EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer. // EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
// It adds an entry "create" into the internal method call record. // It adds an entry "create" into the internal method call record.
func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service, hosts []string) (*v1.LoadBalancerStatus, error) { func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
f.addCall("create") f.addCall("create")
if f.Balancers == nil { if f.Balancers == nil {
f.Balancers = make(map[string]FakeBalancer) f.Balancers = make(map[string]FakeBalancer)
@ -146,7 +146,7 @@ func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service,
} }
region := zone.Region region := zone.Region
f.Balancers[name] = FakeBalancer{name, region, spec.LoadBalancerIP, spec.Ports, hosts} f.Balancers[name] = FakeBalancer{name, region, spec.LoadBalancerIP, spec.Ports, nodes}
status := &v1.LoadBalancerStatus{} status := &v1.LoadBalancerStatus{}
status.Ingress = []v1.LoadBalancerIngress{{IP: f.ExternalIP.String()}} status.Ingress = []v1.LoadBalancerIngress{{IP: f.ExternalIP.String()}}
@ -156,9 +156,9 @@ func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service,
// UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer. // UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer.
// It adds an entry "update" into the internal method call record. // It adds an entry "update" into the internal method call record.
func (f *FakeCloud) UpdateLoadBalancer(clusterName string, service *v1.Service, hosts []string) error { func (f *FakeCloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
f.addCall("update") f.addCall("update")
f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{service, hosts}) f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{service, nodes})
return f.Err return f.Err
} }

View File

@ -77,7 +77,7 @@ type serviceCache struct {
type ServiceController struct { type ServiceController struct {
cloud cloudprovider.Interface cloud cloudprovider.Interface
knownHosts []string knownHosts []*v1.Node
servicesToUpdate []*v1.Service servicesToUpdate []*v1.Service
kubeClient clientset.Interface kubeClient clientset.Interface
clusterName string clusterName string
@ -108,7 +108,7 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
s := &ServiceController{ s := &ServiceController{
cloud: cloud, cloud: cloud,
knownHosts: []string{}, knownHosts: []*v1.Node{},
kubeClient: kubeClient, kubeClient: kubeClient,
clusterName: clusterName, clusterName: clusterName,
cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
@ -344,10 +344,17 @@ func (s *ServiceController) createLoadBalancer(service *v1.Service) error {
return err return err
} }
lbNodes := []*v1.Node{}
for ix := range nodes.Items {
if includeNodeFromNodeList(&nodes.Items[ix]) {
lbNodes = append(lbNodes, &nodes.Items[ix])
}
}
// - Only one protocol supported per service // - Only one protocol supported per service
// - Not all cloud providers support all protocols and the next step is expected to return // - Not all cloud providers support all protocols and the next step is expected to return
// an error for unsupported protocols // an error for unsupported protocols
status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, hostsFromNodeList(&nodes)) status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, lbNodes)
if err != nil { if err != nil {
return err return err
} else { } else {
@ -533,6 +540,21 @@ func portEqualForLB(x, y *v1.ServicePort) bool {
return true return true
} }
func nodeNames(nodes []*v1.Node) []string {
ret := make([]string, len(nodes))
for i, node := range nodes {
ret[i] = node.Name
}
return ret
}
func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
if len(x) != len(y) {
return false
}
return stringSlicesEqual(nodeNames(x), nodeNames(y))
}
func intSlicesEqual(x, y []int) bool { func intSlicesEqual(x, y []int) bool {
if len(x) != len(y) { if len(x) != len(y) {
return false return false
@ -573,26 +595,6 @@ func includeNodeFromNodeList(node *v1.Node) bool {
return !node.Spec.Unschedulable return !node.Spec.Unschedulable
} }
func hostsFromNodeList(list *v1.NodeList) []string {
result := []string{}
for ix := range list.Items {
if includeNodeFromNodeList(&list.Items[ix]) {
result = append(result, list.Items[ix].Name)
}
}
return result
}
func hostsFromNodeSlice(nodes []*v1.Node) []string {
result := []string{}
for _, node := range nodes {
if includeNodeFromNodeList(node) {
result = append(result, node.Name)
}
}
return result
}
func getNodeConditionPredicate() cache.NodeConditionPredicate { func getNodeConditionPredicate() cache.NodeConditionPredicate {
return func(node *v1.Node) bool { return func(node *v1.Node) bool {
// We add the master to the node list, but its unschedulable. So we use this to filter // We add the master to the node list, but its unschedulable. So we use this to filter
@ -620,19 +622,20 @@ func getNodeConditionPredicate() cache.NodeConditionPredicate {
// nodeSyncLoop handles updating the hosts pointed to by all load // nodeSyncLoop handles updating the hosts pointed to by all load
// balancers whenever the set of nodes in the cluster changes. // balancers whenever the set of nodes in the cluster changes.
func (s *ServiceController) nodeSyncLoop() { func (s *ServiceController) nodeSyncLoop() {
nodes, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List() newHosts, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
if err != nil { if err != nil {
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err) glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
return return
} }
newHosts := hostsFromNodeSlice(nodes) if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
if stringSlicesEqual(newHosts, s.knownHosts) {
// The set of nodes in the cluster hasn't changed, but we can retry // The set of nodes in the cluster hasn't changed, but we can retry
// updating any services that we failed to update last time around. // updating any services that we failed to update last time around.
s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts) s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
return return
} }
glog.Infof("Detected change in list of current cluster nodes. New node set: %v", newHosts)
glog.Infof("Detected change in list of current cluster nodes. New node set: %v",
nodeNames(newHosts))
// Try updating all services, and save the ones that fail to try again next // Try updating all services, and save the ones that fail to try again next
// round. // round.
@ -648,7 +651,7 @@ func (s *ServiceController) nodeSyncLoop() {
// updateLoadBalancerHosts updates all existing load balancers so that // updateLoadBalancerHosts updates all existing load balancers so that
// they will match the list of hosts provided. // they will match the list of hosts provided.
// Returns the list of services that couldn't be updated. // Returns the list of services that couldn't be updated.
func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []string) (servicesToRetry []*v1.Service) { func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
for _, service := range services { for _, service := range services {
func() { func() {
if service == nil { if service == nil {
@ -665,7 +668,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, host
// Updates the load balancer of a service, assuming we hold the mutex // Updates the load balancer of a service, assuming we hold the mutex
// associated with the service. // associated with the service.
func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []string) error { func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
if !wantsLoadBalancer(service) { if !wantsLoadBalancer(service) {
return nil return nil
} }
@ -684,7 +687,7 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, h
return nil return nil
} }
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "LoadBalancerUpdateFailed", "Error updating load balancer with new hosts %v: %v", hosts, err) s.eventRecorder.Eventf(service, v1.EventTypeWarning, "LoadBalancerUpdateFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err)
return err return err
} }

View File

@ -145,7 +145,11 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
// TODO: Finish converting and update comments // TODO: Finish converting and update comments
func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
hosts := []string{"node0", "node1", "node73"} nodes := []*v1.Node{
{ObjectMeta: v1.ObjectMeta{Name: "node0"}},
{ObjectMeta: v1.ObjectMeta{Name: "node1"}},
{ObjectMeta: v1.ObjectMeta{Name: "node73"}},
}
table := []struct { table := []struct {
services []*v1.Service services []*v1.Service
expectedUpdateCalls []fakecloud.FakeUpdateBalancerCall expectedUpdateCalls []fakecloud.FakeUpdateBalancerCall
@ -169,7 +173,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
newService("s0", "333", v1.ServiceTypeLoadBalancer), newService("s0", "333", v1.ServiceTypeLoadBalancer),
}, },
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{newService("s0", "333", v1.ServiceTypeLoadBalancer), hosts}, {newService("s0", "333", v1.ServiceTypeLoadBalancer), nodes},
}, },
}, },
{ {
@ -180,9 +184,9 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
newService("s2", "666", v1.ServiceTypeLoadBalancer), newService("s2", "666", v1.ServiceTypeLoadBalancer),
}, },
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{newService("s0", "444", v1.ServiceTypeLoadBalancer), hosts}, {newService("s0", "444", v1.ServiceTypeLoadBalancer), nodes},
{newService("s1", "555", v1.ServiceTypeLoadBalancer), hosts}, {newService("s1", "555", v1.ServiceTypeLoadBalancer), nodes},
{newService("s2", "666", v1.ServiceTypeLoadBalancer), hosts}, {newService("s2", "666", v1.ServiceTypeLoadBalancer), nodes},
}, },
}, },
{ {
@ -194,8 +198,8 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
newService("s4", "123", v1.ServiceTypeClusterIP), newService("s4", "123", v1.ServiceTypeClusterIP),
}, },
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{newService("s1", "888", v1.ServiceTypeLoadBalancer), hosts}, {newService("s1", "888", v1.ServiceTypeLoadBalancer), nodes},
{newService("s3", "999", v1.ServiceTypeLoadBalancer), hosts}, {newService("s3", "999", v1.ServiceTypeLoadBalancer), nodes},
}, },
}, },
{ {
@ -205,7 +209,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
nil, nil,
}, },
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{newService("s0", "234", v1.ServiceTypeLoadBalancer), hosts}, {newService("s0", "234", v1.ServiceTypeLoadBalancer), nodes},
}, },
}, },
} }
@ -222,7 +226,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
for _, service := range item.services { for _, service := range item.services {
services = append(services, service) services = append(services, service)
} }
if err := controller.updateLoadBalancerHosts(services, hosts); err != nil { if err := controller.updateLoadBalancerHosts(services, nodes); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) { if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) {
@ -231,60 +235,6 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
} }
} }
func TestHostsFromNodeList(t *testing.T) {
tests := []struct {
nodes *v1.NodeList
expectedHosts []string
}{
{
nodes: &v1.NodeList{},
expectedHosts: []string{},
},
{
nodes: &v1.NodeList{
Items: []v1.Node{
{
ObjectMeta: v1.ObjectMeta{Name: "foo"},
Status: v1.NodeStatus{Phase: v1.NodeRunning},
},
{
ObjectMeta: v1.ObjectMeta{Name: "bar"},
Status: v1.NodeStatus{Phase: v1.NodeRunning},
},
},
},
expectedHosts: []string{"foo", "bar"},
},
{
nodes: &v1.NodeList{
Items: []v1.Node{
{
ObjectMeta: v1.ObjectMeta{Name: "foo"},
Status: v1.NodeStatus{Phase: v1.NodeRunning},
},
{
ObjectMeta: v1.ObjectMeta{Name: "bar"},
Status: v1.NodeStatus{Phase: v1.NodeRunning},
},
{
ObjectMeta: v1.ObjectMeta{Name: "unschedulable"},
Spec: v1.NodeSpec{Unschedulable: true},
Status: v1.NodeStatus{Phase: v1.NodeRunning},
},
},
},
expectedHosts: []string{"foo", "bar"},
},
}
for _, test := range tests {
hosts := hostsFromNodeList(test.nodes)
if !reflect.DeepEqual(hosts, test.expectedHosts) {
t.Errorf("expected: %v, saw: %v", test.expectedHosts, hosts)
}
}
}
func TestGetNodeConditionPredicate(t *testing.T) { func TestGetNodeConditionPredicate(t *testing.T) {
tests := []struct { tests := []struct {
node v1.Node node v1.Node