Merge pull request #95978 from roycaihw/storage-version/gc

Storage version garbage collector
This commit is contained in:
Kubernetes Prow Robot 2020-11-12 18:36:37 -08:00 committed by GitHub
commit da75c26648
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 677 additions and 6 deletions

View File

@ -70,6 +70,7 @@ go_library(
"//pkg/controller/resourcequota:go_default_library",
"//pkg/controller/serviceaccount:go_default_library",
"//pkg/controller/statefulset:go_default_library",
"//pkg/controller/storageversiongc:go_default_library",
"//pkg/controller/ttl:go_default_library",
"//pkg/controller/ttlafterfinished:go_default_library",
"//pkg/controller/volume/attachdetach:go_default_library",
@ -112,6 +113,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/quota/v1/generic:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",

View File

@ -36,9 +36,11 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
genericfeatures "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
utilfeature "k8s.io/apiserver/pkg/util/feature"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
@ -424,6 +426,10 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
controllers["ttl-after-finished"] = startTTLAfterFinishedController
controllers["root-ca-cert-publisher"] = startRootCACertPublisher
controllers["ephemeral-volume"] = startEphemeralVolumeController
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) &&
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
controllers["storage-version-gc"] = startStorageVersionGCController
}
return controllers
}

View File

@ -56,6 +56,7 @@ import (
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controller/storageversiongc"
ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl"
"k8s.io/kubernetes/pkg/controller/ttlafterfinished"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
@ -674,3 +675,12 @@ func getNodeCIDRMaskSizes(clusterCIDRs []*net.IPNet, maskSizeIPv4, maskSizeIPv6
}
return nodeMaskCIDRs
}
func startStorageVersionGCController(ctx ControllerContext) (http.Handler, bool, error) {
go storageversiongc.NewStorageVersionGC(
ctx.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
ctx.InformerFactory.Coordination().V1().Leases(),
ctx.InformerFactory.Internal().V1alpha1().StorageVersions(),
).Run(ctx.Stop)
return nil, true, nil
}

View File

