Merge pull request #46483 from shashidharatd/fed-sc-ut-delete

Automatic merge from submit-queue (batch tested with PRs 36721, 46483, 45500, 46724, 46036)

Federation: Minor corrections in service controller and add a unit testcase

**What this PR does / why we need it**:
This PR fixes few outdated comments in federation service controller and few other minor fixes.
This also adds a unit test case to test federated service deletion.


/assign @quinton-hoole 
/cc @marun @kubernetes/sig-federation-pr-reviews 

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-06-03 08:08:38 -07:00 committed by GitHub
commit 445795186d
4 changed files with 87 additions and 32 deletions

View File

@ -20,6 +20,7 @@ go_library(
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/clusterselector:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
@ -53,6 +54,7 @@ go_test(
"//federation/client/clientset_generated/federation_clientset/fake:go_default_library",
"//federation/pkg/federation-controller/service/ingress:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
@ -62,6 +64,7 @@ go_test(
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",

View File

@ -45,6 +45,7 @@ import (
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/clusterselector"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -96,12 +97,11 @@ type ServiceController struct {
flowcontrolBackoff *flowcontrol.Backoff
}
// New returns a new service controller to keep DNS provider service resources
// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry.
// New returns a new service controller to keep service objects between
// the federation and member clusters in sync.
func New(federationClient fedclientset.Interface) *ServiceController {
broadcaster := record.NewBroadcaster()
// federationClient event is not supported yet
// broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(federationClient))
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName})
s := &ServiceController{
@ -180,10 +180,6 @@ func New(federationClient fedclientset.Interface) *ServiceController {
svc := obj.(*v1.Service)
orphanDependents := false
err := client.Core().Services(svc.Namespace).Delete(svc.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents})
// IsNotFound error is fine since that means the object is deleted already.
if errors.IsNotFound(err) {
return nil
}
return err
})
@ -236,44 +232,40 @@ func (s *ServiceController) updateService(obj pkgruntime.Object) (pkgruntime.Obj
return s.federationClient.Core().Services(service.Namespace).Update(service)
}
// Run starts a background goroutine that watches for changes to federation services
// and ensures that they have Kubernetes services created, updated or deleted appropriately.
// federationSyncPeriod controls how often we check the federation's services to
// ensure that the correct Kubernetes services (and associated DNS entries) exist.
// This is only necessary to fudge over failed watches.
// clusterSyncPeriod controls how often we check the federation's underlying clusters and
// their Kubernetes services to ensure that matching services created independently of the Federation
// (e.g. directly via the underlying cluster's API) are correctly accounted for.
// It's an error to call Run() more than once for a given ServiceController
// object.
// Run starts informers, delay deliverers and workers. Workers continuously watch for events which could
// be from federation or federated clusters and tries to reconcile the service objects from federation to
// federated clusters.
func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) {
glog.Infof("Starting federation service controller")
defer runtime.HandleCrash()
defer s.queue.ShutDown()
s.federatedInformer.Start()
defer s.federatedInformer.Stop()
s.endpointFederatedInformer.Start()
defer s.endpointFederatedInformer.Stop()
s.objectDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) {
s.queue.Add(item.Value.(string))
})
defer s.objectDeliverer.Stop()
s.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) {
s.deliverServicesOnClusterChange()
})
defer s.clusterDeliverer.Stop()
fedutil.StartBackoffGC(s.flowcontrolBackoff, stopCh)
go s.serviceController.Run(stopCh)
for i := 0; i < workers; i++ {
go wait.Until(s.fedServiceWorker, time.Second, stopCh)
}
go func() {
<-stopCh
glog.Infof("Shutting down Federation Service Controller")
s.queue.ShutDown()
s.federatedInformer.Stop()
s.endpointFederatedInformer.Stop()
s.objectDeliverer.Stop()
s.clusterDeliverer.Stop()
}()
<-stopCh
glog.Infof("Shutting down federation service controller")
}
type reconciliationStatus string

View File

