Merge pull request #47890 from marun/fed-namespace-sync

Automatic merge from submit-queue (batch tested with PRs 47999, 47890)

[Federation] Update namespace support to use the sync controller

This PR moves namespaces to use the sync controller.

cc: @kubernetes/sig-federation-pr-reviews
This commit is contained in:
Kubernetes Submit Queue 2017-07-13 17:13:39 -07:00 committed by GitHub
commit ea78fe40db
30 changed files with 451 additions and 1078 deletions

View File

@ -24,7 +24,6 @@ go_library(
"//federation/pkg/federatedtypes:go_default_library", "//federation/pkg/federatedtypes:go_default_library",
"//federation/pkg/federation-controller/cluster:go_default_library", "//federation/pkg/federation-controller/cluster:go_default_library",
"//federation/pkg/federation-controller/ingress:go_default_library", "//federation/pkg/federation-controller/ingress:go_default_library",
"//federation/pkg/federation-controller/namespace:go_default_library",
"//federation/pkg/federation-controller/service:go_default_library", "//federation/pkg/federation-controller/service:go_default_library",
"//federation/pkg/federation-controller/service/dns:go_default_library", "//federation/pkg/federation-controller/service/dns:go_default_library",
"//federation/pkg/federation-controller/sync:go_default_library", "//federation/pkg/federation-controller/sync:go_default_library",
@ -40,7 +39,6 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/client-go/discovery:go_default_library", "//vendor/k8s.io/client-go/discovery:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
], ],

View File