@ -119,6 +119,7 @@ filegroup(
"//pkg/controller/resourcequota:all-srcs",
"//pkg/controller/serviceaccount:all-srcs",
"//pkg/controller/statefulset:all-srcs",
"//pkg/controller/storageversiongc:all-srcs",
"//pkg/controller/testutil:all-srcs",
"//pkg/controller/ttl:all-srcs",
"//pkg/controller/ttlafterfinished:all-srcs",

View File

@ -0,0 +1,39 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["gc_controller.go"],
importpath = "k8s.io/kubernetes/pkg/controller/storageversiongc",
visibility = ["//visibility:public"],
deps = [
"//pkg/controlplane:go_default_library",
"//staging/src/k8s.io/api/apiserverinternal/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/coordination/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/apiserverinternal/v1alpha1:go_default_library",
"//staging/src/k8s.io/client-go/informers/coordination/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/coordination/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,297 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storageversiongc
import (
"context"
"fmt"
"time"
apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1"
coordinationv1 "k8s.io/api/coordination/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
apiserverinternalinformers "k8s.io/client-go/informers/apiserverinternal/v1alpha1"
coordinformers "k8s.io/client-go/informers/coordination/v1"
"k8s.io/client-go/kubernetes"
coordlisters "k8s.io/client-go/listers/coordination/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/klog/v2"
)
// Controller watches kube-apiserver leases and storageversions, and delete stale
// storage version entries and objects.
type Controller struct {
kubeclientset kubernetes.Interface
leaseLister coordlisters.LeaseLister
leasesSynced cache.InformerSynced
storageVersionSynced cache.InformerSynced
leaseQueue workqueue.RateLimitingInterface
storageVersionQueue workqueue.RateLimitingInterface
}
// NewStorageVersionGC creates a new Controller.
func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller {
c := &Controller{
kubeclientset: clientset,
leaseLister: leaseInformer.Lister(),
leasesSynced: leaseInformer.Informer().HasSynced,
storageVersionSynced: storageVersionInformer.Informer().HasSynced,
leaseQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_leases"),
storageVersionQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_storageversions"),
}
leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: c.onDeleteLease,
})
// use the default resync period from the informer
storageVersionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.onAddStorageVersion,
UpdateFunc: c.onUpdateStorageVersion,
})
return c
}
// Run starts one worker.
func (c *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.leaseQueue.ShutDown()
defer c.storageVersionQueue.ShutDown()
defer klog.Infof("Shutting down storage version garbage collector")
klog.Infof("Starting storage version garbage collector")
if !cache.WaitForCacheSync(stopCh, c.leasesSynced, c.storageVersionSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
// Identity lease deletion and storageversion update don't happen too often. Start one
// worker for each of them.
// runLeaseWorker handles legit identity lease deletion, while runStorageVersionWorker
// handles storageversion creation/update with non-existing id. The latter should rarely
// happen. It's okay for the two workers to conflict on update.
go wait.Until(c.runLeaseWorker, time.Second, stopCh)
go wait.Until(c.runStorageVersionWorker, time.Second, stopCh)
<-stopCh
}
func (c *Controller) runLeaseWorker() {
for c.processNextLease() {
}
}
func (c *Controller) processNextLease() bool {
key, quit := c.leaseQueue.Get()
if quit {
return false
}
defer c.leaseQueue.Done(key)
err := c.processDeletedLease(key.(string))
if err == nil {
c.leaseQueue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("lease %v failed with: %v", key, err))
c.leaseQueue.AddRateLimited(key)
return true
}
func (c *Controller) runStorageVersionWorker() {
for c.processNextStorageVersion() {
}
}
func (c *Controller) processNextStorageVersion() bool {
key, quit := c.storageVersionQueue.Get()
if quit {
return false
}
defer c.storageVersionQueue.Done(key)
err := c.syncStorageVersion(key.(string))
if err == nil {
c.storageVersionQueue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("storage version %v failed with: %v", key, err))
c.storageVersionQueue.AddRateLimited(key)
return true
}
func (c *Controller) processDeletedLease(name string) error {
_, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(context.TODO(), name, metav1.GetOptions{})
// the lease isn't deleted, nothing we need to do here
if err == nil {
return nil
}
if !apierrors.IsNotFound(err) {
return err
}
// the frequency of this call won't be too high because we only trigger on identity lease deletions
storageVersionList, err := c.kubeclientset.InternalV1alpha1().StorageVersions().List(context.TODO(), metav1.ListOptions{})
if err != nil {
return err
}
var errors []error
for _, sv := range storageVersionList.Items {
var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion
hasStaleRecord := false
for _, ssv := range sv.Status.StorageVersions {
if ssv.APIServerID == name {
hasStaleRecord = true
continue
}
serverStorageVersions = append(serverStorageVersions, ssv)
}
if !hasStaleRecord {
continue
}
if err := c.updateOrDeleteStorageVersion(&sv, serverStorageVersions); err != nil {
errors = append(errors, err)
}
}
return utilerrors.NewAggregate(errors)
}
func (c *Controller) syncStorageVersion(name string) error {
sv, err := c.kubeclientset.InternalV1alpha1().StorageVersions().Get(context.TODO(), name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
// The problematic storage version that was added/updated recently is gone.
// Nothing we need to do here.
return nil
}
if err != nil {
return err
}
hasInvalidID := false
var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion
for _, v := range sv.Status.StorageVersions {
lease, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(context.TODO(), v.APIServerID, metav1.GetOptions{})
if err != nil || lease == nil || lease.Labels == nil ||
lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
// We cannot find a corresponding identity lease from apiserver as well.
// We need to clean up this storage version.
hasInvalidID = true
continue
}
serverStorageVersions = append(serverStorageVersions, v)
}
if !hasInvalidID {
return nil
}
return c.updateOrDeleteStorageVersion(sv, serverStorageVersions)
}
func (c *Controller) onAddStorageVersion(obj interface{}) {
castObj := obj.(*apiserverinternalv1alpha1.StorageVersion)
c.enqueueStorageVersion(castObj)
}
func (c *Controller) onUpdateStorageVersion(oldObj, newObj interface{}) {
castNewObj := newObj.(*apiserverinternalv1alpha1.StorageVersion)
c.enqueueStorageVersion(castNewObj)
}
// enqueueStorageVersion enqueues the storage version if it has entry for invalid apiserver
func (c *Controller) enqueueStorageVersion(obj *apiserverinternalv1alpha1.StorageVersion) {
for _, sv := range obj.Status.StorageVersions {
lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID)
if err != nil || lease == nil || lease.Labels == nil ||
lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
// we cannot find a corresponding identity lease in cache, enqueue the storageversion
klog.V(4).Infof("Observed storage version %s with invalid apiserver entry", obj.Name)
c.storageVersionQueue.Add(obj.Name)
return
}
}
}
func (c *Controller) onDeleteLease(obj interface{}) {
castObj, ok := obj.(*coordinationv1.Lease)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}
castObj, ok = tombstone.Obj.(*coordinationv1.Lease)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Lease %#v", obj))
return
}
}
if castObj.Namespace == metav1.NamespaceSystem &&
castObj.Labels != nil &&
castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer {
klog.V(4).Infof("Observed lease %s deleted", castObj.Name)
c.enqueueLease(castObj)
}
}
func (c *Controller) enqueueLease(obj *coordinationv1.Lease) {
c.leaseQueue.Add(obj.Name)
}
func setCommonEncodingVersion(sv *apiserverinternalv1alpha1.StorageVersion) {
if len(sv.Status.StorageVersions) == 0 {
return
}
firstVersion := sv.Status.StorageVersions[0].EncodingVersion
agreed := true
for _, ssv := range sv.Status.StorageVersions {
if ssv.EncodingVersion != firstVersion {
agreed = false
break
}
}
if agreed {
sv.Status.CommonEncodingVersion = &firstVersion
} else {
sv.Status.CommonEncodingVersion = nil
}
}
func (c *Controller) updateOrDeleteStorageVersion(sv *apiserverinternalv1alpha1.StorageVersion, serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion) error {
if len(serverStorageVersions) == 0 {
return c.kubeclientset.InternalV1alpha1().StorageVersions().Delete(
context.TODO(), sv.Name, metav1.DeleteOptions{})
}
sv.Status.StorageVersions = serverStorageVersions
setCommonEncodingVersion(sv)
_, err := c.kubeclientset.InternalV1alpha1().StorageVersions().UpdateStatus(
context.TODO(), sv, metav1.UpdateOptions{})
return err
}

