Merge pull request #116205 from justinsb/basic_prune_test_option_2

prunev2: Basic pruning logic
This commit is contained in:
Kubernetes Prow Robot 2023-03-13 15:23:09 -07:00 committed by GitHub
commit 4fbfe11b89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 748 additions and 41 deletions

View File

@ -1011,6 +1011,7 @@ func (o *ApplyOptions) MarkObjectVisited(info *resource.Info) error {
return err
}
o.VisitedUids.Insert(metadata.GetUID())
return nil
}
@ -1029,7 +1030,10 @@ func (o *ApplyOptions) PrintAndPrunePostProcessor() func() error {
if o.Prune {
if cmdutil.ApplySet.IsEnabled() && o.ApplySet != nil {
pruner := newApplySetPruner(o)
pruner, err := newApplySetPruner(o)
if err != nil {
return err
}
if err := pruner.pruneAll(ctx, o.ApplySet); err != nil {
// Do not update the ApplySet. If pruning failed, we want to keep the superset
// of the previous and current resources in the ApplySet, so that the pruning

View File

@ -27,6 +27,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/spf13/cobra"
@ -2210,15 +2211,16 @@ func TestLoadObjects(t *testing.T) {
defer f.Cleanup()
f.Client = &fake.RESTClient{}
testdirs := []string{"testdata/prune/simple"}
for _, testdir := range testdirs {
t.Run(testdir, func(t *testing.T) {
testFiles := []string{"testdata/prune/simple/manifest1", "testdata/prune/simple/manifest2"}
for _, testFile := range testFiles {
t.Run(testFile, func(t *testing.T) {
cmdtesting.WithAlphaEnvs([]cmdutil.FeatureGate{cmdutil.ApplySet}, t, func(t *testing.T) {
cmd := &cobra.Command{}
flags := NewApplyFlags(genericclioptions.NewTestIOStreamsDiscard())
flags.AddFlags(cmd)
cmd.Flags().Set("filename", filepath.Join(testdir, "manifest1.yaml"))
cmd.Flags().Set("applyset", filepath.Base(testdir))
cmd.Flags().Set("filename", testFile+".yaml")
cmd.Flags().Set("applyset", filepath.Base(filepath.Dir(testFile)))
cmd.Flags().Set("prune", "true")
o, err := flags.ToOptions(f, cmd, "kubectl", []string{})
@ -2246,7 +2248,7 @@ func TestLoadObjects(t *testing.T) {
}
got := strings.Join(objectYAMLs, "\n---\n\n")
p := filepath.Join(testdir, "expected-manifest1-getobjects.yaml")
p := testFile + "-expected-getobjects.yaml"
wantBytes, err := os.ReadFile(p)
if err != nil {
t.Fatalf("error reading file %q: %v", p, err)
@ -2261,9 +2263,11 @@ func TestLoadObjects(t *testing.T) {
}
func TestApplySetParentManagement(t *testing.T) {
// TODO: replace with cmdtesting.InitTestErrorHandler() when the feature is fully implemented
cmdutil.BehaviorOnFatal(func(s string, i int) {
if s != "error: ApplySet-based pruning is not yet implemented" {
switch s {
case `error: pruning /v1, Kind=ReplicationController objects: deleting test/test-rc: an error on the server ("") has prevented the request from succeeding`:
t.Logf("got expected error %q", s)
default:
t.Fatalf("unexpected exit %d: %s", i, s)
}
})
@ -2281,29 +2285,129 @@ func TestApplySetParentManagement(t *testing.T) {
pathRC: rc,
}
scheme := runtime.NewScheme()
listMapping := map[schema.GroupVersionResource]string{
{Group: "", Version: "v1", Resource: "services"}: "ServiceList",
{Group: "", Version: "v1", Resource: "replicationcontrollers"}: "ReplicationControllerList",
}
fakeDynamicClient := dynamicfakeclient.NewSimpleDynamicClientWithCustomListKinds(scheme, listMapping)
tf.FakeDynamicClient = fakeDynamicClient
failDeletes := false
fakeDynamicClient.PrependReactor("delete", "*", func(action testing2.Action) (handled bool, ret runtime.Object, err error) {
if failDeletes {
return true, nil, fmt.Errorf("an error on the server (\"\") has prevented the request from succeeding")
}
return false, nil, nil
})
tf.Client = &fake.RESTClient{
NegotiatedSerializer: resource.UnstructuredPlusDefaultContentConfig().NegotiatedSerializer,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch req.Method {
case "GET":
data, ok := serverSideData[req.URL.Path]
if !ok {
return &http.Response{StatusCode: http.StatusNotFound, Header: cmdtesting.DefaultHeader(), Body: io.NopCloser(bytes.NewReader(nil))}, nil
}
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: io.NopCloser(bytes.NewReader(data))}, nil
case "PATCH":
if got := req.Header.Get("Content-Type"); got == string(types.ApplyPatchType) {
// crudely save the patch data as the new object and return it
serverSideData[req.URL.Path], _ = io.ReadAll(req.Body)
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: io.NopCloser(bytes.NewReader(serverSideData[req.URL.Path]))}, nil
} else {
t.Fatalf("unexpected content-type: %s\n", got)
if req.URL.Path == "/namespaces/test/secrets/mySet" {
switch req.Method {
case "GET":
data, ok := serverSideData[req.URL.Path]
if !ok {
return &http.Response{StatusCode: http.StatusNotFound, Header: cmdtesting.DefaultHeader(), Body: io.NopCloser(bytes.NewReader(nil))}, nil
}
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: io.NopCloser(bytes.NewReader(data))}, nil
case "PATCH":
if got := req.Header.Get("Content-Type"); got == string(types.ApplyPatchType) {
// crudely save the patch data as the new object and return it
serverSideData[req.URL.Path], _ = io.ReadAll(req.Body)
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: io.NopCloser(bytes.NewReader(serverSideData[req.URL.Path]))}, nil
} else {
t.Fatalf("unexpected content-type: %s\n", got)
return nil, nil
}
default:
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
return nil, nil
}
default:
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
return nil, nil
}
method := req.Method
tokens := strings.Split(strings.TrimPrefix(req.URL.Path, "/"), "/")
if len(tokens) == 4 && tokens[0] == "namespaces" && method == "GET" {
namespace := tokens[1]
name := tokens[3]
gvr := schema.GroupVersionResource{Version: "v1", Resource: tokens[2]}
obj, err := fakeDynamicClient.Tracker().Get(gvr, namespace, name)
if err != nil {
if apierrors.IsNotFound(err) {
return &http.Response{StatusCode: http.StatusNotFound, Header: cmdtesting.DefaultHeader()}, nil
}
t.Fatalf("error getting object: %v", err)
}
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: cmdtesting.ObjBody(codec, obj)}, nil
}
if len(tokens) == 4 && tokens[0] == "namespaces" && method == "PATCH" {
namespace := tokens[1]
name := tokens[3]
gvr := schema.GroupVersionResource{Version: "v1", Resource: tokens[2]}
var existing *unstructured.Unstructured
existingObj, err := fakeDynamicClient.Tracker().Get(gvr, namespace, name)
if err != nil {
if !apierrors.IsNotFound(err) {
t.Fatalf("error getting object: %v", err)
}
} else {
existing = existingObj.(*unstructured.Unstructured)
}
data, err := io.ReadAll(req.Body)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
patch := &unstructured.Unstructured{}
if err := runtime.DecodeInto(codec, data, patch); err != nil {
t.Fatalf("unexpected error: %v", err)
}
var returnData []byte
if existing == nil {
uid := types.UID(fmt.Sprintf("%v", time.Now().UnixNano()))
patch.SetUID(uid)
if err := fakeDynamicClient.Tracker().Create(gvr, patch, namespace); err != nil {
t.Fatalf("error creating object: %v", err)
}
b, err := json.Marshal(patch)
if err != nil {
t.Fatalf("error marshalling response: %v", err)
}
returnData = b
} else {
uid := existing.GetUID()
patch.DeepCopyInto(existing)
existing.SetUID(uid)
if err := fakeDynamicClient.Tracker().Update(gvr, existing, namespace); err != nil {
t.Fatalf("error updating object: %v", err)
}
b, err := json.Marshal(existing)
if err != nil {
t.Fatalf("error marshalling response: %v", err)
}
returnData = b
}
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: io.NopCloser(bytes.NewReader(returnData))}, nil
}
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
return nil, nil
}),
}
@ -2368,7 +2472,7 @@ metadata:
// Next, do an apply that attempts to remove the rc from the set, but pruning fails
// Both types remain in the ApplySet
// TODO: this case will need to be updated to fail the deletion request once pruning works
failDeletes = true
outbuff.Reset()
errbuff.Reset()
cmdtesting.WithAlphaEnvs([]cmdutil.FeatureGate{cmdutil.ApplySet}, t, func(t *testing.T) {
@ -2398,7 +2502,35 @@ metadata:
`, string(updatedSecret))
// Finally, do an apply that successfully removes the rc and updates the set
// TODO: add this part once pruning can work
failDeletes = false
outbuff.Reset()
errbuff.Reset()
cmdtesting.WithAlphaEnvs([]cmdutil.FeatureGate{cmdutil.ApplySet}, t, func(t *testing.T) {
cmd := NewCmdApply("kubectl", tf, ioStreams)
cmd.Flags().Set("filename", filenameSVC)
cmd.Flags().Set("server-side", "true")
cmd.Flags().Set("applyset", nameParentSecret)
cmd.Flags().Set("prune", "true")
cmd.Run(cmd, []string{})
})
assert.Equal(t, "service/test-service serverside-applied\nreplicationcontroller/test-rc pruned\n", outbuff.String())
assert.Equal(t, "", errbuff.String())
updatedSecret, err = yaml.JSONToYAML(serverSideData[pathSecret])
require.NoError(t, err)
require.Equal(t, `apiVersion: v1
kind: Secret
metadata:
annotations:
applyset.k8s.io/additional-namespaces: ""
applyset.k8s.io/contains-group-resources: services
applyset.k8s.io/tooling: kubectl/v0.0.0-master+$Format:%H$
creationTimestamp: null
labels:
applyset.k8s.io/id: placeholder-todo
name: mySet
namespace: test
`, string(updatedSecret))
}
func TestApplySetInvalidLiveParent(t *testing.T) {
@ -2519,13 +2651,208 @@ func TestApplySetInvalidLiveParent(t *testing.T) {
}
}
func TestApplyWithPruneV2(t *testing.T) {
testdirs := []string{"testdata/prune/simple"}
for _, testdir := range testdirs {
t.Run(testdir, func(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace("test")
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
scheme := runtime.NewScheme()
listMapping := map[schema.GroupVersionResource]string{
{Group: "", Version: "v1", Resource: "namespaces"}: "NamespaceList",
}
fakeDynamicClient := dynamicfakeclient.NewSimpleDynamicClientWithCustomListKinds(scheme, listMapping)
tf.FakeDynamicClient = fakeDynamicClient
tf.UnstructuredClient = &fake.RESTClient{
NegotiatedSerializer: resource.UnstructuredPlusDefaultContentConfig().NegotiatedSerializer,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
method := req.Method
tokens := strings.Split(strings.TrimPrefix(req.URL.Path, "/"), "/")
if len(tokens) == 2 && tokens[0] == "namespaces" && method == "GET" {
name := tokens[1]
gvr := schema.GroupVersionResource{Version: "v1", Resource: "namespaces"}
ns, err := fakeDynamicClient.Tracker().Get(gvr, "", name)
if err != nil {
if apierrors.IsNotFound(err) {
return &http.Response{StatusCode: http.StatusNotFound, Header: cmdtesting.DefaultHeader()}, nil
}
t.Fatalf("error getting object: %v", err)
}
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: cmdtesting.ObjBody(codec, ns)}, nil
}
if len(tokens) == 4 && tokens[0] == "namespaces" && tokens[2] == "secrets" && method == "GET" {
namespace := tokens[1]
name := tokens[3]
gvr := schema.GroupVersionResource{Version: "v1", Resource: "secrets"}
obj, err := fakeDynamicClient.Tracker().Get(gvr, namespace, name)
if err != nil {
if apierrors.IsNotFound(err) {
return &http.Response{StatusCode: http.StatusNotFound, Header: cmdtesting.DefaultHeader()}, nil
}
t.Fatalf("error getting object: %v", err)
}
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: cmdtesting.ObjBody(codec, obj)}, nil
}
if len(tokens) == 4 && tokens[0] == "namespaces" && tokens[2] == "secrets" && method == "PATCH" {
namespace := tokens[1]
name := tokens[3]
gvr := schema.GroupVersionResource{Version: "v1", Resource: "secrets"}
var existing *unstructured.Unstructured
existingObj, err := fakeDynamicClient.Tracker().Get(gvr, namespace, name)
if err != nil {
if !apierrors.IsNotFound(err) {
t.Fatalf("error getting object: %v", err)
}
} else {
existing = existingObj.(*unstructured.Unstructured)
}
data, err := io.ReadAll(req.Body)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
patch := &unstructured.Unstructured{}
if err := runtime.DecodeInto(codec, data, patch); err != nil {
t.Fatalf("unexpected error: %v", err)
}
var returnData []byte
if existing == nil {
uid := types.UID(fmt.Sprintf("%v", time.Now().UnixNano()))
patch.SetUID(uid)
if err := fakeDynamicClient.Tracker().Create(gvr, patch, namespace); err != nil {
t.Fatalf("error creating object: %v", err)
}
b, err := json.Marshal(patch)
if err != nil {
t.Fatalf("error marshalling response: %v", err)
}
returnData = b
} else {
patch.DeepCopyInto(existing)
if err := fakeDynamicClient.Tracker().Update(gvr, existing, namespace); err != nil {
t.Fatalf("error updating object: %v", err)
}
b, err := json.Marshal(existing)
if err != nil {
t.Fatalf("error marshalling response: %v", err)
}
returnData = b
}
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: io.NopCloser(bytes.NewReader(returnData))}, nil
}
if len(tokens) == 1 && tokens[0] == "namespaces" && method == "POST" {
data, err := io.ReadAll(req.Body)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
u := &unstructured.Unstructured{}
if err := runtime.DecodeInto(codec, data, u); err != nil {
t.Fatalf("unexpected error: %v", err)
}
name := u.GetName()
ns := u.GetNamespace()
gvr := schema.GroupVersionResource{Version: "v1", Resource: "namespaces"}
existing, err := fakeDynamicClient.Tracker().Get(gvr, ns, name)
if err != nil {
if apierrors.IsNotFound(err) {
existing = nil
} else {
t.Fatalf("error fetching object: %v", err)
}
}
if existing != nil {
return &http.Response{StatusCode: http.StatusConflict, Header: cmdtesting.DefaultHeader()}, nil
}
uid := types.UID(fmt.Sprintf("%v", time.Now().UnixNano()))
u.SetUID(uid)
if err := fakeDynamicClient.Tracker().Create(gvr, u, ns); err != nil {
t.Fatalf("error creating object: %v", err)
}
body := cmdtesting.ObjBody(codec, u)
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: body}, nil
}
t.Fatalf("unexpected request: %v %v\n%#v", req.Method, req.URL, req)
return nil, nil
}),
}
tf.Client = tf.UnstructuredClient
cmdtesting.WithAlphaEnvs([]cmdutil.FeatureGate{cmdutil.ApplySet}, t, func(t *testing.T) {
manifests := []string{"manifest1", "manifest2"}
for _, manifest := range manifests {
t.Logf("applying manifest %v", manifest)
cmd := &cobra.Command{}
flags := NewApplyFlags(genericclioptions.NewTestIOStreamsDiscard())
flags.AddFlags(cmd)
cmd.Flags().Set("filename", filepath.Join(testdir, manifest+".yaml"))
cmd.Flags().Set("applyset", filepath.Base(testdir))
cmd.Flags().Set("prune", "true")
cmd.Flags().Set("validate", "false")
o, err := flags.ToOptions(tf, cmd, "kubectl", []string{})
if err != nil {
t.Fatalf("unexpected error creating apply options: %v", err)
}
err = o.Validate()
if err != nil {
t.Fatalf("unexpected error from validate: %v", err)
}
var unifiedOutput bytes.Buffer
o.Out = &unifiedOutput
o.ErrOut = &unifiedOutput
if err := o.Run(); err != nil {
t.Errorf("error running apply: %v", err)
}
got := unifiedOutput.String()
p := filepath.Join(testdir, manifest+"-expected-apply.txt")
wantBytes, err := os.ReadFile(p)
if err != nil {
t.Fatalf("error reading file %q: %v", p, err)
}
want := string(wantBytes)
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("apply output has unexpected diff (-want +got):\n%s", diff)
}
}
})
})
}
}
func TestApplySetUpdateConflictsAreRetried(t *testing.T) {
// TODO: replace with cmdtesting.InitTestErrorHandler() when the feature is fully implemented
cmdutil.BehaviorOnFatal(func(s string, i int) {
if s != "error: ApplySet-based pruning is not yet implemented" {
t.Fatalf("unexpected exit %d: %s", i, s)
}
})
cmdtesting.InitTestErrorHandler(t)
defer cmdutil.DefaultBehaviorOnFatal()
nameParentSecret := "mySet"
@ -2590,3 +2917,233 @@ metadata:
assert.Truef(t, applyReturnedConflict, "test did not simulate a conflict scenario")
assert.Truef(t, appliedWithConflictsForced, "conflicts were never forced")
}
func TestApplyWithPruneV2Fail(t *testing.T) {
tf := cmdtesting.NewTestFactory().WithNamespace("test")
defer tf.Cleanup()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
scheme := runtime.NewScheme()
listMapping := map[schema.GroupVersionResource]string{
{Group: "", Version: "v1", Resource: "namespaces"}: "NamespaceList",
}
fakeDynamicClient := dynamicfakeclient.NewSimpleDynamicClientWithCustomListKinds(scheme, listMapping)
tf.FakeDynamicClient = fakeDynamicClient
failDelete := false
fakeDynamicClient.PrependReactor("delete", "*", func(action testing2.Action) (handled bool, ret runtime.Object, err error) {
if failDelete {
return true, nil, fmt.Errorf("an error on the server (\"\") has prevented the request from succeeding")
}
return false, nil, nil
})
tf.UnstructuredClient = &fake.RESTClient{
NegotiatedSerializer: resource.UnstructuredPlusDefaultContentConfig().NegotiatedSerializer,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
method := req.Method
tokens := strings.Split(strings.TrimPrefix(req.URL.Path, "/"), "/")
if len(tokens) == 2 && tokens[0] == "namespaces" && method == "GET" {
name := tokens[1]
gvr := schema.GroupVersionResource{Version: "v1", Resource: "namespaces"}
ns, err := fakeDynamicClient.Tracker().Get(gvr, "", name)
if err != nil {
if apierrors.IsNotFound(err) {
return &http.Response{StatusCode: http.StatusNotFound, Header: cmdtesting.DefaultHeader()}, nil
}
t.Fatalf("error getting object: %v", err)
}
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: cmdtesting.ObjBody(codec, ns)}, nil
}
if len(tokens) == 4 && tokens[0] == "namespaces" && tokens[2] == "secrets" && method == "GET" {
namespace := tokens[1]
name := tokens[3]
gvr := schema.GroupVersionResource{Version: "v1", Resource: "secrets"}
obj, err := fakeDynamicClient.Tracker().Get(gvr, namespace, name)
if err != nil {
if apierrors.IsNotFound(err) {
return &http.Response{StatusCode: http.StatusNotFound, Header: cmdtesting.DefaultHeader()}, nil
}
t.Fatalf("error getting object: %v", err)
}
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: cmdtesting.ObjBody(codec, obj)}, nil
}
if len(tokens) == 4 && tokens[0] == "namespaces" && tokens[2] == "secrets" && method == "PATCH" {
namespace := tokens[1]
name := tokens[3]
gvr := schema.GroupVersionResource{Version: "v1", Resource: "secrets"}
var existing *unstructured.Unstructured
existingObj, err := fakeDynamicClient.Tracker().Get(gvr, namespace, name)
if err != nil {
if !apierrors.IsNotFound(err) {
t.Fatalf("error getting object: %v", err)
}
} else {
existing = existingObj.(*unstructured.Unstructured)
}
data, err := io.ReadAll(req.Body)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
patch := &unstructured.Unstructured{}
if err := runtime.DecodeInto(codec, data, patch); err != nil {
t.Fatalf("unexpected error: %v", err)
}
var returnData []byte
if existing == nil {
uid := types.UID(fmt.Sprintf("%v", time.Now().UnixNano()))
patch.SetUID(uid)
if err := fakeDynamicClient.Tracker().Create(gvr, patch, namespace); err != nil {
t.Fatalf("error creating object: %v", err)
}
b, err := json.Marshal(patch)
if err != nil {
t.Fatalf("error marshalling response: %v", err)
}
returnData = b
} else {
patch.DeepCopyInto(existing)
if err := fakeDynamicClient.Tracker().Update(gvr, existing, namespace); err != nil {
t.Fatalf("error updating object: %v", err)
}
b, err := json.Marshal(existing)
if err != nil {
t.Fatalf("error marshalling response: %v", err)
}
returnData = b
}
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: io.NopCloser(bytes.NewReader(returnData))}, nil
}
if len(tokens) == 1 && tokens[0] == "namespaces" && method == "POST" {
data, err := io.ReadAll(req.Body)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
u := &unstructured.Unstructured{}
if err := runtime.DecodeInto(codec, data, u); err != nil {
t.Fatalf("unexpected error: %v", err)
}
name := u.GetName()
ns := u.GetNamespace()
gvr := schema.GroupVersionResource{Version: "v1", Resource: "namespaces"}
existing, err := fakeDynamicClient.Tracker().Get(gvr, ns, name)
if err != nil {
if apierrors.IsNotFound(err) {
existing = nil
} else {
t.Fatalf("error fetching object: %v", err)
}
}
if existing != nil {
return &http.Response{StatusCode: http.StatusConflict, Header: cmdtesting.DefaultHeader()}, nil
}
uid := types.UID(fmt.Sprintf("%v", time.Now().UnixNano()))
u.SetUID(uid)
if err := fakeDynamicClient.Tracker().Create(gvr, u, ns); err != nil {
t.Fatalf("error creating object: %v", err)
}
body := cmdtesting.ObjBody(codec, u)
return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: body}, nil
}
t.Fatalf("unexpected request: %v %v\n%#v", req.Method, req.URL, req)
return nil, nil
}),
}
tf.Client = tf.UnstructuredClient
testdirs := []string{"testdata/prune/simple"}
for _, testdir := range testdirs {
t.Run(testdir, func(t *testing.T) {
cmdtesting.WithAlphaEnvs([]cmdutil.FeatureGate{cmdutil.ApplySet}, t, func(t *testing.T) {
manifests := []string{"manifest1", "manifest2"}
for i, manifest := range manifests {
if i != 0 {
t.Logf("will inject failures into future delete operations")
failDelete = true
}
t.Logf("applying manifest %v", manifest)
var unifiedOutput bytes.Buffer
ioStreams := genericclioptions.IOStreams{
ErrOut: &unifiedOutput,
Out: &unifiedOutput,
In: bytes.NewBufferString(""),
}
cmdutil.BehaviorOnFatal(fatalNoExit(t, ioStreams))
defer cmdutil.DefaultBehaviorOnFatal()
rootCmd := &cobra.Command{
Use: "kubectl",
}
kubeConfigFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag().WithDiscoveryBurst(300).WithDiscoveryQPS(50.0)
kubeConfigFlags.AddFlags(rootCmd.PersistentFlags())
applyCmd := NewCmdApply("kubectl", tf, ioStreams)
rootCmd.AddCommand(applyCmd)
rootCmd.SetArgs([]string{
"apply",
"--filename=" + filepath.Join(testdir, manifest+".yaml"),
"--applyset=" + filepath.Base(testdir),
"--namespace=default",
"--prune=true",
"--validate=false",
})
if err := rootCmd.Execute(); err != nil {
t.Errorf("error running apply command: %v", err)
}
got := unifiedOutput.String()
p := filepath.Join(testdir, "scenarios", "error-on-apply", manifest+"-expected-apply.txt")
wantBytes, err := os.ReadFile(p)
if err != nil {
t.Fatalf("error reading file %q: %v", p, err)
}
want := string(wantBytes)
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("apply output has unexpected diff (-want +got):\n%s", diff)
}
}
})
})
}
}
// fatalNoExit is a handler that replaces the default cmdutil.BehaviorOnFatal,
// that still prints as expected, but does not call os.Exit (which terminates our tests)
func fatalNoExit(t *testing.T, ioStreams genericclioptions.IOStreams) func(msg string, code int) {
return func(msg string, code int) {
if len(msg) > 0 {
// add newline if needed
if !strings.HasSuffix(msg, "\n") {
msg += "\n"
}
fmt.Fprint(ioStreams.ErrOut, msg)
}
}
}

View File

@ -232,6 +232,31 @@ func (a *ApplySet) FetchParent() error {
}
return nil
}
func (a *ApplySet) LabelSelectorForMembers() string {
return metav1.FormatLabelSelector(&metav1.LabelSelector{
MatchLabels: a.LabelsForMember(),
})
}
// AllPrunableResources returns the list of all resources that should be considered for pruning.
// This is potentially a superset of the resources types that actually contain resources.
func (a *ApplySet) AllPrunableResources() []*meta.RESTMapping {
var ret []*meta.RESTMapping
for _, m := range a.currentResources {
ret = append(ret, m)
}
return ret
}
// AllPrunableNamespaces returns the list of all namespaces that should be considered for pruning.
// This is potentially a superset of the namespaces that actually contain resources.
func (a *ApplySet) AllPrunableNamespaces() []string {
var ret []string
for ns := range a.currentNamespaces {
ret = append(ret, ns)
}
return ret
}
func getLabelsAndAnnotations(obj runtime.Object) (map[string]string, map[string]string, error) {
accessor, err := meta.Accessor(obj)
@ -277,6 +302,10 @@ func parseNamespacesAnnotation(annotations map[string]string) sets.Set[string] {
if !ok { // this annotation is completely optional
return sets.Set[string]{}
}
// Don't include an empty namespace
if annotation == "" {
return sets.Set[string]{}
}
return sets.New(strings.Split(annotation, ",")...)
}

View File

@ -19,15 +19,113 @@ package apply
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/printers"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
)
type applySetPruner struct {
dynamicClient dynamic.Interface
visitedUids sets.Set[types.UID]
cascadingStrategy metav1.DeletionPropagation
dryRunStrategy cmdutil.DryRunStrategy
gracePeriod int
printer printers.ResourcePrinter
ioStreams genericclioptions.IOStreams
}
func newApplySetPruner(_ *ApplyOptions) *applySetPruner {
return &applySetPruner{}
func newApplySetPruner(o *ApplyOptions) (*applySetPruner, error) {
printer, err := o.ToPrinter("pruned")
if err != nil {
return nil, err
}
return &applySetPruner{
dynamicClient: o.DynamicClient,
visitedUids: o.VisitedUids,
cascadingStrategy: o.DeleteOptions.CascadingStrategy,
dryRunStrategy: o.DryRunStrategy,
gracePeriod: o.DeleteOptions.GracePeriod,
printer: printer,
ioStreams: o.IOStreams,
}, nil
}
func (p *applySetPruner) pruneAll(context.Context, *ApplySet) error {
return fmt.Errorf("ApplySet-based pruning is not yet implemented")
func (p *applySetPruner) pruneAll(ctx context.Context, applyset *ApplySet) error {
// TODO: Split into discovery and deletion, run discovery in parallel (and maybe in consistent order or in parallel?)
for _, restMapping := range applyset.AllPrunableResources() {
switch restMapping.Scope.Name() {
case meta.RESTScopeNameNamespace:
for _, namespace := range applyset.AllPrunableNamespaces() {
if namespace == "" {
// Just double-check because otherwise we get cryptic error messages
return fmt.Errorf("unexpectedly encountered empty namespace during prune of namespace-scoped resource %v", restMapping.GroupVersionKind)
}
if err := p.prune(ctx, namespace, restMapping, applyset); err != nil {
return fmt.Errorf("pruning %v objects: %w", restMapping.GroupVersionKind.String(), err)
}
}
case meta.RESTScopeNameRoot:
if err := p.prune(ctx, metav1.NamespaceNone, restMapping, applyset); err != nil {
return fmt.Errorf("pruning %v objects: %w", restMapping.GroupVersionKind.String(), err)
}
default:
return fmt.Errorf("unhandled scope %q", restMapping.Scope.Name())
}
}
return nil
}
func (p *applySetPruner) prune(ctx context.Context, namespace string, mapping *meta.RESTMapping, applyset *ApplySet) error {
applysetLabelSelector := applyset.LabelSelectorForMembers()
opt := metav1.ListOptions{
LabelSelector: applysetLabelSelector,
}
klog.V(2).Infof("listing objects for pruning; namespace=%q, resource=%v", namespace, mapping.Resource)
objects, err := p.dynamicClient.Resource(mapping.Resource).Namespace(namespace).List(ctx, opt)
if err != nil {
return err
}
for i := range objects.Items {
obj := &objects.Items[i]
uid := obj.GetUID()
if p.visitedUids.Has(uid) {
continue
}
name := obj.GetName()
if p.dryRunStrategy != cmdutil.DryRunClient {
if err := p.delete(ctx, namespace, name, mapping); err != nil {
return fmt.Errorf("deleting %s/%s: %w", namespace, name, err)
}
}
p.printer.PrintObj(obj, p.ioStreams.Out)
}
return nil
}
func (p *applySetPruner) delete(ctx context.Context, namespace string, name string, mapping *meta.RESTMapping) error {
return runDelete(ctx, namespace, name, mapping, p.dynamicClient, p.cascadingStrategy, p.gracePeriod, p.dryRunStrategy == cmdutil.DryRunServer)
}

View File

@ -140,15 +140,16 @@ func (p *pruner) prune(namespace string, mapping *meta.RESTMapping) error {
}
func (p *pruner) delete(namespace, name string, mapping *meta.RESTMapping) error {
return runDelete(namespace, name, mapping, p.dynamicClient, p.cascadingStrategy, p.gracePeriod, p.dryRunStrategy == cmdutil.DryRunServer)
ctx := context.TODO()
return runDelete(ctx, namespace, name, mapping, p.dynamicClient, p.cascadingStrategy, p.gracePeriod, p.dryRunStrategy == cmdutil.DryRunServer)
}
func runDelete(namespace, name string, mapping *meta.RESTMapping, c dynamic.Interface, cascadingStrategy metav1.DeletionPropagation, gracePeriod int, serverDryRun bool) error {
func runDelete(ctx context.Context, namespace, name string, mapping *meta.RESTMapping, c dynamic.Interface, cascadingStrategy metav1.DeletionPropagation, gracePeriod int, serverDryRun bool) error {
options := asDeleteOptions(cascadingStrategy, gracePeriod)
if serverDryRun {
options.DryRun = []string{metav1.DryRunAll}
}
return c.Resource(mapping.Resource).Namespace(namespace).Delete(context.TODO(), name, options)
return c.Resource(mapping.Resource).Namespace(namespace).Delete(ctx, name, options)
}
func asDeleteOptions(cascadingStrategy metav1.DeletionPropagation, gracePeriod int) metav1.DeleteOptions {

View File

@ -0,0 +1,2 @@
namespace/foo created
namespace/bar created

View File

@ -0,0 +1,2 @@
namespace/foo unchanged
namespace/bar pruned

View File

@ -0,0 +1,6 @@
apiVersion: v1
kind: Namespace
metadata:
labels:
applyset.k8s.io/part-of: placeholder-todo
name: foo

View File

@ -0,0 +1,4 @@
apiVersion: v1
kind: Namespace
metadata:
name: foo

View File

@ -0,0 +1,2 @@
namespace/foo created
namespace/bar created

View File

@ -0,0 +1,2 @@
namespace/foo unchanged
error: pruning /v1, Kind=Namespace objects: deleting /bar: an error on the server ("") has prevented the request from succeeding