mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Update service controller to prevent orphaned public IP addresses
This commit is contained in:
parent
66755bda05
commit
c8c88cefad
@ -92,6 +92,8 @@ const (
|
||||
)
|
||||
|
||||
type cachedService struct {
|
||||
// Protects cached service state
|
||||
mu sync.Mutex
|
||||
// The cached state of the service
|
||||
state *v1.Service
|
||||
}
|
||||
@ -106,7 +108,7 @@ type serviceCache struct {
|
||||
type ServiceController struct {
|
||||
cloud cloudprovider.Interface
|
||||
knownHosts []*v1.Node
|
||||
servicesToUpdate []*v1.Service
|
||||
servicesToUpdate []string
|
||||
kubeClient clientset.Interface
|
||||
clusterName string
|
||||
balancer cloudprovider.LoadBalancer
|
||||
@ -277,6 +279,8 @@ func (s *ServiceController) processServiceCreateOrUpdate(service *v1.Service, ke
|
||||
// TODO(@MrHohn): Remove the cache once we get rid of the non-finalizer deletion
|
||||
// path. Ref https://github.com/kubernetes/enhancements/issues/980.
|
||||
cachedService := s.cache.getOrCreate(key)
|
||||
cachedService.mu.Lock()
|
||||
defer cachedService.mu.Unlock()
|
||||
if cachedService.state != nil && cachedService.state.UID != service.UID {
|
||||
// This happens only when a service is deleted and re-created
|
||||
// in a short period, which is only possible when it doesn't
|
||||
@ -329,8 +333,10 @@ func (s *ServiceController) syncLoadBalancerIfNeeded(service *v1.Service, key st
|
||||
if err != nil {
|
||||
return op, fmt.Errorf("failed to check if load balancer exists before cleanup: %v", err)
|
||||
}
|
||||
if exists {
|
||||
klog.V(2).Infof("Deleting existing load balancer for service %s", key)
|
||||
// Always call EnsureLoadBalancerDeleted against load balancers to
|
||||
// ensure the underlying components are completely deleted
|
||||
if exists || service.Spec.Type == v1.ServiceTypeLoadBalancer {
|
||||
klog.V(2).Infof("Deleting load balancer for service %s", key)
|
||||
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
|
||||
if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
|
||||
return op, fmt.Errorf("failed to delete load balancer: %v", err)
|
||||
@ -425,18 +431,6 @@ func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
// ListKeys implements the interface required by DeltaFIFO to list the keys we
|
||||
// already know about.
|
||||
func (s *serviceCache) allServices() []*v1.Service {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
services := make([]*v1.Service, 0, len(s.serviceMap))
|
||||
for _, v := range s.serviceMap {
|
||||
services = append(services, v.state)
|
||||
}
|
||||
return services
|
||||
}
|
||||
|
||||
func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
@ -692,7 +686,7 @@ func (s *ServiceController) nodeSyncLoop() {
|
||||
|
||||
// Try updating all services, and save the ones that fail to try again next
|
||||
// round.
|
||||
s.servicesToUpdate = s.cache.allServices()
|
||||
s.servicesToUpdate = s.cache.ListKeys()
|
||||
numServices := len(s.servicesToUpdate)
|
||||
s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
|
||||
klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
|
||||
@ -704,15 +698,21 @@ func (s *ServiceController) nodeSyncLoop() {
|
||||
// updateLoadBalancerHosts updates all existing load balancers so that
|
||||
// they will match the list of hosts provided.
|
||||
// Returns the list of services that couldn't be updated.
|
||||
func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
|
||||
for _, service := range services {
|
||||
func (s *ServiceController) updateLoadBalancerHosts(keys []string, hosts []*v1.Node) (servicesToRetry []string) {
|
||||
for _, key := range keys {
|
||||
func() {
|
||||
if service == nil {
|
||||
cachedService, ok := s.cache.get(key)
|
||||
// Check if we already deleted the load balancer that was created for the service
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
cachedService.mu.Lock()
|
||||
defer cachedService.mu.Unlock()
|
||||
service := cachedService.state
|
||||
if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
|
||||
runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", service.Namespace, service.Name, err))
|
||||
servicesToRetry = append(servicesToRetry, service)
|
||||
servicesToRetry = append(servicesToRetry, key)
|
||||
}
|
||||
}()
|
||||
}
|
||||
@ -722,7 +722,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, host
|
||||
// Updates the load balancer of a service, assuming we hold the mutex
|
||||
// associated with the service.
|
||||
func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
|
||||
if !wantsLoadBalancer(service) {
|
||||
if needsCleanup(service) {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -800,6 +800,8 @@ func (s *ServiceController) processServiceDeletion(key string) error {
|
||||
// In both cases we have nothing left to do.
|
||||
return nil
|
||||
}
|
||||
cachedService.mu.Lock()
|
||||
defer cachedService.mu.Unlock()
|
||||
klog.V(2).Infof("Service %v has been deleted. Attempting to cleanup load balancer resources", key)
|
||||
if err := s.processLoadBalancerDelete(cachedService.state, key); err != nil {
|
||||
return err
|
||||
|
@ -49,10 +49,11 @@ const region = "us-central"
|
||||
func newService(name string, uid types.UID, serviceType v1.ServiceType) *v1.Service {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: "default",
|
||||
UID: uid,
|
||||
SelfLink: testapi.Default.SelfLink("services", name),
|
||||
Name: name,
|
||||
Namespace: "default",
|
||||
UID: uid,
|
||||
SelfLink: testapi.Default.SelfLink("services", name),
|
||||
Finalizers: []string{servicehelper.LoadBalancerCleanupFinalizer},
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Type: serviceType,
|
||||
@ -60,6 +61,12 @@ func newService(name string, uid types.UID, serviceType v1.ServiceType) *v1.Serv
|
||||
}
|
||||
}
|
||||
|
||||
func newServiceWithDeletionTimestamp(name string, uid types.UID, serviceType v1.ServiceType, deletionTimestamp metav1.Time) *v1.Service {
|
||||
service := newService(name, uid, serviceType)
|
||||
service.ObjectMeta.DeletionTimestamp = &deletionTimestamp
|
||||
return service
|
||||
}
|
||||
|
||||
//Wrap newService so that you don't have to call default arguments again and again.
|
||||
func defaultExternalService() *v1.Service {
|
||||
return newService("external-balancer", types.UID("123"), v1.ServiceTypeLoadBalancer)
|
||||
@ -303,6 +310,30 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) {
|
||||
expectPatchStatus: true,
|
||||
expectPatchFinalizer: false,
|
||||
},
|
||||
{
|
||||
desc: "service that needs cleanup but LB does not exist",
|
||||
service: &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "basic-service1",
|
||||
Namespace: "default",
|
||||
SelfLink: testapi.Default.SelfLink("services", "basic-service1"),
|
||||
Finalizers: []string{servicehelper.LoadBalancerCleanupFinalizer},
|
||||
DeletionTimestamp: &metav1.Time{
|
||||
Time: time.Now(),
|
||||
},
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Ports: []v1.ServicePort{{
|
||||
Port: 80,
|
||||
Protocol: v1.ProtocolTCP,
|
||||
}},
|
||||
Type: v1.ServiceTypeLoadBalancer,
|
||||
},
|
||||
},
|
||||
expectOp: deleteLoadBalancer,
|
||||
expectDeleteAttempt: true,
|
||||
expectPatchStatus: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
@ -408,6 +439,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
||||
table := []struct {
|
||||
services []*v1.Service
|
||||
expectedUpdateCalls []fakecloud.UpdateBalancerCall
|
||||
shouldNotBeInCache bool
|
||||
}{
|
||||
{
|
||||
// No services present: no calls should be made.
|
||||
@ -467,16 +499,39 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
||||
{Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes},
|
||||
},
|
||||
},
|
||||
{
|
||||
// One service has an external load balancer but is not in the cache (it is deleted): no calls should be made.
|
||||
services: []*v1.Service{
|
||||
newService("s0", "567", v1.ServiceTypeLoadBalancer),
|
||||
},
|
||||
expectedUpdateCalls: nil,
|
||||
shouldNotBeInCache: true,
|
||||
},
|
||||
{
|
||||
// One service has a deleteion timestamp: no calls should be made.
|
||||
services: []*v1.Service{
|
||||
newServiceWithDeletionTimestamp("s0", "999", v1.ServiceTypeLoadBalancer, metav1.Now()),
|
||||
},
|
||||
expectedUpdateCalls: nil,
|
||||
},
|
||||
}
|
||||
for _, item := range table {
|
||||
controller, cloud, _ := newController()
|
||||
|
||||
var services []*v1.Service
|
||||
services = append(services, item.services...)
|
||||
|
||||
if err := controller.updateLoadBalancerHosts(services, nodes); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
// Use keys array instead of cache.ListKeys() so that the order of keys is deterministic
|
||||
keys := make([]string, 0, len(item.services))
|
||||
for _, service := range item.services {
|
||||
if service == nil || item.shouldNotBeInCache {
|
||||
continue
|
||||
}
|
||||
key := service.GetNamespace() + "/" + service.GetName()
|
||||
keys = append(keys, key)
|
||||
// Manually populate cache because updateLoadBalancerHosts gets services from cache
|
||||
svc := controller.cache.getOrCreate(key)
|
||||
svc.state = service
|
||||
}
|
||||
|
||||
controller.updateLoadBalancerHosts(keys, nodes)
|
||||
if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) {
|
||||
t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls)
|
||||
}
|
||||
@ -1143,18 +1198,6 @@ func TestServiceCache(t *testing.T) {
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
testName: "allServices",
|
||||
setCacheFn: nil, //Nothing to set
|
||||
checkCacheFn: func() error {
|
||||
//It should return two elements
|
||||
svcArray := sc.allServices()
|
||||
if len(svcArray) != 2 {
|
||||
return fmt.Errorf("Expected(2) Obtained(%v)", len(svcArray))
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
|
Loading…
Reference in New Issue
Block a user