mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #56520 from oracle/for/upstream/master/revert-55336
Automatic merge from submit-queue (batch tested with PRs 56520, 53764). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Revert "Merge pull request #55336 from oracle/for/upstream/master/53462" This reverts commitccb15fb498
, reversing changes made to4904037645
. **What this PR does / why we need it**: Reverting this PR due to the discussion https://github.com/kubernetes/kubernetes/pull/56448#discussion_r153508837 and https://github.com/kubernetes/kubernetes/pull/55336#discussion_r153652468. **Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #56443 **Special notes for your reviewer**: **Release note**: ```release-note NONE ``` /cc @thockin @luxas @wlan0 @MrHohn /priority critical-urgent
This commit is contained in:
commit
e9cf80f7c9
@ -50,8 +50,6 @@ go_test(
|
|||||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
|
|
||||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
|
||||||
"//vendor/k8s.io/client-go/tools/record:go_default_library",
|
"//vendor/k8s.io/client-go/tools/record:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -89,6 +89,7 @@ type serviceCache struct {
|
|||||||
type ServiceController struct {
|
type ServiceController struct {
|
||||||
cloud cloudprovider.Interface
|
cloud cloudprovider.Interface
|
||||||
knownHosts []*v1.Node
|
knownHosts []*v1.Node
|
||||||
|
servicesToUpdate []*v1.Service
|
||||||
kubeClient clientset.Interface
|
kubeClient clientset.Interface
|
||||||
clusterName string
|
clusterName string
|
||||||
balancer cloudprovider.LoadBalancer
|
balancer cloudprovider.LoadBalancer
|
||||||
@ -243,20 +244,6 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if cachedService.state != nil {
|
|
||||||
if !s.needsUpdate(cachedService.state, service) {
|
|
||||||
// The service does not require an update which means it was placed on the work queue
|
|
||||||
// by the node sync loop and indicates that the hosts need to be updated.
|
|
||||||
err := s.updateLoadBalancerHosts(service)
|
|
||||||
if err != nil {
|
|
||||||
return err, cachedService.nextRetryDelay()
|
|
||||||
}
|
|
||||||
cachedService.resetRetryDelay()
|
|
||||||
return nil, doNotRetry
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// cache the service, we need the info for service deletion
|
// cache the service, we need the info for service deletion
|
||||||
cachedService.state = service
|
cachedService.state = service
|
||||||
err, retry := s.createLoadBalancerIfNeeded(key, service)
|
err, retry := s.createLoadBalancerIfNeeded(key, service)
|
||||||
@ -451,8 +438,6 @@ func (s *serviceCache) delete(serviceName string) {
|
|||||||
delete(s.serviceMap, serviceName)
|
delete(s.serviceMap, serviceName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// needsUpdate checks to see if there were any changes between the old and new service that would require a load balancer update.
|
|
||||||
// This method does not and should not check if the hosts have changed.
|
|
||||||
func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
|
func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
|
||||||
if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
|
if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
|
||||||
return false
|
return false
|
||||||
@ -651,45 +636,62 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// nodeSyncLoop handles adding all existing cached services to the work queue
|
// nodeSyncLoop handles updating the hosts pointed to by all load
|
||||||
// to be reprocessed so that they can have their hosts updated, if any
|
// balancers whenever the set of nodes in the cluster changes.
|
||||||
// host changes have occurred since the last sync loop.
|
|
||||||
func (s *ServiceController) nodeSyncLoop() {
|
func (s *ServiceController) nodeSyncLoop() {
|
||||||
newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
|
newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
|
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
|
if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
|
||||||
// Nothing to do since the hosts have not changed.
|
// The set of nodes in the cluster hasn't changed, but we can retry
|
||||||
|
// updating any services that we failed to update last time around.
|
||||||
|
s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.Infof("Detected change in list of current cluster nodes. New node set: %v", nodeNames(newHosts))
|
glog.Infof("Detected change in list of current cluster nodes. New node set: %v",
|
||||||
|
nodeNames(newHosts))
|
||||||
|
|
||||||
for _, svc := range s.cache.allServices() {
|
// Try updating all services, and save the ones that fail to try again next
|
||||||
s.enqueueService(svc)
|
// round.
|
||||||
}
|
s.servicesToUpdate = s.cache.allServices()
|
||||||
|
numServices := len(s.servicesToUpdate)
|
||||||
|
s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
|
||||||
|
glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
|
||||||
|
numServices-len(s.servicesToUpdate), numServices)
|
||||||
|
|
||||||
// Update the known hosts so we can check next sync loop for changes.
|
|
||||||
s.knownHosts = newHosts
|
s.knownHosts = newHosts
|
||||||
}
|
}
|
||||||
|
|
||||||
// Updates the load balancer of the service with updated nodes ONLY.
|
// updateLoadBalancerHosts updates all existing load balancers so that
|
||||||
// This method will not trigger the cloud provider to create or full update a load balancer.
|
// they will match the list of hosts provided.
|
||||||
func (s *ServiceController) updateLoadBalancerHosts(service *v1.Service) error {
|
// Returns the list of services that couldn't be updated.
|
||||||
|
func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
|
||||||
|
for _, service := range services {
|
||||||
|
func() {
|
||||||
|
if service == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
|
||||||
|
glog.Errorf("External error while updating load balancer: %v.", err)
|
||||||
|
servicesToRetry = append(servicesToRetry, service)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
return servicesToRetry
|
||||||
|
}
|
||||||
|
|
||||||
|
// Updates the load balancer of a service, assuming we hold the mutex
|
||||||
|
// associated with the service.
|
||||||
|
func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
|
||||||
if !wantsLoadBalancer(service) {
|
if !wantsLoadBalancer(service) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
hosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
|
// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
|
||||||
err = s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
|
err := s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
|
// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
|
||||||
if len(hosts) == 0 {
|
if len(hosts) == 0 {
|
||||||
|
@ -19,7 +19,6 @@ package service
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -28,8 +27,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
corelisters "k8s.io/client-go/listers/core/v1"
|
|
||||||
"k8s.io/client-go/tools/cache"
|
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
"k8s.io/kubernetes/pkg/api/testapi"
|
"k8s.io/kubernetes/pkg/api/testapi"
|
||||||
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
||||||
@ -177,45 +174,23 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// newLoadBalancerNode returns a node that passes the predicate check for a
|
|
||||||
// node to receive load balancer traffic.
|
|
||||||
func newLoadBalancerNode(name string) *v1.Node {
|
|
||||||
return &v1.Node{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: name,
|
|
||||||
},
|
|
||||||
Spec: v1.NodeSpec{
|
|
||||||
Unschedulable: false,
|
|
||||||
},
|
|
||||||
Status: v1.NodeStatus{
|
|
||||||
Conditions: []v1.NodeCondition{
|
|
||||||
{Type: v1.NodeReady, Status: v1.ConditionTrue},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func sortNodesByName(nodes []*v1.Node) {
|
|
||||||
sort.Slice(nodes, func(i, j int) bool {
|
|
||||||
return nodes[i].Name < nodes[j].Name
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Finish converting and update comments
|
// TODO: Finish converting and update comments
|
||||||
func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
||||||
|
|
||||||
nodes := []*v1.Node{
|
nodes := []*v1.Node{
|
||||||
newLoadBalancerNode("node0"),
|
{ObjectMeta: metav1.ObjectMeta{Name: "node0"}},
|
||||||
newLoadBalancerNode("node1"),
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1"}},
|
||||||
newLoadBalancerNode("node73"),
|
{ObjectMeta: metav1.ObjectMeta{Name: "node73"}},
|
||||||
}
|
}
|
||||||
sortNodesByName(nodes)
|
table := []struct {
|
||||||
|
|
||||||
table := map[string]struct {
|
|
||||||
services []*v1.Service
|
services []*v1.Service
|
||||||
expectedUpdateCalls []fakecloud.FakeUpdateBalancerCall
|
expectedUpdateCalls []fakecloud.FakeUpdateBalancerCall
|
||||||
}{
|
}{
|
||||||
"update no load balancer": {
|
{
|
||||||
|
// No services present: no calls should be made.
|
||||||
|
services: []*v1.Service{},
|
||||||
|
expectedUpdateCalls: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
// Services do not have external load balancers: no calls should be made.
|
// Services do not have external load balancers: no calls should be made.
|
||||||
services: []*v1.Service{
|
services: []*v1.Service{
|
||||||
newService("s0", "111", v1.ServiceTypeClusterIP),
|
newService("s0", "111", v1.ServiceTypeClusterIP),
|
||||||
@ -223,7 +198,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
|||||||
},
|
},
|
||||||
expectedUpdateCalls: nil,
|
expectedUpdateCalls: nil,
|
||||||
},
|
},
|
||||||
"update 1 load balancer": {
|
{
|
||||||
// Services does have an external load balancer: one call should be made.
|
// Services does have an external load balancer: one call should be made.
|
||||||
services: []*v1.Service{
|
services: []*v1.Service{
|
||||||
newService("s0", "333", v1.ServiceTypeLoadBalancer),
|
newService("s0", "333", v1.ServiceTypeLoadBalancer),
|
||||||
@ -232,7 +207,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
|||||||
{Service: newService("s0", "333", v1.ServiceTypeLoadBalancer), Hosts: nodes},
|
{Service: newService("s0", "333", v1.ServiceTypeLoadBalancer), Hosts: nodes},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"update 3 load balancers": {
|
{
|
||||||
// Three services have an external load balancer: three calls.
|
// Three services have an external load balancer: three calls.
|
||||||
services: []*v1.Service{
|
services: []*v1.Service{
|
||||||
newService("s0", "444", v1.ServiceTypeLoadBalancer),
|
newService("s0", "444", v1.ServiceTypeLoadBalancer),
|
||||||
@ -245,7 +220,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
|||||||
{Service: newService("s2", "666", v1.ServiceTypeLoadBalancer), Hosts: nodes},
|
{Service: newService("s2", "666", v1.ServiceTypeLoadBalancer), Hosts: nodes},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"update 2 load balancers": {
|
{
|
||||||
// Two services have an external load balancer and two don't: two calls.
|
// Two services have an external load balancer and two don't: two calls.
|
||||||
services: []*v1.Service{
|
services: []*v1.Service{
|
||||||
newService("s0", "777", v1.ServiceTypeNodePort),
|
newService("s0", "777", v1.ServiceTypeNodePort),
|
||||||
@ -258,44 +233,30 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
|||||||
{Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes},
|
{Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
// One service has an external load balancer and one is nil: one call.
|
||||||
|
services: []*v1.Service{
|
||||||
|
newService("s0", "234", v1.ServiceTypeLoadBalancer),
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
|
||||||
|
{Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
for _, item := range table {
|
||||||
|
controller, cloud, _ := newController()
|
||||||
|
|
||||||
for name, item := range table {
|
var services []*v1.Service
|
||||||
t.Run(name, func(t *testing.T) {
|
for _, service := range item.services {
|
||||||
controller, cloud, _ := newController()
|
services = append(services, service)
|
||||||
|
}
|
||||||
var services []*v1.Service
|
if err := controller.updateLoadBalancerHosts(services, nodes); err != nil {
|
||||||
for _, service := range item.services {
|
t.Errorf("unexpected error: %v", err)
|
||||||
services = append(services, service)
|
}
|
||||||
}
|
if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) {
|
||||||
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
|
t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls)
|
||||||
for _, node := range nodes {
|
}
|
||||||
nodeIndexer.Add(node)
|
|
||||||
}
|
|
||||||
controller.nodeLister = corelisters.NewNodeLister(nodeIndexer)
|
|
||||||
|
|
||||||
for _, service := range services {
|
|
||||||
if err := controller.updateLoadBalancerHosts(service); err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(item.expectedUpdateCalls) != len(cloud.UpdateCalls) {
|
|
||||||
t.Errorf("expected %d update calls but only got %d", len(item.expectedUpdateCalls), len(cloud.UpdateCalls))
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, expectedCall := range item.expectedUpdateCalls {
|
|
||||||
actualCall := cloud.UpdateCalls[i]
|
|
||||||
if !reflect.DeepEqual(expectedCall.Service, actualCall.Service) {
|
|
||||||
t.Errorf("expected update call to contain service %+v, got %+v", expectedCall.Service, actualCall.Service)
|
|
||||||
}
|
|
||||||
|
|
||||||
sortNodesByName(actualCall.Hosts)
|
|
||||||
if !reflect.DeepEqual(expectedCall.Hosts, actualCall.Hosts) {
|
|
||||||
t.Errorf("expected update call to contain hosts %+v, got %+v", expectedCall.Hosts, actualCall.Hosts)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -350,13 +311,6 @@ func TestProcessServiceUpdate(t *testing.T) {
|
|||||||
var controller *ServiceController
|
var controller *ServiceController
|
||||||
var cloud *fakecloud.FakeCloud
|
var cloud *fakecloud.FakeCloud
|
||||||
|
|
||||||
nodes := []*v1.Node{
|
|
||||||
newLoadBalancerNode("node0"),
|
|
||||||
newLoadBalancerNode("node1"),
|
|
||||||
newLoadBalancerNode("node73"),
|
|
||||||
}
|
|
||||||
sortNodesByName(nodes)
|
|
||||||
|
|
||||||
//A pair of old and new loadbalancer IP address
|
//A pair of old and new loadbalancer IP address
|
||||||
oldLBIP := "192.168.1.1"
|
oldLBIP := "192.168.1.1"
|
||||||
newLBIP := "192.168.1.11"
|
newLBIP := "192.168.1.11"
|
||||||
@ -390,51 +344,6 @@ func TestProcessServiceUpdate(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
|
||||||
testName: "If updating hosts only",
|
|
||||||
key: "default/sync-test-name",
|
|
||||||
svc: newService("sync-test-name", types.UID("sync-test-uid"), v1.ServiceTypeLoadBalancer),
|
|
||||||
updateFn: func(svc *v1.Service) *v1.Service {
|
|
||||||
keyExpected := svc.GetObjectMeta().GetNamespace() + "/" + svc.GetObjectMeta().GetName()
|
|
||||||
cachedServiceTest := controller.cache.getOrCreate(keyExpected)
|
|
||||||
cachedServiceTest.state = svc
|
|
||||||
controller.cache.set(keyExpected, cachedServiceTest)
|
|
||||||
|
|
||||||
// Set the nodes for the cloud's UpdateLoadBalancer call to use.
|
|
||||||
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
|
|
||||||
for _, node := range nodes {
|
|
||||||
nodeIndexer.Add(node)
|
|
||||||
}
|
|
||||||
controller.nodeLister = corelisters.NewNodeLister(nodeIndexer)
|
|
||||||
|
|
||||||
// This should trigger the needsUpdate false check since the service equals the cached service
|
|
||||||
return svc
|
|
||||||
},
|
|
||||||
expectedFn: func(svc *v1.Service, err error, retryDuration time.Duration) error {
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if retryDuration != doNotRetry {
|
|
||||||
return fmt.Errorf("retryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(cloud.UpdateCalls) != 1 {
|
|
||||||
return fmt.Errorf("expected one update host call but only got %+v", cloud.UpdateCalls)
|
|
||||||
}
|
|
||||||
|
|
||||||
actualCall := cloud.UpdateCalls[0]
|
|
||||||
if !reflect.DeepEqual(svc, actualCall.Service) {
|
|
||||||
return fmt.Errorf("expected update call to contain service %+v, got %+v", svc, actualCall.Service)
|
|
||||||
}
|
|
||||||
|
|
||||||
sortNodesByName(actualCall.Hosts)
|
|
||||||
if !reflect.DeepEqual(nodes, actualCall.Hosts) {
|
|
||||||
return fmt.Errorf("expected update call to contain hosts %+v, got %+v", nodes, actualCall.Hosts)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
testName: "If Updating Loadbalancer IP",
|
testName: "If Updating Loadbalancer IP",
|
||||||
key: "default/sync-test-name",
|
key: "default/sync-test-name",
|
||||||
|
Loading…
Reference in New Issue
Block a user