fed: Replace NamespacedName for namespace sync compatibility

This commit is contained in:
Maru Newby 2017-06-21 18:47:26 -07:00
parent 6d6b93986c
commit 5ed095b401
11 changed files with 144 additions and 259 deletions

View File

@ -15,6 +15,7 @@ go_library(
"configmap.go",
"daemonset.go",
"deployment.go",
"qualifiedname.go",
"registry.go",
"replicaset.go",
"scheduling.go",

View File

@ -19,7 +19,6 @@ package federatedtypes
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -34,21 +33,21 @@ type FederatedTypeAdapter interface {
IsExpectedType(obj interface{}) bool
Copy(obj pkgruntime.Object) pkgruntime.Object
Equivalent(obj1, obj2 pkgruntime.Object) bool
NamespacedName(obj pkgruntime.Object) types.NamespacedName
QualifiedName(obj pkgruntime.Object) QualifiedName
ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta
// Fed* operations target the federation control plane
FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error)
FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error
FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error)
FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error
FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error)
FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error)
FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error)
FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error)
// The following operations are intended to target a cluster that is a member of a federation
ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error)
ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error
ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error)
ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error
ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error)
ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error)
ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error)
ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error)
@ -75,5 +74,5 @@ func SetAnnotation(adapter FederatedTypeAdapter, obj pkgruntime.Object, key, val
// ObjectKey returns a cluster-unique key for the given object
func ObjectKey(adapter FederatedTypeAdapter, obj pkgruntime.Object) string {
return adapter.NamespacedName(obj).String()
return adapter.QualifiedName(obj).String()
}

View File

@ -21,7 +21,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
@ -72,9 +71,9 @@ func (a *ConfigMapAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
return util.ConfigMapEquivalent(configmap1, configmap2)
}
func (a *ConfigMapAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName {
func (a *ConfigMapAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName {
configmap := obj.(*apiv1.ConfigMap)
return types.NamespacedName{Namespace: configmap.Namespace, Name: configmap.Name}
return QualifiedName{Namespace: configmap.Namespace, Name: configmap.Name}
}
func (a *ConfigMapAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
@ -86,12 +85,12 @@ func (a *ConfigMapAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object,
return a.client.CoreV1().ConfigMaps(configmap.Namespace).Create(configmap)
}
func (a *ConfigMapAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error {
return a.client.CoreV1().ConfigMaps(namespacedName.Namespace).Delete(namespacedName.Name, options)
func (a *ConfigMapAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return a.client.CoreV1().ConfigMaps(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
}
func (a *ConfigMapAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return a.client.CoreV1().ConfigMaps(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
func (a *ConfigMapAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) {
return a.client.CoreV1().ConfigMaps(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
}
func (a *ConfigMapAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
@ -112,12 +111,12 @@ func (a *ConfigMapAdapter) ClusterCreate(client kubeclientset.Interface, obj pkg
return client.CoreV1().ConfigMaps(configmap.Namespace).Create(configmap)
}
func (a *ConfigMapAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error {
return client.CoreV1().ConfigMaps(nsName.Namespace).Delete(nsName.Name, options)
func (a *ConfigMapAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return client.CoreV1().ConfigMaps(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
}
func (a *ConfigMapAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return client.CoreV1().ConfigMaps(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
func (a *ConfigMapAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) {
return client.CoreV1().ConfigMaps(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
}
func (a *ConfigMapAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {

View File

@ -17,6 +17,7 @@ limitations under the License.
package crudtester
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/errors"
@ -76,15 +77,20 @@ func (c *FederatedTypeCRUDTester) CheckLifecycle(desiredObject pkgruntime.Object
func (c *FederatedTypeCRUDTester) Create(desiredObject pkgruntime.Object) pkgruntime.Object {
namespace := c.adapter.ObjectMeta(desiredObject).Namespace
c.tl.Logf("Creating new federated %s in namespace %q", c.kind, namespace)
resourceMsg := fmt.Sprintf("federated %s", c.kind)
if len(namespace) > 0 {
resourceMsg = fmt.Sprintf("%s in namespace %q", resourceMsg, namespace)
}
c.tl.Logf("Creating new %s", resourceMsg)
obj, err := c.adapter.FedCreate(desiredObject)
if err != nil {
c.tl.Fatalf("Error creating federated %s in namespace %q : %v", c.kind, namespace, err)
c.tl.Fatalf("Error creating %s: %v", resourceMsg, err)
}
namespacedName := c.adapter.NamespacedName(obj)
c.tl.Logf("Created new federated %s %q", c.kind, namespacedName)
qualifiedName := c.adapter.QualifiedName(obj)
c.tl.Logf("Created new federated %s %q", c.kind, qualifiedName)
return obj
}
@ -98,7 +104,7 @@ func (c *FederatedTypeCRUDTester) CheckCreate(desiredObject pkgruntime.Object) p
}
func (c *FederatedTypeCRUDTester) CheckUpdate(obj pkgruntime.Object) {
namespacedName := c.adapter.NamespacedName(obj)
qualifiedName := c.adapter.QualifiedName(obj)
var initialAnnotation string
meta := c.adapter.ObjectMeta(obj)
@ -106,29 +112,29 @@ func (c *FederatedTypeCRUDTester) CheckUpdate(obj pkgruntime.Object) {
initialAnnotation = meta.Annotations[AnnotationTestFederationCRUDUpdate]
}
c.tl.Logf("Updating federated %s %q", c.kind, namespacedName)
c.tl.Logf("Updating federated %s %q", c.kind, qualifiedName)
updatedObj, err := c.updateFedObject(obj)
if err != nil {
c.tl.Fatalf("Error updating federated %s %q: %v", c.kind, namespacedName, err)
c.tl.Fatalf("Error updating federated %s %q: %v", c.kind, qualifiedName, err)
}
// updateFedObject is expected to have changed the value of the annotation
meta = c.adapter.ObjectMeta(updatedObj)
updatedAnnotation := meta.Annotations[AnnotationTestFederationCRUDUpdate]
if updatedAnnotation == initialAnnotation {
c.tl.Fatalf("Federated %s %q not mutated", c.kind, namespacedName)
c.tl.Fatalf("Federated %s %q not mutated", c.kind, qualifiedName)
}
c.CheckPropagation(updatedObj)
}
func (c *FederatedTypeCRUDTester) CheckDelete(obj pkgruntime.Object, orphanDependents *bool) {
namespacedName := c.adapter.NamespacedName(obj)
qualifiedName := c.adapter.QualifiedName(obj)
c.tl.Logf("Deleting federated %s %q", c.kind, namespacedName)
err := c.adapter.FedDelete(namespacedName, &metav1.DeleteOptions{OrphanDependents: orphanDependents})
c.tl.Logf("Deleting federated %s %q", c.kind, qualifiedName)
err := c.adapter.FedDelete(qualifiedName, &metav1.DeleteOptions{OrphanDependents: orphanDependents})
if err != nil {
c.tl.Fatalf("Error deleting federated %s %q: %v", c.kind, namespacedName, err)
c.tl.Fatalf("Error deleting federated %s %q: %v", c.kind, qualifiedName, err)
}
deletingInCluster := (orphanDependents != nil && *orphanDependents == false)
@ -142,14 +148,14 @@ func (c *FederatedTypeCRUDTester) CheckDelete(obj pkgruntime.Object, orphanDepen
// Wait for deletion. The federation resource will only be removed once orphan deletion has been
// completed or deemed unnecessary.
err = wait.PollImmediate(c.waitInterval, waitTimeout, func() (bool, error) {
_, err := c.adapter.FedGet(namespacedName)
_, err := c.adapter.FedGet(qualifiedName)
if errors.IsNotFound(err) {
return true, nil
}
return false, err
})
if err != nil {
c.tl.Fatalf("Error deleting federated %s %q: %v", c.kind, namespacedName, err)
c.tl.Fatalf("Error deleting federated %s %q: %v", c.kind, qualifiedName, err)
}
var stateMsg string = "present"
@ -157,14 +163,14 @@ func (c *FederatedTypeCRUDTester) CheckDelete(obj pkgruntime.Object, orphanDepen
stateMsg = "not present"
}
for _, client := range c.clusterClients {
_, err := c.adapter.ClusterGet(client, namespacedName)
_, err := c.adapter.ClusterGet(client, qualifiedName)
switch {
case !deletingInCluster && errors.IsNotFound(err):
c.tl.Fatalf("Federated %s %q was unexpectedly deleted from a member cluster", c.kind, namespacedName)
c.tl.Fatalf("Federated %s %q was unexpectedly deleted from a member cluster", c.kind, qualifiedName)
case deletingInCluster && err == nil:
c.tl.Fatalf("Federated %s %q was unexpectedly orphaned in a member cluster", c.kind, namespacedName)
c.tl.Fatalf("Federated %s %q was unexpectedly orphaned in a member cluster", c.kind, qualifiedName)
case err != nil && !errors.IsNotFound(err):
c.tl.Fatalf("Error while checking whether %s %q is %s in member clusters: %v", c.kind, namespacedName, stateMsg, err)
c.tl.Fatalf("Error while checking whether %s %q is %s in member clusters: %v", c.kind, qualifiedName, stateMsg, err)
}
}
}
@ -176,26 +182,26 @@ func (c *FederatedTypeCRUDTester) CheckPropagation(obj pkgruntime.Object) {
// CheckPropagationForClients checks propagation for the provided clients
func (c *FederatedTypeCRUDTester) CheckPropagationForClients(obj pkgruntime.Object, clusterClients []clientset.Interface, objExpected bool) {
namespacedName := c.adapter.NamespacedName(obj)
qualifiedName := c.adapter.QualifiedName(obj)
c.tl.Logf("Waiting for %s %q in %d clusters", c.kind, namespacedName, len(clusterClients))
c.tl.Logf("Waiting for %s %q in %d clusters", c.kind, qualifiedName, len(clusterClients))
for _, client := range clusterClients {
err := c.waitForResource(client, obj)
switch {
case err == wait.ErrWaitTimeout:
if objExpected {
c.tl.Fatalf("Timeout verifying %s %q in a member cluster: %v", c.kind, namespacedName, err)
c.tl.Fatalf("Timeout verifying %s %q in a member cluster: %v", c.kind, qualifiedName, err)
}
case err != nil:
c.tl.Fatalf("Failed to verify %s %q in a member cluster: %v", c.kind, namespacedName, err)
c.tl.Fatalf("Failed to verify %s %q in a member cluster: %v", c.kind, qualifiedName, err)
case err == nil && !objExpected:
c.tl.Fatalf("Found unexpected object %s %q in a member cluster: %v", c.kind, namespacedName, err)
c.tl.Fatalf("Found unexpected object %s %q in a member cluster: %v", c.kind, qualifiedName, err)
}
}
}
func (c *FederatedTypeCRUDTester) waitForResource(client clientset.Interface, obj pkgruntime.Object) error {
namespacedName := c.adapter.NamespacedName(obj)
qualifiedName := c.adapter.QualifiedName(obj)
err := wait.PollImmediate(c.waitInterval, c.clusterWaitTimeout, func() (bool, error) {
equivalenceFunc := c.adapter.Equivalent
if c.adapter.IsSchedulingAdapter() {
@ -206,7 +212,7 @@ func (c *FederatedTypeCRUDTester) waitForResource(client clientset.Interface, ob
equivalenceFunc = schedulingAdapter.EquivalentIgnoringSchedule
}
clusterObj, err := c.adapter.ClusterGet(client, namespacedName)
clusterObj, err := c.adapter.ClusterGet(client, qualifiedName)
if err == nil && equivalenceFunc(clusterObj, obj) {
return true, nil
}
@ -227,8 +233,8 @@ func (c *FederatedTypeCRUDTester) updateFedObject(obj pkgruntime.Object) (pkgrun
if errors.IsConflict(err) {
// The resource was updated by the federation controller.
// Get the latest version and retry.
namespacedName := c.adapter.NamespacedName(obj)
obj, err = c.adapter.FedGet(namespacedName)
qualifiedName := c.adapter.QualifiedName(obj)
obj, err = c.adapter.FedGet(qualifiedName)
return false, err
}
// Be tolerant of a slow server

View File

@ -24,7 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
@ -75,9 +74,9 @@ func (a *DaemonSetAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
return util.ObjectMetaEquivalent(daemonset1.ObjectMeta, daemonset2.ObjectMeta) && reflect.DeepEqual(daemonset1.Spec, daemonset2.Spec)
}
func (a *DaemonSetAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName {
func (a *DaemonSetAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName {
daemonset := obj.(*extensionsv1.DaemonSet)
return types.NamespacedName{Namespace: daemonset.Namespace, Name: daemonset.Name}
return QualifiedName{Namespace: daemonset.Namespace, Name: daemonset.Name}
}
func (a *DaemonSetAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
@ -89,12 +88,12 @@ func (a *DaemonSetAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object,
return a.client.Extensions().DaemonSets(daemonset.Namespace).Create(daemonset)
}
func (a *DaemonSetAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error {
return a.client.Extensions().DaemonSets(namespacedName.Namespace).Delete(namespacedName.Name, options)
func (a *DaemonSetAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return a.client.Extensions().DaemonSets(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
}
func (a *DaemonSetAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return a.client.Extensions().DaemonSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
func (a *DaemonSetAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) {
return a.client.Extensions().DaemonSets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
}
func (a *DaemonSetAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
@ -115,12 +114,12 @@ func (a *DaemonSetAdapter) ClusterCreate(client kubeclientset.Interface, obj pkg
return client.Extensions().DaemonSets(daemonset.Namespace).Create(daemonset)
}
func (a *DaemonSetAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error {
return client.Extensions().DaemonSets(nsName.Namespace).Delete(nsName.Name, options)
func (a *DaemonSetAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return client.Extensions().DaemonSets(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
}
func (a *DaemonSetAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return client.Extensions().DaemonSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
func (a *DaemonSetAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) {
return client.Extensions().DaemonSets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
}
func (a *DaemonSetAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {

View File

@ -0,0 +1,41 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package federatedtypes
import (
"fmt"
)
// QualifiedName comprises a resource name with an optional namespace.
// If namespace is provided, a QualifiedName will be rendered as
// "<namespace>/<name>". If not, it will be rendered as "name". This
// is intended to allow the FederatedTypeAdapter interface and its
// consumers to operate on both namespaces and namespace-qualified
// resources.
type QualifiedName struct {
Namespace string
Name string
}
// String returns the general purpose string representation
func (n QualifiedName) String() string {
if len(n.Namespace) == 0 {
return n.Name
}
return fmt.Sprintf("%s/%s", n.Namespace, n.Name)
}

View File

@ -22,7 +22,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
@ -91,9 +90,9 @@ func (a *ReplicaSetAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
return fedutil.ObjectMetaAndSpecEquivalent(obj1, obj2)
}
func (a *ReplicaSetAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName {
func (a *ReplicaSetAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName {
replicaset := obj.(*extensionsv1.ReplicaSet)
return types.NamespacedName{Namespace: replicaset.Namespace, Name: replicaset.Name}
return QualifiedName{Namespace: replicaset.Namespace, Name: replicaset.Name}
}
func (a *ReplicaSetAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
@ -105,12 +104,12 @@ func (a *ReplicaSetAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object,
return a.client.Extensions().ReplicaSets(replicaset.Namespace).Create(replicaset)
}
func (a *ReplicaSetAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error {
return a.client.Extensions().ReplicaSets(namespacedName.Namespace).Delete(namespacedName.Name, options)
func (a *ReplicaSetAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return a.client.Extensions().ReplicaSets(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
}
func (a *ReplicaSetAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return a.client.Extensions().ReplicaSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
func (a *ReplicaSetAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) {
return a.client.Extensions().ReplicaSets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
}
func (a *ReplicaSetAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
@ -131,12 +130,12 @@ func (a *ReplicaSetAdapter) ClusterCreate(client kubeclientset.Interface, obj pk
return client.Extensions().ReplicaSets(replicaset.Namespace).Create(replicaset)
}
func (a *ReplicaSetAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error {
return client.Extensions().ReplicaSets(nsName.Namespace).Delete(nsName.Name, options)
func (a *ReplicaSetAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return client.Extensions().ReplicaSets(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
}
func (a *ReplicaSetAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return client.Extensions().ReplicaSets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
func (a *ReplicaSetAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) {
return client.Extensions().ReplicaSets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
}
func (a *ReplicaSetAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {

View File

@ -21,7 +21,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
@ -73,9 +72,9 @@ func (a *SecretAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
return util.SecretEquivalent(*secret1, *secret2)
}
func (a *SecretAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName {
func (a *SecretAdapter) QualifiedName(obj pkgruntime.Object) QualifiedName {
secret := obj.(*apiv1.Secret)
return types.NamespacedName{Namespace: secret.Namespace, Name: secret.Name}
return QualifiedName{Namespace: secret.Namespace, Name: secret.Name}
}
func (a *SecretAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
@ -87,12 +86,12 @@ func (a *SecretAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, err
return a.client.CoreV1().Secrets(secret.Namespace).Create(secret)
}
func (a *SecretAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error {
return a.client.CoreV1().Secrets(namespacedName.Namespace).Delete(namespacedName.Name, options)
func (a *SecretAdapter) FedDelete(qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return a.client.CoreV1().Secrets(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
}
func (a *SecretAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return a.client.CoreV1().Secrets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
func (a *SecretAdapter) FedGet(qualifiedName QualifiedName) (pkgruntime.Object, error) {
return a.client.CoreV1().Secrets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
}
func (a *SecretAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
@ -113,12 +112,12 @@ func (a *SecretAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgrun
return client.CoreV1().Secrets(secret.Namespace).Create(secret)
}
func (a *SecretAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error {
return client.CoreV1().Secrets(nsName.Namespace).Delete(nsName.Name, options)
func (a *SecretAdapter) ClusterDelete(client kubeclientset.Interface, qualifiedName QualifiedName, options *metav1.DeleteOptions) error {
return client.CoreV1().Secrets(qualifiedName.Namespace).Delete(qualifiedName.Name, options)
}
func (a *SecretAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return client.CoreV1().Secrets(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
func (a *SecretAdapter) ClusterGet(client kubeclientset.Interface, qualifiedName QualifiedName) (pkgruntime.Object, error) {
return client.CoreV1().Secrets(qualifiedName.Namespace).Get(qualifiedName.Name, metav1.GetOptions{})
}
func (a *SecretAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {

View File

@ -28,7 +28,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
@ -42,28 +41,18 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"controller_test.go",
"deploymentcontroller_test.go",
],
srcs = ["controller_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset/fake:go_default_library",
"//federation/pkg/federatedtypes:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
],
)

View File

@ -24,7 +24,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
@ -189,9 +188,9 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
return err
},
func(client kubeclientset.Interface, obj pkgruntime.Object) error {
namespacedName := adapter.NamespacedName(obj)
qualifiedName := adapter.QualifiedName(obj)
orphanDependents := false
err := adapter.ClusterDelete(client, namespacedName, &metav1.DeleteOptions{OrphanDependents: &orphanDependents})
err := adapter.ClusterDelete(client, qualifiedName, &metav1.DeleteOptions{OrphanDependents: &orphanDependents})
return err
})
@ -199,7 +198,7 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
s.updateObject,
// objNameFunc
func(obj pkgruntime.Object) string {
return adapter.NamespacedName(obj).String()
return adapter.QualifiedName(obj).String()
},
s.informer,
s.updater,
@ -264,38 +263,38 @@ func (s *FederationSyncController) worker() {
}
item := obj.(*util.DelayingDelivererItem)
namespacedName := item.Value.(*types.NamespacedName)
status := s.reconcile(*namespacedName)
qualifiedName := item.Value.(*federatedtypes.QualifiedName)
status := s.reconcile(*qualifiedName)
s.workQueue.Done(item)
switch status {
case statusAllOK:
break
case statusError:
s.deliver(*namespacedName, 0, true)
s.deliver(*qualifiedName, 0, true)
case statusNeedsRecheck:
s.deliver(*namespacedName, s.reviewDelay, false)
s.deliver(*qualifiedName, s.reviewDelay, false)
case statusNotSynced:
s.deliver(*namespacedName, s.clusterAvailableDelay, false)
s.deliver(*qualifiedName, s.clusterAvailableDelay, false)
}
}
}
func (s *FederationSyncController) deliverObj(obj pkgruntime.Object, delay time.Duration, failed bool) {
namespacedName := s.adapter.NamespacedName(obj)
s.deliver(namespacedName, delay, failed)
qualifiedName := s.adapter.QualifiedName(obj)
s.deliver(qualifiedName, delay, failed)
}
// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
func (s *FederationSyncController) deliver(namespacedName types.NamespacedName, delay time.Duration, failed bool) {
key := namespacedName.String()
func (s *FederationSyncController) deliver(qualifiedName federatedtypes.QualifiedName, delay time.Duration, failed bool) {
key := qualifiedName.String()
if failed {
s.backoff.Next(key, time.Now())
delay = delay + s.backoff.Get(key)
} else {
s.backoff.Reset(key)
}
s.deliverer.DeliverAfter(key, &namespacedName, delay)
s.deliverer.DeliverAfter(key, &qualifiedName, delay)
}
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
@ -322,18 +321,18 @@ func (s *FederationSyncController) reconcileOnClusterChange() {
s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
}
for _, obj := range s.store.List() {
namespacedName := s.adapter.NamespacedName(obj.(pkgruntime.Object))
s.deliver(namespacedName, s.smallDelay, false)
qualifiedName := s.adapter.QualifiedName(obj.(pkgruntime.Object))
s.deliver(qualifiedName, s.smallDelay, false)
}
}
func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName) reconciliationStatus {
func (s *FederationSyncController) reconcile(qualifiedName federatedtypes.QualifiedName) reconciliationStatus {
if !s.isSynced() {
return statusNotSynced
}
kind := s.adapter.Kind()
key := namespacedName.String()
key := qualifiedName.String()
glog.V(4).Infof("Starting to reconcile %v %v", kind, key)
startTime := time.Now()
@ -349,10 +348,10 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
meta := s.adapter.ObjectMeta(obj)
if meta.DeletionTimestamp != nil {
err := s.delete(obj, kind, namespacedName)
err := s.delete(obj, kind, qualifiedName)
if err != nil {
msg := "Failed to delete %s %q: %v"
args := []interface{}{kind, namespacedName, err}
args := []interface{}{kind, qualifiedName, err}
runtime.HandleError(fmt.Errorf(msg, args...))
s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed", msg, args...)
return statusError
@ -415,14 +414,14 @@ func (s *FederationSyncController) objFromCache(kind, key string) (pkgruntime.Ob
}
// delete deletes the given resource or returns error if the deletion was not complete.
func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, namespacedName types.NamespacedName) error {
glog.V(3).Infof("Handling deletion of %s %q", kind, namespacedName)
func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, qualifiedName federatedtypes.QualifiedName) error {
glog.V(3).Infof("Handling deletion of %s %q", kind, qualifiedName)
_, err := s.deletionHelper.HandleObjectInUnderlyingClusters(obj)
if err != nil {
return err
}
err = s.adapter.FedDelete(namespacedName, nil)
err = s.adapter.FedDelete(qualifiedName, nil)
if err != nil {
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
// This is expected when we are processing an update as a result of finalizer deletion.

View File

@ -1,146 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sync
import (
"flag"
"fmt"
"testing"
apiv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
"k8s.io/kubernetes/federation/pkg/federatedtypes"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
fakekubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"github.com/stretchr/testify/assert"
)
const (
deployments = "deployments"
)
func TestDeploymentController(t *testing.T) {
flag.Set("logtostderr", "true")
flag.Set("v", "5")
flag.Parse()
cluster1 := NewCluster("cluster1", apiv1.ConditionTrue)
cluster2 := NewCluster("cluster2", apiv1.ConditionTrue)
fakeClient := &fakefedclientset.Clientset{}
// Add an update reactor on fake client to return the desired updated object.
// This is a hack to workaround https://github.com/kubernetes/kubernetes/issues/40939.
AddFakeUpdateReactor(deployments, &fakeClient.Fake)
RegisterFakeList("clusters", &fakeClient.Fake, &fedv1.ClusterList{Items: []fedv1.Cluster{*cluster1}})
deploymentsWatch := RegisterFakeWatch(deployments, &fakeClient.Fake)
clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake)
cluster1Client := &fakekubeclientset.Clientset{}
cluster1Watch := RegisterFakeWatch(deployments, &cluster1Client.Fake)
cluster1CreateChan := RegisterFakeCopyOnCreate(deployments, &cluster1Client.Fake, cluster1Watch)
cluster1UpdateChan := RegisterFakeCopyOnUpdate(deployments, &cluster1Client.Fake, cluster1Watch)
cluster2Client := &fakekubeclientset.Clientset{}
cluster2Watch := RegisterFakeWatch(deployments, &cluster2Client.Fake)
cluster2CreateChan := RegisterFakeCopyOnCreate(deployments, &cluster2Client.Fake, cluster2Watch)
deploymentController := newFederationSyncController(fakeClient, federatedtypes.NewDeploymentAdapter(fakeClient))
deploymentController.minimizeLatency()
clientFactory := func(cluster *fedv1.Cluster) (kubeclientset.Interface, error) {
switch cluster.Name {
case cluster1.Name:
return cluster1Client, nil
case cluster2.Name:
return cluster2Client, nil
default:
return nil, fmt.Errorf("Unknown cluster")
}
}
ToFederatedInformerForTestOnly(deploymentController.informer).SetClientFactory(clientFactory)
stop := make(chan struct{})
go deploymentController.Run(stop)
// Create deployment. Expect to see it in cluster1.
dep1 := newDeploymentWithReplicas("depA", 6)
deploymentsWatch.Add(dep1)
checkDeployment := func(base *extensionsv1.Deployment, replicas int32) CheckingFunction {
return func(obj runtime.Object) error {
if obj == nil {
return fmt.Errorf("Observed object is nil")
}
d := obj.(*extensionsv1.Deployment)
if err := CompareObjectMeta(base.ObjectMeta, d.ObjectMeta); err != nil {
return err
}
if replicas != *d.Spec.Replicas {
return fmt.Errorf("Replica count is different expected:%d observed:%d", replicas, *d.Spec.Replicas)
}
return nil
}
}
assert.NoError(t, CheckObjectFromChan(cluster1CreateChan, checkDeployment(dep1, *dep1.Spec.Replicas)))
err := WaitForStoreUpdate(
deploymentController.informer.GetTargetStore(),
cluster1.Name, types.NamespacedName{Namespace: dep1.Namespace, Name: dep1.Name}.String(), wait.ForeverTestTimeout)
assert.Nil(t, err, "deployment should have appeared in the informer store")
// Increase replica count. Expect to see the update in cluster1.
newRep := int32(8)
dep1.Spec.Replicas = &newRep
deploymentsWatch.Modify(dep1)
assert.NoError(t, CheckObjectFromChan(cluster1UpdateChan, checkDeployment(dep1, *dep1.Spec.Replicas)))
// Add new cluster. Although rebalance = false, no pods have been created yet so it should
// rebalance anyway.
clusterWatch.Add(cluster2)
assert.NoError(t, CheckObjectFromChan(cluster1UpdateChan, checkDeployment(dep1, *dep1.Spec.Replicas/2)))
assert.NoError(t, CheckObjectFromChan(cluster2CreateChan, checkDeployment(dep1, *dep1.Spec.Replicas/2)))
// Add new deployment with non-default replica placement preferences.
dep2 := newDeploymentWithReplicas("deployment2", 9)
dep2.Annotations = make(map[string]string)
dep2.Annotations[federatedtypes.FedDeploymentPreferencesAnnotation] = `{"rebalance": true,
"clusters": {
"cluster1": {"weight": 2},
"cluster2": {"weight": 1}
}}`
deploymentsWatch.Add(dep2)
assert.NoError(t, CheckObjectFromChan(cluster1CreateChan, checkDeployment(dep2, 6)))
assert.NoError(t, CheckObjectFromChan(cluster2CreateChan, checkDeployment(dep2, 3)))
}
func newDeploymentWithReplicas(name string, replicas int32) *extensionsv1.Deployment {
return &extensionsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: metav1.NamespaceDefault,
SelfLink: "/api/v1/namespaces/default/deployments/name",
},
Spec: extensionsv1.DeploymentSpec{
Replicas: &replicas,
},
}
}