View File

@ -22,8 +22,10 @@ import (
"net/http"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/storageversion"
@ -36,7 +38,9 @@ import (
// 1. non-resource requests,
// 2. read requests,
// 3. write requests to the storageversion API,
// 4. resources whose StorageVersion is not pending update, including non-persisted resources.
// 4. create requests to the namespace API sent by apiserver itself,
// 5. write requests to the lease API in kube-system namespace,
// 6. resources whose StorageVersion is not pending update, including non-persisted resources.
func WithStorageVersionPrecondition(handler http.Handler, svm storageversion.Manager, s runtime.NegotiatedSerializer) http.Handler {
if svm == nil {
// TODO(roycaihw): switch to warning after the feature graduate to beta/GA
@ -69,6 +73,31 @@ func WithStorageVersionPrecondition(handler http.Handler, svm storageversion.Man
handler.ServeHTTP(w, req)
return
}
// The system namespace is required for apiserver-identity lease to exist. Allow the apiserver
// itself to create namespaces.
// NOTE: with this exception, if the bootstrap client writes namespaces with a new version,
// and the upgraded apiserver dies before updating the StorageVersion for namespaces, the
// storage migrator won't be able to tell these namespaces are stored in a different version in etcd.
// Because the bootstrap client only creates system namespace and doesn't update them, this can
// only happen if the upgraded apiserver is the first apiserver that kicks off namespace creation,
// or if an upgraded server that joins an existing cluster has new system namespaces (other
// than kube-system, kube-public, kube-node-lease) that need to be created.
u, hasUser := request.UserFrom(ctx)
if requestInfo.APIGroup == "" && requestInfo.Resource == "namespaces" &&
requestInfo.Verb == "create" && hasUser &&
u.GetName() == user.APIServerUser && contains(u.GetGroups(), user.SystemPrivilegedGroup) {
handler.ServeHTTP(w, req)
return
}
// Allow writes to the lease API in kube-system. The storage version API depends on the
// apiserver-identity leases to operate. Leases in kube-system are either apiserver-identity
// lease (which gets garbage collected when stale) or leader-election leases (which gets
// periodically updated by system components). Both types of leases won't be stale in etcd.
if requestInfo.APIGroup == "coordination.k8s.io" && requestInfo.Resource == "leases" &&
requestInfo.Namespace == metav1.NamespaceSystem {
handler.ServeHTTP(w, req)
return
}
// If the resource's StorageVersion is not in the to-be-updated list, let it pass.
// Non-persisted resources are not in the to-be-updated list, so they will pass.
gr := schema.GroupResource{requestInfo.APIGroup, requestInfo.Resource}
@ -81,3 +110,12 @@ func WithStorageVersionPrecondition(handler http.Handler, svm storageversion.Man
responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable(fmt.Sprintf("wait for storage version registration to complete for resource: %v, last seen error: %v", gr, svm.LastUpdateError(gr))), s, gv, w, req)
})
}
func contains(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}

View File

@ -68,6 +68,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/proxy:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/pkg/version:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",

View File

@ -17,10 +17,12 @@ limitations under the License.
package apiserver
import (
"context"
"fmt"
"net/http"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
@ -30,6 +32,7 @@ import (
"k8s.io/apiserver/pkg/server/egressselector"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/version"
openapicommon "k8s.io/kube-openapi/pkg/common"
@ -271,7 +274,28 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
// Spawn a goroutine in aggregator apiserver to update storage version for
// all built-in resources
s.GenericAPIServer.AddPostStartHookOrDie("built-in-resources-storage-version-updater", func(context genericapiserver.PostStartHookContext) error {
s.GenericAPIServer.AddPostStartHookOrDie("built-in-resources-storage-version-updater", func(hookContext genericapiserver.PostStartHookContext) error {
// Wait for apiserver-identity to exist first before updating storage
// versions, to avoid storage version GC accidentally garbage-collecting
// storage versions.
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return err
}
if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
_, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get(
context.TODO(), s.GenericAPIServer.APIServerID, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}, hookContext.StopCh); err != nil {
return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v",
s.GenericAPIServer.APIServerID, err)
}
// Technically an apiserver only needs to update storage version once during bootstrap.
// Reconcile StorageVersion objects every 10 minutes will help in the case that the
// StorageVersion objects get accidentally modified/deleted by a different agent. In that
@ -282,16 +306,16 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
// All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)
// share the same generic apiserver config. The same StorageVersion manager is used
// to register all built-in resources when the generic apiservers install APIs.
s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(context.LoopbackClientConfig, s.GenericAPIServer.APIServerID)
s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID)
return false, nil
}, context.StopCh)
}, hookContext.StopCh)
// Once the storage version updater finishes the first round of update,
// the PostStartHook will return to unblock /healthz. The handler chain
// won't block write requests anymore. Check every second since it's not
// expensive.
wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
return s.GenericAPIServer.StorageVersionManager.Completed(), nil
}, context.StopCh)
}, hookContext.StopCh)
return nil
})
}

