make storage version manager wait for lease creation

to avoid storageversions being GC'ed accidentally
This commit is contained in:
Haowei Cai 2020-11-12 16:21:50 -08:00
parent ee9ace14c2
commit 5112a51d62

View File

@ -17,10 +17,12 @@ limitations under the License.
package apiserver
import (
"context"
"fmt"
"net/http"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
@ -30,6 +32,7 @@ import (
"k8s.io/apiserver/pkg/server/egressselector"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/version"
openapicommon "k8s.io/kube-openapi/pkg/common"
@ -271,7 +274,28 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
// Spawn a goroutine in aggregator apiserver to update storage version for
// all built-in resources
s.GenericAPIServer.AddPostStartHookOrDie("built-in-resources-storage-version-updater", func(context genericapiserver.PostStartHookContext) error {
s.GenericAPIServer.AddPostStartHookOrDie("built-in-resources-storage-version-updater", func(hookContext genericapiserver.PostStartHookContext) error {
// Wait for apiserver-identity to exist first before updating storage
// versions, to avoid storage version GC accidentally garbage-collecting
// storage versions.
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return err
}
if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
_, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get(
context.TODO(), s.GenericAPIServer.APIServerID, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}, hookContext.StopCh); err != nil {
return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v",
s.GenericAPIServer.APIServerID, err)
}
// Technically an apiserver only needs to update storage version once during bootstrap.
// Reconcile StorageVersion objects every 10 minutes will help in the case that the
// StorageVersion objects get accidentally modified/deleted by a different agent. In that
@ -282,16 +306,16 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
// All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)
// share the same generic apiserver config. The same StorageVersion manager is used
// to register all built-in resources when the generic apiservers install APIs.
s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(context.LoopbackClientConfig, s.GenericAPIServer.APIServerID)
s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID)
return false, nil
}, context.StopCh)
}, hookContext.StopCh)
// Once the storage version updater finishes the first round of update,
// the PostStartHook will return to unblock /healthz. The handler chain
// won't block write requests anymore. Check every second since it's not
// expensive.
wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
return s.GenericAPIServer.StorageVersionManager.Completed(), nil
}, context.StopCh)
}, hookContext.StopCh)
return nil
})
}