Merge pull request #55336 from oracle/for/upstream/master/53462

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Fixes service controller update race condition

**What this PR does / why we need it**:

Fixes a service controller update race condition that can occur between the node sync loop and the worker goroutine(s). This PR makes the node sync loop enqueue services onto the same work queue used for service updates, so that the queue can guarantee each service is acted upon by only one goroutine at a time (a minimal sketch of this pattern follows below).
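
For illustration only (this block is not part of the PR diff): a minimal sketch of the work-queue property the fix relies on, written against a plain client-go `workqueue`. The queue construction and the `default/my-service` key are assumptions made up for the example, not the controller's actual wiring.

```go
// Sketch: one work queue shared by the service update handlers and the node
// sync loop. The queue guarantees that a given key is never handed to two
// workers at the same time, which is the property the fix relies on.
package main

import (
	"fmt"
	"sync"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// Service update events and the node sync loop both enqueue the same kind
	// of key; duplicate adds collapse into a single pending item.
	queue.Add("default/my-service") // hypothetical key from a service update
	queue.Add("default/my-service") // e.g. re-enqueued by the node sync loop

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for {
				key, shutdown := queue.Get()
				if shutdown {
					return
				}
				// Only one worker ever holds a given key between Get and Done.
				fmt.Printf("worker %d syncing %v\n", worker, key)
				queue.Done(key)
			}
		}(i)
	}

	// Remaining items are drained before Get reports shutdown to the workers.
	queue.ShutDown()
	wg.Wait()
}
```

In the actual change, `nodeSyncLoop` simply enqueues every cached service and lets the existing worker pool drain the queue, as the diff below shows.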

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #53462

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```

/cc @wlan0 @luxas @prydie @andrewsykim 

/sig cluster-lifecycle
/area cloudprovider
Merged commit ccb15fb498 by Kubernetes Submit Queue on 2017-11-22 18:05:50 -08:00, committed by GitHub.
3 changed files with 163 additions and 72 deletions


@@ -50,6 +50,8 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)


@@ -89,7 +89,6 @@ type serviceCache struct {
type ServiceController struct {
cloud cloudprovider.Interface
knownHosts []*v1.Node
servicesToUpdate []*v1.Service
kubeClient clientset.Interface
clusterName string
balancer cloudprovider.LoadBalancer
@@ -244,6 +243,20 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
}
}
}
if cachedService.state != nil {
if !s.needsUpdate(cachedService.state, service) {
// The service does not require an update, which means it was placed on the work queue
// by the node sync loop, indicating that the hosts need to be updated.
err := s.updateLoadBalancerHosts(service)
if err != nil {
return err, cachedService.nextRetryDelay()
}
cachedService.resetRetryDelay()
return nil, doNotRetry
}
}
// cache the service, we need the info for service deletion
cachedService.state = service
err, retry := s.createLoadBalancerIfNeeded(key, service)
@@ -438,6 +451,8 @@ func (s *serviceCache) delete(serviceName string) {
delete(s.serviceMap, serviceName)
}
// needsUpdate checks to see if there were any changes between the old and new service that would require a load balancer update.
// This method does not and should not check if the hosts have changed.
func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
return false
@@ -636,62 +651,45 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
}
}
// nodeSyncLoop handles updating the hosts pointed to by all load
// balancers whenever the set of nodes in the cluster changes.
// nodeSyncLoop handles adding all existing cached services to the work queue
// to be reprocessed so that they can have their hosts updated, if any
// host changes have occurred since the last sync loop.
func (s *ServiceController) nodeSyncLoop() {
newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
if err != nil {
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
return
}
if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
// The set of nodes in the cluster hasn't changed, but we can retry
// updating any services that we failed to update last time around.
s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
// Nothing to do since the hosts have not changed.
return
}
glog.Infof("Detected change in list of current cluster nodes. New node set: %v",
nodeNames(newHosts))
glog.Infof("Detected change in list of current cluster nodes. New node set: %v", nodeNames(newHosts))
// Try updating all services, and save the ones that fail to try again next
// round.
s.servicesToUpdate = s.cache.allServices()
numServices := len(s.servicesToUpdate)
s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
numServices-len(s.servicesToUpdate), numServices)
for _, svc := range s.cache.allServices() {
s.enqueueService(svc)
}
// Update the known hosts so we can check next sync loop for changes.
s.knownHosts = newHosts
}
// updateLoadBalancerHosts updates all existing load balancers so that
// they will match the list of hosts provided.
// Returns the list of services that couldn't be updated.
func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
for _, service := range services {
func() {
if service == nil {
return
}
if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
glog.Errorf("External error while updating load balancer: %v.", err)
servicesToRetry = append(servicesToRetry, service)
}
}()
}
return servicesToRetry
}
// Updates the load balancer of a service, assuming we hold the mutex
// associated with the service.
func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
// Updates the load balancer of the service with updated nodes ONLY.
// This method will not trigger the cloud provider to create or fully update a load balancer.
func (s *ServiceController) updateLoadBalancerHosts(service *v1.Service) error {
if !wantsLoadBalancer(service) {
return nil
}
hosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
if err != nil {
return err
}
// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
err := s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
err = s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
if err == nil {
// If there are no available nodes for LoadBalancer service, make an EventTypeWarning event for it.
if len(hosts) == 0 {


@@ -19,6 +19,7 @@ package service
import (
"fmt"
"reflect"
"sort"
"testing"
"time"
@@ -27,6 +28,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/testapi"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
@@ -174,23 +177,45 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
}
}
// newLoadBalancerNode returns a node that passes the predicate check for a
// node to receive load balancer traffic.
func newLoadBalancerNode(name string) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.NodeSpec{
Unschedulable: false,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{Type: v1.NodeReady, Status: v1.ConditionTrue},
},
},
}
}
func sortNodesByName(nodes []*v1.Node) {
sort.Slice(nodes, func(i, j int) bool {
return nodes[i].Name < nodes[j].Name
})
}
// TODO: Finish converting and update comments
func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
nodes := []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "node0"}},
{ObjectMeta: metav1.ObjectMeta{Name: "node1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "node73"}},
newLoadBalancerNode("node0"),
newLoadBalancerNode("node1"),
newLoadBalancerNode("node73"),
}
table := []struct {
sortNodesByName(nodes)
table := map[string]struct {
services []*v1.Service
expectedUpdateCalls []fakecloud.FakeUpdateBalancerCall
}{
{
// No services present: no calls should be made.
services: []*v1.Service{},
expectedUpdateCalls: nil,
},
{
"update no load balancer": {
// Services do not have external load balancers: no calls should be made.
services: []*v1.Service{
newService("s0", "111", v1.ServiceTypeClusterIP),
@@ -198,7 +223,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
},
expectedUpdateCalls: nil,
},
{
"update 1 load balancer": {
// The service does have an external load balancer: one call should be made.
services: []*v1.Service{
newService("s0", "333", v1.ServiceTypeLoadBalancer),
@@ -207,7 +232,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
{Service: newService("s0", "333", v1.ServiceTypeLoadBalancer), Hosts: nodes},
},
},
{
"update 3 load balancers": {
// Three services have an external load balancer: three calls.
services: []*v1.Service{
newService("s0", "444", v1.ServiceTypeLoadBalancer),
@@ -220,7 +245,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
{Service: newService("s2", "666", v1.ServiceTypeLoadBalancer), Hosts: nodes},
},
},
{
"update 2 load balancers": {
// Two services have an external load balancer and two don't: two calls.
services: []*v1.Service{
newService("s0", "777", v1.ServiceTypeNodePort),
@@ -233,30 +258,44 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
{Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes},
},
},
{
// One service has an external load balancer and one is nil: one call.
services: []*v1.Service{
newService("s0", "234", v1.ServiceTypeLoadBalancer),
nil,
},
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes},
},
},
}
for _, item := range table {
controller, cloud, _ := newController()
var services []*v1.Service
for _, service := range item.services {
services = append(services, service)
}
if err := controller.updateLoadBalancerHosts(services, nodes); err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) {
t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls)
}
for name, item := range table {
t.Run(name, func(t *testing.T) {
controller, cloud, _ := newController()
var services []*v1.Service
for _, service := range item.services {
services = append(services, service)
}
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for _, node := range nodes {
nodeIndexer.Add(node)
}
controller.nodeLister = corelisters.NewNodeLister(nodeIndexer)
for _, service := range services {
if err := controller.updateLoadBalancerHosts(service); err != nil {
t.Errorf("unexpected error: %v", err)
}
}
if len(item.expectedUpdateCalls) != len(cloud.UpdateCalls) {
t.Errorf("expected %d update calls but only got %d", len(item.expectedUpdateCalls), len(cloud.UpdateCalls))
}
for i, expectedCall := range item.expectedUpdateCalls {
actualCall := cloud.UpdateCalls[i]
if !reflect.DeepEqual(expectedCall.Service, actualCall.Service) {
t.Errorf("expected update call to contain service %+v, got %+v", expectedCall.Service, actualCall.Service)
}
sortNodesByName(actualCall.Hosts)
if !reflect.DeepEqual(expectedCall.Hosts, actualCall.Hosts) {
t.Errorf("expected update call to contain hosts %+v, got %+v", expectedCall.Hosts, actualCall.Hosts)
}
}
})
}
}
@@ -311,6 +350,13 @@ func TestProcessServiceUpdate(t *testing.T) {
var controller *ServiceController
var cloud *fakecloud.FakeCloud
nodes := []*v1.Node{
newLoadBalancerNode("node0"),
newLoadBalancerNode("node1"),
newLoadBalancerNode("node73"),
}
sortNodesByName(nodes)
// A pair of old and new load balancer IP addresses
oldLBIP := "192.168.1.1"
newLBIP := "192.168.1.11"
@@ -344,6 +390,51 @@ func TestProcessServiceUpdate(t *testing.T) {
return nil
},
},
{
testName: "If updating hosts only",
key: "default/sync-test-name",
svc: newService("sync-test-name", types.UID("sync-test-uid"), v1.ServiceTypeLoadBalancer),
updateFn: func(svc *v1.Service) *v1.Service {
keyExpected := svc.GetObjectMeta().GetNamespace() + "/" + svc.GetObjectMeta().GetName()
cachedServiceTest := controller.cache.getOrCreate(keyExpected)
cachedServiceTest.state = svc
controller.cache.set(keyExpected, cachedServiceTest)
// Set the nodes for the cloud's UpdateLoadBalancer call to use.
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for _, node := range nodes {
nodeIndexer.Add(node)
}
controller.nodeLister = corelisters.NewNodeLister(nodeIndexer)
// This should trigger the needsUpdate false check since the service equals the cached service
return svc
},
expectedFn: func(svc *v1.Service, err error, retryDuration time.Duration) error {
if err != nil {
return err
}
if retryDuration != doNotRetry {
return fmt.Errorf("retryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration)
}
if len(cloud.UpdateCalls) != 1 {
return fmt.Errorf("expected one update host call but only got %+v", cloud.UpdateCalls)
}
actualCall := cloud.UpdateCalls[0]
if !reflect.DeepEqual(svc, actualCall.Service) {
return fmt.Errorf("expected update call to contain service %+v, got %+v", svc, actualCall.Service)
}
sortNodesByName(actualCall.Hosts)
if !reflect.DeepEqual(nodes, actualCall.Hosts) {
return fmt.Errorf("expected update call to contain hosts %+v, got %+v", nodes, actualCall.Hosts)
}
return nil
},
},
{
testName: "If Updating Loadbalancer IP",
key: "default/sync-test-name",