View File

@ -3,14 +3,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")
go_test(
name = "go_default_test",
srcs = [
"gc_test.go",
"storage_version_filter_test.go",
"storage_version_main_test.go",
],
tags = ["integration"],
deps = [
"//cmd/kube-apiserver/app/testing:go_default_library",
"//pkg/controller/storageversiongc:go_default_library",
"//pkg/controlplane:go_default_library",
"//staging/src/k8s.io/api/apiserverinternal/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
"//staging/src/k8s.io/api/coordination/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
@ -22,6 +26,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/storageversion:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
@ -29,6 +34,7 @@ go_test(
"//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset:go_default_library",
"//test/integration/etcd:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)

View File

@ -0,0 +1,247 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storageversion
import (
"context"
"fmt"
"strings"
"testing"
"time"
apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1"
coordinationv1 "k8s.io/api/coordination/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
featuregatetesting "k8s.io/component-base/featuregate/testing"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/storageversiongc"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/utils/pointer"
)
const (
svName = "storageversion.integration.test.foos"
idA = "id-1"
idB = "id-2"
idNonExist = "id-non-exist"
)
func TestStorageVersionGarbageCollection(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer result.TearDownFn()
kubeclient, err := kubernetes.NewForConfig(result.ClientConfig)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
informers := informers.NewSharedInformerFactory(kubeclient, time.Second)
leaseInformer := informers.Coordination().V1().Leases()
storageVersionInformer := informers.Internal().V1alpha1().StorageVersions()
controller := storageversiongc.NewStorageVersionGC(kubeclient, leaseInformer, storageVersionInformer)
stopCh := make(chan struct{})
defer close(stopCh)
go leaseInformer.Informer().Run(stopCh)
go storageVersionInformer.Informer().Run(stopCh)
go controller.Run(stopCh)
createTestAPIServerIdentityLease(t, kubeclient, idA)
createTestAPIServerIdentityLease(t, kubeclient, idB)
t.Run("storage version with non-existing id should be GC'ed", func(t *testing.T) {
createTestStorageVersion(t, kubeclient, idNonExist)
assertStorageVersionDeleted(t, kubeclient)
})
t.Run("storage version with valid id should not be GC'ed", func(t *testing.T) {
createTestStorageVersion(t, kubeclient, idA)
time.Sleep(10 * time.Second)
sv, err := kubeclient.InternalV1alpha1().StorageVersions().Get(
context.TODO(), svName, metav1.GetOptions{})
if err != nil {
t.Fatalf("failed to retrieve valid storage version: %v", err)
}
if len(sv.Status.StorageVersions) != 1 {
t.Errorf("unexpected number of storage version entries, expected 1, got: %v",
sv.Status.StorageVersions)
}
expectedID := idA
if sv.Status.StorageVersions[0].APIServerID != expectedID {
t.Errorf("unexpected storage version entry id, expected %v, got: %v",
expectedID, sv.Status.StorageVersions[0].APIServerID)
}
assertCommonEncodingVersion(t, kubeclient, pointer.StringPtr(idToVersion(t, idA)))
if err := kubeclient.InternalV1alpha1().StorageVersions().Delete(
context.TODO(), svName, metav1.DeleteOptions{}); err != nil {
t.Fatalf("failed to cleanup valid storage version: %v", err)
}
})
t.Run("deleting an id should delete a storage version entry that it owns", func(t *testing.T) {
createTestStorageVersion(t, kubeclient, idA, idB)
assertStorageVersionEntries(t, kubeclient, 2, idA)
assertCommonEncodingVersion(t, kubeclient, nil)
deleteTestAPIServerIdentityLease(t, kubeclient, idA)
assertStorageVersionEntries(t, kubeclient, 1, idB)
assertCommonEncodingVersion(t, kubeclient, pointer.StringPtr(idToVersion(t, idB)))
})
t.Run("deleting an id should delete a storage version object that it owns entirely", func(t *testing.T) {
deleteTestAPIServerIdentityLease(t, kubeclient, idB)
assertStorageVersionDeleted(t, kubeclient)
})
}
func createTestStorageVersion(t *testing.T, client kubernetes.Interface, ids ...string) {
sv := &apiserverinternalv1alpha1.StorageVersion{
ObjectMeta: metav1.ObjectMeta{
Name: svName,
},
}
for _, id := range ids {
version := idToVersion(t, id)
v := apiserverinternalv1alpha1.ServerStorageVersion{
APIServerID: id,
EncodingVersion: version,
DecodableVersions: []string{version},
}
sv.Status.StorageVersions = append(sv.Status.StorageVersions, v)
}
// every id is unique and creates a different version. We know we have a common encoding
// version when there is only one id. Pick it
if len(ids) == 1 {
sv.Status.CommonEncodingVersion = pointer.StringPtr(sv.Status.StorageVersions[0].EncodingVersion)
}
createdSV, err := client.InternalV1alpha1().StorageVersions().Create(context.TODO(), sv, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create storage version %s: %v", svName, err)
}
// update the created sv with intended status
createdSV.Status = sv.Status
if _, err := client.InternalV1alpha1().StorageVersions().UpdateStatus(
context.TODO(), createdSV, metav1.UpdateOptions{}); err != nil {
t.Fatalf("failed to update store version status: %v", err)
}
}
func assertStorageVersionDeleted(t *testing.T, client kubernetes.Interface) {
if err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
_, err := client.InternalV1alpha1().StorageVersions().Get(
context.TODO(), svName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return true, nil
}
if err != nil {
return false, err
}
return false, nil
}); err != nil {
t.Fatalf("failed to wait for storageversion garbage collection: %v", err)
}
}
func createTestAPIServerIdentityLease(t *testing.T, client kubernetes.Interface, name string) {
lease := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: metav1.NamespaceSystem,
Labels: map[string]string{
controlplane.IdentityLeaseComponentLabelKey: controlplane.KubeAPIServer,
},
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: pointer.StringPtr(name),
LeaseDurationSeconds: pointer.Int32Ptr(3600),
// create fresh leases
AcquireTime: &metav1.MicroTime{Time: time.Now()},
RenewTime: &metav1.MicroTime{Time: time.Now()},
},
}
if _, err := client.CoordinationV1().Leases(metav1.NamespaceSystem).Create(
context.TODO(), lease, metav1.CreateOptions{}); err != nil {
t.Fatalf("failed to create apiserver identity lease %s: %v", name, err)
}
}
func deleteTestAPIServerIdentityLease(t *testing.T, client kubernetes.Interface, name string) {
if err := client.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(
context.TODO(), name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("failed to delete apiserver identity lease %s: %v", name, err)
}
}
func assertStorageVersionEntries(t *testing.T, client kubernetes.Interface,
numEntries int, firstID string) {
var lastErr error
if err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
sv, err := client.InternalV1alpha1().StorageVersions().Get(
context.TODO(), svName, metav1.GetOptions{})
if err != nil {
return false, err
}
if len(sv.Status.StorageVersions) != numEntries {
lastErr = fmt.Errorf("unexpected number of storage version entries, expected %v, got: %v",
numEntries, len(sv.Status.StorageVersions))
return false, nil
}
if sv.Status.StorageVersions[0].APIServerID != firstID {
lastErr = fmt.Errorf("unexpected fisrt storage version entry id, expected %v, got: %v",
firstID, sv.Status.StorageVersions[0].APIServerID)
return false, nil
}
return true, nil
}); err != nil {
t.Fatalf("failed to get expected storage verion entries: %v, last error: %v", err, lastErr)
}
}
func assertCommonEncodingVersion(t *testing.T, client kubernetes.Interface, e *string) {
sv, err := client.InternalV1alpha1().StorageVersions().Get(
context.TODO(), svName, metav1.GetOptions{})
if err != nil {
t.Fatalf("failed to retrieve storage version: %v", err)
}
if e == nil {
if sv.Status.CommonEncodingVersion != nil {
t.Errorf("unexpected non-nil common encoding version: %v", sv.Status.CommonEncodingVersion)
}
return
}
if sv.Status.CommonEncodingVersion == nil || *sv.Status.CommonEncodingVersion != *e {
t.Errorf("unexpected common encoding version, expected: %v, got %v", e, sv.Status.CommonEncodingVersion)
}
}
func idToVersion(t *testing.T, id string) string {
// TODO(roycaihw): rewrite the test, use a id-version table
if !strings.HasPrefix(id, "id-") {
t.Fatalf("should not happen: test using id without id- prefix: %s", id)
}
return fmt.Sprintf("v%s", strings.TrimPrefix(id, "id-"))
}

View File

@ -82,7 +82,7 @@ func assertBlocking(name string, t *testing.T, err error, shouldBlock bool) {
func testBuiltinResourceWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) {
client := clientset.NewForConfigOrDie(cfg)
_, err := client.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "test"}}, metav1.CreateOptions{})
_, err := client.CoreV1().ConfigMaps("default").Create(context.TODO(), &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "test"}}, metav1.CreateOptions{})
assertBlocking("writes to built in resources", t, err, shouldBlock)
}