mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 02:41:25 +00:00
250 lines
9.1 KiB
Go
250 lines
9.1 KiB
Go
/*
|
|
Copyright 2020 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package storageversion
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1"
|
|
coordinationv1 "k8s.io/api/coordination/v1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/apiserver/pkg/features"
|
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
|
"k8s.io/client-go/informers"
|
|
"k8s.io/client-go/kubernetes"
|
|
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
|
"k8s.io/klog/v2/ktesting"
|
|
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
|
"k8s.io/kubernetes/pkg/controller/storageversiongc"
|
|
"k8s.io/kubernetes/pkg/controlplane"
|
|
"k8s.io/kubernetes/test/integration/framework"
|
|
"k8s.io/utils/pointer"
|
|
)
|
|
|
|
const (
|
|
svName = "storageversion.integration.test.foos"
|
|
idA = "id-1"
|
|
idB = "id-2"
|
|
idNonExist = "id-non-exist"
|
|
)
|
|
|
|
func TestStorageVersionGarbageCollection(t *testing.T) {
|
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
|
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
|
|
result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
|
|
defer result.TearDownFn()
|
|
|
|
kubeclient, err := kubernetes.NewForConfig(result.ClientConfig)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
informers := informers.NewSharedInformerFactory(kubeclient, time.Second)
|
|
leaseInformer := informers.Coordination().V1().Leases()
|
|
storageVersionInformer := informers.Internal().V1alpha1().StorageVersions()
|
|
|
|
_, ctx := ktesting.NewTestContext(t)
|
|
controller := storageversiongc.NewStorageVersionGC(ctx, kubeclient, leaseInformer, storageVersionInformer)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go leaseInformer.Informer().Run(ctx.Done())
|
|
go storageVersionInformer.Informer().Run(ctx.Done())
|
|
go controller.Run(ctx)
|
|
|
|
createTestAPIServerIdentityLease(t, kubeclient, idA)
|
|
createTestAPIServerIdentityLease(t, kubeclient, idB)
|
|
|
|
t.Run("storage version with non-existing id should be GC'ed", func(t *testing.T) {
|
|
createTestStorageVersion(t, kubeclient, idNonExist)
|
|
assertStorageVersionDeleted(t, kubeclient)
|
|
})
|
|
|
|
t.Run("storage version with valid id should not be GC'ed", func(t *testing.T) {
|
|
createTestStorageVersion(t, kubeclient, idA)
|
|
time.Sleep(10 * time.Second)
|
|
sv, err := kubeclient.InternalV1alpha1().StorageVersions().Get(
|
|
context.TODO(), svName, metav1.GetOptions{})
|
|
if err != nil {
|
|
t.Fatalf("failed to retrieve valid storage version: %v", err)
|
|
}
|
|
if len(sv.Status.StorageVersions) != 1 {
|
|
t.Errorf("unexpected number of storage version entries, expected 1, got: %v",
|
|
sv.Status.StorageVersions)
|
|
}
|
|
expectedID := idA
|
|
if sv.Status.StorageVersions[0].APIServerID != expectedID {
|
|
t.Errorf("unexpected storage version entry id, expected %v, got: %v",
|
|
expectedID, sv.Status.StorageVersions[0].APIServerID)
|
|
}
|
|
assertCommonEncodingVersion(t, kubeclient, pointer.String(idToVersion(t, idA)))
|
|
if err := kubeclient.InternalV1alpha1().StorageVersions().Delete(
|
|
context.TODO(), svName, metav1.DeleteOptions{}); err != nil {
|
|
t.Fatalf("failed to cleanup valid storage version: %v", err)
|
|
}
|
|
})
|
|
|
|
t.Run("deleting an id should delete a storage version entry that it owns", func(t *testing.T) {
|
|
createTestStorageVersion(t, kubeclient, idA, idB)
|
|
assertStorageVersionEntries(t, kubeclient, 2, idA)
|
|
assertCommonEncodingVersion(t, kubeclient, nil)
|
|
deleteTestAPIServerIdentityLease(t, kubeclient, idA)
|
|
assertStorageVersionEntries(t, kubeclient, 1, idB)
|
|
assertCommonEncodingVersion(t, kubeclient, pointer.String(idToVersion(t, idB)))
|
|
})
|
|
|
|
t.Run("deleting an id should delete a storage version object that it owns entirely", func(t *testing.T) {
|
|
deleteTestAPIServerIdentityLease(t, kubeclient, idB)
|
|
assertStorageVersionDeleted(t, kubeclient)
|
|
})
|
|
}
|
|
|
|
func createTestStorageVersion(t *testing.T, client kubernetes.Interface, ids ...string) {
|
|
sv := &apiserverinternalv1alpha1.StorageVersion{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: svName,
|
|
},
|
|
}
|
|
for _, id := range ids {
|
|
version := idToVersion(t, id)
|
|
v := apiserverinternalv1alpha1.ServerStorageVersion{
|
|
APIServerID: id,
|
|
EncodingVersion: version,
|
|
DecodableVersions: []string{version},
|
|
}
|
|
sv.Status.StorageVersions = append(sv.Status.StorageVersions, v)
|
|
}
|
|
// every id is unique and creates a different version. We know we have a common encoding
|
|
// version when there is only one id. Pick it
|
|
if len(ids) == 1 {
|
|
sv.Status.CommonEncodingVersion = pointer.String(sv.Status.StorageVersions[0].EncodingVersion)
|
|
}
|
|
|
|
createdSV, err := client.InternalV1alpha1().StorageVersions().Create(context.TODO(), sv, metav1.CreateOptions{})
|
|
if err != nil {
|
|
t.Fatalf("failed to create storage version %s: %v", svName, err)
|
|
}
|
|
// update the created sv with intended status
|
|
createdSV.Status = sv.Status
|
|
if _, err := client.InternalV1alpha1().StorageVersions().UpdateStatus(
|
|
context.TODO(), createdSV, metav1.UpdateOptions{}); err != nil {
|
|
t.Fatalf("failed to update store version status: %v", err)
|
|
}
|
|
}
|
|
|
|
func assertStorageVersionDeleted(t *testing.T, client kubernetes.Interface) {
|
|
if err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
|
|
_, err := client.InternalV1alpha1().StorageVersions().Get(
|
|
context.TODO(), svName, metav1.GetOptions{})
|
|
if apierrors.IsNotFound(err) {
|
|
return true, nil
|
|
}
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return false, nil
|
|
}); err != nil {
|
|
t.Fatalf("failed to wait for storageversion garbage collection: %v", err)
|
|
}
|
|
}
|
|
|
|
func createTestAPIServerIdentityLease(t *testing.T, client kubernetes.Interface, name string) {
|
|
lease := &coordinationv1.Lease{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: name,
|
|
Namespace: metav1.NamespaceSystem,
|
|
Labels: map[string]string{
|
|
controlplane.IdentityLeaseComponentLabelKey: controlplane.KubeAPIServer,
|
|
},
|
|
},
|
|
Spec: coordinationv1.LeaseSpec{
|
|
HolderIdentity: pointer.String(name),
|
|
LeaseDurationSeconds: pointer.Int32(3600),
|
|
// create fresh leases
|
|
AcquireTime: &metav1.MicroTime{Time: time.Now()},
|
|
RenewTime: &metav1.MicroTime{Time: time.Now()},
|
|
},
|
|
}
|
|
if _, err := client.CoordinationV1().Leases(metav1.NamespaceSystem).Create(
|
|
context.TODO(), lease, metav1.CreateOptions{}); err != nil {
|
|
t.Fatalf("failed to create apiserver identity lease %s: %v", name, err)
|
|
}
|
|
}
|
|
|
|
func deleteTestAPIServerIdentityLease(t *testing.T, client kubernetes.Interface, name string) {
|
|
if err := client.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(
|
|
context.TODO(), name, metav1.DeleteOptions{}); err != nil {
|
|
t.Fatalf("failed to delete apiserver identity lease %s: %v", name, err)
|
|
}
|
|
}
|
|
|
|
func assertStorageVersionEntries(t *testing.T, client kubernetes.Interface,
|
|
numEntries int, firstID string) {
|
|
var lastErr error
|
|
if err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
|
|
sv, err := client.InternalV1alpha1().StorageVersions().Get(
|
|
context.TODO(), svName, metav1.GetOptions{})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if len(sv.Status.StorageVersions) != numEntries {
|
|
lastErr = fmt.Errorf("unexpected number of storage version entries, expected %v, got: %v",
|
|
numEntries, len(sv.Status.StorageVersions))
|
|
return false, nil
|
|
}
|
|
if sv.Status.StorageVersions[0].APIServerID != firstID {
|
|
lastErr = fmt.Errorf("unexpected first storage version entry id, expected %v, got: %v",
|
|
firstID, sv.Status.StorageVersions[0].APIServerID)
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
}); err != nil {
|
|
t.Fatalf("failed to get expected storage verion entries: %v, last error: %v", err, lastErr)
|
|
}
|
|
}
|
|
|
|
func assertCommonEncodingVersion(t *testing.T, client kubernetes.Interface, e *string) {
|
|
sv, err := client.InternalV1alpha1().StorageVersions().Get(
|
|
context.TODO(), svName, metav1.GetOptions{})
|
|
if err != nil {
|
|
t.Fatalf("failed to retrieve storage version: %v", err)
|
|
}
|
|
if e == nil {
|
|
if sv.Status.CommonEncodingVersion != nil {
|
|
t.Errorf("unexpected non-nil common encoding version: %v", sv.Status.CommonEncodingVersion)
|
|
}
|
|
return
|
|
}
|
|
if sv.Status.CommonEncodingVersion == nil || *sv.Status.CommonEncodingVersion != *e {
|
|
t.Errorf("unexpected common encoding version, expected: %v, got %v", e, sv.Status.CommonEncodingVersion)
|
|
}
|
|
}
|
|
|
|
func idToVersion(t *testing.T, id string) string {
|
|
// TODO(roycaihw): rewrite the test, use a id-version table
|
|
if !strings.HasPrefix(id, "id-") {
|
|
t.Fatalf("should not happen: test using id without id- prefix: %s", id)
|
|
}
|
|
return fmt.Sprintf("v%s", strings.TrimPrefix(id, "id-"))
|
|
}
|