From 80ceb5b3d6a1009b23e075e2127443486d7266d0 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Thu, 18 May 2017 18:46:33 +0530 Subject: [PATCH 1/4] Some minor corrections in service controller --- .../service/servicecontroller.go | 48 ++++++++----------- 1 file changed, 20 insertions(+), 28 deletions(-) diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 10b6c2a2b85..870e449cd5b 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -44,6 +44,7 @@ import ( fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util/clusterselector" "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -95,12 +96,11 @@ type ServiceController struct { flowcontrolBackoff *flowcontrol.Backoff } -// New returns a new service controller to keep DNS provider service resources -// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry. +// New returns a new service controller to keep service objects between +// the federation and member clusters in sync. func New(federationClient fedclientset.Interface) *ServiceController { broadcaster := record.NewBroadcaster() - // federationClient event is not supported yet - // broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(federationClient)) recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName}) s := &ServiceController{ @@ -179,10 +179,6 @@ func New(federationClient fedclientset.Interface) *ServiceController { svc := obj.(*v1.Service) orphanDependents := false err := client.Core().Services(svc.Namespace).Delete(svc.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) - // IsNotFound error is fine since that means the object is deleted already. - if errors.IsNotFound(err) { - return nil - } return err }) @@ -235,44 +231,40 @@ func (s *ServiceController) updateService(obj pkgruntime.Object) (pkgruntime.Obj return s.federationClient.Core().Services(service.Namespace).Update(service) } -// Run starts a background goroutine that watches for changes to federation services -// and ensures that they have Kubernetes services created, updated or deleted appropriately. -// federationSyncPeriod controls how often we check the federation's services to -// ensure that the correct Kubernetes services (and associated DNS entries) exist. -// This is only necessary to fudge over failed watches. -// clusterSyncPeriod controls how often we check the federation's underlying clusters and -// their Kubernetes services to ensure that matching services created independently of the Federation -// (e.g. directly via the underlying cluster's API) are correctly accounted for. - -// It's an error to call Run() more than once for a given ServiceController -// object. +// Run starts informers, delay deliverers and workers. Workers continuously watch for events which could +// be from federation or federated clusters and tries to reconcile the service objects from federation to +// federated clusters. func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Starting federation service controller") defer runtime.HandleCrash() + defer s.queue.ShutDown() + s.federatedInformer.Start() + defer s.federatedInformer.Stop() + s.endpointFederatedInformer.Start() + defer s.endpointFederatedInformer.Stop() + s.objectDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) { s.queue.Add(item.Value.(string)) }) + defer s.objectDeliverer.Stop() + s.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) { s.deliverServicesOnClusterChange() }) + defer s.clusterDeliverer.Stop() + fedutil.StartBackoffGC(s.flowcontrolBackoff, stopCh) go s.serviceController.Run(stopCh) for i := 0; i < workers; i++ { go wait.Until(s.fedServiceWorker, time.Second, stopCh) } - go func() { - <-stopCh - glog.Infof("Shutting down Federation Service Controller") - s.queue.ShutDown() - s.federatedInformer.Stop() - s.endpointFederatedInformer.Stop() - s.objectDeliverer.Stop() - s.clusterDeliverer.Stop() - }() + + <-stopCh + glog.Infof("Shutting down federation service controller") } type reconciliationStatus string From be0d7f0aeb615260be44f57ae492b4dfc884e5a5 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Thu, 25 May 2017 18:51:15 +0530 Subject: [PATCH 2/4] Add RegisterFakeOnDelete to test federation object deletion --- .../util/test/test_helper.go | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/federation/pkg/federation-controller/util/test/test_helper.go b/federation/pkg/federation-controller/util/test/test_helper.go index 9647cf9d209..af28fde4ca0 100644 --- a/federation/pkg/federation-controller/util/test/test_helper.go +++ b/federation/pkg/federation-controller/util/test/test_helper.go @@ -288,6 +288,32 @@ func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *Watch return objChan } +// RegisterFakeOnDelete registers a reactor in the given fake client that passes +// all deleted objects to the given watcher. Since we could get only name of the +// deleted object from DeleteAction, this register function relies on the getObject +// function passed to get the object by name and pass it watcher. +func RegisterFakeOnDelete(resource string, client *core.Fake, watcher *WatcherDispatcher, getObject func(name, namespace string) runtime.Object) { + client.AddReactor("delete", resource, func(action core.Action) (bool, runtime.Object, error) { + deleteAction := action.(core.DeleteAction) + obj := getObject(deleteAction.GetName(), deleteAction.GetNamespace()) + glog.V(7).Infof("Deleting %s: %v", resource, obj) + + operation := func() { + glog.V(4).Infof("Object deleted %v", obj) + watcher.Delete(obj) + } + select { + case watcher.orderExecution <- operation: + break + case <-time.After(pushTimeout): + glog.Errorf("Fake client execution channel blocked") + glog.Errorf("Tried to push %v", deleteAction) + } + return true, obj, nil + }) + return +} + // Adds an update reactor to the given fake client. // The reactor just returns the object passed to update action. // This is used as a hack to workaround https://github.com/kubernetes/kubernetes/issues/40939. From 296fe66bf75038964d8d9a032b609a403b07f654 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Thu, 25 May 2017 18:53:05 +0530 Subject: [PATCH 3/4] Add federated service delete unit testcase --- .../service/servicecontroller_test.go | 42 +++++++++++++++++-- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/federation/pkg/federation-controller/service/servicecontroller_test.go b/federation/pkg/federation-controller/service/servicecontroller_test.go index a98ae0936c4..0a8962ecc2b 100644 --- a/federation/pkg/federation-controller/service/servicecontroller_test.go +++ b/federation/pkg/federation-controller/service/servicecontroller_test.go @@ -27,12 +27,14 @@ import ( "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/federation/apis/federation/v1beta1" fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" "k8s.io/kubernetes/pkg/api/v1" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -70,6 +72,7 @@ func TestServiceController(t *testing.T) { RegisterFakeOnUpdate(clusters, &fedClient.Fake, fedclusterWatch) RegisterFakeOnCreate(services, &fedClient.Fake, fedServiceWatch) RegisterFakeOnUpdate(services, &fedClient.Fake, fedServiceWatch) + RegisterFakeOnDelete(services, &fedClient.Fake, fedServiceWatch, serviceObjectGetter) cluster1Client := &fakekubeclientset.Clientset{} RegisterFakeList(services, &cluster1Client.Fake, &v1.ServiceList{Items: []v1.Service{}}) @@ -78,6 +81,7 @@ func TestServiceController(t *testing.T) { c1EndpointWatch := RegisterFakeWatch(endpoints, &cluster1Client.Fake) RegisterFakeOnCreate(services, &cluster1Client.Fake, c1ServiceWatch) RegisterFakeOnUpdate(services, &cluster1Client.Fake, c1ServiceWatch) + RegisterFakeOnDelete(services, &cluster1Client.Fake, c1ServiceWatch, serviceObjectGetter) RegisterFakeOnCreate(endpoints, &cluster1Client.Fake, c1EndpointWatch) RegisterFakeOnUpdate(endpoints, &cluster1Client.Fake, c1EndpointWatch) @@ -88,6 +92,7 @@ func TestServiceController(t *testing.T) { c2EndpointWatch := RegisterFakeWatch(endpoints, &cluster2Client.Fake) RegisterFakeOnCreate(services, &cluster2Client.Fake, c2ServiceWatch) RegisterFakeOnUpdate(services, &cluster2Client.Fake, c2ServiceWatch) + RegisterFakeOnDelete(services, &cluster2Client.Fake, c2ServiceWatch, serviceObjectGetter) RegisterFakeOnCreate(endpoints, &cluster2Client.Fake, c2EndpointWatch) RegisterFakeOnUpdate(endpoints, &cluster2Client.Fake, c2EndpointWatch) @@ -118,7 +123,6 @@ func TestServiceController(t *testing.T) { service := NewService("test-service-1", 80) - // Test add federated service. glog.Infof("Adding federated service") fedServiceWatch.Add(service) key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}.String() @@ -193,19 +197,28 @@ func TestServiceController(t *testing.T) { require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore, key, desiredService, serviceIngressCompare, wait.ForeverTestTimeout)) - // Test update federated service. glog.Infof("Test modifying federated service by changing the port") service.Spec.Ports[0].Port = 9090 fedServiceWatch.Modify(service) require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name, key, service, wait.ForeverTestTimeout)) - // Test cluster service is recreated when deleted. glog.Infof("Test cluster service is recreated when deleted") - c1ServiceWatch.Delete(service) + c1Service := NewService("test-service-1", 80) + c1Service.DeletionTimestamp = &metav1.Time{Time: time.Now()} + c1ServiceWatch.Delete(c1Service) require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name, key, service, wait.ForeverTestTimeout)) + glog.Infof("Test cluster services are deleted when federated service is deleted") + service.ObjectMeta.Finalizers = append(service.ObjectMeta.Finalizers, deletionhelper.FinalizerDeleteFromUnderlyingClusters) + service.DeletionTimestamp = &metav1.Time{Time: time.Now()} + fedServiceWatch.Modify(service) + require.NoError(t, WaitForClusterServiceDelete(t, sc.federatedInformer.GetTargetStore(), cluster1.Name, + key, wait.ForeverTestTimeout)) + require.NoError(t, WaitForClusterServiceDelete(t, sc.federatedInformer.GetTargetStore(), cluster2.Name, + key, wait.ForeverTestTimeout)) + close(stop) } @@ -256,6 +269,15 @@ func TestGetOperationsToPerformOnCluster(t *testing.T) { } } +// serviceObjectGetter gives dummy service objects to use with RegisterFakeOnDelete +// This is just so that federated informer can be tested for delete scenarios. +func serviceObjectGetter(name, namespace string) runtime.Object { + service := new(v1.Service) + service.Namespace = namespace + service.Name = name + return service +} + func NewService(name string, port int32) *v1.Service { return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -313,6 +335,18 @@ func WaitForClusterService(t *testing.T, store fedutil.FederatedReadOnlyStore, c return err } +// WaitForClusterServiceDelete waits for the cluster service to be deleted. +func WaitForClusterServiceDelete(t *testing.T, store fedutil.FederatedReadOnlyStore, clusterName, key string, timeout time.Duration) error { + err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) { + _, found, _ := store.GetByKey(clusterName, key) + if !found { + return true, nil + } + return false, nil + }) + return err +} + type serviceCompare func(current, desired *v1.Service) (match bool) func serviceStatusCompare(current, desired *v1.Service) bool { From b4ddf4720d72952041392341950f6987e908d007 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Thu, 25 May 2017 18:53:43 +0530 Subject: [PATCH 4/4] Auto generated files --- federation/pkg/federation-controller/service/BUILD | 3 +++ 1 file changed, 3 insertions(+) diff --git a/federation/pkg/federation-controller/service/BUILD b/federation/pkg/federation-controller/service/BUILD index 6a0a9784c37..27a9d435b79 100644 --- a/federation/pkg/federation-controller/service/BUILD +++ b/federation/pkg/federation-controller/service/BUILD @@ -25,6 +25,7 @@ go_library( "//federation/pkg/federation-controller/util:go_default_library", "//federation/pkg/federation-controller/util/clusterselector:go_default_library", "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", + "//federation/pkg/federation-controller/util/eventsink:go_default_library", "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", @@ -62,6 +63,7 @@ go_test( "//federation/client/clientset_generated/federation_clientset/fake:go_default_library", "//federation/pkg/dnsprovider/providers/google/clouddns:go_default_library", "//federation/pkg/federation-controller/util:go_default_library", + "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", "//federation/pkg/federation-controller/util/test:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", @@ -71,6 +73,7 @@ go_test( "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library",