Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-11 13:02:14 +00:00)
test/integration/garbagecollector: add test TestCascadingDeleteOnCRDConversionFailure which tests that the GC controller cannot be blocked by a bad conversion webhook

Signed-off-by: haorenfsa <haorenfsa@gmail.com>
Co-authored-by: Andrew Sy Kim <andrewsy@google.com>
parent e8b1d7dc24
commit d4fdfaf17d
@@ -24,6 +24,7 @@ import (
    "reflect"
    "strings"
    "sync"
    "sync/atomic"
    "testing"
    "time"

@@ -49,6 +50,7 @@ import (
    "k8s.io/apimachinery/pkg/util/json"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apimachinery/pkg/util/strategicpatch"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/discovery"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
@@ -60,9 +62,11 @@ import (
    clientgotesting "k8s.io/client-go/testing"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/workqueue"
    metricsutil "k8s.io/component-base/metrics/testutil"
    "k8s.io/controller-manager/pkg/informerfactory"
    "k8s.io/kubernetes/pkg/api/legacyscheme"
    c "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/controller/garbagecollector/metrics"
    "k8s.io/kubernetes/test/utils/ktesting"
)

@@ -846,7 +850,6 @@ func TestGarbageCollectorSync(t *testing.T) {
        PreferredResources: serverResources,
        Error:              nil,
        Lock:               sync.Mutex{},
        InterfaceUsedCount: 0,
    }

    testHandler := &fakeActionHandler{
@@ -865,7 +868,24 @@ func TestGarbageCollectorSync(t *testing.T) {
            },
        },
    }
    srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)

    testHandler2 := &fakeActionHandler{
        response: map[string]FakeResponse{
            "GET" + "/api/v1/secrets": {
                200,
                []byte("{}"),
            },
        },
    }
    var secretSyncOK atomic.Bool
    var alternativeTestHandler = func(response http.ResponseWriter, request *http.Request) {
        if request.URL.Path == "/api/v1/secrets" && secretSyncOK.Load() {
            testHandler2.ServeHTTP(response, request)
            return
        }
        testHandler.ServeHTTP(response, request)
    }
    srv, clientConfig := testServerAndClientConfig(alternativeTestHandler)
    defer srv.Close()
    clientConfig.ContentConfig.NegotiatedSerializer = nil
    client, err := kubernetes.NewForConfig(clientConfig)
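The hunk above routes requests for /api/v1/secrets to the original testHandler (which does not serve secrets successfully) until the test flips secretSyncOK, after which testHandler2 answers with an empty list. That switchable fake endpoint is how the test simulates a resource whose informer cannot sync and then recovers. A minimal, self-contained sketch of the same pattern, with illustrative names that are not taken from the patch:

package main

import (
    "fmt"
    "net/http"
    "net/http/httptest"
    "sync/atomic"
)

func main() {
    var healthy atomic.Bool

    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // Fail the secrets endpoint until the flag is flipped,
        // then serve an empty JSON object so a client could finally list it.
        if r.URL.Path == "/api/v1/secrets" && !healthy.Load() {
            http.Error(w, "not ready", http.StatusInternalServerError)
            return
        }
        w.Header().Set("Content-Type", "application/json")
        fmt.Fprint(w, "{}")
    }))
    defer srv.Close()

    resp, err := http.Get(srv.URL + "/api/v1/secrets")
    if err != nil {
        panic(err)
    }
    resp.Body.Close()
    fmt.Println("before flip:", resp.StatusCode) // 500

    healthy.Store(true)

    resp, err = http.Get(srv.URL + "/api/v1/secrets")
    if err != nil {
        panic(err)
    }
    resp.Body.Close()
    fmt.Println("after flip:", resp.StatusCode) // 200
}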
@@ -885,7 +905,7 @@ func TestGarbageCollectorSync(t *testing.T) {

    sharedInformers := informers.NewSharedInformerFactory(client, 0)

    tCtx := ktesting.Init(t)
    logger, tCtx := ktesting.NewTestContext(t)
    defer tCtx.Cancel("test has completed")
    alwaysStarted := make(chan struct{})
    close(alwaysStarted)
@@ -913,30 +933,49 @@ func TestGarbageCollectorSync(t *testing.T) {

    // Wait until the sync discovers the initial resources
    time.Sleep(1 * time.Second)

    err = expectSyncNotBlocked(fakeDiscoveryClient)
    if err != nil {
        t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
    }
    assertMonitors(t, gc, "pods", "deployments")

    // Simulate the discovery client returning an error
    fakeDiscoveryClient.setPreferredResources(nil, fmt.Errorf("error calling discoveryClient.ServerPreferredResources()"))

    // Wait until sync discovers the change
    time.Sleep(1 * time.Second)
    // No monitor changes
    assertMonitors(t, gc, "pods", "deployments")

    // Remove the error from being returned and see if the garbage collector sync is still working
    fakeDiscoveryClient.setPreferredResources(serverResources, nil)
    time.Sleep(1 * time.Second)

    err = expectSyncNotBlocked(fakeDiscoveryClient)
    if err != nil {
        t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
    }
    assertMonitors(t, gc, "pods", "deployments")

    // Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
    fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)

    // Wait until sync discovers the change
    time.Sleep(1 * time.Second)
    assertMonitors(t, gc, "pods", "secrets")

    // Put the resources back to normal and ensure garbage collector sync recovers
    fakeDiscoveryClient.setPreferredResources(serverResources, nil)
    time.Sleep(1 * time.Second)

    err = expectSyncNotBlocked(fakeDiscoveryClient)
    if err != nil {
        t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
    }
    assertMonitors(t, gc, "pods", "deployments")

    // Partial discovery failure
    fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, appsV1Error)
    // Wait until sync discovers the change
    time.Sleep(1 * time.Second)
    // Deployments monitor kept
    assertMonitors(t, gc, "pods", "deployments", "secrets")
@@ -945,35 +984,33 @@ func TestGarbageCollectorSync(t *testing.T) {
    fakeDiscoveryClient.setPreferredResources(serverResources, nil)
    // Wait until sync discovers the change
    time.Sleep(1 * time.Second)
    err = expectSyncNotBlocked(fakeDiscoveryClient)
    if err != nil {
        t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
    }
    // Unsyncable monitor removed
    assertMonitors(t, gc, "pods", "deployments")

    // Add a fake controller to simulate the initial not-synced informer, which will be synced at the end.
    fc := fakeController{}
    gc.dependencyGraphBuilder.monitors[schema.GroupVersionResource{
        Version:  "v1",
        Resource: "secrets",
    }] = &monitor{controller: &fc}
    if gc.IsSynced(logger) {
        t.Fatal("cache from garbage collector should not be synced")
    }

    // Simulate initial not-synced informer which will be synced at the end.
    metrics.GarbageCollectorResourcesSyncError.Reset()
    fakeDiscoveryClient.setPreferredResources(unsyncableServerResources, nil)
    time.Sleep(1 * time.Second)
    assertMonitors(t, gc, "pods", "secrets")

    // The informer is synced now.
    fc.SetSynced(true)
    time.Sleep(1 * time.Second)
    assertMonitors(t, gc, "pods", "secrets")

    if !gc.IsSynced(logger) {
        t.Fatal("cache from garbage collector should be synced")
    if gc.IsSynced(logger) {
        t.Fatal("cache from garbage collector should not be synced")
    }
    val, _ := metricsutil.GetCounterMetricValue(metrics.GarbageCollectorResourcesSyncError)
    if val < 1 {
        t.Fatalf("expect sync error metric > 0")
    }

    fakeDiscoveryClient.setPreferredResources(serverResources, nil)
    time.Sleep(1 * time.Second)
    assertMonitors(t, gc, "pods", "deployments")
    // The informer is synced now.
    secretSyncOK.Store(true)
    if err := wait.PollUntilContextTimeout(tCtx, time.Second, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
        return gc.IsSynced(logger), nil
    }); err != nil {
        t.Fatal(err)
    }
}

func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
@@ -988,6 +1025,17 @@ func assertMonitors(t *testing.T, gc *GarbageCollector, resources ...string) {
    }
}

func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources) error {
    before := fakeDiscoveryClient.getInterfaceUsedCount()
    t := 1 * time.Second
    time.Sleep(t)
    after := fakeDiscoveryClient.getInterfaceUsedCount()
    if before == after {
        return fmt.Errorf("discoveryClient.ServerPreferredResources() not called over %v", t)
    }
    return nil
}

type fakeServerResources struct {
    PreferredResources []*metav1.APIResourceList
    Error              error
@@ -1017,6 +1065,12 @@ func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResou
    f.Error = err
}

func (f *fakeServerResources) getInterfaceUsedCount() int {
    f.Lock.Lock()
    defer f.Lock.Unlock()
    return f.InterfaceUsedCount
}

func (*fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
    return nil, nil
}
@@ -2754,28 +2808,6 @@ func assertState(s state) step {

}

type fakeController struct {
    synced bool
    lock   sync.Mutex
}

func (f *fakeController) Run(stopCh <-chan struct{}) {
}

func (f *fakeController) HasSynced() bool {
    return f.synced
}

func (f *fakeController) SetSynced(synced bool) {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.synced = synced
}

func (f *fakeController) LastSyncResourceVersion() string {
    return ""
}

// trackingWorkqueue implements RateLimitingInterface,
// allows introspection of the items in the queue,
// and treats AddAfter and AddRateLimited the same as Add
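The trackingWorkqueue described in the trailing comment above is defined elsewhere in this file and is not part of the diff. As a rough sketch only, assuming such a wrapper embeds client-go's workqueue.RateLimitingInterface (the real implementation may differ), those three properties could look like this:

package main

import (
    "fmt"
    "sync"
    "time"

    "k8s.io/client-go/util/workqueue"
)

// trackingQueue (hypothetical name) embeds a real rate-limiting queue, remembers
// every item it has seen, and routes AddAfter/AddRateLimited through Add so tests
// are not slowed down by rate-limiter or timer delays.
type trackingQueue struct {
    workqueue.RateLimitingInterface
    mu    sync.Mutex
    items []interface{}
}

func (q *trackingQueue) Add(item interface{}) {
    q.mu.Lock()
    q.items = append(q.items, item)
    q.mu.Unlock()
    q.RateLimitingInterface.Add(item)
}

func (q *trackingQueue) AddAfter(item interface{}, _ time.Duration) { q.Add(item) }

func (q *trackingQueue) AddRateLimited(item interface{}) { q.Add(item) }

func main() {
    q := &trackingQueue{RateLimitingInterface: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())}
    q.AddAfter("pods/default/foo", time.Hour) // enqueued immediately, not after an hour
    fmt.Println(len(q.items), q.Len())        // 1 1
}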
@@ -81,6 +81,29 @@ const oneValidOwnerPodName = "test.pod.3"
const toBeDeletedRCName = "test.rc.1"
const remainingRCName = "test.rc.2"

// testCert was generated from crypto/tls/generate_cert.go with the following command:
//
//	go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
var testCert = []byte(`-----BEGIN CERTIFICATE-----
MIIDGDCCAgCgAwIBAgIQTKCKn99d5HhQVCLln2Q+eTANBgkqhkiG9w0BAQsFADAS
MRAwDgYDVQQKEwdBY21lIENvMCAXDTcwMDEwMTAwMDAwMFoYDzIwODQwMTI5MTYw
MDAwWjASMRAwDgYDVQQKEwdBY21lIENvMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEA1Z5/aTwqY706M34tn60l8ZHkanWDl8mM1pYf4Q7qg3zA9XqWLX6S
4rTYDYCb4stEasC72lQnbEWHbthiQE76zubP8WOFHdvGR3mjAvHWz4FxvLOTheZ+
3iDUrl6Aj9UIsYqzmpBJAoY4+vGGf+xHvuukHrVcFqR9ZuBdZuJ/HbbjUyuNr3X9
erNIr5Ha17gVzf17SNbYgNrX9gbCeEB8Z9Ox7dVuJhLDkpF0T/B5Zld3BjyUVY/T
cukU4dTVp6isbWPvCMRCZCCOpb+qIhxEjJ0n6tnPt8nf9lvDl4SWMl6X1bH+2EFa
a8R06G0QI+XhwPyjXUyCR8QEOZPCR5wyqQIDAQABo2gwZjAOBgNVHQ8BAf8EBAMC
AqQwEwYDVR0lBAwwCgYIKwYBBQUHAwEwDwYDVR0TAQH/BAUwAwEB/zAuBgNVHREE
JzAlggtleGFtcGxlLmNvbYcEfwAAAYcQAAAAAAAAAAAAAAAAAAAAATANBgkqhkiG
9w0BAQsFAAOCAQEAThqgJ/AFqaANsOp48lojDZfZBFxJQ3A4zfR/MgggUoQ9cP3V
rxuKAFWQjze1EZc7J9iO1WvH98lOGVNRY/t2VIrVoSsBiALP86Eew9WucP60tbv2
8/zsBDSfEo9Wl+Q/gwdEh8dgciUKROvCm76EgAwPGicMAgRsxXgwXHhS5e8nnbIE
Ewaqvb5dY++6kh0Oz+adtNT5OqOwXTIRI67WuEe6/B3Z4LNVPQDIj7ZUJGNw8e6L
F4nkUthwlKx4yEJHZBRuFPnO7Z81jNKuwL276+mczRH7piI6z9uyMV/JbEsOIxyL
W6CzB7pZ9Nj1YLpgzc1r6oONHLokMJJIz/IvkQ==
-----END CERTIFICATE-----`)

func newPod(podName, podNamespace string, ownerReferences []metav1.OwnerReference) *v1.Pod {
    for i := 0; i < len(ownerReferences); i++ {
        if len(ownerReferences[i].Kind) == 0 {
@@ -252,6 +275,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
    logger := tCtx.Logger()
    alwaysStarted := make(chan struct{})
    close(alwaysStarted)

    gc, err := garbagecollector.NewGarbageCollector(
        tCtx,
        clientSet,
@@ -1285,3 +1309,119 @@ func testCRDDeletion(t *testing.T, ctx *testContext, ns *v1.Namespace, definitio
        t.Fatalf("failed waiting for dependent %q (owned by %q) to be deleted", dependent.GetName(), owner.GetName())
    }
}

// TestCascadingDeleteOnCRDConversionFailure tests that a bad conversion webhook cannot block the entire GC controller.
// Historically, a cache sync failure from a single resource prevented the GC controller from running. This test creates
// a CRD, updates the storage version with a bad conversion webhook, and then runs a simple cascading delete test.
func TestCascadingDeleteOnCRDConversionFailure(t *testing.T) {
    ctx := setup(t, 0)
    defer ctx.tearDown()
    gc, apiExtensionClient, dynamicClient, clientSet := ctx.gc, ctx.apiExtensionClient, ctx.dynamicClient, ctx.clientSet

    ns := createNamespaceOrDie("gc-cache-sync-fail", clientSet, t)
    defer deleteNamespaceOrDie(ns.Name, clientSet, t)

    // Create a CRD with storage/serving version v1beta2. Then update the CRD with v1 as the storage version
    // and an invalid conversion webhook. This should result in cache sync failures for the CRD from the GC controller.
    def, dc := createRandomCustomResourceDefinition(t, apiExtensionClient, dynamicClient, ns.Name)
    _, err := dc.Create(context.TODO(), newCRDInstance(def, ns.Name, names.SimpleNameGenerator.GenerateName("test")), metav1.CreateOptions{})
    if err != nil {
        t.Fatalf("Failed to create custom resource: %v", err)
    }

    def, err = apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), def.Name, metav1.GetOptions{})
    if err != nil {
        t.Fatalf("Failed to get custom resource: %v", err)
    }

    newDefinition := def.DeepCopy()
    newDefinition.Spec.Conversion = &apiextensionsv1.CustomResourceConversion{
        Strategy: apiextensionsv1.WebhookConverter,
        Webhook: &apiextensionsv1.WebhookConversion{
            ClientConfig: &apiextensionsv1.WebhookClientConfig{
                Service: &apiextensionsv1.ServiceReference{
                    Name:      "foobar",
                    Namespace: ns.Name,
                },
                CABundle: testCert,
            },
            ConversionReviewVersions: []string{
                "v1", "v1beta1",
            },
        },
    }
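    // Note: no Service named "foobar" is ever created in this namespace, so calls to the
    // conversion webhook configured above can only fail. That persistent failure is what
    // keeps the GC informer for this custom resource from ever syncing.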
    newDefinition.Spec.Versions = []apiextensionsv1.CustomResourceDefinitionVersion{
        {
            Name:    "v1",
            Served:  true,
            Storage: true,
            Schema:  apiextensionstestserver.AllowAllSchema(),
        },
        {
            Name:    "v1beta1",
            Served:  true,
            Storage: false,
            Schema:  apiextensionstestserver.AllowAllSchema(),
        },
    }

    _, err = apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), newDefinition, metav1.UpdateOptions{})
    if err != nil {
        t.Fatalf("Error updating CRD with conversion webhook: %v", err)
    }

    ctx.startGC(5)

    rcClient := clientSet.CoreV1().ReplicationControllers(ns.Name)
    podClient := clientSet.CoreV1().Pods(ns.Name)

    toBeDeletedRC, err := rcClient.Create(context.TODO(), newOwnerRC(toBeDeletedRCName, ns.Name), metav1.CreateOptions{})
    if err != nil {
        t.Fatalf("Failed to create replication controller: %v", err)
    }

    rcs, err := rcClient.List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        t.Fatalf("Failed to list replication controllers: %v", err)
    }
    if len(rcs.Items) != 1 {
        t.Fatalf("Expect only 1 replication controller")
    }

    pod := newPod(garbageCollectedPodName, ns.Name, []metav1.OwnerReference{{UID: toBeDeletedRC.ObjectMeta.UID, Name: toBeDeletedRCName}})
    _, err = podClient.Create(context.TODO(), pod, metav1.CreateOptions{})
    if err != nil {
        t.Fatalf("Failed to create Pod: %v", err)
    }

    pods, err := podClient.List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        t.Fatalf("Failed to list pods: %v", err)
    }
    if len(pods.Items) != 1 {
        t.Fatalf("Expect only 1 pod")
    }

    if err := rcClient.Delete(context.TODO(), toBeDeletedRCName, getNonOrphanOptions()); err != nil {
        t.Fatalf("failed to delete replication controller: %v", err)
    }

    // Sometimes the deletion of the RC takes a long time to be observed by
    // the GC, so wait for the garbage collector to observe the deletion of
    // the toBeDeletedRC.
    if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
        return !gc.GraphHasUID(toBeDeletedRC.ObjectMeta.UID), nil
    }); err != nil {
        t.Fatal(err)
    }
    if err := integration.WaitForPodToDisappear(podClient, garbageCollectedPodName, 1*time.Second, 30*time.Second); err != nil {
        t.Fatalf("expect pod %s to be garbage collected, got err: %v", garbageCollectedPodName, err)
    }

    // Check that the cache is still not synced after the cascading delete succeeded.
    // If this check fails (i.e. the cache reports synced), verify that the conversion
    // webhook is still misconfigured and prevents the watch cache from listing the CRD.
    if ctx.gc.IsSynced(ctx.logger) {
        t.Fatal("cache is not expected to be synced due to bad conversion webhook")
    }
}
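To exercise the new test locally, the usual Kubernetes integration-test entry point should apply, assuming a local etcd is available (for example via hack/install-etcd.sh): something like make test-integration WHAT=./test/integration/garbagecollector KUBE_TEST_ARGS="-run TestCascadingDeleteOnCRDConversionFailure".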