@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
@ -34,6 +35,7 @@ import (
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
"k8s.io/kubernetes/federation/pkg/federation-controller/service/ingress"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
"k8s.io/kubernetes/pkg/api/v1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -71,6 +73,7 @@ func TestServiceController(t *testing.T) {
RegisterFakeOnUpdate(clusters, &fedClient.Fake, fedclusterWatch)
RegisterFakeOnCreate(services, &fedClient.Fake, fedServiceWatch)
RegisterFakeOnUpdate(services, &fedClient.Fake, fedServiceWatch)
RegisterFakeOnDelete(services, &fedClient.Fake, fedServiceWatch, serviceObjectGetter)
cluster1Client := &fakekubeclientset.Clientset{}
RegisterFakeList(services, &cluster1Client.Fake, &v1.ServiceList{Items: []v1.Service{}})
@ -79,6 +82,7 @@ func TestServiceController(t *testing.T) {
c1EndpointWatch := RegisterFakeWatch(endpoints, &cluster1Client.Fake)
RegisterFakeOnCreate(services, &cluster1Client.Fake, c1ServiceWatch)
RegisterFakeOnUpdate(services, &cluster1Client.Fake, c1ServiceWatch)
RegisterFakeOnDelete(services, &cluster1Client.Fake, c1ServiceWatch, serviceObjectGetter)
RegisterFakeOnCreate(endpoints, &cluster1Client.Fake, c1EndpointWatch)
RegisterFakeOnUpdate(endpoints, &cluster1Client.Fake, c1EndpointWatch)
@ -89,6 +93,7 @@ func TestServiceController(t *testing.T) {
c2EndpointWatch := RegisterFakeWatch(endpoints, &cluster2Client.Fake)
RegisterFakeOnCreate(services, &cluster2Client.Fake, c2ServiceWatch)
RegisterFakeOnUpdate(services, &cluster2Client.Fake, c2ServiceWatch)
RegisterFakeOnDelete(services, &cluster2Client.Fake, c2ServiceWatch, serviceObjectGetter)
RegisterFakeOnCreate(endpoints, &cluster2Client.Fake, c2EndpointWatch)
RegisterFakeOnUpdate(endpoints, &cluster2Client.Fake, c2EndpointWatch)
@ -119,7 +124,6 @@ func TestServiceController(t *testing.T) {
service := NewService("test-service-1", 80)
// Test add federated service.
glog.Infof("Adding federated service")
fedServiceWatch.Add(service)
key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}.String()
@ -194,19 +198,28 @@ func TestServiceController(t *testing.T) {
require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore,
key, desiredService, serviceIngressCompare, wait.ForeverTestTimeout))
// Test update federated service.
glog.Infof("Test modifying federated service by changing the port")
service.Spec.Ports[0].Port = 9090
fedServiceWatch.Modify(service)
require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name,
key, service, wait.ForeverTestTimeout))
// Test cluster service is recreated when deleted.
glog.Infof("Test cluster service is recreated when deleted")
c1ServiceWatch.Delete(service)
c1Service := NewService("test-service-1", 80)
c1Service.DeletionTimestamp = &metav1.Time{Time: time.Now()}
c1ServiceWatch.Delete(c1Service)
require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name,
key, service, wait.ForeverTestTimeout))
glog.Infof("Test cluster services are deleted when federated service is deleted")
service.ObjectMeta.Finalizers = append(service.ObjectMeta.Finalizers, deletionhelper.FinalizerDeleteFromUnderlyingClusters)
service.DeletionTimestamp = &metav1.Time{Time: time.Now()}
fedServiceWatch.Modify(service)
require.NoError(t, WaitForClusterServiceDelete(t, sc.federatedInformer.GetTargetStore(), cluster1.Name,
key, wait.ForeverTestTimeout))
require.NoError(t, WaitForClusterServiceDelete(t, sc.federatedInformer.GetTargetStore(), cluster2.Name,
key, wait.ForeverTestTimeout))
close(stop)
}
@ -257,6 +270,15 @@ func TestGetOperationsToPerformOnCluster(t *testing.T) {
}
}
// serviceObjectGetter gives dummy service objects to use with RegisterFakeOnDelete
// This is just so that federated informer can be tested for delete scenarios.
func serviceObjectGetter(name, namespace string) runtime.Object {
service := new(v1.Service)
service.Namespace = namespace
service.Name = name
return service
}
func NewService(name string, port int32) *v1.Service {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
@ -306,6 +328,18 @@ func WaitForClusterService(t *testing.T, store fedutil.FederatedReadOnlyStore, c
return err
}
// WaitForClusterServiceDelete waits for the cluster service to be deleted.
func WaitForClusterServiceDelete(t *testing.T, store fedutil.FederatedReadOnlyStore, clusterName, key string, timeout time.Duration) error {
err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
_, found, _ := store.GetByKey(clusterName, key)
if !found {
return true, nil
}
return false, nil
})
return err
}
type serviceCompare func(current, desired *v1.Service) (match bool)
func serviceStatusCompare(current, desired *v1.Service) bool {

View File

@ -288,6 +288,32 @@ func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *Watch
return objChan
}
// RegisterFakeOnDelete registers a reactor in the given fake client that passes
// all deleted objects to the given watcher. Since we could get only name of the
// deleted object from DeleteAction, this register function relies on the getObject
// function passed to get the object by name and pass it watcher.
func RegisterFakeOnDelete(resource string, client *core.Fake, watcher *WatcherDispatcher, getObject func(name, namespace string) runtime.Object) {
client.AddReactor("delete", resource, func(action core.Action) (bool, runtime.Object, error) {
deleteAction := action.(core.DeleteAction)
obj := getObject(deleteAction.GetName(), deleteAction.GetNamespace())
glog.V(7).Infof("Deleting %s: %v", resource, obj)
operation := func() {
glog.V(4).Infof("Object deleted %v", obj)
watcher.Delete(obj)
}
select {
case watcher.orderExecution <- operation:
break
case <-time.After(pushTimeout):
glog.Errorf("Fake client execution channel blocked")
glog.Errorf("Tried to push %v", deleteAction)
}
return true, obj, nil
})
return
}
// Adds an update reactor to the given fake client.
// The reactor just returns the object passed to update action.
// This is used as a hack to workaround https://github.com/kubernetes/kubernetes/issues/40939.