@ -30,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/healthz"
utilflag "k8s.io/apiserver/pkg/util/flag" utilflag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/client-go/dynamic"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
@ -38,7 +37,6 @@ import (
"k8s.io/kubernetes/federation/pkg/federatedtypes" "k8s.io/kubernetes/federation/pkg/federatedtypes"
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress" ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress"
namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace"
servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service"
servicednscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service/dns" servicednscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service/dns"
synccontroller "k8s.io/kubernetes/federation/pkg/federation-controller/sync" synccontroller "k8s.io/kubernetes/federation/pkg/federation-controller/sync"
@ -151,14 +149,6 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
go serviceController.Run(s.ConcurrentServiceSyncs, wait.NeverStop) go serviceController.Run(s.ConcurrentServiceSyncs, wait.NeverStop)
} }
if controllerEnabled(s.Controllers, serverResources, namespacecontroller.ControllerName, namespacecontroller.RequiredResources, true) {
glog.V(3).Infof("Loading client config for namespace controller %q", namespacecontroller.UserAgentName)
nsClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, namespacecontroller.UserAgentName))
namespaceController := namespacecontroller.NewNamespaceController(nsClientset, dynamic.NewDynamicClientPool(restclient.AddUserAgent(restClientCfg, namespacecontroller.UserAgentName)))
glog.V(3).Infof("Running namespace controller")
namespaceController.Run(wait.NeverStop)
}
for kind, federatedType := range federatedtypes.FederatedTypes() { for kind, federatedType := range federatedtypes.FederatedTypes() {
if controllerEnabled(s.Controllers, serverResources, federatedType.ControllerName, federatedType.RequiredResources, true) { if controllerEnabled(s.Controllers, serverResources, federatedType.ControllerName, federatedType.RequiredResources, true) {
synccontroller.StartFederationSyncController(kind, federatedType.AdapterFactory, restClientCfg, stopChan, minimizeLatency) synccontroller.StartFederationSyncController(kind, federatedType.AdapterFactory, restClientCfg, stopChan, minimizeLatency)

View File

@ -15,6 +15,8 @@ go_library(
"configmap.go", "configmap.go",
"daemonset.go", "daemonset.go",
"deployment.go", "deployment.go",
"namespace.go",
"qualifiedname.go",
"registry.go", "registry.go",
"replicaset.go", "replicaset.go",
"scheduling.go", "scheduling.go",
@ -29,7 +31,9 @@ go_library(
"//federation/pkg/federation-controller/util/planner:go_default_library", "//federation/pkg/federation-controller/util/planner:go_default_library",
"//federation/pkg/federation-controller/util/podanalyzer:go_default_library", "//federation/pkg/federation-controller/util/podanalyzer:go_default_library",
"//federation/pkg/federation-controller/util/replicapreferences:go_default_library", "//federation/pkg/federation-controller/util/replicapreferences:go_default_library",
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/controller/namespace/deletion:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
@ -37,8 +41,10 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
], ],
) )

View File

@ -19,8 +19,8 @@ package federatedtypes
import ( import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime" pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
) )
@ -34,21 +34,21 @@ type FederatedTypeAdapter interface {
IsExpectedType(obj interface{}) bool IsExpectedType(obj interface{}) bool
Copy(obj pkgruntime.Object) pkgruntime.Object Copy(obj pkgruntime.Object) pkgruntime.Object
Equivalent(obj1, obj2 pkgruntime.Object) bool Equivalent(obj1, obj2 pkgruntime.Object) bool
NamespacedName(obj pkgruntime.Object) types.NamespacedName QualifiedName(obj pkgruntime.Object) QualifiedName
ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta
// Fed* operations target the federation control plane // Fed* operations target the federation control plane
FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error)
FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error
FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error)
FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error)
FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error)
FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error)
// The following operations are intended to target a cluster that is a member of a federation // The following operations are intended to target a cluster that is a member of a federation
ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error)
ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error
ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error)
ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error)
ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error)
ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error)
@ -62,7 +62,7 @@ type FederatedTypeAdapter interface {
// that create instances of FederatedTypeAdapter. Such methods should // that create instances of FederatedTypeAdapter. Such methods should
// be registered with RegisterAdapterFactory to ensure the type // be registered with RegisterAdapterFactory to ensure the type
// adapter is discoverable. // adapter is discoverable.
type AdapterFactory func(client federationclientset.Interface) FederatedTypeAdapter type AdapterFactory func(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter
// SetAnnotation sets the given key and value in the given object's ObjectMeta.Annotations map // SetAnnotation sets the given key and value in the given object's ObjectMeta.Annotations map
func SetAnnotation(adapter FederatedTypeAdapter, obj pkgruntime.Object, key, value string) { func SetAnnotation(adapter FederatedTypeAdapter, obj pkgruntime.Object, key, value string) {
@ -75,5 +75,5 @@ func SetAnnotation(adapter FederatedTypeAdapter, obj pkgruntime.Object, key, val
// ObjectKey returns a cluster-unique key for the given object // ObjectKey returns a cluster-unique key for the given object
func ObjectKey(adapter FederatedTypeAdapter, obj pkgruntime.Object) string { func ObjectKey(adapter FederatedTypeAdapter, obj pkgruntime.Object) string {
return adapter.NamespacedName(obj).String() return adapter.QualifiedName(obj).String()
} }

View File

@ -21,8 +21,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime" pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -41,7 +41,7 @@ type ConfigMapAdapter struct {
client federationclientset.Interface client federationclientset.Interface
} }
func NewConfigMapAdapter(client federationclientset.Interface) FederatedTypeAdapter { func NewConfigMapAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
return &ConfigMapAdapter{client: client} return &ConfigMapAdapter{client: client}
} }
@ -72,9 +72,9 @@ func (a *ConfigMapAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
return util.ConfigMapEquivalent(configmap1, configmap2) return util.ConfigMapEquivalent(configmap1, configmap2)
} }
func (a *ConfigMapAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { func (a *ConfigMapAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName {
configmap := obj.(*apiv1.ConfigMap) configmap := obj.(*apiv1.ConfigMap)
return types.NamespacedName{Namespace: configmap.Namespace, Name: configmap.Name} return QualifiedName{Namespace: configmap.Namespace, Name: configmap.Name}
} }
func (a *ConfigMapAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { func (a *ConfigMapAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
@ -86,12 +86,12 @@ func (a *ConfigMapAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object,
return a.client.CoreV1().ConfigMaps(configmap.Namespace).Create(configmap) return a.client.CoreV1().ConfigMaps(configmap.Namespace).Create(configmap)
} }
func (a *ConfigMapAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { func (a *ConfigMapAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return a.client.CoreV1().ConfigMaps(namespacedName.Namespace).Delete(namespacedName.Name, options) return a.client.CoreV1().ConfigMaps(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
} }
func (a *ConfigMapAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { func (a *ConfigMapAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) {
return a.client.CoreV1().ConfigMaps(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) return a.client.CoreV1().ConfigMaps(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
} }
func (a *ConfigMapAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { func (a *ConfigMapAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
@ -112,12 +112,12 @@ func (a *ConfigMapAdapter) ClusterCreate(client kubeclientset.Interface, obj pkg
return client.CoreV1().ConfigMaps(configmap.Namespace).Create(configmap) return client.CoreV1().ConfigMaps(configmap.Namespace).Create(configmap)
} }
func (a *ConfigMapAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { func (a *ConfigMapAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return client.CoreV1().ConfigMaps(nsName.Namespace).Delete(nsName.Name, options) return client.CoreV1().ConfigMaps(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
} }
func (a *ConfigMapAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { func (a *ConfigMapAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) {
return client.CoreV1().ConfigMaps(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) return client.CoreV1().ConfigMaps(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
} }
func (a *ConfigMapAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { func (a *ConfigMapAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {

View File

@ -17,6 +17,7 @@ limitations under the License.
package crudtester package crudtester
import ( import (
"fmt"
"time" "time"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
@ -76,15 +77,20 @@ func (c *FederatedTypeCRUDTester) CheckLifecycle(desiredObject pkgruntime.Object
func (c *FederatedTypeCRUDTester) Create(desiredObject pkgruntime.Object) pkgruntime.Object { func (c *FederatedTypeCRUDTester) Create(desiredObject pkgruntime.Object) pkgruntime.Object {
namespace := c.adapter.ObjectMeta(desiredObject).Namespace namespace := c.adapter.ObjectMeta(desiredObject).Namespace
c.tl.Logf("Creating new federated %s in namespace %q", c.kind, namespace) resourceMsg := fmt.Sprintf("federated %s", c.kind)
if len(namespace) > 0 {
resourceMsg = fmt.Sprintf("%s in namespace %q", resourceMsg, namespace)
}
c.tl.Logf("Creating new %s", resourceMsg)
obj, err := c.adapter.FedCreate(desiredObject) obj, err := c.adapter.FedCreate(desiredObject)
if err != nil { if err != nil {
c.tl.Fatalf("Error creating federated %s in namespace %q : %v", c.kind, namespace, err) c.tl.Fatalf("Error creating %s: %v", resourceMsg, err)
} }
namespacedName := c.adapter.NamespacedName(obj) qualifiedName := c.adapter.QualifiedName(obj)
c.tl.Logf("Created new federated %s %q", c.kind, namespacedName) c.tl.Logf("Created new federated %s %q", c.kind, qualifiedName)
return obj return obj
} }
@ -98,7 +104,7 @@ func (c *FederatedTypeCRUDTester) CheckCreate(desiredObject pkgruntime.Object) p
} }
func (c *FederatedTypeCRUDTester) CheckUpdate(obj pkgruntime.Object) { func (c *FederatedTypeCRUDTester) CheckUpdate(obj pkgruntime.Object) {
namespacedName := c.adapter.NamespacedName(obj) qualifiedName := c.adapter.QualifiedName(obj)
var initialAnnotation string var initialAnnotation string
meta := c.adapter.ObjectMeta(obj) meta := c.adapter.ObjectMeta(obj)
@ -106,29 +112,29 @@ func (c *FederatedTypeCRUDTester) CheckUpdate(obj pkgruntime.Object) {
initialAnnotation = meta.Annotations[AnnotationTestFederationCRUDUpdate] initialAnnotation = meta.Annotations[AnnotationTestFederationCRUDUpdate]
} }
c.tl.Logf("Updating federated %s %q", c.kind, namespacedName) c.tl.Logf("Updating federated %s %q", c.kind, qualifiedName)
updatedObj, err := c.updateFedObject(obj) updatedObj, err := c.updateFedObject(obj)
if err != nil { if err != nil {
c.tl.Fatalf("Error updating federated %s %q: %v", c.kind, namespacedName, err) c.tl.Fatalf("Error updating federated %s %q: %v", c.kind, qualifiedName, err)
} }
// updateFedObject is expected to have changed the value of the annotation // updateFedObject is expected to have changed the value of the annotation
meta = c.adapter.ObjectMeta(updatedObj) meta = c.adapter.ObjectMeta(updatedObj)
updatedAnnotation := meta.Annotations[AnnotationTestFederationCRUDUpdate] updatedAnnotation := meta.Annotations[AnnotationTestFederationCRUDUpdate]
if updatedAnnotation == initialAnnotation { if updatedAnnotation == initialAnnotation {
c.tl.Fatalf("Federated %s %q not mutated", c.kind, namespacedName) c.tl.Fatalf("Federated %s %q not mutated", c.kind, qualifiedName)
} }
c.CheckPropagation(updatedObj) c.CheckPropagation(updatedObj)
} }
func (c *FederatedTypeCRUDTester) CheckDelete(obj pkgruntime.Object, orphanDependents *bool) { func (c *FederatedTypeCRUDTester) CheckDelete(obj pkgruntime.Object, orphanDependents *bool) {
namespacedName := c.adapter.NamespacedName(obj) qualifiedName := c.adapter.QualifiedName(obj)
c.tl.Logf("Deleting federated %s %q", c.kind, namespacedName) c.tl.Logf("Deleting federated %s %q", c.kind, qualifiedName)
err := c.adapter.FedDelete(namespacedName, &metav1.DeleteOptions{OrphanDependents: orphanDependents}) err := c.adapter.FedDelete(qualifiedName, &metav1.DeleteOptions{OrphanDependents: orphanDependents})
if err != nil { if err != nil {
c.tl.Fatalf("Error deleting federated %s %q: %v", c.kind, namespacedName, err) c.tl.Fatalf("Error deleting federated %s %q: %v", c.kind, qualifiedName, err)
} }
deletingInCluster := (orphanDependents != nil && *orphanDependents == false) deletingInCluster := (orphanDependents != nil && *orphanDependents == false)
@ -142,14 +148,14 @@ func (c *FederatedTypeCRUDTester) CheckDelete(obj pkgruntime.Object, orphanDepen
// Wait for deletion. The federation resource will only be removed once orphan deletion has been // Wait for deletion. The federation resource will only be removed once orphan deletion has been
// completed or deemed unnecessary. // completed or deemed unnecessary.
err = wait.PollImmediate(c.waitInterval, waitTimeout, func() (bool, error) { err = wait.PollImmediate(c.waitInterval, waitTimeout, func() (bool, error) {
_, err := c.adapter.FedGet(namespacedName) _, err := c.adapter.FedGet(qualifiedName)
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
return true, nil return true, nil
} }
return false, err return false, err
}) })
if err != nil { if err != nil {
c.tl.Fatalf("Error deleting federated %s %q: %v", c.kind, namespacedName, err) c.tl.Fatalf("Error deleting federated %s %q: %v", c.kind, qualifiedName, err)
} }
var stateMsg string = "present" var stateMsg string = "present"
@ -157,14 +163,14 @@ func (c *FederatedTypeCRUDTester) CheckDelete(obj pkgruntime.Object, orphanDepen
stateMsg = "not present" stateMsg = "not present"
} }
for _, client := range c.clusterClients { for _, client := range c.clusterClients {
_, err := c.adapter.ClusterGet(client, namespacedName) _, err := c.adapter.ClusterGet(client, qualifiedName)
switch { switch {
case !deletingInCluster && errors.IsNotFound(err): case !deletingInCluster && errors.IsNotFound(err):
c.tl.Fatalf("Federated %s %q was unexpectedly deleted from a member cluster", c.kind, namespacedName) c.tl.Fatalf("Federated %s %q was unexpectedly deleted from a member cluster", c.kind, qualifiedName)
case deletingInCluster && err == nil: case deletingInCluster && err == nil:
c.tl.Fatalf("Federated %s %q was unexpectedly orphaned in a member cluster", c.kind, namespacedName) c.tl.Fatalf("Federated %s %q was unexpectedly orphaned in a member cluster", c.kind, qualifiedName)
case err != nil && !errors.IsNotFound(err): case err != nil && !errors.IsNotFound(err):
c.tl.Fatalf("Error while checking whether %s %q is %s in member clusters: %v", c.kind, namespacedName, stateMsg, err) c.tl.Fatalf("Error while checking whether %s %q is %s in member clusters: %v", c.kind, qualifiedName, stateMsg, err)
} }
} }
} }
@ -176,26 +182,26 @@ func (c *FederatedTypeCRUDTester) CheckPropagation(obj pkgruntime.Object) {
// CheckPropagationForClients checks propagation for the provided clients // CheckPropagationForClients checks propagation for the provided clients
func (c *FederatedTypeCRUDTester) CheckPropagationForClients(obj pkgruntime.Object, clusterClients []clientset.Interface, objExpected bool) { func (c *FederatedTypeCRUDTester) CheckPropagationForClients(obj pkgruntime.Object, clusterClients []clientset.Interface, objExpected bool) {
namespacedName := c.adapter.NamespacedName(obj) qualifiedName := c.adapter.QualifiedName(obj)
c.tl.Logf("Waiting for %s %q in %d clusters", c.kind, namespacedName, len(clusterClients)) c.tl.Logf("Waiting for %s %q in %d clusters", c.kind, qualifiedName, len(clusterClients))
for _, client := range clusterClients { for _, client := range clusterClients {
err := c.waitForResource(client, obj) err := c.waitForResource(client, obj)
switch { switch {
case err == wait.ErrWaitTimeout: case err == wait.ErrWaitTimeout:
if objExpected { if objExpected {
c.tl.Fatalf("Timeout verifying %s %q in a member cluster: %v", c.kind, namespacedName, err) c.tl.Fatalf("Timeout verifying %s %q in a member cluster: %v", c.kind, qualifiedName, err)
} }
case err != nil: case err != nil:
c.tl.Fatalf("Failed to verify %s %q in a member cluster: %v", c.kind, namespacedName, err) c.tl.Fatalf("Failed to verify %s %q in a member cluster: %v", c.kind, qualifiedName, err)
case err == nil && !objExpected: case err == nil && !objExpected:
c.tl.Fatalf("Found unexpected object %s %q in a member cluster: %v", c.kind, namespacedName, err) c.tl.Fatalf("Found unexpected object %s %q in a member cluster: %v", c.kind, qualifiedName, err)
} }
} }
} }
func (c *FederatedTypeCRUDTester) waitForResource(client clientset.Interface, obj pkgruntime.Object) error { func (c *FederatedTypeCRUDTester) waitForResource(client clientset.Interface, obj pkgruntime.Object) error {
namespacedName := c.adapter.NamespacedName(obj) qualifiedName := c.adapter.QualifiedName(obj)
err := wait.PollImmediate(c.waitInterval, c.clusterWaitTimeout, func() (bool, error) { err := wait.PollImmediate(c.waitInterval, c.clusterWaitTimeout, func() (bool, error) {
equivalenceFunc := c.adapter.Equivalent equivalenceFunc := c.adapter.Equivalent
if c.adapter.IsSchedulingAdapter() { if c.adapter.IsSchedulingAdapter() {
@ -206,7 +212,7 @@ func (c *FederatedTypeCRUDTester) waitForResource(client clientset.Interface, ob
equivalenceFunc = schedulingAdapter.EquivalentIgnoringSchedule equivalenceFunc = schedulingAdapter.EquivalentIgnoringSchedule
} }
clusterObj, err := c.adapter.ClusterGet(client, namespacedName) clusterObj, err := c.adapter.ClusterGet(client, qualifiedName)
if err == nil && equivalenceFunc(clusterObj, obj) { if err == nil && equivalenceFunc(clusterObj, obj) {
return true, nil return true, nil
} }
@ -227,8 +233,8 @@ func (c *FederatedTypeCRUDTester) updateFedObject(obj pkgruntime.Object) (pkgrun
if errors.IsConflict(err) { if errors.IsConflict(err) {
// The resource was updated by the federation controller. // The resource was updated by the federation controller.
// Get the latest version and retry. // Get the latest version and retry.
namespacedName := c.adapter.NamespacedName(obj) qualifiedName := c.adapter.QualifiedName(obj)
obj, err = c.adapter.FedGet(namespacedName) obj, err = c.adapter.FedGet(qualifiedName)
return false, err return false, err
} }
// Be tolerant of a slow server // Be tolerant of a slow server

View File

@ -24,8 +24,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime" pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -44,7 +44,7 @@ type DaemonSetAdapter struct {
client federationclientset.Interface client federationclientset.Interface
} }
func NewDaemonSetAdapter(client federationclientset.Interface) FederatedTypeAdapter { func NewDaemonSetAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
return &DaemonSetAdapter{client: client} return &DaemonSetAdapter{client: client}
} }
@ -75,9 +75,9 @@ func (a *DaemonSetAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
return util.ObjectMetaEquivalent(daemonset1.ObjectMeta, daemonset2.ObjectMeta) && reflect.DeepEqual(daemonset1.Spec, daemonset2.Spec) return util.ObjectMetaEquivalent(daemonset1.ObjectMeta, daemonset2.ObjectMeta) && reflect.DeepEqual(daemonset1.Spec, daemonset2.Spec)
} }
func (a *DaemonSetAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { func (a *DaemonSetAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName {
daemonset := obj.(*extensionsv1.DaemonSet) daemonset := obj.(*extensionsv1.DaemonSet)
return types.NamespacedName{Namespace: daemonset.Namespace, Name: daemonset.Name} return QualifiedName{Namespace: daemonset.Namespace, Name: daemonset.Name}
} }
func (a *DaemonSetAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { func (a *DaemonSetAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
@ -89,12 +89,12 @@ func (a *DaemonSetAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object,
return a.client.Extensions().DaemonSets(daemonset.Namespace).Create(daemonset) return a.client.Extensions().DaemonSets(daemonset.Namespace).Create(daemonset)
} }
func (a *DaemonSetAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { func (a *DaemonSetAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return a.client.Extensions().DaemonSets(namespacedName.Namespace).Delete(namespacedName.Name, options) return a.client.Extensions().DaemonSets(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
} }
func (a *DaemonSetAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { func (a *DaemonSetAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) {
return a.client.Extensions().DaemonSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) return a.client.Extensions().DaemonSets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
} }
func (a *DaemonSetAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { func (a *DaemonSetAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
@ -115,12 +115,12 @@ func (a *DaemonSetAdapter) ClusterCreate(client kubeclientset.Interface, obj pkg
return client.Extensions().DaemonSets(daemonset.Namespace).Create(daemonset) return client.Extensions().DaemonSets(daemonset.Namespace).Create(daemonset)
} }
func (a *DaemonSetAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { func (a *DaemonSetAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return client.Extensions().DaemonSets(nsName.Namespace).Delete(nsName.Name, options) return client.Extensions().DaemonSets(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
} }
func (a *DaemonSetAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { func (a *DaemonSetAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) {
return client.Extensions().DaemonSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) return client.Extensions().DaemonSets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
} }
func (a *DaemonSetAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { func (a *DaemonSetAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {

View File

@ -22,8 +22,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime" pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -44,7 +44,7 @@ type DeploymentAdapter struct {
client federationclientset.Interface client federationclientset.Interface
} }
func NewDeploymentAdapter(client federationclientset.Interface) FederatedTypeAdapter { func NewDeploymentAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
schedulingAdapter := schedulingAdapter{ schedulingAdapter := schedulingAdapter{
preferencesAnnotationName: FedDeploymentPreferencesAnnotation, preferencesAnnotationName: FedDeploymentPreferencesAnnotation,
updateStatusFunc: func(obj pkgruntime.Object, status SchedulingStatus) error { updateStatusFunc: func(obj pkgruntime.Object, status SchedulingStatus) error {
@ -91,9 +91,9 @@ func (a *DeploymentAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
return fedutil.DeploymentEquivalent(deployment1, deployment2) return fedutil.DeploymentEquivalent(deployment1, deployment2)
} }
func (a *DeploymentAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { func (a *DeploymentAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName {
deployment := obj.(*extensionsv1.Deployment) deployment := obj.(*extensionsv1.Deployment)
return types.NamespacedName{Namespace: deployment.Namespace, Name: deployment.Name} return QualifiedName{Namespace: deployment.Namespace, Name: deployment.Name}
} }
func (a *DeploymentAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { func (a *DeploymentAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
@ -105,12 +105,12 @@ func (a *DeploymentAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object,
return a.client.Extensions().Deployments(deployment.Namespace).Create(deployment) return a.client.Extensions().Deployments(deployment.Namespace).Create(deployment)
} }
func (a *DeploymentAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { func (a *DeploymentAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return a.client.Extensions().Deployments(namespacedName.Namespace).Delete(namespacedName.Name, options) return a.client.Extensions().Deployments(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
} }
func (a *DeploymentAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { func (a *DeploymentAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) {
return a.client.Extensions().Deployments(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) return a.client.Extensions().Deployments(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
} }
func (a *DeploymentAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { func (a *DeploymentAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
@ -131,12 +131,12 @@ func (a *DeploymentAdapter) ClusterCreate(client kubeclientset.Interface, obj pk
return client.Extensions().Deployments(deployment.Namespace).Create(deployment) return client.Extensions().Deployments(deployment.Namespace).Create(deployment)
} }
func (a *DeploymentAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { func (a *DeploymentAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return client.Extensions().Deployments(nsName.Namespace).Delete(nsName.Name, options) return client.Extensions().Deployments(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
} }
func (a *DeploymentAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { func (a *DeploymentAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) {
return client.Extensions().Deployments(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) return client.Extensions().Deployments(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
} }
func (a *DeploymentAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { func (a *DeploymentAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {

View File

@ -0,0 +1,215 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package federatedtypes
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/pkg/api"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/controller/namespace/deletion"
"github.com/golang/glog"
)
const (
NamespaceKind = "namespace"
NamespaceControllerName = "namespaces"
)
func init() {
RegisterFederatedType(NamespaceKind, NamespaceControllerName, []schema.GroupVersionResource{apiv1.SchemeGroupVersion.WithResource(NamespaceControllerName)}, NewNamespaceAdapter)
}
type NamespaceAdapter struct {
client federationclientset.Interface
deleter deletion.NamespacedResourcesDeleterInterface
}
func NewNamespaceAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
dynamicClientPool := dynamic.NewDynamicClientPool(config)
discoverResourcesFunc := client.Discovery().ServerPreferredNamespacedResources
deleter := deletion.NewNamespacedResourcesDeleter(
client.Core().Namespaces(),
dynamicClientPool,
nil,
discoverResourcesFunc,
apiv1.FinalizerKubernetes,
false)
return &NamespaceAdapter{client: client, deleter: deleter}
}
func (a *NamespaceAdapter) Kind() string {
return NamespaceKind
}
func (a *NamespaceAdapter) ObjectType() pkgruntime.Object {
return &apiv1.Namespace{}
}
func (a *NamespaceAdapter) IsExpectedType(obj interface{}) bool {
_, ok := obj.(*apiv1.Namespace)
return ok
}
func (a *NamespaceAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object {
namespace := obj.(*apiv1.Namespace)
return &apiv1.Namespace{
ObjectMeta: util.DeepCopyRelevantObjectMeta(namespace.ObjectMeta),
Spec: *(util.DeepCopyApiTypeOrPanic(&namespace.Spec).(*apiv1.NamespaceSpec)),
}
}
func (a *NamespaceAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
return util.ObjectMetaAndSpecEquivalent(obj1, obj2)
}
func (a *NamespaceAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName {
namespace := obj.(*apiv1.Namespace)
return QualifiedName{Name: namespace.Name}
}
func (a *NamespaceAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
return &obj.(*apiv1.Namespace).ObjectMeta
}
func (a *NamespaceAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) {
namespace := obj.(*apiv1.Namespace)
return a.client.CoreV1().Namespaces().Create(namespace)
}
func (a *NamespaceAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return a.client.CoreV1().Namespaces().Delete(qualifiedName.Name, options)
}
func (a *NamespaceAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) {
return a.client.CoreV1().Namespaces().Get(qualifiedName.Name, metav1.GetOptions{})
}
func (a *NamespaceAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
return a.client.CoreV1().Namespaces().List(options)
}
func (a *NamespaceAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) {
namespace := obj.(*apiv1.Namespace)
return a.client.CoreV1().Namespaces().Update(namespace)
}
func (a *NamespaceAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) {
return a.client.CoreV1().Namespaces().Watch(options)
}
func (a *NamespaceAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) {
namespace := obj.(*apiv1.Namespace)
return client.CoreV1().Namespaces().Create(namespace)
}
func (a *NamespaceAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return client.CoreV1().Namespaces().Delete(qualifiedName.Name, options)
}
func (a *NamespaceAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) {
return client.CoreV1().Namespaces().Get(qualifiedName.Name, metav1.GetOptions{})
}
func (a *NamespaceAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
return client.CoreV1().Namespaces().List(options)
}
func (a *NamespaceAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) {
namespace := obj.(*apiv1.Namespace)
return client.CoreV1().Namespaces().Update(namespace)
}
func (a *NamespaceAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Namespaces().Watch(options)
}
func (a *NamespaceAdapter) IsSchedulingAdapter() bool {
return false
}
func (a *NamespaceAdapter) NewTestObject(namespace string) pkgruntime.Object {
return &apiv1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-namespace-",
},
Spec: apiv1.NamespaceSpec{
Finalizers: []apiv1.FinalizerName{apiv1.FinalizerKubernetes},
},
}
}
// CleanUpNamespace deletes all resources in a given namespace.
func (a *NamespaceAdapter) CleanUpNamespace(obj pkgruntime.Object, eventRecorder record.EventRecorder) (pkgruntime.Object, error) {
namespace := obj.(*apiv1.Namespace)
name := namespace.Name
// Set Terminating status.
updatedNamespace := &apiv1.Namespace{
ObjectMeta: namespace.ObjectMeta,
Spec: namespace.Spec,
Status: apiv1.NamespaceStatus{
Phase: apiv1.NamespaceTerminating,
},
}
var err error
if namespace.Status.Phase != apiv1.NamespaceTerminating {
glog.V(2).Infof("Marking ns %s as terminating", name)
eventRecorder.Event(namespace, api.EventTypeNormal, "DeleteNamespace", fmt.Sprintf("Marking for deletion"))
_, err = a.FedUpdate(updatedNamespace)
if err != nil {
return nil, fmt.Errorf("failed to update namespace: %v", err)
}
}
if hasFinalizerInSpec(updatedNamespace, apiv1.FinalizerKubernetes) {
// Delete resources in this namespace.
err = a.deleter.Delete(name)
if err != nil {
return nil, fmt.Errorf("error in deleting resources in namespace %s: %v", name, err)
}
glog.V(2).Infof("Removed kubernetes finalizer from ns %s", name)
// Fetch the updated Namespace.
obj, err = a.FedGet(QualifiedName{Name: name})
updatedNamespace = obj.(*apiv1.Namespace)
if err != nil {
return nil, fmt.Errorf("error in fetching updated namespace %s: %s", name, err)
}
}
return updatedNamespace, nil
}
func hasFinalizerInSpec(namespace *apiv1.Namespace, finalizer apiv1.FinalizerName) bool {
for i := range namespace.Spec.Finalizers {
if namespace.Spec.Finalizers[i] == finalizer {
return true
}
}
return false
}

View File

@ -0,0 +1,41 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package federatedtypes
import (
"fmt"
)
// QualifiedName comprises a resource name with an optional namespace.
// If namespace is provided, a QualifiedName will be rendered as
// "<namespace>/<name>". If not, it will be rendered as "name". This
// is intended to allow the FederatedTypeAdapter interface and its
// consumers to operate on both namespaces and namespace-qualified
// resources.
type QualifiedName struct {
Namespace string
Name string
}
// String returns the general purpose string representation
func (n QualifiedName) String() string {
if len(n.Namespace) == 0 {
return n.Name
}
return fmt.Sprintf("%s/%s", n.Namespace, n.Name)
}

View File

@ -22,8 +22,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime" pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -44,7 +44,7 @@ type ReplicaSetAdapter struct {
client federationclientset.Interface client federationclientset.Interface
} }
func NewReplicaSetAdapter(client federationclientset.Interface) FederatedTypeAdapter { func NewReplicaSetAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
schedulingAdapter := schedulingAdapter{ schedulingAdapter := schedulingAdapter{
preferencesAnnotationName: FedReplicaSetPreferencesAnnotation, preferencesAnnotationName: FedReplicaSetPreferencesAnnotation,
updateStatusFunc: func(obj pkgruntime.Object, status SchedulingStatus) error { updateStatusFunc: func(obj pkgruntime.Object, status SchedulingStatus) error {
@ -91,9 +91,9 @@ func (a *ReplicaSetAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
return fedutil.ObjectMetaAndSpecEquivalent(obj1, obj2) return fedutil.ObjectMetaAndSpecEquivalent(obj1, obj2)
} }
func (a *ReplicaSetAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { func (a *ReplicaSetAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName {
replicaset := obj.(*extensionsv1.ReplicaSet) replicaset := obj.(*extensionsv1.ReplicaSet)
return types.NamespacedName{Namespace: replicaset.Namespace, Name: replicaset.Name} return QualifiedName{Namespace: replicaset.Namespace, Name: replicaset.Name}
} }
func (a *ReplicaSetAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { func (a *ReplicaSetAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
@ -105,12 +105,12 @@ func (a *ReplicaSetAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object,
return a.client.Extensions().ReplicaSets(replicaset.Namespace).Create(replicaset) return a.client.Extensions().ReplicaSets(replicaset.Namespace).Create(replicaset)
} }
func (a *ReplicaSetAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { func (a *ReplicaSetAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return a.client.Extensions().ReplicaSets(namespacedName.Namespace).Delete(namespacedName.Name, options) return a.client.Extensions().ReplicaSets(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
} }
func (a *ReplicaSetAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { func (a *ReplicaSetAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) {
return a.client.Extensions().ReplicaSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) return a.client.Extensions().ReplicaSets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
} }
func (a *ReplicaSetAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { func (a *ReplicaSetAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
@ -131,12 +131,12 @@ func (a *ReplicaSetAdapter) ClusterCreate(client kubeclientset.Interface, obj pk
return client.Extensions().ReplicaSets(replicaset.Namespace).Create(replicaset) return client.Extensions().ReplicaSets(replicaset.Namespace).Create(replicaset)
} }
func (a *ReplicaSetAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { func (a *ReplicaSetAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return client.Extensions().ReplicaSets(nsName.Namespace).Delete(nsName.Name, options) return client.Extensions().ReplicaSets(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
} }
func (a *ReplicaSetAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { func (a *ReplicaSetAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) {
return client.Extensions().ReplicaSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) return client.Extensions().ReplicaSets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
} }
func (a *ReplicaSetAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { func (a *ReplicaSetAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {

View File

@ -21,8 +21,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime" pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -41,7 +41,7 @@ type SecretAdapter struct {
client federationclientset.Interface client federationclientset.Interface
} }
func NewSecretAdapter(client federationclientset.Interface) FederatedTypeAdapter { func NewSecretAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
return &SecretAdapter{client: client} return &SecretAdapter{client: client}
} }
@ -73,9 +73,9 @@ func (a *SecretAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
return util.SecretEquivalent(*secret1, *secret2) return util.SecretEquivalent(*secret1, *secret2)
} }
func (a *SecretAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { func (a *SecretAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName {
secret := obj.(*apiv1.Secret) secret := obj.(*apiv1.Secret)
return types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name} return QualifiedName{Namespace: secret.Namespace, Name: secret.Name}
} }
func (a *SecretAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { func (a *SecretAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
@ -87,12 +87,12 @@ func (a *SecretAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, err
return a.client.CoreV1().Secrets(secret.Namespace).Create(secret) return a.client.CoreV1().Secrets(secret.Namespace).Create(secret)
} }
func (a *SecretAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { func (a *SecretAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return a.client.CoreV1().Secrets(namespacedName.Namespace).Delete(namespacedName.Name, options) return a.client.CoreV1().Secrets(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
} }
func (a *SecretAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { func (a *SecretAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) {
return a.client.CoreV1().Secrets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) return a.client.CoreV1().Secrets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
} }
func (a *SecretAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { func (a *SecretAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
@ -113,12 +113,12 @@ func (a *SecretAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgrun
return client.CoreV1().Secrets(secret.Namespace).Create(secret) return client.CoreV1().Secrets(secret.Namespace).Create(secret)
} }
func (a *SecretAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { func (a *SecretAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return client.CoreV1().Secrets(nsName.Namespace).Delete(nsName.Name, options) return client.CoreV1().Secrets(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
} }
func (a *SecretAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { func (a *SecretAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) {
return client.CoreV1().Secrets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) return client.CoreV1().Secrets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
} }
func (a *SecretAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { func (a *SecretAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {

View File

@ -26,7 +26,6 @@ filegroup(
":package-srcs", ":package-srcs",
"//federation/pkg/federation-controller/cluster:all-srcs", "//federation/pkg/federation-controller/cluster:all-srcs",
"//federation/pkg/federation-controller/ingress:all-srcs", "//federation/pkg/federation-controller/ingress:all-srcs",
"//federation/pkg/federation-controller/namespace:all-srcs",
"//federation/pkg/federation-controller/service:all-srcs", "//federation/pkg/federation-controller/service:all-srcs",
"//federation/pkg/federation-controller/sync:all-srcs", "//federation/pkg/federation-controller/sync:all-srcs",
"//federation/pkg/federation-controller/util:all-srcs", "//federation/pkg/federation-controller/util:all-srcs",

View File

@ -1,75 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["namespace_controller.go"],
tags = ["automanaged"],
deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/namespace/deletion:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["namespace_controller_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset/fake:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,460 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namespace
import (
"fmt"
"time"
apiv1 "k8s.io/api/core/v1"
clientv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/pkg/api"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/namespace/deletion"
"github.com/golang/glog"
)
const (
allClustersKey = "ALL_CLUSTERS"
ControllerName = "namespaces"
UserAgentName = "federation-namespace-controller"
)
var (
RequiredResources = []schema.GroupVersionResource{apiv1.SchemeGroupVersion.WithResource("namespaces")}
)
type NamespaceController struct {
// For triggering single namespace reconciliation. This is used when there is an
// add/update/delete operation on a namespace in either federated API server or
// in some member of the federation.
namespaceDeliverer *util.DelayingDeliverer
// For triggering all namespaces reconciliation. This is used when
// a new cluster becomes available.
clusterDeliverer *util.DelayingDeliverer
// Contains namespaces present in members of federation.
namespaceFederatedInformer util.FederatedInformer
// For updating members of federation.
federatedUpdater util.FederatedUpdater
// Definitions of namespaces that should be federated.
namespaceInformerStore cache.Store
// Informer controller for namespaces that should be federated.
namespaceInformerController cache.Controller
// Client to federated api server.
federatedApiClient federationclientset.Interface
// Backoff manager for namespaces
namespaceBackoff *flowcontrol.Backoff
// For events
eventRecorder record.EventRecorder
deletionHelper *deletionhelper.DeletionHelper
// Helper to delete all resources in a namespace.
namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface
namespaceReviewDelay time.Duration
clusterAvailableDelay time.Duration
smallDelay time.Duration
updateTimeout time.Duration
}
// NewNamespaceController returns a new namespace controller
func NewNamespaceController(client federationclientset.Interface, dynamicClientPool dynamic.ClientPool) *NamespaceController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client))
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName})
nc := &NamespaceController{
federatedApiClient: client,
namespaceReviewDelay: time.Second * 10,
clusterAvailableDelay: time.Second * 20,
smallDelay: time.Second * 3,
updateTimeout: time.Second * 30,
namespaceBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
eventRecorder: recorder,
}
// Build deliverers for triggering reconciliations.
nc.namespaceDeliverer = util.NewDelayingDeliverer()
nc.clusterDeliverer = util.NewDelayingDeliverer()
// Start informer in federated API servers on namespaces that should be federated.
nc.namespaceInformerStore, nc.namespaceInformerController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.Core().Namespaces().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.Core().Namespaces().Watch(options)
},
},
&apiv1.Namespace{},
controller.NoResyncPeriodFunc(),
util.NewTriggerOnAllChanges(func(obj runtime.Object) { nc.deliverNamespaceObj(obj, 0, false) }))
// Federated informer on namespaces in members of federation.
nc.namespaceFederatedInformer = util.NewFederatedInformer(
client,
func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return targetClient.Core().Namespaces().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return targetClient.Core().Namespaces().Watch(options)
},
},
&apiv1.Namespace{},
controller.NoResyncPeriodFunc(),
// Trigger reconciliation whenever something in federated cluster is changed. In most cases it
// would be just confirmation that some namespace operation succeeded.
util.NewTriggerOnMetaAndSpecChanges(
func(obj runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, false) },
))
},
&util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *federationapi.Cluster) {
// When new cluster becomes available process all the namespaces again.
nc.clusterDeliverer.DeliverAfter(allClustersKey, nil, nc.clusterAvailableDelay)
},
},
)
// Federated updater along with Create/Update/Delete operations.
nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer, "namespace", nc.updateTimeout, nc.eventRecorder,
func(client kubeclientset.Interface, obj runtime.Object) error {
namespace := obj.(*apiv1.Namespace)
_, err := client.Core().Namespaces().Create(namespace)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
namespace := obj.(*apiv1.Namespace)
_, err := client.Core().Namespaces().Update(namespace)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
namespace := obj.(*apiv1.Namespace)
orphanDependents := false
err := client.Core().Namespaces().Delete(namespace.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents})
// IsNotFound error is fine since that means the object is deleted already.
if errors.IsNotFound(err) {
return nil
}
return err
})
nc.deletionHelper = deletionhelper.NewDeletionHelper(
nc.updateNamespace,
// objNameFunc
func(obj runtime.Object) string {
namespace := obj.(*apiv1.Namespace)
return fmt.Sprintf("%s/%s", namespace.Namespace, namespace.Name)
},
nc.namespaceFederatedInformer,
nc.federatedUpdater,
)
discoverResourcesFn := nc.federatedApiClient.Discovery().ServerPreferredNamespacedResources
nc.namespacedResourcesDeleter = deletion.NewNamespacedResourcesDeleter(
client.Core().Namespaces(), dynamicClientPool, nil,
discoverResourcesFn, apiv1.FinalizerKubernetes, false)
return nc
}
// Sends the given update object to apiserver.
// Assumes that the given object is a namespace.
func (nc *NamespaceController) updateNamespace(obj runtime.Object) (runtime.Object, error) {
namespace := obj.(*apiv1.Namespace)
return nc.federatedApiClient.Core().Namespaces().Update(namespace)
}
// Returns true if the given object has the given finalizer in its NamespaceSpec.
func (nc *NamespaceController) hasFinalizerFuncInSpec(obj runtime.Object, finalizer apiv1.FinalizerName) bool {
namespace := obj.(*apiv1.Namespace)
for i := range namespace.Spec.Finalizers {
if namespace.Spec.Finalizers[i] == finalizer {
return true
}
}
return false
}
// Removes the finalizer from the given objects NamespaceSpec.
func (nc *NamespaceController) removeFinalizerFromSpec(namespace *apiv1.Namespace, finalizer apiv1.FinalizerName) (*apiv1.Namespace, error) {
updatedFinalizers := []apiv1.FinalizerName{}
for i := range namespace.Spec.Finalizers {
if namespace.Spec.Finalizers[i] != finalizer {
updatedFinalizers = append(updatedFinalizers, namespace.Spec.Finalizers[i])
}
}
namespace.Spec.Finalizers = updatedFinalizers
updatedNamespace, err := nc.federatedApiClient.Core().Namespaces().Finalize(namespace)
if err != nil {
return nil, fmt.Errorf("failed to remove finalizer %s from namespace %s: %v", string(finalizer), namespace.Name, err)
}
return updatedNamespace, nil
}
func (nc *NamespaceController) Run(stopChan <-chan struct{}) {
go nc.namespaceInformerController.Run(stopChan)
nc.namespaceFederatedInformer.Start()
go func() {
<-stopChan
nc.namespaceFederatedInformer.Stop()
}()
nc.namespaceDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
namespace := item.Value.(string)
nc.reconcileNamespace(namespace)
})
nc.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
nc.reconcileNamespacesOnClusterChange()
})
util.StartBackoffGC(nc.namespaceBackoff, stopChan)
}
func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, failed bool) {
namespace := obj.(*apiv1.Namespace)
nc.deliverNamespace(namespace.Name, delay, failed)
}
// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
func (nc *NamespaceController) deliverNamespace(namespace string, delay time.Duration, failed bool) {
if failed {
nc.namespaceBackoff.Next(namespace, time.Now())
delay = delay + nc.namespaceBackoff.Get(namespace)
} else {
nc.namespaceBackoff.Reset(namespace)
}
nc.namespaceDeliverer.DeliverAfter(namespace, namespace, delay)
}
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
// synced with the corresponding api server.
func (nc *NamespaceController) isSynced() bool {
if !nc.namespaceFederatedInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced")
return false
}
clusters, err := nc.namespaceFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters: %v", err)
return false
}
if !nc.namespaceFederatedInformer.GetTargetStore().ClustersSynced(clusters) {
return false
}
return true
}
// The function triggers reconciliation of all federated namespaces.
func (nc *NamespaceController) reconcileNamespacesOnClusterChange() {
if !nc.isSynced() {
nc.clusterDeliverer.DeliverAfter(allClustersKey, nil, nc.clusterAvailableDelay)
}
for _, obj := range nc.namespaceInformerStore.List() {
namespace := obj.(*apiv1.Namespace)
nc.deliverNamespace(namespace.Name, nc.smallDelay, false)
}
}
func (nc *NamespaceController) reconcileNamespace(namespace string) {
if !nc.isSynced() {
nc.deliverNamespace(namespace, nc.clusterAvailableDelay, false)
return
}
namespaceObjFromStore, exist, err := nc.namespaceInformerStore.GetByKey(namespace)
if err != nil {
glog.Errorf("Failed to query main namespace store for %v: %v", namespace, err)
nc.deliverNamespace(namespace, 0, true)
return
}
if !exist {
// Not federated namespace, ignoring.
return
}
// Create a copy before modifying the namespace to prevent race condition with
// other readers of namespace from store.
namespaceObj, err := api.Scheme.DeepCopy(namespaceObjFromStore)
baseNamespace, ok := namespaceObj.(*apiv1.Namespace)
if err != nil || !ok {
glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err)
nc.deliverNamespace(namespace, 0, true)
return
}
if baseNamespace.DeletionTimestamp != nil {
if err := nc.delete(baseNamespace); err != nil {
glog.Errorf("Failed to delete %s: %v", namespace, err)
nc.eventRecorder.Eventf(baseNamespace, api.EventTypeWarning, "DeleteFailed",
"Namespace delete failed: %v", err)
nc.deliverNamespace(namespace, 0, true)
}
return
}
glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for namespace: %s",
baseNamespace.Name)
// Add the required finalizers before creating a namespace in
// underlying clusters.
// This ensures that the dependent namespaces are deleted in underlying
// clusters when the federated namespace is deleted.
updatedNamespaceObj, err := nc.deletionHelper.EnsureFinalizers(baseNamespace)
if err != nil {
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in namespace %s: %v",
baseNamespace.Name, err)
nc.deliverNamespace(namespace, 0, false)
return
}
baseNamespace = updatedNamespaceObj.(*apiv1.Namespace)
glog.V(3).Infof("Syncing namespace %s in underlying clusters", baseNamespace.Name)
// Sync the namespace in all underlying clusters.
clusters, err := nc.namespaceFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get cluster list: %v", err)
nc.deliverNamespace(namespace, nc.clusterAvailableDelay, false)
return
}
operations := make([]util.FederatedOperation, 0)
for _, cluster := range clusters {
clusterNamespaceObj, found, err := nc.namespaceFederatedInformer.GetTargetStore().GetByKey(cluster.Name, namespace)
if err != nil {
glog.Errorf("Failed to get %s from %s: %v", namespace, cluster.Name, err)
nc.deliverNamespace(namespace, 0, true)
return
}
// The object should not be modified.
desiredNamespace := &apiv1.Namespace{
ObjectMeta: util.DeepCopyRelevantObjectMeta(baseNamespace.ObjectMeta),
Spec: *(util.DeepCopyApiTypeOrPanic(&baseNamespace.Spec).(*apiv1.NamespaceSpec)),
}
glog.V(5).Infof("Desired namespace in underlying clusters: %+v", desiredNamespace)
if !found {
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeAdd,
Obj: desiredNamespace,
ClusterName: cluster.Name,
Key: namespace,
})
} else {
clusterNamespace := clusterNamespaceObj.(*apiv1.Namespace)
// Update existing namespace, if needed.
if !util.ObjectMetaAndSpecEquivalent(desiredNamespace, clusterNamespace) {
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeUpdate,
Obj: desiredNamespace,
ClusterName: cluster.Name,
Key: namespace,
})
}
}
}
if len(operations) == 0 {
// Everything is in order
return
}
glog.V(2).Infof("Updating namespace %s in underlying clusters. Operations: %d", baseNamespace.Name, len(operations))
err = nc.federatedUpdater.Update(operations)
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", namespace, err)
nc.deliverNamespace(namespace, 0, true)
return
}
// Everything is in order but lets be double sure
nc.deliverNamespace(namespace, nc.namespaceReviewDelay, false)
}
// delete deletes the given namespace or returns error if the deletion was not complete.
func (nc *NamespaceController) delete(namespace *apiv1.Namespace) error {
// Set Terminating status.
updatedNamespace := &apiv1.Namespace{
ObjectMeta: namespace.ObjectMeta,
Spec: namespace.Spec,
Status: apiv1.NamespaceStatus{
Phase: apiv1.NamespaceTerminating,
},
}
var err error
if namespace.Status.Phase != apiv1.NamespaceTerminating {
glog.V(2).Infof("Marking ns %s as terminating", namespace.Name)
nc.eventRecorder.Event(namespace, api.EventTypeNormal, "DeleteNamespace", fmt.Sprintf("Marking for deletion"))
_, err = nc.federatedApiClient.Core().Namespaces().Update(updatedNamespace)
if err != nil {
return fmt.Errorf("failed to update namespace: %v", err)
}
}
if nc.hasFinalizerFuncInSpec(updatedNamespace, apiv1.FinalizerKubernetes) {
// Delete resources in this namespace.
err = nc.namespacedResourcesDeleter.Delete(updatedNamespace.Name)
if err != nil {
return fmt.Errorf("error in deleting resources in namespace %s: %v", namespace.Name, err)
}
glog.V(2).Infof("Removed kubernetes finalizer from ns %s", namespace.Name)
// Fetch the updated Namespace.
updatedNamespace, err = nc.federatedApiClient.Core().Namespaces().Get(updatedNamespace.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error in fetching updated namespace %s: %s", updatedNamespace.Name, err)
}
}
// Delete the namespace from all underlying clusters.
_, err = nc.deletionHelper.HandleObjectInUnderlyingClusters(updatedNamespace)
if err != nil {
return err
}
err = nc.federatedApiClient.Core().Namespaces().Delete(namespace.Name, nil)
if err != nil {
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
// This is expected when we are processing an update as a result of namespace finalizer deletion.
// The process that deleted the last finalizer is also going to delete the namespace and we do not have to do anything.
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete namespace: %v", err)
}
}
return nil
}

View File

@ -1,188 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namespace
import (
"fmt"
"testing"
"time"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
restclient "k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
fakekubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
namespaces string = "namespaces"
clusters string = "clusters"
)
func TestNamespaceController(t *testing.T) {
cluster1 := NewCluster("cluster1", apiv1.ConditionTrue)
cluster2 := NewCluster("cluster2", apiv1.ConditionTrue)
ns1 := apiv1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "test-namespace",
SelfLink: "/api/v1/namespaces/test-namespace",
},
Spec: apiv1.NamespaceSpec{
Finalizers: []apiv1.FinalizerName{apiv1.FinalizerKubernetes},
},
}
fakeClient := &fakefedclientset.Clientset{}
RegisterFakeList(clusters, &fakeClient.Fake, &federationapi.ClusterList{Items: []federationapi.Cluster{*cluster1}})
RegisterFakeList(namespaces, &fakeClient.Fake, &apiv1.NamespaceList{Items: []apiv1.Namespace{}})
namespaceWatch := RegisterFakeWatch(namespaces, &fakeClient.Fake)
namespaceUpdateChan := RegisterFakeCopyOnUpdate(namespaces, &fakeClient.Fake, namespaceWatch)
clusterWatch := RegisterFakeWatch(clusters, &fakeClient.Fake)
cluster1Client := &fakekubeclientset.Clientset{}
cluster1Watch := RegisterFakeWatch(namespaces, &cluster1Client.Fake)
RegisterFakeList(namespaces, &cluster1Client.Fake, &apiv1.NamespaceList{Items: []apiv1.Namespace{}})
cluster1CreateChan := RegisterFakeCopyOnCreate(namespaces, &cluster1Client.Fake, cluster1Watch)
cluster1UpdateChan := RegisterFakeCopyOnUpdate(namespaces, &cluster1Client.Fake, cluster1Watch)
cluster2Client := &fakekubeclientset.Clientset{}
cluster2Watch := RegisterFakeWatch(namespaces, &cluster2Client.Fake)
RegisterFakeList(namespaces, &cluster2Client.Fake, &apiv1.NamespaceList{Items: []apiv1.Namespace{}})
cluster2CreateChan := RegisterFakeCopyOnCreate(namespaces, &cluster2Client.Fake, cluster2Watch)
nsDeleteChan := RegisterDelete(&fakeClient.Fake, namespaces)
namespaceController := NewNamespaceController(fakeClient, dynamic.NewDynamicClientPool(&restclient.Config{}))
informerClientFactory := func(cluster *federationapi.Cluster) (kubeclientset.Interface, error) {
switch cluster.Name {
case cluster1.Name:
return cluster1Client, nil
case cluster2.Name:
return cluster2Client, nil
default:
return nil, fmt.Errorf("Unknown cluster")
}
}
setClientFactory(namespaceController.namespaceFederatedInformer, informerClientFactory)
namespaceController.clusterAvailableDelay = time.Second
namespaceController.namespaceReviewDelay = 50 * time.Millisecond
namespaceController.smallDelay = 20 * time.Millisecond
namespaceController.updateTimeout = 5 * time.Second
stop := make(chan struct{})
namespaceController.Run(stop)
// Test add federated namespace.
namespaceWatch.Add(&ns1)
// Verify that the DeleteFromUnderlyingClusters finalizer is added to the namespace.
updatedNamespace := GetNamespaceFromChan(namespaceUpdateChan)
require.NotNil(t, updatedNamespace)
AssertHasFinalizer(t, updatedNamespace, deletionhelper.FinalizerDeleteFromUnderlyingClusters)
ns1 = *updatedNamespace
// Verify that the namespace is created in underlying cluster1.
createdNamespace := GetNamespaceFromChan(cluster1CreateChan)
require.NotNil(t, createdNamespace)
assert.Equal(t, ns1.Name, createdNamespace.Name)
// Wait for the namespace to appear in the informer store
err := WaitForStoreUpdate(
namespaceController.namespaceFederatedInformer.GetTargetStore(),
cluster1.Name, ns1.Name, wait.ForeverTestTimeout)
assert.Nil(t, err, "namespace should have appeared in the informer store")
// Test update federated namespace.
ns1.Annotations = map[string]string{
"A": "B",
}
namespaceWatch.Modify(&ns1)
assert.NoError(t, CheckObjectFromChan(cluster1UpdateChan, MetaAndSpecCheckingFunction(&ns1)))
// Test add cluster
clusterWatch.Add(cluster2)
createdNamespace2 := GetNamespaceFromChan(cluster2CreateChan)
require.NotNil(t, createdNamespace2)
assert.Equal(t, ns1.Name, createdNamespace2.Name)
assert.Contains(t, createdNamespace2.Annotations, "A")
// Delete the namespace with orphan finalizer (let namespaces
// in underlying clusters be as is).
// TODO: Add a test without orphan finalizer.
ns1.ObjectMeta.Finalizers = append(ns1.ObjectMeta.Finalizers, metav1.FinalizerOrphanDependents)
ns1.DeletionTimestamp = &metav1.Time{Time: time.Now()}
namespaceWatch.Modify(&ns1)
assert.Equal(t, ns1.Name, GetStringFromChan(nsDeleteChan))
// TODO: Add a test for verifying that resources in the namespace are deleted
// when the namespace is deleted.
// Need a fake dynamic client to mock list and delete actions to be able to test this.
// TODO: Add a fake dynamic client and test this.
// In the meantime, e2e test verify that the resources in a namespace are
// deleted when the namespace is deleted.
close(stop)
}
func setClientFactory(informer util.FederatedInformer, informerClientFactory func(*federationapi.Cluster) (kubeclientset.Interface, error)) {
testInformer := ToFederatedInformerForTestOnly(informer)
testInformer.SetClientFactory(informerClientFactory)
}
func RegisterDeleteCollection(client *core.Fake, resource string) chan string {
deleteChan := make(chan string, 100)
client.AddReactor("delete-collection", resource, func(action core.Action) (bool, runtime.Object, error) {
deleteChan <- "all"
return true, nil, nil
})
return deleteChan
}
func RegisterDelete(client *core.Fake, resource string) chan string {
deleteChan := make(chan string, 100)
client.AddReactor("delete", resource, func(action core.Action) (bool, runtime.Object, error) {
deleteAction := action.(core.DeleteAction)
deleteChan <- deleteAction.GetName()
return true, nil, nil
})
return deleteChan
}
func GetStringFromChan(c chan string) string {
select {
case str := <-c:
return str
case <-time.After(5 * time.Second):
return "timedout"
}
}
func GetNamespaceFromChan(c chan runtime.Object) *apiv1.Namespace {
if namespace := GetObjectFromChan(c); namespace == nil {
return nil
} else {
return namespace.(*apiv1.Namespace)
}
}

View File

@ -28,7 +28,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
@ -42,28 +41,18 @@ go_library(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = ["controller_test.go"],
"controller_test.go",
"deploymentcontroller_test.go",
],
library = ":go_default_library", library = ":go_default_library",
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//federation/apis/federation/v1beta1:go_default_library", "//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset/fake:go_default_library",
"//federation/pkg/federatedtypes:go_default_library", "//federation/pkg/federatedtypes:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library", "//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/test:go_default_library", "//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
], ],
) )

View File

@ -24,7 +24,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime" pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
@ -97,7 +96,7 @@ type FederationSyncController struct {
func StartFederationSyncController(kind string, adapterFactory federatedtypes.AdapterFactory, config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool) { func StartFederationSyncController(kind string, adapterFactory federatedtypes.AdapterFactory, config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool) {
restclient.AddUserAgent(config, fmt.Sprintf("federation-%s-controller", kind)) restclient.AddUserAgent(config, fmt.Sprintf("federation-%s-controller", kind))
client := federationclientset.NewForConfigOrDie(config) client := federationclientset.NewForConfigOrDie(config)
adapter := adapterFactory(client) adapter := adapterFactory(client, config)
controller := newFederationSyncController(client, adapter) controller := newFederationSyncController(client, adapter)
if minimizeLatency { if minimizeLatency {
controller.minimizeLatency() controller.minimizeLatency()
@ -189,9 +188,9 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
return err return err
}, },
func(client kubeclientset.Interface, obj pkgruntime.Object) error { func(client kubeclientset.Interface, obj pkgruntime.Object) error {
namespacedName := adapter.NamespacedName(obj) qualifiedName := adapter.QualifiedName(obj)
orphanDependents := false orphanDependents := false
err := adapter.ClusterDelete(client, namespacedName, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) err := adapter.ClusterDelete(client, qualifiedName, &metav1.DeleteOptions{OrphanDependents: &orphanDependents})
return err return err
}) })
@ -199,7 +198,7 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
s.updateObject, s.updateObject,
// objNameFunc // objNameFunc
func(obj pkgruntime.Object) string { func(obj pkgruntime.Object) string {
return adapter.NamespacedName(obj).String() return adapter.QualifiedName(obj).String()
}, },
s.informer, s.informer,
s.updater, s.updater,
@ -264,38 +263,38 @@ func (s *FederationSyncController) worker() {
} }
item := obj.(*util.DelayingDelivererItem) item := obj.(*util.DelayingDelivererItem)
namespacedName := item.Value.(*types.NamespacedName) qualifiedName := item.Value.(*federatedtypes.QualifiedName)
status := s.reconcile(*namespacedName) status := s.reconcile(*qualifiedName)
s.workQueue.Done(item) s.workQueue.Done(item)
switch status { switch status {
case statusAllOK: case statusAllOK:
break break
case statusError: case statusError:
s.deliver(*namespacedName, 0, true) s.deliver(*qualifiedName, 0, true)
case statusNeedsRecheck: case statusNeedsRecheck:
s.deliver(*namespacedName, s.reviewDelay, false) s.deliver(*qualifiedName, s.reviewDelay, false)
case statusNotSynced: case statusNotSynced:
s.deliver(*namespacedName, s.clusterAvailableDelay, false) s.deliver(*qualifiedName, s.clusterAvailableDelay, false)
} }
} }
} }
func (s *FederationSyncController) deliverObj(obj pkgruntime.Object, delay time.Duration, failed bool) { func (s *FederationSyncController) deliverObj(obj pkgruntime.Object, delay time.Duration, failed bool) {
namespacedName := s.adapter.NamespacedName(obj) qualifiedName := s.adapter.QualifiedName(obj)
s.deliver(namespacedName, delay, failed) s.deliver(qualifiedName, delay, failed)
} }
// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure. // Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
func (s *FederationSyncController) deliver(namespacedName types.NamespacedName, delay time.Duration, failed bool) { func (s *FederationSyncController) deliver(qualifiedName federatedtypes.QualifiedName, delay time.Duration, failed bool) {
key := namespacedName.String() key := qualifiedName.String()
if failed { if failed {
s.backoff.Next(key, time.Now()) s.backoff.Next(key, time.Now())
delay = delay + s.backoff.Get(key) delay = delay + s.backoff.Get(key)
} else { } else {
s.backoff.Reset(key) s.backoff.Reset(key)
} }
s.deliverer.DeliverAfter(key, &namespacedName, delay) s.deliverer.DeliverAfter(key, &qualifiedName, delay)
} }
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet // Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
@ -322,18 +321,18 @@ func (s *FederationSyncController) reconcileOnClusterChange() {
s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay)) s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
} }
for _, obj := range s.store.List() { for _, obj := range s.store.List() {
namespacedName := s.adapter.NamespacedName(obj.(pkgruntime.Object)) qualifiedName := s.adapter.QualifiedName(obj.(pkgruntime.Object))
s.deliver(namespacedName, s.smallDelay, false) s.deliver(qualifiedName, s.smallDelay, false)
} }
} }
func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName) reconciliationStatus { func (s *FederationSyncController) reconcile(qualifiedName federatedtypes.QualifiedName) reconciliationStatus {
if !s.isSynced() { if !s.isSynced() {
return statusNotSynced return statusNotSynced
} }
kind := s.adapter.Kind() kind := s.adapter.Kind()
key := namespacedName.String() key := qualifiedName.String()
glog.V(4).Infof("Starting to reconcile %v %v", kind, key) glog.V(4).Infof("Starting to reconcile %v %v", kind, key)
startTime := time.Now() startTime := time.Now()
@ -349,10 +348,10 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
meta := s.adapter.ObjectMeta(obj) meta := s.adapter.ObjectMeta(obj)
if meta.DeletionTimestamp != nil { if meta.DeletionTimestamp != nil {
err := s.delete(obj, kind, namespacedName) err := s.delete(obj, kind, qualifiedName)
if err != nil { if err != nil {
msg := "Failed to delete %s %q: %v" msg := "Failed to delete %s %q: %v"
args := []interface{}{kind, namespacedName, err} args := []interface{}{kind, qualifiedName, err}
runtime.HandleError(fmt.Errorf(msg, args...)) runtime.HandleError(fmt.Errorf(msg, args...))
s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed", msg, args...) s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed", msg, args...)
return statusError return statusError
@ -415,14 +414,25 @@ func (s *FederationSyncController) objFromCache(kind, key string) (pkgruntime.Ob
} }
// delete deletes the given resource or returns error if the deletion was not complete. // delete deletes the given resource or returns error if the deletion was not complete.
func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, namespacedName types.NamespacedName) error { func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, qualifiedName federatedtypes.QualifiedName) error {
glog.V(3).Infof("Handling deletion of %s %q", kind, namespacedName) glog.V(3).Infof("Handling deletion of %s %q", kind, qualifiedName)
// Perform pre-deletion cleanup for the namespace adapter
namespaceAdapter, ok := s.adapter.(*federatedtypes.NamespaceAdapter)
if ok {
var err error
obj, err = namespaceAdapter.CleanUpNamespace(obj, s.eventRecorder)
if err != nil {
return err
}
}
_, err := s.deletionHelper.HandleObjectInUnderlyingClusters(obj) _, err := s.deletionHelper.HandleObjectInUnderlyingClusters(obj)
if err != nil { if err != nil {
return err return err
} }
err = s.adapter.FedDelete(namespacedName, nil) err = s.adapter.FedDelete(qualifiedName, nil)
if err != nil { if err != nil {
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
// This is expected when we are processing an update as a result of finalizer deletion. // This is expected when we are processing an update as a result of finalizer deletion.

View File

@ -1,146 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sync
import (
"flag"
"fmt"
"testing"
apiv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
"k8s.io/kubernetes/federation/pkg/federatedtypes"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
fakekubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"github.com/stretchr/testify/assert"
)
const (
deployments = "deployments"
)
func TestDeploymentController(t *testing.T) {
flag.Set("logtostderr", "true")
flag.Set("v", "5")
flag.Parse()
cluster1 := NewCluster("cluster1", apiv1.ConditionTrue)
cluster2 := NewCluster("cluster2", apiv1.ConditionTrue)
fakeClient := &fakefedclientset.Clientset{}
// Add an update reactor on fake client to return the desired updated object.
// This is a hack to workaround https://github.com/kubernetes/kubernetes/issues/40939.
AddFakeUpdateReactor(deployments, &fakeClient.Fake)
RegisterFakeList("clusters", &fakeClient.Fake, &fedv1.ClusterList{Items: []fedv1.Cluster{*cluster1}})
deploymentsWatch := RegisterFakeWatch(deployments, &fakeClient.Fake)
clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake)
cluster1Client := &fakekubeclientset.Clientset{}
cluster1Watch := RegisterFakeWatch(deployments, &cluster1Client.Fake)
cluster1CreateChan := RegisterFakeCopyOnCreate(deployments, &cluster1Client.Fake, cluster1Watch)
cluster1UpdateChan := RegisterFakeCopyOnUpdate(deployments, &cluster1Client.Fake, cluster1Watch)
cluster2Client := &fakekubeclientset.Clientset{}
cluster2Watch := RegisterFakeWatch(deployments, &cluster2Client.Fake)
cluster2CreateChan := RegisterFakeCopyOnCreate(deployments, &cluster2Client.Fake, cluster2Watch)
deploymentController := newFederationSyncController(fakeClient, federatedtypes.NewDeploymentAdapter(fakeClient))
deploymentController.minimizeLatency()
clientFactory := func(cluster *fedv1.Cluster) (kubeclientset.Interface, error) {
switch cluster.Name {
case cluster1.Name:
return cluster1Client, nil
case cluster2.Name:
return cluster2Client, nil
default:
return nil, fmt.Errorf("Unknown cluster")
}
}
ToFederatedInformerForTestOnly(deploymentController.informer).SetClientFactory(clientFactory)
stop := make(chan struct{})
go deploymentController.Run(stop)
// Create deployment. Expect to see it in cluster1.
dep1 := newDeploymentWithReplicas("depA", 6)
deploymentsWatch.Add(dep1)
checkDeployment := func(base *extensionsv1.Deployment, replicas int32) CheckingFunction {
return func(obj runtime.Object) error {
if obj == nil {
return fmt.Errorf("Observed object is nil")
}
d := obj.(*extensionsv1.Deployment)
if err := CompareObjectMeta(base.ObjectMeta, d.ObjectMeta); err != nil {
return err
}
if replicas != *d.Spec.Replicas {
return fmt.Errorf("Replica count is different expected:%d observed:%d", replicas, *d.Spec.Replicas)
}
return nil
}
}
assert.NoError(t, CheckObjectFromChan(cluster1CreateChan, checkDeployment(dep1, *dep1.Spec.Replicas)))
err := WaitForStoreUpdate(
deploymentController.informer.GetTargetStore(),
cluster1.Name, types.NamespacedName{Namespace: dep1.Namespace, Name: dep1.Name}.String(), wait.ForeverTestTimeout)
assert.Nil(t, err, "deployment should have appeared in the informer store")
// Increase replica count. Expect to see the update in cluster1.
newRep := int32(8)
dep1.Spec.Replicas = &newRep
deploymentsWatch.Modify(dep1)
assert.NoError(t, CheckObjectFromChan(cluster1UpdateChan, checkDeployment(dep1, *dep1.Spec.Replicas)))
// Add new cluster. Although rebalance = false, no pods have been created yet so it should
// rebalance anyway.
clusterWatch.Add(cluster2)
assert.NoError(t, CheckObjectFromChan(cluster1UpdateChan, checkDeployment(dep1, *dep1.Spec.Replicas/2)))
assert.NoError(t, CheckObjectFromChan(cluster2CreateChan, checkDeployment(dep1, *dep1.Spec.Replicas/2)))
// Add new deployment with non-default replica placement preferences.
dep2 := newDeploymentWithReplicas("deployment2", 9)
dep2.Annotations = make(map[string]string)
dep2.Annotations[federatedtypes.FedDeploymentPreferencesAnnotation] = `{"rebalance": true,
"clusters": {
"cluster1": {"weight": 2},
"cluster2": {"weight": 1}
}}`
deploymentsWatch.Add(dep2)
assert.NoError(t, CheckObjectFromChan(cluster1CreateChan, checkDeployment(dep2, 6)))
assert.NoError(t, CheckObjectFromChan(cluster2CreateChan, checkDeployment(dep2, 3)))
}
func newDeploymentWithReplicas(name string, replicas int32) *extensionsv1.Deployment {
return &extensionsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: metav1.NamespaceDefault,
SelfLink: "/api/v1/namespaces/default/deployments/name",
},
Spec: extensionsv1.DeploymentSpec{
Replicas: &replicas,
},
}
}

View File

@ -45,7 +45,7 @@ var _ = framework.KubeDescribe("Federated types [Feature:Federation][Experimenta
if clusterClients == nil { if clusterClients == nil {
clusterClients = f.GetClusterClients() clusterClients = f.GetClusterClients()
} }
adapter := fedType.AdapterFactory(f.FederationClientset) adapter := fedType.AdapterFactory(f.FederationClientset, f.FederationConfig)
crudTester := fedframework.NewFederatedTypeCRUDTester(adapter, clusterClients) crudTester := fedframework.NewFederatedTypeCRUDTester(adapter, clusterClients)
obj := adapter.NewTestObject(f.FederationNamespace.Name) obj := adapter.NewTestObject(f.FederationNamespace.Name)
crudTester.CheckLifecycle(obj) crudTester.CheckLifecycle(obj)

View File

@ -26,6 +26,8 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
@ -44,7 +46,10 @@ type Framework struct {
// should abort, the AfterSuite hook should run all Cleanup actions. // should abort, the AfterSuite hook should run all Cleanup actions.
cleanupHandle framework.CleanupActionHandle cleanupHandle framework.CleanupActionHandle
FederationConfig *restclient.Config
FederationClientset *federation_clientset.Clientset FederationClientset *federation_clientset.Clientset
FederationNamespace *v1.Namespace FederationNamespace *v1.Namespace
} }
@ -73,10 +78,16 @@ func (f *Framework) FederationBeforeEach() {
// https://github.com/onsi/ginkgo/issues/222 // https://github.com/onsi/ginkgo/issues/222
f.cleanupHandle = framework.AddCleanupAction(f.FederationAfterEach) f.cleanupHandle = framework.AddCleanupAction(f.FederationAfterEach)
if f.FederationConfig == nil {
By("Reading the federation configuration")
var err error
f.FederationConfig, err = LoadFederatedConfig(&clientcmd.ConfigOverrides{})
Expect(err).NotTo(HaveOccurred())
}
if f.FederationClientset == nil { if f.FederationClientset == nil {
By("Creating a release 1.5 federation Clientset") By("Creating a release 1.5 federation Clientset")
var err error var err error
f.FederationClientset, err = LoadFederationClientset() f.FederationClientset, err = LoadFederationClientset(f.FederationConfig)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
} }
By("Waiting for federation-apiserver to be ready") By("Waiting for federation-apiserver to be ready")

View File

@ -65,12 +65,7 @@ func WaitForFederationApiserverReady(c *federation_clientset.Clientset) error {
}) })
} }
func LoadFederationClientset() (*federation_clientset.Clientset, error) { func LoadFederationClientset(config *restclient.Config) (*federation_clientset.Clientset, error) {
config, err := LoadFederatedConfig(&clientcmd.ConfigOverrides{})
if err != nil {
return nil, err
}
c, err := federation_clientset.NewForConfig(config) c, err := federation_clientset.NewForConfig(config)
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating federation clientset: %v", err.Error()) return nil, fmt.Errorf("error creating federation clientset: %v", err.Error())
@ -83,7 +78,7 @@ func LoadFederatedConfig(overrides *clientcmd.ConfigOverrides) (*restclient.Conf
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating federation client config: %v", err.Error()) return nil, fmt.Errorf("error creating federation client config: %v", err.Error())
} }
cfg, err := clientcmd.NewDefaultClientConfig(*c, overrides).ClientConfig() cfg, err := clientcmd.NewDefaultClientConfig(*c, &clientcmd.ConfigOverrides{}).ClientConfig()
if cfg != nil { if cfg != nil {
//TODO(colhom): this is only here because https://github.com/kubernetes/kubernetes/issues/25422 //TODO(colhom): this is only here because https://github.com/kubernetes/kubernetes/issues/25422
cfg.NegotiatedSerializer = api.Codecs cfg.NegotiatedSerializer = api.Codecs

View File

@ -66,39 +66,6 @@ var _ = framework.KubeDescribe("Federation namespace [Feature:Federation]", func
} }
}) })
It("should be created and deleted successfully", func() {
fedframework.SkipUnlessFederated(f.ClientSet)
nsName = createNamespace(f.FederationClientset.Core().Namespaces())
By(fmt.Sprintf("Deleting namespace %s", nsName))
deleteNamespace(nil, nsName,
f.FederationClientset.Core().Namespaces().Get,
f.FederationClientset.Core().Namespaces().Delete)
By(fmt.Sprintf("Verified that deletion succeeded"))
})
It("should be deleted from underlying clusters when OrphanDependents is false", func() {
fedframework.SkipUnlessFederated(f.ClientSet)
orphanDependents := false
nsName = verifyNsCascadingDeletion(f.FederationClientset.Core().Namespaces(), clusters, &orphanDependents)
By(fmt.Sprintf("Verified that namespaces were deleted from underlying clusters"))
})
It("should not be deleted from underlying clusters when OrphanDependents is true", func() {
fedframework.SkipUnlessFederated(f.ClientSet)
orphanDependents := true
nsName = verifyNsCascadingDeletion(f.FederationClientset.Core().Namespaces(), clusters, &orphanDependents)
By(fmt.Sprintf("Verified that namespaces were not deleted from underlying clusters"))
})
It("should not be deleted from underlying clusters when OrphanDependents is nil", func() {
fedframework.SkipUnlessFederated(f.ClientSet)
nsName = verifyNsCascadingDeletion(f.FederationClientset.Core().Namespaces(), clusters, nil)
By(fmt.Sprintf("Verified that namespaces were not deleted from underlying clusters"))
})
// See https://github.com/kubernetes/kubernetes/issues/38225 // See https://github.com/kubernetes/kubernetes/issues/38225
It("deletes replicasets in the namespace when the namespace is deleted", func() { It("deletes replicasets in the namespace when the namespace is deleted", func() {
fedframework.SkipUnlessFederated(f.ClientSet) fedframework.SkipUnlessFederated(f.ClientSet)

View File

@ -38,7 +38,7 @@ type SimpleUpgradeTest struct {
// Setup creates a resource and validates its propagation to member clusters // Setup creates a resource and validates its propagation to member clusters
func (ut *SimpleUpgradeTest) Setup(f *fedframework.Framework) { func (ut *SimpleUpgradeTest) Setup(f *fedframework.Framework) {
adapter := ut.adapterFactory(f.FederationClientset) adapter := ut.adapterFactory(f.FederationClientset, f.FederationConfig)
clients := f.GetClusterClients() clients := f.GetClusterClients()
ut.crudTester = fedframework.NewFederatedTypeCRUDTester(adapter, clients) ut.crudTester = fedframework.NewFederatedTypeCRUDTester(adapter, clients)

View File

@ -20,6 +20,8 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"k8s.io/kubernetes/test/e2e/framework"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -128,7 +130,7 @@ func (es *e2eServices) startApiServer() error {
// startNamespaceController starts the embedded namespace controller or returns an error. // startNamespaceController starts the embedded namespace controller or returns an error.
func (es *e2eServices) startNamespaceController() error { func (es *e2eServices) startNamespaceController() error {
glog.Info("Starting namespace controller") glog.Info("Starting namespace controller")
es.nsController = NewNamespaceController() es.nsController = NewNamespaceController(framework.TestContext.Host)
return es.nsController.Start() return es.nsController.Start()
} }

View File

@ -26,7 +26,6 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
"k8s.io/kubernetes/test/e2e/framework"
) )
const ( const (
@ -40,18 +39,19 @@ const (
// NamespaceController is a server which manages namespace controller. // NamespaceController is a server which manages namespace controller.
type NamespaceController struct { type NamespaceController struct {
host string
stopCh chan struct{} stopCh chan struct{}
} }
// NewNamespaceController creates a new namespace controller. // NewNamespaceController creates a new namespace controller.
func NewNamespaceController() *NamespaceController { func NewNamespaceController(host string) *NamespaceController {
return &NamespaceController{stopCh: make(chan struct{})} return &NamespaceController{host: host, stopCh: make(chan struct{})}
} }
// Start starts the namespace controller. // Start starts the namespace controller.
func (n *NamespaceController) Start() error { func (n *NamespaceController) Start() error {
// Use the default QPS // Use the default QPS
config := restclient.AddUserAgent(&restclient.Config{Host: framework.TestContext.Host}, ncName) config := restclient.AddUserAgent(&restclient.Config{Host: n.host}, ncName)
client, err := clientset.NewForConfig(config) client, err := clientset.NewForConfig(config)
if err != nil { if err != nil {
return err return err

View File

@ -104,7 +104,7 @@ func initCRUDTest(t *testing.T, fedFixture *framework.FederationFixture, adapter
fixture := framework.NewControllerFixture(t, kind, adapterFactory, config) fixture := framework.NewControllerFixture(t, kind, adapterFactory, config)
client := fedFixture.APIFixture.NewClient(fmt.Sprintf("crud-test-%s", kind)) client := fedFixture.APIFixture.NewClient(fmt.Sprintf("crud-test-%s", kind))
adapter := adapterFactory(client) adapter := adapterFactory(client, config)
crudTester := framework.NewFederatedTypeCRUDTester(t, adapter, fedFixture.ClusterClients) crudTester := framework.NewFederatedTypeCRUDTester(t, adapter, fedFixture.ClusterClients)

View File

@ -28,6 +28,7 @@ go_library(
"//federation/pkg/federation-controller/sync:go_default_library", "//federation/pkg/federation-controller/sync:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/master:go_default_library", "//pkg/master:go_default_library",
"//test/e2e_node/services:go_default_library",
"//test/integration/framework:go_default_library", "//test/integration/framework:go_default_library",
"//vendor/github.com/pborman/uuid:go_default_library", "//vendor/github.com/pborman/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -27,14 +27,16 @@ import (
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/master" "k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/test/e2e_node/services"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
) )
type MemberCluster struct { type MemberCluster struct {
CloseFn framework.CloseFunc CloseFn framework.CloseFunc
Config *master.Config Config *master.Config
Client clientset.Interface Client clientset.Interface
Host string Host string
namespaceController *services.NamespaceController
} }
// FederationFixture manages a federation api server and a set of member clusters // FederationFixture manages a federation api server and a set of member clusters
@ -42,10 +44,11 @@ type FederationFixture struct {
APIFixture *FederationAPIFixture APIFixture *FederationAPIFixture
DesiredClusterCount int DesiredClusterCount int
Clusters []*MemberCluster Clusters []*MemberCluster
ClusterClients []clientset.Interface
ClusterController *clustercontroller.ClusterController ClusterClients []clientset.Interface
fedClient federationclientset.Interface ClusterController *clustercontroller.ClusterController
stopChan chan struct{} fedClient federationclientset.Interface
stopChan chan struct{}
} }
func (f *FederationFixture) SetUp(t *testing.T) { func (f *FederationFixture) SetUp(t *testing.T) {
@ -79,12 +82,18 @@ func (f *FederationFixture) StartCluster(t *testing.T) {
clusterClient := clientset.NewForConfigOrDie(config.GenericConfig.LoopbackClientConfig) clusterClient := clientset.NewForConfigOrDie(config.GenericConfig.LoopbackClientConfig)
f.ClusterClients = append(f.ClusterClients, clusterClient) f.ClusterClients = append(f.ClusterClients, clusterClient)
f.Clusters = append(f.Clusters, &MemberCluster{ memberCluster := &MemberCluster{
CloseFn: closeFn, CloseFn: closeFn,
Config: config, Config: config,
Client: clusterClient, Client: clusterClient,
Host: host, Host: host,
}) namespaceController: services.NewNamespaceController(host),
}
f.Clusters = append(f.Clusters, memberCluster)
err := memberCluster.namespaceController.Start()
if err != nil {
t.Fatal(err)
}
clusterId := len(f.ClusterClients) clusterId := len(f.ClusterClients)
@ -115,6 +124,10 @@ func (f *FederationFixture) TearDown(t *testing.T) {
f.stopChan = nil f.stopChan = nil
} }
for _, cluster := range f.Clusters { for _, cluster := range f.Clusters {
// Need to close controllers with active connections to the
// cluster api before stopping the api or the connections will
// hang until tcp timeout.
cluster.namespaceController.Stop()
cluster.CloseFn() cluster.CloseFn()
} }
f.Clusters = nil f.Clusters = nil

View File

@ -599,7 +599,6 @@ k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns,madhusuda
k8s.io/kubernetes/federation/pkg/federation-controller/cluster,nikhiljindal,0, k8s.io/kubernetes/federation/pkg/federation-controller/cluster,nikhiljindal,0,
k8s.io/kubernetes/federation/pkg/federation-controller/deployment,zmerlynn,1, k8s.io/kubernetes/federation/pkg/federation-controller/deployment,zmerlynn,1,
k8s.io/kubernetes/federation/pkg/federation-controller/ingress,vishh,1, k8s.io/kubernetes/federation/pkg/federation-controller/ingress,vishh,1,
k8s.io/kubernetes/federation/pkg/federation-controller/namespace,rrati,0,
k8s.io/kubernetes/federation/pkg/federation-controller/replicaset,roberthbailey,1, k8s.io/kubernetes/federation/pkg/federation-controller/replicaset,roberthbailey,1,
k8s.io/kubernetes/federation/pkg/federation-controller/secret,apelisse,1, k8s.io/kubernetes/federation/pkg/federation-controller/secret,apelisse,1,
k8s.io/kubernetes/federation/pkg/federation-controller/service,pmorie,1, k8s.io/kubernetes/federation/pkg/federation-controller/service,pmorie,1,

1 name owner auto-assigned sig
599 k8s.io/kubernetes/federation/pkg/federation-controller/cluster nikhiljindal 0
600 k8s.io/kubernetes/federation/pkg/federation-controller/deployment zmerlynn 1
601 k8s.io/kubernetes/federation/pkg/federation-controller/ingress vishh 1
k8s.io/kubernetes/federation/pkg/federation-controller/namespace rrati 0
602 k8s.io/kubernetes/federation/pkg/federation-controller/replicaset roberthbailey 1
603 k8s.io/kubernetes/federation/pkg/federation-controller/secret apelisse 1
604 k8s.io/kubernetes/federation/pkg/federation-controller/service pmorie 1