diff --git a/federation/pkg/federation-controller/service/BUILD b/federation/pkg/federation-controller/service/BUILD index e378f428cf7..581cbcda6b8 100644 --- a/federation/pkg/federation-controller/service/BUILD +++ b/federation/pkg/federation-controller/service/BUILD @@ -20,6 +20,7 @@ go_library( "//federation/pkg/federation-controller/util:go_default_library", "//federation/pkg/federation-controller/util/clusterselector:go_default_library", "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", + "//federation/pkg/federation-controller/util/eventsink:go_default_library", "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", @@ -53,6 +54,7 @@ go_test( "//federation/client/clientset_generated/federation_clientset/fake:go_default_library", "//federation/pkg/federation-controller/service/ingress:go_default_library", "//federation/pkg/federation-controller/util:go_default_library", + "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", "//federation/pkg/federation-controller/util/test:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", @@ -62,6 +64,7 @@ go_test( "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 2e89f150951..ffc278af8cb 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -45,6 +45,7 @@ import ( fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util/clusterselector" "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -96,12 +97,11 @@ type ServiceController struct { flowcontrolBackoff *flowcontrol.Backoff } -// New returns a new service controller to keep DNS provider service resources -// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry. +// New returns a new service controller to keep service objects between +// the federation and member clusters in sync. func New(federationClient fedclientset.Interface) *ServiceController { broadcaster := record.NewBroadcaster() - // federationClient event is not supported yet - // broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(federationClient)) recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName}) s := &ServiceController{ @@ -180,10 +180,6 @@ func New(federationClient fedclientset.Interface) *ServiceController { svc := obj.(*v1.Service) orphanDependents := false err := client.Core().Services(svc.Namespace).Delete(svc.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) - // IsNotFound error is fine since that means the object is deleted already. - if errors.IsNotFound(err) { - return nil - } return err }) @@ -236,44 +232,40 @@ func (s *ServiceController) updateService(obj pkgruntime.Object) (pkgruntime.Obj return s.federationClient.Core().Services(service.Namespace).Update(service) } -// Run starts a background goroutine that watches for changes to federation services -// and ensures that they have Kubernetes services created, updated or deleted appropriately. -// federationSyncPeriod controls how often we check the federation's services to -// ensure that the correct Kubernetes services (and associated DNS entries) exist. -// This is only necessary to fudge over failed watches. -// clusterSyncPeriod controls how often we check the federation's underlying clusters and -// their Kubernetes services to ensure that matching services created independently of the Federation -// (e.g. directly via the underlying cluster's API) are correctly accounted for. - -// It's an error to call Run() more than once for a given ServiceController -// object. +// Run starts informers, delay deliverers and workers. Workers continuously watch for events which could +// be from federation or federated clusters and tries to reconcile the service objects from federation to +// federated clusters. func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Starting federation service controller") defer runtime.HandleCrash() + defer s.queue.ShutDown() + s.federatedInformer.Start() + defer s.federatedInformer.Stop() + s.endpointFederatedInformer.Start() + defer s.endpointFederatedInformer.Stop() + s.objectDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) { s.queue.Add(item.Value.(string)) }) + defer s.objectDeliverer.Stop() + s.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) { s.deliverServicesOnClusterChange() }) + defer s.clusterDeliverer.Stop() + fedutil.StartBackoffGC(s.flowcontrolBackoff, stopCh) go s.serviceController.Run(stopCh) for i := 0; i < workers; i++ { go wait.Until(s.fedServiceWorker, time.Second, stopCh) } - go func() { - <-stopCh - glog.Infof("Shutting down Federation Service Controller") - s.queue.ShutDown() - s.federatedInformer.Stop() - s.endpointFederatedInformer.Stop() - s.objectDeliverer.Stop() - s.clusterDeliverer.Stop() - }() + + <-stopCh + glog.Infof("Shutting down federation service controller") } type reconciliationStatus string diff --git a/federation/pkg/federation-controller/service/servicecontroller_test.go b/federation/pkg/federation-controller/service/servicecontroller_test.go index 8e6b1967190..4040e122524 100644 --- a/federation/pkg/federation-controller/service/servicecontroller_test.go +++ b/federation/pkg/federation-controller/service/servicecontroller_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" @@ -34,6 +35,7 @@ import ( fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" "k8s.io/kubernetes/federation/pkg/federation-controller/service/ingress" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" "k8s.io/kubernetes/pkg/api/v1" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -71,6 +73,7 @@ func TestServiceController(t *testing.T) { RegisterFakeOnUpdate(clusters, &fedClient.Fake, fedclusterWatch) RegisterFakeOnCreate(services, &fedClient.Fake, fedServiceWatch) RegisterFakeOnUpdate(services, &fedClient.Fake, fedServiceWatch) + RegisterFakeOnDelete(services, &fedClient.Fake, fedServiceWatch, serviceObjectGetter) cluster1Client := &fakekubeclientset.Clientset{} RegisterFakeList(services, &cluster1Client.Fake, &v1.ServiceList{Items: []v1.Service{}}) @@ -79,6 +82,7 @@ func TestServiceController(t *testing.T) { c1EndpointWatch := RegisterFakeWatch(endpoints, &cluster1Client.Fake) RegisterFakeOnCreate(services, &cluster1Client.Fake, c1ServiceWatch) RegisterFakeOnUpdate(services, &cluster1Client.Fake, c1ServiceWatch) + RegisterFakeOnDelete(services, &cluster1Client.Fake, c1ServiceWatch, serviceObjectGetter) RegisterFakeOnCreate(endpoints, &cluster1Client.Fake, c1EndpointWatch) RegisterFakeOnUpdate(endpoints, &cluster1Client.Fake, c1EndpointWatch) @@ -89,6 +93,7 @@ func TestServiceController(t *testing.T) { c2EndpointWatch := RegisterFakeWatch(endpoints, &cluster2Client.Fake) RegisterFakeOnCreate(services, &cluster2Client.Fake, c2ServiceWatch) RegisterFakeOnUpdate(services, &cluster2Client.Fake, c2ServiceWatch) + RegisterFakeOnDelete(services, &cluster2Client.Fake, c2ServiceWatch, serviceObjectGetter) RegisterFakeOnCreate(endpoints, &cluster2Client.Fake, c2EndpointWatch) RegisterFakeOnUpdate(endpoints, &cluster2Client.Fake, c2EndpointWatch) @@ -119,7 +124,6 @@ func TestServiceController(t *testing.T) { service := NewService("test-service-1", 80) - // Test add federated service. glog.Infof("Adding federated service") fedServiceWatch.Add(service) key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}.String() @@ -194,19 +198,28 @@ func TestServiceController(t *testing.T) { require.NoError(t, WaitForFederatedServiceUpdate(t, sc.serviceStore, key, desiredService, serviceIngressCompare, wait.ForeverTestTimeout)) - // Test update federated service. glog.Infof("Test modifying federated service by changing the port") service.Spec.Ports[0].Port = 9090 fedServiceWatch.Modify(service) require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name, key, service, wait.ForeverTestTimeout)) - // Test cluster service is recreated when deleted. glog.Infof("Test cluster service is recreated when deleted") - c1ServiceWatch.Delete(service) + c1Service := NewService("test-service-1", 80) + c1Service.DeletionTimestamp = &metav1.Time{Time: time.Now()} + c1ServiceWatch.Delete(c1Service) require.NoError(t, WaitForClusterService(t, sc.federatedInformer.GetTargetStore(), cluster1.Name, key, service, wait.ForeverTestTimeout)) + glog.Infof("Test cluster services are deleted when federated service is deleted") + service.ObjectMeta.Finalizers = append(service.ObjectMeta.Finalizers, deletionhelper.FinalizerDeleteFromUnderlyingClusters) + service.DeletionTimestamp = &metav1.Time{Time: time.Now()} + fedServiceWatch.Modify(service) + require.NoError(t, WaitForClusterServiceDelete(t, sc.federatedInformer.GetTargetStore(), cluster1.Name, + key, wait.ForeverTestTimeout)) + require.NoError(t, WaitForClusterServiceDelete(t, sc.federatedInformer.GetTargetStore(), cluster2.Name, + key, wait.ForeverTestTimeout)) + close(stop) } @@ -257,6 +270,15 @@ func TestGetOperationsToPerformOnCluster(t *testing.T) { } } +// serviceObjectGetter gives dummy service objects to use with RegisterFakeOnDelete +// This is just so that federated informer can be tested for delete scenarios. +func serviceObjectGetter(name, namespace string) runtime.Object { + service := new(v1.Service) + service.Namespace = namespace + service.Name = name + return service +} + func NewService(name string, port int32) *v1.Service { return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -306,6 +328,18 @@ func WaitForClusterService(t *testing.T, store fedutil.FederatedReadOnlyStore, c return err } +// WaitForClusterServiceDelete waits for the cluster service to be deleted. +func WaitForClusterServiceDelete(t *testing.T, store fedutil.FederatedReadOnlyStore, clusterName, key string, timeout time.Duration) error { + err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) { + _, found, _ := store.GetByKey(clusterName, key) + if !found { + return true, nil + } + return false, nil + }) + return err +} + type serviceCompare func(current, desired *v1.Service) (match bool) func serviceStatusCompare(current, desired *v1.Service) bool { diff --git a/federation/pkg/federation-controller/util/test/test_helper.go b/federation/pkg/federation-controller/util/test/test_helper.go index c75b9ca0477..bb5505cd917 100644 --- a/federation/pkg/federation-controller/util/test/test_helper.go +++ b/federation/pkg/federation-controller/util/test/test_helper.go @@ -288,6 +288,32 @@ func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *Watch return objChan } +// RegisterFakeOnDelete registers a reactor in the given fake client that passes +// all deleted objects to the given watcher. Since we could get only name of the +// deleted object from DeleteAction, this register function relies on the getObject +// function passed to get the object by name and pass it watcher. +func RegisterFakeOnDelete(resource string, client *core.Fake, watcher *WatcherDispatcher, getObject func(name, namespace string) runtime.Object) { + client.AddReactor("delete", resource, func(action core.Action) (bool, runtime.Object, error) { + deleteAction := action.(core.DeleteAction) + obj := getObject(deleteAction.GetName(), deleteAction.GetNamespace()) + glog.V(7).Infof("Deleting %s: %v", resource, obj) + + operation := func() { + glog.V(4).Infof("Object deleted %v", obj) + watcher.Delete(obj) + } + select { + case watcher.orderExecution <- operation: + break + case <-time.After(pushTimeout): + glog.Errorf("Fake client execution channel blocked") + glog.Errorf("Tried to push %v", deleteAction) + } + return true, obj, nil + }) + return +} + // Adds an update reactor to the given fake client. // The reactor just returns the object passed to update action. // This is used as a hack to workaround https://github.com/kubernetes/kubernetes/issues/40939.