mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #45985 from deads2k/tpr-16-finalizer
Automatic merge from submit-queue (batch tested with PRs 41535, 45985, 45929, 45948, 46056) add CRD finalizer to remove CRs Fixes https://github.com/kubernetes/kubernetes/issues/45878 This adds a finalizer for customresourcedefinitions to prevent CRD deletion until all CR instances are gone. @sdminonne I lost track of your issue, but here's the fix I'm at a loss for how to test this. It's tested from the outside by ensuring that a CRD delete removes its instances (integration test) and we could add more integration tests, but for unit tests I can't seem to find a mock `dynamic.ClientPool` and its not easily writeable at the moment. I'm thinking about saying we just add more black box tests given the options.
This commit is contained in:
commit
61eace2b1e
@ -1,4 +1,4 @@
|
||||
apiVersion: apiregistration.k8s.io/v1alpha1
|
||||
apiVersion: apiregistration.k8s.io/v1beta1
|
||||
kind: APIService
|
||||
metadata:
|
||||
name: v1alpha1.mygroup.example.com
|
||||
|
@ -1,4 +1,4 @@
|
||||
apiVersion: apiregistration.k8s.io/v1alpha1
|
||||
apiVersion: apiregistration.k8s.io/v1beta1
|
||||
kind: APIService
|
||||
metadata:
|
||||
name: v1alpha1.apiextensions.k8s.io
|
||||
|
@ -5,6 +5,7 @@ licenses(["notice"])
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_library(
|
||||
@ -24,3 +25,11 @@ go_library(
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["helpers_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = ["//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library"],
|
||||
)
|
||||
|
@ -18,10 +18,10 @@ package apiextensions
|
||||
|
||||
// SetCRDCondition sets the status condition. It either overwrites the existing one or
|
||||
// creates a new one
|
||||
func SetCRDCondition(customResourceDefinition *CustomResourceDefinition, newCondition CustomResourceDefinitionCondition) {
|
||||
existingCondition := FindCRDCondition(customResourceDefinition, newCondition.Type)
|
||||
func SetCRDCondition(crd *CustomResourceDefinition, newCondition CustomResourceDefinitionCondition) {
|
||||
existingCondition := FindCRDCondition(crd, newCondition.Type)
|
||||
if existingCondition == nil {
|
||||
customResourceDefinition.Status.Conditions = append(customResourceDefinition.Status.Conditions, newCondition)
|
||||
crd.Status.Conditions = append(crd.Status.Conditions, newCondition)
|
||||
return
|
||||
}
|
||||
|
||||
@ -34,11 +34,22 @@ func SetCRDCondition(customResourceDefinition *CustomResourceDefinition, newCond
|
||||
existingCondition.Message = newCondition.Message
|
||||
}
|
||||
|
||||
// RemoveCRDCondition removes the status condition.
|
||||
func RemoveCRDCondition(crd *CustomResourceDefinition, conditionType CustomResourceDefinitionConditionType) {
|
||||
newConditions := []CustomResourceDefinitionCondition{}
|
||||
for _, condition := range crd.Status.Conditions {
|
||||
if condition.Type != conditionType {
|
||||
newConditions = append(newConditions, condition)
|
||||
}
|
||||
}
|
||||
crd.Status.Conditions = newConditions
|
||||
}
|
||||
|
||||
// FindCRDCondition returns the condition you're looking for or nil
|
||||
func FindCRDCondition(customResourceDefinition *CustomResourceDefinition, conditionType CustomResourceDefinitionConditionType) *CustomResourceDefinitionCondition {
|
||||
for i := range customResourceDefinition.Status.Conditions {
|
||||
if customResourceDefinition.Status.Conditions[i].Type == conditionType {
|
||||
return &customResourceDefinition.Status.Conditions[i]
|
||||
func FindCRDCondition(crd *CustomResourceDefinition, conditionType CustomResourceDefinitionConditionType) *CustomResourceDefinitionCondition {
|
||||
for i := range crd.Status.Conditions {
|
||||
if crd.Status.Conditions[i].Type == conditionType {
|
||||
return &crd.Status.Conditions[i]
|
||||
}
|
||||
}
|
||||
|
||||
@ -46,18 +57,18 @@ func FindCRDCondition(customResourceDefinition *CustomResourceDefinition, condit
|
||||
}
|
||||
|
||||
// IsCRDConditionTrue indicates if the condition is present and strictly true
|
||||
func IsCRDConditionTrue(customResourceDefinition *CustomResourceDefinition, conditionType CustomResourceDefinitionConditionType) bool {
|
||||
return IsCRDConditionPresentAndEqual(customResourceDefinition, conditionType, ConditionTrue)
|
||||
func IsCRDConditionTrue(crd *CustomResourceDefinition, conditionType CustomResourceDefinitionConditionType) bool {
|
||||
return IsCRDConditionPresentAndEqual(crd, conditionType, ConditionTrue)
|
||||
}
|
||||
|
||||
// IsCRDConditionFalse indicates if the condition is present and false true
|
||||
func IsCRDConditionFalse(customResourceDefinition *CustomResourceDefinition, conditionType CustomResourceDefinitionConditionType) bool {
|
||||
return IsCRDConditionPresentAndEqual(customResourceDefinition, conditionType, ConditionFalse)
|
||||
func IsCRDConditionFalse(crd *CustomResourceDefinition, conditionType CustomResourceDefinitionConditionType) bool {
|
||||
return IsCRDConditionPresentAndEqual(crd, conditionType, ConditionFalse)
|
||||
}
|
||||
|
||||
// IsCRDConditionPresentAndEqual indicates if the condition is present and equal to the arg
|
||||
func IsCRDConditionPresentAndEqual(customResourceDefinition *CustomResourceDefinition, conditionType CustomResourceDefinitionConditionType, status ConditionStatus) bool {
|
||||
for _, condition := range customResourceDefinition.Status.Conditions {
|
||||
func IsCRDConditionPresentAndEqual(crd *CustomResourceDefinition, conditionType CustomResourceDefinitionConditionType, status ConditionStatus) bool {
|
||||
for _, condition := range crd.Status.Conditions {
|
||||
if condition.Type == conditionType {
|
||||
return condition.Status == status
|
||||
}
|
||||
@ -76,3 +87,25 @@ func IsCRDConditionEquivalent(lhs, rhs *CustomResourceDefinitionCondition) bool
|
||||
|
||||
return lhs.Message == rhs.Message && lhs.Reason == rhs.Reason && lhs.Status == rhs.Status && lhs.Type == rhs.Type
|
||||
}
|
||||
|
||||
// CRDHasFinalizer returns true if the finalizer is in the list
|
||||
func CRDHasFinalizer(crd *CustomResourceDefinition, needle string) bool {
|
||||
for _, finalizer := range crd.Finalizers {
|
||||
if finalizer == needle {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// CRDRemoveFinalizer removes the finalizer if present
|
||||
func CRDRemoveFinalizer(crd *CustomResourceDefinition, needle string) {
|
||||
newFinalizers := []string{}
|
||||
for _, finalizer := range crd.Finalizers {
|
||||
if finalizer != needle {
|
||||
newFinalizers = append(newFinalizers, finalizer)
|
||||
}
|
||||
}
|
||||
crd.Finalizers = newFinalizers
|
||||
}
|
||||
|
@ -0,0 +1,90 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package apiextensions
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func TestCRDHasFinalizer(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
crd *CustomResourceDefinition
|
||||
finalizerToCheck string
|
||||
|
||||
expected bool
|
||||
}{
|
||||
{
|
||||
name: "missing",
|
||||
crd: &CustomResourceDefinition{
|
||||
ObjectMeta: metav1.ObjectMeta{Finalizers: []string{"not-it"}},
|
||||
},
|
||||
finalizerToCheck: "it",
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "present",
|
||||
crd: &CustomResourceDefinition{
|
||||
ObjectMeta: metav1.ObjectMeta{Finalizers: []string{"not-it", "it"}},
|
||||
},
|
||||
finalizerToCheck: "it",
|
||||
expected: true,
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
actual := CRDHasFinalizer(tc.crd, tc.finalizerToCheck)
|
||||
if tc.expected != actual {
|
||||
t.Errorf("%v expected %v, got %v", tc.name, tc.expected, actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCRDRemoveFinalizer(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
crd *CustomResourceDefinition
|
||||
finalizerToCheck string
|
||||
|
||||
expected []string
|
||||
}{
|
||||
{
|
||||
name: "missing",
|
||||
crd: &CustomResourceDefinition{
|
||||
ObjectMeta: metav1.ObjectMeta{Finalizers: []string{"not-it"}},
|
||||
},
|
||||
finalizerToCheck: "it",
|
||||
expected: []string{"not-it"},
|
||||
},
|
||||
{
|
||||
name: "present",
|
||||
crd: &CustomResourceDefinition{
|
||||
ObjectMeta: metav1.ObjectMeta{Finalizers: []string{"not-it", "it"}},
|
||||
},
|
||||
finalizerToCheck: "it",
|
||||
expected: []string{"not-it"},
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
CRDRemoveFinalizer(tc.crd, tc.finalizerToCheck)
|
||||
if !reflect.DeepEqual(tc.expected, tc.crd.Finalizers) {
|
||||
t.Errorf("%v expected %v, got %v", tc.name, tc.expected, tc.crd.Finalizers)
|
||||
}
|
||||
}
|
||||
}
|
@ -104,6 +104,10 @@ type CustomResourceDefinitionStatus struct {
|
||||
AcceptedNames CustomResourceDefinitionNames
|
||||
}
|
||||
|
||||
// CustomResourceCleanupFinalizer is the name of the finalizer which will delete instances of
|
||||
// a CustomResourceDefinition
|
||||
const CustomResourceCleanupFinalizer = "customresourcecleanup.apiextensions.k8s.io"
|
||||
|
||||
// +genclient=true
|
||||
// +nonNamespaced=true
|
||||
|
||||
|
@ -104,6 +104,10 @@ type CustomResourceDefinitionStatus struct {
|
||||
AcceptedNames CustomResourceDefinitionNames `json:"acceptedNames" protobuf:"bytes,2,opt,name=acceptedNames"`
|
||||
}
|
||||
|
||||
// CustomResourceCleanupFinalizer is the name of the finalizer which will delete instances of
|
||||
// a CustomResourceDefinition
|
||||
const CustomResourceCleanupFinalizer = "customresourcecleanup.apiextensions.k8s.io"
|
||||
|
||||
// +genclient=true
|
||||
// +nonNamespaced=true
|
||||
|
||||
|
@ -43,6 +43,7 @@ go_library(
|
||||
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
||||
"//vendor/k8s.io/client-go/discovery:go_default_library",
|
||||
"//vendor/k8s.io/client-go/dynamic:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/apis/apiextensions:go_default_library",
|
||||
@ -54,6 +55,7 @@ go_library(
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion/apiextensions/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/listers/apiextensions/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/controller/finalizer:go_default_library",
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/controller/status:go_default_library",
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/registry/customresource:go_default_library",
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/registry/customresourcedefinition:go_default_library",
|
||||
|
@ -31,12 +31,14 @@ import (
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/client-go/dynamic"
|
||||
|
||||
"k8s.io/kube-apiextensions-server/pkg/apis/apiextensions"
|
||||
"k8s.io/kube-apiextensions-server/pkg/apis/apiextensions/install"
|
||||
"k8s.io/kube-apiextensions-server/pkg/apis/apiextensions/v1alpha1"
|
||||
"k8s.io/kube-apiextensions-server/pkg/client/clientset/internalclientset"
|
||||
internalinformers "k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion"
|
||||
"k8s.io/kube-apiextensions-server/pkg/controller/finalizer"
|
||||
"k8s.io/kube-apiextensions-server/pkg/controller/status"
|
||||
"k8s.io/kube-apiextensions-server/pkg/registry/customresourcedefinition"
|
||||
|
||||
@ -157,6 +159,10 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
|
||||
|
||||
customResourceDefinitionController := NewDiscoveryController(customResourceDefinitionInformers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
|
||||
namingController := status.NewNamingConditionController(customResourceDefinitionInformers.Apiextensions().InternalVersion().CustomResourceDefinitions(), customResourceDefinitionClient)
|
||||
finalizingController := finalizer.NewCRDFinalizer(
|
||||
customResourceDefinitionInformers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
|
||||
customResourceDefinitionClient,
|
||||
dynamic.NewDynamicClientPool(s.GenericAPIServer.LoopbackClientConfig))
|
||||
|
||||
s.GenericAPIServer.AddPostStartHook("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
|
||||
customResourceDefinitionInformers.Start(context.StopCh)
|
||||
@ -165,6 +171,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
|
||||
s.GenericAPIServer.AddPostStartHook("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
|
||||
go customResourceDefinitionController.Run(context.StopCh)
|
||||
go namingController.Run(context.StopCh)
|
||||
go finalizingController.Run(5, context.StopCh)
|
||||
return nil
|
||||
})
|
||||
|
||||
|
@ -101,12 +101,18 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
|
||||
}
|
||||
foundVersion = true
|
||||
|
||||
verbs := metav1.Verbs([]string{"delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"})
|
||||
// if we're terminating we don't allow some verbs
|
||||
if apiextensions.IsCRDConditionTrue(crd, apiextensions.Terminating) {
|
||||
verbs = metav1.Verbs([]string{"delete", "deletecollection", "get", "list", "watch"})
|
||||
}
|
||||
|
||||
apiResourcesForDiscovery = append(apiResourcesForDiscovery, metav1.APIResource{
|
||||
Name: crd.Status.AcceptedNames.Plural,
|
||||
SingularName: crd.Status.AcceptedNames.Singular,
|
||||
Namespaced: crd.Spec.Scope == apiextensions.NamespaceScoped,
|
||||
Kind: crd.Status.AcceptedNames.Kind,
|
||||
Verbs: metav1.Verbs([]string{"delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"}),
|
||||
Verbs: verbs,
|
||||
ShortNames: crd.Status.AcceptedNames.ShortNames,
|
||||
})
|
||||
}
|
||||
|
@ -153,6 +153,8 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
r.delegate.ServeHTTP(w, req)
|
||||
}
|
||||
|
||||
terminating := apiextensions.IsCRDConditionTrue(crd, apiextensions.Terminating)
|
||||
|
||||
crdInfo := r.getServingInfoFor(crd)
|
||||
storage := crdInfo.storage
|
||||
requestScope := crdInfo.requestScope
|
||||
@ -174,14 +176,26 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
handler(w, req)
|
||||
return
|
||||
case "create":
|
||||
if terminating {
|
||||
http.Error(w, fmt.Sprintf("%v not allowed while CustomResourceDefinition is terminating", requestInfo.Verb), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
handler := handlers.CreateResource(storage, requestScope, discovery.NewUnstructuredObjectTyper(nil), r.admission)
|
||||
handler(w, req)
|
||||
return
|
||||
case "update":
|
||||
if terminating {
|
||||
http.Error(w, fmt.Sprintf("%v not allowed while CustomResourceDefinition is terminating", requestInfo.Verb), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
handler := handlers.UpdateResource(storage, requestScope, discovery.NewUnstructuredObjectTyper(nil), r.admission)
|
||||
handler(w, req)
|
||||
return
|
||||
case "patch":
|
||||
if terminating {
|
||||
http.Error(w, fmt.Sprintf("%v not allowed while CustomResourceDefinition is terminating", requestInfo.Verb), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
handler := handlers.PatchResource(storage, requestScope, r.admission, unstructured.UnstructuredObjectConverter{})
|
||||
handler(w, req)
|
||||
return
|
||||
@ -190,6 +204,11 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
handler := handlers.DeleteResource(storage, allowsOptions, requestScope, r.admission)
|
||||
handler(w, req)
|
||||
return
|
||||
case "deletecollection":
|
||||
checkBody := true
|
||||
handler := handlers.DeleteCollection(storage, checkBody, requestScope, r.admission)
|
||||
handler(w, req)
|
||||
return
|
||||
|
||||
default:
|
||||
http.Error(w, fmt.Sprintf("unhandled verb %q", requestInfo.Verb), http.StatusMethodNotAllowed)
|
||||
|
@ -0,0 +1,34 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["crd_finalizer.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/client-go/dynamic:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/apis/apiextensions:go_default_library",
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/clientset/internalclientset/typed/apiextensions/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion/apiextensions/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/listers/apiextensions/internalversion:go_default_library",
|
||||
],
|
||||
)
|
@ -0,0 +1,333 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package finalizer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/conversion"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
||||
"k8s.io/kube-apiextensions-server/pkg/apis/apiextensions"
|
||||
client "k8s.io/kube-apiextensions-server/pkg/client/clientset/internalclientset/typed/apiextensions/internalversion"
|
||||
informers "k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion/apiextensions/internalversion"
|
||||
listers "k8s.io/kube-apiextensions-server/pkg/client/listers/apiextensions/internalversion"
|
||||
)
|
||||
|
||||
var cloner = conversion.NewCloner()
|
||||
|
||||
// This controller finalizes the CRD by deleting all the CRs associated with it.
|
||||
type CRDFinalizer struct {
|
||||
crdClient client.CustomResourceDefinitionsGetter
|
||||
// clientPool is a dynamic client used to delete the individual instances
|
||||
clientPool dynamic.ClientPool
|
||||
|
||||
crdLister listers.CustomResourceDefinitionLister
|
||||
crdSynced cache.InformerSynced
|
||||
|
||||
// To allow injection for testing.
|
||||
syncFn func(key string) error
|
||||
|
||||
queue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
func NewCRDFinalizer(
|
||||
crdInformer informers.CustomResourceDefinitionInformer,
|
||||
crdClient client.CustomResourceDefinitionsGetter,
|
||||
clientPool dynamic.ClientPool,
|
||||
) *CRDFinalizer {
|
||||
c := &CRDFinalizer{
|
||||
crdClient: crdClient,
|
||||
clientPool: clientPool,
|
||||
crdLister: crdInformer.Lister(),
|
||||
crdSynced: crdInformer.Informer().HasSynced,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "CustomResourceDefinition-CRDFinalizer"),
|
||||
}
|
||||
|
||||
crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addCustomResourceDefinition,
|
||||
UpdateFunc: c.updateCustomResourceDefinition,
|
||||
})
|
||||
|
||||
c.syncFn = c.sync
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *CRDFinalizer) sync(key string) error {
|
||||
cachedCRD, err := c.crdLister.Get(key)
|
||||
if apierrors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// no work to do
|
||||
if cachedCRD.DeletionTimestamp.IsZero() || !apiextensions.CRDHasFinalizer(cachedCRD, apiextensions.CustomResourceCleanupFinalizer) {
|
||||
return nil
|
||||
}
|
||||
|
||||
crd := &apiextensions.CustomResourceDefinition{}
|
||||
if err := apiextensions.DeepCopy_apiextensions_CustomResourceDefinition(cachedCRD, crd, cloner); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// update the status condition. This cleanup could take a while.
|
||||
apiextensions.SetCRDCondition(crd, apiextensions.CustomResourceDefinitionCondition{
|
||||
Type: apiextensions.Terminating,
|
||||
Status: apiextensions.ConditionTrue,
|
||||
Reason: "InstanceDeletionInProgress",
|
||||
Message: "CustomResource deletion is in progress",
|
||||
})
|
||||
crd, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(crd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Its possible for a naming conflict to have removed this resource from the API after instances were created.
|
||||
// For now we will cowardly stop finalizing. If we don't go through the REST API, weird things may happen:
|
||||
// no audit trail, no admission checks or side effects, finalization would probably still work but defaulting
|
||||
// would be missed. It would be a mess.
|
||||
// This requires human intervention to solve, update status so they have a reason.
|
||||
// TODO split coreNamesAccepted from extendedNamesAccepted. If coreNames were accepted, then we have something to cleanup
|
||||
// and the endpoint is serviceable. if they aren't, then there's nothing to cleanup.
|
||||
if !apiextensions.IsCRDConditionFalse(crd, apiextensions.NameConflict) {
|
||||
apiextensions.SetCRDCondition(crd, apiextensions.CustomResourceDefinitionCondition{
|
||||
Type: apiextensions.Terminating,
|
||||
Status: apiextensions.ConditionTrue,
|
||||
Reason: "InstanceDeletionStuck",
|
||||
Message: fmt.Sprintf("cannot proceed with deletion because of %v condition", apiextensions.NameConflict),
|
||||
})
|
||||
crd, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(crd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("cannot proceed with deletion because of %v condition", apiextensions.NameConflict)
|
||||
}
|
||||
|
||||
// Now we can start deleting items. We should use the REST API to ensure that all normal admission runs.
|
||||
// Since we control the endpoints, we know that delete collection works.
|
||||
crClient, err := c.clientPool.ClientForGroupVersionResource(schema.GroupVersionResource{Group: crd.Spec.Group, Version: crd.Spec.Version, Resource: crd.Status.AcceptedNames.Plural})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
crAPIResource := &metav1.APIResource{
|
||||
Name: crd.Status.AcceptedNames.Plural,
|
||||
SingularName: crd.Status.AcceptedNames.Singular,
|
||||
Namespaced: crd.Spec.Scope == apiextensions.NamespaceScoped,
|
||||
Kind: crd.Status.AcceptedNames.Kind,
|
||||
Verbs: metav1.Verbs([]string{"deletecollection", "list"}),
|
||||
ShortNames: crd.Status.AcceptedNames.ShortNames,
|
||||
}
|
||||
crResourceClient := crClient.Resource(crAPIResource, "" /* namespace all */)
|
||||
allResources, err := crResourceClient.List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
deletedNamespaces := sets.String{}
|
||||
deleteErrors := []error{}
|
||||
for _, item := range allResources.(*unstructured.UnstructuredList).Items {
|
||||
metadata, err := meta.Accessor(&item)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
continue
|
||||
}
|
||||
if deletedNamespaces.Has(metadata.GetNamespace()) {
|
||||
continue
|
||||
}
|
||||
// don't retry deleting the same namespace
|
||||
deletedNamespaces.Insert(metadata.GetNamespace())
|
||||
if err := crClient.Resource(crAPIResource, metadata.GetNamespace()).DeleteCollection(nil, metav1.ListOptions{}); err != nil {
|
||||
deleteErrors = append(deleteErrors, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
if deleteError := utilerrors.NewAggregate(deleteErrors); deleteError != nil {
|
||||
apiextensions.SetCRDCondition(crd, apiextensions.CustomResourceDefinitionCondition{
|
||||
Type: apiextensions.Terminating,
|
||||
Status: apiextensions.ConditionTrue,
|
||||
Reason: "InstanceDeletionFailed",
|
||||
Message: fmt.Sprintf("could not issue all deletes: %v", deleteError),
|
||||
})
|
||||
crd, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(crd)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
return deleteError
|
||||
}
|
||||
|
||||
// now we need to wait until all the resources are deleted. Start with a simple poll before we do anything fancy.
|
||||
// TODO not all servers are synchronized on caches. It is possible for a stale one to still be creating things.
|
||||
// Once we have a mechanism for servers to indicate their states, we should check that for concurrence.
|
||||
listErr := wait.PollImmediate(5*time.Second, 1*time.Minute, func() (bool, error) {
|
||||
listObj, err := crResourceClient.List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(listObj.(*unstructured.UnstructuredList).Items) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
glog.V(2).Infof("%s.%s waiting for %d items to be removed", crd.Status.AcceptedNames.Plural, crd.Spec.Group, len(listObj.(*unstructured.UnstructuredList).Items))
|
||||
return false, nil
|
||||
})
|
||||
if listErr != nil {
|
||||
apiextensions.SetCRDCondition(crd, apiextensions.CustomResourceDefinitionCondition{
|
||||
Type: apiextensions.Terminating,
|
||||
Status: apiextensions.ConditionTrue,
|
||||
Reason: "InstanceDeletionCheck",
|
||||
Message: fmt.Sprintf("could not confirm zero CustomResources remaining: %v", listErr),
|
||||
})
|
||||
crd, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(crd)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
return listErr
|
||||
}
|
||||
|
||||
apiextensions.SetCRDCondition(crd, apiextensions.CustomResourceDefinitionCondition{
|
||||
Type: apiextensions.Terminating,
|
||||
Status: apiextensions.ConditionFalse,
|
||||
Reason: "InstanceDeletionCompleted",
|
||||
Message: "removed all instances",
|
||||
})
|
||||
apiextensions.CRDRemoveFinalizer(crd, apiextensions.CustomResourceCleanupFinalizer)
|
||||
crd, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(crd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// and now issue another delete, which should clean it all up if no finalizers remain or no-op if they do
|
||||
return c.crdClient.CustomResourceDefinitions().Delete(crd.Name, nil)
|
||||
}
|
||||
|
||||
func (c *CRDFinalizer) Run(workers int, stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
glog.Infof("Starting CRDFinalizer")
|
||||
defer glog.Infof("Shutting down CRDFinalizer")
|
||||
|
||||
if !cache.WaitForCacheSync(stopCh, c.crdSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func (c *CRDFinalizer) runWorker() {
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
|
||||
func (c *CRDFinalizer) processNextWorkItem() bool {
|
||||
key, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer c.queue.Done(key)
|
||||
|
||||
err := c.syncFn(key.(string))
|
||||
if err == nil {
|
||||
c.queue.Forget(key)
|
||||
return true
|
||||
}
|
||||
|
||||
utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
|
||||
c.queue.AddRateLimited(key)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *CRDFinalizer) enqueue(obj *apiextensions.CustomResourceDefinition) {
|
||||
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", obj, err))
|
||||
return
|
||||
}
|
||||
|
||||
c.queue.Add(key)
|
||||
}
|
||||
|
||||
func (c *CRDFinalizer) addCustomResourceDefinition(obj interface{}) {
|
||||
castObj := obj.(*apiextensions.CustomResourceDefinition)
|
||||
// only queue deleted things
|
||||
if !castObj.DeletionTimestamp.IsZero() && apiextensions.CRDHasFinalizer(castObj, apiextensions.CustomResourceCleanupFinalizer) {
|
||||
c.enqueue(castObj)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CRDFinalizer) updateCustomResourceDefinition(oldObj, newObj interface{}) {
|
||||
oldCRD := oldObj.(*apiextensions.CustomResourceDefinition)
|
||||
newCRD := newObj.(*apiextensions.CustomResourceDefinition)
|
||||
// only queue deleted things that haven't been finalized by us
|
||||
if newCRD.DeletionTimestamp.IsZero() || !apiextensions.CRDHasFinalizer(newCRD, apiextensions.CustomResourceCleanupFinalizer) {
|
||||
return
|
||||
}
|
||||
|
||||
// always requeue resyncs just in case
|
||||
if oldCRD.ResourceVersion == newCRD.ResourceVersion {
|
||||
c.enqueue(newCRD)
|
||||
return
|
||||
}
|
||||
|
||||
// If the only difference is in the terminating condition, then there's no reason to requeue here. This controller
|
||||
// is likely to be the originator, so requeuing would hot-loop us. Failures are requeued by the workqueue directly.
|
||||
// This is a low traffic and scale resource, so the copy is terrible. It's not good, so better ideas
|
||||
// are welcome.
|
||||
oldCopy := &apiextensions.CustomResourceDefinition{}
|
||||
if err := apiextensions.DeepCopy_apiextensions_CustomResourceDefinition(oldCRD, oldCopy, cloner); err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
c.enqueue(newCRD)
|
||||
return
|
||||
}
|
||||
newCopy := &apiextensions.CustomResourceDefinition{}
|
||||
if err := apiextensions.DeepCopy_apiextensions_CustomResourceDefinition(newCRD, newCopy, cloner); err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
c.enqueue(newCRD)
|
||||
return
|
||||
}
|
||||
oldCopy.ResourceVersion = ""
|
||||
newCopy.ResourceVersion = ""
|
||||
apiextensions.RemoveCRDCondition(oldCopy, apiextensions.Terminating)
|
||||
apiextensions.RemoveCRDCondition(newCopy, apiextensions.Terminating)
|
||||
|
||||
if !reflect.DeepEqual(oldCopy, newCopy) {
|
||||
c.enqueue(newCRD)
|
||||
}
|
||||
}
|
@ -15,6 +15,8 @@ go_library(
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
@ -24,6 +26,7 @@ go_library(
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/errors:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/names:go_default_library",
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/apis/apiextensions:go_default_library",
|
||||
"//vendor/k8s.io/kube-apiextensions-server/pkg/apis/apiextensions/validation:go_default_library",
|
||||
|
@ -17,11 +17,17 @@ limitations under the License.
|
||||
package customresourcedefinition
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
storageerr "k8s.io/apiserver/pkg/storage/errors"
|
||||
"k8s.io/kube-apiextensions-server/pkg/apis/apiextensions"
|
||||
)
|
||||
|
||||
@ -52,6 +58,87 @@ func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) *REST
|
||||
return &REST{store}
|
||||
}
|
||||
|
||||
// Delete adds the CRD finalizer to the list
|
||||
func (r *REST) Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
|
||||
obj, err := r.Get(ctx, name, &metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
crd := obj.(*apiextensions.CustomResourceDefinition)
|
||||
|
||||
// Ensure we have a UID precondition
|
||||
if options == nil {
|
||||
options = metav1.NewDeleteOptions(0)
|
||||
}
|
||||
if options.Preconditions == nil {
|
||||
options.Preconditions = &metav1.Preconditions{}
|
||||
}
|
||||
if options.Preconditions.UID == nil {
|
||||
options.Preconditions.UID = &crd.UID
|
||||
} else if *options.Preconditions.UID != crd.UID {
|
||||
err = apierrors.NewConflict(
|
||||
apiextensions.Resource("customresourcedefinitions"),
|
||||
name,
|
||||
fmt.Errorf("Precondition failed: UID in precondition: %v, UID in object meta: %v", *options.Preconditions.UID, crd.UID),
|
||||
)
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// upon first request to delete, add our finalizer and then delegate
|
||||
if crd.DeletionTimestamp.IsZero() {
|
||||
key, err := r.Store.KeyFunc(ctx, name)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
preconditions := storage.Preconditions{UID: options.Preconditions.UID}
|
||||
|
||||
out := r.Store.NewFunc()
|
||||
err = r.Store.Storage.GuaranteedUpdate(
|
||||
ctx, key, out, false, &preconditions,
|
||||
storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) {
|
||||
existingCRD, ok := existing.(*apiextensions.CustomResourceDefinition)
|
||||
if !ok {
|
||||
// wrong type
|
||||
return nil, fmt.Errorf("expected *apiextensions.CustomResourceDefinition, got %v", existing)
|
||||
}
|
||||
|
||||
// Set the deletion timestamp if needed
|
||||
if existingCRD.DeletionTimestamp.IsZero() {
|
||||
now := metav1.Now()
|
||||
existingCRD.DeletionTimestamp = &now
|
||||
}
|
||||
|
||||
if !apiextensions.CRDHasFinalizer(existingCRD, apiextensions.CustomResourceCleanupFinalizer) {
|
||||
existingCRD.Finalizers = append(existingCRD.Finalizers, apiextensions.CustomResourceCleanupFinalizer)
|
||||
}
|
||||
// update the status condition too
|
||||
apiextensions.SetCRDCondition(existingCRD, apiextensions.CustomResourceDefinitionCondition{
|
||||
Type: apiextensions.Terminating,
|
||||
Status: apiextensions.ConditionTrue,
|
||||
Reason: "InstanceDeletionPending",
|
||||
Message: "CustomResourceDefinition marked for deletion; CustomResource deletion will begin soon",
|
||||
})
|
||||
return existingCRD, nil
|
||||
}),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
err = storageerr.InterpretGetError(err, apiextensions.Resource("customresourcedefinitions"), name)
|
||||
err = storageerr.InterpretUpdateError(err, apiextensions.Resource("customresourcedefinitions"), name)
|
||||
if _, ok := err.(*apierrors.StatusError); !ok {
|
||||
err = apierrors.NewInternalError(err)
|
||||
}
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
return out, false, nil
|
||||
}
|
||||
|
||||
return r.Store.Delete(ctx, name, options)
|
||||
}
|
||||
|
||||
// NewStatusREST makes a RESTStorage for status that has more limited options.
|
||||
// It is based on the original REST so that we can share the same underlying store
|
||||
func NewStatusREST(scheme *runtime.Scheme, rest *REST) *StatusREST {
|
||||
|
@ -89,7 +89,6 @@ func (statusStrategy) PrepareForUpdate(ctx genericapirequest.Context, obj, old r
|
||||
newObj.Spec = oldObj.Spec
|
||||
newObj.Labels = oldObj.Labels
|
||||
newObj.Annotations = oldObj.Annotations
|
||||
newObj.Finalizers = oldObj.Finalizers
|
||||
newObj.OwnerReferences = oldObj.OwnerReferences
|
||||
}
|
||||
|
||||
|
@ -271,11 +271,6 @@ func TestDeRegistrationAndReRegistration(t *testing.T) {
|
||||
if _, err := instantiateCustomResource(t, testserver.NewNoxuInstance(ns, sameInstanceName), noxuNamespacedResourceClient, noxuDefinition); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Remove sameInstanceName since at the moment there's no finalizers.
|
||||
// TODO: as soon finalizers will be implemented Delete can be removed.
|
||||
if err := noxuNamespacedResourceClient.Delete(sameInstanceName, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := testserver.DeleteCustomResourceDefinition(noxuDefinition, apiExtensionClient); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user