mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Adding cadcading deletion support to federated secrets
This commit is contained in:
parent
4b66d80e85
commit
17b2178222
@ -17,18 +17,22 @@ limitations under the License.
|
||||
package secret
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
||||
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5"
|
||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
|
||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
api_v1 "k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/conversion"
|
||||
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||
@ -69,6 +73,8 @@ type SecretController struct {
|
||||
// For events
|
||||
eventRecorder record.EventRecorder
|
||||
|
||||
deletionHelper *deletionhelper.DeletionHelper
|
||||
|
||||
secretReviewDelay time.Duration
|
||||
clusterAvailableDelay time.Duration
|
||||
smallDelay time.Duration
|
||||
@ -162,9 +168,73 @@ func NewSecretController(client federationclientset.Interface) *SecretController
|
||||
err := client.Core().Secrets(secret.Namespace).Delete(secret.Name, &api_v1.DeleteOptions{})
|
||||
return err
|
||||
})
|
||||
|
||||
secretcontroller.deletionHelper = deletionhelper.NewDeletionHelper(
|
||||
secretcontroller.hasFinalizerFunc,
|
||||
secretcontroller.removeFinalizerFunc,
|
||||
secretcontroller.addFinalizerFunc,
|
||||
// objNameFunc
|
||||
func(obj pkg_runtime.Object) string {
|
||||
secret := obj.(*api_v1.Secret)
|
||||
return secret.Name
|
||||
},
|
||||
secretcontroller.updateTimeout,
|
||||
secretcontroller.eventRecorder,
|
||||
secretcontroller.secretFederatedInformer,
|
||||
secretcontroller.federatedUpdater,
|
||||
)
|
||||
|
||||
return secretcontroller
|
||||
}
|
||||
|
||||
// Returns true if the given object has the given finalizer in its ObjectMeta.
|
||||
func (secretcontroller *SecretController) hasFinalizerFunc(obj pkg_runtime.Object, finalizer string) bool {
|
||||
secret := obj.(*api_v1.Secret)
|
||||
for i := range secret.ObjectMeta.Finalizers {
|
||||
if string(secret.ObjectMeta.Finalizers[i]) == finalizer {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Removes the finalizer from the given objects ObjectMeta.
|
||||
// Assumes that the given object is a secret.
|
||||
func (secretcontroller *SecretController) removeFinalizerFunc(obj pkg_runtime.Object, finalizer string) (pkg_runtime.Object, error) {
|
||||
secret := obj.(*api_v1.Secret)
|
||||
newFinalizers := []string{}
|
||||
hasFinalizer := false
|
||||
for i := range secret.ObjectMeta.Finalizers {
|
||||
if string(secret.ObjectMeta.Finalizers[i]) != finalizer {
|
||||
newFinalizers = append(newFinalizers, secret.ObjectMeta.Finalizers[i])
|
||||
} else {
|
||||
hasFinalizer = true
|
||||
}
|
||||
}
|
||||
if !hasFinalizer {
|
||||
// Nothing to do.
|
||||
return obj, nil
|
||||
}
|
||||
secret.ObjectMeta.Finalizers = newFinalizers
|
||||
secret, err := secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Update(secret)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to remove finalizer %s from secret %s: %v", finalizer, secret.Name, err)
|
||||
}
|
||||
return secret, nil
|
||||
}
|
||||
|
||||
// Adds the given finalizer to the given objects ObjectMeta.
|
||||
// Assumes that the given object is a secret.
|
||||
func (secretcontroller *SecretController) addFinalizerFunc(obj pkg_runtime.Object, finalizer string) (pkg_runtime.Object, error) {
|
||||
secret := obj.(*api_v1.Secret)
|
||||
secret.ObjectMeta.Finalizers = append(secret.ObjectMeta.Finalizers, finalizer)
|
||||
secret, err := secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Update(secret)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to add finalizer %s to secret %s: %v", finalizer, secret.Name, err)
|
||||
}
|
||||
return secret, nil
|
||||
}
|
||||
|
||||
func (secretcontroller *SecretController) Run(stopChan <-chan struct{}) {
|
||||
go secretcontroller.secretInformerController.Run(stopChan)
|
||||
secretcontroller.secretFederatedInformer.Start()
|
||||
@ -229,14 +299,13 @@ func (secretcontroller *SecretController) reconcileSecretsOnClusterChange() {
|
||||
}
|
||||
|
||||
func (secretcontroller *SecretController) reconcileSecret(secret types.NamespacedName) {
|
||||
|
||||
if !secretcontroller.isSynced() {
|
||||
secretcontroller.deliverSecret(secret, secretcontroller.clusterAvailableDelay, false)
|
||||
return
|
||||
}
|
||||
|
||||
key := secret.String()
|
||||
baseSecretObj, exist, err := secretcontroller.secretInformerStore.GetByKey(key)
|
||||
baseSecretObjFromStore, exist, err := secretcontroller.secretInformerStore.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to query main secret store for %v: %v", key, err)
|
||||
secretcontroller.deliverSecret(secret, 0, true)
|
||||
@ -247,7 +316,39 @@ func (secretcontroller *SecretController) reconcileSecret(secret types.Namespace
|
||||
// Not federated secret, ignoring.
|
||||
return
|
||||
}
|
||||
baseSecret := baseSecretObj.(*api_v1.Secret)
|
||||
|
||||
// Create a copy before modifying the obj to prevent race condition with
|
||||
// other readers of obj from store.
|
||||
baseSecretObj, err := conversion.NewCloner().DeepCopy(baseSecretObjFromStore)
|
||||
baseSecret, ok := baseSecretObj.(*api_v1.Secret)
|
||||
if err != nil || !ok {
|
||||
glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err)
|
||||
secretcontroller.deliverSecret(secret, 0, true)
|
||||
return
|
||||
}
|
||||
if baseSecret.DeletionTimestamp != nil {
|
||||
if err := secretcontroller.delete(baseSecret); err != nil {
|
||||
glog.Errorf("Failed to delete %s: %v", secret, err)
|
||||
secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "DeleteFailed",
|
||||
"Secret delete failed: %v", err)
|
||||
secretcontroller.deliverSecret(secret, 0, true)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for secret: %s",
|
||||
baseSecret.Name)
|
||||
// Add the required finalizers before creating a secret in underlying clusters.
|
||||
updatedSecretObj, err := secretcontroller.deletionHelper.EnsureFinalizers(baseSecret)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in secret %s: %v",
|
||||
baseSecret.Name, err)
|
||||
secretcontroller.deliverSecret(secret, 0, false)
|
||||
return
|
||||
}
|
||||
baseSecret = updatedSecretObj.(*api_v1.Secret)
|
||||
|
||||
glog.V(3).Infof("Syncing secret %s in underlying clusters", baseSecret.Name)
|
||||
|
||||
clusters, err := secretcontroller.secretFederatedInformer.GetReadyClusters()
|
||||
if err != nil {
|
||||
@ -316,3 +417,23 @@ func (secretcontroller *SecretController) reconcileSecret(secret types.Namespace
|
||||
// Evertyhing is in order but lets be double sure
|
||||
secretcontroller.deliverSecret(secret, secretcontroller.secretReviewDelay, false)
|
||||
}
|
||||
|
||||
// delete deletes the given secret or returns error if the deletion was not complete.
|
||||
func (secretcontroller *SecretController) delete(secret *api_v1.Secret) error {
|
||||
glog.V(3).Infof("Handling deletion of secret: %v", *secret)
|
||||
_, err := secretcontroller.deletionHelper.HandleObjectInUnderlyingClusters(secret)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = secretcontroller.federatedApiClient.Core().Secrets(secret.Namespace).Delete(secret.Name, nil)
|
||||
if err != nil {
|
||||
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
|
||||
// This is expected when we are processing an update as a result of secret finalizer deletion.
|
||||
// The process that deleted the last finalizer is also going to delete the secret and we do not have to do anything.
|
||||
if !errors.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to delete secret: %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -24,6 +24,8 @@ import (
|
||||
|
||||
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
||||
fake_fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5/fake"
|
||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
|
||||
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
|
||||
api_v1 "k8s.io/kubernetes/pkg/api/v1"
|
||||
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
|
||||
@ -43,6 +45,7 @@ func TestSecretController(t *testing.T) {
|
||||
RegisterFakeList("clusters", &fakeClient.Fake, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}})
|
||||
RegisterFakeList("secrets", &fakeClient.Fake, &api_v1.SecretList{Items: []api_v1.Secret{}})
|
||||
secretWatch := RegisterFakeWatch("secrets", &fakeClient.Fake)
|
||||
secretUpdateChan := RegisterFakeCopyOnUpdate("secrets", &fakeClient.Fake, secretWatch)
|
||||
clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake)
|
||||
|
||||
cluster1Client := &fake_kubeclientset.Clientset{}
|
||||
@ -57,8 +60,7 @@ func TestSecretController(t *testing.T) {
|
||||
cluster2CreateChan := RegisterFakeCopyOnCreate("secrets", &cluster2Client.Fake, cluster2Watch)
|
||||
|
||||
secretController := NewSecretController(fakeClient)
|
||||
informer := ToFederatedInformerForTestOnly(secretController.secretFederatedInformer)
|
||||
informer.SetClientFactory(func(cluster *federation_api.Cluster) (kubeclientset.Interface, error) {
|
||||
informerClientFactory := func(cluster *federation_api.Cluster) (kubeclientset.Interface, error) {
|
||||
switch cluster.Name {
|
||||
case cluster1.Name:
|
||||
return cluster1Client, nil
|
||||
@ -67,7 +69,8 @@ func TestSecretController(t *testing.T) {
|
||||
default:
|
||||
return nil, fmt.Errorf("Unknown cluster")
|
||||
}
|
||||
})
|
||||
}
|
||||
setClientFactory(secretController.secretFederatedInformer, informerClientFactory)
|
||||
|
||||
secretController.clusterAvailableDelay = time.Second
|
||||
secretController.secretReviewDelay = 50 * time.Millisecond
|
||||
@ -92,11 +95,20 @@ func TestSecretController(t *testing.T) {
|
||||
|
||||
// Test add federated secret.
|
||||
secretWatch.Add(&secret1)
|
||||
// There should be 2 updates to add both the finalizers.
|
||||
updatedSecret := GetSecretFromChan(secretUpdateChan)
|
||||
assert.True(t, secretController.hasFinalizerFunc(updatedSecret, deletionhelper.FinalizerDeleteFromUnderlyingClusters))
|
||||
updatedSecret = GetSecretFromChan(secretUpdateChan)
|
||||
assert.True(t, secretController.hasFinalizerFunc(updatedSecret, api_v1.FinalizerOrphan))
|
||||
secret1 = *updatedSecret
|
||||
|
||||
// Verify that the secret is created in underlying cluster1.
|
||||
createdSecret := GetSecretFromChan(cluster1CreateChan)
|
||||
assert.NotNil(t, createdSecret)
|
||||
assert.Equal(t, secret1.Namespace, createdSecret.Namespace)
|
||||
assert.Equal(t, secret1.Name, createdSecret.Name)
|
||||
assert.True(t, secretsEqual(secret1, *createdSecret))
|
||||
assert.True(t, secretsEqual(secret1, *createdSecret),
|
||||
fmt.Sprintf("expected: %v, actual: %v", secret1, *createdSecret))
|
||||
|
||||
// Wait for the secret to appear in the informer store
|
||||
err := WaitForStoreUpdate(
|
||||
@ -109,11 +121,12 @@ func TestSecretController(t *testing.T) {
|
||||
"A": "B",
|
||||
}
|
||||
secretWatch.Modify(&secret1)
|
||||
updatedSecret := GetSecretFromChan(cluster1UpdateChan)
|
||||
updatedSecret = GetSecretFromChan(cluster1UpdateChan)
|
||||
assert.NotNil(t, updatedSecret)
|
||||
assert.Equal(t, secret1.Name, updatedSecret.Name)
|
||||
assert.Equal(t, secret1.Namespace, updatedSecret.Namespace)
|
||||
assert.True(t, secretsEqual(secret1, *updatedSecret))
|
||||
assert.True(t, secretsEqual(secret1, *updatedSecret),
|
||||
fmt.Sprintf("expected: %v, actual: %v", secret1, *updatedSecret))
|
||||
|
||||
// Test update federated secret.
|
||||
secret1.Data = map[string][]byte{
|
||||
@ -124,7 +137,8 @@ func TestSecretController(t *testing.T) {
|
||||
assert.NotNil(t, updatedSecret)
|
||||
assert.Equal(t, secret1.Name, updatedSecret.Name)
|
||||
assert.Equal(t, secret1.Namespace, updatedSecret.Namespace)
|
||||
assert.True(t, secretsEqual(secret1, *updatedSecret2))
|
||||
assert.True(t, secretsEqual(secret1, *updatedSecret2),
|
||||
fmt.Sprintf("expected: %v, actual: %v", secret1, *updatedSecret2))
|
||||
|
||||
// Test add cluster
|
||||
clusterWatch.Add(cluster2)
|
||||
@ -132,14 +146,24 @@ func TestSecretController(t *testing.T) {
|
||||
assert.NotNil(t, createdSecret2)
|
||||
assert.Equal(t, secret1.Name, createdSecret2.Name)
|
||||
assert.Equal(t, secret1.Namespace, createdSecret2.Namespace)
|
||||
assert.True(t, secretsEqual(secret1, *createdSecret2))
|
||||
assert.True(t, secretsEqual(secret1, *createdSecret2),
|
||||
fmt.Sprintf("expected: %v, actual: %v", secret1, *createdSecret2))
|
||||
|
||||
close(stop)
|
||||
}
|
||||
|
||||
func setClientFactory(informer util.FederatedInformer, informerClientFactory func(*federation_api.Cluster) (kubeclientset.Interface, error)) {
|
||||
testInformer := ToFederatedInformerForTestOnly(informer)
|
||||
testInformer.SetClientFactory(informerClientFactory)
|
||||
}
|
||||
|
||||
func secretsEqual(a, b api_v1.Secret) bool {
|
||||
// Clear the SelfLink and ObjectMeta.Finalizers since they will be different
|
||||
// in resoure in federation control plane and resource in underlying cluster.
|
||||
a.SelfLink = ""
|
||||
b.SelfLink = ""
|
||||
a.ObjectMeta.Finalizers = []string{}
|
||||
b.ObjectMeta.Finalizers = []string{}
|
||||
return reflect.DeepEqual(a, b)
|
||||
}
|
||||
|
||||
|
@ -122,7 +122,7 @@ func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) (
|
||||
}
|
||||
hasOrphanFinalizer := dh.hasFinalizerFunc(obj, api_v1.FinalizerOrphan)
|
||||
if hasOrphanFinalizer {
|
||||
glog.V(3).Infof("Found finalizer orphan. Nothing to do, just remove the finalizer")
|
||||
glog.V(2).Infof("Found finalizer orphan. Nothing to do, just remove the finalizer")
|
||||
// If the obj has FinalizerOrphan finalizer, then we need to orphan the
|
||||
// corresponding objects in underlying clusters.
|
||||
// Just remove both the finalizers in that case.
|
||||
@ -133,6 +133,7 @@ func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) (
|
||||
return dh.removeFinalizerFunc(obj, FinalizerDeleteFromUnderlyingClusters)
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Deleting obj %s from underlying clusters", objName)
|
||||
// Else, we need to delete the obj from all underlying clusters.
|
||||
unreadyClusters, err := dh.informer.GetUnreadyClusters()
|
||||
if err != nil {
|
||||
@ -141,7 +142,9 @@ func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) (
|
||||
// TODO: Handle the case when cluster resource is watched after this is executed.
|
||||
// This can happen if a namespace is deleted before its creation had been
|
||||
// observed in all underlying clusters.
|
||||
clusterNsObjs, err := dh.informer.GetTargetStore().GetFromAllClusters(objName)
|
||||
storeKey := dh.informer.GetTargetStore().GetKeyFor(obj)
|
||||
clusterNsObjs, err := dh.informer.GetTargetStore().GetFromAllClusters(storeKey)
|
||||
glog.V(3).Infof("Found %d objects in underlying clusters", len(clusterNsObjs))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get object %s from underlying clusters: %v", objName, err)
|
||||
}
|
||||
|
@ -54,6 +54,9 @@ type FederatedReadOnlyStore interface {
|
||||
// Returns all items from a cluster.
|
||||
ListFromCluster(clusterName string) ([]interface{}, error)
|
||||
|
||||
// GetKeyFor returns the key under which the item would be put in the store.
|
||||
GetKeyFor(item interface{}) string
|
||||
|
||||
// GetByKey returns the item stored under the given key in the specified cluster (if exist).
|
||||
GetByKey(clusterName string, key string) (interface{}, bool, error)
|
||||
|
||||
|
@ -197,12 +197,12 @@ func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *Watch
|
||||
}
|
||||
|
||||
// GetObjectFromChan tries to get an api object from the given channel
|
||||
// within a reasonable time (1 min).
|
||||
// within a reasonable time.
|
||||
func GetObjectFromChan(c chan runtime.Object) runtime.Object {
|
||||
select {
|
||||
case obj := <-c:
|
||||
return obj
|
||||
case <-time.After(20 * time.Second):
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
|
||||
return nil
|
||||
}
|
||||
|
@ -18,12 +18,14 @@ package e2e
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5"
|
||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
|
||||
@ -32,7 +34,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
FederatedSecretName = "federated-secret"
|
||||
secretNamePrefix = "e2e-secret-test-"
|
||||
FederatedSecretTimeout = 60 * time.Second
|
||||
MaxRetries = 3
|
||||
)
|
||||
@ -61,116 +63,188 @@ var _ = framework.KubeDescribe("Federation secrets [Feature:Federation]", func()
|
||||
nsName := f.FederationNamespace.Name
|
||||
secret := createSecretOrFail(f.FederationClientset_1_5, nsName)
|
||||
defer func() { // Cleanup
|
||||
By(fmt.Sprintf("Deleting secret %q in namespace %q", secret.Name, nsName))
|
||||
err := f.FederationClientset_1_5.Core().Secrets(nsName).Delete(secret.Name, &v1.DeleteOptions{})
|
||||
framework.ExpectNoError(err, "Error deleting secret %q in namespace %q", secret.Name, nsName)
|
||||
deleteSecretOrFail(f.FederationClientset_1_5, nsName, secret.Name, true)
|
||||
}()
|
||||
// wait for secret shards being created
|
||||
waitForSecretShardsOrFail(nsName, secret, clusters)
|
||||
secret = updateSecretOrFail(f.FederationClientset_1_5, nsName)
|
||||
secret = updateSecretOrFail(f.FederationClientset_1_5, nsName, secret.Name)
|
||||
waitForSecretShardsUpdatedOrFail(nsName, secret, clusters)
|
||||
})
|
||||
|
||||
It("should be deleted from underlying clusters when OrphanDependents is false", func() {
|
||||
framework.SkipUnlessFederated(f.ClientSet)
|
||||
nsName := f.FederationNamespace.Name
|
||||
verifyCascadingDeletion(f.FederationClientset_1_5, clusters, false, nsName)
|
||||
By(fmt.Sprintf("Verified that secrets were deleted from underlying clusters"))
|
||||
})
|
||||
|
||||
It("should not be deleted from underlying clusters when OrphanDependents is true", func() {
|
||||
framework.SkipUnlessFederated(f.ClientSet)
|
||||
nsName := f.FederationNamespace.Name
|
||||
verifyCascadingDeletion(f.FederationClientset_1_5, clusters, true, nsName)
|
||||
By(fmt.Sprintf("Verified that secrets were not deleted from underlying clusters"))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
func createSecretOrFail(clientset *fedclientset.Clientset, namespace string) *v1.Secret {
|
||||
if clientset == nil || len(namespace) == 0 {
|
||||
Fail(fmt.Sprintf("Internal error: invalid parameters passed to createSecretOrFail: clientset: %v, namespace: %v", clientset, namespace))
|
||||
// Verifies that secrets are deleted from underlying clusters when orphan dependents is false
|
||||
// and they are not deleted when orphan dependents is true.
|
||||
func verifyCascadingDeletion(clientset *fedclientset.Clientset,
|
||||
clusters map[string]*cluster, orphanDependents bool, nsName string) {
|
||||
secret := createSecretOrFail(clientset, nsName)
|
||||
secretName := secret.Name
|
||||
// Check subclusters if the secret was created there.
|
||||
By(fmt.Sprintf("Waiting for secret %s to be created in all underlying clusters", secretName))
|
||||
err := wait.Poll(5*time.Second, 2*time.Minute, func() (bool, error) {
|
||||
for _, cluster := range clusters {
|
||||
_, err := cluster.Core().Secrets(nsName).Get(secretName)
|
||||
if err != nil {
|
||||
if !errors.IsNotFound(err) {
|
||||
return false, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
framework.ExpectNoError(err, "Not all secrets created")
|
||||
|
||||
By(fmt.Sprintf("Deleting secret %s", secretName))
|
||||
deleteSecretOrFail(clientset, nsName, secretName, orphanDependents)
|
||||
|
||||
By(fmt.Sprintf("Verifying secrets %s in underlying clusters", secretName))
|
||||
errMessages := []string{}
|
||||
for clusterName, clusterClientset := range clusters {
|
||||
_, err := clusterClientset.Core().Secrets(nsName).Get(secretName)
|
||||
if orphanDependents && errors.IsNotFound(err) {
|
||||
errMessages = append(errMessages, fmt.Sprintf("unexpected NotFound error for secret %s in cluster %s, expected secret to exist", secretName, clusterName))
|
||||
} else if !orphanDependents && (err == nil || !errors.IsNotFound(err)) {
|
||||
errMessages = append(errMessages, fmt.Sprintf("expected NotFound error for secret %s in cluster %s, got error: %v", secretName, clusterName, err))
|
||||
}
|
||||
}
|
||||
if len(errMessages) != 0 {
|
||||
framework.Failf("%s", strings.Join(errMessages, "; "))
|
||||
}
|
||||
}
|
||||
|
||||
func createSecretOrFail(clientset *fedclientset.Clientset, nsName string) *v1.Secret {
|
||||
if len(nsName) == 0 {
|
||||
Fail(fmt.Sprintf("Internal error: invalid parameters passed to createSecretOrFail: namespace: %v", nsName))
|
||||
}
|
||||
|
||||
secret := &v1.Secret{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: FederatedSecretName,
|
||||
Name: api.SimpleNameGenerator.GenerateName(secretNamePrefix),
|
||||
Namespace: nsName,
|
||||
},
|
||||
}
|
||||
By(fmt.Sprintf("Creating secret %q in namespace %q", secret.Name, namespace))
|
||||
_, err := clientset.Core().Secrets(namespace).Create(secret)
|
||||
By(fmt.Sprintf("Creating secret %q in namespace %q", secret.Name, nsName))
|
||||
_, err := clientset.Core().Secrets(nsName).Create(secret)
|
||||
framework.ExpectNoError(err, "Failed to create secret %s", secret.Name)
|
||||
By(fmt.Sprintf("Successfully created federated secret %q in namespace %q", FederatedSecretName, namespace))
|
||||
By(fmt.Sprintf("Successfully created federated secret %q in namespace %q", secret.Name, nsName))
|
||||
return secret
|
||||
}
|
||||
|
||||
func updateSecretOrFail(clientset *fedclientset.Clientset, namespace string) *v1.Secret {
|
||||
if clientset == nil || len(namespace) == 0 {
|
||||
Fail(fmt.Sprintf("Internal error: invalid parameters passed to updateSecretOrFail: clientset: %v, namespace: %v", clientset, namespace))
|
||||
func deleteSecretOrFail(clientset *fedclientset.Clientset, nsName string, secretName string, orphanDependents bool) {
|
||||
By(fmt.Sprintf("Deleting secret %q in namespace %q", secretName, nsName))
|
||||
err := clientset.Core().Secrets(nsName).Delete(secretName, &v1.DeleteOptions{OrphanDependents: &orphanDependents})
|
||||
framework.ExpectNoError(err, "Error deleting secret %q in namespace %q", secretName, nsName)
|
||||
|
||||
// Wait for the secret to be deleted.
|
||||
err = wait.Poll(5*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
_, err := clientset.Core().Secrets(nsName).Get(secretName)
|
||||
if err != nil && errors.IsNotFound(err) {
|
||||
return true, nil
|
||||
}
|
||||
return false, err
|
||||
})
|
||||
if err != nil {
|
||||
framework.Failf("Error in deleting secret %s: %v", secretName, err)
|
||||
}
|
||||
}
|
||||
|
||||
func updateSecretOrFail(clientset *fedclientset.Clientset, nsName string, secretName string) *v1.Secret {
|
||||
if clientset == nil || len(nsName) == 0 {
|
||||
Fail(fmt.Sprintf("Internal error: invalid parameters passed to updateSecretOrFail: clientset: %v, namespace: %v", clientset, nsName))
|
||||
}
|
||||
|
||||
var newSecret *v1.Secret
|
||||
for retryCount := 0; retryCount < MaxRetries; retryCount++ {
|
||||
secret, err := clientset.Core().Secrets(namespace).Get(FederatedSecretName)
|
||||
secret, err := clientset.Core().Secrets(nsName).Get(secretName)
|
||||
if err != nil {
|
||||
framework.Failf("failed to get secret %q: %v", FederatedSecretName, err)
|
||||
framework.Failf("failed to get secret %q: %v", secretName, err)
|
||||
}
|
||||
|
||||
// Update one of the data in the secret.
|
||||
secret.Data = map[string][]byte{
|
||||
"key": []byte("value"),
|
||||
}
|
||||
newSecret, err = clientset.Core().Secrets(namespace).Update(secret)
|
||||
newSecret, err = clientset.Core().Secrets(nsName).Update(secret)
|
||||
if err == nil {
|
||||
return newSecret
|
||||
}
|
||||
if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
|
||||
framework.Failf("failed to update secret %q: %v", FederatedSecretName, err)
|
||||
framework.Failf("failed to update secret %q: %v", secretName, err)
|
||||
}
|
||||
}
|
||||
framework.Failf("too many retries updating secret %q", FederatedSecretName)
|
||||
framework.Failf("too many retries updating secret %q", secretName)
|
||||
return newSecret
|
||||
}
|
||||
|
||||
func waitForSecretShardsOrFail(namespace string, secret *v1.Secret, clusters map[string]*cluster) {
|
||||
func waitForSecretShardsOrFail(nsName string, secret *v1.Secret, clusters map[string]*cluster) {
|
||||
framework.Logf("Waiting for secret %q in %d clusters", secret.Name, len(clusters))
|
||||
for _, c := range clusters {
|
||||
waitForSecretOrFail(c.Clientset, namespace, secret, true, FederatedSecretTimeout)
|
||||
waitForSecretOrFail(c.Clientset, nsName, secret, true, FederatedSecretTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
func waitForSecretOrFail(clientset *kubeclientset.Clientset, namespace string, secret *v1.Secret, present bool, timeout time.Duration) {
|
||||
By(fmt.Sprintf("Fetching a federated secret shard of secret %q in namespace %q from cluster", secret.Name, namespace))
|
||||
func waitForSecretOrFail(clientset *kubeclientset.Clientset, nsName string, secret *v1.Secret, present bool, timeout time.Duration) {
|
||||
By(fmt.Sprintf("Fetching a federated secret shard of secret %q in namespace %q from cluster", secret.Name, nsName))
|
||||
var clusterSecret *v1.Secret
|
||||
err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
|
||||
clusterSecret, err := clientset.Core().Secrets(namespace).Get(secret.Name)
|
||||
clusterSecret, err := clientset.Core().Secrets(nsName).Get(secret.Name)
|
||||
if (!present) && errors.IsNotFound(err) { // We want it gone, and it's gone.
|
||||
By(fmt.Sprintf("Success: shard of federated secret %q in namespace %q in cluster is absent", secret.Name, namespace))
|
||||
By(fmt.Sprintf("Success: shard of federated secret %q in namespace %q in cluster is absent", secret.Name, nsName))
|
||||
return true, nil // Success
|
||||
}
|
||||
if present && err == nil { // We want it present, and the Get succeeded, so we're all good.
|
||||
By(fmt.Sprintf("Success: shard of federated secret %q in namespace %q in cluster is present", secret.Name, namespace))
|
||||
By(fmt.Sprintf("Success: shard of federated secret %q in namespace %q in cluster is present", secret.Name, nsName))
|
||||
return true, nil // Success
|
||||
}
|
||||
By(fmt.Sprintf("Secret %q in namespace %q in cluster. Found: %v, waiting for Found: %v, trying again in %s (err=%v)", secret.Name, namespace, clusterSecret != nil && err == nil, present, framework.Poll, err))
|
||||
By(fmt.Sprintf("Secret %q in namespace %q in cluster. Found: %v, waiting for Found: %v, trying again in %s (err=%v)", secret.Name, nsName, clusterSecret != nil && err == nil, present, framework.Poll, err))
|
||||
return false, nil
|
||||
})
|
||||
framework.ExpectNoError(err, "Failed to verify secret %q in namespace %q in cluster: Present=%v", secret.Name, namespace, present)
|
||||
framework.ExpectNoError(err, "Failed to verify secret %q in namespace %q in cluster: Present=%v", secret.Name, nsName, present)
|
||||
|
||||
if present && clusterSecret != nil {
|
||||
Expect(util.SecretEquivalent(*clusterSecret, *secret))
|
||||
}
|
||||
}
|
||||
|
||||
func waitForSecretShardsUpdatedOrFail(namespace string, secret *v1.Secret, clusters map[string]*cluster) {
|
||||
func waitForSecretShardsUpdatedOrFail(nsName string, secret *v1.Secret, clusters map[string]*cluster) {
|
||||
framework.Logf("Waiting for secret %q in %d clusters", secret.Name, len(clusters))
|
||||
for _, c := range clusters {
|
||||
waitForSecretUpdateOrFail(c.Clientset, namespace, secret, FederatedSecretTimeout)
|
||||
waitForSecretUpdateOrFail(c.Clientset, nsName, secret, FederatedSecretTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
func waitForSecretUpdateOrFail(clientset *kubeclientset.Clientset, namespace string, secret *v1.Secret, timeout time.Duration) {
|
||||
By(fmt.Sprintf("Fetching a federated secret shard of secret %q in namespace %q from cluster", secret.Name, namespace))
|
||||
func waitForSecretUpdateOrFail(clientset *kubeclientset.Clientset, nsName string, secret *v1.Secret, timeout time.Duration) {
|
||||
By(fmt.Sprintf("Fetching a federated secret shard of secret %q in namespace %q from cluster", secret.Name, nsName))
|
||||
err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
|
||||
clusterSecret, err := clientset.Core().Secrets(namespace).Get(secret.Name)
|
||||
clusterSecret, err := clientset.Core().Secrets(nsName).Get(secret.Name)
|
||||
if err == nil { // We want it present, and the Get succeeded, so we're all good.
|
||||
if util.SecretEquivalent(*clusterSecret, *secret) {
|
||||
By(fmt.Sprintf("Success: shard of federated secret %q in namespace %q in cluster is updated", secret.Name, namespace))
|
||||
By(fmt.Sprintf("Success: shard of federated secret %q in namespace %q in cluster is updated", secret.Name, nsName))
|
||||
return true, nil
|
||||
} else {
|
||||
By(fmt.Sprintf("Expected equal secrets. expected: %+v\nactual: %+v", *secret, *clusterSecret))
|
||||
}
|
||||
By(fmt.Sprintf("Secret %q in namespace %q in cluster, waiting for secret being updated, trying again in %s (err=%v)", secret.Name, namespace, framework.Poll, err))
|
||||
By(fmt.Sprintf("Secret %q in namespace %q in cluster, waiting for secret being updated, trying again in %s (err=%v)", secret.Name, nsName, framework.Poll, err))
|
||||
return false, nil
|
||||
}
|
||||
By(fmt.Sprintf("Secret %q in namespace %q in cluster, waiting for being updated, trying again in %s (err=%v)", secret.Name, namespace, framework.Poll, err))
|
||||
By(fmt.Sprintf("Secret %q in namespace %q in cluster, waiting for being updated, trying again in %s (err=%v)", secret.Name, nsName, framework.Poll, err))
|
||||
return false, nil
|
||||
})
|
||||
framework.ExpectNoError(err, "Failed to verify secret %q in namespace %q in cluster", secret.Name, namespace)
|
||||
framework.ExpectNoError(err, "Failed to verify secret %q in namespace %q in cluster", secret.Name, nsName)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user